node_flow/flows/fn_flow.rs
1use std::fmt::Debug;
2
3use crate::{
4 describe::{Description, DescriptionBase, Edge, Type, remove_generics_from_name},
5 flows::NodeResult,
6 node::{Node, NodeOutput},
7};
8
9/// The `Runner` is used in [`FnFlow`] and is basically a replacement for [`Node`].
10///
11/// It defines how `Input` and inner data is processed into an `Output` (and `Error`) with a given `Context`.
12///
13/// See also [`FnFlow`], [`Node`].
14pub trait Runner<'a, Input, Output, Error, Context, InnerData>: Send + Sync {
15 /// Executes the runner using the provided inner data, input, and context.
16 ///
17 /// # Parameters
18 /// - `data`: Inner data used in this runner.
19 /// - `input`: The input data to process.
20 /// - `context`: Mutable reference to the a context.
21 fn run(
22 &self,
23 data: InnerData,
24 input: Input,
25 context: &'a mut Context,
26 ) -> impl Future<Output = NodeResult<Output, Error>> + Send;
27}
28
29impl<'a, Input, Output, Error, Context, InnerData, T, F>
30 Runner<'a, Input, Output, Error, Context, InnerData> for T
31where
32 Input: Send,
33 F: Future<Output = NodeResult<Output, Error>> + Send + 'a,
34 T: Fn(InnerData, Input, &'a mut Context) -> F + Send + Sync,
35 Context: 'a,
36{
37 fn run(
38 &self,
39 data: InnerData,
40 input: Input,
41 context: &'a mut Context,
42 ) -> impl Future<Output = NodeResult<Output, Error>> {
43 (self)(data, input, context)
44 }
45}
46
47/// `FnFlow` takes some async function and wraps around it to crate a node.
48///
49/// This flow allows for setting custom [`Description`]
50/// through the [`FnFlow::with_description`] function,
51/// since there is no other way to get it (like with [`Node::describe`]).
52///
53/// # Type Parameters
54/// - `Input`: The type of data accepted by this flow.
55/// - `Output`: The type of data produced by this flow.
56/// - `Error`: The type of error emitted by this flow.
57/// - `Context`: The type of context used during execution.
58///
59/// # Examples
60/// ```
61/// use node_flow::flows::FnFlow;
62/// use node_flow::node::{Node, NodeOutput};
63///
64/// #[derive(Clone)]
65/// struct SomeExpensiveData(Vec<u32>);
66///
67/// # tokio::runtime::Builder::new_current_thread()
68/// # .enable_all()
69/// # .build()
70/// # .unwrap()
71/// # .block_on(async {
72/// async fn main() {
73/// let mut flow = FnFlow::<u32, u32, (), _>::new(
74/// SomeExpensiveData((0..1<<15).collect()),
75/// async |SomeExpensiveData(data), input, _: &mut _| {
76/// let res = data.iter().sum::<u32>() / data.len() as u32 + input;
77/// Ok(NodeOutput::Ok(res))
78/// },
79/// );
80///
81/// let result = flow.run(1, &mut ()).await;
82/// assert_eq!(result, Ok(NodeOutput::Ok(1<<14)));
83/// }
84/// # main().await;
85/// # });
86/// ```
87pub struct FnFlow<Input, Output, Error, Context, InnerData = (), R = ()> {
88 #[expect(clippy::type_complexity)]
89 _ioec: std::marker::PhantomData<fn() -> (Input, Output, Error, Context)>,
90 inner_data: std::sync::Arc<InnerData>,
91 runner_description: Option<std::sync::Arc<Description>>,
92 runner: R,
93}
94impl<Input, Output, Error, Context> FnFlow<Input, Output, Error, Context>
95where
96 // Trait bounds for better and nicer errors
97 Input: Send,
98 Context: Send,
99{
100 /// Creates a new [`FnFlow`] from a given runner.
101 ///
102 /// The runner must satisfy:
103 /// - `Self`: `Runner<'_, Input, Output, Error, _, InnerData>`
104 ///
105 /// When using closure as a runner it always needs:
106 /// - to be an **async closure** - because of lifetimes
107 /// - *for context to*:
108 /// - have the **type of a context** specified when **using** context - because it cannot infer the type\
109 /// *or*
110 /// - have the context specified as `_: &mut _` when **not using** context - because it cannot satisfy that `Runner` is implemented
111 /// - to have the **type of inner data** specified when **using** inner data - because it cannot infer the type
112 ///
113 /// # Examples
114 /// ```
115 /// # use node_flow::flows::FnFlow;
116 /// # use node_flow::node::{Node, NodeOutput};
117 /// # use node_flow::context::Fork;
118 /// # #[derive(Clone)]
119 /// struct SomeData(u16);
120 /// # struct Context;
121 /// # impl Fork for Context { fn fork(&self) -> Self { Self } }
122 ///
123 /// FnFlow::<u8, u16, (), Context>::new(
124 /// SomeData(15),
125 /// async |_, _, _: &mut _| {
126 /// Ok(NodeOutput::Ok(30))
127 /// },
128 /// );
129 /// FnFlow::<u8, u16, (), Context>::new(
130 /// SomeData(15),
131 /// async |data: SomeData, _, _: &mut _| {
132 /// Ok(NodeOutput::Ok(data.0 + 30))
133 /// },
134 /// );
135 /// FnFlow::<u8, u16, (), Context>::new(
136 /// SomeData(15),
137 /// async |_, _, ctx: &mut Context| {
138 /// let _forked_ctx = ctx.fork();
139 /// Ok(NodeOutput::Ok(30))
140 /// },
141 /// );
142 /// ```
143 pub fn new<InnerData, R>(
144 inner_data: InnerData,
145 runner: R,
146 ) -> FnFlow<Input, Output, Error, Context, InnerData, R>
147 where
148 InnerData: Clone + Send + Sync,
149 for<'a> R: Runner<'a, Input, Output, Error, Context, InnerData>,
150 {
151 FnFlow {
152 _ioec: std::marker::PhantomData,
153 inner_data: std::sync::Arc::new(inner_data),
154 runner_description: None,
155 runner,
156 }
157 }
158}
159
160impl<Input, Output, Error, Context, InnerData, R>
161 FnFlow<Input, Output, Error, Context, InnerData, R>
162{
163 /// Attaches a custom [`Description`] to this flow.
164 ///
165 /// This is be useful when something complex happens in the flow.
166 ///
167 /// # Examples
168 /// ```
169 /// use node_flow::flows::FnFlow;
170 /// use node_flow::describe::{Description, DescriptionBase};
171 /// use node_flow::node::NodeOutput;
172 ///
173 /// let desc = Description::Node {
174 /// base: DescriptionBase::from::<(), String, usize, u8, u32>()
175 /// }.with_description("I am lying about my types! But shh...");
176 /// let flow = FnFlow::<u8, u16, (), ()>::new((), async|_, x, _: &mut _| {
177 /// Ok(NodeOutput::Ok(x as u16))
178 /// }).with_description(desc);
179 /// ```
180 #[must_use]
181 pub fn with_description(mut self, description: Description) -> Self {
182 self.runner_description = Some(std::sync::Arc::new(description));
183 self
184 }
185}
186
187impl<Input, Output, Error, Context, InnerData, R> Debug
188 for FnFlow<Input, Output, Error, Context, InnerData, R>
189where
190 InnerData: Debug,
191{
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 f.debug_struct("FnFlow")
194 .field("inner_data", &self.inner_data)
195 .finish_non_exhaustive()
196 }
197}
198
199impl<Input, Output, Error, Context, InnerData, R> Clone
200 for FnFlow<Input, Output, Error, Context, InnerData, R>
201where
202 R: Clone,
203{
204 fn clone(&self) -> Self {
205 Self {
206 _ioec: std::marker::PhantomData,
207 inner_data: self.inner_data.clone(),
208 runner_description: self.runner_description.clone(),
209 runner: self.runner.clone(),
210 }
211 }
212}
213
214impl<Input, Output, Error, Context, InnerData, R> Node<Input, NodeOutput<Output>, Error, Context>
215 for FnFlow<Input, Output, Error, Context, InnerData, R>
216where
217 InnerData: Clone,
218 for<'a> R: Runner<'a, Input, Output, Error, Context, InnerData>,
219{
220 fn run(
221 &mut self,
222 input: Input,
223 context: &mut Context,
224 ) -> impl Future<Output = Result<NodeOutput<Output>, Error>> + Send {
225 self.runner
226 .run(self.inner_data.as_ref().clone(), input, context)
227 }
228
229 fn describe(&self) -> crate::describe::Description {
230 if let Some(desc) = self.runner_description.as_ref() {
231 return desc.as_ref().clone();
232 }
233
234 let runner = Description::Node {
235 base: DescriptionBase {
236 r#type: Type {
237 name: "Runner".to_owned(),
238 },
239 input: Type {
240 name: String::new(),
241 },
242 output: Type {
243 name: String::new(),
244 },
245 error: Type::of::<Error>(),
246 context: Type::of::<Context>(),
247 description: None,
248 externals: None,
249 },
250 };
251
252 let inner_data = Description::Node {
253 base: DescriptionBase {
254 r#type: Type::of::<InnerData>(),
255 input: Type {
256 name: String::new(),
257 },
258 output: Type {
259 name: String::new(),
260 },
261 error: Type {
262 name: String::new(),
263 },
264 context: Type {
265 name: String::new(),
266 },
267 description: None,
268 externals: None,
269 },
270 };
271
272 Description::new_flow(
273 self,
274 vec![runner, inner_data],
275 vec![
276 Edge::flow_to_node(0),
277 Edge::node_to_flow(0),
278 Edge::node_to_node(1, 0),
279 ],
280 )
281 .modify_name(remove_generics_from_name)
282 }
283}
284
#[cfg(test)]
mod test {
    use super::FnFlow as Flow;
    use crate::{
        context::storage::{
            LocalStorage,
            local_storage::{LocalStorageImpl, tests::MyVal},
        },
        node::{Node, NodeOutput},
    };

    /// Runs a flow whose runner uses the inner data, the input and the context,
    /// and checks the computed output.
    #[tokio::test]
    async fn test_flow() {
        let mut storage = LocalStorageImpl::new();
        let mut flow = Flow::<u8, u64, (), _>::new(
            (5u8, "aaa".to_owned(), 12u32),
            async |data: (u8, String, u32), input: u8, context: &mut LocalStorageImpl| {
                context.insert(MyVal::default());
                let (small, text, big) = data;
                let total =
                    u64::from(small) + text.len() as u64 + u64::from(big) + u64::from(input);
                Ok(NodeOutput::Ok(total))
            },
        );

        // 5 + "aaa".len() + 12 + 3 == 23
        let outcome = flow.run(3, &mut storage).await;
        assert_eq!(outcome, Ok(NodeOutput::Ok(23)));
    }
}