node_flow/flows/one_of_sequential_flow/mod.rs
1mod chain_run;
2
3use crate::{
4 context::{Fork, Update},
5 describe::{Description, Edge, remove_generics_from_name},
6 flows::{chain_describe::ChainDescribe, generic_defs::define_flow_and_ioe_conv_builder},
7};
8use chain_run::ChainRunOneOfSequential as ChainRun;
9
10define_flow_and_ioe_conv_builder!(
11 OneOfSequentialFlow,
12 ChainRun,
13 |self| {
14 let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
15 let mut node_descriptions = Vec::with_capacity(node_count);
16 self.nodes.describe(&mut node_descriptions);
17 let edges = (0..node_count)
18 .flat_map(|i| [Edge::flow_to_node(i), Edge::node_to_flow(i)])
19 .collect::<Vec<_>>();
20
21 Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
22 },
23 >Input: Send + Clone,
24 >Context: Fork + Update + Send,
25 #NodeType: Send + Sync + Clone
26 /// `OneOfSequentialFlow` executes nodes (branches) **sequentially**, returning when one succeeds or fails.
27 ///
28 /// Nodes (branches) are executed sequentially in order of insertion until **one** succeeds or "hard" fails.
29 /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is returned.
30 /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail), the flow continues onto the next node (branch).
31 /// - If a node returns an **error**, then that error is returned.
32 ///
33 /// If all nodes (branches) soft-fail, the flow itself returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail).
34 ///
35 /// This flow allows for defining fallback-style chain, where multiple nodes are tried
36 /// in sequence until one handles the input successfully.
37 ///
38 /// # Type Parameters
39 /// - `Input`: The type of data accepted by this flow.
40 /// - `Output`: The type of data produced by this flow.
41 /// - `Error`: The type of error emitted by this flow.
42 /// - `Context`: The type of context used during execution.
43 ///
44 /// # Examples
45 /// ```
46 /// use node_flow::node::{Node, NodeOutput};
47 /// use node_flow::flows::OneOfSequentialFlow;
48 /// use node_flow::context::{Fork, Update};
49 ///
50 /// // Example nodes
51 /// #[derive(Clone)]
52 /// struct A;
53 /// #[derive(Clone)]
54 /// struct B;
55 ///
56 /// struct ExampleCtx;
57 /// impl Fork for ExampleCtx // ...
58 /// # { fn fork(&self) -> Self { Self } }
59 /// impl Update for ExampleCtx // ...
60 /// # { fn update_from(&mut self, other: Self) {} }
61 ///
62 /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
63 /// async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
64 /// Ok(NodeOutput::SoftFail) // Try next
65 /// }
66 /// }
67 ///
68 /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for B {
69 /// async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
70 /// Ok(NodeOutput::Ok(5)) // Succeeds
71 /// }
72 /// }
73 ///
74 /// # tokio::runtime::Builder::new_current_thread()
75 /// # .enable_all()
76 /// # .build()
77 /// # .unwrap()
78 /// # .block_on(async {
79 /// async fn main() {
80 /// let mut flow = OneOfSequentialFlow::<(), i32, (), _>::builder()
81 /// .add_node(A)
82 /// .add_node(B)
83 /// .build();
84 ///
85 /// let mut ctx = ExampleCtx;
86 /// let result = flow.run((), &mut ctx).await;
87 /// assert_eq!(result, Ok(NodeOutput::Ok(5)));
88 /// }
89 /// # main().await;
90 /// # });
91 /// ```
92);
93
#[cfg(test)]
mod test {
    use super::{ChainRun, OneOfSequentialFlow as Flow};
    use crate::{
        context::storage::local_storage::{LocalStorageImpl, tests::MyVal},
        flows::tests::{InsertIntoStorageAssertWasNotInStorage, Passer, SoftFailNode},
        node::{Node, NodeOutput},
    };

    /// Builds a flow from three `SoftFailNode` branches followed by a `Passer`
    /// and expects the input `5` to come back as `NodeOutput::Ok(5)`.
    ///
    /// NOTE(review): the nodes' input/output types differ from the flow's `u8 -> u64`;
    /// presumably the builder bridges them through conversions - confirm against the builder.
    #[tokio::test]
    async fn test_flow() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(SoftFailNode::<u16, u32, ()>::new())
            .add_node(SoftFailNode::<u8, u16, ()>::new())
            .add_node(SoftFailNode::<u32, u64, ()>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Drives `ChainRun::run` directly on a hand-built nested tuple chain
    /// (two `SoftFailNode`s, then a `Passer`) and expects `NodeOutput::Ok(5)`.
    #[tokio::test]
    async fn test_chain() {
        // Nested tuple layout: (((first,), second), third).
        let chain = (
            (
                (SoftFailNode::<u16, u32, ()>::new(),),
                SoftFailNode::<u16, u32, ()>::new(),
            ),
            Passer::<u16, u32, ()>::new(),
        );
        let mut storage = LocalStorageImpl::new();

        let outcome =
            ChainRun::<_, Result<NodeOutput<u64>, ()>, _, _>::run(&chain, 5u8, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Same shape as `test_flow`, but the first three branches are
    /// `InsertIntoStorageAssertWasNotInStorage` nodes keyed on `MyVal`;
    /// the flow is still expected to end with `NodeOutput::Ok(5)`.
    #[tokio::test]
    async fn test_flow_storage() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u16, u32, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u8, u16, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u32, u64, (), MyVal>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }
}
144}