use std::sync::atomic;
use bitflags::bitflags;
use linux_io_uring_sys as sys;
use crate::util::{ Mmap, unsync_load };
use crate::mmap_offset;


/// An io_uring instance's submission queue. This is where I/O requests
/// are placed for the kernel to consume.
pub struct SubmissionQueue {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: *const u32,
    pub(crate) ring_entries: *const u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    #[allow(dead_code)]
    array: *mut u32,

    pub(crate) sqes: *mut sys::io_uring_sqe
}

/// A snapshot of the submission queue that is available for pushing new
/// entries. Changes become visible to the kernel only once the queue is
/// synced or dropped.
pub struct AvailableQueue<'a> {
    head: u32,
    tail: u32,
    ring_mask: u32,
    ring_entries: u32,
    queue: &'a mut SubmissionQueue
}

/// A submission queue entry (SQE), describing a single I/O request.
#[repr(transparent)]
#[derive(Clone)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

bitflags!{
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = sys::IOSQE_FIXED_FILE as _;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = sys::IOSQE_IO_DRAIN as _;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = sys::IOSQE_IO_LINK as _;

        /// Like `IO_LINK`, but the link is not severed
        /// if this request completes with an error or a short result.
        #[cfg(feature = "unstable")]
        const IO_HARDLINK = sys::IOSQE_IO_HARDLINK as _;
    }
}
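
// A minimal sketch of how these flags compose: `bitflags` generates the
// usual bitwise operators, and the resulting value is applied to an entry
// through `Entry::flags` below. For example:
//
//     let f = Flags::IO_LINK | Flags::IO_DRAIN;
//     assert!(f.contains(Flags::IO_LINK));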

impl SubmissionQueue {
    pub(crate) unsafe fn new(sq_mmap: &Mmap, sqe_mmap: &Mmap, p: &sys::io_uring_params) -> SubmissionQueue {
        mmap_offset!{
            let head = sq_mmap + p.sq_off.head => *const atomic::AtomicU32;
            let tail = sq_mmap + p.sq_off.tail => *const atomic::AtomicU32;
            let ring_mask = sq_mmap + p.sq_off.ring_mask => *const u32;
            let ring_entries = sq_mmap + p.sq_off.ring_entries => *const u32;
            let flags = sq_mmap + p.sq_off.flags => *const atomic::AtomicU32;
            let dropped = sq_mmap + p.sq_off.dropped => *const atomic::AtomicU32;
            let array = sq_mmap + p.sq_off.array => *mut u32;

            let sqes = sqe_mmap + 0 => *mut sys::io_uring_sqe;
        }

        // To keep things simple, map each slot of the indirection array
        // directly to the sqe at the same index, so slot `i` always refers
        // to `sqes[i]`.
        for i in 0..*ring_entries {
            *array.add(i as usize) = i;
        }

        SubmissionQueue {
            head, tail,
            ring_mask, ring_entries,
            flags, dropped,
            array,
            sqes
        }
    }

    /// Whether the kernel's SQPOLL thread has gone to sleep and needs a
    /// system call to be woken up to see newly submitted entries.
    pub fn need_wakeup(&self) -> bool {
        unsafe {
            (*self.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of submission queue entries the kernel has dropped
    /// because they were invalid.
    pub fn dropped(&self) -> u32 {
        unsafe {
            (*self.dropped).load(atomic::Ordering::Acquire)
        }
    }

    /// The total number of entries the submission queue can hold.
    pub fn capacity(&self) -> usize {
        unsafe {
            self.ring_entries.read_volatile() as usize
        }
    }

    /// The number of published entries the kernel has not yet consumed.
    pub fn len(&self) -> usize {
        let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
        let tail = unsafe { unsync_load(self.tail) };

        tail.wrapping_sub(head) as usize
    }

    /// Whether the queue is empty (the kernel has consumed every entry).
    pub fn is_empty(&self) -> bool {
        let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
        let tail = unsafe { unsync_load(self.tail) };

        head == tail
    }

    /// Whether the queue is full (no entry can be pushed until the kernel
    /// consumes some).
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Get the currently available (unsubmitted) portion of the queue.
    pub fn available(&mut self) -> AvailableQueue<'_> {
        unsafe {
            AvailableQueue {
                head: (*self.head).load(atomic::Ordering::Acquire),
                tail: unsync_load(self.tail),
                ring_mask: self.ring_mask.read_volatile(),
                ring_entries: self.ring_entries.read_volatile(),
                queue: self
            }
        }
    }
}
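
// Usage sketch for `need_wakeup` (the `ring` wrapper and `submit()` helper
// are hypothetical; neither is defined in this module): when the ring was
// created with `IORING_SETUP_SQPOLL`, the kernel-side poll thread may go to
// sleep, and the submitter only has to enter the kernel when a wakeup is
// actually needed:
//
//     if ring.sq().need_wakeup() {
//         ring.submit()?; // io_uring_enter(2) with IORING_ENTER_SQ_WAKEUP
//     }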

impl AvailableQueue<'_> {
    /// Synchronize with the real submission queue: publish the local tail
    /// to the kernel and refresh the local head.
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// The total number of entries the queue can hold.
    pub fn capacity(&self) -> usize {
        self.ring_entries as usize
    }

    /// The number of entries in this snapshot of the queue.
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Whether this snapshot of the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.head == self.tail
    }

    /// Whether this snapshot of the queue is full.
    pub fn is_full(&self) -> bool {
        self.tail.wrapping_sub(self.head) == self.ring_entries
    }

    /// Attempt to push an [Entry] into the queue.
    /// If the queue is full, the entry is handed back as the error value.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the entry is valid and that any resources
    /// it refers to (buffers, file descriptors, ...) remain alive until the
    /// corresponding completion, otherwise it may cause memory problems.
    pub unsafe fn push(&mut self, Entry(entry): Entry) -> Result<(), Entry> {
        if self.len() < self.capacity() {
            *self.queue.sqes.add((self.tail & self.ring_mask) as usize) = entry;
            self.tail = self.tail.wrapping_add(1);
            Ok(())
        } else {
            Err(Entry(entry))
        }
    }
}
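
// Usage sketch for pushing entries (illustrative only; `make_entry()` stands
// in for any opcode builder that yields an `Entry`):
//
//     let mut queue = submission_queue.available();
//     unsafe {
//         // Safety: everything the entry points at must outlive the request.
//         if queue.push(make_entry()).is_err() {
//             // the ring is full; submit to the kernel and retry
//         }
//     }
//     queue.sync(); // or just drop `queue`; both publish the new tail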

impl Drop for AvailableQueue<'_> {
    fn drop(&mut self) {
        // Publish any pushed entries even if the user forgets to `sync`.
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
        }
    }
}

impl Entry {
    /// Set the submission flags for this entry.
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= flags.bits();
        self
    }

    /// Set the `user_data` value, an application-supplied token that is
    /// passed back verbatim in the corresponding completion queue entry.
    pub fn user_data(mut self, user_data: u64) -> Entry {
        self.0.user_data = user_data;
        self
    }
}
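
// Builder-style usage sketch for `Entry` (the `opcode::Nop` builder is
// assumed to live elsewhere in this crate; treat the name as illustrative):
//
//     let entry = opcode::Nop::new()
//         .build()
//         .flags(Flags::IO_LINK)
//         .user_data(0x42);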