channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod wrappers;
14
/// A single log entry for a message sent or received.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Sequence number supplied by the caller; the stats collector passes the
    /// channel's running sent or received count at the time the entry is recorded.
    pub index: u64,
    /// Nanoseconds elapsed between the global start time and the event.
    pub timestamp: u64,
    /// Optional message text; entries for received messages are recorded with `None`.
    pub message: Option<String>,
}
22
23impl LogEntry {
24    pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
25        let start_time = START_TIME.get().copied().unwrap_or(timestamp);
26        let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
27        Self {
28            index,
29            timestamp: timestamp_nanos,
30            message,
31        }
32    }
33}
34
/// Type of a channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelType {
    /// Bounded channel with the given capacity.
    Bounded(usize),
    /// Channel without a capacity limit.
    Unbounded,
    /// Single-message channel.
    Oneshot,
}

/// Textual form: `bounded[N]`, `unbounded` or `oneshot`.
impl std::fmt::Display for ChannelType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Bounded(capacity) => write!(f, "bounded[{}]", capacity),
            Self::Unbounded => f.write_str("unbounded"),
            Self::Oneshot => f.write_str("oneshot"),
        }
    }
}
52
53impl Serialize for ChannelType {
54    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55    where
56        S: serde::Serializer,
57    {
58        serializer.serialize_str(&self.to_string())
59    }
60}
61
62impl<'de> Deserialize<'de> for ChannelType {
63    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64    where
65        D: serde::Deserializer<'de>,
66    {
67        let s = String::deserialize(deserializer)?;
68
69        match s.as_str() {
70            "unbounded" => Ok(ChannelType::Unbounded),
71            "oneshot" => Ok(ChannelType::Oneshot),
72            _ => {
73                // try: bounded[123]
74                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
75                    let size = inner
76                        .parse()
77                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
78                    Ok(ChannelType::Bounded(size))
79                } else {
80                    Err(serde::de::Error::custom("invalid channel type"))
81                }
82            }
83        }
84    }
85}
86
/// Format of the output produced by ChannelsGuard on drop.
#[derive(Clone, Copy, Debug, Default)]
pub enum Format {
    /// Table output; this is the default variant.
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
95
/// State of an instrumented channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChannelState {
    /// Default state.
    #[default]
    Active,
    Closed,
    Full,
    Notified,
}

impl ChannelState {
    /// Lowercase name of the state, as used for display and serialization.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Closed => "closed",
            Self::Full => "full",
            Self::Notified => "notified",
        }
    }
}

/// Displays the same lowercase name that [`ChannelState::as_str`] returns.
impl std::fmt::Display for ChannelState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
122
123impl Serialize for ChannelState {
124    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
125    where
126        S: serde::Serializer,
127    {
128        serializer.serialize_str(self.as_str())
129    }
130}
131
132impl<'de> Deserialize<'de> for ChannelState {
133    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
134    where
135        D: serde::Deserializer<'de>,
136    {
137        let s = String::deserialize(deserializer)?;
138        match s.as_str() {
139            "active" => Ok(ChannelState::Active),
140            "closed" => Ok(ChannelState::Closed),
141            "full" => Ok(ChannelState::Full),
142            "notified" => Ok(ChannelState::Notified),
143            _ => Err(serde::de::Error::custom("invalid channel state")),
144        }
145    }
146}
147
/// Statistics for a single instrumented channel.
#[derive(Debug, Clone)]
pub(crate) struct ChannelStats {
    /// Channel identifier; also the key under which the stats are stored in the global map.
    pub(crate) id: u64,
    /// Source location string of the instrumentation site (`file:line` when produced by `instrument!`).
    pub(crate) source: &'static str,
    /// Custom label, if one was provided at instrumentation time.
    pub(crate) label: Option<String>,
    pub(crate) channel_type: ChannelType,
    pub(crate) state: ChannelState,
    /// Number of `MessageSent` events recorded for this channel.
    pub(crate) sent_count: u64,
    /// Number of `MessageReceived` events recorded for this channel.
    pub(crate) received_count: u64,
    /// Name of the message type, as reported by the instrumentation site.
    pub(crate) type_name: &'static str,
    /// Per-message size used to compute `queued_bytes`.
    pub(crate) type_size: usize,
    /// Most recent sent-message entries, oldest first.
    pub(crate) sent_logs: VecDeque<LogEntry>,
    /// Most recent received-message entries, oldest first.
    pub(crate) received_logs: VecDeque<LogEntry>,
    /// Number of channels with the same `source` that were already registered
    /// when this one was created (0 for the first).
    pub(crate) iter: u32,
}
164
165impl ChannelStats {
166    pub fn queued(&self) -> u64 {
167        self.sent_count
168            .saturating_sub(self.received_count)
169            .saturating_sub(1)
170    }
171
172    pub fn queued_bytes(&self) -> u64 {
173        self.queued() * self.type_size as u64
174    }
175}
176
/// Wrapper for the metrics JSON response, containing channel stats and the current elapsed time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsJson {
    /// Current elapsed time since program start in nanoseconds
    /// (measured from the moment the stats system recorded its start time).
    pub current_elapsed_ns: u64,
    /// Channel statistics
    pub stats: Vec<SerializableChannelStats>,
}
185
/// Serializable version of channel statistics for JSON responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableChannelStats {
    pub id: u64,
    /// Source location string of the instrumentation site.
    pub source: String,
    /// Display label: the custom label if one was given, otherwise one derived
    /// from `source`; a numeric suffix is appended when `iter` is non-zero.
    pub label: String,
    /// `true` when the label was explicitly provided rather than derived from `source`.
    pub has_custom_label: bool,
    pub channel_type: ChannelType,
    pub state: ChannelState,
    pub sent_count: u64,
    pub received_count: u64,
    /// Queued message count as computed by the internal stats at conversion time.
    pub queued: u64,
    pub type_name: String,
    pub type_size: usize,
    /// Queued message count multiplied by `type_size`.
    pub queued_bytes: u64,
    /// Number of same-source channels registered before this one.
    pub iter: u32,
}
203
204impl From<&ChannelStats> for SerializableChannelStats {
205    fn from(stats: &ChannelStats) -> Self {
206        let label = resolve_label(stats.source, stats.label.as_deref(), stats.iter);
207
208        Self {
209            id: stats.id,
210            source: stats.source.to_string(),
211            label,
212            has_custom_label: stats.label.is_some(),
213            channel_type: stats.channel_type,
214            state: stats.state,
215            sent_count: stats.sent_count,
216            received_count: stats.received_count,
217            queued: stats.queued(),
218            type_name: stats.type_name.to_string(),
219            type_size: stats.type_size,
220            queued_bytes: stats.queued_bytes(),
221            iter: stats.iter,
222        }
223    }
224}
225
226impl ChannelStats {
227    fn new(
228        id: u64,
229        source: &'static str,
230        label: Option<String>,
231        channel_type: ChannelType,
232        type_name: &'static str,
233        type_size: usize,
234        iter: u32,
235    ) -> Self {
236        Self {
237            id,
238            source,
239            label,
240            channel_type,
241            state: ChannelState::default(),
242            sent_count: 0,
243            received_count: 0,
244            type_name,
245            type_size,
246            sent_logs: VecDeque::new(),
247            received_logs: VecDeque::new(),
248            iter,
249        }
250    }
251
252    fn update_state(&mut self) {
253        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
254            return;
255        }
256
257        let queued = self.queued();
258        let is_full = match self.channel_type {
259            ChannelType::Bounded(cap) => queued >= cap as u64,
260            ChannelType::Oneshot => queued >= 1,
261            ChannelType::Unbounded => false,
262        };
263
264        if is_full {
265            self.state = ChannelState::Full;
266        } else {
267            self.state = ChannelState::Active;
268        }
269    }
270}
271
/// Events sent to the background statistics collection thread.
#[derive(Debug)]
pub(crate) enum StatsEvent {
    /// Registers a new channel under `id`.
    Created {
        id: u64,
        source: &'static str,
        display_label: Option<String>,
        channel_type: ChannelType,
        type_name: &'static str,
        type_size: usize,
    },
    /// A message was sent on channel `id`; `log` carries optional message text
    /// that is stored in the sent-message log.
    MessageSent {
        id: u64,
        log: Option<String>,
        timestamp: Instant,
    },
    /// A message was received on channel `id`.
    MessageReceived {
        id: u64,
        timestamp: Instant,
    },
    /// Marks channel `id` as closed.
    Closed {
        id: u64,
    },
    /// Marks channel `id` as notified.
    // NOTE(review): presumably only constructed by wrappers behind optional
    // features, hence the dead_code allowance — confirm.
    #[allow(dead_code)]
    Notified {
        id: u64,
    },
}
300
/// Pair held in the global state: the sender used to submit [`StatsEvent`]s to
/// the collector thread, and the shared map of per-channel statistics keyed by
/// channel id.
type StatsState = (
    CbSender<StatsEvent>,
    Arc<RwLock<HashMap<u64, ChannelStats>>>,
);

/// Global state for statistics collection.
static STATS_STATE: OnceLock<StatsState> = OnceLock::new();

/// Instant recorded when the statistics system is first initialized; log
/// timestamps and the metrics elapsed time are measured relative to it.
static START_TIME: OnceLock<Instant> = OnceLock::new();

/// Global counter for assigning unique IDs to channels.
pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
313
/// Number of log entries kept per direction when no valid override is configured.
const DEFAULT_LOG_LIMIT: usize = 50;

/// Returns the per-channel log limit.
///
/// Reads `CHANNELS_CONSOLE_LOG_LIMIT`; if the variable is missing or does not
/// parse as a `usize`, [`DEFAULT_LOG_LIMIT`] is returned.
fn get_log_limit() -> usize {
    match std::env::var("CHANNELS_CONSOLE_LOG_LIMIT") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_LOG_LIMIT),
        Err(_) => DEFAULT_LOG_LIMIT,
    }
}
322
323/// Initialize the statistics collection system (called on first instrumented channel).
324/// Returns a reference to the global state.
325fn init_stats_state() -> &'static StatsState {
326    STATS_STATE.get_or_init(|| {
327        START_TIME.get_or_init(Instant::now);
328
329        let (tx, rx) = unbounded::<StatsEvent>();
330        let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
331        let stats_map_clone = Arc::clone(&stats_map);
332
333        std::thread::Builder::new()
334            .name("channel-stats-collector".into())
335            .spawn(move || {
336                while let Ok(event) = rx.recv() {
337                    let mut stats = stats_map_clone.write().unwrap();
338                    match event {
339                        StatsEvent::Created {
340                            id,
341                            source,
342                            display_label,
343                            channel_type,
344                            type_name,
345                            type_size,
346                        } => {
347                            // Count existing channels with the same source location
348                            let iter =
349                                stats.values().filter(|cs| cs.source == source).count() as u32;
350
351                            stats.insert(
352                                id,
353                                ChannelStats::new(
354                                    id,
355                                    source,
356                                    display_label,
357                                    channel_type,
358                                    type_name,
359                                    type_size,
360                                    iter,
361                                ),
362                            );
363                        }
364                        StatsEvent::MessageSent { id, log, timestamp } => {
365                            if let Some(channel_stats) = stats.get_mut(&id) {
366                                channel_stats.sent_count += 1;
367                                channel_stats.update_state();
368
369                                let limit = get_log_limit();
370                                if channel_stats.sent_logs.len() >= limit {
371                                    channel_stats.sent_logs.pop_front();
372                                }
373                                channel_stats.sent_logs.push_back(LogEntry::new(
374                                    channel_stats.sent_count,
375                                    timestamp,
376                                    log,
377                                ));
378                            }
379                        }
380                        StatsEvent::MessageReceived { id, timestamp } => {
381                            if let Some(channel_stats) = stats.get_mut(&id) {
382                                channel_stats.received_count += 1;
383                                channel_stats.update_state();
384
385                                let limit = get_log_limit();
386                                if channel_stats.received_logs.len() >= limit {
387                                    channel_stats.received_logs.pop_front();
388                                }
389                                channel_stats.received_logs.push_back(LogEntry::new(
390                                    channel_stats.received_count,
391                                    timestamp,
392                                    None,
393                                ));
394                            }
395                        }
396                        StatsEvent::Closed { id } => {
397                            if let Some(channel_stats) = stats.get_mut(&id) {
398                                channel_stats.state = ChannelState::Closed;
399                            }
400                        }
401                        StatsEvent::Notified { id } => {
402                            if let Some(channel_stats) = stats.get_mut(&id) {
403                                channel_stats.state = ChannelState::Notified;
404                            }
405                        }
406                    }
407                }
408            })
409            .expect("Failed to spawn channel-stats-collector thread");
410
411        // Spawn the metrics HTTP server in the background
412        // Check environment variable for custom port, default to 6770
413        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
414            .ok()
415            .and_then(|p| p.parse::<u16>().ok())
416            .unwrap_or(6770);
417        let addr = format!("127.0.0.1:{}", port);
418
419        std::thread::spawn(move || {
420            start_metrics_server(&addr);
421        });
422
423        (tx, stats_map)
424    })
425}
426
427fn resolve_label(id: &'static str, provided: Option<&str>, iter: u32) -> String {
428    let base_label = if let Some(l) = provided {
429        l.to_string()
430    } else if let Some(pos) = id.rfind(':') {
431        let (path, line_part) = id.split_at(pos);
432        let line = &line_part[1..];
433        format!("{}:{}", extract_filename(path), line)
434    } else {
435        extract_filename(id)
436    };
437
438    if iter > 0 {
439        format!("{}-{}", base_label, iter + 1)
440    } else {
441        base_label
442    }
443}
444
/// Shortens a path to its last two components, joined with `/`
/// (e.g. `/home/me/proj/src/worker.rs` becomes `src/worker.rs`).
///
/// Both `/` and `\` are treated as separators, so paths produced by `file!()`
/// on Windows are shortened as well. A path with fewer than two components is
/// returned unchanged.
fn extract_filename(path: &str) -> String {
    // Walk from the end so only the last two components are ever looked at.
    let mut components = path.rsplit(|c: char| c == '/' || c == '\\');
    match (components.next(), components.next()) {
        (Some(file), Some(dir)) => format!("{}/{}", dir, file),
        _ => path.to_string(),
    }
}
457
/// Format bytes into human-readable units (B, KB, MB, GB, TB).
///
/// Values below 1024 are printed as an exact integer followed by `B`. Larger
/// values are divided by 1024 per unit step and printed with one decimal
/// place; `TB` is the largest unit, so very large values stay in `TB`.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];

    // Anything under one KB (including zero) needs no scaling.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut unit = 0usize;
    while scaled >= 1024.0 && unit + 1 < UNITS.len() {
        scaled /= 1024.0;
        unit += 1;
    }

    format!("{:.1} {}", scaled, UNITS[unit])
}
479
/// Trait for instrumenting channels.
///
/// This trait is not intended for direct use. Use the `instrument!` macro instead.
#[doc(hidden)]
pub trait Instrument {
    /// Type returned in place of the original channel value.
    type Output;
    /// Instruments `self`.
    ///
    /// * `source` - source location string; `instrument!` passes `file:line`.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity; `instrument!` passes it only when the
    ///   `capacity = ...` argument is given.
    fn instrument(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
493
/// Trait for instrumenting channels with message logging.
///
/// This trait is not intended for direct use. Use the `instrument!` macro with `log = true` instead.
#[doc(hidden)]
pub trait InstrumentLog {
    /// Type returned in place of the original channel value.
    type Output;
    /// Instruments `self` with message logging.
    ///
    /// * `source` - source location string; `instrument!` passes `file:line`.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity; `instrument!` passes it only when the
    ///   `capacity = ...` argument is given.
    fn instrument_log(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
507
// Only compiled when the `tokio` or `futures` feature is enabled.
cfg_if::cfg_if! {
    if #[cfg(any(feature = "tokio", feature = "futures"))] {
        use std::sync::LazyLock;
        /// Lazily-built multi-threaded Tokio runtime with the time driver enabled.
        ///
        /// Building the runtime panics on failure (`unwrap`).
        // NOTE(review): presumably used by the channel wrappers to run their
        // background tasks — confirm against the wrappers module.
        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
            tokio::runtime::Builder::new_multi_thread()
                .enable_time()
                .build()
                .unwrap()
        });
    }
}
519
/// Instrument a channel creation to wrap it with debugging proxies.
/// Currently only supports bounded, unbounded and oneshot channels.
///
/// The optional `label = ...`, `capacity = ...` and `log = true` arguments may
/// be given in any order after the channel expression.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
///
/// #[tokio::main]
/// async fn main() {
///
///    // Create channels normally
///    let (tx, rx) = mpsc::channel::<String>(100);
///
///    // Instrument them only when the feature is enabled
///    #[cfg(feature = "channels-console")]
///    let (tx, rx) = channels_console::instrument!((tx, rx));
///
///    // The channel works exactly the same way
///    tx.send("Hello".to_string()).await.unwrap();
/// }
/// ```
///
/// By default, channels are labeled with their file location and line number (e.g., `src/worker.rs:25`). You can provide custom labels for easier identification:
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
/// let (tx, rx) = mpsc::channel::<String>(10);
/// #[cfg(feature = "channels-console")]
/// let (tx, rx) = channels_console::instrument!((tx, rx), label = "task-queue");
/// ```
///
/// The label expression is converted with `.to_string()`.
///
/// ## Capacity Parameter
///
/// **For `std::sync::mpsc` and `futures::channel::mpsc` bounded channels**, you **must** specify the `capacity` parameter
/// because their APIs don't expose the capacity after creation:
///
/// ```rust,no_run
/// use std::sync::mpsc;
/// use channels_console::instrument;
///
/// // std::sync::mpsc::sync_channel - MUST specify capacity
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), capacity = 10);
///
/// // With label
/// let (tx, rx) = mpsc::sync_channel::<String>(10);
/// let (tx, rx) = instrument!((tx, rx), label = "my-channel", capacity = 10);
/// ```
///
/// Tokio channels don't require this because their capacity is accessible from the channel handles.
///
/// The capacity must be a constant expression of type `usize`; the macro
/// checks this at compile time.
///
/// ## Message Logging
///
/// By default, instrumentation only tracks message timestamps. To capture the actual content of messages for debugging,
/// enable logging with the `log = true` parameter (the message type must implement `std::fmt::Debug`):
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use channels_console::instrument;
///
/// // Enable message logging (requires Debug trait on the message type)
/// let (tx, rx) = mpsc::channel::<String>(10);
/// #[cfg(feature = "channels-console")]
/// let (tx, rx) = channels_console::instrument!((tx, rx), log = true);
/// ```
#[macro_export]
macro_rules! instrument {
    // Every arm builds CHANNEL_ID from the call site's file and line. Arms
    // taking a capacity also bind it to `const _: usize`, which rejects
    // non-constant or non-usize capacities at compile time.
    ($expr:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
    }};

    // Variants with log = true
    // (these dispatch to `InstrumentLog::instrument_log`; one arm per argument order)
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, label = $label:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};
}
713
714fn get_channel_stats() -> HashMap<u64, ChannelStats> {
715    if let Some((_, stats_map)) = STATS_STATE.get() {
716        stats_map.read().unwrap().clone()
717    } else {
718        HashMap::new()
719    }
720}
721
722/// Compare two ChannelStats for sorting.
723/// Custom labels come first (sorted alphabetically), then auto-generated labels (sorted by source and iter).
724fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
725    match (a.label.is_some(), b.label.is_some()) {
726        (true, false) => std::cmp::Ordering::Less,
727        (false, true) => std::cmp::Ordering::Greater,
728        (true, true) => a
729            .label
730            .as_ref()
731            .unwrap()
732            .cmp(b.label.as_ref().unwrap())
733            .then_with(|| a.iter.cmp(&b.iter)),
734        (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
735    }
736}
737
738pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
739    let mut stats: Vec<ChannelStats> = get_channel_stats().into_values().collect();
740    stats.sort_by(compare_channel_stats);
741    stats
742}
743
744fn get_metrics_json() -> MetricsJson {
745    let stats = get_sorted_channel_stats()
746        .iter()
747        .map(SerializableChannelStats::from)
748        .collect();
749
750    let current_elapsed_ns = START_TIME
751        .get()
752        .expect("START_TIME must be initialized")
753        .elapsed()
754        .as_nanos() as u64;
755
756    MetricsJson {
757        current_elapsed_ns,
758        stats,
759    }
760}
761
/// Serializable log response containing sent and received logs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelLogs {
    /// Channel id, echoed back as the string it was requested with.
    pub id: String,
    /// Sent-message entries, most recent first.
    pub sent_logs: Vec<LogEntry>,
    /// Received-message entries, most recent first.
    pub received_logs: Vec<LogEntry>,
}
769
770pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
771    let id = channel_id.parse::<u64>().ok()?;
772    let stats = get_channel_stats();
773    stats.get(&id).map(|channel_stats| {
774        let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
775
776        let mut received_logs: Vec<LogEntry> =
777            channel_stats.received_logs.iter().cloned().collect();
778
779        // Sort by index descending (most recent first)
780        sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
781        received_logs.sort_by(|a, b| b.index.cmp(&a.index));
782
783        ChannelLogs {
784            id: channel_id.to_string(),
785            sent_logs,
786            received_logs,
787        }
788    })
789}