// atlas_io_uring/ring.rs

1use {
2    crate::slab::FixedSlab,
3    io_uring::{
4        cqueue, squeue,
5        types::{SubmitArgs, Timespec},
6        IoUring,
7    },
8    smallvec::{smallvec, SmallVec},
9    std::{io, time::Duration},
10};
11
/// An io_uring instance.
///
/// Wraps an [IoUring] together with a slab of in-flight operations and a
/// user-defined context value that is handed to operations as they complete.
pub struct Ring<T, E: RingOp<T>> {
    // The underlying io_uring submission/completion queues.
    ring: IoUring,
    // In-flight operations, keyed by the slab index stored in each SQE's
    // user_data field (see [Ring::push]).
    entries: FixedSlab<E>,
    // User-defined value exposed to completion handlers via [Completion].
    context: T,
}
18
19impl<T, E: RingOp<T>> Ring<T, E> {
20    /// Creates a new ring with the provided io_uring instance and context.
21    ///
22    /// The context `T` is a user defined value that will be passed to entries `E` once they
23    /// complete. This value can be used to update state or perform additional actions as operations
24    /// complete asynchronously.
25    pub fn new(ring: IoUring, ctx: T) -> Self {
26        Self {
27            entries: FixedSlab::with_capacity(ring.params().cq_entries() as usize),
28            ring,
29            context: ctx,
30        }
31    }
32
    /// Returns a reference to the context value.
    ///
    /// See also [Ring::context_mut].
    pub fn context(&self) -> &T {
        &self.context
    }
37
    /// Returns a mutable reference to the context value.
    ///
    /// See also [Ring::context].
    pub fn context_mut(&mut self) -> &mut T {
        &mut self.context
    }
42
    /// Registers in-memory fixed buffers for I/O with the kernel.
    ///
    /// # Safety
    ///
    /// Callers must ensure that the iov_base and iov_len values are valid and will be valid until
    /// buffers are unregistered or the ring destroyed, otherwise undefined behaviour may occur.
    ///
    /// See
    /// [Submitter::register_buffers](https://docs.rs/io-uring/0.6.3/io_uring/struct.Submitter.html#method.register_buffers).
    pub unsafe fn register_buffers(&self, iovecs: &[libc::iovec]) -> io::Result<()> {
        // Thin passthrough to the underlying submitter.
        self.ring.submitter().register_buffers(iovecs)
    }
55
56    /// Pushes an operation to the submission queue.
57    ///
58    /// Once completed, [RingOp::complete] will be called with the result.
59    ///
60    /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
61    /// the submission queue is full, submit will be called internally to make room for the new
62    /// operation.
63    ///
64    /// See also [Ring::submit].
65    pub fn push(&mut self, op: E) -> io::Result<()> {
66        loop {
67            self.process_completions()?;
68
69            if !self.entries.is_full() {
70                break;
71            }
72            // if the entries slab is full, we need to submit and poll
73            // completions to make room
74            self.submit_and_wait(1, None)?;
75        }
76        let key = self.entries.insert(op);
77        let entry = self.entries.get_mut(key).unwrap().entry();
78        let entry = entry.user_data(key as u64);
79        // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
80        // of the operation. E implementations must still ensure that the entry
81        // remains valid until the last E::complete call.
82        while unsafe { self.ring.submission().push(&entry) }.is_err() {
83            self.submit()?;
84            self.process_completions()?;
85        }
86
87        Ok(())
88    }
89
90    /// Submits all pending operations to the kernel.
91    ///
92    /// If the ring can't accept any more submissions because the completion
93    /// queue is full, this will process completions and retry until the
94    /// submissions are accepted.
95    ///
96    /// See also [Ring::process_completions].
97    pub fn submit(&mut self) -> io::Result<()> {
98        self.submit_and_wait(0, None).map(|_| ())
99    }
100
101    /// Submits all pending operations to the kernel and waits for completions.
102    ///
103    /// If no `timeout` is passed this will block until `want` completions are available. If a
104    /// timeout is passed, this will block until `want` completions are available or the timeout is
105    /// reached.
106    ///
107    /// Returns the number of completions received.
108    pub fn submit_and_wait(&mut self, want: usize, timeout: Option<Duration>) -> io::Result<usize> {
109        let mut args = SubmitArgs::new();
110        let ts;
111        if let Some(timeout) = timeout {
112            ts = Timespec::from(timeout);
113            args = args.timespec(&ts);
114        }
115
116        loop {
117            match self.ring.submitter().submit_with_args(want, &args) {
118                Ok(n) => return Ok(n),
119                Err(e) if e.raw_os_error() == Some(libc::ETIME) => return Ok(0),
120                Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
121                    // the completion queue is full, process completions and retry
122                    self.process_completions()?;
123                    continue;
124                }
125                Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0),
126                Err(e) => return Err(e),
127            }
128        }
129    }
130
    /// Processes completions from the kernel.
    ///
    /// This will process all completions currently available in the completion
    /// queue and invoke [RingOp::complete] for each completed operation.
    ///
    /// Operations queued by completion handlers (via [Completion::push]) are pushed
    /// onto this ring before the remaining completions are drained.
    pub fn process_completions(&mut self) -> io::Result<()> {
        let mut completion = self.ring.completion();
        // Buffer for ops queued from within handlers. It is moved into the
        // Completion context each iteration and taken back afterwards, so a
        // single allocation is reused across the whole drain.
        let mut new_entries = smallvec![];
        loop {
            let Some(cqe) = completion.next() else {
                break;
            };
            // user_data carries the slab key assigned in Ring::push.
            let completed_key = cqe.user_data() as usize;
            let entry = self.entries.get_mut(completed_key).unwrap();
            let result = entry.result(cqe.result());
            let mut comp_ctx = Completion {
                context: &mut self.context,
                new_entries,
            };
            let res = entry.complete(&mut comp_ctx, result);
            // If the kernel signalled "more" (multishot), the op stays in the slab
            // and will yield further completions under the same key; otherwise its
            // slot is reclaimed before the handler's error (if any) is propagated.
            if !cqueue::more(cqe.flags()) {
                self.entries.remove(completed_key);
            }
            res?;
            // Reclaim the buffer from the context so it can be drained and reused.
            new_entries = std::mem::take(&mut comp_ctx.new_entries);
            if !new_entries.is_empty() {
                // Publish consumed CQEs and release the completion-queue borrow:
                // Ring::push below needs `&mut self`.
                completion.sync();
                drop(completion);
                for new_entry in new_entries.drain(..) {
                    self.push(new_entry)?;
                }
                completion = self.ring.completion();
            }
        }

        Ok(())
    }
167
168    /// Drains the ring.
169    ///
170    /// This will submit all pending operations to the kernel and process all
171    /// completions until the ring is empty.
172    pub fn drain(&mut self) -> io::Result<()> {
173        loop {
174            self.process_completions()?;
175
176            if self.entries.is_empty() {
177                break;
178            }
179
180            match self.ring.submitter().submit_with_args(
181                1,
182                &SubmitArgs::new().timespec(&Timespec::from(Duration::from_millis(10))),
183            ) {
184                Ok(_) => {}
185                Err(e) if e.raw_os_error() == Some(libc::ETIME) => {}
186                Err(e) => return Err(e),
187            }
188        }
189
190        Ok(())
191    }
192}
193
/// Trait for operations that can be submitted to a [Ring].
pub trait RingOp<T> {
    /// Builds the submission queue entry describing this operation.
    ///
    /// The entry's user_data is overwritten by [Ring::push] with the op's slab key,
    /// so implementations should not rely on it.
    fn entry(&mut self) -> squeue::Entry;
    /// Called when the operation completes (possibly multiple times for multishot
    /// operations), with `res` already translated through [RingOp::result].
    fn complete(&mut self, ctx: &mut Completion<T, Self>, res: io::Result<i32>) -> io::Result<()>
    where
        Self: Sized;
    /// Translates a raw CQE result into an [io::Result]: negative values are
    /// negated errno codes, non-negative values are returned as-is.
    fn result(&self, res: i32) -> io::Result<i32> {
        if res < 0 {
            Err(io::Error::from_raw_os_error(res.wrapping_neg()))
        } else {
            Ok(res)
        }
    }
}
208
/// Context object passed to [RingOp::complete].
pub struct Completion<'a, T, E: RingOp<T>> {
    // Give new_entries a stack size of 2 to avoid heap allocations in the common case where only
    // one or two ops are queued from a completion handler.
    //
    // It's common to want to queue some extra work after a completion, for instance if you've
    // completed a read and want to close the file descriptor, or if you're doing chained operations
    // and want to push the next one. It's less common to want to queue many operations.
    new_entries: SmallVec<[E; 2]>,
    // Exclusive borrow of the context value stored in the owning [Ring].
    context: &'a mut T,
}
220
221impl<T, E: RingOp<T>> Completion<'_, T, E> {
222    /// Returns a reference to the context value stored in a [Ring].
223    pub fn context(&self) -> &T {
224        self.context
225    }
226
227    /// Returns a mutable reference to the context value stored in a [Ring].
228    pub fn context_mut(&mut self) -> &mut T {
229        self.context
230    }
231
232    /// Pushes an operation to the submission queue.
233    ///
234    /// This can be used to push new operations from within [RingOp::complete].
235    ///
236    /// See also [Ring::push].
237    pub fn push(&mut self, op: E) {
238        self.new_entries.push(op);
239    }
240}