1use futures::{Async, Future, Stream};
2
/// Combinator that pairs a wrapped future or stream with a `condition` future.
///
/// The `Future` and `Stream` implementations for this type poll `condition`
/// before the wrapped value and finish with `None` once `condition` is ready.
#[derive(Debug)]
#[must_use = "futures and streams do nothing unless polled"]
pub struct Until<T, C> {
    /// The wrapped future or stream whose output is forwarded.
    orig: T,
    /// The future whose completion cuts `orig` short.
    condition: C,
}
8
9impl<T, C> Until<T, C> {
10 pub fn new(orig: T, condition: C) -> Until<T, C> {
11 Until {
12 orig,
13 condition,
14 }
15 }
16}
17
18impl<T, C> Future for Until<T, C>
19where
20 T: Future,
21 C: Future<Item=()>,
22 T::Error: From<C::Error>
23{
24 type Item = Option<T::Item>;
25 type Error = T::Error;
26
27 fn poll(&mut self) -> Result<Async<Option<T::Item>>, T::Error> {
28 if let Async::Ready(()) = self.condition.poll()? {
29 return Ok(Async::Ready(None));
30 }
31
32 let res = try_ready!(self.orig.poll());
33 Ok(Async::Ready(Some(res)))
34 }
35}
36
37impl<T, C> Stream for Until<T, C>
38where
39 T: Stream,
40 C: Future<Item=()>,
41 T::Error: From<C::Error>
42{
43 type Item = T::Item;
44 type Error = T::Error;
45
46 fn poll(&mut self) -> Result<Async<Option<T::Item>>, T::Error> {
47 if let Async::Ready(()) = self.condition.poll()? {
48 return Ok(Async::Ready(None));
49 }
50
51 self.orig.poll()
52 }
53}
54