1extern crate alloc;
44extern crate core;
45
46use core::mem::MaybeUninit;
47use core::ptr;
48use std::io::{self, IoSlice};
49use std::time::Duration;
50
51pub mod operation;
52
53mod bindings;
54
/// A completed io_uring operation: the user-visible view of a kernel
/// completion queue entry (CQE), carrying the submitter's token, the
/// operation result, and the raw completion flags.
#[derive(Debug, Clone, Copy)]
pub struct Completion {
    // Token supplied at submission time; identifies which request finished.
    user_data: u64,
    // Raw CQE result: >= 0 on success, negated errno on failure.
    res: i32,
    /// Raw CQE flags (`IORING_CQE_F_*` bits).
    pub flags: u32,
}
65
66impl Completion {
67 pub fn is_ok(&self) -> bool {
69 self.res >= 0
70 }
71
72 pub fn result(&self) -> i32 {
74 self.res
75 }
76
77 pub fn user_data(&self) -> u64 {
79 self.user_data
80 }
81
82 pub fn has_more(&self) -> bool {
84 (self.flags & bindings::IORING_CQE_F_MORE) != 0
85 }
86
87 pub fn buffer_id(&self) -> Option<u16> {
89 if (self.flags & bindings::IORING_CQE_F_BUFFER) != 0 {
90 Some((self.flags >> bindings::IORING_CQE_BUFFER_SHIFT) as u16)
91 } else {
92 None
93 }
94 }
95}
96
/// Bit set of per-submission flags (`IOSQE_*`), stored as the raw `u8`
/// that is written into the SQE's `flags` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SqeFlags(u8);
100
101impl SqeFlags {
102 pub const NONE: Self = Self(0);
104
105 pub const FIXED_FILE: Self =
107 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_FIXED_FILE_BIT);
108
109 pub const ASYNC: Self =
111 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_ASYNC_BIT);
112
113 pub const IO_LINK: Self =
115 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_IO_LINK_BIT);
116
117 pub const IO_DRAIN: Self =
119 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_IO_DRAIN_BIT);
120
121 pub const IO_HARDLINK: Self =
123 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_IO_HARDLINK_BIT);
124
125 pub const BUFFER_SELECT: Self =
127 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_BUFFER_SELECT_BIT);
128
129 pub const CQE_SKIP_SUCCESS: Self =
131 Self(1 << bindings::io_uring_sqe_flags_bit_IOSQE_CQE_SKIP_SUCCESS_BIT);
132
133 pub const fn or(self, other: Self) -> Self {
135 Self(self.0 | other.0)
136 }
137
138 pub const fn contains(self, other: Self) -> bool {
140 (self.0 & other.0) == other.0
141 }
142
143 pub fn bits(self) -> u8 {
144 self.0
145 }
146}
147
148impl std::ops::BitOr for SqeFlags {
149 type Output = Self;
150 fn bitor(self, rhs: Self) -> Self::Output {
151 self.or(rhs)
152 }
153}
154
/// A prepared submission queue entry, wrapping the raw `io_uring_sqe`
/// built by the operation constructors.
pub struct Entry(pub(crate) bindings::io_uring_sqe);
157
158impl Entry {
159 pub(crate) fn from_sqe(sqe: bindings::io_uring_sqe) -> Self {
160 Self(sqe)
161 }
162
163 pub(crate) fn into_sqe(self) -> bindings::io_uring_sqe {
164 self.0
165 }
166}
167
/// Ring setup parameters, mirroring the input half of `io_uring_params`.
#[derive(Debug, Clone)]
pub struct Params {
    /// Requested number of submission-queue entries.
    pub sq_entries: u32,
    /// `IORING_SETUP_*` flags passed to ring creation.
    pub flags: u32,
    /// CPU to pin the SQPOLL kernel thread to (only meaningful with
    /// `IORING_SETUP_SQ_AFF`; otherwise ignored by the kernel).
    pub sq_thread_cpu: u32,
    /// Idle time in milliseconds before the SQPOLL thread sleeps.
    pub sq_thread_idle: u32,
}
180
181impl Default for Params {
182 fn default() -> Self {
183 Self { sq_entries: 128, flags: 0, sq_thread_cpu: 0, sq_thread_idle: 0 }
184 }
185}
186
187impl Params {
188 pub fn sqpoll(mut self, idle_ms: u32) -> Self {
190 self.flags |= bindings::IORING_SETUP_SQPOLL;
191 self.sq_thread_idle = idle_ms;
192 self
193 }
194
195 pub fn iopoll(mut self) -> Self {
197 self.flags |= bindings::IORING_SETUP_IOPOLL;
198 self
199 }
200}
201
/// An io_uring instance: owns the liburing ring state and remembers the
/// setup flags so SQPOLL-specific paths can be selected at runtime.
pub struct LioUring {
    // The liburing ring handle; torn down in `Drop`.
    ring: bindings::io_uring,
    // Effective `IORING_SETUP_*` flags captured after ring creation.
    flags: u32,
}
210
impl Drop for LioUring {
    fn drop(&mut self) {
        // SAFETY: `self.ring` was fully initialized by
        // io_uring_queue_init_params in `with_params`, and the ring is not
        // used again after this call tears it down.
        unsafe { bindings::io_uring_queue_exit(&raw mut self.ring) };
    }
}
216
impl LioUring {
    /// Create a ring with `capacity` submission-queue entries and default
    /// parameters otherwise.
    ///
    /// # Errors
    /// Returns the ring-setup errno as an `io::Error` on failure.
    pub fn new(capacity: u32) -> io::Result<Self> {
        Self::with_params(Params { sq_entries: capacity, ..Default::default() })
    }

    /// Create a ring from an explicit [`Params`] set.
    ///
    /// # Errors
    /// Returns the ring-setup errno as an `io::Error` on failure.
    pub fn with_params(params: Params) -> io::Result<Self> {
        // Zeroed storage; liburing fills the ring in on success.
        let mut ring = MaybeUninit::zeroed();
        let mut raw_params = bindings::io_uring_params {
            sq_entries: params.sq_entries,
            cq_entries: 0,
            flags: params.flags,
            sq_thread_cpu: params.sq_thread_cpu,
            sq_thread_idle: params.sq_thread_idle,
            features: 0,
            wq_fd: 0,
            resv: [0; 3],
            // SAFETY: the offset tables are plain-old-data output fields;
            // all-zero is a valid input value the kernel overwrites.
            sq_off: unsafe { std::mem::zeroed() },
            cq_off: unsafe { std::mem::zeroed() },
        };

        // SAFETY: `ring` points to writable storage large enough for
        // `io_uring`, and `raw_params` outlives the call.
        let ret = unsafe {
            bindings::io_uring_queue_init_params(
                params.sq_entries,
                ring.as_mut_ptr(),
                &raw mut raw_params,
            )
        };

        // liburing reports failure as a negated errno return value.
        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }

        // SAFETY: init returned success, so the ring is fully initialized.
        let ring_init = unsafe { ring.assume_init() };
        // Record the flags actually in effect for this ring.
        let flags = ring_init.flags;

        Ok(Self { ring: ring_init, flags })
    }

    /// Queue `entry` for submission with the given completion token and no
    /// SQE flags. The entry is not visible to the kernel until [`submit`]
    /// (or, under SQPOLL, until the tail is published there).
    ///
    /// # Safety
    /// Any buffers/resources referenced by `entry` must stay valid until
    /// the corresponding completion is reaped.
    ///
    /// # Errors
    /// `WouldBlock` when the submission queue is full.
    pub unsafe fn push(
        &mut self,
        entry: Entry,
        user_data: u64,
    ) -> io::Result<()> {
        unsafe { self.push_with_flags(entry, user_data, SqeFlags::NONE) }
    }

    /// Queue `entry` for submission with explicit [`SqeFlags`].
    ///
    /// # Safety
    /// Same contract as [`push`]: resources referenced by `entry` must
    /// outlive the operation.
    ///
    /// # Errors
    /// `WouldBlock` when no SQE slot is available.
    pub unsafe fn push_with_flags(
        &mut self,
        entry: Entry,
        user_data: u64,
        flags: SqeFlags,
    ) -> io::Result<()> {
        // SAFETY: the ring is initialized; get_sqe returns null (handled
        // below) or a pointer into the ring's SQE array.
        let sqe = unsafe { bindings::io_uring_get_sqe(&raw mut self.ring) };
        if sqe.is_null() {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "submission queue is full",
            ));
        }

        // SAFETY: `sqe` is a valid, writable slot reserved for us by
        // get_sqe. Copy the prepared entry in, then overwrite the token
        // and flags fields.
        unsafe {
            (*sqe) = entry.into_sqe();
            (*sqe).user_data = user_data;
            (*sqe).flags = flags.bits()
        }

        Ok(())
    }

    /// Make queued entries visible to the kernel; returns the number of
    /// entries submitted (for SQPOLL, the number now pending).
    ///
    /// # Errors
    /// Returns the `io_uring_enter(2)` errno on failure.
    pub fn submit(&mut self) -> io::Result<usize> {
        if !self.is_sqpoll() {
            // Normal mode: a plain enter with wait_nr = 0, i.e. submit
            // without waiting for completions.
            let ret =
                unsafe { bindings::io_uring_submit_and_wait(&raw mut self.ring, 0) };
            if ret < 0 {
                return Err(io::Error::from_raw_os_error(-ret));
            }
            return Ok(ret as usize);
        }

        // SQPOLL mode: the kernel thread consumes the SQ ring directly, so
        // we only need to publish the new tail ourselves.
        let pending = unsafe {
            let sq_tail = self.ring.sq.sqe_tail;
            let sq_head = self.ring.sq.sqe_head;

            if sq_head != sq_tail {
                // Mark the locally queued entries as handed off, then
                // publish the tail. The fence orders the SQE writes
                // before the tail store that makes them visible.
                self.ring.sq.sqe_head = sq_tail;
                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
                std::ptr::write_volatile(self.ring.sq.ktail, sq_tail);
            }

            // Entries published but not yet consumed by the SQPOLL thread.
            // NOTE(review): `*self.ring.sq.khead` is a plain (non-volatile)
            // read of a kernel-shared word — confirm this is intentional.
            sq_tail.wrapping_sub(*self.ring.sq.khead)
        };

        if pending == 0 {
            return Ok(0);
        }

        // If the SQPOLL thread has gone to sleep, we must enter the kernel
        // to wake it; otherwise it will pick the entries up on its own.
        // NOTE(review): plain read of the shared kflags word here as well.
        let needs_wakeup =
            unsafe { *self.ring.sq.kflags & bindings::IORING_SQ_NEED_WAKEUP != 0 };

        if needs_wakeup {
            let ret = unsafe { bindings::io_uring_submit(&raw mut self.ring) };
            if ret < 0 {
                return Err(io::Error::from_raw_os_error(-ret));
            }
            Ok(ret as usize)
        } else {
            Ok(pending as usize)
        }
    }

    /// Whether this ring was created with kernel-side submission polling.
    pub fn is_sqpoll(&self) -> bool {
        (self.flags & bindings::IORING_SETUP_SQPOLL) != 0
    }

    /// Number of free submission-queue slots.
    pub fn sq_space_left(&self) -> usize {
        // NOTE(review): the const->mut cast relies on liburing only reading
        // the ring here; verify io_uring_sq_space_left has no side effects.
        unsafe {
            bindings::io_uring_sq_space_left(&self.ring as *const _ as *mut _)
                as usize
        }
    }

    /// Block until one completion is available, consume it, and return it.
    ///
    /// # Errors
    /// Returns the wait errno as an `io::Error`.
    pub fn wait(&mut self) -> io::Result<Completion> {
        let mut cqe_ptr = ptr::null_mut();
        // SAFETY: ring is initialized; on success cqe_ptr points at a
        // valid CQE owned by the ring until we mark it seen.
        let ret = unsafe {
            bindings::io_uring_wait_cqe(&raw mut self.ring, &raw mut cqe_ptr)
        };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }

        // Copy the CQE fields out before releasing the slot.
        let cqe = unsafe { &*cqe_ptr };
        let completion =
            Completion { user_data: cqe.user_data, res: cqe.res, flags: cqe.flags };

        // SAFETY: cqe_ptr came from wait_cqe above and is consumed once.
        unsafe { bindings::io_uring_cqe_seen(&raw mut self.ring, cqe_ptr) };

        Ok(completion)
    }

    /// Like [`wait`], but give up after `timeout`; `Ok(None)` on timeout.
    ///
    /// # Errors
    /// Returns non-timeout wait errnos as an `io::Error`.
    pub fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> io::Result<Option<Completion>> {
        let mut cqe_ptr = ptr::null_mut();
        let mut ts = bindings::__kernel_timespec {
            tv_sec: timeout.as_secs() as i64,
            tv_nsec: timeout.subsec_nanos() as i64,
        };

        // SAFETY: all three pointers reference live locals/fields for the
        // duration of the call.
        let ret = unsafe {
            bindings::io_uring_wait_cqe_timeout(
                &raw mut self.ring,
                &raw mut cqe_ptr,
                &raw mut ts,
            )
        };

        if ret < 0 {
            let errno = -ret;
            // ETIME is the documented "timeout expired" result, not an error.
            if errno == libc::ETIME {
                return Ok(None);
            }
            return Err(io::Error::from_raw_os_error(errno));
        }

        let cqe = unsafe { &*cqe_ptr };
        let completion =
            Completion { user_data: cqe.user_data, res: cqe.res, flags: cqe.flags };

        // SAFETY: consume the CQE slot exactly once.
        unsafe { bindings::io_uring_cqe_seen(&raw mut self.ring, cqe_ptr) };

        Ok(Some(completion))
    }

    /// Non-blocking reap: consume and return one completion if available,
    /// `Ok(None)` otherwise.
    ///
    /// # Errors
    /// Returns non-EAGAIN peek errnos as an `io::Error`.
    pub fn try_wait(&mut self) -> io::Result<Option<Completion>> {
        let mut cqe_ptr = ptr::null_mut();
        let ret = unsafe {
            bindings::io_uring_peek_cqe(&raw mut self.ring, &raw mut cqe_ptr)
        };

        if ret < 0 {
            // EAGAIN means "nothing ready", which is not an error here.
            if -ret == libc::EAGAIN {
                return Ok(None);
            }
            return Err(io::Error::from_raw_os_error(-ret));
        }

        // Defensive: treat a success return with a null CQE as "empty".
        if cqe_ptr.is_null() {
            return Ok(None);
        }

        let cqe = unsafe { &*cqe_ptr };
        let completion =
            Completion { user_data: cqe.user_data, res: cqe.res, flags: cqe.flags };

        // SAFETY: consume the CQE slot exactly once.
        unsafe { bindings::io_uring_cqe_seen(&raw mut self.ring, cqe_ptr) };

        Ok(Some(completion))
    }

    /// Non-consuming look at the next completion: the CQE is NOT marked
    /// seen, so a later [`wait`]/[`try_wait`] returns the same entry.
    ///
    /// # Errors
    /// Returns non-EAGAIN peek errnos as an `io::Error`.
    pub fn peek(&self) -> io::Result<Option<Completion>> {
        let mut cqe_ptr = ptr::null_mut();
        // NOTE(review): const->mut cast; peek only reads the CQ ring, but
        // confirm no interior mutation occurs in liburing's peek path.
        let ret = unsafe {
            bindings::io_uring_peek_cqe(
                &self.ring as *const _ as *mut _,
                &raw mut cqe_ptr,
            )
        };

        if ret < 0 {
            if -ret == libc::EAGAIN {
                return Ok(None);
            }
            return Err(io::Error::from_raw_os_error(-ret));
        }

        if cqe_ptr.is_null() {
            return Ok(None);
        }

        // Copy fields out; deliberately no cqe_seen, so the entry stays
        // in the queue.
        let cqe = unsafe { &*cqe_ptr };
        Ok(Some(Completion {
            user_data: cqe.user_data,
            res: cqe.res,
            flags: cqe.flags,
        }))
    }

    /// Number of completions ready to be reaped.
    pub fn cq_ready(&self) -> usize {
        // NOTE(review): const->mut cast as in `sq_space_left`.
        unsafe {
            bindings::io_uring_cq_ready(&self.ring as *const _ as *mut _) as usize
        }
    }

    /// Register `buffers` with the kernel for fixed-buffer I/O.
    ///
    /// # Safety
    /// The caller must keep every buffer alive and unmoved until the
    /// buffers are unregistered (or the ring is dropped); the kernel holds
    /// raw pointers to them after this call.
    ///
    /// # Errors
    /// Returns the register errno as an `io::Error`.
    pub unsafe fn register_buffers(
        &mut self,
        buffers: &[IoSlice<'_>],
    ) -> io::Result<()> {
        // Build the iovec array the syscall expects from the IoSlices.
        let iovecs: Vec<libc::iovec> = buffers
            .iter()
            .map(|buf| libc::iovec {
                iov_base: buf.as_ptr() as *mut _,
                iov_len: buf.len(),
            })
            .collect();

        // SAFETY: `iovecs` is alive for the duration of the call; the
        // caller guarantees the pointed-to buffers outlive registration.
        let ret = unsafe {
            bindings::io_uring_register_buffers(
                &raw mut self.ring,
                iovecs.as_ptr().cast(),
                iovecs.len() as u32,
            )
        };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }
        Ok(())
    }

    /// Release all previously registered fixed buffers.
    ///
    /// # Errors
    /// Returns the unregister errno as an `io::Error`.
    pub fn unregister_buffers(&mut self) -> io::Result<()> {
        let ret =
            unsafe { bindings::io_uring_unregister_buffers(&raw mut self.ring) };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }
        Ok(())
    }

    /// Register a fixed-file table so SQEs can use `FIXED_FILE` indices.
    ///
    /// # Errors
    /// Returns the register errno as an `io::Error`.
    pub fn register_files(&mut self, fds: &[i32]) -> io::Result<()> {
        // SAFETY: `fds` stays alive across the call; the kernel copies it.
        let ret = unsafe {
            bindings::io_uring_register_files(
                &raw mut self.ring,
                fds.as_ptr(),
                fds.len() as u32,
            )
        };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }
        Ok(())
    }

    /// Replace entries of the registered-file table starting at `offset`.
    ///
    /// # Errors
    /// Returns the update errno as an `io::Error`.
    pub fn register_files_update(
        &mut self,
        offset: u32,
        fds: &[i32],
    ) -> io::Result<()> {
        // SAFETY: `fds` stays alive across the call; the kernel copies it.
        let ret = unsafe {
            bindings::io_uring_register_files_update(
                &raw mut self.ring,
                offset,
                fds.as_ptr(),
                fds.len() as u32,
            )
        };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }
        Ok(())
    }

    /// Drop the entire registered-file table.
    ///
    /// # Errors
    /// Returns the unregister errno as an `io::Error`.
    pub fn unregister_files(&mut self) -> io::Result<()> {
        let ret =
            unsafe { bindings::io_uring_unregister_files(&raw mut self.ring) };

        if ret < 0 {
            return Err(io::Error::from_raw_os_error(-ret));
        }
        Ok(())
    }
}
625
#[cfg(test)]
mod tests {
    use super::*;

    // --- Completion accessors -------------------------------------------

    #[test]
    fn test_completion_is_ok_positive() {
        let c = Completion { user_data: 1, res: 0, flags: 0 };
        assert!(c.is_ok());

        let c = Completion { user_data: 1, res: 100, flags: 0 };
        assert!(c.is_ok());
    }

    #[test]
    fn test_completion_is_ok_negative() {
        // Negative results encode errnos and must not count as success.
        let c = Completion { user_data: 1, res: -1, flags: 0 };
        assert!(!c.is_ok());

        let c = Completion { user_data: 1, res: -libc::EBADF, flags: 0 };
        assert!(!c.is_ok());
    }

    #[test]
    fn test_completion_result() {
        // result() passes the raw value through, success or errno alike.
        let c = Completion { user_data: 1, res: 42, flags: 0 };
        assert_eq!(c.result(), 42);

        let c = Completion { user_data: 1, res: -libc::EINVAL, flags: 0 };
        assert_eq!(c.result(), -libc::EINVAL);
    }

    #[test]
    fn test_completion_user_data() {
        let c = Completion { user_data: 0xDEADBEEF, res: 0, flags: 0 };
        assert_eq!(c.user_data(), 0xDEADBEEF);

        // Boundary: the full u64 range must round-trip.
        let c = Completion { user_data: u64::MAX, res: 0, flags: 0 };
        assert_eq!(c.user_data(), u64::MAX);
    }

    #[test]
    fn test_completion_has_more() {
        let c = Completion { user_data: 1, res: 0, flags: 0 };
        assert!(!c.has_more());

        let c =
            Completion { user_data: 1, res: 0, flags: bindings::IORING_CQE_F_MORE };
        assert!(c.has_more());
    }

    #[test]
    fn test_completion_buffer_id_none() {
        // Without IORING_CQE_F_BUFFER, no buffer id is reported.
        let c = Completion { user_data: 1, res: 0, flags: 0 };
        assert_eq!(c.buffer_id(), None);
    }

    #[test]
    fn test_completion_buffer_id_some() {
        // Encode an id the way the kernel does: flag bit + shifted id.
        let buffer_id: u16 = 42;
        let flags = bindings::IORING_CQE_F_BUFFER
            | ((buffer_id as u32) << bindings::IORING_CQE_BUFFER_SHIFT);
        let c = Completion { user_data: 1, res: 0, flags };
        assert_eq!(c.buffer_id(), Some(42));
    }

    // --- SqeFlags bitset -------------------------------------------------

    #[test]
    fn test_sqe_flags_none_is_zero() {
        assert_eq!(SqeFlags::NONE.bits(), 0);
    }

    #[test]
    fn test_sqe_flags_individual_values() {
        // Every named flag must map to a non-zero bit.
        assert_ne!(SqeFlags::FIXED_FILE.bits(), 0);
        assert_ne!(SqeFlags::ASYNC.bits(), 0);
        assert_ne!(SqeFlags::IO_LINK.bits(), 0);
        assert_ne!(SqeFlags::IO_DRAIN.bits(), 0);
        assert_ne!(SqeFlags::IO_HARDLINK.bits(), 0);
        assert_ne!(SqeFlags::BUFFER_SELECT.bits(), 0);
        assert_ne!(SqeFlags::CQE_SKIP_SUCCESS.bits(), 0);
    }

    #[test]
    fn test_sqe_flags_or_combines() {
        let combined = SqeFlags::ASYNC.or(SqeFlags::IO_LINK);
        assert_eq!(
            combined.bits(),
            SqeFlags::ASYNC.bits() | SqeFlags::IO_LINK.bits()
        );
    }

    #[test]
    fn test_sqe_flags_bitor_operator() {
        // The | operator must agree with the const `or` method.
        let combined = SqeFlags::ASYNC | SqeFlags::IO_DRAIN;
        assert_eq!(
            combined.bits(),
            SqeFlags::ASYNC.bits() | SqeFlags::IO_DRAIN.bits()
        );
    }

    #[test]
    fn test_sqe_flags_contains_true() {
        let flags = SqeFlags::ASYNC | SqeFlags::IO_LINK;
        assert!(flags.contains(SqeFlags::ASYNC));
        assert!(flags.contains(SqeFlags::IO_LINK));
    }

    #[test]
    fn test_sqe_flags_contains_false() {
        let flags = SqeFlags::ASYNC | SqeFlags::IO_LINK;
        assert!(!flags.contains(SqeFlags::FIXED_FILE));
        assert!(!flags.contains(SqeFlags::IO_DRAIN));
    }

    #[test]
    fn test_sqe_flags_contains_none() {
        // NONE (empty set) is a subset of every flag set.
        let flags = SqeFlags::ASYNC;
        assert!(flags.contains(SqeFlags::NONE));
    }

    #[test]
    fn test_sqe_flags_multiple_or() {
        let flags = SqeFlags::ASYNC
            | SqeFlags::IO_LINK
            | SqeFlags::FIXED_FILE
            | SqeFlags::IO_DRAIN;
        assert!(flags.contains(SqeFlags::ASYNC));
        assert!(flags.contains(SqeFlags::IO_LINK));
        assert!(flags.contains(SqeFlags::FIXED_FILE));
        assert!(flags.contains(SqeFlags::IO_DRAIN));
    }

    // --- Params builder --------------------------------------------------

    #[test]
    fn test_params_default() {
        let params = Params::default();
        assert_eq!(params.sq_entries, 128);
        assert_eq!(params.flags, 0);
        assert_eq!(params.sq_thread_cpu, 0);
        assert_eq!(params.sq_thread_idle, 0);
    }

    #[test]
    fn test_params_sqpoll() {
        let params = Params::default().sqpoll(1000);
        assert!((params.flags & bindings::IORING_SETUP_SQPOLL) != 0);
        assert_eq!(params.sq_thread_idle, 1000);
    }

    #[test]
    fn test_params_iopoll() {
        let params = Params::default().iopoll();
        assert!((params.flags & bindings::IORING_SETUP_IOPOLL) != 0);
    }

    #[test]
    fn test_params_chained() {
        // Builder calls must accumulate flags rather than overwrite them.
        let params = Params::default().sqpoll(500).iopoll();
        assert!((params.flags & bindings::IORING_SETUP_SQPOLL) != 0);
        assert!((params.flags & bindings::IORING_SETUP_IOPOLL) != 0);
        assert_eq!(params.sq_thread_idle, 500);
    }
}