1mod and_then;
2mod map;
3mod pass;
4mod spawn;
5mod stub;
6pub use and_then::and_then;
13use flowly_core::Either;
14pub use map::{filter_map, map, try_filter_map, try_map};
15use spawn::SpawnEach;
16pub use stub::stub;
17use tokio::sync::watch;
18use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24#[inline]
25pub fn flow<I>() -> pass::Pass<I> {
26 pass::Pass(PhantomData)
27}
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32 pub abort: watch::Sender<bool>,
33 pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Default for Context {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl From<watch::Sender<bool>> for Context {
43 fn from(abort: watch::Sender<bool>) -> Self {
44 Self {
45 abort_recv: abort.subscribe(),
46 abort,
47 }
48 }
49}
50
51impl Context {
52 pub fn new() -> Self {
53 Self::from(watch::Sender::default())
54 }
55}
56
57pub trait Service<In> {
58 type Out;
59
60 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
61
62 fn handle_stream(
63 &mut self,
64 input: impl Stream<Item = In> + Send,
65 cx: &Context,
66 ) -> impl Stream<Item = Self::Out> + Send
67 where
68 In: Send,
69 Self: Send,
70 Self::Out: Send,
71 {
72 async_stream::stream! {
73 let mut input = pin!(input);
74
75 while let Some(item) = input.next().await {
76 let mut s = pin!(self.handle(item, cx));
77
78 while let Some(out) = s.next().await {
79 yield out;
80 }
81 }
82 }
83 }
84
85 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
86 where
87 Self: Sized,
88 {
89 async move {}
90 }
91}
92
93impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
94where
95 I: Send,
96 O1: Send,
97 O2: Send,
98 E1: Send,
99 E2: Send,
100 S1: Service<I, Out = Result<O1, E1>> + Send,
101 S2: Service<O1, Out = Result<O2, E2>> + Send,
102{
103 type Out = Result<O2, Either<E1, E2>>;
104
105 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
106 async_stream::stream! {
107 let mut s1 = pin!(self.0.handle(msg, cx));
108
109 while let Some(res) = s1.next().await {
110 match res {
111 Ok(ok) => {
112 let mut s2 = pin!(self.1.handle(ok, cx));
113
114 while let Some(i2) = s2.next().await {
115 yield i2.map_err(Either::Right);
116 }
117 },
118 Err(err) => yield Err(Either::Left(err)),
119 }
120 }
121 }
122 }
123}
124
125pub trait ServiceExt<I: Send>: Service<I> {
126 #[inline]
127 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
128 where
129 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
130 U: Service<O1, Out = Result<O2, E2>>,
131 O1: Send,
132 O2: Send,
133 E1: Send,
134 E2: Send,
135 U: Send,
136 {
137 (self, service)
138 }
139
140 #[inline]
149 fn spawn_each(self) -> SpawnEach<I, Self>
150 where
151 Self: Sized + Send + Clone,
152 Self::Out: Send,
153 {
154 SpawnEach::new(self)
155 }
156
157 #[inline]
158 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
159 where
160 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
161 F: FnMut(O1) -> H + Send,
162 H: Future<Output = O2> + Send,
163 O1: Send,
164 O2: Send,
165 E1: Send,
166 {
167 (self, map::map::<O2, _>(f))
168 }
169
170 #[inline]
171 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
172 where
173 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174 O1: Send,
175 O2: Send,
176 E1: Send,
177 F: FnMut(O1) -> H + Send,
178 H: Future<Output = Option<O2>> + Send,
179 {
180 (self, map::filter_map::<O2, _>(f))
181 }
182}
183
184impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
185
186impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
187 type Out = Result<I, E>;
188
189 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
190 if let Some(srv) = self {
191 srv.handle(input, cx).left_stream()
192 } else {
193 futures::stream::once(async move { Ok(input) }).right_stream()
194 }
195 }
196}