Skip to main content

flowly_service/
scope.rs

1use std::marker::PhantomData;
2
3use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4
5use crate::Service;
6
7pub fn scope<I, M, E, S, F>(f: F, service: S) -> Scope<I, M, E, S, F> {
8    Scope {
9        service,
10        f,
11        _m: PhantomData,
12    }
13}
14
15pub fn scope_each<I: Clone, M, S, F, E>(f: F, service: S) -> ScopeEach<I, M, S, F, E> {
16    ScopeEach {
17        service,
18        f,
19        _m: PhantomData,
20    }
21}
22
23#[derive(Clone)]
24pub struct Scope<I, M, E, S, F> {
25    service: S,
26    f: F,
27    _m: PhantomData<(I, M, E)>,
28}
29
30impl<I, M, E1, O, E, S, F> Service<I> for Scope<I, M, E1, S, F>
31where
32    S: Service<M, Out = Result<O, E>> + Send,
33    F: Send + Fn(&I) -> Result<M, E1> + Sync,
34    M: Send + Sync,
35    O: Send,
36    I: Send + Sync,
37    E: Send + Sync + From<E1>,
38    E1: Send + Sync,
39{
40    type Out = Result<(I, Vec<O>), E>;
41
42    fn handle(&self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
43        async move {
44            match (self.f)(&msg) {
45                Ok(m) => self
46                    .service
47                    .handle(m, cx)
48                    .try_collect()
49                    .await
50                    .map(move |dat| (msg, dat)),
51                Err(err) => Err(E::from(err)),
52            }
53        }
54        .into_stream()
55    }
56}
57
58#[derive(Clone)]
59pub struct ScopeEach<I, M, S, F, E> {
60    service: S,
61    f: F,
62    _m: PhantomData<(I, M, E)>,
63}
64
65impl<I, M, O, E, E1, S, F> Service<I> for ScopeEach<I, M, S, F, E1>
66where
67    S: Service<M, Out = Result<O, E>> + Send,
68    F: Send + Sync + Fn(&I) -> Result<M, E1>,
69    M: Send + Sync,
70    O: Send,
71    I: Send + Clone + Sync + 'static,
72    E: Send + Sync + From<E1>,
73    E1: Send + Sync,
74{
75    type Out = Result<(I, O), E>;
76
77    fn handle(&self, msg: I, cx: &crate::Context) -> impl Stream<Item = Self::Out> + Send {
78        async move {
79            match (self.f)(&msg) {
80                Ok(m) => Ok(self
81                    .service
82                    .handle(m, cx)
83                    .map(move |x| x.map(|x| (msg.clone(), x)))),
84                Err(err) => Err(E::from(err)),
85            }
86        }
87        .into_stream()
88        .try_flatten()
89    }
90}