Skip to main content

kevy_uring/
ring.rs

1//! The io_uring engine: `IoUring::new` sets up the kernel ring + maps the
2//! three shared regions; `prep_*` queues SQEs into the SQ; `submit_and_wait`
3//! enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12    self, IORING_ENTER_GETEVENTS, IORING_ENTER_SQ_WAKEUP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING,
13    IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SQ_AFF,
14    IORING_SETUP_SQPOLL, IORING_SQ_NEED_WAKEUP, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15    SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18
/// A Linux io_uring instance: one submission ring + one completion ring.
///
/// Bundles the ring fd, the three shared mappings (SQ ring, CQ ring, SQE
/// table) and the cursors/pointers derived from them. The `Drop` impl unmaps
/// the three regions and closes the fd.
pub struct IoUring {
    /// Ring file descriptor returned by `io_uring_setup`; closed on drop.
    pub(crate) ring_fd: c_int,
    /// Base address of the SQ ring mapping (`IORING_OFF_SQ_RING`).
    sq_mmap: *mut c_void,
    /// Byte length of the SQ ring mapping, kept for `munmap`.
    sq_mmap_len: usize,
    /// Base address of the CQ ring mapping (`IORING_OFF_CQ_RING`).
    cq_mmap: *mut c_void,
    /// Byte length of the CQ ring mapping, kept for `munmap`.
    cq_mmap_len: usize,
    /// Base of the SQE table mapping (`IORING_OFF_SQES`).
    sqes: *mut IoUringSqe,
    /// Byte length of the SQE table mapping, kept for `munmap`.
    sqes_len: usize,
    /// Number of SQ slots as reported in the setup params.
    sq_entries: u32,
    /// SQ index mask read from the ring at `sq_off.ring_mask`.
    sq_mask: u32,
    /// Local producer cursor; published to the kernel on `submit`.
    sq_tail: u32,
    /// Shared SQ head word (advanced by the kernel as it consumes SQEs).
    sq_khead: *const AtomicU32,
    /// Shared SQ tail word; we store `sq_tail` here to publish submissions.
    sq_ktail: *const AtomicU32,
    /// SQ indirection array: ring slot -> SQE index (filled 1:1 by `reserve`).
    sq_array: *mut u32,
    /// CQ index mask read from the ring at `cq_off.ring_mask`.
    cq_mask: u32,
    /// Shared CQ head word; we store the consumer position here after reaping.
    cq_khead: *const AtomicU32,
    /// Shared CQ tail word (advanced by the kernel as completions land).
    cq_ktail: *const AtomicU32,
    /// Base of the CQE array inside the CQ ring mapping.
    cqes: *const Completion,
    /// `*const AtomicU32` to the shared SQ flag word, **only** populated when
    /// the ring was set up with `IORING_SETUP_SQPOLL`. `None` => classic mode,
    /// always call `io_uring_enter` to submit; `Some` => check
    /// `IORING_SQ_NEED_WAKEUP` first and skip the syscall when the SQ poll
    /// thread is awake.
    sq_flags: Option<*const AtomicU32>,
    /// `(index, enter_flag)` for a successful registered-ring-fd setup. When
    /// `Some((i, _))`, `submit_and_wait` passes `i` as the syscall fd and
    /// ORs `IORING_ENTER_REGISTERED_RING` into the enter flags — the kernel
    /// resolves the ring via the registered-rings table, skipping
    /// `fget`/`fput` per syscall. `None` = raw `ring_fd` path.
    pub(crate) enter_ring: Option<(u32, u32)>,
    /// E14: iterations since the last `io_uring_enter` syscall. The reactor
    /// calls `submit_and_wait(0)` every iter; if neither new SQEs were
    /// queued nor `wait_nr > 0`, the syscall does no useful work (just
    /// runs task_work for COOP_TASKRUN). Tracking this lets us skip the
    /// syscall for up to [`ENTER_SKIP_THRESHOLD`] empty iterations — a
    /// forced enter every N iters still flushes deferred task_work so
    /// completions don't stall.
    iters_since_enter: u32,
}
60
/// E14: maximum empty reactor iterations between forced `io_uring_enter`
/// syscalls. Tuned so that completion delivery (task_work flush under
/// COOP_TASKRUN) is bounded to ~16 microseconds even on a quiet shard
/// (~1 M iters/s observed at -c1). The H4 diagnostic measured 10.7 M
/// enter/s = ~12 wasted enters per actual op; capping at 16 cuts this to
/// roughly 1 enter per op while preserving completion latency.
///
/// The value must stay in step with the figures above: the skip counter is
/// compared with `<`, so a threshold of `N` means every `N`-th consecutive
/// empty iteration performs a real enter.
const ENTER_SKIP_THRESHOLD: u32 = 16;
68
// SAFETY: `IoUring` owns its fd and mappings exclusively; moving the whole
// engine to another thread (one per shard) is sound. The raw-pointer fields
// suppress the automatic `Send`, hence this explicit impl. It is not `Sync`
// (single owner).
// NOTE(review): the ring fd is registered into the *calling thread's*
// registered-rings table at construction; confirm a ring that has
// `enter_ring` set is only used on the thread that built it.
unsafe impl Send for IoUring {}
73
/// Cursors recovered from the SQ ring mapping.
///
/// Short-lived carrier between `IoUring::sq_cursors` and the `IoUring`
/// constructor; every pointer targets memory inside the SQ ring mapping.
struct SqCursors {
    /// Shared SQ head word.
    khead: *const AtomicU32,
    /// Shared SQ tail word.
    ktail: *const AtomicU32,
    /// Start of the SQ index array.
    array: *mut u32,
    /// Ring index mask, read once from the mapping.
    mask: u32,
    /// Value of the shared tail at the moment the cursors were extracted.
    tail: u32,
    /// SQ flag word — `IORING_SQ_NEED_WAKEUP` lives here under SQPOLL.
    flags: *const AtomicU32,
}
84
/// Cursors recovered from the CQ ring mapping.
///
/// Short-lived carrier between `IoUring::cq_cursors` and the `IoUring`
/// constructor; every pointer targets memory inside the CQ ring mapping.
struct CqCursors {
    /// Shared CQ head word.
    khead: *const AtomicU32,
    /// Shared CQ tail word.
    ktail: *const AtomicU32,
    /// Start of the CQE array.
    cqes: *const Completion,
    /// Ring index mask, read once from the mapping.
    mask: u32,
}
92
93impl IoUring {
94    /// Create a ring sized for at least `entries` in-flight submissions.
95    pub fn new(entries: u32) -> io::Result<IoUring> {
96        Self::new_inner(entries, None)
97    }
98
99    /// Create a ring backed by a kernel-side **submission poll thread**
100    /// (`IORING_SETUP_SQPOLL`). Submissions are reaped without an
101    /// `io_uring_enter` syscall on the steady state; when the SQ poll
102    /// thread parks (after `idle_ms` ms with no work), userland wakes it
103    /// via [`Self::submit_and_wait`]'s SQ_WAKEUP path.
104    ///
105    /// `cpu = Some(c)` pins the kernel thread to CPU `c` via
106    /// `IORING_SETUP_SQ_AFF`. Costs 1 core at ~100% whenever traffic
107    /// flows; requires Linux 5.13+ (the version that dropped CAP_SYS_NICE
108    /// for SQPOLL).
109    ///
110    /// **Not suitable for kevy's per-shard reactor.** Each ring spawns
111    /// one kernel poll thread; in kevy's shared-nothing layout N shards
112    /// would spawn N poll threads, each contending for the same cores
113    /// as the shard threads (measured 2–15× throughput regression on
114    /// the lx64 reference box, 10 shards on 16 cores — see
115    /// `bench/PERF-ATTACK-LOG-2026-06-20.md` attack D5). Reserved for
116    /// callers with a single-threaded reactor and an unallocated core
117    /// budget for the kernel poll thread.
118    pub fn new_sqpoll(entries: u32, idle_ms: u32, cpu: Option<u32>) -> io::Result<IoUring> {
119        Self::new_inner(entries, Some((idle_ms, cpu)))
120    }
121
122    fn new_inner(entries: u32, sqpoll: Option<(u32, Option<u32>)>) -> io::Result<IoUring> {
123        let (ring_fd, p) = Self::setup_ring(entries, sqpoll)?;
124        let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
125        let (sq_mmap, cq_mmap, sqes_map) =
126            Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
127
128        // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
129        // their byte offsets lie inside the just-mapped regions.
130        let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
131        let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
132        let sq_flags = if sqpoll.is_some() { Some(sq.flags) } else { None };
133
134        let mut ring = IoUring {
135            ring_fd,
136            sq_mmap,
137            sq_mmap_len: sq_len,
138            cq_mmap,
139            cq_mmap_len: cq_len,
140            sqes: sqes_map as *mut IoUringSqe,
141            sqes_len,
142            sq_entries: p.sq_entries,
143            sq_mask: sq.mask,
144            sq_tail: sq.tail,
145            sq_khead: sq.khead,
146            sq_ktail: sq.ktail,
147            sq_array: sq.array,
148            cq_mask: cq.mask,
149            cq_khead: cq.khead,
150            cq_ktail: cq.ktail,
151            cqes: cq.cqes,
152            sq_flags,
153            enter_ring: None,
154            iters_since_enter: 0,
155        };
156        // Best-effort: register the ring's own fd into the calling thread's
157        // io_uring registered-rings table (Linux 5.18+). On success, subsequent
158        // `submit_and_wait` syscalls reference the ring by index and the
159        // kernel skips fget/fput on the ring fd per syscall. On older kernels
160        // this fails with EINVAL — the raw fd path stays in use.
161        ring.try_register_ring_fd();
162        Ok(ring)
163    }
164
165    /// `mmap` all three io_uring shared regions. On any failure, cleans up
166    /// the partial state (close fd, unmap what was already mapped) and
167    /// returns the original syscall error.
168    fn map_three_regions(
169        ring_fd: c_int,
170        sq_len: usize,
171        cq_len: usize,
172        sqes_len: usize,
173    ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
174        let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
175            // SAFETY: ring_fd came from setup; not yet observed elsewhere.
176            unsafe { ffi::close(ring_fd) };
177        })?;
178        let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
179            // SAFETY: free what we mapped + close the fd.
180            unsafe {
181                ffi::munmap(sq_mmap, sq_len);
182                ffi::close(ring_fd);
183            }
184        })?;
185        let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
186            // SAFETY: free what we mapped + close the fd.
187            unsafe {
188                ffi::munmap(cq_mmap, cq_len);
189                ffi::munmap(sq_mmap, sq_len);
190                ffi::close(ring_fd);
191            }
192        })?;
193        Ok((sq_mmap, cq_mmap, sqes_map))
194    }
195
196    /// Issue `io_uring_setup` and return `(ring_fd, params)`. When `sqpoll`
197    /// is `Some((idle_ms, cpu))`, configures the kernel-side SQ poll thread.
198    ///
199    /// For the non-SQPOLL path (the default kevy reactor) tries
200    /// `IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN` first
201    /// (Linux 6.0+, +3–5% measured on the lx64 reference) and falls back
202    /// to a plain setup if the kernel rejects them (EINVAL). The fallback
203    /// keeps Linux 5.13+ supported with no hard version check.
204    ///
205    /// **Not enabled**: `IORING_SETUP_DEFER_TASKRUN` (Linux 6.1+) — it
206    /// changes the CQ ring semantics so completions only land after
207    /// `io_uring_enter` is called. kevy's reactor busy-polls the CQ ring
208    /// directly without entering the kernel on the steady state, so
209    /// DEFER_TASKRUN starves completions (measured 65–73% regression in
210    /// the E2 isolation, see `bench/PERF-ATTACK-LOG-2026-06-20.md`).
211    fn setup_ring(
212        entries: u32,
213        sqpoll: Option<(u32, Option<u32>)>,
214    ) -> io::Result<(c_int, IoUringParams)> {
215        // SQPOLL is mutually exclusive with the cooperative flags
216        // (the SQ poll kernel thread is the one running task_work, not the
217        // user thread). Otherwise prefer the strongest set the kernel
218        // accepts; fall back on EINVAL by dropping flags level by level.
219        let sqpoll_flags: u32 = match sqpoll {
220            Some(_) => IORING_SETUP_SQPOLL,
221            None => 0,
222        };
223        let modern_flag_tiers: &[u32] = if sqpoll.is_some() {
224            &[0]
225        } else {
226            &[IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN, 0]
227        };
228
229        for &modern in modern_flag_tiers {
230            let mut p = IoUringParams::default();
231            p.flags = sqpoll_flags | modern;
232            if let Some((idle_ms, cpu)) = sqpoll {
233                p.sq_thread_idle = idle_ms;
234                if let Some(c) = cpu {
235                    p.flags |= IORING_SETUP_SQ_AFF;
236                    p.sq_thread_cpu = c;
237                }
238            }
239            // SAFETY: `&mut p` lives across this call; kernel writes via ptr.
240            let fd = unsafe {
241                ffi::syscall(
242                    SYS_IO_URING_SETUP,
243                    entries as c_long,
244                    &mut p as *mut IoUringParams,
245                )
246            };
247            if fd >= 0 {
248                return Ok((fd as c_int, p));
249            }
250            let err = io::Error::last_os_error();
251            // EINVAL = kernel doesn't recognise these flags. Try next tier.
252            if err.raw_os_error() != Some(22) {
253                return Err(err);
254            }
255        }
256        Err(io::Error::last_os_error())
257    }
258
259    /// Compute the three mapping lengths the kernel needs us to map.
260    fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
261        let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
262        let cq_len =
263            (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
264        let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
265        (sq_len, cq_len, sqes_len)
266    }
267
268    /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
269    fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
270        // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
271        // kernel pick the address. Returns -1 on failure.
272        let m = unsafe {
273            ffi::mmap(
274                ptr::null_mut(),
275                len,
276                PROT_READ | PROT_WRITE,
277                MAP_SHARED | MAP_POPULATE,
278                ring_fd,
279                off,
280            )
281        };
282        if m as isize == -1 {
283            return Err(io::Error::last_os_error());
284        }
285        Ok(m)
286    }
287
288    /// Extract the SQ cursors from a just-mapped SQ region.
289    ///
290    /// # Safety
291    /// `sq_mmap` must point to a region of at least
292    /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
293    /// filled `p.sq_off` for this ring.
294    unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
295        let base = sq_mmap as usize;
296        let at = |off: u32| (base + off as usize) as *const AtomicU32;
297        let khead = at(p.sq_off.head);
298        let ktail = at(p.sq_off.tail);
299        let flags = at(p.sq_off.flags);
300        let array = (base + p.sq_off.array as usize) as *mut u32;
301        // SAFETY: caller's invariant says `ring_mask` is inside the region.
302        let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
303        // SAFETY: ktail is published by the kernel; reading current tail at
304        // construction lets us start the local cursor in sync.
305        let tail = unsafe { (*ktail).load(Ordering::Acquire) };
306        SqCursors { khead, ktail, array, mask, tail, flags }
307    }
308
309    /// Extract the CQ cursors from a just-mapped CQ region.
310    ///
311    /// # Safety
312    /// `cq_mmap` must point to a region of at least
313    /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
314    unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
315        let base = cq_mmap as usize;
316        let at = |off: u32| (base + off as usize) as *const AtomicU32;
317        let khead = at(p.cq_off.head);
318        let ktail = at(p.cq_off.tail);
319        let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
320        // SAFETY: caller's invariant says `ring_mask` is inside the region.
321        let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
322        CqCursors { khead, ktail, cqes, mask }
323    }
324
325    /// Reserve the next SQ slot (advancing the producer cursor + array map);
326    /// returns its SQE index, or `None` if the submission queue is full.
327    /// Called from the `prep_*` helpers in [`crate::prep`].
328    pub(crate) fn reserve(&mut self) -> Option<usize> {
329        // SAFETY: `sq_khead` is the kernel-published head ptr.
330        let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
331        if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
332            return None; // SQ full
333        }
334        let idx = (self.sq_tail & self.sq_mask) as usize;
335        // The SQ `array` maps a ring slot to an SQE index (here 1:1).
336        // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
337        unsafe { *self.sq_array.add(idx) = idx as u32 };
338        self.sq_tail = self.sq_tail.wrapping_add(1);
339        Some(idx)
340    }
341
    /// Raw pointer to the start of the SQE table — exposed for the `prep_*`
    /// helpers in [`crate::prep`]. Callers index it with a slot `idx`, and
    /// that `idx` must have been returned by `reserve()`.
    #[inline]
    pub(crate) fn sqes_ptr(&mut self) -> *mut IoUringSqe {
        self.sqes
    }
348
349    /// Publish queued submissions and enter the kernel, optionally waiting for
350    /// `wait_nr` completions. Returns the number of SQEs consumed.
351    ///
352    /// **SQPOLL fast path**: when the ring was constructed via
353    /// [`Self::new_sqpoll`] and the SQ poll thread is awake
354    /// (`IORING_SQ_NEED_WAKEUP` clear) and the caller doesn't need to block
355    /// on completions (`wait_nr == 0`), we publish the tail and return
356    /// **without any syscall** — the kernel thread will reap submissions on
357    /// its next poll spin.
358    pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
359        // SAFETY: `sq_ktail` is the kernel-published tail ptr.
360        let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
361        let to_submit = self.sq_tail.wrapping_sub(prev);
362        // SAFETY: publishing our local tail to the kernel-shared atomic.
363        unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
364
365        // **E14 (replaces dropped E3).** H4 diagnostic showed 10.7 M
366        // io_uring_enter/s aggregate = ~12 wasted enters per actual op
367        // on the steady -c1 hot path. E3's naive "skip when to_submit==0
368        // && wait_nr==0" regressed because COOP_TASKRUN delays
369        // completion task_work until the next enter. E14 keeps a
370        // periodic forced enter: skip up to ENTER_SKIP_THRESHOLD empty
371        // iters in a row, then enter once to flush task_work. The skip
372        // path is gated on the non-SQPOLL case (SQPOLL has its own skip
373        // below) and on wait_nr == 0 (the caller doesn't need a
374        // completion to arrive).
375        //
376        // **A11 attempted (2026-06-20), REVERTED.** Tried adding
377        // `IORING_SETUP_TASKRUN_FLAG` (Linux 6.0+) so the kernel sets
378        // `IORING_SQ_TASKRUN` in sq_flags when task_work is pending,
379        // skipping the syscall whenever the bit was clear. On the lx64
380        // reference box this regressed -c1 GET by ~30% with multi-second
381        // 3.6k-rps stalls mid-test; the bit's set/clear timing under
382        // COOP_TASKRUN didn't match the busy-poll loop closely enough
383        // to remain race-free, and adding the E14 counter as a safety
384        // net on top still left a window where CQEs piled up between
385        // bit-clear observations. See bench/PERF-ATTACK-LOG-2026-06-20.md
386        // for the data.
387        if to_submit == 0 && wait_nr == 0 && self.sq_flags.is_none() {
388            self.iters_since_enter = self.iters_since_enter.saturating_add(1);
389            if self.iters_since_enter < ENTER_SKIP_THRESHOLD {
390                return Ok(0);
391            }
392            // Reached the threshold — fall through to the syscall path
393            // below so task_work flushes. Counter resets after syscall.
394        }
395
396        let mut enter_flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
397        if let Some(sq_flags_ptr) = self.sq_flags {
398            // SAFETY: `sq_flags_ptr` lives inside the SQ mmap, valid for ring
399            // lifetime. Kernel writes IORING_SQ_NEED_WAKEUP on park; Acquire
400            // pairs with the kernel's Release on update.
401            let sq_flags = unsafe { (*sq_flags_ptr).load(Ordering::Acquire) };
402            if sq_flags & IORING_SQ_NEED_WAKEUP != 0 {
403                enter_flags |= IORING_ENTER_SQ_WAKEUP;
404            } else if wait_nr == 0 {
405                // SQ poll thread is awake and caller doesn't need to wait —
406                // skip the syscall entirely. This is the SQPOLL fast path.
407                return Ok(to_submit);
408            }
409        }
410        // E1.5: when the ring is self-registered (IORING_REGISTER_RING_FDS),
411        // pass the registered index instead of the raw fd. The kernel skips
412        // its per-syscall fget/fput on the ring.
413        let (syscall_fd, extra_flags) = match self.enter_ring {
414            Some((idx, flag)) => (idx as c_long, flag),
415            None => (self.ring_fd as c_long, 0),
416        };
417        enter_flags |= extra_flags;
418        // SAFETY: kernel-validated args; no Rust memory is read/written.
419        let ret = unsafe {
420            ffi::syscall(
421                SYS_IO_URING_ENTER,
422                syscall_fd,
423                to_submit as c_long,
424                wait_nr as c_long,
425                enter_flags as c_long,
426                ptr::null::<c_void>(),
427                0usize,
428            )
429        };
430        if ret < 0 {
431            return Err(io::Error::last_os_error());
432        }
433        // E14: real enter happened — counter resets.
434        self.iters_since_enter = 0;
435        Ok(ret as u32)
436    }
437
438    /// Reap every available completion, calling `f` for each; returns the count.
439    pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
440        // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
441        let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
442        let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
443        let mut n = 0;
444        while head != tail {
445            let idx = (head & self.cq_mask) as usize;
446            // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
447            let cqe = unsafe { *self.cqes.add(idx) };
448            f(cqe);
449            head = head.wrapping_add(1);
450            n += 1;
451        }
452        // SAFETY: publish the consumer head to the kernel.
453        unsafe { (*self.cq_khead).store(head, Ordering::Release) };
454        n
455    }
456
457
458}
459
460impl Drop for IoUring {
461    fn drop(&mut self) {
462        // SAFETY: each pointer is the matching `mmap` return; fd is ours.
463        unsafe {
464            ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
465            ffi::munmap(self.cq_mmap, self.cq_mmap_len);
466            ffi::munmap(self.sq_mmap, self.sq_mmap_len);
467            ffi::close(self.ring_fd);
468        }
469    }
470}