// future_utils/next_or_else.rs
use futures::{Async, Future, Stream};
2
3pub struct NextOrElse<S, F> {
6 inner: Option<Inner<S, F>>,
7}
8
/// State owned by a pending `NextOrElse`: the stream being polled and the
/// closure that supplies the error if the stream ends.
struct Inner<S, F> {
    /// The stream whose next item is awaited.
    stream: S,
    /// Closure invoked to build the error when the stream is exhausted.
    f: F,
}
13
14impl<S, F> NextOrElse<S, F> {
15 pub fn new(stream: S, f: F) -> NextOrElse<S, F> {
16 let inner = Inner {
17 stream, f,
18 };
19 NextOrElse {
20 inner: Some(inner),
21 }
22 }
23}
24
25impl<S, F> Future for NextOrElse<S, F>
26where
27 S: Stream,
28 F: FnOnce() -> S::Error,
29{
30 type Item = (S::Item, S);
31 type Error = S::Error;
32
33 fn poll(&mut self) -> Result<Async<(S::Item, S)>, S::Error> {
34 let mut inner = self.inner.take().unwrap();
35 match inner.stream.poll() {
36 Err(e) => Err(e),
37 Ok(Async::NotReady) => {
38 self.inner = Some(inner);
39 Ok(Async::NotReady)
40 },
41 Ok(Async::Ready(None)) => Err((inner.f)()),
42 Ok(Async::Ready(Some(x))) => Ok(Async::Ready((x, inner.stream))),
43 }
44 }
45}
46