rustix_uring/
cqueue.rs

1//! Completion Queue
2
3use core::fmt::{self, Debug};
4use core::mem;
5use core::mem::MaybeUninit;
6use core::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12use rustix::io::Errno;
13
/// Raw pointers into the kernel-shared completion-queue ring memory.
///
/// All pointers reference the CQ mmap; they must stay valid for the lifetime
/// of this value. `ring_mask` and `ring_entries` are read once at setup time.
pub(crate) struct Inner<E: EntryMarker> {
    // Consumer index: written by us (the consumer), read by the kernel.
    head: *const atomic::AtomicU32,
    // Producer index: written by the kernel, read by us.
    tail: *const atomic::AtomicU32,
    // Used as `counter & ring_mask` to map the free-running head counter
    // onto a slot in `cqes` (see `pop`).
    ring_mask: u32,
    // Total number of CQE slots in the ring buffer.
    ring_entries: u32,

    // Number of completion events dropped because the queue was full.
    overflow: *const atomic::AtomicU32,

    // Start of the CQE array.
    cqes: *const E,

    // CQ ring flags (e.g. the eventfd-disabled bit).
    flags: *const atomic::AtomicU32,
}
26
/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    // Local snapshot of the consumer index; advanced by `pop` and flushed
    // back to the shared ring by `sync()` and on drop.
    head: u32,
    // Snapshot of the kernel's producer index; refreshed by `sync()`.
    tail: u32,
    queue: &'a Inner<E>,
}
33
/// A completion queue entry (CQE), representing a complete I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Setup flags required at ring creation for the kernel to produce CQEs
    /// of this layout (empty for [`Entry`], `CQE32` for [`Entry32`]).
    const BUILD_FLAGS: sys::IoringSetupFlags;
}
40
/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
// Thin `#[repr(C)]` wrapper over the raw kernel CQE structure.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
44
/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
// Layout: the regular 16-byte entry followed by the two extra `big_cqe`
// words used by CQE32-enabled rings.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
49
// Layout sanity check: the CQE wrappers must match the kernel's 16-byte
// (default) and 32-byte (CQE32) completion entry sizes exactly, since the
// kernel writes them into the shared ring with that layout.
#[test]
fn test_entry_sizes() {
    // Use the explicit `mem::` path (the file already imports `core::mem`)
    // instead of relying on `size_of` being in the prelude, which is only
    // the case since Rust 1.80.
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}
55
bitflags! {
    /// Completion flags
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// `IORING_CQE_F_BUFFER`: a provided buffer was selected for this
        /// operation; its ID is stored in the upper flag bits (see
        /// [`buffer_select`]).
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// `IORING_CQE_F_MORE`: more completions will follow for the same
        /// submission (see [`more`]).
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// `IORING_CQE_F_SOCK_NONEMPTY`: the socket still has data ready to
        /// read (see [`sock_nonempty`]).
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// `IORING_CQE_F_NOTIF`: this CQE is a notification, e.g. for
        /// zero-copy sends (see [`notif`]).
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}
69
impl<E: EntryMarker> Inner<E> {
    /// Resolves the CQ ring pointers from the completion-queue mmap using
    /// the offsets the kernel reported in `io_uring_params`.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-queue mapping corresponding to `p`,
    /// and the mapping must outlive the returned value.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head         ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail         ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask    ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries ).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow     ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes         ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags        ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Creates a borrowing queue view from `&self`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee exclusive consumer access: no other
    /// `CompletionQueue` over this ring may be in use concurrently.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            // Only the (single) consumer writes `head`, so an unsynchronized
            // load of our own previous store is sufficient here.
            head: unsync_load(self.head),
            // Acquire pairs with the kernel's release store of `tail`, making
            // the CQEs published up to that index visible to us.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    /// Safe borrow: `&mut self` statically guarantees exclusive access.
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
106
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed in this iterator and will make available new entries
    /// in the queue if the kernel has produced some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Release-publish our consumed head so the kernel can reuse those
            // slots, then acquire-load the kernel's tail to observe any newly
            // produced completions.
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If queue is full and [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set,
    /// new events may be dropped. This records the number of dropped events.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops up to `entries.len()` available CQEs into `entries`, returning
    /// the initialized prefix as a regular slice.
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        // Never pop more than what is available or what fits in `entries`.
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` elements were just initialized above, so it
        // is sound to reinterpret that prefix as `&mut [E]`.
        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Clones the CQE at the current head and advances the head by one.
    ///
    /// Safety: callers must ensure the queue is non-empty
    /// (`self.head != self.tail`); otherwise this reads a stale slot.
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        // `head` is a free-running counter; the mask above maps it onto a
        // ring slot, so plain wrapping arithmetic is correct here.
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
179
180impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
181    #[inline]
182    fn drop(&mut self) {
183        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
184    }
185}
186
187impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
188    type Item = E;
189
190    #[inline]
191    fn next(&mut self) -> Option<Self::Item> {
192        if self.head != self.tail {
193            Some(unsafe { self.pop() })
194        } else {
195            None
196        }
197    }
198
199    #[inline]
200    fn size_hint(&self) -> (usize, Option<usize>) {
201        (self.len(), Some(self.len()))
202    }
203}
204
impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    #[inline]
    fn len(&self) -> usize {
        // `head` and `tail` are free-running u32 counters that may wrap;
        // their wrapping difference is always the number of unread CQEs.
        self.tail.wrapping_sub(self.head) as usize
    }
}
211
212impl Entry {
213    /// The result of the operation.  If the operation succeeded, this is the operation-specific
214    /// return value.  For example, for a [`Read`](crate::opcode::Read) operation this is
215    /// equivalent to the return value of the `read(2)` system call.  If the operation failed, the
216    /// errno is returned.
217    #[inline]
218    pub fn result(&self) -> Result<u32, Errno> {
219        // The following text is found in many io_uring man pages:
220        //
221        // > Note that where synchronous system calls will return -1 on failure
222        // > and set errno to the actual error value, io_uring never uses errno.
223        // > Instead it returns the negated errno directly in the CQE res field.
224        //
225        // Furthermore, I believe a negative value in the `res` field is
226        // _always_ a negated errno.  We return a `Result` instead for
227        // convenience.
228        if let Ok(x) = u32::try_from(self.0.res) {
229            Ok(x)
230        } else {
231            Err(Errno::from_raw_os_error(-self.0.res))
232        }
233    }
234
235    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
236    /// operation this is equivalent to the return value of the `read(2)` system call.
237    #[inline]
238    pub fn raw_result(&self) -> i32 {
239        self.0.res
240    }
241
242    /// The user data of the request, as set by
243    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
244    #[inline]
245    pub fn user_data(&self) -> sys::io_uring_user_data {
246        self.0.user_data
247    }
248
249    /// The user data of the request, as set by
250    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
251    #[inline]
252    pub fn user_data_u64(&self) -> u64 {
253        self.0.user_data.u64_()
254    }
255
256    /// The user data of the request, as set by
257    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
258    #[inline]
259    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
260        self.0.user_data.ptr()
261    }
262
263    /// Metadata related to the operation.
264    ///
265    /// This is currently used for:
266    /// - Storing the selected buffer ID, if one was selected. See
267    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
268    #[inline]
269    pub fn flags(&self) -> Flags {
270        Flags::from_bits_retain(self.0.flags.bits())
271    }
272}
273
// Seals `EntryMarker` so only in-crate CQE types can implement it.
impl private::Sealed for Entry {}
275
impl EntryMarker for Entry {
    // Default 16-byte CQEs require no special io_uring setup flags.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}
279
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        // NOTE(review): `transmute_copy` does a bitwise copy of the fixed
        // 16-byte part only; assumed sound because `Entry` never carries the
        // extra CQE32 payload (that lives in `Entry32.1`) — TODO confirm.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
286
287impl Debug for Entry {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("Entry")
290            .field("result", &self.result())
291            .field("user_data", &self.user_data())
292            .field("flags", &self.flags())
293            .finish()
294    }
295}
296
297impl Entry32 {
298    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
299    /// operation this is equivalent to the return value of the `read(2)` system call.
300    #[inline]
301    pub fn result(&self) -> i32 {
302        self.0 .0.res
303    }
304
305    /// The user data of the request, as set by
306    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
307    #[inline]
308    pub fn user_data(&self) -> sys::io_uring_user_data {
309        self.0 .0.user_data
310    }
311
312    /// Metadata related to the operation.
313    ///
314    /// This is currently used for:
315    /// - Storing the selected buffer ID, if one was selected. See
316    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
317    #[inline]
318    pub fn flags(&self) -> Flags {
319        Flags::from_bits_retain(self.0 .0.flags.bits())
320    }
321
322    /// Additional data available in 32-byte completion queue entries (CQEs).
323    #[inline]
324    pub fn big_cqe(&self) -> &[u64; 2] {
325        &self.1
326    }
327}
328
// Seals `EntryMarker` so only in-crate CQE types can implement it.
impl private::Sealed for Entry32 {}
330
impl EntryMarker for Entry32 {
    // 32-byte CQEs require the ring to be created with `IORING_SETUP_CQE32`.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}
334
335impl From<Entry32> for Entry {
336    fn from(entry32: Entry32) -> Self {
337        entry32.0
338    }
339}
340
341impl Debug for Entry32 {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        f.debug_struct("Entry32")
344            .field("result", &self.result())
345            .field("user_data", &self.user_data())
346            .field("flags", &self.flags())
347            .field("big_cqe", &self.big_cqe())
348            .finish()
349    }
350}
351
352/// Return which dynamic buffer was used by this operation.
353///
354/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
355/// and it signals to the consumer which provided contains the result of this
356/// operation.
357pub fn buffer_select(flags: Flags) -> Option<u16> {
358    if flags.contains(Flags::BUFFER) {
359        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;
360
361        // FIXME
362        //
363        // Should we return u16? maybe kernel will change value of `IORING_CQE_BUFFER_SHIFT` in future.
364        Some(id as u16)
365    } else {
366        None
367    }
368}
369
370/// Return whether further completion events will be submitted for
371/// this same operation.
372///
373/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
374/// the consumer that it should expect further CQE entries after this one,
375/// still from the same original SQE request (e.g. for multishot operations).
376pub fn more(flags: Flags) -> bool {
377    flags.contains(Flags::MORE)
378}
379
380/// Return whether socket has more data ready to read.
381///
382/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
383/// the consumer that the socket has more data that can be read immediately.
384///
385/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
386/// can provide this bit in their respective CQE.
387pub fn sock_nonempty(flags: Flags) -> bool {
388    flags.contains(Flags::SOCK_NONEMPTY)
389}
390
391/// Returns whether this completion event is a notification.
392///
393/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
394/// currently used by the [SendZc](crate::opcode::SendZc) operation.
395pub fn notif(flags: Flags) -> bool {
396    flags.contains(Flags::NOTIF)
397}