Skip to main content

tokio/runtime/io/driver/
uring.rs

1use io_uring::{squeue::Entry, IoUring, Probe};
2use mio::unix::SourceFd;
3use slab::Slab;
4
5use crate::runtime::driver::op::CancelData;
6use crate::runtime::driver::op::CqeResult;
7use crate::runtime::driver::op::{Cancellable, Lifecycle};
8use crate::{io::Interest, loom::sync::Mutex};
9
10use super::{Handle, TOKEN_WAKEUP};
11
12use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
13use std::{io, mem, task::Waker};
14
/// Number of entries requested from `IoUring::new` when the ring is created.
const DEFAULT_RING_SIZE: u32 = 256;
16
/// io_uring driver state: the ring itself plus the table of operations
/// tracked by the driver.
pub(crate) struct UringContext {
    /// The ring, or `None` until `try_init` has created it.
    pub(crate) uring: Option<io_uring::IoUring>,
    /// Lifecycle of each tracked operation. The slab key doubles as the
    /// `user_data` of the matching submission entry, which is how a
    /// completion is mapped back to its slot.
    pub(crate) ops: slab::Slab<Lifecycle>,
}
21
22impl UringContext {
23    pub(crate) fn new() -> Self {
24        Self {
25            ops: Slab::new(),
26            uring: None,
27        }
28    }
29
30    pub(crate) fn ring(&self) -> &io_uring::IoUring {
31        self.uring.as_ref().expect("io_uring not initialized")
32    }
33
34    pub(crate) fn ring_mut(&mut self) -> &mut io_uring::IoUring {
35        self.uring.as_mut().expect("io_uring not initialized")
36    }
37
38    /// Perform `io_uring_setup` system call, and Returns true if this
39    /// actually initialized the io_uring.
40    ///
41    /// If the machine doesn't support io_uring, then this will return an
42    /// `ENOSYS` error.
43    pub(crate) fn try_init(&mut self, probe: &mut Probe) -> io::Result<bool> {
44        if self.uring.is_some() {
45            // Already initialized.
46            return Ok(false);
47        }
48
49        let uring = IoUring::new(DEFAULT_RING_SIZE)?;
50
51        match uring.submitter().register_probe(probe) {
52            Ok(_) => {}
53            Err(e) if e.raw_os_error() == Some(libc::EINVAL) => {
54                // The kernel does not support IORING_REGISTER_PROBE.
55                return Err(io::Error::from_raw_os_error(libc::ENOSYS));
56            }
57            Err(e) => return Err(e),
58        }
59
60        self.uring.replace(uring);
61
62        Ok(true)
63    }
64
65    pub(crate) fn dispatch_completions(&mut self) {
66        let ops = &mut self.ops;
67        let Some(mut uring) = self.uring.take() else {
68            // Uring is not initialized yet.
69            return;
70        };
71
72        let cq = uring.completion();
73
74        for cqe in cq {
75            let idx = cqe.user_data() as usize;
76
77            match ops.get_mut(idx) {
78                Some(Lifecycle::Waiting(waker)) => {
79                    waker.wake_by_ref();
80                    *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
81                }
82                Some(Lifecycle::Cancelled(cancel_data)) => {
83                    if let CancelData::Open(_) = cancel_data {
84                        if let Ok(fd) = CqeResult::from(cqe).result {
85                            // SAFETY: the successful CQE result provides
86                            // a non-negative integer, and the event is
87                            // related to an open operation.
88                            unsafe { OwnedFd::from_raw_fd(fd as i32) };
89                        }
90                    }
91                    // Op future was cancelled, so we discard the result.
92                    ops.remove(idx);
93                }
94                Some(other) => {
95                    panic!("unexpected lifecycle for slot {idx}: {other:?}");
96                }
97                None => {
98                    panic!("no op at index {idx}");
99                }
100            }
101        }
102
103        self.uring.replace(uring);
104
105        // `cq`'s drop gets called here, updating the latest head pointer
106    }
107
108    pub(crate) fn submit(&mut self) -> io::Result<()> {
109        loop {
110            // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
111            match self.ring().submit() {
112                Ok(_) => {
113                    return Ok(());
114                }
115
116                // If the submission queue is full, we dispatch completions and try again.
117                Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
118                    self.dispatch_completions();
119                }
120                // For other errors, we currently return the error as is.
121                Err(e) => {
122                    return Err(e);
123                }
124            }
125        }
126    }
127
    /// Removes the operation stored at `index` from the slab and returns its
    /// lifecycle state.
    // NOTE(review): `Slab::remove` is documented to panic on a vacant key —
    // confirm callers only pass indices that are still occupied.
    pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle {
        self.ops.remove(index)
    }
131}
132
133/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
134impl Drop for UringContext {
135    fn drop(&mut self) {
136        if self.uring.is_none() {
137            // Uring is not initialized or not supported.
138            return;
139        }
140
141        // Make sure we flush the submission queue before dropping the driver.
142        while !self.ring_mut().submission().is_empty() {
143            self.submit().expect("Internal error when dropping driver");
144        }
145
146        let mut ops = std::mem::take(&mut self.ops);
147
148        // Remove all completed ops since we don't need to wait for them.
149        ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
150
151        while !ops.is_empty() {
152            // Wait until at least one completion is available.
153            self.ring_mut()
154                .submit_and_wait(1)
155                .expect("Internal error when dropping driver");
156
157            for cqe in self.ring_mut().completion() {
158                let idx = cqe.user_data() as usize;
159
160                if let Some(Lifecycle::Cancelled(CancelData::Open(_))) = ops.get_mut(idx) {
161                    if let Ok(fd) = CqeResult::from(cqe).result {
162                        // SAFETY: the successful CQE result provides
163                        // a non-negative integer, and the event is
164                        // related to an open operation.
165                        unsafe { OwnedFd::from_raw_fd(fd as i32) };
166                    }
167                };
168
169                ops.remove(idx);
170            }
171        }
172    }
173}
174
175impl Handle {
176    fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
177        let mut source = SourceFd(&uringfd);
178        self.registry
179            .register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
180    }
181
    /// Returns the mutex guarding this handle's io_uring context.
    pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
        &self.uring_context
    }
185
186    /// Returns `true` if io_uring has already been initialized and the given
187    /// opcode is supported. Returns `false` if io_uring hasn't been
188    /// initialized yet or is unsupported. Unlike `check_and_init`, this
189    /// doesn't attempt initialization.
190    #[cfg_attr(test, allow(dead_code))]
191    pub(crate) fn is_uring_ready(&self, opcode: u8) -> bool {
192        self.uring_probe
193            .get()
194            .and_then(|opt| opt.as_ref())
195            .is_some_and(|probe| probe.is_supported(opcode))
196    }
197
198    /// Returns `true` if the io_uring probe has already been attempted
199    /// (regardless of whether io_uring is supported). Returns `false` if
200    /// no probe has been attempted yet.
201    #[cfg_attr(test, allow(dead_code))]
202    pub(crate) fn is_uring_probed(&self) -> bool {
203        self.uring_probe.get().is_some()
204    }
205
206    /// Check if the io_uring context is initialized. If not, it will try to initialize it.
207    /// Then, check if the provided opcode is supported.
208    ///
209    /// If both the context initialization succeeds and the opcode is supported,
210    /// this returns `Ok(true)`.
211    /// If either io_uring is unsupported or the opcode is unsupported,
212    /// this returns `Ok(false)`.
213    /// An error is returned if an io_uring syscall returns an unexpected error value.
214    ///
215    /// TODO: This would like to be a synchronous function,
216    /// but we require `OnceLock::get_or_try_init`.
217    /// <https://github.com/rust-lang/rust/issues/109737>
218    pub(crate) async fn check_and_init(&self, opcode: u8) -> io::Result<bool> {
219        let probe = self
220            .uring_probe
221            .get_or_try_init(|| async {
222                let mut probe = Probe::new();
223                match self.try_init(&mut probe) {
224                    Ok(()) => Ok(Some(probe)),
225                    // If the system doesn't support io_uring, we set the probe to `None`.
226                    Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => Ok(None),
227                    // If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp).
228                    // In this case, we try to fall back to spawn_blocking for this and future operations.
229                    // See also: https://github.com/tokio-rs/tokio/issues/7691
230                    Err(e) if e.raw_os_error() == Some(libc::EPERM) => Ok(None),
231                    // For other system errors, we just return it.
232                    Err(e) => Err(e),
233                }
234            })
235            .await?;
236
237        Ok(probe
238            .as_ref()
239            .is_some_and(|probe| probe.is_supported(opcode)))
240    }
241
242    /// Initialize the io_uring context if it hasn't been initialized yet.
243    fn try_init(&self, probe: &mut Probe) -> io::Result<()> {
244        let mut guard = self.get_uring().lock();
245        if guard.try_init(probe)? {
246            self.add_uring_source(guard.ring().as_raw_fd())?;
247        }
248
249        Ok(())
250    }
251
252    /// Register an operation with the io_uring.
253    ///
254    /// If this is the first io_uring operation, it will also initialize the io_uring context.
255    /// If io_uring isn't supported, this function returns an `ENOSYS` error, so the caller can
256    /// perform custom handling, such as falling back to an alternative mechanism.
257    ///
258    /// # Safety
259    ///
260    /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
261    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
262    pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
263        assert!(self.uring_probe.initialized());
264
265        // Uring is initialized.
266
267        let mut guard = self.get_uring().lock();
268        let ctx = &mut *guard;
269        let index = ctx.ops.insert(Lifecycle::Waiting(waker));
270        let entry = entry.user_data(index as u64);
271
272        let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> {
273            if let Err(e) = ctx.submit() {
274                // Submission failed, remove the entry from the slab and return the error
275                ctx.remove_op(index);
276                return Err(e);
277            }
278            Ok(())
279        };
280
281        // SAFETY: entry is valid for the entire duration of the operation
282        while unsafe { ctx.ring_mut().submission().push(&entry).is_err() } {
283            // If the submission queue is full, flush it to the kernel
284            submit_or_remove(ctx)?;
285        }
286
287        // Ensure that the completion queue is not full before submitting the entry.
288        while ctx.ring_mut().completion().is_full() {
289            ctx.dispatch_completions();
290        }
291
292        // Note: For now, we submit the entry immediately without utilizing batching.
293        submit_or_remove(ctx)?;
294
295        Ok(index)
296    }
297
298    pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
299        let mut guard = self.get_uring().lock();
300        let ctx = &mut *guard;
301        let ops = &mut ctx.ops;
302        let Some(lifecycle) = ops.get_mut(index) else {
303            // The corresponding index doesn't exist anymore, so this Op is already complete.
304            return;
305        };
306
307        // This Op will be cancelled. Here, we don't remove the lifecycle from the slab to keep
308        // uring data alive until the operation completes.
309
310        let cancel_data = data.expect("Data should be present").cancel();
311        match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) {
312            Lifecycle::Submitted | Lifecycle::Waiting(_) => (),
313            // The driver saw the completion, but it was never polled.
314            Lifecycle::Completed(cqe) => {
315                if let Lifecycle::Cancelled(CancelData::Open(_)) = lifecycle {
316                    if let Ok(fd) = CqeResult::from(cqe).result {
317                        // SAFETY: the successful CQE result provides
318                        // a non-negative integer, and the event is
319                        // related to an open operation.
320                        unsafe { OwnedFd::from_raw_fd(fd as i32) };
321                    }
322                }
323                // We can safely remove the entry from the slab, as it has already been completed.
324                ops.remove(index);
325            }
326            prev => panic!("Unexpected state: {prev:?}"),
327        };
328    }
329}