1use crate::error::{WorkflowError, WorkflowResult};
6use crate::models::{ParallelStep, Workflow, WorkflowState};
7use crate::state::StateManager;
8use std::time::Instant;
9
10pub struct ParallelExecutor;
12
13impl ParallelExecutor {
14 pub fn execute_parallel_step(
31 workflow: &Workflow,
32 state: &mut WorkflowState,
33 step_id: &str,
34 parallel_step: &ParallelStep,
35 ) -> WorkflowResult<()> {
36 StateManager::start_step(state, step_id.to_string());
38
39 let start_time = Instant::now();
40
41 let parallel_output = Self::execute_parallel_internal(workflow, state, parallel_step)?;
50
51 let duration_ms = start_time.elapsed().as_millis() as u64;
52
53 StateManager::complete_step(
55 state,
56 step_id.to_string(),
57 Some(parallel_output),
58 duration_ms,
59 );
60
61 Ok(())
62 }
63
64 fn execute_parallel_internal(
68 _workflow: &Workflow,
69 _state: &WorkflowState,
70 parallel_step: &ParallelStep,
71 ) -> WorkflowResult<serde_json::Value> {
72 Ok(serde_json::json!({
81 "parallel_steps": parallel_step.steps,
82 "max_concurrency": parallel_step.max_concurrency,
83 "status": "completed",
84 "results": {}
85 }))
86 }
87
88 pub fn execute_parallel_step_with_limit(
106 workflow: &Workflow,
107 state: &mut WorkflowState,
108 step_id: &str,
109 parallel_step: &ParallelStep,
110 max_concurrency: usize,
111 ) -> WorkflowResult<()> {
112 if max_concurrency == 0 {
114 return Err(WorkflowError::Invalid(
115 "max_concurrency must be greater than 0".to_string(),
116 ));
117 }
118
119 StateManager::start_step(state, step_id.to_string());
121
122 let start_time = Instant::now();
123
124 let mut modified_step = parallel_step.clone();
126 modified_step.max_concurrency = max_concurrency;
127
128 let parallel_output = Self::execute_parallel_internal(workflow, state, &modified_step)?;
130
131 let duration_ms = start_time.elapsed().as_millis() as u64;
132
133 StateManager::complete_step(
135 state,
136 step_id.to_string(),
137 Some(parallel_output),
138 duration_ms,
139 );
140
141 Ok(())
142 }
143
144 pub fn get_parallel_steps(parallel_step: &ParallelStep) -> &[String] {
146 ¶llel_step.steps
147 }
148
149 pub fn get_max_concurrency(parallel_step: &ParallelStep) -> usize {
151 parallel_step.max_concurrency
152 }
153
154 pub fn validate_parallel_step(parallel_step: &ParallelStep) -> WorkflowResult<()> {
160 if parallel_step.steps.is_empty() {
161 return Err(WorkflowError::Invalid(
162 "Parallel step must have at least one step".to_string(),
163 ));
164 }
165
166 if parallel_step.max_concurrency == 0 {
167 return Err(WorkflowError::Invalid(
168 "max_concurrency must be greater than 0".to_string(),
169 ));
170 }
171
172 Ok(())
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        ErrorAction, RiskFactors, StepConfig, StepStatus, StepType, WorkflowConfig, WorkflowStep,
    };

    /// Id of the single parallel step in the fixture workflow.
    const STEP_ID: &str = "parallel-step";

    /// Builds a `ParallelStep` from borrowed step ids.
    fn make_parallel_step(ids: &[&str], max_concurrency: usize) -> ParallelStep {
        ParallelStep {
            steps: ids.iter().map(|id| id.to_string()).collect(),
            max_concurrency,
        }
    }

    /// Builds a workflow whose only step is a parallel step over `step1` and `step2`.
    fn create_workflow_with_parallel_step() -> Workflow {
        Workflow {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: "A test workflow".to_string(),
            parameters: vec![],
            steps: vec![WorkflowStep {
                id: STEP_ID.to_string(),
                name: "Parallel Step".to_string(),
                step_type: StepType::Parallel(make_parallel_step(&["step1", "step2"], 2)),
                config: StepConfig {
                    config: serde_json::json!({}),
                },
                dependencies: vec![],
                approval_required: false,
                on_error: ErrorAction::Fail,
                risk_score: None,
                risk_factors: RiskFactors::default(),
            }],
            config: WorkflowConfig {
                timeout_ms: None,
                max_parallel: None,
            },
        }
    }

    #[test]
    fn test_execute_parallel_step() {
        let workflow = create_workflow_with_parallel_step();
        let mut state = StateManager::create_state(&workflow);
        let parallel_step = make_parallel_step(&["step1", "step2"], 2);

        let result = ParallelExecutor::execute_parallel_step(
            &workflow,
            &mut state,
            STEP_ID,
            &parallel_step,
        );
        assert!(result.is_ok());

        let step_result = state
            .step_results
            .get(STEP_ID)
            .expect("a result should be recorded for the executed step");
        assert_eq!(step_result.status, StepStatus::Completed);
    }

    #[test]
    fn test_execute_parallel_step_with_limit() {
        let workflow = create_workflow_with_parallel_step();
        let mut state = StateManager::create_state(&workflow);
        let parallel_step = make_parallel_step(&["step1", "step2", "step3"], 2);

        // Override the step's own limit (2) with a stricter one.
        let result = ParallelExecutor::execute_parallel_step_with_limit(
            &workflow,
            &mut state,
            STEP_ID,
            &parallel_step,
            1,
        );
        assert!(result.is_ok());

        let step_result = state
            .step_results
            .get(STEP_ID)
            .expect("a result should be recorded for the executed step");
        assert_eq!(step_result.status, StepStatus::Completed);
    }

    #[test]
    fn test_get_parallel_steps() {
        let parallel_step = make_parallel_step(&["step1", "step2"], 2);

        assert_eq!(
            ParallelExecutor::get_parallel_steps(&parallel_step),
            &["step1".to_string(), "step2".to_string()]
        );
    }

    #[test]
    fn test_get_max_concurrency() {
        let parallel_step = make_parallel_step(&["step1"], 4);

        assert_eq!(ParallelExecutor::get_max_concurrency(&parallel_step), 4);
    }

    #[test]
    fn test_validate_parallel_step_valid() {
        let parallel_step = make_parallel_step(&["step1", "step2"], 2);

        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
        assert!(result.is_ok());
    }

    #[test]
    fn test_validate_parallel_step_empty_steps() {
        let parallel_step = make_parallel_step(&[], 2);

        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
        assert!(matches!(result, Err(WorkflowError::Invalid(_))));
    }

    #[test]
    fn test_validate_parallel_step_zero_concurrency() {
        let parallel_step = make_parallel_step(&["step1"], 0);

        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
        assert!(matches!(result, Err(WorkflowError::Invalid(_))));
    }

    #[test]
    fn test_execute_parallel_step_with_limit_zero_concurrency() {
        let workflow = create_workflow_with_parallel_step();
        let mut state = StateManager::create_state(&workflow);
        let parallel_step = make_parallel_step(&["step1"], 2);

        // The step itself is valid; only the override is rejected.
        let result = ParallelExecutor::execute_parallel_step_with_limit(
            &workflow,
            &mut state,
            STEP_ID,
            &parallel_step,
            0,
        );
        assert!(matches!(result, Err(WorkflowError::Invalid(_))));
    }
}