// node_flow/flows/one_of_parallel_flow/mod.rs

1mod chain_run;
2
3use crate::{
4    context::{Fork, Update},
5    describe::{Description, Edge, remove_generics_from_name},
6    flows::{chain_describe::ChainDescribe, generic_defs::define_flow_and_ioe_conv_builder},
7    node::NodeOutput as NodeOutputStruct,
8};
9use chain_run::ChainRunOneOfParallel as ChainRun;
10
11type FutOutput<Output, Error, Context> = Result<(NodeOutputStruct<Output>, Context), Error>;
12
13define_flow_and_ioe_conv_builder!(
14    OneOfParallelFlow,
15    ChainRun,
16    |self| {
17        let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
18        let mut node_descriptions = Vec::with_capacity(node_count);
19        self.nodes.describe(&mut node_descriptions);
20        let edges = (0..node_count)
21            .flat_map(|i| [Edge::flow_to_node(i), Edge::node_to_flow(i)])
22            .collect::<Vec<_>>();
23
24        Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
25    },
26    >Input: Send + Clone,
27    >Output: Send,
28    >Error: Send,
29    >Context: Fork + Update + Send,
30    #NodeType: Send + Sync + Clone
31    /// `OneOfParallelFlow` executes nodes (branches) **in parallel**, returning when one succeeds or fails.
32    ///
33    /// Nodes (branches) are executed concurrently.
34    /// The flow completes when **any** node succeeds or "hard" fails.
35    /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is returned.
36    /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail),
37    ///   that result is ignored and the flow continues waiting for other nodes (branches).
38    /// - If a node returns an **error**, then that error is returned.
39    ///
40    /// If all nodes (branches) soft-fail, the flow itself returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail).
41    ///
42    /// This flow allows defining race-style execution, where multiple branches are ran in parallel
43    /// and the first one to complete determines the returned value.
44    ///
45    /// # Type Parameters
46    /// - `Input`: The type of data accepted by this flow.
47    /// - `Output`: The type of data produced by this flow.
48    /// - `Error`: The type of error emitted by this flow.
49    /// - `Context`: The type of context used during execution.
50    ///
51    /// # Examples
52    /// ```
53    /// use node_flow::node::{Node, NodeOutput};
54    /// use node_flow::flows::OneOfParallelFlow;
55    /// use node_flow::context::{Fork, Update};
56    ///
57    /// // Example nodes
58    /// #[derive(Clone)]
59    /// struct A;
60    /// #[derive(Clone)]
61    /// struct B;
62    ///
63    /// struct ExampleCtx;
64    /// impl Fork for ExampleCtx // ...
65    /// # { fn fork(&self) -> Self { Self } }
66    /// impl Update for ExampleCtx // ...
67    /// # { fn update_from(&mut self, other: Self) {} }
68    ///
69    /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
70    ///     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
71    ///         Ok(NodeOutput::SoftFail) // Ignored
72    ///     }
73    /// }
74    ///
75    /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for B {
76    ///     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
77    ///         Ok(NodeOutput::Ok(5)) // Wins the race
78    ///     }
79    /// }
80    ///
81    /// # tokio::runtime::Builder::new_current_thread()
82    /// #     .enable_all()
83    /// #     .build()
84    /// #     .unwrap()
85    /// #     .block_on(async {
86    /// async fn main() {
87    ///     let mut flow = OneOfParallelFlow::<(), i32, (), _>::builder()
88    ///         .add_node(A)
89    ///         .add_node(B)
90    ///         .build();
91    ///
92    ///     let mut ctx = ExampleCtx;
93    ///     let result = flow.run((), &mut ctx).await;
94    ///     assert_eq!(result, Ok(NodeOutput::Ok(5)));
95    /// }
96    /// # main().await;
97    /// # });
98    /// ```
99);
100
// Unit tests for `OneOfParallelFlow` (aliased to `Flow`) and its chain runner.
#[cfg(test)]
mod test {
    use super::{ChainRun, OneOfParallelFlow as Flow};
    use crate::{
        context::storage::local_storage::{LocalStorageImpl, tests::MyVal},
        flows::tests::{InsertIntoStorageAssertWasNotInStorage, Passer, SoftFailNode},
        node::{Node, NodeOutput},
    };

    // Three `SoftFailNode` branches plus one `Passer`: the flow is expected to
    // yield the input value (5) as `NodeOutput::Ok`.
    // The nodes are declared with input/output types (u16/u32, u8/u16, u32/u64)
    // that differ from the flow's `u8` -> `u64`.
    // NOTE(review): this presumably exercises the builder's input/output
    // conversions; the helper nodes are defined in `flows::tests` — confirm there.
    #[tokio::test]
    async fn test_flow() {
        let mut st = LocalStorageImpl::new();
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(SoftFailNode::<u16, u32, ()>::new())
            .add_node(SoftFailNode::<u8, u16, ()>::new())
            .add_node(SoftFailNode::<u32, u64, ()>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let res = flow.run(5, &mut st).await;

        assert_eq!(res, Result::Ok(NodeOutput::Ok(5)));
    }

    // Drives `ChainRun::run` directly on a hand-built nested tuple of nodes,
    // bypassing the flow builder.
    // NOTE(review): the `(((a,), b), c)` nesting presumably mirrors the shape the
    // builder accumulates through `add_node` — confirm against the builder.
    #[tokio::test]
    async fn test_chain() {
        let mut st = LocalStorageImpl::new();
        let node = (
            (
                (SoftFailNode::<u16, u32, ()>::new(),),
                SoftFailNode::<u16, u32, ()>::new(),
            ),
            Passer::<u16, u32, ()>::new(),
        );
        let res = ChainRun::<_, Result<NodeOutput<u64>, ()>, _, _>::run(&node, 5u8, &mut st).await;
        assert_eq!(res, Ok(NodeOutput::Ok(5)));
    }

    // Same layout as `test_flow`, with `InsertIntoStorageAssertWasNotInStorage`
    // nodes in place of the soft-failing ones.
    // NOTE(review): going by the helper's name, each of those nodes presumably
    // asserts that `MyVal` is absent from its storage before inserting it, which
    // would check that branches do not observe each other's writes — confirm in
    // `flows::tests`.
    #[tokio::test]
    async fn test_flow_storage() {
        let mut st = LocalStorageImpl::new();
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u16, u32, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u8, u16, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u32, u64, (), MyVal>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let res = flow.run(5, &mut st).await;

        assert_eq!(res, Result::Ok(NodeOutput::Ok(5)));
    }
}
151}