node_flow/flows/sequential_flow/
mod.rs

1mod builder;
2pub use builder::*;
3mod chain_run;
4
5use crate::{
6    describe::{Description, Edge, remove_generics_from_name},
7    flows::{chain_describe::ChainDescribe, generic_defs::flow::define_flow},
8};
9use chain_run::ChainRunSequential as ChainRun;
10
11define_flow!(
12    SequentialFlow,
13    ChainRun,
14    |self| {
15        let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
16        let mut node_descriptions = Vec::with_capacity(node_count);
17        self.nodes.describe(&mut node_descriptions);
18
19        let mut edges = Vec::with_capacity(node_count + 1);
20        edges.push(Edge::flow_to_node(0));
21        for i in 0..node_count - 1 {
22            edges.push(Edge::node_to_node(i, i + 1));
23        }
24        edges.push(Edge::node_to_flow(node_count - 1));
25
26        Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
27    },
28    Input: Send,
29    Error: Send,
30    Context: Send,
31    /// `SequentialFlow` executes nodes **sequentially**, like a pipeline.
32    ///
33    /// Nodes are executed sequentially like a pipeline where
34    /// the output of one node is used as in input of the next node.
35    /// Nodes are executed in order of insertion until **all** succeed or **any** node "hard" fails.
36    ///
37    /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is then fed into the next node.
38    /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail), the flow soft-fails.
39    /// - If a node returns an **error**, then that error is returned.
40    ///
41    /// # Type Parameters
42    /// - `Input`: The type of data accepted by this flow.
43    /// - `Output`: The type of data produced by this flow.
44    /// - `Error`: The type of error emitted by this flow.
45    /// - `Context`: The type of context used during execution.
46    ///
47    /// # Examples
48    /// ```
49    /// use node_flow::node::{Node, NodeOutput};
50    /// use node_flow::flows::SequentialFlow;
51    ///
52    /// // Example node
53    /// #[derive(Clone)]
54    /// struct AddOne;
55    ///
56    /// struct ExampleCtx;
57    ///
58    /// impl<Ctx: Send> Node<u8, NodeOutput<u8>, (), Ctx> for AddOne {
59    ///     async fn run(&mut self, input: u8, _: &mut Ctx) -> Result<NodeOutput<u8>, ()> {
60    ///         Ok(NodeOutput::Ok(input + 1))
61    ///     }
62    /// }
63    ///
64    /// # tokio::runtime::Builder::new_current_thread()
65    /// #     .enable_all()
66    /// #     .build()
67    /// #     .unwrap()
68    /// #     .block_on(async {
69    /// async fn main() {
70    ///     let mut flow = SequentialFlow::<u8, u8, (), _>::builder()
71    ///         .add_node(AddOne)
72    ///         .add_node(AddOne)
73    ///         .add_node(AddOne)
74    ///         .build();
75    ///
76    ///     let mut ctx = ExampleCtx;
77    ///     let result = flow.run(5u8, &mut ctx).await;
78    ///     assert_eq!(result, Ok(NodeOutput::Ok(8)));
79    /// }
80    /// # main().await;
81    /// # });
82    /// ```
83);
84
#[cfg(test)]
mod test {
    use super::{ChainRun, SequentialFlow as Flow};
    use crate::{
        flows::tests::Passer,
        node::{Node, NodeOutput},
    };

    // Runs a two-node flow assembled through the builder and checks that the
    // `true` input comes out as `1` in the flow's `u128` output type.
    #[tokio::test]
    async fn test_flow() {
        let mut pipeline = Flow::<bool, u128, (), ()>::builder()
            .add_node(Passer::<u8, u16, ()>::new())
            .add_node(Passer::<u32, u64, ()>::new())
            .build();
        let mut ctx = ();

        let outcome = pipeline.run(true, &mut ctx).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(1)));
    }

    // Drives the chain runner directly on a hand-nested tuple of three nodes,
    // bypassing the builder, and expects the same `1` result.
    #[tokio::test]
    async fn test_chain() {
        // The chain is a left-nested tuple: a 1-tuple holding the first node,
        // wrapped once per additional node.
        let head = (Passer::<bool, u8, ()>::new(),);
        let head_and_middle = (head, Passer::<u16, u32, ()>::new());
        let chain = (head_and_middle, Passer::<u64, u128, ()>::new());
        let mut ctx = ();

        let outcome =
            ChainRun::<_, Result<NodeOutput<u128>, ()>, (), _>::run(&chain, true, &mut ctx).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(1)));
    }
}
117}