future_utils/
with_readiness_timeout.rs

1use std::time::{Instant, Duration};
2use Delay;
3use futures::{Future, Stream, Async};
4use void::ResultVoidExt;
5
6pub struct WithReadinessTimeout<S> {
7    stream: S,
8    duration: Duration,
9    delay: Delay,
10}
11
12impl<S> WithReadinessTimeout<S> {
13    pub fn new(stream: S, duration: Duration) -> WithReadinessTimeout<S> {
14        let delay = Delay::new(Instant::now() + duration);
15        WithReadinessTimeout {
16            stream, duration, delay,
17        }
18    }
19}
20
21impl<S: Stream> Stream for WithReadinessTimeout<S> {
22    type Item = S::Item;
23    type Error = S::Error;
24
25    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
26        match self.stream.poll() {
27            Ok(Async::Ready(Some(item))) => {
28                self.delay.reset(Instant::now() + self.duration);
29                Ok(Async::Ready(Some(item)))
30            }
31            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
32            Ok(Async::NotReady) => {
33                match self.delay.poll().void_unwrap() {
34                    Async::Ready(()) => Ok(Async::Ready(None)),
35                    Async::NotReady => Ok(Async::NotReady),
36                }
37            },
38            Err(e) => Err(e),
39        }
40    }
41}
42