tokio/runtime/io/driver/uring.rs
1use io_uring::{squeue::Entry, IoUring, Probe};
2use mio::unix::SourceFd;
3use slab::Slab;
4
5use crate::runtime::driver::op::CancelData;
6use crate::runtime::driver::op::CqeResult;
7use crate::runtime::driver::op::{Cancellable, Lifecycle};
8use crate::{io::Interest, loom::sync::Mutex};
9
10use super::{Handle, TOKEN_WAKEUP};
11
12use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
13use std::{io, mem, task::Waker};
14
/// Number of entries requested from `IoUring::new` when the ring is first created.
const DEFAULT_RING_SIZE: u32 = 1 << 8; // 256 entries
16
17pub(crate) struct UringContext {
18 pub(crate) uring: Option<io_uring::IoUring>,
19 pub(crate) ops: slab::Slab<Lifecycle>,
20}
21
22impl UringContext {
23 pub(crate) fn new() -> Self {
24 Self {
25 ops: Slab::new(),
26 uring: None,
27 }
28 }
29
30 pub(crate) fn ring(&self) -> &io_uring::IoUring {
31 self.uring.as_ref().expect("io_uring not initialized")
32 }
33
34 pub(crate) fn ring_mut(&mut self) -> &mut io_uring::IoUring {
35 self.uring.as_mut().expect("io_uring not initialized")
36 }
37
38 /// Perform `io_uring_setup` system call, and Returns true if this
39 /// actually initialized the io_uring.
40 ///
41 /// If the machine doesn't support io_uring, then this will return an
42 /// `ENOSYS` error.
43 pub(crate) fn try_init(&mut self, probe: &mut Probe) -> io::Result<bool> {
44 if self.uring.is_some() {
45 // Already initialized.
46 return Ok(false);
47 }
48
49 let uring = IoUring::new(DEFAULT_RING_SIZE)?;
50
51 match uring.submitter().register_probe(probe) {
52 Ok(_) => {}
53 Err(e) if e.raw_os_error() == Some(libc::EINVAL) => {
54 // The kernel does not support IORING_REGISTER_PROBE.
55 return Err(io::Error::from_raw_os_error(libc::ENOSYS));
56 }
57 Err(e) => return Err(e),
58 }
59
60 self.uring.replace(uring);
61
62 Ok(true)
63 }
64
65 pub(crate) fn dispatch_completions(&mut self) {
66 let ops = &mut self.ops;
67 let Some(mut uring) = self.uring.take() else {
68 // Uring is not initialized yet.
69 return;
70 };
71
72 let cq = uring.completion();
73
74 for cqe in cq {
75 let idx = cqe.user_data() as usize;
76
77 match ops.get_mut(idx) {
78 Some(Lifecycle::Waiting(waker)) => {
79 waker.wake_by_ref();
80 *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
81 }
82 Some(Lifecycle::Cancelled(cancel_data)) => {
83 if let CancelData::Open(_) = cancel_data {
84 if let Ok(fd) = CqeResult::from(cqe).result {
85 // SAFETY: the successful CQE result provides
86 // a non-negative integer, and the event is
87 // related to an open operation.
88 unsafe { OwnedFd::from_raw_fd(fd as i32) };
89 }
90 }
91 // Op future was cancelled, so we discard the result.
92 ops.remove(idx);
93 }
94 Some(other) => {
95 panic!("unexpected lifecycle for slot {idx}: {other:?}");
96 }
97 None => {
98 panic!("no op at index {idx}");
99 }
100 }
101 }
102
103 self.uring.replace(uring);
104
105 // `cq`'s drop gets called here, updating the latest head pointer
106 }
107
108 pub(crate) fn submit(&mut self) -> io::Result<()> {
109 loop {
110 // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
111 match self.ring().submit() {
112 Ok(_) => {
113 return Ok(());
114 }
115
116 // If the submission queue is full, we dispatch completions and try again.
117 Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
118 self.dispatch_completions();
119 }
120 // For other errors, we currently return the error as is.
121 Err(e) => {
122 return Err(e);
123 }
124 }
125 }
126 }
127
128 pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle {
129 self.ops.remove(index)
130 }
131}
132
133/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
134impl Drop for UringContext {
135 fn drop(&mut self) {
136 if self.uring.is_none() {
137 // Uring is not initialized or not supported.
138 return;
139 }
140
141 // Make sure we flush the submission queue before dropping the driver.
142 while !self.ring_mut().submission().is_empty() {
143 self.submit().expect("Internal error when dropping driver");
144 }
145
146 let mut ops = std::mem::take(&mut self.ops);
147
148 // Remove all completed ops since we don't need to wait for them.
149 ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
150
151 while !ops.is_empty() {
152 // Wait until at least one completion is available.
153 self.ring_mut()
154 .submit_and_wait(1)
155 .expect("Internal error when dropping driver");
156
157 for cqe in self.ring_mut().completion() {
158 let idx = cqe.user_data() as usize;
159
160 if let Some(Lifecycle::Cancelled(CancelData::Open(_))) = ops.get_mut(idx) {
161 if let Ok(fd) = CqeResult::from(cqe).result {
162 // SAFETY: the successful CQE result provides
163 // a non-negative integer, and the event is
164 // related to an open operation.
165 unsafe { OwnedFd::from_raw_fd(fd as i32) };
166 }
167 };
168
169 ops.remove(idx);
170 }
171 }
172 }
173}
174
175impl Handle {
176 fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
177 let mut source = SourceFd(&uringfd);
178 self.registry
179 .register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
180 }
181
182 pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
183 &self.uring_context
184 }
185
186 /// Returns `true` if io_uring has already been initialized and the given
187 /// opcode is supported. Returns `false` if io_uring hasn't been
188 /// initialized yet or is unsupported. Unlike `check_and_init`, this
189 /// doesn't attempt initialization.
190 #[cfg_attr(test, allow(dead_code))]
191 pub(crate) fn is_uring_ready(&self, opcode: u8) -> bool {
192 self.uring_probe
193 .get()
194 .and_then(|opt| opt.as_ref())
195 .is_some_and(|probe| probe.is_supported(opcode))
196 }
197
198 /// Returns `true` if the io_uring probe has already been attempted
199 /// (regardless of whether io_uring is supported). Returns `false` if
200 /// no probe has been attempted yet.
201 #[cfg_attr(test, allow(dead_code))]
202 pub(crate) fn is_uring_probed(&self) -> bool {
203 self.uring_probe.get().is_some()
204 }
205
206 /// Check if the io_uring context is initialized. If not, it will try to initialize it.
207 /// Then, check if the provided opcode is supported.
208 ///
209 /// If both the context initialization succeeds and the opcode is supported,
210 /// this returns `Ok(true)`.
211 /// If either io_uring is unsupported or the opcode is unsupported,
212 /// this returns `Ok(false)`.
213 /// An error is returned if an io_uring syscall returns an unexpected error value.
214 ///
215 /// TODO: This would like to be a synchronous function,
216 /// but we require `OnceLock::get_or_try_init`.
217 /// <https://github.com/rust-lang/rust/issues/109737>
218 pub(crate) async fn check_and_init(&self, opcode: u8) -> io::Result<bool> {
219 let probe = self
220 .uring_probe
221 .get_or_try_init(|| async {
222 let mut probe = Probe::new();
223 match self.try_init(&mut probe) {
224 Ok(()) => Ok(Some(probe)),
225 // If the system doesn't support io_uring, we set the probe to `None`.
226 Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => Ok(None),
227 // If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp).
228 // In this case, we try to fall back to spawn_blocking for this and future operations.
229 // See also: https://github.com/tokio-rs/tokio/issues/7691
230 Err(e) if e.raw_os_error() == Some(libc::EPERM) => Ok(None),
231 // For other system errors, we just return it.
232 Err(e) => Err(e),
233 }
234 })
235 .await?;
236
237 Ok(probe
238 .as_ref()
239 .is_some_and(|probe| probe.is_supported(opcode)))
240 }
241
242 /// Initialize the io_uring context if it hasn't been initialized yet.
243 fn try_init(&self, probe: &mut Probe) -> io::Result<()> {
244 let mut guard = self.get_uring().lock();
245 if guard.try_init(probe)? {
246 self.add_uring_source(guard.ring().as_raw_fd())?;
247 }
248
249 Ok(())
250 }
251
252 /// Register an operation with the io_uring.
253 ///
254 /// If this is the first io_uring operation, it will also initialize the io_uring context.
255 /// If io_uring isn't supported, this function returns an `ENOSYS` error, so the caller can
256 /// perform custom handling, such as falling back to an alternative mechanism.
257 ///
258 /// # Safety
259 ///
260 /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
261 /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
262 pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
263 assert!(self.uring_probe.initialized());
264
265 // Uring is initialized.
266
267 let mut guard = self.get_uring().lock();
268 let ctx = &mut *guard;
269 let index = ctx.ops.insert(Lifecycle::Waiting(waker));
270 let entry = entry.user_data(index as u64);
271
272 let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> {
273 if let Err(e) = ctx.submit() {
274 // Submission failed, remove the entry from the slab and return the error
275 ctx.remove_op(index);
276 return Err(e);
277 }
278 Ok(())
279 };
280
281 // SAFETY: entry is valid for the entire duration of the operation
282 while unsafe { ctx.ring_mut().submission().push(&entry).is_err() } {
283 // If the submission queue is full, flush it to the kernel
284 submit_or_remove(ctx)?;
285 }
286
287 // Ensure that the completion queue is not full before submitting the entry.
288 while ctx.ring_mut().completion().is_full() {
289 ctx.dispatch_completions();
290 }
291
292 // Note: For now, we submit the entry immediately without utilizing batching.
293 submit_or_remove(ctx)?;
294
295 Ok(index)
296 }
297
298 pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
299 let mut guard = self.get_uring().lock();
300 let ctx = &mut *guard;
301 let ops = &mut ctx.ops;
302 let Some(lifecycle) = ops.get_mut(index) else {
303 // The corresponding index doesn't exist anymore, so this Op is already complete.
304 return;
305 };
306
307 // This Op will be cancelled. Here, we don't remove the lifecycle from the slab to keep
308 // uring data alive until the operation completes.
309
310 let cancel_data = data.expect("Data should be present").cancel();
311 match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) {
312 Lifecycle::Submitted | Lifecycle::Waiting(_) => (),
313 // The driver saw the completion, but it was never polled.
314 Lifecycle::Completed(cqe) => {
315 if let Lifecycle::Cancelled(CancelData::Open(_)) = lifecycle {
316 if let Ok(fd) = CqeResult::from(cqe).result {
317 // SAFETY: the successful CQE result provides
318 // a non-negative integer, and the event is
319 // related to an open operation.
320 unsafe { OwnedFd::from_raw_fd(fd as i32) };
321 }
322 }
323 // We can safely remove the entry from the slab, as it has already been completed.
324 ops.remove(index);
325 }
326 prev => panic!("Unexpected state: {prev:?}"),
327 };
328 }
329}