use aerosync::core::server::{FileReceiver, ServerConfig, WsEvent};
use futures::StreamExt;
use std::time::{Duration, Instant};
use tempfile::tempdir;
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite::Message};
/// Spin up a `FileReceiver` bound to ephemeral loopback ports for benchmarking.
///
/// Returns the running receiver together with the HTTP/WS port it listens on.
/// QUIC and metrics are disabled; only HTTP + WebSocket are exercised by the
/// benches below.
///
/// Bug fixed: previously the `TempDir` guard was a local that was dropped when
/// this function returned, which deleted `receive_directory` out from under
/// the still-running server. The directory is now intentionally leaked via
/// `into_path()` so it outlives the receiver (acceptable for test runs).
async fn start_bench_server() -> (FileReceiver, u16) {
    // `into_path()` disables the TempDir destructor, so the directory
    // survives past this function instead of being removed on return.
    let receive_directory = tempdir().unwrap().into_path();
    let port = free_port();
    let config = ServerConfig {
        http_port: port,
        // QUIC is disabled below, but keep its port distinct anyway.
        quic_port: port + 1,
        bind_address: "127.0.0.1".to_string(),
        receive_directory,
        enable_http: true,
        enable_quic: false,
        enable_ws: true,
        // Large broadcast buffer so throughput benches don't drop events.
        ws_event_buffer: 4096,
        enable_metrics: false,
        ..ServerConfig::default()
    };
    let mut receiver = FileReceiver::new(config);
    receiver.start().await.expect("server start");
    // Give the listener a moment to come up before clients connect.
    tokio::time::sleep(Duration::from_millis(80)).await;
    (receiver, port)
}
/// Ask the OS for a currently-free TCP port by binding port 0 on loopback
/// and reading back the port the kernel assigned.
fn free_port() -> u16 {
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    addr.port()
}
/// Convert nanosecond latency samples into `(p50, p99)` in milliseconds.
///
/// Uses simple index-based percentiles over the sorted samples; an empty
/// input yields `(0.0, 0.0)`.
fn percentiles_ms(mut samples: Vec<u128>) -> (f64, f64) {
    const NS_PER_MS: f64 = 1_000_000.0;
    if samples.is_empty() {
        return (0.0, 0.0);
    }
    samples.sort_unstable();
    let last = samples.len() - 1;
    let median_idx = samples.len() / 2;
    // Floor of 0.99 * len, clamped so a small sample count stays in bounds.
    let mut tail_idx = (samples.len() as f64 * 0.99) as usize;
    if tail_idx > last {
        tail_idx = last;
    }
    (
        samples[median_idx] as f64 / NS_PER_MS,
        samples[tail_idx] as f64 / NS_PER_MS,
    )
}
/// Build a canned `WsEvent::Completed` for `name` with a fixed size and
/// hash, so benchmark payloads are identical across rounds.
fn completed_event(name: &str) -> WsEvent {
    let filename = name.to_owned();
    WsEvent::Completed {
        filename,
        size: 1024,
        sha256: String::from("deadbeef00000000"),
    }
}
/// Bench 1: broadcast latency as a function of connected-client count.
///
/// For each client count (1, 10, 50, 100) this connects that many WebSocket
/// readers, broadcasts `ROUNDS` events, and times each round from `send`
/// until every reader has received the frame. Prints p50/p99 per count.
/// No latency threshold is asserted; this bench is informational.
#[tokio::test]
async fn bench_broadcast_latency_vs_clients() {
    let (receiver, port) = start_bench_server().await;
    // Sender half of the server's WS broadcast channel.
    let ws_tx = receiver.ws_sender();
    let url = format!("ws://127.0.0.1:{}/ws", port);
    const ROUNDS: usize = 20;
    eprintln!("\n━━━ Bench 1: Broadcast latency vs. concurrent clients ━━━");
    eprintln!("{:>8} {:>10} {:>10}", "clients", "p50 (ms)", "p99 (ms)");
    eprintln!("{}", "─".repeat(32));
    for &n_clients in &[1usize, 10, 50, 100] {
        // Connect n_clients readers; the write halves are dropped right away
        // because this bench only consumes server broadcasts.
        let mut readers = Vec::with_capacity(n_clients);
        for _ in 0..n_clients {
            let (ws, _) = connect_async(&url).await.expect("connect");
            let (_, r) = ws.split();
            readers.push(r);
        }
        // Let the server register all subscriptions before timing starts.
        tokio::time::sleep(Duration::from_millis(20)).await;
        let mut latencies_ns: Vec<u128> = Vec::with_capacity(ROUNDS);
        for i in 0..ROUNDS {
            let t0 = Instant::now();
            ws_tx.send(completed_event(&format!("f{i}"))).unwrap();
            // One pending read per client; the round's clock stops only
            // after every client has observed this broadcast.
            let mut futs = Vec::new();
            for r in &mut readers {
                futs.push(timeout(Duration::from_secs(3), r.next()));
            }
            for fut in futs {
                fut.await
                    .expect("timeout")
                    .expect("stream ended")
                    .expect("ws error");
            }
            latencies_ns.push(t0.elapsed().as_nanos());
        }
        let (p50, p99) = percentiles_ms(latencies_ns);
        eprintln!("{:>8} {:>9.2}ms {:>9.2}ms", n_clients, p50, p99);
        // Disconnect this batch before moving to the next client count.
        drop(readers);
        tokio::time::sleep(Duration::from_millis(30)).await;
    }
    eprintln!();
}
/// Bench 2: sustained event throughput to a single WebSocket client.
///
/// Sends `N` events back-to-back through the broadcast channel, then counts
/// text frames until all N have arrived, and asserts the batch drains within
/// 3 seconds.
#[tokio::test]
async fn bench_event_throughput_single_client() {
    let (receiver, port) = start_bench_server().await;
    let ws_tx = receiver.ws_sender();
    let url = format!("ws://127.0.0.1:{}/ws", port);
    const N: usize = 1000;
    let (ws, _) = connect_async(&url).await.expect("connect");
    let (_, mut read) = ws.split();
    // Give the subscription time to register before flooding events.
    tokio::time::sleep(Duration::from_millis(20)).await;
    let t0 = Instant::now();
    // Fire all N sends without waiting; the configured ws_event_buffer
    // (4096) exceeds N, so the channel should not overflow.
    for i in 0..N {
        ws_tx.send(completed_event(&format!("f{i}"))).unwrap();
    }
    let mut received = 0usize;
    while received < N {
        match timeout(Duration::from_secs(10), read.next()).await {
            // Only text frames count as delivered events.
            Ok(Some(Ok(Message::Text(_)))) => received += 1,
            // Other frame types (ping/pong etc.) are skipped, not counted.
            Ok(Some(Ok(_))) => {}
            other => panic!("unexpected at msg {}: {:?}", received, other),
        }
    }
    let elapsed = t0.elapsed();
    let throughput = N as f64 / elapsed.as_secs_f64();
    let avg_us = elapsed.as_micros() as f64 / N as f64;
    eprintln!(
        "\n━━━ Bench 2: Event throughput (single client, {} events) ━━━",
        N
    );
    eprintln!(" Total time : {:.2}ms", elapsed.as_millis());
    eprintln!(" Throughput : {:.0} events/s", throughput);
    eprintln!(" Avg latency: {:.1}μs / event", avg_us);
    assert!(
        elapsed < Duration::from_secs(3),
        "throughput too low: {} events took {:?}",
        N,
        elapsed
    );
}
/// Bench 3: wall-clock time for many WebSocket clients connecting at once.
/// Asserts that all connections complete within 2 seconds.
#[tokio::test]
async fn bench_concurrent_connect_time() {
    let (_receiver, port) = start_bench_server().await;
    let url = format!("ws://127.0.0.1:{}/ws", port);
    const N: usize = 100;
    let started = Instant::now();
    // Spawn every connection attempt up front so they race concurrently.
    let mut tasks = Vec::with_capacity(N);
    for _ in 0..N {
        let target = url.clone();
        tasks.push(tokio::spawn(async move {
            connect_async(&target).await.expect("connect")
        }));
    }
    // Join all tasks; keep the sockets alive until measurement is done.
    let mut sockets = Vec::with_capacity(N);
    for task in tasks {
        sockets.push(task.await.unwrap());
    }
    let elapsed = started.elapsed();
    let rate = N as f64 / elapsed.as_secs_f64();
    eprintln!("\n━━━ Bench 3: Concurrent connect ({} clients) ━━━", N);
    eprintln!(" Total time : {:.2}ms", elapsed.as_millis());
    eprintln!(" Connect rate: {:.0} conn/s", rate);
    eprintln!(
        " Avg per conn: {:.2}ms",
        elapsed.as_millis() as f64 / N as f64
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "{} connections took {:?}",
        N,
        elapsed
    );
    drop(sockets);
}
/// Bench 4: fast-client latency while a slow consumer is connected.
///
/// A second client connects but is never read from, simulating a stalled
/// consumer whose frames back up server-side. Asserts the fast client's
/// p99 delivery latency stays under 50ms despite the stalled peer.
#[tokio::test]
async fn bench_broadcast_with_slow_consumer() {
    let (receiver, port) = start_bench_server().await;
    let ws_tx = receiver.ws_sender();
    let url = format!("ws://127.0.0.1:{}/ws", port);
    const N: usize = 50;
    let (fast_ws, _) = connect_async(&url).await.expect("fast connect");
    let (_, mut fast_read) = fast_ws.split();
    // The "slow" consumer is held open but never read.
    let (_slow_ws, _) = connect_async(&url).await.expect("slow connect");
    // Let both subscriptions register before timing starts.
    tokio::time::sleep(Duration::from_millis(20)).await;
    let mut latencies_ns = Vec::with_capacity(N);
    for i in 0..N {
        let t0 = Instant::now();
        ws_tx.send(completed_event(&format!("f{i}"))).unwrap();
        // Only the fast client's delivery time is measured.
        timeout(Duration::from_secs(3), fast_read.next())
            .await
            .expect("fast client timeout")
            .expect("stream ended")
            .expect("ws error");
        latencies_ns.push(t0.elapsed().as_nanos());
    }
    let (p50, p99) = percentiles_ms(latencies_ns);
    eprintln!("\n━━━ Bench 4: Fast client latency with slow consumer ━━━");
    eprintln!(" Events : {}", N);
    eprintln!(" p50 : {:.2}ms", p50);
    eprintln!(" p99 : {:.2}ms", p99);
    assert!(
        p99 < 50.0,
        "slow consumer caused p99={:.2}ms on fast client (expected < 50ms)",
        p99
    );
}