1use std::marker::PhantomData;
2
3use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4
5use crate::{Context, Service};
6
7pub fn map<U, F>(map: F) -> Map<U, F> {
8 Map {
9 map,
10 m: PhantomData,
11 }
12}
13
/// Adapter holding a closure that is applied to every input.
///
/// `U` is the output type. It only appears inside a [`PhantomData`] marker,
/// so `Clone` and `Debug` are implemented by hand and bounded on `F` alone;
/// `#[derive]` would additionally (and needlessly) require `U: Clone` and
/// `U: Debug`.
pub struct Map<U, F> {
    /// Closure invoked with each input.
    pub(crate) map: F,
    /// Zero-sized marker recording the output type `U`.
    pub(crate) m: PhantomData<U>,
}

impl<U, F: Clone> Clone for Map<U, F> {
    fn clone(&self) -> Self {
        Self {
            map: self.map.clone(),
            m: PhantomData,
        }
    }
}

impl<U, F: std::fmt::Debug> std::fmt::Debug for Map<U, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would print.
        f.debug_struct("Map")
            .field("map", &self.map)
            .field("m", &self.m)
            .finish()
    }
}
19
20impl<I, U, H, F> Service<I> for Map<U, F>
21where
22 F: FnMut(I) -> H + Send,
23 H: Future<Output = U> + Send,
24{
25 type Out = U;
26
27 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
28 (self.map)(input).into_stream()
29 }
30}
31
32pub fn filter_map<U, F>(map: F) -> FilterMap<U, F> {
33 FilterMap {
34 map,
35 m: PhantomData,
36 }
37}
38
/// Adapter holding a closure whose optional result is forwarded only when
/// it is `Some`.
///
/// `U` is the output type. It only appears inside a [`PhantomData`] marker,
/// so `Clone` and `Debug` are implemented by hand and bounded on `F` alone;
/// `#[derive]` would additionally (and needlessly) require `U: Clone` and
/// `U: Debug`.
pub struct FilterMap<U, F> {
    /// Closure invoked with each input.
    pub(crate) map: F,
    /// Zero-sized marker recording the output type `U`.
    pub(crate) m: PhantomData<U>,
}

impl<U, F: Clone> Clone for FilterMap<U, F> {
    fn clone(&self) -> Self {
        Self {
            map: self.map.clone(),
            m: PhantomData,
        }
    }
}

impl<U, F: std::fmt::Debug> std::fmt::Debug for FilterMap<U, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would print.
        f.debug_struct("FilterMap")
            .field("map", &self.map)
            .field("m", &self.m)
            .finish()
    }
}
44
45impl<I, U, H, F> Service<I> for FilterMap<U, F>
46where
47 F: FnMut(I) -> H + Send,
48 H: Future<Output = Option<U>> + Send,
49 U: Send,
50{
51 type Out = U;
52
53 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> + Send {
54 (self.map)(input).into_stream().filter_map(async |x| x)
55 }
56}
57
58pub fn try_map<U, E, F>(map: F) -> TryMap<U, E, F> {
59 TryMap {
60 map,
61 m: PhantomData,
62 }
63}
64
/// Adapter holding a fallible closure that is applied to every input.
///
/// `U` and `E` are the success and error types. They only appear inside a
/// [`PhantomData`] marker, so `Clone` and `Debug` are implemented by hand
/// and bounded on `F` alone; `#[derive]` would additionally (and needlessly)
/// require both `U` and `E` to be `Clone` / `Debug`.
pub struct TryMap<U, E, F> {
    /// Closure invoked with each input.
    pub(crate) map: F,
    /// Zero-sized marker recording the success and error types.
    pub(crate) m: PhantomData<(U, E)>,
}

impl<U, E, F: Clone> Clone for TryMap<U, E, F> {
    fn clone(&self) -> Self {
        Self {
            map: self.map.clone(),
            m: PhantomData,
        }
    }
}

impl<U, E, F: std::fmt::Debug> std::fmt::Debug for TryMap<U, E, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would print.
        f.debug_struct("TryMap")
            .field("map", &self.map)
            .field("m", &self.m)
            .finish()
    }
}
70
71impl<I, U, E, H, F> Service<I> for TryMap<U, E, F>
72where
73 F: FnMut(I) -> H + Send,
74 H: Future<Output = Result<U, E>> + Send,
75{
76 type Out = Result<U, E>;
77
78 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
79 (self.map)(input).into_stream()
80 }
81}
82
83pub fn try_filter_map<U, E, F>(map: F) -> TryFilterMap<U, E, F> {
84 TryFilterMap {
85 map,
86 m: PhantomData,
87 }
88}
89
/// Adapter holding a fallible closure whose optional success value is
/// forwarded only when it is `Some`.
///
/// `U` and `E` are the success and error types. They only appear inside a
/// [`PhantomData`] marker, so `Clone` and `Debug` are implemented by hand
/// and bounded on `F` alone; `#[derive]` would additionally (and needlessly)
/// require both `U` and `E` to be `Clone` / `Debug`.
pub struct TryFilterMap<U, E, F> {
    /// Closure invoked with each input.
    pub(crate) map: F,
    /// Zero-sized marker recording the success and error types.
    pub(crate) m: PhantomData<(U, E)>,
}

impl<U, E, F: Clone> Clone for TryFilterMap<U, E, F> {
    fn clone(&self) -> Self {
        Self {
            map: self.map.clone(),
            m: PhantomData,
        }
    }
}

impl<U, E, F: std::fmt::Debug> std::fmt::Debug for TryFilterMap<U, E, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derive would print.
        f.debug_struct("TryFilterMap")
            .field("map", &self.map)
            .field("m", &self.m)
            .finish()
    }
}
95
96impl<I, U, E, H, F> Service<I> for TryFilterMap<U, E, F>
97where
98 F: FnMut(I) -> H,
99 H: Future<Output = Result<Option<U>, E>> + Send,
100 U: Send,
101{
102 type Out = Result<U, E>;
103
104 fn handle(&mut self, input: I, _cx: &Context) -> impl Stream<Item = Self::Out> {
105 (self.map)(input)
106 .into_stream()
107 .try_filter_map(async |x| Ok(x))
108 }
109}
110
/// Adapter that routes each input to one of two inner services depending on
/// a predicate.
///
/// `I` is the input type. It only appears inside a [`PhantomData`] marker,
/// so `Clone` is implemented by hand and bounded on `F`, `S1` and `S2`
/// alone; `#[derive(Clone)]` would additionally (and needlessly) require
/// `I: Clone`.
pub struct MapIfElse<I, F, S1, S2> {
    /// Predicate evaluated against a reference to the input.
    f: F,
    /// Service that receives the input when the predicate returns `true`.
    on_true: S1,
    /// Service that receives the input when the predicate returns `false`.
    on_false: S2,
    /// Zero-sized marker recording the input type `I`.
    _m: PhantomData<I>,
}

impl<I, F, S1, S2> Clone for MapIfElse<I, F, S1, S2>
where
    F: Clone,
    S1: Clone,
    S2: Clone,
{
    fn clone(&self) -> Self {
        Self {
            f: self.f.clone(),
            on_true: self.on_true.clone(),
            on_false: self.on_false.clone(),
            _m: PhantomData,
        }
    }
}
118
119pub fn map_if_else<I, O, F, S1, S2>(f: F, on_true: S1, on_false: S2) -> impl Service<I, Out = O>
120where
121 F: Fn(&I) -> bool,
122 S1: Service<I, Out = O>,
123 S2: Service<I, Out = O>,
124{
125 MapIfElse {
126 f,
127 on_true,
128 on_false,
129 _m: PhantomData,
130 }
131}
132
133impl<I, O, F, S1, S2> Service<I> for MapIfElse<I, F, S1, S2>
134where
135 S1: Service<I, Out = O>,
136 S2: Service<I, Out = O>,
137 F: for<'a> Fn(&'a I) -> bool,
138{
139 type Out = O;
140
141 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> {
142 if (self.f)(&input) {
143 self.on_true.handle(input, cx).left_stream()
144 } else {
145 self.on_false.handle(input, cx).right_stream()
146 }
147 }
148}