1use crate::error::{WorkflowError, WorkflowResult};
4use crate::models::{Workflow, WorkflowState};
5use crate::state::StateManager;
6use std::collections::{HashMap, HashSet, VecDeque};
7use uuid::Uuid;
8
9pub struct WorkflowEngine {
14 active_workflows: HashMap<String, WorkflowState>,
16}
17
18impl Default for WorkflowEngine {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl WorkflowEngine {
25 pub fn new() -> Self {
27 WorkflowEngine {
28 active_workflows: HashMap::new(),
29 }
30 }
31
32 pub fn create_execution(&mut self, workflow: &Workflow) -> WorkflowResult<String> {
36 let state = StateManager::create_state(workflow);
37 let execution_id = Uuid::new_v4().to_string();
38 self.active_workflows.insert(execution_id.clone(), state);
39 Ok(execution_id)
40 }
41
42 pub fn start_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
46 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
47 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
48 })?;
49
50 StateManager::start_workflow(state);
51 Ok(())
52 }
53
54 pub fn pause_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
58 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
59 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
60 })?;
61
62 let _ = StateManager::pause_workflow(state);
63 Ok(())
64 }
65
66 pub fn resume_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
70 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
71 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
72 })?;
73
74 let _ = StateManager::resume_workflow(state);
75 Ok(())
76 }
77
78 pub fn cancel_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
82 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
83 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
84 })?;
85
86 StateManager::cancel_workflow(state);
87 Ok(())
88 }
89
90 pub fn get_execution_state(&self, execution_id: &str) -> WorkflowResult<WorkflowState> {
92 self.active_workflows
93 .get(execution_id)
94 .cloned()
95 .ok_or_else(|| {
96 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
97 })
98 }
99
100 pub fn get_execution_order(workflow: &Workflow) -> WorkflowResult<Vec<String>> {
105 Self::resolve_dependencies(workflow)
106 }
107
108 fn resolve_dependencies(workflow: &Workflow) -> WorkflowResult<Vec<String>> {
113 let mut order = Vec::new();
114 let mut completed = HashSet::new();
115 let mut queue = VecDeque::new();
116
117 for step in &workflow.steps {
119 if step.dependencies.is_empty() {
120 queue.push_back(step.id.clone());
121 }
122 }
123
124 let step_map: HashMap<_, _> = workflow.steps.iter().map(|s| (&s.id, s)).collect();
126
127 while let Some(step_id) = queue.pop_front() {
129 if completed.contains(&step_id) {
130 continue;
131 }
132
133 if let Some(step) = step_map.get(&step_id) {
135 let all_deps_completed =
136 step.dependencies.iter().all(|dep| completed.contains(dep));
137
138 if all_deps_completed {
139 order.push(step_id.clone());
140 completed.insert(step_id.clone());
141
142 for other_step in &workflow.steps {
144 if other_step.dependencies.contains(&step_id)
145 && !completed.contains(&other_step.id)
146 {
147 queue.push_back(other_step.id.clone());
148 }
149 }
150 } else {
151 queue.push_back(step_id);
153 }
154 }
155 }
156
157 if order.len() != workflow.steps.len() {
158 return Err(WorkflowError::Invalid(
159 "Could not determine execution order for all steps".to_string(),
160 ));
161 }
162
163 Ok(order)
164 }
165
166 pub fn can_execute_step(
170 workflow: &Workflow,
171 state: &WorkflowState,
172 step_id: &str,
173 ) -> WorkflowResult<bool> {
174 let step = workflow
176 .steps
177 .iter()
178 .find(|s| s.id == step_id)
179 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
180
181 for dep in &step.dependencies {
183 if !state.completed_steps.contains(dep) {
184 return Ok(false);
185 }
186 }
187
188 Ok(true)
189 }
190
191 pub fn get_next_step(
196 workflow: &Workflow,
197 state: &WorkflowState,
198 ) -> WorkflowResult<Option<String>> {
199 for step in &workflow.steps {
200 if !state.completed_steps.contains(&step.id)
201 && !state.step_results.contains_key(&step.id)
202 && Self::can_execute_step(workflow, state, &step.id)?
203 {
204 return Ok(Some(step.id.clone()));
205 }
206 }
207
208 Ok(None)
209 }
210
211 pub fn wait_for_dependencies(
216 workflow: &Workflow,
217 state: &WorkflowState,
218 step_id: &str,
219 ) -> WorkflowResult<()> {
220 let step = workflow
221 .steps
222 .iter()
223 .find(|s| s.id == step_id)
224 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
225
226 for dep in &step.dependencies {
228 if !state.completed_steps.contains(dep) {
229 return Err(WorkflowError::StateError(format!(
230 "Dependency {} not completed for step {}",
231 dep, step_id
232 )));
233 }
234 }
235
236 Ok(())
237 }
238
239 pub fn complete_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
241 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
242 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
243 })?;
244
245 StateManager::complete_workflow(state);
246 Ok(())
247 }
248
249 pub fn fail_execution(&mut self, execution_id: &str) -> WorkflowResult<()> {
251 let state = self.active_workflows.get_mut(execution_id).ok_or_else(|| {
252 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
253 })?;
254
255 StateManager::fail_workflow(state);
256 Ok(())
257 }
258
259 pub fn remove_execution(&mut self, execution_id: &str) -> WorkflowResult<WorkflowState> {
261 self.active_workflows.remove(execution_id).ok_or_else(|| {
262 WorkflowError::NotFound(format!("Execution not found: {}", execution_id))
263 })
264 }
265
266 pub fn get_active_executions(&self) -> Vec<String> {
268 self.active_workflows.keys().cloned().collect()
269 }
270
271 pub fn active_execution_count(&self) -> usize {
273 self.active_workflows.len()
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        ErrorAction, RiskFactors, StepType, WorkflowConfig, WorkflowStatus, WorkflowStep,
    };

    /// Builds an agent step with the fixed test agent/task and the given
    /// id, display name and dependency ids.
    fn agent_step(id: &str, name: &str, dependencies: &[&str]) -> WorkflowStep {
        WorkflowStep {
            id: id.to_string(),
            name: name.to_string(),
            step_type: StepType::Agent(crate::models::AgentStep {
                agent_id: "test-agent".to_string(),
                task: "test-task".to_string(),
            }),
            config: crate::models::StepConfig {
                config: serde_json::json!({}),
            },
            dependencies: dependencies.iter().map(|dep| (*dep).to_string()).collect(),
            approval_required: false,
            on_error: ErrorAction::Fail,
            risk_score: None,
            risk_factors: RiskFactors::default(),
        }
    }

    /// Three-step fixture: step1 has no dependencies, step2 depends on
    /// step1, step3 depends on step1 and step2.
    fn create_test_workflow_with_deps() -> Workflow {
        Workflow {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: "A test workflow".to_string(),
            parameters: vec![],
            steps: vec![
                agent_step("step1", "Step 1", &[]),
                agent_step("step2", "Step 2", &["step1"]),
                agent_step("step3", "Step 3", &["step1", "step2"]),
            ],
            config: WorkflowConfig {
                timeout_ms: None,
                max_parallel: None,
            },
        }
    }

    /// Engine with one registered execution of the fixture workflow.
    fn engine_with_execution() -> (WorkflowEngine, String) {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();
        let execution_id = engine.create_execution(&workflow).unwrap();
        (engine, execution_id)
    }

    #[test]
    fn test_create_engine() {
        assert_eq!(WorkflowEngine::new().active_execution_count(), 0);
    }

    #[test]
    fn test_create_execution() {
        let (engine, execution_id) = engine_with_execution();

        assert!(!execution_id.is_empty());
        assert_eq!(engine.active_execution_count(), 1);
    }

    #[test]
    fn test_start_execution() {
        let (mut engine, execution_id) = engine_with_execution();

        engine.start_execution(&execution_id).unwrap();

        let status = engine.get_execution_state(&execution_id).unwrap().status;
        assert_eq!(status, WorkflowStatus::Running);
    }

    #[test]
    fn test_get_execution_order() {
        let workflow = create_test_workflow_with_deps();

        let order = WorkflowEngine::get_execution_order(&workflow).unwrap();

        assert_eq!(order, vec!["step1", "step2", "step3"]);
    }

    #[test]
    fn test_can_execute_step() {
        let workflow = create_test_workflow_with_deps();

        // Nothing completed yet: only the dependency-free step is runnable.
        let fresh = StateManager::create_state(&workflow);
        assert!(WorkflowEngine::can_execute_step(&workflow, &fresh, "step1").unwrap());
        assert!(!WorkflowEngine::can_execute_step(&workflow, &fresh, "step2").unwrap());

        // Once step1 is recorded as completed, step2 becomes runnable.
        let mut after_step1 = StateManager::create_state(&workflow);
        after_step1.completed_steps.push("step1".to_string());
        assert!(WorkflowEngine::can_execute_step(&workflow, &after_step1, "step2").unwrap());
    }

    #[test]
    fn test_get_next_step() {
        let workflow = create_test_workflow_with_deps();
        let state = StateManager::create_state(&workflow);

        let next = WorkflowEngine::get_next_step(&workflow, &state).unwrap();

        assert_eq!(next, Some("step1".to_string()));
    }

    #[test]
    fn test_pause_and_resume_execution() {
        let (mut engine, execution_id) = engine_with_execution();
        engine.start_execution(&execution_id).unwrap();

        engine.pause_execution(&execution_id).unwrap();
        let paused = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(paused.status, WorkflowStatus::Paused);

        engine.resume_execution(&execution_id).unwrap();
        let resumed = engine.get_execution_state(&execution_id).unwrap();
        assert_eq!(resumed.status, WorkflowStatus::Running);
    }

    #[test]
    fn test_cancel_execution() {
        let (mut engine, execution_id) = engine_with_execution();
        engine.start_execution(&execution_id).unwrap();

        engine.cancel_execution(&execution_id).unwrap();

        let status = engine.get_execution_state(&execution_id).unwrap().status;
        assert_eq!(status, WorkflowStatus::Cancelled);
    }

    #[test]
    fn test_get_active_executions() {
        let mut engine = WorkflowEngine::new();
        let workflow = create_test_workflow_with_deps();

        let first = engine.create_execution(&workflow).unwrap();
        let second = engine.create_execution(&workflow).unwrap();

        let active = engine.get_active_executions();
        assert_eq!(active.len(), 2);
        for expected in [&first, &second] {
            assert!(active.contains(expected));
        }
    }

    #[test]
    fn test_remove_execution() {
        let (mut engine, execution_id) = engine_with_execution();
        assert_eq!(engine.active_execution_count(), 1);

        let removed_state = engine.remove_execution(&execution_id).unwrap();

        assert_eq!(removed_state.workflow_id, "test-workflow");
        assert_eq!(engine.active_execution_count(), 0);
    }

    #[test]
    fn test_wait_for_dependencies() {
        let workflow = create_test_workflow_with_deps();
        let mut state = StateManager::create_state(&workflow);

        // step1 has not completed, so step2's dependencies are unmet.
        assert!(WorkflowEngine::wait_for_dependencies(&workflow, &state, "step2").is_err());

        state.completed_steps.push("step1".to_string());

        assert!(WorkflowEngine::wait_for_dependencies(&workflow, &state, "step2").is_ok());
    }
}