1use super::{
7 definition::{Step, Workflow},
8 executor::{ExecutionContext, ExecutionResult, StepExecutor},
9 state::{ExecutionState, StateManager},
10 validation::WorkflowValidator,
11 WorkflowStats,
12};
13use crate::error::CliError;
14
15type Result<T> = std::result::Result<T, CliError>;
16use std::collections::{HashMap, HashSet};
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Instant;
20use tokio::sync::{RwLock, Semaphore};
21
22pub struct WorkflowEngine {
24 executor: Arc<StepExecutor>,
26 state_manager: Arc<StateManager>,
28 max_parallel: usize,
30 validator: WorkflowValidator,
32}
33
34impl WorkflowEngine {
35 pub fn new(state_dir: PathBuf, max_parallel: usize) -> Self {
37 Self {
38 executor: Arc::new(StepExecutor::new()),
39 state_manager: Arc::new(StateManager::new(state_dir)),
40 max_parallel,
41 validator: WorkflowValidator::new(),
42 }
43 }
44
45 pub async fn execute(&self, workflow: Workflow) -> Result<ExecutionResult> {
47 self.validator.validate(&workflow)?;
49
50 let start_time = Instant::now();
51 let workflow_name = workflow.metadata.name.clone();
52
53 let mut context = ExecutionContext::new(workflow.clone());
55
56 if workflow.config.save_state {
58 if let Ok(state) = self.state_manager.load(&workflow_name).await {
59 context.resume_from_state(state);
60 }
61 }
62
63 let result = self.execute_steps(&workflow, &mut context).await;
65
66 if workflow.config.save_state {
68 let state = context.get_state();
69 let _ = self.state_manager.save(&workflow_name, &state).await;
70 }
71
72 let duration = start_time.elapsed();
73
74 let stats = WorkflowStats {
76 total_steps: context.completed_steps().len(),
77 successful_steps: context
78 .completed_steps()
79 .iter()
80 .filter(|(_, r)| r.success)
81 .count(),
82 failed_steps: context
83 .completed_steps()
84 .iter()
85 .filter(|(_, r)| !r.success)
86 .count(),
87 skipped_steps: context.skipped_steps().len(),
88 total_duration_ms: duration.as_millis() as u64,
89 avg_step_duration_ms: if !context.completed_steps().is_empty() {
90 duration.as_millis() as u64 / context.completed_steps().len() as u64
91 } else {
92 0
93 },
94 total_retries: context.total_retries(),
95 };
96
97 match result {
98 Ok(_) => Ok(ExecutionResult::success(
99 workflow_name,
100 "Workflow completed successfully".to_string(),
101 stats,
102 )),
103 Err(e) => Ok(ExecutionResult::failure(
104 workflow_name,
105 format!("Workflow failed: {}", e),
106 stats,
107 )),
108 }
109 }
110
111 async fn execute_steps(
113 &self,
114 workflow: &Workflow,
115 context: &mut ExecutionContext,
116 ) -> Result<()> {
117 let semaphore = Arc::new(Semaphore::new(
118 workflow.config.max_parallel.min(self.max_parallel),
119 ));
120
121 let dep_graph = self.build_dependency_graph(workflow)?;
123
124 let completed = Arc::new(RwLock::new(HashSet::new()));
126
127 let mut pending_steps: Vec<&Step> = workflow.steps.iter().collect();
129
130 while !pending_steps.is_empty() {
131 let ready_steps: Vec<&Step> = pending_steps
133 .iter()
134 .filter(|step| {
135 if context.completed_steps().contains_key(&step.name) {
137 return false;
138 }
139
140 let completed_set = {
143 let guard = completed.blocking_read();
144 guard.clone()
145 };
146 self.dependencies_satisfied(step, &completed_set)
147 })
148 .copied()
149 .collect();
150
151 if ready_steps.is_empty() {
152 if context.completed_steps().len() + context.skipped_steps().len()
154 < workflow.steps.len()
155 {
156 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
157 continue;
158 }
159 break;
160 }
161
162 let mut tasks = Vec::new();
164
165 for step in ready_steps {
166 let step_clone = step.clone();
167 let context_clone = Arc::new(RwLock::new(context.clone()));
168 let executor = self.executor.clone();
169 let semaphore = semaphore.clone();
170 let completed = completed.clone();
171 let state_manager = self.state_manager.clone();
172 let workflow_name = workflow.metadata.name.clone();
173 let save_state = workflow.config.save_state;
174
175 let task = tokio::spawn(async move {
176 let _permit = semaphore.acquire().await.unwrap();
177
178 let mut ctx = context_clone.write().await;
179
180 if let Some(ref condition) = step_clone.condition {
182 let variables = ctx.get_variables();
183 if !condition.evaluate(&variables) {
184 ctx.skip_step(&step_clone.name, "Condition not met");
185 return Ok(());
186 }
187 }
188
189 let result = executor.execute_step(&step_clone, &mut ctx).await?;
191
192 if result.success {
193 completed.write().await.insert(step_clone.name.clone());
194 } else if !ctx.workflow().config.continue_on_error {
195 return Err(CliError::Workflow(format!(
196 "Step '{}' failed: {}",
197 step_clone.name, result.message
198 )));
199 }
200
201 if save_state {
203 let state = ctx.get_state();
204 let _ = state_manager.save(&workflow_name, &state).await;
205 }
206
207 Ok::<(), CliError>(())
208 });
209
210 tasks.push(task);
211 }
212
213 for task in tasks {
215 task.await
216 .map_err(|e| CliError::Workflow(format!("Task failed: {}", e)))??;
217 }
218
219 pending_steps.retain(|step| {
221 !context.completed_steps().contains_key(&step.name)
222 && !context.skipped_steps().contains(&step.name)
223 });
224 }
225
226 Ok(())
227 }
228
229 fn build_dependency_graph(&self, workflow: &Workflow) -> Result<HashMap<String, Vec<String>>> {
231 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
232
233 for step in &workflow.steps {
234 let deps: Vec<String> = step
235 .depends_on
236 .iter()
237 .map(|d| d.step_name.clone())
238 .collect();
239 graph.insert(step.name.clone(), deps);
240 }
241
242 let mut visited = HashSet::new();
244 let mut recursion_stack = HashSet::new();
245
246 for step in &workflow.steps {
247 if self.has_cycle(&step.name, &graph, &mut visited, &mut recursion_stack) {
248 return Err(CliError::Workflow(format!(
249 "Circular dependency detected involving step '{}'",
250 step.name
251 )));
252 }
253 }
254
255 Ok(graph)
256 }
257
258 fn has_cycle(
260 &self,
261 node: &str,
262 graph: &HashMap<String, Vec<String>>,
263 visited: &mut HashSet<String>,
264 recursion_stack: &mut HashSet<String>,
265 ) -> bool {
266 if recursion_stack.contains(node) {
267 return true;
268 }
269
270 if visited.contains(node) {
271 return false;
272 }
273
274 visited.insert(node.to_string());
275 recursion_stack.insert(node.to_string());
276
277 if let Some(neighbors) = graph.get(node) {
278 for neighbor in neighbors {
279 if self.has_cycle(neighbor, graph, visited, recursion_stack) {
280 return true;
281 }
282 }
283 }
284
285 recursion_stack.remove(node);
286 false
287 }
288
289 fn dependencies_satisfied(&self, step: &Step, completed: &HashSet<String>) -> bool {
291 step.depends_on
292 .iter()
293 .all(|dep| completed.contains(&dep.step_name))
294 }
295
296 pub async fn stop(&self, workflow_name: &str) -> Result<()> {
298 if let Ok(state) = self.state_manager.load(workflow_name).await {
300 if state.state == ExecutionState::Running {
301 let mut updated_state = state;
302 updated_state.state = ExecutionState::Stopped;
303 self.state_manager
304 .save(workflow_name, &updated_state)
305 .await?;
306 }
307 }
308
309 Ok(())
310 }
311
312 pub async fn resume(&self, workflow: Workflow) -> Result<ExecutionResult> {
314 self.execute(workflow).await
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322 use crate::workflow::definition::{StepDependency, StepType};
323 use std::env;
324
325 fn create_test_workflow() -> Workflow {
326 let mut workflow = Workflow::new("test-workflow", "1.0", "Test workflow");
327
328 let step1 = Step {
329 name: "step1".to_string(),
330 step_type: StepType::Command,
331 description: None,
332 parameters: HashMap::new(),
333 condition: None,
334 depends_on: Vec::new(),
335 retry: None,
336 for_each: None,
337 parallel: false,
338 };
339
340 workflow.add_step(step1);
341 workflow
342 }
343
344 #[tokio::test]
345 async fn test_engine_creation() {
346 let temp_dir = env::temp_dir().join("voirs_engine_test");
347 let engine = WorkflowEngine::new(temp_dir, 4);
348 assert_eq!(engine.max_parallel, 4);
349 }
350
351 #[test]
352 fn test_dependency_graph_building() {
353 let temp_dir = env::temp_dir().join("voirs_engine_test_2");
354 let engine = WorkflowEngine::new(temp_dir, 4);
355
356 let mut workflow = Workflow::new("test", "1.0", "Test");
357
358 let step1 = Step {
359 name: "step1".to_string(),
360 step_type: StepType::Command,
361 description: None,
362 parameters: HashMap::new(),
363 condition: None,
364 depends_on: Vec::new(),
365 retry: None,
366 for_each: None,
367 parallel: false,
368 };
369
370 let step2 = Step {
371 name: "step2".to_string(),
372 step_type: StepType::Command,
373 description: None,
374 parameters: HashMap::new(),
375 condition: None,
376 depends_on: vec![StepDependency {
377 step_name: "step1".to_string(),
378 must_succeed: true,
379 }],
380 retry: None,
381 for_each: None,
382 parallel: false,
383 };
384
385 workflow.add_step(step1);
386 workflow.add_step(step2);
387
388 let graph = engine.build_dependency_graph(&workflow).unwrap();
389 assert_eq!(graph.len(), 2);
390 assert_eq!(graph.get("step2").unwrap().len(), 1);
391 }
392
393 #[test]
394 fn test_circular_dependency_detection() {
395 let temp_dir = env::temp_dir().join("voirs_engine_test_3");
396 let engine = WorkflowEngine::new(temp_dir, 4);
397
398 let mut workflow = Workflow::new("test", "1.0", "Test");
399
400 let step1 = Step {
401 name: "step1".to_string(),
402 step_type: StepType::Command,
403 description: None,
404 parameters: HashMap::new(),
405 condition: None,
406 depends_on: vec![StepDependency {
407 step_name: "step2".to_string(),
408 must_succeed: true,
409 }],
410 retry: None,
411 for_each: None,
412 parallel: false,
413 };
414
415 let step2 = Step {
416 name: "step2".to_string(),
417 step_type: StepType::Command,
418 description: None,
419 parameters: HashMap::new(),
420 condition: None,
421 depends_on: vec![StepDependency {
422 step_name: "step1".to_string(),
423 must_succeed: true,
424 }],
425 retry: None,
426 for_each: None,
427 parallel: false,
428 };
429
430 workflow.add_step(step1);
431 workflow.add_step(step2);
432
433 let result = engine.build_dependency_graph(&workflow);
434 assert!(result.is_err());
435 }
436
437 #[test]
438 fn test_dependencies_satisfied() {
439 let temp_dir = env::temp_dir().join("voirs_engine_test_4");
440 let engine = WorkflowEngine::new(temp_dir, 4);
441
442 let step = Step {
443 name: "step2".to_string(),
444 step_type: StepType::Command,
445 description: None,
446 parameters: HashMap::new(),
447 condition: None,
448 depends_on: vec![StepDependency {
449 step_name: "step1".to_string(),
450 must_succeed: true,
451 }],
452 retry: None,
453 for_each: None,
454 parallel: false,
455 };
456
457 let mut completed = HashSet::new();
458 assert!(!engine.dependencies_satisfied(&step, &completed));
459
460 completed.insert("step1".to_string());
461 assert!(engine.dependencies_satisfied(&step, &completed));
462 }
463}