tokio/runtime/io/driver/
uring.rs1use io_uring::{squeue::Entry, IoUring};
2use mio::unix::SourceFd;
3use slab::Slab;
4
5use crate::loom::sync::atomic::Ordering;
6use crate::runtime::driver::op::{Cancellable, Lifecycle};
7use crate::{io::Interest, loom::sync::Mutex};
8
9use super::{Handle, TOKEN_WAKEUP};
10
11use std::os::fd::{AsRawFd, RawFd};
12use std::{io, mem, task::Waker};
13
/// Ring size passed to `IoUring::new` when the ring is first created (256 entries).
const DEFAULT_RING_SIZE: u32 = 1 << 8;
15
/// Initialization status of the io_uring backend, kept in an atomic `usize`.
#[repr(usize)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
    /// No successful initialization has happened yet.
    Uninitialized = 0,
    /// The ring has been created and its fd registered.
    Initialized = 1,
    /// io_uring was found to be unavailable; initialization is not retried.
    Unsupported = 2,
}

impl State {
    /// Encodes the state as its `repr(usize)` discriminant for atomic storage.
    fn as_usize(&self) -> usize {
        *self as usize
    }

    /// Decodes a discriminant produced by [`State::as_usize`].
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if `value` is not the discriminant of any variant.
    fn from_usize(value: usize) -> Self {
        const VARIANTS: [State; 3] = [State::Uninitialized, State::Initialized, State::Unsupported];

        VARIANTS
            .iter()
            .copied()
            .find(|candidate| candidate.as_usize() == value)
            .unwrap_or_else(|| unreachable!("invalid Uring state: {}", value))
    }
}
37}
38
/// io_uring state owned by the driver `Handle`, which guards it with a mutex
/// (see `Handle::get_uring`).
///
/// NOTE: field declaration order is also drop order; keep `uring` before `ops`
/// unless the `Drop` impl is reviewed alongside.
pub(crate) struct UringContext {
    /// The ring itself; `None` until `try_init` creates it.
    pub(crate) uring: Option<io_uring::IoUring>,
    /// Registered operations. Each op's slab index is used as its SQE `user_data`,
    /// which is how completions are routed back to the op.
    pub(crate) ops: slab::Slab<Lifecycle>,
}
43
44impl UringContext {
45 pub(crate) fn new() -> Self {
46 Self {
47 ops: Slab::new(),
48 uring: None,
49 }
50 }
51
52 pub(crate) fn ring(&self) -> &io_uring::IoUring {
53 self.uring.as_ref().expect("io_uring not initialized")
54 }
55
56 pub(crate) fn ring_mut(&mut self) -> &mut io_uring::IoUring {
57 self.uring.as_mut().expect("io_uring not initialized")
58 }
59
60 pub(crate) fn try_init(&mut self) -> io::Result<bool> {
66 if self.uring.is_some() {
67 return Ok(false);
69 }
70
71 self.uring.replace(IoUring::new(DEFAULT_RING_SIZE)?);
72
73 Ok(true)
74 }
75
76 pub(crate) fn dispatch_completions(&mut self) {
77 let ops = &mut self.ops;
78 let Some(mut uring) = self.uring.take() else {
79 return;
81 };
82
83 let cq = uring.completion();
84
85 for cqe in cq {
86 let idx = cqe.user_data() as usize;
87
88 match ops.get_mut(idx) {
89 Some(Lifecycle::Waiting(waker)) => {
90 waker.wake_by_ref();
91 *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
92 }
93 Some(Lifecycle::Cancelled(_)) => {
94 ops.remove(idx);
97 }
98 Some(other) => {
99 panic!("unexpected lifecycle for slot {idx}: {other:?}");
100 }
101 None => {
102 panic!("no op at index {idx}");
103 }
104 }
105 }
106
107 self.uring.replace(uring);
108
109 }
111
112 pub(crate) fn submit(&mut self) -> io::Result<()> {
113 loop {
114 match self.ring().submit() {
116 Ok(_) => {
117 return Ok(());
118 }
119
120 Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
122 self.dispatch_completions();
123 }
124 Err(e) => {
126 return Err(e);
127 }
128 }
129 }
130 }
131
132 pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle {
133 self.ops.remove(index)
134 }
135}
136
137impl Drop for UringContext {
139 fn drop(&mut self) {
140 if self.uring.is_none() {
141 return;
143 }
144
145 while !self.ring_mut().submission().is_empty() {
147 self.submit().expect("Internal error when dropping driver");
148 }
149
150 let mut ops = std::mem::take(&mut self.ops);
151
152 ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
154
155 while !ops.is_empty() {
156 self.ring_mut()
158 .submit_and_wait(1)
159 .expect("Internal error when dropping driver");
160
161 for cqe in self.ring_mut().completion() {
162 let idx = cqe.user_data() as usize;
163 ops.remove(idx);
164 }
165 }
166 }
167}
168
169impl Handle {
170 fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
171 let mut source = SourceFd(&uringfd);
172 self.registry
173 .register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
174 }
175
176 pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
177 &self.uring_context
178 }
179
180 fn set_uring_state(&self, state: State) {
181 self.uring_state.store(state.as_usize(), Ordering::Release);
182 }
183
184 pub(crate) fn check_and_init(&self) -> io::Result<bool> {
186 match State::from_usize(self.uring_state.load(Ordering::Acquire)) {
187 State::Uninitialized => match self.try_init() {
188 Ok(()) => {
189 self.set_uring_state(State::Initialized);
190 Ok(true)
191 }
192 Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => {
194 self.set_uring_state(State::Unsupported);
195 Ok(false)
196 }
197 Err(e) => Err(e),
199 },
200 State::Unsupported => Ok(false),
201 State::Initialized => Ok(true),
202 }
203 }
204
205 fn try_init(&self) -> io::Result<()> {
207 let mut guard = self.get_uring().lock();
208 if guard.try_init()? {
209 self.add_uring_source(guard.ring().as_raw_fd())?;
210 }
211
212 Ok(())
213 }
214
215 pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
226 if !self.check_and_init()? {
228 return Err(io::Error::from_raw_os_error(libc::ENOSYS));
229 }
230
231 let mut guard = self.get_uring().lock();
234 let ctx = &mut *guard;
235 let index = ctx.ops.insert(Lifecycle::Waiting(waker));
236 let entry = entry.user_data(index as u64);
237
238 let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> {
239 if let Err(e) = ctx.submit() {
240 ctx.remove_op(index);
242 return Err(e);
243 }
244 Ok(())
245 };
246
247 while unsafe { ctx.ring_mut().submission().push(&entry).is_err() } {
249 submit_or_remove(ctx)?;
251 }
252
253 while ctx.ring_mut().completion().is_full() {
255 ctx.dispatch_completions();
256 }
257
258 submit_or_remove(ctx)?;
260
261 Ok(index)
262 }
263
264 pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
265 let mut guard = self.get_uring().lock();
266 let ctx = &mut *guard;
267 let ops = &mut ctx.ops;
268 let Some(lifecycle) = ops.get_mut(index) else {
269 return;
271 };
272
273 let cancel_data = data.expect("Data should be present").cancel();
277 match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) {
278 Lifecycle::Submitted | Lifecycle::Waiting(_) => (),
279 Lifecycle::Completed(_) => {
281 ops.remove(index);
283 }
284 prev => panic!("Unexpected state: {prev:?}"),
285 };
286 }
287}