mfio/backend/integrations/async_io.rs
1//! `async-io` 2.0 integration.
2//!
3//! We technically support `async-io` 1, however, the system had a
4//! [limitation](https://github.com/smol-rs/async-io/issues/132) that was only resolved in version
5//! 2.
6
7use async_io::Async;
8use std::os::fd::BorrowedFd;
9
10use super::super::*;
11use super::{BorrowingFn, Integration};
12
13/// async-io integration.
14///
15/// Unlike [`Null`], this integration supports backends with polling handles, however, only
16/// async-io based runtimes are supported, such as smol and async_std.
17///
18/// Internally, this uses async-io's [`Async`] to wait for readiness of the polling FD, which means
19/// only unix platforms are supported.
20///
21/// # Examples
22///
23/// Using the integration with smol:
24///
25/// ```
26/// # mod sample {
27/// # include!("../../sample.rs");
28/// # }
29/// # use sample::SampleIo;
30/// use mfio::prelude::v1::*;
31///
32/// # #[cfg(all(unix, not(miri)))]
33/// smol::block_on(async {
34/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]);
35///
36/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided.
37/// AsyncIo::run_with_mut(&mut handle, |handle| async move {
38/// // Read value
39/// let val = handle.read(0).await.unwrap();
40/// assert_eq!(1u8, val);
41/// })
42/// .await
43/// });
44/// # #[cfg(not(all(unix, not(miri))))]
45/// # fn main() {}
46/// ```
47#[derive(Clone, Copy, Default)]
48pub struct AsyncIo;
49
50impl Integration for AsyncIo {
51 type Impl<'a, B: LinksIoBackend + 'a, Func: for<'b> BorrowingFn<B::Link>> =
52 AsyncIoImpl<'a, B, Func, Func::Fut<'a>>;
53
54 fn run_with<'a, B: LinksIoBackend + 'a, Func: for<'b> BorrowingFn<B::Link>>(
55 backend: B,
56 func: Func,
57 ) -> Self::Impl<'a, B, Func> {
58 Self::Impl {
59 backend,
60 state: AsyncIoState::Initial(func),
61 }
62 }
63}
64
65enum AsyncIoState<'a, B: IoBackend + ?Sized + 'a, Func, F> {
66 Initial(Func),
67 Loaded(
68 WithBackend<'a, B::Backend, F>,
69 Option<(Async<BorrowedFd<'a>>, &'a PollingFlags, Waker)>,
70 ),
71 Finished,
72}
73
74#[doc(hidden)]
75pub struct AsyncIoImpl<'a, B: LinksIoBackend + 'a, Func, F> {
76 backend: B,
77 state: AsyncIoState<'a, B::Link, Func, F>,
78}
79
80impl<'a, B: LinksIoBackend + 'a, Func: BorrowingFn<B::Link>>
81 AsyncIoImpl<'a, B, Func, Func::Fut<'a>>
82{
83 pub async fn run(backend: B, func: Func) -> <Func::Fut<'a> as Future>::Output {
84 AsyncIo::run_with(backend, func).await
85 }
86}
87
88impl<'a, B: LinksIoBackend + 'a, Func: BorrowingFn<B::Link>> Future
89 for AsyncIoImpl<'a, B, Func, Func::Fut<'a>>
90{
91 type Output = <Func::Fut<'a> as Future>::Output;
92
93 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
94 let this = unsafe { self.get_unchecked_mut() };
95
96 loop {
97 match &mut this.state {
98 AsyncIoState::Initial(_) => {
99 let func = if let AsyncIoState::Initial(func) =
100 core::mem::replace(&mut this.state, AsyncIoState::Finished)
101 {
102 func
103 } else {
104 unreachable!()
105 };
106 // SAFETY: the backend reference is pinned
107 let backend: &'a B::Link =
108 unsafe { &*(this.backend.get_mut() as *const B::Link) };
109 let fut = func.call(backend);
110 let (fut, h) = backend.with_backend(fut);
111 this.state = AsyncIoState::Loaded(
112 fut,
113 h.map(
114 |PollingHandle {
115 handle,
116 cur_flags,
117 waker,
118 ..
119 }| {
120 let handle = unsafe { BorrowedFd::borrow_raw(handle) };
121 (
122 Async::new_nonblocking(handle)
123 .expect("Could not register the IO resource"),
124 cur_flags,
125 waker,
126 )
127 },
128 ),
129 );
130 }
131 AsyncIoState::Loaded(wb, fd) => {
132 break loop {
133 if let Poll::Ready(v) = unsafe { Pin::new_unchecked(&mut *wb) }.poll(cx) {
134 break Poll::Ready(v);
135 }
136
137 if let Some((fd, p, _)) = fd {
138 let (read, write) = p.get();
139 // TODO: what to do when read = write = false?
140 let mut ret = Some(Poll::Pending);
141 if read && fd.poll_readable(cx).is_ready() {
142 ret = None
143 }
144 if write && fd.poll_writable(cx).is_ready() {
145 ret = None
146 }
147 if let Some(ret) = ret {
148 break ret;
149 }
150 }
151 };
152 }
153 AsyncIoState::Finished => unreachable!(),
154 }
155 }
156 }
157}