flowly_service/
lib.rs

1mod and_then;
2mod map;
3mod pass;
4mod spawn;
5mod stub;
6// mod switch;
7// mod maybe;
8// mod optional;
9// mod finally;
10// mod flatten;
11
12pub use and_then::and_then;
13use flowly_core::Either;
14pub use map::{filter_map, map, try_filter_map, try_map};
15use spawn::SpawnEach;
16pub use stub::stub;
17use tokio::sync::watch;
18// pub use switch::{map_if_else, switch};
19
20use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24#[inline]
25pub fn flow<I>() -> pass::Pass<I> {
26    pass::Pass(PhantomData)
27}
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32    pub abort: watch::Sender<bool>,
33    pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Default for Context {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl From<watch::Sender<bool>> for Context {
43    fn from(abort: watch::Sender<bool>) -> Self {
44        Self {
45            abort_recv: abort.subscribe(),
46            abort,
47        }
48    }
49}
50
51impl Context {
52    pub fn new() -> Self {
53        Self::from(watch::Sender::default())
54    }
55}
56
57pub trait Service<In> {
58    type Out;
59
60    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
61
62    fn handle_stream(
63        &mut self,
64        input: impl Stream<Item = In> + Send,
65        cx: &Context,
66    ) -> impl Stream<Item = Self::Out> + Send
67    where
68        In: Send,
69        Self: Send,
70        Self::Out: Send,
71    {
72        async_stream::stream! {
73            let mut input = pin!(input);
74
75            while let Some(item) = input.next().await {
76                let mut s = pin!(self.handle(item, cx));
77
78                while let Some(out) = s.next().await {
79                    yield out;
80                }
81            }
82        }
83    }
84
85    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
86    where
87        Self: Sized,
88    {
89        async move {}
90    }
91}
92
93impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
94where
95    I: Send,
96    O1: Send,
97    O2: Send,
98    E1: Send,
99    E2: Send,
100    S1: Service<I, Out = Result<O1, E1>> + Send,
101    S2: Service<O1, Out = Result<O2, E2>> + Send,
102{
103    type Out = Result<O2, Either<E1, E2>>;
104
105    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
106        async_stream::stream! {
107            let mut s1 = pin!(self.0.handle(msg, cx));
108
109            while let Some(res) = s1.next().await {
110                match res {
111                    Ok(ok) => {
112                        let mut s2 = pin!(self.1.handle(ok, cx));
113
114                        while let Some(i2) = s2.next().await {
115                            yield i2.map_err(Either::Right);
116                        }
117                    },
118                    Err(err) => yield Err(Either::Left(err)),
119                }
120            }
121        }
122    }
123}
124
125pub trait ServiceExt<I: Send>: Service<I> {
126    #[inline]
127    fn flow<O1: Send, O2: Send, E1: Send, E2: Send, U: Send>(self, service: U) -> (Self, U)
128    where
129        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
130        U: Service<O1, Out = Result<O2, E2>>,
131    {
132        (self, service)
133    }
134
135    // #[inline]
136    // fn spawn(self) -> spawn::Spawn<Self>
137    // where
138    //     Self: Sized,
139    // {
140    //     spawn::Spawn { service: self }
141    // }
142
143    #[inline]
144    fn spawn_each(self) -> SpawnEach<I, Self>
145    where
146        Self: Sized + Send + Clone,
147        Self::Out: Send,
148    {
149        SpawnEach::new(self)
150    }
151
152    #[inline]
153    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
154    where
155        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
156        F: FnMut(O1) -> H + Send,
157        H: Future<Output = O2> + Send,
158        O1: Send,
159        O2: Send,
160        E1: Send,
161    {
162        (self, map::map::<O2, _>(f))
163    }
164
165    #[inline]
166    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
167    where
168        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
169        O1: Send,
170        O2: Send,
171        E1: Send,
172        F: FnMut(O1) -> H + Send,
173        H: Future<Output = Option<O2>> + Send,
174    {
175        (self, map::filter_map::<O2, _>(f))
176    }
177}
178
179impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
180
181impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
182    type Out = Result<I, E>;
183
184    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
185        if let Some(srv) = self {
186            srv.handle(input, cx).left_stream()
187        } else {
188            futures::stream::once(async move { Ok(input) }).right_stream()
189        }
190    }
191}