Skip to main content

ipckit/
metrics.rs

1//! # Channel Metrics
2//!
3//! This module provides performance monitoring capabilities for IPC channels.
4//! It tracks message counts, byte throughput, errors, latency, and queue depth.
5//!
6//! ## Example
7//!
8//! ```rust,ignore
//! use ipckit::{ChannelMetrics, MeteredChannel, WithMetrics};
10//!
11//! let channel = NamedPipeChannel::new("my_pipe")?.with_metrics();
12//!
13//! // ... use channel ...
14//!
15//! let metrics = channel.metrics();
16//! println!("Messages sent: {}", metrics.messages_sent());
17//! println!("Avg latency: {}µs", metrics.avg_latency_us());
18//!
19//! // Export for monitoring
20//! log::info!("IPC metrics: {}", metrics.to_json());
21//! ```
22
23use parking_lot::RwLock;
24use serde::{Deserialize, Serialize};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28/// Atomic metrics counters for thread-safe updates.
29#[derive(Debug, Default)]
30pub struct ChannelMetrics {
31    /// Total messages sent
32    messages_sent: AtomicU64,
33    /// Total messages received
34    messages_received: AtomicU64,
35    /// Total bytes sent
36    bytes_sent: AtomicU64,
37    /// Total bytes received
38    bytes_received: AtomicU64,
39    /// Send errors
40    send_errors: AtomicU64,
41    /// Receive errors
42    receive_errors: AtomicU64,
43    /// Current queue depth (for buffered channels)
44    queue_depth: AtomicU64,
45    /// Peak queue depth
46    peak_queue_depth: AtomicU64,
47    /// Sum of latencies in microseconds (for averaging)
48    latency_sum_us: AtomicU64,
49    /// Count of latency samples
50    latency_count: AtomicU64,
51    /// Minimum latency in microseconds
52    min_latency_us: AtomicU64,
53    /// Maximum latency in microseconds
54    max_latency_us: AtomicU64,
55    /// Histogram for latency distribution
56    latency_histogram: RwLock<LatencyHistogram>,
57    /// Start time for rate calculations
58    start_time: RwLock<Option<Instant>>,
59}
60
61impl ChannelMetrics {
62    /// Create a new metrics instance.
63    pub fn new() -> Self {
64        Self {
65            min_latency_us: AtomicU64::new(u64::MAX),
66            ..Default::default()
67        }
68    }
69
70    /// Record a message sent.
71    pub fn record_send(&self, bytes: usize) {
72        self.ensure_started();
73        self.messages_sent.fetch_add(1, Ordering::Relaxed);
74        self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
75    }
76
77    /// Record a message received.
78    pub fn record_recv(&self, bytes: usize) {
79        self.ensure_started();
80        self.messages_received.fetch_add(1, Ordering::Relaxed);
81        self.bytes_received
82            .fetch_add(bytes as u64, Ordering::Relaxed);
83    }
84
85    /// Record a send error.
86    pub fn record_send_error(&self) {
87        self.send_errors.fetch_add(1, Ordering::Relaxed);
88    }
89
90    /// Record a receive error.
91    pub fn record_recv_error(&self) {
92        self.receive_errors.fetch_add(1, Ordering::Relaxed);
93    }
94
95    /// Record latency for a message.
96    pub fn record_latency(&self, latency: Duration) {
97        let us = latency.as_micros() as u64;
98        self.latency_sum_us.fetch_add(us, Ordering::Relaxed);
99        self.latency_count.fetch_add(1, Ordering::Relaxed);
100
101        // Update min latency
102        let mut current_min = self.min_latency_us.load(Ordering::Relaxed);
103        while us < current_min {
104            match self.min_latency_us.compare_exchange_weak(
105                current_min,
106                us,
107                Ordering::Relaxed,
108                Ordering::Relaxed,
109            ) {
110                Ok(_) => break,
111                Err(x) => current_min = x,
112            }
113        }
114
115        // Update max latency
116        let mut current_max = self.max_latency_us.load(Ordering::Relaxed);
117        while us > current_max {
118            match self.max_latency_us.compare_exchange_weak(
119                current_max,
120                us,
121                Ordering::Relaxed,
122                Ordering::Relaxed,
123            ) {
124                Ok(_) => break,
125                Err(x) => current_max = x,
126            }
127        }
128
129        // Update histogram
130        self.latency_histogram.write().record(us);
131    }
132
133    /// Update queue depth.
134    pub fn set_queue_depth(&self, depth: u64) {
135        self.queue_depth.store(depth, Ordering::Relaxed);
136
137        // Update peak
138        let mut current_peak = self.peak_queue_depth.load(Ordering::Relaxed);
139        while depth > current_peak {
140            match self.peak_queue_depth.compare_exchange_weak(
141                current_peak,
142                depth,
143                Ordering::Relaxed,
144                Ordering::Relaxed,
145            ) {
146                Ok(_) => break,
147                Err(x) => current_peak = x,
148            }
149        }
150    }
151
152    /// Get messages sent count.
153    pub fn messages_sent(&self) -> u64 {
154        self.messages_sent.load(Ordering::Relaxed)
155    }
156
157    /// Get messages received count.
158    pub fn messages_received(&self) -> u64 {
159        self.messages_received.load(Ordering::Relaxed)
160    }
161
162    /// Get bytes sent count.
163    pub fn bytes_sent(&self) -> u64 {
164        self.bytes_sent.load(Ordering::Relaxed)
165    }
166
167    /// Get bytes received count.
168    pub fn bytes_received(&self) -> u64 {
169        self.bytes_received.load(Ordering::Relaxed)
170    }
171
172    /// Get send errors count.
173    pub fn send_errors(&self) -> u64 {
174        self.send_errors.load(Ordering::Relaxed)
175    }
176
177    /// Get receive errors count.
178    pub fn receive_errors(&self) -> u64 {
179        self.receive_errors.load(Ordering::Relaxed)
180    }
181
182    /// Get current queue depth.
183    pub fn queue_depth(&self) -> u64 {
184        self.queue_depth.load(Ordering::Relaxed)
185    }
186
187    /// Get peak queue depth.
188    pub fn peak_queue_depth(&self) -> u64 {
189        self.peak_queue_depth.load(Ordering::Relaxed)
190    }
191
192    /// Get average latency in microseconds.
193    pub fn avg_latency_us(&self) -> u64 {
194        let count = self.latency_count.load(Ordering::Relaxed);
195        if count == 0 {
196            return 0;
197        }
198        self.latency_sum_us.load(Ordering::Relaxed) / count
199    }
200
201    /// Get minimum latency in microseconds.
202    pub fn min_latency_us(&self) -> Option<u64> {
203        let min = self.min_latency_us.load(Ordering::Relaxed);
204        if min == u64::MAX {
205            None
206        } else {
207            Some(min)
208        }
209    }
210
211    /// Get maximum latency in microseconds.
212    pub fn max_latency_us(&self) -> u64 {
213        self.max_latency_us.load(Ordering::Relaxed)
214    }
215
216    /// Get latency percentile (e.g., 99 for p99).
217    pub fn latency_percentile(&self, percentile: u8) -> u64 {
218        self.latency_histogram.read().percentile(percentile)
219    }
220
221    /// Get elapsed time since metrics started.
222    pub fn elapsed(&self) -> Duration {
223        self.start_time
224            .read()
225            .map(|t| t.elapsed())
226            .unwrap_or_default()
227    }
228
229    /// Get send throughput in messages per second.
230    pub fn send_throughput(&self) -> f64 {
231        let elapsed = self.elapsed().as_secs_f64();
232        if elapsed == 0.0 {
233            return 0.0;
234        }
235        self.messages_sent() as f64 / elapsed
236    }
237
238    /// Get receive throughput in messages per second.
239    pub fn recv_throughput(&self) -> f64 {
240        let elapsed = self.elapsed().as_secs_f64();
241        if elapsed == 0.0 {
242            return 0.0;
243        }
244        self.messages_received() as f64 / elapsed
245    }
246
247    /// Get send bandwidth in bytes per second.
248    pub fn send_bandwidth(&self) -> f64 {
249        let elapsed = self.elapsed().as_secs_f64();
250        if elapsed == 0.0 {
251            return 0.0;
252        }
253        self.bytes_sent() as f64 / elapsed
254    }
255
256    /// Get receive bandwidth in bytes per second.
257    pub fn recv_bandwidth(&self) -> f64 {
258        let elapsed = self.elapsed().as_secs_f64();
259        if elapsed == 0.0 {
260            return 0.0;
261        }
262        self.bytes_received() as f64 / elapsed
263    }
264
265    /// Reset all metrics.
266    pub fn reset(&self) {
267        self.messages_sent.store(0, Ordering::Relaxed);
268        self.messages_received.store(0, Ordering::Relaxed);
269        self.bytes_sent.store(0, Ordering::Relaxed);
270        self.bytes_received.store(0, Ordering::Relaxed);
271        self.send_errors.store(0, Ordering::Relaxed);
272        self.receive_errors.store(0, Ordering::Relaxed);
273        self.queue_depth.store(0, Ordering::Relaxed);
274        self.peak_queue_depth.store(0, Ordering::Relaxed);
275        self.latency_sum_us.store(0, Ordering::Relaxed);
276        self.latency_count.store(0, Ordering::Relaxed);
277        self.min_latency_us.store(u64::MAX, Ordering::Relaxed);
278        self.max_latency_us.store(0, Ordering::Relaxed);
279        self.latency_histogram.write().reset();
280        *self.start_time.write() = Some(Instant::now());
281    }
282
283    /// Get a snapshot of all metrics.
284    pub fn snapshot(&self) -> MetricsSnapshot {
285        MetricsSnapshot {
286            messages_sent: self.messages_sent(),
287            messages_received: self.messages_received(),
288            bytes_sent: self.bytes_sent(),
289            bytes_received: self.bytes_received(),
290            send_errors: self.send_errors(),
291            receive_errors: self.receive_errors(),
292            queue_depth: self.queue_depth(),
293            peak_queue_depth: self.peak_queue_depth(),
294            avg_latency_us: self.avg_latency_us(),
295            min_latency_us: self.min_latency_us(),
296            max_latency_us: self.max_latency_us(),
297            p50_latency_us: self.latency_percentile(50),
298            p95_latency_us: self.latency_percentile(95),
299            p99_latency_us: self.latency_percentile(99),
300            elapsed_secs: self.elapsed().as_secs_f64(),
301            send_throughput: self.send_throughput(),
302            recv_throughput: self.recv_throughput(),
303            send_bandwidth: self.send_bandwidth(),
304            recv_bandwidth: self.recv_bandwidth(),
305        }
306    }
307
308    /// Export metrics as JSON string.
309    pub fn to_json(&self) -> String {
310        serde_json::to_string(&self.snapshot()).unwrap_or_default()
311    }
312
313    /// Export metrics as pretty JSON string.
314    pub fn to_json_pretty(&self) -> String {
315        serde_json::to_string_pretty(&self.snapshot()).unwrap_or_default()
316    }
317
318    /// Export metrics in Prometheus format.
319    pub fn to_prometheus(&self, prefix: &str) -> String {
320        let snapshot = self.snapshot();
321        let mut output = String::new();
322
323        output.push_str(&format!(
324            "# HELP {prefix}_messages_sent_total Total messages sent\n"
325        ));
326        output.push_str(&format!("# TYPE {prefix}_messages_sent_total counter\n"));
327        output.push_str(&format!(
328            "{prefix}_messages_sent_total {}\n",
329            snapshot.messages_sent
330        ));
331
332        output.push_str(&format!(
333            "# HELP {prefix}_messages_received_total Total messages received\n"
334        ));
335        output.push_str(&format!(
336            "# TYPE {prefix}_messages_received_total counter\n"
337        ));
338        output.push_str(&format!(
339            "{prefix}_messages_received_total {}\n",
340            snapshot.messages_received
341        ));
342
343        output.push_str(&format!(
344            "# HELP {prefix}_bytes_sent_total Total bytes sent\n"
345        ));
346        output.push_str(&format!("# TYPE {prefix}_bytes_sent_total counter\n"));
347        output.push_str(&format!(
348            "{prefix}_bytes_sent_total {}\n",
349            snapshot.bytes_sent
350        ));
351
352        output.push_str(&format!(
353            "# HELP {prefix}_bytes_received_total Total bytes received\n"
354        ));
355        output.push_str(&format!("# TYPE {prefix}_bytes_received_total counter\n"));
356        output.push_str(&format!(
357            "{prefix}_bytes_received_total {}\n",
358            snapshot.bytes_received
359        ));
360
361        output.push_str(&format!(
362            "# HELP {prefix}_send_errors_total Total send errors\n"
363        ));
364        output.push_str(&format!("# TYPE {prefix}_send_errors_total counter\n"));
365        output.push_str(&format!(
366            "{prefix}_send_errors_total {}\n",
367            snapshot.send_errors
368        ));
369
370        output.push_str(&format!(
371            "# HELP {prefix}_receive_errors_total Total receive errors\n"
372        ));
373        output.push_str(&format!("# TYPE {prefix}_receive_errors_total counter\n"));
374        output.push_str(&format!(
375            "{prefix}_receive_errors_total {}\n",
376            snapshot.receive_errors
377        ));
378
379        output.push_str(&format!(
380            "# HELP {prefix}_queue_depth Current queue depth\n"
381        ));
382        output.push_str(&format!("# TYPE {prefix}_queue_depth gauge\n"));
383        output.push_str(&format!("{prefix}_queue_depth {}\n", snapshot.queue_depth));
384
385        output.push_str(&format!(
386            "# HELP {prefix}_latency_microseconds Latency in microseconds\n"
387        ));
388        output.push_str(&format!("# TYPE {prefix}_latency_microseconds summary\n"));
389        output.push_str(&format!(
390            "{prefix}_latency_microseconds{{quantile=\"0.5\"}} {}\n",
391            snapshot.p50_latency_us
392        ));
393        output.push_str(&format!(
394            "{prefix}_latency_microseconds{{quantile=\"0.95\"}} {}\n",
395            snapshot.p95_latency_us
396        ));
397        output.push_str(&format!(
398            "{prefix}_latency_microseconds{{quantile=\"0.99\"}} {}\n",
399            snapshot.p99_latency_us
400        ));
401
402        output.push_str(&format!(
403            "# HELP {prefix}_throughput_messages_per_second Message throughput\n"
404        ));
405        output.push_str(&format!(
406            "# TYPE {prefix}_throughput_messages_per_second gauge\n"
407        ));
408        output.push_str(&format!(
409            "{prefix}_throughput_messages_per_second{{direction=\"send\"}} {:.2}\n",
410            snapshot.send_throughput
411        ));
412        output.push_str(&format!(
413            "{prefix}_throughput_messages_per_second{{direction=\"recv\"}} {:.2}\n",
414            snapshot.recv_throughput
415        ));
416
417        output
418    }
419
420    fn ensure_started(&self) {
421        let mut start = self.start_time.write();
422        if start.is_none() {
423            *start = Some(Instant::now());
424        }
425    }
426}
427
/// A snapshot of metrics at a point in time.
///
/// Plain-data copy of the values reported by `ChannelMetrics::snapshot`.
/// It derives `Serialize`/`Deserialize`, which is what the JSON exporters
/// in this module serialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Total messages sent
    pub messages_sent: u64,
    /// Total messages received
    pub messages_received: u64,
    /// Total bytes sent
    pub bytes_sent: u64,
    /// Total bytes received
    pub bytes_received: u64,
    /// Number of send errors recorded
    pub send_errors: u64,
    /// Number of receive errors recorded
    pub receive_errors: u64,
    /// Queue depth at the time of the snapshot
    pub queue_depth: u64,
    /// Highest queue depth observed so far
    pub peak_queue_depth: u64,
    /// Average latency in microseconds (0 when no latency was recorded)
    pub avg_latency_us: u64,
    /// Minimum latency in microseconds; `None` when no latency was recorded
    pub min_latency_us: Option<u64>,
    /// Maximum latency in microseconds
    pub max_latency_us: u64,
    /// 50th percentile latency in microseconds
    pub p50_latency_us: u64,
    /// 95th percentile latency in microseconds
    pub p95_latency_us: u64,
    /// 99th percentile latency in microseconds
    pub p99_latency_us: u64,
    /// Elapsed time in seconds since the metrics clock started
    pub elapsed_secs: f64,
    /// Send throughput (messages/second)
    pub send_throughput: f64,
    /// Receive throughput (messages/second)
    pub recv_throughput: f64,
    /// Send bandwidth (bytes/second)
    pub send_bandwidth: f64,
    /// Receive bandwidth (bytes/second)
    pub recv_bandwidth: f64,
}
470
/// A simple histogram for latency distribution.
///
/// Keeps coarse per-decade bucket counts plus a bounded reservoir of raw
/// samples from which percentiles are computed.
#[derive(Debug)]
struct LatencyHistogram {
    // Buckets: 0-10us, 10-100us, 100us-1ms, 1-10ms, 10-100ms, 100ms-1s, 1s+
    buckets: [u64; 7],
    // Raw samples for percentile calculation (at most `max_samples` of them)
    samples: Vec<u64>,
    // Upper bound on `samples.len()`
    max_samples: usize,
}

impl Default for LatencyHistogram {
    /// Same as [`LatencyHistogram::new`].
    ///
    /// A derived `Default` would set `max_samples` to 0, in which case no
    /// sample is ever stored and every percentile reads as 0.
    fn default() -> Self {
        Self::new()
    }
}

impl LatencyHistogram {
    /// Reservoir capacity used by `new` / `default`.
    const DEFAULT_MAX_SAMPLES: usize = 10_000;

    /// Create an empty histogram with the default reservoir capacity.
    fn new() -> Self {
        Self {
            buckets: [0; 7],
            samples: Vec::new(),
            max_samples: Self::DEFAULT_MAX_SAMPLES,
        }
    }

    /// Record one latency observation, in microseconds.
    fn record(&mut self, latency_us: u64) {
        // Update bucket
        let bucket = match latency_us {
            0..=10 => 0,
            11..=100 => 1,
            101..=1000 => 2,
            1001..=10000 => 3,
            10001..=100000 => 4,
            100001..=1000000 => 5,
            _ => 6,
        };
        self.buckets[bucket] += 1;

        // Store sample for percentile calculation
        if self.samples.len() < self.max_samples {
            self.samples.push(latency_us);
            return;
        }

        // Reservoir sampling (Algorithm R): the n-th observation replaces a
        // random slot with probability capacity / n, which keeps the
        // reservoir a uniform sample of everything seen since the last reset.
        // The bucket counts already track n (this observation included, so
        // `seen >= 1`).
        let seen: u64 = self.buckets.iter().sum();
        let slot = (rand_usize() as u64) % seen;
        if slot < self.samples.len() as u64 {
            // `slot` is below `samples.len()`, so it fits in `usize`.
            self.samples[slot as usize] = latency_us;
        }
    }

    /// Return the `p`-th percentile of the stored samples, or 0 when there
    /// are none. `p` values above 100 are treated as 100.
    fn percentile(&self, p: u8) -> u64 {
        if self.samples.is_empty() {
            return 0;
        }

        // Clamp so the computed rank can never exceed the last index.
        let fraction = f64::from(p.min(100)) / 100.0;
        let idx = (fraction * (self.samples.len() - 1) as f64) as usize;

        // Selecting the idx-th smallest element gives the same value as
        // sorting and indexing, without the full sort.
        let mut scratch = self.samples.clone();
        *scratch.select_nth_unstable(idx).1
    }

    /// Clear all bucket counts and stored samples (capacity is unchanged).
    fn reset(&mut self) {
        self.buckets = [0; 7];
        self.samples.clear();
    }
}

/// Simple pseudo-random number for reservoir sampling.
///
/// Uses the randomly keyed hasher from a fresh `RandomState` as a cheap,
/// dependency-free source of varying bits. Not suitable for anything that
/// needs statistical or cryptographic quality.
fn rand_usize() -> usize {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    RandomState::new().build_hasher().finish() as usize
}
540
/// Trait for channels that support metrics.
///
/// Implementors expose the `ChannelMetrics` instance that tracks their
/// traffic so callers can read or export it.
pub trait MeteredChannel {
    /// Get the metrics for this channel.
    ///
    /// Returns a shared reference; `ChannelMetrics` records through `&self`,
    /// so the reference can also be used to record events.
    fn metrics(&self) -> &ChannelMetrics;
}
546
547/// A wrapper that adds metrics to any channel.
548pub struct MeteredWrapper<C> {
549    inner: C,
550    metrics: ChannelMetrics,
551}
552
553impl<C> MeteredWrapper<C> {
554    /// Create a new metered wrapper around a channel.
555    pub fn new(channel: C) -> Self {
556        Self {
557            inner: channel,
558            metrics: ChannelMetrics::new(),
559        }
560    }
561
562    /// Get a reference to the inner channel.
563    pub fn inner(&self) -> &C {
564        &self.inner
565    }
566
567    /// Get a mutable reference to the inner channel.
568    pub fn inner_mut(&mut self) -> &mut C {
569        &mut self.inner
570    }
571
572    /// Consume the wrapper and return the inner channel.
573    pub fn into_inner(self) -> C {
574        self.inner
575    }
576}
577
578impl<C> MeteredChannel for MeteredWrapper<C> {
579    fn metrics(&self) -> &ChannelMetrics {
580        &self.metrics
581    }
582}
583
584/// Extension trait for adding metrics to channels.
585pub trait WithMetrics: Sized {
586    /// Wrap this channel with metrics tracking.
587    fn with_metrics(self) -> MeteredWrapper<Self> {
588        MeteredWrapper::new(self)
589    }
590}
591
592// Implement for all types
593impl<T> WithMetrics for T {}
594
595/// A sender wrapper that automatically records metrics.
596pub struct MeteredSender<S> {
597    inner: S,
598    metrics: std::sync::Arc<ChannelMetrics>,
599}
600
601impl<S> MeteredSender<S> {
602    /// Create a new metered sender.
603    pub fn new(sender: S, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
604        Self {
605            inner: sender,
606            metrics,
607        }
608    }
609
610    /// Get a reference to the inner sender.
611    pub fn inner(&self) -> &S {
612        &self.inner
613    }
614
615    /// Get a mutable reference to the inner sender.
616    pub fn inner_mut(&mut self) -> &mut S {
617        &mut self.inner
618    }
619
620    /// Get the metrics.
621    pub fn metrics(&self) -> &ChannelMetrics {
622        &self.metrics
623    }
624
625    /// Consume the wrapper and return the inner sender.
626    pub fn into_inner(self) -> S {
627        self.inner
628    }
629}
630
631impl<S: Clone> Clone for MeteredSender<S> {
632    fn clone(&self) -> Self {
633        Self {
634            inner: self.inner.clone(),
635            metrics: self.metrics.clone(),
636        }
637    }
638}
639
640/// A receiver wrapper that automatically records metrics.
641pub struct MeteredReceiver<R> {
642    inner: R,
643    metrics: std::sync::Arc<ChannelMetrics>,
644}
645
646impl<R> MeteredReceiver<R> {
647    /// Create a new metered receiver.
648    pub fn new(receiver: R, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
649        Self {
650            inner: receiver,
651            metrics,
652        }
653    }
654
655    /// Get a reference to the inner receiver.
656    pub fn inner(&self) -> &R {
657        &self.inner
658    }
659
660    /// Get a mutable reference to the inner receiver.
661    pub fn inner_mut(&mut self) -> &mut R {
662        &mut self.inner
663    }
664
665    /// Get the metrics.
666    pub fn metrics(&self) -> &ChannelMetrics {
667        &self.metrics
668    }
669
670    /// Consume the wrapper and return the inner receiver.
671    pub fn into_inner(self) -> R {
672        self.inner
673    }
674}
675
676/// Helper trait for creating metered sender/receiver pairs.
677pub trait IntoMetered: Sized {
678    /// Wrap this sender with metrics tracking.
679    fn metered(self, metrics: std::sync::Arc<ChannelMetrics>) -> MeteredSender<Self> {
680        MeteredSender::new(self, metrics)
681    }
682}
683
684impl<T> IntoMetered for T {}
685
686/// Create a metered channel pair with shared metrics.
687///
688/// Returns (sender, receiver, metrics) where both sender and receiver
689/// share the same metrics instance.
690pub fn metered_pair<S, R>(
691    sender: S,
692    receiver: R,
693) -> (
694    MeteredSender<S>,
695    MeteredReceiver<R>,
696    std::sync::Arc<ChannelMetrics>,
697) {
698    let metrics = std::sync::Arc::new(ChannelMetrics::new());
699    let metered_sender = MeteredSender::new(sender, metrics.clone());
700    let metered_receiver = MeteredReceiver::new(receiver, metrics.clone());
701    (metered_sender, metered_receiver, metrics)
702}
703
704/// Aggregated metrics from multiple channels.
705#[derive(Debug, Default)]
706pub struct AggregatedMetrics {
707    channels: parking_lot::RwLock<Vec<std::sync::Arc<ChannelMetrics>>>,
708}
709
710impl AggregatedMetrics {
711    /// Create a new aggregated metrics instance.
712    pub fn new() -> Self {
713        Self::default()
714    }
715
716    /// Register a channel's metrics for aggregation.
717    pub fn register(&self, metrics: std::sync::Arc<ChannelMetrics>) {
718        self.channels.write().push(metrics);
719    }
720
721    /// Get the total messages sent across all channels.
722    pub fn total_messages_sent(&self) -> u64 {
723        self.channels.read().iter().map(|m| m.messages_sent()).sum()
724    }
725
726    /// Get the total messages received across all channels.
727    pub fn total_messages_received(&self) -> u64 {
728        self.channels
729            .read()
730            .iter()
731            .map(|m| m.messages_received())
732            .sum()
733    }
734
735    /// Get the total bytes sent across all channels.
736    pub fn total_bytes_sent(&self) -> u64 {
737        self.channels.read().iter().map(|m| m.bytes_sent()).sum()
738    }
739
740    /// Get the total bytes received across all channels.
741    pub fn total_bytes_received(&self) -> u64 {
742        self.channels
743            .read()
744            .iter()
745            .map(|m| m.bytes_received())
746            .sum()
747    }
748
749    /// Get the total send errors across all channels.
750    pub fn total_send_errors(&self) -> u64 {
751        self.channels.read().iter().map(|m| m.send_errors()).sum()
752    }
753
754    /// Get the total receive errors across all channels.
755    pub fn total_receive_errors(&self) -> u64 {
756        self.channels
757            .read()
758            .iter()
759            .map(|m| m.receive_errors())
760            .sum()
761    }
762
763    /// Get the number of registered channels.
764    pub fn channel_count(&self) -> usize {
765        self.channels.read().len()
766    }
767
768    /// Get snapshots from all channels.
769    pub fn snapshots(&self) -> Vec<MetricsSnapshot> {
770        self.channels.read().iter().map(|m| m.snapshot()).collect()
771    }
772
773    /// Export aggregated metrics as JSON.
774    pub fn to_json(&self) -> String {
775        let aggregate = serde_json::json!({
776            "channel_count": self.channel_count(),
777            "total_messages_sent": self.total_messages_sent(),
778            "total_messages_received": self.total_messages_received(),
779            "total_bytes_sent": self.total_bytes_sent(),
780            "total_bytes_received": self.total_bytes_received(),
781            "total_send_errors": self.total_send_errors(),
782            "total_receive_errors": self.total_receive_errors(),
783            "channels": self.snapshots(),
784        });
785        serde_json::to_string_pretty(&aggregate).unwrap_or_default()
786    }
787
788    /// Export aggregated metrics in Prometheus format.
789    pub fn to_prometheus(&self, prefix: &str) -> String {
790        let mut output = String::new();
791
792        output.push_str(&format!(
793            "# HELP {prefix}_channels_total Number of registered channels\n"
794        ));
795        output.push_str(&format!("# TYPE {prefix}_channels_total gauge\n"));
796        output.push_str(&format!(
797            "{prefix}_channels_total {}\n",
798            self.channel_count()
799        ));
800
801        output.push_str(&format!(
802            "# HELP {prefix}_messages_sent_total Total messages sent across all channels\n"
803        ));
804        output.push_str(&format!("# TYPE {prefix}_messages_sent_total counter\n"));
805        output.push_str(&format!(
806            "{prefix}_messages_sent_total {}\n",
807            self.total_messages_sent()
808        ));
809
810        output.push_str(&format!(
811            "# HELP {prefix}_messages_received_total Total messages received across all channels\n"
812        ));
813        output.push_str(&format!(
814            "# TYPE {prefix}_messages_received_total counter\n"
815        ));
816        output.push_str(&format!(
817            "{prefix}_messages_received_total {}\n",
818            self.total_messages_received()
819        ));
820
821        output.push_str(&format!(
822            "# HELP {prefix}_bytes_sent_total Total bytes sent across all channels\n"
823        ));
824        output.push_str(&format!("# TYPE {prefix}_bytes_sent_total counter\n"));
825        output.push_str(&format!(
826            "{prefix}_bytes_sent_total {}\n",
827            self.total_bytes_sent()
828        ));
829
830        output.push_str(&format!(
831            "# HELP {prefix}_bytes_received_total Total bytes received across all channels\n"
832        ));
833        output.push_str(&format!("# TYPE {prefix}_bytes_received_total counter\n"));
834        output.push_str(&format!(
835            "{prefix}_bytes_received_total {}\n",
836            self.total_bytes_received()
837        ));
838
839        output
840    }
841}
842
#[cfg(test)]
mod tests {
    use super::*;

    /// Send/receive events update both the message and the byte totals.
    #[test]
    fn test_basic_metrics() {
        let metrics = ChannelMetrics::new();

        metrics.record_send(100);
        metrics.record_send(200);
        metrics.record_recv(150);

        assert_eq!(metrics.messages_sent(), 2);
        assert_eq!(metrics.messages_received(), 1);
        assert_eq!(metrics.bytes_sent(), 300);
        assert_eq!(metrics.bytes_received(), 150);
    }

    /// Send and receive errors are counted independently.
    #[test]
    fn test_error_tracking() {
        let metrics = ChannelMetrics::new();

        metrics.record_send_error();
        metrics.record_send_error();
        metrics.record_recv_error();

        assert_eq!(metrics.send_errors(), 2);
        assert_eq!(metrics.receive_errors(), 1);
    }

    /// Average, minimum and maximum follow the recorded latencies.
    #[test]
    fn test_latency_tracking() {
        let metrics = ChannelMetrics::new();

        // No sample yet: the minimum is reported as absent, not as a number.
        assert_eq!(metrics.min_latency_us(), None);

        metrics.record_latency(Duration::from_micros(100));
        metrics.record_latency(Duration::from_micros(200));
        metrics.record_latency(Duration::from_micros(300));

        assert_eq!(metrics.avg_latency_us(), 200);
        assert_eq!(metrics.min_latency_us(), Some(100));
        assert_eq!(metrics.max_latency_us(), 300);
    }

    /// The peak only ever rises; the current depth follows the last value.
    #[test]
    fn test_queue_depth() {
        let metrics = ChannelMetrics::new();

        metrics.set_queue_depth(5);
        assert_eq!(metrics.queue_depth(), 5);
        assert_eq!(metrics.peak_queue_depth(), 5);

        metrics.set_queue_depth(10);
        assert_eq!(metrics.queue_depth(), 10);
        assert_eq!(metrics.peak_queue_depth(), 10);

        metrics.set_queue_depth(3);
        assert_eq!(metrics.queue_depth(), 3);
        assert_eq!(metrics.peak_queue_depth(), 10); // Peak unchanged
    }

    /// A snapshot mirrors the live counters.
    #[test]
    fn test_snapshot() {
        let metrics = ChannelMetrics::new();
        metrics.record_send(100);
        metrics.record_recv(50);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_sent, 1);
        assert_eq!(snapshot.messages_received, 1);
        assert_eq!(snapshot.bytes_sent, 100);
        assert_eq!(snapshot.bytes_received, 50);
    }

    /// The compact JSON export carries the counter under its field name.
    #[test]
    fn test_json_export() {
        let metrics = ChannelMetrics::new();
        metrics.record_send(100);

        let json = metrics.to_json();
        // Check the key together with its value; a bare "1" would match
        // almost any output.
        assert!(json.contains("\"messages_sent\":1"));
        assert!(json.contains("\"bytes_sent\":100"));
    }

    /// The Prometheus export uses the given prefix and reports the counter.
    #[test]
    fn test_prometheus_export() {
        let metrics = ChannelMetrics::new();
        metrics.record_send(100);

        let prom = metrics.to_prometheus("ipckit");
        assert!(prom.contains("ipckit_messages_sent_total 1"));
    }

    /// `reset` clears traffic, latency and queue state alike.
    #[test]
    fn test_reset() {
        let metrics = ChannelMetrics::new();
        metrics.record_send(100);
        metrics.record_recv(50);
        metrics.record_send_error();
        metrics.record_recv_error();
        metrics.record_latency(Duration::from_micros(250));
        metrics.set_queue_depth(7);

        metrics.reset();

        assert_eq!(metrics.messages_sent(), 0);
        assert_eq!(metrics.messages_received(), 0);
        assert_eq!(metrics.bytes_sent(), 0);
        assert_eq!(metrics.bytes_received(), 0);
        assert_eq!(metrics.send_errors(), 0);
        assert_eq!(metrics.receive_errors(), 0);
        assert_eq!(metrics.avg_latency_us(), 0);
        assert_eq!(metrics.min_latency_us(), None);
        assert_eq!(metrics.max_latency_us(), 0);
        assert_eq!(metrics.queue_depth(), 0);
        assert_eq!(metrics.peak_queue_depth(), 0);
    }

    /// Any value can be wrapped through the blanket `WithMetrics` impl.
    #[test]
    fn test_with_metrics() {
        struct DummyChannel;

        let wrapped = DummyChannel.with_metrics();
        wrapped.metrics().record_send(100);
        assert_eq!(wrapped.metrics().messages_sent(), 1);
    }

    /// Sender, receiver and the returned handle observe the same counters.
    #[test]
    fn test_metered_sender_receiver() {
        struct DummySender;
        struct DummyReceiver;

        let (sender, receiver, metrics) = metered_pair(DummySender, DummyReceiver);

        // Both share the same metrics
        sender.metrics().record_send(100);
        assert_eq!(receiver.metrics().messages_sent(), 1);
        assert_eq!(metrics.messages_sent(), 1);
    }

    /// Totals are summed over every registered channel.
    #[test]
    fn test_aggregated_metrics() {
        let agg = AggregatedMetrics::new();

        let m1 = std::sync::Arc::new(ChannelMetrics::new());
        let m2 = std::sync::Arc::new(ChannelMetrics::new());

        m1.record_send(100);
        m1.record_send(200);
        m2.record_send(50);

        agg.register(m1);
        agg.register(m2);

        assert_eq!(agg.channel_count(), 2);
        assert_eq!(agg.total_messages_sent(), 3);
        assert_eq!(agg.total_bytes_sent(), 350);
    }
}