node_flow/flows/one_of_sequential_flow/
mod.rs

1mod chain_run;
2
3use crate::{
4    context::{Fork, Update},
5    describe::{Description, Edge, remove_generics_from_name},
6    flows::{chain_describe::ChainDescribe, generic_defs::define_flow_and_ioe_conv_builder},
7};
8use chain_run::ChainRunOneOfSequential as ChainRun;
9
10define_flow_and_ioe_conv_builder!(
11    OneOfSequentialFlow,
12    ChainRun,
13    |self| {
14        let node_count = <NodeTypes as ChainDescribe<Context, NodeIOETypes>>::COUNT;
15        let mut node_descriptions = Vec::with_capacity(node_count);
16        self.nodes.describe(&mut node_descriptions);
17        let edges = (0..node_count)
18            .flat_map(|i| [Edge::flow_to_node(i), Edge::node_to_flow(i)])
19            .collect::<Vec<_>>();
20
21        Description::new_flow(self, node_descriptions, edges).modify_name(remove_generics_from_name)
22    },
23    >Input: Send + Clone,
24    >Context: Fork + Update + Send,
25    #NodeType: Send + Sync + Clone
26    /// `OneOfSequentialFlow` executes nodes (branches) **sequentially**, returning when one succeeds or fails.
27    ///
28    /// Nodes (branches) are executed sequentially in order of insertion until **one** succeeds or "hard" fails.
29    /// - If a node returns [`NodeOutput::Ok`](crate::node::NodeOutput::Ok), that value is returned.
30    /// - If a node returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail), the flow continues onto the next node (branch).
31    /// - If a node returns an **error**, then that error is returned.
32    ///
33    /// If all nodes (branches) soft-fail, the flow itself returns [`NodeOutput::SoftFail`](crate::node::NodeOutput::SoftFail).
34    ///
35    /// This flow allows for defining fallback-style chain, where multiple nodes are tried
36    /// in sequence until one handles the input successfully.
37    ///
38    /// # Type Parameters
39    /// - `Input`: The type of data accepted by this flow.
40    /// - `Output`: The type of data produced by this flow.
41    /// - `Error`: The type of error emitted by this flow.
42    /// - `Context`: The type of context used during execution.
43    ///
44    /// # Examples
45    /// ```
46    /// use node_flow::node::{Node, NodeOutput};
47    /// use node_flow::flows::OneOfSequentialFlow;
48    /// use node_flow::context::{Fork, Update};
49    ///
50    /// // Example nodes
51    /// #[derive(Clone)]
52    /// struct A;
53    /// #[derive(Clone)]
54    /// struct B;
55    ///
56    /// struct ExampleCtx;
57    /// impl Fork for ExampleCtx // ...
58    /// # { fn fork(&self) -> Self { Self } }
59    /// impl Update for ExampleCtx // ...
60    /// # { fn update_from(&mut self, other: Self) {} }
61    ///
62    /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for A {
63    ///     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
64    ///         Ok(NodeOutput::SoftFail) // Try next
65    ///     }
66    /// }
67    ///
68    /// impl<Ctx: Send> Node<(), NodeOutput<i32>, (), Ctx> for B {
69    ///     async fn run(&mut self, _: (), _: &mut Ctx) -> Result<NodeOutput<i32>, ()> {
70    ///         Ok(NodeOutput::Ok(5)) // Succeeds
71    ///     }
72    /// }
73    ///
74    /// # tokio::runtime::Builder::new_current_thread()
75    /// #     .enable_all()
76    /// #     .build()
77    /// #     .unwrap()
78    /// #     .block_on(async {
79    /// async fn main() {
80    ///     let mut flow = OneOfSequentialFlow::<(), i32, (), _>::builder()
81    ///         .add_node(A)
82    ///         .add_node(B)
83    ///         .build();
84    ///
85    ///     let mut ctx = ExampleCtx;
86    ///     let result = flow.run((), &mut ctx).await;
87    ///     assert_eq!(result, Ok(NodeOutput::Ok(5)));
88    /// }
89    /// # main().await;
90    /// # });
91    /// ```
92);
93
#[cfg(test)]
mod test {
    use super::{ChainRun, OneOfSequentialFlow as Flow};
    use crate::context::storage::local_storage::{LocalStorageImpl, tests::MyVal};
    use crate::flows::tests::{InsertIntoStorageAssertWasNotInStorage, Passer, SoftFailNode};
    use crate::node::{Node, NodeOutput};

    /// Builds a flow of three `SoftFailNode`s followed by a `Passer` and expects `Ok(5)`.
    ///
    /// The helper nodes are defined in `flows::tests`; presumably the soft-failing
    /// branches are skipped and the `Passer` forwards the input — confirm against
    /// the helper definitions.
    #[tokio::test]
    async fn test_flow() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(SoftFailNode::<u16, u32, ()>::new())
            .add_node(SoftFailNode::<u8, u16, ()>::new())
            .add_node(SoftFailNode::<u32, u64, ()>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Drives `ChainRun` directly on a hand-nested tuple of nodes, bypassing the builder.
    #[tokio::test]
    async fn test_chain() {
        // Left-nested tuple: ((first,), second), third.
        let chain = (
            (
                (SoftFailNode::<u16, u32, ()>::new(),),
                SoftFailNode::<u16, u32, ()>::new(),
            ),
            Passer::<u16, u32, ()>::new(),
        );
        let mut storage = LocalStorageImpl::new();

        let outcome =
            ChainRun::<_, Result<NodeOutput<u64>, ()>, _, _>::run(&chain, 5u8, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }

    /// Same shape as `test_flow`, but the leading branches are
    /// `InsertIntoStorageAssertWasNotInStorage` nodes keyed on `MyVal`.
    ///
    /// NOTE(review): judging by the helper's name, each branch asserts that `MyVal`
    /// is absent before inserting it, i.e. this checks that one branch's storage
    /// writes are not visible to the next — confirm against the helper in `flows::tests`.
    #[tokio::test]
    async fn test_flow_storage() {
        let mut flow = Flow::<u8, u64, (), _>::builder()
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u16, u32, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u8, u16, (), MyVal>::new())
            .add_node(InsertIntoStorageAssertWasNotInStorage::<u32, u64, (), MyVal>::new())
            .add_node(Passer::<u16, u32, ()>::new())
            .build();
        let mut storage = LocalStorageImpl::new();

        let outcome = flow.run(5, &mut storage).await;

        assert_eq!(outcome, Ok(NodeOutput::Ok(5)));
    }
}