channels-console 0.2.0

Real-time monitoring and metrics for Rust channels.
Documentation
use futures_channel::mpsc;
use futures_channel::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot;
use std::mem;

use crate::RT;
use crate::{init_stats_state, ChannelType, StatsEvent};

/// Wrap the inner futures channel with proxy ends. Returns (outer_tx, outer_rx).
/// All messages pass through the two forwarders.
pub(crate) fn wrap_channel<T: Send + 'static>(
    inner: (Sender<T>, Receiver<T>),
    channel_id: &'static str,
    label: Option<&'static str>,
    capacity: usize,
) -> (Sender<T>, Receiver<T>) {
    let (mut inner_tx, mut inner_rx) = inner;
    let type_name = std::any::type_name::<T>();

    let (outer_tx, mut to_inner_rx) = mpsc::channel::<T>(capacity);
    let (mut from_inner_tx, outer_rx) = mpsc::channel::<T>(capacity);

    let (stats_tx, _) = init_stats_state();

    let _ = stats_tx.send(StatsEvent::Created {
        id: channel_id,
        display_label: label,
        channel_type: ChannelType::Bounded(capacity),
        type_name,
        type_size: mem::size_of::<T>(),
    });

    let stats_tx_send = stats_tx.clone();
    let stats_tx_recv = stats_tx.clone();

    // Create a signal channel to notify send-forwarder when outer_rx is closed
    let (close_signal_tx, mut close_signal_rx) = tokio::sync::oneshot::channel::<()>();

    // Forward outer -> inner (proxy the send path)
    RT.spawn(async move {
        use futures_util::stream::StreamExt;
        loop {
            tokio::select! {
                msg = to_inner_rx.next() => {
                    match msg {
                        Some(msg) => {
                            if inner_tx.try_send(msg).is_err() {
                                to_inner_rx.close();
                                break;
                            }
                            let _ = stats_tx_send.send(StatsEvent::MessageSent { id: channel_id });
                        }
                        None => break, // Outer sender dropped
                    }
                }
                _ = &mut close_signal_rx => {
                    // Outer receiver was closed/dropped, close our receiver to reject further sends
                    to_inner_rx.close();
                    break;
                }
            }
        }
        // Channel is closed
        let _ = stats_tx_send.send(StatsEvent::Closed { id: channel_id });
    });

    // Forward inner -> outer (proxy the recv path)
    RT.spawn(async move {
        use futures_util::stream::StreamExt;
        while let Some(msg) = inner_rx.next().await {
            if from_inner_tx.try_send(msg).is_ok() {
                let _ = stats_tx_recv.send(StatsEvent::MessageReceived { id: channel_id });
            } else {
                // Outer receiver was closed
                let _ = close_signal_tx.send(());
                break;
            }
        }
        // Channel is closed (either inner sender dropped or outer receiver closed)
        let _ = stats_tx_recv.send(StatsEvent::Closed { id: channel_id });
    });

    (outer_tx, outer_rx)
}

/// Wrap an unbounded futures channel with proxy ends. Returns (outer_tx, outer_rx).
pub(crate) fn wrap_unbounded<T: Send + 'static>(
    inner: (UnboundedSender<T>, UnboundedReceiver<T>),
    channel_id: &'static str,
    label: Option<&'static str>,
) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (inner_tx, mut inner_rx) = inner;
    let type_name = std::any::type_name::<T>();

    let (outer_tx, mut to_inner_rx) = mpsc::unbounded::<T>();
    let (from_inner_tx, outer_rx) = mpsc::unbounded::<T>();

    let (stats_tx, _) = init_stats_state();

    let _ = stats_tx.send(StatsEvent::Created {
        id: channel_id,
        display_label: label,
        channel_type: ChannelType::Unbounded,
        type_name,
        type_size: mem::size_of::<T>(),
    });

    let stats_tx_send = stats_tx.clone();
    let stats_tx_recv = stats_tx.clone();

    // Create a signal channel to notify send-forwarder when outer_rx is closed
    let (close_signal_tx, mut close_signal_rx) = tokio::sync::oneshot::channel::<()>();

    // Forward outer -> inner (proxy the send path)
    RT.spawn(async move {
        use futures_util::stream::StreamExt;
        loop {
            tokio::select! {
                msg = to_inner_rx.next() => {
                    match msg {
                        Some(msg) => {
                            if inner_tx.unbounded_send(msg).is_err() {
                                to_inner_rx.close();
                                break;
                            }
                            let _ = stats_tx_send.send(StatsEvent::MessageSent { id: channel_id });
                        }
                        None => break, // Outer sender dropped
                    }
                }
                _ = &mut close_signal_rx => {
                    // Outer receiver was closed/dropped, close our receiver to reject further sends
                    to_inner_rx.close();
                    break;
                }
            }
        }
        // Channel is closed
        let _ = stats_tx_send.send(StatsEvent::Closed { id: channel_id });
    });

    // Forward inner -> outer (proxy the recv path)
    RT.spawn(async move {
        use futures_util::stream::StreamExt;
        while let Some(msg) = inner_rx.next().await {
            if from_inner_tx.unbounded_send(msg).is_ok() {
                let _ = stats_tx_recv.send(StatsEvent::MessageReceived { id: channel_id });
            } else {
                // Outer receiver was closed
                let _ = close_signal_tx.send(());
                break;
            }
        }
        // Channel is closed (either inner sender dropped or outer receiver closed)
        let _ = stats_tx_recv.send(StatsEvent::Closed { id: channel_id });
    });

    (outer_tx, outer_rx)
}

/// Wrap a oneshot futures channel with proxy ends. Returns (outer_tx, outer_rx).
pub(crate) fn wrap_oneshot<T: Send + 'static>(
    inner: (oneshot::Sender<T>, oneshot::Receiver<T>),
    channel_id: &'static str,
    label: Option<&'static str>,
) -> (oneshot::Sender<T>, oneshot::Receiver<T>) {
    let (inner_tx, inner_rx) = inner;
    let type_name = std::any::type_name::<T>();

    let (outer_tx, outer_rx_proxy) = oneshot::channel::<T>();
    let (inner_tx_proxy, outer_rx) = oneshot::channel::<T>();

    let (stats_tx, _) = init_stats_state();

    let _ = stats_tx.send(StatsEvent::Created {
        id: channel_id,
        display_label: label,
        channel_type: ChannelType::Oneshot,
        type_name,
        type_size: mem::size_of::<T>(),
    });

    let stats_tx_send = stats_tx.clone();
    let stats_tx_recv = stats_tx;

    // Create a signal channel to notify send-forwarder when outer_rx is closed
    let (close_signal_tx, mut close_signal_rx) = tokio::sync::oneshot::channel::<()>();

    // Monitor outer receiver and drop inner receiver when outer is dropped
    RT.spawn(async move {
        let mut inner_rx = Some(inner_rx);
        let mut message_received = false;
        tokio::select! {
            msg = async { inner_rx.take().unwrap().await }, if inner_rx.is_some() => {
                // Message received from inner
                match msg {
                    Ok(msg) => {
                        if inner_tx_proxy.send(msg).is_ok() {
                            let _ = stats_tx_recv.send(StatsEvent::MessageReceived { id: channel_id });
                            message_received = true;
                        }
                    }
                    Err(_) => {
                        // Inner sender was dropped without sending
                    }
                }
            }
            _ = async {
                // Check if outer receiver is canceled
                loop {
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                    if inner_tx_proxy.is_canceled() {
                        break;
                    }
                }
            } => {
                // Outer receiver was dropped - drop inner_rx to make sends fail
                drop(inner_rx);
                let _ = close_signal_tx.send(());
            }
        }
        // Only send Closed if message was not successfully received
        if !message_received {
            let _ = stats_tx_recv.send(StatsEvent::Closed { id: channel_id });
        }
    });

    // Forward outer -> inner (proxy the send path)
    RT.spawn(async move {
        let mut message_sent = false;
        tokio::select! {
            msg = outer_rx_proxy => {
                match msg {
                    Ok(msg) => {
                        if inner_tx.send(msg).is_ok() {
                            let _ = stats_tx_send.send(StatsEvent::MessageSent { id: channel_id });
                            let _ = stats_tx_send.send(StatsEvent::Notified { id: channel_id });
                            message_sent = true;
                        }
                    }
                    Err(_) => {
                        // Outer sender was dropped without sending
                    }
                }
            }
            _ = &mut close_signal_rx => {
                // Outer receiver was closed/dropped before send
            }
        }
        // Only send Closed if message was not successfully sent
        if !message_sent {
            let _ = stats_tx_send.send(StatsEvent::Closed { id: channel_id });
        }
    });

    (outer_tx, outer_rx)
}

use crate::Instrument;

impl<T: Send + 'static> Instrument
    for (
        futures_channel::mpsc::Sender<T>,
        futures_channel::mpsc::Receiver<T>,
    )
{
    type Output = (
        futures_channel::mpsc::Sender<T>,
        futures_channel::mpsc::Receiver<T>,
    );
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output {
        if capacity.is_none() {
            panic!("Capacity is required for bounded futures channels, because they don't expose their capacity in a public API");
        }
        wrap_channel(self, channel_id, label, capacity.unwrap())
    }
}

impl<T: Send + 'static> Instrument
    for (
        futures_channel::mpsc::UnboundedSender<T>,
        futures_channel::mpsc::UnboundedReceiver<T>,
    )
{
    type Output = (
        futures_channel::mpsc::UnboundedSender<T>,
        futures_channel::mpsc::UnboundedReceiver<T>,
    );
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        _capacity: Option<usize>,
    ) -> Self::Output {
        wrap_unbounded(self, channel_id, label)
    }
}

impl<T: Send + 'static> Instrument
    for (
        futures_channel::oneshot::Sender<T>,
        futures_channel::oneshot::Receiver<T>,
    )
{
    type Output = (
        futures_channel::oneshot::Sender<T>,
        futures_channel::oneshot::Receiver<T>,
    );
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        _capacity: Option<usize>,
    ) -> Self::Output {
        wrap_oneshot(self, channel_id, label)
    }
}