use network_protocol::protocol::message::Message;
use network_protocol::service::daemon;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
mod test_utils;
use test_utils::BenchmarkClient;
/// Launches `daemon::start` for `addr` on a background task and returns the
/// controls needed to stop it again.
///
/// The returned tuple holds:
/// * the `JoinHandle` of a supervising task, and
/// * a oneshot sender; sending on it (or dropping it) wakes the supervisor,
///   which then aborts the task running the daemon and finishes.
///
/// If `daemon::start` returns an error, the `unwrap` panics inside the inner
/// task. That task's handle is only ever aborted, never awaited, so the panic
/// is not propagated to the caller.
async fn setup_server(addr: &str) -> (JoinHandle<()>, oneshot::Sender<()>) {
    let (stop_tx, stop_rx) = oneshot::channel::<()>();
    // The spawned tasks must own their data, so take an owned copy of the address.
    let listen_addr = String::from(addr);

    let supervisor = tokio::spawn(async move {
        let daemon_task = tokio::spawn(async move {
            #[allow(clippy::unwrap_used)]
            daemon::start(&listen_addr).await.unwrap();
        });

        // Resolves on an explicit shutdown signal and also when the sender is
        // dropped; both cases tear the daemon task down.
        let _ = stop_rx.await;
        daemon_task.abort();
    });

    // Fixed grace period before handing control back — presumably to let the
    // daemon start accepting connections; readiness is not actually checked.
    let startup_grace = Duration::from_millis(500);
    sleep(startup_grace).await;

    (supervisor, stop_tx)
}
/// Reports the mean `Ping` -> `Pong` round-trip time against a daemon started
/// on `127.0.0.1:7799`.
///
/// Fifty pings are attempted, with a 10 ms pause before every tenth round.
/// A round counts as successful only when `Message::Pong` arrives within
/// 500 ms; send errors, receive errors, timeouts and unexpected replies are
/// logged and skipped. The test is informational: apart from a failed
/// connection (which panics) it never fails, it only prints the result.
#[tokio::test]
async fn benchmark_roundtrip_latency() {
    let addr = "127.0.0.1:7799";
    let (server_handle, shutdown_tx) = setup_server(addr).await;

    // Without a connection there is nothing to measure, so this is the one
    // hard failure in the test.
    #[allow(clippy::panic)]
    let mut client = match BenchmarkClient::connect(addr).await {
        Ok(connected) => connected,
        Err(e) => panic!("Failed to connect: {e:?}"),
    };
    println!("[benchmark] Client connected successfully");

    let rounds = 50;
    let mut pong_count: u32 = 0;
    let mut accumulated = Duration::ZERO;

    for round in 0..rounds {
        // Short breather before every tenth round (including the first).
        if round % 10 == 0 {
            sleep(Duration::from_millis(10)).await;
        }

        // The clock starts after the pause so only the exchange itself is timed.
        let sent_at = Instant::now();
        match client.send(Message::Ping).await {
            Err(e) => println!("Error sending ping message: {e:?}"),
            Ok(_) => {
                let reply =
                    tokio::time::timeout(Duration::from_millis(500), client.recv()).await;
                match reply {
                    Ok(Ok(Message::Pong)) => {
                        accumulated += sent_at.elapsed();
                        pong_count += 1;
                    }
                    Ok(Ok(response)) => println!("Unexpected response type: {response:?}"),
                    Ok(Err(e)) => println!("Error receiving response: {e:?}"),
                    Err(_) => println!("Timeout waiting for response"),
                }
            }
        }
    }

    if pong_count == 0 {
        println!("No successful ping-pong exchanges completed");
    } else {
        let mean = accumulated / pong_count;
        println!(
            "Average roundtrip latency over {successful} successful packets: {avg:?} per message",
            successful = pong_count,
            avg = mean
        );
    }

    println!("[benchmark] Shutting down server...");
    // Best-effort teardown: a send error only means the server task is already gone.
    let _ = shutdown_tx.send(());
    sleep(Duration::from_millis(100)).await;
    // Wait at most one second for the server task to finish; the outcome is ignored.
    let _ = tokio::time::timeout(Duration::from_secs(1), server_handle).await;
}
/// Reports ping/pong throughput against a daemon started on `127.0.0.1:7798`.
///
/// Fifty sequential `Ping` -> `Pong` exchanges are attempted, with a 20 ms
/// pacing pause before every tenth round. A round counts as successful only
/// when `Message::Pong` arrives within 500 ms; every other outcome is logged
/// and skipped.
///
/// Throughput is computed from the time actually spent on exchange attempts.
/// The pacing pauses are excluded, because on a fast link they would dominate
/// the elapsed time and understate the real message rate. The wall-clock total
/// is still printed for reference.
///
/// The test is informational: apart from a failed connection (which panics)
/// it never fails, it only prints the result.
#[tokio::test]
async fn benchmark_throughput() {
    let addr = "127.0.0.1:7798";
    let (server_handle, shutdown_tx) = setup_server(addr).await;

    // Without a connection there is nothing to measure, so this is the one
    // hard failure in the test.
    #[allow(clippy::panic)]
    let mut client = match BenchmarkClient::connect(addr).await {
        Ok(client) => client,
        Err(e) => panic!("Failed to connect: {e:?}"),
    };
    println!("[benchmark] Client connected successfully");

    let rounds: u32 = 50;
    let mut successful: u32 = 0;
    // Time spent inside exchange attempts only (send + wait for reply).
    let mut active = Duration::ZERO;
    let start = Instant::now();

    for i in 0..rounds {
        // Pacing pause before every tenth round; deliberately not counted as active time.
        if i % 10 == 0 {
            sleep(Duration::from_millis(20)).await;
        }

        let attempt_start = Instant::now();
        match client.send(Message::Ping).await {
            Err(e) => println!("Error sending ping message: {e:?}"),
            Ok(_) => {
                match tokio::time::timeout(Duration::from_millis(500), client.recv()).await {
                    Ok(Ok(Message::Pong)) => successful += 1,
                    Ok(Ok(response)) => println!("Unexpected response type: {response:?}"),
                    Ok(Err(e)) => println!("Error receiving response: {e:?}"),
                    Err(_) => println!("Timeout waiting for response"),
                }
            }
        }
        // Failed attempts still cost time, so they count towards the denominator.
        active += attempt_start.elapsed();
    }

    let elapsed = start.elapsed();
    if successful > 0 {
        let per_sec = f64::from(successful) / active.as_secs_f64();
        println!("Throughput: {per_sec:.0} messages/sec ({successful} successful of {rounds} attempts) over {active:?} active ({elapsed:?} total including pacing pauses)");
    } else {
        println!("No successful exchanges completed");
    }

    println!("[benchmark] Shutting down server...");
    // Best-effort teardown: a send error only means the server task is already gone.
    let _ = shutdown_tx.send(());
    sleep(Duration::from_millis(100)).await;
    // Wait at most one second for the server task to finish; the outcome is ignored.
    let _ = tokio::time::timeout(Duration::from_secs(1), server_handle).await;
}