channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::{Arc, OnceLock, RwLock};
5use std::time::SystemTime;
6
7pub mod channels_guard;
8pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
9
10use crate::http_api::start_metrics_server;
11mod http_api;
12mod wrappers;
13
14/// A single log entry for a message sent or received.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct LogEntry {
17    pub index: u64,
18    pub timestamp: u64,
19    pub message: Option<String>,
20}
21
22impl LogEntry {
23    pub(crate) fn new(index: u64, timestamp: SystemTime, message: Option<String>) -> Self {
24        let timestamp_secs = timestamp
25            .duration_since(SystemTime::UNIX_EPOCH)
26            .unwrap_or_default()
27            .as_secs();
28        Self {
29            index,
30            timestamp: timestamp_secs,
31            message,
32        }
33    }
34}
35
36/// Type of a channel.
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum ChannelType {
39    Bounded(usize),
40    Unbounded,
41    Oneshot,
42}
43
44impl std::fmt::Display for ChannelType {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
48            ChannelType::Unbounded => write!(f, "unbounded"),
49            ChannelType::Oneshot => write!(f, "oneshot"),
50        }
51    }
52}
53
54impl Serialize for ChannelType {
55    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: serde::Serializer,
58    {
59        serializer.serialize_str(&self.to_string())
60    }
61}
62
63impl<'de> Deserialize<'de> for ChannelType {
64    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
65    where
66        D: serde::Deserializer<'de>,
67    {
68        let s = String::deserialize(deserializer)?;
69
70        match s.as_str() {
71            "unbounded" => Ok(ChannelType::Unbounded),
72            "oneshot" => Ok(ChannelType::Oneshot),
73            _ => {
74                // try: bounded[123]
75                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
76                    let size = inner
77                        .parse()
78                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
79                    Ok(ChannelType::Bounded(size))
80                } else {
81                    Err(serde::de::Error::custom("invalid channel type"))
82                }
83            }
84        }
85    }
86}
87
88/// Format of the output produced by ChannelsGuard on drop.
89#[derive(Clone, Copy, Debug, Default)]
90pub enum Format {
91    #[default]
92    Table,
93    Json,
94    JsonPretty,
95}
96
97/// State of a instrumented channel.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
99pub enum ChannelState {
100    #[default]
101    Active,
102    Closed,
103    Full,
104    Notified,
105}
106
107impl std::fmt::Display for ChannelState {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "{}", self.as_str())
110    }
111}
112
113impl ChannelState {
114    pub fn as_str(&self) -> &'static str {
115        match self {
116            ChannelState::Active => "active",
117            ChannelState::Closed => "closed",
118            ChannelState::Full => "full",
119            ChannelState::Notified => "notified",
120        }
121    }
122}
123
124impl Serialize for ChannelState {
125    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126    where
127        S: serde::Serializer,
128    {
129        serializer.serialize_str(self.as_str())
130    }
131}
132
133impl<'de> Deserialize<'de> for ChannelState {
134    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135    where
136        D: serde::Deserializer<'de>,
137    {
138        let s = String::deserialize(deserializer)?;
139        match s.as_str() {
140            "active" => Ok(ChannelState::Active),
141            "closed" => Ok(ChannelState::Closed),
142            "full" => Ok(ChannelState::Full),
143            "notified" => Ok(ChannelState::Notified),
144            _ => Err(serde::de::Error::custom("invalid channel state")),
145        }
146    }
147}
148
149/// Statistics for a single instrumented channel.
150#[derive(Debug, Clone)]
151pub(crate) struct ChannelStats {
152    pub(crate) id: &'static str,
153    pub(crate) label: Option<&'static str>,
154    pub(crate) channel_type: ChannelType,
155    pub(crate) state: ChannelState,
156    pub(crate) sent_count: u64,
157    pub(crate) received_count: u64,
158    pub(crate) type_name: &'static str,
159    pub(crate) type_size: usize,
160    pub(crate) sent_logs: VecDeque<LogEntry>,
161    pub(crate) received_logs: VecDeque<LogEntry>,
162}
163
164impl ChannelStats {
165    pub fn queued(&self) -> u64 {
166        self.sent_count.saturating_sub(self.received_count)
167    }
168
169    pub fn total_bytes(&self) -> u64 {
170        self.sent_count * self.type_size as u64
171    }
172
173    pub fn queued_bytes(&self) -> u64 {
174        self.queued() * self.type_size as u64
175    }
176}
177
178/// Serializable version of channel statistics for JSON responses.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct SerializableChannelStats {
181    pub id: String,
182    pub label: String,
183    pub channel_type: ChannelType,
184    pub state: ChannelState,
185    pub sent_count: u64,
186    pub received_count: u64,
187    pub queued: u64,
188    pub type_name: String,
189    pub type_size: usize,
190    pub total_bytes: u64,
191    pub queued_bytes: u64,
192}
193
194impl From<&ChannelStats> for SerializableChannelStats {
195    fn from(stats: &ChannelStats) -> Self {
196        let label = resolve_label(stats.id, stats.label);
197        Self {
198            id: stats.id.to_string(),
199            label,
200            channel_type: stats.channel_type,
201            state: stats.state,
202            sent_count: stats.sent_count,
203            received_count: stats.received_count,
204            queued: stats.queued(),
205            type_name: stats.type_name.to_string(),
206            type_size: stats.type_size,
207            total_bytes: stats.total_bytes(),
208            queued_bytes: stats.queued_bytes(),
209        }
210    }
211}
212
213impl ChannelStats {
214    fn new(
215        id: &'static str,
216        label: Option<&'static str>,
217        channel_type: ChannelType,
218        type_name: &'static str,
219        type_size: usize,
220    ) -> Self {
221        Self {
222            id,
223            label,
224            channel_type,
225            state: ChannelState::default(),
226            sent_count: 0,
227            received_count: 0,
228            type_name,
229            type_size,
230            sent_logs: VecDeque::new(),
231            received_logs: VecDeque::new(),
232        }
233    }
234
235    /// Update the channel state based on sent/received counts.
236    /// Sets state to Full if sent > received, otherwise Active (unless explicitly closed).
237    fn update_state(&mut self) {
238        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
239            return;
240        }
241
242        if self.sent_count > self.received_count {
243            self.state = ChannelState::Full;
244        } else {
245            self.state = ChannelState::Active;
246        }
247    }
248}
249
250/// Events sent to the background statistics collection thread.
251#[derive(Debug)]
252pub(crate) enum StatsEvent {
253    Created {
254        id: &'static str,
255        display_label: Option<&'static str>,
256        channel_type: ChannelType,
257        type_name: &'static str,
258        type_size: usize,
259    },
260    MessageSent {
261        id: &'static str,
262        log: Option<String>,
263        timestamp: SystemTime,
264    },
265    MessageReceived {
266        id: &'static str,
267        timestamp: SystemTime,
268    },
269    Closed {
270        id: &'static str,
271    },
272    #[allow(dead_code)]
273    Notified {
274        id: &'static str,
275    },
276}
277
278type StatsState = (
279    CbSender<StatsEvent>,
280    Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
281);
282
283/// Global state for statistics collection.
284static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
285
286const DEFAULT_LOG_LIMIT: usize = 50;
287
288fn get_log_limit() -> usize {
289    std::env::var("CHANNELS_CONSOLE_LOG_LIMIT")
290        .ok()
291        .and_then(|s| s.parse().ok())
292        .unwrap_or(DEFAULT_LOG_LIMIT)
293}
294
295/// Initialize the statistics collection system (called on first instrumented channel).
296/// Returns a reference to the global state.
297fn init_stats_state() -> &'static StatsState {
298    STATS_STATE.get_or_init(|| {
299        let (tx, rx) = unbounded::<StatsEvent>();
300        let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
301        let stats_map_clone = Arc::clone(&stats_map);
302
303        std::thread::Builder::new()
304            .name("channel-stats-collector".into())
305            .spawn(move || {
306                while let Ok(event) = rx.recv() {
307                    let mut stats = stats_map_clone.write().unwrap();
308                    match event {
309                        StatsEvent::Created {
310                            id: key,
311                            display_label,
312                            channel_type,
313                            type_name,
314                            type_size,
315                        } => {
316                            stats.insert(
317                                key,
318                                ChannelStats::new(
319                                    key,
320                                    display_label,
321                                    channel_type,
322                                    type_name,
323                                    type_size,
324                                ),
325                            );
326                        }
327                        StatsEvent::MessageSent { id, log, timestamp } => {
328                            if let Some(channel_stats) = stats.get_mut(id) {
329                                channel_stats.sent_count += 1;
330                                channel_stats.update_state();
331
332                                let limit = get_log_limit();
333                                if channel_stats.sent_logs.len() >= limit {
334                                    channel_stats.sent_logs.pop_front();
335                                }
336                                channel_stats.sent_logs.push_back(LogEntry::new(
337                                    channel_stats.sent_count,
338                                    timestamp,
339                                    log,
340                                ));
341                            }
342                        }
343                        StatsEvent::MessageReceived { id, timestamp } => {
344                            if let Some(channel_stats) = stats.get_mut(id) {
345                                channel_stats.received_count += 1;
346                                channel_stats.update_state();
347
348                                let limit = get_log_limit();
349                                if channel_stats.received_logs.len() >= limit {
350                                    channel_stats.received_logs.pop_front();
351                                }
352                                channel_stats.received_logs.push_back(LogEntry::new(
353                                    channel_stats.received_count,
354                                    timestamp,
355                                    None,
356                                ));
357                            }
358                        }
359                        StatsEvent::Closed { id } => {
360                            if let Some(channel_stats) = stats.get_mut(id) {
361                                channel_stats.state = ChannelState::Closed;
362                            }
363                        }
364                        StatsEvent::Notified { id } => {
365                            if let Some(channel_stats) = stats.get_mut(id) {
366                                channel_stats.state = ChannelState::Notified;
367                            }
368                        }
369                    }
370                }
371            })
372            .expect("Failed to spawn channel-stats-collector thread");
373
374        // Spawn the metrics HTTP server in the background
375        // Check environment variable for custom port, default to 6770
376        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
377            .ok()
378            .and_then(|p| p.parse::<u16>().ok())
379            .unwrap_or(6770);
380        let addr = format!("127.0.0.1:{}", port);
381
382        std::thread::spawn(move || {
383            start_metrics_server(&addr);
384        });
385
386        (tx, stats_map)
387    })
388}
389
390fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
391    if let Some(l) = provided {
392        return l.to_string();
393    }
394    if let Some(pos) = id.rfind(':') {
395        let (path, line_part) = id.split_at(pos);
396        let line = &line_part[1..];
397        format!("{}:{}", extract_filename(path), line)
398    } else {
399        extract_filename(id)
400    }
401}
402
403fn extract_filename(path: &str) -> String {
404    let components: Vec<&str> = path.split('/').collect();
405    if components.len() >= 2 {
406        format!(
407            "{}/{}",
408            components[components.len() - 2],
409            components[components.len() - 1]
410        )
411    } else {
412        path.to_string()
413    }
414}
415
416/// Format bytes into human-readable units (B, KB, MB, GB, TB).
417pub fn format_bytes(bytes: u64) -> String {
418    if bytes == 0 {
419        return "0 B".to_string();
420    }
421
422    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
423    let mut size = bytes as f64;
424    let mut unit_idx = 0;
425
426    while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
427        size /= 1024.0;
428        unit_idx += 1;
429    }
430
431    if unit_idx == 0 {
432        format!("{} {}", bytes, UNITS[unit_idx])
433    } else {
434        format!("{:.1} {}", size, UNITS[unit_idx])
435    }
436}
437
438/// Trait for instrumenting channels.
439///
440/// This trait is not intended for direct use. Use the `instrument!` macro instead.
441#[doc(hidden)]
442pub trait Instrument {
443    type Output;
444    fn instrument(
445        self,
446        channel_id: &'static str,
447        label: Option<&'static str>,
448        capacity: Option<usize>,
449    ) -> Self::Output;
450}
451
452/// Trait for instrumenting channels with message logging.
453///
454/// This trait is not intended for direct use. Use the `instrument!` macro with `log = true` instead.
455#[doc(hidden)]
456pub trait InstrumentLog {
457    type Output;
458    fn instrument_log(
459        self,
460        channel_id: &'static str,
461        label: Option<&'static str>,
462        capacity: Option<usize>,
463    ) -> Self::Output;
464}
465
466cfg_if::cfg_if! {
467    if #[cfg(any(feature = "tokio", feature = "futures"))] {
468        use std::sync::LazyLock;
469        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
470            tokio::runtime::Builder::new_multi_thread()
471                .enable_time()
472                .build()
473                .unwrap()
474        });
475    }
476}
477
478/// Instrument a channel creation to wrap it with debugging proxies.
479/// Currently only supports bounded, unbounded and oneshot channels.
480///
481/// # Examples
482///
483/// ```
484/// use tokio::sync::mpsc;
485/// use channels_console::instrument;
486///
487/// #[tokio::main]
488/// async fn main() {
489///
490///    // Create channels normally
491///    let (tx, rx) = mpsc::channel::<String>(100);
492///
493///    // Instrument them only when the feature is enabled
494///    #[cfg(feature = "channels-console")]
495///    let (tx, rx) = channels_console::instrument!((tx, rx));
496///
497///    // The channel works exactly the same way
498///    tx.send("Hello".to_string()).await.unwrap();
499/// }
500/// ```
501///
502/// By default, channels are labeled with their file location and line number (e.g., `src/worker.rs:25`). You can provide custom labels for easier identification:
503///
504/// ```rust,no_run
505/// use tokio::sync::mpsc;
506/// use channels_console::instrument;
507/// let (tx, rx) = mpsc::channel::<String>(10);
508/// #[cfg(feature = "channels-console")]
509/// let (tx, rx) = channels_console::instrument!((tx, rx), label = "task-queue");
510/// ```
511///
512/// # Important: Capacity Parameter
513///
514/// **For `std::sync::mpsc` and `futures::channel::mpsc` bounded channels**, you **must** specify the `capacity` parameter
515/// because their APIs don't expose the capacity after creation:
516///
517/// ```rust,no_run
518/// use std::sync::mpsc;
519/// use channels_console::instrument;
520///
521/// // std::sync::mpsc::sync_channel - MUST specify capacity
522/// let (tx, rx) = mpsc::sync_channel::<String>(10);
523/// let (tx, rx) = instrument!((tx, rx), capacity = 10);
524///
525/// // With label
526/// let (tx, rx) = mpsc::sync_channel::<String>(10);
527/// let (tx, rx) = instrument!((tx, rx), label = "my-channel", capacity = 10);
528/// ```
529///
530/// Tokio channels don't require this because their capacity is accessible from the channel handles.
531///
532/// **Message Logging:**
533///
534/// By default, instrumentation only tracks message timestamps. To capture the actual content of messages for debugging,
535/// enable logging with the `log = true` parameter (the message type must implement `std::fmt::Debug`):
536///
537/// ```rust,no_run
538/// use tokio::sync::mpsc;
539/// use channels_console::instrument;
540///
541/// // Enable message logging (requires Debug trait on the message type)
542/// let (tx, rx) = mpsc::channel::<String>(10);
543/// #[cfg(feature = "channels-console")]
544/// let (tx, rx) = channels_console::instrument!((tx, rx), log = true);
545///
546///
547#[macro_export]
548macro_rules! instrument {
549    ($expr:expr) => {{
550        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
551        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
552    }};
553
554    ($expr:expr, label = $label:literal) => {{
555        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
556        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), None)
557    }};
558
559    ($expr:expr, capacity = $capacity:expr) => {{
560        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
561        const _: usize = $capacity;
562        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
563    }};
564
565    ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
566        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
567        const _: usize = $capacity;
568        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
569    }};
570
571    ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
572        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
573        const _: usize = $capacity;
574        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
575    }};
576
577    // Variants with log = true
578    ($expr:expr, log = true) => {{
579        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
580        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
581    }};
582
583    ($expr:expr, label = $label:literal, log = true) => {{
584        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
585        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
586    }};
587
588    ($expr:expr, log = true, label = $label:literal) => {{
589        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
590        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
591    }};
592
593    ($expr:expr, capacity = $capacity:expr, log = true) => {{
594        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
595        const _: usize = $capacity;
596        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
597    }};
598
599    ($expr:expr, log = true, capacity = $capacity:expr) => {{
600        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
601        const _: usize = $capacity;
602        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
603    }};
604
605    ($expr:expr, label = $label:literal, capacity = $capacity:expr, log = true) => {{
606        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
607        const _: usize = $capacity;
608        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
609    }};
610
611    ($expr:expr, label = $label:literal, log = true, capacity = $capacity:expr) => {{
612        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
613        const _: usize = $capacity;
614        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
615    }};
616
617    ($expr:expr, capacity = $capacity:expr, label = $label:literal, log = true) => {{
618        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
619        const _: usize = $capacity;
620        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
621    }};
622
623    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:literal) => {{
624        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
625        const _: usize = $capacity;
626        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
627    }};
628
629    ($expr:expr, log = true, label = $label:literal, capacity = $capacity:expr) => {{
630        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
631        const _: usize = $capacity;
632        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
633    }};
634
635    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:literal) => {{
636        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
637        const _: usize = $capacity;
638        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
639    }};
640}
641
642fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
643    if let Some((_, stats_map)) = STATS_STATE.get() {
644        stats_map.read().unwrap().clone()
645    } else {
646        HashMap::new()
647    }
648}
649
650fn get_serializable_stats() -> Vec<SerializableChannelStats> {
651    let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
652        .values()
653        .map(SerializableChannelStats::from)
654        .collect();
655
656    stats.sort_by(|a, b| a.id.cmp(&b.id));
657    stats
658}
659
660/// Serializable log response containing sent and received logs.
661#[derive(Debug, Clone, Serialize, Deserialize)]
662pub struct ChannelLogs {
663    pub id: String,
664    pub sent_logs: Vec<LogEntry>,
665    pub received_logs: Vec<LogEntry>,
666}
667
668pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
669    let stats = get_channel_stats();
670    stats.get(channel_id).map(|channel_stats| {
671        let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
672
673        let mut received_logs: Vec<LogEntry> =
674            channel_stats.received_logs.iter().cloned().collect();
675
676        // Sort by index descending (most recent first)
677        sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
678        received_logs.sort_by(|a, b| b.index.cmp(&a.index));
679
680        ChannelLogs {
681            id: channel_id.to_string(),
682            sent_logs,
683            received_logs,
684        }
685    })
686}