future_utils/
with_timeout.rs1use std::time::{Duration, Instant};
2use futures::{Async, Future, Stream};
3use delay::Delay;
4use void::ResultVoidExt;
5
6pub struct WithTimeout<F> {
7 inner: F,
8 delay: Delay,
9}
10
11impl<F> WithTimeout<F> {
12 pub fn new(inner: F, duration: Duration) -> WithTimeout<F> {
14 let deadline = Instant::now() + duration;
15 WithTimeout {
16 inner: inner,
17 delay: Delay::new(deadline),
18 }
19 }
20
21 pub fn new_at(inner: F, instant: Instant) -> WithTimeout<F> {
23 WithTimeout {
24 inner: inner,
25 delay: Delay::new(instant),
26 }
27 }
28
29 pub fn into_inner(self) -> F {
31 self.inner
32 }
33}
34
35impl<F> Future for WithTimeout<F>
36where
37 F: Future
38{
39 type Item = Option<F::Item>;
40 type Error = F::Error;
41
42 fn poll(&mut self) -> Result<Async<Option<F::Item>>, F::Error> {
43 if let Async::Ready(()) = self.delay.poll().void_unwrap() {
44 return Ok(Async::Ready(None));
45 }
46
47 match self.inner.poll() {
48 Ok(Async::Ready(x)) => Ok(Async::Ready(Some(x))),
49 Ok(Async::NotReady) => Ok(Async::NotReady),
50 Err(e) => Err(e),
51 }
52 }
53}
54
55impl<F> Stream for WithTimeout<F>
56where
57 F: Stream
58{
59 type Item = F::Item;
60 type Error = F::Error;
61
62 fn poll(&mut self) -> Result<Async<Option<F::Item>>, F::Error> {
63 if let Async::Ready(()) = self.delay.poll().void_unwrap() {
64 return Ok(Async::Ready(None));
65 }
66
67 self.inner.poll()
68 }
69}
70