flowly_service/
lib.rs

1mod and_then;
2mod map;
3mod pass;
4mod spawn;
5mod stub;
6// mod switch;
7// mod maybe;
8// mod optional;
9// mod finally;
10// mod flatten;
11
12pub use and_then::and_then;
13use flowly_core::Either;
14pub use map::{filter_map, map, try_filter_map, try_map};
15use spawn::SpawnEach;
16pub use stub::stub;
17use tokio::sync::watch;
18// pub use switch::{map_if_else, switch};
19
20use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24#[inline]
25pub fn flow<I>() -> pass::Pass<I> {
26    pass::Pass(PhantomData)
27}
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32    pub abort: watch::Sender<bool>,
33    pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Default for Context {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl From<watch::Sender<bool>> for Context {
43    fn from(abort: watch::Sender<bool>) -> Self {
44        Self {
45            abort_recv: abort.subscribe(),
46            abort,
47        }
48    }
49}
50
51impl Context {
52    pub fn new() -> Self {
53        Self::from(watch::Sender::default())
54    }
55}
56
57pub trait Service<In> {
58    type Out;
59
60    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
61
62    fn handle_stream(
63        &mut self,
64        input: impl Stream<Item = In> + Send,
65        cx: &Context,
66    ) -> impl Stream<Item = Self::Out> + Send
67    where
68        In: Send,
69        Self: Send,
70        Self::Out: Send,
71    {
72        async_stream::stream! {
73            let mut input = pin!(input);
74
75            while let Some(item) = input.next().await {
76                let mut s = pin!(self.handle(item, cx));
77
78                while let Some(out) = s.next().await {
79                    yield out;
80                }
81            }
82        }
83    }
84
85    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
86    where
87        Self: Sized,
88    {
89        async move {}
90    }
91}
92
93impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
94where
95    I: Send,
96    O1: Send,
97    O2: Send,
98    E1: Send,
99    E2: Send,
100    S1: Service<I, Out = Result<O1, E1>> + Send,
101    S2: Service<O1, Out = Result<O2, E2>> + Send,
102{
103    type Out = Result<O2, Either<E1, E2>>;
104
105    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
106        async_stream::stream! {
107            let mut s1 = pin!(self.0.handle(msg, cx));
108
109            while let Some(res) = s1.next().await {
110                match res {
111                    Ok(ok) => {
112                        let mut s2 = pin!(self.1.handle(ok, cx));
113
114                        while let Some(i2) = s2.next().await {
115                            yield i2.map_err(Either::Right);
116                        }
117                    },
118                    Err(err) => yield Err(Either::Left(err)),
119                }
120            }
121        }
122    }
123}
124
125pub trait ServiceExt<I: Send>: Service<I> {
126    #[inline]
127    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
128    where
129        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
130        U: Service<O1, Out = Result<O2, E2>>,
131        O1: Send,
132        O2: Send,
133        E1: Send,
134        E2: Send,
135        U: Send,
136    {
137        (self, service)
138    }
139
140    // #[inline]
141    // fn spawn(self) -> spawn::Spawn<Self>
142    // where
143    //     Self: Sized,
144    // {
145    //     spawn::Spawn { service: self }
146    // }
147
148    #[inline]
149    fn spawn_each(self) -> SpawnEach<I, Self>
150    where
151        Self: Sized + Send + Clone,
152        Self::Out: Send,
153    {
154        SpawnEach::new(self)
155    }
156
157    #[inline]
158    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
159    where
160        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
161        F: FnMut(O1) -> H + Send,
162        H: Future<Output = O2> + Send,
163        O1: Send,
164        O2: Send,
165        E1: Send,
166    {
167        (self, map::map::<O2, _>(f))
168    }
169
170    #[inline]
171    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
172    where
173        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174        O1: Send,
175        O2: Send,
176        E1: Send,
177        F: FnMut(O1) -> H + Send,
178        H: Future<Output = Option<O2>> + Send,
179    {
180        (self, map::filter_map::<O2, _>(f))
181    }
182}
183
184impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
185
186impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
187    type Out = Result<I, E>;
188
189    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
190        if let Some(srv) = self {
191            srv.handle(input, cx).left_stream()
192        } else {
193            futures::stream::once(async move { Ok(input) }).right_stream()
194        }
195    }
196}