1use crate::error::Result;
7use crate::workflow::{
8 ExecutionState, StateManager, Workflow, WorkflowEngine, WorkflowRegistry, WorkflowValidator,
9};
10use clap::Subcommand;
11use std::path::PathBuf;
12
/// Subcommands of the workflow command group.
///
/// The fields of each variant mirror the parameters of the corresponding
/// `run_workflow_*` function in this file.
// Note: clap's derive turns `///` comments on variants and fields into help text.
#[derive(Subcommand)]
pub enum WorkflowCommands {
    /// Load a workflow definition file and execute it.
    Execute {
        /// Path to the workflow definition file.
        workflow_file: PathBuf,

        /// Workflow variable in `KEY=value` form (split on the first `=`).
        #[arg(short = 'v', long = "var", value_parser = parse_key_val)]
        variables: Vec<(String, String)>,

        /// Parallelism limit passed to the workflow engine.
        #[arg(long, default_value = "4")]
        max_parallel: usize,

        // NOTE(review): the handler in this file only prints a notice when saved
        // state exists; the resume logic itself is presumably in the engine — confirm.
        /// Resume from previously saved state, if any exists for this workflow.
        #[arg(long)]
        resume: bool,

        /// Directory holding workflow state (defaults to `.voirs/workflow_state`
        /// under the current directory).
        #[arg(long)]
        state_dir: Option<PathBuf>,
    },

    /// Load a workflow definition file and validate it without executing it.
    Validate {
        /// Path to the workflow definition file.
        workflow_file: PathBuf,

        /// In text format, also print workflow details when validation passes.
        #[arg(long)]
        detailed: bool,

        /// Output format: `text`, `json` or `yaml` (any other value is treated as `text`).
        #[arg(long, default_value = "text")]
        format: String,
    },

    /// List the workflows found in a registry directory.
    List {
        /// Directory containing workflow definitions (defaults to `.voirs/workflows`
        /// under the current directory).
        #[arg(long)]
        registry_dir: Option<PathBuf>,

        /// Also print version, description, step count and variable count per workflow.
        #[arg(long)]
        detailed: bool,
    },

    /// Show the saved execution state of a workflow.
    Status {
        /// Name of the workflow whose saved state is looked up.
        workflow_name: String,

        /// Directory holding workflow state (defaults to `.voirs/workflow_state`
        /// under the current directory).
        #[arg(long)]
        state_dir: Option<PathBuf>,

        /// Output format: `text`, `json` or `yaml` (any other value is treated as `text`).
        #[arg(long, default_value = "text")]
        format: String,
    },

    /// Check whether a workflow's saved state can be resumed and print how to resume it.
    Resume {
        /// Name of the workflow whose saved state is looked up.
        workflow_name: String,

        /// Directory holding workflow state (defaults to `.voirs/workflow_state`
        /// under the current directory).
        #[arg(long)]
        state_dir: Option<PathBuf>,

        // NOTE(review): the handler in this file never reads this value — confirm
        // whether it is meant to be used once resume is fully implemented.
        /// Parallelism limit.
        #[arg(long, default_value = "4")]
        max_parallel: usize,
    },

    /// Mark a workflow whose saved state is `Running` as stopped.
    Stop {
        /// Name of the workflow whose saved state is looked up.
        workflow_name: String,

        /// Directory holding workflow state (defaults to `.voirs/workflow_state`
        /// under the current directory).
        #[arg(long)]
        state_dir: Option<PathBuf>,

        /// Delete the saved state instead of marking it as stopped.
        #[arg(long)]
        force: bool,
    },
}
105
/// Parses a `KEY=value` command-line argument into a `(key, value)` pair.
///
/// The input is split at the first `=`; any further `=` characters stay in the
/// value. Either side may be empty.
///
/// # Errors
/// Returns a descriptive message when the input contains no `=`.
fn parse_key_val(s: &str) -> std::result::Result<(String, String), String> {
    match s.split_once('=') {
        Some((key, value)) => Ok((key.to_owned(), value.to_owned())),
        None => Err(format!("Invalid KEY=value: no `=` found in `{}`", s)),
    }
}
113
114pub async fn run_workflow_execute(
116 workflow_file: PathBuf,
117 variables: Vec<(String, String)>,
118 max_parallel: usize,
119 resume: bool,
120 state_dir: Option<PathBuf>,
121) -> Result<()> {
122 println!("Loading workflow from: {}", workflow_file.display());
123
124 let mut workflow = Workflow::load_from_file(&workflow_file).await?;
126
127 for (key, value) in variables {
129 workflow
130 .variables
131 .insert(key.clone(), crate::workflow::Variable::String(value));
132 }
133
134 println!("Workflow: {}", workflow.metadata.name);
135 println!("Version: {}", workflow.metadata.version);
136 if !workflow.metadata.description.is_empty() {
137 println!("Description: {}", workflow.metadata.description);
138 }
139 println!("Steps: {}", workflow.steps.len());
140 println!();
141
142 let state_dir = state_dir.unwrap_or_else(|| {
144 std::env::current_dir()
145 .unwrap()
146 .join(".voirs")
147 .join("workflow_state")
148 });
149
150 if resume {
152 let state_manager = StateManager::new(state_dir.clone());
153 if state_manager.exists(&workflow.metadata.name).await {
154 println!("Resuming workflow from previous state...");
155 println!();
156 }
157 }
158
159 let engine = WorkflowEngine::new(state_dir, max_parallel);
161
162 println!("Executing workflow...");
164 println!("─────────────────────────────────────────────────");
165
166 let result = engine.execute(workflow).await?;
167
168 println!("─────────────────────────────────────────────────");
169 println!();
170 println!("Workflow execution completed!");
171 println!();
172 println!("Statistics:");
173 println!(" Total steps: {}", result.stats.total_steps);
174 println!(" Successful: {}", result.stats.successful_steps);
175 println!(" Failed: {}", result.stats.failed_steps);
176 println!(" Skipped: {}", result.stats.skipped_steps);
177 println!(" Total duration: {}ms", result.stats.total_duration_ms);
178 println!(
179 " Average step duration: {}ms",
180 result.stats.avg_step_duration_ms
181 );
182 println!(" Total retries: {}", result.stats.total_retries);
183 println!(
184 " Success rate: {:.1}%",
185 result.stats.success_rate() * 100.0
186 );
187
188 if result.stats.is_successful() {
189 println!();
190 println!("✓ All steps completed successfully");
191 } else {
192 println!();
193 println!("✗ Some steps failed");
194 return Err(crate::error::CliError::Workflow(
195 "Workflow execution had failures".to_string(),
196 ));
197 }
198
199 Ok(())
200}
201
202pub async fn run_workflow_validate(
204 workflow_file: PathBuf,
205 detailed: bool,
206 format: String,
207) -> Result<()> {
208 println!("Validating workflow: {}", workflow_file.display());
209 println!();
210
211 let workflow = Workflow::load_from_file(&workflow_file).await?;
213
214 let validator = WorkflowValidator::new();
216
217 let result = validator.validate(&workflow)?;
219
220 match format.as_str() {
222 "json" => {
223 let json = serde_json::to_string_pretty(&result)?;
224 println!("{}", json);
225 }
226 "yaml" => {
227 let yaml = serde_yaml::to_string(&result).map_err(|e| {
228 crate::error::CliError::SerializationError(format!(
229 "Failed to serialize to YAML: {}",
230 e
231 ))
232 })?;
233 println!("{}", yaml);
234 }
235 _ => {
236 if result.valid {
238 println!("✓ Workflow validation passed");
239 } else {
240 println!("✗ Workflow validation failed");
241 }
242 println!();
243
244 if result.has_errors() {
245 println!("Errors:");
246 for error in &result.errors {
247 println!(" - {}", error);
248 }
249 println!();
250 }
251
252 if result.has_warnings() {
253 println!("Warnings:");
254 for warning in &result.warnings {
255 println!(" - {}", warning);
256 }
257 println!();
258 }
259
260 if detailed && result.valid {
261 println!("Workflow Details:");
262 println!(" Name: {}", workflow.metadata.name);
263 println!(" Version: {}", workflow.metadata.version);
264 if !workflow.metadata.description.is_empty() {
265 println!(" Description: {}", workflow.metadata.description);
266 }
267 println!(" Steps: {}", workflow.steps.len());
268 println!(" Variables: {}", workflow.variables.len());
269 println!(" Max parallel: {}", workflow.config.max_parallel);
270 }
271 }
272 }
273
274 if !result.valid {
275 return Err(crate::error::CliError::Workflow(
276 "Workflow validation failed".to_string(),
277 ));
278 }
279
280 Ok(())
281}
282
283pub async fn run_workflow_list(registry_dir: Option<PathBuf>, detailed: bool) -> Result<()> {
285 let registry_dir = registry_dir.unwrap_or_else(|| {
287 std::env::current_dir()
288 .unwrap()
289 .join(".voirs")
290 .join("workflows")
291 });
292
293 let registry = WorkflowRegistry::new(registry_dir.clone());
294
295 let count = registry.load_from_directory().await?;
297
298 if count == 0 {
299 println!("No workflows found in: {}", registry_dir.display());
300 println!();
301 println!("Create workflow definitions in this directory or specify a different path with --registry-dir");
302 return Ok(());
303 }
304
305 println!("Registered workflows ({} found):", count);
306 println!();
307
308 let workflow_names = registry.list().await;
309
310 for name in workflow_names {
311 if let Some(workflow) = registry.get(&name).await {
312 println!(" • {}", workflow.metadata.name);
313 if detailed {
314 println!(" Version: {}", workflow.metadata.version);
315 if !workflow.metadata.description.is_empty() {
316 println!(" Description: {}", workflow.metadata.description);
317 }
318 println!(" Steps: {}", workflow.steps.len());
319 println!(" Variables: {}", workflow.variables.len());
320 println!();
321 }
322 }
323 }
324
325 Ok(())
326}
327
328pub async fn run_workflow_status(
330 workflow_name: String,
331 state_dir: Option<PathBuf>,
332 format: String,
333) -> Result<()> {
334 let state_dir = state_dir.unwrap_or_else(|| {
336 std::env::current_dir()
337 .unwrap()
338 .join(".voirs")
339 .join("workflow_state")
340 });
341
342 let state_manager = StateManager::new(state_dir);
343
344 if !state_manager.exists(&workflow_name).await {
346 return Err(crate::error::CliError::Workflow(format!(
347 "No state found for workflow: {}",
348 workflow_name
349 )));
350 }
351
352 let state = state_manager.load(&workflow_name).await?;
354
355 match format.as_str() {
357 "json" => {
358 let json = serde_json::to_string_pretty(&state)?;
359 println!("{}", json);
360 }
361 "yaml" => {
362 let yaml = serde_yaml::to_string(&state).map_err(|e| {
363 crate::error::CliError::SerializationError(format!(
364 "Failed to serialize to YAML: {}",
365 e
366 ))
367 })?;
368 println!("{}", yaml);
369 }
370 _ => {
371 println!("Workflow: {}", state.workflow_name);
373 println!("State: {:?}", state.state);
374 println!();
375
376 println!("Progress:");
377 println!(" Completed steps: {}", state.completed_steps.len());
378 println!(" Skipped steps: {}", state.skipped_steps.len());
379 if let Some(ref current) = state.current_step {
380 println!(" Current step: {}", current);
381 }
382 println!(" Total retries: {}", state.total_retries);
383 println!();
384
385 println!("Variables: {}", state.variables.len());
386 for (key, value) in &state.variables {
387 println!(" {}: {:?}", key, value);
388 }
389 println!();
390
391 println!(
392 "Last updated: {}",
393 state.last_updated.format("%Y-%m-%d %H:%M:%S UTC")
394 );
395 println!();
396
397 if state.can_resume() {
398 println!("✓ This workflow can be resumed");
399 } else {
400 println!(
401 "✗ This workflow cannot be resumed (state: {:?})",
402 state.state
403 );
404 }
405 }
406 }
407
408 Ok(())
409}
410
411pub async fn run_workflow_resume(
413 workflow_name: String,
414 state_dir: Option<PathBuf>,
415 max_parallel: usize,
416) -> Result<()> {
417 let state_dir = state_dir.unwrap_or_else(|| {
419 std::env::current_dir()
420 .unwrap()
421 .join(".voirs")
422 .join("workflow_state")
423 });
424
425 let state_manager = StateManager::new(state_dir.clone());
426
427 if !state_manager.exists(&workflow_name).await {
429 return Err(crate::error::CliError::Workflow(format!(
430 "No state found for workflow: {}",
431 workflow_name
432 )));
433 }
434
435 let state = state_manager.load(&workflow_name).await?;
437
438 if !state.can_resume() {
440 return Err(crate::error::CliError::Workflow(format!(
441 "Workflow '{}' cannot be resumed (current state: {:?})",
442 workflow_name, state.state
443 )));
444 }
445
446 println!("Resuming workflow: {}", workflow_name);
447 println!("Current state: {:?}", state.state);
448 println!("Completed steps: {}", state.completed_steps.len());
449 println!();
450
451 println!("⚠ Resume functionality requires the original workflow definition file");
454 println!(" Use: voirs workflow execute <workflow-file> --resume");
455
456 Ok(())
457}
458
459pub async fn run_workflow_stop(
461 workflow_name: String,
462 state_dir: Option<PathBuf>,
463 force: bool,
464) -> Result<()> {
465 let state_dir = state_dir.unwrap_or_else(|| {
467 std::env::current_dir()
468 .unwrap()
469 .join(".voirs")
470 .join("workflow_state")
471 });
472
473 let state_manager = StateManager::new(state_dir);
474
475 if !state_manager.exists(&workflow_name).await {
477 return Err(crate::error::CliError::Workflow(format!(
478 "No state found for workflow: {}",
479 workflow_name
480 )));
481 }
482
483 let mut state = state_manager.load(&workflow_name).await?;
485
486 if state.state != ExecutionState::Running {
488 println!(
489 "Workflow '{}' is not running (state: {:?})",
490 workflow_name, state.state
491 );
492 return Ok(());
493 }
494
495 if force {
496 state_manager.delete(&workflow_name).await?;
498 println!(
499 "✓ Workflow '{}' forcefully stopped (state deleted)",
500 workflow_name
501 );
502 } else {
503 state.state = ExecutionState::Stopped;
505 state.last_updated = chrono::Utc::now();
506 state_manager.save(&workflow_name, &state).await?;
507 println!("✓ Workflow '{}' stopped gracefully", workflow_name);
508 println!(" State saved for potential resume");
509 }
510
511 Ok(())
512}