Skip to main content

kevy_uring/
ring.rs

1//! The io_uring engine: `IoUring::new` sets up the kernel ring + maps the
2//! three shared regions; `prep_*` queues SQEs into the SQ; `submit_and_wait`
3//! enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12    self, IORING_ENTER_GETEVENTS, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES,
13    IORING_OP_ACCEPT, IORING_OP_NOP, IORING_OP_READ, IORING_OP_RECV, IORING_OP_TIMEOUT,
14    IORING_OP_WRITE,
15    IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
16    SOCK_CLOEXEC, SOCK_NONBLOCK, SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
17};
18use crate::layout::{IoUringParams, IoUringSqe, KernelTimespec};
19use crate::pbr::ProvidedBufRing;
20
/// A Linux io_uring instance: one submission ring + one completion ring.
///
/// Holds the ring fd, the three `mmap`ed regions (SQ ring, CQ ring, SQE
/// array) together with their lengths, and raw pointers to the cursors that
/// live inside those mappings. `Drop` unmaps the regions and closes the fd.
pub struct IoUring {
    /// File descriptor returned by `io_uring_setup`.
    ring_fd: c_int,
    /// Base address of the SQ ring mapping.
    sq_mmap: *mut c_void,
    /// Length in bytes of the SQ ring mapping (needed for `munmap`).
    sq_mmap_len: usize,
    /// Base address of the CQ ring mapping.
    cq_mmap: *mut c_void,
    /// Length in bytes of the CQ ring mapping (needed for `munmap`).
    cq_mmap_len: usize,
    /// Base of the SQE array mapping; indexed by the value `reserve` returns.
    sqes: *mut IoUringSqe,
    /// Length in bytes of the SQE array mapping (needed for `munmap`).
    sqes_len: usize,
    /// Number of SQ slots, as reported by the kernel in the setup params.
    sq_entries: u32,
    /// Mask read from the SQ ring; turns a cursor value into a slot index.
    sq_mask: u32,
    /// Local producer cursor; published to the kernel on `submit`.
    sq_tail: u32,
    /// SQ head cursor inside the SQ mapping; loaded to detect a full queue.
    sq_khead: *const AtomicU32,
    /// SQ tail cursor inside the SQ mapping; stored to when submitting.
    sq_ktail: *const AtomicU32,
    /// SQ indirection array (ring slot -> SQE index) inside the SQ mapping.
    sq_array: *mut u32,
    /// Mask read from the CQ ring; turns a cursor value into a CQE index.
    cq_mask: u32,
    /// CQ head cursor inside the CQ mapping; stored to after reaping.
    cq_khead: *const AtomicU32,
    /// CQ tail cursor inside the CQ mapping; loaded to find new completions.
    cq_ktail: *const AtomicU32,
    /// Start of the CQE array inside the CQ mapping.
    cqes: *const Completion,
}
42
// SAFETY: `IoUring` owns its fd and mappings exclusively, and its raw
// pointers only refer to those mappings, so moving the whole engine to
// another thread (one per shard) is sound. It is deliberately not `Sync`:
// the ring has a single owner.
unsafe impl Send for IoUring {}
47
/// Cursors recovered from the SQ ring mapping.
///
/// Produced by `IoUring::sq_cursors` and copied field-by-field into
/// [`IoUring`] during construction.
struct SqCursors {
    /// Pointer to the SQ head cursor (at `sq_off.head`).
    khead: *const AtomicU32,
    /// Pointer to the SQ tail cursor (at `sq_off.tail`).
    ktail: *const AtomicU32,
    /// Pointer to the SQ indirection array (at `sq_off.array`).
    array: *mut u32,
    /// Ring mask value read from `sq_off.ring_mask`.
    mask: u32,
    /// Tail value observed at construction; seeds the local producer cursor.
    tail: u32,
}
56
/// Cursors recovered from the CQ ring mapping.
///
/// Produced by `IoUring::cq_cursors` and copied field-by-field into
/// [`IoUring`] during construction.
struct CqCursors {
    /// Pointer to the CQ head cursor (at `cq_off.head`).
    khead: *const AtomicU32,
    /// Pointer to the CQ tail cursor (at `cq_off.tail`).
    ktail: *const AtomicU32,
    /// Pointer to the first CQE (at `cq_off.cqes`).
    cqes: *const Completion,
    /// Ring mask value read from `cq_off.ring_mask`.
    mask: u32,
}
64
65impl IoUring {
66    /// Create a ring sized for at least `entries` in-flight submissions.
67    pub fn new(entries: u32) -> io::Result<IoUring> {
68        let (ring_fd, p) = Self::setup_ring(entries)?;
69        let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
70        let (sq_mmap, cq_mmap, sqes_map) =
71            Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
72
73        // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
74        // their byte offsets lie inside the just-mapped regions.
75        let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
76        let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
77
78        Ok(IoUring {
79            ring_fd,
80            sq_mmap,
81            sq_mmap_len: sq_len,
82            cq_mmap,
83            cq_mmap_len: cq_len,
84            sqes: sqes_map as *mut IoUringSqe,
85            sqes_len,
86            sq_entries: p.sq_entries,
87            sq_mask: sq.mask,
88            sq_tail: sq.tail,
89            sq_khead: sq.khead,
90            sq_ktail: sq.ktail,
91            sq_array: sq.array,
92            cq_mask: cq.mask,
93            cq_khead: cq.khead,
94            cq_ktail: cq.ktail,
95            cqes: cq.cqes,
96        })
97    }
98
99    /// `mmap` all three io_uring shared regions. On any failure, cleans up
100    /// the partial state (close fd, unmap what was already mapped) and
101    /// returns the original syscall error.
102    fn map_three_regions(
103        ring_fd: c_int,
104        sq_len: usize,
105        cq_len: usize,
106        sqes_len: usize,
107    ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
108        let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
109            // SAFETY: ring_fd came from setup; not yet observed elsewhere.
110            unsafe { ffi::close(ring_fd) };
111        })?;
112        let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
113            // SAFETY: free what we mapped + close the fd.
114            unsafe {
115                ffi::munmap(sq_mmap, sq_len);
116                ffi::close(ring_fd);
117            }
118        })?;
119        let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
120            // SAFETY: free what we mapped + close the fd.
121            unsafe {
122                ffi::munmap(cq_mmap, cq_len);
123                ffi::munmap(sq_mmap, sq_len);
124                ffi::close(ring_fd);
125            }
126        })?;
127        Ok((sq_mmap, cq_mmap, sqes_map))
128    }
129
130    /// Issue `io_uring_setup` and return `(ring_fd, params)`.
131    fn setup_ring(entries: u32) -> io::Result<(c_int, IoUringParams)> {
132        let mut p = IoUringParams::default();
133        // SAFETY: `&mut p` lives across this call; kernel writes via the ptr.
134        let fd = unsafe {
135            ffi::syscall(
136                SYS_IO_URING_SETUP,
137                entries as c_long,
138                &mut p as *mut IoUringParams,
139            )
140        };
141        if fd < 0 {
142            return Err(io::Error::last_os_error());
143        }
144        Ok((fd as c_int, p))
145    }
146
147    /// Compute the three mapping lengths the kernel needs us to map.
148    fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
149        let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
150        let cq_len =
151            (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
152        let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
153        (sq_len, cq_len, sqes_len)
154    }
155
156    /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
157    fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
158        // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
159        // kernel pick the address. Returns -1 on failure.
160        let m = unsafe {
161            ffi::mmap(
162                ptr::null_mut(),
163                len,
164                PROT_READ | PROT_WRITE,
165                MAP_SHARED | MAP_POPULATE,
166                ring_fd,
167                off,
168            )
169        };
170        if m as isize == -1 {
171            return Err(io::Error::last_os_error());
172        }
173        Ok(m)
174    }
175
176    /// Extract the SQ cursors from a just-mapped SQ region.
177    ///
178    /// # Safety
179    /// `sq_mmap` must point to a region of at least
180    /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
181    /// filled `p.sq_off` for this ring.
182    unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
183        let base = sq_mmap as usize;
184        let at = |off: u32| (base + off as usize) as *const AtomicU32;
185        let khead = at(p.sq_off.head);
186        let ktail = at(p.sq_off.tail);
187        let array = (base + p.sq_off.array as usize) as *mut u32;
188        // SAFETY: caller's invariant says `ring_mask` is inside the region.
189        let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
190        // SAFETY: ktail is published by the kernel; reading current tail at
191        // construction lets us start the local cursor in sync.
192        let tail = unsafe { (*ktail).load(Ordering::Acquire) };
193        SqCursors { khead, ktail, array, mask, tail }
194    }
195
196    /// Extract the CQ cursors from a just-mapped CQ region.
197    ///
198    /// # Safety
199    /// `cq_mmap` must point to a region of at least
200    /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
201    unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
202        let base = cq_mmap as usize;
203        let at = |off: u32| (base + off as usize) as *const AtomicU32;
204        let khead = at(p.cq_off.head);
205        let ktail = at(p.cq_off.tail);
206        let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
207        // SAFETY: caller's invariant says `ring_mask` is inside the region.
208        let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
209        CqCursors { khead, ktail, cqes, mask }
210    }
211
212    /// Reserve the next SQ slot (advancing the producer cursor + array map);
213    /// returns its SQE index, or `None` if the submission queue is full.
214    fn reserve(&mut self) -> Option<usize> {
215        // SAFETY: `sq_khead` is the kernel-published head ptr.
216        let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
217        if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
218            return None; // SQ full
219        }
220        let idx = (self.sq_tail & self.sq_mask) as usize;
221        // The SQ `array` maps a ring slot to an SQE index (here 1:1).
222        // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
223        unsafe { *self.sq_array.add(idx) = idx as u32 };
224        self.sq_tail = self.sq_tail.wrapping_add(1);
225        Some(idx)
226    }
227
228    /// Queue a `read(fd)` of `len` bytes into `buf`, tagged with `user_data`.
229    /// Returns `false` if the SQ is full.
230    ///
231    /// # Safety
232    /// `buf` must point to `len` writable bytes and stay valid until the matching
233    /// completion is reaped.
234    pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
235        let Some(idx) = self.reserve() else {
236            return false;
237        };
238        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
239        unsafe {
240            ptr::write(
241                self.sqes.add(idx),
242                IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
243            );
244        }
245        true
246    }
247
248    /// Queue a `write(fd)` of `len` bytes from `buf`, tagged with `user_data`.
249    /// Returns `false` if the SQ is full.
250    ///
251    /// # Safety
252    /// `buf` must point to `len` readable bytes and stay valid until the matching
253    /// completion is reaped.
254    pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
255        let Some(idx) = self.reserve() else {
256            return false;
257        };
258        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
259        unsafe {
260            ptr::write(
261                self.sqes.add(idx),
262                IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
263            );
264        }
265        true
266    }
267
268    /// Queue a **multishot** `recv(fd)` that draws its destination buffer from
269    /// the provided-buffer group `bgid` (see [`IoUring::register_buf_ring`]): one
270    /// SQE re-fires a completion per arrival, the kernel picking + reporting a
271    /// buffer id each time, until it terminates (error / `ENOBUFS`, signalled by
272    /// [`Completion::has_more`] returning `false`). No per-recv SQE, no read
273    /// buffer to keep alive. Returns `false` if the SQ is full.
274    pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
275        let Some(idx) = self.reserve() else {
276            return false;
277        };
278        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
279        unsafe {
280            let sqe = self.sqes.add(idx);
281            // addr/len 0: the buffer comes from the group, not from us.
282            ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
283            (*sqe).ioprio = IORING_RECV_MULTISHOT;
284            (*sqe).flags = IOSQE_BUFFER_SELECT;
285            // `buf_index` aliases `buf_group` in the kernel ABI.
286            (*sqe).buf_index = bgid;
287        }
288        true
289    }
290
291    /// Queue an `accept` on `listen_fd`; the accepted fd arrives as the
292    /// completion's `res` (already `O_NONBLOCK | O_CLOEXEC`). Returns `false` if
293    /// the SQ is full.
294    pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
295        let Some(idx) = self.reserve() else {
296            return false;
297        };
298        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
299        unsafe {
300            let sqe = self.sqes.add(idx);
301            ptr::write(
302                sqe,
303                IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
304            );
305            (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
306        }
307        true
308    }
309
310    /// Queue a relative timeout: the completion (res = `-ETIME`) arrives once
311    /// `ts` elapses, or earlier with res = 0 / `-ECANCELED` if the ring shuts
312    /// down. Bounds a blocking [`IoUring::submit_and_wait`] the way a poller's
313    /// wait-timeout would. Returns `false` if the SQ is full.
314    ///
315    /// # Safety
316    /// `ts` must stay valid (not moved or dropped) until the matching
317    /// completion is reaped — the kernel reads it asynchronously.
318    pub unsafe fn prep_timeout(&mut self, ts: *const KernelTimespec, user_data: u64) -> bool {
319        let Some(idx) = self.reserve() else {
320            return false;
321        };
322        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
323        unsafe {
324            let sqe = self.sqes.add(idx);
325            // addr = timespec ptr, len = 1 (one timespec), off = 0 (pure
326            // timeout — no completion-count trigger), rw_flags = 0 (relative).
327            ptr::write(
328                sqe,
329                IoUringSqe::new(IORING_OP_TIMEOUT, -1, ts as u64, 1, user_data),
330            );
331        }
332        true
333    }
334
335    /// Queue a no-op tagged with `user_data` (used to prove the round-trip).
336    /// Returns `false` if the SQ is full.
337    pub fn prep_nop(&mut self, user_data: u64) -> bool {
338        let Some(idx) = self.reserve() else {
339            return false;
340        };
341        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
342        unsafe {
343            ptr::write(
344                self.sqes.add(idx),
345                IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
346            );
347        }
348        true
349    }
350
351    /// Publish queued submissions and enter the kernel, optionally waiting for
352    /// `wait_nr` completions. Returns the number of SQEs consumed.
353    pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
354        // SAFETY: `sq_ktail` is the kernel-published tail ptr.
355        let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
356        let to_submit = self.sq_tail.wrapping_sub(prev);
357        // SAFETY: publishing our local tail to the kernel-shared atomic.
358        unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
359        let flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
360        // SAFETY: kernel-validated args; no Rust memory is read/written.
361        let ret = unsafe {
362            ffi::syscall(
363                SYS_IO_URING_ENTER,
364                self.ring_fd as c_long,
365                to_submit as c_long,
366                wait_nr as c_long,
367                flags as c_long,
368                ptr::null::<c_void>(),
369                0usize,
370            )
371        };
372        if ret < 0 {
373            return Err(io::Error::last_os_error());
374        }
375        Ok(ret as u32)
376    }
377
378    /// Reap every available completion, calling `f` for each; returns the count.
379    pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
380        // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
381        let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
382        let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
383        let mut n = 0;
384        while head != tail {
385            let idx = (head & self.cq_mask) as usize;
386            // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
387            let cqe = unsafe { *self.cqes.add(idx) };
388            f(cqe);
389            head = head.wrapping_add(1);
390            n += 1;
391        }
392        // SAFETY: publish the consumer head to the kernel.
393        unsafe { (*self.cq_khead).store(head, Ordering::Release) };
394        n
395    }
396
397    /// Register a **provided-buffer ring** of `entries` (power of two) buffers of
398    /// `buf_size` bytes each under group id `bgid`, for multishot
399    /// [`prep_recv_multishot`](Self::prep_recv_multishot). The kernel draws a
400    /// buffer per arrival and reports its id; the application recycles it via
401    /// [`ProvidedBufRing::recycle`]. The registration is auto-released when the
402    /// ring fd closes; the returned handle also unregisters + unmaps on drop.
403    pub fn register_buf_ring(
404        &self,
405        entries: u16,
406        buf_size: u32,
407        bgid: u16,
408    ) -> io::Result<ProvidedBufRing> {
409        ProvidedBufRing::new(self.ring_fd, entries, buf_size, bgid)
410    }
411}
412
413impl Drop for IoUring {
414    fn drop(&mut self) {
415        // SAFETY: each pointer is the matching `mmap` return; fd is ours.
416        unsafe {
417            ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
418            ffi::munmap(self.cq_mmap, self.cq_mmap_len);
419            ffi::munmap(self.sq_mmap, self.sq_mmap_len);
420            ffi::close(self.ring_fd);
421        }
422    }
423}