async_inspect/channel/
mod.rs

1//! Tracked channel primitives
2//!
3//! This module provides drop-in replacements for Tokio's channel types
4//! (`mpsc`, `oneshot`, `broadcast`) that automatically track message flow
5//! and integrate with async-inspect's visualization.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use async_inspect::channel::mpsc;
11//!
12//! #[tokio::main]
13//! async fn main() {
14//!     let (tx, mut rx) = mpsc::channel(32, "commands");
15//!
16//!     tokio::spawn(async move {
17//!         tx.send("hello").await.unwrap();
18//!     });
19//!
20//!     let msg = rx.recv().await.unwrap();
21//!     println!("Received: {}", msg);
22//!
23//!     // Check channel metrics
24//!     let metrics = rx.metrics();
25//!     println!("Messages received: {}", metrics.received);
26//! }
27//! ```
28
29pub mod broadcast;
30pub mod mpsc;
31pub mod oneshot;
32
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
/// Point-in-time snapshot of the counters kept for a tracked channel.
#[derive(Debug, Clone, Default)]
pub struct ChannelMetrics {
    /// Number of messages sent so far
    pub sent: u64,
    /// Number of messages received so far
    pub received: u64,
    /// Approximate number of messages sitting in the buffer
    pub buffered: u64,
    /// How many sends had to wait for capacity
    pub send_waits: u64,
    /// How many receives had to wait for a message
    pub recv_waits: u64,
    /// Accumulated time spent waiting in sends
    pub total_send_wait_time: Duration,
    /// Accumulated time spent waiting in receives
    pub total_recv_wait_time: Duration,
    /// Whether the channel has been closed
    pub closed: bool,
}

impl ChannelMetrics {
    /// Divides `events` by `total`, yielding `0.0` when `total` is zero so the
    /// rate helpers never produce NaN or infinity.
    fn ratio(events: u64, total: u64) -> f64 {
        match total {
            0 => 0.0,
            _ => events as f64 / total as f64,
        }
    }

    /// Fraction of sends that blocked: `send_waits / sent`.
    ///
    /// Returns `0.0` when nothing has been sent.
    #[must_use]
    pub fn send_block_rate(&self) -> f64 {
        Self::ratio(self.send_waits, self.sent)
    }

    /// Fraction of receives that blocked: `recv_waits / received`.
    ///
    /// Returns `0.0` when nothing has been received.
    #[must_use]
    pub fn recv_block_rate(&self) -> f64 {
        Self::ratio(self.recv_waits, self.received)
    }
}
78
79/// Internal metrics tracker for channels
80#[derive(Debug)]
81pub(crate) struct ChannelMetricsTracker {
82    sent: AtomicU64,
83    received: AtomicU64,
84    send_waits: AtomicU64,
85    recv_waits: AtomicU64,
86    total_send_wait_nanos: AtomicU64,
87    total_recv_wait_nanos: AtomicU64,
88    closed: std::sync::atomic::AtomicBool,
89}
90
91impl ChannelMetricsTracker {
92    pub fn new() -> Self {
93        Self {
94            sent: AtomicU64::new(0),
95            received: AtomicU64::new(0),
96            send_waits: AtomicU64::new(0),
97            recv_waits: AtomicU64::new(0),
98            total_send_wait_nanos: AtomicU64::new(0),
99            total_recv_wait_nanos: AtomicU64::new(0),
100            closed: std::sync::atomic::AtomicBool::new(false),
101        }
102    }
103
104    pub fn record_send(&self, wait_time: Option<Duration>) {
105        self.sent.fetch_add(1, Ordering::Relaxed);
106        if let Some(wait) = wait_time {
107            self.send_waits.fetch_add(1, Ordering::Relaxed);
108            self.total_send_wait_nanos
109                .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
110        }
111    }
112
113    pub fn record_recv(&self, wait_time: Option<Duration>) {
114        self.received.fetch_add(1, Ordering::Relaxed);
115        if let Some(wait) = wait_time {
116            self.recv_waits.fetch_add(1, Ordering::Relaxed);
117            self.total_recv_wait_nanos
118                .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
119        }
120    }
121
122    pub fn mark_closed(&self) {
123        self.closed.store(true, Ordering::Relaxed);
124    }
125
126    pub fn get_metrics(&self, buffered: u64) -> ChannelMetrics {
127        ChannelMetrics {
128            sent: self.sent.load(Ordering::Relaxed),
129            received: self.received.load(Ordering::Relaxed),
130            buffered,
131            send_waits: self.send_waits.load(Ordering::Relaxed),
132            recv_waits: self.recv_waits.load(Ordering::Relaxed),
133            total_send_wait_time: Duration::from_nanos(
134                self.total_send_wait_nanos.load(Ordering::Relaxed),
135            ),
136            total_recv_wait_time: Duration::from_nanos(
137                self.total_recv_wait_nanos.load(Ordering::Relaxed),
138            ),
139            closed: self.closed.load(Ordering::Relaxed),
140        }
141    }
142}
143
144impl Default for ChannelMetricsTracker {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
/// Stopwatch used to decide whether a channel operation counted as a wait.
///
/// It remembers when it was started and only reports the elapsed time once
/// that time exceeds `threshold`.
pub(crate) struct WaitTimer {
    start: Instant,
    threshold: Duration,
}

impl WaitTimer {
    /// Elapsed time that must be exceeded before an operation counts as a wait.
    const MIN_WAIT: Duration = Duration::from_micros(10);

    /// Starts timing from the current instant with a 10 microsecond threshold.
    pub fn start() -> Self {
        let start = Instant::now();
        let threshold = Self::MIN_WAIT;
        Self { start, threshold }
    }

    /// Returns the time elapsed since `start` when it is strictly greater
    /// than the threshold, and `None` otherwise.
    pub fn elapsed_if_waited(&self) -> Option<Duration> {
        Some(self.start.elapsed()).filter(|elapsed| *elapsed > self.threshold)
    }
}