1use crate::audit::AuditLog;
7use crate::workflow::checkpoint::{
8 can_proceed, requires_rollback, validate_checkpoint, validate_workflow_consistency,
9 ValidationCheckpoint, ValidationResult, WorkflowCheckpoint, WorkflowCheckpointService,
10};
11use crate::workflow::dag::Workflow;
12use crate::workflow::deadlock::{DeadlockDetector, DeadlockError, DeadlockWarning};
13use crate::workflow::rollback::{CompensationRegistry, RollbackEngine, RollbackReport, RollbackStrategy, ToolCompensation};
14use crate::workflow::state::TaskStatus;
15use crate::workflow::task::{CompensationAction, TaskContext, TaskId, TaskResult};
16use crate::workflow::timeout::{TaskTimeout, TimeoutConfig, TimeoutError, WorkflowTimeout};
17use crate::workflow::tools::ToolRegistry;
18use chrono::Utc;
19use std::collections::HashSet;
20use std::sync::Arc;
21
22#[derive(Clone, Debug)]
26pub struct WorkflowResult {
27 pub success: bool,
29 pub completed_tasks: Vec<TaskId>,
31 pub failed_tasks: Vec<TaskId>,
33 pub error: Option<String>,
35 pub rollback_report: Option<RollbackReport>,
37}
38
39impl WorkflowResult {
40 fn new(completed_tasks: Vec<TaskId>) -> Self {
42 Self {
43 success: true,
44 completed_tasks,
45 failed_tasks: Vec::new(),
46 error: None,
47 rollback_report: None,
48 }
49 }
50
51 fn new_failed(completed_tasks: Vec<TaskId>, failed_task: TaskId, error: String) -> Self {
53 Self {
54 success: false,
55 completed_tasks,
56 failed_tasks: vec![failed_task],
57 error: Some(error),
58 rollback_report: None,
59 }
60 }
61
62 fn new_failed_with_rollback(
64 completed_tasks: Vec<TaskId>,
65 failed_task: TaskId,
66 error: String,
67 rollback_report: RollbackReport,
68 ) -> Self {
69 Self {
70 success: false,
71 completed_tasks,
72 failed_tasks: vec![failed_task],
73 error: Some(error),
74 rollback_report: Some(rollback_report),
75 }
76 }
77}
78
/// Drives a [`Workflow`] to completion, sequentially or layer by layer, while
/// recording audit events, writing checkpoints and rolling back on failure.
pub struct WorkflowExecutor {
    /// The workflow whose tasks are executed.
    pub(in crate::workflow) workflow: Workflow,
    /// Audit log receiving workflow and task events; its `tx_id()` is used as the workflow id.
    pub(in crate::workflow) audit_log: AuditLog,
    /// Ids of the tasks that have completed.
    pub(in crate::workflow) completed_tasks: HashSet<TaskId>,
    /// Ids of the tasks recorded as failed.
    pub(in crate::workflow) failed_tasks: Vec<TaskId>,
    /// Engine that selects and runs the rollback after a failure.
    rollback_engine: RollbackEngine,
    /// Strategy handed to the rollback engine when selecting tasks to roll back.
    rollback_strategy: RollbackStrategy,
    /// Compensations registered per task; passed to the rollback engine.
    pub(in crate::workflow) compensation_registry: CompensationRegistry,
    /// Optional checkpoint store; no checkpoints are written when `None`.
    pub(in crate::workflow) checkpoint_service: Option<WorkflowCheckpointService>,
    /// Sequence number for the next checkpoint; advanced after each successful save.
    pub(in crate::workflow) checkpoint_sequence: u64,
    /// Optional validation settings applied to task results during `execute`.
    pub(in crate::workflow) validation_config: Option<ValidationCheckpoint>,
    /// Optional source of the cancellation token checked before each task or layer.
    cancellation_source: Option<crate::workflow::cancellation::CancellationTokenSource>,
    /// Optional task- and workflow-level timeout settings.
    pub(in crate::workflow) timeout_config: Option<TimeoutConfig>,
    /// Optional shared tool registry attached to every task context.
    pub(in crate::workflow) tool_registry: Option<Arc<ToolRegistry>>,
    /// Time limit for one layer in `execute_parallel`; `None` disables the limit.
    pub(in crate::workflow) deadlock_timeout: Option<std::time::Duration>,
}
124
125impl WorkflowExecutor {
126 pub fn new(workflow: Workflow) -> Self {
139 Self {
140 workflow,
141 audit_log: AuditLog::new(),
142 completed_tasks: HashSet::new(),
143 failed_tasks: Vec::new(),
144 rollback_engine: RollbackEngine::new(),
145 rollback_strategy: RollbackStrategy::AllDependent,
146 compensation_registry: CompensationRegistry::new(),
147 checkpoint_service: None,
148 checkpoint_sequence: 0,
149 validation_config: None,
150 cancellation_source: None,
151 timeout_config: None,
152 tool_registry: None,
153 deadlock_timeout: Some(std::time::Duration::from_secs(300)), }
155 }
156
157 pub fn with_rollback_strategy(mut self, strategy: RollbackStrategy) -> Self {
174 self.rollback_strategy = strategy;
175 self
176 }
177
178 pub fn with_checkpoint_service(mut self, service: WorkflowCheckpointService) -> Self {
195 self.checkpoint_service = Some(service);
196 self
197 }
198
199 pub fn with_validation_config(mut self, config: ValidationCheckpoint) -> Self {
216 self.validation_config = Some(config);
217 self
218 }
219
220 pub fn with_cancellation_source(
240 mut self,
241 source: crate::workflow::cancellation::CancellationTokenSource,
242 ) -> Self {
243 self.cancellation_source = Some(source);
244 self
245 }
246
247 pub fn cancellation_token(&self) -> Option<crate::workflow::cancellation::CancellationToken> {
266 self.cancellation_source.as_ref().map(|source| source.token())
267 }
268
269 pub fn cancel(&self) {
290 if let Some(source) = &self.cancellation_source {
291 source.cancel();
292 }
293 }
294
295 pub fn with_timeout_config(mut self, config: TimeoutConfig) -> Self {
314 self.timeout_config = Some(config);
315 self
316 }
317
318 pub fn with_tool_registry(mut self, registry: ToolRegistry) -> Self {
338 self.tool_registry = Some(Arc::new(registry));
339 self
340 }
341
342 pub fn with_deadlock_timeout(mut self, timeout: std::time::Duration) -> Self {
361 self.deadlock_timeout = Some(timeout);
362 self
363 }
364
365 pub fn without_deadlock_timeout(mut self) -> Self {
380 self.deadlock_timeout = None;
381 self
382 }
383
384 pub fn tool_registry(&self) -> Option<&Arc<ToolRegistry>> {
399 self.tool_registry.as_ref()
400 }
401
402 pub fn timeout_config(&self) -> Option<&TimeoutConfig> {
422 self.timeout_config.as_ref()
423 }
424
425 pub fn register_compensation(&mut self, task_id: TaskId, compensation: ToolCompensation) {
444 self.compensation_registry.register(task_id, compensation);
445 }
446
447 pub fn register_file_compensation(&mut self, task_id: TaskId, file_path: impl Into<String>) {
465 self.compensation_registry.register_file_creation(task_id, file_path);
466 }
467
468 pub fn validate_compensation_coverage(&self) -> crate::workflow::rollback::CompensationReport {
486 let task_ids = self.workflow.task_ids();
487 let report = self.compensation_registry.validate_coverage(&task_ids);
488
489 if report.coverage_percentage < 1.0 {
491 let missing = &report.tasks_without_compensation;
492 if !missing.is_empty() {
493 eprintln!(
494 "Warning: {} tasks lack compensation: {:?}",
495 missing.len(),
496 missing
497 );
498 }
499 }
500
501 report
502 }
503
504 pub async fn execute(&mut self) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
525 let workflow_id = self.audit_log.tx_id().to_string();
527 self.record_workflow_started(&workflow_id).await;
528
529 let execution_order = self.workflow.execution_order()?;
531
532 for (position, task_id) in execution_order.iter().enumerate() {
534 if let Some(token) = self.cancellation_token() {
536 if token.is_cancelled() {
537 self.record_workflow_cancelled(&workflow_id).await;
539
540 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
542 return Ok(WorkflowResult {
543 success: false,
544 completed_tasks: completed,
545 failed_tasks: Vec::new(),
546 error: Some("Workflow cancelled".to_string()),
547 rollback_report: None,
548 });
549 }
550 }
551
552 let task_result = match self.execute_task(&workflow_id, task_id).await {
554 Ok(result) => result,
555 Err(e) => {
556 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
558
559 let rollback_set = self
561 .rollback_engine
562 .find_rollback_set(&self.workflow, task_id, self.rollback_strategy)
563 .map_err(|err| {
564 crate::workflow::WorkflowError::TaskNotFound(task_id.clone())
565 })?;
566
567 let rollback_report = self
569 .rollback_engine
570 .execute_rollback(
571 &self.workflow,
572 rollback_set,
573 &workflow_id,
574 &mut self.audit_log,
575 &self.compensation_registry,
576 )
577 .await
578 .map_err(|_err| {
579 crate::workflow::dag::WorkflowError::TaskNotFound(task_id.clone())
580 })?;
581
582 self.record_workflow_failed(&workflow_id, task_id, &e.to_string())
584 .await;
585
586 return Ok(WorkflowResult::new_failed_with_rollback(
587 completed,
588 task_id.clone(),
589 e.to_string(),
590 rollback_report,
591 ));
592 }
593 };
594
595 if let Some(validation_config) = &self.validation_config {
597 let node_idx = self
599 .workflow
600 .task_map
601 .get(task_id)
602 .ok_or_else(|| crate::workflow::WorkflowError::TaskNotFound(task_id.clone()))?;
603 let task_node = self
604 .workflow
605 .graph
606 .node_weight(*node_idx)
607 .ok_or_else(|| crate::workflow::WorkflowError::TaskFailed(
608 "Node index exists but node not found in graph".to_string(),
609 ))?;
610 let task_name = task_node.name.clone();
611
612 let validation = validate_checkpoint(&task_result, validation_config);
613
614 let _ = self
616 .audit_log
617 .record(crate::audit::AuditEvent::WorkflowTaskCompleted {
618 timestamp: Utc::now(),
619 workflow_id: workflow_id.to_string(),
620 task_id: task_id.to_string(),
621 task_name: task_name.clone(),
622 result: format!("Validation: {:?}", validation.status),
623 })
624 .await;
625
626 if !can_proceed(&validation) {
628 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
629
630 if requires_rollback(&validation) {
632 let rollback_set = self
633 .rollback_engine
634 .find_rollback_set(&self.workflow, task_id, self.rollback_strategy)
635 .map_err(|err| {
636 crate::workflow::WorkflowError::TaskNotFound(task_id.clone())
637 })?;
638
639 let rollback_report = self
640 .rollback_engine
641 .execute_rollback(
642 &self.workflow,
643 rollback_set,
644 &workflow_id,
645 &mut self.audit_log,
646 &self.compensation_registry,
647 )
648 .await
649 .map_err(|_err| {
650 crate::workflow::dag::WorkflowError::TaskNotFound(task_id.clone())
651 })?;
652
653 return Ok(WorkflowResult::new_failed_with_rollback(
654 completed,
655 task_id.clone(),
656 validation.message,
657 rollback_report,
658 ));
659 } else {
660 return Ok(WorkflowResult::new_failed(
662 completed,
663 task_id.clone(),
664 validation.message,
665 ));
666 }
667 }
668
669 if matches!(validation.status, crate::workflow::checkpoint::ValidationStatus::Warning) {
671 eprintln!("Warning: {} - {}", task_id, validation.message);
672 }
673 }
674
675 self.create_checkpoint(&workflow_id, position).await;
677 }
678
679 self.record_workflow_completed(&workflow_id).await;
681
682 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
683 Ok(WorkflowResult::new(completed))
684 }
685
686 pub async fn execute_with_validations(&mut self) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
704 if self.validation_config.is_none() {
706 self.validation_config = Some(ValidationCheckpoint::default());
707 }
708
709 self.execute().await
711 }
712
713 pub async fn execute_with_timeout(&mut self) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
739 if let Some(config) = &self.timeout_config {
741 if let Some(workflow_timeout) = config.workflow_timeout {
742 let duration = workflow_timeout.duration();
743
744 match tokio::time::timeout(duration, self.execute()).await {
746 Ok(result) => result,
747 Err(_) => {
748 let workflow_id = self.audit_log.tx_id().to_string();
750 self.record_workflow_timeout(&workflow_id, duration.as_secs())
751 .await;
752
753 Err(crate::workflow::WorkflowError::Timeout(
755 TimeoutError::WorkflowTimeout { timeout: duration },
756 ))
757 }
758 }
759 } else {
760 self.execute().await
762 }
763 } else {
764 self.execute().await
766 }
767 }
768
769 pub async fn execute_parallel(&mut self) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
790 use tokio::task::JoinSet;
791 use crate::workflow::state::{ConcurrentState, WorkflowState, TaskSummary};
792
793 let workflow_id = self.audit_log.tx_id().to_string();
795 self.record_workflow_started(&workflow_id).await;
796
797 self.check_for_deadlocks_before_execution(&workflow_id).await?;
799
800 let initial_state = WorkflowState::new(&workflow_id)
803 .with_status(crate::workflow::state::WorkflowStatus::Running);
804 let concurrent_state = std::sync::Arc::new(ConcurrentState::new(initial_state));
805
806 let execution_layers = self.workflow.execution_layers()?;
808
809 for (layer_index, layer) in execution_layers.iter().enumerate() {
811 if let Some(token) = self.cancellation_token() {
813 if token.is_cancelled() {
814 self.record_workflow_cancelled(&workflow_id).await;
815 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
816 return Ok(WorkflowResult {
817 success: false,
818 completed_tasks: completed,
819 failed_tasks: Vec::new(),
820 error: Some("Workflow cancelled".to_string()),
821 rollback_report: None,
822 });
823 }
824 }
825
826 let _ = self
828 .audit_log
829 .record(crate::audit::AuditEvent::WorkflowTaskParallelStarted {
830 timestamp: Utc::now(),
831 workflow_id: workflow_id.to_string(),
832 layer_index,
833 task_count: layer.len(),
834 })
835 .await;
836
837 let mut set: JoinSet<Result<(TaskId, String), crate::workflow::WorkflowError>> = JoinSet::new();
839
840 for task_id in layer {
842 let node_idx = self
843 .workflow
844 .task_map
845 .get(task_id)
846 .ok_or_else(|| crate::workflow::WorkflowError::TaskNotFound(task_id.clone()))?;
847
848 let task_node = self
849 .workflow
850 .graph
851 .node_weight(*node_idx)
852 .ok_or_else(|| crate::workflow::WorkflowError::TaskFailed(
853 "Node index exists but node not found in graph".to_string(),
854 ))?;
855
856 let task_arc = std::sync::Arc::clone(task_node.task());
858
859 let task_id_clone = task_id.clone();
860 let task_name = task_node.name.clone();
861 let workflow_id_clone = workflow_id.clone();
862 let cancellation_token = self.cancellation_token();
863 let timeout_config = self.timeout_config.clone();
864 let tool_registry = self.tool_registry.clone();
865 let audit_log = self.audit_log.clone();
866 let concurrent_state_clone = std::sync::Arc::clone(&concurrent_state);
867
868 set.spawn(async move {
870 let mut task_audit_log = audit_log.clone();
872 let _ = task_audit_log
873 .record(crate::audit::AuditEvent::WorkflowTaskStarted {
874 timestamp: Utc::now(),
875 workflow_id: workflow_id_clone.clone(),
876 task_id: task_id_clone.to_string(),
877 task_name: task_name.clone(),
878 })
879 .await;
880
881 {
884 let _state_reader = concurrent_state_clone.read();
885 }
887
888 let mut context = if let Some(token) = cancellation_token {
890 TaskContext::new(&workflow_id_clone, task_id_clone.clone()).with_cancellation_token(token)
891 } else {
892 TaskContext::new(&workflow_id_clone, task_id_clone.clone())
893 };
894
895 if let Some(config) = &timeout_config {
897 if let Some(task_timeout) = config.task_timeout {
898 context = context.with_task_timeout(task_timeout.duration());
899 }
900 }
901
902 if let Some(ref registry) = tool_registry {
904 context = context.with_tool_registry(Arc::clone(registry));
905 }
906
907 context = context.with_audit_log(task_audit_log.clone());
909
910 let result = task_arc
912 .execute(&context)
913 .await
914 .map_err(|e| crate::workflow::WorkflowError::TaskFailed(e.to_string()));
915
916 match result {
917 Ok(_) => Ok((task_id_clone, task_name)),
918 Err(e) => Err(e),
919 }
920 });
921 }
922
923 let (layer_succeeded, failed_task, error_message): (bool, Option<TaskId>, Option<String>) = if let Some(timeout) = self.deadlock_timeout {
926 let layer_result = tokio::time::timeout(timeout, async {
928 let mut layer_succeeded = true;
929 let mut failed_task: Option<TaskId> = None;
930 let mut error_message: Option<String> = None;
931
932 while let Some(result) = set.join_next().await {
933 match result {
934 Ok(Ok((task_id, task_name))) => {
935 self.completed_tasks.insert(task_id.clone());
938
939 if let Ok(mut state) = concurrent_state.write() {
941 state.completed_tasks.push(TaskSummary::new(
942 task_id.as_str(),
943 &task_name,
944 TaskStatus::Completed,
945 ));
946 }
947
948 self.record_task_completed(&workflow_id, &task_id, &task_name).await;
950 }
951 Ok(Err(_e)) => {
952 layer_succeeded = false;
954 error_message = Some("Task execution failed".to_string());
955 }
956 Err(_e) => {
957 layer_succeeded = false;
959 error_message = Some("Task panicked".to_string());
960 }
961 }
962 }
963
964 (layer_succeeded, failed_task, error_message)
965 })
966 .await;
967
968 match layer_result {
969 Ok(result) => result,
970 Err(_) => {
971 let timeout_secs = timeout.as_secs();
973 self.record_deadlock_timeout(&workflow_id, layer_index, timeout_secs).await;
974
975 return Err(DeadlockError::ResourceDeadlock(format!(
976 "Layer {} exceeded deadlock timeout of {} seconds",
977 layer_index, timeout_secs
978 )).into());
979 }
980 }
981 } else {
982 let mut layer_succeeded = true;
984 let mut failed_task: Option<TaskId> = None;
985 let mut error_message: Option<String> = None;
986
987 while let Some(result) = set.join_next().await {
988 match result {
989 Ok(Ok((task_id, task_name))) => {
990 self.completed_tasks.insert(task_id.clone());
992
993 if let Ok(mut state) = concurrent_state.write() {
994 state.completed_tasks.push(TaskSummary::new(
995 task_id.as_str(),
996 &task_name,
997 TaskStatus::Completed,
998 ));
999 }
1000
1001 self.record_task_completed(&workflow_id, &task_id, &task_name).await;
1002 }
1003 Ok(Err(_e)) => {
1004 layer_succeeded = false;
1005 error_message = Some("Task execution failed".to_string());
1006 }
1007 Err(_e) => {
1008 layer_succeeded = false;
1009 error_message = Some("Task panicked".to_string());
1010 }
1011 }
1012 }
1013
1014 (layer_succeeded, failed_task, error_message)
1015 };
1016
1017 let _ = self
1019 .audit_log
1020 .record(crate::audit::AuditEvent::WorkflowTaskParallelCompleted {
1021 timestamp: Utc::now(),
1022 workflow_id: workflow_id.to_string(),
1023 layer_index,
1024 task_count: layer.len(),
1025 })
1026 .await;
1027
1028 if !layer_succeeded {
1030 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
1031
1032 let failed_id = failed_task.unwrap_or_else(|| {
1035 layer.first().cloned().unwrap_or_else(|| TaskId::new("unknown"))
1036 });
1037
1038 let rollback_set = self
1039 .rollback_engine
1040 .find_rollback_set(&self.workflow, &failed_id, self.rollback_strategy)
1041 .map_err(|err| {
1042 crate::workflow::WorkflowError::TaskNotFound(failed_id.clone())
1043 })?;
1044
1045 let rollback_report = self
1046 .rollback_engine
1047 .execute_rollback(
1048 &self.workflow,
1049 rollback_set,
1050 &workflow_id,
1051 &mut self.audit_log,
1052 &self.compensation_registry,
1053 )
1054 .await
1055 .map_err(|_err| {
1056 crate::workflow::dag::WorkflowError::TaskNotFound(failed_id.clone())
1057 })?;
1058
1059 let error_msg = error_message.clone().unwrap_or_else(|| "Layer execution failed".to_string());
1060 self.record_workflow_failed(&workflow_id, &failed_id, &error_msg)
1061 .await;
1062
1063 return Ok(WorkflowResult::new_failed_with_rollback(
1064 completed,
1065 failed_id,
1066 error_msg,
1067 rollback_report,
1068 ));
1069 }
1070
1071 self.create_checkpoint(&workflow_id, layer_index).await;
1073 }
1074
1075 if let Ok(mut state) = concurrent_state.write() {
1077 state.status = crate::workflow::state::WorkflowStatus::Completed;
1078 }
1079
1080 self.record_workflow_completed(&workflow_id).await;
1081
1082 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
1083 Ok(WorkflowResult::new(completed))
1084 }
1085
1086 async fn execute_task(
1088 &mut self,
1089 workflow_id: &str,
1090 task_id: &TaskId,
1091 ) -> Result<TaskResult, crate::workflow::WorkflowError> {
1092 let node_idx = self
1094 .workflow
1095 .task_map
1096 .get(task_id)
1097 .ok_or_else(|| crate::workflow::WorkflowError::TaskNotFound(task_id.clone()))?;
1098
1099 let task_node = self
1100 .workflow
1101 .graph
1102 .node_weight(*node_idx)
1103 .ok_or_else(|| crate::workflow::WorkflowError::TaskFailed(
1104 "Node index exists but node not found in graph".to_string(),
1105 ))?;
1106
1107 let task_arc = std::sync::Arc::clone(task_node.task());
1109
1110 let task_name = task_node.name.clone();
1112
1113 self.record_task_started(workflow_id, task_id, &task_name)
1115 .await;
1116
1117 let mut context = if let Some(token) = self.cancellation_token() {
1119 TaskContext::new(workflow_id, task_id.clone()).with_cancellation_token(token)
1120 } else {
1121 TaskContext::new(workflow_id, task_id.clone())
1122 };
1123
1124 if let Some(config) = &self.timeout_config {
1126 if let Some(task_timeout) = config.task_timeout {
1127 context = context.with_task_timeout(task_timeout.duration());
1128 }
1129 }
1130
1131 if let Some(ref registry) = self.tool_registry {
1133 context = context.with_tool_registry(Arc::clone(registry));
1134 }
1135
1136 context = context.with_audit_log(self.audit_log.clone());
1138
1139 let execution_result = if let Some(timeout_duration) = context.task_timeout {
1141 match tokio::time::timeout(timeout_duration, self.do_execute_task(&task_arc, &context)).await {
1143 Ok(result) => result,
1144 Err(_) => {
1145 self.record_task_timeout(workflow_id, task_id, &task_name, timeout_duration.as_secs())
1147 .await;
1148
1149 return Err(crate::workflow::WorkflowError::Timeout(
1151 TimeoutError::TaskTimeout {
1152 task_id: task_id.to_string(),
1153 timeout: timeout_duration,
1154 },
1155 ));
1156 }
1157 }
1158 } else {
1159 self.do_execute_task(&task_arc, &context).await
1161 };
1162
1163 match execution_result {
1165 Ok(result) => {
1166 self.completed_tasks.insert(task_id.clone());
1167 self.record_task_completed(workflow_id, task_id, &task_name)
1168 .await;
1169 Ok(result)
1170 }
1171 Err(e) => Err(e),
1172 }
1173 }
1174
1175 async fn do_execute_task(
1179 &mut self,
1180 task: &std::sync::Arc<dyn crate::workflow::WorkflowTask>,
1181 context: &TaskContext,
1182 ) -> Result<TaskResult, crate::workflow::WorkflowError> {
1183 let result = task
1185 .execute(context)
1186 .await
1187 .map_err(|e| crate::workflow::WorkflowError::TaskFailed(e.to_string()))?;
1188
1189 match result {
1191 TaskResult::WithCompensation {
1192 result,
1193 compensation,
1194 } => {
1195 let task_id = task.id();
1197 let tool_comp: ToolCompensation = compensation.into();
1198 self.compensation_registry.register(task_id, tool_comp);
1199
1200 Ok(*result)
1202 }
1203 other => Ok(other),
1204 }
1205 }
1206
1207 async fn record_workflow_started(&mut self, workflow_id: &str) {
1209 let _ = self
1210 .audit_log
1211 .record(crate::audit::AuditEvent::WorkflowStarted {
1212 timestamp: Utc::now(),
1213 workflow_id: workflow_id.to_string(),
1214 task_count: self.workflow.task_count(),
1215 })
1216 .await;
1217 }
1218
1219 async fn record_task_started(&mut self, workflow_id: &str, task_id: &TaskId, task_name: &str) {
1221 let _ = self
1222 .audit_log
1223 .record(crate::audit::AuditEvent::WorkflowTaskStarted {
1224 timestamp: Utc::now(),
1225 workflow_id: workflow_id.to_string(),
1226 task_id: task_id.to_string(),
1227 task_name: task_name.to_string(),
1228 })
1229 .await;
1230 }
1231
1232 async fn record_task_completed(&mut self, workflow_id: &str, task_id: &TaskId, task_name: &str) {
1234 let _ = self
1235 .audit_log
1236 .record(crate::audit::AuditEvent::WorkflowTaskCompleted {
1237 timestamp: Utc::now(),
1238 workflow_id: workflow_id.to_string(),
1239 task_id: task_id.to_string(),
1240 task_name: task_name.to_string(),
1241 result: "Success".to_string(),
1242 })
1243 .await;
1244 }
1245
1246 async fn record_task_failed(&mut self, workflow_id: &str, task_id: &TaskId, task_name: &str, error: &str) {
1248 let _ = self
1249 .audit_log
1250 .record(crate::audit::AuditEvent::WorkflowTaskFailed {
1251 timestamp: Utc::now(),
1252 workflow_id: workflow_id.to_string(),
1253 task_id: task_id.to_string(),
1254 task_name: task_name.to_string(),
1255 error: error.to_string(),
1256 })
1257 .await;
1258 }
1259
1260 async fn record_workflow_failed(&mut self, workflow_id: &str, task_id: &TaskId, error: &str) {
1262 let _ = self
1263 .audit_log
1264 .record(crate::audit::AuditEvent::WorkflowTaskFailed {
1265 timestamp: Utc::now(),
1266 workflow_id: workflow_id.to_string(),
1267 task_id: task_id.to_string(),
1268 task_name: task_id.to_string(),
1269 error: error.to_string(),
1270 })
1271 .await;
1272
1273 let _ = self
1274 .audit_log
1275 .record(crate::audit::AuditEvent::WorkflowCompleted {
1276 timestamp: Utc::now(),
1277 workflow_id: workflow_id.to_string(),
1278 total_tasks: self.workflow.task_count(),
1279 completed_tasks: self.completed_tasks.len(),
1280 })
1281 .await;
1282 }
1283
1284 async fn record_workflow_cancelled(&mut self, workflow_id: &str) {
1286 let _ = self
1287 .audit_log
1288 .record(crate::audit::AuditEvent::WorkflowCancelled {
1289 timestamp: Utc::now(),
1290 workflow_id: workflow_id.to_string(),
1291 })
1292 .await;
1293
1294 let _ = self
1295 .audit_log
1296 .record(crate::audit::AuditEvent::WorkflowCompleted {
1297 timestamp: Utc::now(),
1298 workflow_id: workflow_id.to_string(),
1299 total_tasks: self.workflow.task_count(),
1300 completed_tasks: self.completed_tasks.len(),
1301 })
1302 .await;
1303 }
1304
1305 async fn record_workflow_timeout(&mut self, workflow_id: &str, timeout_secs: u64) {
1307 let _ = self
1308 .audit_log
1309 .record(crate::audit::AuditEvent::WorkflowCompleted {
1310 timestamp: Utc::now(),
1311 workflow_id: workflow_id.to_string(),
1312 total_tasks: self.workflow.task_count(),
1313 completed_tasks: self.completed_tasks.len(),
1314 })
1315 .await;
1316 }
1317
1318 async fn check_for_deadlocks_before_execution(
1325 &mut self,
1326 workflow_id: &str,
1327 ) -> Result<(), crate::workflow::WorkflowError> {
1328 let detector = DeadlockDetector::new();
1329
1330 match detector.validate_workflow(&self.workflow) {
1332 Ok(warnings) => {
1333 let warning_strings: Vec<String> = warnings
1335 .iter()
1336 .map(|w| w.description())
1337 .collect();
1338
1339 let _ = self
1341 .audit_log
1342 .record(crate::audit::AuditEvent::WorkflowDeadlockCheck {
1343 timestamp: Utc::now(),
1344 workflow_id: workflow_id.to_string(),
1345 has_cycles: false,
1346 warnings: warning_strings.clone(),
1347 })
1348 .await;
1349
1350 for warning in &warning_strings {
1352 eprintln!("Deadlock warning: {}", warning);
1353 }
1354
1355 Ok(())
1356 }
1357 Err(DeadlockError::DependencyCycle(cycle)) => {
1358 let _ = self
1360 .audit_log
1361 .record(crate::audit::AuditEvent::WorkflowDeadlockCheck {
1362 timestamp: Utc::now(),
1363 workflow_id: workflow_id.to_string(),
1364 has_cycles: true,
1365 warnings: vec![format!("Dependency cycle detected: {:?}", cycle)],
1366 })
1367 .await;
1368
1369 Err(DeadlockError::DependencyCycle(cycle).into())
1371 }
1372 Err(e) => {
1373 Err(e.into())
1375 }
1376 }
1377 }
1378
1379 async fn record_deadlock_timeout(&mut self, workflow_id: &str, layer_index: usize, timeout_secs: u64) {
1381 let _ = self
1382 .audit_log
1383 .record(crate::audit::AuditEvent::WorkflowDeadlockTimeout {
1384 timestamp: Utc::now(),
1385 workflow_id: workflow_id.to_string(),
1386 layer_index,
1387 timeout_secs,
1388 })
1389 .await;
1390 }
1391
1392 async fn record_task_timeout(
1394 &mut self,
1395 workflow_id: &str,
1396 task_id: &TaskId,
1397 task_name: &str,
1398 timeout_secs: u64,
1399 ) {
1400 let _ = self
1401 .audit_log
1402 .record(crate::audit::AuditEvent::WorkflowTaskTimedOut {
1403 timestamp: Utc::now(),
1404 workflow_id: workflow_id.to_string(),
1405 task_id: task_id.to_string(),
1406 task_name: task_name.to_string(),
1407 timeout_secs,
1408 })
1409 .await;
1410 }
1411
1412 async fn record_workflow_completed(&mut self, workflow_id: &str) {
1414 let _ = self
1415 .audit_log
1416 .record(crate::audit::AuditEvent::WorkflowCompleted {
1417 timestamp: Utc::now(),
1418 workflow_id: workflow_id.to_string(),
1419 total_tasks: self.workflow.task_count(),
1420 completed_tasks: self.completed_tasks.len(),
1421 })
1422 .await;
1423 }
1424
1425 pub fn audit_log(&self) -> &AuditLog {
1427 &self.audit_log
1428 }
1429
1430 pub fn completed_count(&self) -> usize {
1432 self.completed_tasks.len()
1433 }
1434
1435 pub fn failed_count(&self) -> usize {
1437 self.failed_tasks.len()
1438 }
1439
1440 pub fn task_count(&self) -> usize {
1442 self.workflow.task_count()
1443 }
1444
1445 pub fn task_ids(&self) -> Vec<TaskId> {
1447 self.workflow.task_ids()
1448 }
1449
1450 pub fn completed_task_ids(&self) -> Vec<TaskId> {
1452 self.completed_tasks.iter().cloned().collect()
1453 }
1454
1455 pub fn failed_task_ids(&self) -> Vec<TaskId> {
1457 self.failed_tasks.clone()
1458 }
1459
1460 pub fn is_task_completed(&self, id: &TaskId) -> bool {
1462 self.completed_tasks.contains(id)
1463 }
1464
1465 pub fn is_task_failed(&self, id: &TaskId) -> bool {
1467 self.failed_tasks.contains(id)
1468 }
1469
1470 pub fn progress(&self) -> f64 {
1472 let total = self.workflow.task_count();
1473 if total == 0 {
1474 return 0.0;
1475 }
1476 self.completed_tasks.len() as f64 / total as f64
1477 }
1478
1479 pub fn rollback_strategy(&self) -> RollbackStrategy {
1481 self.rollback_strategy
1482 }
1483
1484 async fn create_checkpoint(&mut self, workflow_id: &str, position: usize) {
1494 let service = match &self.checkpoint_service {
1496 Some(s) => s,
1497 None => return,
1498 };
1499
1500 let checkpoint = WorkflowCheckpoint::from_executor(
1502 workflow_id,
1503 self.checkpoint_sequence,
1504 self,
1505 position,
1506 );
1507
1508 if let Err(e) = service.save(&checkpoint) {
1510 let _ = self
1512 .audit_log
1513 .record(crate::audit::AuditEvent::WorkflowTaskFailed {
1514 timestamp: Utc::now(),
1515 workflow_id: workflow_id.to_string(),
1516 task_id: format!("checkpoint-{}", self.checkpoint_sequence),
1517 task_name: "Checkpoint".to_string(),
1518 error: format!("Checkpoint save failed: {}", e),
1519 })
1520 .await;
1521 } else {
1522 self.checkpoint_sequence += 1;
1524 }
1525 }
1526
1527 fn restore_state_from_checkpoint(
1541 &mut self,
1542 checkpoint: &WorkflowCheckpoint,
1543 ) -> Result<(), crate::workflow::WorkflowError> {
1544 self.completed_tasks.clear();
1546 self.failed_tasks.clear();
1547
1548 for task_id in &checkpoint.completed_tasks {
1550 self.completed_tasks.insert(task_id.clone());
1551 }
1552
1553 self.failed_tasks = checkpoint.failed_tasks.clone();
1555
1556 self.checkpoint_sequence = checkpoint.sequence + 1;
1558
1559 Ok(())
1560 }
1561
1562 pub fn restore_checkpoint_state(
1576 &mut self,
1577 checkpoint: &WorkflowCheckpoint,
1578 ) -> Result<(), crate::workflow::WorkflowError> {
1579 validate_workflow_consistency(&self.workflow, checkpoint)?;
1581
1582 self.restore_state_from_checkpoint(checkpoint)?;
1584
1585 Ok(())
1586 }
1587
1588 fn validate_task_result(
1602 &self,
1603 task_result: &TaskResult,
1604 ) -> Result<ValidationResult, crate::workflow::WorkflowError> {
1605 let config = self.validation_config.as_ref()
1606 .ok_or_else(|| crate::workflow::WorkflowError::CheckpointCorrupted(
1607 "Validation configuration not set".to_string()
1608 ))?;
1609
1610 let validation = validate_checkpoint(task_result, config);
1611 Ok(validation)
1612 }
1613
1614 pub fn can_resume(&self) -> bool {
1624 let service = match &self.checkpoint_service {
1626 Some(s) => s,
1627 None => return false,
1628 };
1629
1630 let workflow_id = self.audit_log.tx_id().to_string();
1632
1633 let checkpoint = match service.get_latest(&workflow_id) {
1635 Ok(Some(cp)) => cp,
1636 _ => return false,
1637 };
1638
1639 if checkpoint.validate().is_err() {
1641 return false;
1642 }
1643
1644 validate_workflow_consistency(&self.workflow, &checkpoint).is_ok()
1646 }
1647
1648 pub async fn resume(&mut self) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
1658 let service = self.checkpoint_service.as_ref()
1660 .ok_or_else(|| crate::workflow::WorkflowError::CheckpointNotFound(
1661 "No checkpoint service configured".to_string()
1662 ))?;
1663
1664 let workflow_id = self.audit_log.tx_id().to_string();
1666
1667 let checkpoint = service.get_latest(&workflow_id)?
1669 .ok_or_else(|| crate::workflow::WorkflowError::CheckpointNotFound(
1670 format!("No checkpoint found for workflow: {}", workflow_id)
1671 ))?;
1672
1673 self.resume_from_checkpoint_id(&checkpoint.id).await
1675 }
1676
1677 pub async fn resume_from_checkpoint_id(
1691 &mut self,
1692 checkpoint_id: &crate::workflow::checkpoint::CheckpointId,
1693 ) -> Result<WorkflowResult, crate::workflow::WorkflowError> {
1694 let service = self.checkpoint_service.as_ref()
1696 .ok_or_else(|| crate::workflow::WorkflowError::CheckpointNotFound(
1697 "No checkpoint service configured".to_string()
1698 ))?;
1699
1700 let checkpoint = service.load(checkpoint_id)?
1702 .ok_or_else(|| crate::workflow::WorkflowError::CheckpointNotFound(
1703 format!("Checkpoint not found: {}", checkpoint_id)
1704 ))?;
1705
1706 checkpoint.validate()?;
1708
1709 validate_workflow_consistency(&self.workflow, &checkpoint)?;
1711
1712 self.restore_state_from_checkpoint(&checkpoint)?;
1714
1715 let workflow_id = self.audit_log.tx_id().to_string();
1717
1718 if checkpoint.completed_tasks.len() == checkpoint.total_tasks {
1720 return Ok(WorkflowResult::new(checkpoint.completed_tasks));
1722 }
1723
1724 let execution_order = self.workflow.execution_order()?;
1726
1727 let start_position = checkpoint.current_position + 1;
1729
1730 for position in start_position..execution_order.len() {
1732 let task_id = &execution_order[position];
1733
1734 if let Err(e) = self.execute_task(&workflow_id, task_id).await {
1736 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
1738
1739 let rollback_set = self
1741 .rollback_engine
1742 .find_rollback_set(&self.workflow, task_id, self.rollback_strategy)
1743 .map_err(|err| {
1744 crate::workflow::WorkflowError::TaskNotFound(task_id.clone())
1745 })?;
1746
1747 let rollback_report = self
1749 .rollback_engine
1750 .execute_rollback(
1751 &self.workflow,
1752 rollback_set,
1753 &workflow_id,
1754 &mut self.audit_log,
1755 &self.compensation_registry,
1756 )
1757 .await
1758 .map_err(|_err| {
1759 crate::workflow::dag::WorkflowError::TaskNotFound(task_id.clone())
1760 })?;
1761
1762 self.record_workflow_failed(&workflow_id, task_id, &e.to_string())
1764 .await;
1765
1766 return Ok(WorkflowResult::new_failed_with_rollback(
1767 completed,
1768 task_id.clone(),
1769 e.to_string(),
1770 rollback_report,
1771 ));
1772 }
1773
1774 self.create_checkpoint(&workflow_id, position).await;
1776 }
1777
1778 self.record_workflow_completed(&workflow_id).await;
1780
1781 let completed: Vec<TaskId> = self.completed_tasks.iter().cloned().collect();
1782 Ok(WorkflowResult::new(completed))
1783 }
1784}
1785
1786#[cfg(test)]
1787mod tests {
1788 use super::*;
1789 use crate::workflow::dag::Workflow;
1790 use crate::workflow::task::{TaskContext, TaskResult, WorkflowTask};
1791 use crate::workflow::tools::{Tool, ToolRegistry};
1792 use async_trait::async_trait;
1793
1794 #[tokio::test]
1795 async fn test_executor_with_tool_registry() {
1796 let mut workflow = Workflow::new();
1798 let task_id = TaskId::new("task1");
1799 workflow.add_task(Box::new(MockTask::new(task_id.clone(), "Task 1")));
1800
1801 let mut registry = ToolRegistry::new();
1803 registry.register(Tool::new("echo", "echo")).unwrap();
1804
1805 let mut executor = WorkflowExecutor::new(workflow)
1806 .with_tool_registry(registry);
1807
1808 assert!(executor.tool_registry().is_some());
1810 assert!(executor.tool_registry().unwrap().is_registered("echo"));
1811
1812 let result = executor.execute().await.unwrap();
1814 assert!(result.success);
1815 }
1816
1817 struct MockTask {
1819 id: TaskId,
1820 name: String,
1821 deps: Vec<TaskId>,
1822 should_fail: bool,
1823 }
1824
1825 impl MockTask {
1826 fn new(id: impl Into<TaskId>, name: &str) -> Self {
1827 Self {
1828 id: id.into(),
1829 name: name.to_string(),
1830 deps: Vec::new(),
1831 should_fail: false,
1832 }
1833 }
1834
1835 fn with_dep(mut self, dep: impl Into<TaskId>) -> Self {
1836 self.deps.push(dep.into());
1837 self
1838 }
1839
1840 fn with_failure(mut self) -> Self {
1841 self.should_fail = true;
1842 self
1843 }
1844 }
1845
1846 #[async_trait]
1847 impl WorkflowTask for MockTask {
1848 async fn execute(&self, _context: &TaskContext) -> Result<TaskResult, crate::workflow::TaskError> {
1849 if self.should_fail {
1850 Ok(TaskResult::Failed("Task failed".to_string()))
1851 } else {
1852 Ok(TaskResult::WithCompensation {
1854 result: Box::new(TaskResult::Success),
1855 compensation: crate::workflow::task::CompensationAction::skip(
1856 format!("Mock compensation for task {}", self.name),
1857 ),
1858 })
1859 }
1860 }
1861
1862 fn id(&self) -> TaskId {
1863 self.id.clone()
1864 }
1865
1866 fn name(&self) -> &str {
1867 &self.name
1868 }
1869
1870 fn dependencies(&self) -> Vec<TaskId> {
1871 self.deps.clone()
1872 }
1873 }
1874
1875 #[tokio::test]
1876 async fn test_sequential_execution() {
1877 let mut workflow = Workflow::new();
1878
1879 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
1880 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
1881 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
1882
1883 workflow.add_dependency("a", "b").unwrap();
1884 workflow.add_dependency("a", "c").unwrap();
1885
1886 let mut executor = WorkflowExecutor::new(workflow);
1887 let result = executor.execute().await.unwrap();
1888
1889 assert!(result.success);
1890 assert_eq!(result.completed_tasks.len(), 3);
1891 assert_eq!(executor.completed_count(), 3);
1892 assert_eq!(executor.failed_count(), 0);
1893 }
1894
1895 #[tokio::test]
1896 async fn test_failure_stops_execution() {
1897 let mut workflow = Workflow::new();
1898
1899 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
1900 workflow.add_task(Box::new(MockTask::new("b", "Task B")
1901 .with_dep("a")
1902 .with_failure()));
1903 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("b")));
1904
1905 workflow.add_dependency("a", "b").unwrap();
1906 workflow.add_dependency("b", "c").unwrap();
1907
1908 let mut executor = WorkflowExecutor::new(workflow);
1909 let result = executor.execute().await;
1910
1911 assert!(result.is_ok());
1914 }
1915
1916 #[tokio::test]
1917 async fn test_audit_events_logged() {
1918 let mut workflow = Workflow::new();
1919
1920 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
1921 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
1922
1923 workflow.add_dependency("a", "b").unwrap();
1924
1925 let mut executor = WorkflowExecutor::new(workflow);
1926 executor.execute().await.unwrap();
1927
1928 let events = executor.audit_log().replay();
1929
1930 assert!(events.len() >= 6);
1932
1933 assert!(matches!(events[0], crate::audit::AuditEvent::WorkflowStarted { .. }));
1935 }
1936
1937 #[tokio::test]
1938 async fn test_failure_triggers_rollback() {
1939 let mut workflow = Workflow::new();
1940
1941 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
1942 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a").with_failure()));
1943 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("b")));
1944
1945 workflow.add_dependency("a", "b").unwrap();
1946 workflow.add_dependency("b", "c").unwrap();
1947
1948 let mut executor = WorkflowExecutor::new(workflow);
1949 let result = executor.execute().await.unwrap();
1950
1951 assert!(!result.success);
1953 assert_eq!(result.failed_tasks.len(), 1);
1954 assert_eq!(result.failed_tasks[0], TaskId::new("b"));
1955
1956 assert!(result.rollback_report.is_some());
1958 let rollback_report = result.rollback_report.unwrap();
1959
1960 assert_eq!(rollback_report.rolled_back_tasks.len(), 0);
1966 assert_eq!(rollback_report.skipped_tasks.len(), 2); let events = executor.audit_log().replay();
1970 assert!(events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowTaskRolledBack { .. })));
1971 assert!(events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowRolledBack { .. })));
1972 }
1973
1974 #[tokio::test]
1975 async fn test_rollback_strategy_configurable() {
1976 let mut workflow = Workflow::new();
1977
1978 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
1979 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a").with_failure()));
1980
1981 workflow.add_dependency("a", "b").unwrap();
1982
1983 let mut executor = WorkflowExecutor::new(workflow)
1985 .with_rollback_strategy(RollbackStrategy::FailedOnly);
1986 assert_eq!(executor.rollback_strategy(), RollbackStrategy::FailedOnly);
1987
1988 let result = executor.execute().await.unwrap();
1989
1990 assert!(result.rollback_report.is_some());
1993 assert_eq!(result.rollback_report.as_ref().unwrap().rolled_back_tasks.len(), 0);
1994 assert_eq!(result.rollback_report.as_ref().unwrap().skipped_tasks.len(), 1);
1995 }
1996
1997 #[tokio::test]
1998 async fn test_partial_rollback_diamond_pattern() {
1999 let mut workflow = Workflow::new();
2000
2001 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2003 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2004 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2005 workflow.add_task(Box::new(MockTask::new("d", "Task D").with_dep("b").with_dep("c").with_failure()));
2006
2007 workflow.add_dependency("a", "b").unwrap();
2008 workflow.add_dependency("a", "c").unwrap();
2009 workflow.add_dependency("b", "d").unwrap();
2010 workflow.add_dependency("c", "d").unwrap();
2011
2012 let mut executor = WorkflowExecutor::new(workflow);
2013 let result = executor.execute().await.unwrap();
2014
2015 assert!(!result.success);
2017 assert_eq!(result.failed_tasks[0], TaskId::new("d"));
2018
2019 assert!(result.rollback_report.is_some());
2021 let rollback_report = result.rollback_report.unwrap();
2022
2023 assert_eq!(rollback_report.rolled_back_tasks.len(), 0);
2026 assert_eq!(rollback_report.skipped_tasks.len(), 1);
2027 assert!(rollback_report.skipped_tasks.contains(&TaskId::new("d")));
2028
2029 assert!(result.completed_tasks.contains(&TaskId::new("a")));
2031 assert!(result.completed_tasks.contains(&TaskId::new("b")));
2032 assert!(result.completed_tasks.contains(&TaskId::new("c")));
2033 }
2034
2035 #[tokio::test]
2036 async fn test_executor_with_checkpoint_service() {
2037 use crate::workflow::checkpoint::WorkflowCheckpointService;
2038
2039 let mut workflow = Workflow::new();
2040 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2041 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2042 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2043
2044 workflow.add_dependency("a", "b").unwrap();
2045 workflow.add_dependency("a", "c").unwrap();
2046
2047 let checkpoint_service = WorkflowCheckpointService::default();
2048 let mut executor = WorkflowExecutor::new(workflow)
2049 .with_checkpoint_service(checkpoint_service.clone());
2050
2051 let result = executor.execute().await.unwrap();
2052
2053 assert!(result.success);
2054 assert_eq!(result.completed_tasks.len(), 3);
2055
2056 assert_eq!(executor.checkpoint_sequence, 3);
2058 }
2059
2060 #[tokio::test]
2061 async fn test_checkpoint_after_each_task() {
2062 use crate::workflow::checkpoint::WorkflowCheckpointService;
2063
2064 let mut workflow = Workflow::new();
2065 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2066 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2067
2068 workflow.add_dependency("a", "b").unwrap();
2069
2070 let checkpoint_service = WorkflowCheckpointService::default();
2071 let mut executor = WorkflowExecutor::new(workflow)
2072 .with_checkpoint_service(checkpoint_service.clone());
2073
2074 executor.execute().await.unwrap();
2075
2076 assert_eq!(executor.checkpoint_sequence, 2);
2078
2079 let workflow_id = executor.audit_log.tx_id().to_string();
2081 let latest = checkpoint_service.get_latest(&workflow_id).unwrap();
2082 assert!(latest.is_some());
2083
2084 let checkpoint = latest.unwrap();
2085 assert_eq!(checkpoint.sequence, 1); assert_eq!(checkpoint.completed_tasks.len(), 2);
2087 }
2088
2089 #[tokio::test]
2090 async fn test_checkpoint_service_optional() {
2091 let mut workflow = Workflow::new();
2092 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2093 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2094
2095 workflow.add_dependency("a", "b").unwrap();
2096
2097 let mut executor = WorkflowExecutor::new(workflow);
2099
2100 let result = executor.execute().await.unwrap();
2101
2102 assert!(result.success);
2103 assert_eq!(executor.checkpoint_sequence, 0); }
2105
2106 #[tokio::test]
2107 async fn test_checkpoint_created_after_task_success() {
2108 use crate::workflow::checkpoint::WorkflowCheckpointService;
2109
2110 let mut workflow = Workflow::new();
2111 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2112 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2113
2114 workflow.add_dependency("a", "b").unwrap();
2115
2116 let checkpoint_service = WorkflowCheckpointService::default();
2117 let mut executor = WorkflowExecutor::new(workflow)
2118 .with_checkpoint_service(checkpoint_service.clone());
2119
2120 let result = executor.execute().await.unwrap();
2121
2122 assert!(result.success);
2124 assert_eq!(result.completed_tasks.len(), 2);
2125
2126 assert_eq!(executor.checkpoint_sequence, 2);
2128
2129 let workflow_id = executor.audit_log.tx_id().to_string();
2131 let latest = checkpoint_service.get_latest(&workflow_id).unwrap();
2132 assert!(latest.is_some());
2133
2134 let checkpoint = latest.unwrap();
2135 assert_eq!(checkpoint.sequence, 1);
2136 assert_eq!(checkpoint.completed_tasks.len(), 2);
2137 assert!(checkpoint.completed_tasks.contains(&TaskId::new("a")));
2138 assert!(checkpoint.completed_tasks.contains(&TaskId::new("b")));
2139 }
2140
2141 #[tokio::test]
2142 async fn test_restore_state_from_checkpoint() {
2143 use crate::workflow::checkpoint::WorkflowCheckpointService;
2144
2145 let mut workflow = Workflow::new();
2146 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2147 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2148 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2149
2150 workflow.add_dependency("a", "b").unwrap();
2151 workflow.add_dependency("a", "c").unwrap();
2152
2153 let checkpoint_service = WorkflowCheckpointService::default();
2154 let mut executor = WorkflowExecutor::new(workflow)
2155 .with_checkpoint_service(checkpoint_service.clone());
2156
2157 executor.execute().await.unwrap();
2159
2160 let workflow_id = executor.audit_log.tx_id().to_string();
2162 let checkpoint = checkpoint_service.get_latest(&workflow_id).unwrap().unwrap();
2163
2164 let mut new_workflow = Workflow::new();
2166 new_workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2167 new_workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2168 new_workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2169
2170 new_workflow.add_dependency("a", "b").unwrap();
2171 new_workflow.add_dependency("a", "c").unwrap();
2172
2173 let mut new_executor = WorkflowExecutor::new(new_workflow);
2174
2175 let result = new_executor.restore_checkpoint_state(&checkpoint);
2177 assert!(result.is_ok());
2178
2179 assert_eq!(new_executor.completed_tasks.len(), checkpoint.completed_tasks.len());
2181 assert!(new_executor.completed_tasks.contains(&TaskId::new("a")));
2182 assert!(new_executor.completed_tasks.contains(&TaskId::new("b")));
2183 assert!(new_executor.completed_tasks.contains(&TaskId::new("c")));
2184 assert_eq!(new_executor.checkpoint_sequence, checkpoint.sequence + 1);
2185 }
2186
2187 #[tokio::test]
2188 async fn test_state_restoration_idempotent() {
2189 use crate::workflow::checkpoint::WorkflowCheckpointService;
2190
2191 let mut workflow = Workflow::new();
2192 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2193 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2194
2195 workflow.add_dependency("a", "b").unwrap();
2196
2197 let checkpoint_service = WorkflowCheckpointService::default();
2198 let mut executor = WorkflowExecutor::new(workflow)
2199 .with_checkpoint_service(checkpoint_service.clone());
2200
2201 executor.execute().await.unwrap();
2203
2204 let workflow_id = executor.audit_log.tx_id().to_string();
2206 let checkpoint = checkpoint_service.get_latest(&workflow_id).unwrap().unwrap();
2207
2208 let mut new_workflow = Workflow::new();
2210 new_workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2211 new_workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2212
2213 new_workflow.add_dependency("a", "b").unwrap();
2214
2215 let mut new_executor = WorkflowExecutor::new(new_workflow);
2216
2217 let result1 = new_executor.restore_checkpoint_state(&checkpoint);
2219 assert!(result1.is_ok());
2220 let completed_count_after_first = new_executor.completed_tasks.len();
2221
2222 let result2 = new_executor.restore_checkpoint_state(&checkpoint);
2224 assert!(result2.is_ok());
2225 let completed_count_after_second = new_executor.completed_tasks.len();
2226
2227 assert_eq!(completed_count_after_first, completed_count_after_second);
2229 assert_eq!(completed_count_after_first, checkpoint.completed_tasks.len());
2230 }
2231
2232 #[tokio::test]
2233 async fn test_restore_checkpoint_state_validates_workflow() {
2234 use crate::workflow::checkpoint::WorkflowCheckpointService;
2235
2236 let mut workflow = Workflow::new();
2237 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2238 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2239
2240 let checkpoint_service = WorkflowCheckpointService::default();
2241 let mut executor = WorkflowExecutor::new(workflow)
2242 .with_checkpoint_service(checkpoint_service.clone());
2243
2244 executor.execute().await.unwrap();
2246
2247 let workflow_id = executor.audit_log.tx_id().to_string();
2249 let checkpoint = checkpoint_service.get_latest(&workflow_id).unwrap().unwrap();
2250
2251 let mut different_workflow = Workflow::new();
2253 different_workflow.add_task(Box::new(MockTask::new("x", "Task X")));
2254 different_workflow.add_task(Box::new(MockTask::new("y", "Task Y")));
2255
2256 let mut different_executor = WorkflowExecutor::new(different_workflow);
2257
2258 let result = different_executor.restore_checkpoint_state(&checkpoint);
2260 assert!(result.is_err());
2261
2262 match result {
2263 Err(crate::workflow::WorkflowError::WorkflowChanged(_)) => {
2264 }
2266 _ => panic!("Expected WorkflowChanged error"),
2267 }
2268 }
2269
2270 #[tokio::test]
2271 async fn test_can_resume() {
2272 use crate::workflow::checkpoint::WorkflowCheckpointService;
2273
2274 let mut workflow = Workflow::new();
2275 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2276 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2277
2278 workflow.add_dependency("a", "b").unwrap();
2279
2280 let checkpoint_service = WorkflowCheckpointService::default();
2281 let executor = WorkflowExecutor::new(workflow)
2282 .with_checkpoint_service(checkpoint_service.clone());
2283
2284 assert!(!executor.can_resume());
2286
2287 let mut workflow2 = Workflow::new();
2289 workflow2.add_task(Box::new(MockTask::new("a", "Task A")));
2290 workflow2.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2291 workflow2.add_dependency("a", "b").unwrap();
2292
2293 let mut executor2 = WorkflowExecutor::new(workflow2)
2294 .with_checkpoint_service(checkpoint_service.clone());
2295 executor2.execute().await.unwrap();
2296
2297 assert!(executor2.can_resume());
2299 }
2300
2301 #[tokio::test]
2302 async fn test_can_resume_returns_false_without_service() {
2303 let mut workflow = Workflow::new();
2304 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2305
2306 let executor = WorkflowExecutor::new(workflow);
2307
2308 assert!(!executor.can_resume());
2310 }
2311
2312 #[tokio::test]
2313 async fn test_resume_from_checkpoint() {
2314 use crate::workflow::checkpoint::WorkflowCheckpointService;
2315
2316 let mut workflow = Workflow::new();
2317 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2318 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2319 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2320
2321 workflow.add_dependency("a", "b").unwrap();
2322 workflow.add_dependency("a", "c").unwrap();
2323
2324 let checkpoint_service = WorkflowCheckpointService::default();
2325 let mut executor = WorkflowExecutor::new(workflow)
2326 .with_checkpoint_service(checkpoint_service.clone());
2327
2328 executor.execute().await.unwrap();
2330
2331 let workflow_id = executor.audit_log.tx_id().to_string();
2333 let checkpoint = checkpoint_service.get_latest(&workflow_id).unwrap().unwrap();
2334 let checkpoint_id = checkpoint.id;
2335
2336 let mut new_workflow = Workflow::new();
2338 new_workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2339 new_workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2340 new_workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("a")));
2341
2342 new_workflow.add_dependency("a", "b").unwrap();
2343 new_workflow.add_dependency("a", "c").unwrap();
2344
2345 let mut new_executor = WorkflowExecutor::new(new_workflow)
2346 .with_checkpoint_service(checkpoint_service.clone());
2347
2348 let result = new_executor.resume_from_checkpoint_id(&checkpoint_id).await;
2350
2351 assert!(result.is_ok());
2352 let workflow_result = result.unwrap();
2353
2354 assert!(workflow_result.success);
2356 assert_eq!(workflow_result.completed_tasks.len(), 3);
2357 }
2358
2359 #[tokio::test]
2360 async fn test_resume_skip_completed() {
2361 use crate::workflow::checkpoint::WorkflowCheckpointService;
2362
2363 let mut workflow = Workflow::new();
2364 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2365 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2366 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("b")));
2367
2368 workflow.add_dependency("a", "b").unwrap();
2369 workflow.add_dependency("b", "c").unwrap();
2370
2371 let checkpoint_service = WorkflowCheckpointService::default();
2372 let mut executor = WorkflowExecutor::new(workflow)
2373 .with_checkpoint_service(checkpoint_service.clone());
2374
2375 let workflow_id = executor.audit_log.tx_id().to_string();
2377
2378 executor.completed_tasks.insert(TaskId::new("a"));
2380 let partial_checkpoint = WorkflowCheckpoint::from_executor(
2381 &workflow_id,
2382 0,
2383 &executor,
2384 0,
2385 );
2386 checkpoint_service.save(&partial_checkpoint).unwrap();
2387
2388 let checkpoint_id = partial_checkpoint.id;
2389
2390 let mut new_workflow = Workflow::new();
2392 new_workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2393 new_workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2394 new_workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("b")));
2395
2396 new_workflow.add_dependency("a", "b").unwrap();
2397 new_workflow.add_dependency("b", "c").unwrap();
2398
2399 let mut new_executor = WorkflowExecutor::new(new_workflow)
2400 .with_checkpoint_service(checkpoint_service.clone());
2401
2402 let result = new_executor.resume_from_checkpoint_id(&checkpoint_id).await.unwrap();
2404
2405 assert!(result.success);
2406 assert_eq!(result.completed_tasks.len(), 3);
2407
2408 assert!(result.completed_tasks.contains(&TaskId::new("a")));
2410 assert!(result.completed_tasks.contains(&TaskId::new("b")));
2411 assert!(result.completed_tasks.contains(&TaskId::new("c")));
2412 }
2413
2414 #[tokio::test]
2415 async fn test_resume_returns_immediately_if_all_completed() {
2416 use crate::workflow::checkpoint::WorkflowCheckpointService;
2417
2418 let mut workflow = Workflow::new();
2419 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2420 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2421
2422 let checkpoint_service = WorkflowCheckpointService::default();
2423 let mut executor = WorkflowExecutor::new(workflow)
2424 .with_checkpoint_service(checkpoint_service.clone());
2425
2426 executor.execute().await.unwrap();
2428
2429 let workflow_id = executor.audit_log.tx_id().to_string();
2431 let checkpoint = checkpoint_service.get_latest(&workflow_id).unwrap().unwrap();
2432 let checkpoint_id = checkpoint.id;
2433
2434 let mut new_workflow = Workflow::new();
2436 new_workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2437 new_workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2438
2439 let mut new_executor = WorkflowExecutor::new(new_workflow)
2440 .with_checkpoint_service(checkpoint_service.clone());
2441
2442 let result = new_executor.resume_from_checkpoint_id(&checkpoint_id).await.unwrap();
2444
2445 assert!(result.success);
2446 assert_eq!(result.completed_tasks.len(), 2);
2447 }
2448
2449 #[tokio::test]
2450 async fn test_resume_fails_with_invalid_checkpoint() {
2451 use crate::workflow::checkpoint::{CheckpointId, WorkflowCheckpointService};
2452
2453 let mut workflow = Workflow::new();
2454 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2455
2456 let checkpoint_service = WorkflowCheckpointService::default();
2457 let mut executor = WorkflowExecutor::new(workflow)
2458 .with_checkpoint_service(checkpoint_service.clone());
2459
2460 let fake_checkpoint_id = CheckpointId::new();
2462 let result = executor.resume_from_checkpoint_id(&fake_checkpoint_id).await;
2463
2464 assert!(result.is_err());
2465
2466 match result {
2467 Err(crate::workflow::WorkflowError::CheckpointNotFound(_)) => {
2468 }
2470 _ => panic!("Expected CheckpointNotFound error"),
2471 }
2472 }
2473
/// Registering a compensation for task `a` makes it visible in the registry
/// for `a` only, with the description that was supplied.
#[test]
fn test_executor_register_compensation() {
    let mut workflow = Workflow::new();
    for task in vec![MockTask::new("a", "Task A"), MockTask::new("b", "Task B")] {
        workflow.add_task(Box::new(task));
    }

    let mut executor = WorkflowExecutor::new(workflow);

    executor.register_compensation(
        TaskId::new("a"),
        ToolCompensation::skip("Test compensation"),
    );

    let registry = &executor.compensation_registry;
    assert!(registry.has_compensation(&TaskId::new("a")));
    assert!(!registry.has_compensation(&TaskId::new("b")));

    let registered = registry.get(&TaskId::new("a"));
    assert!(registered.is_some());
    assert_eq!(registered.unwrap().description, "Test compensation");
}
2497
/// Registering a file compensation stores an entry for the task whose
/// description mentions "Delete file".
#[test]
fn test_executor_register_file_compensation() {
    let mut workflow = Workflow::new();
    let only_task = MockTask::new("a", "Task A");
    workflow.add_task(Box::new(only_task));

    let mut executor = WorkflowExecutor::new(workflow);

    executor.register_file_compensation(TaskId::new("a"), "/tmp/test.txt");

    let registry = &executor.compensation_registry;
    assert!(registry.has_compensation(&TaskId::new("a")));

    let registered = registry.get(&TaskId::new("a"));
    assert!(registered.is_some());
    assert!(registered.unwrap().description.contains("Delete file"));
}
2515
/// With one of three tasks covered by a compensation, the coverage report
/// lists `a` as covered, `b` and `c` as uncovered, and a coverage of about
/// one third.
#[test]
fn test_executor_validate_compensation_coverage() {
    let mut workflow = Workflow::new();
    let tasks = vec![
        MockTask::new("a", "Task A"),
        MockTask::new("b", "Task B"),
        MockTask::new("c", "Task C"),
    ];
    for task in tasks {
        workflow.add_task(Box::new(task));
    }

    let mut executor = WorkflowExecutor::new(workflow);

    executor.register_compensation(
        TaskId::new("a"),
        ToolCompensation::skip("Test compensation"),
    );

    let report = executor.validate_compensation_coverage();

    let covered = &report.tasks_with_compensation;
    assert_eq!(covered.len(), 1);
    assert!(covered.contains(&TaskId::new("a")));

    let uncovered = &report.tasks_without_compensation;
    assert_eq!(uncovered.len(), 2);
    assert!(uncovered.contains(&TaskId::new("b")));
    assert!(uncovered.contains(&TaskId::new("c")));

    let deviation = (report.coverage_percentage - 0.333).abs();
    assert!(deviation < 0.01);
}
2544
2545 #[tokio::test]
2546 async fn test_compensation_registry_integration_with_rollback() {
2547 use crate::workflow::rollback::CompensationRegistry;
2548
2549 let mut workflow = Workflow::new();
2550
2551 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2552 workflow.add_task(Box::new(MockTask::new("b", "Task B").with_dep("a")));
2553 workflow.add_task(Box::new(MockTask::new("c", "Task C").with_dep("b")));
2554
2555 workflow.add_dependency("a", "b").unwrap();
2556 workflow.add_dependency("b", "c").unwrap();
2557
2558 let mut executor = WorkflowExecutor::new(workflow);
2559
2560 let result = executor.execute().await.unwrap();
2566
2567 assert!(result.success);
2569
2570 assert!(executor.compensation_registry.has_compensation(&TaskId::new("a")));
2572 assert!(executor.compensation_registry.has_compensation(&TaskId::new("b")));
2573 assert!(executor.compensation_registry.has_compensation(&TaskId::new("c")));
2574 }
2575
2576 #[tokio::test]
2579 async fn test_execute_with_validations() {
2580 use crate::workflow::checkpoint::ValidationCheckpoint;
2581
2582 let mut workflow = Workflow::new();
2583 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2584 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2585
2586 let mut executor = WorkflowExecutor::new(workflow);
2587 let result = executor.execute_with_validations().await;
2588
2589 assert!(result.is_ok());
2591 let workflow_result = result.unwrap();
2592 assert!(workflow_result.success);
2593 }
2594
2595 #[tokio::test]
2596 async fn test_validation_config_builder() {
2597 use crate::workflow::checkpoint::ValidationCheckpoint;
2598
2599 let mut workflow = Workflow::new();
2600 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2601
2602 let custom_config = ValidationCheckpoint {
2603 min_confidence: 0.5,
2604 warning_threshold: 0.8,
2605 rollback_on_failure: true,
2606 };
2607
2608 let executor = WorkflowExecutor::new(workflow)
2609 .with_validation_config(custom_config);
2610
2611 assert!(executor.validation_config.is_some());
2612 let config = executor.validation_config.unwrap();
2613 assert_eq!(config.min_confidence, 0.5);
2614 assert_eq!(config.warning_threshold, 0.8);
2615 assert_eq!(config.rollback_on_failure, true);
2616 }
2617
2618 #[tokio::test]
2619 async fn test_validation_warning_continues() {
2620 use crate::workflow::checkpoint::ValidationCheckpoint;
2621
2622 let mut workflow = Workflow::new();
2623 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2624 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2625
2626 let config = ValidationCheckpoint {
2628 min_confidence: 0.4,
2629 warning_threshold: 0.9, rollback_on_failure: false,
2631 };
2632
2633 let mut executor = WorkflowExecutor::new(workflow)
2634 .with_validation_config(config);
2635
2636 let result = executor.execute().await.unwrap();
2637
2638 assert!(result.success);
2640 }
2641
/// With the default validation config, validating `TaskResult::Success`
/// yields confidence 1.0 and status `Passed`.
#[test]
fn test_validate_task_result_method() {
    use crate::workflow::checkpoint::ValidationCheckpoint;
    use crate::workflow::task::TaskResult;

    let mut workflow = Workflow::new();
    workflow.add_task(Box::new(MockTask::new("a", "Task A")));

    let executor =
        WorkflowExecutor::new(workflow).with_validation_config(ValidationCheckpoint::default());

    let task_result = TaskResult::Success;
    let validation = executor.validate_task_result(&task_result);

    assert!(validation.is_ok());
    let verdict = validation.unwrap();
    assert_eq!(verdict.confidence, 1.0);
    assert_eq!(
        verdict.status,
        crate::workflow::checkpoint::ValidationStatus::Passed
    );
}
2663
/// Validating a task result on an executor that has no validation config
/// returns an error.
#[test]
fn test_validate_task_result_no_config() {
    use crate::workflow::task::TaskResult;

    let mut workflow = Workflow::new();
    let only_task = MockTask::new("a", "Task A");
    workflow.add_task(Box::new(only_task));

    // No `with_validation_config` call on purpose.
    let executor = WorkflowExecutor::new(workflow);

    let task_result = TaskResult::Success;
    let validation = executor.validate_task_result(&task_result);

    assert!(validation.is_err());
}
2679
/// An executor built without a cancellation source exposes no token, and
/// calling `cancel()` on it completes without panicking.
#[test]
fn test_executor_without_cancellation_source() {
    let mut workflow = Workflow::new();
    let only_task = MockTask::new("a", "Task A");
    workflow.add_task(Box::new(only_task));

    let executor = WorkflowExecutor::new(workflow);

    let token = executor.cancellation_token();
    assert!(token.is_none());

    // The test passes as long as this call returns normally.
    executor.cancel();
}
2695
/// An executor built with a cancellation source exposes a token that starts
/// out not cancelled.
#[test]
fn test_executor_cancellation_token_access() {
    use crate::workflow::cancellation::CancellationTokenSource;

    let mut workflow = Workflow::new();
    let only_task = MockTask::new("a", "Task A");
    workflow.add_task(Box::new(only_task));

    let executor =
        WorkflowExecutor::new(workflow).with_cancellation_source(CancellationTokenSource::new());

    assert!(executor.cancellation_token().is_some());
    let token = executor.cancellation_token().unwrap();
    let already_cancelled = token.is_cancelled();
    assert!(!already_cancelled);
}
2712
2713 #[tokio::test]
2714 async fn test_executor_cancel_stops_execution() {
2715 use crate::workflow::cancellation::CancellationTokenSource;
2716 use std::sync::Arc;
2717 use std::sync::atomic::{AtomicBool, Ordering};
2718
2719 let mut workflow = Workflow::new();
2720 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2721 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2722 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
2723
2724 let cancel_flag = Arc::new(AtomicBool::new(false));
2726 let cancel_flag_clone = cancel_flag.clone();
2727
2728 tokio::spawn(async move {
2730 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2731 cancel_flag_clone.store(true, Ordering::SeqCst);
2732 });
2733
2734 let source = CancellationTokenSource::new();
2737 let mut executor = WorkflowExecutor::new(workflow)
2738 .with_cancellation_source(source);
2739
2740 executor.cancel();
2742
2743 let result = executor.execute().await.unwrap();
2745
2746 assert!(!result.success);
2748 assert_eq!(result.completed_tasks.len(), 0);
2749 assert!(result.error.unwrap().contains("cancelled"));
2750 }
2751
2752 #[tokio::test]
2753 async fn test_cancellation_recorded_in_audit() {
2754 use crate::workflow::cancellation::CancellationTokenSource;
2755
2756 let mut workflow = Workflow::new();
2757 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2758
2759 let source = CancellationTokenSource::new();
2760 let mut executor = WorkflowExecutor::new(workflow)
2761 .with_cancellation_source(source);
2762
2763 executor.cancel();
2765
2766 executor.execute().await.unwrap();
2768
2769 let events = executor.audit_log().replay();
2771
2772 assert!(events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowCancelled { .. })));
2774 }
2775
2776 #[test]
2779 fn test_executor_without_timeout_config() {
2780 let mut workflow = Workflow::new();
2781 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2782
2783 let executor = WorkflowExecutor::new(workflow);
2784
2785 assert!(executor.timeout_config().is_none());
2787 }
2788
2789 #[test]
2790 fn test_executor_with_timeout_config() {
2791 use crate::workflow::timeout::TimeoutConfig;
2792
2793 let mut workflow = Workflow::new();
2794 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2795
2796 let config = TimeoutConfig::new();
2797 let executor = WorkflowExecutor::new(workflow)
2798 .with_timeout_config(config);
2799
2800 assert!(executor.timeout_config().is_some());
2802 let retrieved_config = executor.timeout_config().unwrap();
2803 assert!(retrieved_config.task_timeout.is_some());
2804 assert!(retrieved_config.workflow_timeout.is_some());
2805 }
2806
2807 #[tokio::test]
2808 async fn test_executor_with_task_timeout() {
2809 use crate::workflow::timeout::{TaskTimeout, TimeoutConfig};
2810 use std::time::Duration;
2811
2812 let mut workflow = Workflow::new();
2813 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2814
2815 let config = TimeoutConfig {
2816 task_timeout: Some(TaskTimeout::from_millis(100)),
2817 workflow_timeout: None,
2818 };
2819
2820 let mut executor = WorkflowExecutor::new(workflow)
2821 .with_timeout_config(config);
2822
2823 let result = executor.execute().await;
2825
2826 assert!(result.is_ok());
2828 let workflow_result = result.unwrap();
2829 assert!(workflow_result.success);
2830 }
2831
2832 #[tokio::test]
2833 async fn test_executor_with_workflow_timeout() {
2834 use crate::workflow::timeout::{TimeoutConfig, WorkflowTimeout};
2835 use std::time::Duration;
2836
2837 let mut workflow = Workflow::new();
2838 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2839 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2840
2841 let config = TimeoutConfig {
2842 task_timeout: None,
2843 workflow_timeout: Some(WorkflowTimeout::from_secs(5)),
2844 };
2845
2846 let mut executor = WorkflowExecutor::new(workflow)
2847 .with_timeout_config(config);
2848
2849 let result = executor.execute().await;
2851
2852 assert!(result.is_ok());
2853 }
2854
2855 #[tokio::test]
2856 async fn test_task_timeout_records_audit_event() {
2857 use crate::workflow::timeout::{TaskTimeout, TimeoutConfig};
2858 use std::time::Duration;
2859
2860 let mut workflow = Workflow::new();
2861 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2862
2863 let config = TimeoutConfig {
2864 task_timeout: Some(TaskTimeout::from_millis(100)),
2865 workflow_timeout: None,
2866 };
2867
2868 let mut executor = WorkflowExecutor::new(workflow)
2869 .with_timeout_config(config);
2870
2871 let result = executor.execute().await;
2873
2874 assert!(result.is_ok());
2877
2878 assert!(executor.timeout_config().is_some());
2880 }
2881
2882 #[tokio::test]
2883 async fn test_workflow_timeout_records_audit_event() {
2884 use crate::workflow::timeout::{TimeoutConfig, WorkflowTimeout};
2885
2886 let mut workflow = Workflow::new();
2887 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2888
2889 let config = TimeoutConfig {
2890 task_timeout: None,
2891 workflow_timeout: Some(WorkflowTimeout::from_secs(5)),
2892 };
2893
2894 let mut executor = WorkflowExecutor::new(workflow)
2895 .with_timeout_config(config);
2896
2897 let result = executor.execute_with_timeout().await;
2899
2900 assert!(result.is_ok());
2902 }
2903
2904 #[tokio::test]
2905 async fn test_execute_with_timeout_without_config() {
2906 let mut workflow = Workflow::new();
2907 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2908
2909 let mut executor = WorkflowExecutor::new(workflow);
2910
2911 let result = executor.execute_with_timeout().await;
2913
2914 assert!(result.is_ok());
2915 assert!(result.unwrap().success);
2916 }
2917
2918 #[tokio::test]
2921 async fn test_execute_parallel_single_task() {
2922 let mut workflow = Workflow::new();
2923 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2924
2925 let mut executor = WorkflowExecutor::new(workflow);
2926 let result = executor.execute_parallel().await;
2927
2928 assert!(result.is_ok());
2929 let workflow_result = result.unwrap();
2930 assert!(workflow_result.success);
2931 assert_eq!(workflow_result.completed_tasks.len(), 1);
2932 assert!(workflow_result.completed_tasks.contains(&TaskId::new("a")));
2933 }
2934
2935 #[tokio::test]
2936 async fn test_execute_parallel_two_independent_tasks() {
2937 let mut workflow = Workflow::new();
2938 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2939 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2940
2941 let mut executor = WorkflowExecutor::new(workflow);
2942 let result = executor.execute_parallel().await;
2943
2944 assert!(result.is_ok());
2945 let workflow_result = result.unwrap();
2946 assert!(workflow_result.success);
2947 assert_eq!(workflow_result.completed_tasks.len(), 2);
2948 assert!(workflow_result.completed_tasks.contains(&TaskId::new("a")));
2949 assert!(workflow_result.completed_tasks.contains(&TaskId::new("b")));
2950 }
2951
2952 #[tokio::test]
2953 async fn test_execute_parallel_diamond_pattern() {
2954 let mut workflow = Workflow::new();
2955
2956 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2958 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2959 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
2960 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
2961
2962 workflow.add_dependency("a", "b").unwrap();
2963 workflow.add_dependency("a", "c").unwrap();
2964 workflow.add_dependency("b", "d").unwrap();
2965 workflow.add_dependency("c", "d").unwrap();
2966
2967 let mut executor = WorkflowExecutor::new(workflow);
2968 let result = executor.execute_parallel().await;
2969
2970 assert!(result.is_ok());
2971 let workflow_result = result.unwrap();
2972 assert!(workflow_result.success);
2973 assert_eq!(workflow_result.completed_tasks.len(), 4);
2974
2975 let audit_events = executor.audit_log.replay();
2977
2978 let parallel_started_events: Vec<_> = audit_events
2980 .iter()
2981 .filter(|e| matches!(e, crate::audit::AuditEvent::WorkflowTaskParallelStarted { .. }))
2982 .collect();
2983
2984 assert_eq!(parallel_started_events.len(), 3);
2986 }
2987
2988 #[tokio::test]
2989 async fn test_execute_parallel_with_cancellation() {
2990 use crate::workflow::cancellation::CancellationTokenSource;
2991
2992 let mut workflow = Workflow::new();
2993 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
2994 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
2995
2996 let source = CancellationTokenSource::new();
2997 let mut executor = WorkflowExecutor::new(workflow)
2998 .with_cancellation_source(source);
2999
3000 executor.cancel();
3002
3003 let result = executor.execute_parallel().await;
3004
3005 assert!(result.is_ok());
3006 let workflow_result = result.unwrap();
3007 assert!(!workflow_result.success);
3008 assert_eq!(workflow_result.completed_tasks.len(), 0);
3009 assert_eq!(workflow_result.error, Some("Workflow cancelled".to_string()));
3010 }
3011
3012 #[tokio::test]
3013 async fn test_execute_parallel_empty_workflow() {
3014 let workflow = Workflow::new();
3015 let mut executor = WorkflowExecutor::new(workflow);
3016
3017 let result = executor.execute_parallel().await;
3018
3019 assert!(result.is_err());
3020 assert!(matches!(result, Err(crate::workflow::WorkflowError::EmptyWorkflow)));
3021 }
3022
3023 #[tokio::test]
3024 async fn test_execute_parallel_audit_events() {
3025 let mut workflow = Workflow::new();
3026 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
3027 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
3028
3029 let mut executor = WorkflowExecutor::new(workflow);
3030 let result = executor.execute_parallel().await;
3031
3032 assert!(result.is_ok());
3033
3034 let audit_events = executor.audit_log.replay();
3035
3036 assert!(audit_events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowStarted { .. })));
3038
3039 let parallel_started: Vec<_> = audit_events
3041 .iter()
3042 .filter(|e| matches!(e, crate::audit::AuditEvent::WorkflowTaskParallelStarted { .. }))
3043 .collect();
3044
3045 assert!(!parallel_started.is_empty());
3046
3047 let parallel_completed: Vec<_> = audit_events
3049 .iter()
3050 .filter(|e| matches!(e, crate::audit::AuditEvent::WorkflowTaskParallelCompleted { .. }))
3051 .collect();
3052
3053 assert!(!parallel_completed.is_empty());
3054
3055 assert!(audit_events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowCompleted { .. })));
3057
3058 assert!(audit_events.iter().any(|e| matches!(e, crate::audit::AuditEvent::WorkflowDeadlockCheck { .. })));
3060 }
3061
3062 #[tokio::test]
3063 async fn test_deadlock_check_before_execution() {
3064 let mut workflow = Workflow::new();
3066 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
3067 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
3068 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
3069
3070 workflow.add_dependency("a", "b").unwrap();
3072 workflow.add_dependency("b", "c").unwrap();
3073
3074 let a_idx = workflow.task_map.get(&TaskId::new("a")).copied().unwrap();
3076 let c_idx = workflow.task_map.get(&TaskId::new("c")).copied().unwrap();
3077 workflow.graph.add_edge(c_idx, a_idx, ());
3078
3079 let mut executor = WorkflowExecutor::new(workflow);
3080 let result = executor.execute_parallel().await;
3081
3082 assert!(result.is_err());
3083 match result {
3084 Err(crate::workflow::WorkflowError::CycleDetected(cycle)) => {
3085 assert!(!cycle.is_empty());
3086 }
3087 _ => panic!("Expected CycleDetected error, got: {:?}", result),
3088 }
3089 }
3090
3091 #[tokio::test]
3092 async fn test_parallel_state_updates() {
3093 let mut workflow = Workflow::new();
3095
3096 for i in 0..10 {
3098 workflow.add_task(Box::new(MockTask::new(
3099 format!("task-{}", i),
3100 &format!("Task {}", i),
3101 )));
3102 }
3103
3104 let mut executor = WorkflowExecutor::new(workflow);
3105 let result = executor.execute_parallel().await;
3106
3107 assert!(result.is_ok());
3108 let workflow_result = result.unwrap();
3109 assert!(workflow_result.success);
3110 assert_eq!(workflow_result.completed_tasks.len(), 10);
3111
3112 for i in 0..10 {
3114 assert!(workflow_result.completed_tasks.contains(&TaskId::new(format!("task-{}", i))));
3115 }
3116 }
3117
3118 #[tokio::test]
3119 async fn test_deadlock_timeout_abort() {
3120 let mut workflow = Workflow::new();
3122 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
3123
3124 let mut executor = WorkflowExecutor::new(workflow)
3125 .with_deadlock_timeout(std::time::Duration::from_millis(100)); let result = executor.execute_parallel().await;
3129 assert!(result.is_ok());
3130 }
3131
3132 #[tokio::test]
3133 async fn test_deadlock_timeout_disabled() {
3134 let mut workflow = Workflow::new();
3136 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
3137
3138 let executor = WorkflowExecutor::new(workflow)
3139 .without_deadlock_timeout();
3140
3141 assert!(executor.deadlock_timeout.is_none());
3143 }
3144}