flowly_service/
map.rs

1use std::marker::PhantomData;
2
3use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4
5use crate::{Context, Service};
6
7pub fn map<U, F>(map: F) -> Map<U, F> {
8    Map {
9        map,
10        m: PhantomData,
11    }
12}
13
/// Service adapter that wraps a closure `F`; created by [`map`].
///
/// `U` is a type-level marker only: no value of type `U` is stored.
#[derive(Clone, Debug)]
pub struct Map<U, F> {
    /// The wrapped closure, invoked once per handled input.
    pub(crate) map: F,
    /// Zero-sized marker that ties `U` to the struct.
    pub(crate) m: PhantomData<U>,
}
19
20impl<I, U, H, F> Service<I> for Map<U, F>
21where
22    F: FnMut(I) -> H + Send,
23    H: Future<Output = U> + Send,
24{
25    type Out = U;
26
27    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
28        (self.map)(input).into_stream()
29    }
30}
31
32pub fn filter_map<U, F>(map: F) -> FilterMap<U, F> {
33    FilterMap {
34        map,
35        m: PhantomData,
36    }
37}
38
/// Service adapter that wraps a closure `F`; created by [`filter_map`].
///
/// `U` is a type-level marker only: no value of type `U` is stored.
#[derive(Clone, Debug)]
pub struct FilterMap<U, F> {
    /// The wrapped closure, invoked once per handled input.
    pub(crate) map: F,
    /// Zero-sized marker that ties `U` to the struct.
    pub(crate) m: PhantomData<U>,
}
44
45impl<I, U, H, F> Service<I> for FilterMap<U, F>
46where
47    F: FnMut(I) -> H + Send,
48    H: Future<Output = Option<U>> + Send,
49    U: Send,
50{
51    type Out = U;
52
53    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
54        (self.map)(input).into_stream().filter_map(async |x| x)
55    }
56}
57
58pub fn try_map<U, E, F>(map: F) -> TryMap<U, E, F> {
59    TryMap {
60        map,
61        m: PhantomData,
62    }
63}
64
/// Service adapter that wraps a fallible closure `F`; created by [`try_map`].
///
/// `U` and `E` are type-level markers only: no value of either type is stored.
#[derive(Clone, Debug)]
pub struct TryMap<U, E, F> {
    /// The wrapped closure, invoked once per handled input.
    pub(crate) map: F,
    /// Zero-sized marker that ties `U` and `E` to the struct.
    pub(crate) m: PhantomData<(U, E)>,
}
70
71impl<I, U, E, H, F> Service<I> for TryMap<U, E, F>
72where
73    F: FnMut(I) -> H + Send,
74    H: Future<Output = Result<U, E>> + Send,
75{
76    type Out = Result<U, E>;
77
78    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
79        (self.map)(input).into_stream()
80    }
81}
82
83pub fn try_filter_map<U, E, F>(map: F) -> TryFilterMap<U, E, F> {
84    TryFilterMap {
85        map,
86        m: PhantomData,
87    }
88}
89
/// Service adapter that wraps a fallible, optional-result closure `F`;
/// created by [`try_filter_map`].
///
/// `U` and `E` are type-level markers only: no value of either type is stored.
#[derive(Clone, Debug)]
pub struct TryFilterMap<U, E, F> {
    /// The wrapped closure, invoked once per handled input.
    pub(crate) map: F,
    /// Zero-sized marker that ties `U` and `E` to the struct.
    pub(crate) m: PhantomData<(U, E)>,
}
95
96impl<I, U, E, H, F> Service<I> for TryFilterMap<U, E, F>
97where
98    F: FnMut(I) -> H,
99    H: Future<Output = Result<Option<U>, E>> + Send,
100    U: Send,
101{
102    type Out = Result<U, E>;
103
104    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
105        (self.map)(input)
106            .into_stream()
107            .try_filter_map(async |x| Ok(x))
108    }
109}
110
/// Service that routes each input to one of two inner services, chosen by a
/// predicate; created by [`map_if_else`].
///
/// `I` is the input type and is recorded only through a zero-sized marker.
///
/// `Debug` is derived alongside `Clone` so this type matches the other
/// combinator structs in this module. The derived impl only applies when
/// `I`, `F`, `S1` and `S2` all implement `Debug`.
#[derive(Debug, Clone)]
pub struct MapIfElse<I, F, S1, S2> {
    /// Predicate evaluated on a borrow of the input.
    f: F,
    /// Service that handles the input when the predicate returns `true`.
    on_true: S1,
    /// Service that handles the input when the predicate returns `false`.
    on_false: S2,
    /// Zero-sized marker that ties the input type `I` to the struct.
    _m: PhantomData<I>,
}
118
119pub fn map_if_else<I, O, F, S1, S2>(f: F, on_true: S1, on_false: S2) -> impl Service<I, Out = O>
120where
121    F: Fn(&I) -> bool,
122    S1: Service<I, Out = O>,
123    S2: Service<I, Out = O>,
124{
125    MapIfElse {
126        f,
127        on_true,
128        on_false,
129        _m: PhantomData,
130    }
131}
132
133impl<I, O, F, S1, S2> Service<I> for MapIfElse<I, F, S1, S2>
134where
135    S1: Service<I, Out = O>,
136    S2: Service<I, Out = O>,
137    F: for<'a> Fn(&'a I) -> bool,
138{
139    type Out = O;
140
141    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> {
142        if (self.f)(&input) {
143            self.on_true.handle(input, cx).left_stream()
144        } else {
145            self.on_false.handle(input, cx).right_stream()
146        }
147    }
148}