1use tokio_stream::StreamExt;
2
3use crate::Service;
4
/// Combinator pairing an inner service with an error-handling callback.
///
/// The [`Service`] implementation for this type runs every `Err` item
/// produced by `service` through `on_err`, which either maps the error to a
/// new error value or drops the item from the output stream.
#[derive(Clone)]
pub struct Except<S, F> {
    /// Inner service whose output stream is post-processed.
    pub(crate) service: S,
    /// Callback applied to each error yielded by `service`.
    pub(crate) on_err: F,
}
10
11impl<I, R, E, E2, F, S> Service<I> for Except<S, F>
12where
13 S: Service<I, Out = Result<R, E>>,
14 F: Send + Sync + Fn(E) -> Option<E2>,
15{
16 type Out = Result<R, E2>;
17
18 fn handle(
19 &mut self,
20 input: I,
21 cx: &crate::Context,
22 ) -> impl futures::Stream<Item = Self::Out> + Send {
23 self.service.handle(input, cx).filter_map(|x| match x {
24 Ok(ok) => Some(Ok(ok)),
25 Err(err) => (self.on_err)(err).map(Err),
26 })
27 }
28}