//! agave_io_uring/ring.rs
1use {
2    crate::slab::FixedSlab,
3    io_uring::{
4        cqueue, squeue,
5        types::{SubmitArgs, Timespec},
6        IoUring,
7    },
8    smallvec::{smallvec, SmallVec},
9    std::{io, os::fd::RawFd, time::Duration},
10};
11
/// An io_uring instance.
///
/// Owns the kernel ring, the slab of in-flight operations, and a user supplied
/// context value that is handed to operations as they complete.
pub struct Ring<T, E: RingOp<T>> {
    // The underlying io_uring submission/completion queue pair.
    ring: IoUring,
    // In-flight operations; each sqe's user_data carries the slab key of its op
    // so completions can be routed back to it.
    entries: FixedSlab<E>,
    // User defined context exposed to RingOp::complete through Completion.
    context: T,
}
18
19impl<T, E: RingOp<T>> Ring<T, E> {
20    /// Creates a new ring with the provided io_uring instance and context.
21    ///
22    /// The context `T` is a user defined value that will be passed to entries `E` once they
23    /// complete. This value can be used to update state or perform additional actions as operations
24    /// complete asynchronously.
25    pub fn new(ring: IoUring, ctx: T) -> Self {
26        Self {
27            entries: FixedSlab::with_capacity(ring.params().cq_entries() as usize),
28            ring,
29            context: ctx,
30        }
31    }
32
33    /// Returns a reference to the context value.
34    pub fn context(&self) -> &T {
35        &self.context
36    }
37
38    /// Returns a mutable reference to the context value.
39    pub fn context_mut(&mut self) -> &mut T {
40        &mut self.context
41    }
42
43    /// Registers in-memory fixed buffers for I/O with the kernel.
44    ///
45    /// # Safety
46    ///
47    /// Callers must ensure that the iov_base and iov_len values are valid and will be valid until
48    /// buffers are unregistered or the ring destroyed, otherwise undefined behaviour may occur.
49    ///
50    /// See
51    /// [Submitter::register_buffers](https://docs.rs/io-uring/0.6.3/io_uring/struct.Submitter.html#method.register_buffers).
52    pub unsafe fn register_buffers(&self, iovecs: &[libc::iovec]) -> io::Result<()> {
53        self.ring.submitter().register_buffers(iovecs)
54    }
55
56    /// Registers file descriptors as fixed for I/O with the kernel.
57    ///
58    /// Operations may then use `types::Fixed(index)` for index in `fds` to refer to the
59    /// registered file descriptor.
60    ///
61    /// `-1` values can be used as slots for kernel managed fixed file descriptors (created by
62    /// open operation).
63    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
64        self.ring.submitter().register_files(fds)
65    }
66
67    /// Pushes an operation to the submission queue.
68    ///
69    /// Once completed, [RingOp::complete] will be called with the result.
70    ///
71    /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
72    /// the submission queue is full, submit will be called internally to make room for the new
73    /// operation.
74    ///
75    /// See also [Ring::submit].
76    pub fn push(&mut self, op: E) -> io::Result<()> {
77        loop {
78            self.process_completions()?;
79
80            if !self.entries.is_full() {
81                break;
82            }
83            // if the entries slab is full, we need to submit and poll
84            // completions to make room
85            self.submit_and_wait(1, None)?;
86        }
87        let key = self.entries.insert(op);
88        let entry = self.entries.get_mut(key).unwrap().entry();
89        let entry = entry.user_data(key as u64);
90        // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
91        // of the operation. E implementations must still ensure that the entry
92        // remains valid until the last E::complete call.
93        while unsafe { self.ring.submission().push(&entry) }.is_err() {
94            self.submit()?;
95            self.process_completions()?;
96        }
97
98        Ok(())
99    }
100
101    /// Submits all pending operations to the kernel.
102    ///
103    /// If the ring can't accept any more submissions because the completion
104    /// queue is full, this will process completions and retry until the
105    /// submissions are accepted.
106    ///
107    /// See also [Ring::process_completions].
108    pub fn submit(&mut self) -> io::Result<()> {
109        self.submit_and_wait(0, None).map(|_| ())
110    }
111
112    /// Submits all pending operations to the kernel and waits for completions.
113    ///
114    /// If no `timeout` is passed this will block until `want` completions are available. If a
115    /// timeout is passed, this will block until `want` completions are available or the timeout is
116    /// reached.
117    ///
118    /// Returns the number of completions received.
119    pub fn submit_and_wait(&mut self, want: usize, timeout: Option<Duration>) -> io::Result<usize> {
120        let mut args = SubmitArgs::new();
121        let ts;
122        if let Some(timeout) = timeout {
123            ts = Timespec::from(timeout);
124            args = args.timespec(&ts);
125        }
126
127        loop {
128            match self.ring.submitter().submit_with_args(want, &args) {
129                Ok(n) => return Ok(n),
130                Err(e) if e.raw_os_error() == Some(libc::ETIME) => return Ok(0),
131                Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
132                    // the completion queue is full, process completions and retry
133                    self.process_completions()?;
134                    continue;
135                }
136                Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0),
137                Err(e) => return Err(e),
138            }
139        }
140    }
141
142    /// Processes completions from the kernel.
143    ///
144    /// This will process all completions currently available in the completion
145    /// queue and invoke [RingOp::complete] for each completed operation.
146    pub fn process_completions(&mut self) -> io::Result<()> {
147        let mut completion = self.ring.completion();
148        let mut new_entries = smallvec![];
149        while let Some(cqe) = completion.next() {
150            let completed_key = cqe.user_data() as usize;
151            let entry = self.entries.get_mut(completed_key).unwrap();
152            let result = entry.result(cqe.result());
153            let mut comp_ctx = Completion {
154                context: &mut self.context,
155                new_entries,
156            };
157            let res = entry.complete(&mut comp_ctx, result);
158            if !cqueue::more(cqe.flags()) {
159                self.entries.remove(completed_key);
160            }
161            res?;
162            new_entries = std::mem::take(&mut comp_ctx.new_entries);
163            if !new_entries.is_empty() {
164                completion.sync();
165                drop(completion);
166                for new_entry in new_entries.drain(..) {
167                    self.push(new_entry)?;
168                }
169                completion = self.ring.completion();
170            }
171        }
172
173        Ok(())
174    }
175
176    /// Drains the ring.
177    ///
178    /// This will submit all pending operations to the kernel and process all
179    /// completions until the ring is empty.
180    pub fn drain(&mut self) -> io::Result<()> {
181        loop {
182            self.process_completions()?;
183
184            if self.entries.is_empty() {
185                break;
186            }
187
188            match self.ring.submitter().submit_with_args(
189                1,
190                &SubmitArgs::new().timespec(&Timespec::from(Duration::from_millis(10))),
191            ) {
192                Ok(_) => {}
193                Err(e) if e.raw_os_error() == Some(libc::ETIME) => {}
194                Err(e) => return Err(e),
195            }
196        }
197
198        Ok(())
199    }
200}
201
202/// Trait for operations that can be submitted to a [Ring].
203pub trait RingOp<T> {
204    fn entry(&mut self) -> squeue::Entry;
205    fn complete(&mut self, ctx: &mut Completion<T, Self>, res: io::Result<i32>) -> io::Result<()>
206    where
207        Self: Sized;
208    fn result(&self, res: i32) -> io::Result<i32> {
209        if res < 0 {
210            Err(io::Error::from_raw_os_error(res.wrapping_neg()))
211        } else {
212            Ok(res)
213        }
214    }
215}
216
/// Context object passed to [RingOp::complete].
pub struct Completion<'a, T, E: RingOp<T>> {
    // Give new_entries a stack size of 2 to avoid heap allocations in the common case where only
    // one or two ops are queued from a completion handler.
    //
    // It's common to want to queue some extra work after a completion, for instance if you've
    // completed a read and want to close the file descriptor, or if you're doing chained operations
    // and want to push the next one. It's less common to want to queue many operations.
    new_entries: SmallVec<[E; 2]>,
    // Mutable borrow of the context value stored in the owning Ring.
    context: &'a mut T,
}
228
229impl<T, E: RingOp<T>> Completion<'_, T, E> {
230    /// Returns a reference to the context value stored in a [Ring].
231    pub fn context(&self) -> &T {
232        self.context
233    }
234
235    /// Returns a mutable reference to the context value stored in a [Ring].
236    pub fn context_mut(&mut self) -> &mut T {
237        self.context
238    }
239
240    /// Pushes an operation to the submission queue.
241    ///
242    /// This can be used to push new operations from within [RingOp::complete].
243    ///
244    /// See also [Ring::push].
245    pub fn push(&mut self, op: E) {
246        self.new_entries.push(op);
247    }
248}