future_utils/
next_or_else.rs

1use futures::{Async, Future, Stream};
2
3/// Takes the next item from the stream. If the stream ends then the provided callback is used to
4/// generate an error.
5pub struct NextOrElse<S, F> {
6    inner: Option<Inner<S, F>>,
7}
8
9struct Inner<S, F> {
10    stream: S,
11    f: F,
12}
13
14impl<S, F> NextOrElse<S, F> {
15    pub fn new(stream: S, f: F) -> NextOrElse<S, F> {
16        let inner = Inner {
17            stream, f,
18        };
19        NextOrElse {
20            inner: Some(inner),
21        }
22    }
23}
24
25impl<S, F> Future for NextOrElse<S, F>
26where
27    S: Stream,
28    F: FnOnce() -> S::Error,
29{
30    type Item = (S::Item, S);
31    type Error = S::Error;
32
33    fn poll(&mut self) -> Result<Async<(S::Item, S)>, S::Error> {
34        let mut inner = self.inner.take().unwrap();
35        match inner.stream.poll() {
36            Err(e) => Err(e),
37            Ok(Async::NotReady) => {
38                self.inner = Some(inner);
39                Ok(Async::NotReady)
40            },
41            Ok(Async::Ready(None)) => Err((inner.f)()),
42            Ok(Async::Ready(Some(x))) => Ok(Async::Ready((x, inner.stream))),
43        }
44    }
45}
46