use std::collections::HashMap;
use std::fmt;
use std::os::raw::{c_double, c_int};
use std::sync::{Mutex, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(any(feature = "pushgateway", test))]
use crossbeam_channel::bounded;
use crossbeam_channel::{
Receiver, RecvTimeoutError, Sender, TryRecvError, TrySendError, unbounded,
};
use crate::iperf::{IperfTest, RawIperfTest, Role};
#[cfg(all(feature = "pushgateway", feature = "serde"))]
use crate::metrics_file::MetricsFileSink;
#[cfg(feature = "pushgateway")]
use crate::pushgateway::PushGateway;
use crate::{Error, Result};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum TransportProtocol {
#[default]
Unknown,
Tcp,
Udp,
Sctp,
Other(i32),
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum MetricDirection {
#[default]
Unknown,
Sender,
Receiver,
Other(i32),
}
impl TransportProtocol {
fn from_callback_value(value: c_int) -> Self {
match value {
1 => Self::Tcp,
2 => Self::Udp,
3 => Self::Sctp,
0 => Self::Unknown,
other => Self::Other(other),
}
}
}
impl MetricDirection {
fn from_callback_value(value: c_int) -> Self {
match value {
1 => Self::Sender,
2 => Self::Receiver,
0 => Self::Unknown,
other => Self::Other(other),
}
}
}
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct Metrics {
pub timestamp_unix_seconds: f64,
pub role: Role,
pub direction: MetricDirection,
pub stream_count: usize,
pub protocol: TransportProtocol,
pub transferred_bytes: f64,
pub bandwidth_bits_per_second: f64,
pub tcp_retransmits: Option<f64>,
pub tcp_rtt_seconds: Option<f64>,
pub tcp_rttvar_seconds: Option<f64>,
pub tcp_snd_cwnd_bytes: Option<f64>,
pub tcp_snd_wnd_bytes: Option<f64>,
pub tcp_pmtu_bytes: Option<f64>,
pub tcp_reorder_events: Option<f64>,
pub udp_packets: Option<f64>,
pub udp_lost_packets: Option<f64>,
pub udp_jitter_seconds: Option<f64>,
pub udp_out_of_order_packets: Option<f64>,
pub interval_duration_seconds: f64,
pub omitted: bool,
}
impl Metrics {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct WindowGaugeStats {
pub samples: usize,
pub mean: f64,
pub min: f64,
pub max: f64,
}
impl WindowGaugeStats {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct WindowMetrics {
pub timestamp_unix_seconds: f64,
pub role: Role,
pub direction: MetricDirection,
pub stream_count: usize,
pub protocol: TransportProtocol,
pub duration_seconds: f64,
pub transferred_bytes: f64,
pub bandwidth_bits_per_second: WindowGaugeStats,
pub tcp_rtt_seconds: WindowGaugeStats,
pub tcp_rttvar_seconds: WindowGaugeStats,
pub tcp_snd_cwnd_bytes: WindowGaugeStats,
pub tcp_snd_wnd_bytes: WindowGaugeStats,
pub tcp_pmtu_bytes: WindowGaugeStats,
pub udp_jitter_seconds: WindowGaugeStats,
pub tcp_retransmits: Option<f64>,
pub tcp_reorder_events: Option<f64>,
pub udp_packets: Option<f64>,
pub udp_lost_packets: Option<f64>,
pub udp_out_of_order_packets: Option<f64>,
pub omitted_intervals: f64,
}
impl WindowMetrics {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsMode {
#[default]
Disabled,
Interval,
Window(Duration),
}
impl MetricsMode {
pub const fn is_enabled(self) -> bool {
!matches!(self, Self::Disabled)
}
pub(crate) const fn callback_queue(self) -> Option<MetricsQueue> {
match self {
Self::Disabled => None,
Self::Interval | Self::Window(_) => Some(MetricsQueue::All),
}
}
}
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[non_exhaustive]
pub enum MetricEvent {
Interval(Metrics),
Window(WindowMetrics),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum MetricsRecvError {
Empty,
Timeout,
Closed,
}
impl fmt::Display for MetricsRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Empty => f.write_str("no metrics event is currently queued"),
Self::Timeout => f.write_str("timed out waiting for metrics event"),
Self::Closed => f.write_str("metrics stream is closed"),
}
}
}
impl std::error::Error for MetricsRecvError {}
#[derive(Debug)]
pub struct MetricsStream {
rx: Receiver<MetricEvent>,
}
impl MetricsStream {
fn new(rx: Receiver<MetricEvent>) -> Self {
Self { rx }
}
pub fn recv(&self) -> Option<MetricEvent> {
self.rx.recv().ok()
}
pub fn recv_timeout(
&self,
timeout: Duration,
) -> std::result::Result<MetricEvent, MetricsRecvError> {
match self.rx.recv_timeout(timeout) {
Ok(event) => Ok(event),
Err(RecvTimeoutError::Timeout) => Err(MetricsRecvError::Timeout),
Err(RecvTimeoutError::Disconnected) => Err(MetricsRecvError::Closed),
}
}
pub fn try_recv(&self) -> std::result::Result<MetricEvent, MetricsRecvError> {
match self.rx.try_recv() {
Ok(event) => Ok(event),
Err(TryRecvError::Empty) => Err(MetricsRecvError::Empty),
Err(TryRecvError::Disconnected) => Err(MetricsRecvError::Closed),
}
}
}
impl Iterator for MetricsStream {
type Item = MetricEvent;
fn next(&mut self) -> Option<Self::Item> {
self.recv()
}
}
#[cfg(feature = "pushgateway")]
pub(crate) struct IntervalMetricsReporter {
callback: Option<CallbackMetricsReporter>,
worker: Option<JoinHandle<Result<()>>>,
}
#[cfg(feature = "pushgateway")]
pub(crate) struct MetricsSinks {
pushgateway: Option<PushGatewaySink>,
#[cfg(feature = "serde")]
file: Option<MetricsFileSink>,
}
#[cfg(feature = "pushgateway")]
impl MetricsSinks {
pub(crate) fn new() -> Self {
Self {
pushgateway: None,
#[cfg(feature = "serde")]
file: None,
}
}
pub(crate) fn pushgateway(&mut self, sink: PushGateway, push_interval: Option<Duration>) {
self.pushgateway = Some(PushGatewaySink {
sink,
push_interval,
});
}
#[cfg(feature = "serde")]
pub(crate) fn file(&mut self, sink: MetricsFileSink) {
self.file = Some(sink);
}
#[cfg(feature = "serde")]
pub(crate) fn is_empty(&self) -> bool {
self.pushgateway.is_none() && self.file_is_empty()
}
fn queue(&self) -> MetricsQueue {
if self.file_is_present()
|| self
.pushgateway
.as_ref()
.and_then(|pushgateway| pushgateway.push_interval)
.is_some()
{
MetricsQueue::All
} else {
MetricsQueue::Latest
}
}
#[cfg(feature = "serde")]
fn file_is_empty(&self) -> bool {
self.file.is_none()
}
#[cfg(feature = "serde")]
fn file_is_present(&self) -> bool {
self.file.is_some()
}
#[cfg(not(feature = "serde"))]
fn file_is_present(&self) -> bool {
false
}
}
#[cfg(feature = "pushgateway")]
struct PushGatewaySink {
sink: PushGateway,
push_interval: Option<Duration>,
}
#[cfg(feature = "pushgateway")]
impl IntervalMetricsReporter {
pub(crate) fn attach(
test: &mut IperfTest,
sink: PushGateway,
push_interval: Option<Duration>,
) -> Result<Self> {
let mut sinks = MetricsSinks::new();
sinks.pushgateway(sink, push_interval);
Self::attach_sinks(test, sinks)
}
pub(crate) fn attach_sinks(test: &mut IperfTest, sinks: MetricsSinks) -> Result<Self> {
let queue = sinks.queue();
let (callback, rx) = CallbackMetricsReporter::attach(test, queue)?;
let worker = thread::spawn(move || run_metrics_sinks(rx, sinks));
Ok(Self {
callback: Some(callback),
worker: Some(worker),
})
}
pub(crate) fn finish(mut self) -> Result<()> {
self.stop()
}
fn stop(&mut self) -> Result<()> {
drop(self.callback.take());
if let Some(worker) = self.worker.take() {
worker
.join()
.map_err(|_| Error::worker("metrics sink worker thread panicked"))?
} else {
Ok(())
}
}
}
pub(crate) struct CallbackMetricsReporter {
test_key: usize,
}
impl CallbackMetricsReporter {
pub(crate) fn attach(
test: &mut IperfTest,
queue: MetricsQueue,
) -> Result<(Self, Receiver<Metrics>)> {
let (target, rx) = callback_channel(queue, test.role());
let test_key = test.as_ptr() as usize;
callbacks()
.lock()
.map_err(|_| Error::internal("metrics callback registry is poisoned"))?
.insert(test_key, target);
test.enable_interval_metrics(metrics_callback);
Ok((Self { test_key }, rx))
}
}
impl Drop for CallbackMetricsReporter {
fn drop(&mut self) {
if let Ok(mut callbacks) = callbacks().lock() {
callbacks.remove(&self.test_key);
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum MetricsQueue {
#[cfg(feature = "pushgateway")]
Latest,
All,
}
fn callback_channel(queue: MetricsQueue, role: Role) -> (CallbackTarget, Receiver<Metrics>) {
match queue {
MetricsQueue::All => {
let (tx, rx) = unbounded::<Metrics>();
(
CallbackTarget {
tx,
latest_rx: None,
role,
},
rx,
)
}
#[cfg(feature = "pushgateway")]
MetricsQueue::Latest => {
let (tx, rx) = bounded::<Metrics>(1);
(
CallbackTarget {
tx,
latest_rx: Some(rx.clone()),
role,
},
rx,
)
}
}
}
pub(crate) fn metric_event_stream(
rx: Receiver<Metrics>,
mode: MetricsMode,
) -> (MetricsStream, JoinHandle<()>) {
let (tx, event_rx) = unbounded::<MetricEvent>();
let worker = thread::spawn(move || match mode {
MetricsMode::Disabled => {}
MetricsMode::Interval => forward_interval_events(rx, tx),
MetricsMode::Window(interval) => forward_window_events(rx, tx, interval),
});
(MetricsStream::new(event_rx), worker)
}
fn forward_interval_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>) {
for metrics in rx {
if tx.send(MetricEvent::Interval(metrics)).is_err() {
break;
}
}
}
fn forward_window_events(rx: Receiver<Metrics>, tx: Sender<MetricEvent>, interval: Duration) {
let mut window = Vec::new();
let mut deadline = None;
loop {
match deadline {
Some(flush_at) => {
let now = Instant::now();
if now >= flush_at {
if !flush_window_event(&tx, &mut window) {
break;
}
deadline = None;
continue;
}
match rx.recv_timeout(flush_at - now) {
Ok(metrics) => {
if window_context_changes(&window, &metrics) {
if !flush_window_event(&tx, &mut window) {
break;
}
deadline = Some(window_deadline(interval));
}
window.push(metrics);
}
Err(RecvTimeoutError::Timeout) => {
if !flush_window_event(&tx, &mut window) {
break;
}
deadline = None;
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
None => match rx.recv() {
Ok(metrics) => {
window.push(metrics);
deadline = Some(window_deadline(interval));
}
Err(_) => break,
},
}
}
let _ = flush_window_event(&tx, &mut window);
}
fn flush_window_event(tx: &Sender<MetricEvent>, window: &mut Vec<Metrics>) -> bool {
let Some(metrics) = aggregate_window(window) else {
return true;
};
window.clear();
tx.send(MetricEvent::Window(metrics)).is_ok()
}
fn window_deadline(interval: Duration) -> Instant {
Instant::now()
.checked_add(interval)
.unwrap_or_else(Instant::now)
}
fn window_context_changes(window: &[Metrics], metrics: &Metrics) -> bool {
window
.first()
.map(|first| !same_window_context(first, metrics))
.unwrap_or(false)
}
fn same_window_context(left: &Metrics, right: &Metrics) -> bool {
left.role == right.role
&& left.direction == right.direction
&& left.stream_count == right.stream_count
&& left.protocol == right.protocol
}
#[cfg(feature = "pushgateway")]
fn run_metrics_sinks(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
match sinks
.pushgateway
.as_ref()
.and_then(|pushgateway| pushgateway.push_interval)
{
Some(interval) => push_window_metrics(rx, sinks, interval),
None => push_interval_metrics(rx, sinks),
}
}
#[cfg(feature = "pushgateway")]
fn push_interval_metrics(rx: Receiver<Metrics>, sinks: MetricsSinks) -> Result<()> {
let mut result = Ok(());
for metrics in rx {
if let Err(err) = write_metrics_file(&sinks, &metrics) {
result = Err(err);
break;
}
push_interval_to_gateway(&sinks, &metrics);
}
delete_pushgateway_on_finish(&sinks);
result
}
#[cfg(feature = "pushgateway")]
fn push_window_metrics(
rx: Receiver<Metrics>,
sinks: MetricsSinks,
interval: Duration,
) -> Result<()> {
let mut window = Vec::new();
let mut deadline = None;
let mut result = Ok(());
loop {
match deadline {
Some(flush_at) => {
let now = Instant::now();
if now >= flush_at {
flush_window_metrics(&sinks, &mut window);
deadline = None;
continue;
}
match rx.recv_timeout(flush_at - now) {
Ok(metrics) => {
if let Err(err) = write_metrics_file(&sinks, &metrics) {
result = Err(err);
break;
}
if window_context_changes(&window, &metrics) {
flush_window_metrics(&sinks, &mut window);
deadline = Some(window_deadline(interval));
}
window.push(metrics);
}
Err(RecvTimeoutError::Timeout) => {
flush_window_metrics(&sinks, &mut window);
deadline = None;
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
None => match rx.recv() {
Ok(metrics) => {
if let Err(err) = write_metrics_file(&sinks, &metrics) {
result = Err(err);
break;
}
window.push(metrics);
deadline = Some(window_deadline(interval));
}
Err(_) => break,
},
}
}
if result.is_ok() {
flush_window_metrics(&sinks, &mut window);
}
delete_pushgateway_on_finish(&sinks);
result
}
#[cfg(feature = "pushgateway")]
fn push_interval_to_gateway(sinks: &MetricsSinks, metrics: &Metrics) {
let result = sinks
.pushgateway
.as_ref()
.map(|pushgateway| pushgateway.sink.push(metrics));
if let Some(Err(err)) = result {
eprintln!("failed to push metrics: {err:#}");
}
}
#[cfg(feature = "pushgateway")]
fn flush_window_metrics(sinks: &MetricsSinks, window: &mut Vec<Metrics>) {
let Some(metrics) = aggregate_window(window) else {
return;
};
let result = sinks
.pushgateway
.as_ref()
.map(|pushgateway| pushgateway.sink.push_window(&metrics));
if let Some(Err(err)) = result {
eprintln!("failed to push window metrics: {err:#}");
}
window.clear();
}
#[cfg(all(feature = "pushgateway", feature = "serde"))]
fn write_metrics_file(sinks: &MetricsSinks, metrics: &Metrics) -> Result<()> {
if let Some(file) = &sinks.file {
file.write_interval(metrics)?;
}
Ok(())
}
#[cfg(all(feature = "pushgateway", not(feature = "serde")))]
fn write_metrics_file(_sinks: &MetricsSinks, _metrics: &Metrics) -> Result<()> {
Ok(())
}
#[cfg(feature = "pushgateway")]
fn delete_pushgateway_on_finish(sinks: &MetricsSinks) {
let result = sinks
.pushgateway
.as_ref()
.filter(|pushgateway| pushgateway.sink.delete_on_finish())
.map(|pushgateway| pushgateway.sink.delete());
if let Some(Err(err)) = result {
eprintln!("failed to delete Pushgateway metrics: {err:#}");
}
}
#[cfg(feature = "pushgateway")]
impl Drop for IntervalMetricsReporter {
fn drop(&mut self) {
let _ = self.stop();
}
}
struct CallbackTarget {
tx: Sender<Metrics>,
latest_rx: Option<Receiver<Metrics>>,
role: Role,
}
static CALLBACKS: OnceLock<Mutex<HashMap<usize, CallbackTarget>>> = OnceLock::new();
fn callbacks() -> &'static Mutex<HashMap<usize, CallbackTarget>> {
CALLBACKS.get_or_init(|| Mutex::new(HashMap::new()))
}
unsafe extern "C" fn metrics_callback(
test: *mut RawIperfTest,
transferred_bytes: c_double,
bandwidth_bits_per_second: c_double,
tcp_retransmits: c_double,
tcp_rtt_seconds: c_double,
tcp_rttvar_seconds: c_double,
tcp_snd_cwnd_bytes: c_double,
tcp_snd_wnd_bytes: c_double,
tcp_pmtu_bytes: c_double,
tcp_reorder_events: c_double,
udp_packets: c_double,
udp_lost_packets: c_double,
udp_jitter_seconds: c_double,
udp_out_of_order_packets: c_double,
interval_duration_seconds: c_double,
omitted: c_double,
protocol: c_int,
direction: c_int,
stream_count: c_int,
tcp_retransmits_available: c_int,
tcp_rtt_seconds_available: c_int,
tcp_rttvar_seconds_available: c_int,
tcp_snd_cwnd_bytes_available: c_int,
tcp_snd_wnd_bytes_available: c_int,
tcp_pmtu_bytes_available: c_int,
tcp_reorder_events_available: c_int,
udp_packets_available: c_int,
udp_lost_packets_available: c_int,
udp_jitter_seconds_available: c_int,
udp_out_of_order_packets_available: c_int,
) {
if test.is_null() {
return;
}
let Ok(callbacks) = callbacks().lock() else {
return;
};
let Some(target) = callbacks.get(&(test as usize)) else {
return;
};
enqueue_latest(
target,
Metrics {
timestamp_unix_seconds: current_unix_timestamp_seconds(),
role: target.role,
direction: MetricDirection::from_callback_value(direction),
stream_count: nonnegative_usize(stream_count),
protocol: TransportProtocol::from_callback_value(protocol),
transferred_bytes,
bandwidth_bits_per_second,
tcp_retransmits: available(tcp_retransmits_available, tcp_retransmits),
tcp_rtt_seconds: available(tcp_rtt_seconds_available, tcp_rtt_seconds),
tcp_rttvar_seconds: available(tcp_rttvar_seconds_available, tcp_rttvar_seconds),
tcp_snd_cwnd_bytes: available(tcp_snd_cwnd_bytes_available, tcp_snd_cwnd_bytes),
tcp_snd_wnd_bytes: available(tcp_snd_wnd_bytes_available, tcp_snd_wnd_bytes),
tcp_pmtu_bytes: available(tcp_pmtu_bytes_available, tcp_pmtu_bytes),
tcp_reorder_events: available(tcp_reorder_events_available, tcp_reorder_events),
udp_packets: available(udp_packets_available, udp_packets),
udp_lost_packets: available(udp_lost_packets_available, udp_lost_packets),
udp_jitter_seconds: available(udp_jitter_seconds_available, udp_jitter_seconds),
udp_out_of_order_packets: available(
udp_out_of_order_packets_available,
udp_out_of_order_packets,
),
interval_duration_seconds,
omitted: omitted != 0.0,
},
);
}
fn current_unix_timestamp_seconds() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs_f64())
.unwrap_or(0.0)
}
fn available(flag: c_int, value: c_double) -> Option<f64> {
(flag != 0).then_some(value)
}
fn nonnegative_usize(value: c_int) -> usize {
usize::try_from(value).unwrap_or(0)
}
fn enqueue_latest(target: &CallbackTarget, metrics: Metrics) {
match target.tx.try_send(metrics) {
Ok(()) => {}
Err(TrySendError::Full(metrics)) => {
if let Some(rx) = &target.latest_rx {
let _ = rx.try_recv();
}
let _ = target.tx.try_send(metrics);
}
Err(TrySendError::Disconnected(_)) => {}
}
}
pub fn aggregate_window(samples: &[Metrics]) -> Option<WindowMetrics> {
if samples.is_empty() {
return None;
}
let mut bandwidth = GaugeAccumulator::default();
let mut tcp_rtt = GaugeAccumulator::default();
let mut tcp_rttvar = GaugeAccumulator::default();
let mut tcp_snd_cwnd = GaugeAccumulator::default();
let mut tcp_snd_wnd = GaugeAccumulator::default();
let mut tcp_pmtu = GaugeAccumulator::default();
let mut udp_jitter = GaugeAccumulator::default();
let mut duration_seconds = 0.0;
let mut transferred_bytes = 0.0;
let mut tcp_retransmits = OptionalCounter::default();
let mut tcp_reorder_events = OptionalCounter::default();
let mut udp_packets = OptionalCounter::default();
let mut udp_lost_packets = OptionalCounter::default();
let mut udp_out_of_order_packets = OptionalCounter::default();
let mut omitted_intervals = 0.0;
let context = &samples[0];
for metrics in samples {
duration_seconds += finite_nonnegative(metrics.interval_duration_seconds);
transferred_bytes += finite_nonnegative(metrics.transferred_bytes);
bandwidth.observe(metrics.bandwidth_bits_per_second);
tcp_rtt.observe_option(metrics.tcp_rtt_seconds);
tcp_rttvar.observe_option(metrics.tcp_rttvar_seconds);
tcp_snd_cwnd.observe_option(metrics.tcp_snd_cwnd_bytes);
tcp_snd_wnd.observe_option(metrics.tcp_snd_wnd_bytes);
tcp_pmtu.observe_option(metrics.tcp_pmtu_bytes);
udp_jitter.observe_option(metrics.udp_jitter_seconds);
tcp_retransmits.observe(metrics.tcp_retransmits);
tcp_reorder_events.observe(metrics.tcp_reorder_events);
udp_packets.observe(metrics.udp_packets);
udp_lost_packets.observe(metrics.udp_lost_packets);
udp_out_of_order_packets.observe(metrics.udp_out_of_order_packets);
if metrics.omitted {
omitted_intervals += 1.0;
}
}
let bandwidth_mean = if duration_seconds > 0.0 {
(transferred_bytes * 8.0) / duration_seconds
} else {
bandwidth.finish().mean
};
Some(WindowMetrics {
timestamp_unix_seconds: samples
.last()
.map(|metrics| metrics.timestamp_unix_seconds)
.unwrap_or_default(),
role: context.role,
direction: context.direction,
stream_count: context.stream_count,
protocol: context.protocol,
duration_seconds,
transferred_bytes,
bandwidth_bits_per_second: bandwidth.finish_with_mean(bandwidth_mean),
tcp_rtt_seconds: tcp_rtt.finish(),
tcp_rttvar_seconds: tcp_rttvar.finish(),
tcp_snd_cwnd_bytes: tcp_snd_cwnd.finish(),
tcp_snd_wnd_bytes: tcp_snd_wnd.finish(),
tcp_pmtu_bytes: tcp_pmtu.finish(),
udp_jitter_seconds: udp_jitter.finish(),
tcp_retransmits: tcp_retransmits.finish(),
tcp_reorder_events: tcp_reorder_events.finish(),
udp_packets: udp_packets.finish(),
udp_lost_packets: udp_lost_packets.finish(),
udp_out_of_order_packets: udp_out_of_order_packets.finish(),
omitted_intervals,
})
}
#[derive(Debug, Clone, Default)]
struct GaugeAccumulator {
count: usize,
sum: f64,
min: f64,
max: f64,
}
impl GaugeAccumulator {
fn observe(&mut self, value: f64) {
if !value.is_finite() {
return;
}
if self.count == 0 {
self.min = value;
self.max = value;
} else {
self.min = self.min.min(value);
self.max = self.max.max(value);
}
self.count += 1;
self.sum += value;
}
fn observe_option(&mut self, value: Option<f64>) {
if let Some(value) = value {
self.observe(value);
}
}
fn finish(&self) -> WindowGaugeStats {
if self.count == 0 {
return WindowGaugeStats::default();
}
WindowGaugeStats {
samples: self.count,
mean: self.sum / self.count as f64,
min: self.min,
max: self.max,
}
}
fn finish_with_mean(&self, mean: f64) -> WindowGaugeStats {
let mut stats = self.finish();
if self.count > 0 && mean.is_finite() {
stats.mean = mean;
}
stats
}
}
#[derive(Debug, Clone, Default)]
struct OptionalCounter {
observed: bool,
sum: f64,
}
impl OptionalCounter {
fn observe(&mut self, value: Option<f64>) {
let Some(value) = value else {
return;
};
self.observed = true;
self.sum += finite_nonnegative(value);
}
fn finish(&self) -> Option<f64> {
self.observed.then_some(self.sum)
}
}
fn finite_nonnegative(value: f64) -> f64 {
if value.is_finite() && value > 0.0 {
value
} else {
0.0
}
}
#[cfg(kani)]
mod verification {
use super::*;
#[kani::proof]
fn empty_window_has_no_summary() {
assert!(aggregate_window(&[]).is_none());
}
#[kani::proof]
fn metrics_mode_callback_policy_matches_variant() {
let variant: u8 = kani::any();
let mode = match variant % 3 {
0 => MetricsMode::Disabled,
1 => MetricsMode::Interval,
_ => MetricsMode::Window(Duration::from_secs(1)),
};
assert_eq!(mode.is_enabled(), !matches!(mode, MetricsMode::Disabled));
assert_eq!(mode.callback_queue().is_some(), mode.is_enabled());
}
#[kani::proof]
fn callback_availability_flag_controls_optional_metric() {
let flag: c_int = kani::any();
let value = f64::from(kani::any::<i16>());
assert_eq!(available(flag, value), (flag != 0).then_some(value));
}
#[kani::proof]
fn callback_stream_count_never_wraps_negative_values() {
let raw: i16 = kani::any();
let expected = if raw < 0 { 0 } else { raw as usize };
assert_eq!(nonnegative_usize(c_int::from(raw)), expected);
}
#[kani::proof]
fn callback_context_mappers_preserve_known_values() {
let protocol: i8 = kani::any();
let direction: i8 = kani::any();
let expected_protocol = match protocol {
0 => TransportProtocol::Unknown,
1 => TransportProtocol::Tcp,
2 => TransportProtocol::Udp,
3 => TransportProtocol::Sctp,
other => TransportProtocol::Other(c_int::from(other)),
};
let expected_direction = match direction {
0 => MetricDirection::Unknown,
1 => MetricDirection::Sender,
2 => MetricDirection::Receiver,
other => MetricDirection::Other(c_int::from(other)),
};
assert_eq!(
TransportProtocol::from_callback_value(c_int::from(protocol)),
expected_protocol
);
assert_eq!(
MetricDirection::from_callback_value(c_int::from(direction)),
expected_direction
);
}
#[kani::proof]
#[kani::unwind(3)]
fn window_counters_are_nonnegative_for_bounded_inputs() {
let sample = Metrics {
transferred_bytes: f64::from(kani::any::<i16>()),
tcp_retransmits: Some(f64::from(kani::any::<i16>())),
tcp_reorder_events: Some(f64::from(kani::any::<i16>())),
udp_packets: Some(f64::from(kani::any::<i16>())),
udp_lost_packets: Some(f64::from(kani::any::<i16>())),
udp_out_of_order_packets: Some(f64::from(kani::any::<i16>())),
interval_duration_seconds: f64::from(kani::any::<i16>()),
omitted: kani::any(),
..Metrics::default()
};
let window = aggregate_window(&[sample]).expect("nonempty windows summarize");
assert!(window.duration_seconds >= 0.0);
assert!(window.transferred_bytes >= 0.0);
assert!(window.tcp_retransmits.unwrap_or(0.0) >= 0.0);
assert!(window.tcp_reorder_events.unwrap_or(0.0) >= 0.0);
assert!(window.udp_packets.unwrap_or(0.0) >= 0.0);
assert!(window.udp_lost_packets.unwrap_or(0.0) >= 0.0);
assert!(window.udp_out_of_order_packets.unwrap_or(0.0) >= 0.0);
assert!(window.omitted_intervals >= 0.0);
}
#[kani::proof]
#[kani::unwind(3)]
fn window_bandwidth_mean_uses_total_bits_over_duration_for_unit_intervals() {
let bytes_a: u8 = kani::any();
let bytes_b: u8 = kani::any();
let samples = [
metrics_with_unit_duration(bytes_a),
metrics_with_unit_duration(bytes_b),
];
let window = aggregate_window(&samples).expect("nonempty windows summarize");
let expected = ((f64::from(bytes_a) + f64::from(bytes_b)) * 8.0) / 2.0;
assert_eq!(window.bandwidth_bits_per_second.mean, expected);
}
#[kani::proof]
#[kani::unwind(3)]
fn window_gauge_statistics_remain_ordered_for_consistent_samples() {
let bytes_a: u8 = kani::any();
let bytes_b: u8 = kani::any();
let rtt_a: u8 = kani::any();
let rtt_b: u8 = kani::any();
let samples = [
Metrics {
transferred_bytes: f64::from(bytes_a),
bandwidth_bits_per_second: f64::from(bytes_a) * 8.0,
tcp_rtt_seconds: Some(f64::from(rtt_a)),
interval_duration_seconds: 1.0,
..Metrics::default()
},
Metrics {
transferred_bytes: f64::from(bytes_b),
bandwidth_bits_per_second: f64::from(bytes_b) * 8.0,
tcp_rtt_seconds: Some(f64::from(rtt_b)),
interval_duration_seconds: 1.0,
..Metrics::default()
},
];
let window = aggregate_window(&samples).expect("nonempty windows summarize");
assert_ordered(window.bandwidth_bits_per_second);
assert_ordered(window.tcp_rtt_seconds);
}
fn metrics_with_unit_duration(bytes: u8) -> Metrics {
Metrics {
transferred_bytes: f64::from(bytes),
bandwidth_bits_per_second: f64::from(bytes) * 8.0,
interval_duration_seconds: 1.0,
..Metrics::default()
}
}
fn assert_ordered(stats: WindowGaugeStats) {
assert!(stats.samples > 0);
assert!(stats.min <= stats.mean);
assert!(stats.mean <= stats.max);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn transport_protocol_maps_callback_values() {
assert_eq!(
TransportProtocol::from_callback_value(0),
TransportProtocol::Unknown
);
assert_eq!(
TransportProtocol::from_callback_value(1),
TransportProtocol::Tcp
);
assert_eq!(
TransportProtocol::from_callback_value(2),
TransportProtocol::Udp
);
assert_eq!(
TransportProtocol::from_callback_value(3),
TransportProtocol::Sctp
);
assert_eq!(
TransportProtocol::from_callback_value(99),
TransportProtocol::Other(99)
);
}
#[test]
fn metric_direction_maps_callback_values() {
assert_eq!(
MetricDirection::from_callback_value(0),
MetricDirection::Unknown
);
assert_eq!(
MetricDirection::from_callback_value(1),
MetricDirection::Sender
);
assert_eq!(
MetricDirection::from_callback_value(2),
MetricDirection::Receiver
);
assert_eq!(
MetricDirection::from_callback_value(99),
MetricDirection::Other(99)
);
}
#[test]
fn enqueue_latest_replaces_queued_metric() {
let (tx, rx) = bounded::<Metrics>(1);
let target = CallbackTarget {
tx,
latest_rx: Some(rx.clone()),
role: Role::Client,
};
enqueue_latest(
&target,
Metrics {
transferred_bytes: 1.0,
..Metrics::default()
},
);
enqueue_latest(
&target,
Metrics {
transferred_bytes: 2.0,
..Metrics::default()
},
);
assert_eq!(rx.try_recv().unwrap().transferred_bytes, 2.0);
assert!(rx.try_recv().is_err());
}
#[test]
fn metrics_stream_try_recv_reports_empty_and_closed() {
let (tx, rx) = unbounded::<MetricEvent>();
let stream = MetricsStream::new(rx);
assert_eq!(stream.try_recv(), Err(MetricsRecvError::Empty));
let event = MetricEvent::Interval(Metrics {
transferred_bytes: 42.0,
..Metrics::default()
});
tx.send(event.clone()).unwrap();
assert_eq!(stream.try_recv(), Ok(event));
drop(tx);
assert_eq!(stream.try_recv(), Err(MetricsRecvError::Closed));
}
#[test]
fn metrics_stream_recv_timeout_reports_timeout_and_closed() {
let (tx, rx) = unbounded::<MetricEvent>();
let stream = MetricsStream::new(rx);
assert_eq!(
stream.recv_timeout(Duration::from_millis(1)),
Err(MetricsRecvError::Timeout)
);
let event = MetricEvent::Interval(Metrics {
transferred_bytes: 7.0,
..Metrics::default()
});
tx.send(event.clone()).unwrap();
assert_eq!(stream.recv_timeout(Duration::from_secs(1)), Ok(event));
drop(tx);
assert_eq!(
stream.recv_timeout(Duration::from_secs(1)),
Err(MetricsRecvError::Closed)
);
}
#[test]
fn metric_event_stream_forwards_interval_samples() {
let (tx, rx) = unbounded::<Metrics>();
let sample = Metrics {
transferred_bytes: 42.0,
..Metrics::default()
};
let (mut stream, worker) = metric_event_stream(rx, MetricsMode::Interval);
tx.send(sample.clone()).unwrap();
drop(tx);
assert_eq!(stream.next(), Some(MetricEvent::Interval(sample)));
worker.join().unwrap();
assert_eq!(stream.next(), None);
}
#[test]
fn metric_event_stream_flushes_final_window() {
let (tx, rx) = unbounded::<Metrics>();
let (mut stream, worker) =
metric_event_stream(rx, MetricsMode::Window(Duration::from_secs(60)));
tx.send(Metrics {
timestamp_unix_seconds: 10.0,
role: Role::Client,
direction: MetricDirection::Sender,
stream_count: 2,
protocol: TransportProtocol::Tcp,
transferred_bytes: 4.0,
bandwidth_bits_per_second: 32.0,
interval_duration_seconds: 1.0,
..Metrics::default()
})
.unwrap();
tx.send(Metrics {
timestamp_unix_seconds: 11.0,
role: Role::Client,
direction: MetricDirection::Sender,
stream_count: 2,
protocol: TransportProtocol::Tcp,
transferred_bytes: 8.0,
bandwidth_bits_per_second: 64.0,
interval_duration_seconds: 1.0,
..Metrics::default()
})
.unwrap();
drop(tx);
let Some(MetricEvent::Window(window)) = stream.next() else {
panic!("expected a final window event");
};
assert_eq!(window.transferred_bytes, 12.0);
assert_eq!(window.duration_seconds, 2.0);
assert_eq!(window.bandwidth_bits_per_second.mean, 48.0);
assert_eq!(window.timestamp_unix_seconds, 11.0);
assert_eq!(window.role, Role::Client);
assert_eq!(window.direction, MetricDirection::Sender);
assert_eq!(window.stream_count, 2);
assert_eq!(window.protocol, TransportProtocol::Tcp);
worker.join().unwrap();
assert_eq!(stream.next(), None);
}
#[test]
fn metric_event_stream_splits_windows_when_context_changes() {
let (tx, rx) = unbounded::<Metrics>();
let (mut stream, worker) =
metric_event_stream(rx, MetricsMode::Window(Duration::from_secs(60)));
tx.send(Metrics {
role: Role::Client,
direction: MetricDirection::Sender,
stream_count: 1,
protocol: TransportProtocol::Tcp,
transferred_bytes: 4.0,
interval_duration_seconds: 1.0,
..Metrics::default()
})
.unwrap();
tx.send(Metrics {
role: Role::Client,
direction: MetricDirection::Receiver,
stream_count: 1,
protocol: TransportProtocol::Tcp,
transferred_bytes: 8.0,
interval_duration_seconds: 1.0,
..Metrics::default()
})
.unwrap();
drop(tx);
let Some(MetricEvent::Window(first)) = stream.next() else {
panic!("expected sender window");
};
let Some(MetricEvent::Window(second)) = stream.next() else {
panic!("expected receiver window");
};
assert_eq!(first.transferred_bytes, 4.0);
assert_eq!(first.direction, MetricDirection::Sender);
assert_eq!(second.transferred_bytes, 8.0);
assert_eq!(second.direction, MetricDirection::Receiver);
worker.join().unwrap();
assert_eq!(stream.next(), None);
}
#[cfg(all(feature = "pushgateway", feature = "serde"))]
#[test]
fn metrics_file_errors_are_returned_from_sink_worker() {
use std::fs;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::metrics_file::{MetricsFileFormat, MetricsFileSink};
let nonce = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
let path = std::env::temp_dir().join(format!(
"iperf3-rs-metrics-worker-{}-{nonce}.jsonl",
std::process::id()
));
let sink = MetricsFileSink::new(&path, MetricsFileFormat::Jsonl).unwrap();
fs::remove_file(&path).unwrap();
fs::create_dir(&path).unwrap();
let mut sinks = MetricsSinks::new();
sinks.file(sink);
let (tx, rx) = unbounded();
tx.send(Metrics {
transferred_bytes: 1.0,
interval_duration_seconds: 1.0,
..Metrics::default()
})
.unwrap();
drop(tx);
let err = run_metrics_sinks(rx, sinks).unwrap_err();
assert_eq!(err.kind(), crate::ErrorKind::MetricsFile);
let _ = fs::remove_dir(path);
}
#[test]
fn aggregate_window_returns_none_for_empty_samples() {
assert!(aggregate_window(&[]).is_none());
}
#[test]
fn aggregate_window_summarizes_interval_samples_by_metric_semantics() {
let window = aggregate_window(&[
Metrics {
timestamp_unix_seconds: 10.0,
role: Role::Client,
direction: MetricDirection::Sender,
stream_count: 2,
protocol: TransportProtocol::Tcp,
transferred_bytes: 100.0,
bandwidth_bits_per_second: 800.0,
tcp_retransmits: Some(1.0),
tcp_rtt_seconds: Some(0.010),
tcp_snd_cwnd_bytes: Some(1000.0),
udp_packets: Some(2.0),
interval_duration_seconds: 1.0,
..Metrics::default()
},
Metrics {
timestamp_unix_seconds: 11.0,
role: Role::Client,
direction: MetricDirection::Sender,
stream_count: 2,
protocol: TransportProtocol::Tcp,
transferred_bytes: 900.0,
bandwidth_bits_per_second: 2400.0,
tcp_retransmits: Some(2.0),
tcp_rtt_seconds: Some(0.030),
tcp_snd_cwnd_bytes: Some(3000.0),
udp_packets: Some(3.0),
interval_duration_seconds: 3.0,
omitted: true,
..Metrics::default()
},
])
.unwrap();
assert_eq!(window.duration_seconds, 4.0);
assert_eq!(window.transferred_bytes, 1000.0);
assert_eq!(window.timestamp_unix_seconds, 11.0);
assert_eq!(window.role, Role::Client);
assert_eq!(window.direction, MetricDirection::Sender);
assert_eq!(window.stream_count, 2);
assert_eq!(window.protocol, TransportProtocol::Tcp);
assert_eq!(
window.bandwidth_bits_per_second,
WindowGaugeStats {
samples: 2,
mean: 2000.0,
min: 800.0,
max: 2400.0
}
);
assert_eq!(
window.tcp_rtt_seconds,
WindowGaugeStats {
samples: 2,
mean: 0.020,
min: 0.010,
max: 0.030
}
);
assert_eq!(
window.tcp_snd_cwnd_bytes,
WindowGaugeStats {
samples: 2,
mean: 2000.0,
min: 1000.0,
max: 3000.0
}
);
assert_eq!(window.tcp_retransmits, Some(3.0));
assert_eq!(window.udp_packets, Some(5.0));
assert_eq!(window.omitted_intervals, 1.0);
}
#[test]
fn aggregate_window_falls_back_to_observed_bandwidth_when_duration_is_zero() {
let window = aggregate_window(&[
Metrics {
transferred_bytes: 100.0,
bandwidth_bits_per_second: 800.0,
..Metrics::default()
},
Metrics {
transferred_bytes: 900.0,
bandwidth_bits_per_second: 2400.0,
..Metrics::default()
},
])
.unwrap();
assert_eq!(window.duration_seconds, 0.0);
assert_eq!(
window.bandwidth_bits_per_second,
WindowGaugeStats {
samples: 2,
mean: 1600.0,
min: 800.0,
max: 2400.0
}
);
}
#[test]
fn aggregate_window_ignores_invalid_counter_values() {
let window = aggregate_window(&[
Metrics {
transferred_bytes: f64::NAN,
bandwidth_bits_per_second: f64::INFINITY,
tcp_retransmits: Some(-1.0),
interval_duration_seconds: -1.0,
..Metrics::default()
},
Metrics {
transferred_bytes: 8.0,
bandwidth_bits_per_second: 64.0,
tcp_retransmits: Some(2.0),
interval_duration_seconds: 1.0,
..Metrics::default()
},
])
.unwrap();
assert_eq!(window.duration_seconds, 1.0);
assert_eq!(window.transferred_bytes, 8.0);
assert_eq!(window.tcp_retransmits, Some(2.0));
assert_eq!(
window.bandwidth_bits_per_second,
WindowGaugeStats {
samples: 1,
mean: 64.0,
min: 64.0,
max: 64.0
}
);
}
}