1use std::marker::PhantomData;
2
3use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4
5use crate::Service;
6
7pub fn scope<I, M, E, S, F>(f: F, service: S) -> Scope<I, M, E, S, F> {
8 Scope {
9 service,
10 f,
11 _m: PhantomData,
12 }
13}
14
15pub fn scope_each<I: Clone, M, S, F, E>(f: F, service: S) -> ScopeEach<I, M, S, F, E> {
16 ScopeEach {
17 service,
18 f,
19 _m: PhantomData,
20 }
21}
22
/// Adapter holding an inner `service` together with a projection `f`.
///
/// `I`, `M` and `E` are marker parameters only: no value of those types is
/// stored in the struct.
pub struct Scope<I, M, E, S, F> {
    /// Inner service that receives the projected message.
    service: S,
    /// Projection applied to a borrow of the incoming message.
    f: F,
    /// Ties the otherwise unused type parameters to the struct.
    _m: PhantomData<(I, M, E)>,
}

// Written by hand on purpose: `#[derive(Clone)]` would additionally require
// `I: Clone`, `M: Clone` and `E: Clone`, even though those types only occur
// inside `PhantomData` and are never actually cloned.
impl<I, M, E, S, F> Clone for Scope<I, M, E, S, F>
where
    S: Clone,
    F: Clone,
{
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            f: self.f.clone(),
            _m: PhantomData,
        }
    }
}
29
30impl<I, M, E1, O, E, S, F> Service<I> for Scope<I, M, E1, S, F>
31where
32 S: Service<M, Out = Result<O, E>> + Send,
33 F: Send + Fn(&I) -> Result<M, E1> + Sync,
34 M: Send + Sync,
35 O: Send,
36 I: Send + Sync,
37 E: Send + Sync + From<E1>,
38 E1: Send + Sync,
39{
40 type Out = Result<(I, Vec<O>), E>;
41
42 fn handle(&self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
43 async move {
44 match (self.f)(&msg) {
45 Ok(m) => self
46 .service
47 .handle(m, cx)
48 .try_collect()
49 .await
50 .map(move |dat| (msg, dat)),
51 Err(err) => Err(E::from(err)),
52 }
53 }
54 .into_stream()
55 }
56}
57
/// Adapter holding an inner `service` together with a projection `f`.
///
/// `I`, `M` and `E` are marker parameters only: no value of those types is
/// stored in the struct.
pub struct ScopeEach<I, M, S, F, E> {
    /// Inner service that receives the projected message.
    service: S,
    /// Projection applied to a borrow of the incoming message.
    f: F,
    /// Ties the otherwise unused type parameters to the struct.
    _m: PhantomData<(I, M, E)>,
}

// Written by hand on purpose: `#[derive(Clone)]` would additionally require
// `I: Clone`, `M: Clone` and `E: Clone`, even though those types only occur
// inside `PhantomData` and are never actually cloned.
impl<I, M, S, F, E> Clone for ScopeEach<I, M, S, F, E>
where
    S: Clone,
    F: Clone,
{
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            f: self.f.clone(),
            _m: PhantomData,
        }
    }
}
64
65impl<I, M, O, E, E1, S, F> Service<I> for ScopeEach<I, M, S, F, E1>
66where
67 S: Service<M, Out = Result<O, E>> + Send,
68 F: Send + Sync + Fn(&I) -> Result<M, E1>,
69 M: Send + Sync,
70 O: Send,
71 I: Send + Clone + Sync + 'static,
72 E: Send + Sync + From<E1>,
73 E1: Send + Sync,
74{
75 type Out = Result<(I, O), E>;
76
77 fn handle(&self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
78 async move {
79 match (self.f)(&msg) {
80 Ok(m) => Ok(self
81 .service
82 .handle(m, cx)
83 .map(move |x| x.map(|x| (msg.clone(), x)))),
84 Err(err) => Err(E::from(err)),
85 }
86 }
87 .into_stream()
88 .try_flatten()
89 }
90}