linux_io_uring/
submit.rs

1use std::{ io, ptr };
2use std::sync::atomic;
3use std::os::unix::io::AsRawFd;
4use linux_io_uring_sys as sys;
5use crate::register::{ register as reg, unregister as unreg };
6use crate::squeue::SubmissionQueue;
7use crate::util::{ Fd, unsync_load };
8
9
10/// Submitter
11pub struct Submitter<'a> {
12    fd: &'a Fd,
13    flags: u32,
14
15    sq_head: *const atomic::AtomicU32,
16    sq_tail: *const atomic::AtomicU32,
17    sq_flags: *const atomic::AtomicU32
18}
19
20impl<'a> Submitter<'a> {
21    #[inline]
22    pub(crate) const fn new(fd: &'a Fd, flags: u32, sq: &SubmissionQueue) -> Submitter<'a> {
23        Submitter {
24            fd, flags,
25            sq_head: sq.head,
26            sq_tail: sq.tail,
27            sq_flags: sq.flags
28        }
29    }
30
31    fn sq_len(&self) -> usize {
32        let head = unsafe { (*self.sq_head).load(atomic::Ordering::Acquire) };
33        let tail = unsafe { unsync_load(self.sq_tail) };
34
35        tail.wrapping_sub(head) as usize
36    }
37
38    fn sq_need_wakeup(&self) -> bool {
39        unsafe {
40            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP
41                != 0
42        }
43    }
44
45    /// Register files or user buffers for asynchronous I/O.
46    #[inline]
47    pub unsafe fn register(&self, target: reg::Target<'_>) -> io::Result<()> {
48        target.execute(self.fd.as_raw_fd())
49    }
50
51    /// Unregister files or user buffers for asynchronous I/O.
52    #[inline]
53    pub fn unregister(&self, target: unreg::Target) -> io::Result<()> {
54        target.execute(self.fd.as_raw_fd())
55    }
56
57    /// Initiate and/or complete asynchronous I/O
58    ///
59    /// # Safety
60    ///
61    /// This provides a raw interface so developer must ensure that parameters are correct.
62    pub unsafe fn enter(&self, to_submit: u32, min_complete: u32, flag: u32, sig: Option<&libc::sigset_t>)
63        -> io::Result<usize>
64    {
65        let sig = sig.map(|sig| sig as *const _).unwrap_or_else(ptr::null);
66        let result = sys::io_uring_enter(self.fd.as_raw_fd(), to_submit, min_complete, flag, sig);
67        if result >= 0 {
68            Ok(result as _)
69        } else {
70            Err(io::Error::last_os_error())
71        }
72    }
73
74    /// Initiate asynchronous I/O.
75    #[inline]
76    pub fn submit(&self) -> io::Result<usize> {
77        self.submit_and_wait(0)
78    }
79
80    /// Initiate and/or complete asynchronous I/O
81    pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
82        let len = self.sq_len();
83
84        let mut flags = 0;
85
86        if want > 0 {
87            flags |= sys::IORING_ENTER_GETEVENTS;
88        }
89
90        if self.flags & sys::IORING_SETUP_SQPOLL != 0 {
91            if self.sq_need_wakeup() {
92                flags |= sys::IORING_ENTER_SQ_WAKEUP;
93            } else if want == 0 {
94                // fast poll
95                return Ok(len)
96            }
97        }
98
99        unsafe {
100            self.enter(len as _, want as _, flags, None)
101        }
102    }
103}