#![allow(clippy::manual_async_fn)]
use std::future::Future;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use futures_util::{AsyncReadExt, AsyncWriteExt};
use ringline::{
AsyncEventHandler, Config, ConnCtx, ConnStream, RinglineBuilder, UdpCtx, UdpSendError,
};
/// Builds the ringline [`Config`] shared by every test in this file.
///
/// Starts from `Config::default()` and overrides the worker, queue, buffer
/// and capacity fields with the fixed values assigned below.
fn test_config() -> Config {
    let mut config = Config::default();

    // Worker settings: a single thread that is not pinned to a core.
    config.worker.pin_to_core = false;
    config.worker.threads = 1;

    // Receive-buffer settings.
    config.recv_buffer.buffer_size = 16 * 1024;
    config.recv_buffer.ring_size = 512;

    // Queue and capacity settings.
    config.sq_entries = 2048;
    config.send_copy_count = 2048;
    config.max_connections = 64;
    config.standalone_task_capacity = 32;

    config
}
/// Returns a TCP port number on 127.0.0.1 that the OS just handed out.
///
/// A listener is bound to port 0, its assigned port is read back, and the
/// listener is dropped on return, so the port is no longer held afterwards.
fn free_port() -> u16 {
    let probe = TcpListener::bind("127.0.0.1:0").unwrap();
    let assigned = probe.local_addr().unwrap();
    assigned.port()
}
/// Returns a UDP port number on 127.0.0.1 that the OS just handed out.
///
/// A socket is bound to port 0, its assigned port is read back, and the
/// socket is dropped on return, so the port is no longer held afterwards.
fn free_udp_port() -> u16 {
    let probe = UdpSocket::bind("127.0.0.1:0").unwrap();
    let assigned = probe.local_addr().unwrap();
    assigned.port()
}
/// Blocks until a TCP connection to `addr` succeeds.
///
/// Makes up to 200 connection attempts, sleeping 10 ms after each failure.
///
/// # Panics
///
/// Panics if none of the attempts succeeds.
fn wait_for_tcp(addr: &str) {
    // `any` stops at the first successful attempt; the probe connection is
    // dropped immediately.
    let came_up = (0..200).any(|_| match TcpStream::connect(addr) {
        Ok(_) => true,
        Err(_) => {
            std::thread::sleep(Duration::from_millis(10));
            false
        }
    });
    if !came_up {
        panic!("TCP server did not come up at {addr}");
    }
}
/// Runs the chunked TCP echo exchange against a blocking `std::net` server.
///
/// A listener is bound on an OS-chosen loopback port and a background thread
/// accepts one connection and echoes whatever it reads. The client side is
/// `run_tcp_echo_chunked`; its `(elapsed, received)` result is returned after
/// the server thread has been asked to stop and joined.
fn tcp_echo_round_trip_std(payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("std listen");
    let target = listener.local_addr().unwrap().to_string();
    let halt = Arc::new(AtomicBool::new(false));

    let server = {
        let halt = Arc::clone(&halt);
        std::thread::spawn(move || {
            let (mut peer, _) = listener.accept().expect("std accept");
            peer.set_read_timeout(Some(Duration::from_secs(30)))
                .unwrap();
            let mut scratch = vec![0u8; 64 * 1024];
            while !halt.load(Ordering::Relaxed) {
                // EOF and read errors (including the read timeout) both end
                // the echo loop.
                let len = match peer.read(&mut scratch) {
                    Ok(0) | Err(_) => break,
                    Ok(len) => len,
                };
                if peer.write_all(&scratch[..len]).is_err() {
                    break;
                }
            }
        })
    };

    let outcome = run_tcp_echo_chunked(&target, payload, chunk_size);
    halt.store(true, Ordering::Relaxed);
    // A panic in the server thread is deliberately ignored here.
    let _ = server.join();
    outcome
}
/// Client side of the TCP echo exchange: sends `payload` to `addr` in pieces
/// of at most `chunk_size` bytes, waiting for each piece to be echoed back in
/// full before sending the next one.
///
/// Returns the time spent in the send/receive loop (connection setup and
/// shutdown are not timed) together with every byte that was read back.
///
/// # Panics
///
/// Panics if `chunk_size` is 0 while `payload` is non-empty (the exchange
/// could never make progress), if the connection cannot be established, on
/// any write or read error (including the 30 s read timeout), and if the
/// peer closes the connection before a chunk has been fully echoed.
fn run_tcp_echo_chunked(addr: &str, payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    // Checked before any I/O so a bad argument fails fast instead of spinning
    // forever on zero-length chunks.
    assert!(
        chunk_size > 0 || payload.is_empty(),
        "chunk_size must be non-zero for a non-empty payload"
    );

    let mut client = TcpStream::connect(addr).expect("connect");
    client
        .set_read_timeout(Some(Duration::from_secs(30)))
        .unwrap();
    client.set_nodelay(true).ok();

    let mut received = Vec::with_capacity(payload.len());
    let mut buf = vec![0u8; chunk_size];

    let started = Instant::now();
    // `chunks` itself panics on 0; `max(1)` only matters for the empty
    // payload case, where the iterator yields nothing anyway.
    for chunk in payload.chunks(chunk_size.max(1)) {
        client.write_all(chunk).expect("write");

        // The echo may arrive in several reads; keep going until the whole
        // chunk is back.
        let want = chunk.len();
        let mut got = 0;
        while got < want {
            match client.read(&mut buf[got..want]) {
                Ok(0) => panic!("unexpected EOF"),
                Ok(n) => {
                    received.extend_from_slice(&buf[got..got + n]);
                    got += n;
                }
                Err(e) => panic!("read error: {e}"),
            }
        }
    }
    let elapsed = started.elapsed();

    client.shutdown(std::net::Shutdown::Both).ok();
    (elapsed, received)
}
/// ringline event handler that echoes every byte received on an accepted
/// TCP connection back to the sender.
struct AsyncTcpEcho;

impl AsyncEventHandler for AsyncTcpEcho {
    /// Per-connection task: read into a 32 KiB scratch buffer and write the
    /// same bytes back until EOF or the first read/write error.
    fn on_accept(&self, conn: ConnCtx) -> impl Future<Output = ()> + 'static {
        async move {
            let mut stream = ConnStream::new(conn);
            let mut scratch = vec![0u8; 32 * 1024];
            // A read error ends the loop via the `while let`; a zero-length
            // read (EOF) or a failed write ends it via the `break`.
            while let Ok(len) = stream.read(&mut scratch).await {
                if len == 0 || stream.write_all(&scratch[..len]).await.is_err() {
                    break;
                }
            }
        }
    }

    /// The handler carries no state, so every worker gets the unit value.
    fn create_for_worker(_id: usize) -> Self {
        AsyncTcpEcho
    }
}
/// Runs the chunked TCP echo exchange against a ringline `AsyncTcpEcho`
/// server.
///
/// Launches ringline with `test_config()` bound to a loopback port obtained
/// from `free_port()`, waits until the port accepts connections, runs
/// `run_tcp_echo_chunked`, then shuts ringline down and joins its worker
/// handles before returning the client's `(elapsed, received)` result.
///
/// # Panics
///
/// Panics if ringline fails to launch, if the server never comes up, or if
/// a worker handle reports a panic or an error on join.
fn tcp_echo_round_trip_ringline(payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    let addr = format!("127.0.0.1:{}", free_port());
    let bind: SocketAddr = addr.parse().unwrap();

    let (shutdown, handles) = RinglineBuilder::new(test_config())
        .bind(bind)
        .launch::<AsyncTcpEcho>()
        .expect("ringline launch");
    wait_for_tcp(&addr);

    let outcome = run_tcp_echo_chunked(&addr, payload, chunk_size);

    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    outcome
}
/// Echoes 8 MiB in 64 KiB chunks through ringline and through a plain
/// `std::net` server, checks both echoes byte-for-byte, prints the timings,
/// and fails if ringline took five times as long as the baseline or more.
#[test]
fn tcp_throughput_round_trip_vs_std_net() {
    let total_bytes = 8 * 1024 * 1024;
    let chunk_size = 64 * 1024;
    // Repeating 0x00..=0xFF pattern.
    let payload: Vec<u8> = (0..total_bytes).map(|i| (i & 0xFF) as u8).collect();

    // ringline run first, then the std::net baseline.
    let (rl_dur, rl_recv) = tcp_echo_round_trip_ringline(&payload, chunk_size);
    assert_eq!(
        rl_recv.len(),
        payload.len(),
        "ringline TCP echo length mismatch"
    );
    assert_eq!(rl_recv, payload, "ringline TCP echo payload mismatch");

    let (std_dur, std_recv) = tcp_echo_round_trip_std(&payload, chunk_size);
    assert_eq!(std_recv.len(), payload.len(), "std::net length mismatch");
    assert_eq!(std_recv, payload, "std::net payload mismatch");

    let mb_per_sec = |elapsed: Duration| total_bytes as f64 / 1e6 / elapsed.as_secs_f64();
    let slowdown = rl_dur.as_secs_f64() / std_dur.as_secs_f64();
    eprintln!(
        "TCP {} MiB echo (chunked {} KiB): ringline={:?} ({:.0} MB/s), \
         std::net={:?} ({:.0} MB/s), ratio={:.2}x",
        total_bytes / 1024 / 1024,
        chunk_size / 1024,
        rl_dur,
        mb_per_sec(rl_dur),
        std_dur,
        mb_per_sec(std_dur),
        slowdown
    );

    assert!(
        rl_dur < std_dur * 5,
        "ringline TCP echo is way slower than std::net baseline: \
         ringline={rl_dur:?}, std::net={std_dur:?}"
    );
}
/// Runs the UDP request/reply exchange against a `std::net` echo server.
///
/// A non-blocking socket is bound on an OS-chosen loopback port and a
/// background thread echoes each datagram to its sender, sleeping 50 µs
/// whenever nothing is pending. Returns the duration reported by
/// `run_udp_request_reply` after stopping and joining the server thread.
fn udp_request_reply_std(count: usize, payload_size: usize) -> Duration {
    let socket = UdpSocket::bind("127.0.0.1:0").expect("std udp bind");
    let target = socket.local_addr().unwrap();
    socket.set_nonblocking(true).ok();

    let halt = Arc::new(AtomicBool::new(false));
    let echo_thread = {
        let halt = Arc::clone(&halt);
        std::thread::spawn(move || {
            let mut scratch = vec![0u8; 65536];
            while !halt.load(Ordering::Relaxed) {
                match socket.recv_from(&mut scratch) {
                    Ok((len, from)) => {
                        // Send failures are ignored on purpose.
                        let _ = socket.send_to(&scratch[..len], from);
                    }
                    Err(err) => {
                        // Anything other than "no datagram yet" stops the server.
                        if err.kind() != std::io::ErrorKind::WouldBlock {
                            break;
                        }
                        std::thread::sleep(Duration::from_micros(50));
                    }
                }
            }
        })
    };

    let elapsed = run_udp_request_reply(target, count, payload_size);
    halt.store(true, Ordering::Relaxed);
    let _ = echo_thread.join();
    elapsed
}
/// ringline event handler that echoes every UDP datagram back to the peer
/// it came from.
struct AsyncUdpEcho;

impl AsyncEventHandler for AsyncUdpEcho {
    /// Accepted TCP connections get a future that completes immediately.
    fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
        async move {}
    }

    /// The handler carries no state, so every worker gets the unit value.
    fn create_for_worker(_id: usize) -> Self {
        AsyncUdpEcho
    }

    /// Returns the echo task for a bound UDP socket.
    ///
    /// The task bumps `UDP_HANDLER_STARTED` once when it first runs, then
    /// loops forever receiving a datagram and sending it back.
    fn on_udp_bind(&self, udp: UdpCtx) -> Option<Pin<Box<dyn Future<Output = ()> + 'static>>> {
        Some(Box::pin(async move {
            UDP_HANDLER_STARTED.fetch_add(1, Ordering::SeqCst);
            loop {
                let (data, peer) = udp.recv_from().await;
                // Retry the send after `send_ready()` while the failure is
                // pool exhaustion or a full submission queue; any other
                // error drops this datagram.
                while let Err(err) = udp.send_to(peer, &data) {
                    match err {
                        UdpSendError::PoolExhausted | UdpSendError::SubmissionQueueFull => {
                            udp.send_ready().await;
                        }
                        _ => break,
                    }
                }
            }
        }))
    }
}
/// Incremented by the future returned from `AsyncUdpEcho::on_udp_bind` when
/// it first runs; `udp_request_reply_ringline` resets it to 0 and polls it
/// to detect that the UDP handler has started.
static UDP_HANDLER_STARTED: AtomicUsize = AtomicUsize::new(0);
/// Runs the UDP request/reply exchange against a ringline `AsyncUdpEcho`
/// server.
///
/// Resets `UDP_HANDLER_STARTED`, launches ringline bound to a loopback UDP
/// port from `free_udp_port()`, and polls the counter (up to 400 times,
/// 10 ms apart) until the handler reports it has started. It then runs
/// `run_udp_request_reply`, shuts ringline down, joins its worker handles
/// and returns the measured duration.
///
/// # Panics
///
/// Panics if ringline fails to launch, if the handler never starts, or if a
/// worker handle reports a panic or an error on join.
fn udp_request_reply_ringline(count: usize, payload_size: usize) -> Duration {
    UDP_HANDLER_STARTED.store(0, Ordering::SeqCst);

    let addr: SocketAddr = format!("127.0.0.1:{}", free_udp_port())
        .parse()
        .unwrap();

    let mut cfg = test_config();
    cfg.udp_recv_queue_capacity = 4096;
    cfg.udp_send_slots = 64;

    let (shutdown, handles) = RinglineBuilder::new(cfg)
        .bind_udp(addr)
        .launch::<AsyncUdpEcho>()
        .expect("ringline udp launch");

    let handler_started = || UDP_HANDLER_STARTED.load(Ordering::SeqCst) > 0;
    let mut polls_left = 400;
    while polls_left > 0 && !handler_started() {
        std::thread::sleep(Duration::from_millis(10));
        polls_left -= 1;
    }
    assert!(handler_started(), "ringline UDP handler did not start");

    let elapsed = run_udp_request_reply(addr, count, payload_size);

    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    elapsed
}
/// Client side of the UDP exchange: sends `count` datagrams of
/// `payload_size` bytes to `server_addr` one at a time, waiting for a reply
/// to each before sending the next, and returns the total time taken.
///
/// The leading bytes of each datagram carry the request index as a
/// little-endian `u32`. When `payload_size` is below 4, only the low-order
/// bytes that fit are written, so small and empty payloads work instead of
/// panicking on the slice bound. The content of the reply is not inspected.
///
/// # Panics
///
/// Panics if the client socket cannot be bound, if a send fails, or if no
/// reply arrives within the 2 s read timeout.
fn run_udp_request_reply(server_addr: SocketAddr, count: usize, payload_size: usize) -> Duration {
    let client = UdpSocket::bind("127.0.0.1:0").expect("udp client bind");
    client
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let mut payload = vec![0u8; payload_size];
    // Slack beyond the payload so a reply longer than the request still fits.
    let mut buf = vec![0u8; payload_size + 64];
    // Number of sequence-number bytes that fit into the payload.
    let seq_len = payload_size.min(4);

    let started = Instant::now();
    for i in 0..count {
        // Wrapping to 32 bits is intentional: the index is only a tag.
        let seq = (i as u32).to_le_bytes();
        payload[..seq_len].copy_from_slice(&seq[..seq_len]);
        client.send_to(&payload, server_addr).expect("send");
        let (_n, _src) = client.recv_from(&mut buf).expect("recv");
    }
    started.elapsed()
}
/// Runs 1 000 sequential 256-byte UDP request/reply round-trips through
/// ringline and through a plain `std::net` echo server, prints both timings,
/// and fails if ringline needed 30 s or more, or five times the baseline or
/// more.
#[test]
fn udp_throughput_request_reply_vs_std_net() {
    let count = 1_000;
    let payload_size = 256;

    // ringline run first, then the std::net baseline.
    let rl_dur = udp_request_reply_ringline(count, payload_size);
    let std_dur = udp_request_reply_std(count, payload_size);

    let rl_rate = count as f64 / rl_dur.as_secs_f64();
    let std_rate = count as f64 / std_dur.as_secs_f64();
    let slowdown = rl_dur.as_secs_f64() / std_dur.as_secs_f64();
    eprintln!(
        "UDP {count} × {payload_size}B req/reply: \
         ringline={rl_dur:?} ({:.0} rtt/s), \
         std::net={std_dur:?} ({:.0} rtt/s), \
         ratio={:.2}x",
        rl_rate,
        std_rate,
        slowdown
    );

    // Absolute bound first, then the bound relative to the baseline.
    assert!(
        rl_dur < Duration::from_secs(30),
        "ringline UDP req/reply took too long: {rl_dur:?} for {count} round-trips"
    );
    assert!(
        rl_dur < std_dur * 5,
        "ringline UDP req/reply is way slower than std::net: \
         ringline={rl_dur:?}, std::net={std_dur:?}"
    );
}
/// Sends `payload` to a ringline `AsyncTcpEcho` server without waiting for
/// echoes in between, and returns whatever was echoed back.
///
/// A writer thread pushes the whole payload through a cloned handle of the
/// client socket and then shuts down the write half. Meanwhile the calling
/// thread reads until it has `payload.len()` bytes or sees EOF. ringline is
/// shut down and its worker handles are joined before returning.
///
/// # Panics
///
/// Panics if ringline fails to launch, on connect failure, on a read error
/// (including the 60 s read timeout), if the writer thread panicked, or if a
/// worker handle reports a panic or an error on join.
fn tcp_fire_and_forget_ringline(payload: &[u8]) -> Vec<u8> {
    let addr = format!("127.0.0.1:{}", free_port());
    let bind: SocketAddr = addr.parse().unwrap();
    let (shutdown, handles) = RinglineBuilder::new(test_config())
        .bind(bind)
        .launch::<AsyncTcpEcho>()
        .expect("ringline launch");
    wait_for_tcp(&addr);

    let mut client = TcpStream::connect(&addr).expect("connect");
    client
        .set_read_timeout(Some(Duration::from_secs(60)))
        .unwrap();
    client.set_nodelay(true).ok();

    // The writer owns its own copy of the payload and a cloned socket handle
    // so it can run independently of the reader below.
    let mut send_half = client.try_clone().expect("clone");
    let outgoing = payload.to_vec();
    let writer = std::thread::spawn(move || {
        send_half.write_all(&outgoing).expect("write");
        send_half.shutdown(std::net::Shutdown::Write).ok();
    });

    let expected_len = payload.len();
    let mut received = Vec::with_capacity(expected_len);
    let mut scratch = vec![0u8; 64 * 1024];
    while received.len() < expected_len {
        let len = client
            .read(&mut scratch)
            .unwrap_or_else(|e| panic!("read error: {e}"));
        if len == 0 {
            // EOF before the full payload came back; the caller checks the length.
            break;
        }
        received.extend_from_slice(&scratch[..len]);
    }

    writer.join().unwrap();
    drop(client);
    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    received
}
/// Pushes 8 MiB at the ringline echo server in one go and checks that every
/// byte comes back intact.
// NOTE(review): per the test name, the intent is presumably to fill the
// kernel socket buffers so the server hits send backpressure — confirm.
#[test]
fn tcp_fire_and_forget_8mib_survives_kernel_buffer_full() {
    let total_bytes = 8 * 1024 * 1024;
    // Repeating 0x00..=0xFF pattern.
    let payload: Vec<u8> = (0..total_bytes).map(|i| (i & 0xFF) as u8).collect();

    let received = tcp_fire_and_forget_ringline(&payload);

    // Length is checked first so a short echo is reported with byte counts.
    assert_eq!(
        received.len(),
        payload.len(),
        "fire-and-forget echo lost data: got {} bytes, expected {}",
        received.len(),
        payload.len()
    );
    assert_eq!(received, payload, "echoed payload mismatch");
}