node_flow/flows/parallel_flow/builder.rs
1use std::marker::PhantomData;
2
3use super::ParallelFlow as Flow;
4use crate::{
5 context::{Fork, Join},
6 flows::{
7 ChainLink, NodeIOE,
8 generic_defs::debug::impl_debug_for_builder,
9 parallel_flow::{Joiner, chain_run::ChainRunParallel as ChainRun},
10 },
11 node::{Node, NodeOutput as NodeOutputStruct},
12};
13
14/// Builder for [`ParallelFlow`](Flow).
15///
16/// This builder ensures:
17/// - `Input` into the flow can be converted into the input of all nodes
18/// - error of all nodes can be converted into the `Error` of the flow
19/// - `Joiner` returns `Result<Output, Error>`
20///
21/// See also [`ParallelFlow`](Flow).
22pub struct Builder<Input, Output, Error, Context, NodeTypes = (), NodeIOETypes = ()>
23where
24 // Trait bounds for better and nicer errors
25 Input: Send + Clone,
26 Error: Send,
27 Context: Fork + Join + Send,
28{
29 #[expect(clippy::type_complexity)]
30 _ioec: PhantomData<fn() -> (Input, Output, Error, Context)>,
31 _nodes_io: PhantomData<fn() -> NodeIOETypes>,
32 nodes: NodeTypes,
33}
34
// Invokes the crate-local `impl_debug_for_builder!` macro for `Builder`, passing the
// "ParallelFlow" label and the same generic bounds that are declared on the struct.
// NOTE(review): presumably this expands to the `Debug` impl for `Builder` — confirm
// against the macro definition in `generic_defs::debug`.
impl_debug_for_builder!(
    "ParallelFlow",
    Builder,
    Input: Send + Clone,
    Error: Send,
    Context: Fork + Join + Send
);
42
43impl<Input, Output, Error, Context> Default for Builder<Input, Output, Error, Context>
44where
45 // Trait bounds for better and nicer errors
46 Input: Send + Clone,
47 Error: Send,
48 Context: Fork + Join + Send,
49{
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl<Input, Output, Error, Context> Builder<Input, Output, Error, Context>
56where
57 // Trait bounds for better and nicer errors
58 Input: Send + Clone,
59 Error: Send,
60 Context: Fork + Join + Send,
61{
62 /// Creates a new empty builder for [`ParallelFlow`](Flow).
63 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 _ioec: PhantomData,
67 _nodes_io: PhantomData,
68 nodes: (),
69 }
70 }
71
72 /// Adds a new node.
73 ///
74 /// The new node must satisfy:
75 /// - `Self`: `Node<NodeInputType, NodeOutput<NodeOutputType>, NodeErrorType, _>`
76 /// - `Input`: `Into<NodeInputType>`,
77 /// - `NodeErrorType`: `Into<Error>`,
78 ///
79 /// # Returns
80 /// A new [`Builder`] with the added node.
81 #[expect(clippy::type_complexity, clippy::type_repetition_in_bounds)]
82 pub fn add_node<NodeType, NodeInput, NodeOutput, NodeError>(
83 self,
84 node: NodeType,
85 ) -> Builder<
86 Input,
87 Output,
88 Error,
89 Context,
90 (NodeType,),
91 ChainLink<(), NodeIOE<NodeInput, NodeOutput, NodeError>>,
92 >
93 where
94 Input: Into<NodeInput>,
95 NodeError: Into<Error>,
96 NodeType: Node<NodeInput, NodeOutputStruct<NodeOutput>, NodeError, Context>,
97 // Trait bounds for better and nicer errors
98 NodeType: Send + Sync + Clone,
99 NodeOutput: Send,
100 {
101 Builder {
102 _ioec: PhantomData,
103 _nodes_io: PhantomData,
104 nodes: (node,),
105 }
106 }
107}
108
109impl<Input, Output, Error, Context, NodeTypes, OtherNodeIOETypes, LastNodeIOETypes>
110 Builder<
111 Input,
112 Output,
113 Error,
114 Context,
115 NodeTypes,
116 ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
117 >
118where
119 // Trait bounds for better and nicer errors
120 Input: Send + Clone,
121 Error: Send,
122 Context: Fork + Join + Send,
123{
124 /// Adds a new node.
125 ///
126 /// The new node must satisfy:
127 /// - `Self`: `Node<NodeInputType, NodeOutput<NodeOutputType>, NodeErrorType, _>`
128 /// - `Input`: `Into<NodeInputType>`,
129 /// - `NodeErrorType`: `Into<Error>`,
130 ///
131 /// # Returns
132 /// A new [`Builder`] with the added node.
133 #[expect(clippy::type_complexity, clippy::type_repetition_in_bounds)]
134 pub fn add_node<NodeType, NodeInput, NodeOutput, NodeError>(
135 self,
136 node: NodeType,
137 ) -> Builder<
138 Input,
139 Output,
140 Error,
141 Context,
142 ChainLink<NodeTypes, NodeType>,
143 ChainLink<
144 ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
145 NodeIOE<NodeInput, NodeOutput, NodeError>,
146 >,
147 >
148 where
149 Input: Into<NodeInput>,
150 NodeError: Into<Error>,
151 NodeType: Node<NodeInput, NodeOutputStruct<NodeOutput>, NodeError, Context>,
152 // Trait bounds for better and nicer errors
153 NodeType: Send + Sync + Clone,
154 NodeOutput: Send,
155 {
156 Builder {
157 _ioec: PhantomData,
158 _nodes_io: PhantomData,
159 nodes: (self.nodes, node),
160 }
161 }
162
163 /// Finalizes the builder and produces a [`ParallelFlow`](Flow) instance.
164 ///
165 /// The joiner must satisfy:
166 /// - `Self`: `Joiner<'_, NodesOutputs, Output, Error, _>`
167 ///
168 /// When using closure as a joiner it always needs:
169 /// - to be an **async closure** - because of lifetimes
170 /// - *for context to*:
171 /// - have the **type of a context** specified when **using** context - because it cannot infer the type\
172 /// *or*
173 /// - have the context specified as `_: &mut _` when **not using** context - because it cannot satisfy that `Joiner` is implemented
174 ///
175 /// # Examples
176 /// ```
177 /// # use node_flow::node::{Node, NodeOutput};
178 /// # use node_flow::flows::ParallelFlow;
179 /// # use node_flow::context::{Fork, Join};
180 /// # #[derive(Clone)]
181 /// # struct A;
182 /// # struct Context;
183 /// # impl Fork for Context { fn fork(&self) -> Self { Self } }
184 /// # impl Join for Context { fn join(&mut self, others: Box<[Self]>) {} }
185 /// # impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
186 /// # async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> { todo!() }
187 /// # }
188 /// # let flow = ParallelFlow::<(), i32, (), Context>::builder()
189 /// # .add_node(A)
190 /// // ...
191 /// .build(async |_, _: &mut _| {
192 /// Ok(NodeOutput::Ok(120))
193 /// });
194 /// # let flow = ParallelFlow::<(), i32, (), Context>::builder()
195 /// # .add_node(A)
196 /// // ...
197 /// .build(async |_, ctx: &mut Context| {
198 /// let _forked_ctx = ctx.fork();
199 /// Ok(NodeOutput::Ok(120))
200 /// });
201 /// ```
202 pub fn build<J, ChainRunOutput>(
203 self,
204 joiner: J,
205 ) -> Flow<
206 Input,
207 Output,
208 Error,
209 Context,
210 ChainRunOutput,
211 J,
212 NodeTypes,
213 ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
214 >
215 where
216 for<'a> J: Joiner<'a, ChainRunOutput, Output, Error, Context>,
217 NodeTypes: ChainRun<
218 Input,
219 Result<ChainRunOutput, Error>,
220 Context,
221 ChainLink<OtherNodeIOETypes, LastNodeIOETypes>,
222 >,
223 {
224 Flow {
225 _ioec: PhantomData,
226 _nodes_io: PhantomData,
227 nodes: self.nodes,
228 _joiner_input: PhantomData,
229 joiner,
230 }
231 }
232}