channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::{Arc, OnceLock, RwLock};
5use std::time::Instant;
6
7pub mod channels_guard;
8pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
9
10use crate::http_api::start_metrics_server;
11mod http_api;
12mod wrappers;
13
14/// A single log entry for a message sent or received.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct LogEntry {
17    pub index: u64,
18    pub timestamp: u64,
19    pub message: Option<String>,
20}
21
22impl LogEntry {
23    pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
24        let start_time = START_TIME.get().copied().unwrap_or(timestamp);
25        let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
26        Self {
27            index,
28            timestamp: timestamp_nanos,
29            message,
30        }
31    }
32}
33
/// The kind of channel being instrumented.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelType {
    /// Bounded channel; the payload is its capacity.
    Bounded(usize),
    /// Channel without a capacity limit.
    Unbounded,
    /// Oneshot channel.
    Oneshot,
}
41
42impl std::fmt::Display for ChannelType {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
46            ChannelType::Unbounded => write!(f, "unbounded"),
47            ChannelType::Oneshot => write!(f, "oneshot"),
48        }
49    }
50}
51
52impl Serialize for ChannelType {
53    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
54    where
55        S: serde::Serializer,
56    {
57        serializer.serialize_str(&self.to_string())
58    }
59}
60
61impl<'de> Deserialize<'de> for ChannelType {
62    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
63    where
64        D: serde::Deserializer<'de>,
65    {
66        let s = String::deserialize(deserializer)?;
67
68        match s.as_str() {
69            "unbounded" => Ok(ChannelType::Unbounded),
70            "oneshot" => Ok(ChannelType::Oneshot),
71            _ => {
72                // try: bounded[123]
73                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
74                    let size = inner
75                        .parse()
76                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
77                    Ok(ChannelType::Bounded(size))
78                } else {
79                    Err(serde::de::Error::custom("invalid channel type"))
80                }
81            }
82        }
83    }
84}
85
/// Output format used by `ChannelsGuard` when it prints its report on drop.
#[derive(Debug, Default, Clone, Copy)]
pub enum Format {
    /// Table output; this is the default.
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
94
/// State of an instrumented channel.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ChannelState {
    /// Default state of a channel.
    #[default]
    Active,
    /// The channel has been reported as closed.
    Closed,
    /// The channel has been reported as full.
    Full,
    /// The channel has been reported as notified.
    Notified,
}
104
105impl std::fmt::Display for ChannelState {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "{}", self.as_str())
108    }
109}
110
111impl ChannelState {
112    pub fn as_str(&self) -> &'static str {
113        match self {
114            ChannelState::Active => "active",
115            ChannelState::Closed => "closed",
116            ChannelState::Full => "full",
117            ChannelState::Notified => "notified",
118        }
119    }
120}
121
122impl Serialize for ChannelState {
123    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
124    where
125        S: serde::Serializer,
126    {
127        serializer.serialize_str(self.as_str())
128    }
129}
130
131impl<'de> Deserialize<'de> for ChannelState {
132    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
133    where
134        D: serde::Deserializer<'de>,
135    {
136        let s = String::deserialize(deserializer)?;
137        match s.as_str() {
138            "active" => Ok(ChannelState::Active),
139            "closed" => Ok(ChannelState::Closed),
140            "full" => Ok(ChannelState::Full),
141            "notified" => Ok(ChannelState::Notified),
142            _ => Err(serde::de::Error::custom("invalid channel state")),
143        }
144    }
145}
146
147/// Statistics for a single instrumented channel.
148#[derive(Debug, Clone)]
149pub(crate) struct ChannelStats {
150    pub(crate) id: &'static str,
151    pub(crate) label: Option<&'static str>,
152    pub(crate) channel_type: ChannelType,
153    pub(crate) state: ChannelState,
154    pub(crate) sent_count: u64,
155    pub(crate) received_count: u64,
156    pub(crate) type_name: &'static str,
157    pub(crate) type_size: usize,
158    pub(crate) sent_logs: VecDeque<LogEntry>,
159    pub(crate) received_logs: VecDeque<LogEntry>,
160}
161
162impl ChannelStats {
163    pub fn queued(&self) -> u64 {
164        self.sent_count
165            .saturating_sub(self.received_count)
166            .saturating_sub(1)
167    }
168
169    pub fn total_bytes(&self) -> u64 {
170        self.sent_count * self.type_size as u64
171    }
172
173    pub fn queued_bytes(&self) -> u64 {
174        self.queued() * self.type_size as u64
175    }
176}
177
178/// Serializable version of channel statistics for JSON responses.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct SerializableChannelStats {
181    pub id: String,
182    pub label: String,
183    pub channel_type: ChannelType,
184    pub state: ChannelState,
185    pub sent_count: u64,
186    pub received_count: u64,
187    pub queued: u64,
188    pub type_name: String,
189    pub type_size: usize,
190    pub total_bytes: u64,
191    pub queued_bytes: u64,
192}
193
194impl From<&ChannelStats> for SerializableChannelStats {
195    fn from(stats: &ChannelStats) -> Self {
196        let label = resolve_label(stats.id, stats.label);
197        Self {
198            id: stats.id.to_string(),
199            label,
200            channel_type: stats.channel_type,
201            state: stats.state,
202            sent_count: stats.sent_count,
203            received_count: stats.received_count,
204            queued: stats.queued(),
205            type_name: stats.type_name.to_string(),
206            type_size: stats.type_size,
207            total_bytes: stats.total_bytes(),
208            queued_bytes: stats.queued_bytes(),
209        }
210    }
211}
212
213impl ChannelStats {
214    fn new(
215        id: &'static str,
216        label: Option<&'static str>,
217        channel_type: ChannelType,
218        type_name: &'static str,
219        type_size: usize,
220    ) -> Self {
221        Self {
222            id,
223            label,
224            channel_type,
225            state: ChannelState::default(),
226            sent_count: 0,
227            received_count: 0,
228            type_name,
229            type_size,
230            sent_logs: VecDeque::new(),
231            received_logs: VecDeque::new(),
232        }
233    }
234
235    fn update_state(&mut self) {
236        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
237            return;
238        }
239
240        let queued = self.queued();
241        let is_full = match self.channel_type {
242            ChannelType::Bounded(cap) => queued >= cap as u64,
243            ChannelType::Oneshot => queued >= 1,
244            ChannelType::Unbounded => false,
245        };
246
247        if is_full {
248            self.state = ChannelState::Full;
249        } else {
250            self.state = ChannelState::Active;
251        }
252    }
253}
254
255/// Events sent to the background statistics collection thread.
256#[derive(Debug)]
257pub(crate) enum StatsEvent {
258    Created {
259        id: &'static str,
260        display_label: Option<&'static str>,
261        channel_type: ChannelType,
262        type_name: &'static str,
263        type_size: usize,
264    },
265    MessageSent {
266        id: &'static str,
267        log: Option<String>,
268        timestamp: Instant,
269    },
270    MessageReceived {
271        id: &'static str,
272        timestamp: Instant,
273    },
274    Closed {
275        id: &'static str,
276    },
277    #[allow(dead_code)]
278    Notified {
279        id: &'static str,
280    },
281}
282
283type StatsState = (
284    CbSender<StatsEvent>,
285    Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
286);
287
288/// Global state for statistics collection.
289static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
290
291static START_TIME: OnceLock<Instant> = OnceLock::new();
292
/// Log limit used when `CHANNELS_CONSOLE_LOG_LIMIT` is unset or cannot be parsed.
const DEFAULT_LOG_LIMIT: usize = 50;

/// Reads the log limit from the `CHANNELS_CONSOLE_LOG_LIMIT` environment variable,
/// falling back to [`DEFAULT_LOG_LIMIT`] when the variable is missing or is not a valid
/// `usize`.
fn get_log_limit() -> usize {
    match std::env::var("CHANNELS_CONSOLE_LOG_LIMIT") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_LOG_LIMIT),
        Err(_) => DEFAULT_LOG_LIMIT,
    }
}
301
302/// Initialize the statistics collection system (called on first instrumented channel).
303/// Returns a reference to the global state.
304fn init_stats_state() -> &'static StatsState {
305    STATS_STATE.get_or_init(|| {
306        START_TIME.get_or_init(Instant::now);
307
308        let (tx, rx) = unbounded::<StatsEvent>();
309        let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
310        let stats_map_clone = Arc::clone(&stats_map);
311
312        std::thread::Builder::new()
313            .name("channel-stats-collector".into())
314            .spawn(move || {
315                while let Ok(event) = rx.recv() {
316                    let mut stats = stats_map_clone.write().unwrap();
317                    match event {
318                        StatsEvent::Created {
319                            id: key,
320                            display_label,
321                            channel_type,
322                            type_name,
323                            type_size,
324                        } => {
325                            stats.insert(
326                                key,
327                                ChannelStats::new(
328                                    key,
329                                    display_label,
330                                    channel_type,
331                                    type_name,
332                                    type_size,
333                                ),
334                            );
335                        }
336                        StatsEvent::MessageSent { id, log, timestamp } => {
337                            if let Some(channel_stats) = stats.get_mut(id) {
338                                channel_stats.sent_count += 1;
339                                channel_stats.update_state();
340
341                                let limit = get_log_limit();
342                                if channel_stats.sent_logs.len() >= limit {
343                                    channel_stats.sent_logs.pop_front();
344                                }
345                                channel_stats.sent_logs.push_back(LogEntry::new(
346                                    channel_stats.sent_count,
347                                    timestamp,
348                                    log,
349                                ));
350                            }
351                        }
352                        StatsEvent::MessageReceived { id, timestamp } => {
353                            if let Some(channel_stats) = stats.get_mut(id) {
354                                channel_stats.received_count += 1;
355                                channel_stats.update_state();
356
357                                let limit = get_log_limit();
358                                if channel_stats.received_logs.len() >= limit {
359                                    channel_stats.received_logs.pop_front();
360                                }
361                                channel_stats.received_logs.push_back(LogEntry::new(
362                                    channel_stats.received_count,
363                                    timestamp,
364                                    None,
365                                ));
366                            }
367                        }
368                        StatsEvent::Closed { id } => {
369                            if let Some(channel_stats) = stats.get_mut(id) {
370                                channel_stats.state = ChannelState::Closed;
371                            }
372                        }
373                        StatsEvent::Notified { id } => {
374                            if let Some(channel_stats) = stats.get_mut(id) {
375                                channel_stats.state = ChannelState::Notified;
376                            }
377                        }
378                    }
379                }
380            })
381            .expect("Failed to spawn channel-stats-collector thread");
382
383        // Spawn the metrics HTTP server in the background
384        // Check environment variable for custom port, default to 6770
385        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
386            .ok()
387            .and_then(|p| p.parse::<u16>().ok())
388            .unwrap_or(6770);
389        let addr = format!("127.0.0.1:{}", port);
390
391        std::thread::spawn(move || {
392            start_metrics_server(&addr);
393        });
394
395        (tx, stats_map)
396    })
397}
398
399fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
400    if let Some(l) = provided {
401        return l.to_string();
402    }
403    if let Some(pos) = id.rfind(':') {
404        let (path, line_part) = id.split_at(pos);
405        let line = &line_part[1..];
406        format!("{}:{}", extract_filename(path), line)
407    } else {
408        extract_filename(id)
409    }
410}
411
/// Shortens a `/`-separated path to its last two components (`parent/file`).
///
/// A path without any `/` is returned unchanged.
fn extract_filename(path: &str) -> String {
    let mut tail = path.rsplit('/');
    match (tail.next(), tail.next()) {
        (Some(file), Some(parent)) => format!("{}/{}", parent, file),
        _ => path.to_string(),
    }
}
424
/// Format bytes into human-readable units (B, KB, MB, GB, TB).
///
/// Values below 1024 are printed as an exact integer number of bytes. Larger values are
/// divided by 1024 per unit step and printed with one decimal place; `TB` is the largest
/// unit, so bigger values are still expressed in terabytes.
pub fn format_bytes(bytes: u64) -> String {
    const STEP: f64 = 1024.0;
    const LARGER_UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];

    if bytes < 1024 {
        return format!("{} B", bytes);
    }

    let mut scaled = bytes as f64;
    let mut unit = "B";
    for next_unit in LARGER_UNITS {
        if scaled < STEP {
            break;
        }
        scaled /= STEP;
        unit = next_unit;
    }

    format!("{:.1} {}", scaled, unit)
}
446
/// Implemented by channel values that can be wrapped with instrumentation.
///
/// Not meant to be called directly; the `instrument!` macro expands to a call of
/// [`Instrument::instrument`].
#[doc(hidden)]
pub trait Instrument {
    /// What the instrumented channel is returned as.
    type Output;

    /// Wraps `self`, registering it under `channel_id` with an optional display `label`
    /// and an optional explicit `capacity`.
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
460
/// Implemented by channel values that can be wrapped with instrumentation that also
/// logs messages.
///
/// Not meant to be called directly; the `instrument!` macro expands to a call of
/// [`InstrumentLog::instrument_log`] when invoked with `log = true`.
#[doc(hidden)]
pub trait InstrumentLog {
    /// What the instrumented channel is returned as.
    type Output;

    /// Wraps `self` with message logging, registering it under `channel_id` with an
    /// optional display `label` and an optional explicit `capacity`.
    fn instrument_log(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
474
475cfg_if::cfg_if! {
476    if #[cfg(any(feature = "tokio", feature = "futures"))] {
477        use std::sync::LazyLock;
478        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
479            tokio::runtime::Builder::new_multi_thread()
480                .enable_time()
481                .build()
482                .unwrap()
483        });
484    }
485}
486
/// Wrap a freshly created channel with debugging proxies.
/// Bounded, unbounded and oneshot channels are currently supported.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
///
/// #[tokio::main]
/// async fn main() {
///
///    // Create channels normally
///    let (tx, rx) = mpsc::channel::<String>(100);
///
///    // Instrument them only when the feature is enabled
///    #[cfg(feature = "channels-console")]
///    let (tx, rx) = channels_console::instrument!((tx, rx));
///
///    // The channel works exactly the same way
///    tx.send("Hello".to_string()).await.unwrap();
/// }
/// ```
///
/// Channels are labeled with their file location and line number by default
/// (e.g., `src/worker.rs:25`). Pass `label = "..."` to give a channel a custom name that
/// is easier to spot:
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
/// let (tx, rx) = mpsc::channel::<String>(10);
/// #[cfg(feature = "channels-console")]
/// let (tx, rx) = channels_console::instrument!((tx, rx), label = "task-queue");
/// ```
///
/// # Important: Capacity Parameter
///
/// **For `std::sync::mpsc` and `futures::channel::mpsc` bounded channels**, the
/// `capacity` parameter **must** be specified, because their APIs don't expose the
/// capacity after creation:
///
/// ```rust,no_run
/// use std::sync::mpsc;
/// use channels_console::instrument;
///
/// // std::sync::mpsc::sync_channel - MUST specify capacity
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), capacity = 10);
///
/// // With label
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), label = "my-channel", capacity = 10);
/// ```
///
/// Tokio channels don't need this because their capacity is accessible from the channel
/// handles.
///
/// The `capacity` argument must be a constant expression of type `usize`; the macro
/// checks this at compile time.
///
/// # Message Logging
///
/// By default, instrumentation only tracks message timestamps. To capture the actual
/// content of messages for debugging, pass `log = true` (the message type must implement
/// `std::fmt::Debug`):
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
///
/// // Enable message logging (requires Debug trait on the message type)
/// let (tx, rx) = mpsc::channel::<String>(10);
/// #[cfg(feature = "channels-console")]
/// let (tx, rx) = channels_console::instrument!((tx, rx), log = true);
/// ```
///
/// The `label`, `capacity` and `log = true` options may be given in any order.
#[macro_export]
macro_rules! instrument {
    // ---- Without message logging -------------------------------------------------

    ($channel:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($channel, CHANNEL_ID, None, None)
    }};

    ($channel:expr, label = $name:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), None)
    }};

    ($channel:expr, capacity = $cap:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        // Compile-time check: the capacity must be a constant `usize`.
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, None, Some($cap))
    }};

    ($channel:expr, label = $name:literal, capacity = $cap:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, label = $name:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    // ---- With message logging (`log = true`) -------------------------------------

    ($channel:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, None)
    }};

    // `log` combined with `label`, both orders.
    ($channel:expr, label = $name:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), None)
    }};

    ($channel:expr, log = true, label = $name:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), None)
    }};

    // `log` combined with `capacity`, both orders.
    ($channel:expr, capacity = $cap:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, Some($cap))
    }};

    ($channel:expr, log = true, capacity = $cap:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, Some($cap))
    }};

    // `log`, `label` and `capacity` together, all six orders.
    ($channel:expr, label = $name:literal, capacity = $cap:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, label = $name:literal, log = true, capacity = $cap:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, label = $name:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, log = true, label = $name:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, log = true, label = $name:literal, capacity = $cap:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, log = true, capacity = $cap:expr, label = $name:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};
}
650
651fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
652    if let Some((_, stats_map)) = STATS_STATE.get() {
653        stats_map.read().unwrap().clone()
654    } else {
655        HashMap::new()
656    }
657}
658
659fn get_serializable_stats() -> Vec<SerializableChannelStats> {
660    let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
661        .values()
662        .map(SerializableChannelStats::from)
663        .collect();
664
665    stats.sort_by(|a, b| a.id.cmp(&b.id));
666    stats
667}
668
669/// Serializable log response containing sent and received logs.
670#[derive(Debug, Clone, Serialize, Deserialize)]
671pub struct ChannelLogs {
672    pub id: String,
673    pub sent_logs: Vec<LogEntry>,
674    pub received_logs: Vec<LogEntry>,
675}
676
677pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
678    let stats = get_channel_stats();
679    stats.get(channel_id).map(|channel_stats| {
680        let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
681
682        let mut received_logs: Vec<LogEntry> =
683            channel_stats.received_logs.iter().cloned().collect();
684
685        // Sort by index descending (most recent first)
686        sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
687        received_logs.sort_by(|a, b| b.index.cmp(&a.index));
688
689        ChannelLogs {
690            id: channel_id.to_string(),
691            sent_logs,
692            received_logs,
693        }
694    })
695}