rustix_uring/
cqueue.rs

1//! Completion Queue
2
3use core::fmt::{self, Debug};
4use core::mem;
5use core::mem::MaybeUninit;
6use core::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12use rustix::io::Errno;
13
/// Raw view of the kernel's completion-queue ring.
///
/// All pointers point into the completion-queue `Mmap` shared with the
/// kernel and remain valid for as long as that mapping is alive.
pub(crate) struct Inner<E: EntryMarker> {
    // Ring head index: advanced (stored) by userspace after consuming CQEs.
    head: *const atomic::AtomicU32,
    // Ring tail index: advanced by the kernel as it produces CQEs.
    tail: *const atomic::AtomicU32,
    // Mask applied to head/tail to obtain a slot index in `cqes`.
    ring_mask: u32,
    // Total number of CQE slots in the ring buffer.
    ring_entries: u32,

    // Counter of completion events dropped by the kernel when the queue
    // overflowed (see `CompletionQueue::overflow`).
    overflow: *const atomic::AtomicU32,

    // Base pointer of the CQE array.
    cqes: *const E,

    // CQ ring flags (e.g. eventfd-disabled; see `eventfd_disabled`).
    flags: *const atomic::AtomicU32,
}
26
/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    // Local head cursor, advanced as entries are popped; written back to the
    // shared ring on `sync()` and on drop.
    head: u32,
    // Cached snapshot of the shared tail; refreshed by `sync()`.
    tail: u32,
    queue: &'a Inner<E>,
}
33
/// A completion queue entry (CQE), representing a complete I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Setup flag(s) that must be passed when building the ring so that the
    /// kernel produces CQEs of this size (e.g. `CQE32` for [`Entry32`]).
    const BUILD_FLAGS: sys::IoringSetupFlags;
}
40
/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
///
/// Thin `#[repr(C)]` wrapper around the kernel's `io_uring_cqe`, so its
/// layout matches the ring buffer's entries exactly.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
44
/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
///
/// Layout is the regular 16-byte [`Entry`] followed by 16 bytes of extra
/// payload (`big_cqe`), produced when the ring is set up with `CQE32`.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
49
#[test]
fn test_entry_sizes() {
    // These sizes are part of the kernel ABI: a default CQE is 16 bytes and
    // a `CQE32` entry is 32 bytes. Use the explicit `mem::` path (as the
    // rest of this file does) instead of relying on `size_of` being in the
    // prelude, which requires Rust 1.80+.
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}
55
bitflags! {
    /// Completion flags
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// A provided buffer was selected for this operation; the buffer ID
        /// is encoded in the upper bits of the flags (see [`buffer_select`]).
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// More completions for the same original request will follow
        /// (see [`more`]).
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// The socket has more data ready to read (see [`sock_nonempty`]).
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// This completion is a zero-copy notification (see [`notif`]).
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}
69
impl<E: EntryMarker> Inner<E> {
    /// Builds the queue view from the completion-queue mapping, using the
    /// field offsets the kernel reported in `p.cq_off`.
    ///
    /// `ring_mask` and `ring_entries` are constant after ring setup, so they
    /// are read once here rather than kept as pointers.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the CQ ring mapping belonging to the io_uring
    /// instance described by `p`, and must outlive the returned value.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head         ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail         ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask    ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries ).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow     ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes         ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags        ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Creates a borrowed queue handle without requiring `&mut self`.
    ///
    /// # Safety
    ///
    /// The caller must ensure no other consumer is active: `head` is read
    /// with an unsynchronized load, which is only sound while nothing else
    /// is concurrently popping entries (only this side ever writes `head`).
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            head: unsync_load(self.head),
            // Acquire pairs with the kernel's tail publication, making the
            // CQEs up to `tail` visible to us.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    /// Safe borrow: `&mut self` statically guarantees exclusive access,
    /// satisfying `borrow_shared`'s contract.
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
106
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed in this iterator and will make available new entries
    /// in the queue if the kernel has produced some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Release-store our consumed head so the kernel can reuse those
            // slots, then acquire-load the tail to observe new completions.
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If queue is full and [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set,
    /// new events may be dropped. This records the number of dropped events.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops up to `entries.len()` available CQEs into `entries`, returning
    /// the initialized prefix that was actually filled.
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            // SAFETY: `len <= self.len()`, so an unconsumed CQE remains for
            // each iteration.
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` elements were just initialized above, and
        // `MaybeUninit<E>` has the same layout as `E`.
        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Clones the CQE at the current head and advances the local head cursor.
    ///
    /// # Safety
    ///
    /// The queue must be non-empty (`self.head != self.tail`); otherwise this
    /// reads a slot the kernel has not filled (or has not been observed yet).
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        // Wrapping add: head/tail are free-running counters that may wrap.
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
179
180impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
181    #[inline]
182    fn drop(&mut self) {
183        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
184    }
185}
186
187impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
188    type Item = E;
189
190    #[inline]
191    fn next(&mut self) -> Option<Self::Item> {
192        if self.head != self.tail {
193            Some(unsafe { self.pop() })
194        } else {
195            None
196        }
197    }
198
199    #[inline]
200    fn size_hint(&self) -> (usize, Option<usize>) {
201        (self.len(), Some(self.len()))
202    }
203}
204
impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    /// Number of unconsumed entries between the local head and cached tail.
    #[inline]
    fn len(&self) -> usize {
        // `head` and `tail` are free-running u32 counters that may wrap
        // around; wrapping subtraction still yields the correct distance.
        self.tail.wrapping_sub(self.head) as usize
    }
}
211
212impl Entry {
213    /// The result of the operation.  If the operation succeeded, this is the operation-specific
214    /// return value.  For example, for a [`Read`](crate::opcode::Read) operation this is
215    /// equivalent to the return value of the `read(2)` system call.  If the operation failed, the
216    /// errno is returned.
217    #[inline]
218    pub fn result(&self) -> Result<u32, Errno> {
219        // The following text is found in many io_uring man pages:
220        //
221        // > Note that where synchronous system calls will return -1 on failure
222        // > and set errno to the actual error value, io_uring never uses errno.
223        // > Instead it returns the negated errno directly in the CQE res field.
224        //
225        // Furthermore, I believe a negative value in the `res` field is
226        // _always_ a negated errno.  We return a `Result` instead for
227        // convenience.
228        if let Ok(x) = u32::try_from(self.0.res) {
229            Ok(x)
230        } else {
231            Err(Errno::from_raw_os_error(-self.0.res))
232        }
233    }
234
235    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
236    /// operation this is equivalent to the return value of the `read(2)` system call.
237    #[inline]
238    pub fn raw_result(&self) -> i32 {
239        self.0.res
240    }
241
242    /// The user data of the request, as set by
243    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
244    #[inline]
245    pub fn user_data(&self) -> sys::io_uring_user_data {
246        self.0.user_data
247    }
248
249    /// The user data of the request, as set by
250    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
251    #[inline]
252    pub fn user_data_u64(&self) -> u64 {
253        self.0.user_data.u64_()
254    }
255
256    /// The user data of the request, as set by
257    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
258    #[inline]
259    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
260        self.0.user_data.ptr()
261    }
262
263    /// Metadata related to the operation.
264    ///
265    /// This is currently used for:
266    /// - Storing the selected buffer ID, if one was selected. See
267    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
268    #[inline]
269    pub fn flags(&self) -> Flags {
270        Flags::from_bits_retain(self.0.flags.bits())
271    }
272}
273
// Seals `Entry` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    // Default-sized (16-byte) CQEs need no special ring-setup flags.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}
279
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        // SAFETY: `transmute_copy` performs a bitwise copy of the fixed-size
        // 16-byte portion of the CQE, which is exactly what `Entry` holds
        // (see `test_entry_sizes`); the incomplete trailing array has zero
        // size and contributes no bytes.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
286
287impl Debug for Entry {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("Entry")
290            .field("result", &self.result())
291            .field("user_data", &self.user_data())
292            .field("flags", &self.flags())
293            .finish()
294    }
295}
296
297impl Entry32 {
298    /// The result of the operation.  If the operation succeeded, this is the operation-specific
299    /// return value.  For example, for a [`Read`](crate::opcode::Read) operation this is
300    /// equivalent to the return value of the `read(2)` system call.  If the operation failed, the
301    /// errno is returned.
302    #[inline]
303    pub fn result(&self) -> Result<u32, Errno> {
304        // See Entry::result() for the justification for this logic.
305        if let Ok(x) = u32::try_from(self.0 .0.res) {
306            Ok(x)
307        } else {
308            Err(Errno::from_raw_os_error(-self.0 .0.res))
309        }
310    }
311
312    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
313    /// operation this is equivalent to the return value of the `read(2)` system call.
314    #[inline]
315    pub fn raw_result(&self) -> i32 {
316        self.0 .0.res
317    }
318
319    /// The user data of the request, as set by
320    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
321    #[inline]
322    pub fn user_data(&self) -> sys::io_uring_user_data {
323        self.0 .0.user_data
324    }
325
326    /// Metadata related to the operation.
327    ///
328    /// This is currently used for:
329    /// - Storing the selected buffer ID, if one was selected. See
330    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
331    #[inline]
332    pub fn flags(&self) -> Flags {
333        Flags::from_bits_retain(self.0 .0.flags.bits())
334    }
335
336    /// Additional data available in 32-byte completion queue entries (CQEs).
337    #[inline]
338    pub fn big_cqe(&self) -> &[u64; 2] {
339        &self.1
340    }
341}
342
// Seals `Entry32` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    // 32-byte CQEs require the ring to be created with the CQE32 setup flag.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}
348
349impl From<Entry32> for Entry {
350    fn from(entry32: Entry32) -> Self {
351        entry32.0
352    }
353}
354
355impl Debug for Entry32 {
356    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357        f.debug_struct("Entry32")
358            .field("result", &self.result())
359            .field("user_data", &self.user_data())
360            .field("flags", &self.flags())
361            .field("big_cqe", &self.big_cqe())
362            .finish()
363    }
364}
365
366/// Return which dynamic buffer was used by this operation.
367///
368/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
369/// and it signals to the consumer which provided contains the result of this
370/// operation.
371pub fn buffer_select(flags: Flags) -> Option<u16> {
372    if flags.contains(Flags::BUFFER) {
373        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;
374
375        // FIXME
376        //
377        // Should we return u16? maybe kernel will change value of `IORING_CQE_BUFFER_SHIFT` in future.
378        Some(id as u16)
379    } else {
380        None
381    }
382}
383
384/// Return whether further completion events will be submitted for
385/// this same operation.
386///
387/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
388/// the consumer that it should expect further CQE entries after this one,
389/// still from the same original SQE request (e.g. for multishot operations).
390pub fn more(flags: Flags) -> bool {
391    flags.contains(Flags::MORE)
392}
393
394/// Return whether socket has more data ready to read.
395///
396/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
397/// the consumer that the socket has more data that can be read immediately.
398///
399/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
400/// can provide this bit in their respective CQE.
401pub fn sock_nonempty(flags: Flags) -> bool {
402    flags.contains(Flags::SOCK_NONEMPTY)
403}
404
405/// Returns whether this completion event is a notification.
406///
407/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
408/// currently used by the [SendZc](crate::opcode::SendZc) operation.
409pub fn notif(flags: Flags) -> bool {
410    flags.contains(Flags::NOTIF)
411}