Skip to main content

kevy_uring/
ring.rs

//! The io_uring engine: `IoUring::new` sets up the kernel ring and maps the
//! three shared regions; the `prep_*` methods queue SQEs into the SQ;
//! `submit_and_wait` enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12    self, IORING_ENTER_GETEVENTS, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES,
13    IORING_OP_ACCEPT, IORING_OP_NOP, IORING_OP_READ, IORING_OP_RECV, IORING_OP_WRITE,
14    IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15    SOCK_CLOEXEC, SOCK_NONBLOCK, SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18use crate::pbr::ProvidedBufRing;
19
20/// A Linux io_uring instance: one submission ring + one completion ring.
21pub struct IoUring {
22    ring_fd: c_int,
23    sq_mmap: *mut c_void,
24    sq_mmap_len: usize,
25    cq_mmap: *mut c_void,
26    cq_mmap_len: usize,
27    sqes: *mut IoUringSqe,
28    sqes_len: usize,
29    sq_entries: u32,
30    sq_mask: u32,
31    /// Local producer cursor; published to the kernel on `submit`.
32    sq_tail: u32,
33    sq_khead: *const AtomicU32,
34    sq_ktail: *const AtomicU32,
35    sq_array: *mut u32,
36    cq_mask: u32,
37    cq_khead: *const AtomicU32,
38    cq_ktail: *const AtomicU32,
39    cqes: *const Completion,
40}
41
42// SAFETY: `IoUring` owns its fd and mappings exclusively; moving the whole
43// engine to another thread (one per shard) is sound. It is not `Sync`
44// (single owner).
45unsafe impl Send for IoUring {}
46
/// Pointers and values read out of a freshly mapped SQ ring region.
///
/// `IoUring::new` copies these into the engine's submission-side fields.
struct SqCursors {
    /// Index mask read from the kernel's `ring_mask` field.
    mask: u32,
    /// Kernel tail observed at construction; seeds the local producer cursor.
    tail: u32,
    /// Kernel-advanced consumer head.
    khead: *const AtomicU32,
    /// Shared producer tail that we publish.
    ktail: *const AtomicU32,
    /// Ring slot -> SQE index indirection array.
    array: *mut u32,
}
55
56/// Cursors recovered from the CQ ring mapping.
57struct CqCursors {
58    khead: *const AtomicU32,
59    ktail: *const AtomicU32,
60    cqes: *const Completion,
61    mask: u32,
62}
63
64impl IoUring {
65    /// Create a ring sized for at least `entries` in-flight submissions.
66    pub fn new(entries: u32) -> io::Result<IoUring> {
67        let (ring_fd, p) = Self::setup_ring(entries)?;
68        let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
69        let (sq_mmap, cq_mmap, sqes_map) =
70            Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
71
72        // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
73        // their byte offsets lie inside the just-mapped regions.
74        let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
75        let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
76
77        Ok(IoUring {
78            ring_fd,
79            sq_mmap,
80            sq_mmap_len: sq_len,
81            cq_mmap,
82            cq_mmap_len: cq_len,
83            sqes: sqes_map as *mut IoUringSqe,
84            sqes_len,
85            sq_entries: p.sq_entries,
86            sq_mask: sq.mask,
87            sq_tail: sq.tail,
88            sq_khead: sq.khead,
89            sq_ktail: sq.ktail,
90            sq_array: sq.array,
91            cq_mask: cq.mask,
92            cq_khead: cq.khead,
93            cq_ktail: cq.ktail,
94            cqes: cq.cqes,
95        })
96    }
97
98    /// `mmap` all three io_uring shared regions. On any failure, cleans up
99    /// the partial state (close fd, unmap what was already mapped) and
100    /// returns the original syscall error.
101    fn map_three_regions(
102        ring_fd: c_int,
103        sq_len: usize,
104        cq_len: usize,
105        sqes_len: usize,
106    ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
107        let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
108            // SAFETY: ring_fd came from setup; not yet observed elsewhere.
109            unsafe { ffi::close(ring_fd) };
110        })?;
111        let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
112            // SAFETY: free what we mapped + close the fd.
113            unsafe {
114                ffi::munmap(sq_mmap, sq_len);
115                ffi::close(ring_fd);
116            }
117        })?;
118        let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
119            // SAFETY: free what we mapped + close the fd.
120            unsafe {
121                ffi::munmap(cq_mmap, cq_len);
122                ffi::munmap(sq_mmap, sq_len);
123                ffi::close(ring_fd);
124            }
125        })?;
126        Ok((sq_mmap, cq_mmap, sqes_map))
127    }
128
129    /// Issue `io_uring_setup` and return `(ring_fd, params)`.
130    fn setup_ring(entries: u32) -> io::Result<(c_int, IoUringParams)> {
131        let mut p = IoUringParams::default();
132        // SAFETY: `&mut p` lives across this call; kernel writes via the ptr.
133        let fd = unsafe {
134            ffi::syscall(
135                SYS_IO_URING_SETUP,
136                entries as c_long,
137                &mut p as *mut IoUringParams,
138            )
139        };
140        if fd < 0 {
141            return Err(io::Error::last_os_error());
142        }
143        Ok((fd as c_int, p))
144    }
145
146    /// Compute the three mapping lengths the kernel needs us to map.
147    fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
148        let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
149        let cq_len =
150            (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
151        let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
152        (sq_len, cq_len, sqes_len)
153    }
154
155    /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
156    fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
157        // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
158        // kernel pick the address. Returns -1 on failure.
159        let m = unsafe {
160            ffi::mmap(
161                ptr::null_mut(),
162                len,
163                PROT_READ | PROT_WRITE,
164                MAP_SHARED | MAP_POPULATE,
165                ring_fd,
166                off,
167            )
168        };
169        if m as isize == -1 {
170            return Err(io::Error::last_os_error());
171        }
172        Ok(m)
173    }
174
175    /// Extract the SQ cursors from a just-mapped SQ region.
176    ///
177    /// # Safety
178    /// `sq_mmap` must point to a region of at least
179    /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
180    /// filled `p.sq_off` for this ring.
181    unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
182        let base = sq_mmap as usize;
183        let at = |off: u32| (base + off as usize) as *const AtomicU32;
184        let khead = at(p.sq_off.head);
185        let ktail = at(p.sq_off.tail);
186        let array = (base + p.sq_off.array as usize) as *mut u32;
187        // SAFETY: caller's invariant says `ring_mask` is inside the region.
188        let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
189        // SAFETY: ktail is published by the kernel; reading current tail at
190        // construction lets us start the local cursor in sync.
191        let tail = unsafe { (*ktail).load(Ordering::Acquire) };
192        SqCursors { khead, ktail, array, mask, tail }
193    }
194
195    /// Extract the CQ cursors from a just-mapped CQ region.
196    ///
197    /// # Safety
198    /// `cq_mmap` must point to a region of at least
199    /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
200    unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
201        let base = cq_mmap as usize;
202        let at = |off: u32| (base + off as usize) as *const AtomicU32;
203        let khead = at(p.cq_off.head);
204        let ktail = at(p.cq_off.tail);
205        let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
206        // SAFETY: caller's invariant says `ring_mask` is inside the region.
207        let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
208        CqCursors { khead, ktail, cqes, mask }
209    }
210
211    /// Reserve the next SQ slot (advancing the producer cursor + array map);
212    /// returns its SQE index, or `None` if the submission queue is full.
213    fn reserve(&mut self) -> Option<usize> {
214        // SAFETY: `sq_khead` is the kernel-published head ptr.
215        let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
216        if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
217            return None; // SQ full
218        }
219        let idx = (self.sq_tail & self.sq_mask) as usize;
220        // The SQ `array` maps a ring slot to an SQE index (here 1:1).
221        // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
222        unsafe { *self.sq_array.add(idx) = idx as u32 };
223        self.sq_tail = self.sq_tail.wrapping_add(1);
224        Some(idx)
225    }
226
227    /// Queue a `read(fd)` of `len` bytes into `buf`, tagged with `user_data`.
228    /// Returns `false` if the SQ is full.
229    ///
230    /// # Safety
231    /// `buf` must point to `len` writable bytes and stay valid until the matching
232    /// completion is reaped.
233    pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
234        let Some(idx) = self.reserve() else {
235            return false;
236        };
237        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
238        unsafe {
239            ptr::write(
240                self.sqes.add(idx),
241                IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
242            );
243        }
244        true
245    }
246
247    /// Queue a `write(fd)` of `len` bytes from `buf`, tagged with `user_data`.
248    /// Returns `false` if the SQ is full.
249    ///
250    /// # Safety
251    /// `buf` must point to `len` readable bytes and stay valid until the matching
252    /// completion is reaped.
253    pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
254        let Some(idx) = self.reserve() else {
255            return false;
256        };
257        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
258        unsafe {
259            ptr::write(
260                self.sqes.add(idx),
261                IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
262            );
263        }
264        true
265    }
266
267    /// Queue a **multishot** `recv(fd)` that draws its destination buffer from
268    /// the provided-buffer group `bgid` (see [`IoUring::register_buf_ring`]): one
269    /// SQE re-fires a completion per arrival, the kernel picking + reporting a
270    /// buffer id each time, until it terminates (error / `ENOBUFS`, signalled by
271    /// [`Completion::has_more`] returning `false`). No per-recv SQE, no read
272    /// buffer to keep alive. Returns `false` if the SQ is full.
273    pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
274        let Some(idx) = self.reserve() else {
275            return false;
276        };
277        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
278        unsafe {
279            let sqe = self.sqes.add(idx);
280            // addr/len 0: the buffer comes from the group, not from us.
281            ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
282            (*sqe).ioprio = IORING_RECV_MULTISHOT;
283            (*sqe).flags = IOSQE_BUFFER_SELECT;
284            // `buf_index` aliases `buf_group` in the kernel ABI.
285            (*sqe).buf_index = bgid;
286        }
287        true
288    }
289
290    /// Queue an `accept` on `listen_fd`; the accepted fd arrives as the
291    /// completion's `res` (already `O_NONBLOCK | O_CLOEXEC`). Returns `false` if
292    /// the SQ is full.
293    pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
294        let Some(idx) = self.reserve() else {
295            return false;
296        };
297        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
298        unsafe {
299            let sqe = self.sqes.add(idx);
300            ptr::write(
301                sqe,
302                IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
303            );
304            (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
305        }
306        true
307    }
308
309    /// Queue a no-op tagged with `user_data` (used to prove the round-trip).
310    /// Returns `false` if the SQ is full.
311    pub fn prep_nop(&mut self, user_data: u64) -> bool {
312        let Some(idx) = self.reserve() else {
313            return false;
314        };
315        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
316        unsafe {
317            ptr::write(
318                self.sqes.add(idx),
319                IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
320            );
321        }
322        true
323    }
324
325    /// Publish queued submissions and enter the kernel, optionally waiting for
326    /// `wait_nr` completions. Returns the number of SQEs consumed.
327    pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
328        // SAFETY: `sq_ktail` is the kernel-published tail ptr.
329        let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
330        let to_submit = self.sq_tail.wrapping_sub(prev);
331        // SAFETY: publishing our local tail to the kernel-shared atomic.
332        unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
333        let flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
334        // SAFETY: kernel-validated args; no Rust memory is read/written.
335        let ret = unsafe {
336            ffi::syscall(
337                SYS_IO_URING_ENTER,
338                self.ring_fd as c_long,
339                to_submit as c_long,
340                wait_nr as c_long,
341                flags as c_long,
342                ptr::null::<c_void>(),
343                0usize,
344            )
345        };
346        if ret < 0 {
347            return Err(io::Error::last_os_error());
348        }
349        Ok(ret as u32)
350    }
351
352    /// Reap every available completion, calling `f` for each; returns the count.
353    pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
354        // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
355        let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
356        let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
357        let mut n = 0;
358        while head != tail {
359            let idx = (head & self.cq_mask) as usize;
360            // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
361            let cqe = unsafe { *self.cqes.add(idx) };
362            f(cqe);
363            head = head.wrapping_add(1);
364            n += 1;
365        }
366        // SAFETY: publish the consumer head to the kernel.
367        unsafe { (*self.cq_khead).store(head, Ordering::Release) };
368        n
369    }
370
371    /// Register a **provided-buffer ring** of `entries` (power of two) buffers of
372    /// `buf_size` bytes each under group id `bgid`, for multishot
373    /// [`prep_recv_multishot`](Self::prep_recv_multishot). The kernel draws a
374    /// buffer per arrival and reports its id; the application recycles it via
375    /// [`ProvidedBufRing::recycle`]. The registration is auto-released when the
376    /// ring fd closes; the returned handle also unregisters + unmaps on drop.
377    pub fn register_buf_ring(
378        &self,
379        entries: u16,
380        buf_size: u32,
381        bgid: u16,
382    ) -> io::Result<ProvidedBufRing> {
383        ProvidedBufRing::new(self.ring_fd, entries, buf_size, bgid)
384    }
385}
386
387impl Drop for IoUring {
388    fn drop(&mut self) {
389        // SAFETY: each pointer is the matching `mmap` return; fd is ours.
390        unsafe {
391            ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
392            ffi::munmap(self.cq_mmap, self.cq_mmap_len);
393            ffi::munmap(self.sq_mmap, self.sq_mmap_len);
394            ffi::close(self.ring_fd);
395        }
396    }
397}