use std::time::Duration;
use chrono::{DateTime, Utc};
use colored::*;
use humansize::{BINARY, BaseUnit, DECIMAL, format_size};
use num_format::{Locale, ToFormattedString};
use serde::{Deserialize, Serialize};
use crate::report::{ConnectionError, Outcome, Sample};
use std::collections::HashMap;
use std::fmt;
pub const WIRE_OVERHEAD_TCP_BYTES: usize = 52;
pub const WIRE_OVERHEAD_UDP_BYTES: usize = 28;
pub const STANDARD_MTU: usize = 1500;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum ThroughputAccounting {
#[default]
Goodput,
Wire,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSamples {
pub stream_id: u32,
pub start_offset_us: u64,
pub samples: Vec<Sample>,
}
impl StreamSamples {
pub fn bytes_transferred(&self) -> u64 {
self.samples
.iter()
.filter(|s| !s.is_warmup && s.is_success())
.map(|s| s.bytes)
.sum()
}
pub fn avg_throughput_bps(&self, window: Duration) -> f64 {
if window.is_zero() {
return 0.0;
}
(self.bytes_transferred() as f64 * 8.0) / window.as_secs_f64()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputResult {
pub streams: Vec<StreamSamples>,
pub total_duration_us: u64,
pub timestamp: DateTime<Utc>,
#[serde(default)]
pub udp_stats: Option<UdpRunStats>,
#[serde(default)]
pub udp_series: Vec<UdpStatsBucket>,
#[serde(default)]
pub udp_series_window_us: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpRunStats {
pub observed_by: UdpStatsSide,
pub received_packets: u64,
pub bytes_received: u64,
pub lost_packets: u64,
pub out_of_order: u64,
pub duplicates: u64,
pub jitter_us: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum UdpStatsSide {
Local,
Remote,
}
impl UdpRunStats {
pub fn loss_fraction(&self) -> Option<f64> {
let sent = self.received_packets + self.lost_packets;
if sent == 0 {
return None;
}
Some(self.lost_packets as f64 / sent as f64)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpStatsBucket {
pub t_offset_us: u64,
pub received: u64,
pub bytes_received: u64,
pub lost: u64,
pub out_of_order: u64,
pub duplicates: u64,
pub jitter_us: u32,
}
impl ThroughputResult {
pub fn samples_iter(&self) -> impl Iterator<Item = &Sample> {
self.streams.iter().flat_map(|s| s.samples.iter())
}
pub fn non_warmup_iter(&self) -> impl Iterator<Item = &Sample> {
self.samples_iter().filter(|s| !s.is_warmup)
}
pub fn total_duration(&self) -> Duration {
Duration::from_micros(self.total_duration_us)
}
}
impl fmt::Display for ThroughputResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(
f,
" {}: {}",
"Data Transferred".bright_green().bold(),
format_size(self.bytes_transferred(), BINARY).cyan()
)?;
writeln!(
f,
" {}: {}",
"Duration".bright_green().bold(),
format!("{:.2}s", self.total_duration().as_secs_f64()).yellow()
)?;
writeln!(
f,
" {}: {}",
"Average Throughput".bright_green().bold(),
format_size(
(self.avg_throughput() * 8.0) as u64,
DECIMAL.base_unit(BaseUnit::Bit).suffix("/s"),
)
.magenta()
)?;
let percentiles = [
("Min Throughput", 0.0),
("p50 Throughput", 50.0),
("p90 Throughput", 90.0),
("p95 Throughput", 95.0),
("p99 Throughput", 99.0),
("Max Throughput", 100.0),
];
for (label, p) in percentiles {
if let Some(bps) = self.percentile_throughput_bps(p) {
writeln!(
f,
" {}: {}",
label.bright_green().bold(),
format_size(bps as u64, DECIMAL.base_unit(BaseUnit::Bit).suffix("/s"),)
.magenta()
)?;
}
}
writeln!(
f,
" {}: {}",
"Connection Success Rate".bright_green().bold(),
format!("{:.1}%", self.connection_success_rate() * 100.0).green()
)?;
writeln!(
f,
" {}: {}",
"Request Success Rate".bright_green().bold(),
format!("{:.1}%", self.request_success_rate() * 100.0).green()
)?;
let (total_retries, failed_after_retry) = self.retry_statistics();
if total_retries > 0 {
writeln!(
f,
" {}: {} ({} failed after retry)",
"Total Retries".bright_green().bold(),
total_retries.to_formatted_string(&Locale::en).yellow(),
failed_after_retry.to_formatted_string(&Locale::en).red()
)?;
}
let error_distribution = self.error_distribution();
if !error_distribution.is_empty() {
writeln!(
f,
" {}: {} total",
"Errors".bright_green().bold(),
self.total_errors().to_formatted_string(&Locale::en).red()
)?;
for (error_type, count) in error_distribution {
writeln!(
f,
" {}: {}",
error_type.bright_yellow(),
count.to_formatted_string(&Locale::en).red()
)?;
}
}
writeln!(
f,
" {}: {}",
"Samples".bright_green().bold(),
self.sample_count().to_formatted_string(&Locale::en).white()
)?;
if self.streams.len() > 1 {
writeln!(f, " {}:", "Per-Stream".bright_green().bold())?;
for s in &self.streams {
let bps = s.avg_throughput_bps(self.total_duration());
writeln!(
f,
" stream {:>3}: {} ({} samples)",
s.stream_id.to_string().yellow(),
format_size(bps as u64, DECIMAL.base_unit(BaseUnit::Bit).suffix("/s"))
.magenta(),
s.samples.len().to_formatted_string(&Locale::en).white()
)?;
}
}
if let Some(udp) = &self.udp_stats {
let side = match udp.observed_by {
UdpStatsSide::Local => "client-local",
UdpStatsSide::Remote => "server-reported",
};
writeln!(
f,
" {} ({}):",
"UDP Packet Stats".bright_green().bold(),
side.bright_blue()
)?;
writeln!(
f,
" Packets received: {}",
udp.received_packets.to_formatted_string(&Locale::en).cyan()
)?;
writeln!(
f,
" Lost: {} {}",
udp.lost_packets.to_formatted_string(&Locale::en).red(),
match udp.loss_fraction() {
Some(f) => format!("({:.3}%)", f * 100.0),
None => String::new(),
}
.red()
)?;
writeln!(
f,
" Out-of-order: {}",
udp.out_of_order.to_formatted_string(&Locale::en).yellow()
)?;
writeln!(
f,
" Duplicates: {}",
udp.duplicates.to_formatted_string(&Locale::en).yellow()
)?;
writeln!(
f,
" Jitter (RFC 3550): {} us",
udp.jitter_us.to_formatted_string(&Locale::en).magenta()
)?;
}
if !self.udp_series.is_empty() {
writeln!(
f,
" {}: {} buckets @ {} ms each",
"UDP Stats Series".bright_green().bold(),
self.udp_series
.len()
.to_formatted_string(&Locale::en)
.cyan(),
(self.udp_series_window_us / 1000)
.to_formatted_string(&Locale::en)
.yellow()
)?;
}
writeln!(
f,
" {}: {}",
"Timestamp".bright_green().bold(),
self.timestamp
.format("%Y-%m-%d %H:%M:%S UTC")
.to_string()
.blue()
)?;
Ok(())
}
}
impl ThroughputResult {
pub fn sample_count(&self) -> usize {
self.non_warmup_iter().count()
}
pub fn bytes_transferred(&self) -> u64 {
self.non_warmup_iter()
.filter(|s| s.is_success())
.map(|s| s.bytes)
.sum()
}
pub fn avg_throughput(&self) -> f64 {
if self.total_duration_us == 0 {
return 0.0;
}
(self.bytes_transferred() as f64) / (self.total_duration_us as f64 / 1_000_000.0)
}
pub fn avg_throughput_wire_bps(&self, overhead_per_segment: usize, mtu: usize) -> f64 {
if self.total_duration_us == 0 || mtu == 0 {
return 0.0;
}
let payload_per_segment = mtu.saturating_sub(overhead_per_segment).max(1) as u64;
let total_overhead: u64 = self
.non_warmup_iter()
.filter(|s| s.is_success())
.map(|s| {
let segments = s.bytes.div_ceil(payload_per_segment);
segments * overhead_per_segment as u64
})
.sum();
let total_wire_bytes = self.bytes_transferred() + total_overhead;
(total_wire_bytes as f64 * 8.0) / (self.total_duration_us as f64 / 1_000_000.0)
}
fn analysis_window_us(&self) -> u64 {
const TARGET_WINDOWS: u64 = 100;
const MIN_WINDOW_US: u64 = 10_000; const MAX_WINDOW_US: u64 = 250_000; if self.total_duration_us == 0 {
return MIN_WINDOW_US;
}
(self.total_duration_us / TARGET_WINDOWS).clamp(MIN_WINDOW_US, MAX_WINDOW_US)
}
fn windowed_bps_series(&self, window_us: u64) -> Vec<f64> {
let window_us = window_us.max(1);
let span_us = self.total_duration_us;
if span_us == 0 {
return Vec::new();
}
let origin = self
.non_warmup_iter()
.filter(|s| s.is_success())
.map(|s| s.t_start_us)
.min();
let Some(origin) = origin else {
return Vec::new();
};
let n_windows = span_us.div_ceil(window_us) as usize;
if n_windows == 0 {
return Vec::new();
}
let mut bytes_per_window = vec![0f64; n_windows];
for s in self.non_warmup_iter().filter(|s| s.is_success()) {
let bytes = s.bytes as f64;
if bytes == 0.0 {
continue;
}
let raw_start = s.t_start_us.saturating_sub(origin);
let start = raw_start.min(span_us - 1);
let raw_end = raw_start.saturating_add(s.duration_us.max(1));
let end = raw_end.min(span_us).max(start + 1);
let sample_span = (end - start) as f64;
let first = (start / window_us) as usize;
let last = (((end - 1) / window_us) as usize).min(n_windows - 1);
for (w, bucket) in bytes_per_window
.iter_mut()
.enumerate()
.skip(first)
.take(last + 1 - first)
{
let w_start = (w as u64) * window_us;
let w_end = (w_start + window_us).min(span_us);
let overlap = end.min(w_end).saturating_sub(start.max(w_start));
if overlap > 0 {
*bucket += bytes * (overlap as f64 / sample_span);
}
}
}
(0..n_windows)
.map(|w| {
let w_start = (w as u64) * window_us;
let w_end = (w_start + window_us).min(span_us);
let width_s = (w_end - w_start) as f64 / 1_000_000.0;
if width_s <= 0.0 {
0.0
} else {
(bytes_per_window[w] * 8.0) / width_s
}
})
.collect()
}
pub fn percentile_throughput_bps(&self, n: f64) -> Option<f64> {
if !(0.0..=100.0).contains(&n) {
return None;
}
let mut series = self.windowed_bps_series(self.analysis_window_us());
if series.is_empty() {
return None;
}
series.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = if n == 0.0 {
0
} else if n == 100.0 {
series.len() - 1
} else {
((n / 100.0) * (series.len() - 1) as f64).round() as usize
};
Some(series[idx])
}
pub fn min_throughput_bps(&self) -> Option<f64> {
self.percentile_throughput_bps(0.0)
}
pub fn max_throughput_bps(&self) -> Option<f64> {
self.percentile_throughput_bps(100.0)
}
pub fn connection_success_rate(&self) -> f64 {
let total = self.non_warmup_iter().count();
if total == 0 {
return 0.0;
}
let successful = self.non_warmup_iter().filter(|s| s.is_success()).count();
successful as f64 / total as f64
}
pub fn request_success_rate(&self) -> f64 {
self.connection_success_rate()
}
pub fn retry_statistics(&self) -> (u32, u32) {
let mut total_retries = 0;
let mut failed_after_retry = 0;
for s in self.non_warmup_iter() {
if let Outcome::Failure { retry_count, .. } = &s.outcome {
total_retries += retry_count;
failed_after_retry += 1;
}
}
(total_retries, failed_after_retry)
}
pub fn error_distribution(&self) -> HashMap<String, u32> {
let mut distribution = HashMap::new();
for s in self.non_warmup_iter() {
if let Outcome::Failure { error, .. } = &s.outcome {
let kind = match error {
ConnectionError::ConnectionFailed(_) => "Connection Failed",
ConnectionError::TransferFailed(_) => "Transfer Failed",
ConnectionError::Timeout(_) => "Timeout",
ConnectionError::Unknown(_) => "Unknown",
};
*distribution.entry(kind.to_string()).or_insert(0) += 1;
}
}
distribution
}
pub fn total_errors(&self) -> u32 {
self.non_warmup_iter().filter(|s| !s.is_success()).count() as u32
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn result_from(samples: Vec<Sample>, total_duration_us: u64) -> ThroughputResult {
ThroughputResult {
streams: vec![StreamSamples {
stream_id: 0,
start_offset_us: samples.first().map(|s| s.t_start_us).unwrap_or(0),
samples,
}],
total_duration_us,
timestamp: Utc::now(),
udp_stats: None,
udp_series: Vec::new(),
udp_series_window_us: 0,
}
}
fn assert_min_le_avg_le_max(r: &ThroughputResult) {
let avg_bps = r.avg_throughput() * 8.0;
let min = r.min_throughput_bps().expect("min");
let max = r.max_throughput_bps().expect("max");
let eps = avg_bps * 1e-6 + 1.0;
assert!(
min <= avg_bps + eps,
"min {min} must not exceed avg {avg_bps}"
);
assert!(
avg_bps <= max + eps,
"avg {avg_bps} must not exceed max {max}"
);
assert!(min <= max + eps, "min {min} must not exceed max {max}");
}
#[test]
fn udp_like_paced_stream_keeps_min_below_avg() {
let mut samples = Vec::new();
for i in 0..10_000u64 {
samples.push(Sample::success(i * 100, 2, 1200, false));
}
let r = result_from(samples, 1_000_000); assert_min_le_avg_le_max(&r);
let max = r.max_throughput_bps().unwrap();
assert!(max < 500_000_000.0, "windowed max {max} unexpectedly high");
}
#[test]
fn http_like_large_slow_samples_keep_min_le_avg_le_max() {
let eight_mb = 8 * 1024 * 1024;
let samples = vec![
Sample::success(0, 2_000_000, eight_mb, false),
Sample::success(2_000_000, 2_000_000, eight_mb, false),
Sample::success(4_000_000, 2_000_000, eight_mb, false),
];
let r = result_from(samples, 6_000_000); assert_min_le_avg_le_max(&r);
}
#[test]
fn single_window_collapses_to_average() {
let r = result_from(vec![Sample::success(0, 500, 4096, false)], 1000);
let avg_bps = r.avg_throughput() * 8.0;
let min = r.min_throughput_bps().unwrap();
let max = r.max_throughput_bps().unwrap();
assert!((min - max).abs() < 1.0, "single window must be flat");
assert!((min - avg_bps).abs() <= avg_bps * 1e-6 + 1.0);
}
#[test]
fn no_successful_samples_yields_no_percentile() {
let r = result_from(Vec::new(), 1_000_000);
assert!(r.min_throughput_bps().is_none());
assert!(r.max_throughput_bps().is_none());
}
}