future_utils/
with_timeout.rs

1use std::time::{Duration, Instant};
2use futures::{Async, Future, Stream};
3use delay::Delay;
4use void::ResultVoidExt;
5
6pub struct WithTimeout<F> {
7    inner: F,
8    delay: Delay,
9}
10
11impl<F> WithTimeout<F> {
12    /// Creates a new `WithTimeout` which runs `inner` for the given duration.
13    pub fn new(inner: F, duration: Duration) -> WithTimeout<F> {
14        let deadline = Instant::now() + duration;
15        WithTimeout {
16            inner: inner,
17            delay: Delay::new(deadline),
18        }
19    }
20
21    /// Creates a new `WithTimeout` which runs `inner` until the given instant.
22    pub fn new_at(inner: F, instant: Instant) -> WithTimeout<F> {
23        WithTimeout {
24            inner: inner,
25            delay: Delay::new(instant),
26        }
27    }
28
29    /// Unpack the `WithTimeout`, returning the inner future or stream.
30    pub fn into_inner(self) -> F {
31        self.inner
32    }
33}
34
35impl<F> Future for WithTimeout<F>
36where
37    F: Future
38{
39    type Item = Option<F::Item>;
40    type Error = F::Error;
41
42    fn poll(&mut self) -> Result<Async<Option<F::Item>>, F::Error> {
43        if let Async::Ready(()) = self.delay.poll().void_unwrap() {
44            return Ok(Async::Ready(None));
45        }
46
47        match self.inner.poll() {
48            Ok(Async::Ready(x)) => Ok(Async::Ready(Some(x))),
49            Ok(Async::NotReady) => Ok(Async::NotReady),
50            Err(e) => Err(e),
51        }
52    }
53}
54
55impl<F> Stream for WithTimeout<F>
56where
57    F: Stream
58{
59    type Item = F::Item;
60    type Error = F::Error;
61
62    fn poll(&mut self) -> Result<Async<Option<F::Item>>, F::Error> {
63        if let Async::Ready(()) = self.delay.poll().void_unwrap() {
64            return Ok(Async::Ready(None));
65        }
66
67        self.inner.poll()
68    }
69}
70