enact_core/flow/
sequential.rs1use crate::callable::Callable;
6use std::sync::Arc;
7
8pub struct SequentialFlow<C: Callable> {
10 steps: Vec<Arc<C>>,
12 name: String,
14}
15
16impl<C: Callable> SequentialFlow<C> {
17 pub fn new(name: impl Into<String>) -> Self {
19 Self {
20 steps: Vec::new(),
21 name: name.into(),
22 }
23 }
24
25 pub fn add_step(mut self, callable: Arc<C>) -> Self {
27 self.steps.push(callable);
28 self
29 }
30
31 pub fn with_steps(mut self, callables: Vec<Arc<C>>) -> Self {
33 self.steps.extend(callables);
34 self
35 }
36
37 pub async fn execute(&self, input: &str) -> anyhow::Result<String> {
39 let mut current_output = input.to_string();
40
41 for step in &self.steps {
42 current_output = step.run(¤t_output).await?;
43 }
44
45 Ok(current_output)
46 }
47
48 pub fn name(&self) -> &str {
50 &self.name
51 }
52
53 pub fn len(&self) -> usize {
55 self.steps.len()
56 }
57
58 pub fn is_empty(&self) -> bool {
60 self.steps.is_empty()
61 }
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67 use async_trait::async_trait;
68
/// Test double whose behaviour is a boxed string-to-string function.
struct MockCallable {
    /// Label given at construction.
    name: String,
    /// Function applied to the input each time the mock is invoked.
    transform: Box<dyn Fn(&str) -> String + Send + Sync>,
}

impl MockCallable {
    /// Builds a mock labelled `name` that applies `transform` to its input.
    fn new(name: &str, transform: impl Fn(&str) -> String + Send + Sync + 'static) -> Self {
        let name = name.to_owned();
        let transform = Box::new(transform);
        Self { name, transform }
    }

    /// Mock that upper-cases its input.
    fn uppercase(name: &str) -> Self {
        Self::new(name, str::to_uppercase)
    }

    /// Mock that returns its input followed by `suffix`.
    fn append(name: &str, suffix: &'static str) -> Self {
        Self::new(name, move |s| [s, suffix].concat())
    }

    /// Mock that returns `prefix` followed by its input.
    fn prepend(name: &str, prefix: &'static str) -> Self {
        Self::new(name, move |s| [prefix, s].concat())
    }
}
95
96 #[async_trait]
97 impl Callable for MockCallable {
98 fn name(&self) -> &str {
99 &self.name
100 }
101
102 async fn run(&self, input: &str) -> anyhow::Result<String> {
103 Ok((self.transform)(input))
104 }
105 }
106
107 #[tokio::test]
108 async fn test_sequential_empty() {
109 let flow: SequentialFlow<MockCallable> = SequentialFlow::new("empty");
110 assert!(flow.is_empty());
111 assert_eq!(flow.len(), 0);
112
113 let result = flow.execute("input").await.unwrap();
114 assert_eq!(result, "input"); }
116
117 #[tokio::test]
118 async fn test_sequential_single_step() {
119 let step = Arc::new(MockCallable::uppercase("upper"));
120 let flow = SequentialFlow::new("single").add_step(step);
121
122 assert_eq!(flow.len(), 1);
123 assert!(!flow.is_empty());
124 assert_eq!(flow.name(), "single");
125
126 let result = flow.execute("hello").await.unwrap();
127 assert_eq!(result, "HELLO");
128 }
129
130 #[tokio::test]
131 async fn test_sequential_multiple_steps() {
132 let flow = SequentialFlow::new("chain")
133 .add_step(Arc::new(MockCallable::uppercase("step1")))
134 .add_step(Arc::new(MockCallable::append("step2", "!")))
135 .add_step(Arc::new(MockCallable::prepend("step3", ">> ")));
136
137 assert_eq!(flow.len(), 3);
138
139 let result = flow.execute("hello").await.unwrap();
140 assert_eq!(result, ">> HELLO!");
141 }
142
143 #[tokio::test]
144 async fn test_sequential_with_steps() {
145 let steps = vec![
146 Arc::new(MockCallable::uppercase("s1")),
147 Arc::new(MockCallable::append("s2", "_done")),
148 ];
149 let flow = SequentialFlow::new("batch").with_steps(steps);
150
151 assert_eq!(flow.len(), 2);
152 let result = flow.execute("test").await.unwrap();
153 assert_eq!(result, "TEST_done");
154 }
155
156 #[tokio::test]
157 async fn test_sequential_error_propagation() {
158 struct FailingCallable {
159 name: String,
160 fail_on_call: usize,
161 call_count: std::sync::atomic::AtomicUsize,
162 }
163
164 impl FailingCallable {
165 fn new(name: &str, fail_on: usize) -> Self {
166 Self {
167 name: name.to_string(),
168 fail_on_call: fail_on,
169 call_count: std::sync::atomic::AtomicUsize::new(0),
170 }
171 }
172 }
173
174 #[async_trait]
175 impl Callable for FailingCallable {
176 fn name(&self) -> &str {
177 &self.name
178 }
179
180 async fn run(&self, input: &str) -> anyhow::Result<String> {
181 let n = self
182 .call_count
183 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
184 if n >= self.fail_on_call {
185 anyhow::bail!("Intentional failure at step {}", n)
186 }
187 Ok(input.to_uppercase())
188 }
189 }
190
191 let flow = SequentialFlow::new("with_error")
193 .add_step(Arc::new(FailingCallable::new("step1", 10))) .add_step(Arc::new(FailingCallable::new("step2", 0))) .add_step(Arc::new(FailingCallable::new("step3", 10))); let result = flow.execute("hello").await;
198 assert!(result.is_err());
199 assert!(result
200 .unwrap_err()
201 .to_string()
202 .contains("Intentional failure"));
203 }
204
205 #[tokio::test]
206 async fn test_sequential_preserves_order() {
207 use std::sync::atomic::{AtomicUsize, Ordering};
208
209 let counter = Arc::new(AtomicUsize::new(0));
210 let execution_order = Arc::new(std::sync::Mutex::new(Vec::new()));
211
212 struct OrderTracker {
213 name: String,
214 counter: Arc<AtomicUsize>,
215 order: Arc<std::sync::Mutex<Vec<usize>>>,
216 }
217
218 #[async_trait]
219 impl Callable for OrderTracker {
220 fn name(&self) -> &str {
221 &self.name
222 }
223
224 async fn run(&self, input: &str) -> anyhow::Result<String> {
225 let n = self.counter.fetch_add(1, Ordering::SeqCst);
226 self.order.lock().unwrap().push(n);
227 Ok(input.to_string())
228 }
229 }
230
231 let flow = SequentialFlow::new("ordered")
232 .add_step(Arc::new(OrderTracker {
233 name: "first".to_string(),
234 counter: counter.clone(),
235 order: execution_order.clone(),
236 }))
237 .add_step(Arc::new(OrderTracker {
238 name: "second".to_string(),
239 counter: counter.clone(),
240 order: execution_order.clone(),
241 }))
242 .add_step(Arc::new(OrderTracker {
243 name: "third".to_string(),
244 counter: counter.clone(),
245 order: execution_order.clone(),
246 }));
247
248 flow.execute("test").await.unwrap();
249
250 let order = execution_order.lock().unwrap();
251 assert_eq!(*order, vec![0, 1, 2]);
252 }
253}