flowly_service/
map.rs

1use std::marker::PhantomData;
2
3use flowly_core::Void;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5
6use crate::{Context, Service};
7
8pub fn map<U, F>(map: F) -> Map<U, F> {
9    Map {
10        map,
11        m: PhantomData,
12    }
13}
14
15#[derive(Debug, Clone)]
16pub struct Map<U, F> {
17    pub(crate) map: F,
18    pub(crate) m: PhantomData<U>,
19}
20
21impl<I, U, H, F> Service<I> for Map<U, F>
22where
23    F: FnMut(I) -> H + Send,
24    H: Future<Output = U> + Send,
25{
26    type Out = Result<U, Void>;
27
28    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
29        (self.map)(input).map(Ok).into_stream()
30    }
31}
32
33pub fn filter_map<U, F>(map: F) -> FilterMap<U, F> {
34    FilterMap {
35        map,
36        m: PhantomData,
37    }
38}
39
40#[derive(Debug, Clone)]
41pub struct FilterMap<U, F> {
42    pub(crate) map: F,
43    pub(crate) m: PhantomData<U>,
44}
45
46impl<I, U, H, F> Service<I> for FilterMap<U, F>
47where
48    F: FnMut(I) -> H + Send,
49    H: Future<Output = Option<U>> + Send,
50    U: Send,
51{
52    type Out = Result<U, Void>;
53
54    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
55        (self.map)(input)
56            .map(Ok)
57            .into_stream()
58            .try_filter_map(async |x| Ok(x))
59    }
60}
61
62pub fn try_map<U, E, F>(map: F) -> TryMap<U, E, F> {
63    TryMap {
64        map,
65        m: PhantomData,
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct TryMap<U, E, F> {
71    pub(crate) map: F,
72    pub(crate) m: PhantomData<(U, E)>,
73}
74
75impl<I, U, E, H, F> Service<I> for TryMap<U, E, F>
76where
77    F: FnMut(I) -> H + Send,
78    H: Future<Output = Result<U, E>> + Send,
79{
80    type Out = Result<U, E>;
81
82    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
83        (self.map)(input).into_stream()
84    }
85}
86
87pub fn try_filter_map<U, E, F>(map: F) -> TryFilterMap<U, E, F> {
88    TryFilterMap {
89        map,
90        m: PhantomData,
91    }
92}
93
94#[derive(Debug, Clone)]
95pub struct TryFilterMap<U, E, F> {
96    pub(crate) map: F,
97    pub(crate) m: PhantomData<(U, E)>,
98}
99
100impl<I, U, E, H, F> Service<I> for TryFilterMap<U, E, F>
101where
102    F: FnMut(I) -> H,
103    H: Future<Output = Result<Option<U>, E>> + Send,
104    U: Send,
105{
106    type Out = Result<U, E>;
107
108    fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
109        (self.map)(input)
110            .into_stream()
111            .try_filter_map(async |x| Ok(x))
112    }
113}
114
115pub struct MapIfElse<I, F, S1, S2> {
116    f: F,
117    on_true: S1,
118    on_false: S2,
119    _m: PhantomData<I>,
120}
121
122pub fn map_if_else<I, O, F, S1, S2>(f: F, on_true: S1, on_false: S2) -> impl Service<I, Out = O>
123where
124    F: Fn(&I) -> bool,
125    S1: Service<I, Out = O>,
126    S2: Service<I, Out = O>,
127{
128    MapIfElse {
129        f,
130        on_true,
131        on_false,
132        _m: PhantomData,
133    }
134}
135
136impl<I, O, F, S1, S2> Service<I> for MapIfElse<I, F, S1, S2>
137where
138    S1: Service<I, Out = O>,
139    S2: Service<I, Out = O>,
140    F: for<'a> Fn(&'a I) -> bool,
141{
142    type Out = O;
143
144    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> {
145        if (self.f)(&input) {
146            self.on_true.handle(input, cx).left_stream()
147        } else {
148            self.on_false.handle(input, cx).right_stream()
149        }
150    }
151}