futures_util/stream/
recover.rs

1use core::marker::PhantomData;
2
3use futures_core::{Stream, Poll, Async};
4use futures_core::task;
5
/// Stream for the `recover` combinator, handling errors by converting them into
/// an `Option<Item>`, such that a `None` value terminates the stream. `Recover`
/// is compatible with any error type of the caller's choosing.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Recover<A, E, F> {
    /// The wrapped stream whose errors are intercepted.
    inner: A,
    /// Handler that turns an error from `inner` into an optional item.
    f: F,
    /// Marker for the error type this stream advertises; no value of `E` is
    /// ever stored.
    err: PhantomData<E>,
}
16
17pub fn new<A, E, F>(stream: A, f: F) -> Recover<A, E, F>
18    where A: Stream
19{
20    Recover { inner: stream, f: f, err: PhantomData }
21}
22
23impl<A, E, F> Stream for Recover<A, E, F>
24    where A: Stream,
25          F: FnMut(A::Error) -> Option<A::Item>,
26{
27    type Item = A::Item;
28    type Error = E;
29
30    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<A::Item>, E> {
31        match self.inner.poll_next(cx) {
32            Err(e) => Ok(Async::Ready((self.f)(e))),
33            Ok(x) => Ok(x),
34        }
35    }
36}