node_flow/flows/
fn_flow.rs

1use std::fmt::Debug;
2
3use crate::{
4    describe::{Description, DescriptionBase, Edge, Type, remove_generics_from_name},
5    flows::NodeResult,
6    node::{Node, NodeOutput},
7};
8
9/// The `Runner` is used in [`FnFlow`] and is basically a replacement for [`Node`].
10///
11/// It defines how `Input` and inner data is processed into an `Output` (and `Error`) with a given `Context`.
12///
13/// See also [`FnFlow`], [`Node`].
14pub trait Runner<'a, Input, Output, Error, Context, InnerData>: Send + Sync {
15    /// Executes the runner using the provided inner data, input, and context.
16    ///
17    /// # Parameters
18    /// - `data`: Inner data used in this runner.
19    /// - `input`: The input data to process.
20    /// - `context`: Mutable reference to the a context.
21    fn run(
22        &self,
23        data: InnerData,
24        input: Input,
25        context: &'a mut Context,
26    ) -> impl Future<Output = NodeResult<Output, Error>> + Send;
27}
28
29impl<'a, Input, Output, Error, Context, InnerData, T, F>
30    Runner<'a, Input, Output, Error, Context, InnerData> for T
31where
32    Input: Send,
33    F: Future<Output = NodeResult<Output, Error>> + Send + 'a,
34    T: Fn(InnerData, Input, &'a mut Context) -> F + Send + Sync,
35    Context: 'a,
36{
37    fn run(
38        &self,
39        data: InnerData,
40        input: Input,
41        context: &'a mut Context,
42    ) -> impl Future<Output = NodeResult<Output, Error>> {
43        (self)(data, input, context)
44    }
45}
46
47/// `FnFlow` takes some async function and wraps around it to crate a node.
48///
49/// This flow allows for setting custom [`Description`]
50/// through the [`FnFlow::with_description`] function,
51/// since there is no other way to get it (like with [`Node::describe`]).
52///
53/// # Type Parameters
54/// - `Input`: The type of data accepted by this flow.
55/// - `Output`: The type of data produced by this flow.
56/// - `Error`: The type of error emitted by this flow.
57/// - `Context`: The type of context used during execution.
58///
59/// # Examples
60/// ```
61/// use node_flow::flows::FnFlow;
62/// use node_flow::node::{Node, NodeOutput};
63///
64/// #[derive(Clone)]
65/// struct SomeExpensiveData(Vec<u32>);
66///
67/// # tokio::runtime::Builder::new_current_thread()
68/// #     .enable_all()
69/// #     .build()
70/// #     .unwrap()
71/// #     .block_on(async {
72/// async fn main() {
73///     let mut flow = FnFlow::<u32, u32, (), _>::new(
74///         SomeExpensiveData((0..1<<15).collect()),
75///         async |SomeExpensiveData(data), input, _: &mut _| {
76///             let res = data.iter().sum::<u32>() / data.len() as u32 + input;
77///             Ok(NodeOutput::Ok(res))
78///         },
79///     );
80///
81///     let result = flow.run(1, &mut ()).await;
82///     assert_eq!(result, Ok(NodeOutput::Ok(1<<14)));
83/// }
84/// # main().await;
85/// # });
86/// ```
87pub struct FnFlow<Input, Output, Error, Context, InnerData = (), R = ()> {
88    #[expect(clippy::type_complexity)]
89    _ioec: std::marker::PhantomData<fn() -> (Input, Output, Error, Context)>,
90    inner_data: std::sync::Arc<InnerData>,
91    runner_description: Option<std::sync::Arc<Description>>,
92    runner: R,
93}
94impl<Input, Output, Error, Context> FnFlow<Input, Output, Error, Context>
95where
96    // Trait bounds for better and nicer errors
97    Input: Send,
98    Context: Send,
99{
100    /// Creates a new [`FnFlow`] from a given runner.
101    ///
102    /// The runner must satisfy:
103    /// - `Self`: `Runner<'_, Input, Output, Error, _, InnerData>`
104    ///
105    /// When using closure as a runner it always needs:
106    /// - to be an **async closure** - because of lifetimes
107    /// - *for context to*:
108    ///     - have the **type of a context** specified when **using** context - because it cannot infer the type\
109    ///       *or*
110    ///     - have the context specified as `_: &mut _` when **not using** context - because it cannot satisfy that `Runner` is implemented
111    /// - to have the **type of inner data** specified when **using** inner data - because it cannot infer the type
112    ///
113    /// # Examples
114    /// ```
115    /// # use node_flow::flows::FnFlow;
116    /// # use node_flow::node::{Node, NodeOutput};
117    /// # use node_flow::context::Fork;
118    /// # #[derive(Clone)]
119    /// struct SomeData(u16);
120    /// # struct Context;
121    /// # impl Fork for Context { fn fork(&self) -> Self { Self } }
122    ///
123    /// FnFlow::<u8, u16, (), Context>::new(
124    ///     SomeData(15),
125    ///     async |_, _, _: &mut _| {
126    ///         Ok(NodeOutput::Ok(30))
127    ///     },
128    /// );
129    /// FnFlow::<u8, u16, (), Context>::new(
130    ///     SomeData(15),
131    ///     async |data: SomeData, _, _: &mut _| {
132    ///         Ok(NodeOutput::Ok(data.0 + 30))
133    ///     },
134    /// );
135    /// FnFlow::<u8, u16, (), Context>::new(
136    ///     SomeData(15),
137    ///     async |_, _, ctx: &mut Context| {
138    ///         let _forked_ctx = ctx.fork();
139    ///         Ok(NodeOutput::Ok(30))
140    ///     },
141    /// );
142    /// ```
143    pub fn new<InnerData, R>(
144        inner_data: InnerData,
145        runner: R,
146    ) -> FnFlow<Input, Output, Error, Context, InnerData, R>
147    where
148        InnerData: Clone + Send + Sync,
149        for<'a> R: Runner<'a, Input, Output, Error, Context, InnerData>,
150    {
151        FnFlow {
152            _ioec: std::marker::PhantomData,
153            inner_data: std::sync::Arc::new(inner_data),
154            runner_description: None,
155            runner,
156        }
157    }
158}
159
160impl<Input, Output, Error, Context, InnerData, R>
161    FnFlow<Input, Output, Error, Context, InnerData, R>
162{
163    /// Attaches a custom [`Description`] to this flow.
164    ///
165    /// This is be useful when something complex happens in the flow.
166    ///
167    /// # Examples
168    /// ```
169    /// use node_flow::flows::FnFlow;
170    /// use node_flow::describe::{Description, DescriptionBase};
171    /// use node_flow::node::NodeOutput;
172    ///
173    /// let desc = Description::Node {
174    ///     base: DescriptionBase::from::<(), String, usize, u8, u32>()
175    /// }.with_description("I am lying about my types! But shh...");
176    /// let flow = FnFlow::<u8, u16, (), ()>::new((), async|_, x, _: &mut _| {
177    ///     Ok(NodeOutput::Ok(x as u16))
178    /// }).with_description(desc);
179    /// ```
180    #[must_use]
181    pub fn with_description(mut self, description: Description) -> Self {
182        self.runner_description = Some(std::sync::Arc::new(description));
183        self
184    }
185}
186
187impl<Input, Output, Error, Context, InnerData, R> Debug
188    for FnFlow<Input, Output, Error, Context, InnerData, R>
189where
190    InnerData: Debug,
191{
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("FnFlow")
194            .field("inner_data", &self.inner_data)
195            .finish_non_exhaustive()
196    }
197}
198
199impl<Input, Output, Error, Context, InnerData, R> Clone
200    for FnFlow<Input, Output, Error, Context, InnerData, R>
201where
202    R: Clone,
203{
204    fn clone(&self) -> Self {
205        Self {
206            _ioec: std::marker::PhantomData,
207            inner_data: self.inner_data.clone(),
208            runner_description: self.runner_description.clone(),
209            runner: self.runner.clone(),
210        }
211    }
212}
213
214impl<Input, Output, Error, Context, InnerData, R> Node<Input, NodeOutput<Output>, Error, Context>
215    for FnFlow<Input, Output, Error, Context, InnerData, R>
216where
217    InnerData: Clone,
218    for<'a> R: Runner<'a, Input, Output, Error, Context, InnerData>,
219{
220    fn run(
221        &mut self,
222        input: Input,
223        context: &mut Context,
224    ) -> impl Future<Output = Result<NodeOutput<Output>, Error>> + Send {
225        self.runner
226            .run(self.inner_data.as_ref().clone(), input, context)
227    }
228
229    fn describe(&self) -> crate::describe::Description {
230        if let Some(desc) = self.runner_description.as_ref() {
231            return desc.as_ref().clone();
232        }
233
234        let runner = Description::Node {
235            base: DescriptionBase {
236                r#type: Type {
237                    name: "Runner".to_owned(),
238                },
239                input: Type {
240                    name: String::new(),
241                },
242                output: Type {
243                    name: String::new(),
244                },
245                error: Type::of::<Error>(),
246                context: Type::of::<Context>(),
247                description: None,
248                externals: None,
249            },
250        };
251
252        let inner_data = Description::Node {
253            base: DescriptionBase {
254                r#type: Type::of::<InnerData>(),
255                input: Type {
256                    name: String::new(),
257                },
258                output: Type {
259                    name: String::new(),
260                },
261                error: Type {
262                    name: String::new(),
263                },
264                context: Type {
265                    name: String::new(),
266                },
267                description: None,
268                externals: None,
269            },
270        };
271
272        Description::new_flow(
273            self,
274            vec![runner, inner_data],
275            vec![
276                Edge::flow_to_node(0),
277                Edge::node_to_flow(0),
278                Edge::node_to_node(1, 0),
279            ],
280        )
281        .modify_name(remove_generics_from_name)
282    }
283}
284
#[cfg(test)]
mod test {
    use super::FnFlow as Flow;
    use crate::{
        context::storage::{
            LocalStorage,
            local_storage::{LocalStorageImpl, tests::MyVal},
        },
        node::{Node, NodeOutput},
    };

    /// A flow built from an async closure receives the inner data, the input,
    /// and the context, and its result is returned by `run`.
    #[tokio::test]
    async fn test_flow() {
        let mut storage = LocalStorageImpl::new();
        let inner = (5u8, "aaa".to_owned(), 12u32);

        let mut flow = Flow::<u8, u64, (), _>::new(
            inner,
            async |data: (u8, String, u32), input: u8, context: &mut LocalStorageImpl| {
                context.insert(MyVal::default());
                let (small, text, large) = data;
                let total =
                    u64::from(small) + text.len() as u64 + u64::from(large) + u64::from(input);
                Ok(NodeOutput::Ok(total))
            },
        );

        // 5 + "aaa".len() + 12 + 3 == 23
        let outcome = flow.run(3, &mut storage).await;
        assert_eq!(outcome, Ok(NodeOutput::Ok(23)));
    }
}