1use std::marker::PhantomData;
2
3use flowly_core::Either;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5
6use crate::Service;
7
8pub fn scope<I, M, E, S, F>(f: F, service: S) -> Scope<I, M, E, S, F> {
9 Scope {
10 service,
11 f,
12 _m: PhantomData,
13 }
14}
15
16pub fn scope_each<I: Clone, M, E, S, F>(f: F, service: S) -> ScopeEach<I, M, E, S, F> {
17 ScopeEach {
18 service,
19 f,
20 _m: PhantomData,
21 }
22}
23
/// Adapter holding an inner service `S` together with a projection `F`.
///
/// `I`, `M` and `E` are not stored; they are tracked through `PhantomData`
/// so that the `Service` implementation can name the message type, the
/// projected type and the projection error type.
pub struct Scope<I, M, E, S, F> {
    /// Inner service that receives the projected value.
    service: S,
    /// Projection applied to a borrow of the incoming message.
    f: F,
    _m: PhantomData<(I, M, E)>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would also demand
// `I: Clone`, `M: Clone` and `E: Clone`, although those parameters only occur
// inside `PhantomData` and no value of them is ever copied.
impl<I, M, E, S: Clone, F: Clone> Clone for Scope<I, M, E, S, F> {
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            f: self.f.clone(),
            _m: PhantomData,
        }
    }
}
30
31impl<I, M, E1, O, E, S, F> Service<I> for Scope<I, M, E1, S, F>
32where
33 S: Service<M, Out = Result<O, E>> + Send,
34 F: Send + Fn(&I) -> Result<M, E1>,
35 M: Send,
36 O: Send,
37 I: Send,
38 E: Send,
39 E1: Send,
40{
41 type Out = Result<(I, Result<Vec<O>, Either<E, E1>>), E1>;
42
43 fn handle(&mut self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
44 async move {
45 match (self.f)(&msg) {
46 Ok(m) => Ok((
47 msg,
48 self.service
49 .handle(m, cx)
50 .map_err(Either::Left)
51 .try_collect()
52 .await,
53 )),
54 Err(err) => Ok((msg, Err(Either::Right(err)))),
55 }
56 }
57 .into_stream()
58 }
59}
60
/// Adapter holding an inner service `S` together with a projection `F`.
///
/// `I`, `M` and `E` are not stored; they are tracked through `PhantomData`
/// so that the `Service` implementation can name the message type, the
/// projected type and the projection error type.
pub struct ScopeEach<I, M, E, S, F> {
    /// Inner service that receives the projected value.
    service: S,
    /// Projection applied to a borrow of the incoming message.
    f: F,
    _m: PhantomData<(I, M, E)>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would also demand
// `I: Clone`, `M: Clone` and `E: Clone`, although those parameters only occur
// inside `PhantomData` and no value of them is ever copied.
impl<I, M, E, S: Clone, F: Clone> Clone for ScopeEach<I, M, E, S, F> {
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            f: self.f.clone(),
            _m: PhantomData,
        }
    }
}
67
68impl<I, M, E1, O, E, S, F> Service<I> for ScopeEach<I, M, E1, S, F>
69where
70 S: Service<M, Out = Result<O, E>> + Send,
71 F: Send + Fn(&I) -> Result<M, E1>,
72 M: Send,
73 O: Send,
74 I: Send + Clone + Sync + 'static,
75 E: Send,
76 E1: Send,
77{
78 type Out = Result<(I, Result<O, E>), E1>;
79
80 fn handle(&mut self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
81 async move {
82 match (self.f)(&msg) {
83 Ok(m) => Ok(self
84 .service
85 .handle(m, cx)
86 .map(move |x| Ok((msg.clone(), x)))),
87 Err(err) => Err(err),
88 }
89 }
90 .into_stream()
91 .try_flatten()
92 }
93}