//! Completion Queue

use core::fmt::{self, Debug};
use core::mem;
use core::mem::MaybeUninit;
use core::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    head: *const atomic::AtomicU32,
    tail: *const atomic::AtomicU32,
    ring_mask: u32,
    ring_entries: u32,

    overflow: *const atomic::AtomicU32,

    cqes: *const E,

    flags: *const atomic::AtomicU32,
}

/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
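///
/// # Examples
///
/// A minimal sketch of draining completions; `ring` is assumed to be an
/// io_uring instance created elsewhere, so the block is not compiled:
///
/// ```ignore
/// let mut cq = ring.completion();
/// for cqe in &mut cq {
///     println!("completed with result {}", cqe.result());
/// }
/// cq.sync(); // publish the consumed entries back to the kernel
/// ```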
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A completion queue entry (CQE), representing a complete I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);

/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);

#[test]
fn test_entry_sizes() {
    assert_eq!(size_of::<Entry>(), 16);
    assert_eq!(size_of::<Entry32>(), 32);
}

bitflags! {
    /// Completion flags
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
    pub struct Flags: u32 {
        /// A provided buffer was selected for this operation; the upper bits
        /// of the flags carry the buffer ID. See [`buffer_select`].
        const BUFFER = sys::IoringCqeFlags::BUFFER.bits();

        /// More completion events will follow for the same request. See
        /// [`more`].
        const MORE = sys::IoringCqeFlags::MORE.bits();

        /// The socket has more data ready to read. See [`sock_nonempty`].
        const SOCK_NONEMPTY = sys::IoringCqeFlags::SOCK_NONEMPTY.bits();

        /// This completion is a zero-copy send notification. See [`notif`].
        const NOTIF = sys::IoringCqeFlags::NOTIF.bits();
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            head: unsync_load(self.head),
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed by this iterator and make new
    /// entries visible if the kernel has produced some in the meantime.
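    ///
    /// A sketch of the usual consume-then-sync loop (`cq` is assumed to be a
    /// `CompletionQueue` borrowed elsewhere, so the block is not compiled):
    ///
    /// ```ignore
    /// loop {
    ///     cq.sync(); // pick up entries the kernel produced in the meantime
    ///     if cq.is_empty() {
    ///         break;
    ///     }
    ///     for cqe in &mut cq {
    ///         // handle each completion
    ///     }
    /// }
    /// ```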
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If the queue is full and
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set,
    /// new events may be dropped. This records the number of dropped events.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            sys::IoringCqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringCqFlags::EVENTFD_DISABLED)
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }
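
    /// Pops up to `entries.len()` entries from the queue into the provided
    /// buffer, returning the initialized prefix.
    ///
    /// A sketch of batched draining (`cq` is assumed to be a
    /// `CompletionQueue` borrowed elsewhere, so the block is not compiled):
    ///
    /// ```ignore
    /// let mut buf = [const { core::mem::MaybeUninit::uninit() }; 64];
    /// for cqe in cq.fill(&mut buf) {
    ///     // handle each completion in this batch
    /// }
    /// ```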
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = core::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            entry.write(unsafe { self.pop() });
        }

        unsafe { core::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }
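
    /// Pops one entry without checking for emptiness; the caller must ensure
    /// `head != tail`, or this will read a stale slot.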
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}

impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
    }
}

impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
    type Item = E;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            Some(unsafe { self.pop() })
        } else {
            None
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len(), Some(self.len()))
    }
}

impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    #[inline]
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}

impl Entry {
    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
    /// operation this is equivalent to the return value of the `read(2)` system call.
    #[inline]
    pub fn result(&self) -> i32 {
        self.0.res
    }

    /// The user data of the request, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
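    ///
    /// A sketch of the round trip (`sqe` and `cqe` are a hypothetical
    /// submission/completion pair for the same request, and the `.into()`
    /// conversion is assumed; not compiled):
    ///
    /// ```ignore
    /// // At submission time, tag the request:
    /// let sqe = sqe.user_data(0x42.into());
    /// // After completion, the same tag comes back:
    /// assert_eq!(cqe.user_data_u64(), 0x42);
    /// ```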
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// The user data of the request as a `u64`, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data_u64(&self) -> u64 {
        self.0.user_data.u64_()
    }

    /// The user data of the request as a pointer, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data_ptr(&self) -> *mut core::ffi::c_void {
        self.0.user_data.ptr()
    }

    /// Metadata related to the operation.
    ///
    /// This is currently used for:
    /// - Storing the selected buffer ID, if one was selected. See
    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0.flags.bits())
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .finish()
    }
}

impl Entry32 {
    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
    /// operation this is equivalent to the return value of the `read(2)` system call.
    #[inline]
    pub fn result(&self) -> i32 {
        self.0 .0.res
    }

    /// The user data of the request, as set by
    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
    #[inline]
    pub fn user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Metadata related to the operation.
    ///
    /// This is currently used for:
    /// - Storing the selected buffer ID, if one was selected. See
    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
    #[inline]
    pub fn flags(&self) -> Flags {
        Flags::from_bits_retain(self.0 .0.flags.bits())
    }

    /// Additional data available in 32-byte completion queue entries (CQEs).
    #[inline]
    pub fn big_cqe(&self) -> &[u64; 2] {
        &self.1
    }
}

impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::CQE32;
}

impl From<Entry32> for Entry {
    fn from(entry32: Entry32) -> Self {
        entry32.0
    }
}

impl Debug for Entry32 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry32")
            .field("result", &self.result())
            .field("user_data", &self.user_data())
            .field("flags", &self.flags())
            .field("big_cqe", &self.big_cqe())
            .finish()
    }
}

/// Return which dynamic buffer was used by this operation.
///
/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
/// and it signals to the consumer which provided buffer contains the result of
/// this operation.
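///
/// A sketch (`cqe` is assumed to be an [`Entry`] taken from the completion
/// queue, so the block is not compiled):
///
/// ```ignore
/// if let Some(bid) = buffer_select(cqe.flags()) {
///     // `bid` identifies the kernel-selected provided buffer
/// }
/// ```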
pub fn buffer_select(flags: Flags) -> Option<u16> {
    if flags.contains(Flags::BUFFER) {
        let id = flags.bits() >> sys::IORING_CQE_BUFFER_SHIFT;

        // FIXME
        //
        // Should we return u16? The kernel might change the value of
        // `IORING_CQE_BUFFER_SHIFT` in the future.
        Some(id as u16)
    } else {
        None
    }
}

/// Return whether further completion events will be submitted for
/// this same operation.
///
/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
/// the consumer that it should expect further CQE entries after this one,
/// still from the same original SQE request (e.g. for multishot operations).
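///
/// A sketch of a multishot consumer (`cq` and `handle` are assumed to exist
/// elsewhere, so the block is not compiled):
///
/// ```ignore
/// for cqe in &mut cq {
///     handle(&cqe);
///     if !more(cqe.flags()) {
///         // The multishot request has terminated; re-arm it if desired.
///     }
/// }
/// ```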
pub fn more(flags: Flags) -> bool {
    flags.contains(Flags::MORE)
}

/// Return whether the socket has more data ready to read.
///
/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
/// the consumer that the socket has more data that can be read immediately.
///
/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
/// can provide this bit in their respective CQE.
pub fn sock_nonempty(flags: Flags) -> bool {
    flags.contains(Flags::SOCK_NONEMPTY)
}

/// Returns whether this completion event is a notification.
///
/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
/// currently used by the [SendZc](crate::opcode::SendZc) operation.
pub fn notif(flags: Flags) -> bool {
    flags.contains(Flags::NOTIF)
}