ricecoder_workflows/
engine.rs

1//! Workflow execution engine
2
3use crate::error::{WorkflowError, WorkflowResult};
4use crate::models::{Workflow, WorkflowState};
5use crate::state::StateManager;
6use std::collections::{HashMap, HashSet, VecDeque};
7use uuid::Uuid;
8
/// Central coordinator for workflow execution
///
/// Owns the in-memory table of tracked executions and exposes the lifecycle
/// operations that act on it (create, start, pause, resume, cancel, complete,
/// fail, remove). Also offers stateless helpers that order a workflow's steps
/// by their dependencies and report which step may run next.
pub struct WorkflowEngine {
    /// Tracked workflow executions, keyed by the execution id returned from
    /// `create_execution`. Entries remain until `remove_execution` is called.
    active_workflows: HashMap<String, WorkflowState>,
}
17
18impl Default for WorkflowEngine {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl WorkflowEngine {
25    /// Create a new workflow engine
26    pub fn new() -> Self {
27        WorkflowEngine {
28            active_workflows: HashMap::new(),
29        }
30    }
31
32    /// Create a new workflow execution
33    ///
34    /// Creates a new execution state for the given workflow and tracks it.
35    pub fn create_execution(&mut self, workflow: &Workflow) -> WorkflowResult<String> {
36        let state = StateManager::create_state(workflow);
37        let execution_id = Uuid::new_v4().to_string();
38        self.active_workflows.insert(execution_id.clone(), state);
39        Ok(execution_id)
40    }
41
42    /// Start workflow execution
43    ///
44    /// Transitions the workflow from pending to running state.
45    pub fn start_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
46        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
47            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
48        })?;
49
50        StateManager::start_workflow(state);
51        Ok(())
52    }
53
54    /// Pause workflow execution
55    ///
56    /// Pauses the workflow at the current step, allowing resumption later.
57    pub fn pause_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
58        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
59            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
60        })?;
61
62        let _ = StateManager::pause_workflow(state);
63        Ok(())
64    }
65
66    /// Resume workflow execution
67    ///
68    /// Resumes a paused workflow from the last completed step.
69    pub fn resume_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
70        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
71            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
72        })?;
73
74        let _ = StateManager::resume_workflow(state);
75        Ok(())
76    }
77
78    /// Cancel workflow execution
79    ///
80    /// Cancels the workflow, stopping any further execution.
81    pub fn cancel_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
82        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
83            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
84        })?;
85
86        StateManager::cancel_workflow(state);
87        Ok(())
88    }
89
90    /// Get the current state of a workflow execution
91    pub fn get_execution_state(&self, execution_id: &str) -> WorkflowResult<WorkflowState> {
92        self.active_workflows
93            .get(execution_id)
94            .cloned()
95            .ok_or_else(|| {
96                WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
97            })
98    }
99
    /// Get execution order for workflow steps
    ///
    /// Public entry point that delegates to `resolve_dependencies` and returns
    /// its result unchanged: the step ids of `workflow` ordered so that each
    /// step comes after the steps it depends on.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `resolve_dependencies` when not every step
    /// could be placed in the order.
    pub fn get_execution_order(workflow: &Workflow) -> WorkflowResult<Vec<String>> {
        Self::resolve_dependencies(workflow)
    }
107
108    /// Resolve step dependencies and build execution order
109    ///
110    /// Uses topological sort to determine the order in which steps should execute
111    /// based on their dependencies. Detects and reports circular dependencies.
112    fn resolve_dependencies(workflow: &Workflow) -> WorkflowResult<Vec<String>> {
113        let mut order = Vec::new();
114        let mut completed = HashSet::new();
115        let mut queue = VecDeque::new();
116
117        // Find all steps with no dependencies
118        for step in &workflow.steps {
119            if step.dependencies.is_empty() {
120                queue.push_back(step.id.clone());
121            }
122        }
123
124        // Build step map for quick lookup
125        let step_map: HashMap<_, _> = workflow.steps.iter().map(|s| (&s.id, s)).collect();
126
127        // Topological sort
128        while let Some(step_id) = queue.pop_front() {
129            if completed.contains(&step_id) {
130                continue;
131            }
132
133            // Check if all dependencies are completed
134            if let Some(step) = step_map.get(&step_id) {
135                let all_deps_completed =
136                    step.dependencies.iter().all(|dep| completed.contains(dep));
137
138                if all_deps_completed {
139                    order.push(step_id.clone());
140                    completed.insert(step_id.clone());
141
142                    // Add steps that depend on this one
143                    for other_step in &workflow.steps {
144                        if other_step.dependencies.contains(&step_id)
145                            && !completed.contains(&other_step.id)
146                        {
147                            queue.push_back(other_step.id.clone());
148                        }
149                    }
150                } else {
151                    // Re-queue if dependencies not met
152                    queue.push_back(step_id);
153                }
154            }
155        }
156
157        if order.len() != workflow.steps.len() {
158            return Err(WorkflowError::Invalid(
159                "Could not determine execution order for all steps".to_string(),
160            ));
161        }
162
163        Ok(order)
164    }
165
166    /// Check if a step can be executed
167    ///
168    /// A step can be executed if all its dependencies have been completed.
169    pub fn can_execute_step(
170        workflow: &Workflow,
171        state: &WorkflowState,
172        step_id: &str,
173    ) -> WorkflowResult<bool> {
174        // Find the step
175        let step = workflow
176            .steps
177            .iter()
178            .find(|s| s.id == step_id)
179            .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
180
181        // Check if all dependencies are completed
182        for dep in &step.dependencies {
183            if !state.completed_steps.contains(dep) {
184                return Ok(false);
185            }
186        }
187
188        Ok(true)
189    }
190
191    /// Get next executable step
192    ///
193    /// Returns the next step that can be executed based on completed dependencies.
194    /// Returns None if all steps are completed or no steps are ready.
195    pub fn get_next_step(
196        workflow: &Workflow,
197        state: &WorkflowState,
198    ) -> WorkflowResult<Option<String>> {
199        for step in &workflow.steps {
200            if !state.completed_steps.contains(&step.id)
201                && !state.step_results.contains_key(&step.id)
202                && Self::can_execute_step(workflow, state, &step.id)?
203            {
204                return Ok(Some(step.id.clone()));
205            }
206        }
207
208        Ok(None)
209    }
210
211    /// Wait for a step's dependencies to complete
212    ///
213    /// Blocks until all dependencies for the given step are completed.
214    /// Returns error if the step is not found or dependencies cannot be resolved.
215    pub fn wait_for_dependencies(
216        workflow: &Workflow,
217        state: &WorkflowState,
218        step_id: &str,
219    ) -> WorkflowResult<()> {
220        let step = workflow
221            .steps
222            .iter()
223            .find(|s| s.id == step_id)
224            .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
225
226        // Check if all dependencies are completed
227        for dep in &step.dependencies {
228            if !state.completed_steps.contains(dep) {
229                return Err(WorkflowError::StateError(format!(
230                    "Dependency {} not completed for step {}",
231                    dep, step_id
232                )));
233            }
234        }
235
236        Ok(())
237    }
238
239    /// Complete workflow execution
240    pub fn complete_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
241        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
242            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
243        })?;
244
245        StateManager::complete_workflow(state);
246        Ok(())
247    }
248
249    /// Fail workflow execution
250    pub fn fail_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
251        let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
252            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
253        })?;
254
255        StateManager::fail_workflow(state);
256        Ok(())
257    }
258
259    /// Remove a completed execution from tracking
260    pub fn remove_execution(&mut self, execution_id: &str) -> WorkflowResult<WorkflowState> {
261        self.active_workflows.remove(execution_id).ok_or_else(|| {
262            WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
263        })
264    }
265
266    /// Get all active executions
267    pub fn get_active_executions(&self) -> Vec<String> {
268        self.active_workflows.keys().cloned().collect()
269    }
270
    /// Get count of active executions
    ///
    /// Counts every execution currently tracked by the engine, whatever its
    /// status; an execution stops being counted once `remove_execution` has
    /// removed it.
    pub fn active_execution_count(&self) -> usize {
        self.active_workflows.len()
    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        ErrorAction, RiskFactors, StepType, WorkflowConfig, WorkflowStatus, WorkflowStep,
    };

    /// Build an agent step with the given id, display name and dependency ids.
    /// Every other field uses the same fixed test values.
    fn agent_step(id: &str, name: &str, dependencies: &[&str]) -> WorkflowStep {
        WorkflowStep {
            id: id.to_string(),
            name: name.to_string(),
            step_type: StepType::Agent(crate::models::AgentStep {
                agent_id: "test-agent".to_string(),
                task: "test-task".to_string(),
            }),
            config: crate::models::StepConfig {
                config: serde_json::json!({}),
            },
            dependencies: dependencies.iter().map(|dep| dep.to_string()).collect(),
            approval_required: false,
            on_error: ErrorAction::Fail,
            risk_score: None,
            risk_factors: RiskFactors::default(),
        }
    }

    /// Wrap `steps` in the workflow metadata shared by all tests.
    fn workflow_with_steps(steps: Vec<WorkflowStep>) -> Workflow {
        Workflow {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: "A test workflow".to_string(),
            parameters: vec![],
            steps,
            config: WorkflowConfig {
                timeout_ms: None,
                max_parallel: None,
            },
        }
    }

    /// Three-step fixture: step1 has no dependencies, step2 depends on step1,
    /// and step3 depends on both step1 and step2.
    fn create_test_workflow_with_deps() -> Workflow {
        workflow_with_steps(vec![
            agent_step("step1", "Step 1", &[]),
            agent_step("step2", "Step 2", &["step1"]),
            agent_step("step3", "Step 3", &["step1", "step2"]),
        ])
    }

    #[test]
    fn test_create_engine() {
        let engine = WorkflowEngine::new();
        assert_eq!(engine.active_execution_count(), 0);
    }

    #[test]
    fn test_create_execution() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let execution_id = engine.create_execution(&workflow).unwrap();
        assert!(!execution_id.is_empty());
        assert_eq!(engine.active_execution_count(), 1);
    }

    #[test]
    fn test_start_execution() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let execution_id = engine.create_execution(&workflow).unwrap();
        engine.start_execution(&execution_id).unwrap();

        let state = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(state.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_unknown_execution_is_not_found() {
        let mut engine = WorkflowEngine::new();

        assert!(matches!(
            engine.start_execution("missing"),
            Err(WorkflowError::NotFound(_))
        ));
        assert!(matches!(
            engine.get_execution_state("missing"),
            Err(WorkflowError::NotFound(_))
        ));
        assert!(matches!(
            engine.remove_execution("missing"),
            Err(WorkflowError::NotFound(_))
        ));
    }

    #[test]
    fn test_get_execution_order() {
        let workflow = create_test_workflow_with_deps();
        let order = WorkflowEngine::get_execution_order(&workflow).unwrap();

        assert_eq!(order.len(), 3);
        assert_eq!(order[0], "step1");
        assert_eq!(order[1], "step2");
        assert_eq!(order[2], "step3");
    }

    #[test]
    fn test_get_execution_order_rejects_mutual_dependency() {
        // Neither step is free of dependencies, so no order exists.
        let workflow = workflow_with_steps(vec![
            agent_step("a", "A", &["b"]),
            agent_step("b", "B", &["a"]),
        ]);

        assert!(WorkflowEngine::get_execution_order(&workflow).is_err());
    }

    #[test]
    fn test_can_execute_step() {
        let workflow = create_test_workflow_with_deps();
        let state = StateManager::create_state(&workflow);

        // step1 can execute (no dependencies)
        assert!(WorkflowEngine::can_execute_step(&workflow, &state, "step1").unwrap());

        // step2 cannot execute (depends on step1)
        assert!(!WorkflowEngine::can_execute_step(&workflow, &state, "step2").unwrap());

        // Create a new state with step1 completed
        let mut state2 = StateManager::create_state(&workflow);
        state2.completed_steps.push("step1".to_string());

        // Now step2 can execute
        assert!(WorkflowEngine::can_execute_step(&workflow, &state2, "step2").unwrap());
    }

    #[test]
    fn test_get_next_step() {
        let workflow = create_test_workflow_with_deps();
        let state = StateManager::create_state(&workflow);

        let next = WorkflowEngine::get_next_step(&workflow, &state).unwrap();
        assert_eq!(next, Some("step1".to_string()));
    }

    #[test]
    fn test_pause_and_resume_execution() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let execution_id = engine.create_execution(&workflow).unwrap();
        engine.start_execution(&execution_id).unwrap();

        engine.pause_execution(&execution_id).unwrap();
        let state = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(state.status, WorkflowStatus::Paused);

        engine.resume_execution(&execution_id).unwrap();
        let state = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(state.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_cancel_execution() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let execution_id = engine.create_execution(&workflow).unwrap();
        engine.start_execution(&execution_id).unwrap();
        engine.cancel_execution(&execution_id).unwrap();

        let state = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(state.status, WorkflowStatus::Cancelled);
    }

    #[test]
    fn test_get_active_executions() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let id1 = engine.create_execution(&workflow).unwrap();
        let id2 = engine.create_execution(&workflow).unwrap();

        let active = engine.get_active_executions();
        assert_eq!(active.len(), 2);
        assert!(active.contains(&id1));
        assert!(active.contains(&id2));
    }

    #[test]
    fn test_remove_execution() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let execution_id = engine.create_execution(&workflow).unwrap();
        assert_eq!(engine.active_execution_count(), 1);

        let removed_state = engine.remove_execution(&execution_id).unwrap();
        assert_eq!(removed_state.workflow_id, "test-workflow");
        assert_eq!(engine.active_execution_count(), 0);
    }

    #[test]
    fn test_wait_for_dependencies() {
        let workflow = create_test_workflow_with_deps();
        let mut state = StateManager::create_state(&workflow);

        // step2 depends on step1, which is not completed
        let result = WorkflowEngine::wait_for_dependencies(&workflow, &state, "step2");
        assert!(result.is_err());

        // Mark step1 as completed
        state.completed_steps.push("step1".to_string());

        // Now step2 dependencies are satisfied
        let result = WorkflowEngine::wait_for_dependencies(&workflow, &state, "step2");
        assert!(result.is_ok());
    }
}