// fluke_io_uring_async/linux.rs

1use io_uring::{opcode::AsyncCancel, IoUring};
2use std::cell::RefCell;
3use std::future::Future;
4use std::os::unix::prelude::{AsRawFd, RawFd};
5use std::rc::Rc;
6use tokio::io::unix::AsyncFd;
7
thread_local! {
    // Per-thread io_uring instance shared by all ops created on this thread
    // (notably by `Op::drop`, which pushes cancellation ops through it).
    //
    // This is a thread-local for now, but it shouldn't be. This is only the case
    // for op cancellations.
    static URING: Rc<IoUringAsync> = {
        // FIXME: magic values
        Rc::new(IoUringAsync::new(8).unwrap())
    };
}
16
17/// Returns the thread-local IoUringAsync instance
18pub fn get_ring() -> Rc<IoUringAsync> {
19    let mut u = None;
20    URING.with(|u_| u = Some(u_.clone()));
21    u.unwrap()
22}
23
/// The state of a single in-flight io_uring Op.
enum Lifecycle<C: cqueue::Entry> {
    /// The Op has been pushed onto the submission queue, but has not yet been
    /// polled by the Rust async runtime. This state is somewhat confusingly named
    /// in that an Op in the `Submitted` state has not necessarily been
    /// submitted to the io_uring with the `io_uring_submit` syscall.
    Submitted,
    /// The Rust async runtime has polled the Op, but a completion
    /// queue entry has not yet been received. When a completion queue entry is
    /// received, the Waker can be used to trigger the Rust async runtime to poll
    /// the Op again.
    Waiting(std::task::Waker),
    /// The Op has received a completion queue entry. The Op will
    /// be Ready the next time that it is polled.
    Completed(C),
}
40
/// A Future that resolves to the completion queue entry of a single
/// submitted io_uring operation.
pub struct Op<C: cqueue::Entry> {
    /// Shared state handle for this op. Always `Some` while the Op is alive;
    /// ownership over the OpInner value is moved to a new tokio
    /// task when an Op is dropped before completion (see the Drop impl).
    inner: Option<OpInner<C>>,
}
47
48impl<C: cqueue::Entry> Future for Op<C> {
49    type Output = C;
50
51    fn poll(
52        mut self: std::pin::Pin<&mut Self>,
53        cx: &mut std::task::Context<'_>,
54    ) -> std::task::Poll<Self::Output> {
55        // It is safe to unwrap inner because it is only set to None after
56        // the Op has been dropped.
57        std::pin::Pin::new(self.inner.as_mut().unwrap()).poll(cx)
58    }
59}
60
impl<C: cqueue::Entry> Drop for Op<C> {
    fn drop(&mut self) {
        // `inner` is always Some until this point; Drop runs at most once.
        let inner = self.inner.take().unwrap();
        let guard = inner.slab.borrow();
        match &guard[inner.index] {
            // Already completed: dropping `inner` (OpInner) at the end of
            // this fn frees the slab slot via OpInner's Drop impl.
            Lifecycle::Completed(_) => {}
            _ => {
                // Still in flight: the slab slot must stay alive until the
                // kernel is done with this user_data. Release the borrow so
                // `push` below can re-borrow the slab.
                drop(guard);

                // submit cancel op; `push` sets each SQE's user_data to its
                // slab index, so the index identifies the op to cancel.
                let op = AsyncCancel::new(inner.index.try_into().unwrap()).build();
                let mut cancel_fut = get_ring().push(op);
                // Steal the cancel op's OpInner and forget the wrapper:
                // letting `cancel_fut` drop normally would re-enter this Drop
                // impl and submit a cancellation for the cancellation.
                let cancel_fut_inner = cancel_fut.inner.take().unwrap();
                std::mem::forget(cancel_fut);

                // Keep both OpInners alive on a detached task until their
                // CQEs arrive; their Drop impls then reclaim the slab slots.
                tokio::task::spawn_local(async move {
                    cancel_fut_inner.await;
                    inner.await;
                });
            }
        }
    }
}
84
/// Shared-state handle for one in-flight operation. Dropping an OpInner
/// removes its slab entry (see the Drop impl), so it must be kept alive
/// until the op's completion queue entry has been received.
pub struct OpInner<C: cqueue::Entry> {
    // Shared with the owning IoUringAsync; holds this op's Lifecycle.
    slab: Rc<RefCell<slab::Slab<Lifecycle<C>>>>,
    // Slot in `slab`; also used as the SQE's user_data by `push`.
    index: usize,
}
89
90impl<C: cqueue::Entry> Future for OpInner<C> {
91    type Output = C;
92
93    fn poll(
94        self: std::pin::Pin<&mut Self>,
95        cx: &mut std::task::Context<'_>,
96    ) -> std::task::Poll<Self::Output> {
97        let mut guard = self.slab.borrow_mut();
98        let lifecycle = &mut guard[self.index];
99        match lifecycle {
100            Lifecycle::Submitted => {
101                *lifecycle = Lifecycle::Waiting(cx.waker().clone());
102                std::task::Poll::Pending
103            }
104            Lifecycle::Waiting(_) => {
105                *lifecycle = Lifecycle::Waiting(cx.waker().clone());
106                std::task::Poll::Pending
107            }
108            Lifecycle::Completed(cqe) => std::task::Poll::Ready(cqe.clone()),
109        }
110    }
111}
112
113impl<C: cqueue::Entry> Drop for OpInner<C> {
114    fn drop(&mut self) {
115        let mut guard = self.slab.borrow_mut();
116        let lifecycle = guard.remove(self.index);
117        match lifecycle {
118            Lifecycle::Completed(_) => {}
119            _ => {
120                if std::thread::panicking() {
121                    // thread is panicking, eschewing drop cleanliness check
122                } else {
123                    panic!("Op drop occured before completion (index {})", self.index)
124                }
125            }
126        };
127    }
128}
129
130pub mod cqueue;
131pub mod squeue;
132
/// An io_uring handle that integrates with an async runtime: operations are
/// pushed as futures (`push`) and completed by draining CQEs (`handle_cqe`).
pub struct IoUringAsync<
    S: squeue::Entry = io_uring::squeue::Entry,
    C: cqueue::Entry = io_uring::cqueue::Entry,
> {
    // The underlying ring. NOTE(review): the inner Rc looks redundant given
    // callers already share the whole IoUringAsync via Rc — confirm.
    uring: Rc<IoUring<S, C>>,
    // Per-op state, keyed by the user_data carried on each SQE/CQE.
    slab: Rc<RefCell<slab::Slab<Lifecycle<C>>>>,
}
140
impl<S: squeue::Entry, C: cqueue::Entry> AsRawFd for IoUringAsync<S, C> {
    // Exposes the ring's fd so `listen` can register it with tokio's
    // AsyncFd (which requires AsRawFd) and await readiness.
    fn as_raw_fd(&self) -> RawFd {
        self.uring.as_raw_fd()
    }
}
146
147impl IoUringAsync<io_uring::squeue::Entry, io_uring::cqueue::Entry> {
148    pub fn new(entries: u32) -> std::io::Result<Self> {
149        Ok(Self {
150            uring: Rc::new(io_uring::IoUring::builder().build(entries)?),
151            slab: Rc::new(RefCell::new(slab::Slab::new())),
152        })
153    }
154}
155
impl<S: squeue::Entry, C: cqueue::Entry> IoUringAsync<S, C> {
    /// Drives completions forever: waits for the ring's fd to become
    /// readable, then drains and dispatches all pending CQEs. Never returns;
    /// intended to be spawned on a LocalSet (see the tests below).
    pub async fn listen(uring: Rc<IoUringAsync<S, C>>) {
        let async_fd = AsyncFd::new(uring).unwrap();
        loop {
            let mut guard = async_fd.readable().await.unwrap();
            guard.get_inner().handle_cqe();
            guard.clear_ready();
        }
    }

    /// Like `new`, but usable with any squeue/cqueue entry types.
    pub fn generic_new(entries: u32) -> std::io::Result<Self> {
        Ok(Self {
            uring: Rc::new(io_uring::IoUring::builder().build(entries)?),
            slab: Rc::new(RefCell::new(slab::Slab::new())),
        })
    }

    /// Queues `entry` on the submission queue and returns a future that
    /// resolves to its completion queue entry.
    ///
    /// Note: this only enqueues; the kernel sees the entry after `submit()`
    /// is called (directly or via an on-park hook as in the tests).
    pub fn push(&self, entry: impl Into<S>) -> Op<C> {
        let mut guard = self.slab.borrow_mut();
        let index = guard.insert(Lifecycle::Submitted);
        // The slab index doubles as user_data so handle_cqe can route the
        // CQE back to this op's Lifecycle slot.
        let entry = entry.into().user_data(index.try_into().unwrap());
        // SAFETY: `submission_shared` requires that no other handle to the
        // submission queue is alive at the same time. This type is not Send
        // (Rc), so no other thread touches the queue — TODO confirm no other
        // submission handle exists on this thread during the call.
        while unsafe { self.uring.submission_shared().push(&entry).is_err() } {
            // Submission queue full: flush to the kernel to make room, retry.
            self.uring.submit().unwrap();
        }
        Op {
            inner: Some(OpInner {
                slab: self.slab.clone(),
                index,
            }),
        }
    }

    /// Drains the completion queue, marking each op Completed and waking any
    /// task waiting on it.
    pub fn handle_cqe(&self) {
        let mut guard = self.slab.borrow_mut();
        // SAFETY: `completion_shared` requires that no other handle to the
        // completion queue is alive at the same time; only `listen` calls
        // this, on a single thread — TODO confirm no other completion handle
        // is created elsewhere.
        while let Some(cqe) = unsafe { self.uring.completion_shared() }.next() {
            let index = cqe.user_data();
            let lifecycle = &mut guard[index.try_into().unwrap()];
            match lifecycle {
                Lifecycle::Submitted => {
                    // Completed before the op was first polled: no waker to
                    // notify yet; the next poll will observe Completed.
                    *lifecycle = Lifecycle::Completed(cqe);
                }
                Lifecycle::Waiting(waker) => {
                    waker.wake_by_ref();
                    *lifecycle = Lifecycle::Completed(cqe);
                }
                Lifecycle::Completed(cqe) => {
                    // A second CQE for the same user_data (e.g. a multishot
                    // op) is not supported; report and drop it.
                    println!(
                        "multishot operations not implemented: {}, {}",
                        cqe.user_data(),
                        cqe.result()
                    );
                }
            }
        }
    }

    /// Submit all queued submission queue events to the kernel.
    pub fn submit(&self) -> std::io::Result<usize> {
        self.uring.submit()
    }
}
217
#[cfg(test)]
mod tests {
    use super::IoUringAsync;
    use io_uring::opcode::Nop;
    use send_wrapper::SendWrapper;
    use std::rc::Rc;

    /// Two no-op submissions on a current-thread runtime, flushed to the
    /// kernel with an explicit `submit()` call.
    #[test]
    fn example1() {
        let ring = Rc::new(IoUringAsync::new(8).unwrap());
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            let local = tokio::task::LocalSet::new();
            local
                .run_until(async {
                    // Background task that drains and dispatches completions.
                    tokio::task::spawn_local(IoUringAsync::listen(ring.clone()));

                    let first = ring.push(Nop::new().build());
                    let second = ring.push(Nop::new().build());

                    // Hand the queued entries to the kernel.
                    ring.submit().unwrap();

                    let cqe1 = first.await;
                    let cqe2 = second.await;

                    assert!(cqe1.result() >= 0, "nop error: {}", cqe1.result());
                    assert!(cqe2.result() >= 0, "nop error: {}", cqe2.result());
                })
                .await;
        });
    }

    /// Single no-op where submission happens implicitly: the runtime flushes
    /// the submission queue every time the executor parks.
    #[test]
    fn example2() {
        let ring = Rc::new(IoUringAsync::new(8).unwrap());

        // Create a new current_thread runtime that submits all outstanding submission queue
        // entries as soon as the executor goes idle.
        let park_ring = SendWrapper::new(ring.clone());
        let rt = tokio::runtime::Builder::new_current_thread()
            .on_thread_park(move || {
                park_ring.submit().unwrap();
            })
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            let local = tokio::task::LocalSet::new();
            local
                .run_until(async {
                    tokio::task::spawn_local(IoUringAsync::listen(ring.clone()));

                    let cqe = ring.push(Nop::new().build()).await;
                    assert!(cqe.result() >= 0, "nop error: {}", cqe.result());
                })
                .await;
        });
    }
}