io_uring/squeue.rs

//! Submission Queue

use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
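///
/// A minimal sketch of obtaining and inspecting the queue (assuming the crate-level
/// [`IoUring`](crate::IoUring) handle; error handling elided):
///
/// ```no_run
/// use io_uring::IoUring;
///
/// # fn main() -> std::io::Result<()> {
/// let mut ring = IoUring::new(8)?;
/// let sq = ring.submission();
/// assert_eq!(sq.capacity(), 8);
/// assert!(sq.is_empty());
/// // Entries are added with `push`; the new tail becomes visible to the kernel
/// // once the queue is synced or dropped.
/// # Ok(())
/// # }
/// ```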
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: u32;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
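///
/// A small sketch of building one (assuming the [`Nop`](crate::opcode::Nop) opcode):
///
/// ```
/// use io_uring::{opcode, squeue::Flags};
///
/// let entry = opcode::Nop::new()
///     .build()
///     .user_data(0x42)
///     .flags(Flags::IO_LINK);
/// assert_eq!(entry.get_user_data(), 0x42);
/// ```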
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
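///
/// A 64-byte [`Entry`] can also be widened via the `From<Entry>` impl below; the extra
/// 64 bytes are zero-filled. A small sketch (assuming the [`Nop`](crate::opcode::Nop) opcode):
///
/// ```
/// use io_uring::{opcode, squeue::Entry128};
///
/// let wide: Entry128 = opcode::Nop::new().build().user_data(0x42).into();
/// assert_eq!(wide.get_opcode(), u32::from(opcode::Nop::CODE));
/// ```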
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but the link is not severed regardless of this
        /// SQE's completion result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Normal operation for io_uring is to try to issue an sqe as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an sqe to be issued async from the start.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
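        ///
        /// A small decoding sketch for the completion side (assuming the `flags` value of
        /// a completed CQE is at hand):
        ///
        /// ```
        /// # let cqe_flags: u32 = 0;
        /// if let Some(buf_id) = io_uring::cqueue::buffer_select(cqe_flags) {
        ///     println!("kernel picked buffer {} from the group", buf_id);
        /// }
        /// ```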
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post CQE if request succeeded.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;

        // `array` is the kernel's indirection table from ring slots to SQE indices.
        // To keep it simple, map slot `i` directly to SQE `i`.
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
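    ///
    /// A short sketch of the intended pattern; note that the tail is also published when
    /// the queue is dropped (assuming the [`Nop`](crate::opcode::Nop) opcode):
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let mut sq = ring.submission();
    /// unsafe {
    ///     sq.push(&opcode::Nop::new().build()).expect("submission queue is full");
    /// }
    /// sq.sync(); // make the pushed entry visible to the kernel's side of the ring
    /// # Ok(())
    /// # }
    /// ```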
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// thread is concurrently put to sleep and no further progress is made.
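    ///
    /// A sketch of the intended ordering with an SQPOLL ring, assuming that
    /// [`IoUring::submit`](crate::IoUring::submit) is the system call used to wake the
    /// kernel thread:
    ///
    /// ```no_run
    /// use io_uring::IoUring;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::builder().setup_sqpoll(100).build(8)?;
    /// let mut sq = ring.submission();
    /// // ... push entries ...
    /// sq.sync();                        // publish the new tail first
    /// let must_wake = sq.need_wakeup(); // then check whether the kernel thread sleeps
    /// drop(sq);
    /// if must_wake {
    ///     ring.submit()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```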
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See discussions that happened in [#197] and its linked threads in liburing. We need to
        // ensure that writes to the head have become visible _to the kernel_ if this load results
        // in a decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one side observes the other's write. By establishing
        // a point of sequential consistency on both sides between their respective write and
        // read, at least one coherency order holds. With regard to the interpretation of the
        // atomic memory model of Rust (that is, that of C++20) we're assuming that an `smp_mb`
        // provides at least the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The effect of [`Self::need_wakeup`], after synchronization work performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient, as it avoids issuing that fence itself.
    ///
    /// Failure to uphold the precondition can result in an effective deadlock due to a sleeping
    /// kernel thread.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since kernel 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffers) are valid and will
    /// remain valid for the entire duration of the operation; otherwise it may cause memory
    /// problems.
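    ///
    /// # Examples
    ///
    /// A sketch of pushing a no-op request and waiting for its completion (assuming the
    /// crate-level [`IoUring`](crate::IoUring) handle; error handling elided):
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let nop = opcode::Nop::new().build().user_data(0x42);
    ///
    /// // SAFETY: `Nop` references no external buffers, so the validity requirement is
    /// // trivially satisfied.
    /// unsafe {
    ///     ring.submission().push(&nop).expect("submission queue is full");
    /// }
    ///
    /// ring.submit_and_wait(1)?;
    /// let cqe = ring.completion().next().expect("completion queue is empty");
    /// assert_eq!(cqe.user_data(), 0x42);
    /// # Ok(())
    /// # }
    /// ```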
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffers) are valid and
    /// will remain valid for the entire duration of the operation; otherwise it may cause memory
    /// problems.
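    ///
    /// A small sketch; either every entry is queued or, when there is not enough room,
    /// none are (assuming the [`Nop`](crate::opcode::Nop) opcode):
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let nops = [opcode::Nop::new().build(), opcode::Nop::new().build()];
    /// unsafe {
    ///     ring.submission()
    ///         .push_multiple(&nops)
    ///         .expect("submission queue does not have enough room");
    /// }
    /// # Ok(())
    /// # }
    /// ```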
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= flags.bits();
        self
    }

    /// Clear the submission event's [flags](Flags).
    #[inline]
    pub fn clear_flags(mut self) -> Entry {
        self.0.flags = 0;
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry {
        self.0.user_data = user_data;
        self
    }

    /// Set the user_data without consuming the entry.
    #[inline]
    pub fn set_user_data(&mut self, user_data: u64) {
        self.0.user_data = user_data;
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> u64 {
        self.0.user_data
    }

    /// Get the opcode associated with this entry.
    #[inline]
    pub fn get_opcode(&self) -> u32 {
        self.0.opcode.into()
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: u32 = 0;
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= flags.bits();
        self
    }

    /// Clear the submission event's [flags](Flags).
    #[inline]
    pub fn clear_flags(mut self) -> Entry128 {
        self.0 .0.flags = 0;
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
        self.0 .0.user_data = user_data;
        self
    }

    /// Set the user data without consuming the entry.
    #[inline]
    pub fn set_user_data(&mut self, user_data: u64) {
        self.0 .0.user_data = user_data;
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }

    /// Get the opcode associated with this entry.
    #[inline]
    pub fn get_opcode(&self) -> u32 {
        self.0 .0.opcode.into()
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}