use parking_lot::Mutex;
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize, Serializer};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Upper bound on the number of intervals kept by `CircularIntervalBuffer`:
/// 100 when compiled with `cfg(test)`, 86400 otherwise.
const MAX_INTERVALS: usize = if cfg!(test) { 100 } else { 86400 };
/// FIFO buffer of [`IntervalStats`] backed by a `VecDeque`.
///
/// Its `push_back` drops the oldest entry once `MAX_INTERVALS` entries are held.
#[derive(Debug, Clone)]
pub struct CircularIntervalBuffer {
/// Stored intervals; new entries are appended at the back.
intervals: VecDeque<IntervalStats>,
}
impl Serialize for CircularIntervalBuffer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.intervals.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for CircularIntervalBuffer {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let intervals: VecDeque<IntervalStats> = VecDeque::deserialize(deserializer)?;
Ok(Self { intervals })
}
}
impl CircularIntervalBuffer {
    /// Creates an empty buffer without pre-allocating.
    #[inline]
    fn new() -> Self {
        Self {
            intervals: VecDeque::new(),
        }
    }

    /// Creates an empty buffer pre-allocated for `capacity` entries,
    /// clamped to `MAX_INTERVALS`.
    #[inline]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            intervals: VecDeque::with_capacity(capacity.min(MAX_INTERVALS)),
        }
    }

    /// Appends `item`, first evicting the oldest entries until there is room
    /// for it within `MAX_INTERVALS`.
    #[inline]
    fn push_back(&mut self, item: IntervalStats) {
        // Loop rather than a single pop so a buffer that somehow exceeds the
        // bound (e.g. one built from oversized serialized data) shrinks back.
        while self.intervals.len() >= MAX_INTERVALS {
            if self.intervals.pop_front().is_none() {
                break;
            }
        }
        self.intervals.push_back(item);
    }

    /// Number of intervals currently stored.
    #[inline]
    fn len(&self) -> usize {
        self.intervals.len()
    }

    /// Iterates over the stored intervals from front (oldest) to back (newest).
    #[inline]
    fn iter(&self) -> std::collections::vec_deque::Iter<'_, IntervalStats> {
        self.intervals.iter()
    }
}
/// Local and remote endpoints of one test connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
/// Raw socket descriptor; `get_connection_info` sets it on Linux and leaves it `None` elsewhere.
pub socket_fd: Option<i32>,
/// Local address as a string (`get_connection_info` stores the IP part of the local socket address).
pub local_host: String,
/// Local port.
pub local_port: u16,
/// Remote address as a string (`get_connection_info` stores the IP part of the peer address).
pub remote_host: String,
/// Remote port.
pub remote_port: u16,
}
/// Test parameters; embedded as `test_start` in [`TestStartInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestConfig {
/// Protocol name; `get_detailed_results` treats the test as UDP when this upper-cases to "UDP".
pub protocol: String,
/// Stream count (presumably the number of parallel streams — TODO confirm).
pub num_streams: usize,
/// Block size (presumably bytes per read/write — TODO confirm).
pub blksize: usize,
/// Omit period (presumably seconds excluded at the start of the test — TODO confirm).
pub omit: u64,
/// Test duration (presumably in seconds — TODO confirm).
pub duration: u64,
/// Reverse-mode flag (presumably the server sends instead of the client — TODO confirm).
pub reverse: bool,
}
/// Host and version description; `get_system_info` produces one for the local machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
/// Version string; `get_system_info` uses `"rperf3 <crate version>"`.
pub version: String,
/// Free-form host description; `get_system_info` uses `"<os> <arch> <hostname>"`.
pub system_info: String,
/// Timestamp as returned by chrono's `timestamp()` in `get_system_info`.
pub timestamp: i64,
/// Timestamp rendered with chrono's `to_rfc2822()` in `get_system_info`.
pub timestamp_str: String,
}
// Bit flags stored in `TcpStats::flags`; a set bit marks the matching field as present.
/// `TcpStats::snd_cwnd` holds a real value.
const TCP_SND_CWND_PRESENT: u8 = 1 << 0;
/// `TcpStats::rtt` holds a real value.
const TCP_RTT_PRESENT: u8 = 1 << 1;
/// `TcpStats::rttvar` holds a real value.
const TCP_RTTVAR_PRESENT: u8 = 1 << 2;
/// `TcpStats::pmtu` holds a real value.
const TCP_PMTU_PRESENT: u8 = 1 << 3;
impl TcpStats {
    /// Returns `true` when at least one bit of `flag` is set in `self.flags`.
    #[inline(always)]
    const fn has_flag(&self, flag: u8) -> bool {
        (flag & self.flags) != 0
    }

    /// Turns on the bits of `flag` in `self.flags`.
    #[inline(always)]
    #[allow(dead_code)]
    const fn set_flag(&mut self, flag: u8) {
        self.flags |= flag;
    }
}
/// Serde adapter for `u64` fields that use `u64::MAX` as an "absent" sentinel.
///
/// The sentinel is written as `None`; any other value is written as `Some(value)`.
/// Reading maps `None` back to `u64::MAX`.
mod option_u64_max {
    use serde::{Deserialize, Deserializer, Serializer};

    /// Writes `None` for `u64::MAX`, otherwise `Some(*value)`.
    pub fn serialize<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match *value {
            u64::MAX => serializer.serialize_none(),
            _ => serializer.serialize_some(value),
        }
    }

    /// Reads an `Option<u64>` and returns `u64::MAX` when it is `None`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
    where
        D: Deserializer<'de>,
    {
        Option::<u64>::deserialize(deserializer).map(|maybe| maybe.unwrap_or(u64::MAX))
    }
}
/// TCP statistics with per-field presence tracking.
///
/// Optional values are stored as plain `u64` plus a bit in `flags`
/// (see the `TCP_*_PRESENT` constants). Deserialization goes through
/// `TcpStatsDto`, which rebuilds `flags` from which fields were present.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(from = "TcpStatsDto")]
pub struct TcpStats {
/// Retransmit count; `get_tcp_stats` fills it from `total_retrans`.
pub retransmits: u64,
/// Congestion window; `get_tcp_stats` stores `snd_cwnd * snd_mss`. Meaningful only if `TCP_SND_CWND_PRESENT` is set.
pub snd_cwnd: u64,
/// Round-trip time (units not established here — TODO confirm). Meaningful only if `TCP_RTT_PRESENT` is set.
pub rtt: u64,
/// Round-trip time variance (units not established here — TODO confirm). Meaningful only if `TCP_RTTVAR_PRESENT` is set.
pub rttvar: u64,
/// Path MTU. Meaningful only if `TCP_PMTU_PRESENT` is set.
pub pmtu: u64,
/// Presence bitmask for the optional fields above; never serialized directly.
#[serde(skip)]
pub flags: u8,
}
/// Deserialization-side shape of [`TcpStats`]: optional values are real `Option`s
/// here and are converted to value + flag bit by `From<TcpStatsDto>`.
#[derive(Deserialize)]
struct TcpStatsDto {
/// Defaults to 0 when the field is missing.
#[serde(default)]
retransmits: u64,
snd_cwnd: Option<u64>,
rtt: Option<u64>,
rttvar: Option<u64>,
pmtu: Option<u64>,
}
/// Converts the deserialization DTO into [`TcpStats`]: each `Some(v)` becomes
/// `v` with its presence bit set, each `None` becomes 0 with the bit clear.
impl From<TcpStatsDto> for TcpStats {
    fn from(dto: TcpStatsDto) -> Self {
        let mut flags = 0u8;
        // Unwraps one optional value, recording its presence bit as a side effect.
        let mut unpack = |value: Option<u64>, bit: u8| -> u64 {
            match value {
                Some(present) => {
                    flags |= bit;
                    present
                }
                None => 0,
            }
        };
        let snd_cwnd = unpack(dto.snd_cwnd, TCP_SND_CWND_PRESENT);
        let rtt = unpack(dto.rtt, TCP_RTT_PRESENT);
        let rttvar = unpack(dto.rttvar, TCP_RTTVAR_PRESENT);
        let pmtu = unpack(dto.pmtu, TCP_PMTU_PRESENT);
        Self {
            retransmits: dto.retransmits,
            snd_cwnd,
            rtt,
            rttvar,
            pmtu,
            flags,
        }
    }
}
/// Serializes as a 5-field struct: `retransmits` always, and each optional
/// field either as its bare `u64` (presence bit set) or as `None::<u64>`.
impl Serialize for TcpStats {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // (field name, presence bit, stored value), in output order.
        let optional_fields: [(&'static str, u8, u64); 4] = [
            ("snd_cwnd", TCP_SND_CWND_PRESENT, self.snd_cwnd),
            ("rtt", TCP_RTT_PRESENT, self.rtt),
            ("rttvar", TCP_RTTVAR_PRESENT, self.rttvar),
            ("pmtu", TCP_PMTU_PRESENT, self.pmtu),
        ];
        let mut state = serializer.serialize_struct("TcpStats", 5)?;
        state.serialize_field("retransmits", &self.retransmits)?;
        for (key, bit, value) in optional_fields {
            if self.flags & bit == 0 {
                state.serialize_field(key, &None::<u64>)?;
            } else {
                state.serialize_field(key, &value)?;
            }
        }
        state.end()
    }
}
impl TcpStats {
    /// `snd_cwnd` if its presence bit is set, otherwise `None`.
    #[inline(always)]
    pub const fn snd_cwnd_opt(&self) -> Option<u64> {
        if !self.has_flag(TCP_SND_CWND_PRESENT) {
            return None;
        }
        Some(self.snd_cwnd)
    }

    /// `rtt` if its presence bit is set, otherwise `None`.
    #[inline(always)]
    pub const fn rtt_opt(&self) -> Option<u64> {
        if !self.has_flag(TCP_RTT_PRESENT) {
            return None;
        }
        Some(self.rtt)
    }

    /// `rttvar` if its presence bit is set, otherwise `None`.
    #[inline(always)]
    pub const fn rttvar_opt(&self) -> Option<u64> {
        if !self.has_flag(TCP_RTTVAR_PRESENT) {
            return None;
        }
        Some(self.rttvar)
    }

    /// `pmtu` if its presence bit is set, otherwise `None`.
    #[inline(always)]
    pub const fn pmtu_opt(&self) -> Option<u64> {
        if !self.has_flag(TCP_PMTU_PRESENT) {
            return None;
        }
        Some(self.pmtu)
    }
}
/// UDP statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpStats {
/// Jitter (milliseconds, per the field name).
pub jitter_ms: f64,
/// Number of packets counted as lost.
pub lost_packets: u64,
/// Packet count.
pub packets: u64,
/// Loss percentage.
pub lost_percent: f64,
/// Out-of-order packet count; `u64::MAX` is the "absent" sentinel and is
/// serialized as `None` by `option_u64_max`.
/// NOTE(review): with `#[serde(default)]` a *missing* field presumably becomes
/// `u64::default()` (0), not the sentinel — confirm this is intended.
#[serde(default, with = "option_u64_max")]
pub out_of_order: u64,
}
impl Default for UdpStats {
fn default() -> Self {
Self {
jitter_ms: 0.0,
lost_packets: 0,
packets: 0,
lost_percent: 0.0,
out_of_order: u64::MAX,
}
}
}
/// Per-interval record used by the TCP variant of [`IntervalData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedIntervalStats {
/// Socket descriptor, when known.
pub socket: Option<i32>,
/// Interval start, in seconds (built from `Duration::as_secs_f64`).
pub start: f64,
/// Interval end, in seconds.
pub end: f64,
/// Interval length, in seconds.
pub seconds: f64,
/// Bytes counted in the interval.
pub bytes: u64,
/// Throughput for the interval.
pub bits_per_second: f64,
/// TCP statistics, flattened into this object's own fields when serialized.
#[serde(flatten)]
pub tcp_stats: TcpStats,
/// Packet count; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub packets: Option<u64>,
/// `build_tcp_intervals` always writes `false`.
pub omitted: bool,
/// `build_tcp_intervals` always writes `true`.
pub sender: bool,
}
/// Per-interval record used by the UDP variant of [`IntervalData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpIntervalStats {
/// Socket descriptor, when known.
pub socket: Option<i32>,
/// Interval start, in seconds (built from `Duration::as_secs_f64`).
pub start: f64,
/// Interval end, in seconds.
pub end: f64,
/// Interval length, in seconds.
pub seconds: f64,
/// Bytes counted in the interval.
pub bytes: u64,
/// Throughput for the interval.
pub bits_per_second: f64,
/// Packet count; `build_udp_intervals` writes 0 when the source interval holds the `u64::MAX` sentinel.
pub packets: u64,
/// `build_udp_intervals` always writes `false`.
pub omitted: bool,
/// `build_udp_intervals` always writes `true`.
pub sender: bool,
}
/// End-of-test TCP summary for one direction (see `build_tcp_end_info`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSummary {
/// Socket descriptor, when known.
pub socket: Option<i32>,
/// Start time in seconds; `build_tcp_end_info` writes 0.0.
pub start: f64,
/// End time in seconds; `build_tcp_end_info` writes the total duration.
pub end: f64,
/// Covered time in seconds.
pub seconds: f64,
/// Total bytes for this direction.
pub bytes: u64,
/// Average throughput.
pub bits_per_second: f64,
/// Retransmit count; `build_tcp_end_info` writes 0.
pub retransmits: u64,
/// The four values below are written as `None` by `build_tcp_end_info`.
pub max_snd_cwnd: Option<u64>,
pub max_rtt: Option<u64>,
pub min_rtt: Option<u64>,
pub mean_rtt: Option<u64>,
/// `build_tcp_end_info` writes `true` for both the sent and received summaries.
pub sender: bool,
}
/// End-of-test UDP summary for one stream (see `build_udp_end_info`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpStreamSummary {
/// Socket descriptor, when known.
pub socket: Option<i32>,
/// Start time in seconds; `build_udp_end_info` writes 0.0.
pub start: f64,
/// End time in seconds; `build_udp_end_info` writes the total duration.
pub end: f64,
/// Covered time in seconds.
pub seconds: f64,
/// `build_udp_end_info` writes bytes sent plus bytes received.
pub bytes: u64,
/// Average throughput over `seconds`.
pub bits_per_second: f64,
/// Jitter (milliseconds, per the field name).
pub jitter_ms: f64,
/// Packets counted as lost.
pub lost_packets: u64,
/// `build_udp_end_info` writes the expected packet count from `calculate_udp_loss`.
pub packets: u64,
/// `lost_packets / packets * 100`, or 0.0 when no packets were expected.
pub lost_percent: f64,
/// Out-of-order count; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub out_of_order: Option<u64>,
/// `build_udp_end_info` writes `true` when more bytes were sent than received.
pub sender: bool,
}
/// End-of-test UDP totals (see `build_udp_end_info`); same data as
/// [`UdpStreamSummary`] without `socket` and `out_of_order`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpSum {
/// Start time in seconds; `build_udp_end_info` writes 0.0.
pub start: f64,
/// End time in seconds; `build_udp_end_info` writes the total duration.
pub end: f64,
/// Covered time in seconds.
pub seconds: f64,
/// `build_udp_end_info` writes bytes sent plus bytes received.
pub bytes: u64,
/// Average throughput over `seconds`.
pub bits_per_second: f64,
/// Jitter (milliseconds, per the field name).
pub jitter_ms: f64,
/// Packets counted as lost.
pub lost_packets: u64,
/// Expected packet count.
pub packets: u64,
/// Loss percentage.
pub lost_percent: f64,
/// `build_udp_end_info` writes `true` when more bytes were sent than received.
pub sender: bool,
}
/// CPU utilization figures for the local ("host") and remote side.
/// Values are presumably percentages, given the `cpu_utilization_percent`
/// field that carries this type — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuUtilization {
pub host_total: f64,
pub host_user: f64,
pub host_system: f64,
pub remote_total: f64,
pub remote_user: f64,
pub remote_system: f64,
}
/// Complete report returned by `MeasurementsCollector::get_detailed_results`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedTestResults {
/// Connection, host and configuration details.
pub start: TestStartInfo,
/// One entry per recorded interval.
pub intervals: Vec<IntervalData>,
/// End-of-test summary.
pub end: TestEndInfo,
}
/// "start" section of [`DetailedTestResults`]; built in `get_detailed_results`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestStartInfo {
/// Connections in use; zero or one entry as built by `get_detailed_results`.
pub connected: Vec<ConnectionInfo>,
/// `"rperf3 <crate version>"`.
pub version: String,
/// Host description; falls back to `"<os> <arch>"` when no `SystemInfo` is supplied.
pub system_info: String,
/// Timestamp; taken from `SystemInfo` or the current UTC time.
pub timestamp: TimestampInfo,
/// Remote endpoint; host defaults to empty and port to 5201 without connection info.
pub connecting_to: ConnectingTo,
/// Random `u128` rendered as lowercase hex.
pub cookie: String,
/// `get_detailed_results` writes `None` for UDP and a fixed 1448 for TCP.
pub tcp_mss_default: Option<u32>,
/// `get_detailed_results` writes 0.
pub sock_bufsize: u32,
/// `get_detailed_results` writes fixed values (212992 for UDP, 16384 for TCP), not values read from the socket.
pub sndbuf_actual: u32,
/// `get_detailed_results` writes fixed values (212992 for UDP, 131072 for TCP), not values read from the socket.
pub rcvbuf_actual: u32,
/// The test configuration passed to `get_detailed_results`.
pub test_start: TestConfig,
}
/// Timestamp in both textual and numeric form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampInfo {
/// Textual form; `get_detailed_results` uses `SystemInfo::timestamp_str` or chrono's `to_rfc2822()`.
pub time: String,
/// Numeric form; `get_detailed_results` uses `SystemInfo::timestamp` or chrono's `timestamp()`.
pub timesecs: i64,
}
/// Remote endpoint the test connects to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectingTo {
/// Remote host; `get_detailed_results` copies `ConnectionInfo::remote_host`.
pub host: String,
/// Remote port; `get_detailed_results` copies `ConnectionInfo::remote_port` or uses 5201.
pub port: u16,
}
/// One reporting interval, in TCP or UDP shape. Serialized without a variant tag.
///
/// NOTE(review): because the enum is untagged and `Tcp` is listed first, and
/// every field that `UdpIntervalStats` has also exists in `DetailedIntervalStats`
/// (with `packets` optional there), UDP-shaped input presumably deserializes as
/// `Tcp` — confirm whether round-tripping UDP data matters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IntervalData {
/// TCP interval: per-stream records plus their sum.
Tcp {
streams: Vec<DetailedIntervalStats>,
sum: DetailedIntervalStats,
},
/// UDP interval: per-stream records plus their sum.
Udp {
streams: Vec<UdpIntervalStats>,
sum: UdpIntervalStats,
},
}
/// "end" section of [`DetailedTestResults`], in TCP or UDP shape.
/// Serialized without a variant tag; optional members are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TestEndInfo {
/// TCP summary (see `build_tcp_end_info`).
Tcp {
/// Per-stream sender/receiver summaries.
streams: Vec<EndStreamInfo>,
/// Totals for the sending direction.
sum_sent: Box<StreamSummary>,
/// Totals for the receiving direction.
sum_received: Box<StreamSummary>,
#[serde(skip_serializing_if = "Option::is_none")]
cpu_utilization_percent: Option<CpuUtilization>,
/// `build_tcp_end_info` writes a fixed `"cubic"`.
#[serde(skip_serializing_if = "Option::is_none")]
sender_tcp_congestion: Option<String>,
/// `build_tcp_end_info` writes a fixed `"cubic"`.
#[serde(skip_serializing_if = "Option::is_none")]
receiver_tcp_congestion: Option<String>,
},
/// UDP summary (see `build_udp_end_info`).
Udp {
/// Per-stream summaries.
streams: Vec<UdpEndStreamInfo>,
/// Totals across streams.
sum: UdpSum,
#[serde(skip_serializing_if = "Option::is_none")]
cpu_utilization_percent: Option<CpuUtilization>,
},
}
/// Sender and receiver summaries for one TCP stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndStreamInfo {
pub sender: StreamSummary,
pub receiver: StreamSummary,
}
/// Wrapper that places one UDP stream summary under a `udp` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpEndStreamInfo {
pub udp: UdpStreamSummary,
}
/// Byte counters and duration for a single stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamStats {
/// Identifier of the stream.
pub stream_id: usize,
/// Bytes sent on the stream.
pub bytes_sent: u64,
/// Bytes received on the stream.
pub bytes_received: u64,
/// Time span the counters cover; used as the divisor in `bits_per_second`.
pub duration: Duration,
/// Retransmit count, when known; every constructor in this file sets `None`.
pub retransmits: Option<u64>,
}
impl StreamStats {
pub fn new(stream_id: usize) -> Self {
Self {
stream_id,
bytes_sent: 0,
bytes_received: 0,
duration: Duration::ZERO,
retransmits: None,
}
}
pub fn bits_per_second(&self) -> f64 {
if self.duration.as_secs_f64() > 0.0 {
let total_bytes = self.bytes_sent + self.bytes_received;
(total_bytes as f64 * 8.0) / self.duration.as_secs_f64()
} else {
0.0
}
}
}
/// Raw measurement for one reporting interval, as stored in `CircularIntervalBuffer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntervalStats {
/// Interval start offset.
pub start: Duration,
/// Interval end offset.
pub end: Duration,
/// Bytes counted in the interval.
pub bytes: u64,
/// Throughput for the interval, supplied by whoever builds the value.
pub bits_per_second: f64,
/// Packet count; `u64::MAX` is the "absent" sentinel (serialized as `None`
/// by `option_u64_max`, reported as 0 by `build_udp_intervals`).
/// NOTE(review): with `#[serde(default)]` a *missing* field presumably becomes
/// `u64::default()` (0), not the sentinel — confirm this is intended.
#[serde(default, with = "option_u64_max")]
pub packets: u64,
}
/// Aggregated results of a test: per-stream stats, interval history and totals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Measurements {
/// Per-stream statistics.
pub streams: Vec<StreamStats>,
/// Bounded history of interval measurements.
pub intervals: CircularIntervalBuffer,
/// Total bytes sent.
pub total_bytes_sent: u64,
/// Total bytes received.
pub total_bytes_received: u64,
/// Duration used as the divisor for `total_bits_per_second`.
pub total_duration: Duration,
/// Total packet count.
pub total_packets: u64,
/// Packets counted as lost.
pub lost_packets: u64,
/// Packets counted as out of order.
pub out_of_order_packets: u64,
/// Jitter (milliseconds, per the field name).
pub jitter_ms: f64,
/// Start instant, if set; excluded from (de)serialization.
#[serde(skip)]
pub start_time: Option<Instant>,
}
impl Measurements {
pub fn new() -> Self {
Self {
streams: Vec::new(),
intervals: CircularIntervalBuffer::new(),
total_bytes_sent: 0,
total_bytes_received: 0,
total_duration: Duration::ZERO,
total_packets: 0,
lost_packets: 0,
out_of_order_packets: 0,
jitter_ms: 0.0,
start_time: None,
}
}
pub fn with_capacity(expected_streams: usize, expected_intervals: usize) -> Self {
Self {
streams: Vec::with_capacity(expected_streams),
intervals: CircularIntervalBuffer::with_capacity(expected_intervals),
total_bytes_sent: 0,
total_bytes_received: 0,
total_duration: Duration::ZERO,
total_packets: 0,
lost_packets: 0,
out_of_order_packets: 0,
jitter_ms: 0.0,
start_time: None,
}
}
pub fn total_bits_per_second(&self) -> f64 {
if self.total_duration.as_secs_f64() > 0.0 {
let total_bytes = self.total_bytes_sent + self.total_bytes_received;
(total_bytes as f64 * 8.0) / self.total_duration.as_secs_f64()
} else {
0.0
}
}
pub fn add_stream(&mut self, stats: StreamStats) {
self.total_bytes_sent += stats.bytes_sent;
self.total_bytes_received += stats.bytes_received;
self.streams.push(stats);
}
#[inline]
pub fn add_interval(&mut self, interval: IntervalStats) {
self.intervals.push_back(interval);
}
pub fn set_duration(&mut self, duration: Duration) {
self.total_duration = duration;
for stream in &mut self.streams {
stream.duration = duration;
}
}
pub fn set_start_time(&mut self, time: Instant) {
self.start_time = Some(time);
}
pub fn calculate_udp_loss(&self) -> (u64, u64) {
(self.lost_packets, self.total_packets)
}
}
impl Default for Measurements {
fn default() -> Self {
Self::new()
}
}
/// Atomic counters for a single stream, updated by `MeasurementsCollector`.
#[derive(Debug)]
struct PerStreamMeasurements {
/// Identifier of the stream these counters belong to.
stream_id: usize,
/// Bytes sent on the stream.
bytes_sent: AtomicU64,
/// Bytes received on the stream.
bytes_received: AtomicU64,
/// Packets recorded for the stream (incremented by `record_udp_packet`;
/// not included in `to_stream_stats`).
packets: AtomicU64,
}
impl PerStreamMeasurements {
fn new(stream_id: usize) -> Self {
Self {
stream_id,
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
packets: AtomicU64::new(0),
}
}
fn to_stream_stats(&self, duration: Duration) -> StreamStats {
StreamStats {
stream_id: self.stream_id,
bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
bytes_received: self.bytes_received.load(Ordering::Relaxed),
duration,
retransmits: None,
}
}
}
/// Shared, cloneable handle for collecting measurements from several places.
///
/// Every field is an `Arc`, so a clone (derived) refers to the same underlying
/// state as the original.
#[derive(Debug, Clone)]
pub struct MeasurementsCollector {
    /// Aggregated measurements (intervals, durations, UDP loss/jitter results).
    inner: Arc<Mutex<Measurements>>,
    /// Sequence/jitter tracking state for received UDP packets.
    udp_state: Arc<Mutex<UdpPacketState>>,
    /// Per-stream atomic counters, created on first use of a stream id.
    per_stream: Arc<Mutex<Vec<Arc<PerStreamMeasurements>>>>,
    /// Running total of bytes sent across all streams.
    atomic_bytes_sent: Arc<AtomicU64>,
    /// Running total of bytes received across all streams.
    atomic_bytes_received: Arc<AtomicU64>,
    /// Running total of packets recorded via `record_udp_packet`.
    atomic_packets: Arc<AtomicU64>,
}
/// Tracking state for received UDP packets, used for loss, reordering and
/// jitter calculations. The derived `Default` is "nothing seen yet": all
/// options `None`, all counters zero.
#[derive(Debug, Clone, Default)]
struct UdpPacketState {
    /// Sequence number of the most recently recorded packet.
    last_sequence: Option<u64>,
    /// Highest sequence number seen so far.
    max_sequence: Option<u64>,
    /// Number of packets recorded.
    received_count: u64,
    /// Arrival timestamp of the most recently recorded packet.
    last_arrival_us: Option<u64>,
    /// Send timestamp of the most recently recorded packet.
    last_send_timestamp_us: Option<u64>,
    /// Smoothed jitter estimate maintained by `record_udp_packet_received`.
    jitter_ms: f64,
    /// Number of packets that arrived with a sequence lower than their predecessor.
    out_of_order: u64,
}
impl MeasurementsCollector {
/// Creates a collector with empty measurements and all counters at zero.
pub fn new() -> Self {
    let zeroed_counter = || Arc::new(AtomicU64::new(0));
    Self {
        inner: Arc::new(Mutex::new(Measurements::new())),
        udp_state: Arc::new(Mutex::new(UdpPacketState::default())),
        per_stream: Arc::new(Mutex::new(Vec::new())),
        atomic_bytes_sent: zeroed_counter(),
        atomic_bytes_received: zeroed_counter(),
        atomic_packets: zeroed_counter(),
    }
}
/// Creates a collector whose stream and interval storage is pre-allocated
/// for the expected sizes.
pub fn with_capacity(expected_streams: usize, expected_intervals: usize) -> Self {
    let measurements = Measurements::with_capacity(expected_streams, expected_intervals);
    let stream_registry = Vec::with_capacity(expected_streams);
    let zeroed_counter = || Arc::new(AtomicU64::new(0));
    Self {
        inner: Arc::new(Mutex::new(measurements)),
        udp_state: Arc::new(Mutex::new(UdpPacketState::default())),
        per_stream: Arc::new(Mutex::new(stream_registry)),
        atomic_bytes_sent: zeroed_counter(),
        atomic_bytes_received: zeroed_counter(),
        atomic_packets: zeroed_counter(),
    }
}
/// Returns the counters for `stream_id`, registering a fresh zeroed entry
/// when the id has not been seen before. Lookup is a linear scan performed
/// while holding the `per_stream` lock.
fn get_or_create_stream(&self, stream_id: usize) -> Arc<PerStreamMeasurements> {
    let mut registry = self.per_stream.lock();
    match registry
        .iter()
        .position(|entry| entry.stream_id == stream_id)
    {
        Some(index) => Arc::clone(&registry[index]),
        None => {
            let created = Arc::new(PerStreamMeasurements::new(stream_id));
            registry.push(Arc::clone(&created));
            created
        }
    }
}
/// Adds `bytes` to the global sent counter and to the sent counter of `stream_id`.
pub fn record_bytes_sent(&self, stream_id: usize, bytes: u64) {
    self.atomic_bytes_sent.fetch_add(bytes, Ordering::Relaxed);
    self.get_or_create_stream(stream_id)
        .bytes_sent
        .fetch_add(bytes, Ordering::Relaxed);
}
/// Adds `bytes` to the global received counter and to the received counter of `stream_id`.
pub fn record_bytes_received(&self, stream_id: usize, bytes: u64) {
    self.atomic_bytes_received.fetch_add(bytes, Ordering::Relaxed);
    self.get_or_create_stream(stream_id)
        .bytes_received
        .fetch_add(bytes, Ordering::Relaxed);
}
/// Appends an interval to the shared measurements.
pub fn add_interval(&self, interval: IntervalStats) {
    let mut measurements = self.inner.lock();
    measurements.add_interval(interval);
}
/// Counts one packet in the global packet counter and in the counter of `stream_id`.
pub fn record_udp_packet(&self, stream_id: usize) {
    self.atomic_packets.fetch_add(1, Ordering::Relaxed);
    self.get_or_create_stream(stream_id)
        .packets
        .fetch_add(1, Ordering::Relaxed);
}
/// Copies the current atomic totals (bytes sent, bytes received, packets)
/// into the shared `Measurements`.
pub fn sync_atomic_counters(&self) {
    let mut measurements = self.inner.lock();
    let sent = self.atomic_bytes_sent.load(Ordering::Relaxed);
    measurements.total_bytes_sent = sent;
    let received = self.atomic_bytes_received.load(Ordering::Relaxed);
    measurements.total_bytes_received = received;
    let packets = self.atomic_packets.load(Ordering::Relaxed);
    measurements.total_packets = packets;
}
/// Records the arrival of one sequenced UDP packet and updates the
/// received count, highest sequence, out-of-order count and jitter.
///
/// * `sequence` - packet sequence number; a value of `u64::MAX` is ignored.
/// * `send_timestamp_us` / `recv_timestamp_us` - send and arrival timestamps
///   (microseconds, per the parameter names).
///
/// A packet counts as out of order when its sequence is lower than that of
/// the previously recorded packet. Jitter is a running estimate updated by
/// 1/16 of the difference between the new transit-time delta and the
/// current estimate; the value published to the measurements is that
/// estimate divided by 1000.
pub fn record_udp_packet_received(
    &self,
    sequence: u64,
    send_timestamp_us: u64,
    recv_timestamp_us: u64,
) {
    if sequence == u64::MAX {
        return;
    }
    // Lock order matters: `inner` first, then `udp_state`.
    // `get_detailed_results` holds `inner` while `build_udp_end_info` calls
    // `calculate_udp_loss`, which locks `udp_state`. Taking the two locks in
    // the opposite order here could deadlock against that path.
    let mut m = self.inner.lock();
    let mut state = self.udp_state.lock();

    state.received_count += 1;
    state.max_sequence = Some(match state.max_sequence {
        Some(max) if max >= sequence => max,
        _ => sequence,
    });

    if matches!(state.last_sequence, Some(last_seq) if sequence < last_seq) {
        state.out_of_order += 1;
        m.out_of_order_packets += 1;
    }

    if let (Some(last_arrival), Some(last_send)) =
        (state.last_arrival_us, state.last_send_timestamp_us)
    {
        let current_transit = recv_timestamp_us.saturating_sub(send_timestamp_us);
        let previous_transit = last_arrival.saturating_sub(last_send);
        let transit_delta = current_transit.abs_diff(previous_transit);
        // The running estimate is kept in the timestamps' unit and divided
        // by 1000 when published.
        state.jitter_ms += (transit_delta as f64 - state.jitter_ms) / 16.0;
        m.jitter_ms = state.jitter_ms / 1000.0;
    }

    state.last_sequence = Some(sequence);
    state.last_arrival_us = Some(recv_timestamp_us);
    state.last_send_timestamp_us = Some(send_timestamp_us);
}
/// Returns `(lost, expected)` for UDP: `expected` is the highest sequence
/// seen plus one, and `lost` is `expected` minus the number of packets
/// recorded (never below zero). Returns `(0, 0)` before any packet is seen.
pub fn calculate_udp_loss(&self) -> (u64, u64) {
    let state = self.udp_state.lock();
    let Some(max_seq) = state.max_sequence else {
        return (0, 0);
    };
    let expected = max_seq + 1;
    let lost = expected.saturating_sub(state.received_count);
    (lost, expected)
}
/// Adds `lost` to the stored lost-packet count.
pub fn record_udp_loss(&self, lost: u64) {
    self.inner.lock().lost_packets += lost;
}
/// Folds a new jitter sample into the stored value: the first sample
/// (stored value exactly 0.0) is taken as-is, later samples are blended as
/// `old * 0.875 + sample * 0.125`.
pub fn update_jitter(&self, jitter: f64) {
    let mut measurements = self.inner.lock();
    let previous = measurements.jitter_ms;
    let blended = if previous == 0.0 {
        jitter
    } else {
        previous * 0.875 + jitter * 0.125
    };
    measurements.jitter_ms = blended;
}
/// Sets the total duration on the shared measurements.
pub fn set_duration(&self, duration: Duration) {
    let mut measurements = self.inner.lock();
    measurements.set_duration(duration);
}
/// Records the start instant on the shared measurements.
pub fn set_start_time(&self, time: Instant) {
    let mut measurements = self.inner.lock();
    measurements.set_start_time(time);
}
/// Returns a snapshot of the measurements.
///
/// The snapshot is a clone of the shared state with the byte/packet totals
/// replaced by the atomic counters and `streams` rebuilt from the per-stream
/// counters. When any bytes were received, `lost_packets` and
/// `total_packets` are overwritten with the result of `calculate_udp_loss`.
pub fn get(&self) -> Measurements {
    let mut snapshot = self.inner.lock().clone();
    snapshot.total_bytes_sent = self.atomic_bytes_sent.load(Ordering::Relaxed);
    snapshot.total_bytes_received = self.atomic_bytes_received.load(Ordering::Relaxed);
    snapshot.total_packets = self.atomic_packets.load(Ordering::Relaxed);

    let duration = snapshot.total_duration;
    {
        // Scope keeps the per-stream lock only as long as the rebuild needs it.
        let registry = self.per_stream.lock();
        snapshot.streams.clear();
        snapshot
            .streams
            .extend(registry.iter().map(|entry| entry.to_stream_stats(duration)));
    }

    if snapshot.total_bytes_received > 0 {
        let (lost, expected) = self.calculate_udp_loss();
        snapshot.lost_packets = lost;
        snapshot.total_packets = expected;
    }
    snapshot
}
/// Returns the statistics of `stream_id`, or `None` when the stream is unknown.
///
/// Live per-stream counters (the ones updated by the `record_*` methods) are
/// consulted first and reported with the current total duration, the same
/// way `get` builds its `streams` list. If the id has no live counters, the
/// `streams` list stored in the shared measurements is searched as a fallback.
pub fn get_stream_stats(&self, stream_id: usize) -> Option<StreamStats> {
    // Read what is needed from `inner` and release it before touching
    // `per_stream`, so the two locks are never held together.
    let (duration, stored) = {
        let measurements = self.inner.lock();
        let stored = measurements
            .streams
            .iter()
            .find(|s| s.stream_id == stream_id)
            .cloned();
        (measurements.total_duration, stored)
    };
    let live = self
        .per_stream
        .lock()
        .iter()
        .find(|s| s.stream_id == stream_id)
        .map(|s| s.to_stream_stats(duration));
    live.or(stored)
}
pub fn get_detailed_results(
&self,
connection_info: Option<ConnectionInfo>,
system_info: Option<SystemInfo>,
test_config: TestConfig,
) -> DetailedTestResults {
let m = self.inner.lock();
let is_udp = test_config.protocol.to_uppercase() == "UDP";
let start_info = TestStartInfo {
connected: connection_info.clone().into_iter().collect(),
version: format!("rperf3 {}", env!("CARGO_PKG_VERSION")),
system_info: system_info
.as_ref()
.map(|s| s.system_info.clone())
.unwrap_or_else(|| format!("{} {}", std::env::consts::OS, std::env::consts::ARCH)),
timestamp: TimestampInfo {
time: system_info
.as_ref()
.map(|s| s.timestamp_str.clone())
.unwrap_or_else(|| chrono::Utc::now().to_rfc2822()),
timesecs: system_info
.as_ref()
.map(|s| s.timestamp)
.unwrap_or_else(|| chrono::Utc::now().timestamp()),
},
connecting_to: ConnectingTo {
host: connection_info
.as_ref()
.map(|c| c.remote_host.clone())
.unwrap_or_default(),
port: connection_info
.as_ref()
.map(|c| c.remote_port)
.unwrap_or(5201),
},
cookie: format!("{:x}", rand::random::<u128>()),
tcp_mss_default: if is_udp { None } else { Some(1448) },
sock_bufsize: 0,
sndbuf_actual: if is_udp { 212992 } else { 16384 },
rcvbuf_actual: if is_udp { 212992 } else { 131072 },
test_start: test_config.clone(),
};
let intervals = if is_udp {
self.build_udp_intervals(&m, &connection_info)
} else {
self.build_tcp_intervals(&m, &connection_info)
};
let end_info = if is_udp {
self.build_udp_end_info(&m, &connection_info)
} else {
self.build_tcp_end_info(&m, &connection_info)
};
DetailedTestResults {
start: start_info,
intervals,
end: end_info,
}
}
/// Converts every stored interval into a TCP-shaped [`IntervalData`].
///
/// Each entry has one stream record and an identical `sum` record, with
/// default (all-absent) TCP stats, no packet count, `omitted = false` and
/// `sender = true`. An interval whose `end` precedes its `start` is reported
/// with a length of zero seconds instead of panicking on the subtraction.
fn build_tcp_intervals(
    &self,
    m: &Measurements,
    connection_info: &Option<ConnectionInfo>,
) -> Vec<IntervalData> {
    let socket_fd = connection_info.as_ref().and_then(|c| c.socket_fd);
    m.intervals
        .iter()
        .map(|interval| {
            let stat = DetailedIntervalStats {
                socket: socket_fd,
                start: interval.start.as_secs_f64(),
                end: interval.end.as_secs_f64(),
                seconds: interval.end.saturating_sub(interval.start).as_secs_f64(),
                bytes: interval.bytes,
                bits_per_second: interval.bits_per_second,
                tcp_stats: TcpStats::default(),
                packets: None,
                omitted: false,
                sender: true,
            };
            IntervalData::Tcp {
                streams: vec![stat.clone()],
                sum: stat,
            }
        })
        .collect()
}
/// Converts every stored interval into a UDP-shaped [`IntervalData`].
///
/// Each entry has one stream record and an identical `sum` record with
/// `omitted = false` and `sender = true`. A packet count equal to the
/// `u64::MAX` sentinel is reported as 0. An interval whose `end` precedes
/// its `start` is reported with a length of zero seconds instead of
/// panicking on the subtraction.
fn build_udp_intervals(
    &self,
    m: &Measurements,
    connection_info: &Option<ConnectionInfo>,
) -> Vec<IntervalData> {
    let socket_fd = connection_info.as_ref().and_then(|c| c.socket_fd);
    m.intervals
        .iter()
        .map(|interval| {
            let packets = match interval.packets {
                u64::MAX => 0,
                count => count,
            };
            let stat = UdpIntervalStats {
                socket: socket_fd,
                start: interval.start.as_secs_f64(),
                end: interval.end.as_secs_f64(),
                seconds: interval.end.saturating_sub(interval.start).as_secs_f64(),
                bytes: interval.bytes,
                bits_per_second: interval.bits_per_second,
                packets,
                omitted: false,
                sender: true,
            };
            IntervalData::Udp {
                streams: vec![stat.clone()],
                sum: stat,
            }
        })
        .collect()
}
/// Builds the TCP end-of-test summary.
///
/// The sent summary uses `total_bytes_sent` with `total_bits_per_second()`
/// (which counts both directions); the received summary uses
/// `total_bytes_received` with its own rate. Retransmits are 0, RTT/cwnd
/// values are `None`, both summaries carry `sender = true`, and both
/// congestion-control names are the fixed string "cubic".
fn build_tcp_end_info(
    &self,
    m: &Measurements,
    connection_info: &Option<ConnectionInfo>,
) -> TestEndInfo {
    let socket = connection_info.as_ref().and_then(|c| c.socket_fd);
    let total_secs = m.total_duration.as_secs_f64();

    // Shared template: the two summaries differ only in bytes and rate.
    let summary_for = |bytes: u64, bits_per_second: f64| StreamSummary {
        socket,
        start: 0.0,
        end: total_secs,
        seconds: total_secs,
        bytes,
        bits_per_second,
        retransmits: 0,
        max_snd_cwnd: None,
        max_rtt: None,
        min_rtt: None,
        mean_rtt: None,
        sender: true,
    };

    let sent = summary_for(m.total_bytes_sent, m.total_bits_per_second());
    let received_rate = if total_secs > 0.0 {
        (m.total_bytes_received as f64 * 8.0) / total_secs
    } else {
        0.0
    };
    let received = summary_for(m.total_bytes_received, received_rate);

    let per_stream = EndStreamInfo {
        sender: sent.clone(),
        receiver: received.clone(),
    };
    TestEndInfo::Tcp {
        streams: vec![per_stream],
        sum_sent: Box::new(sent),
        sum_received: Box::new(received),
        cpu_utilization_percent: None,
        sender_tcp_congestion: Some("cubic".to_string()),
        receiver_tcp_congestion: Some("cubic".to_string()),
    }
}
/// Builds the UDP end-of-test summary.
///
/// Bytes are sent plus received; loss figures come from `calculate_udp_loss`
/// (which locks `udp_state`); `out_of_order` is present only when non-zero;
/// `sender` is `true` when more bytes were sent than received.
fn build_udp_end_info(
    &self,
    m: &Measurements,
    connection_info: &Option<ConnectionInfo>,
) -> TestEndInfo {
    let total_secs = m.total_duration.as_secs_f64();
    let (lost_packets, expected_packets) = self.calculate_udp_loss();
    let lost_percent = match expected_packets {
        0 => 0.0,
        expected => (lost_packets as f64 / expected as f64) * 100.0,
    };

    let combined_bytes = m.total_bytes_sent + m.total_bytes_received;
    let combined_rate = if total_secs > 0.0 {
        (combined_bytes as f64 * 8.0) / total_secs
    } else {
        0.0
    };
    let is_sender = m.total_bytes_sent > m.total_bytes_received;

    let stream_summary = UdpStreamSummary {
        socket: connection_info.as_ref().and_then(|c| c.socket_fd),
        start: 0.0,
        end: total_secs,
        seconds: total_secs,
        bytes: combined_bytes,
        bits_per_second: combined_rate,
        jitter_ms: m.jitter_ms,
        lost_packets,
        packets: expected_packets,
        lost_percent,
        out_of_order: Some(m.out_of_order_packets).filter(|&count| count > 0),
        sender: is_sender,
    };
    let totals = UdpSum {
        start: 0.0,
        end: total_secs,
        seconds: total_secs,
        bytes: combined_bytes,
        bits_per_second: combined_rate,
        jitter_ms: m.jitter_ms,
        lost_packets,
        packets: expected_packets,
        lost_percent,
        sender: is_sender,
    };

    TestEndInfo::Udp {
        streams: vec![UdpEndStreamInfo {
            udp: stream_summary,
        }],
        sum: totals,
        cpu_utilization_percent: None,
    }
}
}
impl Default for MeasurementsCollector {
fn default() -> Self {
Self::new()
}
}
/// Describes the local machine: crate version, `"<os> <arch> <hostname>"`
/// (hostname falls back to "unknown" when it cannot be read or is not valid
/// Unicode) and the current UTC time in both RFC 2822 and numeric form.
pub fn get_system_info() -> SystemInfo {
    // Sample the clock once so both timestamp fields describe the same instant.
    let now = chrono::Utc::now();
    let host_name = hostname::get()
        .ok()
        .and_then(|h| h.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string());
    SystemInfo {
        version: format!("rperf3 {}", env!("CARGO_PKG_VERSION")),
        system_info: format!(
            "{} {} {}",
            std::env::consts::OS,
            std::env::consts::ARCH,
            host_name
        ),
        timestamp_str: now.to_rfc2822(),
        timestamp: now.timestamp(),
    }
}
/// Reads the local and peer addresses of `stream` (Linux build: also records
/// the raw socket descriptor).
///
/// # Errors
/// Propagates the I/O error from `local_addr` or `peer_addr`.
#[cfg(target_os = "linux")]
pub fn get_connection_info(stream: &tokio::net::TcpStream) -> std::io::Result<ConnectionInfo> {
    use std::os::unix::io::AsRawFd;

    let local = stream.local_addr()?;
    let remote = stream.peer_addr()?;
    let info = ConnectionInfo {
        socket_fd: Some(stream.as_raw_fd()),
        local_host: local.ip().to_string(),
        local_port: local.port(),
        remote_host: remote.ip().to_string(),
        remote_port: remote.port(),
    };
    Ok(info)
}
/// Reads the local and peer addresses of `stream` (non-Linux build:
/// `socket_fd` is always `None`).
///
/// # Errors
/// Propagates the I/O error from `local_addr` or `peer_addr`.
#[cfg(not(target_os = "linux"))]
pub fn get_connection_info(stream: &tokio::net::TcpStream) -> std::io::Result<ConnectionInfo> {
    let local = stream.local_addr()?;
    let remote = stream.peer_addr()?;
    let info = ConnectionInfo {
        socket_fd: None,
        local_host: local.ip().to_string(),
        local_port: local.port(),
        remote_host: remote.ip().to_string(),
        remote_port: remote.port(),
    };
    Ok(info)
}
/// Reads TCP statistics for `stream` with `getsockopt(SOL_TCP, TCP_INFO)` (Linux build).
///
/// On success all four presence flags are set, `snd_cwnd` is reported as
/// `snd_cwnd * snd_mss` and `retransmits` is taken from `total_retrans`.
/// When `getsockopt` fails, the error is not propagated: default
/// (all-absent) stats are returned instead, so this function never returns `Err`.
#[cfg(target_os = "linux")]
pub fn get_tcp_stats(stream: &tokio::net::TcpStream) -> std::io::Result<TcpStats> {
use std::mem;
use std::os::unix::io::AsRawFd;
let fd = stream.as_raw_fd();
// C-layout buffer that receives the kernel's answer.
// NOTE(review): the field order appears to mirror the leading part of
// Linux's `struct tcp_info`; confirm against <linux/tcp.h>, since a layout
// mismatch would silently yield wrong values.
#[repr(C)]
struct TcpInfo {
state: u8,
ca_state: u8,
retransmits: u8,
probes: u8,
backoff: u8,
options: u8,
snd_wscale: u8,
rcv_wscale: u8,
rto: u32,
ato: u32,
snd_mss: u32,
rcv_mss: u32,
unacked: u32,
sacked: u32,
lost: u32,
retrans: u32,
fackets: u32,
last_data_sent: u32,
last_ack_sent: u32,
last_data_recv: u32,
last_ack_recv: u32,
pmtu: u32,
rcv_ssthresh: u32,
rtt: u32,
rttvar: u32,
snd_ssthresh: u32,
snd_cwnd: u32,
advmss: u32,
reordering: u32,
rcv_rtt: u32,
rcv_space: u32,
total_retrans: u32,
}
// Option name and level passed to getsockopt, hard-coded here.
const TCP_INFO: i32 = 11;
const SOL_TCP: i32 = 6;
// SAFETY: `TcpInfo` consists solely of integer fields, for which the
// all-zero bit pattern is a valid value.
let mut info: TcpInfo = unsafe { mem::zeroed() };
let mut len = mem::size_of::<TcpInfo>() as u32;
// SAFETY: `info` and `len` are live locals for the duration of the call and
// `len` holds the size of `info`, which bounds how much may be written.
let result = unsafe {
libc::getsockopt(
fd,
SOL_TCP,
TCP_INFO,
&mut info as *mut _ as *mut libc::c_void,
&mut len as *mut u32,
)
};
if result == 0 {
// Congestion window converted by multiplying with the send MSS.
let snd_cwnd_bytes = (info.snd_cwnd as u64) * (info.snd_mss as u64);
let mut flags = 0;
flags |= TCP_SND_CWND_PRESENT;
flags |= TCP_RTT_PRESENT;
flags |= TCP_RTTVAR_PRESENT;
flags |= TCP_PMTU_PRESENT;
Ok(TcpStats {
retransmits: info.total_retrans as u64,
snd_cwnd: snd_cwnd_bytes,
rtt: info.rtt as u64,
rttvar: info.rttvar as u64,
pmtu: info.pmtu as u64,
flags,
})
} else {
// Best effort: a failed query yields stats with no optional values present.
Ok(TcpStats::default())
}
}
/// Non-Linux build: no socket query is made; always returns default
/// (all-absent) stats.
#[cfg(not(target_os = "linux"))]
pub fn get_tcp_stats(_stream: &tokio::net::TcpStream) -> std::io::Result<TcpStats> {
    let unavailable = TcpStats::default();
    Ok(unavailable)
}