use std::{
error, fmt, io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::prelude::*;
use futures_timer::Delay;
use crate::{
transport::{DialOpts, ListenerId, TransportError, TransportEvent},
Multiaddr, Transport,
};
/// A transport wrapper that puts a time limit on connection setup.
///
/// Outbound dials returned by `dial` are bounded by `outgoing_timeout`, and
/// listener upgrades surfaced through `poll` are bounded by `incoming_timeout`.
/// `listen_on` and `remove_listener` are forwarded to the inner transport
/// without any timer attached.
#[derive(Debug, Copy, Clone)]
#[pin_project::pin_project]
pub struct TransportTimeout<InnerTrans> {
// The wrapped transport; pinned because `poll` forwards to it through `Pin`.
#[pin]
inner: InnerTrans,
// Duration handed to the `Delay` created for each outbound dial.
outgoing_timeout: Duration,
// Duration handed to the `Delay` created for each listener upgrade.
incoming_timeout: Duration,
}
impl<InnerTrans> TransportTimeout<InnerTrans> {
pub fn new(trans: InnerTrans, timeout: Duration) -> Self {
TransportTimeout {
inner: trans,
outgoing_timeout: timeout,
incoming_timeout: timeout,
}
}
pub fn with_outgoing_timeout(trans: InnerTrans, timeout: Duration) -> Self {
TransportTimeout {
inner: trans,
outgoing_timeout: timeout,
incoming_timeout: Duration::from_secs(100 * 365 * 24 * 3600), }
}
pub fn with_ingoing_timeout(trans: InnerTrans, timeout: Duration) -> Self {
TransportTimeout {
inner: trans,
outgoing_timeout: Duration::from_secs(100 * 365 * 24 * 3600), incoming_timeout: timeout,
}
}
}
impl<InnerTrans> Transport for TransportTimeout<InnerTrans>
where
    InnerTrans: Transport,
    InnerTrans::Error: 'static,
{
    type Output = InnerTrans::Output;
    type Error = TransportTimeoutError<InnerTrans::Error>;
    type ListenerUpgrade = Timeout<InnerTrans::ListenerUpgrade>;
    type Dial = Timeout<InnerTrans::Dial>;

    /// Forwards to the inner transport's `listen_on`; no timer is involved.
    /// Inner errors are re-wrapped as [`TransportTimeoutError::Other`].
    fn listen_on(
        &mut self,
        id: ListenerId,
        addr: Multiaddr,
    ) -> Result<(), TransportError<Self::Error>> {
        match self.inner.listen_on(id, addr) {
            Ok(()) => Ok(()),
            Err(failure) => Err(failure.map(TransportTimeoutError::Other)),
        }
    }

    /// Forwards to the inner transport's `remove_listener` and returns its answer.
    fn remove_listener(&mut self, id: ListenerId) -> bool {
        let wrapped = &mut self.inner;
        wrapped.remove_listener(id)
    }

    /// Starts a dial on the inner transport and pairs the resulting future
    /// with a timer of `outgoing_timeout`.
    ///
    /// If the inner transport refuses to dial, its error is re-wrapped as
    /// [`TransportTimeoutError::Other`] and no timer is created.
    fn dial(
        &mut self,
        addr: Multiaddr,
        opts: DialOpts,
    ) -> Result<Self::Dial, TransportError<Self::Error>> {
        match self.inner.dial(addr, opts) {
            // The timer is only started once the inner dial was accepted.
            Ok(pending_dial) => Ok(Timeout {
                inner: pending_dial,
                timer: Delay::new(self.outgoing_timeout),
            }),
            Err(failure) => Err(failure.map(TransportTimeoutError::Other)),
        }
    }

    /// Polls the inner transport and adapts whatever event it yields: upgrade
    /// futures are passed through `map_upgrade` to be wrapped in a [`Timeout`]
    /// armed with `incoming_timeout`, and inner errors are re-wrapped as
    /// [`TransportTimeoutError::Other`].
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
        let projected = self.project();
        // Copy the duration out so the closure below does not borrow the projection.
        let upgrade_timeout = *projected.incoming_timeout;

        match projected.inner.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(event) => {
                let with_timer = event.map_upgrade(move |upgrade| Timeout {
                    inner: upgrade,
                    timer: Delay::new(upgrade_timeout),
                });
                Poll::Ready(with_timer.map_err(TransportTimeoutError::Other))
            }
        }
    }
}
/// A future paired with a timer.
///
/// Its `Future` implementation resolves with the inner future's outcome, or
/// with `TransportTimeoutError::Timeout` once `timer` fires while the inner
/// future is still pending.
#[pin_project::pin_project]
#[must_use = "futures do nothing unless polled"]
pub struct Timeout<InnerFut> {
// The wrapped future; pinned so it can be polled through the projection.
#[pin]
inner: InnerFut,
// Timer started when this value is constructed (see the `Delay::new` call sites).
timer: Delay,
}
impl<InnerFut> Future for Timeout<InnerFut>
where
    InnerFut: TryFuture,
{
    type Output = Result<InnerFut::Ok, TransportTimeoutError<InnerFut::Error>>;

    /// Polls the inner future first and the timer second.
    ///
    /// Because of that order, an inner future that is ready wins even when
    /// the timer has already elapsed by the time of this poll. Inner errors
    /// are reported as [`TransportTimeoutError::Other`]; an elapsed timer is
    /// reported as [`TransportTimeoutError::Timeout`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // Give the wrapped future the first chance to complete.
        if let Poll::Ready(outcome) = TryFuture::try_poll(this.inner, cx) {
            return Poll::Ready(outcome.map_err(TransportTimeoutError::Other));
        }

        // Still pending: the only way to finish now is for the timer to fire.
        Pin::new(&mut *this.timer)
            .poll(cx)
            .map(|()| Err(TransportTimeoutError::Timeout))
    }
}
/// Error produced by the timeout wrapper: either the time limit was hit or
/// the wrapped transport/future failed on its own.
#[derive(Debug)]
pub enum TransportTimeoutError<TErr> {
    /// The timer fired before the wrapped future completed.
    Timeout,
    /// An I/O error attributed to the timer.
    // NOTE(review): nothing in the visible code constructs this variant — confirm
    // whether it is still needed outside this file.
    TimerError(io::Error),
    /// An error reported by the wrapped transport or future, passed through as-is.
    Other(TErr),
}

impl<TErr> fmt::Display for TransportTimeoutError<TErr>
where
    TErr: fmt::Display,
{
    /// Writes a fixed message for `Timeout`, a prefixed message for
    /// `TimerError`, and the inner error's own text for `Other`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("Timeout has been reached"),
            Self::TimerError(timer_err) => write!(f, "Error in the timer: {timer_err}"),
            // Kept as `write!` so the inner error is rendered with default
            // formatting, regardless of flags set on `f`.
            Self::Other(inner_err) => write!(f, "{inner_err}"),
        }
    }
}

impl<TErr> error::Error for TransportTimeoutError<TErr>
where
    TErr: error::Error + 'static,
{
    /// Returns the wrapped error for `TimerError` and `Other`; `Timeout` has
    /// no underlying cause.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::Timeout => None,
            Self::TimerError(timer_err) => Some(timer_err),
            Self::Other(inner_err) => Some(inner_err),
        }
    }
}