pub mod timer {
use futures::{Async, Future, Poll};
use std::sync::mpsc::RecvError;
use std::time;
use fiber::{self, Context};
use io::poll;
/// Extension methods that attach timer behaviour to any `Future`.
///
/// A blanket implementation below makes these methods available on every
/// type that implements `Future`.
pub trait TimerExt: Sized + Future {
    /// Wraps `self` in a `TimeoutAfter` that gives up once `duration` has passed.
    ///
    /// The countdown starts when this method is called, because the underlying
    /// `Timeout` records its start instant at construction time.
    fn timeout_after(self, duration: time::Duration) -> TimeoutAfter<Self> {
        let deadline = timeout(duration);
        TimeoutAfter {
            future: self,
            timeout: deadline,
        }
    }
}

impl<T> TimerExt for T
where
    T: Future,
{
}
/// A future that runs the wrapped future `T` under a time limit.
///
/// Created by `TimerExt::timeout_after`. When polled it resolves to:
///
/// - `Ok(item)` if the wrapped future completes first,
/// - `Err(Some(e))` if the wrapped future fails with `e`,
/// - `Err(None)` if the timer fires (or fails) before the wrapped future is ready.
#[derive(Debug)]
pub struct TimeoutAfter<T> {
    /// The future whose completion is being awaited.
    future: T,
    /// The timer that bounds how long `future` may stay pending.
    timeout: Timeout,
}
impl<T: Future> Future for TimeoutAfter<T> {
    type Item = T::Item;
    /// `Some(e)` carries an error of the wrapped future; `None` means the
    /// time limit was hit.
    type Error = Option<T::Error>;

    /// Polls the wrapped future first and the timer only if it is still pending.
    ///
    /// Because the wrapped future is checked first, its result wins when both
    /// it and the timer are ready on the same poll. Any outcome of the timer
    /// other than "not ready" (expiry or a timer error) is reported as
    /// `Err(None)`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.future.poll() {
            Err(cause) => Err(Some(cause)),
            Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
            Ok(Async::NotReady) => match self.timeout.poll() {
                Ok(Async::NotReady) => Ok(Async::NotReady),
                Ok(Async::Ready(())) | Err(_) => Err(None),
            },
        }
    }
}
/// A future that becomes ready once a fixed duration has elapsed.
///
/// Instances are created with the `timeout` function.
#[derive(Debug)]
pub struct Timeout {
    /// Instant at which this timeout was created; elapsed time is measured from here.
    start: time::Instant,
    /// Total delay, counted from `start`, after which the timeout is ready.
    duration: time::Duration,
    /// Poller-side timer handle. `None` until a poll manages to register
    /// the timer with the current fiber context's poller.
    inner: Option<poll::poller::Timeout>,
}
/// Creates a `Timeout` that becomes ready `delay_from_now` after this call.
///
/// Nothing is registered with the poller here; the start instant is recorded
/// and the poller-side timer is set up lazily on a later poll.
pub fn timeout(delay_from_now: time::Duration) -> Timeout {
    let created_at = time::Instant::now();
    Timeout {
        start: created_at,
        duration: delay_from_now,
        inner: None,
    }
}
impl Future for Timeout {
    type Item = ();
    type Error = RecvError;

    /// Drives the timeout.
    ///
    /// - If a poller-side timer is already registered, its result is returned.
    /// - Otherwise, if the configured duration has already elapsed, the future
    ///   is ready immediately.
    /// - Otherwise the remaining time is registered with the current fiber
    ///   context's poller and that new timer is polled. When there is no
    ///   current context, `NotReady` is returned and nothing is registered.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // A timer handed to the poller earlier takes over from here on.
        if let Some(registered) = self.inner.as_mut() {
            return registered.poll();
        }

        let elapsed = self.start.elapsed();
        if elapsed >= self.duration {
            return Ok(Async::Ready(()));
        }

        // Cannot underflow: `elapsed < self.duration` was established above.
        let rest = self.duration - elapsed;
        let registration = fiber::with_current_context(|mut c: Context| {
            poll::poller::set_timeout(c.poller(), rest)
        });
        match registration {
            Some(inner) => {
                self.inner = Some(inner);
                // Re-enter so the freshly registered timer is polled at once.
                self.poll()
            }
            None => Ok(Async::NotReady),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use futures::{self, Async, Future};
    use std::time::Duration;

    /// A zero-length timeout is already expired on its very first poll.
    #[test]
    fn it_works() {
        let no_delay = Duration::from_secs(0);
        let mut expired = timeout(no_delay);
        assert_eq!(expired.poll(), Ok(Async::Ready(())));
    }

    /// Covers the three outcomes of `timeout_after`: expiry, success, and
    /// failure of the wrapped future.
    #[test]
    fn timeout_after_works() {
        let no_delay = Duration::from_secs(0);
        let one_second = Duration::from_secs(1);

        // A future that never resolves, under an already expired limit.
        let mut never = futures::empty::<(), ()>().timeout_after(no_delay);
        assert_eq!(never.poll(), Err(None));

        // An immediately successful future beats a generous limit.
        let mut succeeded = futures::finished::<(), ()>(()).timeout_after(one_second);
        assert_eq!(succeeded.poll(), Ok(Async::Ready(())));

        // An immediately failing future surfaces its own error.
        let mut failing = futures::failed::<(), ()>(()).timeout_after(one_second);
        assert_eq!(failing.poll(), Err(Some(())));
    }
}
}