1use core::ffi::{c_int, c_long, c_void};
6use core::ptr;
7use core::sync::atomic::{AtomicU32, Ordering};
8use std::io;
9
10use crate::completion::Completion;
11use crate::ffi::{
12 self, IORING_ENTER_GETEVENTS, IORING_OFF_CQ_RING, IORING_OFF_SQ_RING, IORING_OFF_SQES,
13 IORING_OP_ACCEPT, IORING_OP_NOP, IORING_OP_READ, IORING_OP_RECV, IORING_OP_WRITE,
14 IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, MAP_POPULATE, MAP_SHARED, PROT_READ, PROT_WRITE,
15 SOCK_CLOEXEC, SOCK_NONBLOCK, SYS_IO_URING_ENTER, SYS_IO_URING_SETUP,
16};
17use crate::layout::{IoUringParams, IoUringSqe};
18use crate::pbr::ProvidedBufRing;
19
20pub struct IoUring {
22 ring_fd: c_int,
23 sq_mmap: *mut c_void,
24 sq_mmap_len: usize,
25 cq_mmap: *mut c_void,
26 cq_mmap_len: usize,
27 sqes: *mut IoUringSqe,
28 sqes_len: usize,
29 sq_entries: u32,
30 sq_mask: u32,
31 sq_tail: u32,
33 sq_khead: *const AtomicU32,
34 sq_ktail: *const AtomicU32,
35 sq_array: *mut u32,
36 cq_mask: u32,
37 cq_khead: *const AtomicU32,
38 cq_ktail: *const AtomicU32,
39 cqes: *const Completion,
40}
41
42unsafe impl Send for IoUring {}
46
/// Pointers into the mapped SQ ring, resolved once by `IoUring::sq_cursors`.
struct SqCursors {
    /// Ring index mask read from the SQ ring header.
    mask: u32,
    /// Snapshot of `*ktail` taken at setup; seeds `IoUring::sq_tail`.
    tail: u32,
    /// Kernel-advanced consumption index.
    khead: *const AtomicU32,
    /// Published tail index.
    ktail: *const AtomicU32,
    /// Start of the SQ index array.
    array: *mut u32,
}
55
56struct CqCursors {
58 khead: *const AtomicU32,
59 ktail: *const AtomicU32,
60 cqes: *const Completion,
61 mask: u32,
62}
63
64impl IoUring {
65 pub fn new(entries: u32) -> io::Result<IoUring> {
67 let (ring_fd, p) = Self::setup_ring(entries)?;
68 let (sq_len, cq_len, sqes_len) = Self::region_sizes(&p);
69 let (sq_mmap, cq_mmap, sqes_map) =
70 Self::map_three_regions(ring_fd, sq_len, cq_len, sqes_len)?;
71
72 let sq = unsafe { Self::sq_cursors(sq_mmap, &p) };
75 let cq = unsafe { Self::cq_cursors(cq_mmap, &p) };
76
77 Ok(IoUring {
78 ring_fd,
79 sq_mmap,
80 sq_mmap_len: sq_len,
81 cq_mmap,
82 cq_mmap_len: cq_len,
83 sqes: sqes_map as *mut IoUringSqe,
84 sqes_len,
85 sq_entries: p.sq_entries,
86 sq_mask: sq.mask,
87 sq_tail: sq.tail,
88 sq_khead: sq.khead,
89 sq_ktail: sq.ktail,
90 sq_array: sq.array,
91 cq_mask: cq.mask,
92 cq_khead: cq.khead,
93 cq_ktail: cq.ktail,
94 cqes: cq.cqes,
95 })
96 }
97
98 fn map_three_regions(
102 ring_fd: c_int,
103 sq_len: usize,
104 cq_len: usize,
105 sqes_len: usize,
106 ) -> io::Result<(*mut c_void, *mut c_void, *mut c_void)> {
107 let sq_mmap = Self::map_region(ring_fd, sq_len, IORING_OFF_SQ_RING).inspect_err(|_| {
108 unsafe { ffi::close(ring_fd) };
110 })?;
111 let cq_mmap = Self::map_region(ring_fd, cq_len, IORING_OFF_CQ_RING).inspect_err(|_| {
112 unsafe {
114 ffi::munmap(sq_mmap, sq_len);
115 ffi::close(ring_fd);
116 }
117 })?;
118 let sqes_map = Self::map_region(ring_fd, sqes_len, IORING_OFF_SQES).inspect_err(|_| {
119 unsafe {
121 ffi::munmap(cq_mmap, cq_len);
122 ffi::munmap(sq_mmap, sq_len);
123 ffi::close(ring_fd);
124 }
125 })?;
126 Ok((sq_mmap, cq_mmap, sqes_map))
127 }
128
129 fn setup_ring(entries: u32) -> io::Result<(c_int, IoUringParams)> {
131 let mut p = IoUringParams::default();
132 let fd = unsafe {
134 ffi::syscall(
135 SYS_IO_URING_SETUP,
136 entries as c_long,
137 &mut p as *mut IoUringParams,
138 )
139 };
140 if fd < 0 {
141 return Err(io::Error::last_os_error());
142 }
143 Ok((fd as c_int, p))
144 }
145
146 fn region_sizes(p: &IoUringParams) -> (usize, usize, usize) {
148 let sq_len = (p.sq_off.array as usize) + (p.sq_entries as usize) * 4;
149 let cq_len =
150 (p.cq_off.cqes as usize) + (p.cq_entries as usize) * core::mem::size_of::<Completion>();
151 let sqes_len = (p.sq_entries as usize) * core::mem::size_of::<IoUringSqe>();
152 (sq_len, cq_len, sqes_len)
153 }
154
155 fn map_region(ring_fd: c_int, len: usize, off: i64) -> io::Result<*mut c_void> {
157 let m = unsafe {
160 ffi::mmap(
161 ptr::null_mut(),
162 len,
163 PROT_READ | PROT_WRITE,
164 MAP_SHARED | MAP_POPULATE,
165 ring_fd,
166 off,
167 )
168 };
169 if m as isize == -1 {
170 return Err(io::Error::last_os_error());
171 }
172 Ok(m)
173 }
174
175 unsafe fn sq_cursors(sq_mmap: *mut c_void, p: &IoUringParams) -> SqCursors {
182 let base = sq_mmap as usize;
183 let at = |off: u32| (base + off as usize) as *const AtomicU32;
184 let khead = at(p.sq_off.head);
185 let ktail = at(p.sq_off.tail);
186 let array = (base + p.sq_off.array as usize) as *mut u32;
187 let mask = unsafe { *((base + p.sq_off.ring_mask as usize) as *const u32) };
189 let tail = unsafe { (*ktail).load(Ordering::Acquire) };
192 SqCursors { khead, ktail, array, mask, tail }
193 }
194
195 unsafe fn cq_cursors(cq_mmap: *mut c_void, p: &IoUringParams) -> CqCursors {
201 let base = cq_mmap as usize;
202 let at = |off: u32| (base + off as usize) as *const AtomicU32;
203 let khead = at(p.cq_off.head);
204 let ktail = at(p.cq_off.tail);
205 let cqes = (base + p.cq_off.cqes as usize) as *const Completion;
206 let mask = unsafe { *((base + p.cq_off.ring_mask as usize) as *const u32) };
208 CqCursors { khead, ktail, cqes, mask }
209 }
210
211 fn reserve(&mut self) -> Option<usize> {
214 let khead = unsafe { (*self.sq_khead).load(Ordering::Acquire) };
216 if self.sq_tail.wrapping_sub(khead) >= self.sq_entries {
217 return None; }
219 let idx = (self.sq_tail & self.sq_mask) as usize;
220 unsafe { *self.sq_array.add(idx) = idx as u32 };
223 self.sq_tail = self.sq_tail.wrapping_add(1);
224 Some(idx)
225 }
226
227 pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
234 let Some(idx) = self.reserve() else {
235 return false;
236 };
237 unsafe {
239 ptr::write(
240 self.sqes.add(idx),
241 IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
242 );
243 }
244 true
245 }
246
247 pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
254 let Some(idx) = self.reserve() else {
255 return false;
256 };
257 unsafe {
259 ptr::write(
260 self.sqes.add(idx),
261 IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
262 );
263 }
264 true
265 }
266
267 pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
274 let Some(idx) = self.reserve() else {
275 return false;
276 };
277 unsafe {
279 let sqe = self.sqes.add(idx);
280 ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
282 (*sqe).ioprio = IORING_RECV_MULTISHOT;
283 (*sqe).flags = IOSQE_BUFFER_SELECT;
284 (*sqe).buf_index = bgid;
286 }
287 true
288 }
289
290 pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
294 let Some(idx) = self.reserve() else {
295 return false;
296 };
297 unsafe {
299 let sqe = self.sqes.add(idx);
300 ptr::write(
301 sqe,
302 IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
303 );
304 (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
305 }
306 true
307 }
308
309 pub fn prep_nop(&mut self, user_data: u64) -> bool {
312 let Some(idx) = self.reserve() else {
313 return false;
314 };
315 unsafe {
317 ptr::write(
318 self.sqes.add(idx),
319 IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
320 );
321 }
322 true
323 }
324
325 pub fn submit_and_wait(&mut self, wait_nr: u32) -> io::Result<u32> {
328 let prev = unsafe { (*self.sq_ktail).load(Ordering::Relaxed) };
330 let to_submit = self.sq_tail.wrapping_sub(prev);
331 unsafe { (*self.sq_ktail).store(self.sq_tail, Ordering::Release) };
333 let flags = if wait_nr > 0 { IORING_ENTER_GETEVENTS } else { 0 };
334 let ret = unsafe {
336 ffi::syscall(
337 SYS_IO_URING_ENTER,
338 self.ring_fd as c_long,
339 to_submit as c_long,
340 wait_nr as c_long,
341 flags as c_long,
342 ptr::null::<c_void>(),
343 0usize,
344 )
345 };
346 if ret < 0 {
347 return Err(io::Error::last_os_error());
348 }
349 Ok(ret as u32)
350 }
351
352 pub fn for_each_completion<F: FnMut(Completion)>(&mut self, mut f: F) -> u32 {
354 let mut head = unsafe { (*self.cq_khead).load(Ordering::Relaxed) };
356 let tail = unsafe { (*self.cq_ktail).load(Ordering::Acquire) };
357 let mut n = 0;
358 while head != tail {
359 let idx = (head & self.cq_mask) as usize;
360 let cqe = unsafe { *self.cqes.add(idx) };
362 f(cqe);
363 head = head.wrapping_add(1);
364 n += 1;
365 }
366 unsafe { (*self.cq_khead).store(head, Ordering::Release) };
368 n
369 }
370
371 pub fn register_buf_ring(
378 &self,
379 entries: u16,
380 buf_size: u32,
381 bgid: u16,
382 ) -> io::Result<ProvidedBufRing> {
383 ProvidedBufRing::new(self.ring_fd, entries, buf_size, bgid)
384 }
385}
386
387impl Drop for IoUring {
388 fn drop(&mut self) {
389 unsafe {
391 ffi::munmap(self.sqes as *mut c_void, self.sqes_len);
392 ffi::munmap(self.cq_mmap, self.cq_mmap_len);
393 ffi::munmap(self.sq_mmap, self.sq_mmap_len);
394 ffi::close(self.ring_fd);
395 }
396 }
397}
398
/// Integration tests against the real kernel. Each test returns early (after
/// printing a SKIP note) when io_uring cannot be set up in this environment.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};

    /// Binds a TCP listener on an OS-assigned loopback port; returns it with that port.
    fn loopback_listener() -> (TcpListener, u16) {
        let l = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = l.local_addr().unwrap().port();
        (l, port)
    }

    /// Creates a ring with `entries` slots. If setup fails, prints a SKIP note
    /// and returns `None`, so the calling test passes trivially on hosts
    /// without io_uring.
    fn ring_or_skip(entries: u32) -> Option<IoUring> {
        match IoUring::new(entries) {
            Ok(r) => Some(r),
            Err(e) => {
                eprintln!("SKIP: io_uring unavailable ({e})");
                None
            }
        }
    }

    /// A single NOP is submitted, completes once, and echoes its `user_data`.
    #[test]
    fn nop_round_trips() {
        let Some(mut ring) = ring_or_skip(8) else {
            return;
        };
        assert!(ring.prep_nop(0x1234));
        assert_eq!(ring.submit_and_wait(1).unwrap(), 1);
        let mut got = None;
        let n = ring.for_each_completion(|c| got = Some(c));
        assert_eq!(n, 1);
        let c = got.expect("one completion");
        assert_eq!(c.user_data, 0x1234);
        // The NOP is expected to complete with res == 0.
        assert_eq!(c.res, 0);
    }

    /// Reads a small temp file through `prep_read` and checks the bytes.
    #[test]
    fn reads_a_file() {
        let Some(mut ring) = ring_or_skip(8) else {
            return;
        };
        // The process id in the name keeps separate test processes from colliding.
        let path = std::env::temp_dir().join(format!("kevy-uring-{}", std::process::id()));
        {
            let mut f = std::fs::File::create(&path).unwrap();
            f.write_all(b"hello io_uring").unwrap();
            f.sync_all().unwrap();
        }
        let file = std::fs::File::open(&path).unwrap();
        let mut buf = [0u8; 64];
        // SAFETY: `buf` outlives the request and is only inspected after its
        // completion has been reaped below.
        unsafe {
            assert!(ring.prep_read(file.as_raw_fd(), buf.as_mut_ptr(), buf.len() as u32, 0xABCD));
        }
        assert_eq!(ring.submit_and_wait(1).unwrap(), 1);
        let mut got = None;
        ring.for_each_completion(|c| got = Some(c));
        let c = got.expect("one completion");
        assert_eq!(c.user_data, 0xABCD);
        assert_eq!(c.res, 14, "should read 14 bytes");
        assert_eq!(&buf[..14], b"hello io_uring");
        // Best-effort cleanup; it does not run if an assertion above fails.
        let _ = std::fs::remove_file(&path);
    }

    /// Fills the SQ, checks that overflow is reported, then submits the whole
    /// batch at once.
    #[test]
    fn batched_nops() {
        let Some(mut ring) = ring_or_skip(8) else {
            return;
        };
        for i in 0..8u64 {
            assert!(ring.prep_nop(i));
        }
        // Assumes the kernel kept the SQ at exactly 8 slots for an 8-entry request.
        assert!(!ring.prep_nop(99), "9th submission should report SQ full");
        assert_eq!(ring.submit_and_wait(8).unwrap(), 8);
        let mut seen = 0u64;
        // Each tag 0..8 sets its own bit, so 0xFF means every tag was seen.
        let n = ring.for_each_completion(|c| seen |= 1 << c.user_data);
        assert_eq!(n, 8);
        assert_eq!(seen, 0xFF, "all 8 user_data tags present");
    }

    /// Accepts an already-established loopback connection via `prep_accept`.
    #[test]
    fn accepts_a_connection() {
        let Some(mut ring) = ring_or_skip(8) else {
            return;
        };
        let (listener, port) = loopback_listener();
        // Connect before queuing the accept, so a pending connection is waiting.
        let _client = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();

        assert!(ring.prep_accept(listener.as_raw_fd(), 0xACCE));
        assert_eq!(ring.submit_and_wait(1).unwrap(), 1);
        let mut got = None;
        ring.for_each_completion(|c| got = Some(c));
        let c = got.expect("accept completion");
        assert_eq!(c.user_data, 0xACCE);
        assert!(c.res >= 0, "accepted fd should be >= 0, got {}", c.res);
        // Adopt the accepted fd and drop it immediately, closing it.
        let _ = unsafe { OwnedFd::from_raw_fd(c.res) };
    }

    /// Accept, then read, then write back over one ring, driven step by step
    /// against a client thread that expects its bytes echoed.
    #[test]
    fn echo_round_trip_via_io_uring() {
        // `user_data` tags distinguishing the three operations.
        const ACCEPT: u64 = 1;
        const READ: u64 = 2;
        const WRITE: u64 = 3;

        let Some(mut ring) = ring_or_skip(16) else {
            return;
        };
        let (listener, port) = loopback_listener();

        let client = std::thread::spawn(move || {
            let mut s = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();
            s.write_all(b"ping").unwrap();
            let mut buf = [0u8; 4];
            s.read_exact(&mut buf).unwrap();
            assert_eq!(&buf, b"ping", "client should receive the echo");
        });

        assert!(ring.prep_accept(listener.as_raw_fd(), ACCEPT));
        ring.submit_and_wait(1).unwrap();
        let mut conn_fd = -1;
        ring.for_each_completion(|c| {
            if c.user_data == ACCEPT {
                conn_fd = c.res;
            }
        });
        assert!(conn_fd >= 0, "accept failed: {conn_fd}");

        let mut rbuf = [0u8; 64];
        // SAFETY: `rbuf` outlives the read, which is reaped before `rbuf` is inspected.
        unsafe { assert!(ring.prep_read(conn_fd, rbuf.as_mut_ptr(), rbuf.len() as u32, READ)) };
        ring.submit_and_wait(1).unwrap();
        let mut nread = 0;
        ring.for_each_completion(|c| {
            if c.user_data == READ {
                nread = c.res;
            }
        });
        assert_eq!(nread, 4, "should read 4 bytes");
        assert_eq!(&rbuf[..4], b"ping");

        // SAFETY: the first 4 bytes of `rbuf` stay valid and unchanged until the write is reaped.
        unsafe { assert!(ring.prep_write(conn_fd, rbuf.as_ptr(), 4, WRITE)) };
        ring.submit_and_wait(1).unwrap();
        let mut nwrote = 0;
        ring.for_each_completion(|c| {
            if c.user_data == WRITE {
                nwrote = c.res;
            }
        });
        assert_eq!(nwrote, 4, "should write 4 bytes");

        client.join().unwrap();
        // Take ownership of the accepted fd so dropping it closes the connection.
        let _ = unsafe { OwnedFd::from_raw_fd(conn_fd) };
    }

    /// A single multishot recv SQE delivers two separate messages, each into a
    /// kernel-selected provided buffer, which is recycled after use.
    #[test]
    fn multishot_recv_with_provided_buffers() {
        const ACCEPT: u64 = 1;
        const RECV: u64 = 2;

        let Some(mut ring) = ring_or_skip(16) else {
            return;
        };
        let (listener, port) = loopback_listener();

        // Two writes separated by sleeps (and TCP_NODELAY), so the messages are
        // likely to arrive as separate recv completions.
        let client = std::thread::spawn(move || {
            let mut s = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();
            s.set_nodelay(true).unwrap();
            s.write_all(b"ping").unwrap();
            std::thread::sleep(std::time::Duration::from_millis(50));
            s.write_all(b"pong").unwrap();
            std::thread::sleep(std::time::Duration::from_millis(50));
        });

        assert!(ring.prep_accept(listener.as_raw_fd(), ACCEPT));
        ring.submit_and_wait(1).unwrap();
        let mut conn_fd = -1;
        ring.for_each_completion(|c| {
            if c.user_data == ACCEPT {
                conn_fd = c.res;
            }
        });
        assert!(conn_fd >= 0, "accept failed: {conn_fd}");

        // 4 buffers of 64 bytes in buffer group 7.
        let mut pbr = match ring.register_buf_ring(4, 64, 7) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("SKIP: provided buffer ring unavailable ({e})");
                let _ = unsafe { OwnedFd::from_raw_fd(conn_fd) };
                client.join().unwrap();
                return;
            }
        };
        assert!(ring.prep_recv_multishot(conn_fd, pbr.group(), RECV));

        ring.submit_and_wait(1).unwrap();
        let mut c1 = None;
        ring.for_each_completion(|c| {
            if c.user_data == RECV {
                c1 = Some(c);
            }
        });
        let c1 = c1.expect("first recv completion");
        assert!(c1.res > 0, "recv res should be >0, got {}", c1.res);
        let bid1 = c1.buffer_id().expect("a provided buffer was used");
        assert_eq!(pbr.bytes(bid1, c1.res as usize), b"ping");
        assert!(c1.has_more(), "multishot recv stays armed (F_MORE)");
        pbr.recycle(bid1);

        // Nothing new is queued; this only waits for the same SQE's next completion.
        ring.submit_and_wait(1).unwrap();
        let mut c2 = None;
        ring.for_each_completion(|c| {
            if c.user_data == RECV {
                c2 = Some(c);
            }
        });
        let c2 = c2.expect("second recv completion from the same SQE");
        let bid2 = c2.buffer_id().expect("a provided buffer was used");
        assert_eq!(pbr.bytes(bid2, c2.res as usize), b"pong");
        pbr.recycle(bid2);

        client.join().unwrap();
        let _ = unsafe { OwnedFd::from_raw_fd(conn_fd) };
    }
}