channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod stream_wrappers;
14mod wrappers;
15
16/// A single log entry for a message sent or received.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct LogEntry {
19    pub index: u64,
20    pub timestamp: u64,
21    pub message: Option<String>,
22}
23
24impl LogEntry {
25    pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
26        let start_time = START_TIME.get().copied().unwrap_or(timestamp);
27        let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
28        Self {
29            index,
30            timestamp: timestamp_nanos,
31            message,
32        }
33    }
34}
35
/// Kind of channel being instrumented.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelType {
    /// Bounded channel; the payload is its capacity.
    Bounded(usize),
    /// Channel without a capacity limit.
    Unbounded,
    /// Single-message channel.
    Oneshot,
}

/// Renders the textual form that the serde impls also use:
/// `bounded[<capacity>]`, `unbounded` or `oneshot`.
impl std::fmt::Display for ChannelType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            ChannelType::Bounded(capacity) => write!(f, "bounded[{}]", capacity),
            ChannelType::Unbounded => f.write_str("unbounded"),
            ChannelType::Oneshot => f.write_str("oneshot"),
        }
    }
}
53
54impl Serialize for ChannelType {
55    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: serde::Serializer,
58    {
59        serializer.serialize_str(&self.to_string())
60    }
61}
62
63impl<'de> Deserialize<'de> for ChannelType {
64    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
65    where
66        D: serde::Deserializer<'de>,
67    {
68        let s = String::deserialize(deserializer)?;
69
70        match s.as_str() {
71            "unbounded" => Ok(ChannelType::Unbounded),
72            "oneshot" => Ok(ChannelType::Oneshot),
73            _ => {
74                // try: bounded[123]
75                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
76                    let size = inner
77                        .parse()
78                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
79                    Ok(ChannelType::Bounded(size))
80                } else {
81                    Err(serde::de::Error::custom("invalid channel type"))
82                }
83            }
84        }
85    }
86}
87
/// Output format used by `ChannelsGuard` when it prints its report on drop.
#[derive(Debug, Default, Clone, Copy)]
pub enum Format {
    /// Table output; this is the default.
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
96
/// State of an instrumented channel (streams reuse it for `Active`/`Closed`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ChannelState {
    /// Open and below capacity; this is the default.
    #[default]
    Active,
    /// The channel has been closed.
    Closed,
    /// The number of queued messages has reached the channel's capacity.
    Full,
    /// Set when a `Notified` event is recorded for the channel.
    Notified,
}

/// Displays the lowercase name returned by [`ChannelState::as_str`].
impl std::fmt::Display for ChannelState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl ChannelState {
    /// Returns the lowercase name of the state; the serde impls use the same strings.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChannelState::Active => "active",
            ChannelState::Closed => "closed",
            ChannelState::Full => "full",
            ChannelState::Notified => "notified",
        }
    }
}
123
124impl Serialize for ChannelState {
125    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126    where
127        S: serde::Serializer,
128    {
129        serializer.serialize_str(self.as_str())
130    }
131}
132
133impl<'de> Deserialize<'de> for ChannelState {
134    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135    where
136        D: serde::Deserializer<'de>,
137    {
138        let s = String::deserialize(deserializer)?;
139        match s.as_str() {
140            "active" => Ok(ChannelState::Active),
141            "closed" => Ok(ChannelState::Closed),
142            "full" => Ok(ChannelState::Full),
143            "notified" => Ok(ChannelState::Notified),
144            _ => Err(serde::de::Error::custom("invalid channel state")),
145        }
146    }
147}
148
149/// Statistics for a single instrumented channel.
150#[derive(Debug, Clone)]
151pub(crate) struct ChannelStats {
152    pub(crate) id: u64,
153    pub(crate) source: &'static str,
154    pub(crate) label: Option<String>,
155    pub(crate) channel_type: ChannelType,
156    pub(crate) state: ChannelState,
157    pub(crate) sent_count: u64,
158    pub(crate) received_count: u64,
159    pub(crate) type_name: &'static str,
160    pub(crate) type_size: usize,
161    pub(crate) sent_logs: VecDeque<LogEntry>,
162    pub(crate) received_logs: VecDeque<LogEntry>,
163    pub(crate) iter: u32,
164}
165
166impl ChannelStats {
167    pub fn queued(&self) -> u64 {
168        self.sent_count
169            .saturating_sub(self.received_count)
170            .saturating_sub(1)
171    }
172
173    pub fn queued_bytes(&self) -> u64 {
174        self.queued() * self.type_size as u64
175    }
176}
177
178/// Statistics for a single instrumented stream.
179#[derive(Debug, Clone)]
180pub(crate) struct StreamStats {
181    pub(crate) id: u64,
182    pub(crate) source: &'static str,
183    pub(crate) label: Option<String>,
184    pub(crate) state: ChannelState, // Only Active or Closed
185    pub(crate) items_yielded: u64,
186    pub(crate) type_name: &'static str,
187    pub(crate) type_size: usize,
188    pub(crate) logs: VecDeque<LogEntry>,
189    pub(crate) iter: u32,
190}
191
192/// Wrapper for channels-only JSON response
193#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct ChannelsJson {
195    /// Current elapsed time since program start in nanoseconds
196    pub current_elapsed_ns: u64,
197    /// Channel statistics
198    pub channels: Vec<SerializableChannelStats>,
199}
200
201/// Wrapper for streams-only JSON response
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct StreamsJson {
204    /// Current elapsed time since program start in nanoseconds
205    pub current_elapsed_ns: u64,
206    /// Stream statistics
207    pub streams: Vec<SerializableStreamStats>,
208}
209
210/// Combined wrapper for both channels and streams JSON response
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct CombinedJson {
213    /// Current elapsed time since program start in nanoseconds
214    pub current_elapsed_ns: u64,
215    /// Channel statistics
216    pub channels: Vec<SerializableChannelStats>,
217    /// Stream statistics
218    pub streams: Vec<SerializableStreamStats>,
219}
220
221/// Serializable version of channel statistics for JSON responses.
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct SerializableChannelStats {
224    pub id: u64,
225    pub source: String,
226    pub label: String,
227    pub has_custom_label: bool,
228    pub channel_type: ChannelType,
229    pub state: ChannelState,
230    pub sent_count: u64,
231    pub received_count: u64,
232    pub queued: u64,
233    pub type_name: String,
234    pub type_size: usize,
235    pub queued_bytes: u64,
236    pub iter: u32,
237}
238
239/// Serializable version of stream statistics for JSON responses.
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct SerializableStreamStats {
242    pub id: u64,
243    pub source: String,
244    pub label: String,
245    pub has_custom_label: bool,
246    pub state: ChannelState,
247    pub items_yielded: u64,
248    pub type_name: String,
249    pub type_size: usize,
250    pub iter: u32,
251}
252
253impl From<&ChannelStats> for SerializableChannelStats {
254    fn from(channel_stats: &ChannelStats) -> Self {
255        let label = resolve_label(
256            channel_stats.source,
257            channel_stats.label.as_deref(),
258            channel_stats.iter,
259        );
260
261        Self {
262            id: channel_stats.id,
263            source: channel_stats.source.to_string(),
264            label,
265            has_custom_label: channel_stats.label.is_some(),
266            channel_type: channel_stats.channel_type,
267            state: channel_stats.state,
268            sent_count: channel_stats.sent_count,
269            received_count: channel_stats.received_count,
270            queued: channel_stats.queued(),
271            type_name: channel_stats.type_name.to_string(),
272            type_size: channel_stats.type_size,
273            queued_bytes: channel_stats.queued_bytes(),
274            iter: channel_stats.iter,
275        }
276    }
277}
278
279impl From<&StreamStats> for SerializableStreamStats {
280    fn from(stream_stats: &StreamStats) -> Self {
281        let label = resolve_label(
282            stream_stats.source,
283            stream_stats.label.as_deref(),
284            stream_stats.iter,
285        );
286
287        Self {
288            id: stream_stats.id,
289            source: stream_stats.source.to_string(),
290            label,
291            has_custom_label: stream_stats.label.is_some(),
292            state: stream_stats.state,
293            items_yielded: stream_stats.items_yielded,
294            type_name: stream_stats.type_name.to_string(),
295            type_size: stream_stats.type_size,
296            iter: stream_stats.iter,
297        }
298    }
299}
300
301impl ChannelStats {
302    fn new(
303        id: u64,
304        source: &'static str,
305        label: Option<String>,
306        channel_type: ChannelType,
307        type_name: &'static str,
308        type_size: usize,
309        iter: u32,
310    ) -> Self {
311        Self {
312            id,
313            source,
314            label,
315            channel_type,
316            state: ChannelState::default(),
317            sent_count: 0,
318            received_count: 0,
319            type_name,
320            type_size,
321            sent_logs: VecDeque::new(),
322            received_logs: VecDeque::new(),
323            iter,
324        }
325    }
326
327    fn update_state(&mut self) {
328        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
329            return;
330        }
331
332        let queued = self.queued();
333        let is_full = match self.channel_type {
334            ChannelType::Bounded(cap) => queued >= cap as u64,
335            ChannelType::Oneshot => queued >= 1,
336            ChannelType::Unbounded => false,
337        };
338
339        if is_full {
340            self.state = ChannelState::Full;
341        } else {
342            self.state = ChannelState::Active;
343        }
344    }
345}
346
347impl StreamStats {
348    fn new(
349        id: u64,
350        source: &'static str,
351        label: Option<String>,
352        type_name: &'static str,
353        type_size: usize,
354        iter: u32,
355    ) -> Self {
356        Self {
357            id,
358            source,
359            label,
360            state: ChannelState::Active,
361            items_yielded: 0,
362            type_name,
363            type_size,
364            logs: VecDeque::new(),
365            iter,
366        }
367    }
368}
369
370/// Events sent to the background channel statistics collection thread.
371#[derive(Debug)]
372pub(crate) enum ChannelEvent {
373    Created {
374        id: u64,
375        source: &'static str,
376        display_label: Option<String>,
377        channel_type: ChannelType,
378        type_name: &'static str,
379        type_size: usize,
380    },
381    MessageSent {
382        id: u64,
383        log: Option<String>,
384        timestamp: Instant,
385    },
386    MessageReceived {
387        id: u64,
388        timestamp: Instant,
389    },
390    Closed {
391        id: u64,
392    },
393    #[allow(dead_code)]
394    Notified {
395        id: u64,
396    },
397}
398
/// Events delivered to the `stream-stats-collector` thread.
///
/// Every variant carries the `id` of the stream it refers to; events for an id that has
/// no `Created` record are ignored by the collector.
#[derive(Debug)]
pub(crate) enum StreamEvent {
    /// A stream was instrumented; creates its stats record.
    Created {
        id: u64,
        source: &'static str,
        display_label: Option<String>,
        type_name: &'static str,
        type_size: usize,
    },
    /// The stream yielded an item; `log` is the optional item text to record.
    Yielded {
        id: u64,
        log: Option<String>,
        timestamp: Instant,
    },
    /// The stream finished; its state becomes `Closed`.
    Completed {
        id: u64,
    },
}
418
419type ChannelStatsState = (
420    CbSender<ChannelEvent>,
421    Arc<RwLock<HashMap<u64, ChannelStats>>>,
422);
423type StreamStatsState = (
424    CbSender<StreamEvent>,
425    Arc<RwLock<HashMap<u64, StreamStats>>>,
426);
427
428static CHANNELS_STATE: OnceLock<ChannelStatsState> = OnceLock::new();
429
430static STREAMS_STATE: OnceLock<StreamStatsState> = OnceLock::new();
431
432static START_TIME: OnceLock<Instant> = OnceLock::new();
433
434pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
435
436pub(crate) static STREAM_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
437
438const DEFAULT_LOG_LIMIT: usize = 50;
439
440fn get_log_limit() -> usize {
441    std::env::var("CHANNELS_CONSOLE_LOG_LIMIT")
442        .ok()
443        .and_then(|s| s.parse().ok())
444        .unwrap_or(DEFAULT_LOG_LIMIT)
445}
446
447/// Initialize the channel statistics collection system (called on first instrumented channel).
448/// Returns a reference to the global state.
449pub(crate) fn init_channels_state() -> &'static ChannelStatsState {
450    CHANNELS_STATE.get_or_init(|| {
451        START_TIME.get_or_init(Instant::now);
452
453        let (tx, rx) = unbounded::<ChannelEvent>();
454        let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
455        let stats_map_clone = Arc::clone(&stats_map);
456
457        std::thread::Builder::new()
458            .name("channel-stats-collector".into())
459            .spawn(move || {
460                while let Ok(event) = rx.recv() {
461                    let mut stats = stats_map_clone.write().unwrap();
462                    match event {
463                        ChannelEvent::Created {
464                            id,
465                            source,
466                            display_label,
467                            channel_type,
468                            type_name,
469                            type_size,
470                        } => {
471                            // Count existing items with the same source location
472                            let iter = stats.values().filter(|s| s.source == source).count() as u32;
473
474                            stats.insert(
475                                id,
476                                ChannelStats::new(
477                                    id,
478                                    source,
479                                    display_label,
480                                    channel_type,
481                                    type_name,
482                                    type_size,
483                                    iter,
484                                ),
485                            );
486                        }
487                        ChannelEvent::MessageSent { id, log, timestamp } => {
488                            if let Some(channel_stats) = stats.get_mut(&id) {
489                                channel_stats.sent_count += 1;
490                                channel_stats.update_state();
491
492                                let limit = get_log_limit();
493                                if channel_stats.sent_logs.len() >= limit {
494                                    channel_stats.sent_logs.pop_front();
495                                }
496                                channel_stats.sent_logs.push_back(LogEntry::new(
497                                    channel_stats.sent_count,
498                                    timestamp,
499                                    log,
500                                ));
501                            }
502                        }
503                        ChannelEvent::MessageReceived { id, timestamp } => {
504                            if let Some(channel_stats) = stats.get_mut(&id) {
505                                channel_stats.received_count += 1;
506                                channel_stats.update_state();
507
508                                let limit = get_log_limit();
509                                if channel_stats.received_logs.len() >= limit {
510                                    channel_stats.received_logs.pop_front();
511                                }
512                                channel_stats.received_logs.push_back(LogEntry::new(
513                                    channel_stats.received_count,
514                                    timestamp,
515                                    None,
516                                ));
517                            }
518                        }
519                        ChannelEvent::Closed { id } => {
520                            if let Some(channel_stats) = stats.get_mut(&id) {
521                                channel_stats.state = ChannelState::Closed;
522                            }
523                        }
524                        ChannelEvent::Notified { id } => {
525                            if let Some(channel_stats) = stats.get_mut(&id) {
526                                channel_stats.state = ChannelState::Notified;
527                            }
528                        }
529                    }
530                }
531            })
532            .expect("Failed to spawn channel-stats-collector thread");
533
534        // Spawn the metrics HTTP server in the background
535        // Check environment variable for custom port, default to 6770
536        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
537            .ok()
538            .and_then(|p| p.parse::<u16>().ok())
539            .unwrap_or(6770);
540        let addr = format!("127.0.0.1:{}", port);
541
542        std::thread::spawn(move || {
543            start_metrics_server(&addr);
544        });
545
546        (tx, stats_map)
547    })
548}
549
550/// Initialize the stream statistics collection system (called on first instrumented stream).
551/// Returns a reference to the global state.
552pub(crate) fn init_streams_state() -> &'static StreamStatsState {
553    STREAMS_STATE.get_or_init(|| {
554        START_TIME.get_or_init(Instant::now);
555
556        let (tx, rx) = unbounded::<StreamEvent>();
557        let stats_map = Arc::new(RwLock::new(HashMap::<u64, StreamStats>::new()));
558        let stats_map_clone = Arc::clone(&stats_map);
559
560        std::thread::Builder::new()
561            .name("stream-stats-collector".into())
562            .spawn(move || {
563                while let Ok(event) = rx.recv() {
564                    let mut stats = stats_map_clone.write().unwrap();
565                    match event {
566                        StreamEvent::Created {
567                            id,
568                            source,
569                            display_label,
570                            type_name,
571                            type_size,
572                        } => {
573                            // Count existing items with the same source location
574                            let iter = stats.values().filter(|s| s.source == source).count() as u32;
575
576                            stats.insert(
577                                id,
578                                StreamStats::new(
579                                    id,
580                                    source,
581                                    display_label,
582                                    type_name,
583                                    type_size,
584                                    iter,
585                                ),
586                            );
587                        }
588                        StreamEvent::Yielded { id, log, timestamp } => {
589                            if let Some(stream_stats) = stats.get_mut(&id) {
590                                stream_stats.items_yielded += 1;
591
592                                let limit = get_log_limit();
593                                if stream_stats.logs.len() >= limit {
594                                    stream_stats.logs.pop_front();
595                                }
596                                stream_stats.logs.push_back(LogEntry::new(
597                                    stream_stats.items_yielded,
598                                    timestamp,
599                                    log,
600                                ));
601                            }
602                        }
603                        StreamEvent::Completed { id } => {
604                            if let Some(stream_stats) = stats.get_mut(&id) {
605                                stream_stats.state = ChannelState::Closed;
606                            }
607                        }
608                    }
609                }
610            })
611            .expect("Failed to spawn stream-stats-collector thread");
612
613        (tx, stats_map)
614    })
615}
616
617fn resolve_label(id: &'static str, provided: Option<&str>, iter: u32) -> String {
618    let base_label = if let Some(l) = provided {
619        l.to_string()
620    } else if let Some(pos) = id.rfind(':') {
621        let (path, line_part) = id.split_at(pos);
622        let line = &line_part[1..];
623        format!("{}:{}", extract_filename(path), line)
624    } else {
625        extract_filename(id)
626    };
627
628    if iter > 0 {
629        format!("{}-{}", base_label, iter + 1)
630    } else {
631        base_label
632    }
633}
634
/// Shortens a `/`-separated path to its last two components (`parent/file`).
///
/// A path without any `/` is returned unchanged.
fn extract_filename(path: &str) -> String {
    // Walk the components from the end: first the file, then its parent directory.
    let mut from_end = path.rsplit('/');
    match (from_end.next(), from_end.next()) {
        (Some(file), Some(parent)) => format!("{}/{}", parent, file),
        _ => path.to_string(),
    }
}
647
/// Formats a byte count using 1024-based units (B, KB, MB, GB, TB).
///
/// Values below 1024 are printed as whole bytes (`"512 B"`); larger values are scaled
/// and printed with one decimal (`"1.5 KB"`). TB is the largest unit, so anything
/// bigger is still expressed in TB.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];

    // Below one KB there is nothing to scale: print the exact integer.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut unit = UNITS[0];
    for next_unit in &UNITS[1..] {
        if scaled < 1024.0 {
            break;
        }
        scaled /= 1024.0;
        unit = next_unit;
    }

    format!("{:.1} {}", scaled, unit)
}
669
/// Wraps a channel so that its activity is tracked.
///
/// Not meant to be called directly: the `channel!` macro expands to
/// `Instrument::instrument(expr, "file:line", label, capacity)`.
#[doc(hidden)]
pub trait Instrument {
    /// The instrumented value handed back to the caller.
    type Output;

    /// Instruments `self`. `source` identifies the call site, `label` is an optional
    /// custom display label and `capacity` is the optional capacity given to the macro.
    fn instrument(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
683
/// Wraps a channel so that its activity is tracked, with message logging.
///
/// Not meant to be called directly: the `channel!` macro expands to
/// `InstrumentLog::instrument_log(...)` when invoked with `log = true`.
#[doc(hidden)]
pub trait InstrumentLog {
    /// The instrumented value handed back to the caller.
    type Output;

    /// Instruments `self` with logging. `source` identifies the call site, `label` is an
    /// optional custom display label and `capacity` is the optional capacity given to
    /// the macro.
    fn instrument_log(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
697
/// Wraps a stream so that the items it yields are tracked.
///
/// Not meant to be called directly: the `stream!` macro expands to
/// `InstrumentStream::instrument_stream(expr, "file:line", label)`.
#[doc(hidden)]
pub trait InstrumentStream {
    /// The instrumented stream handed back to the caller.
    type Output;

    /// Instruments `self`. `source` identifies the call site and `label` is an optional
    /// custom display label.
    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output;
}
706
/// Wraps a stream so that the items it yields are tracked, with item logging.
///
/// Not meant to be called directly: the `stream!` macro expands to
/// `InstrumentStreamLog::instrument_stream_log(...)` when invoked with `log = true`.
#[doc(hidden)]
pub trait InstrumentStreamLog {
    /// The instrumented stream handed back to the caller.
    type Output;

    /// Instruments `self` with logging. `source` identifies the call site and `label` is
    /// an optional custom display label.
    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output;
}
715
716// Implement InstrumentStream for all Stream types
717impl<S> InstrumentStream for S
718where
719    S: futures_util::Stream,
720{
721    type Output = stream_wrappers::InstrumentedStream<S>;
722
723    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output {
724        stream_wrappers::InstrumentedStream::new(self, source, label)
725    }
726}
727
728// Implement InstrumentStreamLog for all Stream types with Debug items
729impl<S> InstrumentStreamLog for S
730where
731    S: futures_util::Stream,
732    S::Item: std::fmt::Debug,
733{
734    type Output = stream_wrappers::InstrumentedStreamLog<S>;
735
736    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output {
737        stream_wrappers::InstrumentedStreamLog::new(self, source, label)
738    }
739}
740
741cfg_if::cfg_if! {
742    if #[cfg(any(feature = "tokio", feature = "futures"))] {
743        use std::sync::LazyLock;
744        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
745            tokio::runtime::Builder::new_multi_thread()
746                .enable_time()
747                .build()
748                .unwrap()
749        });
750    }
751}
752
/// Instrument a channel creation to wrap it with debugging proxies.
/// Currently only supports bounded, unbounded and oneshot channels.
///
/// The call site (`file:line`) is recorded as the channel's source. After the channel
/// expression, these options may follow in any order:
///
/// - `label = <expr>`: custom display label (converted with `to_string()`).
/// - `capacity = <expr>`: capacity forwarded to the instrumentation; it must be a
///   constant expression of type `usize`.
/// - `log = true`: instrument through `InstrumentLog`, which also logs messages.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
/// use channels_console::channel;
///
/// #[tokio::main]
/// async fn main() {
///    // Create channels normally
///    let (tx, rx) = mpsc::channel::<String>(100);
///
///    // Instrument them only when the feature is enabled
///    #[cfg(feature = "channels-console")]
///    let (tx, rx) = channels_console::channel!((tx, rx));
///
///    // The channel works exactly the same way
///    tx.send("Hello".to_string()).await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! channel {
    ($expr:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    // `const _: usize = $capacity;` rejects capacities that are not constant `usize`s.
    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
    }};

    // Variants with log = true
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    // All six orderings of label / capacity / log expand to the same call.
    ($expr:expr, label = $label:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, label = $label:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};
}
901
/// Instrument a stream to track its item yields.
///
/// The call site (`file:line`) is recorded as the stream's source. After the stream
/// expression, these options may follow:
///
/// - `label = <expr>`: custom display label (converted with `to_string()`).
/// - `log = true`: instrument through `InstrumentStreamLog`, which also logs the yielded
///   items and therefore needs the item type to implement `Debug`.
///
/// # Examples
///
/// ```rust,ignore
/// use futures::stream::{self, StreamExt};
/// use channels_console::stream;
///
/// #[tokio::main]
/// async fn main() {
///     // Create a stream
///     let s = stream::iter(1..=10);
///
///     // Instrument it
///     let s = stream!(s);
///
///     // Use it normally
///     let _items: Vec<_> = s.collect().await;
/// }
/// ```
#[macro_export]
macro_rules! stream {
    ($expr:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, None)
    }};

    ($expr:expr, label = $label:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, Some($label.to_string()))
    }};

    // Variants with log = true
    ($expr:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log($expr, STREAM_ID, None)
    }};

    ($expr:expr, label = $label:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            STREAM_ID,
            Some($label.to_string()),
        )
    }};

    ($expr:expr, log = true, label = $label:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            STREAM_ID,
            Some($label.to_string()),
        )
    }};
}
959
960fn get_all_channel_stats() -> HashMap<u64, ChannelStats> {
961    if let Some((_, stats_map)) = CHANNELS_STATE.get() {
962        stats_map.read().unwrap().clone()
963    } else {
964        HashMap::new()
965    }
966}
967
968fn get_all_stream_stats() -> HashMap<u64, StreamStats> {
969    if let Some((_, stats_map)) = STREAMS_STATE.get() {
970        stats_map.read().unwrap().clone()
971    } else {
972        HashMap::new()
973    }
974}
975
976/// Compare two channel stats for sorting.
977/// Custom labels come first (sorted alphabetically), then auto-generated labels (sorted by source and iter).
978fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
979    let a_has_label = a.label.is_some();
980    let b_has_label = b.label.is_some();
981
982    match (a_has_label, b_has_label) {
983        (true, false) => std::cmp::Ordering::Less,
984        (false, true) => std::cmp::Ordering::Greater,
985        (true, true) => a
986            .label
987            .as_ref()
988            .unwrap()
989            .cmp(b.label.as_ref().unwrap())
990            .then_with(|| a.iter.cmp(&b.iter)),
991        (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
992    }
993}
994
995/// Compare two stream stats for sorting.
996/// Custom labels come first (sorted alphabetically), then auto-generated labels (sorted by source and iter).
997fn compare_stream_stats(a: &StreamStats, b: &StreamStats) -> std::cmp::Ordering {
998    let a_has_label = a.label.is_some();
999    let b_has_label = b.label.is_some();
1000
1001    match (a_has_label, b_has_label) {
1002        (true, false) => std::cmp::Ordering::Less,
1003        (false, true) => std::cmp::Ordering::Greater,
1004        (true, true) => a
1005            .label
1006            .as_ref()
1007            .unwrap()
1008            .cmp(b.label.as_ref().unwrap())
1009            .then_with(|| a.iter.cmp(&b.iter)),
1010        (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
1011    }
1012}
1013
1014pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
1015    let mut stats: Vec<ChannelStats> = get_all_channel_stats().into_values().collect();
1016    stats.sort_by(compare_channel_stats);
1017    stats
1018}
1019
1020pub(crate) fn get_sorted_stream_stats() -> Vec<StreamStats> {
1021    let mut stats: Vec<StreamStats> = get_all_stream_stats().into_values().collect();
1022    stats.sort_by(compare_stream_stats);
1023    stats
1024}
1025
1026pub(crate) fn get_channels_json() -> ChannelsJson {
1027    let channels = get_sorted_channel_stats()
1028        .iter()
1029        .map(SerializableChannelStats::from)
1030        .collect();
1031
1032    let current_elapsed_ns = START_TIME
1033        .get()
1034        .expect("START_TIME must be initialized")
1035        .elapsed()
1036        .as_nanos() as u64;
1037
1038    ChannelsJson {
1039        current_elapsed_ns,
1040        channels,
1041    }
1042}
1043
1044pub(crate) fn get_streams_json() -> StreamsJson {
1045    let streams = get_sorted_stream_stats()
1046        .iter()
1047        .map(SerializableStreamStats::from)
1048        .collect();
1049
1050    let current_elapsed_ns = START_TIME
1051        .get()
1052        .expect("START_TIME must be initialized")
1053        .elapsed()
1054        .as_nanos() as u64;
1055
1056    StreamsJson {
1057        current_elapsed_ns,
1058        streams,
1059    }
1060}
1061
1062pub(crate) fn get_combined_json() -> CombinedJson {
1063    let channels = get_sorted_channel_stats()
1064        .iter()
1065        .map(SerializableChannelStats::from)
1066        .collect();
1067
1068    let streams = get_sorted_stream_stats()
1069        .iter()
1070        .map(SerializableStreamStats::from)
1071        .collect();
1072
1073    let current_elapsed_ns = START_TIME
1074        .get()
1075        .expect("START_TIME must be initialized")
1076        .elapsed()
1077        .as_nanos() as u64;
1078
1079    CombinedJson {
1080        current_elapsed_ns,
1081        channels,
1082        streams,
1083    }
1084}
1085
/// Serializable log response containing sent and received logs for channels.
///
/// `get_channel_logs` builds this from a single channel's stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelLogs {
    /// Channel identifier as a string. `get_channel_logs` stores the ID string
    /// it was called with, which it has already parsed as a `u64`.
    pub id: String,
    /// Log entries for sent messages. `get_channel_logs` orders them by
    /// descending `index` (most recent first).
    pub sent_logs: Vec<LogEntry>,
    /// Log entries for received messages. `get_channel_logs` orders them by
    /// descending `index` (most recent first).
    pub received_logs: Vec<LogEntry>,
}
1093
/// Serializable log response containing yielded logs for streams.
///
/// `get_stream_logs` builds this from a single stream's stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamLogs {
    /// Stream identifier as a string. `get_stream_logs` stores the ID string
    /// it was called with, which it has already parsed as a `u64`.
    pub id: String,
    /// Log entries for the stream. `get_stream_logs` orders them by descending
    /// `index` (most recent first).
    pub logs: Vec<LogEntry>,
}
1100
1101pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
1102    let id = channel_id.parse::<u64>().ok()?;
1103    let stats = get_all_channel_stats();
1104    stats.get(&id).map(|channel_stats| {
1105        let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
1106        let mut received_logs: Vec<LogEntry> =
1107            channel_stats.received_logs.iter().cloned().collect();
1108
1109        // Sort by index descending (most recent first)
1110        sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
1111        received_logs.sort_by(|a, b| b.index.cmp(&a.index));
1112
1113        ChannelLogs {
1114            id: channel_id.to_string(),
1115            sent_logs,
1116            received_logs,
1117        }
1118    })
1119}
1120
1121pub(crate) fn get_stream_logs(stream_id: &str) -> Option<StreamLogs> {
1122    let id = stream_id.parse::<u64>().ok()?;
1123    let stats = get_all_stream_stats();
1124    stats.get(&id).map(|stream_stats| {
1125        let mut yielded_logs: Vec<LogEntry> = stream_stats.logs.iter().cloned().collect();
1126
1127        // Sort by index descending (most recent first)
1128        yielded_logs.sort_by(|a, b| b.index.cmp(&a.index));
1129
1130        StreamLogs {
1131            id: stream_id.to_string(),
1132            logs: yielded_logs,
1133        }
1134    })
1135}