rustix_uring/
cqueue.rs

1//! Completion Queue
2
3use core::fmt::{self, Debug};
4use core::mem;
5use core::mem::MaybeUninit;
6use core::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12
/// Raw pointers into the kernel-shared completion queue ring memory.
///
/// All pointers are derived from the CQ mmap in [`Inner::new`] and remain
/// valid for as long as that mapping is alive.
pub(crate) struct Inner<E: EntryMarker> {
    // Ring indices shared with the kernel: userspace advances `head` as it
    // consumes CQEs; the kernel advances `tail` as it produces them.
    head: *const atomic::AtomicU32,
    tail: *const atomic::AtomicU32,
    // Mask applied to an index to find its slot (ring sizes are powers of two).
    ring_mask: u32,
    ring_entries: u32,

    // Number of CQEs the kernel dropped because the queue was full.
    overflow: *const atomic::AtomicU32,

    // Base of the CQE array; slot for index i is `cqes.add(i & ring_mask)`.
    cqes: *const E,

    // CQ ring flags word (e.g. the eventfd-disabled bit).
    flags: *const atomic::AtomicU32,
}
25
/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    // Local snapshots of the ring indices; `head` advances as entries are
    // popped and is written back to the shared ring by `sync` and on drop.
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}
32
/// A completion queue entry (CQE), representing a complete I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Setup flags that must be passed at ring creation to get this CQE
    /// layout (e.g. `CQE32` for [`Entry32`]; empty for the default [`Entry`]).
    const BUILD_FLAGS: sys::IoringSetupFlags;
}
39
/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
///
/// A `#[repr(C)]` newtype over the raw `sys::io_uring_cqe`.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
43
/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
///
/// Layout: the common 16-byte [`Entry`] followed by two extra `u64`s of
/// operation-specific data (see [`Entry32::big_cqe`]).
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
48
/// Layout check: CQEs must match the kernel's 16-/32-byte ABI sizes.
#[test]
fn test_entry_sizes() {
    // Use the explicit `mem::` path (already imported in this file) rather
    // than relying on `size_of` being in the prelude, which requires
    // Rust 1.80+ and would otherwise raise this crate's effective MSRV.
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}
54
bitflags! {
    /// Completion flags
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// A provided buffer was used for this operation; the upper bits of
        /// the flags word carry its ID (see [`buffer_select`]).
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// Further completion events will follow for the same request
        /// (e.g. multishot operations); see [`more`].
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// The socket has more data ready to read immediately; see
        /// [`sock_nonempty`].
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// This completion event is a notification (e.g. for
        /// zero-copy sends); see [`notif`].
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}
68
impl<E: EntryMarker> Inner<E> {
    /// Resolve the kernel-provided offsets in `p.cq_off` into raw pointers
    /// within the CQ memory mapping.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-queue mapping created for the ring
    /// described by `p`, and it must outlive the returned `Inner`.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head         ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail         ) as *const atomic::AtomicU32;
        // Mask and entry count are constants for the ring's lifetime, so they
        // are read once here rather than kept as pointers.
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask    ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries ).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow     ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes         ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags        ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Snapshot the ring indices into a `CompletionQueue` view without
    /// requiring exclusive access.
    ///
    /// # Safety
    ///
    /// The caller must guarantee no other view of this completion queue is
    /// used concurrently, since this bypasses the `&mut self` exclusivity
    /// that [`Inner::borrow`] provides.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            // Only userspace writes `head`, so an unsynchronized load suffices.
            head: unsync_load(self.head),
            // Acquire pairs with the kernel's release store of `tail`.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    /// Safe borrow: `&mut self` guarantees exclusive access to the queue.
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
105
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed in this iterator and will make available new entries
    /// in the queue if the kernel has produced some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Publish our consumed `head` so the kernel can reuse those slots,
            // then pick up any entries produced since the last sync.
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If queue is full and [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set,
    /// new events may be dropped. This records the number of dropped events.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pop up to `entries.len()` available CQEs into the provided
    /// uninitialized buffer, returning the initialized prefix as a slice.
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            // SAFETY: `len <= self.len()`, so an entry is available per pop.
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` elements were just initialized above, and
        // `MaybeUninit<E>` has the same layout as `E`.
        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Read the CQE at the local `head` and advance the local head index.
    ///
    /// # Safety
    ///
    /// The queue must be non-empty (`self.head != self.tail`).
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        // Wrapping arithmetic: indices deliberately overflow in u32 space.
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
178
179impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
180    #[inline]
181    fn drop(&mut self) {
182        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
183    }
184}
185
186impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
187    type Item = E;
188
189    #[inline]
190    fn next(&mut self) -> Option<Self::Item> {
191        if self.head != self.tail {
192            Some(unsafe { self.pop() })
193        } else {
194            None
195        }
196    }
197
198    #[inline]
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        (self.len(), Some(self.len()))
201    }
202}
203
204impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
205    #[inline]
206    fn len(&self) -> usize {
207        self.tail.wrapping_sub(self.head) as usize
208    }
209}
210
211impl Entry {
212    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
213    /// operation this is equivalent to the return value of the `read(2)` system call.
214    #[inline]
215    pub fn result(&self) -> i32 {
216        self.0.res
217    }
218
219    /// The user data of the request, as set by
220    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
221    #[inline]
222    pub fn user_data(&self) -> sys::io_uring_user_data {
223        self.0.user_data
224    }
225
226    /// The user data of the request, as set by
227    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
228    #[inline]
229    pub fn user_data_u64(&self) -> u64 {
230        self.0.user_data.u64_()
231    }
232
233    /// The user data of the request, as set by
234    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
235    #[inline]
236    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
237        self.0.user_data.ptr()
238    }
239
240    /// Metadata related to the operation.
241    ///
242    /// This is currently used for:
243    /// - Storing the selected buffer ID, if one was selected. See
244    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
245    #[inline]
246    pub fn flags(&self) -> Flags {
247        Flags::from_bits_retain(self.0.flags.bits())
248    }
249}
250
// Seal `Entry` so only this crate can implement `EntryMarker` for it.
impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    // Default 16-byte CQEs require no special ring setup flags.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}
256
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        // SAFETY: `Entry` is 16 bytes (see `test_entry_sizes`), so the
        // flexible `big_cqe` tail is empty here and a bitwise copy of the
        // fixed-size prefix is a complete copy of the value.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
263
264impl Debug for Entry {
265    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266        f.debug_struct("Entry")
267            .field("result", &self.result())
268            .field("user_data", &self.user_data())
269            .field("flags", &self.flags())
270            .finish()
271    }
272}
273
274impl Entry32 {
275    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
276    /// operation this is equivalent to the return value of the `read(2)` system call.
277    #[inline]
278    pub fn result(&self) -> i32 {
279        self.0 .0.res
280    }
281
282    /// The user data of the request, as set by
283    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
284    #[inline]
285    pub fn user_data(&self) -> sys::io_uring_user_data {
286        self.0 .0.user_data
287    }
288
289    /// Metadata related to the operation.
290    ///
291    /// This is currently used for:
292    /// - Storing the selected buffer ID, if one was selected. See
293    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
294    #[inline]
295    pub fn flags(&self) -> Flags {
296        Flags::from_bits_retain(self.0 .0.flags.bits())
297    }
298
299    /// Additional data available in 32-byte completion queue entries (CQEs).
300    #[inline]
301    pub fn big_cqe(&self) -> &[u64; 2] {
302        &self.1
303    }
304}
305
// Seal `Entry32` so only this crate can implement `EntryMarker` for it.
impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    // 32-byte CQEs require the ring to be created with the CQE32 setup flag.
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}
311
312impl From<Entry32> for Entry {
313    fn from(entry32: Entry32) -> Self {
314        entry32.0
315    }
316}
317
318impl Debug for Entry32 {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        f.debug_struct("Entry32")
321            .field("result", &self.result())
322            .field("user_data", &self.user_data())
323            .field("flags", &self.flags())
324            .field("big_cqe", &self.big_cqe())
325            .finish()
326    }
327}
328
329/// Return which dynamic buffer was used by this operation.
330///
331/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
332/// and it signals to the consumer which provided contains the result of this
333/// operation.
334pub fn buffer_select(flags: Flags) -> Option<u16> {
335    if flags.contains(Flags::BUFFER) {
336        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;
337
338        // FIXME
339        //
340        // Should we return u16? maybe kernel will change value of `IORING_CQE_BUFFER_SHIFT` in future.
341        Some(id as u16)
342    } else {
343        None
344    }
345}
346
347/// Return whether further completion events will be submitted for
348/// this same operation.
349///
350/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
351/// the consumer that it should expect further CQE entries after this one,
352/// still from the same original SQE request (e.g. for multishot operations).
353pub fn more(flags: Flags) -> bool {
354    flags.contains(Flags::MORE)
355}
356
357/// Return whether socket has more data ready to read.
358///
359/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
360/// the consumer that the socket has more data that can be read immediately.
361///
362/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
363/// can provide this bit in their respective CQE.
364pub fn sock_nonempty(flags: Flags) -> bool {
365    flags.contains(Flags::SOCK_NONEMPTY)
366}
367
368/// Returns whether this completion event is a notification.
369///
370/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
371/// currently used by the [SendZc](crate::opcode::SendZc) operation.
372pub fn notif(flags: Flags) -> bool {
373    flags.contains(Flags::NOTIF)
374}