// io_uring/squeue.rs
1//! Submission Queue
2
3use std::error::Error;
4use std::fmt::{self, Debug, Display, Formatter};
5use std::mem;
6use std::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12
/// Raw shared state of the submission queue.
///
/// Every pointer below refers into the kernel-shared ring mappings, so this
/// struct is only valid while those mmaps stay alive.
pub(crate) struct Inner<E: EntryMarker> {
    // Consumer index, advanced by the kernel as it takes SQEs.
    pub(crate) head: *const atomic::AtomicU32,
    // Producer index, advanced by userspace when pushing SQEs.
    pub(crate) tail: *const atomic::AtomicU32,
    // Mask applied (`index & ring_mask`) to turn head/tail into a slot index.
    pub(crate) ring_mask: u32,
    // Total number of SQE slots in the ring.
    pub(crate) ring_entries: u32,
    // Kernel-maintained status flags (e.g. NEED_WAKEUP, CQ_OVERFLOW).
    pub(crate) flags: *const atomic::AtomicU32,
    // Kernel-maintained count of invalid SQEs it has skipped.
    dropped: *const atomic::AtomicU32,

    // Base of the SQE array mapping; indexed by the masked ring position.
    pub(crate) sqes: *mut E,
}
23
/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    // Local snapshot of the kernel's head; refreshed by `sync()`.
    head: u32,
    // Local tail; pushes update this copy, published on `sync()`/`Drop`.
    tail: u32,
    queue: &'a Inner<E>,
}
30
/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    /// Setup flags the ring must be created with for this entry size
    /// (e.g. `IORING_SETUP_SQE128` for [`Entry128`]; `0` for [`Entry`]).
    const BUILD_FLAGS: u32;
}
37
/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
///
/// # Example
///
/// ```
/// use io_uring::{opcode, types};
/// use std::ffi::CString;
///
/// let path = CString::new("/etc/passwd").unwrap();
///
/// // Create an Entry to open /etc/passwd for reading
/// let entry = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
///     .flags(libc::O_RDONLY)
///     .build()
///     .user_data(0x42);
/// ```
// repr(C) so the layout matches the kernel's `struct io_uring_sqe` exactly.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);
58
/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode), or by converting
/// from an [`Entry`] using the [`From`] trait.
///
/// # Example
///
/// ```
/// use io_uring::{opcode, squeue::Entry128, types};
/// use std::ffi::CString;
///
/// let path = CString::new("/etc/passwd").unwrap();
///
/// // Create an Entry128 to open /etc/passwd for reading
/// let entry = Entry128::from(
///     opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
///         .flags(libc::O_RDONLY)
///         .build()
///         .user_data(0x42)
/// );
/// ```
// A plain 64-byte `Entry` followed by 64 extra bytes of command payload,
// giving the 128-byte layout used with `IORING_SETUP_SQE128` rings.
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);
83
// Layout sanity check: the kernel ABI fixes SQEs at exactly 64 bytes
// (128 bytes for the extended format), so these sizes must never drift.
#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}
89
bitflags! {
    /// Submission flags
    ///
    /// Each constant maps one-to-one onto a kernel `IOSQE_*` bit and is set
    /// on an SQE via [`Entry::flags`]/[`Entry128::flags`].
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but it doesn’t sever regardless of the completion
        /// result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Normal operation for io_uring is to try and issue an sqe as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an sqe to be issued async from the start.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post CQE if request succeeded.
        ///
        /// NOTE(review): failed requests presumably still post a CQE — confirm
        /// against the `io_uring_enter(2)` description of `IOSQE_CQE_SKIP_SUCCESS`.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}
150
impl<E: EntryMarker> Inner<E> {
    /// Builds the submission-queue state from the kernel-reported ring offsets.
    ///
    /// # Safety
    ///
    /// `sq_mmap` and `sqe_mmap` must be the live submission-ring and SQE
    /// mappings for the ring described by `p`, and must outlive the returned
    /// value; all stored pointers dangle otherwise.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        // Mask and entry count are constants for the ring's lifetime, so they
        // are read once here rather than re-read through pointers later.
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;

        // Initialize the SQ array with an identity mapping unless NO_SQARRAY is set, in which case
        // the kernel consumes SQEs directly by ring index and no array exists.
        if p.flags & sys::IORING_SETUP_NO_SQARRAY == 0 {
            let array = sq_mmap.offset(p.sq_off.array) as *mut u32;
            for i in 0..ring_entries {
                array.add(i as usize).write_volatile(i);
            }
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    /// Creates a queue view from a shared reference.
    ///
    /// # Safety
    ///
    /// The tail is read with a plain unsynchronized load, so the caller must
    /// guarantee no other view is concurrently pushing entries.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    /// Creates a queue view; `&mut self` statically guarantees exclusivity.
    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
200
impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Release publishes the SQE writes before the new tail becomes
            // visible; Acquire pairs with the kernel's head update.
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// concurrently put the device to sleep and no further progress is made.
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See discussions that happened in [#197] and its linked threads in liburing. We need to
        // ensure that writes to the head have been visible _to the kernel_ if this load results in
        // decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one observes the other write. By establishing a point
        // of sequential consistency on both sides between their respective write and read, at
        // least one coherency order holds. With regards to the interpretation of the atomic memory
        // model of Rust (that is, that of C++20) we're assuming that an `smp_mb` provides at least
        // the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The effect of [`Self::need_wakeup`], after synchronization work performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient by avoiding to perform one itself.
    ///
    /// Failure to uphold the precondition can result in an effective dead-lock due to a sleeping
    /// device.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring is overflown.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        // Wrapping subtraction handles the indices crossing u32::MAX.
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffer) are valid and will
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffer) are valid and
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
    /// problems.
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        // All-or-nothing: fail without writing anything if they don't all fit.
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    // Writes `entry` into the slot at the current tail and advances the local
    // tail. The write is published to the kernel only on `sync()`/`Drop`.
    //
    // SAFETY (caller): the queue must not be full, so the masked slot is not
    // currently owned by the kernel.
    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}
350
351impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
352    #[inline]
353    fn drop(&mut self) {
354        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
355    }
356}
357
358impl Entry {
359    /// Set the submission event's [flags](Flags).
360    #[inline]
361    pub fn flags(mut self, flags: Flags) -> Entry {
362        self.0.flags |= flags.bits();
363        self
364    }
365
366    /// Clear the submission event's [flags](Flags).
367    #[inline]
368    pub fn clear_flags(mut self) -> Entry {
369        self.0.flags = 0;
370        self
371    }
372
373    /// Set the user data. This is an application-supplied value that will be passed straight
374    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
375    #[inline]
376    pub fn user_data(mut self, user_data: u64) -> Entry {
377        self.0.user_data = user_data;
378        self
379    }
380
381    /// Set the user_data without consuming the entry.
382    #[inline]
383    pub fn set_user_data(&mut self, user_data: u64) {
384        self.0.user_data = user_data;
385    }
386
387    /// Get the previously application-supplied user data.
388    #[inline]
389    pub fn get_user_data(&self) -> u64 {
390        self.0.user_data
391    }
392
393    /// Get the opcode associated with this entry.
394    #[inline]
395    pub fn get_opcode(&self) -> u32 {
396        self.0.opcode.into()
397    }
398
399    /// Set the personality of this event. You can obtain a personality using
400    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
401    pub fn personality(mut self, personality: u16) -> Entry {
402        self.0.personality = personality;
403        self
404    }
405}
406
407impl private::Sealed for Entry {}
408
impl EntryMarker for Entry {
    // Plain 64-byte SQEs need no extra io_uring setup flags.
    const BUILD_FLAGS: u32 = 0;
}
412
impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        // SAFETY: `transmute_copy` performs a bytewise copy between two values
        // of the same type; the FFI struct is plain old data, so the copy is a
        // valid, independent SQE.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
420
421impl Debug for Entry {
422    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
423        f.debug_struct("Entry")
424            .field("op_code", &self.0.opcode)
425            .field("flags", &self.0.flags)
426            .field("user_data", &self.0.user_data)
427            .finish()
428    }
429}
430
431impl Entry128 {
432    /// Set the submission event's [flags](Flags).
433    #[inline]
434    pub fn flags(mut self, flags: Flags) -> Entry128 {
435        self.0 .0.flags |= flags.bits();
436        self
437    }
438
439    /// Clear the submission event's [flags](Flags).
440    #[inline]
441    pub fn clear_flags(mut self) -> Entry128 {
442        self.0 .0.flags = 0;
443        self
444    }
445
446    /// Set the user data. This is an application-supplied value that will be passed straight
447    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
448    #[inline]
449    pub fn user_data(mut self, user_data: u64) -> Entry128 {
450        self.0 .0.user_data = user_data;
451        self
452    }
453
454    /// Set the user data without consuming the entry.
455    #[inline]
456    pub fn set_user_data(&mut self, user_data: u64) {
457        self.0 .0.user_data = user_data;
458    }
459
460    /// Set the personality of this event. You can obtain a personality using
461    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
462    #[inline]
463    pub fn personality(mut self, personality: u16) -> Entry128 {
464        self.0 .0.personality = personality;
465        self
466    }
467
468    /// Get the opcode associated with this entry.
469    #[inline]
470    pub fn get_opcode(&self) -> u32 {
471        self.0 .0.opcode.into()
472    }
473}
474
475impl private::Sealed for Entry128 {}
476
impl EntryMarker for Entry128 {
    // 128-byte SQEs require the ring to be created with `IORING_SETUP_SQE128`.
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}
480
481impl From<Entry> for Entry128 {
482    fn from(entry: Entry) -> Entry128 {
483        Entry128(entry, [0u8; 64])
484    }
485}
486
487impl Debug for Entry128 {
488    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
489        f.debug_struct("Entry128")
490            .field("op_code", &self.0 .0.opcode)
491            .field("flags", &self.0 .0.flags)
492            .field("user_data", &self.0 .0.user_data)
493            .finish()
494    }
495}
496
/// An error pushing to the submission queue due to it being full.
///
/// Returned by [`SubmissionQueue::push`] and [`SubmissionQueue::push_multiple`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;
501
502impl Display for PushError {
503    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
504        f.write_str("submission queue is full")
505    }
506}
507
508impl Error for PushError {}
509
impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    /// Formats the queue as a list of the entries between the cached head and
    /// tail (i.e. pushed but, as far as this view knows, not yet consumed).
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            // SAFETY: masking keeps the index within the ring, and slots in
            // head..tail were fully written before the tail was advanced.
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}