Skip to main content

kevy_uring/
ring.rs

//! The io_uring engine: `IoUring::new` sets up the kernel ring and maps the
//! three shared regions; the `prep_*` helpers queue SQEs into the SQ;
//! `submit_and_wait` enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12    self, IORING_ENTER_GETEVENTS, IORING_ENTER_SQ_WAKEUP, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING,
13    IORING_OFF_SQES, IORING_SETUP_COOP_TASKRUN, IORING_SETUP_SINGLE_ISSUER, IORING_SETUP_SQ_AFF,
14    IORING_SETUP_SQPOLL, IORING_SQ_NEED_WAKEUP, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15    SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18
19/// A Linux io_uring instance: one submission ring + one completion ring.
20pub struct IoUring {
21    pub(crate) ring_fd: c_int,
22    sq_mmap: *mut c_void,
23    sq_mmap_len: usize,
24    cq_mmap: *mut c_void,
25    cq_mmap_len: usize,
26    sqes: *mut IoUringSqe,
27    sqes_len: usize,
28    sq_entries: u32,
29    sq_mask: u32,
30    /// Local producer cursor; published to the kernel on `submit`.
31    sq_tail: u32,
32    sq_khead: *const AtomicU32,
33    sq_ktail: *const AtomicU32,
34    sq_array: *mut u32,
35    cq_mask: u32,
36    cq_khead: *const AtomicU32,
37    cq_ktail: *const AtomicU32,
38    cqes: *const Completion,
39    /// `*const AtomicU32` to the shared SQ flag word, **only** populated when
40    /// the ring was set up with `IORING_SETUP_SQPOLL`. `None` => classic mode,
41    /// always call `io_uring_enter` to submit; `Some` => check
42    /// `IORING_SQ_NEED_WAKEUP` first and skip the syscall when the SQ poll
43    /// thread is awake.
44    sq_flags: Option<*const AtomicU32>,
45    /// `(index, enter_flag)` for a successful registered-ring-fd setup. When
46    /// `Some((i, _))`, `submit_and_wait` passes `i` as the syscall fd and
47    /// ORs `IORING_ENTER_REGISTERED_RING` into the enter flags — the kernel
48    /// resolves the ring via the registered-rings table, skipping
49    /// `fget`/`fput` per syscall. `None` = raw `ring_fd` path.
50    pub(crate) enter_ring: Option<(u32, u32)>,
51}
52
53// SAFETY: `IoUring` owns its fd and mappings exclusively; moving the whole
54// engine to another thread (one per shard) is sound. It is not `Sync`
55// (single owner).
56unsafe impl Send for IoUring {}
57
/// Pointers and snapshot values pulled out of a freshly mapped SQ ring.
///
/// Built by `IoUring::sq_cursors` and consumed field-by-field by the
/// constructor.
struct SqCursors {
    /// Mask read from the ring's `ring_mask` word; cursor -> slot index.
    mask: u32,
    /// Shared tail observed at construction; seeds the local producer cursor.
    tail: u32,
    /// Shared SQ head published by the kernel.
    khead: *const AtomicU32,
    /// Shared SQ tail the engine publishes into.
    ktail: *const AtomicU32,
    /// Shared SQ flag word — under SQPOLL this is where
    /// `IORING_SQ_NEED_WAKEUP` is reported.
    flags: *const AtomicU32,
    /// Slot -> SQE index indirection array.
    array: *mut u32,
}
68
69/// Cursors recovered from the CQ ring mapping.
70struct CqCursors {
71    khead: *const AtomicU32,
72    ktail: *const AtomicU32,
73    cqes: *const Completion,
74    mask: u32,
75}
76
77impl IoUring {
78    /// Create a ring sized for at least `entries` in-flight submissions.
79    pub fn new(entries: u32) -> io::Result<IoUring> {
80        Self::new_inner(entries, None)
81    }
82
83    /// Create a ring backed by a kernel-side **submission poll thread**
84    /// (`IORING_SETUP_SQPOLL`). Submissions are reaped without an
85    /// `io_uring_enter` syscall on the steady state; when the SQ poll
86    /// thread parks (after `idle_ms` ms with no work), userland wakes it
87    /// via [`Self::submit_and_wait`]'s SQ_WAKEUP path.
88    ///
89    /// `cpu = Some(c)` pins the kernel thread to CPU `c` via
90    /// `IORING_SETUP_SQ_AFF`. Costs 1 core at ~100% whenever traffic
91    /// flows; requires Linux 5.13+ (the version that dropped CAP_SYS_NICE
92    /// for SQPOLL).
93    ///
94    /// **Not suitable for kevy's per-shard reactor.** Each ring spawns
95    /// one kernel poll thread; in kevy's shared-nothing layout N shards
96    /// would spawn N poll threads, each contending for the same cores
97    /// as the shard threads (measured 2–15× throughput regression on
98    /// the lx64 reference box, 10 shards on 16 cores — see
99    /// `bench/PERF-ATTACK-LOG-2026-06-20.md` attack D5). Reserved for
100    /// callers with a single-threaded reactor and an unallocated core
101    /// budget for the kernel poll thread.
102    pub fn new_sqpoll(entries: u32, idle_ms: u32, cpu: Option<u32>) -> io::Result<IoUring> {
103        Self::new_inner(entries, Some((idle_ms, cpu)))
104    }
105
106    fn new_inner(entries: u32, sqpoll: Option<(u32, Option<u32>)>) -> io::Result<IoUring> {
107        let (ring_fd, p) = Self::setup_ring(entries, sqpoll)?;
108        let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
109        let (sq_mmap, cq_mmap, sqes_map) =
110            Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
111
112        // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
113        // their byte offsets lie inside the just-mapped regions.
114        let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
115        let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
116        let sq_flags = if sqpoll.is_some() { Some(sq.flags) } else { None };
117
118        let mut ring = IoUring {
119            ring_fd,
120            sq_mmap,
121            sq_mmap_len: sq_len,
122            cq_mmap,
123            cq_mmap_len: cq_len,
124            sqes: sqes_map as *mut IoUringSqe,
125            sqes_len,
126            sq_entries: p.sq_entries,
127            sq_mask: sq.mask,
128            sq_tail: sq.tail,
129            sq_khead: sq.khead,
130            sq_ktail: sq.ktail,
131            sq_array: sq.array,
132            cq_mask: cq.mask,
133            cq_khead: cq.khead,
134            cq_ktail: cq.ktail,
135            cqes: cq.cqes,
136            sq_flags,
137            enter_ring: None,
138        };
139        // Best-effort: register the ring's own fd into the calling thread's
140        // io_uring registered-rings table (Linux 5.18+). On success, subsequent
141        // `submit_and_wait` syscalls reference the ring by index and the
142        // kernel skips fget/fput on the ring fd per syscall. On older kernels
143        // this fails with EINVAL — the raw fd path stays in use.
144        ring.try_register_ring_fd();
145        Ok(ring)
146    }
147
148    /// `mmap` all three io_uring shared regions. On any failure, cleans up
149    /// the partial state (close fd, unmap what was already mapped) and
150    /// returns the original syscall error.
151    fn map_three_regions(
152        ring_fd: c_int,
153        sq_len: usize,
154        cq_len: usize,
155        sqes_len: usize,
156    ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
157        let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
158            // SAFETY: ring_fd came from setup; not yet observed elsewhere.
159            unsafe { ffi::close(ring_fd) };
160        })?;
161        let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
162            // SAFETY: free what we mapped + close the fd.
163            unsafe {
164                ffi::munmap(sq_mmap, sq_len);
165                ffi::close(ring_fd);
166            }
167        })?;
168        let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
169            // SAFETY: free what we mapped + close the fd.
170            unsafe {
171                ffi::munmap(cq_mmap, cq_len);
172                ffi::munmap(sq_mmap, sq_len);
173                ffi::close(ring_fd);
174            }
175        })?;
176        Ok((sq_mmap, cq_mmap, sqes_map))
177    }
178
179    /// Issue `io_uring_setup` and return `(ring_fd, params)`. When `sqpoll`
180    /// is `Some((idle_ms, cpu))`, configures the kernel-side SQ poll thread.
181    ///
182    /// For the non-SQPOLL path (the default kevy reactor) tries
183    /// `IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN` first
184    /// (Linux 6.0+, +3–5% measured on the lx64 reference) and falls back
185    /// to a plain setup if the kernel rejects them (EINVAL). The fallback
186    /// keeps Linux 5.13+ supported with no hard version check.
187    ///
188    /// **Not enabled**: `IORING_SETUP_DEFER_TASKRUN` (Linux 6.1+) — it
189    /// changes the CQ ring semantics so completions only land after
190    /// `io_uring_enter` is called. kevy's reactor busy-polls the CQ ring
191    /// directly without entering the kernel on the steady state, so
192    /// DEFER_TASKRUN starves completions (measured 65–73% regression in
193    /// the E2 isolation, see `bench/PERF-ATTACK-LOG-2026-06-20.md`).
194    fn setup_ring(
195        entries: u32,
196        sqpoll: Option<(u32, Option<u32>)>,
197    ) -> io::Result<(c_int, IoUringParams)> {
198        // SQPOLL is mutually exclusive with the cooperative flags
199        // (the SQ poll kernel thread is the one running task_work, not the
200        // user thread). Otherwise prefer the strongest set the kernel
201        // accepts; fall back on EINVAL by dropping flags level by level.
202        let sqpoll_flags: u32 = match sqpoll {
203            Some(_) => IORING_SETUP_SQPOLL,
204            None => 0,
205        };
206        let modern_flag_tiers: &[u32] = if sqpoll.is_some() {
207            &[0]
208        } else {
209            &[IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_COOP_TASKRUN, 0]
210        };
211
212        for &modern in modern_flag_tiers {
213            let mut p = IoUringParams::default();
214            p.flags = sqpoll_flags | modern;
215            if let Some((idle_ms, cpu)) = sqpoll {
216                p.sq_thread_idle = idle_ms;
217                if let Some(c) = cpu {
218                    p.flags |= IORING_SETUP_SQ_AFF;
219                    p.sq_thread_cpu = c;
220                }
221            }
222            // SAFETY: `&mut p` lives across this call; kernel writes via ptr.
223            let fd = unsafe {
224                ffi::syscall(
225                    SYS_IO_URING_SETUP,
226                    entries as c_long,
227                    &mut p as *mut IoUringParams,
228                )
229            };
230            if fd >= 0 {
231                return Ok((fd as c_int, p));
232            }
233            let err = io::Error::last_os_error();
234            // EINVAL = kernel doesn't recognise these flags. Try next tier.
235            if err.raw_os_error() != Some(22) {
236                return Err(err);
237            }
238        }
239        Err(io::Error::last_os_error())
240    }
241
242    /// Compute the three mapping lengths the kernel needs us to map.
243    fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
244        let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
245        let cq_len =
246            (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
247        let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
248        (sq_len, cq_len, sqes_len)
249    }
250
251    /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
252    fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
253        // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
254        // kernel pick the address. Returns -1 on failure.
255        let m = unsafe {
256            ffi::mmap(
257                ptr::null_mut(),
258                len,
259                PROT_READ | PROT_WRITE,
260                MAP_SHARED | MAP_POPULATE,
261                ring_fd,
262                off,
263            )
264        };
265        if m as isize == -1 {
266            return Err(io::Error::last_os_error());
267        }
268        Ok(m)
269    }
270
271    /// Extract the SQ cursors from a just-mapped SQ region.
272    ///
273    /// # Safety
274    /// `sq_mmap` must point to a region of at least
275    /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
276    /// filled `p.sq_off` for this ring.
277    unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
278        let base = sq_mmap as usize;
279        let at = |off: u32| (base + off as usize) as *const AtomicU32;
280        let khead = at(p.sq_off.head);
281        let ktail = at(p.sq_off.tail);
282        let flags = at(p.sq_off.flags);
283        let array = (base + p.sq_off.array as usize) as *mut u32;
284        // SAFETY: caller's invariant says `ring_mask` is inside the region.
285        let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
286        // SAFETY: ktail is published by the kernel; reading current tail at
287        // construction lets us start the local cursor in sync.
288        let tail = unsafe { (*ktail).load(Ordering::Acquire) };
289        SqCursors { khead, ktail, array, mask, tail, flags }
290    }
291
292    /// Extract the CQ cursors from a just-mapped CQ region.
293    ///
294    /// # Safety
295    /// `cq_mmap` must point to a region of at least
296    /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
297    unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
298        let base = cq_mmap as usize;
299        let at = |off: u32| (base + off as usize) as *const AtomicU32;
300        let khead = at(p.cq_off.head);
301        let ktail = at(p.cq_off.tail);
302        let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
303        // SAFETY: caller's invariant says `ring_mask` is inside the region.
304        let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
305        CqCursors { khead, ktail, cqes, mask }
306    }
307
308    /// Reserve the next SQ slot (advancing the producer cursor + array map);
309    /// returns its SQE index, or `None` if the submission queue is full.
310    /// Called from the `prep_*` helpers in [`crate::prep`].
311    pub(crate) fn reserve(&mut self) -> Option<usize> {
312        // SAFETY: `sq_khead` is the kernel-published head ptr.
313        let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
314        if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
315            return None; // SQ full
316        }
317        let idx = (self.sq_tail & self.sq_mask) as usize;
318        // The SQ `array` maps a ring slot to an SQE index (here 1:1).
319        // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
320        unsafe { *self.sq_array.add(idx) = idx as u32 };
321        self.sq_tail = self.sq_tail.wrapping_add(1);
322        Some(idx)
323    }
324
325    /// Raw SQE table pointer — exposed for the `prep_*` helpers in
326    /// [`crate::prep`]. Returned slot `idx` must come from `reserve()`.
327    #[inline]
328    pub(crate) fn sqes_ptr(&mut self) -> *mut IoUringSqe {
329        self.sqes
330    }
331
332    /// Publish queued submissions and enter the kernel, optionally waiting for
333    /// `wait_nr` completions. Returns the number of SQEs consumed.
334    ///
335    /// **SQPOLL fast path**: when the ring was constructed via
336    /// [`Self::new_sqpoll`] and the SQ poll thread is awake
337    /// (`IORING_SQ_NEED_WAKEUP` clear) and the caller doesn't need to block
338    /// on completions (`wait_nr == 0`), we publish the tail and return
339    /// **without any syscall** — the kernel thread will reap submissions on
340    /// its next poll spin.
341    pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
342        // SAFETY: `sq_ktail` is the kernel-published tail ptr.
343        let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
344        let to_submit = self.sq_tail.wrapping_sub(prev);
345        // SAFETY: publishing our local tail to the kernel-shared atomic.
346        unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
347
348        // **E3 — DROPPED** (measured 16-25% regression on lx64). Skipping
349        // io_uring_enter on `to_submit == 0 && wait_nr == 0` conflicts with
350        // the `IORING_SETUP_COOP_TASKRUN` flag enabled in E2 — the kernel
351        // cooperative model needs userland to enter periodically so
352        // task_work runs and CQEs flush. Detail in
353        // `bench/PERF-ATTACK-LOG-2026-06-20.md` attack E3.
354
355        let mut enter_flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
356        if let Some(sq_flags_ptr) = self.sq_flags {
357            // SAFETY: `sq_flags_ptr` lives inside the SQ mmap, valid for ring
358            // lifetime. Kernel writes IORING_SQ_NEED_WAKEUP on park; Acquire
359            // pairs with the kernel's Release on update.
360            let sq_flags = unsafe { (*sq_flags_ptr).load(Ordering::Acquire) };
361            if sq_flags & IORING_SQ_NEED_WAKEUP != 0 {
362                enter_flags |= IORING_ENTER_SQ_WAKEUP;
363            } else if wait_nr == 0 {
364                // SQ poll thread is awake and caller doesn't need to wait —
365                // skip the syscall entirely. This is the SQPOLL fast path.
366                return Ok(to_submit);
367            }
368        }
369        // E1.5: when the ring is self-registered (IORING_REGISTER_RING_FDS),
370        // pass the registered index instead of the raw fd. The kernel skips
371        // its per-syscall fget/fput on the ring.
372        let (syscall_fd, extra_flags) = match self.enter_ring {
373            Some((idx, flag)) => (idx as c_long, flag),
374            None => (self.ring_fd as c_long, 0),
375        };
376        enter_flags |= extra_flags;
377        // SAFETY: kernel-validated args; no Rust memory is read/written.
378        let ret = unsafe {
379            ffi::syscall(
380                SYS_IO_URING_ENTER,
381                syscall_fd,
382                to_submit as c_long,
383                wait_nr as c_long,
384                enter_flags as c_long,
385                ptr::null::<c_void>(),
386                0usize,
387            )
388        };
389        if ret < 0 {
390            return Err(io::Error::last_os_error());
391        }
392        Ok(ret as u32)
393    }
394
395    /// Reap every available completion, calling `f` for each; returns the count.
396    pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
397        // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
398        let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
399        let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
400        let mut n = 0;
401        while head != tail {
402            let idx = (head & self.cq_mask) as usize;
403            // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
404            let cqe = unsafe { *self.cqes.add(idx) };
405            f(cqe);
406            head = head.wrapping_add(1);
407            n += 1;
408        }
409        // SAFETY: publish the consumer head to the kernel.
410        unsafe { (*self.cq_khead).store(head, Ordering::Release) };
411        n
412    }
413
414
415}
416
417impl Drop for IoUring {
418    fn drop(&mut self) {
419        // SAFETY: each pointer is the matching `mmap` return; fd is ours.
420        unsafe {
421            ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
422            ffi::munmap(self.cq_mmap, self.cq_mmap_len);
423            ffi::munmap(self.sq_mmap, self.sq_mmap_len);
424            ffi::close(self.ring_fd);
425        }
426    }
427}