1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use std::time::{Instant, Duration};
use Timeout;
use tokio_core::reactor::Handle;
use futures::{Future, Stream, Async};
use void::ResultVoidExt;

pub struct WithReadinessTimeout<S> {
    stream: S,
    duration: Duration,
    timeout: Timeout,
}

impl<S> WithReadinessTimeout<S> {
    pub fn new(stream: S, duration: Duration, handle: &Handle) -> WithReadinessTimeout<S> {
        let timeout = Timeout::new(duration, handle);
        WithReadinessTimeout {
            stream, duration, timeout,
        }
    }
}

impl<S: Stream> Stream for WithReadinessTimeout<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.stream.poll() {
            Ok(Async::Ready(Some(item))) => {
                self.timeout.reset(Instant::now() + self.duration);
                Ok(Async::Ready(Some(item)))
            }
            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
            Ok(Async::NotReady) => {
                match self.timeout.poll().void_unwrap() {
                    Async::Ready(()) => Ok(Async::Ready(None)),
                    Async::NotReady => Ok(Async::NotReady),
                }
            },
            Err(e) => Err(e),
        }
    }
}