use core::fmt::{self, Debug};
use core::mem;
use core::mem::MaybeUninit;
use core::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;
use rustix::io::Errno;

/// State of the completion queue ring mapped from the kernel: pointers into
/// the shared memory region, plus the cached ring mask and size.
pub(crate) struct Inner<E: EntryMarker> {
    head: *const atomic::AtomicU32,
    tail: *const atomic::AtomicU32,
    ring_mask: u32,
    ring_entries: u32,

    overflow: *const atomic::AtomicU32,

    cqes: *const E,

    flags: *const atomic::AtomicU32,
}
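
/// An io_uring instance's completion queue. This stores all the I/O operations
/// that have completed.
///
/// The queue is drained by iterating over it. A sketch of typical use (marked
/// `ignore` because it assumes an `IoUring` handle with `submit_and_wait` and
/// `completion` methods, as in the `io-uring` crate family, which is outside
/// this module):
///
/// ```ignore
/// let mut ring = IoUring::new(8)?;
/// // ... push submission queue entries ...
/// ring.submit_and_wait(1)?;
/// for cqe in ring.completion() {
///     println!("completed: {:?}", cqe.result());
/// }
/// ```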
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A completion queue entry (CQE) type, representing a completed I/O
/// operation.
///
/// This is implemented for the default 16-byte [`Entry`] and the extended
/// 32-byte [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Additional setup flags to apply when building a ring that uses this
    /// entry type.
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 16-byte completion queue entry (CQE), representing a complete I/O
/// operation.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);

/// A 32-byte completion queue entry (CQE), representing a complete I/O
/// operation.
///
/// Only produced when the ring is set up with the `CQE32` flag; the extra
/// 16 bytes are exposed via [`Entry32::big_cqe`].
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}

bitflags! {
    /// Completion flags for a completion queue entry.
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// If set, the upper 16 bits of the flags field hold the buffer ID.
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// If set, the parent SQE will generate more CQE entries.
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// If set, more data is available to read from the socket after this
        /// completion.
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// Set for notification CQEs, to distinguish them from send
        /// completions.
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            head: unsync_load(self.head),
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> CompletionQueue<'_, E> {
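    /// Synchronize this type with the real completion queue.
    ///
    /// This flushes any entries consumed through this handle back to the
    /// kernel and pulls in entries the kernel has produced in the meantime.
    ///
    /// A sketch of a polling loop (marked `ignore`; `cq` is assumed to be a
    /// `CompletionQueue` borrowed from a live ring, and `handle` a
    /// hypothetical callback):
    ///
    /// ```ignore
    /// loop {
    ///     cq.sync(); // observe completions produced since the last sync
    ///     while let Some(cqe) = cq.next() {
    ///         handle(cqe);
    ///     }
    /// }
    /// ```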
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// The number of completion events the kernel has dropped because the
    /// queue was full and the `NODROP` feature was not active.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed
    /// and queued to the CQ ring.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity; if the
    /// kernel's `NODROP` feature is not active, any new completion events
    /// would be dropped.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }
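
    /// Consume up to `entries.len()` available completions, writing them into
    /// the provided uninitialized buffer and returning the initialized prefix.
    ///
    /// A sketch of batched draining (marked `ignore`; `cq` is assumed to be a
    /// `CompletionQueue<'_, Entry>` borrowed from a live ring):
    ///
    /// ```ignore
    /// let mut buf = [const { MaybeUninit::<Entry>::uninit() }; 64];
    /// for cqe in cq.fill(&mut buf) {
    ///     println!("completed: {:?}", cqe.result());
    /// }
    /// ```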
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` entries have just been initialized above.
        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Pop one entry off the ring; the caller must ensure the queue is
    /// non-empty (`head != tail`).
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}

impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        // Publish the consumed head so the kernel can reuse those slots.
        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
    }
}

impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
    type Item = E;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            Some(unsafe { self.pop() })
        } else {
            None
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len(), Some(self.len()))
    }
}

impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    #[inline]
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}

impl Entry {
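    /// The operation-specific result code: for example, for a read operation
    /// this is the number of bytes read on success. Negative kernel result
    /// codes are converted into an [`Errno`].
    ///
    /// A sketch of handling a completion (marked `ignore`; the `cqe` binding
    /// is assumed to come from draining a completion queue):
    ///
    /// ```ignore
    /// match cqe.result() {
    ///     Ok(n) => println!("transferred {n} bytes"),
    ///     Err(err) => eprintln!("I/O failed: {err}"),
    /// }
    /// ```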
    #[inline]
    pub fn result(&self) -> Result<u32, Errno> {
        if let Ok(x) = u32::try_from(self.0.res) {
            Ok(x)
        } else {
            Err(Errno::from_raw_os_error(-self.0.res))
        }
    }

    /// The raw result code, negative if the operation failed with an error.
    #[inline]
    pub fn raw_result(&self) -> i32 {
        self.0.res
    }

    /// The user data of the request, as set on the corresponding submission
    /// queue entry.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// The user data of the request, as a `u64`.
    #[inline]
    pub fn user_data_u64(&self) -> u64 {
        self.0.user_data.u64_()
    }

    /// The user data of the request, as a pointer.
    #[inline]
    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
        self.0.user_data.ptr()
    }

    /// Metadata related to the operation, such as the selected buffer ID (see
    /// [`buffer_select`]) or whether more completions will follow (see
    /// [`more`]).
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0.flags.bits())
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    fn clone(&self) -> Entry {
        // The inner type doesn't implement Clone (it ends in a flexible array
        // member), but all of its fields are trivially copyable.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .finish()
    }
}

impl Entry32 {
    /// The operation-specific result code; see [`Entry::result`].
    #[inline]
    pub fn result(&self) -> Result<u32, Errno> {
        if let Ok(x) = u32::try_from(self.0 .0.res) {
            Ok(x)
        } else {
            Err(Errno::from_raw_os_error(-self.0 .0.res))
        }
    }

    /// The raw result code, negative if the operation failed with an error.
    #[inline]
    pub fn raw_result(&self) -> i32 {
        self.0 .0.res
    }

    /// The user data of the request, as set on the corresponding submission
    /// queue entry.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Metadata related to the operation; see [`Entry::flags`].
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0 .0.flags.bits())
    }

    /// The extra 16 bytes of payload provided by 32-byte CQEs.
    #[inline]
    pub fn big_cqe(&self) -> &[u64; 2] {
        &self.1
    }
}

impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}

impl From<Entry32> for Entry {
    fn from(entry32: Entry32) -> Self {
        entry32.0
    }
}

impl Debug for Entry32 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry32")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .field("big_cqe", &self.big_cqe())
            .finish()
    }
}

/// If the [`BUFFER`](Flags::BUFFER) flag is set, returns the buffer ID stored
/// in the upper 16 bits of the flags; otherwise returns `None`.
pub fn buffer_select(flags: Flags) -> Option<u16> {
    if flags.contains(Flags::BUFFER) {
        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;

        Some(id as u16)
    } else {
        None
    }
}

/// Whether the corresponding submission will generate more completion events
/// after this one (the [`MORE`](Flags::MORE) flag).
pub fn more(flags: Flags) -> bool {
    flags.contains(Flags::MORE)
}

/// Whether the socket still has data to read after this completion (the
/// [`SOCK_NONEMPTY`](Flags::SOCK_NONEMPTY) flag).
pub fn sock_nonempty(flags: Flags) -> bool {
    flags.contains(Flags::SOCK_NONEMPTY)
}

/// Whether this is a notification CQE rather than a normal completion (the
/// [`NOTIF`](Flags::NOTIF) flag).
pub fn notif(flags: Flags) -> bool {
    flags.contains(Flags::NOTIF)
}
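
// A small, self-contained check of the flag helpers above. It builds flag
// values by hand (a buffer ID packed above IORING_CQE_BUFFER_SHIFT, as the
// kernel does) and confirms the helpers decode them; it exercises only the
// bit manipulation in this module, not a live ring.
#[test]
fn test_flag_helpers() {
    let flags =
        Flags::from_bits_retain((42u32 << sys::IORING_CQE_BUFFER_SHIFT) | Flags::BUFFER.bits());
    assert_eq!(buffer_select(flags), Some(42));
    assert_eq!(buffer_select(Flags::empty()), None);

    assert!(more(Flags::MORE));
    assert!(!more(Flags::empty()));
    assert!(sock_nonempty(Flags::SOCK_NONEMPTY));
    assert!(notif(Flags::NOTIF));
}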