async_inspect/channel/
mod.rs1pub mod broadcast;
30pub mod mpsc;
31pub mod oneshot;
32
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
/// Plain-data snapshot of a channel's activity counters.
///
/// All fields are public so callers can read them directly; the helper
/// methods derive ratios from them.
#[derive(Debug, Clone, Default)]
pub struct ChannelMetrics {
    /// Number of send operations recorded.
    pub sent: u64,
    /// Number of receive operations recorded.
    pub received: u64,
    /// Buffered-item count reported alongside the snapshot
    /// (presumably the messages currently queued; confirm against the caller).
    pub buffered: u64,
    /// Number of send operations that were recorded with a wait time.
    pub send_waits: u64,
    /// Number of receive operations that were recorded with a wait time.
    pub recv_waits: u64,
    /// Sum of all recorded send wait times.
    pub total_send_wait_time: Duration,
    /// Sum of all recorded receive wait times.
    pub total_recv_wait_time: Duration,
    /// Whether the channel has been marked as closed.
    pub closed: bool,
}

impl ChannelMetrics {
    /// Returns `send_waits / sent` as a float.
    ///
    /// Yields `0.0` when nothing has been sent, so the result is never NaN.
    #[must_use]
    pub fn send_block_rate(&self) -> f64 {
        Self::fraction(self.send_waits, self.sent)
    }

    /// Returns `recv_waits / received` as a float.
    ///
    /// Yields `0.0` when nothing has been received, so the result is never NaN.
    #[must_use]
    pub fn recv_block_rate(&self) -> f64 {
        Self::fraction(self.recv_waits, self.received)
    }

    /// Divides `part` by `whole`, mapping an empty `whole` to `0.0`.
    fn fraction(part: u64, whole: u64) -> f64 {
        match whole {
            0 => 0.0,
            total => part as f64 / total as f64,
        }
    }
}
78
79#[derive(Debug)]
81pub(crate) struct ChannelMetricsTracker {
82 sent: AtomicU64,
83 received: AtomicU64,
84 send_waits: AtomicU64,
85 recv_waits: AtomicU64,
86 total_send_wait_nanos: AtomicU64,
87 total_recv_wait_nanos: AtomicU64,
88 closed: std::sync::atomic::AtomicBool,
89}
90
91impl ChannelMetricsTracker {
92 pub fn new() -> Self {
93 Self {
94 sent: AtomicU64::new(0),
95 received: AtomicU64::new(0),
96 send_waits: AtomicU64::new(0),
97 recv_waits: AtomicU64::new(0),
98 total_send_wait_nanos: AtomicU64::new(0),
99 total_recv_wait_nanos: AtomicU64::new(0),
100 closed: std::sync::atomic::AtomicBool::new(false),
101 }
102 }
103
104 pub fn record_send(&self, wait_time: Option<Duration>) {
105 self.sent.fetch_add(1, Ordering::Relaxed);
106 if let Some(wait) = wait_time {
107 self.send_waits.fetch_add(1, Ordering::Relaxed);
108 self.total_send_wait_nanos
109 .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
110 }
111 }
112
113 pub fn record_recv(&self, wait_time: Option<Duration>) {
114 self.received.fetch_add(1, Ordering::Relaxed);
115 if let Some(wait) = wait_time {
116 self.recv_waits.fetch_add(1, Ordering::Relaxed);
117 self.total_recv_wait_nanos
118 .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
119 }
120 }
121
122 pub fn mark_closed(&self) {
123 self.closed.store(true, Ordering::Relaxed);
124 }
125
126 pub fn get_metrics(&self, buffered: u64) -> ChannelMetrics {
127 ChannelMetrics {
128 sent: self.sent.load(Ordering::Relaxed),
129 received: self.received.load(Ordering::Relaxed),
130 buffered,
131 send_waits: self.send_waits.load(Ordering::Relaxed),
132 recv_waits: self.recv_waits.load(Ordering::Relaxed),
133 total_send_wait_time: Duration::from_nanos(
134 self.total_send_wait_nanos.load(Ordering::Relaxed),
135 ),
136 total_recv_wait_time: Duration::from_nanos(
137 self.total_recv_wait_nanos.load(Ordering::Relaxed),
138 ),
139 closed: self.closed.load(Ordering::Relaxed),
140 }
141 }
142}
143
144impl Default for ChannelMetricsTracker {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
/// Measures how long an operation took and reports it only when it exceeded
/// a minimum threshold.
pub(crate) struct WaitTimer {
    /// Moment the timer was created.
    start: Instant,
    /// Elapsed time must be strictly greater than this to count as a wait.
    threshold: Duration,
}

impl WaitTimer {
    /// Threshold applied by [`WaitTimer::start`]: 10 microseconds.
    const DEFAULT_THRESHOLD: Duration = Duration::from_micros(10);

    /// Starts a timer at the current instant with the default threshold.
    pub fn start() -> Self {
        let threshold = Self::DEFAULT_THRESHOLD;
        let start = Instant::now();
        Self { start, threshold }
    }

    /// Returns the time elapsed since the timer was started, or `None` when
    /// that time does not exceed the threshold.
    pub fn elapsed_if_waited(&self) -> Option<Duration> {
        Some(self.start.elapsed()).filter(|waited| *waited > self.threshold)
    }
}