async_rs/traits/
reactor.rs

1//! A collection of traits to define a common interface across reactors
2
3use crate::{sys::AsSysFd, traits::AsyncToSocketAddrs};
4use futures_core::Stream;
5use futures_io::{AsyncRead, AsyncWrite};
6use std::{
7    io::{self, Read, Write},
8    net::SocketAddr,
9    ops::Deref,
10    time::{Duration, Instant},
11};
12
13/// A common interface for performing actions on a reactor
14pub trait Reactor {
15    /// The type representing a TCP stream (after tcp_connect) for this reactor
16    type TcpStream: AsyncRead + AsyncWrite + Send + Unpin + 'static;
17
18    /// Register a synchronous handle, returning an asynchronous one
19    fn register<H: Read + Write + AsSysFd + Send + 'static>(
20        &self,
21        socket: H,
22    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static>
23    where
24        Self: Sized;
25
26    /// Sleep for the given duration
27    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static
28    where
29        Self: Sized;
30
31    /// Stream that yields at every given interval
32    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static
33    where
34        Self: Sized;
35
36    /// Create a TcpStream by connecting to a remote host
37    fn tcp_connect<A: AsyncToSocketAddrs + Send>(
38        &self,
39        addrs: A,
40    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send
41    where
42        Self: Sync + Sized,
43    {
44        async move {
45            let mut err = None;
46            for addr in addrs.to_socket_addrs().await? {
47                match self.tcp_connect_addr(addr).await {
48                    Ok(stream) => return Ok(stream),
49                    Err(e) => err = Some(e),
50                }
51            }
52            Err(err.unwrap_or_else(|| {
53                io::Error::new(io::ErrorKind::AddrNotAvailable, "couldn't resolve host")
54            }))
55        }
56    }
57
58    /// Create a TcpStream by connecting to a remote host
59    fn tcp_connect_addr(
60        &self,
61        addr: SocketAddr,
62    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static
63    where
64        Self: Sized;
65}
66
67impl<R: Deref> Reactor for R
68where
69    R::Target: Reactor + Sized,
70{
71    type TcpStream = <<R as Deref>::Target as Reactor>::TcpStream;
72
73    fn register<H: Read + Write + AsSysFd + Send + 'static>(
74        &self,
75        socket: H,
76    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
77        self.deref().register(socket)
78    }
79
80    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
81        self.deref().sleep(dur)
82    }
83
84    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
85        self.deref().interval(dur)
86    }
87
88    fn tcp_connect_addr(
89        &self,
90        addr: SocketAddr,
91    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
92        self.deref().tcp_connect_addr(addr)
93    }
94}