use crate::buffer_pool::BufferPool;
use crate::config::{Config, Protocol};
use crate::interval_reporter::{run_reporter_task, IntervalReport, IntervalReporter};
use crate::measurements::{
get_connection_info, get_system_info, get_tcp_stats, IntervalStats, MeasurementsCollector,
TestConfig,
};
use crate::protocol::{deserialize_message, serialize_message, Message, DEFAULT_STREAM_ID};
use crate::{Error, Result};
use log::{debug, error, info};
use socket2::SockRef;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpStream, UdpSocket};
use tokio::time;
use tokio_util::sync::CancellationToken;
fn configure_tcp_socket(stream: &TcpStream) -> Result<()> {
stream.set_nodelay(true).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!("Failed to set TCP_NODELAY: {}", e),
))
})?;
const BUFFER_SIZE: usize = 256 * 1024; let sock_ref = SockRef::from(stream);
sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!("Failed to set send buffer size: {}", e),
))
})?;
sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!("Failed to set recv buffer size: {}", e),
))
})?;
debug!(
"TCP socket configured: TCP_NODELAY=true, buffers={}KB",
BUFFER_SIZE / 1024
);
Ok(())
}
fn configure_udp_socket(socket: &UdpSocket) -> Result<()> {
const BUFFER_SIZE: usize = 2 * 1024 * 1024; let sock_ref = SockRef::from(socket);
sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!("Failed to set UDP send buffer size: {}", e),
))
})?;
sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!("Failed to set UDP recv buffer size: {}", e),
))
})?;
debug!(
"UDP socket configured: buffers={}MB",
BUFFER_SIZE / (1024 * 1024)
);
Ok(())
}
#[derive(Debug, Clone)]
pub enum ProgressEvent {
TestStarted,
IntervalUpdate {
interval_start: Duration,
interval_end: Duration,
bytes: u64,
bits_per_second: f64,
packets: Option<u64>,
jitter_ms: Option<f64>,
lost_packets: Option<u64>,
lost_percent: Option<f64>,
retransmits: Option<u64>,
},
TestCompleted {
total_bytes: u64,
duration: Duration,
bits_per_second: f64,
total_packets: Option<u64>,
jitter_ms: Option<f64>,
lost_packets: Option<u64>,
lost_percent: Option<f64>,
out_of_order: Option<u64>,
},
Error(String),
}
pub trait ProgressCallback: Send + Sync {
fn on_progress(&self, event: ProgressEvent);
}
impl<F> ProgressCallback for F
where
F: Fn(ProgressEvent) + Send + Sync,
{
fn on_progress(&self, event: ProgressEvent) {
self(event)
}
}
type CallbackRef = Arc<dyn ProgressCallback>;
pub struct Client {
config: Config,
measurements: MeasurementsCollector,
callback: Option<CallbackRef>,
tcp_buffer_pool: Arc<BufferPool>,
udp_buffer_pool: Arc<BufferPool>,
cancellation_token: CancellationToken,
stream_id: usize,
}
impl Client {
pub fn new(config: Config) -> Result<Self> {
if config.server_addr.is_none() {
return Err(Error::Config(
"Server address is required for client mode".to_string(),
));
}
let tcp_pool_size = config.parallel * 2; let tcp_buffer_pool = Arc::new(BufferPool::new(config.buffer_size, tcp_pool_size));
let udp_buffer_pool = Arc::new(BufferPool::new(65536, 10));
Ok(Self {
config,
measurements: MeasurementsCollector::new(),
callback: None,
tcp_buffer_pool,
udp_buffer_pool,
cancellation_token: CancellationToken::new(),
stream_id: DEFAULT_STREAM_ID, })
}
pub fn with_callback<C: ProgressCallback + 'static>(mut self, callback: C) -> Self {
self.callback = Some(Arc::new(callback));
self
}
fn notify(&self, event: ProgressEvent) {
if let Some(callback) = &self.callback {
callback.on_progress(event);
}
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
pub async fn run(&self) -> Result<()> {
let server_addr = self
.config
.server_addr
.as_ref()
.ok_or_else(|| Error::Config("Server address not set".to_string()))?;
let full_addr = format!("{}:{}", server_addr, self.config.port);
info!("Connecting to rperf3 server at {}", full_addr);
match self.config.protocol {
Protocol::Tcp => self.run_tcp(&full_addr).await,
Protocol::Udp => self.run_udp(&full_addr).await,
}
}
async fn run_tcp(&self, server_addr: &str) -> Result<()> {
let mut stream = TcpStream::connect(server_addr).await?;
info!("Connected to {}", server_addr);
configure_tcp_socket(&stream)?;
if !self.config.json {
let local_addr = stream.local_addr()?;
let remote_addr = stream.peer_addr()?;
println!(
"Connecting to host {}, port {}",
remote_addr.ip(),
remote_addr.port()
);
println!(
"[{:3}] local {} port {} connected to {} port {}",
self.stream_id,
local_addr.ip(),
local_addr.port(),
remote_addr.ip(),
remote_addr.port()
);
}
let connection_info = get_connection_info(&stream).ok();
let system_info = Some(get_system_info());
let setup = Message::setup(
self.config.protocol.as_str().to_string(),
self.config.duration,
self.config.bandwidth,
self.config.buffer_size,
self.config.parallel,
self.config.reverse,
);
let setup_bytes = serialize_message(&setup)?;
stream.write_all(&setup_bytes).await?;
stream.flush().await?;
let ack_msg = deserialize_message(&mut stream).await?;
match ack_msg {
Message::SetupAck { port, cookie } => {
debug!("Received setup ack: port={}, cookie={}", port, cookie);
}
Message::Error { message } => {
return Err(Error::Protocol(format!("Server error: {}", message)));
}
_ => {
return Err(Error::Protocol("Expected SetupAck message".to_string()));
}
}
let start_msg = deserialize_message(&mut stream).await?;
match start_msg {
Message::Start { .. } => {
info!("Test started");
self.notify(ProgressEvent::TestStarted);
}
_ => {
return Err(Error::Protocol("Expected Start message".to_string()));
}
}
self.measurements.set_start_time(Instant::now());
if !self.config.json {
if self.config.reverse {
println!("[ ID] Interval Transfer Bitrate Retr");
} else {
println!("[ ID] Interval Transfer Bitrate Retr Cwnd");
}
}
if self.config.reverse {
receive_data(
&mut stream,
self.stream_id,
&self.measurements,
&self.config,
&self.callback,
self.tcp_buffer_pool.clone(),
&self.cancellation_token,
)
.await?;
} else {
send_data(
&mut stream,
self.stream_id,
&self.measurements,
&self.config,
&self.callback,
self.tcp_buffer_pool.clone(),
&self.cancellation_token,
)
.await?;
}
match deserialize_message(&mut stream).await {
Ok(result_msg) => match result_msg {
Message::Result {
stream_id,
bytes_sent,
bytes_received,
duration: _,
bits_per_second,
..
} => {
info!(
"Stream {}: {} bytes sent, {} bytes received, {:.2} Mbps",
stream_id,
bytes_sent,
bytes_received,
bits_per_second / 1_000_000.0
);
}
_ => {
debug!("Unexpected message, continuing");
}
},
Err(e) => {
debug!(
"Could not read result message (connection may be closed): {}",
e
);
}
}
match deserialize_message(&mut stream).await {
Ok(done_msg) => match done_msg {
Message::Done => {
info!("Test completed");
}
_ => {
debug!("Expected Done message");
}
},
Err(e) => {
debug!(
"Could not read done message (connection may be closed): {}",
e
);
info!("Test completed");
}
}
let final_measurements = self.measurements.get();
self.notify(ProgressEvent::TestCompleted {
total_bytes: final_measurements.total_bytes_sent
+ final_measurements.total_bytes_received,
duration: final_measurements.total_duration,
bits_per_second: final_measurements.total_bits_per_second(),
total_packets: None, jitter_ms: None,
lost_packets: None,
lost_percent: None,
out_of_order: None,
});
if !self.config.json {
print_results(&final_measurements, self.stream_id, self.config.reverse);
} else {
let test_config = TestConfig {
protocol: self.config.protocol.as_str().to_string(),
num_streams: self.config.parallel,
blksize: self.config.buffer_size,
omit: 0,
duration: self.config.duration.as_secs(),
reverse: self.config.reverse,
};
let detailed_results =
self.measurements
.get_detailed_results(connection_info, system_info, test_config);
let json = serde_json::to_string_pretty(&detailed_results)?;
println!("{}", json);
}
Ok(())
}
async fn run_udp(&self, server_addr: &str) -> Result<()> {
let mut control_stream = TcpStream::connect(server_addr).await?;
configure_tcp_socket(&control_stream)?;
let setup = Message::setup(
self.config.protocol.as_str().to_string(),
self.config.duration,
self.config.bandwidth,
self.config.buffer_size,
self.config.parallel,
self.config.reverse,
);
let setup_bytes = serialize_message(&setup)?;
control_stream.write_all(&setup_bytes).await?;
control_stream.flush().await?;
let ack_msg = deserialize_message(&mut control_stream).await?;
match ack_msg {
Message::SetupAck { port, cookie } => {
debug!("Received setup ack: port={}, cookie={}", port, cookie);
}
Message::Error { message } => {
return Err(Error::Protocol(format!("Server error: {}", message)));
}
_ => {
return Err(Error::Protocol("Expected SetupAck message".to_string()));
}
}
let start_msg = deserialize_message(&mut control_stream).await?;
match start_msg {
Message::Start { .. } => {
info!("Test started");
self.notify(ProgressEvent::TestStarted);
}
_ => {
return Err(Error::Protocol("Expected Start message".to_string()));
}
}
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect(server_addr).await?;
configure_udp_socket(&socket)?;
info!("UDP client connected to {}", server_addr);
if !self.config.json {
let local_addr = socket.local_addr()?;
let remote_addr = socket.peer_addr()?;
println!(
"Connecting to host {}, port {}",
remote_addr.ip(),
remote_addr.port()
);
println!(
"[{:3}] local {} port {} connected to {} port {}",
self.stream_id,
local_addr.ip(),
local_addr.port(),
remote_addr.ip(),
remote_addr.port()
);
println!("[ ID] Interval Transfer Bitrate Total Datagrams");
}
let result = if self.config.reverse {
let init_packet = crate::udp_packet::create_packet(0, 0);
socket.send(&init_packet).await?;
self.run_udp_receive(socket).await
} else {
self.run_udp_send(socket).await
};
drop(control_stream);
result
}
async fn run_udp_send(&self, socket: UdpSocket) -> Result<()> {
#[cfg(target_os = "linux")]
return self.run_udp_send_batched(socket).await;
#[cfg(not(target_os = "linux"))]
return self.run_udp_send_standard(socket).await;
}
#[cfg_attr(target_os = "linux", allow(dead_code))]
async fn run_udp_send_standard(&self, socket: UdpSocket) -> Result<()> {
let (reporter, receiver) = IntervalReporter::new();
let reporter_task = tokio::spawn(run_reporter_task(
receiver,
self.config.json,
self.callback.clone(),
));
let start = Instant::now();
let mut last_interval = start;
let mut interval_bytes = 0u64;
let mut interval_packets = 0u64;
let mut sequence = 0u64;
let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
} else {
1024
};
let mut token_bucket = self
.config
.bandwidth
.map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));
while start.elapsed() < self.config.duration {
if self.cancellation_token.is_cancelled() {
info!("Test cancelled by user");
break;
}
let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
match socket.send(&packet).await {
Ok(n) => {
self.measurements.record_bytes_sent(0, n as u64);
self.measurements.record_udp_packet(0);
interval_bytes += n as u64;
interval_packets += 1;
sequence += 1;
if let Some(ref mut bucket) = token_bucket {
bucket.consume(n).await;
}
if last_interval.elapsed() >= self.config.interval {
let elapsed = start.elapsed();
let interval_duration = last_interval.elapsed();
let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
let interval_start = if elapsed > interval_duration {
elapsed - interval_duration
} else {
Duration::ZERO
};
self.measurements.add_interval(IntervalStats {
start: interval_start,
end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: interval_packets,
});
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
let measurements = self.measurements.get();
reporter.report(IntervalReport {
stream_id: self.stream_id,
interval_start,
interval_end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: Some(interval_packets),
jitter_ms: Some(measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
retransmits: None,
cwnd: None,
});
interval_bytes = 0;
interval_packets = 0;
last_interval = Instant::now();
}
}
Err(e) => {
error!("Error sending UDP packet: {}", e);
break;
}
}
}
reporter.complete();
let _ = reporter_task.await;
self.measurements.set_duration(start.elapsed());
let final_measurements = self.measurements.get();
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
self.notify(ProgressEvent::TestCompleted {
total_bytes: final_measurements.total_bytes_sent
+ final_measurements.total_bytes_received,
duration: final_measurements.total_duration,
bits_per_second: final_measurements.total_bits_per_second(),
total_packets: Some(final_measurements.total_packets),
jitter_ms: Some(final_measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
out_of_order: Some(final_measurements.out_of_order_packets),
});
if !self.config.json {
print_results(&final_measurements, self.stream_id, self.config.reverse);
} else {
let system_info = Some(get_system_info());
let test_config = TestConfig {
protocol: self.config.protocol.as_str().to_string(),
num_streams: self.config.parallel,
blksize: self.config.buffer_size,
omit: 0,
duration: self.config.duration.as_secs(),
reverse: self.config.reverse,
};
let detailed_results = self.measurements.get_detailed_results(
None, system_info,
test_config,
);
let json = serde_json::to_string_pretty(&detailed_results)?;
println!("{}", json);
}
Ok(())
}
#[cfg(target_os = "linux")]
async fn run_udp_send_batched(&self, socket: UdpSocket) -> Result<()> {
use crate::batch_socket::{UdpSendBatch, MAX_BATCH_SIZE};
let (reporter, receiver) = IntervalReporter::new();
let reporter_task = tokio::spawn(run_reporter_task(
receiver,
self.config.json,
self.callback.clone(),
));
let start = Instant::now();
let mut last_interval = start;
let mut interval_bytes = 0u64;
let mut interval_packets = 0u64;
let mut sequence = 0u64;
let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
} else {
1024
};
let mut token_bucket = self
.config
.bandwidth
.map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));
let mut batch = UdpSendBatch::new();
let remote_addr = socket.peer_addr()?;
let adaptive_batch_size = if let Some(ref bucket) = token_bucket {
let target_bps = bucket.bytes_per_sec;
let packets_per_sec = target_bps / payload_size as u64;
if packets_per_sec < 1000 {
(MAX_BATCH_SIZE / 4).max(4)
} else if packets_per_sec < 10000 {
MAX_BATCH_SIZE / 2
} else {
MAX_BATCH_SIZE
}
} else {
MAX_BATCH_SIZE
};
while start.elapsed() < self.config.duration {
if self.cancellation_token.is_cancelled() {
info!("Test cancelled by user");
break;
}
while !batch.is_full()
&& batch.len() < adaptive_batch_size
&& start.elapsed() < self.config.duration
{
let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
batch.add(packet, remote_addr);
sequence += 1;
}
if !batch.is_empty() {
match batch.send(&socket).await {
Ok((bytes_sent, packets_sent)) => {
self.measurements.record_bytes_sent(0, bytes_sent as u64);
for _ in 0..packets_sent {
self.measurements.record_udp_packet(0);
}
interval_bytes += bytes_sent as u64;
interval_packets += packets_sent as u64;
if let Some(ref mut bucket) = token_bucket {
bucket.consume(bytes_sent).await;
}
if last_interval.elapsed() >= self.config.interval {
let elapsed = start.elapsed();
let interval_duration = last_interval.elapsed();
let bps =
(interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
let interval_start = if elapsed > interval_duration {
elapsed - interval_duration
} else {
Duration::ZERO
};
self.measurements.add_interval(IntervalStats {
start: interval_start,
end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: interval_packets,
});
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
let measurements = self.measurements.get();
reporter.report(IntervalReport {
stream_id: self.stream_id,
interval_start,
interval_end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: Some(interval_packets),
jitter_ms: Some(measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
retransmits: None,
cwnd: None,
});
interval_bytes = 0;
interval_packets = 0;
last_interval = Instant::now();
}
}
Err(e) => {
error!("Error sending batch: {}", e);
break;
}
}
}
}
reporter.complete();
let _ = reporter_task.await;
self.measurements.set_duration(start.elapsed());
let final_measurements = self.measurements.get();
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
self.notify(ProgressEvent::TestCompleted {
total_bytes: final_measurements.total_bytes_sent
+ final_measurements.total_bytes_received,
duration: final_measurements.total_duration,
bits_per_second: final_measurements.total_bits_per_second(),
total_packets: Some(final_measurements.total_packets),
jitter_ms: Some(final_measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
out_of_order: Some(final_measurements.out_of_order_packets),
});
if !self.config.json {
print_results(&final_measurements, self.stream_id, self.config.reverse);
} else {
let system_info = Some(get_system_info());
let test_config = TestConfig {
protocol: self.config.protocol.as_str().to_string(),
num_streams: self.config.parallel,
blksize: self.config.buffer_size,
omit: 0,
duration: self.config.duration.as_secs(),
reverse: self.config.reverse,
};
let detailed_results = self.measurements.get_detailed_results(
None, system_info,
test_config,
);
let json = serde_json::to_string_pretty(&detailed_results)?;
println!("{}", json);
}
Ok(())
}
async fn run_udp_receive(&self, socket: UdpSocket) -> Result<()> {
let (reporter, receiver) = IntervalReporter::new();
let reporter_task = tokio::spawn(run_reporter_task(
receiver,
self.config.json,
self.callback.clone(),
));
let start = Instant::now();
let mut last_interval = start;
let mut interval_bytes = 0u64;
let mut interval_packets = 0u64;
let mut buffer = self.udp_buffer_pool.get();
while start.elapsed() < self.config.duration {
if self.cancellation_token.is_cancelled() {
info!("Test cancelled by user");
break;
}
let timeout =
tokio::time::timeout(Duration::from_millis(100), socket.recv(&mut buffer));
match timeout.await {
Ok(Ok(n)) => {
if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buffer[..n])
{
let recv_timestamp_us = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_micros() as u64;
self.measurements.record_udp_packet_received(
header.sequence,
header.timestamp_us,
recv_timestamp_us,
);
}
self.measurements.record_bytes_received(0, n as u64);
interval_bytes += n as u64;
interval_packets += 1;
if last_interval.elapsed() >= self.config.interval {
let elapsed = start.elapsed();
let interval_duration = last_interval.elapsed();
let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
let interval_start = if elapsed > interval_duration {
elapsed - interval_duration
} else {
Duration::ZERO
};
self.measurements.add_interval(IntervalStats {
start: interval_start,
end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: interval_packets,
});
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
let measurements = self.measurements.get();
reporter.report(IntervalReport {
stream_id: self.stream_id,
interval_start,
interval_end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: Some(interval_packets),
jitter_ms: Some(measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
retransmits: None,
cwnd: None,
});
interval_bytes = 0;
interval_packets = 0;
last_interval = Instant::now();
}
}
Ok(Err(e)) => {
error!("Error receiving UDP packet: {}", e);
break;
}
Err(_) => {
continue;
}
}
}
reporter.complete();
let _ = reporter_task.await;
self.measurements.set_duration(start.elapsed());
let final_measurements = self.measurements.get();
let (lost, expected) = self.measurements.calculate_udp_loss();
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
self.notify(ProgressEvent::TestCompleted {
total_bytes: final_measurements.total_bytes_sent
+ final_measurements.total_bytes_received,
duration: final_measurements.total_duration,
bits_per_second: final_measurements.total_bits_per_second(),
total_packets: Some(final_measurements.total_packets),
jitter_ms: Some(final_measurements.jitter_ms),
lost_packets: Some(lost),
lost_percent: Some(loss_percent),
out_of_order: Some(final_measurements.out_of_order_packets),
});
if !self.config.json {
print_results(&final_measurements, self.stream_id, self.config.reverse);
} else {
let system_info = Some(get_system_info());
let test_config = TestConfig {
protocol: self.config.protocol.as_str().to_string(),
num_streams: self.config.parallel,
blksize: self.config.buffer_size,
omit: 0,
duration: self.config.duration.as_secs(),
reverse: self.config.reverse,
};
let detailed_results = self.measurements.get_detailed_results(
None, system_info,
test_config,
);
let json = serde_json::to_string_pretty(&detailed_results)?;
println!("{}", json);
}
Ok(())
}
pub fn get_measurements(&self) -> crate::Measurements {
self.measurements.get()
}
}
async fn send_data(
stream: &mut TcpStream,
stream_id: usize,
measurements: &MeasurementsCollector,
config: &Config,
callback: &Option<CallbackRef>,
buffer_pool: Arc<BufferPool>,
cancel_token: &CancellationToken,
) -> Result<()> {
let (reporter, receiver) = IntervalReporter::new();
let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
let buffer = buffer_pool.get();
let start = Instant::now();
let mut last_interval = start;
let mut interval_bytes = 0u64;
let mut last_retransmits = 0u64;
while start.elapsed() < config.duration {
if cancel_token.is_cancelled() {
info!("Test cancelled by user");
break;
}
match stream.write(&buffer).await {
Ok(n) => {
measurements.record_bytes_sent(stream_id, n as u64);
interval_bytes += n as u64;
if last_interval.elapsed() >= config.interval {
let elapsed = start.elapsed();
let interval_duration = last_interval.elapsed();
let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
let interval_start = if elapsed > interval_duration {
elapsed - interval_duration
} else {
Duration::ZERO
};
let tcp_stats = get_tcp_stats(stream).ok();
let current_retransmits =
tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
last_retransmits = current_retransmits;
measurements.add_interval(IntervalStats {
start: interval_start,
end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: u64::MAX,
});
let cwnd_kbytes = tcp_stats
.as_ref()
.and_then(|s| s.snd_cwnd_opt())
.map(|cwnd| cwnd / 1024);
reporter.report(IntervalReport {
stream_id,
interval_start,
interval_end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: None,
jitter_ms: None,
lost_packets: None,
lost_percent: None,
retransmits: if interval_retransmits > 0 {
Some(interval_retransmits)
} else {
None
},
cwnd: cwnd_kbytes,
});
interval_bytes = 0;
last_interval = Instant::now();
}
}
Err(e) => {
error!("Error sending data: {}", e);
break;
}
}
}
reporter.complete();
let _ = reporter_task.await;
measurements.set_duration(start.elapsed());
stream.flush().await?;
Ok(())
}
async fn receive_data(
stream: &mut TcpStream,
stream_id: usize,
measurements: &MeasurementsCollector,
config: &Config,
callback: &Option<CallbackRef>,
buffer_pool: Arc<BufferPool>,
cancel_token: &CancellationToken,
) -> Result<()> {
let (reporter, receiver) = IntervalReporter::new();
let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
let mut buffer = buffer_pool.get();
let start = Instant::now();
let mut last_interval = start;
let mut interval_bytes = 0u64;
let mut last_retransmits = 0u64;
while start.elapsed() < config.duration {
if cancel_token.is_cancelled() {
info!("Test cancelled by user");
break;
}
match time::timeout(Duration::from_millis(100), stream.read(&mut buffer)).await {
Ok(Ok(0)) => {
break;
}
Ok(Ok(n)) => {
measurements.record_bytes_received(stream_id, n as u64);
interval_bytes += n as u64;
if last_interval.elapsed() >= config.interval {
let elapsed = start.elapsed();
let interval_duration = last_interval.elapsed();
let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
let interval_start = if elapsed > interval_duration {
elapsed - interval_duration
} else {
Duration::ZERO
};
let tcp_stats = get_tcp_stats(stream).ok();
let current_retransmits =
tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
last_retransmits = current_retransmits;
measurements.add_interval(IntervalStats {
start: interval_start,
end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: u64::MAX,
});
reporter.report(IntervalReport {
stream_id,
interval_start,
interval_end: elapsed,
bytes: interval_bytes,
bits_per_second: bps,
packets: None,
jitter_ms: None,
lost_packets: None,
lost_percent: None,
retransmits: if interval_retransmits > 0 {
Some(interval_retransmits)
} else {
None
},
cwnd: None, });
interval_bytes = 0;
last_interval = Instant::now();
}
}
Ok(Err(e)) => {
error!("Error receiving data: {}", e);
break;
}
Err(_) => {
if start.elapsed() >= config.duration {
break;
}
}
}
}
reporter.complete();
let _ = reporter_task.await;
measurements.set_duration(start.elapsed());
Ok(())
}
fn print_results(measurements: &crate::Measurements, stream_id: usize, _reverse: bool) {
let is_udp = measurements.total_packets > 0;
if !is_udp {
println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
let duration = measurements.total_duration.as_secs_f64();
println!("[ ID] Interval Transfer Bitrate Retr");
let sent_bytes = measurements.total_bytes_sent;
let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
(sent_bytes as f64 / 1_000_000_000.0, "GBytes")
} else {
(sent_bytes as f64 / 1_000_000.0, "MBytes")
};
let sent_bps = (sent_bytes as f64 * 8.0) / duration;
let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
(sent_bps / 1_000_000_000.0, "Gbits/sec")
} else {
(sent_bps / 1_000_000.0, "Mbits/sec")
};
println!(
"[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:4} sender",
stream_id,
0.0,
duration,
sent_val,
sent_unit,
sent_bitrate_val,
sent_bitrate_unit,
0 );
if measurements.total_bytes_received > 0 {
let recv_bytes = measurements.total_bytes_received;
let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
(recv_bytes as f64 / 1_000_000_000.0, "GBytes")
} else {
(recv_bytes as f64 / 1_000_000.0, "MBytes")
};
let recv_bps = (recv_bytes as f64 * 8.0) / duration;
let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
(recv_bps / 1_000_000_000.0, "Gbits/sec")
} else {
(recv_bps / 1_000_000.0, "Mbits/sec")
};
println!(
"[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} receiver",
stream_id, 0.0, duration, recv_val, recv_unit, recv_bitrate_val, recv_bitrate_unit
);
}
println!();
} else {
println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
let duration = measurements.total_duration.as_secs_f64();
let (lost, expected) = if measurements.total_bytes_received > 0 {
let (l, e) = measurements.calculate_udp_loss();
(l, e)
} else {
(0, measurements.total_packets)
};
let loss_percent = if expected > 0 {
(lost as f64 / expected as f64) * 100.0
} else {
0.0
};
println!(
"[ ID] Interval Transfer Bitrate Jitter Lost/Total Datagrams"
);
if measurements.total_bytes_sent > 0 {
let sent_bytes = measurements.total_bytes_sent;
let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
(sent_bytes as f64 / 1_000_000_000.0, "GBytes")
} else if sent_bytes >= 1_000_000 {
(sent_bytes as f64 / 1_000_000.0, "MBytes")
} else {
(sent_bytes as f64 / 1_000.0, "KBytes")
};
let sent_bps = (sent_bytes as f64 * 8.0) / duration;
let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
(sent_bps / 1_000_000_000.0, "Gbits/sec")
} else {
(sent_bps / 1_000_000.0, "Mbits/sec")
};
println!(
"[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:6.3} ms {}/{} ({:.0}%) sender",
stream_id,
0.0,
duration,
sent_val,
sent_unit,
sent_bitrate_val,
sent_bitrate_unit,
0.0, lost,
expected,
loss_percent
);
}
if measurements.total_bytes_received > 0 {
let recv_bytes = measurements.total_bytes_received;
let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
(recv_bytes as f64 / 1_000_000_000.0, "GBytes")
} else if recv_bytes >= 1_000_000 {
(recv_bytes as f64 / 1_000_000.0, "MBytes")
} else {
(recv_bytes as f64 / 1_000.0, "KBytes")
};
let recv_bps = (recv_bytes as f64 * 8.0) / duration;
let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
(recv_bps / 1_000_000_000.0, "Gbits/sec")
} else {
(recv_bps / 1_000_000.0, "Mbits/sec")
};
println!(
"[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:6.3} ms {}/{} ({:.0}%) receiver",
stream_id,
0.0,
duration,
recv_val,
recv_unit,
recv_bitrate_val,
recv_bitrate_unit,
measurements.jitter_ms,
lost,
expected,
loss_percent
);
}
println!();
}
}