use clock::now;
use Delay;
use futures::{Async, Future, Poll, Stream};
use std::error;
use std::fmt;
use std::time::{Duration, Instant};
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
value: T,
delay: Delay,
}
#[derive(Debug)]
pub struct Error<T>(Kind<T>);
#[derive(Debug)]
enum Kind<T> {
Inner(T),
Elapsed,
Timer(::Error),
}
impl<T> Timeout<T> {
pub fn new(value: T, timeout: Duration) -> Timeout<T> {
let delay = Delay::new_timeout(now() + timeout, timeout);
Timeout::new_with_delay(value, delay)
}
pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
Timeout { value, delay }
}
pub fn get_ref(&self) -> &T {
&self.value
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.value
}
pub fn into_inner(self) -> T {
self.value
}
}
impl<T: Future> Timeout<T> {
pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
let delay = Delay::new(deadline);
Timeout {
value: future,
delay,
}
}
}
impl<T> Future for Timeout<T>
where
T: Future,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.value.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Err(Error::elapsed()),
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Stream for Timeout<T>
where
T: Stream,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.value.poll() {
Ok(Async::Ready(v)) => {
if v.is_some() {
self.delay.reset_timeout();
}
return Ok(Async::Ready(v));
}
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
self.delay.reset_timeout();
Err(Error::elapsed())
}
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Error<T> {
pub fn inner(err: T) -> Error<T> {
Error(Kind::Inner(err))
}
pub fn is_inner(&self) -> bool {
match self.0 {
Kind::Inner(_) => true,
_ => false,
}
}
pub fn into_inner(self) -> Option<T> {
match self.0 {
Kind::Inner(err) => Some(err),
_ => None,
}
}
pub fn elapsed() -> Error<T> {
Error(Kind::Elapsed)
}
pub fn is_elapsed(&self) -> bool {
match self.0 {
Kind::Elapsed => true,
_ => false,
}
}
pub fn timer(err: ::Error) -> Error<T> {
Error(Kind::Timer(err))
}
pub fn is_timer(&self) -> bool {
match self.0 {
Kind::Timer(_) => true,
_ => false,
}
}
pub fn into_timer(self) -> Option<::Error> {
match self.0 {
Kind::Timer(err) => Some(err),
_ => None,
}
}
}
impl<T: error::Error> error::Error for Error<T> {
fn description(&self) -> &str {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.description(),
Elapsed => "deadline has elapsed",
Timer(ref e) => e.description(),
}
}
}
impl<T: fmt::Display> fmt::Display for Error<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.fmt(fmt),
Elapsed => "deadline has elapsed".fmt(fmt),
Timer(ref e) => e.fmt(fmt),
}
}
}