tokio/runtime/io/driver/
uring.rs

1use io_uring::{squeue::Entry, IoUring};
2use mio::unix::SourceFd;
3use slab::Slab;
4
5use crate::loom::sync::atomic::Ordering;
6use crate::runtime::driver::op::{Cancellable, Lifecycle};
7use crate::{io::Interest, loom::sync::Mutex};
8
9use super::{Handle, TOKEN_WAKEUP};
10
11use std::os::fd::{AsRawFd, RawFd};
12use std::{io, mem, task::Waker};
13
/// Number of entries requested from `IoUring::new` when the ring is first created
/// (a power of two: 2^8 = 256).
const DEFAULT_RING_SIZE: u32 = 1 << 8;
15
/// Whether io_uring support has been set up on a driver handle.
///
/// The value is kept in an atomic `usize`, so it is converted with
/// [`State::as_usize`] / [`State::from_usize`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(usize)]
enum State {
    /// Ring setup has not succeeded (or has not been attempted) yet.
    Uninitialized = 0,
    /// The ring has been created and is usable.
    Initialized = 1,
    /// Ring setup failed with `ENOSYS`; io_uring is not available.
    Unsupported = 2,
}

impl State {
    /// Encodes this state as its `repr(usize)` discriminant.
    fn as_usize(&self) -> usize {
        let state = *self;
        state as usize
    }

    /// Decodes a value previously produced by [`State::as_usize`].
    ///
    /// # Panics
    ///
    /// Panics if `value` is not the discriminant of any variant.
    fn from_usize(value: usize) -> Self {
        // Indexed by discriminant: position `i` holds the variant whose value is `i`.
        const BY_DISCRIMINANT: [State; 3] =
            [State::Uninitialized, State::Initialized, State::Unsupported];

        match BY_DISCRIMINANT.get(value) {
            Some(state) => *state,
            None => unreachable!("invalid Uring state: {}", value),
        }
    }
}
38
39pub(crate) struct UringContext {
40    pub(crate) uring: Option<io_uring::IoUring>,
41    pub(crate) ops: slab::Slab<Lifecycle>,
42}
43
44impl UringContext {
45    pub(crate) fn new() -> Self {
46        Self {
47            ops: Slab::new(),
48            uring: None,
49        }
50    }
51
52    pub(crate) fn ring(&self) -> &io_uring::IoUring {
53        self.uring.as_ref().expect("io_uring not initialized")
54    }
55
56    pub(crate) fn ring_mut(&mut self) -> &mut io_uring::IoUring {
57        self.uring.as_mut().expect("io_uring not initialized")
58    }
59
60    /// Perform `io_uring_setup` system call, and Returns true if this
61    /// actually initialized the io_uring.
62    ///
63    /// If the machine doesn't support io_uring, then this will return an
64    /// `ENOSYS` error.
65    pub(crate) fn try_init(&mut self) -> io::Result<bool> {
66        if self.uring.is_some() {
67            // Already initialized.
68            return Ok(false);
69        }
70
71        self.uring.replace(IoUring::new(DEFAULT_RING_SIZE)?);
72
73        Ok(true)
74    }
75
76    pub(crate) fn dispatch_completions(&mut self) {
77        let ops = &mut self.ops;
78        let Some(mut uring) = self.uring.take() else {
79            // Uring is not initialized yet.
80            return;
81        };
82
83        let cq = uring.completion();
84
85        for cqe in cq {
86            let idx = cqe.user_data() as usize;
87
88            match ops.get_mut(idx) {
89                Some(Lifecycle::Waiting(waker)) => {
90                    waker.wake_by_ref();
91                    *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
92                }
93                Some(Lifecycle::Cancelled(_)) => {
94                    // Op future was cancelled, so we discard the result.
95                    // We just remove the entry from the slab.
96                    ops.remove(idx);
97                }
98                Some(other) => {
99                    panic!("unexpected lifecycle for slot {idx}: {other:?}");
100                }
101                None => {
102                    panic!("no op at index {idx}");
103                }
104            }
105        }
106
107        self.uring.replace(uring);
108
109        // `cq`'s drop gets called here, updating the latest head pointer
110    }
111
112    pub(crate) fn submit(&mut self) -> io::Result<()> {
113        loop {
114            // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
115            match self.ring().submit() {
116                Ok(_) => {
117                    return Ok(());
118                }
119
120                // If the submission queue is full, we dispatch completions and try again.
121                Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
122                    self.dispatch_completions();
123                }
124                // For other errors, we currently return the error as is.
125                Err(e) => {
126                    return Err(e);
127                }
128            }
129        }
130    }
131
132    pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle {
133        self.ops.remove(index)
134    }
135}
136
137/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
138impl Drop for UringContext {
139    fn drop(&mut self) {
140        if self.uring.is_none() {
141            // Uring is not initialized or not supported.
142            return;
143        }
144
145        // Make sure we flush the submission queue before dropping the driver.
146        while !self.ring_mut().submission().is_empty() {
147            self.submit().expect("Internal error when dropping driver");
148        }
149
150        let mut ops = std::mem::take(&mut self.ops);
151
152        // Remove all completed ops since we don't need to wait for them.
153        ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
154
155        while !ops.is_empty() {
156            // Wait until at least one completion is available.
157            self.ring_mut()
158                .submit_and_wait(1)
159                .expect("Internal error when dropping driver");
160
161            for cqe in self.ring_mut().completion() {
162                let idx = cqe.user_data() as usize;
163                ops.remove(idx);
164            }
165        }
166    }
167}
168
169impl Handle {
170    fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
171        let mut source = SourceFd(&uringfd);
172        self.registry
173            .register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
174    }
175
176    pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
177        &self.uring_context
178    }
179
180    fn set_uring_state(&self, state: State) {
181        self.uring_state.store(state.as_usize(), Ordering::Release);
182    }
183
184    /// Check if the io_uring context is initialized. If not, it will try to initialize it.
185    pub(crate) fn check_and_init(&self) -> io::Result<bool> {
186        match State::from_usize(self.uring_state.load(Ordering::Acquire)) {
187            State::Uninitialized => match self.try_init() {
188                Ok(()) => {
189                    self.set_uring_state(State::Initialized);
190                    Ok(true)
191                }
192                // If the system doesn't support io_uring, we set the state to Unsupported.
193                Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => {
194                    self.set_uring_state(State::Unsupported);
195                    Ok(false)
196                }
197                // For other system errors, we just return it.
198                Err(e) => Err(e),
199            },
200            State::Unsupported => Ok(false),
201            State::Initialized => Ok(true),
202        }
203    }
204
205    /// Initialize the io_uring context if it hasn't been initialized yet.
206    fn try_init(&self) -> io::Result<()> {
207        let mut guard = self.get_uring().lock();
208        if guard.try_init()? {
209            self.add_uring_source(guard.ring().as_raw_fd())?;
210        }
211
212        Ok(())
213    }
214
215    /// Register an operation with the io_uring.
216    ///
217    /// If this is the first io_uring operation, it will also initialize the io_uring context.
218    /// If io_uring isn't supported, this function returns an `ENOSYS` error, so the caller can
219    /// perform custom handling, such as falling back to an alternative mechanism.
220    ///
221    /// # Safety
222    ///
223    /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
224    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
225    pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
226        // Note: Maybe this check can be removed if upstream callers consistently use `check_and_init`.
227        if !self.check_and_init()? {
228            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
229        }
230
231        // Uring is initialized.
232
233        let mut guard = self.get_uring().lock();
234        let ctx = &mut *guard;
235        let index = ctx.ops.insert(Lifecycle::Waiting(waker));
236        let entry = entry.user_data(index as u64);
237
238        let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> {
239            if let Err(e) = ctx.submit() {
240                // Submission failed, remove the entry from the slab and return the error
241                ctx.remove_op(index);
242                return Err(e);
243            }
244            Ok(())
245        };
246
247        // SAFETY: entry is valid for the entire duration of the operation
248        while unsafe { ctx.ring_mut().submission().push(&entry).is_err() } {
249            // If the submission queue is full, flush it to the kernel
250            submit_or_remove(ctx)?;
251        }
252
253        // Ensure that the completion queue is not full before submitting the entry.
254        while ctx.ring_mut().completion().is_full() {
255            ctx.dispatch_completions();
256        }
257
258        // Note: For now, we submit the entry immediately without utilizing batching.
259        submit_or_remove(ctx)?;
260
261        Ok(index)
262    }
263
264    pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
265        let mut guard = self.get_uring().lock();
266        let ctx = &mut *guard;
267        let ops = &mut ctx.ops;
268        let Some(lifecycle) = ops.get_mut(index) else {
269            // The corresponding index doesn't exist anymore, so this Op is already complete.
270            return;
271        };
272
273        // This Op will be cancelled. Here, we don't remove the lifecycle from the slab to keep
274        // uring data alive until the operation completes.
275
276        let cancel_data = data.expect("Data should be present").cancel();
277        match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) {
278            Lifecycle::Submitted | Lifecycle::Waiting(_) => (),
279            // The driver saw the completion, but it was never polled.
280            Lifecycle::Completed(_) => {
281                // We can safely remove the entry from the slab, as it has already been completed.
282                ops.remove(index);
283            }
284            prev => panic!("Unexpected state: {prev:?}"),
285        };
286    }
287}