1mod and_then;
2mod filter;
3mod inspect;
4mod map;
5mod pass;
6mod scope;
7mod spawn;
8mod stub;
9mod switch;
10
11pub use and_then::and_then;
12use flowly_core::Either;
13pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
14pub use pass::flow;
15pub use spawn::{SpawnEach, spawn_each};
16pub use stub::stub;
17pub use switch::switch;
18use tokio::sync::watch;
19
20use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24use crate::scope::Scope;
25
26#[derive(Clone)]
27#[non_exhaustive]
28pub struct Context {
29 pub abort: watch::Sender<bool>,
30 pub abort_recv: watch::Receiver<bool>,
31}
32
33impl Default for Context {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl From<watch::Sender<bool>> for Context {
40 fn from(abort: watch::Sender<bool>) -> Self {
41 Self {
42 abort_recv: abort.subscribe(),
43 abort,
44 }
45 }
46}
47
48impl Context {
49 pub fn new() -> Self {
50 Self::from(watch::Sender::default())
51 }
52}
53
54pub trait Service<In> {
55 type Out;
56
57 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
58
59 fn handle_stream(
60 &mut self,
61 input: impl Stream<Item = In> + Send,
62 cx: &Context,
63 ) -> impl Stream<Item = Self::Out> + Send
64 where
65 In: Send,
66 Self: Send,
67 Self::Out: Send,
68 {
69 async_stream::stream! {
70 let mut input = pin!(input);
71
72 while let Some(item) = input.next().await {
73 let mut s = pin!(self.handle(item, cx));
74
75 while let Some(out) = s.next().await {
76 yield out;
77 }
78 }
79 }
80 }
81
82 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
83 where
84 Self: Sized,
85 {
86 async move {}
87 }
88}
89
90impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
91where
92 I: Send,
93 O1: Send,
94 O2: Send,
95 E1: Send,
96 E2: Send,
97 S1: Service<I, Out = Result<O1, E1>> + Send,
98 S2: Service<O1, Out = Result<O2, E2>> + Send,
99{
100 type Out = Result<O2, Either<E1, E2>>;
101
102 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
103 async_stream::stream! {
104 let mut s1 = pin!(self.0.handle(msg, cx));
105
106 while let Some(res) = s1.next().await {
107 match res {
108 Ok(ok) => {
109 let mut s2 = pin!(self.1.handle(ok, cx));
110
111 while let Some(i2) = s2.next().await {
112 yield i2.map_err(Either::Right);
113 }
114 },
115 Err(err) => yield Err(Either::Left(err)),
116 }
117 }
118 }
119 }
120}
121
122pub trait ServiceExt<I: Send>: Service<I> {
123 #[inline]
124 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
125 where
126 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
127 U: Send + Service<O1, Out = Result<O2, E2>>,
128 O1: Send,
129 O2: Send,
130 E1: Send,
131 E2: Send,
132 {
133 (self, service)
134 }
135
136 #[inline]
137 fn flow_inspect<O, E, F>(self, f: F) -> (Self, inspect::Inspect<O, E, F>)
138 where
139 Self: Sized + Service<I, Out = Result<O, E>> + Send,
140 F: Fn(&O) + Send,
141 O: Send,
142 E: Send,
143 {
144 (
145 self,
146 inspect::Inspect::<O, E, F> {
147 cb: f,
148 _m: PhantomData,
149 },
150 )
151 }
152
153 #[inline]
154 fn spawn_each(self) -> SpawnEach<I, Self>
155 where
156 Self: Sized + Send + Clone,
157 Self::Out: Send,
158 {
159 SpawnEach::new(self)
160 }
161
162 #[inline]
163 fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
164 where
165 F: Fn(&O) -> Result<M, E1>,
166 Self: Sized + Send,
167 O: Send,
168 E1: Send,
169 {
170 (self, scope::scope(f, s))
171 }
172
173 #[inline]
174 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
175 where
176 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
177 F: FnMut(O1) -> H + Send,
178 H: Future<Output = O2> + Send,
179 O1: Send,
180 O2: Send,
181 E1: Send,
182 {
183 (self, map::map::<O2, _>(f))
184 }
185
186 #[inline]
187 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
188 where
189 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
190 O1: Send,
191 O2: Send,
192 E1: Send,
193 F: FnMut(O1) -> H + Send,
194 H: Future<Output = Option<O2>> + Send,
195 {
196 (self, map::filter_map::<O2, _>(f))
197 }
198}
199
200impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
201
202impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
203 type Out = Result<I, E>;
204
205 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
206 if let Some(srv) = self {
207 srv.handle(input, cx).left_stream()
208 } else {
209 futures::stream::once(async move { Ok(input) }).right_stream()
210 }
211 }
212}