use pin_project::pin_project;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::{AsyncRead, Stream};
use super::Delay;
#[pin_project]
#[derive(Debug)]
pub struct Timeout<F: Future> {
#[pin]
future: F,
#[pin]
delay: Delay,
}
impl<F: Future> Future for Timeout<F> {
type Output = Result<F::Output, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(t) = this.future.poll(cx) {
return Poll::Ready(Ok(t));
}
this.delay
.poll(cx)
.map(|_| Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out")))
}
}
pub trait FutureExt: Future + Sized {
fn timeout(self, dur: Duration) -> Timeout<Self> {
Timeout {
delay: Delay::new(dur),
future: self,
}
}
fn timeout_at(self, at: Instant) -> Timeout<Self> {
Timeout {
delay: Delay::new_at(at),
future: self,
}
}
}
impl<T: Future> FutureExt for T {}
#[pin_project]
#[derive(Debug)]
pub struct TimeoutStream<S: Stream> {
#[pin]
timeout: Delay,
dur: Duration,
#[pin]
stream: S,
}
impl<S: Stream> Stream for TimeoutStream<S> {
type Item = Result<S::Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Poll::Ready(s) = this.stream.as_mut().poll_next(cx) {
this.timeout.set(Delay::new(*this.dur));
return Poll::Ready(Ok(s).transpose());
}
this.timeout.as_mut().poll(cx).map(|_| {
this.timeout.set(Delay::new(*this.dur));
Some(Err(io::Error::new(
io::ErrorKind::TimedOut,
"future timed out",
)))
})
}
}
pub trait StreamExt: Stream + Sized {
fn timeout(self, dur: Duration) -> TimeoutStream<Self> {
TimeoutStream {
timeout: Delay::new(dur),
dur,
stream: self,
}
}
}
impl<S: Stream> StreamExt for S {}
#[pin_project]
#[derive(Debug)]
pub struct TimeoutAsyncRead<S: AsyncRead> {
#[pin]
timeout: Delay,
dur: Duration,
#[pin]
stream: S,
}
impl<S: AsyncRead> AsyncRead for TimeoutAsyncRead<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let mut this = self.project();
if let Poll::Ready(s) = this.stream.as_mut().poll_read(cx, buf) {
this.timeout.set(Delay::new(*this.dur));
return Poll::Ready(s);
}
this.timeout.as_mut().poll(cx).map(|_| {
this.timeout.set(Delay::new(*this.dur));
Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out"))
})
}
}
pub trait AsyncReadExt: AsyncRead + Sized {
fn timeout(self, dur: Duration) -> TimeoutAsyncRead<Self> {
TimeoutAsyncRead {
timeout: Delay::new(dur),
dur,
stream: self,
}
}
}
impl<S: AsyncRead> AsyncReadExt for S {}