1mod and_then;
2mod filter;
3mod map;
4mod pass;
5mod spawn;
6mod stub;
7mod switch;
8pub use and_then::and_then;
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16use spawn::SpawnEach;
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt};
24
25#[inline]
26pub fn flow<I>() -> pass::Pass<I> {
27 pass::Pass(PhantomData)
28}
29
30#[derive(Clone)]
31#[non_exhaustive]
32pub struct Context {
33 pub abort: watch::Sender<bool>,
34 pub abort_recv: watch::Receiver<bool>,
35}
36
37impl Default for Context {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl From<watch::Sender<bool>> for Context {
44 fn from(abort: watch::Sender<bool>) -> Self {
45 Self {
46 abort_recv: abort.subscribe(),
47 abort,
48 }
49 }
50}
51
52impl Context {
53 pub fn new() -> Self {
54 Self::from(watch::Sender::default())
55 }
56}
57
58pub trait Service<In> {
59 type Out;
60
61 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
62
63 fn handle_stream(
64 &mut self,
65 input: impl Stream<Item = In> + Send,
66 cx: &Context,
67 ) -> impl Stream<Item = Self::Out> + Send
68 where
69 In: Send,
70 Self: Send,
71 Self::Out: Send,
72 {
73 async_stream::stream! {
74 let mut input = pin!(input);
75
76 while let Some(item) = input.next().await {
77 let mut s = pin!(self.handle(item, cx));
78
79 while let Some(out) = s.next().await {
80 yield out;
81 }
82 }
83 }
84 }
85
86 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
87 where
88 Self: Sized,
89 {
90 async move {}
91 }
92}
93
94impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
95where
96 I: Send,
97 O1: Send,
98 O2: Send,
99 E1: Send,
100 E2: Send,
101 S1: Service<I, Out = Result<O1, E1>> + Send,
102 S2: Service<O1, Out = Result<O2, E2>> + Send,
103{
104 type Out = Result<O2, Either<E1, E2>>;
105
106 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
107 async_stream::stream! {
108 let mut s1 = pin!(self.0.handle(msg, cx));
109
110 while let Some(res) = s1.next().await {
111 match res {
112 Ok(ok) => {
113 let mut s2 = pin!(self.1.handle(ok, cx));
114
115 while let Some(i2) = s2.next().await {
116 yield i2.map_err(Either::Right);
117 }
118 },
119 Err(err) => yield Err(Either::Left(err)),
120 }
121 }
122 }
123 }
124}
125
126pub trait ServiceExt<I: Send>: Service<I> {
127 #[inline]
128 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
129 where
130 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
131 U: Send + Service<O1, Out = Result<O2, E2>>,
132 O1: Send,
133 O2: Send,
134 E1: Send,
135 E2: Send,
136 {
137 (self, service)
138 }
139
140 #[inline]
149 fn spawn_each(self) -> SpawnEach<I, Self>
150 where
151 Self: Sized + Send + Clone,
152 Self::Out: Send,
153 {
154 SpawnEach::new(self)
155 }
156
157 #[inline]
158 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
159 where
160 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
161 F: FnMut(O1) -> H + Send,
162 H: Future<Output = O2> + Send,
163 O1: Send,
164 O2: Send,
165 E1: Send,
166 {
167 (self, map::map::<O2, _>(f))
168 }
169
170 #[inline]
171 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
172 where
173 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174 O1: Send,
175 O2: Send,
176 E1: Send,
177 F: FnMut(O1) -> H + Send,
178 H: Future<Output = Option<O2>> + Send,
179 {
180 (self, map::filter_map::<O2, _>(f))
181 }
182}
183
184impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
185
186impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
187 type Out = Result<I, E>;
188
189 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
190 if let Some(srv) = self {
191 srv.handle(input, cx).left_stream()
192 } else {
193 futures::stream::once(async move { Ok(input) }).right_stream()
194 }
195 }
196}