/// Generates the `OnErrorExt` extension trait together with its blanket
/// implementation for every matching stream type.
///
/// The tokens handed to the macro are spliced verbatim after the `+` of the
/// returned `impl Stream` and directly in front of each `'static` bound. A
/// non-empty invocation therefore has to end with a trailing `+`
/// (for example `Send + Sync +`); an empty invocation adds no extra bounds.
macro_rules! define_on_error_impl {
    ($($bounds:tt)*) => {
        use fluxion_core::{FluxionError, StreamItem};
        use futures::future::ready;
        use futures::{Stream, StreamExt};

        /// Adds the `on_error` operator to streams that yield `StreamItem<T>`.
        pub trait OnErrorExt<T>: Stream<Item = StreamItem<T>> + Sized {
            /// Passes every `StreamItem::Error` to `handler` and lets the
            /// handler decide whether that error stays in the stream.
            ///
            /// * `handler` returns `true`: the error is removed from the output.
            /// * `handler` returns `false`: the error is forwarded unchanged.
            ///
            /// Items that are not `StreamItem::Error` are forwarded untouched
            /// and never reach the handler.
            fn on_error<F>(self, handler: F) -> impl Stream<Item = StreamItem<T>> + $($bounds)*
            where
                F: FnMut(&FluxionError) -> bool + $($bounds)* 'static,
                Self: $($bounds)* 'static;
        }

        impl<S, T> OnErrorExt<T> for S
        where
            S: Stream<Item = StreamItem<T>> + $($bounds)* 'static,
            T: $($bounds)* 'static,
        {
            fn on_error<F>(self, mut handler: F) -> impl Stream<Item = StreamItem<T>> + $($bounds)*
            where
                F: FnMut(&FluxionError) -> bool + $($bounds)* 'static,
            {
                self.filter_map(move |element| {
                    let forwarded = match element {
                        // The guard only borrows the error, so when the handler
                        // declines it the whole item is still intact and falls
                        // through to the pass-through arm below.
                        StreamItem::Error(ref failure) if handler(failure) => None,
                        passthrough => Some(passthrough),
                    };
                    // `filter_map` expects a future; the decision is already made.
                    ready(forwarded)
                })
            }
        }
    };
}