1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::time::{Instant, Duration};
use Delay;
use futures::{Future, Stream, Async};
use void::ResultVoidExt;

pub struct WithReadinessTimeout<S> {
    stream: S,
    duration: Duration,
    delay: Delay,
}

impl<S> WithReadinessTimeout<S> {
    pub fn new(stream: S, duration: Duration) -> WithReadinessTimeout<S> {
        let delay = Delay::new(Instant::now() + duration);
        WithReadinessTimeout {
            stream, duration, delay,
        }
    }
}

impl<S: Stream> Stream for WithReadinessTimeout<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.stream.poll() {
            Ok(Async::Ready(Some(item))) => {
                self.delay.reset(Instant::now() + self.duration);
                Ok(Async::Ready(Some(item)))
            }
            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
            Ok(Async::NotReady) => {
                match self.delay.poll().void_unwrap() {
                    Async::Ready(()) => Ok(Async::Ready(None)),
                    Async::NotReady => Ok(Async::NotReady),
                }
            },
            Err(e) => Err(e),
        }
    }
}