use crate::{sys::AsSysFd, traits::AsyncToSocketAddrs};
use futures_core::Stream;
use futures_io::{AsyncRead, AsyncWrite};
use std::{
io::{self, Read, Write},
net::SocketAddr,
ops::Deref,
time::{Duration, Instant},
};
pub trait Reactor {
type TcpStream: AsyncRead + AsyncWrite + Send + Unpin + 'static;
type Sleep: Future + Send + 'static;
fn register<H: Read + Write + AsSysFd + Send + 'static>(
&self,
socket: H,
) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static>
where
Self: Sized;
fn sleep(&self, dur: Duration) -> Self::Sleep
where
Self: Sized;
fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static
where
Self: Sized;
fn tcp_connect<A: AsyncToSocketAddrs + Send>(
&self,
addrs: A,
) -> impl Future<Output = io::Result<Self::TcpStream>> + Send
where
Self: Sync + Sized,
{
async move {
let mut err = None;
for addr in addrs.to_socket_addrs().await? {
match self.tcp_connect_addr(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => err = Some(e),
}
}
Err(err.unwrap_or_else(|| {
io::Error::new(io::ErrorKind::AddrNotAvailable, "couldn't resolve host")
}))
}
}
fn tcp_connect_addr(
&self,
addr: SocketAddr,
) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static
where
Self: Sized;
}
/// Blanket implementation: any type that dereferences to a [`Reactor`] is
/// itself a [`Reactor`].
///
/// Every required method forwards to the deref target. `tcp_connect` is not
/// overridden here, so the trait's default body is used; that default in turn
/// calls the forwarded `tcp_connect_addr`.
impl<R: Deref> Reactor for R
where
    R::Target: Reactor + Sized,
{
    type TcpStream = <R::Target as Reactor>::TcpStream;
    type Sleep = <R::Target as Reactor>::Sleep;

    /// Forwards to the target's `register`.
    fn register<H: Read + Write + AsSysFd + Send + 'static>(
        &self,
        socket: H,
    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
        // `&**self` goes through `Deref` to reach the wrapped reactor.
        <R::Target as Reactor>::register(&**self, socket)
    }

    /// Forwards to the target's `sleep`.
    fn sleep(&self, dur: Duration) -> Self::Sleep {
        <R::Target as Reactor>::sleep(&**self, dur)
    }

    /// Forwards to the target's `interval`.
    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
        <R::Target as Reactor>::interval(&**self, dur)
    }

    /// Forwards to the target's `tcp_connect_addr`.
    fn tcp_connect_addr(
        &self,
        addr: SocketAddr,
    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
        <R::Target as Reactor>::tcp_connect_addr(&**self, addr)
    }
}