1use std::collections::HashMap;
4use std::fmt;
5use std::os::raw::{c_double, c_int};
6use std::sync::{Mutex, OnceLock};
7use std::thread::{self, JoinHandle};
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9
10#[cfg(any(feature = "pushgateway", test))]
11use crossbeam_channel::bounded;
12use crossbeam_channel::{
13 Receiver, RecvTimeoutError, Sender, TryRecvError, TrySendError, unbounded,
14};
15
16use crate::iperf::{IperfTest, RawIperfTest, Role};
17#[cfg(all(feature = "pushgateway", feature = "serde"))]
18use crate::metrics_file::MetricsFileSink;
19#[cfg(feature = "pushgateway")]
20use crate::pushgateway::PushGateway;
21use crate::{Error, Result};
22
/// Transport protocol of a metrics sample, decoded from the integer code the
/// interval metrics callback receives.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum TransportProtocol {
    /// Callback code `0`; also the default value.
    #[default]
    Unknown,
    /// Callback code `1`.
    Tcp,
    /// Callback code `2`.
    Udp,
    /// Callback code `3`.
    Sctp,
    /// Any other callback code, preserved verbatim.
    Other(i32),
}
40
/// Traffic direction of a metrics sample, decoded from the integer code the
/// interval metrics callback receives.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum MetricDirection {
    /// Callback code `0`; also the default value.
    #[default]
    Unknown,
    /// Callback code `1`.
    Sender,
    /// Callback code `2`.
    Receiver,
    /// Any other callback code, preserved verbatim.
    Other(i32),
}
61
62impl TransportProtocol {
63 fn from_callback_value(value: c_int) -> Self {
64 match value {
65 1 => Self::Tcp,
66 2 => Self::Udp,
67 3 => Self::Sctp,
68 0 => Self::Unknown,
69 other => Self::Other(other),
70 }
71 }
72}
73
74impl MetricDirection {
75 fn from_callback_value(value: c_int) -> Self {
76 match value {
77 1 => Self::Sender,
78 2 => Self::Receiver,
79 0 => Self::Unknown,
80 other => Self::Other(other),
81 }
82 }
83}
84
/// One interval sample as delivered by the metrics callback.
///
/// Optional fields are `None` when the callback flagged the corresponding
/// value as unavailable for this sample.
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct Metrics {
    /// Wall-clock time at which the sample was recorded, in seconds since the
    /// Unix epoch (`0.0` if the system clock reads before the epoch).
    pub timestamp_unix_seconds: f64,
    /// Role of the test the callback was registered for.
    pub role: Role,
    /// Traffic direction reported for the sample.
    pub direction: MetricDirection,
    /// Stream count reported for the sample; negative reports are stored as `0`.
    pub stream_count: usize,
    /// Transport protocol reported for the sample.
    pub protocol: TransportProtocol,
    /// Bytes transferred, as reported by the callback.
    pub transferred_bytes: f64,
    /// Bandwidth in bits per second, as reported by the callback.
    pub bandwidth_bits_per_second: f64,
    /// TCP retransmits, when available.
    pub tcp_retransmits: Option<f64>,
    /// TCP round-trip time in seconds, when available.
    pub tcp_rtt_seconds: Option<f64>,
    /// TCP round-trip time variance in seconds, when available.
    pub tcp_rttvar_seconds: Option<f64>,
    /// TCP send congestion window in bytes, when available.
    pub tcp_snd_cwnd_bytes: Option<f64>,
    /// TCP send window in bytes, when available.
    pub tcp_snd_wnd_bytes: Option<f64>,
    /// TCP path MTU in bytes, when available.
    pub tcp_pmtu_bytes: Option<f64>,
    /// TCP reorder events, when available.
    pub tcp_reorder_events: Option<f64>,
    /// UDP packet count, when available.
    pub udp_packets: Option<f64>,
    /// UDP lost packet count, when available.
    pub udp_lost_packets: Option<f64>,
    /// UDP jitter in seconds, when available.
    pub udp_jitter_seconds: Option<f64>,
    /// UDP out-of-order packet count, when available.
    pub udp_out_of_order_packets: Option<f64>,
    /// Duration of the reporting interval in seconds, as reported by the callback.
    pub interval_duration_seconds: f64,
    /// `true` when the callback reported a non-zero `omitted` flag.
    pub omitted: bool,
}
136
137impl Metrics {
138 pub fn new() -> Self {
140 Self::default()
141 }
142}
143
/// Summary statistics for one gauge over an aggregation window.
///
/// All fields are zero when no finite sample was observed.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct WindowGaugeStats {
    /// Number of finite samples that contributed to the statistics.
    pub samples: usize,
    /// Mean of the contributing samples.
    pub mean: f64,
    /// Smallest contributing sample.
    pub min: f64,
    /// Largest contributing sample.
    pub max: f64,
}
158
159impl WindowGaugeStats {
160 pub fn new() -> Self {
162 Self::default()
163 }
164}
165
/// Aggregate of several interval samples, as produced by `aggregate_window`.
///
/// Context fields (`role`, `direction`, `stream_count`, `protocol`) are taken
/// from the first sample of the window.
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct WindowMetrics {
    /// Timestamp of the last sample in the window, in seconds since the Unix epoch.
    pub timestamp_unix_seconds: f64,
    /// Role taken from the first sample.
    pub role: Role,
    /// Direction taken from the first sample.
    pub direction: MetricDirection,
    /// Stream count taken from the first sample.
    pub stream_count: usize,
    /// Protocol taken from the first sample.
    pub protocol: TransportProtocol,
    /// Sum of the samples' interval durations (non-finite or negative values count as `0`).
    pub duration_seconds: f64,
    /// Sum of the samples' transferred bytes (non-finite or negative values count as `0`).
    pub transferred_bytes: f64,
    /// Bandwidth statistics; the mean is total bits over total duration when
    /// the window duration is positive.
    pub bandwidth_bits_per_second: WindowGaugeStats,
    /// Statistics over the samples that reported a TCP round-trip time.
    pub tcp_rtt_seconds: WindowGaugeStats,
    /// Statistics over the samples that reported a TCP round-trip time variance.
    pub tcp_rttvar_seconds: WindowGaugeStats,
    /// Statistics over the samples that reported a TCP send congestion window.
    pub tcp_snd_cwnd_bytes: WindowGaugeStats,
    /// Statistics over the samples that reported a TCP send window.
    pub tcp_snd_wnd_bytes: WindowGaugeStats,
    /// Statistics over the samples that reported a TCP path MTU.
    pub tcp_pmtu_bytes: WindowGaugeStats,
    /// Statistics over the samples that reported UDP jitter.
    pub udp_jitter_seconds: WindowGaugeStats,
    /// Sum of reported TCP retransmits; `None` if no sample reported the value.
    pub tcp_retransmits: Option<f64>,
    /// Sum of reported TCP reorder events; `None` if no sample reported the value.
    pub tcp_reorder_events: Option<f64>,
    /// Sum of reported UDP packets; `None` if no sample reported the value.
    pub udp_packets: Option<f64>,
    /// Sum of reported UDP lost packets; `None` if no sample reported the value.
    pub udp_lost_packets: Option<f64>,
    /// Sum of reported UDP out-of-order packets; `None` if no sample reported the value.
    pub udp_out_of_order_packets: Option<f64>,
    /// Number of samples in the window that were flagged as omitted.
    pub omitted_intervals: f64,
}
215
216impl WindowMetrics {
217 pub fn new() -> Self {
219 Self::default()
220 }
221}
222
/// Selects whether and how metrics are delivered through a metrics stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsMode {
    /// No metrics are collected; this is the default.
    #[default]
    Disabled,
    /// Every interval sample is forwarded as its own event.
    Interval,
    /// Interval samples are aggregated and emitted as one window event per
    /// elapsed duration (or earlier, when the sample context changes or the
    /// source closes).
    Window(Duration),
}
248
249impl MetricsMode {
250 pub const fn is_enabled(self) -> bool {
252 !matches!(self, Self::Disabled)
253 }
254
255 pub(crate) const fn callback_queue(self) -> Option<MetricsQueue> {
256 match self {
257 Self::Disabled => None,
258 Self::Interval | Self::Window(_) => Some(MetricsQueue::All),
261 }
262 }
263}
264
/// Event yielded by a [`MetricsStream`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum MetricEvent {
    /// A single interval sample, forwarded unchanged.
    Interval(Metrics),
    /// An aggregate over several interval samples.
    Window(WindowMetrics),
}
275
/// Reason a non-blocking or timed receive on a [`MetricsStream`] produced no event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsRecvError {
    /// No event is queued right now (returned by `try_recv`).
    Empty,
    /// No event arrived before the timeout elapsed (returned by `recv_timeout`).
    Timeout,
    /// The sending side is gone, so no further event can arrive.
    Closed,
}
287
288impl fmt::Display for MetricsRecvError {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 match self {
291 Self::Empty => f.write_str("no metrics event is currently queued"),
292 Self::Timeout => f.write_str("timed out waiting for metrics event"),
293 Self::Closed => f.write_str("metrics stream is closed"),
294 }
295 }
296}
297
// Marker impl so `MetricsRecvError` can be used as a standard error type; the
// trait's default methods are kept as-is.
impl std::error::Error for MetricsRecvError {}
299
/// Receiving end of a metrics event channel.
///
/// Events can be pulled with the blocking, timed or non-blocking receive
/// methods, or by iterating over the stream.
#[derive(Debug)]
pub struct MetricsStream {
    /// Channel the forwarding worker sends events into.
    rx: Receiver<MetricEvent>,
}
311
312impl MetricsStream {
313 fn new(rx: Receiver<MetricEvent>) -> Self {
314 Self { rx }
315 }
316
317 pub fn recv(&self) -> Option<MetricEvent> {
319 self.rx.recv().ok()
320 }
321
322 pub fn recv_timeout(
324 &self,
325 timeout: Duration,
326 ) -> std::result::Result<MetricEvent, MetricsRecvError> {
327 match self.rx.recv_timeout(timeout) {
328 Ok(event) => Ok(event),
329 Err(RecvTimeoutError::Timeout) => Err(MetricsRecvError::Timeout),
330 Err(RecvTimeoutError::Disconnected) => Err(MetricsRecvError::Closed),
331 }
332 }
333
334 pub fn try_recv(&self) -> std::result::Result<MetricEvent, MetricsRecvError> {
336 match self.rx.try_recv() {
337 Ok(event) => Ok(event),
338 Err(TryRecvError::Empty) => Err(MetricsRecvError::Empty),
339 Err(TryRecvError::Disconnected) => Err(MetricsRecvError::Closed),
340 }
341 }
342}
343
344impl Iterator for MetricsStream {
345 type Item = MetricEvent;
346
347 fn next(&mut self) -> Option<Self::Item> {
348 self.recv()
349 }
350}
351
/// Owns the callback registration and the sink worker thread for one test.
///
/// Both fields are `Option`s so that `stop` can take them exactly once; the
/// `Drop` impl calls `stop` as well.
#[cfg(feature = "pushgateway")]
pub(crate) struct IntervalMetricsReporter {
    /// Registration of the metrics callback; dropping it unregisters the test.
    callback: Option<CallbackMetricsReporter>,
    /// Worker thread that drains samples into the configured sinks.
    worker: Option<JoinHandle<Result<()>>>,
}
357
/// Set of optional destinations that interval samples are delivered to.
#[cfg(feature = "pushgateway")]
pub(crate) struct MetricsSinks {
    /// Pushgateway destination together with its optional push interval.
    pushgateway: Option<PushGatewaySink>,
    /// File destination; only present when the `serde` feature is enabled.
    #[cfg(feature = "serde")]
    file: Option<MetricsFileSink>,
}
364
#[cfg(feature = "pushgateway")]
impl MetricsSinks {
    /// Creates a sink set with nothing configured.
    pub(crate) fn new() -> Self {
        Self {
            pushgateway: None,
            #[cfg(feature = "serde")]
            file: None,
        }
    }

    /// Configures the Pushgateway sink, replacing any previous one.
    ///
    /// `push_interval` selects windowed pushes when `Some`, per-sample pushes
    /// when `None`.
    pub(crate) fn pushgateway(&mut self, sink: PushGateway, push_interval: Option<Duration>) {
        let configured = PushGatewaySink {
            sink,
            push_interval,
        };
        self.pushgateway = Some(configured);
    }

    /// Configures the file sink, replacing any previous one.
    #[cfg(feature = "serde")]
    pub(crate) fn file(&mut self, sink: MetricsFileSink) {
        self.file = Some(sink);
    }

    /// Returns `true` when neither a Pushgateway nor a file sink is configured.
    #[cfg(feature = "serde")]
    pub(crate) fn is_empty(&self) -> bool {
        let has_pushgateway = self.pushgateway.is_some();
        !has_pushgateway && self.file_is_empty()
    }

    /// Chooses the callback queue policy for the configured sinks.
    ///
    /// Every sample is kept when a file sink is present or when the
    /// Pushgateway pushes windows; otherwise only the latest sample is kept.
    fn queue(&self) -> MetricsQueue {
        let windowed_push = self
            .pushgateway
            .as_ref()
            .is_some_and(|pushgateway| pushgateway.push_interval.is_some());

        if self.file_is_present() || windowed_push {
            MetricsQueue::All
        } else {
            MetricsQueue::Latest
        }
    }

    /// Returns `true` when no file sink is configured.
    #[cfg(feature = "serde")]
    fn file_is_empty(&self) -> bool {
        !self.file_is_present()
    }

    /// Returns `true` when a file sink is configured.
    #[cfg(feature = "serde")]
    fn file_is_present(&self) -> bool {
        self.file.is_some()
    }

    /// Without the `serde` feature there is no file sink at all.
    #[cfg(not(feature = "serde"))]
    fn file_is_present(&self) -> bool {
        false
    }
}
421
/// Pushgateway destination plus its delivery cadence.
#[cfg(feature = "pushgateway")]
struct PushGatewaySink {
    /// Client used to push (and optionally delete) metrics.
    sink: PushGateway,
    /// `Some(interval)` pushes aggregated windows; `None` pushes every sample.
    push_interval: Option<Duration>,
}
427
#[cfg(feature = "pushgateway")]
impl IntervalMetricsReporter {
    /// Attaches a reporter that delivers samples to a single Pushgateway sink.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::attach_sinks`].
    pub(crate) fn attach(
        test: &mut IperfTest,
        sink: PushGateway,
        push_interval: Option<Duration>,
    ) -> Result<Self> {
        let sinks = {
            let mut configured = MetricsSinks::new();
            configured.pushgateway(sink, push_interval);
            configured
        };
        Self::attach_sinks(test, sinks)
    }

    /// Registers the metrics callback on `test` and starts the worker thread
    /// that drains samples into `sinks`.
    ///
    /// # Errors
    ///
    /// Returns the error from registering the callback, if any.
    pub(crate) fn attach_sinks(test: &mut IperfTest, sinks: MetricsSinks) -> Result<Self> {
        let (callback, rx) = CallbackMetricsReporter::attach(test, sinks.queue())?;
        let worker = thread::spawn(move || run_metrics_sinks(rx, sinks));

        Ok(Self {
            callback: Some(callback),
            worker: Some(worker),
        })
    }

    /// Stops the reporter and returns the worker's result.
    ///
    /// # Errors
    ///
    /// Returns the worker's error, or a worker error if the thread panicked.
    pub(crate) fn finish(mut self) -> Result<()> {
        self.stop()
    }

    /// Unregisters the callback, then joins the worker if it is still owned.
    ///
    /// Calling this again after the worker was joined returns `Ok(())`.
    fn stop(&mut self) -> Result<()> {
        // Dropping the registration releases the sender, which lets the worker
        // observe a disconnected channel and finish.
        drop(self.callback.take());

        let Some(worker) = self.worker.take() else {
            return Ok(());
        };
        let outcome = worker
            .join()
            .map_err(|_| Error::worker("metrics sink worker thread panicked"))?;
        outcome
    }
}
469
/// Handle for one entry in the global callback registry.
///
/// Dropping the handle removes the entry again.
pub(crate) struct CallbackMetricsReporter {
    /// Registry key: the address of the raw test the callback was attached to.
    test_key: usize,
}
473
474impl CallbackMetricsReporter {
475 pub(crate) fn attach(
476 test: &mut IperfTest,
477 queue: MetricsQueue,
478 ) -> Result<(Self, Receiver<Metrics>)> {
479 let (target, rx) = callback_channel(queue, test.role());
480 let test_key = test.as_ptr() as usize;
481 callbacks()
482 .lock()
483 .map_err(|_| Error::internal("metrics callback registry is poisoned"))?
484 .insert(test_key, target);
485
486 test.enable_interval_metrics(metrics_callback);
487
488 Ok((Self { test_key }, rx))
489 }
490}
491
492impl Drop for CallbackMetricsReporter {
493 fn drop(&mut self) {
494 if let Ok(mut callbacks) = callbacks().lock() {
495 callbacks.remove(&self.test_key);
496 }
497 }
498}
499
/// Queueing policy for samples produced by the metrics callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum MetricsQueue {
    /// Keep only the most recent sample (a channel of capacity one whose
    /// queued sample is replaced when a new one arrives).
    #[cfg(feature = "pushgateway")]
    Latest,
    /// Keep every sample in an unbounded channel.
    All,
}
506
507fn callback_channel(queue: MetricsQueue, role: Role) -> (CallbackTarget, Receiver<Metrics>) {
508 match queue {
509 MetricsQueue::All => {
510 let (tx, rx) = unbounded::<Metrics>();
514 (
515 CallbackTarget {
516 tx,
517 latest_rx: None,
518 role,
519 },
520 rx,
521 )
522 }
523 #[cfg(feature = "pushgateway")]
524 MetricsQueue::Latest => {
525 let (tx, rx) = bounded::<Metrics>(1);
529 (
530 CallbackTarget {
531 tx,
532 latest_rx: Some(rx.clone()),
533 role,
534 },
535 rx,
536 )
537 }
538 }
539}
540
541pub(crate) fn metric_event_stream(
542 rx: Receiver<Metrics>,
543 mode: MetricsMode,
544) -> (MetricsStream, JoinHandle<()>) {
545 let (tx, event_rx) = unbounded::<MetricEvent>();
549 let worker = thread::spawn(move || match mode {
550 MetricsMode::Disabled => {}
551 MetricsMode::Interval => forward_interval_events(rx, tx),
552 MetricsMode::Window(interval) => forward_window_events(rx, tx, interval),
553 });
554 (MetricsStream::new(event_rx), worker)
555}
556
557fn forward_interval_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>) {
558 for metrics in rx {
559 if tx.send(MetricEvent::Interval(metrics)).is_err() {
560 break;
561 }
562 }
563}
564
/// Groups samples from `rx` into windows of `interval` and sends each window
/// to `tx` as a [`MetricEvent::Window`].
///
/// A window opens when its first sample arrives. It is flushed when its
/// deadline passes, when a sample with a different context (role, direction,
/// stream count or protocol) arrives, or when `rx` disconnects. The loop ends
/// early as soon as a flush finds that `tx` has no receiver left.
fn forward_window_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>, interval: Duration) {
    let mut window = Vec::new();
    // `None` means no window is open; the next sample opens one.
    let mut deadline = None;

    loop {
        match deadline {
            Some(flush_at) => {
                let now = Instant::now();
                // Deadline already passed: flush without waiting for more samples.
                if now >= flush_at {
                    if !flush_window_event(&tx, &mut window) {
                        break;
                    }
                    deadline = None;
                    continue;
                }

                // Wait for the next sample, but no longer than the remaining window time.
                match rx.recv_timeout(flush_at - now) {
                    Ok(metrics) => {
                        // A context change closes the current window and starts a
                        // fresh one with this sample as its first entry.
                        if window_context_changes(&window, &metrics) {
                            if !flush_window_event(&tx, &mut window) {
                                break;
                            }
                            deadline = Some(window_deadline(interval));
                        }
                        window.push(metrics);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        if !flush_window_event(&tx, &mut window) {
                            break;
                        }
                        deadline = None;
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            // No open window: block until a sample arrives and open one.
            None => match rx.recv() {
                Ok(metrics) => {
                    window.push(metrics);
                    deadline = Some(window_deadline(interval));
                }
                Err(_) => break,
            },
        }
    }

    // Emit whatever is still buffered; a send failure no longer matters here.
    let _ = flush_window_event(&tx, &mut window);
}
612
613fn flush_window_event(tx: &Sender<MetricEvent>, window: &mut Vec<Metrics>) -> bool {
614 let Some(metrics) = aggregate_window(window) else {
615 return true;
616 };
617 window.clear();
618 tx.send(MetricEvent::Window(metrics)).is_ok()
619}
620
/// Returns the instant `interval` from now.
///
/// If that instant cannot be represented, the current time is returned
/// instead, which makes the window due immediately.
fn window_deadline(interval: Duration) -> Instant {
    let opened_at = Instant::now();
    match opened_at.checked_add(interval) {
        Some(deadline) => deadline,
        None => Instant::now(),
    }
}
626
627fn window_context_changes(window: &[Metrics], metrics: &Metrics) -> bool {
628 window
629 .first()
630 .map(|first| !same_window_context(first, metrics))
631 .unwrap_or(false)
632}
633
634fn same_window_context(left: &Metrics, right: &Metrics) -> bool {
635 left.role == right.role
636 && left.direction == right.direction
637 && left.stream_count == right.stream_count
638 && left.protocol == right.protocol
639}
640
/// Worker entry point: drains `rx` into `sinks`.
///
/// Uses windowed pushes when the Pushgateway sink has a push interval and
/// per-sample pushes otherwise.
#[cfg(feature = "pushgateway")]
fn run_metrics_sinks(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
    let push_interval = sinks
        .pushgateway
        .as_ref()
        .and_then(|pushgateway| pushgateway.push_interval);

    if let Some(interval) = push_interval {
        push_window_metrics(rx, sinks, interval)
    } else {
        push_interval_metrics(rx, sinks)
    }
}
652
/// Writes every sample to the file sink and pushes it to the Pushgateway.
///
/// Processing stops at the first file write error, which is returned after
/// the Pushgateway cleanup has run. Pushgateway push failures are only logged.
#[cfg(feature = "pushgateway")]
fn push_interval_metrics(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
    let outcome = rx.into_iter().try_for_each(|sample| -> Result<()> {
        write_metrics_file(&sinks, &sample)?;
        push_interval_to_gateway(&sinks, &sample);
        Ok(())
    });

    delete_pushgateway_on_finish(&sinks);
    outcome
}
666
/// Writes every sample to the file sink and pushes aggregated windows of
/// `interval` to the Pushgateway.
///
/// A window opens with its first sample and is flushed when its deadline
/// passes, when a sample with a different context arrives, or when `rx`
/// disconnects.
///
/// # Errors
///
/// Returns the first file write error. In that case the pending window is
/// not pushed, but the Pushgateway cleanup still runs.
#[cfg(feature = "pushgateway")]
fn push_window_metrics(
    rx: Receiver<Metrics>,
    sinks: MetricsSinks,
    interval: Duration,
) -> Result<()> {
    let mut window = Vec::new();
    // `None` means no window is open; the next sample opens one.
    let mut deadline = None;
    let mut result = Ok(());

    loop {
        match deadline {
            Some(flush_at) => {
                let now = Instant::now();
                // Deadline already passed: flush without waiting for more samples.
                if now >= flush_at {
                    flush_window_metrics(&sinks, &mut window);
                    deadline = None;
                    continue;
                }

                // Wait for the next sample, but no longer than the remaining window time.
                match rx.recv_timeout(flush_at - now) {
                    Ok(metrics) => {
                        // The file sink receives every raw sample, independent of windows.
                        if let Err(err) = write_metrics_file(&sinks, &metrics) {
                            result = Err(err);
                            break;
                        }
                        // A context change closes the current window and starts a
                        // fresh one with this sample as its first entry.
                        if window_context_changes(&window, &metrics) {
                            flush_window_metrics(&sinks, &mut window);
                            deadline = Some(window_deadline(interval));
                        }
                        window.push(metrics);
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        flush_window_metrics(&sinks, &mut window);
                        deadline = None;
                    }
                    Err(RecvTimeoutError::Disconnected) => break,
                }
            }
            // No open window: block until a sample arrives and open one.
            None => match rx.recv() {
                Ok(metrics) => {
                    if let Err(err) = write_metrics_file(&sinks, &metrics) {
                        result = Err(err);
                        break;
                    }
                    window.push(metrics);
                    deadline = Some(window_deadline(interval));
                }
                Err(_) => break,
            },
        }
    }

    // Only push the remaining samples when the loop ended without a file error.
    if result.is_ok() {
        flush_window_metrics(&sinks, &mut window);
    }
    delete_pushgateway_on_finish(&sinks);
    result
}
728
/// Pushes one sample to the Pushgateway sink, if one is configured.
///
/// A push failure is logged to stderr and otherwise ignored.
#[cfg(feature = "pushgateway")]
fn push_interval_to_gateway(sinks: &MetricsSinks, metrics: &Metrics) {
    let Some(pushgateway) = sinks.pushgateway.as_ref() else {
        return;
    };
    if let Err(err) = pushgateway.sink.push(metrics) {
        eprintln!("failed to push metrics: {err:#}");
    }
}
742
/// Aggregates the buffered samples, pushes the summary to the Pushgateway
/// sink (if configured) and clears the buffer.
///
/// An empty buffer is left untouched. A push failure is logged to stderr and
/// the buffer is cleared regardless.
#[cfg(feature = "pushgateway")]
fn flush_window_metrics(sinks: &MetricsSinks, window: &mut Vec<Metrics>) {
    if let Some(summary) = aggregate_window(window) {
        if let Some(pushgateway) = sinks.pushgateway.as_ref() {
            if let Err(err) = pushgateway.sink.push_window(&summary) {
                eprintln!("failed to push window metrics: {err:#}");
            }
        }
        window.clear();
    }
}
757
/// Writes one sample to the file sink, if one is configured.
///
/// # Errors
///
/// Returns the file sink's write error.
#[cfg(all(feature = "pushgateway", feature = "serde"))]
fn write_metrics_file(sinks: &MetricsSinks, metrics: &Metrics) -> Result<()> {
    match sinks.file.as_ref() {
        Some(file) => {
            file.write_interval(metrics)?;
            Ok(())
        }
        None => Ok(()),
    }
}
765
/// No-op counterpart of the file writer for builds without the `serde`
/// feature, where no file sink exists. Always returns `Ok(())`.
#[cfg(all(feature = "pushgateway", not(feature = "serde")))]
fn write_metrics_file(_sinks: &MetricsSinks, _metrics: &Metrics) -> Result<()> {
    Ok(())
}
770
/// Deletes the pushed metrics from the Pushgateway when the sink is
/// configured to do so on finish.
///
/// A delete failure is logged to stderr and otherwise ignored.
#[cfg(feature = "pushgateway")]
fn delete_pushgateway_on_finish(sinks: &MetricsSinks) {
    let Some(pushgateway) = sinks.pushgateway.as_ref() else {
        return;
    };
    if !pushgateway.sink.delete_on_finish() {
        return;
    }
    if let Err(err) = pushgateway.sink.delete() {
        eprintln!("failed to delete Pushgateway metrics: {err:#}");
    }
}
785
#[cfg(feature = "pushgateway")]
impl Drop for IntervalMetricsReporter {
    /// Stops the reporter if `finish` was not called; the outcome is
    /// discarded because `drop` cannot report errors.
    fn drop(&mut self) {
        drop(self.stop());
    }
}
792
/// Registry entry describing where the metrics callback delivers samples for
/// one test.
struct CallbackTarget {
    /// Sending half of the sample channel.
    tx: Sender<Metrics>,
    /// Extra receiver used to discard the queued sample when only the latest
    /// one is kept; `None` when every sample is kept.
    latest_rx: Option<Receiver<Metrics>>,
    /// Role stamped onto every sample produced for this test.
    role: Role,
}
798
799static CALLBACKS: OnceLock<Mutex<HashMap<usize, CallbackTarget>>> = OnceLock::new();
800
801fn callbacks() -> &'static Mutex<HashMap<usize, CallbackTarget>> {
802 CALLBACKS.get_or_init(|| Mutex::new(HashMap::new()))
805}
806
807unsafe extern "C" fn metrics_callback(
808 test: *mut RawIperfTest,
809 transferred_bytes: c_double,
810 bandwidth_bits_per_second: c_double,
811 tcp_retransmits: c_double,
812 tcp_rtt_seconds: c_double,
813 tcp_rttvar_seconds: c_double,
814 tcp_snd_cwnd_bytes: c_double,
815 tcp_snd_wnd_bytes: c_double,
816 tcp_pmtu_bytes: c_double,
817 tcp_reorder_events: c_double,
818 udp_packets: c_double,
819 udp_lost_packets: c_double,
820 udp_jitter_seconds: c_double,
821 udp_out_of_order_packets: c_double,
822 interval_duration_seconds: c_double,
823 omitted: c_double,
824 protocol: c_int,
825 direction: c_int,
826 stream_count: c_int,
827 tcp_retransmits_available: c_int,
828 tcp_rtt_seconds_available: c_int,
829 tcp_rttvar_seconds_available: c_int,
830 tcp_snd_cwnd_bytes_available: c_int,
831 tcp_snd_wnd_bytes_available: c_int,
832 tcp_pmtu_bytes_available: c_int,
833 tcp_reorder_events_available: c_int,
834 udp_packets_available: c_int,
835 udp_lost_packets_available: c_int,
836 udp_jitter_seconds_available: c_int,
837 udp_out_of_order_packets_available: c_int,
838) {
839 if test.is_null() {
840 return;
841 }
842
843 let Ok(callbacks) = callbacks().lock() else {
844 return;
845 };
846 let Some(target) = callbacks.get(&(test as usize)) else {
847 return;
848 };
849
850 enqueue_latest(
851 target,
852 Metrics {
853 timestamp_unix_seconds: current_unix_timestamp_seconds(),
854 role: target.role,
855 direction: MetricDirection::from_callback_value(direction),
856 stream_count: nonnegative_usize(stream_count),
857 protocol: TransportProtocol::from_callback_value(protocol),
858 transferred_bytes,
859 bandwidth_bits_per_second,
860 tcp_retransmits: available(tcp_retransmits_available, tcp_retransmits),
861 tcp_rtt_seconds: available(tcp_rtt_seconds_available, tcp_rtt_seconds),
862 tcp_rttvar_seconds: available(tcp_rttvar_seconds_available, tcp_rttvar_seconds),
863 tcp_snd_cwnd_bytes: available(tcp_snd_cwnd_bytes_available, tcp_snd_cwnd_bytes),
864 tcp_snd_wnd_bytes: available(tcp_snd_wnd_bytes_available, tcp_snd_wnd_bytes),
865 tcp_pmtu_bytes: available(tcp_pmtu_bytes_available, tcp_pmtu_bytes),
866 tcp_reorder_events: available(tcp_reorder_events_available, tcp_reorder_events),
867 udp_packets: available(udp_packets_available, udp_packets),
868 udp_lost_packets: available(udp_lost_packets_available, udp_lost_packets),
869 udp_jitter_seconds: available(udp_jitter_seconds_available, udp_jitter_seconds),
870 udp_out_of_order_packets: available(
871 udp_out_of_order_packets_available,
872 udp_out_of_order_packets,
873 ),
874 interval_duration_seconds,
875 omitted: omitted != 0.0,
876 },
877 );
878}
879
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch, or `0.0` when the system clock reads before the epoch.
fn current_unix_timestamp_seconds() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
886
/// Wraps `value` in `Some` when the availability `flag` is non-zero and
/// returns `None` otherwise.
fn available(flag: c_int, value: c_double) -> Option<f64> {
    if flag == 0 {
        None
    } else {
        Some(value)
    }
}
890
/// Converts a C integer to `usize`, mapping values that do not fit (negative
/// ones in particular) to `0`.
fn nonnegative_usize(value: c_int) -> usize {
    match usize::try_from(value) {
        Ok(count) => count,
        Err(_) => 0,
    }
}
894
895fn enqueue_latest(target: &CallbackTarget, metrics: Metrics) {
896 match target.tx.try_send(metrics) {
897 Ok(()) => {}
898 Err(TrySendError::Full(metrics)) => {
899 if let Some(rx) = &target.latest_rx {
901 let _ = rx.try_recv();
902 }
903 let _ = target.tx.try_send(metrics);
904 }
905 Err(TrySendError::Disconnected(_)) => {}
906 }
907}
908
909pub fn aggregate_window(samples: &[Metrics]) -> Option<WindowMetrics> {
917 if samples.is_empty() {
918 return None;
919 }
920
921 let mut bandwidth = GaugeAccumulator::default();
922 let mut tcp_rtt = GaugeAccumulator::default();
923 let mut tcp_rttvar = GaugeAccumulator::default();
924 let mut tcp_snd_cwnd = GaugeAccumulator::default();
925 let mut tcp_snd_wnd = GaugeAccumulator::default();
926 let mut tcp_pmtu = GaugeAccumulator::default();
927 let mut udp_jitter = GaugeAccumulator::default();
928
929 let mut duration_seconds = 0.0;
930 let mut transferred_bytes = 0.0;
931 let mut tcp_retransmits = OptionalCounter::default();
932 let mut tcp_reorder_events = OptionalCounter::default();
933 let mut udp_packets = OptionalCounter::default();
934 let mut udp_lost_packets = OptionalCounter::default();
935 let mut udp_out_of_order_packets = OptionalCounter::default();
936 let mut omitted_intervals = 0.0;
937 let context = &samples[0];
938
939 for metrics in samples {
940 duration_seconds += finite_nonnegative(metrics.interval_duration_seconds);
941 transferred_bytes += finite_nonnegative(metrics.transferred_bytes);
942 bandwidth.observe(metrics.bandwidth_bits_per_second);
943 tcp_rtt.observe_option(metrics.tcp_rtt_seconds);
944 tcp_rttvar.observe_option(metrics.tcp_rttvar_seconds);
945 tcp_snd_cwnd.observe_option(metrics.tcp_snd_cwnd_bytes);
946 tcp_snd_wnd.observe_option(metrics.tcp_snd_wnd_bytes);
947 tcp_pmtu.observe_option(metrics.tcp_pmtu_bytes);
948 udp_jitter.observe_option(metrics.udp_jitter_seconds);
949 tcp_retransmits.observe(metrics.tcp_retransmits);
950 tcp_reorder_events.observe(metrics.tcp_reorder_events);
951 udp_packets.observe(metrics.udp_packets);
952 udp_lost_packets.observe(metrics.udp_lost_packets);
953 udp_out_of_order_packets.observe(metrics.udp_out_of_order_packets);
954 if metrics.omitted {
955 omitted_intervals += 1.0;
956 }
957 }
958
959 let bandwidth_mean = if duration_seconds > 0.0 {
960 (transferred_bytes * 8.0) / duration_seconds
961 } else {
962 bandwidth.finish().mean
963 };
964
965 Some(WindowMetrics {
966 timestamp_unix_seconds: samples
967 .last()
968 .map(|metrics| metrics.timestamp_unix_seconds)
969 .unwrap_or_default(),
970 role: context.role,
971 direction: context.direction,
972 stream_count: context.stream_count,
973 protocol: context.protocol,
974 duration_seconds,
975 transferred_bytes,
976 bandwidth_bits_per_second: bandwidth.finish_with_mean(bandwidth_mean),
977 tcp_rtt_seconds: tcp_rtt.finish(),
978 tcp_rttvar_seconds: tcp_rttvar.finish(),
979 tcp_snd_cwnd_bytes: tcp_snd_cwnd.finish(),
980 tcp_snd_wnd_bytes: tcp_snd_wnd.finish(),
981 tcp_pmtu_bytes: tcp_pmtu.finish(),
982 udp_jitter_seconds: udp_jitter.finish(),
983 tcp_retransmits: tcp_retransmits.finish(),
984 tcp_reorder_events: tcp_reorder_events.finish(),
985 udp_packets: udp_packets.finish(),
986 udp_lost_packets: udp_lost_packets.finish(),
987 udp_out_of_order_packets: udp_out_of_order_packets.finish(),
988 omitted_intervals,
989 })
990}
991
/// Running count, sum, minimum and maximum of the finite values observed for
/// one gauge.
#[derive(Debug, Clone, Default)]
struct GaugeAccumulator {
    /// Number of finite values observed so far.
    count: usize,
    /// Sum of the observed values.
    sum: f64,
    /// Smallest observed value; only meaningful once `count > 0`.
    min: f64,
    /// Largest observed value; only meaningful once `count > 0`.
    max: f64,
}
999
1000impl GaugeAccumulator {
1001 fn observe(&mut self, value: f64) {
1002 if !value.is_finite() {
1003 return;
1004 }
1005 if self.count == 0 {
1006 self.min = value;
1007 self.max = value;
1008 } else {
1009 self.min = self.min.min(value);
1010 self.max = self.max.max(value);
1011 }
1012 self.count += 1;
1013 self.sum += value;
1014 }
1015
1016 fn observe_option(&mut self, value: Option<f64>) {
1017 if let Some(value) = value {
1018 self.observe(value);
1019 }
1020 }
1021
1022 fn finish(&self) -> WindowGaugeStats {
1023 if self.count == 0 {
1024 return WindowGaugeStats::default();
1025 }
1026 WindowGaugeStats {
1027 samples: self.count,
1028 mean: self.sum / self.count as f64,
1029 min: self.min,
1030 max: self.max,
1031 }
1032 }
1033
1034 fn finish_with_mean(&self, mean: f64) -> WindowGaugeStats {
1035 let mut stats = self.finish();
1036 if self.count > 0 && mean.is_finite() {
1037 stats.mean = mean;
1038 }
1039 stats
1040 }
1041}
1042
/// Running sum of a counter that individual samples may or may not report.
#[derive(Debug, Clone, Default)]
struct OptionalCounter {
    /// `true` once at least one sample reported the counter.
    observed: bool,
    /// Sum of the reported values, with non-finite or negative ones counted as `0`.
    sum: f64,
}
1048
1049impl OptionalCounter {
1050 fn observe(&mut self, value: Option<f64>) {
1051 let Some(value) = value else {
1052 return;
1053 };
1054 self.observed = true;
1055 self.sum += finite_nonnegative(value);
1056 }
1057
1058 fn finish(&self) -> Option<f64> {
1059 self.observed.then_some(self.sum)
1060 }
1061}
1062
/// Returns `value` when it is finite and strictly positive, and `0.0` for
/// everything else (NaN, infinities, zero and negative values).
fn finite_nonnegative(value: f64) -> f64 {
    if !value.is_finite() || value <= 0.0 {
        return 0.0;
    }
    value
}
1070
/// Kani proof harnesses for the pure helpers in this module.
///
/// Compiled only when the `kani` cfg is set.
#[cfg(kani)]
mod verification {
    use super::*;

    /// An empty sample slice produces no window summary.
    #[kani::proof]
    fn empty_window_has_no_summary() {
        assert!(aggregate_window(&[]).is_none());
    }

    /// `is_enabled` is false only for `Disabled`, and a callback queue exists
    /// exactly when the mode is enabled.
    #[kani::proof]
    fn metrics_mode_callback_policy_matches_variant() {
        let variant: u8 = kani::any();
        let mode = match variant % 3 {
            0 => MetricsMode::Disabled,
            1 => MetricsMode::Interval,
            _ => MetricsMode::Window(Duration::from_secs(1)),
        };

        assert_eq!(mode.is_enabled(), !matches!(mode, MetricsMode::Disabled));
        assert_eq!(mode.callback_queue().is_some(), mode.is_enabled());
    }

    /// `available` yields `Some(value)` exactly when the flag is non-zero.
    #[kani::proof]
    fn callback_availability_flag_controls_optional_metric() {
        let flag: c_int = kani::any();
        // Values are drawn from `i16` so they are always finite.
        let value = f64::from(kani::any::<i16>());

        assert_eq!(available(flag, value), (flag != 0).then_some(value));
    }

    /// Negative stream counts become `0`; non-negative ones are preserved.
    #[kani::proof]
    fn callback_stream_count_never_wraps_negative_values() {
        let raw: i16 = kani::any();
        let expected = if raw < 0 { 0 } else { raw as usize };

        assert_eq!(nonnegative_usize(c_int::from(raw)), expected);
    }

    /// Protocol and direction codes map to their named variants, and all
    /// other codes are preserved in `Other`.
    #[kani::proof]
    fn callback_context_mappers_preserve_known_values() {
        let protocol: i8 = kani::any();
        let direction: i8 = kani::any();

        let expected_protocol = match protocol {
            0 => TransportProtocol::Unknown,
            1 => TransportProtocol::Tcp,
            2 => TransportProtocol::Udp,
            3 => TransportProtocol::Sctp,
            other => TransportProtocol::Other(c_int::from(other)),
        };
        let expected_direction = match direction {
            0 => MetricDirection::Unknown,
            1 => MetricDirection::Sender,
            2 => MetricDirection::Receiver,
            other => MetricDirection::Other(c_int::from(other)),
        };

        assert_eq!(
            TransportProtocol::from_callback_value(c_int::from(protocol)),
            expected_protocol
        );
        assert_eq!(
            MetricDirection::from_callback_value(c_int::from(direction)),
            expected_direction
        );
    }

    /// Window durations and counters never go negative, even when a sample
    /// carries negative inputs.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_counters_are_nonnegative_for_bounded_inputs() {
        let sample = Metrics {
            transferred_bytes: f64::from(kani::any::<i16>()),
            tcp_retransmits: Some(f64::from(kani::any::<i16>())),
            tcp_reorder_events: Some(f64::from(kani::any::<i16>())),
            udp_packets: Some(f64::from(kani::any::<i16>())),
            udp_lost_packets: Some(f64::from(kani::any::<i16>())),
            udp_out_of_order_packets: Some(f64::from(kani::any::<i16>())),
            interval_duration_seconds: f64::from(kani::any::<i16>()),
            omitted: kani::any(),
            ..Metrics::default()
        };

        let window = aggregate_window(&[sample]).expect("nonempty windows summarize");

        assert!(window.duration_seconds >= 0.0);
        assert!(window.transferred_bytes >= 0.0);
        assert!(window.tcp_retransmits.unwrap_or(0.0) >= 0.0);
        assert!(window.tcp_reorder_events.unwrap_or(0.0) >= 0.0);
        assert!(window.udp_packets.unwrap_or(0.0) >= 0.0);
        assert!(window.udp_lost_packets.unwrap_or(0.0) >= 0.0);
        assert!(window.udp_out_of_order_packets.unwrap_or(0.0) >= 0.0);
        assert!(window.omitted_intervals >= 0.0);
    }

    /// For two one-second samples, the bandwidth mean equals total bits
    /// divided by total duration.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_bandwidth_mean_uses_total_bits_over_duration_for_unit_intervals() {
        let bytes_a: u8 = kani::any();
        let bytes_b: u8 = kani::any();

        let samples = [
            metrics_with_unit_duration(bytes_a),
            metrics_with_unit_duration(bytes_b),
        ];
        let window = aggregate_window(&samples).expect("nonempty windows summarize");

        let expected = ((f64::from(bytes_a) + f64::from(bytes_b)) * 8.0) / 2.0;
        assert_eq!(window.bandwidth_bits_per_second.mean, expected);
    }

    /// When each sample's bandwidth is consistent with its bytes and duration,
    /// gauge statistics satisfy `min <= mean <= max`.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_gauge_statistics_remain_ordered_for_consistent_samples() {
        let bytes_a: u8 = kani::any();
        let bytes_b: u8 = kani::any();
        let rtt_a: u8 = kani::any();
        let rtt_b: u8 = kani::any();

        let samples = [
            Metrics {
                transferred_bytes: f64::from(bytes_a),
                bandwidth_bits_per_second: f64::from(bytes_a) * 8.0,
                tcp_rtt_seconds: Some(f64::from(rtt_a)),
                interval_duration_seconds: 1.0,
                ..Metrics::default()
            },
            Metrics {
                transferred_bytes: f64::from(bytes_b),
                bandwidth_bits_per_second: f64::from(bytes_b) * 8.0,
                tcp_rtt_seconds: Some(f64::from(rtt_b)),
                interval_duration_seconds: 1.0,
                ..Metrics::default()
            },
        ];
        let window = aggregate_window(&samples).expect("nonempty windows summarize");

        assert_ordered(window.bandwidth_bits_per_second);
        assert_ordered(window.tcp_rtt_seconds);
    }

    /// Builds a one-second sample whose bandwidth matches its byte count.
    fn metrics_with_unit_duration(bytes: u8) -> Metrics {
        Metrics {
            transferred_bytes: f64::from(bytes),
            bandwidth_bits_per_second: f64::from(bytes) * 8.0,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        }
    }

    /// Asserts that the statistics are non-empty and `min <= mean <= max`.
    fn assert_ordered(stats: WindowGaugeStats) {
        assert!(stats.samples > 0);
        assert!(stats.min <= stats.mean);
        assert!(stats.mean <= stats.max);
    }
}
1229
1230#[cfg(test)]
1231mod tests {
1232 use super::*;
1233
1234 #[test]
1235 fn transport_protocol_maps_callback_values() {
1236 assert_eq!(
1237 TransportProtocol::from_callback_value(0),
1238 TransportProtocol::Unknown
1239 );
1240 assert_eq!(
1241 TransportProtocol::from_callback_value(1),
1242 TransportProtocol::Tcp
1243 );
1244 assert_eq!(
1245 TransportProtocol::from_callback_value(2),
1246 TransportProtocol::Udp
1247 );
1248 assert_eq!(
1249 TransportProtocol::from_callback_value(3),
1250 TransportProtocol::Sctp
1251 );
1252 assert_eq!(
1253 TransportProtocol::from_callback_value(99),
1254 TransportProtocol::Other(99)
1255 );
1256 }
1257
1258 #[test]
1259 fn metric_direction_maps_callback_values() {
1260 assert_eq!(
1261 MetricDirection::from_callback_value(0),
1262 MetricDirection::Unknown
1263 );
1264 assert_eq!(
1265 MetricDirection::from_callback_value(1),
1266 MetricDirection::Sender
1267 );
1268 assert_eq!(
1269 MetricDirection::from_callback_value(2),
1270 MetricDirection::Receiver
1271 );
1272 assert_eq!(
1273 MetricDirection::from_callback_value(99),
1274 MetricDirection::Other(99)
1275 );
1276 }
1277
1278 #[test]
1279 fn enqueue_latest_replaces_queued_metric() {
1280 let (tx, rx) = bounded::<Metrics>(1);
1281 let target = CallbackTarget {
1282 tx,
1283 latest_rx: Some(rx.clone()),
1284 role: Role::Client,
1285 };
1286
1287 enqueue_latest(
1288 &target,
1289 Metrics {
1290 transferred_bytes: 1.0,
1291 ..Metrics::default()
1292 },
1293 );
1294 enqueue_latest(
1295 &target,
1296 Metrics {
1297 transferred_bytes: 2.0,
1298 ..Metrics::default()
1299 },
1300 );
1301
1302 assert_eq!(rx.try_recv().unwrap().transferred_bytes, 2.0);
1303 assert!(rx.try_recv().is_err());
1304 }
1305
/// `try_recv` must report `Empty` while the channel is open with nothing
/// queued, yield a queued event, and report `Closed` once the sender is gone.
#[test]
fn metrics_stream_try_recv_reports_empty_and_closed() {
    let (sender, receiver) = unbounded::<MetricEvent>();
    let stream = MetricsStream::new(receiver);

    // Nothing has been sent yet.
    assert_eq!(stream.try_recv(), Err(MetricsRecvError::Empty));

    let sample = Metrics {
        transferred_bytes: 42.0,
        ..Metrics::default()
    };
    let queued = MetricEvent::Interval(sample);
    let expected = queued.clone();
    sender.send(queued).unwrap();
    assert_eq!(stream.try_recv(), Ok(expected));

    // Dropping the only sender closes the channel.
    drop(sender);
    assert_eq!(stream.try_recv(), Err(MetricsRecvError::Closed));
}
1323
/// `recv_timeout` must report `Timeout` while the channel is open with nothing
/// queued, yield a queued event, and report `Closed` once the sender is gone.
#[test]
fn metrics_stream_recv_timeout_reports_timeout_and_closed() {
    let (sender, receiver) = unbounded::<MetricEvent>();
    let stream = MetricsStream::new(receiver);

    // Nothing has been sent yet, so a short wait must elapse.
    let first_attempt = stream.recv_timeout(Duration::from_millis(1));
    assert_eq!(first_attempt, Err(MetricsRecvError::Timeout));

    let sample = Metrics {
        transferred_bytes: 7.0,
        ..Metrics::default()
    };
    let queued = MetricEvent::Interval(sample);
    let expected = queued.clone();
    sender.send(queued).unwrap();

    let one_second = Duration::from_secs(1);
    assert_eq!(stream.recv_timeout(one_second), Ok(expected));

    // Dropping the only sender closes the channel.
    drop(sender);
    assert_eq!(
        stream.recv_timeout(one_second),
        Err(MetricsRecvError::Closed)
    );
}
1347
/// In interval mode a sample sent on the input channel must come out of the
/// stream as an `Interval` event, and the stream must end after the sender is
/// dropped and the worker has been joined.
#[test]
fn metric_event_stream_forwards_interval_samples() {
    let (tx, rx) = unbounded::<Metrics>();
    let (mut stream, worker) = metric_event_stream(rx, MetricsMode::Interval);

    let sample = Metrics {
        transferred_bytes: 42.0,
        ..Metrics::default()
    };
    let expected = MetricEvent::Interval(sample.clone());

    tx.send(sample).unwrap();
    drop(tx);

    assert_eq!(stream.next(), Some(expected));
    worker.join().unwrap();
    assert_eq!(stream.next(), None);
}
1364
/// Two samples are sent into a 60-second window and the sender is dropped; the
/// test expects a single window event summarizing both samples, followed by
/// the end of the stream.
#[test]
fn metric_event_stream_flushes_final_window() {
    let (tx, rx) = unbounded::<Metrics>();
    let mode = MetricsMode::Window(Duration::from_secs(60));
    let (mut stream, worker) = metric_event_stream(rx, mode);

    // Both samples share the same context and differ only in time and volume.
    let sample = |timestamp: f64, bytes: f64, bandwidth: f64| Metrics {
        timestamp_unix_seconds: timestamp,
        role: Role::Client,
        direction: MetricDirection::Sender,
        stream_count: 2,
        protocol: TransportProtocol::Tcp,
        transferred_bytes: bytes,
        bandwidth_bits_per_second: bandwidth,
        interval_duration_seconds: 1.0,
        ..Metrics::default()
    };

    tx.send(sample(10.0, 4.0, 32.0)).unwrap();
    tx.send(sample(11.0, 8.0, 64.0)).unwrap();
    drop(tx);

    let window = match stream.next() {
        Some(MetricEvent::Window(window)) => window,
        _ => panic!("expected a final window event"),
    };

    // Aggregated values.
    assert_eq!(window.transferred_bytes, 12.0);
    assert_eq!(window.duration_seconds, 2.0);
    assert_eq!(window.bandwidth_bits_per_second.mean, 48.0);
    assert_eq!(window.timestamp_unix_seconds, 11.0);

    // Context carried over from the samples.
    assert_eq!(window.role, Role::Client);
    assert_eq!(window.direction, MetricDirection::Sender);
    assert_eq!(window.stream_count, 2);
    assert_eq!(window.protocol, TransportProtocol::Tcp);

    worker.join().unwrap();
    assert_eq!(stream.next(), None);
}
1411
/// Two samples that differ only in direction are sent into the same 60-second
/// window; the test expects one window event per direction, in send order.
#[test]
fn metric_event_stream_splits_windows_when_context_changes() {
    let (tx, rx) = unbounded::<Metrics>();
    let mode = MetricsMode::Window(Duration::from_secs(60));
    let (mut stream, worker) = metric_event_stream(rx, mode);

    let sample = |direction: MetricDirection, bytes: f64| Metrics {
        role: Role::Client,
        direction,
        stream_count: 1,
        protocol: TransportProtocol::Tcp,
        transferred_bytes: bytes,
        interval_duration_seconds: 1.0,
        ..Metrics::default()
    };

    tx.send(sample(MetricDirection::Sender, 4.0)).unwrap();
    tx.send(sample(MetricDirection::Receiver, 8.0)).unwrap();
    drop(tx);

    let first = match stream.next() {
        Some(MetricEvent::Window(window)) => window,
        _ => panic!("expected sender window"),
    };
    let second = match stream.next() {
        Some(MetricEvent::Window(window)) => window,
        _ => panic!("expected receiver window"),
    };

    assert_eq!(first.transferred_bytes, 4.0);
    assert_eq!(first.direction, MetricDirection::Sender);
    assert_eq!(second.transferred_bytes, 8.0);
    assert_eq!(second.direction, MetricDirection::Receiver);

    worker.join().unwrap();
    assert_eq!(stream.next(), None);
}
1454
/// Creates a file sink, replaces its target path with a directory, and expects
/// `run_metrics_sinks` to return an error of kind `ErrorKind::MetricsFile`.
///
/// The temporary directory is removed before the outcome is asserted, so a
/// failing expectation does not leave it behind.
#[cfg(all(feature = "pushgateway", feature = "serde"))]
#[test]
fn metrics_file_errors_are_returned_from_sink_worker() {
    use std::fs;
    use std::time::{SystemTime, UNIX_EPOCH};

    use crate::metrics_file::{MetricsFileFormat, MetricsFileSink};

    // Process id plus a nanosecond timestamp makes the temp path unique per run.
    let nonce = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let path = std::env::temp_dir().join(format!(
        "iperf3-rs-metrics-worker-{}-{nonce}.jsonl",
        std::process::id()
    ));
    let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();

    // Swap the sink's target for a directory of the same name.
    fs::remove_file(&path).unwrap();
    fs::create_dir(&path).unwrap();

    let mut sinks = MetricsSinks::new();
    sinks.file(sink);
    let (tx, rx) = unbounded();
    tx.send(Metrics {
        transferred_bytes: 1.0,
        interval_duration_seconds: 1.0,
        ..Metrics::default()
    })
    .unwrap();
    drop(tx);

    let outcome = run_metrics_sinks(rx, sinks);

    // Best-effort cleanup happens before any assertion can panic.
    let _ = fs::remove_dir(&path);

    let err = outcome.unwrap_err();
    assert_eq!(err.kind(), crate::ErrorKind::MetricsFile);
}
1491
/// An empty sample slice must produce no window.
#[test]
fn aggregate_window_returns_none_for_empty_samples() {
    let no_samples: &[Metrics] = &[];
    let window = aggregate_window(no_samples);
    assert!(window.is_none());
}
1496
/// Aggregates two samples with the same context and pins the expected window:
/// summed duration, bytes, retransmits and UDP packets; the later timestamp;
/// per-gauge sample count, mean, min and max; and one omitted interval.
#[test]
fn aggregate_window_summarizes_interval_samples_by_metric_semantics() {
    // Context fields shared by both samples.
    let shared = Metrics {
        role: Role::Client,
        direction: MetricDirection::Sender,
        stream_count: 2,
        protocol: TransportProtocol::Tcp,
        ..Metrics::default()
    };
    let samples = [
        Metrics {
            timestamp_unix_seconds: 10.0,
            transferred_bytes: 100.0,
            bandwidth_bits_per_second: 800.0,
            tcp_retransmits: Some(1.0),
            tcp_rtt_seconds: Some(0.010),
            tcp_snd_cwnd_bytes: Some(1000.0),
            udp_packets: Some(2.0),
            interval_duration_seconds: 1.0,
            ..shared.clone()
        },
        Metrics {
            timestamp_unix_seconds: 11.0,
            transferred_bytes: 900.0,
            bandwidth_bits_per_second: 2400.0,
            tcp_retransmits: Some(2.0),
            tcp_rtt_seconds: Some(0.030),
            tcp_snd_cwnd_bytes: Some(3000.0),
            udp_packets: Some(3.0),
            interval_duration_seconds: 3.0,
            omitted: true,
            ..shared
        },
    ];

    let window = aggregate_window(&samples).unwrap();

    // Every gauge in this test is built from both samples.
    let gauge = |mean, min, max| WindowGaugeStats {
        samples: 2,
        mean,
        min,
        max,
    };

    assert_eq!(window.duration_seconds, 4.0);
    assert_eq!(window.transferred_bytes, 1000.0);
    assert_eq!(window.timestamp_unix_seconds, 11.0);

    assert_eq!(window.role, Role::Client);
    assert_eq!(window.direction, MetricDirection::Sender);
    assert_eq!(window.stream_count, 2);
    assert_eq!(window.protocol, TransportProtocol::Tcp);

    assert_eq!(
        window.bandwidth_bits_per_second,
        gauge(2000.0, 800.0, 2400.0)
    );
    assert_eq!(window.tcp_rtt_seconds, gauge(0.020, 0.010, 0.030));
    assert_eq!(window.tcp_snd_cwnd_bytes, gauge(2000.0, 1000.0, 3000.0));

    assert_eq!(window.tcp_retransmits, Some(3.0));
    assert_eq!(window.udp_packets, Some(5.0));
    assert_eq!(window.omitted_intervals, 1.0);
}
1572
/// When both samples leave the interval duration at its default, the test
/// expects a zero window duration and a bandwidth gauge built from the two
/// reported bandwidth values (mean 1600, min 800, max 2400).
#[test]
fn aggregate_window_falls_back_to_observed_bandwidth_when_duration_is_zero() {
    let sample = |bytes: f64, bandwidth: f64| Metrics {
        transferred_bytes: bytes,
        bandwidth_bits_per_second: bandwidth,
        ..Metrics::default()
    };
    let samples = [sample(100.0, 800.0), sample(900.0, 2400.0)];

    let window = aggregate_window(&samples).unwrap();

    let expected_bandwidth = WindowGaugeStats {
        samples: 2,
        mean: 1600.0,
        min: 800.0,
        max: 2400.0,
    };
    assert_eq!(window.duration_seconds, 0.0);
    assert_eq!(window.bandwidth_bits_per_second, expected_bandwidth);
}
1600
/// The first sample carries NaN, infinite and negative values; the test
/// expects the window to reflect only the second, well-formed sample.
#[test]
fn aggregate_window_ignores_invalid_counter_values() {
    let invalid = Metrics {
        transferred_bytes: f64::NAN,
        bandwidth_bits_per_second: f64::INFINITY,
        tcp_retransmits: Some(-1.0),
        interval_duration_seconds: -1.0,
        ..Metrics::default()
    };
    let valid = Metrics {
        transferred_bytes: 8.0,
        bandwidth_bits_per_second: 64.0,
        tcp_retransmits: Some(2.0),
        interval_duration_seconds: 1.0,
        ..Metrics::default()
    };

    let window = aggregate_window(&[invalid, valid]).unwrap();

    let expected_bandwidth = WindowGaugeStats {
        samples: 1,
        mean: 64.0,
        min: 64.0,
        max: 64.0,
    };
    assert_eq!(window.duration_seconds, 1.0);
    assert_eq!(window.transferred_bytes, 8.0);
    assert_eq!(window.tcp_retransmits, Some(2.0));
    assert_eq!(window.bandwidth_bits_per_second, expected_bandwidth);
}
1634}