use std::time::{Duration, Instant};
use std::io;
use futures::prelude::*;
use Delay;
pub trait FutureExt: Future + Sized {
fn timeout(self, dur: Duration) -> Timeout<Self>
where Self::Error: From<io::Error>,
{
Timeout {
timeout: Delay::new(dur),
future: self,
}
}
fn timeout_at(self, at: Instant) -> Timeout<Self>
where Self::Error: From<io::Error>,
{
Timeout {
timeout: Delay::new_at(at),
future: self,
}
}
}
impl<F: Future> FutureExt for F {}
pub struct Timeout<F> {
timeout: Delay,
future: F,
}
impl<F> Future for Timeout<F>
where F: Future,
F::Error: From<io::Error>,
{
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<F::Item, F::Error> {
match self.future.poll()? {
Async::NotReady => {}
other => return Ok(other)
}
if self.timeout.poll()?.is_ready() {
Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into())
} else {
Ok(Async::NotReady)
}
}
}
pub trait StreamExt: Stream + Sized {
fn timeout(self, dur: Duration) -> TimeoutStream<Self>
where Self::Error: From<io::Error>,
{
TimeoutStream {
timeout: Delay::new(dur),
dur,
stream: self,
}
}
}
impl<S: Stream> StreamExt for S {}
pub struct TimeoutStream<S> {
timeout: Delay,
dur: Duration,
stream: S,
}
impl<S> Stream for TimeoutStream<S>
where S: Stream,
S::Error: From<io::Error>,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
match self.stream.poll() {
Ok(Async::NotReady) => {}
other => {
self.timeout.reset(self.dur);
return other
}
}
if self.timeout.poll()?.is_ready() {
self.timeout.reset(self.dur);
Err(io::Error::new(io::ErrorKind::TimedOut, "stream item timed out").into())
} else {
Ok(Async::NotReady)
}
}
}