Skip to main content

wfe_core/primitives/
foreach_step.rs

1use async_trait::async_trait;
2use serde_json::json;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
7/// A step that iterates over a collection, branching for each element.
8pub struct ForEachStep {
9    /// Collection.
10    pub collection: Vec<serde_json::Value>,
11    /// Run parallel.
12    pub run_parallel: bool,
13}
14
15impl Default for ForEachStep {
16    fn default() -> Self {
17        Self {
18            collection: Vec::new(),
19            run_parallel: true,
20        }
21    }
22}
23
24#[async_trait]
25impl StepBody for ForEachStep {
26    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
27        if self.collection.is_empty() {
28            return Ok(ExecutionResult::next());
29        }
30
31        if self.run_parallel {
32            // Parallel: branch with all collection values at once.
33            let children_active = context
34                .persistence_data
35                .and_then(|d| d.get("children_active"))
36                .and_then(|v| v.as_bool())
37                .unwrap_or(false);
38
39            if children_active {
40                let mut scope = context.execution_pointer.scope.clone();
41                scope.push(context.execution_pointer.id.clone());
42
43                if context.workflow.is_branch_complete(&scope) {
44                    Ok(ExecutionResult::next())
45                } else {
46                    Ok(ExecutionResult::persist(json!({"children_active": true})))
47                }
48            } else {
49                Ok(ExecutionResult::branch(
50                    self.collection.clone(),
51                    Some(json!({"children_active": true})),
52                ))
53            }
54        } else {
55            // Sequential: process one item at a time using current_index.
56            let current_index = context
57                .persistence_data
58                .and_then(|d| d.get("current_index"))
59                .and_then(|v| v.as_u64())
60                .unwrap_or(0) as usize;
61
62            let children_active = context
63                .persistence_data
64                .and_then(|d| d.get("children_active"))
65                .and_then(|v| v.as_bool())
66                .unwrap_or(false);
67
68            if children_active {
69                // Check if current child is complete.
70                let mut scope = context.execution_pointer.scope.clone();
71                scope.push(context.execution_pointer.id.clone());
72
73                if context.workflow.is_branch_complete(&scope) {
74                    let next_index = current_index + 1;
75                    if next_index >= self.collection.len() {
76                        // All items processed.
77                        Ok(ExecutionResult::next())
78                    } else {
79                        // Advance to next item.
80                        Ok(ExecutionResult::branch(
81                            vec![self.collection[next_index].clone()],
82                            Some(json!({"children_active": true, "current_index": next_index})),
83                        ))
84                    }
85                } else {
86                    Ok(ExecutionResult::persist(
87                        json!({"children_active": true, "current_index": current_index}),
88                    ))
89                }
90            } else {
91                // Start first item.
92                Ok(ExecutionResult::branch(
93                    vec![self.collection[current_index].clone()],
94                    Some(json!({"children_active": true, "current_index": current_index})),
95                ))
96            }
97        }
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Builds a `ForEachStep` over `items` in the requested mode.
    fn for_each(items: Vec<serde_json::Value>, run_parallel: bool) -> ForEachStep {
        ForEachStep {
            collection: items,
            run_parallel,
        }
    }

    /// Builds the parent pointer (created with `ExecutionPointer::new(0)`)
    /// carrying `state` as its persistence data.
    fn parent_with_state(state: serde_json::Value) -> ExecutionPointer {
        let mut parent = ExecutionPointer::new(0);
        parent.persistence_data = Some(state);
        parent
    }

    /// Builds a child pointer (created with `ExecutionPointer::new(1)`) whose
    /// scope is `parent`'s id and whose status is `Complete`.
    fn completed_child_of(parent: &ExecutionPointer) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = PointerStatus::Complete;
        child
    }

    /// An empty collection proceeds without branching.
    #[tokio::test]
    async fn empty_collection_proceeds() {
        let mut step = for_each(Vec::new(), true);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    /// Parallel mode branches over the whole collection on the first pass.
    #[tokio::test]
    async fn parallel_branches_all_items() {
        let mut step = for_each(vec![json!(1), json!(2), json!(3)], true);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(
            outcome.branch_values,
            Some(vec![json!(1), json!(2), json!(3)])
        );
    }

    /// Parallel mode proceeds once the child in scope is complete.
    #[tokio::test]
    async fn parallel_complete_proceeds() {
        let mut step = for_each(vec![json!(1), json!(2)], true);
        let parent = parent_with_state(json!({"children_active": true}));
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    /// Sequential mode starts with the first item and records index 0.
    #[tokio::test]
    async fn sequential_starts_first_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("a")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 0}))
        );
    }

    /// Sequential mode moves on to the next item once the current child is
    /// complete.
    #[tokio::test]
    async fn sequential_advances_to_next_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = parent_with_state(json!({"children_active": true, "current_index": 0}));
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("b")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 1}))
        );
    }

    /// Sequential mode proceeds after the child for the last item is complete.
    #[tokio::test]
    async fn sequential_completes_after_last_item() {
        let mut step = for_each(vec![json!("a"), json!("b")], false);
        let parent = parent_with_state(json!({"children_active": true, "current_index": 1}));
        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }
}