Skip to main content

voirs_cli/commands/
workflow.rs

1//! Workflow automation command implementations.
2//!
3//! This module provides commands for managing and executing declarative workflows
4//! for complex multi-step synthesis pipelines.
5
6use crate::error::Result;
7use crate::workflow::{
8    ExecutionState, StateManager, Workflow, WorkflowEngine, WorkflowRegistry, WorkflowValidator,
9};
10use clap::Subcommand;
11use std::path::PathBuf;
12
13/// Workflow automation commands
14#[derive(Subcommand)]
15pub enum WorkflowCommands {
16    /// Execute a workflow from a definition file
17    Execute {
18        /// Path to workflow definition file (YAML or JSON)
19        workflow_file: PathBuf,
20
21        /// Override workflow variables (key=value format)
22        #[arg(short = 'v', long = "var", value_parser = parse_key_val)]
23        variables: Vec<(String, String)>,
24
25        /// Maximum number of parallel steps
26        #[arg(long, default_value = "4")]
27        max_parallel: usize,
28
29        /// Resume from previous execution if available
30        #[arg(long)]
31        resume: bool,
32
33        /// Storage directory for workflow state
34        #[arg(long)]
35        state_dir: Option<PathBuf>,
36    },
37
38    /// Validate a workflow definition without executing
39    Validate {
40        /// Path to workflow definition file (YAML or JSON)
41        workflow_file: PathBuf,
42
43        /// Show detailed validation results
44        #[arg(long)]
45        detailed: bool,
46
47        /// Output format (text, json, yaml)
48        #[arg(long, default_value = "text")]
49        format: String,
50    },
51
52    /// List all registered workflows
53    List {
54        /// Workflow registry directory
55        #[arg(long)]
56        registry_dir: Option<PathBuf>,
57
58        /// Show detailed information
59        #[arg(long)]
60        detailed: bool,
61    },
62
63    /// Show status of a workflow execution
64    Status {
65        /// Workflow name
66        workflow_name: String,
67
68        /// Storage directory for workflow state
69        #[arg(long)]
70        state_dir: Option<PathBuf>,
71
72        /// Output format (text, json, yaml)
73        #[arg(long, default_value = "text")]
74        format: String,
75    },
76
77    /// Resume a failed or stopped workflow
78    Resume {
79        /// Workflow name to resume
80        workflow_name: String,
81
82        /// Storage directory for workflow state
83        #[arg(long)]
84        state_dir: Option<PathBuf>,
85
86        /// Maximum number of parallel steps
87        #[arg(long, default_value = "4")]
88        max_parallel: usize,
89    },
90
91    /// Stop a running workflow
92    Stop {
93        /// Workflow name to stop
94        workflow_name: String,
95
96        /// Storage directory for workflow state
97        #[arg(long)]
98        state_dir: Option<PathBuf>,
99
100        /// Force stop without saving state
101        #[arg(long)]
102        force: bool,
103    },
104}
105
/// Parse a `KEY=value` pair for variable overrides.
///
/// The input is split at the first `=`. Everything after it, including any
/// further `=` characters, becomes the value, which may be empty.
///
/// # Errors
///
/// Returns a human-readable message when `s` contains no `=`, or when the
/// key in front of the `=` is empty.
fn parse_key_val(s: &str) -> std::result::Result<(String, String), String> {
    let (key, value) = s
        .split_once('=')
        .ok_or_else(|| format!("Invalid KEY=value: no `=` found in `{}`", s))?;

    // A variable without a name cannot be referred to, so reject it here
    // rather than storing it under the empty string.
    if key.is_empty() {
        return Err(format!("Invalid KEY=value: empty key in `{}`", s));
    }

    Ok((key.to_string(), value.to_string()))
}
113
114/// Execute a workflow from a definition file
115pub async fn run_workflow_execute(
116    workflow_file: PathBuf,
117    variables: Vec<(String, String)>,
118    max_parallel: usize,
119    resume: bool,
120    state_dir: Option<PathBuf>,
121) -> Result<()> {
122    println!("Loading workflow from: {}", workflow_file.display());
123
124    // Load workflow definition
125    let mut workflow = Workflow::load_from_file(&workflow_file).await?;
126
127    // Apply variable overrides
128    for (key, value) in variables {
129        workflow
130            .variables
131            .insert(key.clone(), crate::workflow::Variable::String(value));
132    }
133
134    println!("Workflow: {}", workflow.metadata.name);
135    println!("Version: {}", workflow.metadata.version);
136    if !workflow.metadata.description.is_empty() {
137        println!("Description: {}", workflow.metadata.description);
138    }
139    println!("Steps: {}", workflow.steps.len());
140    println!();
141
142    // Determine state directory
143    let state_dir = state_dir.unwrap_or_else(|| {
144        std::env::current_dir()
145            .unwrap()
146            .join(".voirs")
147            .join("workflow_state")
148    });
149
150    // Check if we should resume
151    if resume {
152        let state_manager = StateManager::new(state_dir.clone());
153        if state_manager.exists(&workflow.metadata.name).await {
154            println!("Resuming workflow from previous state...");
155            println!();
156        }
157    }
158
159    // Create workflow engine
160    let engine = WorkflowEngine::new(state_dir, max_parallel);
161
162    // Execute workflow
163    println!("Executing workflow...");
164    println!("─────────────────────────────────────────────────");
165
166    let result = engine.execute(workflow).await?;
167
168    println!("─────────────────────────────────────────────────");
169    println!();
170    println!("Workflow execution completed!");
171    println!();
172    println!("Statistics:");
173    println!("  Total steps: {}", result.stats.total_steps);
174    println!("  Successful: {}", result.stats.successful_steps);
175    println!("  Failed: {}", result.stats.failed_steps);
176    println!("  Skipped: {}", result.stats.skipped_steps);
177    println!("  Total duration: {}ms", result.stats.total_duration_ms);
178    println!(
179        "  Average step duration: {}ms",
180        result.stats.avg_step_duration_ms
181    );
182    println!("  Total retries: {}", result.stats.total_retries);
183    println!(
184        "  Success rate: {:.1}%",
185        result.stats.success_rate() * 100.0
186    );
187
188    if result.stats.is_successful() {
189        println!();
190        println!("✓ All steps completed successfully");
191    } else {
192        println!();
193        println!("✗ Some steps failed");
194        return Err(crate::error::CliError::Workflow(
195            "Workflow execution had failures".to_string(),
196        ));
197    }
198
199    Ok(())
200}
201
202/// Validate a workflow definition
203pub async fn run_workflow_validate(
204    workflow_file: PathBuf,
205    detailed: bool,
206    format: String,
207) -> Result<()> {
208    println!("Validating workflow: {}", workflow_file.display());
209    println!();
210
211    // Load workflow definition
212    let workflow = Workflow::load_from_file(&workflow_file).await?;
213
214    // Create validator
215    let validator = WorkflowValidator::new();
216
217    // Validate workflow
218    let result = validator.validate(&workflow)?;
219
220    // Output results based on format
221    match format.as_str() {
222        "json" => {
223            let json = serde_json::to_string_pretty(&result)?;
224            println!("{}", json);
225        }
226        "yaml" => {
227            let yaml = serde_yaml::to_string(&result).map_err(|e| {
228                crate::error::CliError::SerializationError(format!(
229                    "Failed to serialize to YAML: {}",
230                    e
231                ))
232            })?;
233            println!("{}", yaml);
234        }
235        _ => {
236            // Text format
237            if result.valid {
238                println!("✓ Workflow validation passed");
239            } else {
240                println!("✗ Workflow validation failed");
241            }
242            println!();
243
244            if result.has_errors() {
245                println!("Errors:");
246                for error in &result.errors {
247                    println!("  - {}", error);
248                }
249                println!();
250            }
251
252            if result.has_warnings() {
253                println!("Warnings:");
254                for warning in &result.warnings {
255                    println!("  - {}", warning);
256                }
257                println!();
258            }
259
260            if detailed && result.valid {
261                println!("Workflow Details:");
262                println!("  Name: {}", workflow.metadata.name);
263                println!("  Version: {}", workflow.metadata.version);
264                if !workflow.metadata.description.is_empty() {
265                    println!("  Description: {}", workflow.metadata.description);
266                }
267                println!("  Steps: {}", workflow.steps.len());
268                println!("  Variables: {}", workflow.variables.len());
269                println!("  Max parallel: {}", workflow.config.max_parallel);
270            }
271        }
272    }
273
274    if !result.valid {
275        return Err(crate::error::CliError::Workflow(
276            "Workflow validation failed".to_string(),
277        ));
278    }
279
280    Ok(())
281}
282
283/// List all registered workflows
284pub async fn run_workflow_list(registry_dir: Option<PathBuf>, detailed: bool) -> Result<()> {
285    // Determine registry directory
286    let registry_dir = registry_dir.unwrap_or_else(|| {
287        std::env::current_dir()
288            .unwrap()
289            .join(".voirs")
290            .join("workflows")
291    });
292
293    let registry = WorkflowRegistry::new(registry_dir.clone());
294
295    // Load workflows from directory
296    let count = registry.load_from_directory().await?;
297
298    if count == 0 {
299        println!("No workflows found in: {}", registry_dir.display());
300        println!();
301        println!("Create workflow definitions in this directory or specify a different path with --registry-dir");
302        return Ok(());
303    }
304
305    println!("Registered workflows ({} found):", count);
306    println!();
307
308    let workflow_names = registry.list().await;
309
310    for name in workflow_names {
311        if let Some(workflow) = registry.get(&name).await {
312            println!("  • {}", workflow.metadata.name);
313            if detailed {
314                println!("    Version: {}", workflow.metadata.version);
315                if !workflow.metadata.description.is_empty() {
316                    println!("    Description: {}", workflow.metadata.description);
317                }
318                println!("    Steps: {}", workflow.steps.len());
319                println!("    Variables: {}", workflow.variables.len());
320                println!();
321            }
322        }
323    }
324
325    Ok(())
326}
327
328/// Show status of a workflow execution
329pub async fn run_workflow_status(
330    workflow_name: String,
331    state_dir: Option<PathBuf>,
332    format: String,
333) -> Result<()> {
334    // Determine state directory
335    let state_dir = state_dir.unwrap_or_else(|| {
336        std::env::current_dir()
337            .unwrap()
338            .join(".voirs")
339            .join("workflow_state")
340    });
341
342    let state_manager = StateManager::new(state_dir);
343
344    // Check if state exists
345    if !state_manager.exists(&workflow_name).await {
346        return Err(crate::error::CliError::Workflow(format!(
347            "No state found for workflow: {}",
348            workflow_name
349        )));
350    }
351
352    // Load state
353    let state = state_manager.load(&workflow_name).await?;
354
355    // Output based on format
356    match format.as_str() {
357        "json" => {
358            let json = serde_json::to_string_pretty(&state)?;
359            println!("{}", json);
360        }
361        "yaml" => {
362            let yaml = serde_yaml::to_string(&state).map_err(|e| {
363                crate::error::CliError::SerializationError(format!(
364                    "Failed to serialize to YAML: {}",
365                    e
366                ))
367            })?;
368            println!("{}", yaml);
369        }
370        _ => {
371            // Text format
372            println!("Workflow: {}", state.workflow_name);
373            println!("State: {:?}", state.state);
374            println!();
375
376            println!("Progress:");
377            println!("  Completed steps: {}", state.completed_steps.len());
378            println!("  Skipped steps: {}", state.skipped_steps.len());
379            if let Some(ref current) = state.current_step {
380                println!("  Current step: {}", current);
381            }
382            println!("  Total retries: {}", state.total_retries);
383            println!();
384
385            println!("Variables: {}", state.variables.len());
386            for (key, value) in &state.variables {
387                println!("  {}: {:?}", key, value);
388            }
389            println!();
390
391            println!(
392                "Last updated: {}",
393                state.last_updated.format("%Y-%m-%d %H:%M:%S UTC")
394            );
395            println!();
396
397            if state.can_resume() {
398                println!("✓ This workflow can be resumed");
399            } else {
400                println!(
401                    "✗ This workflow cannot be resumed (state: {:?})",
402                    state.state
403                );
404            }
405        }
406    }
407
408    Ok(())
409}
410
411/// Resume a failed or stopped workflow
412pub async fn run_workflow_resume(
413    workflow_name: String,
414    state_dir: Option<PathBuf>,
415    max_parallel: usize,
416) -> Result<()> {
417    // Determine state directory
418    let state_dir = state_dir.unwrap_or_else(|| {
419        std::env::current_dir()
420            .unwrap()
421            .join(".voirs")
422            .join("workflow_state")
423    });
424
425    let state_manager = StateManager::new(state_dir.clone());
426
427    // Check if state exists
428    if !state_manager.exists(&workflow_name).await {
429        return Err(crate::error::CliError::Workflow(format!(
430            "No state found for workflow: {}",
431            workflow_name
432        )));
433    }
434
435    // Load state
436    let state = state_manager.load(&workflow_name).await?;
437
438    // Check if workflow can be resumed
439    if !state.can_resume() {
440        return Err(crate::error::CliError::Workflow(format!(
441            "Workflow '{}' cannot be resumed (current state: {:?})",
442            workflow_name, state.state
443        )));
444    }
445
446    println!("Resuming workflow: {}", workflow_name);
447    println!("Current state: {:?}", state.state);
448    println!("Completed steps: {}", state.completed_steps.len());
449    println!();
450
451    // Note: Resume functionality would require loading the original workflow definition
452    // and passing the existing state to the engine. This is a simplified version.
453    println!("⚠ Resume functionality requires the original workflow definition file");
454    println!("  Use: voirs workflow execute <workflow-file> --resume");
455
456    Ok(())
457}
458
459/// Stop a running workflow
460pub async fn run_workflow_stop(
461    workflow_name: String,
462    state_dir: Option<PathBuf>,
463    force: bool,
464) -> Result<()> {
465    // Determine state directory
466    let state_dir = state_dir.unwrap_or_else(|| {
467        std::env::current_dir()
468            .unwrap()
469            .join(".voirs")
470            .join("workflow_state")
471    });
472
473    let state_manager = StateManager::new(state_dir);
474
475    // Check if state exists
476    if !state_manager.exists(&workflow_name).await {
477        return Err(crate::error::CliError::Workflow(format!(
478            "No state found for workflow: {}",
479            workflow_name
480        )));
481    }
482
483    // Load state
484    let mut state = state_manager.load(&workflow_name).await?;
485
486    // Check if workflow is running
487    if state.state != ExecutionState::Running {
488        println!(
489            "Workflow '{}' is not running (state: {:?})",
490            workflow_name, state.state
491        );
492        return Ok(());
493    }
494
495    if force {
496        // Force stop - delete state
497        state_manager.delete(&workflow_name).await?;
498        println!(
499            "✓ Workflow '{}' forcefully stopped (state deleted)",
500            workflow_name
501        );
502    } else {
503        // Graceful stop - mark as stopped
504        state.state = ExecutionState::Stopped;
505        state.last_updated = chrono::Utc::now();
506        state_manager.save(&workflow_name, &state).await?;
507        println!("✓ Workflow '{}' stopped gracefully", workflow_name);
508        println!("  State saved for potential resume");
509    }
510
511    Ok(())
512}