wfe_core/primitives/
foreach_step.rs1use async_trait::async_trait;
2use serde_json::json;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
7pub struct ForEachStep {
9 pub collection: Vec<serde_json::Value>,
11 pub run_parallel: bool,
13}
14
15impl Default for ForEachStep {
16 fn default() -> Self {
17 Self {
18 collection: Vec::new(),
19 run_parallel: true,
20 }
21 }
22}
23
24#[async_trait]
25impl StepBody for ForEachStep {
26 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
27 if self.collection.is_empty() {
28 return Ok(ExecutionResult::next());
29 }
30
31 if self.run_parallel {
32 let children_active = context
34 .persistence_data
35 .and_then(|d| d.get("children_active"))
36 .and_then(|v| v.as_bool())
37 .unwrap_or(false);
38
39 if children_active {
40 let mut scope = context.execution_pointer.scope.clone();
41 scope.push(context.execution_pointer.id.clone());
42
43 if context.workflow.is_branch_complete(&scope) {
44 Ok(ExecutionResult::next())
45 } else {
46 Ok(ExecutionResult::persist(json!({"children_active": true})))
47 }
48 } else {
49 Ok(ExecutionResult::branch(
50 self.collection.clone(),
51 Some(json!({"children_active": true})),
52 ))
53 }
54 } else {
55 let current_index = context
57 .persistence_data
58 .and_then(|d| d.get("current_index"))
59 .and_then(|v| v.as_u64())
60 .unwrap_or(0) as usize;
61
62 let children_active = context
63 .persistence_data
64 .and_then(|d| d.get("children_active"))
65 .and_then(|v| v.as_bool())
66 .unwrap_or(false);
67
68 if children_active {
69 let mut scope = context.execution_pointer.scope.clone();
71 scope.push(context.execution_pointer.id.clone());
72
73 if context.workflow.is_branch_complete(&scope) {
74 let next_index = current_index + 1;
75 if next_index >= self.collection.len() {
76 Ok(ExecutionResult::next())
78 } else {
79 Ok(ExecutionResult::branch(
81 vec![self.collection[next_index].clone()],
82 Some(json!({"children_active": true, "current_index": next_index})),
83 ))
84 }
85 } else {
86 Ok(ExecutionResult::persist(
87 json!({"children_active": true, "current_index": current_index}),
88 ))
89 }
90 } else {
91 Ok(ExecutionResult::branch(
93 vec![self.collection[current_index].clone()],
94 Some(json!({"children_active": true, "current_index": current_index})),
95 ))
96 }
97 }
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Builds a step over `items` in the requested execution mode.
    fn for_each(items: Vec<serde_json::Value>, run_parallel: bool) -> ForEachStep {
        ForEachStep {
            collection: items,
            run_parallel,
        }
    }

    /// Builds the step's own pointer carrying `state` as persistence data.
    fn pointer_with_state(state: serde_json::Value) -> ExecutionPointer {
        let mut pointer = ExecutionPointer::new(0);
        pointer.persistence_data = Some(state);
        pointer
    }

    /// Builds a completed child pointer scoped under `parent`.
    fn completed_child_of(parent: &ExecutionPointer) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = PointerStatus::Complete;
        child
    }

    // An empty collection has nothing to branch on, so the step proceeds.
    #[tokio::test]
    async fn empty_collection_proceeds() {
        let mut step = for_each(Vec::new(), true);
        let parent = ExecutionPointer::new(0);
        let definition = default_step();
        let instance = default_workflow();
        let ctx = make_context(&parent, &definition, &instance);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    // The first parallel pass branches on every item at once.
    #[tokio::test]
    async fn parallel_branches_all_items() {
        let mut step = for_each(vec![json!(1), json!(2), json!(3)], true);
        let parent = ExecutionPointer::new(0);
        let definition = default_step();
        let instance = default_workflow();
        let ctx = make_context(&parent, &definition, &instance);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(
            outcome.branch_values,
            Some(vec![json!(1), json!(2), json!(3)])
        );
    }

    // Once the spawned children are complete, the parallel step proceeds.
    #[tokio::test]
    async fn parallel_complete_proceeds() {
        let mut step = for_each(vec![json!(1), json!(2)], true);
        let parent = pointer_with_state(json!({"children_active": true}));

        let definition = default_step();
        let mut instance = default_workflow();
        instance.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &definition, &instance);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    // The first sequential pass branches on the first item only.
    #[tokio::test]
    async fn sequential_starts_first_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = ExecutionPointer::new(0);
        let definition = default_step();
        let instance = default_workflow();
        let ctx = make_context(&parent, &definition, &instance);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("a")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 0}))
        );
    }

    // After the current item's branch completes, the next item is branched.
    #[tokio::test]
    async fn sequential_advances_to_next_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = pointer_with_state(json!({"children_active": true, "current_index": 0}));

        let definition = default_step();
        let mut instance = default_workflow();
        instance.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &definition, &instance);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("b")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 1}))
        );
    }

    // Completing the last item's branch lets the step proceed.
    #[tokio::test]
    async fn sequential_completes_after_last_item() {
        let mut step = for_each(vec![json!("a"), json!("b")], false);
        let parent = pointer_with_state(json!({"children_active": true, "current_index": 1}));

        let definition = default_step();
        let mut instance = default_workflow();
        instance.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &definition, &instance);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }
}