kevy_uring/ring.rs
1//! The io_uring engine: `IoUring::new` sets up the kernel ring + maps the
2//! three shared regions; `prep_*` queues SQEs into the SQ; `submit_and_wait`
3//! enters the kernel; `for_each_completion` reaps completed CQEs.
4
5use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12 self, IORING_ENTER_GETEVENTS, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES,
13 IORING_OP_ACCEPT, IORING_OP_NOP, IORING_OP_READ, IORING_OP_RECV, IORING_OP_TIMEOUT,
14 IORING_OP_WRITE,
15 IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
16 SOCK_CLOEXEC, SOCK_NONBLOCK, SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
17};
18use crate::layout::{IoUringParams, IoUringSqe, KernelTimespec};
19use crate::pbr::ProvidedBufRing;
20
21/// A Linux io_uring instance: one submission ring + one completion ring.
22pub struct IoUring {
23 ring_fd: c_int,
24 sq_mmap: *mut c_void,
25 sq_mmap_len: usize,
26 cq_mmap: *mut c_void,
27 cq_mmap_len: usize,
28 sqes: *mut IoUringSqe,
29 sqes_len: usize,
30 sq_entries: u32,
31 sq_mask: u32,
32 /// Local producer cursor; published to the kernel on `submit`.
33 sq_tail: u32,
34 sq_khead: *const AtomicU32,
35 sq_ktail: *const AtomicU32,
36 sq_array: *mut u32,
37 cq_mask: u32,
38 cq_khead: *const AtomicU32,
39 cq_ktail: *const AtomicU32,
40 cqes: *const Completion,
41}
42
43// SAFETY: `IoUring` owns its fd and mappings exclusively; moving the whole
44// engine to another thread (one per shard) is sound. It is not `Sync`
45// (single owner).
46unsafe impl Send for IoUring {}
47
/// Values and pointers recovered from a freshly mapped SQ ring region.
struct SqCursors {
    /// Index mask read from the ring header.
    mask: u32,
    /// Producer tail observed at construction; seeds the local cursor.
    tail: u32,
    /// Consumer head the kernel advances.
    khead: *const AtomicU32,
    /// Shared producer tail the kernel reads.
    ktail: *const AtomicU32,
    /// Slot -> SQE-index array.
    array: *mut u32,
}
56
57/// Cursors recovered from the CQ ring mapping.
58struct CqCursors {
59 khead: *const AtomicU32,
60 ktail: *const AtomicU32,
61 cqes: *const Completion,
62 mask: u32,
63}
64
65impl IoUring {
66 /// Create a ring sized for at least `entries` in-flight submissions.
67 pub fn new(entries: u32) -> io::Result<IoUring> {
68 let (ring_fd, p) = Self::setup_ring(entries)?;
69 let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
70 let (sq_mmap, cq_mmap, sqes_map) =
71 Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
72
73 // SAFETY: `sq_off` / `cq_off` were filled by the kernel for this ring;
74 // their byte offsets lie inside the just-mapped regions.
75 let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
76 let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
77
78 Ok(IoUring {
79 ring_fd,
80 sq_mmap,
81 sq_mmap_len: sq_len,
82 cq_mmap,
83 cq_mmap_len: cq_len,
84 sqes: sqes_map as *mut IoUringSqe,
85 sqes_len,
86 sq_entries: p.sq_entries,
87 sq_mask: sq.mask,
88 sq_tail: sq.tail,
89 sq_khead: sq.khead,
90 sq_ktail: sq.ktail,
91 sq_array: sq.array,
92 cq_mask: cq.mask,
93 cq_khead: cq.khead,
94 cq_ktail: cq.ktail,
95 cqes: cq.cqes,
96 })
97 }
98
99 /// `mmap` all three io_uring shared regions. On any failure, cleans up
100 /// the partial state (close fd, unmap what was already mapped) and
101 /// returns the original syscall error.
102 fn map_three_regions(
103 ring_fd: c_int,
104 sq_len: usize,
105 cq_len: usize,
106 sqes_len: usize,
107 ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
108 let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
109 // SAFETY: ring_fd came from setup; not yet observed elsewhere.
110 unsafe { ffi::close(ring_fd) };
111 })?;
112 let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
113 // SAFETY: free what we mapped + close the fd.
114 unsafe {
115 ffi::munmap(sq_mmap, sq_len);
116 ffi::close(ring_fd);
117 }
118 })?;
119 let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
120 // SAFETY: free what we mapped + close the fd.
121 unsafe {
122 ffi::munmap(cq_mmap, cq_len);
123 ffi::munmap(sq_mmap, sq_len);
124 ffi::close(ring_fd);
125 }
126 })?;
127 Ok((sq_mmap, cq_mmap, sqes_map))
128 }
129
130 /// Issue `io_uring_setup` and return `(ring_fd, params)`.
131 fn setup_ring(entries: u32) -> io::Result<(c_int, IoUringParams)> {
132 let mut p = IoUringParams::default();
133 // SAFETY: `&mut p` lives across this call; kernel writes via the ptr.
134 let fd = unsafe {
135 ffi::syscall(
136 SYS_IO_URING_SETUP,
137 entries as c_long,
138 &mut p as *mut IoUringParams,
139 )
140 };
141 if fd < 0 {
142 return Err(io::Error::last_os_error());
143 }
144 Ok((fd as c_int, p))
145 }
146
147 /// Compute the three mapping lengths the kernel needs us to map.
148 fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
149 let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
150 let cq_len =
151 (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
152 let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
153 (sq_len, cq_len, sqes_len)
154 }
155
156 /// `mmap` one of the three io_uring regions (`MAP_SHARED | MAP_POPULATE`).
157 fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
158 // SAFETY: kernel-validated `len`/`off`/`ring_fd`; null hint lets the
159 // kernel pick the address. Returns -1 on failure.
160 let m = unsafe {
161 ffi::mmap(
162 ptr::null_mut(),
163 len,
164 PROT_READ | PROT_WRITE,
165 MAP_SHARED | MAP_POPULATE,
166 ring_fd,
167 off,
168 )
169 };
170 if m as isize == -1 {
171 return Err(io::Error::last_os_error());
172 }
173 Ok(m)
174 }
175
176 /// Extract the SQ cursors from a just-mapped SQ region.
177 ///
178 /// # Safety
179 /// `sq_mmap` must point to a region of at least
180 /// `p.sq_off.array + p.sq_entries * 4` bytes, and the kernel must have
181 /// filled `p.sq_off` for this ring.
182 unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
183 let base = sq_mmap as usize;
184 let at = |off: u32| (base + off as usize) as *const AtomicU32;
185 let khead = at(p.sq_off.head);
186 let ktail = at(p.sq_off.tail);
187 let array = (base + p.sq_off.array as usize) as *mut u32;
188 // SAFETY: caller's invariant says `ring_mask` is inside the region.
189 let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
190 // SAFETY: ktail is published by the kernel; reading current tail at
191 // construction lets us start the local cursor in sync.
192 let tail = unsafe { (*ktail).load(Ordering::Acquire) };
193 SqCursors { khead, ktail, array, mask, tail }
194 }
195
196 /// Extract the CQ cursors from a just-mapped CQ region.
197 ///
198 /// # Safety
199 /// `cq_mmap` must point to a region of at least
200 /// `p.cq_off.cqes + p.cq_entries * sizeof(Completion)` bytes.
201 unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
202 let base = cq_mmap as usize;
203 let at = |off: u32| (base + off as usize) as *const AtomicU32;
204 let khead = at(p.cq_off.head);
205 let ktail = at(p.cq_off.tail);
206 let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
207 // SAFETY: caller's invariant says `ring_mask` is inside the region.
208 let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
209 CqCursors { khead, ktail, cqes, mask }
210 }
211
212 /// Reserve the next SQ slot (advancing the producer cursor + array map);
213 /// returns its SQE index, or `None` if the submission queue is full.
214 fn reserve(&mut self) -> Option<usize> {
215 // SAFETY: `sq_khead` is the kernel-published head ptr.
216 let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
217 if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
218 return None; // SQ full
219 }
220 let idx = (self.sq_tail & self.sq_mask) as usize;
221 // The SQ `array` maps a ring slot to an SQE index (here 1:1).
222 // SAFETY: `idx < sq_entries` ensures we're inside `sq_array`.
223 unsafe { *self.sq_array.add(idx) = idx as u32 };
224 self.sq_tail = self.sq_tail.wrapping_add(1);
225 Some(idx)
226 }
227
228 /// Queue a `read(fd)` of `len` bytes into `buf`, tagged with `user_data`.
229 /// Returns `false` if the SQ is full.
230 ///
231 /// # Safety
232 /// `buf` must point to `len` writable bytes and stay valid until the matching
233 /// completion is reaped.
234 pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
235 let Some(idx) = self.reserve() else {
236 return false;
237 };
238 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
239 unsafe {
240 ptr::write(
241 self.sqes.add(idx),
242 IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
243 );
244 }
245 true
246 }
247
248 /// Queue a `write(fd)` of `len` bytes from `buf`, tagged with `user_data`.
249 /// Returns `false` if the SQ is full.
250 ///
251 /// # Safety
252 /// `buf` must point to `len` readable bytes and stay valid until the matching
253 /// completion is reaped.
254 pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
255 let Some(idx) = self.reserve() else {
256 return false;
257 };
258 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
259 unsafe {
260 ptr::write(
261 self.sqes.add(idx),
262 IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
263 );
264 }
265 true
266 }
267
268 /// Queue a **multishot** `recv(fd)` that draws its destination buffer from
269 /// the provided-buffer group `bgid` (see [`IoUring::register_buf_ring`]): one
270 /// SQE re-fires a completion per arrival, the kernel picking + reporting a
271 /// buffer id each time, until it terminates (error / `ENOBUFS`, signalled by
272 /// [`Completion::has_more`] returning `false`). No per-recv SQE, no read
273 /// buffer to keep alive. Returns `false` if the SQ is full.
274 pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
275 let Some(idx) = self.reserve() else {
276 return false;
277 };
278 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
279 unsafe {
280 let sqe = self.sqes.add(idx);
281 // addr/len 0: the buffer comes from the group, not from us.
282 ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
283 (*sqe).ioprio = IORING_RECV_MULTISHOT;
284 (*sqe).flags = IOSQE_BUFFER_SELECT;
285 // `buf_index` aliases `buf_group` in the kernel ABI.
286 (*sqe).buf_index = bgid;
287 }
288 true
289 }
290
291 /// Queue an `accept` on `listen_fd`; the accepted fd arrives as the
292 /// completion's `res` (already `O_NONBLOCK | O_CLOEXEC`). Returns `false` if
293 /// the SQ is full.
294 pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
295 let Some(idx) = self.reserve() else {
296 return false;
297 };
298 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
299 unsafe {
300 let sqe = self.sqes.add(idx);
301 ptr::write(
302 sqe,
303 IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
304 );
305 (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
306 }
307 true
308 }
309
310 /// Queue a relative timeout: the completion (res = `-ETIME`) arrives once
311 /// `ts` elapses, or earlier with res = 0 / `-ECANCELED` if the ring shuts
312 /// down. Bounds a blocking [`IoUring::submit_and_wait`] the way a poller's
313 /// wait-timeout would. Returns `false` if the SQ is full.
314 ///
315 /// # Safety
316 /// `ts` must stay valid (not moved or dropped) until the matching
317 /// completion is reaped — the kernel reads it asynchronously.
318 pub unsafe fn prep_timeout(&mut self, ts: *const KernelTimespec, user_data: u64) -> bool {
319 let Some(idx) = self.reserve() else {
320 return false;
321 };
322 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
323 unsafe {
324 let sqe = self.sqes.add(idx);
325 // addr = timespec ptr, len = 1 (one timespec), off = 0 (pure
326 // timeout — no completion-count trigger), rw_flags = 0 (relative).
327 ptr::write(
328 sqe,
329 IoUringSqe::new(IORING_OP_TIMEOUT, -1, ts as u64, 1, user_data),
330 );
331 }
332 true
333 }
334
335 /// Queue a no-op tagged with `user_data` (used to prove the round-trip).
336 /// Returns `false` if the SQ is full.
337 pub fn prep_nop(&mut self, user_data: u64) -> bool {
338 let Some(idx) = self.reserve() else {
339 return false;
340 };
341 // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
342 unsafe {
343 ptr::write(
344 self.sqes.add(idx),
345 IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
346 );
347 }
348 true
349 }
350
351 /// Publish queued submissions and enter the kernel, optionally waiting for
352 /// `wait_nr` completions. Returns the number of SQEs consumed.
353 pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
354 // SAFETY: `sq_ktail` is the kernel-published tail ptr.
355 let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
356 let to_submit = self.sq_tail.wrapping_sub(prev);
357 // SAFETY: publishing our local tail to the kernel-shared atomic.
358 unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
359 let flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
360 // SAFETY: kernel-validated args; no Rust memory is read/written.
361 let ret = unsafe {
362 ffi::syscall(
363 SYS_IO_URING_ENTER,
364 self.ring_fd as c_long,
365 to_submit as c_long,
366 wait_nr as c_long,
367 flags as c_long,
368 ptr::null::<c_void>(),
369 0usize,
370 )
371 };
372 if ret < 0 {
373 return Err(io::Error::last_os_error());
374 }
375 Ok(ret as u32)
376 }
377
378 /// Reap every available completion, calling `f` for each; returns the count.
379 pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
380 // SAFETY: cq_khead / cq_ktail are the kernel-shared cursors.
381 let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
382 let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
383 let mut n = 0;
384 while head != tail {
385 let idx = (head & self.cq_mask) as usize;
386 // SAFETY: `idx < cq_entries` by mask; cqes points to that array.
387 let cqe = unsafe { *self.cqes.add(idx) };
388 f(cqe);
389 head = head.wrapping_add(1);
390 n += 1;
391 }
392 // SAFETY: publish the consumer head to the kernel.
393 unsafe { (*self.cq_khead).store(head, Ordering::Release) };
394 n
395 }
396
397 /// Register a **provided-buffer ring** of `entries` (power of two) buffers of
398 /// `buf_size` bytes each under group id `bgid`, for multishot
399 /// [`prep_recv_multishot`](Self::prep_recv_multishot). The kernel draws a
400 /// buffer per arrival and reports its id; the application recycles it via
401 /// [`ProvidedBufRing::recycle`]. The registration is auto-released when the
402 /// ring fd closes; the returned handle also unregisters + unmaps on drop.
403 pub fn register_buf_ring(
404 &self,
405 entries: u16,
406 buf_size: u32,
407 bgid: u16,
408 ) -> io::Result<ProvidedBufRing> {
409 ProvidedBufRing::new(self.ring_fd, entries, buf_size, bgid)
410 }
411}
412
413impl Drop for IoUring {
414 fn drop(&mut self) {
415 // SAFETY: each pointer is the matching `mmap` return; fd is ours.
416 unsafe {
417 ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
418 ffi::munmap(self.cq_mmap, self.cq_mmap_len);
419 ffi::munmap(self.sq_mmap, self.sq_mmap_len);
420 ffi::close(self.ring_fd);
421 }
422 }
423}