agave_io_uring/ring.rs
use {
    crate::slab::FixedSlab,
    io_uring::{
        cqueue, squeue,
        types::{SubmitArgs, Timespec},
        IoUring,
    },
    smallvec::{smallvec, SmallVec},
    std::{io, os::fd::RawFd, time::Duration},
};

/// An io_uring instance.
pub struct Ring<T, E: RingOp<T>> {
    ring: IoUring,
    entries: FixedSlab<E>,
    context: T,
}

impl<T, E: RingOp<T>> Ring<T, E> {
    /// Creates a new ring with the provided io_uring instance and context.
    ///
    /// The context `T` is a user-defined value that is passed to entries `E` once they
    /// complete. This value can be used to update state or perform additional actions as
    /// operations complete asynchronously.
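    ///
    /// A minimal sketch (`MyOp` is a hypothetical type implementing [RingOp]; see the trait
    /// docs for what such an op might look like):
    ///
    /// ```ignore
    /// let io_uring = IoUring::new(256)?;
    /// // use a counter of completed ops as the context
    /// let mut ring: Ring<usize, MyOp> = Ring::new(io_uring, 0);
    /// assert_eq!(*ring.context(), 0);
    /// ```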
    pub fn new(ring: IoUring, ctx: T) -> Self {
        Self {
            entries: FixedSlab::with_capacity(ring.params().cq_entries() as usize),
            ring,
            context: ctx,
        }
    }

    /// Returns a reference to the context value.
    pub fn context(&self) -> &T {
        &self.context
    }

    /// Returns a mutable reference to the context value.
    pub fn context_mut(&mut self) -> &mut T {
        &mut self.context
    }

    /// Registers in-memory fixed buffers for I/O with the kernel.
    ///
    /// # Safety
    ///
    /// Callers must ensure that the `iov_base` and `iov_len` values are valid, and remain
    /// valid until the buffers are unregistered or the ring is destroyed; otherwise undefined
    /// behaviour may occur.
    ///
    /// See
    /// [Submitter::register_buffers](https://docs.rs/io-uring/0.6.3/io_uring/struct.Submitter.html#method.register_buffers).
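    ///
    /// A minimal sketch (buffer size and lifetime management are illustrative only):
    ///
    /// ```ignore
    /// let mut buf = vec![0u8; 4096];
    /// let iovec = libc::iovec {
    ///     iov_base: buf.as_mut_ptr().cast(),
    ///     iov_len: buf.len(),
    /// };
    /// // Safety: `buf` is neither moved nor dropped while registered.
    /// unsafe { ring.register_buffers(&[iovec])? };
    /// ```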
    pub unsafe fn register_buffers(&self, iovecs: &[libc::iovec]) -> io::Result<()> {
        self.ring.submitter().register_buffers(iovecs)
    }

    /// Registers file descriptors as fixed for I/O with the kernel.
    ///
    /// Operations may then use `types::Fixed(index)`, where `index` is a position in `fds`,
    /// to refer to the corresponding registered file descriptor.
    ///
    /// `-1` values can be used as placeholder slots for kernel-managed fixed file descriptors
    /// (created by an open operation).
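    ///
    /// A minimal sketch (assuming `file` is an open `std::fs::File`):
    ///
    /// ```ignore
    /// use std::os::fd::AsRawFd;
    ///
    /// // register one real descriptor and leave one slot for the kernel
    /// ring.register_files(&[file.as_raw_fd(), -1])?;
    /// // operations can now refer to `file` as `types::Fixed(0)`
    /// ```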
    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
        self.ring.submitter().register_files(fds)
    }

    /// Pushes an operation to the submission queue.
    ///
    /// Once completed, [RingOp::complete] will be called with the result.
    ///
    /// Note that the operation is not submitted to the kernel until [Ring::submit] is called.
    /// If the internal table of in-flight operations or the submission queue is full, pending
    /// operations are submitted (and completions processed) internally to make room for the
    /// new operation.
    ///
    /// See also [Ring::submit].
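    ///
    /// A minimal sketch (`MyOp` is hypothetical):
    ///
    /// ```ignore
    /// ring.push(MyOp::new())?;
    /// ring.push(MyOp::new())?;
    /// // nothing has necessarily reached the kernel yet; flush the queue
    /// ring.submit()?;
    /// ```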
    pub fn push(&mut self, op: E) -> io::Result<()> {
        loop {
            self.process_completions()?;

            if !self.entries.is_full() {
                break;
            }
            // if the entries slab is full, we need to submit and poll
            // completions to make room
            self.submit_and_wait(1, None)?;
        }
        let key = self.entries.insert(op);
        let entry = self.entries.get_mut(key).unwrap().entry();
        let entry = entry.user_data(key as u64);
        // Safety: the entry is stored in self.entries and guaranteed to be valid for the
        // lifetime of the operation. E implementations must still ensure that the entry
        // remains valid until the last E::complete call.
        while unsafe { self.ring.submission().push(&entry) }.is_err() {
            // the submission queue is full: flush it to the kernel, process
            // completions to make progress, then retry
            self.submit()?;
            self.process_completions()?;
        }

        Ok(())
    }

    /// Submits all pending operations to the kernel.
    ///
    /// If the ring can't accept any more submissions because the completion
    /// queue is full, this will process completions and retry until the
    /// submissions are accepted.
    ///
    /// See also [Ring::process_completions].
    pub fn submit(&mut self) -> io::Result<()> {
        self.submit_and_wait(0, None).map(|_| ())
    }

    /// Submits all pending operations to the kernel and waits for completions.
    ///
    /// If no `timeout` is passed this will block until `want` completions are available. If a
    /// timeout is passed, this will block until `want` completions are available or the
    /// timeout is reached.
    ///
    /// Returns the number of submission queue entries successfully submitted, or `Ok(0)` if
    /// the wait timed out or was interrupted by a signal.
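    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// // submit everything pending and wait up to 10ms for one completion
    /// ring.submit_and_wait(1, Some(Duration::from_millis(10)))?;
    /// // waiting does not invoke handlers; reap the completion queue explicitly
    /// ring.process_completions()?;
    /// ```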
    pub fn submit_and_wait(&mut self, want: usize, timeout: Option<Duration>) -> io::Result<usize> {
        let mut args = SubmitArgs::new();
        let ts;
        if let Some(timeout) = timeout {
            ts = Timespec::from(timeout);
            args = args.timespec(&ts);
        }

        loop {
            match self.ring.submitter().submit_with_args(want, &args) {
                Ok(n) => return Ok(n),
                // the timeout expired before `want` completions arrived
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => return Ok(0),
                Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
                    // the completion queue is full, process completions and retry
                    self.process_completions()?;
                    continue;
                }
                // the wait was interrupted by a signal; report no completions
                // and let the caller decide whether to retry
                Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0),
                Err(e) => return Err(e),
            }
        }
    }

    /// Processes completions from the kernel.
    ///
    /// This will process all completions currently available in the completion
    /// queue and invoke [RingOp::complete] for each completed operation. Operations queued
    /// from completion handlers via [Completion::push] are pushed to the ring as part of
    /// this call.
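    ///
    /// A minimal event-loop sketch combining this with [Ring::submit_and_wait]:
    ///
    /// ```ignore
    /// loop {
    ///     ring.submit_and_wait(1, Some(Duration::from_millis(100)))?;
    ///     ring.process_completions()?;
    /// }
    /// ```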
    pub fn process_completions(&mut self) -> io::Result<()> {
        let mut completion = self.ring.completion();
        let mut new_entries = smallvec![];
        while let Some(cqe) = completion.next() {
            let completed_key = cqe.user_data() as usize;
            let entry = self.entries.get_mut(completed_key).unwrap();
            let result = entry.result(cqe.result());
            let mut comp_ctx = Completion {
                context: &mut self.context,
                new_entries,
            };
            let res = entry.complete(&mut comp_ctx, result);
            // multishot operations stay in the slab for as long as the kernel
            // signals that more completions are coming
            if !cqueue::more(cqe.flags()) {
                self.entries.remove(completed_key);
            }
            res?;
            new_entries = std::mem::take(&mut comp_ctx.new_entries);
            if !new_entries.is_empty() {
                // release the borrow on the completion queue before pushing
                // the ops queued by the completion handler
                completion.sync();
                drop(completion);
                for new_entry in new_entries.drain(..) {
                    self.push(new_entry)?;
                }
                completion = self.ring.completion();
            }
        }

        Ok(())
    }

    /// Drains the ring.
    ///
    /// This will submit all pending operations to the kernel and process all
    /// completions until the ring is empty.
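    ///
    /// A minimal sketch (e.g. before shutting down; `MyOp` is hypothetical):
    ///
    /// ```ignore
    /// ring.push(MyOp::new())?;
    /// // blocks until MyOp, and any ops it queues in turn, have completed
    /// ring.drain()?;
    /// ```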
    pub fn drain(&mut self) -> io::Result<()> {
        loop {
            self.process_completions()?;

            if self.entries.is_empty() {
                break;
            }

            // wait briefly for more completions; ETIME just means none
            // arrived in time and we should poll again
            match self.ring.submitter().submit_with_args(
                1,
                &SubmitArgs::new().timespec(&Timespec::from(Duration::from_millis(10))),
            ) {
                Ok(_) => {}
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => {}
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }
}

/// Trait implemented by operations that can be submitted to a [Ring].
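///
/// A minimal sketch of an implementation (the `CountedNop` op below is hypothetical and only
/// illustrates the shape of the trait; real ops would build read, write, open, etc. entries):
///
/// ```ignore
/// use io_uring::opcode;
///
/// struct CountedNop;
///
/// // the context counts completed ops
/// impl RingOp<usize> for CountedNop {
///     fn entry(&mut self) -> squeue::Entry {
///         opcode::Nop::new().build()
///     }
///
///     fn complete(
///         &mut self,
///         ctx: &mut Completion<usize, Self>,
///         res: io::Result<i32>,
///     ) -> io::Result<()> {
///         res?;
///         *ctx.context_mut() += 1;
///         Ok(())
///     }
/// }
/// ```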
pub trait RingOp<T> {
    /// Returns the submission queue entry for this operation.
    ///
    /// The entry's `user_data` field is overwritten by the [Ring].
    fn entry(&mut self) -> squeue::Entry;

    /// Called when a completion for this operation is received.
    ///
    /// For multishot operations this may be called multiple times.
    fn complete(&mut self, ctx: &mut Completion<T, Self>, res: io::Result<i32>) -> io::Result<()>
    where
        Self: Sized;

    /// Converts a raw completion result into an [io::Result], mapping negative values to the
    /// corresponding OS error.
    fn result(&self, res: i32) -> io::Result<i32> {
        if res < 0 {
            // the kernel reports errors as negative errno values
            Err(io::Error::from_raw_os_error(res.wrapping_neg()))
        } else {
            Ok(res)
        }
    }
}

/// Context object passed to [RingOp::complete].
pub struct Completion<'a, T, E: RingOp<T>> {
    // Give new_entries a stack size of 2 to avoid heap allocations in the common case where
    // only one or two ops are queued from a completion handler.
    //
    // It's common to want to queue some extra work after a completion, for instance if you've
    // completed a read and want to close the file descriptor, or if you're doing chained
    // operations and want to push the next one. It's less common to want to queue many
    // operations.
    new_entries: SmallVec<[E; 2]>,
    context: &'a mut T,
}

impl<T, E: RingOp<T>> Completion<'_, T, E> {
    /// Returns a reference to the context value stored in a [Ring].
    pub fn context(&self) -> &T {
        self.context
    }

    /// Returns a mutable reference to the context value stored in a [Ring].
    pub fn context_mut(&mut self) -> &mut T {
        self.context
    }

    /// Pushes an operation to the submission queue.
    ///
    /// This can be used to push new operations from within [RingOp::complete].
    ///
    /// See also [Ring::push].
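    ///
    /// A minimal sketch from within a hypothetical op's [RingOp::complete], chaining a close
    /// after a read (`Ctx` and `MyOp::close` are illustrative):
    ///
    /// ```ignore
    /// fn complete(
    ///     &mut self,
    ///     ctx: &mut Completion<Ctx, Self>,
    ///     res: io::Result<i32>,
    /// ) -> io::Result<()> {
    ///     res?;
    ///     // the read finished; queue a close of the file descriptor
    ///     ctx.push(MyOp::close(self.fd));
    ///     Ok(())
    /// }
    /// ```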
    pub fn push(&mut self, op: E) {
        self.new_entries.push(op);
    }
}