Skip to main content

voirs_cli/workflow/
mod.rs

1//! Workflow Automation System
2//!
3//! This module provides a sophisticated workflow engine for defining and executing
4//! complex multi-step synthesis pipelines with conditional logic, error recovery,
5//! and composition capabilities.
6//!
7//! # Features
8//!
9//! - **Declarative Workflow Definition**: Define workflows using YAML/JSON/TOML
10//! - **Conditional Execution**: Branch based on quality metrics, file size, duration
11//! - **Error Recovery**: Automatic retry with exponential backoff, fallback strategies
12//! - **Workflow Composition**: Combine workflows, create sub-workflows
13//! - **State Management**: Persistent workflow state, resume capability
14//! - **Validation**: Pre-execution workflow validation and optimization
15//! - **Parallel Execution**: Run independent steps in parallel
16//! - **Variable Substitution**: Dynamic parameter substitution
17//!
18//! # Example Workflow
19//!
20//! ```yaml
21//! name: "Multi-Voice Production Pipeline"
22//! version: "1.0"
23//! variables:
24//!   quality: "high"
25//!   voices: ["en-US-neural-1", "en-US-neural-2"]
26//!
27//! steps:
28//!   - name: "synthesis"
29//!     type: "synthesize"
30//!     parameters:
31//!       text: "${input_text}"
32//!       voice: "${voice}"
33//!       quality: "${quality}"
34//!     for_each: "${voices}"
35//!     retry:
36//!       max_attempts: 3
37//!       backoff: "exponential"
38//!
39//!   - name: "quality_check"
40//!     type: "validate"
41//!     condition: "${synthesis.success}"
42//!     parameters:
43//!       min_mos: 4.0
44//!       max_duration_diff: 0.1
45//!
46//!   - name: "fallback"
47//!     type: "synthesize"
48//!     condition: "${quality_check.failed}"
49//!     parameters:
50//!       voice: "fallback-voice"
51//!       quality: "medium"
52//! ```
53
54pub mod definition;
55pub mod engine;
56pub mod executor;
57pub mod retry;
58pub mod state;
59pub mod validation;
60pub mod variables;
61
62pub use definition::{
63    Condition, ConditionOperator, RetryStrategy, Step, StepDependency, StepType, Variable,
64    Workflow, WorkflowMetadata,
65};
66pub use engine::WorkflowEngine;
67pub use executor::{ExecutionContext, ExecutionResult, StepExecutor, StepResult};
68pub use retry::{BackoffStrategy, RetryConfig, RetryManager};
69pub use state::{ExecutionState, StateManager, WorkflowState};
70pub use validation::{ValidationError, ValidationResult, WorkflowValidator};
71pub use variables::{VariableResolver, VariableScope};
72
73use crate::error::Result;
74use serde::{Deserialize, Serialize};
75use std::collections::HashMap;
76use std::path::PathBuf;
77use std::sync::Arc;
78use tokio::sync::RwLock;
79
80/// Workflow execution statistics
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct WorkflowStats {
83    /// Total steps executed
84    pub total_steps: usize,
85    /// Successful steps
86    pub successful_steps: usize,
87    /// Failed steps
88    pub failed_steps: usize,
89    /// Skipped steps (due to conditions)
90    pub skipped_steps: usize,
91    /// Total execution time in milliseconds
92    pub total_duration_ms: u64,
93    /// Average step duration in milliseconds
94    pub avg_step_duration_ms: u64,
95    /// Number of retries performed
96    pub total_retries: usize,
97}
98
99impl WorkflowStats {
100    /// Create new statistics
101    pub fn new() -> Self {
102        Self {
103            total_steps: 0,
104            successful_steps: 0,
105            failed_steps: 0,
106            skipped_steps: 0,
107            total_duration_ms: 0,
108            avg_step_duration_ms: 0,
109            total_retries: 0,
110        }
111    }
112
113    /// Calculate success rate
114    pub fn success_rate(&self) -> f64 {
115        if self.total_steps == 0 {
116            0.0
117        } else {
118            self.successful_steps as f64 / self.total_steps as f64
119        }
120    }
121
122    /// Check if workflow completed successfully
123    pub fn is_successful(&self) -> bool {
124        self.failed_steps == 0 && self.total_steps > 0
125    }
126}
127
128impl Default for WorkflowStats {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134/// Workflow registry for managing multiple workflows
135pub struct WorkflowRegistry {
136    workflows: Arc<RwLock<HashMap<String, Workflow>>>,
137    storage_dir: PathBuf,
138}
139
140impl WorkflowRegistry {
141    /// Create new registry
142    pub fn new(storage_dir: PathBuf) -> Self {
143        Self {
144            workflows: Arc::new(RwLock::new(HashMap::new())),
145            storage_dir,
146        }
147    }
148
149    /// Register a workflow
150    pub async fn register(&self, workflow: Workflow) -> Result<()> {
151        let mut workflows = self.workflows.write().await;
152        let name = workflow.metadata.name.clone();
153        workflows.insert(name, workflow);
154        Ok(())
155    }
156
157    /// Get a workflow by name
158    pub async fn get(&self, name: &str) -> Option<Workflow> {
159        let workflows = self.workflows.read().await;
160        workflows.get(name).cloned()
161    }
162
163    /// List all registered workflows
164    pub async fn list(&self) -> Vec<String> {
165        let workflows = self.workflows.read().await;
166        workflows.keys().cloned().collect()
167    }
168
169    /// Remove a workflow
170    pub async fn remove(&self, name: &str) -> Result<()> {
171        let mut workflows = self.workflows.write().await;
172        workflows.remove(name);
173        Ok(())
174    }
175
176    /// Load workflows from directory
177    pub async fn load_from_directory(&self) -> Result<usize> {
178        let mut count = 0;
179        if !self.storage_dir.exists() {
180            tokio::fs::create_dir_all(&self.storage_dir).await?;
181            return Ok(0);
182        }
183
184        let mut entries = tokio::fs::read_dir(&self.storage_dir).await?;
185        while let Some(entry) = entries.next_entry().await? {
186            let path = entry.path();
187            if path
188                .extension()
189                .is_some_and(|ext| ext == "yaml" || ext == "json")
190            {
191                if let Ok(workflow) = Workflow::load_from_file(&path).await {
192                    self.register(workflow).await?;
193                    count += 1;
194                }
195            }
196        }
197
198        Ok(count)
199    }
200
201    /// Save all workflows to directory
202    pub async fn save_all(&self) -> Result<usize> {
203        tokio::fs::create_dir_all(&self.storage_dir).await?;
204
205        let workflows = self.workflows.read().await;
206        let mut count = 0;
207
208        for workflow in workflows.values() {
209            let filename = format!("{}.yaml", workflow.metadata.name);
210            let path = self.storage_dir.join(filename);
211            workflow.save_to_file(&path).await?;
212            count += 1;
213        }
214
215        Ok(count)
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use std::env;
223
224    #[test]
225    fn test_workflow_stats_creation() {
226        let stats = WorkflowStats::new();
227        assert_eq!(stats.total_steps, 0);
228        assert_eq!(stats.successful_steps, 0);
229        assert_eq!(stats.failed_steps, 0);
230        assert_eq!(stats.success_rate(), 0.0);
231        assert!(!stats.is_successful());
232    }
233
234    #[test]
235    fn test_workflow_stats_success_rate() {
236        let stats = WorkflowStats {
237            total_steps: 10,
238            successful_steps: 8,
239            failed_steps: 2,
240            skipped_steps: 0,
241            total_duration_ms: 1000,
242            avg_step_duration_ms: 100,
243            total_retries: 3,
244        };
245
246        assert_eq!(stats.success_rate(), 0.8);
247        assert!(!stats.is_successful()); // Has failures
248    }
249
250    #[test]
251    fn test_workflow_stats_full_success() {
252        let stats = WorkflowStats {
253            total_steps: 5,
254            successful_steps: 5,
255            failed_steps: 0,
256            skipped_steps: 0,
257            total_duration_ms: 500,
258            avg_step_duration_ms: 100,
259            total_retries: 0,
260        };
261
262        assert_eq!(stats.success_rate(), 1.0);
263        assert!(stats.is_successful());
264    }
265
266    #[tokio::test]
267    async fn test_workflow_registry_creation() {
268        let temp_dir = env::temp_dir().join("voirs_workflow_test");
269        let registry = WorkflowRegistry::new(temp_dir);
270
271        let workflows = registry.list().await;
272        assert_eq!(workflows.len(), 0);
273    }
274
275    #[tokio::test]
276    async fn test_workflow_registry_register_and_get() {
277        let temp_dir = env::temp_dir().join("voirs_workflow_test_2");
278        let registry = WorkflowRegistry::new(temp_dir);
279
280        let workflow = Workflow::new("test-workflow", "1.0", "Test workflow");
281        registry.register(workflow.clone()).await.unwrap();
282
283        let retrieved = registry.get("test-workflow").await;
284        assert!(retrieved.is_some());
285        assert_eq!(retrieved.unwrap().metadata.name, "test-workflow");
286    }
287
288    #[tokio::test]
289    async fn test_workflow_registry_list() {
290        let temp_dir = env::temp_dir().join("voirs_workflow_test_3");
291        let registry = WorkflowRegistry::new(temp_dir);
292
293        let workflow1 = Workflow::new("workflow-1", "1.0", "First workflow");
294        let workflow2 = Workflow::new("workflow-2", "1.0", "Second workflow");
295
296        registry.register(workflow1).await.unwrap();
297        registry.register(workflow2).await.unwrap();
298
299        let workflows = registry.list().await;
300        assert_eq!(workflows.len(), 2);
301        assert!(workflows.contains(&"workflow-1".to_string()));
302        assert!(workflows.contains(&"workflow-2".to_string()));
303    }
304
305    #[tokio::test]
306    async fn test_workflow_registry_remove() {
307        let temp_dir = env::temp_dir().join("voirs_workflow_test_4");
308        let registry = WorkflowRegistry::new(temp_dir);
309
310        let workflow = Workflow::new("removable-workflow", "1.0", "Test removal");
311        registry.register(workflow).await.unwrap();
312
313        assert!(registry.get("removable-workflow").await.is_some());
314
315        registry.remove("removable-workflow").await.unwrap();
316
317        assert!(registry.get("removable-workflow").await.is_none());
318    }
319}