tokio-console 0.1.7

The Tokio console: a debugger for async Rust.
use console_api::instrument::{
    instrument_client::InstrumentClient, InstrumentRequest, PauseRequest, ResumeRequest,
    TaskDetailsRequest, Update,
use console_api::tasks::TaskDetails;
use futures::stream::StreamExt;
use std::{error::Error, pin::Pin, time::Duration};
use tonic::{transport::Channel, transport::Uri, Streaming};

pub struct Connection {
    target: Uri,
    state: State,

// clippy doesn't like that the "connected" case is much larger than the
// disconnected case, and suggests boxing the connected side's stream.
// however, this is rarely disconnected; it's normally connected. boxing the
// stream just adds a heap pointer dereference, slightly penalizing polling
// the stream in most cases. so, don't listen to clippy on this.
enum State {
    Connected {
        client: InstrumentClient<Channel>,
        stream: Box<Streaming<Update>>,

macro_rules! with_client {
    ($me:ident, $client:ident, $block:expr) => ({
        loop {
            match $me.state {
                State::Connected { client: ref mut $client, .. } => {
                    match $block {
                        Ok(resp) => break Ok(resp),
                        // If the error is a `h2::Error`, that indicates
                        // something went wrong at the connection level, rather
                        // than the server returning an error code. In that
                        // case, let's try reconnecting...
                        Err(error) if error.source().iter().any(|src|<h2::Error>()) => {
                                error = %error,
                                "connection error sending command"
                            $me.state = State::Disconnected(Self::BACKOFF);
                        // Otherwise, return the error.
                        Err(e) => {
                            break Err(e);
                State::Disconnected(_) => $me.connect().await,

impl Connection {
    const BACKOFF: Duration = Duration::from_millis(500);
    pub fn new(target: Uri) -> Self {
        Self {
            state: State::Disconnected(Duration::from_secs(0)),

    async fn connect(&mut self) {
        const MAX_BACKOFF: Duration = Duration::from_secs(5);

        while let State::Disconnected(backoff) = self.state {
            if backoff == Duration::from_secs(0) {
                tracing::debug!(to =, "connecting");
            } else {
                tracing::debug!(reconnect_in = ?backoff, "reconnecting");
            let try_connect = async {
                let mut client = InstrumentClient::connect(;
                let request = tonic::Request::new(InstrumentRequest {});
                let stream = Box::new(client.watch_updates(request).await?.into_inner());
                Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected { client, stream })
            self.state = match try_connect.await {
                Ok(connected) => {
                    tracing::debug!("connected successfully!");
                Err(error) => {
                    tracing::warn!(%error, "error connecting");
                    let backoff = std::cmp::max(backoff + Self::BACKOFF, MAX_BACKOFF);

    pub async fn next_update(&mut self) -> Update {
        loop {
            match self.state {
                State::Connected { ref mut stream, .. } => match Pin::new(stream).next().await {
                    Some(Ok(update)) => return update,
                    Some(Err(status)) => {
                        tracing::warn!(%status, "error from stream");
                        self.state = State::Disconnected(Self::BACKOFF);
                    None => {
                        tracing::error!("stream closed by server");
                        self.state = State::Disconnected(Self::BACKOFF);
                State::Disconnected(_) => self.connect().await,

    pub async fn watch_details(
        &mut self,
        task_id: u64,
    ) -> Result<Streaming<TaskDetails>, tonic::Status> {
        with_client!(self, client, {
            let request = tonic::Request::new(TaskDetailsRequest {
                id: Some(task_id.into()),
        .map(|watch| watch.into_inner())

    pub async fn pause(&mut self) {
        let res = with_client!(self, client, {
            let request = tonic::Request::new(PauseRequest {});

        if let Err(e) = res {
            tracing::error!(error = %e, "rpc error sending pause command");

    pub async fn resume(&mut self) {
        let res = with_client!(self, client, {
            let request = tonic::Request::new(ResumeRequest {});

        if let Err(e) = res {
            tracing::error!(error = %e, "rpc error sending resume command");

    pub fn render(&self, styles: &crate::view::Styles) -> tui::text::Spans {
        use tui::{
            style::{Color, Modifier},
            text::{Span, Spans},
        let state = match self.state {
            State::Connected { .. } => Span::styled(
            State::Disconnected(d) if d == Duration::from_secs(0) => Span::styled(
            State::Disconnected(d) => Span::styled(
                format!("(RECONNECTING IN {:?})", d),
            Span::raw("connection: "),
            Span::raw(" "),