wfe_core/primitives/
foreach_step.rs1use async_trait::async_trait;
2use serde_json::json;
3
4use crate::models::ExecutionResult;
5use crate::traits::step::{StepBody, StepExecutionContext};
6
7pub struct ForEachStep {
9 pub collection: Vec<serde_json::Value>,
10 pub run_parallel: bool,
11}
12
13impl Default for ForEachStep {
14 fn default() -> Self {
15 Self {
16 collection: Vec::new(),
17 run_parallel: true,
18 }
19 }
20}
21
22#[async_trait]
23impl StepBody for ForEachStep {
24 async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
25 if self.collection.is_empty() {
26 return Ok(ExecutionResult::next());
27 }
28
29 if self.run_parallel {
30 let children_active = context
32 .persistence_data
33 .and_then(|d| d.get("children_active"))
34 .and_then(|v| v.as_bool())
35 .unwrap_or(false);
36
37 if children_active {
38 let mut scope = context.execution_pointer.scope.clone();
39 scope.push(context.execution_pointer.id.clone());
40
41 if context.workflow.is_branch_complete(&scope) {
42 Ok(ExecutionResult::next())
43 } else {
44 Ok(ExecutionResult::persist(json!({"children_active": true})))
45 }
46 } else {
47 Ok(ExecutionResult::branch(
48 self.collection.clone(),
49 Some(json!({"children_active": true})),
50 ))
51 }
52 } else {
53 let current_index = context
55 .persistence_data
56 .and_then(|d| d.get("current_index"))
57 .and_then(|v| v.as_u64())
58 .unwrap_or(0) as usize;
59
60 let children_active = context
61 .persistence_data
62 .and_then(|d| d.get("children_active"))
63 .and_then(|v| v.as_bool())
64 .unwrap_or(false);
65
66 if children_active {
67 let mut scope = context.execution_pointer.scope.clone();
69 scope.push(context.execution_pointer.id.clone());
70
71 if context.workflow.is_branch_complete(&scope) {
72 let next_index = current_index + 1;
73 if next_index >= self.collection.len() {
74 Ok(ExecutionResult::next())
76 } else {
77 Ok(ExecutionResult::branch(
79 vec![self.collection[next_index].clone()],
80 Some(json!({"children_active": true, "current_index": next_index})),
81 ))
82 }
83 } else {
84 Ok(ExecutionResult::persist(
85 json!({"children_active": true, "current_index": current_index}),
86 ))
87 }
88 } else {
89 Ok(ExecutionResult::branch(
91 vec![self.collection[current_index].clone()],
92 Some(json!({"children_active": true, "current_index": current_index})),
93 ))
94 }
95 }
96 }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ExecutionPointer, PointerStatus};
    use crate::primitives::test_helpers::*;

    /// Builds the step under test from its two settings.
    fn for_each(items: Vec<serde_json::Value>, run_parallel: bool) -> ForEachStep {
        ForEachStep {
            collection: items,
            run_parallel,
        }
    }

    /// Creates pointer `0` carrying the given persisted loop state.
    fn parent_with_state(state: serde_json::Value) -> ExecutionPointer {
        let mut parent = ExecutionPointer::new(0);
        parent.persistence_data = Some(state);
        parent
    }

    /// Creates pointer `1`, scoped under `parent` and already marked complete.
    fn completed_child_of(parent: &ExecutionPointer) -> ExecutionPointer {
        let mut child = ExecutionPointer::new(1);
        child.scope = vec![parent.id.clone()];
        child.status = PointerStatus::Complete;
        child
    }

    /// An empty collection lets the workflow proceed right away.
    #[tokio::test]
    async fn empty_collection_proceeds() {
        let mut step = for_each(Vec::new(), true);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    /// Parallel mode branches over the whole collection on the first pass.
    #[tokio::test]
    async fn parallel_branches_all_items() {
        let mut step = for_each(vec![json!(1), json!(2), json!(3)], true);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!(1), json!(2), json!(3)]));
    }

    /// Parallel mode proceeds once the started children are complete.
    #[tokio::test]
    async fn parallel_complete_proceeds() {
        let mut step = for_each(vec![json!(1), json!(2)], true);
        let parent = parent_with_state(json!({"children_active": true}));

        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &wf_step, &workflow);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }

    /// Sequential mode starts with the first item and records index 0.
    #[tokio::test]
    async fn sequential_starts_first_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = ExecutionPointer::new(0);
        let wf_step = default_step();
        let workflow = default_workflow();
        let ctx = make_context(&parent, &wf_step, &workflow);

        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("a")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 0}))
        );
    }

    /// Sequential mode moves to the following item once the current one is done.
    #[tokio::test]
    async fn sequential_advances_to_next_item() {
        let mut step = for_each(vec![json!("a"), json!("b"), json!("c")], false);
        let parent = parent_with_state(json!({"children_active": true, "current_index": 0}));

        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &wf_step, &workflow);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(!outcome.proceed);
        assert_eq!(outcome.branch_values, Some(vec![json!("b")]));
        assert_eq!(
            outcome.persistence_data,
            Some(json!({"children_active": true, "current_index": 1}))
        );
    }

    /// Sequential mode proceeds after the last item's branch is complete.
    #[tokio::test]
    async fn sequential_completes_after_last_item() {
        let mut step = for_each(vec![json!("a"), json!("b")], false);
        let parent = parent_with_state(json!({"children_active": true, "current_index": 1}));

        let wf_step = default_step();
        let mut workflow = default_workflow();
        workflow.execution_pointers.push(completed_child_of(&parent));

        let ctx = make_context(&parent, &wf_step, &workflow);
        let outcome = step.run(&ctx).await.unwrap();

        assert!(outcome.proceed);
    }
}