async_rs/traits/
reactor.rs

1//! A collection of traits to define a common interface across reactors
2
3use crate::{sys::AsSysFd, traits::AsyncToSocketAddrs};
4use futures_core::Stream;
5use futures_io::{AsyncRead, AsyncWrite};
6use std::{
7    io::{self, Read, Write},
8    net::SocketAddr,
9    ops::Deref,
10    time::{Duration, Instant},
11};
12
13/// A common interface for performing actions on a reactor
14pub trait Reactor {
15    /// The type representing a TCP stream (after tcp_connect) for this reactor
16    type TcpStream: AsyncRead + AsyncWrite + Send + Unpin + 'static;
17
18    /// The type representing a Sleep for this reactor
19    type Sleep: Future + Send + 'static;
20
21    /// Register a synchronous handle, returning an asynchronous one
22    fn register<H: Read + Write + AsSysFd + Send + 'static>(
23        &self,
24        socket: H,
25    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static>
26    where
27        Self: Sized;
28
29    /// Sleep for the given duration
30    fn sleep(&self, dur: Duration) -> Self::Sleep
31    where
32        Self: Sized;
33
34    /// Stream that yields at every given interval
35    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static
36    where
37        Self: Sized;
38
39    /// Create a TcpStream by connecting to a remote host
40    fn tcp_connect<A: AsyncToSocketAddrs + Send>(
41        &self,
42        addrs: A,
43    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send
44    where
45        Self: Sync + Sized,
46    {
47        async move {
48            let mut err = None;
49            for addr in addrs.to_socket_addrs().await? {
50                match self.tcp_connect_addr(addr).await {
51                    Ok(stream) => return Ok(stream),
52                    Err(e) => err = Some(e),
53                }
54            }
55            Err(err.unwrap_or_else(|| {
56                io::Error::new(io::ErrorKind::AddrNotAvailable, "couldn't resolve host")
57            }))
58        }
59    }
60
61    /// Create a TcpStream by connecting to a remote host
62    fn tcp_connect_addr(
63        &self,
64        addr: SocketAddr,
65    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static
66    where
67        Self: Sized;
68}
69
70impl<R: Deref> Reactor for R
71where
72    R::Target: Reactor + Sized,
73{
74    type TcpStream = <<R as Deref>::Target as Reactor>::TcpStream;
75    type Sleep = <<R as Deref>::Target as Reactor>::Sleep;
76
77    fn register<H: Read + Write + AsSysFd + Send + 'static>(
78        &self,
79        socket: H,
80    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
81        self.deref().register(socket)
82    }
83
84    fn sleep(&self, dur: Duration) -> Self::Sleep {
85        self.deref().sleep(dur)
86    }
87
88    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
89        self.deref().interval(dur)
90    }
91
92    fn tcp_connect_addr(
93        &self,
94        addr: SocketAddr,
95    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
96        self.deref().tcp_connect_addr(addr)
97    }
98}