future_utils/
with_readiness_timeout.rs1use std::time::{Instant, Duration};
2use Delay;
3use futures::{Future, Stream, Async};
4use void::ResultVoidExt;
5
6pub struct WithReadinessTimeout<S> {
7 stream: S,
8 duration: Duration,
9 delay: Delay,
10}
11
12impl<S> WithReadinessTimeout<S> {
13 pub fn new(stream: S, duration: Duration) -> WithReadinessTimeout<S> {
14 let delay = Delay::new(Instant::now() + duration);
15 WithReadinessTimeout {
16 stream, duration, delay,
17 }
18 }
19}
20
21impl<S: Stream> Stream for WithReadinessTimeout<S> {
22 type Item = S::Item;
23 type Error = S::Error;
24
25 fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
26 match self.stream.poll() {
27 Ok(Async::Ready(Some(item))) => {
28 self.delay.reset(Instant::now() + self.duration);
29 Ok(Async::Ready(Some(item)))
30 }
31 Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
32 Ok(Async::NotReady) => {
33 match self.delay.poll().void_unwrap() {
34 Async::Ready(()) => Ok(Async::Ready(None)),
35 Async::NotReady => Ok(Async::NotReady),
36 }
37 },
38 Err(e) => Err(e),
39 }
40 }
41}
42