1mod and_then;
2mod map;
3mod pass;
4mod spawn;
5mod stub;
6pub use and_then::and_then;
13use flowly_core::Either;
14pub use map::{filter_map, map, try_filter_map, try_map};
15use spawn::SpawnEach;
16pub use stub::stub;
17use tokio::sync::watch;
18use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24#[inline]
25pub fn flow<I>() -> pass::Pass<I> {
26 pass::Pass(PhantomData)
27}
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32 pub abort: watch::Sender<bool>,
33 pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Default for Context {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl From<watch::Sender<bool>> for Context {
43 fn from(abort: watch::Sender<bool>) -> Self {
44 Self {
45 abort_recv: abort.subscribe(),
46 abort,
47 }
48 }
49}
50
51impl Context {
52 pub fn new() -> Self {
53 Self::from(watch::Sender::default())
54 }
55}
56
57pub trait Service<In> {
58 type Out;
59
60 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
61
62 fn handle_stream(
63 &mut self,
64 input: impl Stream<Item = In> + Send,
65 cx: &Context,
66 ) -> impl Stream<Item = Self::Out> + Send
67 where
68 In: Send,
69 Self: Send,
70 Self::Out: Send,
71 {
72 async_stream::stream! {
73 let mut input = pin!(input);
74
75 while let Some(item) = input.next().await {
76 let mut s = pin!(self.handle(item, cx));
77
78 while let Some(out) = s.next().await {
79 yield out;
80 }
81 }
82 }
83 }
84
85 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
86 where
87 Self: Sized,
88 {
89 async move {}
90 }
91}
92
93impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
94where
95 I: Send,
96 O1: Send,
97 O2: Send,
98 E1: Send,
99 E2: Send,
100 S1: Service<I, Out = Result<O1, E1>> + Send,
101 S2: Service<O1, Out = Result<O2, E2>> + Send,
102{
103 type Out = Result<O2, Either<E1, E2>>;
104
105 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
106 async_stream::stream! {
107 let mut s1 = pin!(self.0.handle(msg, cx));
108
109 while let Some(res) = s1.next().await {
110 match res {
111 Ok(ok) => {
112 let mut s2 = pin!(self.1.handle(ok, cx));
113
114 while let Some(i2) = s2.next().await {
115 yield i2.map_err(Either::Right);
116 }
117 },
118 Err(err) => yield Err(Either::Left(err)),
119 }
120 }
121 }
122 }
123}
124
125pub trait ServiceExt<I: Send>: Service<I> {
126 #[inline]
127 fn flow<O1: Send, O2: Send, E1: Send, E2: Send, U: Send>(self, service: U) -> (Self, U)
128 where
129 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
130 U: Service<O1, Out = Result<O2, E2>>,
131 {
132 (self, service)
133 }
134
135 #[inline]
144 fn spawn_each(self) -> SpawnEach<I, Self>
145 where
146 Self: Sized + Send + Clone,
147 Self::Out: Send,
148 {
149 SpawnEach::new(self)
150 }
151
152 #[inline]
153 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
154 where
155 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
156 F: FnMut(O1) -> H + Send,
157 H: Future<Output = O2> + Send,
158 O1: Send,
159 O2: Send,
160 E1: Send,
161 {
162 (self, map::map::<O2, _>(f))
163 }
164
165 #[inline]
166 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
167 where
168 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
169 O1: Send,
170 O2: Send,
171 E1: Send,
172 F: FnMut(O1) -> H + Send,
173 H: Future<Output = Option<O2>> + Send,
174 {
175 (self, map::filter_map::<O2, _>(f))
176 }
177}
178
179impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
180
181impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
182 type Out = Result<I, E>;
183
184 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
185 if let Some(srv) = self {
186 srv.handle(input, cx).left_stream()
187 } else {
188 futures::stream::once(async move { Ok(input) }).right_stream()
189 }
190 }
191}