use engineering_repr::EngineeringRepr as _;
use human_repr::{HumanCount, HumanDuration, HumanThroughput};
use num_format::ToFormattedString as _;
use quinn::ConnectionStats;
use std::{
cmp::{self, max},
fmt::Display,
time::Duration,
};
use tracing::{info, warn};
use crate::{
config::Configuration,
protocol::{
Variant,
control::{ClosedownReportExtension, ClosedownReportV1, Direction},
},
session::CommandStats,
};
/// A data transfer rate in bytes per second, which may be unknown.
///
/// The rate is unknown when no elapsed time was supplied, or when the
/// elapsed time was zero (so no meaningful division is possible).
#[derive(Debug, Clone, Copy)]
pub(crate) struct DataRate {
    /// Bytes per second; `None` when the rate could not be computed.
    rate: Option<f64>,
}

impl DataRate {
    /// Computes the average rate for `bytes` transferred over `time`.
    ///
    /// Yields an unknown rate if `time` is `None` or a zero duration.
    #[must_use]
    pub(crate) fn new(bytes: u64, time: Option<Duration>) -> Self {
        let rate = time
            // A zero-length interval is treated exactly like a missing one.
            .filter(|elapsed| !elapsed.is_zero())
            .map(|elapsed| {
                #[allow(clippy::cast_precision_loss)]
                let total = bytes as f64;
                total / elapsed.as_secs_f64()
            });
        Self { rate }
    }
}
/// Extracts the rate in bytes per second; an unknown rate converts to `0.0`.
impl From<DataRate> for f64 {
    fn from(d: DataRate) -> Self {
        d.rate.unwrap_or(0.0)
    }
}
/// Extracts the rate as a whole number of bytes per second; an unknown rate
/// converts to `0`.
impl From<DataRate> for u64 {
    fn from(d: DataRate) -> Self {
        use num_traits::AsPrimitive as _;
        let bytes_per_sec: f64 = d.rate.unwrap_or(0.0);
        // Primitive float-to-integer cast, via num_traits.
        bytes_per_sec.as_()
    }
}
/// Renders the rate as a human-readable throughput, or `unknown` when there
/// is no rate to show.
impl Display for DataRate {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(rate) = self.rate {
            Display::fmt(&rate.human_throughput_bytes(), f)
        } else {
            f.write_str("unknown")
        }
    }
}
pub(crate) fn format_rate(size: u64, time: Option<Duration>, measured_peak: u64) -> String {
let average_rate = DataRate::new(size, time);
let actual_peak = max(measured_peak, average_rate.into());
format!(
"{size} in {time}; average {rate}; peak {peak}",
size = size.human_count_bytes(),
time = time.map_or("unknown".to_string(), |d| d.human_duration().to_string()),
rate = average_rate,
peak = actual_peak.human_throughput_bytes(),
)
}
/// Logs the end-of-session summary for a connection.
///
/// In order, this emits:
/// * the transfer summary (only if any payload bytes were moved);
/// * packet counts for both ends (only if `show_statistics`);
/// * warnings for congestion events and for packet loss seen locally and by
///   the remote;
/// * the advanced statistics (only if `show_statistics`);
/// * RTT and bandwidth tuning hints.
///
/// `bandwidth` is the configuration in force for the session; it is passed on
/// to the RTT check and the bandwidth tuning suggestion.
#[cfg_attr(coverage_nightly, coverage(off))]
pub(crate) fn process_statistics(
    stats: &ConnectionStats,
    command_stats: CommandStats,
    transport_time: Option<Duration>,
    remote_stats: &ClosedownReportV1,
    bandwidth: &Configuration,
    show_statistics: bool,
    direction: Direction,
) {
    #![allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let en = &num_format::Locale::en;
    let local = &stats.path;

    if command_stats.payload_bytes != 0 {
        info!(
            "Transferred {}",
            format_rate(
                command_stats.payload_bytes,
                transport_time,
                command_stats.peak_transfer_rate,
            )
        );
    }

    if show_statistics {
        info!(
            "Total packets sent: {} by us; {} by remote",
            local.sent_packets.to_formatted_string(en),
            remote_stats.sent_packets.0.to_formatted_string(en),
        );
    }

    // Congestion is reported as the combined count from both endpoints.
    let total_congestion = local.congestion_events + remote_stats.congestion_events.0;
    if total_congestion > 0 {
        warn!(
            "Congestion events detected: {}",
            total_congestion.human_count_bare()
        );
    }

    let local_lost = local.lost_packets;
    if local_lost > 0 {
        #[allow(clippy::cast_precision_loss)]
        let loss_pct = 100. * local_lost as f64 / local.sent_packets as f64;
        warn!(
            "Lost packets: {count}/{total} ({pct:.2}%, for {bytes})",
            count = local_lost.human_count_bare(),
            total = local.sent_packets.human_count_bare(),
            pct = loss_pct,
            bytes = local.lost_bytes.human_count_bytes(),
        );
    }

    let remote_lost = remote_stats.lost_packets.0;
    if remote_lost > 0 {
        #[allow(clippy::cast_precision_loss)]
        let loss_pct = 100. * remote_lost as f64 / remote_stats.sent_packets.0 as f64;
        warn!(
            "Remote lost packets: {count}/{total} ({pct:.2}%, for {bytes})",
            count = remote_lost.human_count_bare(),
            total = remote_stats.sent_packets.0.human_count_bare(),
            pct = loss_pct,
            bytes = remote_stats.lost_bytes.0.human_count_bytes(),
        );
    }

    if show_statistics {
        advanced_statistics(stats, command_stats, remote_stats);
    }

    check_rtt(stats, bandwidth);
    suggest_bandwidth_tuning(
        stats,
        command_stats,
        transport_time,
        remote_stats,
        bandwidth,
        direction,
    );
}
/// Logs the detailed connection statistics.
///
/// Three lines may be emitted:
/// * path MTU and round-trip time (local values, plus the remote's values if
///   its closedown report carried them) and the final congestion window (the
///   larger of the local and remote values);
/// * datagram counts and the combined number of black holes detected;
/// * total bytes sent versus payload bytes, with the overhead percentage
///   (only when some payload was transferred).
#[cfg_attr(coverage_nightly, coverage(off))]
fn advanced_statistics(
    stats: &ConnectionStats,
    command_stats: CommandStats,
    remote_stats: &ClosedownReportV1,
) {
    use crate::protocol::FindTag as _;
    use serde_bare::Uint;

    let locale = &num_format::Locale::en;
    let payload_bytes = command_stats.payload_bytes;
    // Take the larger of the two endpoints' byte counts as the sender's total.
    let sender_sent_bytes = cmp::max(stats.udp_tx.bytes, remote_stats.sent_bytes.0);
    let cwnd = cmp::max(stats.path.cwnd, remote_stats.cwnd.0);

    // Shown in place of the remote path MTU when the report has no such tag.
    let fallback_pmtu = Variant::from("<not reported>");
    let remote_pmtu: &Variant = remote_stats
        .extension
        .find_tag(ClosedownReportExtension::Pmtu)
        .unwrap_or(&fallback_pmtu);

    // The remote RTT is only usable if present as an unsigned value; it is
    // interpreted as microseconds.
    let remote_rtt = remote_stats
        .extension
        .find_tag(ClosedownReportExtension::Rtt);
    let remote_rtt = if let Some(Variant::Unsigned(Uint(v))) = remote_rtt {
        Duration::from_micros(*v).human_duration().to_string()
    } else {
        "<not reported>".into()
    };

    info!(
        "Path MTU {pmtu} (remote: {remote_pmtu}), round-trip time {rtt} (remote: {remote_rtt}), final congestion window {cwnd}",
        pmtu = stats.path.current_mtu,
        rtt = stats.path.rtt.human_duration(),
        cwnd = cwnd.to_formatted_string(locale),
    );

    let black_holes = stats.path.black_holes_detected + remote_stats.black_holes.0;
    info!(
        "{tx} datagrams sent, {rx} received, {black_holes} black holes detected",
        tx = stats.udp_tx.datagrams.human_count_bare(),
        rx = stats.udp_rx.datagrams.human_count_bare(),
        black_holes = black_holes.to_formatted_string(locale),
    );

    if payload_bytes != 0 {
        // If the counters are inconsistent (payload exceeds the bytes reported
        // as sent) clamp the overhead to zero rather than underflowing, which
        // would panic in debug builds and print nonsense in release builds.
        let overhead_bytes = sender_sent_bytes.saturating_sub(payload_bytes);
        #[allow(clippy::cast_precision_loss)]
        let overhead_pct = 100. * overhead_bytes as f64 / payload_bytes as f64;
        info!(
            "{} total bytes sent for {} bytes payload ({:.2}% overhead/loss)",
            sender_sent_bytes.to_formatted_string(locale),
            payload_bytes.to_formatted_string(locale),
            overhead_pct
        );
    }
}
/// Warns when the measured path RTT exceeds the configured RTT by more than
/// 10%, suggesting a `--rtt` value one millisecond above the measurement.
#[cfg_attr(coverage_nightly, coverage(off))]
fn check_rtt(stats: &ConnectionStats, bandwidth: &Configuration) {
    let measured = stats.path.rtt;
    #[allow(clippy::cast_possible_truncation)]
    let measured_ms = measured.as_millis() as u32;
    // Allow 10% slack over the configured value before complaining.
    let tolerated_ms = u32::from(bandwidth.rtt) * 110 / 100;
    if measured_ms <= tolerated_ms {
        return;
    }
    warn!(
        "Measured path RTT {rtt_measured:?} was greater than configuration {rtt_arg}; for better performance, next time try --rtt {rtt_param}",
        rtt_measured = measured,
        rtt_arg = bandwidth.rtt,
        rtt_param = measured.as_millis() + 1,
    );
}
/// Suggests a larger `--tx` / `--rx` setting when the session's figures
/// indicate the configured bandwidth was holding the transfer back.
///
/// Nothing is logged when:
/// * no payload was transferred;
/// * neither the measured nor the configured RTT is non-zero;
/// * the direction is `Direction::Both`;
/// * the sender's congestion window or the configured bandwidth is zero;
/// * the sender was not flow-control blocked and the suggestion is not more
///   than 10% above the current setting.
///
/// The suggestion is the larger of two estimates (congestion window per RTT,
/// and the average payload rate), plus 25% headroom. It is never below the
/// current setting, and is doubled from the current setting if flow-control
/// blocking was seen but the estimate did not exceed it.
#[cfg_attr(coverage_nightly, coverage(off))]
fn suggest_bandwidth_tuning(
    stats: &ConnectionStats,
    command_stats: CommandStats,
    transport_time: Option<Duration>,
    remote_stats: &ClosedownReportV1,
    config: &Configuration,
    direction: Direction,
) {
    let payload_bytes = command_stats.payload_bytes;
    if payload_bytes == 0 {
        return;
    }

    // Prefer the measured RTT; fall back to the configured one if it reads as zero.
    let rtt_ms = match u64::try_from(stats.path.rtt.as_millis()).unwrap_or(0) {
        0 => u64::from(config.rtt),
        measured => measured,
    };
    if rtt_ms == 0 {
        return;
    }

    // Pick the figures (and the command-line flag) belonging to the sending side.
    let (sender_cwnd, current_bw, sender_frame_stats, flag) = match direction {
        Direction::ClientToServer => (stats.path.cwnd, config.tx(), &stats.frame_tx, "--tx"),
        Direction::ServerToClient => (remote_stats.cwnd.0, config.rx(), &stats.frame_rx, "--rx"),
        Direction::Both => return,
    };
    if sender_cwnd == 0 || current_bw == 0 {
        return;
    }

    let blocked_data = sender_frame_stats.data_blocked;
    let blocked_stream = sender_frame_stats.stream_data_blocked;
    let flow_control_blocked = blocked_data > 0 || blocked_stream > 0;

    // Bytes per second implied by one congestion window per round trip.
    let bw_from_cwnd = u64::try_from(
        u128::from(sender_cwnd)
            .saturating_mul(1000)
            .div_ceil(u128::from(rtt_ms)),
    )
    .unwrap_or(u64::MAX);

    // Bytes per second actually achieved on average; zero if the time is unknown or zero.
    let bw_from_average = match transport_time.map(|t| t.as_nanos()) {
        Some(nanos) if nanos != 0 => {
            let per_second = u128::from(payload_bytes)
                .saturating_mul(1_000_000_000)
                .div_ceil(nanos);
            u64::try_from(per_second).unwrap_or(u64::MAX)
        }
        _ => 0,
    };

    // Best estimate plus 25% headroom, rounded up and saturating at u64::MAX.
    let best_estimate = bw_from_cwnd.max(bw_from_average);
    let with_headroom = u64::try_from(u128::from(best_estimate).saturating_mul(5).div_ceil(4))
        .unwrap_or(u64::MAX);
    let boosted = if flow_control_blocked && with_headroom <= current_bw {
        current_bw.saturating_mul(2)
    } else {
        with_headroom
    };
    let suggested_bw = boosted.max(current_bw);

    let increase_substantial = suggested_bw > current_bw.saturating_mul(11) / 10;
    if !(flow_control_blocked || increase_substantial) {
        return;
    }

    let rtt_str = if stats.path.rtt.as_millis() == 0 {
        config.rtt_duration().human_duration().to_string()
    } else {
        stats.path.rtt.human_duration().to_string()
    };
    let cwnd_str = sender_cwnd.human_count_bytes();
    let suggested_str = suggested_bw.to_eng(4);
    let current_str = current_bw.to_eng(4);

    if flow_control_blocked {
        warn!(
            "Observed flow-control blocking (DATA_BLOCKED={blocked_data}, STREAM_DATA_BLOCKED={blocked_stream}); measured RTT {rtt_str}, sender cwnd {cwnd_str}; for better performance, next time try {flag} {suggested_str} (current {current_str})"
        );
    } else {
        warn!(
            "Measured RTT {rtt_str}, sender cwnd {cwnd_str}; for better performance, next time try {flag} {suggested_str} (current {current_str})"
        );
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[allow(clippy::float_cmp)]
mod tests {
    use super::DataRate;
    use pretty_assertions::assert_eq;
    use std::time::Duration;

    /// Asserts that `rate` converts to zero and displays as "unknown".
    fn assert_unknown(rate: DataRate) {
        assert_eq!(u64::from(rate), 0);
        assert_eq!(f64::from(rate), 0.);
        assert_eq!(format!("{rate}"), "unknown");
    }

    /// A rate with no elapsed time is unknown.
    #[test]
    fn unknown() {
        assert_unknown(DataRate::new(1234, None));
    }

    /// A rate over a zero-length interval is unknown.
    #[test]
    fn zero() {
        assert_unknown(DataRate::new(1234, Some(Duration::from_secs(0))));
    }

    /// Known rates convert and display as expected.
    #[test]
    fn correctness() {
        // (bytes, seconds, expected integer rate, expected display)
        let cases: [(u64, u64, u64, &str); 4] = [
            (42, 1, 42, "42B/s"),
            (1234, 1, 1234, "1.2kB/s"),
            (10_000_000_000, 500, 20_000_000, "20MB/s"),
            (1_000_000_000_000_000, 1234, 810_372_771_474, "810.37GB/s"),
        ];
        for &(bytes, secs, expect_u, expect_s) in &cases {
            let r = DataRate::new(bytes, Some(Duration::from_secs(secs)));
            assert_eq!(u64::from(r), expect_u);
            assert_eq!(format!("{r}"), expect_s);
        }
    }

    /// The summary line combines size, time, average and peak.
    #[test]
    fn format_rate_() {
        let summary =
            super::format_rate(10_000_000_000, Some(Duration::from_secs(500)), 40_000_000);
        assert_eq!(summary, "10GB in 8:20; average 20MB/s; peak 40MB/s");
    }
}