linux_io_uring/
squeue.rs

1//! Submission Queue
2
3use std::sync::atomic;
4use bitflags::bitflags;
5use linux_io_uring_sys as sys;
6use crate::util::{ Mmap, unsync_load };
7use crate::mmap_offset;
8
9
10pub struct SubmissionQueue {
11    pub(crate) head: *const atomic::AtomicU32,
12    pub(crate) tail: *const atomic::AtomicU32,
13    pub(crate) ring_mask: *const u32,
14    pub(crate) ring_entries: *const u32,
15    pub(crate) flags: *const atomic::AtomicU32,
16    dropped: *const atomic::AtomicU32,
17
18    #[allow(dead_code)]
19    array: *mut u32,
20
21    pub(crate) sqes: *mut sys::io_uring_sqe
22}
23
24pub struct AvailableQueue<'a> {
25    head: u32,
26    tail: u32,
27    ring_mask: u32,
28    ring_entries: u32,
29    queue: &'a mut SubmissionQueue
30}
31
32/// Submission Entry
33#[repr(transparent)]
34#[derive(Clone)]
35pub struct Entry(pub(crate) sys::io_uring_sqe);
36
37bitflags!{
38    /// Submission flags
39    pub struct Flags: u8 {
40        /// When this flag is specified,
41        /// `fd` is an index into the files array registered with the io_uring instance.
42        #[doc(hidden)]
43        const FIXED_FILE = sys::IOSQE_FIXED_FILE as _;
44
45        /// When this flag is specified,
46        /// the SQE will not be started before previously submitted SQEs have completed,
47        /// and new SQEs will not be started before this one completes.
48        const IO_DRAIN = sys::IOSQE_IO_DRAIN as _;
49
50        /// When this flag is specified,
51        /// it forms a link with the next SQE in the submission ring.
52        /// That next SQE will not be started before this one completes.
53        const IO_LINK = sys::IOSQE_IO_LINK as _;
54
55        #[cfg(feature = "unstable")]
56        const IO_HARDLINK = sys::IOSQE_IO_HARDLINK as _;
57    }
58}
59
60impl SubmissionQueue {
61    pub(crate) unsafe fn new(sq_mmap: &Mmap, sqe_mmap: &Mmap, p: &sys::io_uring_params) -> SubmissionQueue {
62        mmap_offset!{
63            let head            = sq_mmap + p.sq_off.head           => *const atomic::AtomicU32;
64            let tail            = sq_mmap + p.sq_off.tail           => *const atomic::AtomicU32;
65            let ring_mask       = sq_mmap + p.sq_off.ring_mask      => *const u32;
66            let ring_entries    = sq_mmap + p.sq_off.ring_entries   => *const u32;
67            let flags           = sq_mmap + p.sq_off.flags          => *const atomic::AtomicU32;
68            let dropped         = sq_mmap + p.sq_off.dropped        => *const atomic::AtomicU32;
69            let array           = sq_mmap + p.sq_off.array          => *mut u32;
70
71            let sqes            = sqe_mmap + 0                      => *mut sys::io_uring_sqe;
72        }
73
74        // To keep it simple, map it directly to `sqes`.
75        for i in 0..*ring_entries {
76            *array.add(i as usize) = i;
77        }
78
79        SubmissionQueue {
80            head, tail,
81            ring_mask, ring_entries,
82            flags, dropped,
83            array,
84            sqes
85        }
86    }
87
88    pub fn need_wakeup(&self) -> bool {
89        unsafe {
90            (*self.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP
91                != 0
92        }
93    }
94
95    pub fn dropped(&self) -> u32 {
96        unsafe {
97            (*self.dropped).load(atomic::Ordering::Acquire)
98        }
99    }
100
101    pub fn capacity(&self) -> usize {
102        unsafe {
103            self.ring_entries.read_volatile() as usize
104        }
105    }
106
107    pub fn len(&self) -> usize {
108        let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
109        let tail = unsafe { unsync_load(self.tail) };
110
111        tail.wrapping_sub(head) as usize
112    }
113
114    pub fn is_empty(&self) -> bool {
115        let head = unsafe { (*self.head).load(atomic::Ordering::Acquire) };
116        let tail = unsafe { unsync_load(self.tail) };
117
118        head == tail
119    }
120
121    pub fn is_full(&self) -> bool {
122        self.len() == self.capacity()
123    }
124
125    /// Get currently available submission queue
126    pub fn available(&mut self) -> AvailableQueue<'_> {
127        unsafe {
128            AvailableQueue {
129                head: (*self.head).load(atomic::Ordering::Acquire),
130                tail: unsync_load(self.tail),
131                ring_mask: self.ring_mask.read_volatile(),
132                ring_entries: self.ring_entries.read_volatile(),
133                queue: self
134            }
135        }
136    }
137}
138
139impl AvailableQueue<'_> {
140    /// Sync queue
141    pub fn sync(&mut self) {
142        unsafe {
143            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
144            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
145        }
146    }
147
148    pub fn capacity(&self) -> usize {
149        self.ring_entries as usize
150    }
151
152    pub fn len(&self) -> usize {
153        self.tail.wrapping_sub(self.head) as usize
154    }
155
156    pub fn is_empty(&self) -> bool {
157        self.head == self.tail
158    }
159
160    pub fn is_full(&self) -> bool {
161        self.tail.wrapping_sub(self.head) == self.ring_entries
162    }
163
164    /// Attempts to push an [Entry] into the queue.
165    /// If the queue is full, the element is returned back as an error.
166    ///
167    /// # Safety
168    ///
169    /// Developers must ensure that parameters of the [Entry] (such as buffer) are valid,
170    /// otherwise it may cause memory problems.
171    pub unsafe fn push(&mut self, Entry(entry): Entry) -> Result<(), Entry> {
172        if self.len() < self.capacity() {
173            *self.queue.sqes.add((self.tail & self.ring_mask) as usize)
174                = entry;
175            self.tail = self.tail.wrapping_add(1);
176            Ok(())
177        } else {
178            Err(Entry(entry))
179        }
180    }
181}
182
183impl Drop for AvailableQueue<'_> {
184    fn drop(&mut self) {
185        unsafe {
186            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
187        }
188    }
189}
190
191impl Entry {
192    /// Set [Submission flags](Flags)
193    pub fn flags(mut self, flags: Flags) -> Entry {
194        self.0.flags |= flags.bits();
195        self
196    }
197
198    /// The `user_data` is an application-supplied value that will be copied into the completion queue
199    /// entry (see below).
200    pub fn user_data(mut self, user_data: u64) -> Entry {
201        self.0.user_data = user_data;
202        self
203    }
204}