rustix_uring/squeue.rs
//! Submission Queue

use core::fmt::{self, Debug, Display, Formatter};
use core::mem;
use core::sync::atomic;
#[cfg(feature = "std")]
use std::error::Error;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
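///
/// A minimal usage sketch, assuming this crate keeps the io-uring-style `IoUring` handle and
/// `opcode::Nop` builder at the crate root (the setup details here are illustrative):
///
/// ```no_run
/// use rustix_uring::{opcode, IoUring};
///
/// let mut ring = IoUring::new(8).unwrap();
///
/// // Queue a no-op request. `push` is unsafe because real requests must keep any
/// // referenced buffers and file descriptors valid until the operation completes.
/// unsafe {
///     ring.submission()
///         .push(&opcode::Nop::new().build())
///         .expect("submission queue is full");
/// }
///
/// ring.submit_and_wait(1).unwrap();
/// let _cqe = ring.completion().next().expect("completion queue is empty");
/// ```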
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = sys::IoringSqeFlags::FIXED_FILE.bits();

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = sys::IoringSqeFlags::IO_DRAIN.bits();

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
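        ///
        /// A minimal sketch, assuming this crate provides an `opcode::Nop` builder; the flag on
        /// the first entry links it to the entry pushed immediately after it:
        ///
        /// ```no_run
        /// use rustix_uring::{opcode, squeue::Flags, IoUring};
        ///
        /// let mut ring = IoUring::new(8).unwrap();
        /// let first = opcode::Nop::new().build().flags(Flags::IO_LINK);
        /// let second = opcode::Nop::new().build();
        ///
        /// unsafe {
        ///     let mut sq = ring.submission();
        ///     sq.push(&first).unwrap();
        ///     sq.push(&second).unwrap();
        /// }
        /// ```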
        const IO_LINK = sys::IoringSqeFlags::IO_LINK.bits();

        /// Like [`IO_LINK`](Self::IO_LINK), but the link is not severed regardless of the
        /// completion result.
        const IO_HARDLINK = sys::IoringSqeFlags::IO_HARDLINK.bits();

        /// Normal operation for io_uring is to try to issue an SQE as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an SQE to be issued async from the start.
        const ASYNC = sys::IoringSqeFlags::ASYNC.bits();

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
        const BUFFER_SELECT = sys::IoringSqeFlags::BUFFER_SELECT.bits();

        /// Don't post CQE if request succeeded.
        const SKIP_SUCCESS = sys::IoringSqeFlags::CQE_SKIP_SUCCESS.bits();
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes = sqe_mmap.as_mut_ptr() as *mut E;

        // To keep it simple, map it directly to `sqes`.
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
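    ///
    /// A minimal sketch of the intended call pattern, assuming this crate's io-uring-style
    /// `IoUring` handle and `opcode::Nop` builder:
    ///
    /// ```no_run
    /// use rustix_uring::{opcode, IoUring};
    ///
    /// let mut ring = IoUring::new(8).unwrap();
    /// let mut sq = ring.submission();
    ///
    /// unsafe {
    ///     sq.push(&opcode::Nop::new().build()).expect("submission queue is full");
    /// }
    /// // Publish the pushed entry to the kernel and refresh our view of the head.
    /// sq.sync();
    /// ```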
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// concurrently puts the device to sleep and no further progress is made.
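    ///
    /// A hedged sketch of the documented pattern, assuming this crate's builder exposes the
    /// usual `setup_sqpoll` option and that `submit` is the wake-up system call:
    ///
    /// ```no_run
    /// use rustix_uring::{opcode, IoUring};
    ///
    /// let mut ring = IoUring::builder().setup_sqpoll(1000).build(8).unwrap();
    ///
    /// let mut sq = ring.submission();
    /// unsafe {
    ///     sq.push(&opcode::Nop::new().build()).expect("submission queue is full");
    /// }
    /// sq.sync(); // write the queue update before reading the wakeup flag
    ///
    /// let must_enter = sq.need_wakeup();
    /// drop(sq);
    /// if must_enter {
    ///     // The kernel thread went to sleep; enter the kernel to wake it up.
    ///     ring.submit().unwrap();
    /// }
    /// ```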
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See discussions that happened in [#197] and its linked threads in liburing. We need to
        // ensure that writes to the head have been visible _to the kernel_ if this load results in
        // a decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one observes the other's write. By establishing a
        // point of sequential consistency on both sides between their respective write and read,
        // at least one coherency order holds. With regard to the interpretation of the atomic
        // memory model of Rust (that is, that of C++20), we're assuming that an `smp_mb` provides
        // at least the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The effect of [`Self::need_wakeup`], after synchronization work performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient as it avoids performing one itself.
    ///
    /// Failure to uphold the precondition can result in an effective deadlock due to a sleeping
    /// device.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringSqFlags::CQ_OVERFLOW)
        }
    }

    /// Returns `true` if completions are pending and should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since Linux kernel 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IoringSqFlags::TASKRUN.bits()
                != 0
        }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffers) are valid and will
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
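    ///
    /// A minimal sketch, assuming this crate's `opcode::Nop` builder; a no-op entry references
    /// no external memory, so the safety contract is trivially upheld:
    ///
    /// ```no_run
    /// use rustix_uring::{opcode, IoUring};
    ///
    /// let mut ring = IoUring::new(8).unwrap();
    /// let entry = opcode::Nop::new().build();
    ///
    /// let mut sq = ring.submission();
    /// // SAFETY: the no-op entry holds no buffers or file descriptors.
    /// unsafe {
    ///     sq.push(&entry).expect("submission queue is full");
    /// }
    /// sq.sync();
    /// ```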
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffers) are valid and
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
    /// problems.
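    ///
    /// A minimal sketch, assuming this crate's `opcode::Nop` builder; the push is all-or-nothing,
    /// so either every entry is queued or none of them is:
    ///
    /// ```no_run
    /// use rustix_uring::{opcode, IoUring};
    ///
    /// let mut ring = IoUring::new(8).unwrap();
    /// let entries = [opcode::Nop::new().build(), opcode::Nop::new().build()];
    ///
    /// // SAFETY: no-op entries reference no external memory.
    /// unsafe {
    ///     ring.submission()
    ///         .push_multiple(&entries)
    ///         .expect("not enough room in the submission queue");
    /// }
    /// ```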
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry {
        self.0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry128 {
        self.0 .0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

#[cfg(feature = "std")]
impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E =
                unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}
469}