Skip to main content

iperf3_rs/
metrics.rs

1//! Metric structures and streams produced from libiperf interval callbacks.
2
3use std::collections::HashMap;
4use std::fmt;
5use std::os::raw::{c_double, c_int};
6use std::sync::{Mutex, OnceLock};
7use std::thread::{self, JoinHandle};
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9
10#[cfg(any(feature = "pushgateway", test))]
11use crossbeam_channel::bounded;
12use crossbeam_channel::{
13    Receiver, RecvTimeoutError, Sender, TryRecvError, TrySendError, unbounded,
14};
15
16use crate::iperf::{IperfTest, RawIperfTest, Role};
17#[cfg(all(feature = "pushgateway", feature = "serde"))]
18use crate::metrics_file::MetricsFileSink;
19#[cfg(feature = "pushgateway")]
20use crate::pushgateway::PushGateway;
21use crate::{Error, Result};
22
/// Transport protocol that libiperf reported for a metrics sample.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum TransportProtocol {
    /// No protocol was reported, or the reported value is not recognized.
    #[default]
    Unknown,
    /// The sample was taken in TCP mode.
    Tcp,
    /// The sample was taken in UDP mode.
    Udp,
    /// The sample was taken in SCTP mode.
    Sctp,
    /// An upstream protocol id without a dedicated variant; the raw id is kept.
    Other(i32),
}
40
/// Which way the libiperf streams behind a metrics sample were flowing.
///
/// A sample aggregates a single direction, chosen by the native callback. In
/// a bidirectional run the current callback reports the sending aggregate on
/// the client and the receiving aggregate on the server; one process does not
/// report both directions.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum MetricDirection {
    /// No direction was reported.
    #[default]
    Unknown,
    /// Streams that send from this iperf process.
    Sender,
    /// Streams that receive into this iperf process.
    Receiver,
    /// An upstream direction id without a dedicated variant; the raw id is kept.
    Other(i32),
}
61
62impl TransportProtocol {
63    fn from_callback_value(value: c_int) -> Self {
64        match value {
65            1 => Self::Tcp,
66            2 => Self::Udp,
67            3 => Self::Sctp,
68            0 => Self::Unknown,
69            other => Self::Other(other),
70        }
71    }
72}
73
74impl MetricDirection {
75    fn from_callback_value(value: c_int) -> Self {
76        match value {
77            1 => Self::Sender,
78            2 => Self::Receiver,
79            0 => Self::Unknown,
80            other => Self::Other(other),
81        }
82    }
83}
84
85#[derive(Debug, Clone, Default, PartialEq)]
86#[cfg_attr(feature = "serde", derive(serde::Serialize))]
87/// One libiperf interval sample.
88///
89/// Fields are normalized to Prometheus-friendly units where practical.
90/// Protocol-specific fields use `Option<f64>` so callers can distinguish an
91/// observed zero from a value that libiperf or the operating system did not
92/// report for this interval.
93#[non_exhaustive]
94pub struct Metrics {
95    /// Unix timestamp, in seconds, when Rust received this interval sample.
96    pub timestamp_unix_seconds: f64,
97    /// Role of the iperf test that produced this interval.
98    pub role: Role,
99    /// Sender/receiver direction represented by this aggregate sample.
100    pub direction: MetricDirection,
101    /// Number of libiperf streams represented by this aggregate sample.
102    pub stream_count: usize,
103    /// Transport protocol used by this interval.
104    pub protocol: TransportProtocol,
105    /// Bytes transferred during the interval.
106    pub transferred_bytes: f64,
107    /// Interval throughput in bits per second.
108    pub bandwidth_bits_per_second: f64,
109    /// TCP retransmits reported for the interval.
110    pub tcp_retransmits: Option<f64>,
111    /// TCP smoothed RTT in seconds.
112    pub tcp_rtt_seconds: Option<f64>,
113    /// TCP RTT variance in seconds.
114    pub tcp_rttvar_seconds: Option<f64>,
115    /// TCP sender congestion window in bytes.
116    pub tcp_snd_cwnd_bytes: Option<f64>,
117    /// TCP sender window in bytes when available.
118    pub tcp_snd_wnd_bytes: Option<f64>,
119    /// TCP path MTU in bytes when available.
120    pub tcp_pmtu_bytes: Option<f64>,
121    /// TCP reordering events when available.
122    pub tcp_reorder_events: Option<f64>,
123    /// UDP packet count reported for the interval.
124    pub udp_packets: Option<f64>,
125    /// UDP packets inferred lost from sequence gaps.
126    pub udp_lost_packets: Option<f64>,
127    /// UDP receiver jitter in seconds.
128    pub udp_jitter_seconds: Option<f64>,
129    /// UDP out-of-order packets observed in the interval.
130    pub udp_out_of_order_packets: Option<f64>,
131    /// Interval duration in seconds.
132    pub interval_duration_seconds: f64,
133    /// Whether this sample belongs to an omitted warm-up interval.
134    pub omitted: bool,
135}
136
137impl Metrics {
138    /// Build an empty metrics sample with default values.
139    pub fn new() -> Self {
140        Self::default()
141    }
142}
143
/// Mean, minimum and maximum of a gauge-like metric over one window.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct WindowGaugeStats {
    /// How many observed samples these statistics summarize.
    pub samples: usize,
    /// Arithmetic mean of the samples in the window.
    pub mean: f64,
    /// Smallest value observed in the window.
    pub min: f64,
    /// Largest value observed in the window.
    pub max: f64,
}

impl WindowGaugeStats {
    /// Create statistics that summarize no samples (all fields at default).
    pub fn new() -> Self {
        Default::default()
    }
}
165
166/// Summary of one aggregated metrics window.
167///
168/// Counter-like fields are accumulated across the window. Gauge-like fields use
169/// [`WindowGaugeStats`].
170#[derive(Debug, Clone, Default, PartialEq)]
171#[cfg_attr(feature = "serde", derive(serde::Serialize))]
172#[non_exhaustive]
173pub struct WindowMetrics {
174    /// Unix timestamp, in seconds, of the last interval sample in this window.
175    pub timestamp_unix_seconds: f64,
176    /// Role of the iperf test that produced this window.
177    pub role: Role,
178    /// Sender/receiver direction represented by this window.
179    pub direction: MetricDirection,
180    /// Number of libiperf streams represented by this window.
181    pub stream_count: usize,
182    /// Transport protocol used by this window.
183    pub protocol: TransportProtocol,
184    /// Total interval duration represented by this window.
185    pub duration_seconds: f64,
186    /// Total bytes transferred across this window.
187    pub transferred_bytes: f64,
188    /// Bandwidth statistics in bits per second.
189    pub bandwidth_bits_per_second: WindowGaugeStats,
190    /// TCP smoothed RTT statistics in seconds.
191    pub tcp_rtt_seconds: WindowGaugeStats,
192    /// TCP RTT variance statistics in seconds.
193    pub tcp_rttvar_seconds: WindowGaugeStats,
194    /// TCP sender congestion window statistics in bytes.
195    pub tcp_snd_cwnd_bytes: WindowGaugeStats,
196    /// TCP sender window statistics in bytes.
197    pub tcp_snd_wnd_bytes: WindowGaugeStats,
198    /// TCP path MTU statistics in bytes.
199    pub tcp_pmtu_bytes: WindowGaugeStats,
200    /// UDP jitter statistics in seconds.
201    pub udp_jitter_seconds: WindowGaugeStats,
202    /// TCP retransmits accumulated across the window.
203    pub tcp_retransmits: Option<f64>,
204    /// TCP reordering events accumulated across the window.
205    pub tcp_reorder_events: Option<f64>,
206    /// UDP packet count accumulated across the window.
207    pub udp_packets: Option<f64>,
208    /// UDP lost packet count accumulated across the window.
209    pub udp_lost_packets: Option<f64>,
210    /// UDP out-of-order packet count accumulated across the window.
211    pub udp_out_of_order_packets: Option<f64>,
212    /// Number of omitted libiperf intervals in the window.
213    pub omitted_intervals: f64,
214}
215
216impl WindowMetrics {
217    /// Build an empty window summary with default values.
218    pub fn new() -> Self {
219        Self::default()
220    }
221}
222
223/// Controls whether a run emits live metrics and how interval samples are shaped.
224///
225/// Library metrics modes are every-sample contracts. `Interval` forwards each
226/// libiperf interval sample, and `Window` forwards each completed aggregation
227/// window. Internally those streams use unbounded queues so the libiperf
228/// reporting callback is not blocked by application code. Keep the returned
229/// [`MetricsStream`] drained for long-running runs, or leave metrics disabled.
230#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
231#[non_exhaustive]
232pub enum MetricsMode {
233    /// Do not register the libiperf interval callback.
234    #[default]
235    Disabled,
236    /// Emit one event for every libiperf interval sample.
237    ///
238    /// This mode preserves every sample. It is appropriate for short runs or
239    /// consumers that continuously drain the stream.
240    Interval,
241    /// Aggregate interval samples into fixed-duration summary windows.
242    ///
243    /// This mode still consumes every libiperf interval sample internally. It
244    /// emits fewer public events than `Interval`, but the stream should still be
245    /// drained for long-running runs so completed windows do not accumulate.
246    Window(Duration),
247}
248
249impl MetricsMode {
250    /// Return `true` when this mode installs the libiperf metrics callback.
251    pub const fn is_enabled(self) -> bool {
252        !matches!(self, Self::Disabled)
253    }
254
255    pub(crate) const fn callback_queue(self) -> Option<MetricsQueue> {
256        match self {
257            Self::Disabled => None,
258            // Library consumers should receive every sample. The freshness-only
259            // replacement queue is reserved for immediate Pushgateway writes.
260            Self::Interval | Self::Window(_) => Some(MetricsQueue::All),
261        }
262    }
263}
264
265/// Metric event emitted by a running iperf test.
266#[derive(Debug, Clone, PartialEq)]
267#[cfg_attr(feature = "serde", derive(serde::Serialize))]
268#[non_exhaustive]
269pub enum MetricEvent {
270    /// A raw libiperf interval sample.
271    Interval(Metrics),
272    /// A summary produced from one or more interval samples.
273    Window(WindowMetrics),
274}
275
/// Why a non-blocking or timed receive on a metrics stream yielded no event.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MetricsRecvError {
    /// Nothing was queued at the time of the call.
    Empty,
    /// The requested timeout elapsed before an event arrived.
    Timeout,
    /// The iperf run has finished, so no further events can arrive.
    Closed,
}

impl fmt::Display for MetricsRecvError {
    /// Write a short, human-readable description of the receive failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Empty => "no metrics event is currently queued",
            Self::Timeout => "timed out waiting for metrics event",
            Self::Closed => "metrics stream is closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for MetricsRecvError {}
299
300/// Receiver for metric events emitted by a running iperf test.
301///
302/// A `MetricsStream` is not a bounded history buffer. `MetricsMode::Interval`
303/// and `MetricsMode::Window` preserve every emitted event, so keeping the
304/// stream alive but unread can grow memory on long-running runs. Drain it until
305/// it returns `None` or `MetricsRecvError::Closed`, or drop it when the
306/// application no longer needs live metrics.
307#[derive(Debug)]
308pub struct MetricsStream {
309    rx: Receiver<MetricEvent>,
310}
311
312impl MetricsStream {
313    fn new(rx: Receiver<MetricEvent>) -> Self {
314        Self { rx }
315    }
316
317    /// Block until the next metric event arrives or the run ends.
318    pub fn recv(&self) -> Option<MetricEvent> {
319        self.rx.recv().ok()
320    }
321
322    /// Wait for the next metric event up to `timeout`.
323    pub fn recv_timeout(
324        &self,
325        timeout: Duration,
326    ) -> std::result::Result<MetricEvent, MetricsRecvError> {
327        match self.rx.recv_timeout(timeout) {
328            Ok(event) => Ok(event),
329            Err(RecvTimeoutError::Timeout) => Err(MetricsRecvError::Timeout),
330            Err(RecvTimeoutError::Disconnected) => Err(MetricsRecvError::Closed),
331        }
332    }
333
334    /// Return the next metric event if one is already queued.
335    pub fn try_recv(&self) -> std::result::Result<MetricEvent, MetricsRecvError> {
336        match self.rx.try_recv() {
337            Ok(event) => Ok(event),
338            Err(TryRecvError::Empty) => Err(MetricsRecvError::Empty),
339            Err(TryRecvError::Disconnected) => Err(MetricsRecvError::Closed),
340        }
341    }
342}
343
344impl Iterator for MetricsStream {
345    type Item = MetricEvent;
346
347    fn next(&mut self) -> Option<Self::Item> {
348        self.recv()
349    }
350}
351
/// Pairs a metrics callback registration with the worker thread that feeds
/// the configured sinks.
#[cfg(feature = "pushgateway")]
pub(crate) struct IntervalMetricsReporter {
    /// Callback registration; taken (left as `None`) when the reporter stops.
    callback: Option<CallbackMetricsReporter>,
    /// Handle of the sink worker thread; taken when the reporter stops.
    worker: Option<JoinHandle<Result<()>>>,
}
357
/// Optional destinations that the metrics sink worker writes to.
#[cfg(feature = "pushgateway")]
pub(crate) struct MetricsSinks {
    /// Pushgateway destination together with its optional push window.
    pushgateway: Option<PushGatewaySink>,
    /// File destination for interval samples.
    #[cfg(feature = "serde")]
    file: Option<MetricsFileSink>,
}

#[cfg(feature = "pushgateway")]
impl MetricsSinks {
    /// Create a set with no destination configured.
    pub(crate) fn new() -> Self {
        Self {
            pushgateway: None,
            #[cfg(feature = "serde")]
            file: None,
        }
    }

    /// Configure the Pushgateway destination, replacing any previous one.
    ///
    /// `push_interval` of `Some(_)` selects windowed pushes; `None` pushes
    /// each interval sample.
    pub(crate) fn pushgateway(&mut self, sink: PushGateway, push_interval: Option<Duration>) {
        let configured = PushGatewaySink {
            sink,
            push_interval,
        };
        self.pushgateway = Some(configured);
    }

    /// Configure the file destination, replacing any previous one.
    #[cfg(feature = "serde")]
    pub(crate) fn file(&mut self, sink: MetricsFileSink) {
        self.file = Some(sink);
    }

    /// Report whether no destination has been configured.
    #[cfg(feature = "serde")]
    pub(crate) fn is_empty(&self) -> bool {
        self.file_is_empty() && self.pushgateway.is_none()
    }

    /// Choose the callback queue flavor these sinks need.
    ///
    /// Every sample is kept when a file sink is present or when Pushgateway
    /// pushes are windowed; otherwise only the freshest sample is kept.
    fn queue(&self) -> MetricsQueue {
        let windowed_push = self
            .pushgateway
            .as_ref()
            .is_some_and(|pushgateway| pushgateway.push_interval.is_some());

        if windowed_push || self.file_is_present() {
            MetricsQueue::All
        } else {
            MetricsQueue::Latest
        }
    }

    /// Report whether no file sink is configured.
    #[cfg(feature = "serde")]
    fn file_is_empty(&self) -> bool {
        self.file.is_none()
    }

    /// Report whether a file sink is configured.
    #[cfg(feature = "serde")]
    fn file_is_present(&self) -> bool {
        self.file.is_some()
    }

    /// Without file support there is never a file sink.
    #[cfg(not(feature = "serde"))]
    fn file_is_present(&self) -> bool {
        false
    }
}
421
/// Pushgateway destination plus the cadence at which it is written.
#[cfg(feature = "pushgateway")]
struct PushGatewaySink {
    /// Client used to push and delete metrics.
    sink: PushGateway,
    /// `Some(window)` pushes aggregated windows; `None` pushes each interval
    /// sample as it arrives.
    push_interval: Option<Duration>,
}
427
#[cfg(feature = "pushgateway")]
impl IntervalMetricsReporter {
    /// Attach a reporter that writes only to the given Pushgateway.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::attach_sinks`].
    pub(crate) fn attach(
        test: &mut IperfTest,
        sink: PushGateway,
        push_interval: Option<Duration>,
    ) -> Result<Self> {
        let sinks = {
            let mut sinks = MetricsSinks::new();
            sinks.pushgateway(sink, push_interval);
            sinks
        };
        Self::attach_sinks(test, sinks)
    }

    /// Register the metrics callback on `test` and start the sink worker.
    ///
    /// # Errors
    ///
    /// Returns the error from registering the callback, if any.
    pub(crate) fn attach_sinks(test: &mut IperfTest, sinks: MetricsSinks) -> Result<Self> {
        let (callback, rx) = CallbackMetricsReporter::attach(test, sinks.queue())?;

        // Sink I/O runs on its own thread, off the libiperf callback path, so
        // a slow or unavailable sink cannot stall the iperf test itself.
        let worker = thread::spawn(move || run_metrics_sinks(rx, sinks));

        Ok(Self {
            callback: Some(callback),
            worker: Some(worker),
        })
    }

    /// Stop reporting and return the worker's final result.
    pub(crate) fn finish(mut self) -> Result<()> {
        self.stop()
    }

    /// Unregister the callback, then wait for the worker thread to finish.
    ///
    /// Calling this again after a successful stop is a no-op returning `Ok`.
    fn stop(&mut self) -> Result<()> {
        // Dropping the registration releases the sender held in the callback
        // registry before the worker is joined.
        self.callback = None;

        let Some(worker) = self.worker.take() else {
            return Ok(());
        };
        match worker.join() {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::worker("metrics sink worker thread panicked")),
        }
    }
}
469
470pub(crate) struct CallbackMetricsReporter {
471    test_key: usize,
472}
473
474impl CallbackMetricsReporter {
475    pub(crate) fn attach(
476        test: &mut IperfTest,
477        queue: MetricsQueue,
478    ) -> Result<(Self, Receiver<Metrics>)> {
479        let (target, rx) = callback_channel(queue, test.role());
480        let test_key = test.as_ptr() as usize;
481        callbacks()
482            .lock()
483            .map_err(|_| Error::internal("metrics callback registry is poisoned"))?
484            .insert(test_key, target);
485
486        test.enable_interval_metrics(metrics_callback);
487
488        Ok((Self { test_key }, rx))
489    }
490}
491
492impl Drop for CallbackMetricsReporter {
493    fn drop(&mut self) {
494        if let Ok(mut callbacks) = callbacks().lock() {
495            callbacks.remove(&self.test_key);
496        }
497    }
498}
499
/// Queue flavor used between the libiperf callback and its consumer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum MetricsQueue {
    /// Keep only the freshest sample, replacing a stale queued one.
    #[cfg(feature = "pushgateway")]
    Latest,
    /// Keep every sample.
    All,
}
506
507fn callback_channel(queue: MetricsQueue, role: Role) -> (CallbackTarget, Receiver<Metrics>) {
508    match queue {
509        MetricsQueue::All => {
510            // Library streams promise every sample and must not block
511            // libiperf's reporting callback. This means callers own the drain
512            // responsibility for long-running streams.
513            let (tx, rx) = unbounded::<Metrics>();
514            (
515                CallbackTarget {
516                    tx,
517                    latest_rx: None,
518                    role,
519                },
520                rx,
521            )
522        }
523        #[cfg(feature = "pushgateway")]
524        MetricsQueue::Latest => {
525            // Pushgateway stores only the latest value for a grouping key.
526            // Keep the callback nonblocking and replace stale queued samples if
527            // HTTP writes fall behind.
528            let (tx, rx) = bounded::<Metrics>(1);
529            (
530                CallbackTarget {
531                    tx,
532                    latest_rx: Some(rx.clone()),
533                    role,
534                },
535                rx,
536            )
537        }
538    }
539}
540
541pub(crate) fn metric_event_stream(
542    rx: Receiver<Metrics>,
543    mode: MetricsMode,
544) -> (MetricsStream, JoinHandle<()>) {
545    // The public stream is also unbounded to preserve every event. This keeps
546    // the metrics worker simple and nonblocking, but it makes unread streams a
547    // caller-visible memory risk on long-running runs.
548    let (tx, event_rx) = unbounded::<MetricEvent>();
549    let worker = thread::spawn(move || match mode {
550        MetricsMode::Disabled => {}
551        MetricsMode::Interval => forward_interval_events(rx, tx),
552        MetricsMode::Window(interval) => forward_window_events(rx, tx, interval),
553    });
554    (MetricsStream::new(event_rx), worker)
555}
556
557fn forward_interval_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>) {
558    for metrics in rx {
559        if tx.send(MetricEvent::Interval(metrics)).is_err() {
560            break;
561        }
562    }
563}
564
565fn forward_window_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>, interval: Duration) {
566    let mut window = Vec::new();
567    let mut deadline = None;
568
569    loop {
570        match deadline {
571            Some(flush_at) => {
572                let now = Instant::now();
573                if now >= flush_at {
574                    if !flush_window_event(&tx, &mut window) {
575                        break;
576                    }
577                    deadline = None;
578                    continue;
579                }
580
581                match rx.recv_timeout(flush_at - now) {
582                    Ok(metrics) => {
583                        if window_context_changes(&window, &metrics) {
584                            if !flush_window_event(&tx, &mut window) {
585                                break;
586                            }
587                            deadline = Some(window_deadline(interval));
588                        }
589                        window.push(metrics);
590                    }
591                    Err(RecvTimeoutError::Timeout) => {
592                        if !flush_window_event(&tx, &mut window) {
593                            break;
594                        }
595                        deadline = None;
596                    }
597                    Err(RecvTimeoutError::Disconnected) => break,
598                }
599            }
600            None => match rx.recv() {
601                Ok(metrics) => {
602                    window.push(metrics);
603                    deadline = Some(window_deadline(interval));
604                }
605                Err(_) => break,
606            },
607        }
608    }
609
610    let _ = flush_window_event(&tx, &mut window);
611}
612
613fn flush_window_event(tx: &Sender<MetricEvent>, window: &mut Vec<Metrics>) -> bool {
614    let Some(metrics) = aggregate_window(window) else {
615        return true;
616    };
617    window.clear();
618    tx.send(MetricEvent::Window(metrics)).is_ok()
619}
620
/// Compute the instant at which a window opened now should be flushed.
///
/// Normally this is `now + interval`. When that sum is not representable
/// (for example `Duration::MAX`), the window is treated as effectively
/// unbounded and the latest deadline found by repeatedly halving the interval
/// is returned. Falling back to "now" instead would collapse a very large
/// window into one window per sample.
fn window_deadline(interval: Duration) -> Instant {
    let now = Instant::now();
    let mut span = interval;
    loop {
        if let Some(deadline) = now.checked_add(span) {
            return deadline;
        }
        // Halving always terminates: the span eventually reaches zero, and
        // adding a zero duration cannot overflow.
        span /= 2;
    }
}
626
627fn window_context_changes(window: &[Metrics], metrics: &Metrics) -> bool {
628    window
629        .first()
630        .map(|first| !same_window_context(first, metrics))
631        .unwrap_or(false)
632}
633
634fn same_window_context(left: &Metrics, right: &Metrics) -> bool {
635    left.role == right.role
636        && left.direction == right.direction
637        && left.stream_count == right.stream_count
638        && left.protocol == right.protocol
639}
640
/// Worker entry point: drain `rx` into the sinks until the source closes.
///
/// Windowed pushing is used when the Pushgateway sink has a push interval;
/// otherwise every interval sample is pushed as it arrives.
#[cfg(feature = "pushgateway")]
fn run_metrics_sinks(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
    let push_window = sinks
        .pushgateway
        .as_ref()
        .and_then(|pushgateway| pushgateway.push_interval);

    if let Some(interval) = push_window {
        push_window_metrics(rx, sinks, interval)
    } else {
        push_interval_metrics(rx, sinks)
    }
}
652
/// Write every sample to the file sink and push it to the Pushgateway.
///
/// A file write error stops the loop and becomes the return value. The
/// Pushgateway delete-on-finish step runs in either case.
#[cfg(feature = "pushgateway")]
fn push_interval_metrics(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
    let mut outcome = Ok(());

    while let Ok(sample) = rx.recv() {
        match write_metrics_file(&sinks, &sample) {
            Ok(()) => push_interval_to_gateway(&sinks, &sample),
            Err(err) => {
                outcome = Err(err);
                break;
            }
        }
    }
    // Release the receiving end before the final Pushgateway call.
    drop(rx);

    delete_pushgateway_on_finish(&sinks);
    outcome
}
666
/// Write every sample to the file sink and push windowed summaries.
///
/// Each sample is written to the file sink as it arrives. Samples are also
/// buffered into a window that is pushed when its deadline passes, when an
/// incoming sample's context differs from the window's first sample, or when
/// the source disconnects. A file write error stops the loop and becomes the
/// return value; the Pushgateway delete-on-finish step runs in either case.
#[cfg(feature = "pushgateway")]
fn push_window_metrics(
    rx: Receiver<Metrics>,
    sinks: MetricsSinks,
    interval: Duration,
) -> Result<()> {
    let mut pending = Vec::new();
    let mut flush_at: Option<Instant> = None;
    let mut outcome = Ok(());

    loop {
        // Idle: no window is open, so block until a sample opens one.
        let Some(due) = flush_at else {
            let Ok(sample) = rx.recv() else {
                break;
            };
            if let Err(err) = write_metrics_file(&sinks, &sample) {
                outcome = Err(err);
                break;
            }
            pending.push(sample);
            flush_at = Some(window_deadline(interval));
            continue;
        };

        // A deadline that has already passed is handled exactly like a
        // receive timeout.
        let now = Instant::now();
        let received = if now < due {
            rx.recv_timeout(due - now)
        } else {
            Err(RecvTimeoutError::Timeout)
        };

        match received {
            Ok(sample) => {
                if let Err(err) = write_metrics_file(&sinks, &sample) {
                    outcome = Err(err);
                    break;
                }
                if window_context_changes(&pending, &sample) {
                    // Push the current window first so one summary never
                    // mixes contexts, then start timing the new window.
                    flush_window_metrics(&sinks, &mut pending);
                    flush_at = Some(window_deadline(interval));
                }
                pending.push(sample);
            }
            Err(RecvTimeoutError::Timeout) => {
                flush_window_metrics(&sinks, &mut pending);
                flush_at = None;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    // The last iperf interval often arrives just before the process exits,
    // so push a partial window to give short tests a useful summary.
    if outcome.is_ok() {
        flush_window_metrics(&sinks, &mut pending);
    }
    delete_pushgateway_on_finish(&sinks);
    outcome
}
728
/// Push one interval sample to the Pushgateway, if one is configured.
///
/// Failures are reported on stderr and otherwise ignored.
#[cfg(feature = "pushgateway")]
fn push_interval_to_gateway(sinks: &MetricsSinks, metrics: &Metrics) {
    // Delivery to the Pushgateway is deliberately best-effort: file metrics
    // are the required artifact path, and a transient Pushgateway failure
    // must not change the exit status of the iperf run.
    let Some(pushgateway) = sinks.pushgateway.as_ref() else {
        return;
    };
    if let Err(err) = pushgateway.sink.push(metrics) {
        eprintln!("failed to push metrics: {err:#}");
    }
}
742
/// Aggregate the buffered samples, push the summary and clear the buffer.
///
/// Does nothing when aggregation yields no summary. Push failures are
/// reported on stderr and the buffer is cleared regardless.
#[cfg(feature = "pushgateway")]
fn flush_window_metrics(sinks: &MetricsSinks, window: &mut Vec<Metrics>) {
    let Some(summary) = aggregate_window(window) else {
        return;
    };

    if let Some(pushgateway) = sinks.pushgateway.as_ref() {
        if let Err(err) = pushgateway.sink.push_window(&summary) {
            eprintln!("failed to push window metrics: {err:#}");
        }
    }

    window.clear();
}
757
/// Write one interval sample to the file sink, if one is configured.
///
/// # Errors
///
/// Propagates the file sink's write error.
#[cfg(all(feature = "pushgateway", feature = "serde"))]
fn write_metrics_file(sinks: &MetricsSinks, metrics: &Metrics) -> Result<()> {
    let Some(file) = sinks.file.as_ref() else {
        return Ok(());
    };
    file.write_interval(metrics)?;
    Ok(())
}

/// Without file support there is nothing to write; always succeeds.
#[cfg(all(feature = "pushgateway", not(feature = "serde")))]
fn write_metrics_file(_sinks: &MetricsSinks, _metrics: &Metrics) -> Result<()> {
    Ok(())
}
770
/// Delete the Pushgateway metrics when the sink asks for delete-on-finish.
///
/// Failures are reported on stderr and otherwise ignored.
#[cfg(feature = "pushgateway")]
fn delete_pushgateway_on_finish(sinks: &MetricsSinks) {
    // Removing a retained Pushgateway group follows the same best-effort
    // contract as pushing samples: operators get a warning, and a successful
    // bandwidth test is not turned into a failed process exit.
    let Some(pushgateway) = sinks.pushgateway.as_ref() else {
        return;
    };
    if !pushgateway.sink.delete_on_finish() {
        return;
    }
    if let Err(err) = pushgateway.sink.delete() {
        eprintln!("failed to delete Pushgateway metrics: {err:#}");
    }
}
785
#[cfg(feature = "pushgateway")]
impl Drop for IntervalMetricsReporter {
    /// Stop the reporter if it is still running; the outcome is discarded
    /// because `drop` cannot report it.
    fn drop(&mut self) {
        drop(self.stop());
    }
}
792
/// Destination of the samples produced by the libiperf callback for one test.
struct CallbackTarget {
    /// Sending end the callback enqueues samples on.
    tx: Sender<Metrics>,
    /// Extra receiving end, set only for the one-slot "latest" queue, that
    /// lets the callback discard a stale queued sample before retrying.
    latest_rx: Option<Receiver<Metrics>>,
    /// Role stamped onto every sample produced for this test.
    role: Role,
}
798
799static CALLBACKS: OnceLock<Mutex<HashMap<usize, CallbackTarget>>> = OnceLock::new();
800
801fn callbacks() -> &'static Mutex<HashMap<usize, CallbackTarget>> {
802    // The same extern callback is registered for every test, so dispatch by the
803    // iperf_test pointer passed back from C.
804    CALLBACKS.get_or_init(|| Mutex::new(HashMap::new()))
805}
806
807unsafe extern "C" fn metrics_callback(
808    test: *mut RawIperfTest,
809    transferred_bytes: c_double,
810    bandwidth_bits_per_second: c_double,
811    tcp_retransmits: c_double,
812    tcp_rtt_seconds: c_double,
813    tcp_rttvar_seconds: c_double,
814    tcp_snd_cwnd_bytes: c_double,
815    tcp_snd_wnd_bytes: c_double,
816    tcp_pmtu_bytes: c_double,
817    tcp_reorder_events: c_double,
818    udp_packets: c_double,
819    udp_lost_packets: c_double,
820    udp_jitter_seconds: c_double,
821    udp_out_of_order_packets: c_double,
822    interval_duration_seconds: c_double,
823    omitted: c_double,
824    protocol: c_int,
825    direction: c_int,
826    stream_count: c_int,
827    tcp_retransmits_available: c_int,
828    tcp_rtt_seconds_available: c_int,
829    tcp_rttvar_seconds_available: c_int,
830    tcp_snd_cwnd_bytes_available: c_int,
831    tcp_snd_wnd_bytes_available: c_int,
832    tcp_pmtu_bytes_available: c_int,
833    tcp_reorder_events_available: c_int,
834    udp_packets_available: c_int,
835    udp_lost_packets_available: c_int,
836    udp_jitter_seconds_available: c_int,
837    udp_out_of_order_packets_available: c_int,
838) {
839    if test.is_null() {
840        return;
841    }
842
843    let Ok(callbacks) = callbacks().lock() else {
844        return;
845    };
846    let Some(target) = callbacks.get(&(test as usize)) else {
847        return;
848    };
849
850    enqueue_latest(
851        target,
852        Metrics {
853            timestamp_unix_seconds: current_unix_timestamp_seconds(),
854            role: target.role,
855            direction: MetricDirection::from_callback_value(direction),
856            stream_count: nonnegative_usize(stream_count),
857            protocol: TransportProtocol::from_callback_value(protocol),
858            transferred_bytes,
859            bandwidth_bits_per_second,
860            tcp_retransmits: available(tcp_retransmits_available, tcp_retransmits),
861            tcp_rtt_seconds: available(tcp_rtt_seconds_available, tcp_rtt_seconds),
862            tcp_rttvar_seconds: available(tcp_rttvar_seconds_available, tcp_rttvar_seconds),
863            tcp_snd_cwnd_bytes: available(tcp_snd_cwnd_bytes_available, tcp_snd_cwnd_bytes),
864            tcp_snd_wnd_bytes: available(tcp_snd_wnd_bytes_available, tcp_snd_wnd_bytes),
865            tcp_pmtu_bytes: available(tcp_pmtu_bytes_available, tcp_pmtu_bytes),
866            tcp_reorder_events: available(tcp_reorder_events_available, tcp_reorder_events),
867            udp_packets: available(udp_packets_available, udp_packets),
868            udp_lost_packets: available(udp_lost_packets_available, udp_lost_packets),
869            udp_jitter_seconds: available(udp_jitter_seconds_available, udp_jitter_seconds),
870            udp_out_of_order_packets: available(
871                udp_out_of_order_packets_available,
872                udp_out_of_order_packets,
873            ),
874            interval_duration_seconds,
875            omitted: omitted != 0.0,
876        },
877    );
878}
879
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` when the system clock reads earlier than the epoch.
fn current_unix_timestamp_seconds() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
886
/// Wrap `value` in `Some` when `flag` is non-zero, otherwise yield `None`.
fn available(flag: c_int, value: c_double) -> Option<f64> {
    if flag == 0 {
        None
    } else {
        Some(value)
    }
}
890
/// Convert a C integer into a `usize`, mapping any value that does not fit
/// (such as a negative one) to zero instead of wrapping.
fn nonnegative_usize(value: c_int) -> usize {
    usize::try_from(value).unwrap_or_default()
}
894
895fn enqueue_latest(target: &CallbackTarget, metrics: Metrics) {
896    match target.tx.try_send(metrics) {
897        Ok(()) => {}
898        Err(TrySendError::Full(metrics)) => {
899            // Prefer freshness over completeness when pushes fall behind.
900            if let Some(rx) = &target.latest_rx {
901                let _ = rx.try_recv();
902            }
903            let _ = target.tx.try_send(metrics);
904        }
905        Err(TrySendError::Disconnected(_)) => {}
906    }
907}
908
909/// Aggregate raw interval samples into one representative window.
910///
911/// Counter-like fields are summed. Gauge-like fields return mean/min/max
912/// statistics. Invalid and negative counter values are treated as zero.
913/// The returned context is copied from the first sample, with the timestamp
914/// taken from the last sample. Built-in window streams flush before context
915/// changes so a window does not mix role, direction, stream count, or protocol.
916pub fn aggregate_window(samples: &[Metrics]) -> Option<WindowMetrics> {
917    if samples.is_empty() {
918        return None;
919    }
920
921    let mut bandwidth = GaugeAccumulator::default();
922    let mut tcp_rtt = GaugeAccumulator::default();
923    let mut tcp_rttvar = GaugeAccumulator::default();
924    let mut tcp_snd_cwnd = GaugeAccumulator::default();
925    let mut tcp_snd_wnd = GaugeAccumulator::default();
926    let mut tcp_pmtu = GaugeAccumulator::default();
927    let mut udp_jitter = GaugeAccumulator::default();
928
929    let mut duration_seconds = 0.0;
930    let mut transferred_bytes = 0.0;
931    let mut tcp_retransmits = OptionalCounter::default();
932    let mut tcp_reorder_events = OptionalCounter::default();
933    let mut udp_packets = OptionalCounter::default();
934    let mut udp_lost_packets = OptionalCounter::default();
935    let mut udp_out_of_order_packets = OptionalCounter::default();
936    let mut omitted_intervals = 0.0;
937    let context = &samples[0];
938
939    for metrics in samples {
940        duration_seconds += finite_nonnegative(metrics.interval_duration_seconds);
941        transferred_bytes += finite_nonnegative(metrics.transferred_bytes);
942        bandwidth.observe(metrics.bandwidth_bits_per_second);
943        tcp_rtt.observe_option(metrics.tcp_rtt_seconds);
944        tcp_rttvar.observe_option(metrics.tcp_rttvar_seconds);
945        tcp_snd_cwnd.observe_option(metrics.tcp_snd_cwnd_bytes);
946        tcp_snd_wnd.observe_option(metrics.tcp_snd_wnd_bytes);
947        tcp_pmtu.observe_option(metrics.tcp_pmtu_bytes);
948        udp_jitter.observe_option(metrics.udp_jitter_seconds);
949        tcp_retransmits.observe(metrics.tcp_retransmits);
950        tcp_reorder_events.observe(metrics.tcp_reorder_events);
951        udp_packets.observe(metrics.udp_packets);
952        udp_lost_packets.observe(metrics.udp_lost_packets);
953        udp_out_of_order_packets.observe(metrics.udp_out_of_order_packets);
954        if metrics.omitted {
955            omitted_intervals += 1.0;
956        }
957    }
958
959    let bandwidth_mean = if duration_seconds > 0.0 {
960        (transferred_bytes * 8.0) / duration_seconds
961    } else {
962        bandwidth.finish().mean
963    };
964
965    Some(WindowMetrics {
966        timestamp_unix_seconds: samples
967            .last()
968            .map(|metrics| metrics.timestamp_unix_seconds)
969            .unwrap_or_default(),
970        role: context.role,
971        direction: context.direction,
972        stream_count: context.stream_count,
973        protocol: context.protocol,
974        duration_seconds,
975        transferred_bytes,
976        bandwidth_bits_per_second: bandwidth.finish_with_mean(bandwidth_mean),
977        tcp_rtt_seconds: tcp_rtt.finish(),
978        tcp_rttvar_seconds: tcp_rttvar.finish(),
979        tcp_snd_cwnd_bytes: tcp_snd_cwnd.finish(),
980        tcp_snd_wnd_bytes: tcp_snd_wnd.finish(),
981        tcp_pmtu_bytes: tcp_pmtu.finish(),
982        udp_jitter_seconds: udp_jitter.finish(),
983        tcp_retransmits: tcp_retransmits.finish(),
984        tcp_reorder_events: tcp_reorder_events.finish(),
985        udp_packets: udp_packets.finish(),
986        udp_lost_packets: udp_lost_packets.finish(),
987        udp_out_of_order_packets: udp_out_of_order_packets.finish(),
988        omitted_intervals,
989    })
990}
991
992#[derive(Debug, Clone, Default)]
993struct GaugeAccumulator {
994    count: usize,
995    sum: f64,
996    min: f64,
997    max: f64,
998}
999
1000impl GaugeAccumulator {
1001    fn observe(&mut self, value: f64) {
1002        if !value.is_finite() {
1003            return;
1004        }
1005        if self.count == 0 {
1006            self.min = value;
1007            self.max = value;
1008        } else {
1009            self.min = self.min.min(value);
1010            self.max = self.max.max(value);
1011        }
1012        self.count += 1;
1013        self.sum += value;
1014    }
1015
1016    fn observe_option(&mut self, value: Option<f64>) {
1017        if let Some(value) = value {
1018            self.observe(value);
1019        }
1020    }
1021
1022    fn finish(&self) -> WindowGaugeStats {
1023        if self.count == 0 {
1024            return WindowGaugeStats::default();
1025        }
1026        WindowGaugeStats {
1027            samples: self.count,
1028            mean: self.sum / self.count as f64,
1029            min: self.min,
1030            max: self.max,
1031        }
1032    }
1033
1034    fn finish_with_mean(&self, mean: f64) -> WindowGaugeStats {
1035        let mut stats = self.finish();
1036        if self.count > 0 && mean.is_finite() {
1037            stats.mean = mean;
1038        }
1039        stats
1040    }
1041}
1042
1043#[derive(Debug, Clone, Default)]
1044struct OptionalCounter {
1045    observed: bool,
1046    sum: f64,
1047}
1048
1049impl OptionalCounter {
1050    fn observe(&mut self, value: Option<f64>) {
1051        let Some(value) = value else {
1052            return;
1053        };
1054        self.observed = true;
1055        self.sum += finite_nonnegative(value);
1056    }
1057
1058    fn finish(&self) -> Option<f64> {
1059        self.observed.then_some(self.sum)
1060    }
1061}
1062
/// Pass through finite, strictly positive values; map everything else
/// (NaN, infinities, negatives, and zero of either sign) to `0.0`.
fn finite_nonnegative(value: f64) -> f64 {
    let usable = value.is_finite() && value > 0.0;
    if !usable {
        return 0.0;
    }
    value
}
1070
#[cfg(kani)]
mod verification {
    use super::*;

    // Symbolic inputs are drawn from narrow integer types so that Kani spends
    // its budget on the aggregation logic itself rather than on floating-point
    // arithmetic edge cases already handled by `finite_nonnegative`. The
    // proofs are kept loop-free so the declared unwind bounds stay sufficient.

    /// Build a sample with a one-second interval whose bandwidth matches `bytes`.
    fn metrics_with_unit_duration(bytes: u8) -> Metrics {
        let transferred = f64::from(bytes);
        Metrics {
            transferred_bytes: transferred,
            bandwidth_bits_per_second: transferred * 8.0,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        }
    }

    /// Assert that the stats saw samples and satisfy `min <= mean <= max`.
    fn assert_ordered(stats: WindowGaugeStats) {
        assert!(stats.samples > 0);
        assert!(stats.min <= stats.mean);
        assert!(stats.mean <= stats.max);
    }

    /// An empty slice never yields a window summary.
    #[kani::proof]
    fn empty_window_has_no_summary() {
        assert!(aggregate_window(&[]).is_none());
    }

    /// A mode provides a callback queue exactly when it is enabled, and only
    /// `Disabled` is not enabled.
    #[kani::proof]
    fn metrics_mode_callback_policy_matches_variant() {
        let selector: u8 = kani::any();
        let mode = match selector % 3 {
            0 => MetricsMode::Disabled,
            1 => MetricsMode::Interval,
            _ => MetricsMode::Window(Duration::from_secs(1)),
        };

        assert_eq!(mode.is_enabled(), !matches!(mode, MetricsMode::Disabled));
        assert_eq!(mode.callback_queue().is_some(), mode.is_enabled());
    }

    /// `available` yields a value exactly when the flag is nonzero.
    #[kani::proof]
    fn callback_availability_flag_controls_optional_metric() {
        let flag: c_int = kani::any();
        let value = f64::from(kani::any::<i16>());

        let expected = (flag != 0).then_some(value);
        assert_eq!(available(flag, value), expected);
    }

    /// Negative stream counts map to zero; nonnegative ones pass through.
    #[kani::proof]
    fn callback_stream_count_never_wraps_negative_values() {
        let raw: i16 = kani::any();
        let expected = if raw >= 0 { raw as usize } else { 0 };

        assert_eq!(nonnegative_usize(c_int::from(raw)), expected);
    }

    /// Known callback ids map to their named variants and all other ids are
    /// preserved in `Other`.
    #[kani::proof]
    fn callback_context_mappers_preserve_known_values() {
        let protocol: i8 = kani::any();
        let direction: i8 = kani::any();

        let expected_protocol = match protocol {
            0 => TransportProtocol::Unknown,
            1 => TransportProtocol::Tcp,
            2 => TransportProtocol::Udp,
            3 => TransportProtocol::Sctp,
            other => TransportProtocol::Other(c_int::from(other)),
        };
        assert_eq!(
            TransportProtocol::from_callback_value(c_int::from(protocol)),
            expected_protocol
        );

        let expected_direction = match direction {
            0 => MetricDirection::Unknown,
            1 => MetricDirection::Sender,
            2 => MetricDirection::Receiver,
            other => MetricDirection::Other(c_int::from(other)),
        };
        assert_eq!(
            MetricDirection::from_callback_value(c_int::from(direction)),
            expected_direction
        );
    }

    /// Every summed counter in a single-sample window is nonnegative, even
    /// when the inputs are negative.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_counters_are_nonnegative_for_bounded_inputs() {
        let sample = Metrics {
            transferred_bytes: f64::from(kani::any::<i16>()),
            interval_duration_seconds: f64::from(kani::any::<i16>()),
            omitted: kani::any(),
            tcp_retransmits: Some(f64::from(kani::any::<i16>())),
            tcp_reorder_events: Some(f64::from(kani::any::<i16>())),
            udp_packets: Some(f64::from(kani::any::<i16>())),
            udp_lost_packets: Some(f64::from(kani::any::<i16>())),
            udp_out_of_order_packets: Some(f64::from(kani::any::<i16>())),
            ..Metrics::default()
        };

        let summary = aggregate_window(&[sample]).expect("nonempty windows summarize");

        assert!(summary.duration_seconds >= 0.0);
        assert!(summary.transferred_bytes >= 0.0);
        assert!(summary.omitted_intervals >= 0.0);
        assert!(summary.tcp_retransmits.unwrap_or(0.0) >= 0.0);
        assert!(summary.tcp_reorder_events.unwrap_or(0.0) >= 0.0);
        assert!(summary.udp_packets.unwrap_or(0.0) >= 0.0);
        assert!(summary.udp_lost_packets.unwrap_or(0.0) >= 0.0);
        assert!(summary.udp_out_of_order_packets.unwrap_or(0.0) >= 0.0);
    }

    /// With two one-second samples the bandwidth mean is total bits over two
    /// seconds.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_bandwidth_mean_uses_total_bits_over_duration_for_unit_intervals() {
        let first_bytes: u8 = kani::any();
        let second_bytes: u8 = kani::any();

        let samples = [
            metrics_with_unit_duration(first_bytes),
            metrics_with_unit_duration(second_bytes),
        ];
        let summary = aggregate_window(&samples).expect("nonempty windows summarize");

        let expected = ((f64::from(first_bytes) + f64::from(second_bytes)) * 8.0) / 2.0;
        assert_eq!(summary.bandwidth_bits_per_second.mean, expected);
    }

    /// Gauge statistics keep `min <= mean <= max` when each sample's bandwidth
    /// agrees with its bytes and duration.
    #[kani::proof]
    #[kani::unwind(3)]
    fn window_gauge_statistics_remain_ordered_for_consistent_samples() {
        let first_bytes: u8 = kani::any();
        let second_bytes: u8 = kani::any();
        let first_rtt: u8 = kani::any();
        let second_rtt: u8 = kani::any();

        let first = Metrics {
            tcp_rtt_seconds: Some(f64::from(first_rtt)),
            ..metrics_with_unit_duration(first_bytes)
        };
        let second = Metrics {
            tcp_rtt_seconds: Some(f64::from(second_rtt)),
            ..metrics_with_unit_duration(second_bytes)
        };
        let summary = aggregate_window(&[first, second]).expect("nonempty windows summarize");

        assert_ordered(summary.bandwidth_bits_per_second);
        assert_ordered(summary.tcp_rtt_seconds);
    }
}
1229
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an otherwise-default sample that reports `bytes` transferred.
    fn sample_with_bytes(bytes: f64) -> Metrics {
        Metrics {
            transferred_bytes: bytes,
            ..Metrics::default()
        }
    }

    /// Build a one-second, two-stream TCP client sender sample.
    fn tcp_sender_sample(timestamp: f64, bytes: f64, bandwidth: f64) -> Metrics {
        Metrics {
            timestamp_unix_seconds: timestamp,
            role: Role::Client,
            direction: MetricDirection::Sender,
            stream_count: 2,
            protocol: TransportProtocol::Tcp,
            transferred_bytes: bytes,
            bandwidth_bits_per_second: bandwidth,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        }
    }

    /// Shorthand for the expected gauge statistics in assertions.
    fn gauge_stats(samples: usize, mean: f64, min: f64, max: f64) -> WindowGaugeStats {
        WindowGaugeStats {
            samples,
            mean,
            min,
            max,
        }
    }

    #[test]
    fn transport_protocol_maps_callback_values() {
        let cases = [
            (0, TransportProtocol::Unknown),
            (1, TransportProtocol::Tcp),
            (2, TransportProtocol::Udp),
            (3, TransportProtocol::Sctp),
            (99, TransportProtocol::Other(99)),
        ];

        for (raw, expected) in cases {
            assert_eq!(TransportProtocol::from_callback_value(raw), expected);
        }
    }

    #[test]
    fn metric_direction_maps_callback_values() {
        let cases = [
            (0, MetricDirection::Unknown),
            (1, MetricDirection::Sender),
            (2, MetricDirection::Receiver),
            (99, MetricDirection::Other(99)),
        ];

        for (raw, expected) in cases {
            assert_eq!(MetricDirection::from_callback_value(raw), expected);
        }
    }

    #[test]
    fn enqueue_latest_replaces_queued_metric() {
        let (tx, rx) = bounded::<Metrics>(1);
        let target = CallbackTarget {
            tx,
            latest_rx: Some(rx.clone()),
            role: Role::Client,
        };

        // The second sample arrives while the single-slot queue is still full.
        enqueue_latest(&target, sample_with_bytes(1.0));
        enqueue_latest(&target, sample_with_bytes(2.0));

        let survivor = rx.try_recv().unwrap();
        assert_eq!(survivor.transferred_bytes, 2.0);
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn metrics_stream_try_recv_reports_empty_and_closed() {
        let (tx, rx) = unbounded::<MetricEvent>();
        let stream = MetricsStream::new(rx);

        assert_eq!(stream.try_recv(), Err(MetricsRecvError::Empty));

        let expected = MetricEvent::Interval(sample_with_bytes(42.0));
        tx.send(expected.clone()).unwrap();
        assert_eq!(stream.try_recv(), Ok(expected));

        drop(tx);
        assert_eq!(stream.try_recv(), Err(MetricsRecvError::Closed));
    }

    #[test]
    fn metrics_stream_recv_timeout_reports_timeout_and_closed() {
        let (tx, rx) = unbounded::<MetricEvent>();
        let stream = MetricsStream::new(rx);

        let short_wait = Duration::from_millis(1);
        assert_eq!(
            stream.recv_timeout(short_wait),
            Err(MetricsRecvError::Timeout)
        );

        let expected = MetricEvent::Interval(sample_with_bytes(7.0));
        tx.send(expected.clone()).unwrap();
        assert_eq!(stream.recv_timeout(Duration::from_secs(1)), Ok(expected));

        drop(tx);
        assert_eq!(
            stream.recv_timeout(Duration::from_secs(1)),
            Err(MetricsRecvError::Closed)
        );
    }

    #[test]
    fn metric_event_stream_forwards_interval_samples() {
        let (tx, rx) = unbounded::<Metrics>();
        let sample = sample_with_bytes(42.0);
        let (mut stream, worker) = metric_event_stream(rx, MetricsMode::Interval);

        tx.send(sample.clone()).unwrap();
        drop(tx);

        assert_eq!(stream.next(), Some(MetricEvent::Interval(sample)));
        worker.join().unwrap();
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn metric_event_stream_flushes_final_window() {
        let (tx, rx) = unbounded::<Metrics>();
        let (mut stream, worker) =
            metric_event_stream(rx, MetricsMode::Window(Duration::from_secs(60)));

        tx.send(tcp_sender_sample(10.0, 4.0, 32.0)).unwrap();
        tx.send(tcp_sender_sample(11.0, 8.0, 64.0)).unwrap();
        drop(tx);

        // Closing the channel must flush the partially filled 60-second window.
        let window = match stream.next() {
            Some(MetricEvent::Window(window)) => window,
            _ => panic!("expected a final window event"),
        };
        assert_eq!(window.transferred_bytes, 12.0);
        assert_eq!(window.duration_seconds, 2.0);
        assert_eq!(window.bandwidth_bits_per_second.mean, 48.0);
        assert_eq!(window.timestamp_unix_seconds, 11.0);
        assert_eq!(window.role, Role::Client);
        assert_eq!(window.direction, MetricDirection::Sender);
        assert_eq!(window.stream_count, 2);
        assert_eq!(window.protocol, TransportProtocol::Tcp);
        worker.join().unwrap();
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn metric_event_stream_splits_windows_when_context_changes() {
        let single_stream = |direction: MetricDirection, bytes: f64| Metrics {
            role: Role::Client,
            direction,
            stream_count: 1,
            protocol: TransportProtocol::Tcp,
            transferred_bytes: bytes,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        };

        let (tx, rx) = unbounded::<Metrics>();
        let (mut stream, worker) =
            metric_event_stream(rx, MetricsMode::Window(Duration::from_secs(60)));

        tx.send(single_stream(MetricDirection::Sender, 4.0)).unwrap();
        tx.send(single_stream(MetricDirection::Receiver, 8.0))
            .unwrap();
        drop(tx);

        let first = match stream.next() {
            Some(MetricEvent::Window(window)) => window,
            _ => panic!("expected sender window"),
        };
        let second = match stream.next() {
            Some(MetricEvent::Window(window)) => window,
            _ => panic!("expected receiver window"),
        };

        assert_eq!(first.transferred_bytes, 4.0);
        assert_eq!(first.direction, MetricDirection::Sender);
        assert_eq!(second.transferred_bytes, 8.0);
        assert_eq!(second.direction, MetricDirection::Receiver);
        worker.join().unwrap();
        assert_eq!(stream.next(), None);
    }

    #[cfg(all(feature = "pushgateway", feature = "serde"))]
    #[test]
    fn metrics_file_errors_are_returned_from_sink_worker() {
        use std::fs;
        use std::time::{SystemTime, UNIX_EPOCH};

        use crate::metrics_file::{MetricsFileFormat, MetricsFileSink};

        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let file_name = format!(
            "iperf3-rs-metrics-worker-{}-{nonce}.jsonl",
            std::process::id()
        );
        let path = std::env::temp_dir().join(file_name);

        // Swap the sink's file for a directory at the same path after the
        // sink has been created.
        let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
        fs::remove_file(&path).unwrap();
        fs::create_dir(&path).unwrap();

        let mut sinks = MetricsSinks::new();
        sinks.file(sink);

        let (tx, rx) = unbounded();
        let sample = Metrics {
            transferred_bytes: 1.0,
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        };
        tx.send(sample).unwrap();
        drop(tx);

        let err = run_metrics_sinks(rx, sinks).unwrap_err();

        assert_eq!(err.kind(), crate::ErrorKind::MetricsFile);
        let _ = fs::remove_dir(path);
    }

    #[test]
    fn aggregate_window_returns_none_for_empty_samples() {
        assert!(aggregate_window(&[]).is_none());
    }

    #[test]
    fn aggregate_window_summarizes_interval_samples_by_metric_semantics() {
        let first = Metrics {
            tcp_retransmits: Some(1.0),
            tcp_rtt_seconds: Some(0.010),
            tcp_snd_cwnd_bytes: Some(1000.0),
            udp_packets: Some(2.0),
            ..tcp_sender_sample(10.0, 100.0, 800.0)
        };
        let second = Metrics {
            tcp_retransmits: Some(2.0),
            tcp_rtt_seconds: Some(0.030),
            tcp_snd_cwnd_bytes: Some(3000.0),
            udp_packets: Some(3.0),
            interval_duration_seconds: 3.0,
            omitted: true,
            ..tcp_sender_sample(11.0, 900.0, 2400.0)
        };

        let window = aggregate_window(&[first, second]).unwrap();

        assert_eq!(window.duration_seconds, 4.0);
        assert_eq!(window.transferred_bytes, 1000.0);
        assert_eq!(window.timestamp_unix_seconds, 11.0);
        assert_eq!(window.role, Role::Client);
        assert_eq!(window.direction, MetricDirection::Sender);
        assert_eq!(window.stream_count, 2);
        assert_eq!(window.protocol, TransportProtocol::Tcp);
        // The bandwidth mean is weighted by duration: 8000 bits over 4 seconds.
        assert_eq!(
            window.bandwidth_bits_per_second,
            gauge_stats(2, 2000.0, 800.0, 2400.0)
        );
        assert_eq!(
            window.tcp_rtt_seconds,
            gauge_stats(2, 0.020, 0.010, 0.030)
        );
        assert_eq!(
            window.tcp_snd_cwnd_bytes,
            gauge_stats(2, 2000.0, 1000.0, 3000.0)
        );
        assert_eq!(window.tcp_retransmits, Some(3.0));
        assert_eq!(window.udp_packets, Some(5.0));
        assert_eq!(window.omitted_intervals, 1.0);
    }

    #[test]
    fn aggregate_window_falls_back_to_observed_bandwidth_when_duration_is_zero() {
        let without_duration = |bytes: f64, bandwidth: f64| Metrics {
            transferred_bytes: bytes,
            bandwidth_bits_per_second: bandwidth,
            ..Metrics::default()
        };

        let window = aggregate_window(&[
            without_duration(100.0, 800.0),
            without_duration(900.0, 2400.0),
        ])
        .unwrap();

        assert_eq!(window.duration_seconds, 0.0);
        assert_eq!(
            window.bandwidth_bits_per_second,
            gauge_stats(2, 1600.0, 800.0, 2400.0)
        );
    }

    #[test]
    fn aggregate_window_ignores_invalid_counter_values() {
        let invalid = Metrics {
            transferred_bytes: f64::NAN,
            bandwidth_bits_per_second: f64::INFINITY,
            tcp_retransmits: Some(-1.0),
            interval_duration_seconds: -1.0,
            ..Metrics::default()
        };
        let valid = Metrics {
            transferred_bytes: 8.0,
            bandwidth_bits_per_second: 64.0,
            tcp_retransmits: Some(2.0),
            interval_duration_seconds: 1.0,
            ..Metrics::default()
        };

        let window = aggregate_window(&[invalid, valid]).unwrap();

        assert_eq!(window.duration_seconds, 1.0);
        assert_eq!(window.transferred_bytes, 8.0);
        assert_eq!(window.tcp_retransmits, Some(2.0));
        assert_eq!(
            window.bandwidth_bits_per_second,
            gauge_stats(1, 64.0, 64.0, 64.0)
        );
    }
}