// io_uring/squeue.rs
1//! Submission Queue
2
3use std::error::Error;
4use std::fmt::{self, Debug, Display, Formatter};
5use std::mem;
6use std::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
11use bitflags::bitflags;
12
/// Raw submission-queue state shared with the kernel.
///
/// All pointer fields point into the kernel-shared, mmap'ed ring memory and
/// stay valid only while the owning ring's mappings are alive.
pub(crate) struct Inner<E: EntryMarker> {
    // Ring head counter: advanced by the kernel as it consumes entries.
    pub(crate) head: *const atomic::AtomicU32,
    // Ring tail counter: advanced by userspace when submitting entries.
    pub(crate) tail: *const atomic::AtomicU32,
    // Mask applied to head/tail counters to obtain a ring index.
    pub(crate) ring_mask: u32,
    // Total number of SQE slots in the ring.
    pub(crate) ring_entries: u32,
    // Kernel-set status flags (e.g. IORING_SQ_NEED_WAKEUP, IORING_SQ_CQ_OVERFLOW).
    pub(crate) flags: *const atomic::AtomicU32,
    // Count of invalid SQEs encountered by the kernel (see `dropped()`).
    dropped: *const atomic::AtomicU32,

    // The SQE array itself (lives in a separate mmap from the ring header).
    pub(crate) sqes: *mut E,
}
23
/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    // Local snapshot of the kernel-visible head, taken in `Inner::borrow_shared`.
    head: u32,
    // Local tail; published to the kernel in `sync` and on drop.
    tail: u32,
    queue: &'a Inner<E>,
}
30
/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    /// Extra setup flags the ring must be created with for this entry size
    /// (e.g. `IORING_SETUP_SQE128` for [`Entry128`]; zero for [`Entry`]).
    const BUILD_FLAGS: u32;
}
37
/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
///
/// # Example
///
/// ```
/// use io_uring::{opcode, types};
/// use std::ffi::CString;
///
/// let path = CString::new("/etc/passwd").unwrap();
///
/// // Create an Entry to open /etc/passwd for reading
/// let entry = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
///     .flags(libc::O_RDONLY)
///     .build()
///     .user_data(0x42);
/// ```
// Thin wrapper over the raw kernel SQE struct; its 64-byte size is checked
// by `test_entry_sizes`.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);
58
/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode), or by converting
/// from an [`Entry`] using the [`From`] trait.
///
/// # Example
///
/// ```
/// use io_uring::{opcode, squeue::Entry128, types};
/// use std::ffi::CString;
///
/// let path = CString::new("/etc/passwd").unwrap();
///
/// // Create an Entry128 to open /etc/passwd for reading
/// let entry = Entry128::from(
///     opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
///         .flags(libc::O_RDONLY)
///         .build()
///         .user_data(0x42)
/// );
/// ```
// A regular 64-byte `Entry` followed by 64 bytes of extra payload/padding;
// the 128-byte size is checked by `test_entry_sizes`.
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);
83
#[test]
fn test_entry_sizes() {
    // The kernel ABI fixes SQE sizes: 64 bytes for plain entries, 128 bytes
    // for rings created with IORING_SETUP_SQE128.
    let (small, large) = (mem::size_of::<Entry>(), mem::size_of::<Entry128>());
    assert_eq!(small, 64);
    assert_eq!(large, 128);
}
89
bitflags! {
    /// Submission flags
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but the link doesn't sever regardless of the
        /// completion result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Normal operation for io_uring is to try and issue an sqe as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an sqe to be issued async from the start.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post a CQE if the request completes successfully.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}
150
impl<E: EntryMarker> Inner<E> {
    /// Build the submission-queue view from the ring's memory mappings.
    ///
    /// # Safety
    ///
    /// `sq_mmap` and `sqe_mmap` must be the submission-ring and SQE mappings
    /// of the ring described by `p`, and must outlive the returned value,
    /// since the stored pointers refer into them.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;

        // Initialize the SQ array with an identity mapping unless NO_SQARRAY is set, in which case
        // the kernel consumes SQEs directly by ring index and no array exists.
        if p.flags & sys::IORING_SETUP_NO_SQARRAY == 0 {
            let array = sq_mmap.offset(p.sq_off.array) as *mut u32;
            for i in 0..ring_entries {
                array.add(i as usize).write_volatile(i);
            }
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    /// Borrow the queue without exclusive access.
    ///
    /// # Safety
    ///
    /// The caller must ensure no other `SubmissionQueue` borrowed from this
    /// ring is used concurrently — the tail snapshot uses `unsync_load`.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    /// Borrow the queue; `&mut self` statically guarantees exclusivity.
    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
200
impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Publish the local tail so the kernel sees newly pushed SQEs,
            // then refresh the local head to observe kernel consumption.
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
    ///
    /// A result of `false` is only meaningful if the function was called after the latest update
    /// to the queue head. Other interpretations could lead to a race condition where the kernel
    /// concurrently put the device to sleep and no further progress is made.
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        // See discussions that happened in [#197] and its linked threads in liburing. We need to
        // ensure that writes to the head have been visible _to the kernel_ if this load results in
        // decision to sleep. This is solved with a SeqCst fence. There is no common modified
        // memory location that would provide alternative synchronization.
        //
        // The kernel, from its sequencing, first writes the wake flag, then performs a full
        // barrier (`smp_mb`, or `smp_mb__after_atomic`), then reads the head. We assume that our
        // user first writes the head and then reads the `need_wakeup` flag as documented. It is
        // necessary to ensure that at least one observes the other write. By establishing a point
        // of sequential consistency on both sides between their respective write and read, at
        // least one coherency order holds. With regards to the interpretation of the atomic memory
        // model of Rust (that is, that of C++20) we're assuming that an `smp_mb` provides at least
        // the effect of a `fence(SeqCst)`.
        //
        // [#197]: https://github.com/tokio-rs/io-uring/issues/197
        atomic::fence(atomic::Ordering::SeqCst);
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The effect of [`Self::need_wakeup`], after synchronization work performed by the caller.
    ///
    /// This function should only be called if the caller can guarantee that a `SeqCst` fence has
    /// been inserted after the last write to the queue's head. The function is then a little more
    /// efficient by avoiding to perform one itself.
    ///
    /// Failure to uphold the precondition can result in an effective dead-lock due to a sleeping
    /// device.
    #[inline]
    pub fn need_wakeup_after_intermittent_seqcst(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Relaxed) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring is overflown.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        // head/tail are free-running counters; wrapping subtraction gives the
        // pending-entry count correctly even across u32 wraparound.
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffer) are valid and will
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffer) are valid and
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
    /// problems.
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        // All-or-nothing: fail without pushing anything unless every entry fits.
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    // Write `entry` into the slot at the current tail and advance the local
    // tail; the new tail only becomes kernel-visible via `sync` or drop.
    //
    // SAFETY: the caller must ensure the queue is not full, so that the
    // masked index refers to a free slot.
    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}
350
351impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
352 #[inline]
353 fn drop(&mut self) {
354 unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
355 }
356}
357
358impl Entry {
359 /// Set the submission event's [flags](Flags).
360 #[inline]
361 pub fn flags(mut self, flags: Flags) -> Entry {
362 self.0.flags |= flags.bits();
363 self
364 }
365
366 /// Clear the submission event's [flags](Flags).
367 #[inline]
368 pub fn clear_flags(mut self) -> Entry {
369 self.0.flags = 0;
370 self
371 }
372
373 /// Set the user data. This is an application-supplied value that will be passed straight
374 /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
375 #[inline]
376 pub fn user_data(mut self, user_data: u64) -> Entry {
377 self.0.user_data = user_data;
378 self
379 }
380
381 /// Set the user_data without consuming the entry.
382 #[inline]
383 pub fn set_user_data(&mut self, user_data: u64) {
384 self.0.user_data = user_data;
385 }
386
387 /// Get the previously application-supplied user data.
388 #[inline]
389 pub fn get_user_data(&self) -> u64 {
390 self.0.user_data
391 }
392
393 /// Get the opcode associated with this entry.
394 #[inline]
395 pub fn get_opcode(&self) -> u32 {
396 self.0.opcode.into()
397 }
398
399 /// Set the personality of this event. You can obtain a personality using
400 /// [`Submitter::register_personality`](crate::Submitter::register_personality).
401 pub fn personality(mut self, personality: u16) -> Entry {
402 self.0.personality = personality;
403 self
404 }
405}
406
// Seal `Entry` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry {}
408
impl EntryMarker for Entry {
    // Plain 64-byte SQEs need no extra io_uring setup flags.
    const BUILD_FLAGS: u32 = 0;
}
412
impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        // SAFETY: `transmute_copy` makes a bitwise copy of the C struct, which
        // is what a derived `Clone` would do for this plain-data type.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
420
421impl Debug for Entry {
422 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
423 f.debug_struct("Entry")
424 .field("op_code", &self.0.opcode)
425 .field("flags", &self.0.flags)
426 .field("user_data", &self.0.user_data)
427 .finish()
428 }
429}
430
431impl Entry128 {
432 /// Set the submission event's [flags](Flags).
433 #[inline]
434 pub fn flags(mut self, flags: Flags) -> Entry128 {
435 self.0 .0.flags |= flags.bits();
436 self
437 }
438
439 /// Clear the submission event's [flags](Flags).
440 #[inline]
441 pub fn clear_flags(mut self) -> Entry128 {
442 self.0 .0.flags = 0;
443 self
444 }
445
446 /// Set the user data. This is an application-supplied value that will be passed straight
447 /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
448 #[inline]
449 pub fn user_data(mut self, user_data: u64) -> Entry128 {
450 self.0 .0.user_data = user_data;
451 self
452 }
453
454 /// Set the user data without consuming the entry.
455 #[inline]
456 pub fn set_user_data(&mut self, user_data: u64) {
457 self.0 .0.user_data = user_data;
458 }
459
460 /// Set the personality of this event. You can obtain a personality using
461 /// [`Submitter::register_personality`](crate::Submitter::register_personality).
462 #[inline]
463 pub fn personality(mut self, personality: u16) -> Entry128 {
464 self.0 .0.personality = personality;
465 self
466 }
467
468 /// Get the opcode associated with this entry.
469 #[inline]
470 pub fn get_opcode(&self) -> u32 {
471 self.0 .0.opcode.into()
472 }
473}
474
// Seal `Entry128` so `EntryMarker` cannot be implemented outside this crate.
impl private::Sealed for Entry128 {}
476
impl EntryMarker for Entry128 {
    // 128-byte SQEs require the ring to be created with IORING_SETUP_SQE128.
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}
480
481impl From<Entry> for Entry128 {
482 fn from(entry: Entry) -> Entry128 {
483 Entry128(entry, [0u8; 64])
484 }
485}
486
487impl Debug for Entry128 {
488 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
489 f.debug_struct("Entry128")
490 .field("op_code", &self.0 .0.opcode)
491 .field("flags", &self.0 .0.flags)
492 .field("user_data", &self.0 .0.user_data)
493 .finish()
494 }
495}
496
/// An error pushing to the submission queue due to it being full.
// `#[non_exhaustive]` keeps room to add failure details later without a
// breaking change.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;
501
502impl Display for PushError {
503 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
504 f.write_str("submission queue is full")
505 }
506}
507
// No underlying source error to expose; the default `Error` methods suffice.
impl Error for PushError {}
509
510impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
511 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
512 let mut d = f.debug_list();
513 let mut pos = self.head;
514 while pos != self.tail {
515 let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
516 d.entry(&entry);
517 pos = pos.wrapping_add(1);
518 }
519 d.finish()
520 }
521}