fluke_io_uring_async/
linux.rs1use io_uring::{opcode::AsyncCancel, IoUring};
2use std::cell::RefCell;
3use std::future::Future;
4use std::os::unix::prelude::{AsRawFd, RawFd};
5use std::rc::Rc;
6use tokio::io::unix::AsyncFd;
7
8thread_local! {
9 static URING: Rc<IoUringAsync> = {
12 Rc::new(IoUringAsync::new(8).unwrap())
14 };
15}
16
17pub fn get_ring() -> Rc<IoUringAsync> {
19 let mut u = None;
20 URING.with(|u_| u = Some(u_.clone()));
21 u.unwrap()
22}
23
24enum Lifecycle<C: cqueue::Entry> {
26 Submitted,
31 Waiting(std::task::Waker),
36 Completed(C),
39}
40
41pub struct Op<C: cqueue::Entry> {
43 inner: Option<OpInner<C>>,
46}
47
48impl<C: cqueue::Entry> Future for Op<C> {
49 type Output = C;
50
51 fn poll(
52 mut self: std::pin::Pin<&mut Self>,
53 cx: &mut std::task::Context<'_>,
54 ) -> std::task::Poll<Self::Output> {
55 std::pin::Pin::new(self.inner.as_mut().unwrap()).poll(cx)
58 }
59}
60
61impl<C: cqueue::Entry> Drop for Op<C> {
62 fn drop(&mut self) {
63 let inner = self.inner.take().unwrap();
64 let guard = inner.slab.borrow();
65 match &guard[inner.index] {
66 Lifecycle::Completed(_) => {}
67 _ => {
68 drop(guard);
69
70 let op = AsyncCancel::new(inner.index.try_into().unwrap()).build();
72 let mut cancel_fut = get_ring().push(op);
73 let cancel_fut_inner = cancel_fut.inner.take().unwrap();
74 std::mem::forget(cancel_fut);
75
76 tokio::task::spawn_local(async move {
77 cancel_fut_inner.await;
78 inner.await;
79 });
80 }
81 }
82 }
83}
84
85pub struct OpInner<C: cqueue::Entry> {
86 slab: Rc<RefCell<slab::Slab<Lifecycle<C>>>>,
87 index: usize,
88}
89
90impl<C: cqueue::Entry> Future for OpInner<C> {
91 type Output = C;
92
93 fn poll(
94 self: std::pin::Pin<&mut Self>,
95 cx: &mut std::task::Context<'_>,
96 ) -> std::task::Poll<Self::Output> {
97 let mut guard = self.slab.borrow_mut();
98 let lifecycle = &mut guard[self.index];
99 match lifecycle {
100 Lifecycle::Submitted => {
101 *lifecycle = Lifecycle::Waiting(cx.waker().clone());
102 std::task::Poll::Pending
103 }
104 Lifecycle::Waiting(_) => {
105 *lifecycle = Lifecycle::Waiting(cx.waker().clone());
106 std::task::Poll::Pending
107 }
108 Lifecycle::Completed(cqe) => std::task::Poll::Ready(cqe.clone()),
109 }
110 }
111}
112
113impl<C: cqueue::Entry> Drop for OpInner<C> {
114 fn drop(&mut self) {
115 let mut guard = self.slab.borrow_mut();
116 let lifecycle = guard.remove(self.index);
117 match lifecycle {
118 Lifecycle::Completed(_) => {}
119 _ => {
120 if std::thread::panicking() {
121 } else {
123 panic!("Op drop occured before completion (index {})", self.index)
124 }
125 }
126 };
127 }
128}
129
130pub mod cqueue;
131pub mod squeue;
132
133pub struct IoUringAsync<
134 S: squeue::Entry = io_uring::squeue::Entry,
135 C: cqueue::Entry = io_uring::cqueue::Entry,
136> {
137 uring: Rc<IoUring<S, C>>,
138 slab: Rc<RefCell<slab::Slab<Lifecycle<C>>>>,
139}
140
141impl<S: squeue::Entry, C: cqueue::Entry> AsRawFd for IoUringAsync<S, C> {
142 fn as_raw_fd(&self) -> RawFd {
143 self.uring.as_raw_fd()
144 }
145}
146
147impl IoUringAsync<io_uring::squeue::Entry, io_uring::cqueue::Entry> {
148 pub fn new(entries: u32) -> std::io::Result<Self> {
149 Ok(Self {
150 uring: Rc::new(io_uring::IoUring::builder().build(entries)?),
151 slab: Rc::new(RefCell::new(slab::Slab::new())),
152 })
153 }
154}
155
156impl<S: squeue::Entry, C: cqueue::Entry> IoUringAsync<S, C> {
157 pub async fn listen(uring: Rc<IoUringAsync<S, C>>) {
158 let async_fd = AsyncFd::new(uring).unwrap();
159 loop {
160 let mut guard = async_fd.readable().await.unwrap();
161 guard.get_inner().handle_cqe();
162 guard.clear_ready();
163 }
164 }
165
166 pub fn generic_new(entries: u32) -> std::io::Result<Self> {
167 Ok(Self {
168 uring: Rc::new(io_uring::IoUring::builder().build(entries)?),
169 slab: Rc::new(RefCell::new(slab::Slab::new())),
170 })
171 }
172
173 pub fn push(&self, entry: impl Into<S>) -> Op<C> {
174 let mut guard = self.slab.borrow_mut();
175 let index = guard.insert(Lifecycle::Submitted);
176 let entry = entry.into().user_data(index.try_into().unwrap());
177 while unsafe { self.uring.submission_shared().push(&entry).is_err() } {
178 self.uring.submit().unwrap();
179 }
180 Op {
181 inner: Some(OpInner {
182 slab: self.slab.clone(),
183 index,
184 }),
185 }
186 }
187
188 pub fn handle_cqe(&self) {
189 let mut guard = self.slab.borrow_mut();
190 while let Some(cqe) = unsafe { self.uring.completion_shared() }.next() {
191 let index = cqe.user_data();
192 let lifecycle = &mut guard[index.try_into().unwrap()];
193 match lifecycle {
194 Lifecycle::Submitted => {
195 *lifecycle = Lifecycle::Completed(cqe);
196 }
197 Lifecycle::Waiting(waker) => {
198 waker.wake_by_ref();
199 *lifecycle = Lifecycle::Completed(cqe);
200 }
201 Lifecycle::Completed(cqe) => {
202 println!(
203 "multishot operations not implemented: {}, {}",
204 cqe.user_data(),
205 cqe.result()
206 );
207 }
208 }
209 }
210 }
211
212 pub fn submit(&self) -> std::io::Result<usize> {
214 self.uring.submit()
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::IoUringAsync;
    use io_uring::opcode::Nop;
    use send_wrapper::SendWrapper;
    use std::rc::Rc;
    use tokio::runtime::Builder;
    use tokio::task::{spawn_local, LocalSet};

    /// Pushes two no-ops, submits the ring explicitly, and checks both
    /// completions.
    #[test]
    fn example1() {
        let ring = Rc::new(IoUringAsync::new(8).unwrap());
        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        rt.block_on(async move {
            LocalSet::new()
                .run_until(async {
                    // Background task that turns completions into wake-ups.
                    spawn_local(IoUringAsync::listen(ring.clone()));

                    let first = ring.push(Nop::new().build());
                    let second = ring.push(Nop::new().build());

                    ring.submit().unwrap();

                    let first_cqe = first.await;
                    let second_cqe = second.await;

                    assert!(first_cqe.result() >= 0, "nop error: {}", first_cqe.result());
                    assert!(
                        second_cqe.result() >= 0,
                        "nop error: {}",
                        second_cqe.result()
                    );
                })
                .await;
        });
    }

    /// Pushes one no-op and relies on the runtime's park hook to submit the
    /// ring.
    #[test]
    fn example2() {
        let ring = Rc::new(IoUringAsync::new(8).unwrap());

        // The park hook must be `Send`; `SendWrapper` lets the `Rc` be
        // captured by it.
        let park_ring = SendWrapper::new(ring.clone());
        let rt = Builder::new_current_thread()
            .on_thread_park(move || {
                park_ring.submit().unwrap();
            })
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            LocalSet::new()
                .run_until(async {
                    spawn_local(IoUringAsync::listen(ring.clone()));

                    let cqe = ring.push(Nop::new().build()).await;
                    assert!(cqe.result() >= 0, "nop error: {}", cqe.result());
                })
                .await;
        });
    }
}