Skip to main content

wfe_core/primitives/
foreach_step.rs

1use async_trait::async_trait;
2use serde_json::json;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
7/// A step that iterates over a collection, branching for each element.
8pub struct ForEachStep {
9    pub collection: Vec<serde_json::Value>,
10    pub run_parallel: bool,
11}
12
13impl Default for ForEachStep {
14    fn default() -> Self {
15        Self {
16            collection: Vec::new(),
17            run_parallel: true,
18        }
19    }
20}
21
22#[async_trait]
23impl StepBody for ForEachStep {
24    async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
25        if self.collection.is_empty() {
26            return Ok(ExecutionResult::next());
27        }
28
29        if self.run_parallel {
30            // Parallel: branch with all collection values at once.
31            let children_active = context
32                .persistence_data
33                .and_then(|d| d.get("children_active"))
34                .and_then(|v| v.as_bool())
35                .unwrap_or(false);
36
37            if children_active {
38                let mut scope = context.execution_pointer.scope.clone();
39                scope.push(context.execution_pointer.id.clone());
40
41                if context.workflow.is_branch_complete(&scope) {
42                    Ok(ExecutionResult::next())
43                } else {
44                    Ok(ExecutionResult::persist(json!({"children_active": true})))
45                }
46            } else {
47                Ok(ExecutionResult::branch(
48                    self.collection.clone(),
49                    Some(json!({"children_active": true})),
50                ))
51            }
52        } else {
53            // Sequential: process one item at a time using current_index.
54            let current_index = context
55                .persistence_data
56                .and_then(|d| d.get("current_index"))
57                .and_then(|v| v.as_u64())
58                .unwrap_or(0) as usize;
59
60            let children_active = context
61                .persistence_data
62                .and_then(|d| d.get("children_active"))
63                .and_then(|v| v.as_bool())
64                .unwrap_or(false);
65
66            if children_active {
67                // Check if current child is complete.
68                let mut scope = context.execution_pointer.scope.clone();
69                scope.push(context.execution_pointer.id.clone());
70
71                if context.workflow.is_branch_complete(&scope) {
72                    let next_index = current_index + 1;
73                    if next_index >= self.collection.len() {
74                        // All items processed.
75                        Ok(ExecutionResult::next())
76                    } else {
77                        // Advance to next item.
78                        Ok(ExecutionResult::branch(
79                            vec![self.collection[next_index].clone()],
80                            Some(json!({"children_active": true, "current_index": next_index})),
81                        ))
82                    }
83                } else {
84                    Ok(ExecutionResult::persist(
85                        json!({"children_active": true, "current_index": current_index}),
86                    ))
87                }
88            } else {
89                // Start first item.
90                Ok(ExecutionResult::branch(
91                    vec![self.collection[current_index].clone()],
92                    Some(json!({"children_active": true, "current_index": current_index})),
93                ))
94            }
95        }
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Builds a `ForEachStep` over `items` in the requested mode.
    fn foreach_over(items: Vec<serde_json::Value>, run_parallel: bool) -> ForEachStep {
        ForEachStep {
            collection: items,
            run_parallel,
        }
    }

    /// Creates `ExecutionPointer::new(0)` preloaded with `state` as its
    /// persistence data.
    fn pointer_with_state(state: serde_json::Value) -> ExecutionPointer {
        let mut pointer = ExecutionPointer::new(0);
        pointer.persistence_data = Some(state);
        pointer
    }

    /// Creates `ExecutionPointer::new(1)` scoped under `parent` and marked
    /// `PointerStatus::Complete`.
    fn finished_child_of(parent: &ExecutionPointer) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = PointerStatus::Complete;
        child
    }

    // An empty collection proceeds without branching.
    #[tokio::test]
    async fn empty_collection_proceeds() {
        let parent = ExecutionPointer::new(0);
        let step_def = default_step();
        let wf = default_workflow();
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![], true);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    // The first parallel run branches with every value in the collection.
    #[tokio::test]
    async fn parallel_branches_all_items() {
        let parent = ExecutionPointer::new(0);
        let step_def = default_step();
        let wf = default_workflow();
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![json!(1), json!(2), json!(3)], true);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        let expected_values = Some(vec![json!(1), json!(2), json!(3)]);
        assert_eq!(outcome.branch_values, expected_values);
    }

    // A parallel step whose child pointer is complete proceeds.
    #[tokio::test]
    async fn parallel_complete_proceeds() {
        let parent = pointer_with_state(json!({"children_active": true}));
        let step_def = default_step();
        let mut wf = default_workflow();
        wf.execution_pointers.push(finished_child_of(&parent));
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![json!(1), json!(2)], true);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    // The first sequential run branches with the first value only.
    #[tokio::test]
    async fn sequential_starts_first_item() {
        let parent = ExecutionPointer::new(0);
        let step_def = default_step();
        let wf = default_workflow();
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![json!("a"), json!("b"), json!("c")], false);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        let expected_values = Some(vec![json!("a")]);
        assert_eq!(outcome.branch_values, expected_values);
        let expected_state = Some(json!({"children_active": true, "current_index": 0}));
        assert_eq!(outcome.persistence_data, expected_state);
    }

    // Once the current child is complete, the step branches with the next value.
    #[tokio::test]
    async fn sequential_advances_to_next_item() {
        let parent = pointer_with_state(json!({"children_active": true, "current_index": 0}));
        let step_def = default_step();
        let mut wf = default_workflow();
        wf.execution_pointers.push(finished_child_of(&parent));
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![json!("a"), json!("b"), json!("c")], false);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        let expected_values = Some(vec![json!("b")]);
        assert_eq!(outcome.branch_values, expected_values);
        let expected_state = Some(json!({"children_active": true, "current_index": 1}));
        assert_eq!(outcome.persistence_data, expected_state);
    }

    // After the last value's child is complete, the step proceeds.
    #[tokio::test]
    async fn sequential_completes_after_last_item() {
        let parent = pointer_with_state(json!({"children_active": true, "current_index": 1}));
        let step_def = default_step();
        let mut wf = default_workflow();
        wf.execution_pointers.push(finished_child_of(&parent));
        let ctx = make_context(&parent, &step_def, &wf);

        let mut foreach = foreach_over(vec![json!("a"), json!("b")], false);
        let outcome = foreach.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }
}