ricecoder_workflows/
parallel_executor.rs

1//! Parallel step execution handler
2//!
3//! Handles execution of multiple steps concurrently within workflows.
4
5use crate::error::{WorkflowError, WorkflowResult};
6use crate::models::{ParallelStep, Workflow, WorkflowState};
7use crate::state::StateManager;
8use std::time::Instant;
9
10/// Executes parallel steps by running multiple steps concurrently
11pub struct ParallelExecutor;
12
13impl ParallelExecutor {
14    /// Execute a parallel step
15    ///
16    /// Executes multiple steps concurrently, respecting the max_concurrency limit.
17    /// Waits for all parallel steps to complete before returning.
18    ///
19    /// # Arguments
20    ///
21    /// * `workflow` - The workflow containing the step
22    /// * `state` - The current workflow state
23    /// * `step_id` - The ID of the parallel step to execute
24    /// * `parallel_step` - The parallel step configuration
25    ///
26    /// # Returns
27    ///
28    /// Returns `Ok(())` if all parallel steps executed successfully,
29    /// or an error if any step failed.
30    pub fn execute_parallel_step(
31        workflow: &Workflow,
32        state: &mut WorkflowState,
33        step_id: &str,
34        parallel_step: &ParallelStep,
35    ) -> WorkflowResult<()> {
36        // Mark step as started
37        StateManager::start_step(state, step_id.to_string());
38
39        let start_time = Instant::now();
40
41        // Execute parallel steps
42        // In a real implementation, this would:
43        // 1. Create tokio tasks for each step
44        // 2. Respect max_concurrency limit
45        // 3. Wait for all tasks to complete
46        // 4. Aggregate results
47        //
48        // For now, we simulate successful execution
49        let parallel_output = Self::execute_parallel_internal(workflow, state, parallel_step)?;
50
51        let duration_ms = start_time.elapsed().as_millis() as u64;
52
53        // Mark step as completed with the aggregated output
54        StateManager::complete_step(
55            state,
56            step_id.to_string(),
57            Some(parallel_output),
58            duration_ms,
59        );
60
61        Ok(())
62    }
63
64    /// Internal parallel execution logic
65    ///
66    /// This is where the actual parallel execution would happen.
67    fn execute_parallel_internal(
68        _workflow: &Workflow,
69        _state: &WorkflowState,
70        parallel_step: &ParallelStep,
71    ) -> WorkflowResult<serde_json::Value> {
72        // In a real implementation, this would:
73        // 1. Create tokio tasks for each step in parallel_step.steps
74        // 2. Limit concurrency to parallel_step.max_concurrency
75        // 3. Wait for all tasks to complete
76        // 4. Aggregate results from all steps
77        // 5. Return the aggregated output
78        //
79        // For now, we return a simulated output
80        Ok(serde_json::json!({
81            "parallel_steps": parallel_step.steps,
82            "max_concurrency": parallel_step.max_concurrency,
83            "status": "completed",
84            "results": {}
85        }))
86    }
87
88    /// Execute parallel steps with a concurrency limit
89    ///
90    /// Executes multiple steps concurrently, limiting the number of concurrent
91    /// executions to the specified max_concurrency value.
92    ///
93    /// # Arguments
94    ///
95    /// * `workflow` - The workflow containing the step
96    /// * `state` - The current workflow state
97    /// * `step_id` - The ID of the parallel step to execute
98    /// * `parallel_step` - The parallel step configuration
99    /// * `max_concurrency` - The maximum number of concurrent executions
100    ///
101    /// # Returns
102    ///
103    /// Returns `Ok(())` if all parallel steps executed successfully,
104    /// or an error if any step failed.
105    pub fn execute_parallel_step_with_limit(
106        workflow: &Workflow,
107        state: &mut WorkflowState,
108        step_id: &str,
109        parallel_step: &ParallelStep,
110        max_concurrency: usize,
111    ) -> WorkflowResult<()> {
112        // Validate max_concurrency
113        if max_concurrency == 0 {
114            return Err(WorkflowError::Invalid(
115                "max_concurrency must be greater than 0".to_string(),
116            ));
117        }
118
119        // Mark step as started
120        StateManager::start_step(state, step_id.to_string());
121
122        let start_time = Instant::now();
123
124        // Create a modified parallel step with the new concurrency limit
125        let mut modified_step = parallel_step.clone();
126        modified_step.max_concurrency = max_concurrency;
127
128        // Execute parallel steps with the new limit
129        let parallel_output = Self::execute_parallel_internal(workflow, state, &modified_step)?;
130
131        let duration_ms = start_time.elapsed().as_millis() as u64;
132
133        // Mark step as completed
134        StateManager::complete_step(
135            state,
136            step_id.to_string(),
137            Some(parallel_output),
138            duration_ms,
139        );
140
141        Ok(())
142    }
143
144    /// Get the steps to execute in parallel
145    pub fn get_parallel_steps(parallel_step: &ParallelStep) -> &[String] {
146        &parallel_step.steps
147    }
148
149    /// Get the max concurrency from a parallel step
150    pub fn get_max_concurrency(parallel_step: &ParallelStep) -> usize {
151        parallel_step.max_concurrency
152    }
153
154    /// Validate a parallel step
155    ///
156    /// Checks that the parallel step is valid:
157    /// - Has at least one step to execute
158    /// - max_concurrency is greater than 0
159    pub fn validate_parallel_step(parallel_step: &ParallelStep) -> WorkflowResult<()> {
160        if parallel_step.steps.is_empty() {
161            return Err(WorkflowError::Invalid(
162                "Parallel step must have at least one step".to_string(),
163            ));
164        }
165
166        if parallel_step.max_concurrency == 0 {
167            return Err(WorkflowError::Invalid(
168                "max_concurrency must be greater than 0".to_string(),
169            ));
170        }
171
172        Ok(())
173    }
174}
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use crate::models::{
180        ErrorAction, RiskFactors, StepConfig, StepStatus, StepType, WorkflowConfig, WorkflowStep,
181    };
182
183    fn create_workflow_with_parallel_step() -> Workflow {
184        Workflow {
185            id: "test-workflow".to_string(),
186            name: "Test Workflow".to_string(),
187            description: "A test workflow".to_string(),
188            parameters: vec![],
189            steps: vec![WorkflowStep {
190                id: "parallel-step".to_string(),
191                name: "Parallel Step".to_string(),
192                step_type: StepType::Parallel(ParallelStep {
193                    steps: vec!["step1".to_string(), "step2".to_string()],
194                    max_concurrency: 2,
195                }),
196                config: StepConfig {
197                    config: serde_json::json!({}),
198                },
199                dependencies: vec![],
200                approval_required: false,
201                on_error: ErrorAction::Fail,
202                risk_score: None,
203                risk_factors: RiskFactors::default(),
204            }],
205            config: WorkflowConfig {
206                timeout_ms: None,
207                max_parallel: None,
208            },
209        }
210    }
211
212    #[test]
213    fn test_execute_parallel_step() {
214        let workflow = create_workflow_with_parallel_step();
215        let mut state = StateManager::create_state(&workflow);
216        let parallel_step = ParallelStep {
217            steps: vec!["step1".to_string(), "step2".to_string()],
218            max_concurrency: 2,
219        };
220
221        let result = ParallelExecutor::execute_parallel_step(
222            &workflow,
223            &mut state,
224            "parallel-step",
225            &parallel_step,
226        );
227        assert!(result.is_ok());
228
229        // Verify step is marked as completed
230        let step_result = state.step_results.get("parallel-step");
231        assert!(step_result.is_some());
232        assert_eq!(step_result.unwrap().status, StepStatus::Completed);
233    }
234
235    #[test]
236    fn test_execute_parallel_step_with_limit() {
237        let workflow = create_workflow_with_parallel_step();
238        let mut state = StateManager::create_state(&workflow);
239        let parallel_step = ParallelStep {
240            steps: vec![
241                "step1".to_string(),
242                "step2".to_string(),
243                "step3".to_string(),
244            ],
245            max_concurrency: 2,
246        };
247
248        let result = ParallelExecutor::execute_parallel_step_with_limit(
249            &workflow,
250            &mut state,
251            "parallel-step",
252            &parallel_step,
253            1, // Override to 1 concurrent execution
254        );
255        assert!(result.is_ok());
256
257        // Verify step is marked as completed
258        let step_result = state.step_results.get("parallel-step");
259        assert!(step_result.is_some());
260        assert_eq!(step_result.unwrap().status, StepStatus::Completed);
261    }
262
263    #[test]
264    fn test_get_parallel_steps() {
265        let parallel_step = ParallelStep {
266            steps: vec!["step1".to_string(), "step2".to_string()],
267            max_concurrency: 2,
268        };
269
270        assert_eq!(
271            ParallelExecutor::get_parallel_steps(&parallel_step),
272            &["step1".to_string(), "step2".to_string()]
273        );
274    }
275
276    #[test]
277    fn test_get_max_concurrency() {
278        let parallel_step = ParallelStep {
279            steps: vec!["step1".to_string()],
280            max_concurrency: 4,
281        };
282
283        assert_eq!(ParallelExecutor::get_max_concurrency(&parallel_step), 4);
284    }
285
286    #[test]
287    fn test_validate_parallel_step_valid() {
288        let parallel_step = ParallelStep {
289            steps: vec!["step1".to_string(), "step2".to_string()],
290            max_concurrency: 2,
291        };
292
293        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
294        assert!(result.is_ok());
295    }
296
297    #[test]
298    fn test_validate_parallel_step_empty_steps() {
299        let parallel_step = ParallelStep {
300            steps: vec![],
301            max_concurrency: 2,
302        };
303
304        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
305        assert!(result.is_err());
306    }
307
308    #[test]
309    fn test_validate_parallel_step_zero_concurrency() {
310        let parallel_step = ParallelStep {
311            steps: vec!["step1".to_string()],
312            max_concurrency: 0,
313        };
314
315        let result = ParallelExecutor::validate_parallel_step(&parallel_step);
316        assert!(result.is_err());
317    }
318
319    #[test]
320    fn test_execute_parallel_step_with_limit_zero_concurrency() {
321        let workflow = create_workflow_with_parallel_step();
322        let mut state = StateManager::create_state(&workflow);
323        let parallel_step = ParallelStep {
324            steps: vec!["step1".to_string()],
325            max_concurrency: 2,
326        };
327
328        let result = ParallelExecutor::execute_parallel_step_with_limit(
329            &workflow,
330            &mut state,
331            "parallel-step",
332            &parallel_step,
333            0, // Invalid: 0 concurrency
334        );
335        assert!(result.is_err());
336    }
337}