1use crate::agent_executor::AgentExecutor;
4use crate::approval::ApprovalGate;
5use crate::command_executor::CommandExecutor;
6use crate::condition::ConditionEvaluator;
7use crate::error::{WorkflowError, WorkflowResult};
8use crate::models::{StepResult, StepStatus, StepType, Workflow, WorkflowState};
9use crate::parallel_executor::ParallelExecutor;
10use crate::state::StateManager;
11use std::time::Instant;
12
13pub struct StepExecutor;
20
21impl StepExecutor {
22 pub fn execute_step(
27 workflow: &Workflow,
28 state: &mut WorkflowState,
29 step_id: &str,
30 ) -> WorkflowResult<()> {
31 let step = workflow
33 .steps
34 .iter()
35 .find(|s| s.id == step_id)
36 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
37
38 match &step.step_type {
40 StepType::Agent(agent_step) => {
41 AgentExecutor::execute_agent_step(workflow, state, step_id, agent_step)
42 }
43 StepType::Command(command_step) => {
44 CommandExecutor::execute_command_step(workflow, state, step_id, command_step)
45 }
46 StepType::Condition(condition_step) => {
47 StateManager::start_step(state, step_id.to_string());
49 let start_time = Instant::now();
50
51 let next_steps =
52 ConditionEvaluator::evaluate_condition(workflow, state, condition_step)?;
53
54 let duration_ms = start_time.elapsed().as_millis() as u64;
55
56 StateManager::complete_step(
57 state,
58 step_id.to_string(),
59 Some(serde_json::json!({
60 "next_steps": next_steps,
61 "condition": condition_step.condition,
62 })),
63 duration_ms,
64 );
65
66 Ok(())
67 }
68 StepType::Parallel(parallel_step) => {
69 ParallelExecutor::execute_parallel_step(workflow, state, step_id, parallel_step)
70 }
71 StepType::Approval(approval_step) => {
72 StateManager::start_step(state, step_id.to_string());
75 let start_time = Instant::now();
76
77 let duration_ms = start_time.elapsed().as_millis() as u64;
78
79 StateManager::complete_step(
80 state,
81 step_id.to_string(),
82 Some(serde_json::json!({
83 "message": &approval_step.message,
84 "timeout": approval_step.timeout,
85 "default": format!("{:?}", approval_step.default),
86 })),
87 duration_ms,
88 );
89
90 Ok(())
91 }
92 }
93 }
94
95 pub fn execute_workflow(workflow: &Workflow, state: &mut WorkflowState) -> WorkflowResult<()> {
100 let order = crate::engine::WorkflowEngine::get_execution_order(workflow)?;
102
103 for step_id in order {
105 if !crate::engine::WorkflowEngine::can_execute_step(workflow, state, &step_id)? {
107 return Err(WorkflowError::StateError(format!(
108 "Cannot execute step {}: dependencies not met",
109 step_id
110 )));
111 }
112
113 Self::execute_step(workflow, state, &step_id)?;
115 }
116
117 Ok(())
118 }
119
120 pub fn execute_next_step(
125 workflow: &Workflow,
126 state: &mut WorkflowState,
127 ) -> WorkflowResult<Option<String>> {
128 if let Some(step_id) = crate::engine::WorkflowEngine::get_next_step(workflow, state)? {
130 Self::execute_step(workflow, state, &step_id)?;
131 Ok(Some(step_id))
132 } else {
133 Ok(None)
134 }
135 }
136
137 pub fn get_step_context(
144 workflow: &Workflow,
145 state: &WorkflowState,
146 step_id: &str,
147 ) -> WorkflowResult<serde_json::Value> {
148 let step = workflow
149 .steps
150 .iter()
151 .find(|s| s.id == step_id)
152 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
153
154 let mut context = serde_json::json!({
156 "step_id": step_id,
157 "step_name": &step.name,
158 "config": &step.config.config,
159 "dependencies": {}
160 });
161
162 if let Some(obj) = context.get_mut("dependencies") {
164 for dep_id in &step.dependencies {
165 if let Some(result) = state.step_results.get(dep_id) {
166 if let Some(output) = &result.output {
167 if let Some(deps_obj) = obj.as_object_mut() {
168 deps_obj.insert(dep_id.clone(), output.clone());
169 }
170 }
171 }
172 }
173 }
174
175 Ok(context)
176 }
177
178 pub fn fail_step(
180 state: &mut WorkflowState,
181 step_id: &str,
182 error: String,
183 ) -> WorkflowResult<()> {
184 let duration_ms = state
185 .step_results
186 .get(step_id)
187 .map(|r| r.duration_ms)
188 .unwrap_or(0);
189
190 StateManager::fail_step(state, step_id.to_string(), error, duration_ms);
191 Ok(())
192 }
193
194 pub fn skip_step(state: &mut WorkflowState, step_id: &str) -> WorkflowResult<()> {
196 StateManager::skip_step(state, step_id.to_string());
197 Ok(())
198 }
199
200 pub fn get_step_status(state: &WorkflowState, step_id: &str) -> Option<StepStatus> {
202 state.step_results.get(step_id).map(|r| r.status)
203 }
204
205 pub fn is_step_completed(state: &WorkflowState, step_id: &str) -> bool {
207 state
208 .step_results
209 .get(step_id)
210 .map(|r| r.status == StepStatus::Completed)
211 .unwrap_or(false)
212 }
213
214 pub fn is_step_failed(state: &WorkflowState, step_id: &str) -> bool {
216 state
217 .step_results
218 .get(step_id)
219 .map(|r| r.status == StepStatus::Failed)
220 .unwrap_or(false)
221 }
222
223 pub fn get_step_result(state: &WorkflowState, step_id: &str) -> Option<StepResult> {
225 state.step_results.get(step_id).cloned()
226 }
227
228 pub fn execute_condition_step(
233 workflow: &Workflow,
234 state: &mut WorkflowState,
235 step_id: &str,
236 ) -> WorkflowResult<Vec<String>> {
237 let step = workflow
239 .steps
240 .iter()
241 .find(|s| s.id == step_id)
242 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
243
244 let condition_step = match &step.step_type {
246 StepType::Condition(cs) => cs,
247 _ => {
248 return Err(WorkflowError::Invalid(format!(
249 "Step {} is not a condition step",
250 step_id
251 )))
252 }
253 };
254
255 StateManager::start_step(state, step_id.to_string());
257
258 let start_time = Instant::now();
260
261 let next_steps = ConditionEvaluator::evaluate_condition(workflow, state, condition_step)?;
263
264 let duration_ms = start_time.elapsed().as_millis() as u64;
265
266 StateManager::complete_step(
268 state,
269 step_id.to_string(),
270 Some(serde_json::json!({
271 "next_steps": next_steps,
272 "condition": condition_step.condition,
273 })),
274 duration_ms,
275 );
276
277 Ok(next_steps)
278 }
279
280 pub fn get_condition_next_steps(
284 workflow: &Workflow,
285 state: &WorkflowState,
286 step_id: &str,
287 ) -> WorkflowResult<Vec<String>> {
288 let step = workflow
290 .steps
291 .iter()
292 .find(|s| s.id == step_id)
293 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
294
295 let condition_step = match &step.step_type {
297 StepType::Condition(cs) => cs,
298 _ => {
299 return Err(WorkflowError::Invalid(format!(
300 "Step {} is not a condition step",
301 step_id
302 )))
303 }
304 };
305
306 ConditionEvaluator::evaluate_condition(workflow, state, condition_step)
308 }
309
310 pub fn requires_approval(workflow: &Workflow, step_id: &str) -> WorkflowResult<bool> {
314 let step = workflow
315 .steps
316 .iter()
317 .find(|s| s.id == step_id)
318 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
319
320 Ok(step.approval_required)
321 }
322
323 pub fn request_step_approval(
328 approval_gate: &mut ApprovalGate,
329 workflow: &Workflow,
330 step_id: &str,
331 ) -> WorkflowResult<String> {
332 let step = workflow
333 .steps
334 .iter()
335 .find(|s| s.id == step_id)
336 .ok_or_else(|| WorkflowError::NotFound(format!("Step not found: {}", step_id)))?;
337
338 let message = format!("Approval required for step: {}", step.name);
339 let timeout_ms = 3600000; approval_gate.request_approval(step_id.to_string(), message, timeout_ms)
342 }
343
344 pub fn is_step_approved(
349 approval_gate: &ApprovalGate,
350 request_id: &str,
351 ) -> WorkflowResult<bool> {
352 approval_gate.is_approved(request_id)
353 }
354
355 pub fn is_step_rejected(
360 approval_gate: &ApprovalGate,
361 request_id: &str,
362 ) -> WorkflowResult<bool> {
363 approval_gate.is_rejected(request_id)
364 }
365
366 pub fn is_approval_pending(
370 approval_gate: &ApprovalGate,
371 request_id: &str,
372 ) -> WorkflowResult<bool> {
373 approval_gate.is_pending(request_id)
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380 use crate::models::{
381 AgentStep, ErrorAction, RiskFactors, StepConfig, StepType, WorkflowConfig, WorkflowStep,
382 };
383
384 fn create_simple_workflow() -> Workflow {
385 Workflow {
386 id: "test-workflow".to_string(),
387 name: "Test Workflow".to_string(),
388 description: "A test workflow".to_string(),
389 parameters: vec![],
390 steps: vec![WorkflowStep {
391 id: "step1".to_string(),
392 name: "Step 1".to_string(),
393 step_type: StepType::Agent(AgentStep {
394 agent_id: "test-agent".to_string(),
395 task: "test-task".to_string(),
396 }),
397 config: StepConfig {
398 config: serde_json::json!({"param": "value"}),
399 },
400 dependencies: vec![],
401 approval_required: false,
402 on_error: ErrorAction::Fail,
403 risk_score: None,
404 risk_factors: RiskFactors::default(),
405 }],
406 config: WorkflowConfig {
407 timeout_ms: None,
408 max_parallel: None,
409 },
410 }
411 }
412
413 #[test]
414 fn test_execute_step() {
415 let workflow = create_simple_workflow();
416 let mut state = StateManager::create_state(&workflow);
417
418 let result = StepExecutor::execute_step(&workflow, &mut state, "step1");
419 assert!(result.is_ok());
420
421 assert!(StepExecutor::is_step_completed(&state, "step1"));
422 assert!(state.completed_steps.contains(&"step1".to_string()));
423 }
424
425 #[test]
426 fn test_execute_next_step() {
427 let workflow = create_simple_workflow();
428 let mut state = StateManager::create_state(&workflow);
429
430 let result = StepExecutor::execute_next_step(&workflow, &mut state);
431 assert!(result.is_ok());
432
433 let executed = result.unwrap();
434 assert_eq!(executed, Some("step1".to_string()));
435 assert!(StepExecutor::is_step_completed(&state, "step1"));
436 }
437
438 #[test]
439 fn test_get_step_context() {
440 let workflow = create_simple_workflow();
441 let state = StateManager::create_state(&workflow);
442
443 let context = StepExecutor::get_step_context(&workflow, &state, "step1");
444 assert!(context.is_ok());
445
446 let context = context.unwrap();
447 assert_eq!(context["step_id"], "step1");
448 assert_eq!(context["step_name"], "Step 1");
449 }
450
451 #[test]
452 fn test_fail_step() {
453 let workflow = create_simple_workflow();
454 let mut state = StateManager::create_state(&workflow);
455
456 StateManager::start_step(&mut state, "step1".to_string());
457 let result = StepExecutor::fail_step(&mut state, "step1", "Test error".to_string());
458 assert!(result.is_ok());
459
460 assert!(StepExecutor::is_step_failed(&state, "step1"));
461 }
462
463 #[test]
464 fn test_skip_step() {
465 let workflow = create_simple_workflow();
466 let mut state = StateManager::create_state(&workflow);
467
468 StateManager::start_step(&mut state, "step1".to_string());
470
471 let result = StepExecutor::skip_step(&mut state, "step1");
472 assert!(result.is_ok());
473
474 let status = StepExecutor::get_step_status(&state, "step1");
475 assert_eq!(status, Some(StepStatus::Skipped));
476 }
477
478 #[test]
479 fn test_get_step_result() {
480 let workflow = create_simple_workflow();
481 let mut state = StateManager::create_state(&workflow);
482
483 StateManager::start_step(&mut state, "step1".to_string());
484 StateManager::complete_step(
485 &mut state,
486 "step1".to_string(),
487 Some(serde_json::json!({"result": "success"})),
488 100,
489 );
490
491 let result = StepExecutor::get_step_result(&state, "step1");
492 assert!(result.is_some());
493
494 let result = result.unwrap();
495 assert_eq!(result.status, StepStatus::Completed);
496 assert_eq!(result.duration_ms, 100);
497 }
498
499 #[test]
500 fn test_execute_condition_step_then_branch() {
501 use crate::models::{ConditionStep, StepStatus};
502
503 let workflow = Workflow {
504 id: "test-workflow".to_string(),
505 name: "Test Workflow".to_string(),
506 description: "A test workflow".to_string(),
507 parameters: vec![],
508 steps: vec![
509 WorkflowStep {
510 id: "step1".to_string(),
511 name: "Step 1".to_string(),
512 step_type: StepType::Agent(crate::models::AgentStep {
513 agent_id: "test-agent".to_string(),
514 task: "test-task".to_string(),
515 }),
516 config: crate::models::StepConfig {
517 config: serde_json::json!({}),
518 },
519 dependencies: vec![],
520 approval_required: false,
521 on_error: ErrorAction::Fail,
522 risk_score: None,
523 risk_factors: RiskFactors::default(),
524 },
525 WorkflowStep {
526 id: "condition".to_string(),
527 name: "Condition".to_string(),
528 step_type: StepType::Condition(ConditionStep {
529 condition: "step1.output.count > 5".to_string(),
530 then_steps: vec!["step2".to_string()],
531 else_steps: vec!["step3".to_string()],
532 }),
533 config: crate::models::StepConfig {
534 config: serde_json::json!({}),
535 },
536 dependencies: vec!["step1".to_string()],
537 approval_required: false,
538 on_error: ErrorAction::Fail,
539 risk_score: None,
540 risk_factors: RiskFactors::default(),
541 },
542 ],
543 config: crate::models::WorkflowConfig {
544 timeout_ms: None,
545 max_parallel: None,
546 },
547 };
548
549 let mut state = StateManager::create_state(&workflow);
550
551 state.step_results.insert(
553 "step1".to_string(),
554 StepResult {
555 status: StepStatus::Completed,
556 output: Some(serde_json::json!({"count": 10})),
557 error: None,
558 duration_ms: 100,
559 },
560 );
561 state.completed_steps.push("step1".to_string());
562
563 let result = StepExecutor::execute_condition_step(&workflow, &mut state, "condition");
564 assert!(result.is_ok());
565
566 let next_steps = result.unwrap();
567 assert_eq!(next_steps, vec!["step2".to_string()]);
568 assert!(StepExecutor::is_step_completed(&state, "condition"));
569 }
570
571 #[test]
572 fn test_execute_condition_step_else_branch() {
573 use crate::models::{ConditionStep, StepStatus};
574
575 let workflow = Workflow {
576 id: "test-workflow".to_string(),
577 name: "Test Workflow".to_string(),
578 description: "A test workflow".to_string(),
579 parameters: vec![],
580 steps: vec![
581 WorkflowStep {
582 id: "step1".to_string(),
583 name: "Step 1".to_string(),
584 step_type: StepType::Agent(crate::models::AgentStep {
585 agent_id: "test-agent".to_string(),
586 task: "test-task".to_string(),
587 }),
588 config: crate::models::StepConfig {
589 config: serde_json::json!({}),
590 },
591 dependencies: vec![],
592 approval_required: false,
593 on_error: ErrorAction::Fail,
594 risk_score: None,
595 risk_factors: RiskFactors::default(),
596 },
597 WorkflowStep {
598 id: "condition".to_string(),
599 name: "Condition".to_string(),
600 step_type: StepType::Condition(ConditionStep {
601 condition: "step1.output.count > 5".to_string(),
602 then_steps: vec!["step2".to_string()],
603 else_steps: vec!["step3".to_string()],
604 }),
605 config: crate::models::StepConfig {
606 config: serde_json::json!({}),
607 },
608 dependencies: vec!["step1".to_string()],
609 approval_required: false,
610 on_error: ErrorAction::Fail,
611 risk_score: None,
612 risk_factors: RiskFactors::default(),
613 },
614 ],
615 config: crate::models::WorkflowConfig {
616 timeout_ms: None,
617 max_parallel: None,
618 },
619 };
620
621 let mut state = StateManager::create_state(&workflow);
622
623 state.step_results.insert(
625 "step1".to_string(),
626 StepResult {
627 status: StepStatus::Completed,
628 output: Some(serde_json::json!({"count": 3})),
629 error: None,
630 duration_ms: 100,
631 },
632 );
633 state.completed_steps.push("step1".to_string());
634
635 let result = StepExecutor::execute_condition_step(&workflow, &mut state, "condition");
636 assert!(result.is_ok());
637
638 let next_steps = result.unwrap();
639 assert_eq!(next_steps, vec!["step3".to_string()]);
640 assert!(StepExecutor::is_step_completed(&state, "condition"));
641 }
642
643 #[test]
644 fn test_get_condition_next_steps() {
645 use crate::models::{ConditionStep, StepStatus};
646
647 let workflow = Workflow {
648 id: "test-workflow".to_string(),
649 name: "Test Workflow".to_string(),
650 description: "A test workflow".to_string(),
651 parameters: vec![],
652 steps: vec![
653 WorkflowStep {
654 id: "step1".to_string(),
655 name: "Step 1".to_string(),
656 step_type: StepType::Agent(crate::models::AgentStep {
657 agent_id: "test-agent".to_string(),
658 task: "test-task".to_string(),
659 }),
660 config: crate::models::StepConfig {
661 config: serde_json::json!({}),
662 },
663 dependencies: vec![],
664 approval_required: false,
665 on_error: ErrorAction::Fail,
666 risk_score: None,
667 risk_factors: RiskFactors::default(),
668 },
669 WorkflowStep {
670 id: "condition".to_string(),
671 name: "Condition".to_string(),
672 step_type: StepType::Condition(ConditionStep {
673 condition: "step1.output.status == 'success'".to_string(),
674 then_steps: vec!["step2".to_string()],
675 else_steps: vec!["step3".to_string()],
676 }),
677 config: crate::models::StepConfig {
678 config: serde_json::json!({}),
679 },
680 dependencies: vec!["step1".to_string()],
681 approval_required: false,
682 on_error: ErrorAction::Fail,
683 risk_score: None,
684 risk_factors: RiskFactors::default(),
685 },
686 ],
687 config: crate::models::WorkflowConfig {
688 timeout_ms: None,
689 max_parallel: None,
690 },
691 };
692
693 let mut state = StateManager::create_state(&workflow);
694
695 state.step_results.insert(
697 "step1".to_string(),
698 StepResult {
699 status: StepStatus::Completed,
700 output: Some(serde_json::json!({"status": "success"})),
701 error: None,
702 duration_ms: 100,
703 },
704 );
705
706 let result = StepExecutor::get_condition_next_steps(&workflow, &state, "condition");
707 assert!(result.is_ok());
708
709 let next_steps = result.unwrap();
710 assert_eq!(next_steps, vec!["step2".to_string()]);
711 }
712
713 #[test]
714 fn test_requires_approval() {
715 let workflow = Workflow {
716 id: "test-workflow".to_string(),
717 name: "Test Workflow".to_string(),
718 description: "A test workflow".to_string(),
719 parameters: vec![],
720 steps: vec![
721 WorkflowStep {
722 id: "step1".to_string(),
723 name: "Step 1".to_string(),
724 step_type: StepType::Agent(crate::models::AgentStep {
725 agent_id: "test-agent".to_string(),
726 task: "test-task".to_string(),
727 }),
728 config: crate::models::StepConfig {
729 config: serde_json::json!({}),
730 },
731 dependencies: vec![],
732 approval_required: true,
733 on_error: ErrorAction::Fail,
734 risk_score: None,
735 risk_factors: RiskFactors::default(),
736 },
737 WorkflowStep {
738 id: "step2".to_string(),
739 name: "Step 2".to_string(),
740 step_type: StepType::Agent(crate::models::AgentStep {
741 agent_id: "test-agent".to_string(),
742 task: "test-task".to_string(),
743 }),
744 config: crate::models::StepConfig {
745 config: serde_json::json!({}),
746 },
747 dependencies: vec![],
748 approval_required: false,
749 on_error: ErrorAction::Fail,
750 risk_score: None,
751 risk_factors: RiskFactors::default(),
752 },
753 ],
754 config: crate::models::WorkflowConfig {
755 timeout_ms: None,
756 max_parallel: None,
757 },
758 };
759
760 assert!(StepExecutor::requires_approval(&workflow, "step1").unwrap());
761 assert!(!StepExecutor::requires_approval(&workflow, "step2").unwrap());
762 }
763
764 #[test]
765 fn test_request_step_approval() {
766 use crate::approval::ApprovalGate;
767
768 let workflow = create_simple_workflow();
769 let mut approval_gate = ApprovalGate::new();
770
771 let request_id =
772 StepExecutor::request_step_approval(&mut approval_gate, &workflow, "step1").unwrap();
773
774 assert!(!request_id.is_empty());
775 assert!(StepExecutor::is_approval_pending(&approval_gate, &request_id).unwrap());
776 }
777
778 #[test]
779 fn test_is_step_approved() {
780 use crate::approval::ApprovalGate;
781
782 let workflow = create_simple_workflow();
783 let mut approval_gate = ApprovalGate::new();
784
785 let request_id =
786 StepExecutor::request_step_approval(&mut approval_gate, &workflow, "step1").unwrap();
787
788 assert!(!StepExecutor::is_step_approved(&approval_gate, &request_id).unwrap());
790
791 approval_gate.approve(&request_id, None).unwrap();
793
794 assert!(StepExecutor::is_step_approved(&approval_gate, &request_id).unwrap());
796 }
797
798 #[test]
799 fn test_is_step_rejected() {
800 use crate::approval::ApprovalGate;
801
802 let workflow = create_simple_workflow();
803 let mut approval_gate = ApprovalGate::new();
804
805 let request_id =
806 StepExecutor::request_step_approval(&mut approval_gate, &workflow, "step1").unwrap();
807
808 assert!(!StepExecutor::is_step_rejected(&approval_gate, &request_id).unwrap());
810
811 approval_gate.reject(&request_id, None).unwrap();
813
814 assert!(StepExecutor::is_step_rejected(&approval_gate, &request_id).unwrap());
816 }
817}