//! atlas_io_uring/ring.rs
1use {
2 crate::slab::FixedSlab,
3 io_uring::{
4 cqueue, squeue,
5 types::{SubmitArgs, Timespec},
6 IoUring,
7 },
8 smallvec::{smallvec, SmallVec},
9 std::{io, time::Duration},
10};
11
/// An io_uring instance.
pub struct Ring<T, E: RingOp<T>> {
    // The underlying io_uring submission/completion queue pair.
    ring: IoUring,
    // In-flight operations, keyed by slab index. The key is stored in each
    // SQE's user_data field so completions can be routed back to their op.
    // Capacity is sized to the completion-queue depth (see `Ring::new`).
    entries: FixedSlab<E>,
    // User-supplied context, handed to `RingOp::complete` via `Completion`.
    context: T,
}
18
impl<T, E: RingOp<T>> Ring<T, E> {
    /// Creates a new ring with the provided io_uring instance and context.
    ///
    /// The context `T` is a user defined value that will be passed to entries `E` once they
    /// complete. This value can be used to update state or perform additional actions as operations
    /// complete asynchronously.
    pub fn new(ring: IoUring, ctx: T) -> Self {
        Self {
            // Size the slab to the completion-queue depth so every in-flight
            // operation has a slot; `push` blocks (submitting and polling)
            // when the slab is full.
            entries: FixedSlab::with_capacity(ring.params().cq_entries() as usize),
            ring,
            context: ctx,
        }
    }

    /// Returns a reference to the context value.
    pub fn context(&self) -> &T {
        &self.context
    }

    /// Returns a mutable reference to the context value.
    pub fn context_mut(&mut self) -> &mut T {
        &mut self.context
    }

    /// Registers in-memory fixed buffers for I/O with the kernel.
    ///
    /// # Safety
    ///
    /// Callers must ensure that the iov_base and iov_len values are valid and will be valid until
    /// buffers are unregistered or the ring destroyed, otherwise undefined behaviour may occur.
    ///
    /// See
    /// [Submitter::register_buffers](https://docs.rs/io-uring/0.6.3/io_uring/struct.Submitter.html#method.register_buffers).
    pub unsafe fn register_buffers(&self, iovecs: &[libc::iovec]) -> io::Result<()> {
        self.ring.submitter().register_buffers(iovecs)
    }

    /// Pushes an operation to the submission queue.
    ///
    /// Once completed, [RingOp::complete] will be called with the result.
    ///
    /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
    /// the submission queue is full, submit will be called internally to make room for the new
    /// operation.
    ///
    /// See also [Ring::submit].
    pub fn push(&mut self, op: E) -> io::Result<()> {
        loop {
            self.process_completions()?;

            if !self.entries.is_full() {
                break;
            }
            // if the entries slab is full, we need to submit and poll
            // completions to make room
            self.submit_and_wait(1, None)?;
        }
        // The slab key doubles as the SQE's user_data so the completion
        // handler can find this op again (see process_completions).
        let key = self.entries.insert(op);
        let entry = self.entries.get_mut(key).unwrap().entry();
        let entry = entry.user_data(key as u64);
        // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
        // of the operation. E implementations must still ensure that the entry
        // remains valid until the last E::complete call.
        //
        // push fails when the submission queue is full; submit to drain it to
        // the kernel, reap completions, and retry.
        while unsafe { self.ring.submission().push(&entry) }.is_err() {
            self.submit()?;
            self.process_completions()?;
        }

        Ok(())
    }

    /// Submits all pending operations to the kernel.
    ///
    /// If the ring can't accept any more submissions because the completion
    /// queue is full, this will process completions and retry until the
    /// submissions are accepted.
    ///
    /// See also [Ring::process_completions].
    pub fn submit(&mut self) -> io::Result<()> {
        // want = 0: don't block waiting for completions.
        self.submit_and_wait(0, None).map(|_| ())
    }

    /// Submits all pending operations to the kernel and waits for completions.
    ///
    /// If no `timeout` is passed this will block until `want` completions are available. If a
    /// timeout is passed, this will block until `want` completions are available or the timeout is
    /// reached.
    ///
    /// Returns the number of completions received.
    pub fn submit_and_wait(&mut self, want: usize, timeout: Option<Duration>) -> io::Result<usize> {
        let mut args = SubmitArgs::new();
        // `ts` is declared outside the `if` so it outlives `args`, which
        // borrows it via `timespec`.
        let ts;
        if let Some(timeout) = timeout {
            ts = Timespec::from(timeout);
            args = args.timespec(&ts);
        }

        loop {
            match self.ring.submitter().submit_with_args(want, &args) {
                Ok(n) => return Ok(n),
                // ETIME: the timeout expired before `want` completions
                // arrived; report zero completions rather than an error.
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => return Ok(0),
                Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
                    // the completion queue is full, process completions and retry
                    self.process_completions()?;
                    continue;
                }
                // EINTR: interrupted by a signal; treated as "no completions"
                // instead of being surfaced to the caller.
                Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0),
                Err(e) => return Err(e),
            }
        }
    }

    /// Processes completions from the kernel.
    ///
    /// This will process all completions currently available in the completion
    /// queue and invoke [RingOp::complete] for each completed operation.
    pub fn process_completions(&mut self) -> io::Result<()> {
        let mut completion = self.ring.completion();
        // Ops queued by completion handlers via Completion::push; they are
        // re-pushed onto this ring between CQE batches below.
        let mut new_entries = smallvec![];
        loop {
            let Some(cqe) = completion.next() else {
                break;
            };
            // user_data carries the slab key assigned in Ring::push.
            let completed_key = cqe.user_data() as usize;
            let entry = self.entries.get_mut(completed_key).unwrap();
            let result = entry.result(cqe.result());
            // Move the scratch vec into the handler context; it is taken back
            // (mem::take) after the handler runs.
            let mut comp_ctx = Completion {
                context: &mut self.context,
                new_entries,
            };
            let res = entry.complete(&mut comp_ctx, result);
            // Multishot completions set the MORE flag; keep the slab slot
            // alive until the final completion for the op arrives.
            if !cqueue::more(cqe.flags()) {
                self.entries.remove(completed_key);
            }
            // NOTE(review): returning here drops any ops the handler queued in
            // comp_ctx without submitting them — presumably intentional on
            // error, but worth confirming.
            res?;
            new_entries = std::mem::take(&mut comp_ctx.new_entries);
            if !new_entries.is_empty() {
                // Ring::push needs `&mut self`, so publish our CQ progress and
                // release the completion-queue borrow before pushing, then
                // re-acquire it to continue draining.
                completion.sync();
                drop(completion);
                for new_entry in new_entries.drain(..) {
                    self.push(new_entry)?;
                }
                completion = self.ring.completion();
            }
        }

        Ok(())
    }

    /// Drains the ring.
    ///
    /// This will submit all pending operations to the kernel and process all
    /// completions until the ring is empty.
    pub fn drain(&mut self) -> io::Result<()> {
        loop {
            self.process_completions()?;

            // Empty slab means no in-flight operations remain.
            if self.entries.is_empty() {
                break;
            }

            // Wait up to 10ms for at least one completion; ETIME just means
            // the wait timed out, so loop and poll again.
            match self.ring.submitter().submit_with_args(
                1,
                &SubmitArgs::new().timespec(&Timespec::from(Duration::from_millis(10))),
            ) {
                Ok(_) => {}
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => {}
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }
}
193
194/// Trait for operations that can be submitted to a [Ring].
195pub trait RingOp<T> {
196 fn entry(&mut self) -> squeue::Entry;
197 fn complete(&mut self, ctx: &mut Completion<T, Self>, res: io::Result<i32>) -> io::Result<()>
198 where
199 Self: Sized;
200 fn result(&self, res: i32) -> io::Result<i32> {
201 if res < 0 {
202 Err(io::Error::from_raw_os_error(res.wrapping_neg()))
203 } else {
204 Ok(res)
205 }
206 }
207}
208
/// Context object passed to [RingOp::complete].
pub struct Completion<'a, T, E: RingOp<T>> {
    // Give new_entries a stack size of 2 to avoid heap allocations in the common case where only
    // one or two ops are queued from a completion handler.
    //
    // It's common to want to queue some extra work after a completion, for instance if you've
    // completed a read and want to close the file descriptor, or if you're doing chained operations
    // and want to push the next one. It's less common to want to queue many operations.
    new_entries: SmallVec<[E; 2]>,
    // Mutable borrow of the context value stored in the owning [Ring].
    context: &'a mut T,
}
220
221impl<T, E: RingOp<T>> Completion<'_, T, E> {
222 /// Returns a reference to the context value stored in a [Ring].
223 pub fn context(&self) -> &T {
224 self.context
225 }
226
227 /// Returns a mutable reference to the context value stored in a [Ring].
228 pub fn context_mut(&mut self) -> &mut T {
229 self.context
230 }
231
232 /// Pushes an operation to the submission queue.
233 ///
234 /// This can be used to push new operations from within [RingOp::complete].
235 ///
236 /// See also [Ring::push].
237 pub fn push(&mut self, op: E) {
238 self.new_entries.push(op);
239 }
240}