1use std::marker::PhantomData;
2
3use flowly_core::Void;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5
6use crate::{Context, Service};
7
8pub fn map<U, F>(map: F) -> Map<U, F> {
9 Map {
10 map,
11 m: PhantomData,
12 }
13}
14
15#[derive(Debug, Clone)]
16pub struct Map<U, F> {
17 pub(crate) map: F,
18 pub(crate) m: PhantomData<U>,
19}
20
21impl<I, U, H, F> Service<I> for Map<U, F>
22where
23 F: FnMut(I) -> H + Send,
24 H: Future<Output = U> + Send,
25{
26 type Out = Result<U, Void>;
27
28 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
29 (self.map)(input).map(Ok).into_stream()
30 }
31}
32
33pub fn filter_map<U, F>(map: F) -> FilterMap<U, F> {
34 FilterMap {
35 map,
36 m: PhantomData,
37 }
38}
39
40#[derive(Debug, Clone)]
41pub struct FilterMap<U, F> {
42 pub(crate) map: F,
43 pub(crate) m: PhantomData<U>,
44}
45
46impl<I, U, H, F> Service<I> for FilterMap<U, F>
47where
48 F: FnMut(I) -> H + Send,
49 H: Future<Output = Option<U>> + Send,
50 U: Send,
51{
52 type Out = Result<U, Void>;
53
54 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
55 (self.map)(input)
56 .map(Ok)
57 .into_stream()
58 .try_filter_map(async |x| Ok(x))
59 }
60}
61
62pub fn try_map<U, E, F>(map: F) -> TryMap<U, E, F> {
63 TryMap {
64 map,
65 m: PhantomData,
66 }
67}
68
69#[derive(Debug, Clone)]
70pub struct TryMap<U, E, F> {
71 pub(crate) map: F,
72 pub(crate) m: PhantomData<(U, E)>,
73}
74
75impl<I, U, E, H, F> Service<I> for TryMap<U, E, F>
76where
77 F: FnMut(I) -> H + Send,
78 H: Future<Output = Result<U, E>> + Send,
79{
80 type Out = Result<U, E>;
81
82 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
83 (self.map)(input).into_stream()
84 }
85}
86
87pub fn try_filter_map<U, E, F>(map: F) -> TryFilterMap<U, E, F> {
88 TryFilterMap {
89 map,
90 m: PhantomData,
91 }
92}
93
94#[derive(Debug, Clone)]
95pub struct TryFilterMap<U, E, F> {
96 pub(crate) map: F,
97 pub(crate) m: PhantomData<(U, E)>,
98}
99
100impl<I, U, E, H, F> Service<I> for TryFilterMap<U, E, F>
101where
102 F: FnMut(I) -> H,
103 H: Future<Output = Result<Option<U>, E>> + Send,
104 U: Send,
105{
106 type Out = Result<U, E>;
107
108 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
109 (self.map)(input)
110 .into_stream()
111 .try_filter_map(async |x| Ok(x))
112 }
113}
114
115pub struct MapIfElse<I, F, S1, S2> {
116 f: F,
117 on_true: S1,
118 on_false: S2,
119 _m: PhantomData<I>,
120}
121
122pub fn map_if_else<I, O, F, S1, S2>(f: F, on_true: S1, on_false: S2) -> impl Service<I, Out = O>
123where
124 F: Fn(&I) -> bool,
125 S1: Service<I, Out = O>,
126 S2: Service<I, Out = O>,
127{
128 MapIfElse {
129 f,
130 on_true,
131 on_false,
132 _m: PhantomData,
133 }
134}
135
136impl<I, O, F, S1, S2> Service<I> for MapIfElse<I, F, S1, S2>
137where
138 S1: Service<I, Out = O>,
139 S2: Service<I, Out = O>,
140 F: for<'a> Fn(&'a I) -> bool,
141{
142 type Out = O;
143
144 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> {
145 if (self.f)(&input) {
146 self.on_true.handle(input, cx).left_stream()
147 } else {
148 self.on_false.handle(input, cx).right_stream()
149 }
150 }
151}