rustix_uring/squeue.rs

//! Submission Queue

use core::fmt::{self, Debug, Display, Formatter};
use core::mem;
use core::sync::atomic;
#[cfg(feature = "std")]
use std::error::Error;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
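///
/// A queue is normally borrowed from a live ring rather than built directly. A minimal, hedged
/// sketch, assuming the crate exposes an `IoUring` type with a `submission()` accessor like the
/// upstream `io-uring` crate (illustrative only, hence `ignore`):
///
/// ```ignore
/// let mut ring = rustix_uring::IoUring::new(8)?;
/// let sq = ring.submission();
/// assert!(sq.is_empty());
/// ```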
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
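///
/// A hedged sketch of building one, assuming the `opcode` builders follow the shape of the
/// upstream `io-uring` crate, where `build()` yields an `Entry` (illustrative only, hence
/// `ignore`):
///
/// ```ignore
/// use rustix_uring::{opcode, squeue};
///
/// let sqe: squeue::Entry = opcode::Nop::new()
///     .build()
///     .flags(squeue::Flags::IO_LINK);
/// ```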
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(size_of::<Entry>(), 64);
    assert_eq!(size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = sys::IoringSqeFlags::FIXED_FILE.bits();

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = sys::IoringSqeFlags::IO_DRAIN.bits();

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = sys::IoringSqeFlags::IO_LINK.bits();

        /// Like [`IO_LINK`](Self::IO_LINK), but the link is not severed regardless of the
        /// completion result.
        const IO_HARDLINK = sys::IoringSqeFlags::IO_HARDLINK.bits();

        /// Normal operation for io_uring is to try to issue an SQE as non-blocking first,
        /// and if that fails, execute it in an asynchronous manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an SQE to be issued asynchronously from the start.
        const ASYNC = sys::IoringSqeFlags::ASYNC.bits();

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
        const BUFFER_SELECT = sys::IoringSqeFlags::BUFFER_SELECT.bits();

        /// Don't post a CQE if the request succeeded.
        const SKIP_SUCCESS = sys::IoringSqeFlags::CQE_SKIP_SUCCESS.bits();
    }
}
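
// A small, hedged self-check of flag composition. It exercises only the `Flags` bitflags type
// above and does not touch a real io_uring instance.
#[test]
fn test_flags_compose() {
    let flags = Flags::IO_LINK | Flags::ASYNC;
    assert!(flags.contains(Flags::IO_LINK));
    assert!(flags.contains(Flags::ASYNC));
    assert!(!flags.contains(Flags::IO_DRAIN));
}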

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;

        // To keep it simple, map it directly to `sqes`.
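        // (The kernel's SQ `array` indirects ring slots to SQE indices; writing the identity
        // mapping once up front means a slot's index is its SQE index, so `push_unchecked` can
        // index `sqes` by `tail & ring_mask` without touching `array` again.)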
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
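    ///
    /// A hedged sketch of the push-then-sync flow (illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// // SAFETY: the entry's parameters must stay valid for the whole operation.
    /// unsafe { sq.push(&entry)? };
    /// sq.sync(); // make the new tail visible to the kernel side of the ring
    /// ```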
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// concurrently puts the device to sleep and no further progress is made.
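    ///
    /// A hedged sketch of the intended call order (illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// unsafe { sq.push(&entry)? };
    /// sq.sync();                // publish the new entries to the kernel
    /// if sq.need_wakeup() {     // performs its own `SeqCst` fence first
    ///     // enter the kernel with the SQ-wakeup flag set
    /// }
    /// ```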
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See discussions that happened in [#197] and its linked threads in liburing. We need to
        // ensure that writes to the head have been visible _to the kernel_ if this load results in
        // a decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one observes the other's write. By establishing a
        // point of sequential consistency on both sides between their respective write and read,
        // at least one coherency order holds. With regards to the interpretation of the atomic
        // memory model of Rust (that is, that of C++20) we're assuming that an `smp_mb` provides
        // at least the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The effect of [`Self::need_wakeup`], assuming the synchronization work has already been
    /// performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient by not performing one itself.
    ///
    /// Failure to uphold the precondition can result in an effective deadlock due to a sleeping
    /// device.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringSqFlags::CQ_OVERFLOW)
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IoringSqFlags::TASKRUN.bits()
                != 0
        }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as its buffer) are valid and will
    /// remain valid for the entire duration of the operation, otherwise memory problems may occur.
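    ///
    /// # Examples
    ///
    /// A hedged usage sketch, assuming an `IoUring` handle and a `Nop` opcode builder shaped like
    /// the upstream `io-uring` crate's (illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// let nop = rustix_uring::opcode::Nop::new().build();
    /// // SAFETY: a no-op carries no buffers, so there is nothing to keep alive.
    /// unsafe { ring.submission().push(&nop) }?;
    /// ring.submit()?;
    /// ```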
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as their buffers) are valid
    /// and will remain valid for the entire duration of the operation, otherwise memory problems
    /// may occur.
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
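        // Write the entry into the slot selected by the masked local tail, then advance the local
        // tail; the new tail only becomes visible to the kernel on `sync` or when the queue is
        // dropped.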
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
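        // Publish the locally advanced tail so the kernel sees any entries pushed since the last
        // `sync`.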
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry {
        self.0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry128 {
        self.0 .0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

#[cfg(feature = "std")]
impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}