Skip to main content

voirs_cli/workflow/
executor.rs

1//! Step Executor
2//!
3//! Executes individual workflow steps with retry logic and state management.
4
5use super::{
6    definition::{Step, StepType, Workflow},
7    retry::RetryManager,
8    state::WorkflowState,
9    WorkflowStats,
10};
11use crate::error::CliError;
12
13type Result<T> = std::result::Result<T, CliError>;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::time::Instant;
17
18/// Execution context for a workflow
19#[derive(Clone)]
20pub struct ExecutionContext {
21    /// The workflow being executed
22    workflow: Workflow,
23    /// Current variables
24    variables: HashMap<String, serde_json::Value>,
25    /// Completed steps and their results
26    completed: HashMap<String, StepResult>,
27    /// Skipped steps
28    skipped: Vec<String>,
29    /// Total retries performed
30    retries: usize,
31}
32
33impl ExecutionContext {
34    /// Create new execution context
35    pub fn new(workflow: Workflow) -> Self {
36        // Initialize variables from workflow definition
37        let mut variables = HashMap::new();
38        for (key, value) in &workflow.variables {
39            let json_value = match value {
40                super::definition::Variable::String(s) => serde_json::Value::String(s.clone()),
41                super::definition::Variable::Number(n) => serde_json::json!(n),
42                super::definition::Variable::Boolean(b) => serde_json::Value::Bool(*b),
43                super::definition::Variable::Array(arr) => serde_json::Value::Array(arr.clone()),
44                super::definition::Variable::Object(obj) => {
45                    serde_json::Value::Object(serde_json::Map::from_iter(obj.clone()))
46                }
47            };
48            variables.insert(key.clone(), json_value);
49        }
50
51        Self {
52            workflow,
53            variables,
54            completed: HashMap::new(),
55            skipped: Vec::new(),
56            retries: 0,
57        }
58    }
59
60    /// Get workflow reference
61    pub fn workflow(&self) -> &Workflow {
62        &self.workflow
63    }
64
65    /// Get current variables
66    pub fn get_variables(&self) -> HashMap<String, serde_json::Value> {
67        self.variables.clone()
68    }
69
70    /// Set a variable
71    pub fn set_variable(&mut self, name: String, value: serde_json::Value) {
72        self.variables.insert(name, value);
73    }
74
75    /// Record step completion
76    pub fn complete_step(&mut self, name: &str, result: StepResult) {
77        self.completed.insert(name.to_string(), result);
78    }
79
80    /// Record step skip
81    pub fn skip_step(&mut self, name: &str, reason: &str) {
82        self.skipped.push(name.to_string());
83        tracing::info!("Skipping step '{}': {}", name, reason);
84    }
85
86    /// Get completed steps
87    pub fn completed_steps(&self) -> &HashMap<String, StepResult> {
88        &self.completed
89    }
90
91    /// Get skipped steps
92    pub fn skipped_steps(&self) -> &[String] {
93        &self.skipped
94    }
95
96    /// Increment retry counter
97    pub fn increment_retries(&mut self) {
98        self.retries += 1;
99    }
100
101    /// Get total retries
102    pub fn total_retries(&self) -> usize {
103        self.retries
104    }
105
106    /// Resume from saved state
107    pub fn resume_from_state(&mut self, state: WorkflowState) {
108        self.variables = state.variables;
109        self.completed = state.completed_steps;
110        self.skipped = state.skipped_steps;
111        self.retries = state.total_retries;
112    }
113
114    /// Get current state
115    pub fn get_state(&self) -> WorkflowState {
116        WorkflowState {
117            workflow_name: self.workflow.metadata.name.clone(),
118            state: super::state::ExecutionState::Running,
119            variables: self.variables.clone(),
120            completed_steps: self.completed.clone(),
121            skipped_steps: self.skipped.clone(),
122            current_step: None,
123            total_retries: self.retries,
124            last_updated: chrono::Utc::now(),
125        }
126    }
127}
128
129/// Result of a step execution
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct StepResult {
132    /// Step name
133    pub step_name: String,
134    /// Success status
135    pub success: bool,
136    /// Result message
137    pub message: String,
138    /// Output data
139    pub output: HashMap<String, serde_json::Value>,
140    /// Execution duration in milliseconds
141    pub duration_ms: u64,
142    /// Number of retry attempts
143    pub attempts: usize,
144}
145
146impl StepResult {
147    /// Create success result
148    pub fn success(step_name: String, message: String, duration_ms: u64) -> Self {
149        Self {
150            step_name,
151            success: true,
152            message,
153            output: HashMap::new(),
154            duration_ms,
155            attempts: 1,
156        }
157    }
158
159    /// Create failure result
160    pub fn failure(step_name: String, message: String, duration_ms: u64) -> Self {
161        Self {
162            step_name,
163            success: false,
164            message,
165            output: HashMap::new(),
166            duration_ms,
167            attempts: 1,
168        }
169    }
170
171    /// Add output data
172    pub fn with_output(mut self, key: String, value: serde_json::Value) -> Self {
173        self.output.insert(key, value);
174        self
175    }
176
177    /// Set attempt count
178    pub fn with_attempts(mut self, attempts: usize) -> Self {
179        self.attempts = attempts;
180        self
181    }
182}
183
184/// Step executor
185pub struct StepExecutor {
186    retry_manager: RetryManager,
187}
188
189impl StepExecutor {
190    /// Create new step executor
191    pub fn new() -> Self {
192        Self {
193            retry_manager: RetryManager::new(),
194        }
195    }
196
197    /// Execute a step
198    pub async fn execute_step(
199        &self,
200        step: &Step,
201        context: &mut ExecutionContext,
202    ) -> Result<StepResult> {
203        let start_time = Instant::now();
204
205        // Handle for-each loop
206        if let Some(ref for_each_var) = step.for_each {
207            return self.execute_for_each(step, for_each_var, context).await;
208        }
209
210        // Execute with retry if configured
211        if let Some(ref retry_strategy) = step.retry {
212            let mut attempts = 0;
213            loop {
214                attempts += 1;
215                match self.execute_step_once(step, context).await {
216                    Ok(result) => {
217                        let duration = start_time.elapsed().as_millis() as u64;
218                        context.complete_step(&step.name, result.clone().with_attempts(attempts));
219                        return Ok(result.with_attempts(attempts));
220                    }
221                    Err(e) if attempts < retry_strategy.max_attempts => {
222                        context.increment_retries();
223                        let delay = self.retry_manager.calculate_delay(retry_strategy, attempts);
224                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
225                        tracing::warn!(
226                            "Step '{}' failed (attempt {}), retrying: {}",
227                            step.name,
228                            attempts,
229                            e
230                        );
231                        continue;
232                    }
233                    Err(e) => {
234                        let duration = start_time.elapsed().as_millis() as u64;
235                        let result = StepResult::failure(
236                            step.name.clone(),
237                            format!("Error: {}", e),
238                            duration,
239                        )
240                        .with_attempts(attempts);
241                        context.complete_step(&step.name, result.clone());
242                        return Ok(result);
243                    }
244                }
245            }
246        } else {
247            let result = self.execute_step_once(step, context).await;
248            let duration = start_time.elapsed().as_millis() as u64;
249
250            match result {
251                Ok(mut result) => {
252                    result.duration_ms = duration;
253                    context.complete_step(&step.name, result.clone());
254                    Ok(result)
255                }
256                Err(e) => {
257                    let result =
258                        StepResult::failure(step.name.clone(), format!("Error: {}", e), duration);
259                    context.complete_step(&step.name, result.clone());
260                    Ok(result)
261                }
262            }
263        }
264    }
265
266    /// Execute step once (without retry)
267    async fn execute_step_once(
268        &self,
269        step: &Step,
270        context: &ExecutionContext,
271    ) -> Result<StepResult> {
272        let start_time = Instant::now();
273
274        // Resolve parameters with variable substitution
275        let resolved_params =
276            self.resolve_parameters(&step.parameters, &context.get_variables())?;
277
278        // Execute based on step type
279        let result = match step.step_type {
280            StepType::Synthesize => self.execute_synthesize(step, &resolved_params).await,
281            StepType::Validate => self.execute_validate(step, &resolved_params).await,
282            StepType::FileOp => self.execute_file_op(step, &resolved_params).await,
283            StepType::Command => self.execute_command(step, &resolved_params).await,
284            StepType::Script => self.execute_script(step, &resolved_params).await,
285            StepType::Branch => self.execute_branch(step, &resolved_params).await,
286            StepType::Loop => self.execute_loop(step, &resolved_params).await,
287            StepType::Workflow => self.execute_subworkflow(step, &resolved_params).await,
288            StepType::Wait => self.execute_wait(step, &resolved_params).await,
289            StepType::Notify => self.execute_notify(step, &resolved_params).await,
290        }?;
291
292        let duration = start_time.elapsed().as_millis() as u64;
293
294        Ok(StepResult::success(step.name.clone(), result, duration))
295    }
296
297    /// Execute for-each loop
298    async fn execute_for_each(
299        &self,
300        step: &Step,
301        for_each_var: &str,
302        context: &mut ExecutionContext,
303    ) -> Result<StepResult> {
304        let variables = context.get_variables();
305
306        // Resolve for-each variable
307        let var_name = for_each_var
308            .strip_prefix("${")
309            .and_then(|s| s.strip_suffix('}'))
310            .unwrap_or(for_each_var);
311
312        let items = variables
313            .get(var_name)
314            .and_then(|v| v.as_array())
315            .ok_or_else(|| {
316                CliError::Workflow(format!(
317                    "For-each variable '{}' not found or not an array",
318                    var_name
319                ))
320            })?;
321
322        let start_time = Instant::now();
323        let mut all_results = Vec::new();
324
325        for (idx, item) in items.iter().enumerate() {
326            // Create new step with current item as variable
327            let mut step_clone = step.clone();
328            step_clone.for_each = None;
329            step_clone.name = format!("{}[{}]", step.name, idx);
330
331            // Set loop variable
332            context.set_variable(format!("{}_item", var_name), item.clone());
333            context.set_variable(format!("{}_index", var_name), serde_json::json!(idx));
334
335            let result = self.execute_step_once(&step_clone, context).await?;
336            all_results.push(result);
337        }
338
339        let duration = start_time.elapsed().as_millis() as u64;
340        let success = all_results.iter().all(|r| r.success);
341
342        Ok(StepResult {
343            step_name: step.name.clone(),
344            success,
345            message: format!("Executed {} iterations", all_results.len()),
346            output: HashMap::new(),
347            duration_ms: duration,
348            attempts: 1,
349        })
350    }
351
352    /// Resolve parameters with variable substitution
353    fn resolve_parameters(
354        &self,
355        params: &HashMap<String, serde_json::Value>,
356        variables: &HashMap<String, serde_json::Value>,
357    ) -> Result<HashMap<String, serde_json::Value>> {
358        let mut resolved = HashMap::new();
359
360        for (key, value) in params {
361            let resolved_value = self.resolve_value(value, variables);
362            resolved.insert(key.clone(), resolved_value);
363        }
364
365        Ok(resolved)
366    }
367
368    /// Resolve a single value with variable substitution
369    fn resolve_value(
370        &self,
371        value: &serde_json::Value,
372        variables: &HashMap<String, serde_json::Value>,
373    ) -> serde_json::Value {
374        match value {
375            serde_json::Value::String(s) => {
376                if let Some(var_name) = s.strip_prefix("${").and_then(|s| s.strip_suffix('}')) {
377                    variables
378                        .get(var_name)
379                        .cloned()
380                        .unwrap_or(serde_json::Value::Null)
381                } else {
382                    value.clone()
383                }
384            }
385            serde_json::Value::Array(arr) => serde_json::Value::Array(
386                arr.iter()
387                    .map(|v| self.resolve_value(v, variables))
388                    .collect(),
389            ),
390            serde_json::Value::Object(obj) => serde_json::Value::Object(
391                obj.iter()
392                    .map(|(k, v)| (k.clone(), self.resolve_value(v, variables)))
393                    .collect(),
394            ),
395            _ => value.clone(),
396        }
397    }
398
399    // Step type implementations (placeholders for actual implementation)
400
401    async fn execute_synthesize(
402        &self,
403        _step: &Step,
404        _params: &HashMap<String, serde_json::Value>,
405    ) -> Result<String> {
406        Ok("Synthesis completed".to_string())
407    }
408
409    async fn execute_validate(
410        &self,
411        _step: &Step,
412        _params: &HashMap<String, serde_json::Value>,
413    ) -> Result<String> {
414        Ok("Validation passed".to_string())
415    }
416
417    async fn execute_file_op(
418        &self,
419        _step: &Step,
420        _params: &HashMap<String, serde_json::Value>,
421    ) -> Result<String> {
422        Ok("File operation completed".to_string())
423    }
424
425    async fn execute_command(
426        &self,
427        _step: &Step,
428        _params: &HashMap<String, serde_json::Value>,
429    ) -> Result<String> {
430        Ok("Command executed".to_string())
431    }
432
433    async fn execute_script(
434        &self,
435        _step: &Step,
436        _params: &HashMap<String, serde_json::Value>,
437    ) -> Result<String> {
438        Ok("Script executed".to_string())
439    }
440
441    async fn execute_branch(
442        &self,
443        _step: &Step,
444        _params: &HashMap<String, serde_json::Value>,
445    ) -> Result<String> {
446        Ok("Branch evaluated".to_string())
447    }
448
449    async fn execute_loop(
450        &self,
451        _step: &Step,
452        _params: &HashMap<String, serde_json::Value>,
453    ) -> Result<String> {
454        Ok("Loop completed".to_string())
455    }
456
457    async fn execute_subworkflow(
458        &self,
459        _step: &Step,
460        _params: &HashMap<String, serde_json::Value>,
461    ) -> Result<String> {
462        Ok("Sub-workflow completed".to_string())
463    }
464
465    async fn execute_wait(
466        &self,
467        _step: &Step,
468        params: &HashMap<String, serde_json::Value>,
469    ) -> Result<String> {
470        if let Some(duration) = params.get("duration_ms") {
471            if let Some(ms) = duration.as_u64() {
472                tokio::time::sleep(tokio::time::Duration::from_millis(ms)).await;
473            }
474        }
475        Ok("Wait completed".to_string())
476    }
477
478    async fn execute_notify(
479        &self,
480        _step: &Step,
481        _params: &HashMap<String, serde_json::Value>,
482    ) -> Result<String> {
483        Ok("Notification sent".to_string())
484    }
485}
486
487impl Default for StepExecutor {
488    fn default() -> Self {
489        Self::new()
490    }
491}
492
493/// Overall workflow execution result
494#[derive(Debug, Clone, Serialize, Deserialize)]
495pub struct ExecutionResult {
496    /// Workflow name
497    pub workflow_name: String,
498    /// Success status
499    pub success: bool,
500    /// Result message
501    pub message: String,
502    /// Execution statistics
503    pub stats: WorkflowStats,
504}
505
506impl ExecutionResult {
507    /// Create success result
508    pub fn success(workflow_name: String, message: String, stats: WorkflowStats) -> Self {
509        Self {
510            workflow_name,
511            success: true,
512            message,
513            stats,
514        }
515    }
516
517    /// Create failure result
518    pub fn failure(workflow_name: String, message: String, stats: WorkflowStats) -> Self {
519        Self {
520            workflow_name,
521            success: false,
522            message,
523            stats,
524        }
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use crate::workflow::definition::StepType;
532
533    #[test]
534    fn test_execution_context_creation() {
535        let workflow = Workflow::new("test", "1.0", "Test workflow");
536        let context = ExecutionContext::new(workflow);
537
538        assert_eq!(context.completed_steps().len(), 0);
539        assert_eq!(context.skipped_steps().len(), 0);
540        assert_eq!(context.total_retries(), 0);
541    }
542
543    #[test]
544    fn test_execution_context_variables() {
545        let mut workflow = Workflow::new("test", "1.0", "Test workflow");
546        workflow.add_variable(
547            "test_var".to_string(),
548            super::super::definition::Variable::String("test_value".to_string()),
549        );
550
551        let context = ExecutionContext::new(workflow);
552        let variables = context.get_variables();
553
554        assert_eq!(variables.len(), 1);
555        assert_eq!(
556            variables.get("test_var").unwrap().as_str().unwrap(),
557            "test_value"
558        );
559    }
560
561    #[test]
562    fn test_step_result_creation() {
563        let result = StepResult::success("step1".to_string(), "Success".to_string(), 100);
564
565        assert!(result.success);
566        assert_eq!(result.step_name, "step1");
567        assert_eq!(result.duration_ms, 100);
568    }
569
570    #[test]
571    fn test_step_result_with_output() {
572        let result = StepResult::success("step1".to_string(), "Success".to_string(), 100)
573            .with_output("key1".to_string(), serde_json::json!("value1"));
574
575        assert_eq!(result.output.len(), 1);
576        assert_eq!(
577            result.output.get("key1").unwrap().as_str().unwrap(),
578            "value1"
579        );
580    }
581
582    #[tokio::test]
583    async fn test_step_executor_creation() {
584        let executor = StepExecutor::new();
585        // Just verify creation works
586        assert!(true);
587    }
588
589    #[test]
590    fn test_execution_result_success() {
591        let stats = WorkflowStats::new();
592        let result = ExecutionResult::success("test".to_string(), "Done".to_string(), stats);
593
594        assert!(result.success);
595        assert_eq!(result.workflow_name, "test");
596    }
597
598    #[test]
599    fn test_execution_result_failure() {
600        let stats = WorkflowStats::new();
601        let result = ExecutionResult::failure("test".to_string(), "Failed".to_string(), stats);
602
603        assert!(!result.success);
604        assert_eq!(result.message, "Failed");
605    }
606}