Skip to main content

enact_core/flow/
sequential.rs

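//! Sequential Flow - Execute callables one after another
//!
//! The output of each step becomes the input to the next. A flow with no
//! steps returns its input unchanged, and the first step that returns an
//! error stops the flow and that error is returned to the caller.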
4
5use crate::callable::Callable;
6use std::sync::Arc;
7
8/// Sequential execution flow
9pub struct SequentialFlow<C: Callable> {
10    /// Ordered list of callables to execute
11    steps: Vec<Arc<C>>,
12    /// Flow name
13    name: String,
14}
15
16impl<C: Callable> SequentialFlow<C> {
17    /// Create a new sequential flow
18    pub fn new(name: impl Into<String>) -> Self {
19        Self {
20            steps: Vec::new(),
21            name: name.into(),
22        }
23    }
24
25    /// Add a step to the flow
26    pub fn add_step(mut self, callable: Arc<C>) -> Self {
27        self.steps.push(callable);
28        self
29    }
30
31    /// Add multiple steps
32    pub fn with_steps(mut self, callables: Vec<Arc<C>>) -> Self {
33        self.steps.extend(callables);
34        self
35    }
36
37    /// Execute the flow
38    pub async fn execute(&self, input: &str) -> anyhow::Result<String> {
39        let mut current_output = input.to_string();
40
41        for step in &self.steps {
42            current_output = step.run(&current_output).await?;
43        }
44
45        Ok(current_output)
46    }
47
48    /// Get the flow name
49    pub fn name(&self) -> &str {
50        &self.name
51    }
52
53    /// Get the number of steps
54    pub fn len(&self) -> usize {
55        self.steps.len()
56    }
57
58    /// Check if empty
59    pub fn is_empty(&self) -> bool {
60        self.steps.is_empty()
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// Mock callable for testing - applies a fixed, infallible string
    /// transform to its input.
    struct MockCallable {
        name: String,
        transform: Box<dyn Fn(&str) -> String + Send + Sync>,
    }

    impl MockCallable {
        /// Wrap an arbitrary transform closure under the given name.
        fn new(name: &str, transform: impl Fn(&str) -> String + Send + Sync + 'static) -> Self {
            Self {
                name: name.to_string(),
                transform: Box::new(transform),
            }
        }

        /// Mock that upper-cases its input.
        fn uppercase(name: &str) -> Self {
            Self::new(name, |s| s.to_uppercase())
        }

        /// Mock that returns its input followed by `suffix`.
        fn append(name: &str, suffix: &'static str) -> Self {
            Self::new(name, move |s| format!("{}{}", s, suffix))
        }

        /// Mock that returns `prefix` followed by its input.
        fn prepend(name: &str, prefix: &'static str) -> Self {
            Self::new(name, move |s| format!("{}{}", prefix, s))
        }
    }

    #[async_trait]
    impl Callable for MockCallable {
        fn name(&self) -> &str {
            &self.name
        }

        async fn run(&self, input: &str) -> anyhow::Result<String> {
            Ok((self.transform)(input))
        }
    }

    /// An empty flow reports zero steps and passes its input through.
    #[tokio::test]
    async fn test_sequential_empty() {
        let flow: SequentialFlow<MockCallable> = SequentialFlow::new("empty");
        assert!(flow.is_empty());
        assert_eq!(flow.len(), 0);

        let result = flow.execute("input").await.unwrap();
        assert_eq!(result, "input"); // No steps = input passes through
    }

    /// A single step is applied to the input, and the accessors reflect it.
    #[tokio::test]
    async fn test_sequential_single_step() {
        let step = Arc::new(MockCallable::uppercase("upper"));
        let flow = SequentialFlow::new("single").add_step(step);

        assert_eq!(flow.len(), 1);
        assert!(!flow.is_empty());
        assert_eq!(flow.name(), "single");

        let result = flow.execute("hello").await.unwrap();
        assert_eq!(result, "HELLO");
    }

    /// Each step receives the previous step's output.
    #[tokio::test]
    async fn test_sequential_multiple_steps() {
        let flow = SequentialFlow::new("chain")
            .add_step(Arc::new(MockCallable::uppercase("step1")))
            .add_step(Arc::new(MockCallable::append("step2", "!")))
            .add_step(Arc::new(MockCallable::prepend("step3", ">> ")));

        assert_eq!(flow.len(), 3);

        let result = flow.execute("hello").await.unwrap();
        assert_eq!(result, ">> HELLO!");
    }

    /// `with_steps` adds a whole batch and keeps the batch's order.
    #[tokio::test]
    async fn test_sequential_with_steps() {
        let steps = vec![
            Arc::new(MockCallable::uppercase("s1")),
            Arc::new(MockCallable::append("s2", "_done")),
        ];
        let flow = SequentialFlow::new("batch").with_steps(steps);

        assert_eq!(flow.len(), 2);
        let result = flow.execute("test").await.unwrap();
        assert_eq!(result, "TEST_done");
    }

    /// A failing step's error is returned, and steps after it are not run.
    #[tokio::test]
    async fn test_sequential_error_propagation() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Callable that starts failing once it has been called
        /// `fail_on_call` times, and counts every invocation.
        struct FailingCallable {
            name: String,
            fail_on_call: usize,
            call_count: AtomicUsize,
        }

        impl FailingCallable {
            fn new(name: &str, fail_on: usize) -> Self {
                Self {
                    name: name.to_string(),
                    fail_on_call: fail_on,
                    call_count: AtomicUsize::new(0),
                }
            }

            /// Number of times `run` has been invoked so far.
            fn calls(&self) -> usize {
                self.call_count.load(Ordering::SeqCst)
            }
        }

        #[async_trait]
        impl Callable for FailingCallable {
            fn name(&self) -> &str {
                &self.name
            }

            async fn run(&self, input: &str) -> anyhow::Result<String> {
                // `n` is the zero-based index of this invocation on this instance.
                let n = self.call_count.fetch_add(1, Ordering::SeqCst);
                if n >= self.fail_on_call {
                    anyhow::bail!("Intentional failure at step {}", n)
                }
                Ok(input.to_uppercase())
            }
        }

        // Keep handles to the steps so their call counts can be inspected
        // after the flow has run.
        let step1 = Arc::new(FailingCallable::new("step1", 10)); // Won't fail
        let step2 = Arc::new(FailingCallable::new("step2", 0)); // Fails immediately
        let step3 = Arc::new(FailingCallable::new("step3", 10)); // Must never be reached

        // Flow of 3 FailingCallable steps, second one fails
        let flow = SequentialFlow::new("with_error")
            .add_step(Arc::clone(&step1))
            .add_step(Arc::clone(&step2))
            .add_step(Arc::clone(&step3));

        let result = flow.execute("hello").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Intentional failure"));

        // The flow must stop at the failing step: the first two steps ran
        // exactly once and the third was never invoked.
        assert_eq!(step1.calls(), 1);
        assert_eq!(step2.calls(), 1);
        assert_eq!(step3.calls(), 0);
    }

    /// Steps run in the order they were added.
    #[tokio::test]
    async fn test_sequential_preserves_order() {
        /// Callable that records its own name in a shared log when run.
        ///
        /// Logging the step's identity (rather than a shared counter value)
        /// is what makes the final assertion sensitive to execution order.
        struct OrderTracker {
            name: String,
            order: Arc<std::sync::Mutex<Vec<String>>>,
        }

        #[async_trait]
        impl Callable for OrderTracker {
            fn name(&self) -> &str {
                &self.name
            }

            async fn run(&self, input: &str) -> anyhow::Result<String> {
                self.order.lock().unwrap().push(self.name.clone());
                Ok(input.to_string())
            }
        }

        let execution_order = Arc::new(std::sync::Mutex::new(Vec::new()));

        let flow = SequentialFlow::new("ordered")
            .add_step(Arc::new(OrderTracker {
                name: "first".to_string(),
                order: execution_order.clone(),
            }))
            .add_step(Arc::new(OrderTracker {
                name: "second".to_string(),
                order: execution_order.clone(),
            }))
            .add_step(Arc::new(OrderTracker {
                name: "third".to_string(),
                order: execution_order.clone(),
            }));

        flow.execute("test").await.unwrap();

        let order = execution_order.lock().unwrap();
        assert_eq!(*order, vec!["first", "second", "third"]);
    }
}