flowly_service/
scope.rs

1use std::marker::PhantomData;
2
3use flowly_core::Either;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5
6use crate::Service;
7
8pub fn scope<I, M, E, S, F>(f: F, service: S) -> Scope<I, M, E, S, F> {
9    Scope {
10        service,
11        f,
12        _m: PhantomData,
13    }
14}
15
16pub fn scope_each<I: Clone, M, E, S, F>(f: F, service: S) -> ScopeEach<I, M, E, S, F> {
17    ScopeEach {
18        service,
19        f,
20        _m: PhantomData,
21    }
22}
23
24#[derive(Clone)]
25pub struct Scope<I, M, E, S, F> {
26    service: S,
27    f: F,
28    _m: PhantomData<(I, M, E)>,
29}
30
31impl<I, M, E1, O, E, S, F> Service<I> for Scope<I, M, E1, S, F>
32where
33    S: Service<M, Out = Result<O, E>> + Send,
34    F: Send + Fn(&I) -> Result<M, E1>,
35    M: Send,
36    O: Send,
37    I: Send,
38    E: Send,
39    E1: Send,
40{
41    type Out = Result<(I, Result<Vec<O>, Either<E, E1>>), E1>;
42
43    fn handle(&mut self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
44        async move {
45            match (self.f)(&msg) {
46                Ok(m) => Ok((
47                    msg,
48                    self.service
49                        .handle(m, cx)
50                        .map_err(Either::Left)
51                        .try_collect()
52                        .await,
53                )),
54                Err(err) => Ok((msg, Err(Either::Right(err)))),
55            }
56        }
57        .into_stream()
58    }
59}
60
61#[derive(Clone)]
62pub struct ScopeEach<I, M, E, S, F> {
63    service: S,
64    f: F,
65    _m: PhantomData<(I, M, E)>,
66}
67
68impl<I, M, E1, O, E, S, F> Service<I> for ScopeEach<I, M, E1, S, F>
69where
70    S: Service<M, Out = Result<O, E>> + Send,
71    F: Send + Fn(&I) -> Result<M, E1>,
72    M: Send,
73    O: Send,
74    I: Send + Clone + Sync + 'static,
75    E: Send,
76    E1: Send,
77{
78    type Out = Result<(I, Result<O, E>), E1>;
79
80    fn handle(&mut self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
81        async move {
82            match (self.f)(&msg) {
83                Ok(m) => Ok(self
84                    .service
85                    .handle(m, cx)
86                    .map(move |x| Ok((msg.clone(), x)))),
87                Err(err) => Err(err),
88            }
89        }
90        .into_stream()
91        .try_flatten()
92    }
93}