//! Completion Queue

use core::fmt::{self, Debug};
use core::mem;
use core::mem::MaybeUninit;
use core::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;
use rustix::io::Errno;

pub(crate) struct Inner<E: EntryMarker> {
    head: *const atomic::AtomicU32,
    tail: *const atomic::AtomicU32,
    ring_mask: u32,
    ring_entries: u32,

    overflow: *const atomic::AtomicU32,

    cqes: *const E,

    flags: *const atomic::AtomicU32,
}

/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
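///
/// # Examples
///
/// A minimal sketch of draining the queue. The `rustix_uring` crate path, the ring setup, and the
/// prior submission of requests are assumptions for illustration, not part of this type itself:
///
/// ```no_run
/// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
/// // ... submit requests and wait for them to complete ...
/// for cqe in ring.completion() {
///     match cqe.result() {
///         Ok(n) => println!("request {} returned {n}", cqe.user_data_u64()),
///         Err(err) => eprintln!("request {} failed: {err:?}", cqe.user_data_u64()),
///     }
/// }
/// ```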
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A completion queue entry (CQE), representing a complete I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);

/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}

bitflags! {
    /// Completion flags
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        const MORE = sys::IoringCqeFlags::MORE.bits();

        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            head: unsync_load(self.head),
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed through this iterator and will make any new entries
    /// the kernel has produced in the meantime available in the queue.
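    ///
    /// # Examples
    ///
    /// A minimal sketch of a drain loop; the `rustix_uring` crate path and ring setup are
    /// assumptions for illustration:
    ///
    /// ```no_run
    /// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// let mut cq = ring.completion();
    /// for _ in 0..2 {
    ///     for cqe in &mut cq {
    ///         // Handle the completion...
    ///         let _ = cqe.result();
    ///     }
    ///     // Publish the consumed entries and pick up any the kernel has added since.
    ///     cq.sync();
    /// }
    /// ```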
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If the queue is full and [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is
    /// not set, new events may be dropped. This records the number of dropped events.
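    ///
    /// # Examples
    ///
    /// A minimal sketch; the `rustix_uring` crate path and ring setup are assumptions for
    /// illustration:
    ///
    /// ```no_run
    /// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// let dropped = ring.completion().overflow();
    /// if dropped > 0 {
    ///     eprintln!("the kernel dropped {dropped} completion events");
    /// }
    /// ```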
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
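    ///
    /// # Examples
    ///
    /// A minimal sketch of reaping before submitting more work; the `rustix_uring` crate path and
    /// ring setup are assumptions for illustration:
    ///
    /// ```no_run
    /// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// if ring.completion().is_full() {
    ///     // Drain completions first, or newly completed events may be dropped.
    ///     for cqe in ring.completion() {
    ///         let _ = cqe.result();
    ///     }
    /// }
    /// ```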
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }
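
    /// Pops up to `entries.len()` completion queue events into the provided buffer and returns
    /// the initialized prefix of that buffer.
    ///
    /// # Examples
    ///
    /// A minimal sketch; the `rustix_uring` paths and ring setup are assumptions for
    /// illustration:
    ///
    /// ```no_run
    /// use core::mem::MaybeUninit;
    /// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// let mut slots: [MaybeUninit<rustix_uring::cqueue::Entry>; 16] =
    ///     core::array::from_fn(|_| MaybeUninit::uninit());
    /// let filled = ring.completion().fill(&mut slots);
    /// println!("popped {} completions", filled.len());
    /// ```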
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            entry.write(unsafe { self.pop() });
        }

        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    #[inline]
    unsafe fn pop(&mut self) -> E {
        // Caller must ensure the queue is non-empty (`self.head != self.tail`) so that the
        // entry at `head & ring_mask` is a valid, kernel-produced CQE.
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}

impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
    }
}

impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
    type Item = E;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            Some(unsafe { self.pop() })
        } else {
            None
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len(), Some(self.len()))
    }
}

impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    #[inline]
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}

impl Entry {
    /// The result of the operation. If the operation succeeded, this is the operation-specific
    /// return value. For example, for a [`Read`](crate::opcode::Read) operation this is
    /// equivalent to the return value of the `read(2)` system call. If the operation failed, the
    /// errno is returned.
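    ///
    /// # Examples
    ///
    /// A minimal sketch; the `rustix_uring` crate path and ring setup are assumptions for
    /// illustration:
    ///
    /// ```no_run
    /// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// if let Some(cqe) = ring.completion().next() {
    ///     match cqe.result() {
    ///         Ok(n) => println!("operation returned {n}"),
    ///         Err(err) => eprintln!("operation failed: {err:?}"),
    ///     }
    /// }
    /// ```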
    #[inline]
    pub fn result(&self) -> Result<u32, Errno> {
        // The following text is found in many io_uring man pages:
        //
        // > Note that where synchronous system calls will return -1 on failure
        // > and set errno to the actual error value, io_uring never uses errno.
        // > Instead it returns the negated errno directly in the CQE res field.
        //
        // Furthermore, I believe a negative value in the `res` field is
        // _always_ a negated errno. We return a `Result` instead for
        // convenience.
        if let Ok(x) = u32::try_from(self.0.res) {
            Ok(x)
        } else {
            Err(Errno::from_raw_os_error(-self.0.res))
        }
    }

    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
    /// operation this is equivalent to the return value of the `read(2)` system call.
    #[inline]
    pub fn raw_result(&self) -> i32 {
        self.0.res
    }

    /// The user data of the request, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// The user data of the request, as a `u64`, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data_u64(&self) -> u64 {
        self.0.user_data.u64_()
    }

    /// The user data of the request, as a pointer, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
        self.0.user_data.ptr()
    }

    /// Metadata related to the operation.
    ///
    /// This is currently used for:
    /// - Storing the selected buffer ID, if one was selected. See
    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0.flags.bits())
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .finish()
    }
}

impl Entry32 {
    /// The result of the operation. If the operation succeeded, this is the operation-specific
    /// return value. For example, for a [`Read`](crate::opcode::Read) operation this is
    /// equivalent to the return value of the `read(2)` system call. If the operation failed, the
    /// errno is returned.
    #[inline]
    pub fn result(&self) -> Result<u32, Errno> {
        // See Entry::result() for the justification for this logic.
        if let Ok(x) = u32::try_from(self.0 .0.res) {
            Ok(x)
        } else {
            Err(Errno::from_raw_os_error(-self.0 .0.res))
        }
    }

    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
    /// operation this is equivalent to the return value of the `read(2)` system call.
    #[inline]
    pub fn raw_result(&self) -> i32 {
        self.0 .0.res
    }

    /// The user data of the request, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Metadata related to the operation.
    ///
    /// This is currently used for:
    /// - Storing the selected buffer ID, if one was selected. See
    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0 .0.flags.bits())
    }

    /// Additional data available in 32-byte completion queue entries (CQEs).
    #[inline]
    pub fn big_cqe(&self) -> &[u64; 2] {
        &self.1
    }
}

impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}

impl From<Entry32> for Entry {
    fn from(entry32: Entry32) -> Self {
        entry32.0
    }
}

impl Debug for Entry32 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry32")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .field("big_cqe", &self.big_cqe())
            .finish()
    }
}

/// Return which dynamic buffer was used by this operation.
///
/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
/// and it signals to the consumer which provided buffer contains the result of this
/// operation.
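///
/// # Examples
///
/// A minimal sketch; the `rustix_uring` paths and ring setup are assumptions for illustration:
///
/// ```no_run
/// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
/// if let Some(cqe) = ring.completion().next() {
///     if let Some(buf_id) = rustix_uring::cqueue::buffer_select(cqe.flags()) {
///         println!("completion used provided buffer {buf_id}");
///     }
/// }
/// ```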
pub fn buffer_select(flags: Flags) -> Option<u16> {
    if flags.contains(Flags::BUFFER) {
        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;

        // FIXME
        //
        // Should we return u16? Maybe the kernel will change the value of
        // `IORING_CQE_BUFFER_SHIFT` in the future.
        Some(id as u16)
    } else {
        None
    }
}

/// Return whether further completion events will be submitted for
/// this same operation.
///
/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
/// the consumer that it should expect further CQE entries after this one,
/// still from the same original SQE request (e.g. for multishot operations).
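///
/// # Examples
///
/// A minimal sketch; the `rustix_uring` paths and ring setup are assumptions for illustration:
///
/// ```no_run
/// # let mut ring = rustix_uring::IoUring::new(8).unwrap();
/// if let Some(cqe) = ring.completion().next() {
///     if !rustix_uring::cqueue::more(cqe.flags()) {
///         // The multishot request has terminated; re-arm it if more events are wanted.
///     }
/// }
/// ```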
pub fn more(flags: Flags) -> bool {
    flags.contains(Flags::MORE)
}

/// Return whether the socket has more data ready to read.
///
/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
/// the consumer that the socket has more data that can be read immediately.
///
/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
/// can provide this bit in their respective CQE.
pub fn sock_nonempty(flags: Flags) -> bool {
    flags.contains(Flags::SOCK_NONEMPTY)
}

/// Returns whether this completion event is a notification.
///
/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
/// currently used by the [SendZc](crate::opcode::SendZc) operation.
pub fn notif(flags: Flags) -> bool {
    flags.contains(Flags::NOTIF)
}