mfio/backend/integrations/
async_io.rs

1//! `async-io` 2.0 integration.
2//!
3//! We technically support `async-io` 1, however, the system had a
4//! [limitation](https://github.com/smol-rs/async-io/issues/132) that was only resolved in version
5//! 2.
6
7use async_io::Async;
8use std::os::fd::BorrowedFd;
9
10use super::super::*;
11use super::{BorrowingFn, Integration};
12
13/// async-io integration.
14///
15/// Unlike [`Null`], this integration supports backends with polling handles, however, only
16/// async-io based runtimes are supported, such as smol and async_std.
17///
18/// Internally, this uses async-io's [`Async`] to wait for readiness of the polling FD, which means
19/// only unix platforms are supported.
20///
21/// # Examples
22///
23/// Using the integration with smol:
24///
25/// ```
26/// # mod sample {
27/// #     include!("../../sample.rs");
28/// # }
29/// # use sample::SampleIo;
30/// use mfio::prelude::v1::*;
31///
32/// # #[cfg(all(unix, not(miri)))]
33/// smol::block_on(async {
34///     let mut handle = SampleIo::new(vec![1, 2, 3, 4]);
35///
36///     // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided.
37///     AsyncIo::run_with_mut(&mut handle, |handle| async move {
38///         // Read value
39///         let val = handle.read(0).await.unwrap();
40///         assert_eq!(1u8, val);
41///     })
42///     .await
43/// });
44/// # #[cfg(not(all(unix, not(miri))))]
45/// # fn main() {}
46/// ```
47#[derive(Clone, Copy, Default)]
48pub struct AsyncIo;
49
50impl Integration for AsyncIo {
51    type Impl<'a, B: LinksIoBackend + 'a, Func: for<'b> BorrowingFn<B::Link>> =
52        AsyncIoImpl<'a, B, Func, Func::Fut<'a>>;
53
54    fn run_with<'a, B: LinksIoBackend + 'a, Func: for<'b> BorrowingFn<B::Link>>(
55        backend: B,
56        func: Func,
57    ) -> Self::Impl<'a, B, Func> {
58        Self::Impl {
59            backend,
60            state: AsyncIoState::Initial(func),
61        }
62    }
63}
64
65enum AsyncIoState<'a, B: IoBackend + ?Sized + 'a, Func, F> {
66    Initial(Func),
67    Loaded(
68        WithBackend<'a, B::Backend, F>,
69        Option<(Async<BorrowedFd<'a>>, &'a PollingFlags, Waker)>,
70    ),
71    Finished,
72}
73
74#[doc(hidden)]
75pub struct AsyncIoImpl<'a, B: LinksIoBackend + 'a, Func, F> {
76    backend: B,
77    state: AsyncIoState<'a, B::Link, Func, F>,
78}
79
80impl<'a, B: LinksIoBackend + 'a, Func: BorrowingFn<B::Link>>
81    AsyncIoImpl<'a, B, Func, Func::Fut<'a>>
82{
83    pub async fn run(backend: B, func: Func) -> <Func::Fut<'a> as Future>::Output {
84        AsyncIo::run_with(backend, func).await
85    }
86}
87
88impl<'a, B: LinksIoBackend + 'a, Func: BorrowingFn<B::Link>> Future
89    for AsyncIoImpl<'a, B, Func, Func::Fut<'a>>
90{
91    type Output = <Func::Fut<'a> as Future>::Output;
92
93    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
94        let this = unsafe { self.get_unchecked_mut() };
95
96        loop {
97            match &mut this.state {
98                AsyncIoState::Initial(_) => {
99                    let func = if let AsyncIoState::Initial(func) =
100                        core::mem::replace(&mut this.state, AsyncIoState::Finished)
101                    {
102                        func
103                    } else {
104                        unreachable!()
105                    };
106                    // SAFETY: the backend reference is pinned
107                    let backend: &'a B::Link =
108                        unsafe { &*(this.backend.get_mut() as *const B::Link) };
109                    let fut = func.call(backend);
110                    let (fut, h) = backend.with_backend(fut);
111                    this.state = AsyncIoState::Loaded(
112                        fut,
113                        h.map(
114                            |PollingHandle {
115                                 handle,
116                                 cur_flags,
117                                 waker,
118                                 ..
119                             }| {
120                                let handle = unsafe { BorrowedFd::borrow_raw(handle) };
121                                (
122                                    Async::new_nonblocking(handle)
123                                        .expect("Could not register the IO resource"),
124                                    cur_flags,
125                                    waker,
126                                )
127                            },
128                        ),
129                    );
130                }
131                AsyncIoState::Loaded(wb, fd) => {
132                    break loop {
133                        if let Poll::Ready(v) = unsafe { Pin::new_unchecked(&mut *wb) }.poll(cx) {
134                            break Poll::Ready(v);
135                        }
136
137                        if let Some((fd, p, _)) = fd {
138                            let (read, write) = p.get();
139                            // TODO: what to do when read = write = false?
140                            let mut ret = Some(Poll::Pending);
141                            if read && fd.poll_readable(cx).is_ready() {
142                                ret = None
143                            }
144                            if write && fd.poll_writable(cx).is_ready() {
145                                ret = None
146                            }
147                            if let Some(ret) = ret {
148                                break ret;
149                            }
150                        }
151                    };
152                }
153                AsyncIoState::Finished => unreachable!(),
154            }
155        }
156    }
157}