1use parking_lot::RwLock;
24use serde::{Deserialize, Serialize};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28#[derive(Debug, Default)]
30pub struct ChannelMetrics {
31 messages_sent: AtomicU64,
33 messages_received: AtomicU64,
35 bytes_sent: AtomicU64,
37 bytes_received: AtomicU64,
39 send_errors: AtomicU64,
41 receive_errors: AtomicU64,
43 queue_depth: AtomicU64,
45 peak_queue_depth: AtomicU64,
47 latency_sum_us: AtomicU64,
49 latency_count: AtomicU64,
51 min_latency_us: AtomicU64,
53 max_latency_us: AtomicU64,
55 latency_histogram: RwLock<LatencyHistogram>,
57 start_time: RwLock<Option<Instant>>,
59}
60
61impl ChannelMetrics {
62 pub fn new() -> Self {
64 Self {
65 min_latency_us: AtomicU64::new(u64::MAX),
66 ..Default::default()
67 }
68 }
69
70 pub fn record_send(&self, bytes: usize) {
72 self.ensure_started();
73 self.messages_sent.fetch_add(1, Ordering::Relaxed);
74 self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
75 }
76
77 pub fn record_recv(&self, bytes: usize) {
79 self.ensure_started();
80 self.messages_received.fetch_add(1, Ordering::Relaxed);
81 self.bytes_received
82 .fetch_add(bytes as u64, Ordering::Relaxed);
83 }
84
85 pub fn record_send_error(&self) {
87 self.send_errors.fetch_add(1, Ordering::Relaxed);
88 }
89
90 pub fn record_recv_error(&self) {
92 self.receive_errors.fetch_add(1, Ordering::Relaxed);
93 }
94
95 pub fn record_latency(&self, latency: Duration) {
97 let us = latency.as_micros() as u64;
98 self.latency_sum_us.fetch_add(us, Ordering::Relaxed);
99 self.latency_count.fetch_add(1, Ordering::Relaxed);
100
101 let mut current_min = self.min_latency_us.load(Ordering::Relaxed);
103 while us < current_min {
104 match self.min_latency_us.compare_exchange_weak(
105 current_min,
106 us,
107 Ordering::Relaxed,
108 Ordering::Relaxed,
109 ) {
110 Ok(_) => break,
111 Err(x) => current_min = x,
112 }
113 }
114
115 let mut current_max = self.max_latency_us.load(Ordering::Relaxed);
117 while us > current_max {
118 match self.max_latency_us.compare_exchange_weak(
119 current_max,
120 us,
121 Ordering::Relaxed,
122 Ordering::Relaxed,
123 ) {
124 Ok(_) => break,
125 Err(x) => current_max = x,
126 }
127 }
128
129 self.latency_histogram.write().record(us);
131 }
132
133 pub fn set_queue_depth(&self, depth: u64) {
135 self.queue_depth.store(depth, Ordering::Relaxed);
136
137 let mut current_peak = self.peak_queue_depth.load(Ordering::Relaxed);
139 while depth > current_peak {
140 match self.peak_queue_depth.compare_exchange_weak(
141 current_peak,
142 depth,
143 Ordering::Relaxed,
144 Ordering::Relaxed,
145 ) {
146 Ok(_) => break,
147 Err(x) => current_peak = x,
148 }
149 }
150 }
151
152 pub fn messages_sent(&self) -> u64 {
154 self.messages_sent.load(Ordering::Relaxed)
155 }
156
157 pub fn messages_received(&self) -> u64 {
159 self.messages_received.load(Ordering::Relaxed)
160 }
161
162 pub fn bytes_sent(&self) -> u64 {
164 self.bytes_sent.load(Ordering::Relaxed)
165 }
166
167 pub fn bytes_received(&self) -> u64 {
169 self.bytes_received.load(Ordering::Relaxed)
170 }
171
172 pub fn send_errors(&self) -> u64 {
174 self.send_errors.load(Ordering::Relaxed)
175 }
176
177 pub fn receive_errors(&self) -> u64 {
179 self.receive_errors.load(Ordering::Relaxed)
180 }
181
182 pub fn queue_depth(&self) -> u64 {
184 self.queue_depth.load(Ordering::Relaxed)
185 }
186
187 pub fn peak_queue_depth(&self) -> u64 {
189 self.peak_queue_depth.load(Ordering::Relaxed)
190 }
191
192 pub fn avg_latency_us(&self) -> u64 {
194 let count = self.latency_count.load(Ordering::Relaxed);
195 if count == 0 {
196 return 0;
197 }
198 self.latency_sum_us.load(Ordering::Relaxed) / count
199 }
200
201 pub fn min_latency_us(&self) -> Option<u64> {
203 let min = self.min_latency_us.load(Ordering::Relaxed);
204 if min == u64::MAX {
205 None
206 } else {
207 Some(min)
208 }
209 }
210
211 pub fn max_latency_us(&self) -> u64 {
213 self.max_latency_us.load(Ordering::Relaxed)
214 }
215
216 pub fn latency_percentile(&self, percentile: u8) -> u64 {
218 self.latency_histogram.read().percentile(percentile)
219 }
220
221 pub fn elapsed(&self) -> Duration {
223 self.start_time
224 .read()
225 .map(|t| t.elapsed())
226 .unwrap_or_default()
227 }
228
229 pub fn send_throughput(&self) -> f64 {
231 let elapsed = self.elapsed().as_secs_f64();
232 if elapsed == 0.0 {
233 return 0.0;
234 }
235 self.messages_sent() as f64 / elapsed
236 }
237
238 pub fn recv_throughput(&self) -> f64 {
240 let elapsed = self.elapsed().as_secs_f64();
241 if elapsed == 0.0 {
242 return 0.0;
243 }
244 self.messages_received() as f64 / elapsed
245 }
246
247 pub fn send_bandwidth(&self) -> f64 {
249 let elapsed = self.elapsed().as_secs_f64();
250 if elapsed == 0.0 {
251 return 0.0;
252 }
253 self.bytes_sent() as f64 / elapsed
254 }
255
256 pub fn recv_bandwidth(&self) -> f64 {
258 let elapsed = self.elapsed().as_secs_f64();
259 if elapsed == 0.0 {
260 return 0.0;
261 }
262 self.bytes_received() as f64 / elapsed
263 }
264
265 pub fn reset(&self) {
267 self.messages_sent.store(0, Ordering::Relaxed);
268 self.messages_received.store(0, Ordering::Relaxed);
269 self.bytes_sent.store(0, Ordering::Relaxed);
270 self.bytes_received.store(0, Ordering::Relaxed);
271 self.send_errors.store(0, Ordering::Relaxed);
272 self.receive_errors.store(0, Ordering::Relaxed);
273 self.queue_depth.store(0, Ordering::Relaxed);
274 self.peak_queue_depth.store(0, Ordering::Relaxed);
275 self.latency_sum_us.store(0, Ordering::Relaxed);
276 self.latency_count.store(0, Ordering::Relaxed);
277 self.min_latency_us.store(u64::MAX, Ordering::Relaxed);
278 self.max_latency_us.store(0, Ordering::Relaxed);
279 self.latency_histogram.write().reset();
280 *self.start_time.write() = Some(Instant::now());
281 }
282
283 pub fn snapshot(&self) -> MetricsSnapshot {
285 MetricsSnapshot {
286 messages_sent: self.messages_sent(),
287 messages_received: self.messages_received(),
288 bytes_sent: self.bytes_sent(),
289 bytes_received: self.bytes_received(),
290 send_errors: self.send_errors(),
291 receive_errors: self.receive_errors(),
292 queue_depth: self.queue_depth(),
293 peak_queue_depth: self.peak_queue_depth(),
294 avg_latency_us: self.avg_latency_us(),
295 min_latency_us: self.min_latency_us(),
296 max_latency_us: self.max_latency_us(),
297 p50_latency_us: self.latency_percentile(50),
298 p95_latency_us: self.latency_percentile(95),
299 p99_latency_us: self.latency_percentile(99),
300 elapsed_secs: self.elapsed().as_secs_f64(),
301 send_throughput: self.send_throughput(),
302 recv_throughput: self.recv_throughput(),
303 send_bandwidth: self.send_bandwidth(),
304 recv_bandwidth: self.recv_bandwidth(),
305 }
306 }
307
308 pub fn to_json(&self) -> String {
310 serde_json::to_string(&self.snapshot()).unwrap_or_default()
311 }
312
313 pub fn to_json_pretty(&self) -> String {
315 serde_json::to_string_pretty(&self.snapshot()).unwrap_or_default()
316 }
317
318 pub fn to_prometheus(&self, prefix: &str) -> String {
320 let snapshot = self.snapshot();
321 let mut output = String::new();
322
323 output.push_str(&format!(
324 "# HELP {prefix}_messages_sent_total Total messages sent\n"
325 ));
326 output.push_str(&format!("# TYPE {prefix}_messages_sent_total counter\n"));
327 output.push_str(&format!(
328 "{prefix}_messages_sent_total {}\n",
329 snapshot.messages_sent
330 ));
331
332 output.push_str(&format!(
333 "# HELP {prefix}_messages_received_total Total messages received\n"
334 ));
335 output.push_str(&format!(
336 "# TYPE {prefix}_messages_received_total counter\n"
337 ));
338 output.push_str(&format!(
339 "{prefix}_messages_received_total {}\n",
340 snapshot.messages_received
341 ));
342
343 output.push_str(&format!(
344 "# HELP {prefix}_bytes_sent_total Total bytes sent\n"
345 ));
346 output.push_str(&format!("# TYPE {prefix}_bytes_sent_total counter\n"));
347 output.push_str(&format!(
348 "{prefix}_bytes_sent_total {}\n",
349 snapshot.bytes_sent
350 ));
351
352 output.push_str(&format!(
353 "# HELP {prefix}_bytes_received_total Total bytes received\n"
354 ));
355 output.push_str(&format!("# TYPE {prefix}_bytes_received_total counter\n"));
356 output.push_str(&format!(
357 "{prefix}_bytes_received_total {}\n",
358 snapshot.bytes_received
359 ));
360
361 output.push_str(&format!(
362 "# HELP {prefix}_send_errors_total Total send errors\n"
363 ));
364 output.push_str(&format!("# TYPE {prefix}_send_errors_total counter\n"));
365 output.push_str(&format!(
366 "{prefix}_send_errors_total {}\n",
367 snapshot.send_errors
368 ));
369
370 output.push_str(&format!(
371 "# HELP {prefix}_receive_errors_total Total receive errors\n"
372 ));
373 output.push_str(&format!("# TYPE {prefix}_receive_errors_total counter\n"));
374 output.push_str(&format!(
375 "{prefix}_receive_errors_total {}\n",
376 snapshot.receive_errors
377 ));
378
379 output.push_str(&format!(
380 "# HELP {prefix}_queue_depth Current queue depth\n"
381 ));
382 output.push_str(&format!("# TYPE {prefix}_queue_depth gauge\n"));
383 output.push_str(&format!("{prefix}_queue_depth {}\n", snapshot.queue_depth));
384
385 output.push_str(&format!(
386 "# HELP {prefix}_latency_microseconds Latency in microseconds\n"
387 ));
388 output.push_str(&format!("# TYPE {prefix}_latency_microseconds summary\n"));
389 output.push_str(&format!(
390 "{prefix}_latency_microseconds{{quantile=\"0.5\"}} {}\n",
391 snapshot.p50_latency_us
392 ));
393 output.push_str(&format!(
394 "{prefix}_latency_microseconds{{quantile=\"0.95\"}} {}\n",
395 snapshot.p95_latency_us
396 ));
397 output.push_str(&format!(
398 "{prefix}_latency_microseconds{{quantile=\"0.99\"}} {}\n",
399 snapshot.p99_latency_us
400 ));
401
402 output.push_str(&format!(
403 "# HELP {prefix}_throughput_messages_per_second Message throughput\n"
404 ));
405 output.push_str(&format!(
406 "# TYPE {prefix}_throughput_messages_per_second gauge\n"
407 ));
408 output.push_str(&format!(
409 "{prefix}_throughput_messages_per_second{{direction=\"send\"}} {:.2}\n",
410 snapshot.send_throughput
411 ));
412 output.push_str(&format!(
413 "{prefix}_throughput_messages_per_second{{direction=\"recv\"}} {:.2}\n",
414 snapshot.recv_throughput
415 ));
416
417 output
418 }
419
420 fn ensure_started(&self) {
421 let mut start = self.start_time.write();
422 if start.is_none() {
423 *start = Some(Instant::now());
424 }
425 }
426}
427
428#[derive(Debug, Clone, Serialize, Deserialize)]
430pub struct MetricsSnapshot {
431 pub messages_sent: u64,
433 pub messages_received: u64,
435 pub bytes_sent: u64,
437 pub bytes_received: u64,
439 pub send_errors: u64,
441 pub receive_errors: u64,
443 pub queue_depth: u64,
445 pub peak_queue_depth: u64,
447 pub avg_latency_us: u64,
449 pub min_latency_us: Option<u64>,
451 pub max_latency_us: u64,
453 pub p50_latency_us: u64,
455 pub p95_latency_us: u64,
457 pub p99_latency_us: u64,
459 pub elapsed_secs: f64,
461 pub send_throughput: f64,
463 pub recv_throughput: f64,
465 pub send_bandwidth: f64,
467 pub recv_bandwidth: f64,
469}
470
471#[derive(Debug, Default)]
473struct LatencyHistogram {
474 buckets: [u64; 7],
476 samples: Vec<u64>,
478 max_samples: usize,
479}
480
481impl LatencyHistogram {
482 #[allow(dead_code)]
483 fn new() -> Self {
484 Self {
485 buckets: [0; 7],
486 samples: Vec::new(),
487 max_samples: 10000,
488 }
489 }
490
491 fn record(&mut self, latency_us: u64) {
492 let bucket = match latency_us {
494 0..=10 => 0,
495 11..=100 => 1,
496 101..=1000 => 2,
497 1001..=10000 => 3,
498 10001..=100000 => 4,
499 100001..=1000000 => 5,
500 _ => 6,
501 };
502 self.buckets[bucket] += 1;
503
504 if self.samples.len() < self.max_samples {
506 self.samples.push(latency_us);
507 } else {
508 let idx = rand_usize() % (self.samples.len() + 1);
510 if idx < self.samples.len() {
511 self.samples[idx] = latency_us;
512 }
513 }
514 }
515
516 fn percentile(&self, p: u8) -> u64 {
517 if self.samples.is_empty() {
518 return 0;
519 }
520
521 let mut sorted = self.samples.clone();
522 sorted.sort_unstable();
523
524 let idx = ((p as f64 / 100.0) * (sorted.len() - 1) as f64) as usize;
525 sorted[idx]
526 }
527
528 fn reset(&mut self) {
529 self.buckets = [0; 7];
530 self.samples.clear();
531 }
532}
533
534fn rand_usize() -> usize {
536 use std::collections::hash_map::RandomState;
537 use std::hash::{BuildHasher, Hasher};
538 RandomState::new().build_hasher().finish() as usize
539}
540
541pub trait MeteredChannel {
543 fn metrics(&self) -> &ChannelMetrics;
545}
546
547pub struct MeteredWrapper<C> {
549 inner: C,
550 metrics: ChannelMetrics,
551}
552
553impl<C> MeteredWrapper<C> {
554 pub fn new(channel: C) -> Self {
556 Self {
557 inner: channel,
558 metrics: ChannelMetrics::new(),
559 }
560 }
561
562 pub fn inner(&self) -> &C {
564 &self.inner
565 }
566
567 pub fn inner_mut(&mut self) -> &mut C {
569 &mut self.inner
570 }
571
572 pub fn into_inner(self) -> C {
574 self.inner
575 }
576}
577
578impl<C> MeteredChannel for MeteredWrapper<C> {
579 fn metrics(&self) -> &ChannelMetrics {
580 &self.metrics
581 }
582}
583
584pub trait WithMetrics: Sized {
586 fn with_metrics(self) -> MeteredWrapper<Self> {
588 MeteredWrapper::new(self)
589 }
590}
591
592impl<T> WithMetrics for T {}
594
595pub struct MeteredSender<S> {
597 inner: S,
598 metrics: std::sync::Arc<ChannelMetrics>,
599}
600
601impl<S> MeteredSender<S> {
602 pub fn new(sender: S, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
604 Self {
605 inner: sender,
606 metrics,
607 }
608 }
609
610 pub fn inner(&self) -> &S {
612 &self.inner
613 }
614
615 pub fn inner_mut(&mut self) -> &mut S {
617 &mut self.inner
618 }
619
620 pub fn metrics(&self) -> &ChannelMetrics {
622 &self.metrics
623 }
624
625 pub fn into_inner(self) -> S {
627 self.inner
628 }
629}
630
631impl<S: Clone> Clone for MeteredSender<S> {
632 fn clone(&self) -> Self {
633 Self {
634 inner: self.inner.clone(),
635 metrics: self.metrics.clone(),
636 }
637 }
638}
639
640pub struct MeteredReceiver<R> {
642 inner: R,
643 metrics: std::sync::Arc<ChannelMetrics>,
644}
645
646impl<R> MeteredReceiver<R> {
647 pub fn new(receiver: R, metrics: std::sync::Arc<ChannelMetrics>) -> Self {
649 Self {
650 inner: receiver,
651 metrics,
652 }
653 }
654
655 pub fn inner(&self) -> &R {
657 &self.inner
658 }
659
660 pub fn inner_mut(&mut self) -> &mut R {
662 &mut self.inner
663 }
664
665 pub fn metrics(&self) -> &ChannelMetrics {
667 &self.metrics
668 }
669
670 pub fn into_inner(self) -> R {
672 self.inner
673 }
674}
675
676pub trait IntoMetered: Sized {
678 fn metered(self, metrics: std::sync::Arc<ChannelMetrics>) -> MeteredSender<Self> {
680 MeteredSender::new(self, metrics)
681 }
682}
683
684impl<T> IntoMetered for T {}
685
686pub fn metered_pair<S, R>(
691 sender: S,
692 receiver: R,
693) -> (
694 MeteredSender<S>,
695 MeteredReceiver<R>,
696 std::sync::Arc<ChannelMetrics>,
697) {
698 let metrics = std::sync::Arc::new(ChannelMetrics::new());
699 let metered_sender = MeteredSender::new(sender, metrics.clone());
700 let metered_receiver = MeteredReceiver::new(receiver, metrics.clone());
701 (metered_sender, metered_receiver, metrics)
702}
703
704#[derive(Debug, Default)]
706pub struct AggregatedMetrics {
707 channels: parking_lot::RwLock<Vec<std::sync::Arc<ChannelMetrics>>>,
708}
709
710impl AggregatedMetrics {
711 pub fn new() -> Self {
713 Self::default()
714 }
715
716 pub fn register(&self, metrics: std::sync::Arc<ChannelMetrics>) {
718 self.channels.write().push(metrics);
719 }
720
721 pub fn total_messages_sent(&self) -> u64 {
723 self.channels.read().iter().map(|m| m.messages_sent()).sum()
724 }
725
726 pub fn total_messages_received(&self) -> u64 {
728 self.channels
729 .read()
730 .iter()
731 .map(|m| m.messages_received())
732 .sum()
733 }
734
735 pub fn total_bytes_sent(&self) -> u64 {
737 self.channels.read().iter().map(|m| m.bytes_sent()).sum()
738 }
739
740 pub fn total_bytes_received(&self) -> u64 {
742 self.channels
743 .read()
744 .iter()
745 .map(|m| m.bytes_received())
746 .sum()
747 }
748
749 pub fn total_send_errors(&self) -> u64 {
751 self.channels.read().iter().map(|m| m.send_errors()).sum()
752 }
753
754 pub fn total_receive_errors(&self) -> u64 {
756 self.channels
757 .read()
758 .iter()
759 .map(|m| m.receive_errors())
760 .sum()
761 }
762
763 pub fn channel_count(&self) -> usize {
765 self.channels.read().len()
766 }
767
768 pub fn snapshots(&self) -> Vec<MetricsSnapshot> {
770 self.channels.read().iter().map(|m| m.snapshot()).collect()
771 }
772
773 pub fn to_json(&self) -> String {
775 let aggregate = serde_json::json!({
776 "channel_count": self.channel_count(),
777 "total_messages_sent": self.total_messages_sent(),
778 "total_messages_received": self.total_messages_received(),
779 "total_bytes_sent": self.total_bytes_sent(),
780 "total_bytes_received": self.total_bytes_received(),
781 "total_send_errors": self.total_send_errors(),
782 "total_receive_errors": self.total_receive_errors(),
783 "channels": self.snapshots(),
784 });
785 serde_json::to_string_pretty(&aggregate).unwrap_or_default()
786 }
787
788 pub fn to_prometheus(&self, prefix: &str) -> String {
790 let mut output = String::new();
791
792 output.push_str(&format!(
793 "# HELP {prefix}_channels_total Number of registered channels\n"
794 ));
795 output.push_str(&format!("# TYPE {prefix}_channels_total gauge\n"));
796 output.push_str(&format!(
797 "{prefix}_channels_total {}\n",
798 self.channel_count()
799 ));
800
801 output.push_str(&format!(
802 "# HELP {prefix}_messages_sent_total Total messages sent across all channels\n"
803 ));
804 output.push_str(&format!("# TYPE {prefix}_messages_sent_total counter\n"));
805 output.push_str(&format!(
806 "{prefix}_messages_sent_total {}\n",
807 self.total_messages_sent()
808 ));
809
810 output.push_str(&format!(
811 "# HELP {prefix}_messages_received_total Total messages received across all channels\n"
812 ));
813 output.push_str(&format!(
814 "# TYPE {prefix}_messages_received_total counter\n"
815 ));
816 output.push_str(&format!(
817 "{prefix}_messages_received_total {}\n",
818 self.total_messages_received()
819 ));
820
821 output.push_str(&format!(
822 "# HELP {prefix}_bytes_sent_total Total bytes sent across all channels\n"
823 ));
824 output.push_str(&format!("# TYPE {prefix}_bytes_sent_total counter\n"));
825 output.push_str(&format!(
826 "{prefix}_bytes_sent_total {}\n",
827 self.total_bytes_sent()
828 ));
829
830 output.push_str(&format!(
831 "# HELP {prefix}_bytes_received_total Total bytes received across all channels\n"
832 ));
833 output.push_str(&format!("# TYPE {prefix}_bytes_received_total counter\n"));
834 output.push_str(&format!(
835 "{prefix}_bytes_received_total {}\n",
836 self.total_bytes_received()
837 ));
838
839 output
840 }
841}
842
843#[cfg(test)]
844mod tests {
845 use super::*;
846
847 #[test]
848 fn test_basic_metrics() {
849 let metrics = ChannelMetrics::new();
850
851 metrics.record_send(100);
852 metrics.record_send(200);
853 metrics.record_recv(150);
854
855 assert_eq!(metrics.messages_sent(), 2);
856 assert_eq!(metrics.messages_received(), 1);
857 assert_eq!(metrics.bytes_sent(), 300);
858 assert_eq!(metrics.bytes_received(), 150);
859 }
860
861 #[test]
862 fn test_error_tracking() {
863 let metrics = ChannelMetrics::new();
864
865 metrics.record_send_error();
866 metrics.record_send_error();
867 metrics.record_recv_error();
868
869 assert_eq!(metrics.send_errors(), 2);
870 assert_eq!(metrics.receive_errors(), 1);
871 }
872
873 #[test]
874 fn test_latency_tracking() {
875 let metrics = ChannelMetrics::new();
876
877 metrics.record_latency(Duration::from_micros(100));
878 metrics.record_latency(Duration::from_micros(200));
879 metrics.record_latency(Duration::from_micros(300));
880
881 assert_eq!(metrics.avg_latency_us(), 200);
882 assert_eq!(metrics.min_latency_us(), Some(100));
883 assert_eq!(metrics.max_latency_us(), 300);
884 }
885
886 #[test]
887 fn test_queue_depth() {
888 let metrics = ChannelMetrics::new();
889
890 metrics.set_queue_depth(5);
891 assert_eq!(metrics.queue_depth(), 5);
892 assert_eq!(metrics.peak_queue_depth(), 5);
893
894 metrics.set_queue_depth(10);
895 assert_eq!(metrics.queue_depth(), 10);
896 assert_eq!(metrics.peak_queue_depth(), 10);
897
898 metrics.set_queue_depth(3);
899 assert_eq!(metrics.queue_depth(), 3);
900 assert_eq!(metrics.peak_queue_depth(), 10); }
902
903 #[test]
904 fn test_snapshot() {
905 let metrics = ChannelMetrics::new();
906 metrics.record_send(100);
907 metrics.record_recv(50);
908
909 let snapshot = metrics.snapshot();
910 assert_eq!(snapshot.messages_sent, 1);
911 assert_eq!(snapshot.messages_received, 1);
912 assert_eq!(snapshot.bytes_sent, 100);
913 assert_eq!(snapshot.bytes_received, 50);
914 }
915
916 #[test]
917 fn test_json_export() {
918 let metrics = ChannelMetrics::new();
919 metrics.record_send(100);
920
921 let json = metrics.to_json();
922 assert!(json.contains("messages_sent"));
923 assert!(json.contains("1"));
924 }
925
926 #[test]
927 fn test_prometheus_export() {
928 let metrics = ChannelMetrics::new();
929 metrics.record_send(100);
930
931 let prom = metrics.to_prometheus("ipckit");
932 assert!(prom.contains("ipckit_messages_sent_total 1"));
933 }
934
935 #[test]
936 fn test_reset() {
937 let metrics = ChannelMetrics::new();
938 metrics.record_send(100);
939 metrics.record_recv(50);
940
941 metrics.reset();
942
943 assert_eq!(metrics.messages_sent(), 0);
944 assert_eq!(metrics.messages_received(), 0);
945 assert_eq!(metrics.bytes_sent(), 0);
946 assert_eq!(metrics.bytes_received(), 0);
947 }
948
949 #[test]
950 fn test_with_metrics() {
951 struct DummyChannel;
952
953 let wrapped = DummyChannel.with_metrics();
954 wrapped.metrics().record_send(100);
955 assert_eq!(wrapped.metrics().messages_sent(), 1);
956 }
957
958 #[test]
959 fn test_metered_sender_receiver() {
960 struct DummySender;
961 struct DummyReceiver;
962
963 let (sender, receiver, metrics) = metered_pair(DummySender, DummyReceiver);
964
965 sender.metrics().record_send(100);
967 assert_eq!(receiver.metrics().messages_sent(), 1);
968 assert_eq!(metrics.messages_sent(), 1);
969 }
970
971 #[test]
972 fn test_aggregated_metrics() {
973 let agg = AggregatedMetrics::new();
974
975 let m1 = std::sync::Arc::new(ChannelMetrics::new());
976 let m2 = std::sync::Arc::new(ChannelMetrics::new());
977
978 m1.record_send(100);
979 m1.record_send(200);
980 m2.record_send(50);
981
982 agg.register(m1);
983 agg.register(m2);
984
985 assert_eq!(agg.channel_count(), 2);
986 assert_eq!(agg.total_messages_sent(), 3);
987 assert_eq!(agg.total_bytes_sent(), 350);
988 }
989}