1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
use std::time::{Instant, Duration}; use Timeout; use tokio_core::reactor::Handle; use futures::{Future, Stream, Async}; use void::ResultVoidExt; pub struct WithReadinessTimeout<S> { stream: S, duration: Duration, timeout: Timeout, } impl<S> WithReadinessTimeout<S> { pub fn new(stream: S, duration: Duration, handle: &Handle) -> WithReadinessTimeout<S> { let timeout = Timeout::new(duration, handle); WithReadinessTimeout { stream, duration, timeout, } } } impl<S: Stream> Stream for WithReadinessTimeout<S> { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> { match self.stream.poll() { Ok(Async::Ready(Some(item))) => { self.timeout.reset(Instant::now() + self.duration); Ok(Async::Ready(Some(item))) } Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => { match self.timeout.poll().void_unwrap() { Async::Ready(()) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } }, Err(e) => Err(e), } } }