node_flow/flows/sequential_flow/mod.rs
1mod builder;
2pub use builder::*;
3mod chain_run;
4
5use crate::{
6 describe::{Description, Edge, remove_generics_from_name},
7 flows::{chain_describe::ChainDescribe, generic_defs::flow::define_flow},
8};
9use chain_run::ChainRunSequential as ChainRun;
10
11define_flow!(
12 SequentialFlow,
13 ChainRun,
14 |self| {
15 let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
16 let mut node_descriptions = Vec::with_capacity(node_count);
17 self.nodes.describe(&mut node_descriptions);
18
19 let mut edges = Vec::with_capacity(node_count + 1);
20 edges.push(Edge::flow_to_node(0));
21 for i in 0..node_count - 1 {
22 edges.push(Edge::node_to_node(i, i + 1));
23 }
24 edges.push(Edge::node_to_flow(node_count - 1));
25
26 Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
27 },
28 Input: Send,
29 Error: Send,
30 Context: Send,
31 /// `SequentialFlow` executes nodes **sequentially**, like a pipeline.
32 ///
33 /// Nodes are executed sequentially like a pipeline where
34 /// the output of one node is used as in input of the next node.
35 /// Nodes are executed in order of insertion until **all** succeed or **any** node "hard" fails.
36 ///
37 /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is then fed into the next node.
38 /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail), the flow soft-fails.
39 /// - If a node returns an **error**, then that error is returned.
40 ///
41 /// # Type Parameters
42 /// - `Input`: The type of data accepted by this flow.
43 /// - `Output`: The type of data produced by this flow.
44 /// - `Error`: The type of error emitted by this flow.
45 /// - `Context`: The type of context used during execution.
46 ///
47 /// # Examples
48 /// ```
49 /// use node_flow::node::{Node, NodeOutput};
50 /// use node_flow::flows::SequentialFlow;
51 ///
52 /// // Example node
53 /// #[derive(Clone)]
54 /// struct AddOne;
55 ///
56 /// struct ExampleCtx;
57 ///
58 /// impl<Ctx: Send> Node<u8, NodeOutput<u8>, (), Ctx> for AddOne {
59 /// async fn run(&mut self, input: u8, _: &mut Ctx) -> Result<NodeOutput<u8>, ()> {
60 /// Ok(NodeOutput::Ok(input + 1))
61 /// }
62 /// }
63 ///
64 /// # tokio::runtime::Builder::new_current_thread()
65 /// # .enable_all()
66 /// # .build()
67 /// # .unwrap()
68 /// # .block_on(async {
69 /// async fn main() {
70 /// let mut flow = SequentialFlow::<u8, u8, (), _>::builder()
71 /// .add_node(AddOne)
72 /// .add_node(AddOne)
73 /// .add_node(AddOne)
74 /// .build();
75 ///
76 /// let mut ctx = ExampleCtx;
77 /// let result = flow.run(5u8, &mut ctx).await;
78 /// assert_eq!(result, Ok(NodeOutput::Ok(8)));
79 /// }
80 /// # main().await;
81 /// # });
82 /// ```
83);
84
#[cfg(test)]
mod test {
    use super::{ChainRun, SequentialFlow as Flow};
    use crate::{
        flows::tests::Passer,
        node::{Node, NodeOutput},
    };

    /// Runs a two-node flow built through the builder API and checks that
    /// the input `true` comes out as `NodeOutput::Ok(1)`.
    #[tokio::test]
    async fn test_flow() {
        let first = Passer::<u8, u16, ()>::new();
        let second = Passer::<u32, u64, ()>::new();
        let mut flow = Flow::<bool, u128, (), ()>::builder()
            .add_node(first)
            .add_node(second)
            .build();

        let mut ctx = ();
        let outcome = flow.run(true, &mut ctx).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(1)));
    }

    /// Drives the nested-tuple node chain directly through `ChainRun`,
    /// bypassing the flow wrapper, and expects the same result.
    #[tokio::test]
    async fn test_chain() {
        // The chain is a left-nested tuple: ((first,), second), third).
        let innermost = (Passer::<bool, u8, ()>::new(),);
        let middle = (innermost, Passer::<u16, u32, ()>::new());
        let chain = (middle, Passer::<u64, u128, ()>::new());

        let mut ctx = ();
        let outcome =
            ChainRun::<_, Result<NodeOutput<u128>, ()>, (), _>::run(&chain, true, &mut ctx).await;
        assert_eq!(outcome, Ok(NodeOutput::Ok(1)));
    }
}
117}