futures_util/stream/
recover.rs1use core::marker::PhantomData;
2
3use futures_core::{Stream, Poll, Async};
4use futures_core::task;
5
/// Stream adapter that replaces the errors of an inner stream with values.
///
/// Every error produced by the wrapped stream is handed to a closure that
/// returns an `Option` of the stream's item type; that `Option` is then
/// reported as the next ready result instead of the error. The adapter's own
/// error type `E` is a free type parameter, recorded only through a
/// `PhantomData` marker, so the caller decides what it is.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Recover<A, E, F> {
    // The wrapped stream whose errors are being recovered from.
    inner: A,
    // Closure invoked with each error of `inner`.
    f: F,
    // Zero-sized marker tying the otherwise unused parameter `E` to the type.
    err: PhantomData<E>,
}
16
17pub fn new<A, E, F>(stream: A, f: F) -> Recover<A, E, F>
18 where A: Stream
19{
20 Recover { inner: stream, f: f, err: PhantomData }
21}
22
23impl<A, E, F> Stream for Recover<A, E, F>
24 where A: Stream,
25 F: FnMut(A::Error) -> Option<A::Item>,
26{
27 type Item = A::Item;
28 type Error = E;
29
30 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<A::Item>, E> {
31 match self.inner.poll_next(cx) {
32 Err(e) => Ok(Async::Ready((self.f)(e))),
33 Ok(x) => Ok(x),
34 }
35 }
36}