future_utils/
until.rs

1use futures::{Async, Future, Stream};
2
/// Runs a stream or future until some condition is met.
///
/// `Until` pairs a wrapped future or stream (`orig`) with a second future
/// (`condition`). The value is inert by itself; work only happens when it is
/// polled through its `Future` or `Stream` implementation.
#[derive(Debug)]
#[must_use = "futures and streams do nothing unless polled"]
pub struct Until<T, C> {
    /// The wrapped future or stream being driven.
    orig: T,
    /// The future whose completion stops `orig` from being polled further.
    condition: C,
}
8
9impl<T, C> Until<T, C> {
10    pub fn new(orig: T, condition: C) -> Until<T, C> {
11        Until {
12            orig,
13            condition,
14        }
15    }
16}
17
18impl<T, C> Future for Until<T, C>
19where
20    T: Future,
21    C: Future<Item=()>,
22    T::Error: From<C::Error>
23{
24    type Item = Option<T::Item>;
25    type Error = T::Error;
26
27    fn poll(&mut self) -> Result<Async<Option<T::Item>>, T::Error> {
28        if let Async::Ready(()) = self.condition.poll()? {
29            return Ok(Async::Ready(None));
30        }
31
32        let res = try_ready!(self.orig.poll());
33        Ok(Async::Ready(Some(res)))
34    }
35}
36
37impl<T, C> Stream for Until<T, C>
38where
39    T: Stream,
40    C: Future<Item=()>,
41    T::Error: From<C::Error>
42{
43    type Item = T::Item;
44    type Error = T::Error;
45
46    fn poll(&mut self) -> Result<Async<Option<T::Item>>, T::Error> {
47        if let Async::Ready(()) = self.condition.poll()? {
48            return Ok(Async::Ready(None));
49        }
50
51        self.orig.poll()
52    }
53}
54