Skip to main content

voirs_cli/workflow/
engine.rs

1//! Workflow Execution Engine
2//!
3//! The engine orchestrates workflow execution, managing dependencies,
4//! parallel execution, and state persistence.
5
6use super::{
7    definition::{Step, Workflow},
8    executor::{ExecutionContext, ExecutionResult, StepExecutor},
9    state::{ExecutionState, StateManager},
10    validation::WorkflowValidator,
11    WorkflowStats,
12};
13use crate::error::CliError;
14
15type Result<T> = std::result::Result<T, CliError>;
16use std::collections::{HashMap, HashSet};
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Instant;
20use tokio::sync::{RwLock, Semaphore};
21
22/// Workflow execution engine
23pub struct WorkflowEngine {
24    /// Step executor
25    executor: Arc<StepExecutor>,
26    /// State manager for persistence
27    state_manager: Arc<StateManager>,
28    /// Maximum parallel executions
29    max_parallel: usize,
30    /// Workflow validator
31    validator: WorkflowValidator,
32}
33
34impl WorkflowEngine {
35    /// Create a new workflow engine
36    pub fn new(state_dir: PathBuf, max_parallel: usize) -> Self {
37        Self {
38            executor: Arc::new(StepExecutor::new()),
39            state_manager: Arc::new(StateManager::new(state_dir)),
40            max_parallel,
41            validator: WorkflowValidator::new(),
42        }
43    }
44
45    /// Execute a workflow
46    pub async fn execute(&self, workflow: Workflow) -> Result<ExecutionResult> {
47        // Validate workflow
48        self.validator.validate(&workflow)?;
49
50        let start_time = Instant::now();
51        let workflow_name = workflow.metadata.name.clone();
52
53        // Initialize execution context
54        let mut context = ExecutionContext::new(workflow.clone());
55
56        // Load previous state if resuming
57        if workflow.config.save_state {
58            if let Ok(state) = self.state_manager.load(&workflow_name).await {
59                context.resume_from_state(state);
60            }
61        }
62
63        // Execute steps
64        let result = self.execute_steps(&workflow, &mut context).await;
65
66        // Save final state if configured
67        if workflow.config.save_state {
68            let state = context.get_state();
69            let _ = self.state_manager.save(&workflow_name, &state).await;
70        }
71
72        let duration = start_time.elapsed();
73
74        // Collect statistics
75        let stats = WorkflowStats {
76            total_steps: context.completed_steps().len(),
77            successful_steps: context
78                .completed_steps()
79                .iter()
80                .filter(|(_, r)| r.success)
81                .count(),
82            failed_steps: context
83                .completed_steps()
84                .iter()
85                .filter(|(_, r)| !r.success)
86                .count(),
87            skipped_steps: context.skipped_steps().len(),
88            total_duration_ms: duration.as_millis() as u64,
89            avg_step_duration_ms: if !context.completed_steps().is_empty() {
90                duration.as_millis() as u64 / context.completed_steps().len() as u64
91            } else {
92                0
93            },
94            total_retries: context.total_retries(),
95        };
96
97        match result {
98            Ok(_) => Ok(ExecutionResult::success(
99                workflow_name,
100                "Workflow completed successfully".to_string(),
101                stats,
102            )),
103            Err(e) => Ok(ExecutionResult::failure(
104                workflow_name,
105                format!("Workflow failed: {}", e),
106                stats,
107            )),
108        }
109    }
110
111    /// Execute workflow steps
112    async fn execute_steps(
113        &self,
114        workflow: &Workflow,
115        context: &mut ExecutionContext,
116    ) -> Result<()> {
117        let semaphore = Arc::new(Semaphore::new(
118            workflow.config.max_parallel.min(self.max_parallel),
119        ));
120
121        // Build dependency graph
122        let dep_graph = self.build_dependency_graph(workflow)?;
123
124        // Track completed steps
125        let completed = Arc::new(RwLock::new(HashSet::new()));
126
127        // Execute steps respecting dependencies
128        let mut pending_steps: Vec<&Step> = workflow.steps.iter().collect();
129
130        while !pending_steps.is_empty() {
131            // Find steps ready to execute (dependencies satisfied)
132            let ready_steps: Vec<&Step> = pending_steps
133                .iter()
134                .filter(|step| {
135                    // Check if already completed or skipped
136                    if context.completed_steps().contains_key(&step.name) {
137                        return false;
138                    }
139
140                    // Check dependencies
141                    // Check if dependencies are satisfied using blocking read
142                    let completed_set = {
143                        let guard = completed.blocking_read();
144                        guard.clone()
145                    };
146                    self.dependencies_satisfied(step, &completed_set)
147                })
148                .copied()
149                .collect();
150
151            if ready_steps.is_empty() {
152                // Check if we're waiting for parallel tasks
153                if context.completed_steps().len() + context.skipped_steps().len()
154                    < workflow.steps.len()
155                {
156                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
157                    continue;
158                }
159                break;
160            }
161
162            // Execute ready steps
163            let mut tasks = Vec::new();
164
165            for step in ready_steps {
166                let step_clone = step.clone();
167                let context_clone = Arc::new(RwLock::new(context.clone()));
168                let executor = self.executor.clone();
169                let semaphore = semaphore.clone();
170                let completed = completed.clone();
171                let state_manager = self.state_manager.clone();
172                let workflow_name = workflow.metadata.name.clone();
173                let save_state = workflow.config.save_state;
174
175                let task = tokio::spawn(async move {
176                    let _permit = semaphore.acquire().await.unwrap();
177
178                    let mut ctx = context_clone.write().await;
179
180                    // Evaluate condition if present
181                    if let Some(ref condition) = step_clone.condition {
182                        let variables = ctx.get_variables();
183                        if !condition.evaluate(&variables) {
184                            ctx.skip_step(&step_clone.name, "Condition not met");
185                            return Ok(());
186                        }
187                    }
188
189                    // Execute step
190                    let result = executor.execute_step(&step_clone, &mut ctx).await?;
191
192                    if result.success {
193                        completed.write().await.insert(step_clone.name.clone());
194                    } else if !ctx.workflow().config.continue_on_error {
195                        return Err(CliError::Workflow(format!(
196                            "Step '{}' failed: {}",
197                            step_clone.name, result.message
198                        )));
199                    }
200
201                    // Save state after each step if configured
202                    if save_state {
203                        let state = ctx.get_state();
204                        let _ = state_manager.save(&workflow_name, &state).await;
205                    }
206
207                    Ok::<(), CliError>(())
208                });
209
210                tasks.push(task);
211            }
212
213            // Wait for all tasks to complete
214            for task in tasks {
215                task.await
216                    .map_err(|e| CliError::Workflow(format!("Task failed: {}", e)))??;
217            }
218
219            // Remove completed steps from pending
220            pending_steps.retain(|step| {
221                !context.completed_steps().contains_key(&step.name)
222                    && !context.skipped_steps().contains(&step.name)
223            });
224        }
225
226        Ok(())
227    }
228
229    /// Build dependency graph
230    fn build_dependency_graph(&self, workflow: &Workflow) -> Result<HashMap<String, Vec<String>>> {
231        let mut graph: HashMap<String, Vec<String>> = HashMap::new();
232
233        for step in &workflow.steps {
234            let deps: Vec<String> = step
235                .depends_on
236                .iter()
237                .map(|d| d.step_name.clone())
238                .collect();
239            graph.insert(step.name.clone(), deps);
240        }
241
242        // Detect cycles
243        let mut visited = HashSet::new();
244        let mut recursion_stack = HashSet::new();
245
246        for step in &workflow.steps {
247            if self.has_cycle(&step.name, &graph, &mut visited, &mut recursion_stack) {
248                return Err(CliError::Workflow(format!(
249                    "Circular dependency detected involving step '{}'",
250                    step.name
251                )));
252            }
253        }
254
255        Ok(graph)
256    }
257
258    /// Check for cycles in dependency graph
259    fn has_cycle(
260        &self,
261        node: &str,
262        graph: &HashMap<String, Vec<String>>,
263        visited: &mut HashSet<String>,
264        recursion_stack: &mut HashSet<String>,
265    ) -> bool {
266        if recursion_stack.contains(node) {
267            return true;
268        }
269
270        if visited.contains(node) {
271            return false;
272        }
273
274        visited.insert(node.to_string());
275        recursion_stack.insert(node.to_string());
276
277        if let Some(neighbors) = graph.get(node) {
278            for neighbor in neighbors {
279                if self.has_cycle(neighbor, graph, visited, recursion_stack) {
280                    return true;
281                }
282            }
283        }
284
285        recursion_stack.remove(node);
286        false
287    }
288
289    /// Check if all dependencies are satisfied
290    fn dependencies_satisfied(&self, step: &Step, completed: &HashSet<String>) -> bool {
291        step.depends_on
292            .iter()
293            .all(|dep| completed.contains(&dep.step_name))
294    }
295
296    /// Stop a running workflow
297    pub async fn stop(&self, workflow_name: &str) -> Result<()> {
298        // Load current state
299        if let Ok(state) = self.state_manager.load(workflow_name).await {
300            if state.state == ExecutionState::Running {
301                let mut updated_state = state;
302                updated_state.state = ExecutionState::Stopped;
303                self.state_manager
304                    .save(workflow_name, &updated_state)
305                    .await?;
306            }
307        }
308
309        Ok(())
310    }
311
312    /// Resume a stopped or failed workflow
313    pub async fn resume(&self, workflow: Workflow) -> Result<ExecutionResult> {
314        // Similar to execute but loads state first
315        self.execute(workflow).await
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::definition::{StepDependency, StepType};
    use std::env;

    /// Build a command step called `name` that must-succeed-depends on each
    /// step named in `deps`.
    fn command_step(name: &str, deps: &[&str]) -> Step {
        Step {
            name: name.to_string(),
            step_type: StepType::Command,
            description: None,
            parameters: HashMap::new(),
            condition: None,
            depends_on: deps
                .iter()
                .map(|dep| StepDependency {
                    step_name: (*dep).to_string(),
                    must_succeed: true,
                })
                .collect(),
            retry: None,
            for_each: None,
            parallel: false,
        }
    }

    /// Build an engine with a parallel limit of 4 whose state directory is
    /// `dir_name` under the system temp directory.
    fn engine_in(dir_name: &str) -> WorkflowEngine {
        WorkflowEngine::new(env::temp_dir().join(dir_name), 4)
    }

    /// Build a workflow with a single dependency-free command step.
    fn create_test_workflow() -> Workflow {
        let mut workflow = Workflow::new("test-workflow", "1.0", "Test workflow");
        workflow.add_step(command_step("step1", &[]));
        workflow
    }

    #[tokio::test]
    async fn test_engine_creation() {
        let engine = engine_in("voirs_engine_test");
        assert_eq!(engine.max_parallel, 4);
    }

    #[test]
    fn test_dependency_graph_building() {
        let engine = engine_in("voirs_engine_test_2");

        let mut workflow = Workflow::new("test", "1.0", "Test");
        workflow.add_step(command_step("step1", &[]));
        workflow.add_step(command_step("step2", &["step1"]));

        let graph = engine.build_dependency_graph(&workflow).unwrap();
        assert_eq!(graph.len(), 2);
        assert_eq!(graph.get("step2").unwrap().len(), 1);
    }

    #[test]
    fn test_circular_dependency_detection() {
        let engine = engine_in("voirs_engine_test_3");

        // step1 and step2 depend on each other.
        let mut workflow = Workflow::new("test", "1.0", "Test");
        workflow.add_step(command_step("step1", &["step2"]));
        workflow.add_step(command_step("step2", &["step1"]));

        let result = engine.build_dependency_graph(&workflow);
        assert!(result.is_err());
    }

    #[test]
    fn test_dependencies_satisfied() {
        let engine = engine_in("voirs_engine_test_4");
        let step = command_step("step2", &["step1"]);

        let mut completed = HashSet::new();
        assert!(!engine.dependencies_satisfied(&step, &completed));

        completed.insert("step1".to_string());
        assert!(engine.dependencies_satisfied(&step, &completed));
    }
}