1use core::fmt::{self, Debug};
4use core::mem;
5use core::mem::MaybeUninit;
6use core::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12
/// Owning view of the completion queue: raw pointers into the kernel-shared
/// mmap'd CQ ring, plus geometry values read once at ring setup.
pub(crate) struct Inner<E: EntryMarker> {
    // Shared head index; userspace advances it after consuming entries.
    head: *const atomic::AtomicU32,
    // Shared tail index; the kernel advances it when posting entries.
    tail: *const atomic::AtomicU32,
    // `ring_entries - 1`; ANDed with an index to wrap it into the ring.
    ring_mask: u32,
    ring_entries: u32,

    // Counter of completions dropped because the CQ ring was full.
    overflow: *const atomic::AtomicU32,

    // Base of the CQE array inside the mmap.
    cqes: *const E,

    // CQ ring flags word (e.g. eventfd-disabled bit).
    flags: *const atomic::AtomicU32,
}
25
/// A borrowed snapshot of the completion queue.
///
/// `head` and `tail` are cached locally; the advanced head is published back
/// to the shared ring by `sync` or when this borrow is dropped.
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}
32
/// Marker trait for the completion-entry layouts the ring supports
/// (the default [`Entry`] and the big-CQE [`Entry32`]).
///
/// Sealed via `private::Sealed`: only this crate can implement it.
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Setup flags that must be passed at ring creation for this layout.
    const BUILD_FLAGS: sys::IoringSetupFlags;
}
39
/// A completion queue entry, wrapping the raw `io_uring_cqe`
/// (16 bytes — see `test_entry_sizes`).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
43
/// A big completion queue entry: a base [`Entry`] followed by two extra
/// `u64` words (32 bytes total — see `test_entry_sizes`). Requires the ring
/// to be built with [`EntryMarker::BUILD_FLAGS`] = CQE32.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
48
/// Layout check: the base CQE is 16 bytes and the CQE32 variant is 32 bytes,
/// matching the kernel ABI the `#[repr(C)]` wrappers mirror.
#[test]
fn test_entry_sizes() {
    // Use `mem::size_of` explicitly: the bare `size_of` path is only in the
    // prelude since Rust 1.80, and `core::mem` is already imported here.
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}
54
bitflags! {
    /// Completion-entry flags, mirroring the kernel's `IORING_CQE_F_*` bits
    /// via `sys::IoringCqeFlags`.
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// A provided buffer was used; its id is carried in the upper flag
        /// bits (see [`buffer_select`]).
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// More completions will follow for the same request (multishot).
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// The socket still has data/connections pending after this
        /// operation.
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// This entry is a zero-copy send notification rather than an
        /// operation result.
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}
68
impl<E: EntryMarker> Inner<E> {
    /// Builds the CQ view from the ring mmap, using the field offsets the
    /// kernel returned in `io_uring_params.cq_off`.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-ring mapping matching `p`, and the
    /// mapping must outlive the returned `Inner`.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        // Mask and size are constants for the ring's lifetime, so they are
        // read once here rather than kept as pointers.
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Takes a snapshot borrow of the queue without requiring `&mut self`.
    ///
    /// # Safety
    ///
    /// The head load is unsynchronized (`unsync_load`), so the caller must
    /// guarantee no other borrow is concurrently consuming this queue.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            head: unsync_load(self.head),
            // Acquire pairs with the kernel's tail publication, making the
            // posted CQEs visible before we read them.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    /// Safe borrow: `&mut self` statically guarantees exclusive access,
    /// satisfying `borrow_shared`'s contract.
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
105
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronizes this snapshot with the shared ring: publishes how far we
    /// have consumed (head) and refreshes the cached tail so newly posted
    /// completions become visible.
    #[inline]
    pub fn sync(&mut self) {
        // SAFETY: both pointers target the live CQ mmap owned by
        // `self.queue`, which outlives this borrow.
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// Reads the ring's overflow counter: completions the kernel dropped (or
    /// queued aside) because the CQ ring was full.
    pub fn overflow(&self) -> u32 {
        // SAFETY: `overflow` points into the live CQ mmap.
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether the kernel has set `EVENTFD_DISABLED` in the CQ ring flags,
    /// i.e. eventfd notifications are currently suppressed.
    pub fn eventfd_disabled(&self) -> bool {
        // SAFETY: `flags` points into the live CQ mmap.
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Total number of entries the CQ ring can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Whether this snapshot sees no pending completions.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Whether this snapshot sees the ring completely full.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops up to `entries.len()` completions into `entries` and returns the
    /// prefix that was actually filled (and is therefore initialized).
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            // SAFETY: `len <= self.len()`, so the queue still holds an entry.
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` elements were just initialized above, and
        // `MaybeUninit<E>` has the same layout as `E`.
        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Clones out the entry at the local head and advances the head.
    ///
    /// # Safety
    ///
    /// The queue must be non-empty (`self.head != self.tail`).
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            // Masking wraps the free-running head index into the ring.
            .add((self.head & self.queue.ring_mask) as usize);
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
178
179impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
180 #[inline]
181 fn drop(&mut self) {
182 unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
183 }
184}
185
186impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
187 type Item = E;
188
189 #[inline]
190 fn next(&mut self) -> Option<Self::Item> {
191 if self.head != self.tail {
192 Some(unsafe { self.pop() })
193 } else {
194 None
195 }
196 }
197
198 #[inline]
199 fn size_hint(&self) -> (usize, Option<usize>) {
200 (self.len(), Some(self.len()))
201 }
202}
203
impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    /// Number of completions pending in this snapshot.
    #[inline]
    fn len(&self) -> usize {
        // head/tail are free-running 32-bit counters; wrapping subtraction
        // gives the correct distance even across wrap-around.
        self.tail.wrapping_sub(self.head) as usize
    }
}
210
impl Entry {
    /// The operation result (`res` field); negative values follow the
    /// kernel's negated-errno convention.
    #[inline]
    pub fn result(&self) -> i32 {
        self.0.res
    }

    /// The user data associated with the originating submission entry.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// The user data, viewed as a `u64`.
    #[inline]
    pub fn user_data_u64(&self) -> u64 {
        self.0.user_data.u64_()
    }

    /// The user data, viewed as a raw pointer.
    #[inline]
    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
        self.0.user_data.ptr()
    }

    /// The entry's flags, including any buffer id encoded in the upper bits
    /// (see [`buffer_select`]).
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0.flags.bits())
    }
}
250
// Seal `Entry` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry {}
252
impl EntryMarker for Entry {
    // The default 16-byte CQE layout needs no extra ring setup flags.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}
256
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // SAFETY: a `transmute_copy` from `sys::io_uring_cqe` to itself is a
        // plain bitwise copy — used here presumably because the raw cqe type
        // does not implement `Clone`/`Copy` itself.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
263
264impl Debug for Entry {
265 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266 f.debug_struct("Entry")
267 .field("result", &self.result())
268 .field("user_data", &self.user_data())
269 .field("flags", &self.flags())
270 .finish()
271 }
272}
273
impl Entry32 {
    /// The operation result (`res` field); negative values follow the
    /// kernel's negated-errno convention.
    #[inline]
    pub fn result(&self) -> i32 {
        self.0 .0.res
    }

    /// The user data associated with the originating submission entry.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// The entry's flags, including any buffer id encoded in the upper bits
    /// (see [`buffer_select`]).
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0 .0.flags.bits())
    }

    /// The two extra `u64` words carried by big (CQE32) entries.
    #[inline]
    pub fn big_cqe(&self) -> &[u64; 2] {
        &self.1
    }
}
305
// Seal `Entry32` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry32 {}
307
impl EntryMarker for Entry32 {
    // Big CQEs require the ring to be created with the CQE32 setup flag.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}
311
312impl From<Entry32> for Entry {
313 fn from(entry32: Entry32) -> Self {
314 entry32.0
315 }
316}
317
318impl Debug for Entry32 {
319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320 f.debug_struct("Entry32")
321 .field("result", &self.result())
322 .field("user_data", &self.user_data())
323 .field("flags", &self.flags())
324 .field("big_cqe", &self.big_cqe())
325 .finish()
326 }
327}
328
329pub fn buffer_select(flags: Flags) -> Option<u16> {
335 if flags.contains(Flags::BUFFER) {
336 let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;
337
338 Some(id as u16)
342 } else {
343 None
344 }
345}
346
347pub fn more(flags: Flags) -> bool {
354 flags.contains(Flags::MORE)
355}
356
357pub fn sock_nonempty(flags: Flags) -> bool {
365 flags.contains(Flags::SOCK_NONEMPTY)
366}
367
368pub fn notif(flags: Flags) -> bool {
373 flags.contains(Flags::NOTIF)
374}