channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, OnceLock, RwLock};
5
6pub mod channels_guard;
7pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
8
9use crate::http_api::start_metrics_server;
10mod http_api;
11mod wrappers;
12
/// Kind of channel being instrumented.
///
/// The `Display` form (also used for serialization) is `bounded[N]`,
/// `unbounded` or `oneshot`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelType {
    /// Bounded channel; the payload is the size rendered as `N` in `bounded[N]`.
    Bounded(usize),
    /// Unbounded channel.
    Unbounded,
    /// Oneshot channel.
    Oneshot,
}
20
21impl std::fmt::Display for ChannelType {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        match self {
24            ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
25            ChannelType::Unbounded => write!(f, "unbounded"),
26            ChannelType::Oneshot => write!(f, "oneshot"),
27        }
28    }
29}
30
31impl Serialize for ChannelType {
32    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
33    where
34        S: serde::Serializer,
35    {
36        serializer.serialize_str(&self.to_string())
37    }
38}
39
40impl<'de> Deserialize<'de> for ChannelType {
41    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42    where
43        D: serde::Deserializer<'de>,
44    {
45        let s = String::deserialize(deserializer)?;
46
47        match s.as_str() {
48            "unbounded" => Ok(ChannelType::Unbounded),
49            "oneshot" => Ok(ChannelType::Oneshot),
50            _ => {
51                // try: bounded[123]
52                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
53                    let size = inner
54                        .parse()
55                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
56                    Ok(ChannelType::Bounded(size))
57                } else {
58                    Err(serde::de::Error::custom("invalid channel type"))
59                }
60            }
61        }
62    }
63}
64
/// Output format used by `ChannelsGuard` for what it produces on drop.
///
/// `Table` is the default.
#[derive(Debug, Default, Clone, Copy)]
pub enum Format {
    /// Table output (the default variant).
    #[default]
    Table,
    /// JSON output.
    Json,
    /// JSON output, pretty variant.
    JsonPretty,
}
73
/// State of an instrumented channel.
///
/// `Active` is the default state of a freshly registered channel.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ChannelState {
    /// Default state; also restored when received count catches up with sent count.
    #[default]
    Active,
    /// Set when a `Closed` event is recorded for the channel.
    Closed,
    /// Set while more messages have been sent than received.
    Full,
    /// Set when a `Notified` event is recorded for the channel.
    Notified,
}
83
84impl std::fmt::Display for ChannelState {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{}", self.as_str())
87    }
88}
89
90impl ChannelState {
91    pub fn as_str(&self) -> &'static str {
92        match self {
93            ChannelState::Active => "active",
94            ChannelState::Closed => "closed",
95            ChannelState::Full => "full",
96            ChannelState::Notified => "notified",
97        }
98    }
99}
100
101impl Serialize for ChannelState {
102    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
103    where
104        S: serde::Serializer,
105    {
106        serializer.serialize_str(self.as_str())
107    }
108}
109
110impl<'de> Deserialize<'de> for ChannelState {
111    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
112    where
113        D: serde::Deserializer<'de>,
114    {
115        let s = String::deserialize(deserializer)?;
116        match s.as_str() {
117            "active" => Ok(ChannelState::Active),
118            "closed" => Ok(ChannelState::Closed),
119            "full" => Ok(ChannelState::Full),
120            "notified" => Ok(ChannelState::Notified),
121            _ => Err(serde::de::Error::custom("invalid channel state")),
122        }
123    }
124}
125
126/// Statistics for a single instrumented channel.
127#[derive(Debug, Clone)]
128pub(crate) struct ChannelStats {
129    pub(crate) id: &'static str,
130    pub(crate) label: Option<&'static str>,
131    pub(crate) channel_type: ChannelType,
132    pub(crate) state: ChannelState,
133    pub(crate) sent_count: u64,
134    pub(crate) received_count: u64,
135    pub(crate) type_name: &'static str,
136    pub(crate) type_size: usize,
137}
138
139impl ChannelStats {
140    pub fn queued(&self) -> u64 {
141        self.sent_count.saturating_sub(self.received_count)
142    }
143
144    pub fn total_bytes(&self) -> u64 {
145        self.sent_count * self.type_size as u64
146    }
147
148    pub fn queued_bytes(&self) -> u64 {
149        self.queued() * self.type_size as u64
150    }
151}
152
153/// Serializable version of channel statistics for JSON responses.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct SerializableChannelStats {
156    pub id: String,
157    pub label: String,
158    pub channel_type: ChannelType,
159    pub state: ChannelState,
160    pub sent_count: u64,
161    pub received_count: u64,
162    pub queued: u64,
163    pub type_name: String,
164    pub type_size: usize,
165    pub total_bytes: u64,
166    pub queued_bytes: u64,
167}
168
169impl From<&ChannelStats> for SerializableChannelStats {
170    fn from(stats: &ChannelStats) -> Self {
171        let label = resolve_label(stats.id, stats.label);
172        Self {
173            id: stats.id.to_string(),
174            label,
175            channel_type: stats.channel_type,
176            state: stats.state,
177            sent_count: stats.sent_count,
178            received_count: stats.received_count,
179            queued: stats.queued(),
180            type_name: stats.type_name.to_string(),
181            type_size: stats.type_size,
182            total_bytes: stats.total_bytes(),
183            queued_bytes: stats.queued_bytes(),
184        }
185    }
186}
187
188impl ChannelStats {
189    fn new(
190        id: &'static str,
191        label: Option<&'static str>,
192        channel_type: ChannelType,
193        type_name: &'static str,
194        type_size: usize,
195    ) -> Self {
196        Self {
197            id,
198            label,
199            channel_type,
200            state: ChannelState::default(),
201            sent_count: 0,
202            received_count: 0,
203            type_name,
204            type_size,
205        }
206    }
207
208    /// Update the channel state based on sent/received counts.
209    /// Sets state to Full if sent > received, otherwise Active (unless explicitly closed).
210    fn update_state(&mut self) {
211        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
212            return;
213        }
214
215        if self.sent_count > self.received_count {
216            self.state = ChannelState::Full;
217        } else {
218            self.state = ChannelState::Active;
219        }
220    }
221}
222
/// Events sent to the background statistics collection thread.
///
/// Every variant carries the `id` of the channel it refers to; the collector
/// thread uses that `id` as the key into the shared statistics map.
#[derive(Debug)]
pub(crate) enum StatsEvent {
    /// Registers a channel: the collector inserts a fresh `ChannelStats`
    /// built from these fields under `id`.
    Created {
        id: &'static str,
        display_label: Option<&'static str>,
        channel_type: ChannelType,
        type_name: &'static str,
        type_size: usize,
    },
    /// One message was sent: the collector increments `sent_count` and
    /// refreshes the channel state.
    MessageSent {
        id: &'static str,
    },
    /// One message was received: the collector increments `received_count`
    /// and refreshes the channel state.
    MessageReceived {
        id: &'static str,
    },
    /// The channel was closed: the collector sets its state to `Closed`.
    Closed {
        id: &'static str,
    },
    /// The collector sets the channel state to `Notified`.
    // NOTE(review): no construction site for this variant is visible in this
    // file, which is presumably why the dead-code lint is silenced — confirm.
    #[allow(dead_code)]
    Notified {
        id: &'static str,
    },
}
247
248type StatsState = (
249    CbSender<StatsEvent>,
250    Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
251);
252
253/// Global state for statistics collection.
254static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
255
256/// Initialize the statistics collection system (called on first instrumented channel).
257/// Returns a reference to the global state.
258fn init_stats_state() -> &'static StatsState {
259    STATS_STATE.get_or_init(|| {
260        let (tx, rx) = unbounded::<StatsEvent>();
261        let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
262        let stats_map_clone = Arc::clone(&stats_map);
263
264        std::thread::Builder::new()
265            .name("channel-stats-collector".into())
266            .spawn(move || {
267                while let Ok(event) = rx.recv() {
268                    let mut stats = stats_map_clone.write().unwrap();
269                    match event {
270                        StatsEvent::Created {
271                            id: key,
272                            display_label,
273                            channel_type,
274                            type_name,
275                            type_size,
276                        } => {
277                            stats.insert(
278                                key,
279                                ChannelStats::new(
280                                    key,
281                                    display_label,
282                                    channel_type,
283                                    type_name,
284                                    type_size,
285                                ),
286                            );
287                        }
288                        StatsEvent::MessageSent { id } => {
289                            if let Some(channel_stats) = stats.get_mut(id) {
290                                channel_stats.sent_count += 1;
291                                channel_stats.update_state();
292                            }
293                        }
294                        StatsEvent::MessageReceived { id } => {
295                            if let Some(channel_stats) = stats.get_mut(id) {
296                                channel_stats.received_count += 1;
297                                channel_stats.update_state();
298                            }
299                        }
300                        StatsEvent::Closed { id } => {
301                            if let Some(channel_stats) = stats.get_mut(id) {
302                                channel_stats.state = ChannelState::Closed;
303                            }
304                        }
305                        StatsEvent::Notified { id } => {
306                            if let Some(channel_stats) = stats.get_mut(id) {
307                                channel_stats.state = ChannelState::Notified;
308                            }
309                        }
310                    }
311                }
312            })
313            .expect("Failed to spawn channel-stats-collector thread");
314
315        // Spawn the metrics HTTP server in the background
316        // Check environment variable for custom port, default to 6770
317        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
318            .ok()
319            .and_then(|p| p.parse::<u16>().ok())
320            .unwrap_or(6770);
321        let addr = format!("127.0.0.1:{}", port);
322
323        std::thread::spawn(move || {
324            start_metrics_server(&addr);
325        });
326
327        (tx, stats_map)
328    })
329}
330
331fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
332    if let Some(l) = provided {
333        return l.to_string();
334    }
335    if let Some(pos) = id.rfind(':') {
336        let (path, line_part) = id.split_at(pos);
337        let line = &line_part[1..];
338        format!("{}:{}", extract_filename(path), line)
339    } else {
340        extract_filename(id)
341    }
342}
343
/// Shortens a path to its last two components (`parent/file`).
///
/// Paths with fewer than two components are returned unchanged. Both `/` and
/// `\` are treated as separators, so backslash-separated paths (as `file!()`
/// produces on Windows) are shortened as well; the original separators are kept.
fn extract_filename(path: &str) -> String {
    const SEPARATORS: &[char] = &['/', '\\'];

    // Find the separator in front of the parent component, i.e. the
    // second-to-last separator. If there is none, the path is already short.
    let short = path
        .rfind(SEPARATORS)
        .and_then(|last_sep| path[..last_sep].rfind(SEPARATORS))
        // Separators are single-byte ASCII, so `+ 1` stays on a char boundary.
        .map_or(path, |parent_sep| &path[parent_sep + 1..]);

    short.to_string()
}
356
/// Format bytes into human-readable units (B, KB, MB, GB, TB).
///
/// Values below 1024 are printed as whole bytes (`"512 B"`); larger values are
/// scaled by powers of 1024 and printed with one decimal (`"1.5 KB"`). Values
/// beyond the TB range stay in TB.
///
/// A value that would *round* to `1024.0` of a unit is promoted to the next
/// unit, so `1024 * 1024 - 1` yields `"1.0 MB"` rather than `"1024.0 KB"`.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const STEP: f64 = 1024.0;

    // Plain byte counts are printed exactly, without a fractional part.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let largest = UNITS.len() - 1;
    let mut size = bytes as f64;
    let mut unit_idx = 0;

    while size >= STEP && unit_idx < largest {
        size /= STEP;
        unit_idx += 1;
    }

    // `{:.1}` rounds to one decimal; if that rounding reaches 1024.0 the value
    // belongs to the next unit (unless we are already at the largest one).
    if unit_idx < largest && (size * 10.0).round() >= STEP * 10.0 {
        size /= STEP;
        unit_idx += 1;
    }

    format!("{:.1} {}", size, UNITS[unit_idx])
}
378
/// Trait for instrumenting channels.
///
/// This trait is not intended for direct use. Use the `instrument!` macro instead.
///
/// The `instrument!` macro calls [`Instrument::instrument`] on the expression it
/// is given, passing a `file:line` channel id plus the optional label and capacity.
#[doc(hidden)]
pub trait Instrument {
    /// Value returned in place of the original channel expression.
    // NOTE(review): presumably the wrapped (proxied) channel handles —
    // implementations are not visible in this file; confirm.
    type Output;
    /// Consumes `self` and returns the instrumented value.
    ///
    /// * `channel_id` - identifier of the channel; the `instrument!` macro
    ///   passes `concat!(file!(), ":", line!())` of the call site.
    /// * `label` - optional custom label; `None` when the macro is used
    ///   without `label = ...`.
    /// * `capacity` - optional capacity; `None` when the macro is used
    ///   without `capacity = ...`.
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
392
cfg_if::cfg_if! {
    // Only compiled when the `tokio` or `futures` feature is enabled.
    if #[cfg(any(feature = "tokio", feature = "futures"))] {
        use std::sync::LazyLock;
        /// Lazily-built multi-threaded Tokio runtime with the time driver enabled.
        ///
        /// The runtime is constructed on first access; construction failure panics.
        // NOTE(review): the users of this runtime are not visible in this file —
        // presumably the channel wrappers; confirm.
        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
            tokio::runtime::Builder::new_multi_thread()
                .enable_time()
                .build()
                .unwrap()
        });
    }
}
404
/// Instrument a channel creation to wrap it with debugging proxies.
/// Currently only supports bounded, unbounded and oneshot channels.
///
/// The macro forwards its first argument to `Instrument::instrument` together
/// with a channel id of the form `file:line` (taken from the call site), the
/// optional `label` and the optional `capacity`.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
///
/// #[tokio::main]
/// async fn main() {
///
///    // Create channels normally
///    let (tx, rx) = mpsc::channel::<String>(100);
///
///    // Instrument them only when the feature is enabled
///    #[cfg(feature = "channels-console")]
///    let (tx, rx) = channels_console::instrument!((tx, rx));
///
///    // The channel works exactly the same way
///    tx.send("Hello".to_string()).await.unwrap();
/// }
/// ```
///
/// By default, channels are labeled with their file location and line number (e.g., `src/worker.rs:25`). You can provide custom labels for easier identification:
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
/// let (tx, rx) = mpsc::channel::<String>(10);
/// #[cfg(feature = "channels-console")]
/// let (tx, rx) = channels_console::instrument!((tx, rx), label = "task-queue");
/// ```
///
/// # Important: Capacity Parameter
///
/// **For `std::sync::mpsc` and `futures::channel::mpsc` bounded channels**, you **must** specify the `capacity` parameter
/// because their APIs don't expose the capacity after creation:
///
/// ```rust,no_run
/// use std::sync::mpsc;
/// use channels_console::instrument;
///
/// // std::sync::mpsc::sync_channel - MUST specify capacity
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), capacity = 10);
///
/// // With label
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), label = "my-channel", capacity = 10);
/// ```
///
/// Tokio channels don't require this because their capacity is accessible from the channel handles.
///
/// # Argument forms
///
/// * `label` must be a literal.
/// * `capacity` must be a constant expression of type `usize`; this is
///   enforced at compile time.
/// * `label` and `capacity` may be given in either order.
///
#[macro_export]
macro_rules! instrument {
    // The channel id is passed inline rather than through a block-local
    // `const`: items declared inside a `macro_rules!` expansion are not
    // hygienic, so a named constant here would shadow a caller constant of the
    // same name referenced inside `$expr`.
    ($expr:expr) => {{
        $crate::Instrument::instrument($expr, concat!(file!(), ":", line!()), None, None)
    }};

    ($expr:expr, label = $label:literal) => {{
        $crate::Instrument::instrument(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label),
            None,
        )
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        // Compile-time check: capacity must be a constant `usize` expression.
        const _: usize = $capacity;
        $crate::Instrument::instrument(
            $expr,
            concat!(file!(), ":", line!()),
            None,
            Some($capacity),
        )
    }};

    ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
        // Compile-time check: capacity must be a constant `usize` expression.
        const _: usize = $capacity;
        $crate::Instrument::instrument(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
        // Compile-time check: capacity must be a constant `usize` expression.
        const _: usize = $capacity;
        $crate::Instrument::instrument(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label),
            Some($capacity),
        )
    }};
}
489
490fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
491    if let Some((_, stats_map)) = STATS_STATE.get() {
492        stats_map.read().unwrap().clone()
493    } else {
494        HashMap::new()
495    }
496}
497
498fn get_serializable_stats() -> Vec<SerializableChannelStats> {
499    let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
500        .values()
501        .map(SerializableChannelStats::from)
502        .collect();
503
504    stats.sort_by(|a, b| a.id.cmp(&b.id));
505    stats
506}