ringline 0.2.0

Async I/O runtime with io_uring (Linux) and mio (cross-platform) backends
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
#![allow(clippy::manual_async_fn)]
//! Throughput regression tests for TCP and UDP.
//!
//! Each test runs the same workload through ringline and through a
//! plain `std::net` server in a separate thread, then compares the
//! wall-clock time. The std::net path is the baseline — ringline must
//! be in the same ballpark, otherwise we've regressed below the
//! "vanilla blocking" implementation, which would make the runtime
//! pointless.
//!
//! These tests aren't precise benchmarks (kernel scheduling, CI
//! variance, build profile), so the assertions are loose: ringline must
//! complete and not be dramatically slower than std::net. The numbers
//! are also printed via `eprintln!` so a human reading test output can
//! spot drift.

use std::future::Future;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};

use futures_util::{AsyncReadExt, AsyncWriteExt};
use ringline::{
    AsyncEventHandler, Config, ConnCtx, ConnStream, RinglineBuilder, UdpCtx, UdpSendError,
};

// ── Shared test config ────────────────────────────────────────────────

fn test_config() -> Config {
    let mut config = Config::default();

    // A single, unpinned worker thread.
    config.worker.threads = 1;
    config.worker.pin_to_core = false;

    // Capacities are sized so an 8 MiB sustained echo does not stall on
    // resource limits. The send-copy slots cover the worst-case in-flight
    // pipeline with the default 16 KiB slots, the SQ has enough entries
    // to keep io_uring busy, and the recv buffer ring matches.
    config.max_connections = 64;
    config.standalone_task_capacity = 32;
    config.sq_entries = 2048;
    config.send_copy_count = 2048;
    config.recv_buffer.ring_size = 512;
    config.recv_buffer.buffer_size = 16 * 1024;

    config
}

/// Ask the OS for an unused loopback TCP port.
///
/// The probe listener is dropped on return, which frees the port. Another
/// process could in principle grab it before the caller binds.
fn free_port() -> u16 {
    let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    probe.local_addr().unwrap().port()
}

/// Ask the OS for an unused loopback UDP port.
///
/// The probe socket is dropped on return, freeing the port for the caller
/// to bind. As with [`free_port`], this is inherently racy.
fn free_udp_port() -> u16 {
    let probe = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
    probe.local_addr().unwrap().port()
}

/// Block until a TCP connect to `addr` succeeds.
///
/// Makes up to 200 attempts, sleeping 10 ms after each failure. Each probe
/// connection is closed immediately.
///
/// # Panics
///
/// Panics if no attempt succeeds.
fn wait_for_tcp(addr: &str) {
    let backoff = Duration::from_millis(10);
    let came_up = (0..200).any(|_| {
        let reachable = TcpStream::connect(addr).is_ok();
        if !reachable {
            std::thread::sleep(backoff);
        }
        reachable
    });
    if !came_up {
        panic!("TCP server did not come up at {addr}");
    }
}

// ── TCP echo through std::net (baseline) ───────────────────────────────

/// Baseline: run the chunked echo workload against a plain blocking
/// `std::net` server on its own thread.
///
/// Returns the elapsed transfer time and every byte echoed back.
fn tcp_echo_round_trip_std(payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("std listen");
    let addr = listener.local_addr().unwrap().to_string();
    let stop = Arc::new(AtomicBool::new(false));

    // Minimal echo server: accept one connection, then read a buffer's
    // worth and write it straight back until EOF, an I/O error, or `stop`.
    let server = {
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            let (mut conn, _) = listener.accept().expect("std accept");
            conn.set_read_timeout(Some(Duration::from_secs(30)))
                .unwrap();
            let mut scratch = vec![0u8; 64 * 1024];
            while !stop.load(Ordering::Relaxed) {
                let n = match conn.read(&mut scratch) {
                    Ok(0) | Err(_) => break,
                    Ok(n) => n,
                };
                if conn.write_all(&scratch[..n]).is_err() {
                    break;
                }
            }
        })
    };

    let outcome = run_tcp_echo_chunked(&addr, payload, chunk_size);
    stop.store(true, Ordering::Relaxed);
    let _ = server.join();
    outcome
}

/// Chunked request-reply driver.
///
/// Writes one `chunk_size` chunk, reads its echo back in full, and repeats
/// until the whole payload has round-tripped. Returns the wall-clock time
/// of the transfer loop (connect and teardown are excluded) plus every
/// byte read back, in order.
///
/// Keeping at most one chunk in flight means neither side's TCP buffers
/// can fill. The measurement therefore stays on the plain request-reply
/// path rather than exercising send backpressure, which the
/// fire-and-forget test in this file covers.
///
/// # Panics
///
/// Panics if `chunk_size` is zero, since that chunk size could never make
/// progress. Also panics on any connect, write or read failure, including
/// the server closing the connection before a chunk's echo has fully
/// arrived.
fn run_tcp_echo_chunked(addr: &str, payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    assert!(chunk_size > 0, "chunk_size must be non-zero");

    let mut client = TcpStream::connect(addr).expect("connect");
    client
        .set_read_timeout(Some(Duration::from_secs(30)))
        .unwrap();
    client.set_nodelay(true).ok();

    let mut received = Vec::with_capacity(payload.len());
    let mut buf = vec![0u8; chunk_size];
    let started = Instant::now();
    for chunk in payload.chunks(chunk_size) {
        client.write_all(chunk).expect("write");
        // `read_exact` loops over short reads and retries `EINTR` for us.
        let echo = &mut buf[..chunk.len()];
        match client.read_exact(echo) {
            Ok(()) => received.extend_from_slice(echo),
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                panic!("unexpected EOF")
            }
            Err(e) => panic!("read error: {e}"),
        }
    }
    let elapsed = started.elapsed();
    client.shutdown(std::net::Shutdown::Both).ok();
    (elapsed, received)
}

// ── TCP echo through ringline ──────────────────────────────────────────

/// Echo handler that wraps each accepted connection in a [`ConnStream`].
///
/// Going through `ConnStream` means a write waits for send-pool space
/// instead of silently dropping bytes, which sustained-throughput
/// round-trips require. (`with_data` + `send_nowait` loses bytes once the
/// pool fills. That is harmless for tiny payloads but breaks 8 MiB
/// transfers.)
struct AsyncTcpEcho;

impl AsyncEventHandler for AsyncTcpEcho {
    fn create_for_worker(_id: usize) -> Self {
        AsyncTcpEcho
    }

    fn on_accept(&self, conn: ConnCtx) -> impl Future<Output = ()> + 'static {
        async move {
            let mut stream = ConnStream::new(conn);
            let mut scratch = vec![0u8; 32 * 1024];
            // Echo until EOF, a read error, or a write error. The stream is
            // dropped when this task returns.
            loop {
                let filled = match stream.read(&mut scratch).await {
                    Ok(n) if n > 0 => n,
                    _ => break,
                };
                if stream.write_all(&scratch[..filled]).await.is_err() {
                    break;
                }
            }
        }
    }
}

/// Run the chunked echo workload against a ringline server.
///
/// Returns the same `(duration, received)` pair as the `std::net`
/// baseline so the two can be compared directly. Worker-thread panics and
/// errors are surfaced after shutdown.
fn tcp_echo_round_trip_ringline(payload: &[u8], chunk_size: usize) -> (Duration, Vec<u8>) {
    let addr = format!("127.0.0.1:{}", free_port());
    let bind_addr: SocketAddr = addr.parse().unwrap();

    let (shutdown, handles) = RinglineBuilder::new(test_config())
        .bind(bind_addr)
        .launch::<AsyncTcpEcho>()
        .expect("ringline launch");
    wait_for_tcp(&addr);

    let outcome = run_tcp_echo_chunked(&addr, payload, chunk_size);

    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    outcome
}

#[test]
fn tcp_throughput_round_trip_vs_std_net() {
    // Chunked request-reply: write 64 KiB, read 64 KiB back, repeat
    // 128 times = 8 MiB total. With at most one chunk in flight on the
    // wire, neither side's TCP buffers fill. This measures plain echo
    // throughput apples-to-apples against std::net. Send backpressure,
    // where the kernel returns `EAGAIN` because the send buffer filled,
    // is exercised separately by
    // `tcp_fire_and_forget_8mib_survives_kernel_buffer_full`.
    let total_bytes = 8 * 1024 * 1024;
    let chunk_size = 64 * 1024;
    let payload: Vec<u8> = (0..total_bytes).map(|i| (i & 0xFF) as u8).collect();

    // ringline runs first. Server launch and connect both happen before
    // `run_tcp_echo_chunked` starts its timer, so one-off startup cost is
    // not part of either measurement.
    let (rl_dur, rl_recv) = tcp_echo_round_trip_ringline(&payload, chunk_size);
    assert_eq!(
        rl_recv.len(),
        payload.len(),
        "ringline TCP echo length mismatch"
    );
    // `assert!` with the first differing offset rather than `assert_eq!`,
    // which would Debug-print two 8 MiB vectors into the test log.
    assert!(
        rl_recv == payload,
        "ringline TCP echo payload mismatch at byte {:?}",
        rl_recv.iter().zip(&payload).position(|(a, b)| a != b)
    );

    let (std_dur, std_recv) = tcp_echo_round_trip_std(&payload, chunk_size);
    assert_eq!(std_recv.len(), payload.len(), "std::net length mismatch");
    assert!(
        std_recv == payload,
        "std::net payload mismatch at byte {:?}",
        std_recv.iter().zip(&payload).position(|(a, b)| a != b)
    );

    eprintln!(
        "TCP {} MiB echo (chunked {} KiB): ringline={:?} ({:.0} MB/s), \
         std::net={:?} ({:.0} MB/s), ratio={:.2}x",
        total_bytes / 1024 / 1024,
        chunk_size / 1024,
        rl_dur,
        total_bytes as f64 / 1e6 / rl_dur.as_secs_f64(),
        std_dur,
        total_bytes as f64 / 1e6 / std_dur.as_secs_f64(),
        rl_dur.as_secs_f64() / std_dur.as_secs_f64()
    );

    // Generous bound: we're flagging regressions, not benchmarking.
    // Ringline being more than 5× slower than blocking std::net means
    // something got broken.
    assert!(
        rl_dur < std_dur * 5,
        "ringline TCP echo is way slower than std::net baseline: \
         ringline={rl_dur:?}, std::net={std_dur:?}"
    );
}

// ── UDP echo through std::net (baseline) ───────────────────────────────

/// Run `count` synchronous request-reply round-trips against a
/// `std::net` UDP echo server thread and return the elapsed time.
///
/// The exchange is synchronous (one datagram in flight), so neither
/// side's socket buffer can overrun. The timing is per-round-trip latency,
/// dominated by syscalls and thread wake-ups on each side. The server
/// blocks in `recv_from`, so it echoes as soon as a datagram lands. This
/// is the plain blocking baseline that ringline is compared against.
///
/// # Panics
///
/// Panics if the server socket cannot be bound or configured. Panics
/// raised inside [`run_udp_request_reply`] propagate as well.
fn udp_request_reply_std(count: usize, payload_size: usize) -> Duration {
    let server = UdpSocket::bind("127.0.0.1:0").expect("std udp bind");
    let server_addr = server.local_addr().unwrap();
    // Use a blocking receive with a short timeout instead of a nonblocking
    // sleep-poll loop. Polling would add the sleep's granularity to every
    // round trip and inflate the baseline. The timeout only bounds how
    // long the thread can go without noticing `stop`.
    server
        .set_read_timeout(Some(Duration::from_millis(20)))
        .expect("std udp set_read_timeout");
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = stop.clone();

    let server_thread = std::thread::spawn(move || {
        let mut buf = vec![0u8; 65536];
        while !stop_clone.load(Ordering::Relaxed) {
            match server.recv_from(&mut buf) {
                Ok((n, peer)) => {
                    // Best effort: a failed echo surfaces as a client-side
                    // recv timeout.
                    let _ = server.send_to(&buf[..n], peer);
                }
                // Read timeout (`WouldBlock` on Unix, `TimedOut` on
                // Windows) or a signal interruption: loop and recheck `stop`.
                Err(ref e)
                    if matches!(
                        e.kind(),
                        std::io::ErrorKind::WouldBlock
                            | std::io::ErrorKind::TimedOut
                            | std::io::ErrorKind::Interrupted
                    ) => {}
                Err(_) => break,
            }
        }
    });

    let elapsed = run_udp_request_reply(server_addr, count, payload_size);
    stop.store(true, Ordering::Relaxed);
    let _ = server_thread.join();
    elapsed
}

// ── UDP echo through ringline ──────────────────────────────────────────

/// UDP echo handler: every datagram received on the bound socket is sent
/// back to its sender. TCP accepts are ignored.
struct AsyncUdpEcho;

impl AsyncEventHandler for AsyncUdpEcho {
    fn create_for_worker(_id: usize) -> Self {
        AsyncUdpEcho
    }

    fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
        async move {}
    }

    fn on_udp_bind(&self, udp: UdpCtx) -> Option<Pin<Box<dyn Future<Output = ()> + 'static>>> {
        Some(Box::pin(async move {
            // Tell the test driver the handler is live.
            UDP_HANDLER_STARTED.fetch_add(1, Ordering::SeqCst);
            loop {
                let (datagram, sender) = udp.recv_from().await;
                // Retry while the send path is temporarily out of
                // resources. Any other error drops this datagram.
                while let Err(err) = udp.send_to(sender, &datagram) {
                    let retryable = matches!(
                        err,
                        UdpSendError::PoolExhausted | UdpSendError::SubmissionQueueFull
                    );
                    if !retryable {
                        break;
                    }
                    udp.send_ready().await;
                }
            }
        }))
    }
}

/// Count of `AsyncUdpEcho` UDP handler futures that have started running.
/// `udp_request_reply_ringline` resets it to 0 before launch, then polls it
/// until it is non-zero so that traffic is only sent once the handler is live.
static UDP_HANDLER_STARTED: AtomicUsize = AtomicUsize::new(0);

/// Run `count` synchronous UDP round-trips against a ringline echo server
/// and return the elapsed time.
///
/// Waits up to ~4 s for the UDP handler to report that it started before
/// sending any traffic. Worker-thread panics and errors are surfaced after
/// shutdown.
fn udp_request_reply_ringline(count: usize, payload_size: usize) -> Duration {
    UDP_HANDLER_STARTED.store(0, Ordering::SeqCst);
    let addr: SocketAddr = format!("127.0.0.1:{}", free_udp_port()).parse().unwrap();

    let mut config = test_config();
    config.udp_send_slots = 64;
    config.udp_recv_queue_capacity = 4096;
    let (shutdown, handles) = RinglineBuilder::new(config)
        .bind_udp(addr)
        .launch::<AsyncUdpEcho>()
        .expect("ringline udp launch");

    let handler_live = || UDP_HANDLER_STARTED.load(Ordering::SeqCst) > 0;
    for _ in 0..400 {
        if handler_live() {
            break;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    assert!(handler_live(), "ringline UDP handler did not start");

    let elapsed = run_udp_request_reply(addr, count, payload_size);

    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    elapsed
}

/// Synchronous request-reply pump: send one datagram, receive its reply,
/// and repeat `count` times. Returns the elapsed time of the loop.
///
/// Each side has at most one datagram in flight, so kernel buffers cannot
/// overrun, loss is about zero on loopback, and the timing reflects
/// per-round-trip handler turnaround. Each request carries its sequence
/// number, and every reply is checked byte-for-byte against its request,
/// so a broken echo fails loudly instead of just looking fast.
///
/// # Panics
///
/// Panics on send/recv failure (including a 2 s recv timeout) or if a
/// reply does not match its request.
fn run_udp_request_reply(server_addr: SocketAddr, count: usize, payload_size: usize) -> Duration {
    let client = UdpSocket::bind("127.0.0.1:0").expect("udp client bind");
    client
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();

    let mut payload = vec![0u8; payload_size];
    // Tag each request with its sequence number, deliberately truncated to
    // u32, which is plenty to tell neighbouring round-trips apart.
    // Payloads shorter than 4 bytes get as much of the tag as fits.
    let tag_len = payload_size.min(4);
    let mut buf = vec![0u8; payload_size + 64];
    let started = Instant::now();
    for i in 0..count {
        payload[..tag_len].copy_from_slice(&(i as u32).to_le_bytes()[..tag_len]);
        client.send_to(&payload, server_addr).expect("send");
        let (n, _src) = client.recv_from(&mut buf).expect("recv");
        assert_eq!(
            &buf[..n],
            &payload[..],
            "round-trip {i}: echo does not match request"
        );
    }
    started.elapsed()
}

#[test]
fn udp_throughput_request_reply_vs_std_net() {
    // 1 000 synchronous round-trips of 256-byte datagrams. Only one
    // datagram is ever in flight, so neither kernel queue fills, delivery
    // is reliable, and the timing captures per-round-trip handler
    // overhead end-to-end.
    let (count, payload_size) = (1_000, 256);

    let rl_dur = udp_request_reply_ringline(count, payload_size);
    let std_dur = udp_request_reply_std(count, payload_size);
    let rate = |elapsed: Duration| count as f64 / elapsed.as_secs_f64();

    eprintln!(
        "UDP {count} × {payload_size}B req/reply: \
         ringline={rl_dur:?} ({:.0} rtt/s), \
         std::net={std_dur:?} ({:.0} rtt/s), \
         ratio={:.2}x",
        rate(rl_dur),
        rate(std_dur),
        rl_dur.as_secs_f64() / std_dur.as_secs_f64()
    );

    // Absolute ceiling: the workload must finish at all.
    let ceiling = Duration::from_secs(30);
    assert!(
        rl_dur < ceiling,
        "ringline UDP req/reply took too long: {rl_dur:?} for {count} round-trips"
    );
    // Relative bound, kept loose. Single-flight loopback req/reply is
    // dominated by kernel syscall cost, so both implementations should be
    // within the same order of magnitude.
    assert!(
        rl_dur < std_dur * 5,
        "ringline UDP req/reply is way slower than std::net: \
         ringline={rl_dur:?}, std::net={std_dur:?}"
    );
}

// ── TCP fire-and-forget large transfer (EAGAIN backpressure path) ──────

/// Push the entire payload in one direction without waiting for echo
/// chunks. This forces the kernel TCP send buffer to fill while the peer
/// drains in the background. The runtime must arm `POLLOUT` and
/// transparently retry sends that returned `-EAGAIN`. If it dropped the
/// queue (the old behavior), the receiver would get fewer bytes than were
/// sent.
///
/// Returns everything read back. Reading stops once `payload.len()` bytes
/// have arrived or the server closes the connection, whichever comes first.
fn tcp_fire_and_forget_ringline(payload: &[u8]) -> Vec<u8> {
    let addr = format!("127.0.0.1:{}", free_port());
    let bind_addr: SocketAddr = addr.parse().unwrap();

    let (shutdown, handles) = RinglineBuilder::new(test_config())
        .bind(bind_addr)
        .launch::<AsyncTcpEcho>()
        .expect("ringline launch");
    wait_for_tcp(&addr);

    let mut client = TcpStream::connect(&addr).expect("connect");
    client
        .set_read_timeout(Some(Duration::from_secs(60)))
        .unwrap();
    client.set_nodelay(true).ok();

    // Writer thread: a cloned handle pushes the whole payload, blocking
    // whenever the kernel send buffer is full, then half-closes.
    let mut write_half = client.try_clone().expect("clone");
    let owned_payload = payload.to_vec();
    let writer = std::thread::spawn(move || {
        write_half.write_all(&owned_payload).expect("write");
        write_half.shutdown(std::net::Shutdown::Write).ok();
    });

    // Reader: collect the echo on this thread.
    let want = payload.len();
    let mut received = Vec::with_capacity(want);
    let mut chunk = vec![0u8; 64 * 1024];
    while received.len() < want {
        let n = client
            .read(&mut chunk)
            .unwrap_or_else(|e| panic!("read error: {e}"));
        if n == 0 {
            break;
        }
        received.extend_from_slice(&chunk[..n]);
    }
    writer.join().unwrap();
    drop(client);

    shutdown.shutdown();
    for worker in handles {
        worker.join().unwrap().unwrap();
    }
    received
}

#[test]
fn tcp_fire_and_forget_8mib_survives_kernel_buffer_full() {
    // 8 MiB unbounded write through one connection. The kernel TCP
    // send buffer (≈200 KiB by default) fills well before the writer
    // finishes, so the server's send CQEs return `-EAGAIN`. A correct
    // runtime arms `POLLOUT`, waits for writability, and retries the
    // send. ringline's previous behavior dropped the connection's
    // queued sends on EAGAIN, which would show up here as the receiver
    // getting < 8 MiB before the FIN.
    let payload: Vec<u8> = (0..8 * 1024 * 1024).map(|i| (i & 0xFF) as u8).collect();
    let received = tcp_fire_and_forget_ringline(&payload);
    assert_eq!(
        received.len(),
        payload.len(),
        "fire-and-forget echo lost data: got {} bytes, expected {}",
        received.len(),
        payload.len()
    );
    // Report the first differing offset instead of `assert_eq!`-ing two
    // 8 MiB vectors, whose Debug output would flood the test log.
    assert!(
        received == payload,
        "echoed payload mismatch at byte {:?}",
        received.iter().zip(&payload).position(|(a, b)| a != b)
    );

    // We don't assert `SEND_EAGAIN` ticked: with the close-time
    // deferral the kernel often manages to drain everything without
    // hitting `-EAGAIN` in the first place. The metric is still
    // wired (and exercised by tighter tests in the runtime crate
    // when we can force the kernel buffer to fill) but the
    // user-visible regression here is "data is preserved", not "the
    // EAGAIN-retry path fires".
}