Skip to main content

node_flow/flows/parallel_flow/
builder.rs

1use std::marker::PhantomData;
2
3use super::ParallelFlow as Flow;
4use crate::{
5    context::{Fork, Join},
6    flows::{
7        ChainLink, NodeIOE,
8        generic_defs::debug::impl_debug_for_builder,
9        parallel_flow::{Joiner, chain_run::ChainRunParallel as ChainRun},
10    },
11    node::{Node, NodeOutput as NodeOutputStruct},
12};
13
14/// Builder for [`ParallelFlow`](Flow).
15///
16/// This builder ensures:
17/// - `Input` into the flow can be converted into the input of all nodes
18/// - error of all nodes can be converted into the `Error` of the flow
19/// - `Joiner` returns `Result<Output, Error>`
20///
21/// See also [`ParallelFlow`](Flow).
22pub struct Builder<Input, Output, Error, Context, NodeTypes = (), NodeIOETypes = ()>
23where
24    // Trait bounds for better and nicer errors
25    Input: Send + Clone,
26    Error: Send,
27    Context: Fork + Join + Send,
28{
29    #[expect(clippy::type_complexity)]
30    _ioec: PhantomData<fn() -> (Input, Output, Error, Context)>,
31    _nodes_io: PhantomData<fn() -> NodeIOETypes>,
32    nodes: NodeTypes,
33}
34
// NOTE(review): judging by its name and import path (`generic_defs::debug`), this macro
// presumably generates the `Debug` impl for `Builder` under the label "ParallelFlow";
// the bounds passed here repeat the ones on the struct. Confirm against the macro definition.
35impl_debug_for_builder!(
36    "ParallelFlow",
37    Builder,
38    Input: Send + Clone,
39    Error: Send,
40    Context: Fork + Join + Send
41);
42
43impl<Input, Output, Error, Context> Default for Builder<Input, Output, Error, Context>
44where
45    // Trait bounds for better and nicer errors
46    Input: Send + Clone,
47    Error: Send,
48    Context: Fork + Join + Send,
49{
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl<Input, Output, Error, Context> Builder<Input, Output, Error, Context>
56where
57    // Trait bounds for better and nicer errors
58    Input: Send + Clone,
59    Error: Send,
60    Context: Fork + Join + Send,
61{
62    /// Creates a new empty builder for [`ParallelFlow`](Flow).
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            _ioec: PhantomData,
67            _nodes_io: PhantomData,
68            nodes: (),
69        }
70    }
71
72    /// Adds a new node.
73    ///
74    /// The new node must satisfy:
75    /// - `Self`: `Node<NodeInputType, NodeOutput<NodeOutputType>, NodeErrorType, _>`
76    /// - `Input`: `Into<NodeInputType>`,
77    /// - `NodeErrorType`: `Into<Error>`,
78    ///
79    /// # Returns
80    /// A new [`Builder`] with the added node.
81    #[expect(clippy::type_complexity, clippy::type_repetition_in_bounds)]
82    pub fn add_node<NodeType, NodeInput, NodeOutput, NodeError>(
83        self,
84        node: NodeType,
85    ) -> Builder<
86        Input,
87        Output,
88        Error,
89        Context,
90        (NodeType,),
91        ChainLink<(), NodeIOE<NodeInput, NodeOutput, NodeError>>,
92    >
93    where
94        Input: Into<NodeInput>,
95        NodeError: Into<Error>,
96        NodeType: Node<NodeInput, NodeOutputStruct<NodeOutput>, NodeError, Context>,
97        // Trait bounds for better and nicer errors
98        NodeType: Send + Sync + Clone,
99        NodeOutput: Send,
100    {
101        Builder {
102            _ioec: PhantomData,
103            _nodes_io: PhantomData,
104            nodes: (node,),
105        }
106    }
107}
108
109impl<Input, Output, Error, Context, NodeTypes, OtherNodeIOETypes, LastNodeIOETypes>
110    Builder<
111        Input,
112        Output,
113        Error,
114        Context,
115        NodeTypes,
116        ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
117    >
118where
119    // Trait bounds for better and nicer errors
120    Input: Send + Clone,
121    Error: Send,
122    Context: Fork + Join + Send,
123{
124    /// Adds a new node.
125    ///
126    /// The new node must satisfy:
127    /// - `Self`: `Node<NodeInputType, NodeOutput<NodeOutputType>, NodeErrorType, _>`
128    /// - `Input`: `Into<NodeInputType>`,
129    /// - `NodeErrorType`: `Into<Error>`,
130    ///
131    /// # Returns
132    /// A new [`Builder`] with the added node.
133    #[expect(clippy::type_complexity, clippy::type_repetition_in_bounds)]
134    pub fn add_node<NodeType, NodeInput, NodeOutput, NodeError>(
135        self,
136        node: NodeType,
137    ) -> Builder<
138        Input,
139        Output,
140        Error,
141        Context,
142        ChainLink<NodeTypes, NodeType>,
143        ChainLink<
144            ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
145            NodeIOE<NodeInput, NodeOutput, NodeError>,
146        >,
147    >
148    where
149        Input: Into<NodeInput>,
150        NodeError: Into<Error>,
151        NodeType: Node<NodeInput, NodeOutputStruct<NodeOutput>, NodeError, Context>,
152        // Trait bounds for better and nicer errors
153        NodeType: Send + Sync + Clone,
154        NodeOutput: Send,
155    {
156        Builder {
157            _ioec: PhantomData,
158            _nodes_io: PhantomData,
159            nodes: (self.nodes, node),
160        }
161    }
162
163    /// Finalizes the builder and produces a [`ParallelFlow`](Flow) instance.
164    ///
165    /// The joiner must satisfy:
166    /// - `Self`: `Joiner<'_, NodesOutputs, Output, Error, _>`
167    ///
168    /// When using closure as a joiner it always needs:
169    /// - to be an **async closure** - because of lifetimes
170    /// - *for context to*:
171    ///     - have the **type of a context** specified when **using** context - because it cannot infer the type\
172    ///       *or*
173    ///     - have the context specified as `_: &mut _` when **not using** context - because it cannot satisfy that `Joiner` is implemented
174    ///
175    /// # Examples
176    /// ```
177    /// # use node_flow::node::{Node, NodeOutput};
178    /// # use node_flow::flows::ParallelFlow;
179    /// # use node_flow::context::{Fork, Join};
180    /// # #[derive(Clone)]
181    /// # struct A;
182    /// # struct Context;
183    /// # impl Fork for Context { fn fork(&self) -> Self { Self } }
184    /// # impl Join for Context { fn join(&mut self, others: Box<[Self]>) {} }
185    /// # impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
186    /// #     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> { todo!() }
187    /// # }
188    /// # let flow = ParallelFlow::<(), i32, (), Context>::builder()
189    /// #     .add_node(A)
190    /// // ...
191    /// .build(async |_, _: &mut _| {
192    ///     Ok(NodeOutput::Ok(120))
193    /// });
194    /// # let flow = ParallelFlow::<(), i32, (), Context>::builder()
195    /// #     .add_node(A)
196    /// // ...
197    /// .build(async |_, ctx: &mut Context| {
198    ///     let _forked_ctx = ctx.fork();
199    ///     Ok(NodeOutput::Ok(120))
200    /// });
201    /// ```
202    pub fn build<J, ChainRunOutput>(
203        self,
204        joiner: J,
205    ) -> Flow<
206        Input,
207        Output,
208        Error,
209        Context,
210        ChainRunOutput,
211        J,
212        NodeTypes,
213        ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
214    >
215    where
216        for<'a> J: Joiner<'a, ChainRunOutput, Output, Error, Context>,
217        NodeTypes: ChainRun<
218                Input,
219                Result<ChainRunOutput, Error>,
220                Context,
221                ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
222            >,
223    {
224        Flow {
225            _ioec: PhantomData,
226            _nodes_io: PhantomData,
227            nodes: self.nodes,
228            _joiner_input: PhantomData,
229            joiner,
230        }
231    }
232}