1use std::{ io, ptr };
2use std::sync::atomic;
3use std::os::unix::io::AsRawFd;
4use linux_io_uring_sys as sys;
5use crate::register::{ register as reg, unregister as unreg };
6use crate::squeue::SubmissionQueue;
7use crate::util::{ Fd, unsync_load };
8
9
10pub struct Submitter<'a> {
12 fd: &'a Fd,
13 flags: u32,
14
15 sq_head: *const atomic::AtomicU32,
16 sq_tail: *const atomic::AtomicU32,
17 sq_flags: *const atomic::AtomicU32
18}
19
20impl<'a> Submitter<'a> {
21 #[inline]
22 pub(crate) const fn new(fd: &'a Fd, flags: u32, sq: &SubmissionQueue) -> Submitter<'a> {
23 Submitter {
24 fd, flags,
25 sq_head: sq.head,
26 sq_tail: sq.tail,
27 sq_flags: sq.flags
28 }
29 }
30
31 fn sq_len(&self) -> usize {
32 let head = unsafe { (*self.sq_head).load(atomic::Ordering::Acquire) };
33 let tail = unsafe { unsync_load(self.sq_tail) };
34
35 tail.wrapping_sub(head) as usize
36 }
37
38 fn sq_need_wakeup(&self) -> bool {
39 unsafe {
40 (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP
41 != 0
42 }
43 }
44
45 #[inline]
47 pub unsafe fn register(&self, target: reg::Target<'_>) -> io::Result<()> {
48 target.execute(self.fd.as_raw_fd())
49 }
50
51 #[inline]
53 pub fn unregister(&self, target: unreg::Target) -> io::Result<()> {
54 target.execute(self.fd.as_raw_fd())
55 }
56
57 pub unsafe fn enter(&self, to_submit: u32, min_complete: u32, flag: u32, sig: Option<&libc::sigset_t>)
63 -> io::Result<usize>
64 {
65 let sig = sig.map(|sig| sig as *const _).unwrap_or_else(ptr::null);
66 let result = sys::io_uring_enter(self.fd.as_raw_fd(), to_submit, min_complete, flag, sig);
67 if result >= 0 {
68 Ok(result as _)
69 } else {
70 Err(io::Error::last_os_error())
71 }
72 }
73
74 #[inline]
76 pub fn submit(&self) -> io::Result<usize> {
77 self.submit_and_wait(0)
78 }
79
80 pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
82 let len = self.sq_len();
83
84 let mut flags = 0;
85
86 if want > 0 {
87 flags |= sys::IORING_ENTER_GETEVENTS;
88 }
89
90 if self.flags & sys::IORING_SETUP_SQPOLL != 0 {
91 if self.sq_need_wakeup() {
92 flags |= sys::IORING_ENTER_SQ_WAKEUP;
93 } else if want == 0 {
94 return Ok(len)
96 }
97 }
98
99 unsafe {
100 self.enter(len as _, want as _, flags, None)
101 }
102 }
103}