pub use self::addr_stream::AddrStream;
use async_std::net::{SocketAddr, TcpListener};
use futures::FutureExt as _;
use futures_timer::Delay;
use hyper::server::accept::Accept;
use log::{debug, error, trace};
use std::fmt;
use std::future::Future;
use std::io;
use std::net::{TcpListener as StdListener, ToSocketAddrs};
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
#[must_use = "streams do nothing unless polled"]
pub struct AddrIncoming {
addr: SocketAddr,
listener: TcpListener,
sleep_on_errors: bool,
tcp_nodelay: bool,
timeout: Option<Delay>,
}
impl AddrIncoming {
pub(super) fn new(addr: impl ToSocketAddrs) -> io::Result<Self> {
let listener = StdListener::bind(addr)?;
AddrIncoming::from_std(listener)
}
pub(super) fn from_std(listener: StdListener) -> io::Result<Self> {
let addr = listener.local_addr()?;
Ok(AddrIncoming {
listener: listener.into(),
addr,
sleep_on_errors: true,
tcp_nodelay: false,
timeout: None,
})
}
pub fn bind(addr: impl ToSocketAddrs) -> io::Result<Self> {
AddrIncoming::new(addr)
}
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
#[cfg_attr(tarpaulin, skip)]
pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
self.tcp_nodelay = enabled;
self
}
#[cfg_attr(tarpaulin, skip)]
pub fn set_sleep_on_errors(&mut self, val: bool) {
self.sleep_on_errors = val;
}
fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
if let Some(ref mut to) = self.timeout {
match Pin::new(to).poll(cx) {
Poll::Ready(()) => {}
Poll::Pending => return Poll::Pending,
}
}
self.timeout = None;
let accept = self.listener.accept();
futures::pin_mut!(accept);
loop {
match accept.poll_unpin(cx) {
Poll::Ready(Ok((socket, addr))) => {
if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
trace!("error trying to set TCP nodelay: {}", e);
}
return Poll::Ready(Ok(AddrStream::new(socket, addr)));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => {
if is_connection_error(&e) {
debug!("accepted connection already errored: {}", e);
continue;
}
if self.sleep_on_errors {
error!("accept error: {}", e);
let mut timeout = Delay::new(Duration::from_secs(1));
match Pin::new(&mut timeout).poll(cx) {
Poll::Ready(()) => {
continue;
}
Poll::Pending => {
self.timeout = Some(timeout);
return Poll::Pending;
}
}
} else {
return Poll::Ready(Err(e));
}
}
}
}
}
}
impl Accept for AddrIncoming {
type Conn = AddrStream;
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let result = futures::ready!(self.poll_next_(cx));
Poll::Ready(Some(result))
}
}
#[cfg_attr(tarpaulin, skip)]
fn is_connection_error(e: &io::Error) -> bool {
match e.kind() {
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionReset => true,
_ => false,
}
}
#[cfg_attr(tarpaulin, skip)]
impl fmt::Debug for AddrIncoming {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AddrIncoming")
.field("addr", &self.addr)
.field("sleep_on_errors", &self.sleep_on_errors)
.field("tcp_nodelay", &self.tcp_nodelay)
.finish()
}
}
mod addr_stream {
use async_std::net::TcpStream;
use async_std::sync::Arc;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{self, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug, Clone)]
pub struct AddrStream {
inner: Arc<TcpStream>,
pub(super) remote_addr: SocketAddr,
}
impl AddrStream {
pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
AddrStream {
inner: Arc::new(tcp),
remote_addr: addr,
}
}
#[inline]
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
#[inline]
pub fn stream(&self) -> &TcpStream {
&*self.inner
}
}
impl AsyncRead for AddrStream {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
futures::AsyncRead::poll_read(Pin::new(&mut self.stream()), cx, buf)
}
}
impl AsyncWrite for AddrStream {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
futures::AsyncWrite::poll_write(Pin::new(&mut self.stream()), cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
futures::AsyncWrite::poll_close(Pin::new(&mut self.stream()), cx)
}
}
}