async_rs/implementors/
async_io.rs

1use crate::{AsyncIOHandle, IOHandle, Reactor, sys::IO};
2use async_io::{Async, Timer};
3use async_trait::async_trait;
4use futures_core::Stream;
5use std::{
6    future::Future,
7    io,
8    net::{SocketAddr, TcpStream},
9    pin::Pin,
10    task::{self, Context, Poll},
11    time::{Duration, Instant},
12};
13
/// Stateless handle exposing the common reactor interface on top of `async-io`.
///
/// The type carries no data: every value is interchangeable with any other,
/// which is why it can cheaply derive `Copy`, `Default` and the comparison traits.
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct AsyncIO;
17
18#[async_trait]
19impl Reactor for AsyncIO {
20    fn register<H: IO + Send + 'static>(
21        &self,
22        socket: IOHandle<H>,
23    ) -> io::Result<impl AsyncIOHandle + Send> {
24        Async::new(socket)
25    }
26
27    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
28        TimerTask(Timer::after(dur))
29    }
30
31    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
32        Timer::interval(dur)
33    }
34
35    async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
36        Async::<TcpStream>::connect(addr).await
37    }
38}
39
/// Adapter future that drives an inner `Unpin` future and throws away its output.
///
/// Awaiting a `TimerTask` yields `()` once the wrapped future has completed.
pub(crate) struct TimerTask<T: Future + Unpin>(pub(crate) T);

impl<T: Future + Unpin> Future for TimerTask<T> {
    type Output = ();

    /// Polls the wrapped future, forwarding `Pending` and mapping completion to `()`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `T: Unpin` makes `Self: Unpin`, so we can safely reach the inner
        // future through a plain mutable reference and re-pin it.
        let inner = Pin::new(&mut self.get_mut().0);
        // `ready!` returns `Poll::Pending` early; on completion the inner
        // output is intentionally discarded.
        task::ready!(inner.poll(cx));
        Poll::Ready(())
    }
}
50
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: `AsyncIO` must be usable behind `Box<dyn Reactor>`.
    #[test]
    fn dyn_compat() {
        struct Holder {
            _reactor: Box<dyn Reactor>,
        }

        // The coercion to a trait object is the actual assertion; the value
        // itself is never used.
        let boxed: Box<dyn Reactor> = Box::new(AsyncIO);
        let _ = Holder { _reactor: boxed };
    }
}