kevy_uring/ring.rs
1//! The io_uring engine: `IoUring::new` sets up the kernel ring + maps the
2//! three shared regions; `prep_*` queues SQEs into the SQ; `submit_and_wait`
3//! enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12 self, IORING_ENTER_GETEVENTS, IORING_ENTER_SQ_WAKEUP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING,
13 IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SQ_AFF,
14 IORING_SETUP_SQPOLL, IORING_SQ_NEED_WAKEUP, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15 SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18
19/// A Linux io_uring instance: one submission ring + one completion ring.
20pub struct IoUring {
21 pub(crate) ring_fd: c_int,
22 sq_mmap: *mut c_void,
23 sq_mmap_len: usize,
24 cq_mmap: *mut c_void,
25 cq_mmap_len: usize,
26 sqes: *mut IoUringSqe,
27 sqes_len: usize,
28 sq_entries: u32,
29 sq_mask: u32,
30 /// Local producer cursor; published to the kernel on `submit`.
31 sq_tail: u32,
32 sq_khead: *const AtomicU32,
33 sq_ktail: *const AtomicU32,
34 sq_array: *mut u32,
35 cq_mask: u32,
36 cq_khead: *const AtomicU32,
37 cq_ktail: *const AtomicU32,
38 cqes: *const Completion,
39 /// `*const AtomicU32` to the shared SQ flag word, **only** populated when
40 /// the ring was set up with `IORING_SETUP_SQPOLL`. `None` => classic mode,
41 /// always call `io_uring_enter` to submit; `Some` => check
42 /// `IORING_SQ_NEED_WAKEUP` first and skip the syscall when the SQ poll
43 /// thread is awake.
44 sq_flags: Option<*const AtomicU32>,
45 /// `(index, enter_flag)` for a successful registered-ring-fd setup. When
46 /// `Some((i, _))`, `submit_and_wait` passes `i` as the syscall fd and
47 /// ORs `IORING_ENTER_REGISTERED_RING` into the enter flags — the kernel
48 /// resolves the ring via the registered-rings table, skipping
49 /// `fget`/`fput` per syscall. `None` = raw `ring_fd` path.
50 pub(crate) enter_ring: Option<(u32, u32)>,
51 /// E14: iterations since the last `io_uring_enter` syscall. The reactor
52 /// calls `submit_and_wait(0)` every iter; if neither new SQEs were
53 /// queued nor `wait_nr > 0`, the syscall does no useful work (just
54 /// runs task_work for COOP_TASKRUN). Tracking this lets us skip the
55 /// syscall for up to [`ENTER_SKIP_THRESHOLD`] empty iterations — a
56 /// forced enter every N iters still flushes deferred task_work so
57 /// completions don't stall.
58 iters_since_enter: u32,
59}
60
/// E14: maximum empty reactor iterations between forced `io_uring_enter`
/// syscalls. Tuned so that completion delivery (task_work flush under
/// COOP_TASKRUN) is bounded to ~16 microseconds even on a quiet shard
/// (~1 M iters/s observed at -c1). The H4 diagnostic measured 10.7 M
/// enter/s = ~12 wasted enters per actual op; capping at 16 cuts this to
/// roughly 1 enter per op while preserving completion latency.
///
/// The value must stay in step with the figures above: 16 iterations at
/// ~1 M iters/s is what yields the ~16 microsecond bound.
const ENTER_SKIP_THRESHOLD: u32 = 16;
68
69// SAFETY: `IoUring` owns its fd and mappings exclusively; moving the whole
70// engine to another thread (one per shard) is sound. It is not `Sync`
71// (single owner).
72unsafe impl Send for IoUring {}
73
/// Pointers and initial values pulled out of a freshly mapped SQ ring.
struct SqCursors {
    /// Shared SQ head word.
    khead: *const AtomicU32,
    /// Shared SQ tail word.
    ktail: *const AtomicU32,
    /// Start of the slot -> SQE-index array.
    array: *mut u32,
    /// Ring index mask read from the mapping.
    mask: u32,
    /// Value of the shared tail at extraction time; seeds the local cursor.
    tail: u32,
    /// Shared SQ flag word; under SQPOLL this is where
    /// `IORING_SQ_NEED_WAKEUP` is reported.
    flags: *const AtomicU32,
}
84
85/// Cursors recovered from the CQ ring mapping.
86struct CqCursors {
87 khead: *const AtomicU32,
88 ktail: *const AtomicU32,
89 cqes: *const Completion,
90 mask: u32,
91}
92
93impl IoUring {
94 /// Create a ring sized for at least `entries` in-flight submissions.
95 pub fn new(entries: u32) -> io::Result<IoUring> {
96 Self::new_inner(entries, None)
97 }
98
99 /// Create a ring backed by a kernel-side **submission poll thread**
100 /// (`IORING_SETUP_SQPOLL`). Submissions are reaped without an
101 /// `io_uring_enter` syscall on the steady state; when the SQ poll
102 /// thread parks (after `idle_ms` ms with no work), userland wakes it
103 /// via [`Self::submit_and_wait`]'s SQ_WAKEUP path.
104 ///
105 /// `cpu = Some(c)` pins the kernel thread to CPU `c` via
106 /// `IORING_SETUP_SQ_AFF`. Costs 1 core at ~100% whenever traffic
107 /// flows; requires Linux 5.13+ (the version that dropped CAP_SYS_NICE
108 /// for SQPOLL).
109 ///
110 /// **Not suitable for kevy's per-shard reactor.** Each ring spawns
111 /// one kernel poll thread; in kevy's shared-nothing layout N shards
112 /// would spawn N poll threads, each contending for the same cores
113 /// as the shard threads (measured 2–15× throughput regression on
114 /// the lx64 reference box, 10 shards on 16 cores — see
115 /// `bench/PERF-ATTACK-LOG-2026-06-20.md` attack D5). Reserved for
116 /// callers with a single-threaded reactor and an unallocated core
117 /// budget for the kernel poll thread.
118 pub fn new_sqpoll(entries: u32, idle_ms: u32, cpu: Option<u32>) -> io::Result<IoUring> {
119 Self::new_inner(entries, Some((idle_ms, cpu)))
120 }
121
122 fn new_inner(entries: u32, sqpoll: Option<(u32, Option<u32>)>) -> io::Result<IoUring> {
123 let (ring_fd, p) = Self::setup_ring(entries, sqpoll)?;
124 let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
125 let (sq_mmap, cq_mmap, sqes_map) =
126 Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
127
128 // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
129 // their byte offsets lie inside the just-mapped regions.
130 let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
131 let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
132 let sq_flags = if sqpoll.is_some() { Some(sq.flags) } else { None };
133
134 let mut ring = IoUring {
135 ring_fd,
136 sq_mmap,
137 sq_mmap_len: sq_len,
138 cq_mmap,
139 cq_mmap_len: cq_len,
140 sqes: sqes_map as *mut IoUringSqe,
141 sqes_len,
142 sq_entries: p.sq_entries,
143 sq_mask: sq.mask,
144 sq_tail: sq.tail,
145 sq_khead: sq.khead,
146 sq_ktail: sq.ktail,
147 sq_array: sq.array,
148 cq_mask: cq.mask,
149 cq_khead: cq.khead,
150 cq_ktail: cq.ktail,
151 cqes: cq.cqes,
152 sq_flags,
153 enter_ring: None,
154 iters_since_enter: 0,
155 };
156 // Best-effort: register the ring's own fd into the calling thread's
157 // io_uring registered-rings table (Linux 5.18+). On success, subsequent
158 // `submit_and_wait` syscalls reference the ring by index and the
159 // kernel skips fget/fput on the ring fd per syscall. On older kernels
160 // this fails with EINVAL — the raw fd path stays in use.
161 ring.try_register_ring_fd();
162 Ok(ring)
163 }
164
165 /// `mmap` all three io_uring shared regions. On any failure, cleans up
166 /// the partial state (close fd, unmap what was already mapped) and
167 /// returns the original syscall error.
168 fn map_three_regions(
169 ring_fd: c_int,
170 sq_len: usize,
171 cq_len: usize,
172 sqes_len: usize,
173 ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
174 let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
175 // SAFETY: ring_fd came from setup; not yet observed elsewhere.
176 unsafe { ffi::close(ring_fd) };
177 })?;
178 let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
179 // SAFETY: free what we mapped + close the fd.
180 unsafe {
181 ffi::munmap(sq_mmap, sq_len);
182 ffi::close(ring_fd);
183 }
184 })?;
185 let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
186 // SAFETY: free what we mapped + close the fd.
187 unsafe {
188 ffi::munmap(cq_mmap, cq_len);
189 ffi::munmap(sq_mmap, sq_len);
190 ffi::close(ring_fd);
191 }
192 })?;
193 Ok((sq_mmap, cq_mmap, sqes_map))
194 }
195
196 /// Issue `io_uring_setup` and return `(ring_fd, params)`. When `sqpoll`
197 /// is `Some((idle_ms, cpu))`, configures the kernel-side SQ poll thread.
198 ///
199 /// For the non-SQPOLL path (the default kevy reactor) tries
200 /// `IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN` first
201 /// (Linux 6.0+, +3–5% measured on the lx64 reference) and falls back
202 /// to a plain setup if the kernel rejects them (EINVAL). The fallback
203 /// keeps Linux 5.13+ supported with no hard version check.
204 ///
205 /// **Not enabled**: `IORING_SETUP_DEFER_TASKRUN` (Linux 6.1+) — it
206 /// changes the CQ ring semantics so completions only land after
207 /// `io_uring_enter` is called. kevy's reactor busy-polls the CQ ring
208 /// directly without entering the kernel on the steady state, so
209 /// DEFER_TASKRUN starves completions (measured 65–73% regression in
210 /// the E2 isolation, see `bench/PERF-ATTACK-LOG-2026-06-20.md`).
211 fn setup_ring(
212 entries: u32,
213 sqpoll: Option<(u32, Option<u32>)>,
214 ) -> io::Result<(c_int, IoUringParams)> {
215 // SQPOLL is mutually exclusive with the cooperative flags
216 // (the SQ poll kernel thread is the one running task_work, not the
217 // user thread). Otherwise prefer the strongest set the kernel
218 // accepts; fall back on EINVAL by dropping flags level by level.
219 let sqpoll_flags: u32 = match sqpoll {
220 Some(_) => IORING_SETUP_SQPOLL,
221 None => 0,
222 };
223 let modern_flag_tiers: &[u32] = if sqpoll.is_some() {
224 &[0]
225 } else {
226 &[IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN, 0]
227 };
228
229 for &modern in modern_flag_tiers {
230 let mut p = IoUringParams::default();
231 p.flags = sqpoll_flags | modern;
232 if let Some((idle_ms, cpu)) = sqpoll {
233 p.sq_thread_idle = idle_ms;
234 if let Some(c) = cpu {
235 p.flags |= IORING_SETUP_SQ_AFF;
236 p.sq_thread_cpu = c;
237 }
238 }
239 // SAFETY: `&mut p` lives across this call; kernel writes via ptr.
240 let fd = unsafe {
241 ffi::syscall(
242 SYS_IO_URING_SETUP,
243 entries as c_long,
244 &mut p as *mut IoUringParams,
245 )
246 };
247 if fd >= 0 {
248 return Ok((fd as c_int, p));
249 }
250 let err = io::Error::last_os_error();
251 // EINVAL = kernel doesn't recognise these flags. Try next tier.
252 if err.raw_os_error() != Some(22) {
253 return Err(err);
254 }
255 }
256 Err(io::Error::last_os_error())
257 }
258
259 /// Compute the three mapping lengths the kernel needs us to map.
260 fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
261 let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
262 let cq_len =
263 (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
264 let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
265 (sq_len, cq_len, sqes_len)
266 }
267
268 /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
269 fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
270 // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
271 // kernel pick the address. Returns -1 on failure.
272 let m = unsafe {
273 ffi::mmap(
274 ptr::null_mut(),
275 len,
276 PROT_READ | PROT_WRITE,
277 MAP_SHARED | MAP_POPULATE,
278 ring_fd,
279 off,
280 )
281 };
282 if m as isize == -1 {
283 return Err(io::Error::last_os_error());
284 }
285 Ok(m)
286 }
287
288 /// Extract the SQ cursors from a just-mapped SQ region.
289 ///
290 /// # Safety
291 /// `sq_mmap` must point to a region of at least
292 /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
293 /// filled `p.sq_off` for this ring.
294 unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
295 let base = sq_mmap as usize;
296 let at = |off: u32| (base + off as usize) as *const AtomicU32;
297 let khead = at(p.sq_off.head);
298 let ktail = at(p.sq_off.tail);
299 let flags = at(p.sq_off.flags);
300 let array = (base + p.sq_off.array as usize) as *mut u32;
301 // SAFETY: caller's invariant says `ring_mask` is inside the region.
302 let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
303 // SAFETY: ktail is published by the kernel; reading current tail at
304 // construction lets us start the local cursor in sync.
305 let tail = unsafe { (*ktail).load(Ordering::Acquire) };
306 SqCursors { khead, ktail, array, mask, tail, flags }
307 }
308
309 /// Extract the CQ cursors from a just-mapped CQ region.
310 ///
311 /// # Safety
312 /// `cq_mmap` must point to a region of at least
313 /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
314 unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
315 let base = cq_mmap as usize;
316 let at = |off: u32| (base + off as usize) as *const AtomicU32;
317 let khead = at(p.cq_off.head);
318 let ktail = at(p.cq_off.tail);
319 let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
320 // SAFETY: caller's invariant says `ring_mask` is inside the region.
321 let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
322 CqCursors { khead, ktail, cqes, mask }
323 }
324
325 /// Reserve the next SQ slot (advancing the producer cursor + array map);
326 /// returns its SQE index, or `None` if the submission queue is full.
327 /// Called from the `prep_*` helpers in [`crate::prep`].
328 pub(crate) fn reserve(&mut self) -> Option<usize> {
329 // SAFETY: `sq_khead` is the kernel-published head ptr.
330 let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
331 if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
332 return None; // SQ full
333 }
334 let idx = (self.sq_tail & self.sq_mask) as usize;
335 // The SQ `array` maps a ring slot to an SQE index (here 1:1).
336 // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
337 unsafe { *self.sq_array.add(idx) = idx as u32 };
338 self.sq_tail = self.sq_tail.wrapping_add(1);
339 Some(idx)
340 }
341
342 /// Raw SQE table pointer — exposed for the `prep_*` helpers in
343 /// [`crate::prep`]. Returned slot `idx` must come from `reserve()`.
344 #[inline]
345 pub(crate) fn sqes_ptr(&mut self) -> *mut IoUringSqe {
346 self.sqes
347 }
348
349 /// Publish queued submissions and enter the kernel, optionally waiting for
350 /// `wait_nr` completions. Returns the number of SQEs consumed.
351 ///
352 /// **SQPOLL fast path**: when the ring was constructed via
353 /// [`Self::new_sqpoll`] and the SQ poll thread is awake
354 /// (`IORING_SQ_NEED_WAKEUP` clear) and the caller doesn't need to block
355 /// on completions (`wait_nr == 0`), we publish the tail and return
356 /// **without any syscall** — the kernel thread will reap submissions on
357 /// its next poll spin.
358 pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
359 // SAFETY: `sq_ktail` is the kernel-published tail ptr.
360 let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
361 let to_submit = self.sq_tail.wrapping_sub(prev);
362 // SAFETY: publishing our local tail to the kernel-shared atomic.
363 unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
364
365 // **E14 (replaces dropped E3).** H4 diagnostic showed 10.7 M
366 // io_uring_enter/s aggregate = ~12 wasted enters per actual op
367 // on the steady -c1 hot path. E3's naive "skip when to_submit==0
368 // && wait_nr==0" regressed because COOP_TASKRUN delays
369 // completion task_work until the next enter. E14 keeps a
370 // periodic forced enter: skip up to ENTER_SKIP_THRESHOLD empty
371 // iters in a row, then enter once to flush task_work. The skip
372 // path is gated on the non-SQPOLL case (SQPOLL has its own skip
373 // below) and on wait_nr == 0 (the caller doesn't need a
374 // completion to arrive).
375 //
376 // **A11 attempted (2026-06-20), REVERTED.** Tried adding
377 // `IORING_SETUP_TASKRUN_FLAG` (Linux 6.0+) so the kernel sets
378 // `IORING_SQ_TASKRUN` in sq_flags when task_work is pending,
379 // skipping the syscall whenever the bit was clear. On the lx64
380 // reference box this regressed -c1 GET by ~30% with multi-second
381 // 3.6k-rps stalls mid-test; the bit's set/clear timing under
382 // COOP_TASKRUN didn't match the busy-poll loop closely enough
383 // to remain race-free, and adding the E14 counter as a safety
384 // net on top still left a window where CQEs piled up between
385 // bit-clear observations. See bench/PERF-ATTACK-LOG-2026-06-20.md
386 // for the data.
387 if to_submit == 0 && wait_nr == 0 && self.sq_flags.is_none() {
388 self.iters_since_enter = self.iters_since_enter.saturating_add(1);
389 if self.iters_since_enter < ENTER_SKIP_THRESHOLD {
390 return Ok(0);
391 }
392 // Reached the threshold — fall through to the syscall path
393 // below so task_work flushes. Counter resets after syscall.
394 }
395
396 let mut enter_flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
397 if let Some(sq_flags_ptr) = self.sq_flags {
398 // SAFETY: `sq_flags_ptr` lives inside the SQ mmap, valid for ring
399 // lifetime. Kernel writes IORING_SQ_NEED_WAKEUP on park; Acquire
400 // pairs with the kernel's Release on update.
401 let sq_flags = unsafe { (*sq_flags_ptr).load(Ordering::Acquire) };
402 if sq_flags & IORING_SQ_NEED_WAKEUP != 0 {
403 enter_flags |= IORING_ENTER_SQ_WAKEUP;
404 } else if wait_nr == 0 {
405 // SQ poll thread is awake and caller doesn't need to wait —
406 // skip the syscall entirely. This is the SQPOLL fast path.
407 return Ok(to_submit);
408 }
409 }
410 // E1.5: when the ring is self-registered (IORING_REGISTER_RING_FDS),
411 // pass the registered index instead of the raw fd. The kernel skips
412 // its per-syscall fget/fput on the ring.
413 let (syscall_fd, extra_flags) = match self.enter_ring {
414 Some((idx, flag)) => (idx as c_long, flag),
415 None => (self.ring_fd as c_long, 0),
416 };
417 enter_flags |= extra_flags;
418 // SAFETY: kernel-validated args; no Rust memory is read/written.
419 let ret = unsafe {
420 ffi::syscall(
421 SYS_IO_URING_ENTER,
422 syscall_fd,
423 to_submit as c_long,
424 wait_nr as c_long,
425 enter_flags as c_long,
426 ptr::null::<c_void>(),
427 0usize,
428 )
429 };
430 if ret < 0 {
431 return Err(io::Error::last_os_error());
432 }
433 // E14: real enter happened — counter resets.
434 self.iters_since_enter = 0;
435 Ok(ret as u32)
436 }
437
438 /// Reap every available completion, calling `f` for each; returns the count.
439 pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
440 // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
441 let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
442 let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
443 let mut n = 0;
444 while head != tail {
445 let idx = (head & self.cq_mask) as usize;
446 // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
447 let cqe = unsafe { *self.cqes.add(idx) };
448 f(cqe);
449 head = head.wrapping_add(1);
450 n += 1;
451 }
452 // SAFETY: publish the consumer head to the kernel.
453 unsafe { (*self.cq_khead).store(head, Ordering::Release) };
454 n
455 }
456
457
458}
459
460impl Drop for IoUring {
461 fn drop(&mut self) {
462 // SAFETY: each pointer is the matching `mmap` return; fd is ours.
463 unsafe {
464 ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
465 ffi::munmap(self.cq_mmap, self.cq_mmap_len);
466 ffi::munmap(self.sq_mmap, self.sq_mmap_len);
467 ffi::close(self.ring_fd);
468 }
469 }
470}