rustix_uring/squeue.rs
//! Submission Queue

use core::error::Error;
use core::fmt::{self, Debug, Display, Formatter};
use core::mem;
use core::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
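///
/// # Example
///
/// A minimal sketch of inspecting the queue (not compiled as a doctest); it assumes an
/// `IoUring` handle exposing a `submission()` accessor, as in the io-uring crate API:
///
/// ```ignore
/// let mut ring = rustix_uring::IoUring::new(8).unwrap();
/// let sq = ring.submission();
/// assert!(sq.is_empty());
/// assert_eq!(sq.capacity(), 8);
/// ```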
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: sys::IoringSetupFlags;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
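///
/// # Example
///
/// A hypothetical sketch (not compiled as a doctest), assuming the `Nop` opcode builder is
/// available in [`opcode`](crate::opcode):
///
/// ```ignore
/// use rustix_uring::{opcode, squeue};
///
/// // Build a no-op SQE and ask for it to be issued asynchronously from the start.
/// let sqe: squeue::Entry = opcode::Nop::new().build().flags(squeue::Flags::ASYNC);
/// ```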
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(size_of::<Entry>(), 64);
    assert_eq!(size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = sys::IoringSqeFlags::FIXED_FILE.bits();

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = sys::IoringSqeFlags::IO_DRAIN.bits();

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = sys::IoringSqeFlags::IO_LINK.bits();

        /// Like [`IO_LINK`](Self::IO_LINK), but the link does not sever regardless of the
        /// completion result.
        const IO_HARDLINK = sys::IoringSqeFlags::IO_HARDLINK.bits();

        /// Normal operation for io_uring is to try to issue an SQE as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an SQE to be issued async from the start.
        const ASYNC = sys::IoringSqeFlags::ASYNC.bits();

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
        const BUFFER_SELECT = sys::IoringSqeFlags::BUFFER_SELECT.bits();

        /// Don't post a CQE if the request succeeded.
        const SKIP_SUCCESS = sys::IoringSqeFlags::CQE_SKIP_SUCCESS.bits();
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes = sqe_mmap.as_mut_ptr() as *mut E;

        // To keep it simple, identity-map the indirection array so that slot `i` of `array`
        // always refers to `sqes[i]`.
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// concurrently puts the device to sleep and no further progress is made.
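    ///
    /// # Example
    ///
    /// A hypothetical sketch of the intended calling pattern (not compiled as a doctest); it
    /// assumes a `submitter` handle whose `submit` call performs the `io_uring_enter` system
    /// call that wakes the kernel thread:
    ///
    /// ```ignore
    /// sq.sync(); // publish newly pushed entries first
    /// if sq.need_wakeup() {
    ///     // The SQPOLL kernel thread has gone to sleep; wake it with a submit call.
    ///     submitter.submit().unwrap();
    /// }
    /// ```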
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See the discussions that happened in [#197] and its linked threads in liburing. We need
        // to ensure that writes to the head have been visible _to the kernel_ if this load results
        // in a decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one observes the other's write. By establishing a
        // point of sequential consistency on both sides between their respective write and read,
        // at least one coherency order holds. With regards to the interpretation of the atomic
        // memory model of Rust (that is, that of C++20) we're assuming that an `smp_mb` provides
        // at least the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The effect of [`Self::need_wakeup`], after synchronization work performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient as it avoids performing one itself.
    ///
    /// Failure to uphold the precondition can result in an effective deadlock due to a sleeping
    /// device.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Relaxed),
            )
            .contains(sys::IoringSqFlags::NEED_WAKEUP)
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            sys::IoringSqFlags::from_bits_retain(
                (*self.queue.flags).load(atomic::Ordering::Acquire),
            )
            .contains(sys::IoringSqFlags::CQ_OVERFLOW)
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IoringSqFlags::TASKRUN.bits()
                != 0
        }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffer) are valid and will
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
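    ///
    /// # Example
    ///
    /// A hypothetical sketch of pushing a no-op entry (not compiled as a doctest); it assumes an
    /// `IoUring` handle with `submission()` and `submit_and_wait()` accessors and the `Nop`
    /// opcode builder:
    ///
    /// ```ignore
    /// let mut ring = rustix_uring::IoUring::new(8).unwrap();
    /// let nop = rustix_uring::opcode::Nop::new().build();
    ///
    /// // `Nop` references no buffers, so the safety requirement is trivially met. The new tail
    /// // is published when the temporary `SubmissionQueue` is dropped.
    /// unsafe {
    ///     ring.submission().push(&nop).expect("submission queue is full");
    /// }
    /// ring.submit_and_wait(1).unwrap();
    /// ```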
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffer) are valid and
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
    /// problems.
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
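    ///
    /// # Example
    ///
    /// A hypothetical sketch (not compiled as a doctest), assuming the `Nop` opcode builder; it
    /// links two entries so the second does not start until the first completes:
    ///
    /// ```ignore
    /// use rustix_uring::{opcode, squeue::Flags};
    ///
    /// let first = opcode::Nop::new().build().flags(Flags::IO_LINK);
    /// let second = opcode::Nop::new().build();
    /// // Push `first` and then `second` in this order to form the link.
    /// ```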
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
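    ///
    /// # Example
    ///
    /// A hypothetical sketch (not compiled as a doctest); it assumes the `Nop` opcode builder
    /// and that a `u64` converts into `sys::io_uring_user_data`:
    ///
    /// ```ignore
    /// let sqe = opcode::Nop::new().build().user_data(0x42_u64);
    /// // The matching completion queue entry will report the same value.
    /// ```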
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry {
        self.0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::empty();
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= sys::IoringSqeFlags::from_bits(flags.bits()).unwrap();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: impl Into<sys::io_uring_user_data>) -> Entry128 {
        self.0 .0.user_data = user_data.into();
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> sys::io_uring_user_data {
        self.0 .0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: sys::IoringSetupFlags = sys::IoringSetupFlags::SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}