node_flow/flows/one_of_parallel_flow/mod.rs
1mod chain_run;
2
3use crate::{
4 context::{Fork, Update},
5 describe::{Description, Edge, remove_generics_from_name},
6 flows::{chain_describe::ChainDescribe, generic_defs::define_flow_and_ioe_conv_builder},
7 node::NodeOutput as NodeOutputStruct,
8};
9use chain_run::ChainRunOneOfParallel as ChainRun;
10
11type FutOutput<Output, Error, Context> = Result<(NodeOutputStruct<Output>, Context), Error>;
12
13define_flow_and_ioe_conv_builder!(
14 OneOfParallelFlow,
15 ChainRun,
16 |self| {
17 let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
18 let mut node_descriptions = Vec::with_capacity(node_count);
19 self.nodes.describe(&mut node_descriptions);
20 let edges = (0..node_count)
21 .flat_map(|i| [Edge::flow_to_node(i), Edge::node_to_flow(i)])
22 .collect::<Vec<_>>();
23
24 Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
25 },
26 >Input: Send + Clone,
27 >Output: Send,
28 >Error: Send,
29 >Context: Fork + Update + Send,
30 #NodeType: Send + Sync + Clone
31 /// `OneOfParallelFlow` executes nodes (branches) **in parallel**, returning when one succeeds or fails.
32 ///
33 /// Nodes (branches) are executed concurrently.
34 /// The flow completes when **any** node succeeds or "hard" fails.
35 /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is returned.
36 /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail),
37 /// that result is ignored and the flow continues waiting for other nodes (branches).
38 /// - If a node returns an **error**, then that error is returned.
39 ///
40 /// If all nodes (branches) soft-fail, the flow itself returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail).
41 ///
42 /// This flow allows defining race-style execution, where multiple branches are ran in parallel
43 /// and the first one to complete determines the returned value.
44 ///
45 /// # Type Parameters
46 /// - `Input`: The type of data accepted by this flow.
47 /// - `Output`: The type of data produced by this flow.
48 /// - `Error`: The type of error emitted by this flow.
49 /// - `Context`: The type of context used during execution.
50 ///
51 /// # Examples
52 /// ```
53 /// use node_flow::node::{Node, NodeOutput};
54 /// use node_flow::flows::OneOfParallelFlow;
55 /// use node_flow::context::{Fork, Update};
56 ///
57 /// // Example nodes
58 /// #[derive(Clone)]
59 /// struct A;
60 /// #[derive(Clone)]
61 /// struct B;
62 ///
63 /// struct ExampleCtx;
64 /// impl Fork for ExampleCtx // ...
65 /// # { fn fork(&self) -> Self { Self } }
66 /// impl Update for ExampleCtx // ...
67 /// # { fn update_from(&mut self, other: Self) {} }
68 ///
69 /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
70 /// async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
71 /// Ok(NodeOutput::SoftFail) // Ignored
72 /// }
73 /// }
74 ///
75 /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for B {
76 /// async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
77 /// Ok(NodeOutput::Ok(5)) // Wins the race
78 /// }
79 /// }
80 ///
81 /// # tokio::runtime::Builder::new_current_thread()
82 /// # .enable_all()
83 /// # .build()
84 /// # .unwrap()
85 /// # .block_on(async {
86 /// async fn main() {
87 /// let mut flow = OneOfParallelFlow::<(), i32, (), _>::builder()
88 /// .add_node(A)
89 /// .add_node(B)
90 /// .build();
91 ///
92 /// let mut ctx = ExampleCtx;
93 /// let result = flow.run((), &mut ctx).await;
94 /// assert_eq!(result, Ok(NodeOutput::Ok(5)));
95 /// }
96 /// # main().await;
97 /// # });
98 /// ```
99);
100
#[cfg(test)]
mod test {
    use super::{ChainRun, OneOfParallelFlow as Flow};
    use crate::{
        context::storage::local_storage::{LocalStorageImpl, tests::MyVal},
        flows::tests::{InsertIntoStorageAssertWasNotInStorage, Passer, SoftFailNode},
        node::{Node, NodeOutput},
    };

    /// A flow built from three `SoftFailNode`s and one `Passer` must yield `Ok(5)` for input `5`.
    ///
    /// The nodes are declared with differing input/output integer types, so the
    /// builder's conversions between the flow's `u8`/`u64` and each node's types
    /// are exercised as well.
    #[tokio::test]
    async fn test_flow() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(SoftFailNode::<u16, u32, ()>::new())
            .add_node(SoftFailNode::<u8, u16, ()>::new())
            .add_node(SoftFailNode::<u32, u64, ()>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Drives `ChainRun` directly on a hand-written nested tuple of nodes,
    /// bypassing the flow builder, and expects `Ok(5)` for input `5u8`.
    #[tokio::test]
    async fn test_chain() {
        // Nested-tuple layout: ((first,), second), third.
        // NOTE(review): presumably the same shape the builder produces — confirm against the builder.
        let soft_fail_pair = (
            (SoftFailNode::<u16, u32, ()>::new(),),
            SoftFailNode::<u16, u32, ()>::new(),
        );
        let chain = (soft_fail_pair, Passer::<u16, u32, ()>::new());
        let mut storage = LocalStorageImpl::new();

        let outcome =
            ChainRun::<_, Result<NodeOutput<u64>, ()>, _, _>::run(&chain, 5u8, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Same flow shape as `test_flow`, but the first three nodes are
    /// `InsertIntoStorageAssertWasNotInStorage`.
    ///
    /// NOTE(review): judging by its name, each such node asserts that `MyVal` is
    /// absent from its storage before inserting it, which would check that
    /// branches do not see each other's writes — confirm in `flows::tests`.
    #[tokio::test]
    async fn test_flow_storage() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u16, u32, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u8, u16, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u32, u64, (), MyVal>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }
}