use clap::Parser;
use rand::Rng as _;
use rand::SeedableRng;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use entrouter_line::relay::crypto::{self, TunnelCrypto};
use entrouter_line::relay::fec::{FecConfig, FecEncoder, LossTracker};
use entrouter_line::relay::wire;
#[derive(Parser)]
#[command(
name = "test-harness",
about = "End-to-end tunnel test with simulated loss"
)]
struct Args {
#[arg(long, default_value = "0.05")]
loss: f64,
#[arg(long, default_value = "100")]
blocks: usize,
#[arg(long, default_value = "4000")]
block_size: usize,
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let args = Args::parse();
let loss_rate = args.loss;
let num_blocks = args.blocks;
let block_size = args.block_size;
println!("=== Entrouter Line - End-to-End Test ===");
println!("Loss rate: {:.1}%", loss_rate * 100.0);
println!("Blocks: {num_blocks}");
println!("Block size: {block_size} bytes");
println!();
let key = crypto::generate_key();
let fec_config = FecConfig::for_loss_rate(loss_rate);
println!(
"FEC config: {} data + {} parity shards ({:.0}% overhead)",
fec_config.data_shards,
fec_config.parity_shards,
fec_config.overhead() * 100.0
);
let sender_sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let proxy_sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let receiver_sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let sender_addr = sender_sock.local_addr().unwrap();
let proxy_addr = proxy_sock.local_addr().unwrap();
let receiver_addr = receiver_sock.local_addr().unwrap();
println!("Sender: {sender_addr}");
println!("Proxy: {proxy_addr}");
println!("Receiver: {receiver_addr}");
println!();
let proxy_sock = Arc::new(proxy_sock);
let receiver_sock = Arc::new(receiver_sock);
let (result_tx, mut result_rx) = mpsc::channel::<BlockResult>(num_blocks + 1);
let proxy_sock_clone = proxy_sock.clone();
let proxy_handle = tokio::spawn(async move {
run_lossy_proxy(proxy_sock_clone, receiver_addr, loss_rate).await;
});
let recv_key = key;
let recv_sock = receiver_sock.clone();
let receiver_handle = tokio::spawn(async move {
run_receiver(recv_sock, recv_key, fec_config, num_blocks, result_tx).await;
});
let start = Instant::now();
let crypto = TunnelCrypto::new(&key);
let mut seq: u64 = 0;
let mut rng = rand::thread_rng();
for block_idx in 0..num_blocks {
let data: Vec<u8> = (0..block_size).map(|_| rng.r#gen()).collect();
let encoder = FecEncoder::new(fec_config);
let shard_size = data.len().div_ceil(fec_config.data_shards);
let mut shards: Vec<Vec<u8>> = data
.chunks(shard_size)
.map(|c| {
let mut s = c.to_vec();
s.resize(shard_size, 0);
s
})
.collect();
while shards.len() < fec_config.data_shards {
shards.push(vec![0u8; shard_size]);
}
encoder.encode(&mut shards);
for (i, shard) in shards.iter().enumerate() {
let ptype = if i < fec_config.data_shards {
wire::PACKET_DATA
} else {
wire::PACKET_PARITY
};
let mut payload = Vec::with_capacity(6 + shard.len());
payload.extend_from_slice(&(block_idx as u32).to_be_bytes());
payload.extend_from_slice(&(i as u16).to_be_bytes());
payload.extend_from_slice(shard);
let ciphertext = crypto.encrypt(seq, &payload);
let ct_len = ciphertext.len() as u16;
let mut frame = vec![0u8; wire::HEADER_SIZE + ciphertext.len()];
wire::encode_header(&mut frame, ptype, seq, ct_len);
frame[wire::HEADER_SIZE..].copy_from_slice(&ciphertext);
sender_sock.send_to(&frame, proxy_addr).await.unwrap();
seq = seq.wrapping_add(1);
}
}
let term_payload = b"DONE";
let ciphertext = crypto.encrypt(seq, term_payload);
let mut frame = vec![0u8; wire::HEADER_SIZE + ciphertext.len()];
wire::encode_header(
&mut frame,
wire::PACKET_CONTROL,
seq,
ciphertext.len() as u16,
);
frame[wire::HEADER_SIZE..].copy_from_slice(&ciphertext);
sender_sock.send_to(&frame, proxy_addr).await.unwrap();
let send_elapsed = start.elapsed();
let _ = receiver_handle.await;
proxy_handle.abort();
let total_elapsed = start.elapsed();
println!("\n=== Results ===");
let mut recovered = 0u64;
let mut failed = 0u64;
let mut total_shards_lost = 0u64;
while let Ok(result) = result_rx.try_recv() {
if result.success {
recovered += 1;
} else {
failed += 1;
}
total_shards_lost += result.shards_lost as u64;
}
let total_data = num_blocks as u64 * block_size as u64;
let total_shards_sent = num_blocks as u64 * fec_config.total_shards() as u64;
println!("Blocks sent: {num_blocks}");
println!("Blocks recovered: {recovered}");
println!("Blocks failed: {failed}");
println!("Total shards sent: {total_shards_sent}");
println!("Total shards lost: {total_shards_lost}");
println!(
"Actual loss rate: {:.2}%",
total_shards_lost as f64 / total_shards_sent as f64 * 100.0
);
println!("Data transferred: {total_data} bytes");
println!(
"Send time: {:.2}ms",
send_elapsed.as_secs_f64() * 1000.0
);
println!(
"Total time: {:.2}ms",
total_elapsed.as_secs_f64() * 1000.0
);
println!(
"Throughput: {:.2} MB/s",
total_data as f64 / total_elapsed.as_secs_f64() / 1_000_000.0
);
if failed == 0 {
println!(
"\n*** ZERO-LOSS: All blocks recovered despite {:.1}% packet loss ***",
loss_rate * 100.0
);
} else {
println!("\n!!! {failed} blocks could NOT be recovered - FEC config may need tuning !!!");
}
}
struct BlockResult {
success: bool,
shards_lost: usize,
}
async fn run_lossy_proxy(socket: Arc<UdpSocket>, target: SocketAddr, loss_rate: f64) {
let mut buf = [0u8; 2048];
let mut rng = rand::rngs::StdRng::from_entropy();
let mut total: u64 = 0;
let mut dropped: u64 = 0;
let mut sender_addr: Option<SocketAddr> = None;
loop {
let (len, from) = match socket.recv_from(&mut buf).await {
Ok(r) => r,
Err(_) => continue,
};
total += 1;
if from == target {
if let Some(addr) = sender_addr {
let _ = socket.send_to(&buf[..len], addr).await;
}
continue;
}
sender_addr = Some(from);
if rng.r#gen::<f64>() < loss_rate {
dropped += 1;
continue;
}
let _ = socket.send_to(&buf[..len], target).await;
if total.is_multiple_of(5000) {
tracing::debug!(
total,
dropped,
actual_loss = format!("{:.2}%", dropped as f64 / total as f64 * 100.0),
"proxy"
);
}
}
}
async fn run_receiver(
socket: Arc<UdpSocket>,
key: [u8; 32],
fec_config: FecConfig,
expected_blocks: usize,
result_tx: mpsc::Sender<BlockResult>,
) {
let crypto = TunnelCrypto::new(&key);
let mut buf = [0u8; 2048];
let mut loss_tracker = LossTracker::new(1000);
let mut expected_seq: u64 = 0;
let mut blocks: std::collections::HashMap<u32, Vec<Option<Vec<u8>>>> =
std::collections::HashMap::new();
let total_shards = fec_config.total_shards();
loop {
let recv = tokio::time::timeout(
std::time::Duration::from_secs(2),
socket.recv_from(&mut buf),
)
.await;
let (len, _from) = match recv {
Ok(Ok(r)) => r,
Ok(Err(_)) => continue,
Err(_) => break, };
if len < wire::HEADER_SIZE {
continue;
}
let (packet_type, seq, payload_len) = wire::decode_header(&buf);
let ct_end = wire::HEADER_SIZE + payload_len as usize;
if ct_end > len {
continue;
}
while expected_seq != seq {
loss_tracker.record(false);
expected_seq = expected_seq.wrapping_add(1);
}
loss_tracker.record(true);
expected_seq = expected_seq.wrapping_add(1);
let ciphertext = &buf[wire::HEADER_SIZE..ct_end];
let plaintext = match crypto.decrypt(seq, ciphertext) {
Ok(p) => p,
Err(_) => continue,
};
if packet_type == wire::PACKET_CONTROL && plaintext == b"DONE" {
break;
}
if plaintext.len() < 6 {
continue;
}
let block_idx =
u32::from_be_bytes([plaintext[0], plaintext[1], plaintext[2], plaintext[3]]);
let shard_idx = u16::from_be_bytes([plaintext[4], plaintext[5]]) as usize;
let shard_data = plaintext[6..].to_vec();
let block = blocks
.entry(block_idx)
.or_insert_with(|| vec![None; total_shards]);
if shard_idx < total_shards {
block[shard_idx] = Some(shard_data);
}
}
for block_idx in 0..expected_blocks as u32 {
let block = blocks
.entry(block_idx)
.or_insert_with(|| vec![None; total_shards]);
let shards_lost = block.iter().filter(|s| s.is_none()).count();
let encoder = FecEncoder::new(fec_config);
let success = encoder.reconstruct(block).is_ok();
let _ = result_tx
.send(BlockResult {
success,
shards_lost,
})
.await;
}
let rate = loss_tracker.loss_rate();
println!(
"Receiver: measured loss rate = {:.2}%, recommended FEC = {}+{}",
rate * 100.0,
loss_tracker.recommended_config().data_shards,
loss_tracker.recommended_config().parity_shards,
);
}