use crate::error::Result;
use crate::workflow::{
ExecutionState, StateManager, Workflow, WorkflowEngine, WorkflowRegistry, WorkflowValidator,
};
use clap::Subcommand;
use std::path::PathBuf;
#[derive(Subcommand)]
pub enum WorkflowCommands {
Execute {
workflow_file: PathBuf,
#[arg(short = 'v', long = "var", value_parser = parse_key_val)]
variables: Vec<(String, String)>,
#[arg(long, default_value = "4")]
max_parallel: usize,
#[arg(long)]
resume: bool,
#[arg(long)]
state_dir: Option<PathBuf>,
},
Validate {
workflow_file: PathBuf,
#[arg(long)]
detailed: bool,
#[arg(long, default_value = "text")]
format: String,
},
List {
#[arg(long)]
registry_dir: Option<PathBuf>,
#[arg(long)]
detailed: bool,
},
Status {
workflow_name: String,
#[arg(long)]
state_dir: Option<PathBuf>,
#[arg(long, default_value = "text")]
format: String,
},
Resume {
workflow_name: String,
#[arg(long)]
state_dir: Option<PathBuf>,
#[arg(long, default_value = "4")]
max_parallel: usize,
},
Stop {
workflow_name: String,
#[arg(long)]
state_dir: Option<PathBuf>,
#[arg(long)]
force: bool,
},
}
fn parse_key_val(s: &str) -> std::result::Result<(String, String), String> {
let pos = s
.find('=')
.ok_or_else(|| format!("Invalid KEY=value: no `=` found in `{}`", s))?;
Ok((s[..pos].to_string(), s[pos + 1..].to_string()))
}
pub async fn run_workflow_execute(
workflow_file: PathBuf,
variables: Vec<(String, String)>,
max_parallel: usize,
resume: bool,
state_dir: Option<PathBuf>,
) -> Result<()> {
println!("Loading workflow from: {}", workflow_file.display());
let mut workflow = Workflow::load_from_file(&workflow_file).await?;
for (key, value) in variables {
workflow
.variables
.insert(key.clone(), crate::workflow::Variable::String(value));
}
println!("Workflow: {}", workflow.metadata.name);
println!("Version: {}", workflow.metadata.version);
if !workflow.metadata.description.is_empty() {
println!("Description: {}", workflow.metadata.description);
}
println!("Steps: {}", workflow.steps.len());
println!();
let state_dir = state_dir.unwrap_or_else(|| {
std::env::current_dir()
.expect("current dir should be accessible")
.join(".voirs")
.join("workflow_state")
});
if resume {
let state_manager = StateManager::new(state_dir.clone());
if state_manager.exists(&workflow.metadata.name).await {
println!("Resuming workflow from previous state...");
println!();
}
}
let engine = WorkflowEngine::new(state_dir, max_parallel);
println!("Executing workflow...");
println!("─────────────────────────────────────────────────");
let result = engine.execute(workflow).await?;
println!("─────────────────────────────────────────────────");
println!();
println!("Workflow execution completed!");
println!();
println!("Statistics:");
println!(" Total steps: {}", result.stats.total_steps);
println!(" Successful: {}", result.stats.successful_steps);
println!(" Failed: {}", result.stats.failed_steps);
println!(" Skipped: {}", result.stats.skipped_steps);
println!(" Total duration: {}ms", result.stats.total_duration_ms);
println!(
" Average step duration: {}ms",
result.stats.avg_step_duration_ms
);
println!(" Total retries: {}", result.stats.total_retries);
println!(
" Success rate: {:.1}%",
result.stats.success_rate() * 100.0
);
if result.stats.is_successful() {
println!();
println!("✓ All steps completed successfully");
} else {
println!();
println!("✗ Some steps failed");
return Err(crate::error::CliError::Workflow(
"Workflow execution had failures".to_string(),
));
}
Ok(())
}
pub async fn run_workflow_validate(
workflow_file: PathBuf,
detailed: bool,
format: String,
) -> Result<()> {
println!("Validating workflow: {}", workflow_file.display());
println!();
let workflow = Workflow::load_from_file(&workflow_file).await?;
let validator = WorkflowValidator::new();
let result = validator.validate(&workflow)?;
match format.as_str() {
"json" => {
let json = serde_json::to_string_pretty(&result)?;
println!("{}", json);
}
"yaml" => {
let yaml = serde_yaml::to_string(&result).map_err(|e| {
crate::error::CliError::SerializationError(format!(
"Failed to serialize to YAML: {}",
e
))
})?;
println!("{}", yaml);
}
_ => {
if result.valid {
println!("✓ Workflow validation passed");
} else {
println!("✗ Workflow validation failed");
}
println!();
if result.has_errors() {
println!("Errors:");
for error in &result.errors {
println!(" - {}", error);
}
println!();
}
if result.has_warnings() {
println!("Warnings:");
for warning in &result.warnings {
println!(" - {}", warning);
}
println!();
}
if detailed && result.valid {
println!("Workflow Details:");
println!(" Name: {}", workflow.metadata.name);
println!(" Version: {}", workflow.metadata.version);
if !workflow.metadata.description.is_empty() {
println!(" Description: {}", workflow.metadata.description);
}
println!(" Steps: {}", workflow.steps.len());
println!(" Variables: {}", workflow.variables.len());
println!(" Max parallel: {}", workflow.config.max_parallel);
}
}
}
if !result.valid {
return Err(crate::error::CliError::Workflow(
"Workflow validation failed".to_string(),
));
}
Ok(())
}
pub async fn run_workflow_list(registry_dir: Option<PathBuf>, detailed: bool) -> Result<()> {
let registry_dir = registry_dir.unwrap_or_else(|| {
std::env::current_dir()
.expect("current dir should be accessible")
.join(".voirs")
.join("workflows")
});
let registry = WorkflowRegistry::new(registry_dir.clone());
let count = registry.load_from_directory().await?;
if count == 0 {
println!("No workflows found in: {}", registry_dir.display());
println!();
println!("Create workflow definitions in this directory or specify a different path with --registry-dir");
return Ok(());
}
println!("Registered workflows ({} found):", count);
println!();
let workflow_names = registry.list().await;
for name in workflow_names {
if let Some(workflow) = registry.get(&name).await {
println!(" • {}", workflow.metadata.name);
if detailed {
println!(" Version: {}", workflow.metadata.version);
if !workflow.metadata.description.is_empty() {
println!(" Description: {}", workflow.metadata.description);
}
println!(" Steps: {}", workflow.steps.len());
println!(" Variables: {}", workflow.variables.len());
println!();
}
}
}
Ok(())
}
pub async fn run_workflow_status(
workflow_name: String,
state_dir: Option<PathBuf>,
format: String,
) -> Result<()> {
let state_dir = state_dir.unwrap_or_else(|| {
std::env::current_dir()
.expect("current dir should be accessible")
.join(".voirs")
.join("workflow_state")
});
let state_manager = StateManager::new(state_dir);
if !state_manager.exists(&workflow_name).await {
return Err(crate::error::CliError::Workflow(format!(
"No state found for workflow: {}",
workflow_name
)));
}
let state = state_manager.load(&workflow_name).await?;
match format.as_str() {
"json" => {
let json = serde_json::to_string_pretty(&state)?;
println!("{}", json);
}
"yaml" => {
let yaml = serde_yaml::to_string(&state).map_err(|e| {
crate::error::CliError::SerializationError(format!(
"Failed to serialize to YAML: {}",
e
))
})?;
println!("{}", yaml);
}
_ => {
println!("Workflow: {}", state.workflow_name);
println!("State: {:?}", state.state);
println!();
println!("Progress:");
println!(" Completed steps: {}", state.completed_steps.len());
println!(" Skipped steps: {}", state.skipped_steps.len());
if let Some(ref current) = state.current_step {
println!(" Current step: {}", current);
}
println!(" Total retries: {}", state.total_retries);
println!();
println!("Variables: {}", state.variables.len());
for (key, value) in &state.variables {
println!(" {}: {:?}", key, value);
}
println!();
println!(
"Last updated: {}",
state.last_updated.format("%Y-%m-%d %H:%M:%S UTC")
);
println!();
if state.can_resume() {
println!("✓ This workflow can be resumed");
} else {
println!(
"✗ This workflow cannot be resumed (state: {:?})",
state.state
);
}
}
}
Ok(())
}
pub async fn run_workflow_resume(
workflow_name: String,
state_dir: Option<PathBuf>,
max_parallel: usize,
) -> Result<()> {
let state_dir = state_dir.unwrap_or_else(|| {
std::env::current_dir()
.expect("current dir should be accessible")
.join(".voirs")
.join("workflow_state")
});
let state_manager = StateManager::new(state_dir.clone());
if !state_manager.exists(&workflow_name).await {
return Err(crate::error::CliError::Workflow(format!(
"No state found for workflow: {}",
workflow_name
)));
}
let state = state_manager.load(&workflow_name).await?;
if !state.can_resume() {
return Err(crate::error::CliError::Workflow(format!(
"Workflow '{}' cannot be resumed (current state: {:?})",
workflow_name, state.state
)));
}
println!("Resuming workflow: {}", workflow_name);
println!("Current state: {:?}", state.state);
println!("Completed steps: {}", state.completed_steps.len());
println!();
println!("⚠ Resume functionality requires the original workflow definition file");
println!(" Use: voirs workflow execute <workflow-file> --resume");
Ok(())
}
pub async fn run_workflow_stop(
workflow_name: String,
state_dir: Option<PathBuf>,
force: bool,
) -> Result<()> {
let state_dir = state_dir.unwrap_or_else(|| {
std::env::current_dir()
.expect("current dir should be accessible")
.join(".voirs")
.join("workflow_state")
});
let state_manager = StateManager::new(state_dir);
if !state_manager.exists(&workflow_name).await {
return Err(crate::error::CliError::Workflow(format!(
"No state found for workflow: {}",
workflow_name
)));
}
let mut state = state_manager.load(&workflow_name).await?;
if state.state != ExecutionState::Running {
println!(
"Workflow '{}' is not running (state: {:?})",
workflow_name, state.state
);
return Ok(());
}
if force {
state_manager.delete(&workflow_name).await?;
println!(
"✓ Workflow '{}' forcefully stopped (state deleted)",
workflow_name
);
} else {
state.state = ExecutionState::Stopped;
state.last_updated = chrono::Utc::now();
state_manager.save(&workflow_name, &state).await?;
println!("✓ Workflow '{}' stopped gracefully", workflow_name);
println!(" State saved for potential resume");
}
Ok(())
}