flowly_service/
map.rs

1use std::marker::PhantomData;
2
3use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4
5use crate::{Context, Service};
6
7pub fn map<U, F>(map: F) -> Map<U, F> {
8    Map {
9        map,
10        m: PhantomData,
11    }
12}
13
/// Adapter that holds a closure and is used as a `Service` producing `U`.
///
/// Created with [`map`].
#[derive(Debug, Clone)]
pub struct Map<U, F> {
    /// The wrapped closure invoked for each input.
    pub(crate) map: F,
    /// Zero-sized marker that ties the output type `U` to this adapter.
    pub(crate) m: PhantomData<U>,
}
19
20impl<I, U, H, F> Service<I> for Map<U, F>
21where
22    F: FnMut(I) -> H + Send,
23    H: Future<Output = U> + Send,
24{
25    type Out = U;
26
27    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
28        (self.map)(input).into_stream()
29    }
30}
31
32pub fn filter_map<U, F>(map: F) -> FilterMap<U, F> {
33    FilterMap {
34        map,
35        m: PhantomData,
36    }
37}
38
/// Adapter that holds a closure and is used as a `Service` producing `U`,
/// skipping inputs for which the closure's future resolves to `None`.
///
/// Created with [`filter_map`].
#[derive(Debug, Clone)]
pub struct FilterMap<U, F> {
    /// The wrapped closure invoked for each input.
    pub(crate) map: F,
    /// Zero-sized marker that ties the output type `U` to this adapter.
    pub(crate) m: PhantomData<U>,
}
44
45impl<I, U, H, F> Service<I> for FilterMap<U, F>
46where
47    F: FnMut(I) -> H + Send,
48    H: Future<Output = Option<U>> + Send,
49    U: Send,
50{
51    type Out = U;
52
53    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
54        (self.map)(input).into_stream().filter_map(async |x| x)
55    }
56}
57
58pub fn try_map<U, E, F>(map: F) -> TryMap<U, E, F> {
59    TryMap {
60        map,
61        m: PhantomData,
62    }
63}
64
/// Adapter that holds a fallible closure and is used as a `Service`
/// producing `Result<U, E>`.
///
/// Created with [`try_map`].
#[derive(Debug, Clone)]
pub struct TryMap<U, E, F> {
    /// The wrapped closure invoked for each input.
    pub(crate) map: F,
    /// Zero-sized marker that ties the success type `U` and error type `E`
    /// to this adapter.
    pub(crate) m: PhantomData<(U, E)>,
}
70
71impl<I, U, E, H, F> Service<I> for TryMap<U, E, F>
72where
73    F: FnMut(I) -> H + Send,
74    H: Future<Output = Result<U, E>> + Send,
75{
76    type Out = Result<U, E>;
77
78    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
79        (self.map)(input).into_stream()
80    }
81}
82
83pub fn try_filter_map<U, E, F>(map: F) -> TryFilterMap<U, E, F> {
84    TryFilterMap {
85        map,
86        m: PhantomData,
87    }
88}
89
/// Adapter that holds a fallible, optional-result closure and is used as a
/// `Service` producing `Result<U, E>`.
///
/// Created with [`try_filter_map`].
#[derive(Debug, Clone)]
pub struct TryFilterMap<U, E, F> {
    /// The wrapped closure invoked for each input.
    pub(crate) map: F,
    /// Zero-sized marker that ties the success type `U` and error type `E`
    /// to this adapter.
    pub(crate) m: PhantomData<(U, E)>,
}
95
96impl<I, U, E, H, F> Service<I> for TryFilterMap<U, E, F>
97where
98    F: FnMut(I) -> H,
99    H: Future<Output = Result<Option<U>, E>> + Send,
100    U: Send,
101{
102    type Out = Result<U, E>;
103
104    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
105        (self.map)(input)
106            .into_stream()
107            .try_filter_map(async |x| Ok(x))
108    }
109}
110
/// Adapter that routes each input to one of two inner services based on a
/// predicate.
///
/// Created with [`map_if_else`]. Derives `Debug` and `Clone` like the other
/// adapters in this module; the derived impls only apply when every type
/// parameter implements the respective trait.
#[derive(Debug, Clone)]
pub struct MapIfElse<I, F, S1, S2> {
    /// Predicate evaluated on a reference to the input.
    f: F,
    /// Service that handles the input when the predicate returns `true`.
    on_true: S1,
    /// Service that handles the input when the predicate returns `false`.
    on_false: S2,
    /// Zero-sized marker that ties the input type `I` to this adapter.
    _m: PhantomData<I>,
}
117
118pub fn map_if_else<I, O, F, S1, S2>(f: F, on_true: S1, on_false: S2) -> impl Service<I, Out = O>
119where
120    F: Fn(&I) -> bool,
121    S1: Service<I, Out = O>,
122    S2: Service<I, Out = O>,
123{
124    MapIfElse {
125        f,
126        on_true,
127        on_false,
128        _m: PhantomData,
129    }
130}
131
132impl<I, O, F, S1, S2> Service<I> for MapIfElse<I, F, S1, S2>
133where
134    S1: Service<I, Out = O>,
135    S2: Service<I, Out = O>,
136    F: for<'a> Fn(&'a I) -> bool,
137{
138    type Out = O;
139
140    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> {
141        if (self.f)(&input) {
142            self.on_true.handle(input, cx).left_stream()
143        } else {
144            self.on_false.handle(input, cx).right_stream()
145        }
146    }
147}