channels_console/
lib.rs

1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod wrappers;
14
15/// A single log entry for a message sent or received.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct LogEntry {
18    pub index: u64,
19    pub timestamp: u64,
20    pub message: Option<String>,
21}
22
23impl LogEntry {
24    pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
25        let start_time = START_TIME.get().copied().unwrap_or(timestamp);
26        let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
27        Self {
28            index,
29            timestamp: timestamp_nanos,
30            message,
31        }
32    }
33}
34
/// The flavour of channel being instrumented.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelType {
    /// Bounded channel; the payload is its capacity.
    Bounded(usize),
    /// Channel without a capacity limit.
    Unbounded,
    /// Oneshot channel.
    Oneshot,
}
42
43impl std::fmt::Display for ChannelType {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
47            ChannelType::Unbounded => write!(f, "unbounded"),
48            ChannelType::Oneshot => write!(f, "oneshot"),
49        }
50    }
51}
52
53impl Serialize for ChannelType {
54    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55    where
56        S: serde::Serializer,
57    {
58        serializer.serialize_str(&self.to_string())
59    }
60}
61
62impl<'de> Deserialize<'de> for ChannelType {
63    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64    where
65        D: serde::Deserializer<'de>,
66    {
67        let s = String::deserialize(deserializer)?;
68
69        match s.as_str() {
70            "unbounded" => Ok(ChannelType::Unbounded),
71            "oneshot" => Ok(ChannelType::Oneshot),
72            _ => {
73                // try: bounded[123]
74                if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
75                    let size = inner
76                        .parse()
77                        .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
78                    Ok(ChannelType::Bounded(size))
79                } else {
80                    Err(serde::de::Error::custom("invalid channel type"))
81                }
82            }
83        }
84    }
85}
86
/// Output format that `ChannelsGuard` uses for what it produces on drop.
#[derive(Debug, Default, Clone, Copy)]
pub enum Format {
    /// Tabular output; this is the default.
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
95
/// State of an instrumented channel.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ChannelState {
    /// The channel is usable and below capacity; this is the default.
    #[default]
    Active,
    /// A close event was recorded for the channel.
    Closed,
    /// The number of queued messages has reached the channel's capacity.
    Full,
    /// A notify event was recorded for the channel.
    Notified,
}
105
106impl std::fmt::Display for ChannelState {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        write!(f, "{}", self.as_str())
109    }
110}
111
112impl ChannelState {
113    pub fn as_str(&self) -> &'static str {
114        match self {
115            ChannelState::Active => "active",
116            ChannelState::Closed => "closed",
117            ChannelState::Full => "full",
118            ChannelState::Notified => "notified",
119        }
120    }
121}
122
123impl Serialize for ChannelState {
124    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
125    where
126        S: serde::Serializer,
127    {
128        serializer.serialize_str(self.as_str())
129    }
130}
131
132impl<'de> Deserialize<'de> for ChannelState {
133    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
134    where
135        D: serde::Deserializer<'de>,
136    {
137        let s = String::deserialize(deserializer)?;
138        match s.as_str() {
139            "active" => Ok(ChannelState::Active),
140            "closed" => Ok(ChannelState::Closed),
141            "full" => Ok(ChannelState::Full),
142            "notified" => Ok(ChannelState::Notified),
143            _ => Err(serde::de::Error::custom("invalid channel state")),
144        }
145    }
146}
147
148/// Statistics for a single instrumented channel.
149#[derive(Debug, Clone)]
150pub(crate) struct ChannelStats {
151    pub(crate) id: u64,
152    pub(crate) source: &'static str,
153    pub(crate) label: Option<&'static str>,
154    pub(crate) channel_type: ChannelType,
155    pub(crate) state: ChannelState,
156    pub(crate) sent_count: u64,
157    pub(crate) received_count: u64,
158    pub(crate) type_name: &'static str,
159    pub(crate) type_size: usize,
160    pub(crate) sent_logs: VecDeque<LogEntry>,
161    pub(crate) received_logs: VecDeque<LogEntry>,
162    pub(crate) iter: u32,
163}
164
165impl ChannelStats {
166    pub fn queued(&self) -> u64 {
167        self.sent_count
168            .saturating_sub(self.received_count)
169            .saturating_sub(1)
170    }
171
172    pub fn queued_bytes(&self) -> u64 {
173        self.queued() * self.type_size as u64
174    }
175}
176
177/// Serializable version of channel statistics for JSON responses.
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct SerializableChannelStats {
180    pub id: u64,
181    pub source: String,
182    pub label: String,
183    pub has_custom_label: bool,
184    pub channel_type: ChannelType,
185    pub state: ChannelState,
186    pub sent_count: u64,
187    pub received_count: u64,
188    pub queued: u64,
189    pub type_name: String,
190    pub type_size: usize,
191    pub queued_bytes: u64,
192    pub iter: u32,
193}
194
195impl From<&ChannelStats> for SerializableChannelStats {
196    fn from(stats: &ChannelStats) -> Self {
197        let label = resolve_label(stats.source, stats.label, stats.iter);
198        Self {
199            id: stats.id,
200            source: stats.source.to_string(),
201            label,
202            has_custom_label: stats.label.is_some(),
203            channel_type: stats.channel_type,
204            state: stats.state,
205            sent_count: stats.sent_count,
206            received_count: stats.received_count,
207            queued: stats.queued(),
208            type_name: stats.type_name.to_string(),
209            type_size: stats.type_size,
210            queued_bytes: stats.queued_bytes(),
211            iter: stats.iter,
212        }
213    }
214}
215
216impl ChannelStats {
217    fn new(
218        id: u64,
219        source: &'static str,
220        label: Option<&'static str>,
221        channel_type: ChannelType,
222        type_name: &'static str,
223        type_size: usize,
224        iter: u32,
225    ) -> Self {
226        Self {
227            id,
228            source,
229            label,
230            channel_type,
231            state: ChannelState::default(),
232            sent_count: 0,
233            received_count: 0,
234            type_name,
235            type_size,
236            sent_logs: VecDeque::new(),
237            received_logs: VecDeque::new(),
238            iter,
239        }
240    }
241
242    fn update_state(&mut self) {
243        if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
244            return;
245        }
246
247        let queued = self.queued();
248        let is_full = match self.channel_type {
249            ChannelType::Bounded(cap) => queued >= cap as u64,
250            ChannelType::Oneshot => queued >= 1,
251            ChannelType::Unbounded => false,
252        };
253
254        if is_full {
255            self.state = ChannelState::Full;
256        } else {
257            self.state = ChannelState::Active;
258        }
259    }
260}
261
262/// Events sent to the background statistics collection thread.
263#[derive(Debug)]
264pub(crate) enum StatsEvent {
265    Created {
266        id: u64,
267        source: &'static str,
268        display_label: Option<&'static str>,
269        channel_type: ChannelType,
270        type_name: &'static str,
271        type_size: usize,
272    },
273    MessageSent {
274        id: u64,
275        log: Option<String>,
276        timestamp: Instant,
277    },
278    MessageReceived {
279        id: u64,
280        timestamp: Instant,
281    },
282    Closed {
283        id: u64,
284    },
285    #[allow(dead_code)]
286    Notified {
287        id: u64,
288    },
289}
290
291type StatsState = (
292    CbSender<StatsEvent>,
293    Arc<RwLock<HashMap<u64, ChannelStats>>>,
294);
295
296/// Global state for statistics collection.
297static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
298
299static START_TIME: OnceLock<Instant> = OnceLock::new();
300
301/// Global counter for assigning unique IDs to channels.
302pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
303
/// Number of log entries kept per direction when no valid override is set.
const DEFAULT_LOG_LIMIT: usize = 50;

/// Returns the per-channel log limit.
///
/// Reads `CHANNELS_CONSOLE_LOG_LIMIT` from the environment on every call and
/// falls back to [`DEFAULT_LOG_LIMIT`] when the variable is missing or does
/// not parse as a `usize`.
fn get_log_limit() -> usize {
    match std::env::var("CHANNELS_CONSOLE_LOG_LIMIT") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_LOG_LIMIT),
        Err(_) => DEFAULT_LOG_LIMIT,
    }
}
312
313/// Initialize the statistics collection system (called on first instrumented channel).
314/// Returns a reference to the global state.
315fn init_stats_state() -> &'static StatsState {
316    STATS_STATE.get_or_init(|| {
317        START_TIME.get_or_init(Instant::now);
318
319        let (tx, rx) = unbounded::<StatsEvent>();
320        let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
321        let stats_map_clone = Arc::clone(&stats_map);
322
323        std::thread::Builder::new()
324            .name("channel-stats-collector".into())
325            .spawn(move || {
326                while let Ok(event) = rx.recv() {
327                    let mut stats = stats_map_clone.write().unwrap();
328                    match event {
329                        StatsEvent::Created {
330                            id,
331                            source,
332                            display_label,
333                            channel_type,
334                            type_name,
335                            type_size,
336                        } => {
337                            // Count existing channels with the same source location
338                            let iter =
339                                stats.values().filter(|cs| cs.source == source).count() as u32;
340
341                            stats.insert(
342                                id,
343                                ChannelStats::new(
344                                    id,
345                                    source,
346                                    display_label,
347                                    channel_type,
348                                    type_name,
349                                    type_size,
350                                    iter,
351                                ),
352                            );
353                        }
354                        StatsEvent::MessageSent { id, log, timestamp } => {
355                            if let Some(channel_stats) = stats.get_mut(&id) {
356                                channel_stats.sent_count += 1;
357                                channel_stats.update_state();
358
359                                let limit = get_log_limit();
360                                if channel_stats.sent_logs.len() >= limit {
361                                    channel_stats.sent_logs.pop_front();
362                                }
363                                channel_stats.sent_logs.push_back(LogEntry::new(
364                                    channel_stats.sent_count,
365                                    timestamp,
366                                    log,
367                                ));
368                            }
369                        }
370                        StatsEvent::MessageReceived { id, timestamp } => {
371                            if let Some(channel_stats) = stats.get_mut(&id) {
372                                channel_stats.received_count += 1;
373                                channel_stats.update_state();
374
375                                let limit = get_log_limit();
376                                if channel_stats.received_logs.len() >= limit {
377                                    channel_stats.received_logs.pop_front();
378                                }
379                                channel_stats.received_logs.push_back(LogEntry::new(
380                                    channel_stats.received_count,
381                                    timestamp,
382                                    None,
383                                ));
384                            }
385                        }
386                        StatsEvent::Closed { id } => {
387                            if let Some(channel_stats) = stats.get_mut(&id) {
388                                channel_stats.state = ChannelState::Closed;
389                            }
390                        }
391                        StatsEvent::Notified { id } => {
392                            if let Some(channel_stats) = stats.get_mut(&id) {
393                                channel_stats.state = ChannelState::Notified;
394                            }
395                        }
396                    }
397                }
398            })
399            .expect("Failed to spawn channel-stats-collector thread");
400
401        // Spawn the metrics HTTP server in the background
402        // Check environment variable for custom port, default to 6770
403        let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
404            .ok()
405            .and_then(|p| p.parse::<u16>().ok())
406            .unwrap_or(6770);
407        let addr = format!("127.0.0.1:{}", port);
408
409        std::thread::spawn(move || {
410            start_metrics_server(&addr);
411        });
412
413        (tx, stats_map)
414    })
415}
416
417fn resolve_label(id: &'static str, provided: Option<&'static str>, iter: u32) -> String {
418    let base_label = if let Some(l) = provided {
419        l.to_string()
420    } else if let Some(pos) = id.rfind(':') {
421        let (path, line_part) = id.split_at(pos);
422        let line = &line_part[1..];
423        format!("{}:{}", extract_filename(path), line)
424    } else {
425        extract_filename(id)
426    };
427
428    if iter > 0 {
429        format!("{}-{}", base_label, iter + 1)
430    } else {
431        base_label
432    }
433}
434
/// Shortens a `/`-separated path to its last two components (`dir/file`).
///
/// Paths without a `/` are returned unchanged.
fn extract_filename(path: &str) -> String {
    let mut from_end = path.rsplit('/');
    match (from_end.next(), from_end.next()) {
        (Some(file), Some(dir)) => format!("{}/{}", dir, file),
        _ => path.to_string(),
    }
}
447
/// Format bytes into human-readable units (B, KB, MB, GB, TB).
///
/// Values below 1024 are printed as whole bytes (`"512 B"`). Larger values are
/// divided by 1024 per unit step and printed with one decimal place
/// (`"1.5 KB"`). TB is the largest unit, so very large values stay in TB.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];

    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut unit = 0;
    while scaled >= 1024.0 && unit + 1 < UNITS.len() {
        scaled /= 1024.0;
        unit += 1;
    }

    format!("{:.1} {}", scaled, UNITS[unit])
}
469
/// Conversion of a channel into its instrumented counterpart.
///
/// Not meant to be called directly; the `instrument!` macro invokes it.
#[doc(hidden)]
pub trait Instrument {
    /// Type produced by instrumenting `Self`.
    type Output;

    /// Instruments `self`.
    ///
    /// `source` identifies the call site (the macro passes `file:line`),
    /// `label` is an optional custom label and `capacity` an optional
    /// explicitly stated channel capacity.
    fn instrument(
        self,
        source: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
483
/// Conversion of a channel into its instrumented counterpart with message
/// logging enabled.
///
/// Not meant to be called directly; the `instrument!` macro invokes it when
/// `log = true` is passed.
#[doc(hidden)]
pub trait InstrumentLog {
    /// Type produced by instrumenting `Self`.
    type Output;

    /// Instruments `self` with message logging.
    ///
    /// `source` identifies the call site (the macro passes `file:line`),
    /// `label` is an optional custom label and `capacity` an optional
    /// explicitly stated channel capacity.
    fn instrument_log(
        self,
        source: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
497
498cfg_if::cfg_if! {
499    if #[cfg(any(feature = "tokio", feature = "futures"))] {
500        use std::sync::LazyLock;
501        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
502            tokio::runtime::Builder::new_multi_thread()
503                .enable_time()
504                .build()
505                .unwrap()
506        });
507    }
508}
509
510/// Instrument a channel creation to wrap it with debugging proxies.
511/// Currently only supports bounded, unbounded and oneshot channels.
512///
513/// # Examples
514///
515/// ```
516/// use tokio::sync::mpsc;
517/// use channels_console::instrument;
518///
519/// #[tokio::main]
520/// async fn main() {
521///
522///    // Create channels normally
523///    let (tx, rx) = mpsc::channel::<String>(100);
524///
525///    // Instrument them only when the feature is enabled
526///    #[cfg(feature = "channels-console")]
527///    let (tx, rx) = channels_console::instrument!((tx, rx));
528///
529///    // The channel works exactly the same way
530///    tx.send("Hello".to_string()).await.unwrap();
531/// }
532/// ```
533///
534/// By default, channels are labeled with their file location and line number (e.g., `src/worker.rs:25`). You can provide custom labels for easier identification:
535///
536/// ```rust,no_run
537/// use tokio::sync::mpsc;
538/// use channels_console::instrument;
539/// let (tx, rx) = mpsc::channel::<String>(10);
540/// #[cfg(feature = "channels-console")]
541/// let (tx, rx) = channels_console::instrument!((tx, rx), label = "task-queue");
542/// ```
543///
544/// # Important: Capacity Parameter
545///
546/// **For `std::sync::mpsc` and `futures::channel::mpsc` bounded channels**, you **must** specify the `capacity` parameter
547/// because their APIs don't expose the capacity after creation:
548///
549/// ```rust,no_run
550/// use std::sync::mpsc;
551/// use channels_console::instrument;
552///
553/// // std::sync::mpsc::sync_channel - MUST specify capacity
554/// let (tx, rx) = mpsc::sync_channel::<String>(10);
555/// let (tx, rx) = instrument!((tx, rx), capacity = 10);
556///
557/// // With label
558/// let (tx, rx) = mpsc::sync_channel::<String>(10);
559/// let (tx, rx) = instrument!((tx, rx), label = "my-channel", capacity = 10);
560/// ```
561///
562/// Tokio channels don't require this because their capacity is accessible from the channel handles.
563///
564/// **Message Logging:**
565///
566/// By default, instrumentation only tracks message timestamps. To capture the actual content of messages for debugging,
567/// enable logging with the `log = true` parameter (the message type must implement `std::fmt::Debug`):
568///
569/// ```rust,no_run
570/// use tokio::sync::mpsc;
571/// use channels_console::instrument;
572///
573/// // Enable message logging (requires Debug trait on the message type)
574/// let (tx, rx) = mpsc::channel::<String>(10);
575/// #[cfg(feature = "channels-console")]
576/// let (tx, rx) = channels_console::instrument!((tx, rx), log = true);
/// ```
578///
#[macro_export]
macro_rules! instrument {
    // ---- Statistics only: dispatches to `Instrument::instrument` ----

    ($channel:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($channel, CHANNEL_ID, None, None)
    }};

    ($channel:expr, label = $name:literal) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), None)
    }};

    ($channel:expr, capacity = $cap:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        // Compile-time check: the capacity must be a constant `usize` expression.
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, None, Some($cap))
    }};

    ($channel:expr, label = $name:literal, capacity = $cap:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, label = $name:literal) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::Instrument::instrument($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    // ---- `log = true`: dispatches to `InstrumentLog::instrument_log` ----

    ($channel:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, None)
    }};

    ($channel:expr, label = $name:literal, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), None)
    }};

    ($channel:expr, log = true, label = $name:literal) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), None)
    }};

    ($channel:expr, capacity = $cap:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, Some($cap))
    }};

    ($channel:expr, log = true, capacity = $cap:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, None, Some($cap))
    }};

    ($channel:expr, label = $name:literal, capacity = $cap:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, label = $name:literal, log = true, capacity = $cap:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, label = $name:literal, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, capacity = $cap:expr, log = true, label = $name:literal) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, log = true, label = $name:literal, capacity = $cap:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};

    ($channel:expr, log = true, capacity = $cap:expr, label = $name:literal) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $cap;
        $crate::InstrumentLog::instrument_log($channel, CHANNEL_ID, Some($name), Some($cap))
    }};
}
673
674fn get_channel_stats() -> HashMap<u64, ChannelStats> {
675    if let Some((_, stats_map)) = STATS_STATE.get() {
676        stats_map.read().unwrap().clone()
677    } else {
678        HashMap::new()
679    }
680}
681
682/// Compare two ChannelStats for sorting.
683/// Custom labels come first (sorted alphabetically), then auto-generated labels (sorted by source and iter).
684fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
685    match (a.label.is_some(), b.label.is_some()) {
686        (true, false) => std::cmp::Ordering::Less,
687        (false, true) => std::cmp::Ordering::Greater,
688        (true, true) => a
689            .label
690            .unwrap()
691            .cmp(b.label.unwrap())
692            .then_with(|| a.iter.cmp(&b.iter)),
693        (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
694    }
695}
696
697pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
698    let mut stats: Vec<ChannelStats> = get_channel_stats().into_values().collect();
699    stats.sort_by(compare_channel_stats);
700    stats
701}
702
703fn get_serializable_stats() -> Vec<SerializableChannelStats> {
704    get_sorted_channel_stats()
705        .iter()
706        .map(SerializableChannelStats::from)
707        .collect()
708}
709
710/// Serializable log response containing sent and received logs.
711#[derive(Debug, Clone, Serialize, Deserialize)]
712pub struct ChannelLogs {
713    pub id: String,
714    pub sent_logs: Vec<LogEntry>,
715    pub received_logs: Vec<LogEntry>,
716}
717
718pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
719    let id = channel_id.parse::<u64>().ok()?;
720    let stats = get_channel_stats();
721    stats.get(&id).map(|channel_stats| {
722        let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
723
724        let mut received_logs: Vec<LogEntry> =
725            channel_stats.received_logs.iter().cloned().collect();
726
727        // Sort by index descending (most recent first)
728        sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
729        received_logs.sort_by(|a, b| b.index.cmp(&a.index));
730
731        ChannelLogs {
732            id: channel_id.to_string(),
733            sent_logs,
734            received_logs,
735        }
736    })
737}