flowly_service/
lib.rs

1mod and_then;
2mod filter;
3mod map;
4mod pass;
5mod spawn;
6mod stub;
7mod switch;
8// mod maybe;
9// mod optional;
10// mod finally;
11// mod flatten;
12
13pub use and_then::and_then;
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16use spawn::SpawnEach;
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt};
24
25#[inline]
26pub fn flow<I>() -> pass::Pass<I> {
27    pass::Pass(PhantomData)
28}
29
30#[derive(Clone)]
31#[non_exhaustive]
32pub struct Context {
33    pub abort: watch::Sender<bool>,
34    pub abort_recv: watch::Receiver<bool>,
35}
36
37impl Default for Context {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl From<watch::Sender<bool>> for Context {
44    fn from(abort: watch::Sender<bool>) -> Self {
45        Self {
46            abort_recv: abort.subscribe(),
47            abort,
48        }
49    }
50}
51
52impl Context {
53    pub fn new() -> Self {
54        Self::from(watch::Sender::default())
55    }
56}
57
58pub trait Service<In> {
59    type Out;
60
61    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
62
63    fn handle_stream(
64        &mut self,
65        input: impl Stream<Item = In> + Send,
66        cx: &Context,
67    ) -> impl Stream<Item = Self::Out> + Send
68    where
69        In: Send,
70        Self: Send,
71        Self::Out: Send,
72    {
73        async_stream::stream! {
74            let mut input = pin!(input);
75
76            while let Some(item) = input.next().await {
77                let mut s = pin!(self.handle(item, cx));
78
79                while let Some(out) = s.next().await {
80                    yield out;
81                }
82            }
83        }
84    }
85
86    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
87    where
88        Self: Sized,
89    {
90        async move {}
91    }
92}
93
94impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
95where
96    I: Send,
97    O1: Send,
98    O2: Send,
99    E1: Send,
100    E2: Send,
101    S1: Service<I, Out = Result<O1, E1>> + Send,
102    S2: Service<O1, Out = Result<O2, E2>> + Send,
103{
104    type Out = Result<O2, Either<E1, E2>>;
105
106    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
107        async_stream::stream! {
108            let mut s1 = pin!(self.0.handle(msg, cx));
109
110            while let Some(res) = s1.next().await {
111                match res {
112                    Ok(ok) => {
113                        let mut s2 = pin!(self.1.handle(ok, cx));
114
115                        while let Some(i2) = s2.next().await {
116                            yield i2.map_err(Either::Right);
117                        }
118                    },
119                    Err(err) => yield Err(Either::Left(err)),
120                }
121            }
122        }
123    }
124}
125
126pub trait ServiceExt<I: Send>: Service<I> {
127    #[inline]
128    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
129    where
130        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
131        U: Send + Service<O1, Out = Result<O2, E2>>,
132        O1: Send,
133        O2: Send,
134        E1: Send,
135        E2: Send,
136    {
137        (self, service)
138    }
139
140    // #[inline]
141    // fn spawn(self) -> spawn::Spawn<Self>
142    // where
143    //     Self: Sized,
144    // {
145    //     spawn::Spawn { service: self }
146    // }
147
148    #[inline]
149    fn spawn_each(self) -> SpawnEach<I, Self>
150    where
151        Self: Sized + Send + Clone,
152        Self::Out: Send,
153    {
154        SpawnEach::new(self)
155    }
156
157    #[inline]
158    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
159    where
160        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
161        F: FnMut(O1) -> H + Send,
162        H: Future<Output = O2> + Send,
163        O1: Send,
164        O2: Send,
165        E1: Send,
166    {
167        (self, map::map::<O2, _>(f))
168    }
169
170    #[inline]
171    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
172    where
173        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174        O1: Send,
175        O2: Send,
176        E1: Send,
177        F: FnMut(O1) -> H + Send,
178        H: Future<Output = Option<O2>> + Send,
179    {
180        (self, map::filter_map::<O2, _>(f))
181    }
182}
183
184impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
185
186impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
187    type Out = Result<I, E>;
188
189    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
190        if let Some(srv) = self {
191            srv.handle(input, cx).left_stream()
192        } else {
193            futures::stream::once(async move { Ok(input) }).right_stream()
194        }
195    }
196}