1use crate::workflow::dag::Workflow;
8use crate::workflow::executor::WorkflowExecutor;
9use crate::workflow::task::TaskId;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashSet;
14use uuid::Uuid;
15
16#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
21pub enum ValidationStatus {
22 Passed,
24 Warning,
26 Failed,
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
34pub enum RollbackRecommendation {
35 ToPreviousCheckpoint,
37 SpecificTask(TaskId),
39 FullRollback,
41 None,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct ValidationResult {
51 pub confidence: f64,
53 pub status: ValidationStatus,
55 pub message: String,
57 pub rollback_recommendation: Option<RollbackRecommendation>,
59 pub timestamp: DateTime<Utc>,
61}
62
63#[derive(Clone, Debug)]
68pub struct ValidationCheckpoint {
69 pub min_confidence: f64,
71 pub warning_threshold: f64,
73 pub rollback_on_failure: bool,
75}
76
77impl Default for ValidationCheckpoint {
78 fn default() -> Self {
79 Self {
80 min_confidence: 0.7,
81 warning_threshold: 0.85,
82 rollback_on_failure: true,
83 }
84 }
85}
86
87pub fn extract_confidence(result: &crate::workflow::task::TaskResult) -> f64 {
103 match result {
104 crate::workflow::task::TaskResult::Success => 1.0,
105 crate::workflow::task::TaskResult::Skipped => 0.5,
106 crate::workflow::task::TaskResult::Failed(_) => 0.0,
107 crate::workflow::task::TaskResult::WithCompensation { result, .. } => {
108 extract_confidence(result)
109 }
110 }
111}
112
113pub fn validate_checkpoint(
128 task_result: &crate::workflow::task::TaskResult,
129 config: &ValidationCheckpoint,
130) -> ValidationResult {
131 let confidence = extract_confidence(task_result);
132
133 let status = if confidence >= config.warning_threshold {
135 ValidationStatus::Passed
136 } else if confidence >= config.min_confidence {
137 ValidationStatus::Warning
138 } else {
139 ValidationStatus::Failed
140 };
141
142 let percentage = (confidence * 100.0) as u32;
144 let message = format!(
145 "Confidence: {}% (status: {:?})",
146 percentage, status
147 );
148
149 let rollback_recommendation = if matches!(status, ValidationStatus::Failed)
151 && config.rollback_on_failure
152 {
153 Some(RollbackRecommendation::FullRollback)
154 } else {
155 None
156 };
157
158 ValidationResult {
159 confidence,
160 status,
161 message,
162 rollback_recommendation,
163 timestamp: Utc::now(),
164 }
165}
166
167pub fn can_proceed(validation: &ValidationResult) -> bool {
180 !matches!(validation.status, ValidationStatus::Failed)
181}
182
183pub fn requires_rollback(validation: &ValidationResult) -> bool {
196 matches!(
197 validation.status,
198 ValidationStatus::Failed
199 ) && validation.rollback_recommendation.is_some()
200}
201
202#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
207pub struct CheckpointId(pub Uuid);
208
209impl CheckpointId {
210 pub fn new() -> Self {
212 Self(Uuid::new_v4())
213 }
214}
215
216impl Default for CheckpointId {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
222impl std::fmt::Display for CheckpointId {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 write!(f, "{}", self.0)
225 }
226}
227
228#[derive(Clone, Debug, Serialize, Deserialize)]
233pub struct WorkflowCheckpoint {
234 pub id: CheckpointId,
236 pub workflow_id: String,
238 pub sequence: u64,
240 pub timestamp: DateTime<Utc>,
242 pub completed_tasks: Vec<TaskId>,
244 pub failed_tasks: Vec<TaskId>,
246 pub current_position: usize,
248 pub total_tasks: usize,
250 pub checksum: String,
252 pub task_ids_checksum: String,
254}
255
256impl WorkflowCheckpoint {
257 pub fn from_executor(
270 workflow_id: impl Into<String>,
271 sequence: u64,
272 executor: &WorkflowExecutor,
273 position: usize,
274 ) -> Self {
275 let completed = executor.completed_task_ids();
276 let failed = executor.failed_task_ids();
277
278 let task_ids = executor.task_ids();
280 let task_ids_checksum = compute_task_ids_checksum(&task_ids);
281
282 let mut checkpoint = Self {
283 id: CheckpointId::new(),
284 workflow_id: workflow_id.into(),
285 sequence,
286 timestamp: Utc::now(),
287 completed_tasks: completed.clone(),
288 failed_tasks: failed.clone(),
289 current_position: position,
290 total_tasks: executor.task_count(),
291 checksum: String::new(),
292 task_ids_checksum,
293 };
294
295 checkpoint.checksum = checkpoint.compute_checksum();
297 checkpoint
298 }
299
300 fn compute_checksum(&self) -> String {
305 let data_for_hash = CheckpointDataForHash {
307 id: self.id,
308 workflow_id: &self.workflow_id,
309 sequence: self.sequence,
310 timestamp: self.timestamp,
311 completed_tasks: &self.completed_tasks,
312 failed_tasks: &self.failed_tasks,
313 current_position: self.current_position,
314 total_tasks: self.total_tasks,
315 task_ids_checksum: &self.task_ids_checksum,
316 };
317
318 let json = serde_json::to_vec(&data_for_hash).unwrap_or_default();
319 let mut hasher = Sha256::new();
320 hasher.update(&json);
321 format!("{:x}", hasher.finalize())
322 }
323
324 pub fn validate(&self) -> Result<(), crate::workflow::WorkflowError> {
334 let expected = self.compute_checksum();
335 if self.checksum != expected {
336 return Err(crate::workflow::WorkflowError::CheckpointCorrupted(
337 format!("Checksum mismatch: expected {}, got {}", expected, self.checksum),
338 ));
339 }
340 Ok(())
341 }
342}
343
344#[derive(Serialize)]
346struct CheckpointDataForHash<'a> {
347 id: CheckpointId,
348 workflow_id: &'a str,
349 sequence: u64,
350 timestamp: DateTime<Utc>,
351 completed_tasks: &'a [TaskId],
352 failed_tasks: &'a [TaskId],
353 current_position: usize,
354 total_tasks: usize,
355 task_ids_checksum: &'a str,
356}
357
358fn compute_task_ids_checksum(task_ids: &[TaskId]) -> String {
363 let mut sorted_ids: Vec<&TaskId> = task_ids.iter().collect();
364 sorted_ids.sort_by_key(|id| id.as_str());
365
366 let json = serde_json::to_vec(&sorted_ids).unwrap_or_default();
367 let mut hasher = Sha256::new();
368 hasher.update(&json);
369 format!("{:x}", hasher.finalize())
370}
371
372pub fn validate_workflow_consistency(
395 workflow: &Workflow,
396 checkpoint: &WorkflowCheckpoint,
397) -> Result<(), crate::workflow::WorkflowError> {
398 if workflow.task_count() != checkpoint.total_tasks {
400 return Err(crate::workflow::WorkflowError::WorkflowChanged(
401 format!(
402 "Task count mismatch: checkpoint has {} tasks, current workflow has {} tasks",
403 checkpoint.total_tasks,
404 workflow.task_count()
405 ),
406 ));
407 }
408
409 let workflow_task_ids: HashSet<_> = workflow.task_ids().into_iter().collect();
411
412 for task_id in &checkpoint.completed_tasks {
413 if !workflow_task_ids.contains(task_id) {
414 return Err(crate::workflow::WorkflowError::WorkflowChanged(
415 format!(
416 "Completed task from checkpoint not found in workflow: {}",
417 task_id
418 ),
419 ));
420 }
421 }
422
423 for task_id in &checkpoint.failed_tasks {
424 if !workflow_task_ids.contains(task_id) {
425 return Err(crate::workflow::WorkflowError::WorkflowChanged(
426 format!(
427 "Failed task from checkpoint not found in workflow: {}",
428 task_id
429 ),
430 ));
431 }
432 }
433
434 if checkpoint.current_position >= checkpoint.total_tasks {
436 return Err(crate::workflow::WorkflowError::CheckpointCorrupted(
437 format!(
438 "Invalid checkpoint position: {} exceeds total tasks {}",
439 checkpoint.current_position, checkpoint.total_tasks
440 ),
441 ));
442 }
443
444 let current_task_ids = workflow.task_ids();
446 let current_checksum = compute_task_ids_checksum(¤t_task_ids);
447 if current_checksum != checkpoint.task_ids_checksum {
448 return Err(crate::workflow::WorkflowError::WorkflowChanged(
449 format!(
450 "Workflow structure changed: task IDs checksum mismatch. Expected: {}, Got: {}",
451 checkpoint.task_ids_checksum, current_checksum
452 ),
453 ));
454 }
455
456 Ok(())
457}
458
459#[derive(Clone, Debug, Serialize, Deserialize)]
461pub struct CheckpointSummary {
462 pub id: CheckpointId,
464 pub sequence: u64,
466 pub timestamp: DateTime<Utc>,
468 pub completed_count: usize,
470 pub current_position: usize,
472 pub total_tasks: usize,
474}
475
476impl CheckpointSummary {
477 pub fn from_checkpoint(checkpoint: &WorkflowCheckpoint) -> Self {
479 Self {
480 id: checkpoint.id,
481 sequence: checkpoint.sequence,
482 timestamp: checkpoint.timestamp,
483 completed_count: checkpoint.completed_tasks.len(),
484 current_position: checkpoint.current_position,
485 total_tasks: checkpoint.total_tasks,
486 }
487 }
488}
489
490#[derive(Clone)]
502pub struct WorkflowCheckpointService {
503 namespace: String,
505 #[allow(dead_code)]
507 storage: std::sync::Arc<
508 std::sync::RwLock<
509 std::collections::HashMap<
510 String,
511 (Vec<u8>, CheckpointSummary),
512 >,
513 >,
514 >,
515 latest_by_workflow: std::sync::Arc<
517 std::sync::RwLock<
518 std::collections::HashMap<String, CheckpointSummary>
519 >,
520 >,
521}
522
523impl WorkflowCheckpointService {
524 pub fn new(namespace: impl Into<String>) -> Self {
530 Self {
531 namespace: namespace.into(),
532 storage: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
533 latest_by_workflow: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
534 }
535 }
536
537 pub fn default() -> Self {
539 Self::new("workflow")
540 }
541
542 pub fn save(&self, checkpoint: &WorkflowCheckpoint) -> Result<(), crate::workflow::WorkflowError> {
556 checkpoint.validate()?;
558
559 let data = serde_json::to_vec(checkpoint)
562 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
563 format!("Serialization failed: {}", e)
564 ))?;
565
566 let summary = CheckpointSummary::from_checkpoint(checkpoint);
568
569 let key = format!("{}:{}", self.namespace, checkpoint.id);
571 {
572 let mut storage = self.storage.write()
573 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
574 format!("Storage lock failed: {}", e)
575 ))?;
576 storage.insert(key, (data, summary.clone()));
577 }
578
579 {
581 let mut latest = self.latest_by_workflow.write()
582 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
583 format!("Latest lock failed: {}", e)
584 ))?;
585 latest.insert(checkpoint.workflow_id.clone(), summary);
586 }
587
588 Ok(())
589 }
590
591 pub fn load(&self, id: &CheckpointId) -> Result<Option<WorkflowCheckpoint>, crate::workflow::WorkflowError> {
603 let key = format!("{}:{}", self.namespace, id);
604
605 let storage = self.storage.read()
606 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
607 format!("Storage lock failed: {}", e)
608 ))?;
609
610 if let Some((data, _)) = storage.get(&key) {
611 let checkpoint: WorkflowCheckpoint = serde_json::from_slice(data)
612 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
613 format!("Deserialization failed: {}", e)
614 ))?;
615
616 checkpoint.validate()?;
618
619 Ok(Some(checkpoint))
620 } else {
621 Ok(None)
622 }
623 }
624
625 pub fn get_latest(&self, workflow_id: &str) -> Result<Option<WorkflowCheckpoint>, crate::workflow::WorkflowError> {
637 let latest = self.latest_by_workflow.read()
638 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
639 format!("Latest lock failed: {}", e)
640 ))?;
641
642 if let Some(summary) = latest.get(workflow_id) {
643 self.load(&summary.id)
644 } else {
645 Ok(None)
646 }
647 }
648
649 pub fn list_by_workflow(&self, workflow_id: &str) -> Result<Vec<CheckpointSummary>, crate::workflow::WorkflowError> {
660 let storage = self.storage.read()
661 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
662 format!("Storage lock failed: {}", e)
663 ))?;
664
665 let mut summaries: Vec<CheckpointSummary> = storage
666 .values()
667 .filter_map(|(_, summary)| {
668 Some(summary.clone())
672 })
673 .collect();
674
675 summaries.sort_by_key(|s| s.sequence);
677
678 Ok(summaries)
679 }
680
681 pub fn delete(&self, id: &CheckpointId) -> Result<(), crate::workflow::WorkflowError> {
692 let key = format!("{}:{}", self.namespace, id);
693
694 let mut storage = self.storage.write()
695 .map_err(|e| crate::workflow::WorkflowError::CheckpointCorrupted(
696 format!("Storage lock failed: {}", e)
697 ))?;
698
699 storage.remove(&key);
700
701 Ok(())
705 }
706}
707
708#[cfg(test)]
709mod tests {
710 use super::*;
711 use crate::workflow::dag::Workflow;
712 use crate::workflow::task::{TaskContext, TaskError, TaskResult, WorkflowTask};
713 use async_trait::async_trait;
714
715 struct MockTask {
717 id: TaskId,
718 name: String,
719 }
720
721 impl MockTask {
722 fn new(id: impl Into<TaskId>, name: &str) -> Self {
723 Self {
724 id: id.into(),
725 name: name.to_string(),
726 }
727 }
728 }
729
730 #[async_trait]
731 impl WorkflowTask for MockTask {
732 async fn execute(&self, _context: &TaskContext) -> Result<TaskResult, TaskError> {
733 Ok(TaskResult::Success)
734 }
735
736 fn id(&self) -> TaskId {
737 self.id.clone()
738 }
739
740 fn name(&self) -> &str {
741 &self.name
742 }
743
744 fn dependencies(&self) -> Vec<TaskId> {
745 Vec::new()
746 }
747 }
748
749 #[test]
750 fn test_checkpoint_id_generation() {
751 let id1 = CheckpointId::new();
752 let id2 = CheckpointId::new();
753
754 assert_ne!(id1, id2);
756 }
757
758 #[test]
759 fn test_checkpoint_id_display() {
760 let id = CheckpointId::new();
761 let display = format!("{}", id);
762
763 assert!(display.len() > 0);
765 }
766
767 #[test]
768 fn test_checkpoint_from_executor() {
769 let mut workflow = Workflow::new();
770 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
771 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
772 workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
773
774 let executor = WorkflowExecutor::new(workflow);
775
776 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
777
778 assert_eq!(checkpoint.workflow_id, "workflow-1");
779 assert_eq!(checkpoint.sequence, 0);
780 assert_eq!(checkpoint.current_position, 0);
781 assert_eq!(checkpoint.total_tasks, 3);
782 assert_eq!(checkpoint.completed_tasks.len(), 0);
783 assert_eq!(checkpoint.failed_tasks.len(), 0);
784 assert!(!checkpoint.checksum.is_empty());
785 assert!(!checkpoint.task_ids_checksum.is_empty());
786 }
787
788 #[test]
789 fn test_checkpoint_checksum_computation() {
790 let mut workflow = Workflow::new();
791 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
792
793 let executor = WorkflowExecutor::new(workflow);
794
795 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
796
797 assert!(!checkpoint.checksum.is_empty());
799 assert!(checkpoint.checksum.len() == 64); assert!(checkpoint.checksum.chars().all(|c| c.is_ascii_hexdigit()));
801 }
802
803 #[test]
804 fn test_checkpoint_validation() {
805 let mut workflow = Workflow::new();
806 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
807
808 let executor = WorkflowExecutor::new(workflow);
809
810 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
811
812 assert!(checkpoint.validate().is_ok());
814 }
815
816 #[test]
817 fn test_checkpoint_validation_fails_on_corruption() {
818 let mut workflow = Workflow::new();
819 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
820
821 let executor = WorkflowExecutor::new(workflow);
822
823 let mut checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
824
825 checkpoint.checksum = "corrupted".to_string();
827
828 assert!(checkpoint.validate().is_err());
830 }
831
832 #[test]
833 fn test_checkpoint_serialization() {
834 let mut workflow = Workflow::new();
835 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
836
837 let executor = WorkflowExecutor::new(workflow);
838
839 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
840
841 let serialized = serde_json::to_string(&checkpoint);
843 assert!(serialized.is_ok());
844
845 let deserialized: Result<WorkflowCheckpoint, _> = serde_json::from_str(&serialized.unwrap());
847 assert!(deserialized.is_ok());
848
849 let restored = deserialized.unwrap();
850 assert_eq!(restored.id, checkpoint.id);
851 assert_eq!(restored.workflow_id, checkpoint.workflow_id);
852 assert_eq!(restored.sequence, checkpoint.sequence);
853 assert_eq!(restored.checksum, checkpoint.checksum);
854 }
855
856 #[test]
857 fn test_checkpoint_summary_from_checkpoint() {
858 let mut workflow = Workflow::new();
859 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
860 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
861
862 let executor = WorkflowExecutor::new(workflow);
863
864 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
865
866 let summary = CheckpointSummary::from_checkpoint(&checkpoint);
867
868 assert_eq!(summary.id, checkpoint.id);
869 assert_eq!(summary.sequence, checkpoint.sequence);
870 assert_eq!(summary.completed_count, 0);
871 assert_eq!(summary.current_position, 0);
872 assert_eq!(summary.total_tasks, 2);
873 }
874
875 #[test]
878 fn test_checkpoint_service_creation() {
879 let service = WorkflowCheckpointService::new("test-namespace");
880 assert_eq!(service.namespace, "test-namespace");
881 }
882
883 #[test]
884 fn test_checkpoint_service_default() {
885 let service = WorkflowCheckpointService::default();
886 assert_eq!(service.namespace, "workflow");
887 }
888
889 #[test]
890 fn test_checkpoint_service_save_and_load() {
891 let service = WorkflowCheckpointService::default();
892 let mut workflow = Workflow::new();
893 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
894
895 let executor = WorkflowExecutor::new(workflow);
896 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
897
898 let save_result = service.save(&checkpoint);
900 assert!(save_result.is_ok());
901
902 let load_result = service.load(&checkpoint.id);
904 assert!(load_result.is_ok());
905 let loaded = load_result.unwrap();
906 assert!(loaded.is_some());
907
908 let loaded_checkpoint = loaded.unwrap();
909 assert_eq!(loaded_checkpoint.id, checkpoint.id);
910 assert_eq!(loaded_checkpoint.workflow_id, checkpoint.workflow_id);
911 assert_eq!(loaded_checkpoint.sequence, checkpoint.sequence);
912 assert_eq!(loaded_checkpoint.checksum, checkpoint.checksum);
913 }
914
915 #[test]
916 fn test_checkpoint_service_load_nonexistent() {
917 let service = WorkflowCheckpointService::default();
918 let fake_id = CheckpointId::new();
919
920 let load_result = service.load(&fake_id);
921 assert!(load_result.is_ok());
922 assert!(load_result.unwrap().is_none());
923 }
924
925 #[test]
926 fn test_checkpoint_service_get_latest() {
927 let service = WorkflowCheckpointService::default();
928 let mut workflow = Workflow::new();
929 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
930
931 let executor = WorkflowExecutor::new(workflow);
932
933 let checkpoint1 = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
935 service.save(&checkpoint1).unwrap();
936
937 let checkpoint2 = WorkflowCheckpoint::from_executor("workflow-1", 1, &executor, 1);
939 service.save(&checkpoint2).unwrap();
940
941 let latest_result = service.get_latest("workflow-1");
943 assert!(latest_result.is_ok());
944 let latest = latest_result.unwrap();
945 assert!(latest.is_some());
946
947 let latest_checkpoint = latest.unwrap();
948 assert_eq!(latest_checkpoint.sequence, 1);
949 assert_eq!(latest_checkpoint.id, checkpoint2.id);
950 }
951
952 #[test]
953 fn test_checkpoint_service_get_latest_empty() {
954 let service = WorkflowCheckpointService::default();
955
956 let latest_result = service.get_latest("nonexistent-workflow");
957 assert!(latest_result.is_ok());
958 assert!(latest_result.unwrap().is_none());
959 }
960
961 #[test]
962 fn test_checkpoint_service_list_by_workflow() {
963 let service = WorkflowCheckpointService::default();
964 let mut workflow = Workflow::new();
965 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
966
967 let executor = WorkflowExecutor::new(workflow);
968
969 let checkpoint1 = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
971 service.save(&checkpoint1).unwrap();
972
973 let checkpoint2 = WorkflowCheckpoint::from_executor("workflow-1", 1, &executor, 1);
974 service.save(&checkpoint2).unwrap();
975
976 let list_result = service.list_by_workflow("workflow-1");
978 assert!(list_result.is_ok());
979
980 let summaries = list_result.unwrap();
981 assert!(summaries.len() >= 2);
982 }
983
984 #[test]
985 fn test_checkpoint_service_delete() {
986 let service = WorkflowCheckpointService::default();
987 let mut workflow = Workflow::new();
988 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
989
990 let executor = WorkflowExecutor::new(workflow);
991 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
992
993 service.save(&checkpoint).unwrap();
995
996 let load_result = service.load(&checkpoint.id);
998 assert!(load_result.unwrap().is_some());
999
1000 let delete_result = service.delete(&checkpoint.id);
1002 assert!(delete_result.is_ok());
1003
1004 let load_result = service.load(&checkpoint.id);
1006 assert!(load_result.unwrap().is_none());
1007 }
1008
1009 #[test]
1010 fn test_checkpoint_service_save_rejects_corrupted() {
1011 let service = WorkflowCheckpointService::default();
1012 let mut workflow = Workflow::new();
1013 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1014
1015 let executor = WorkflowExecutor::new(workflow);
1016 let mut checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
1017
1018 checkpoint.checksum = "corrupted".to_string();
1020
1021 let save_result = service.save(&checkpoint);
1023 assert!(save_result.is_err());
1024 }
1025
1026 #[test]
1029 fn test_validate_workflow_consistency_success() {
1030 let mut workflow = Workflow::new();
1031 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1032 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1033 workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
1034
1035 let executor = WorkflowExecutor::new(workflow);
1036 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
1037
1038 let mut validation_workflow = Workflow::new();
1040 validation_workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1041 validation_workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1042 validation_workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
1043
1044 let result = validate_workflow_consistency(&validation_workflow, &checkpoint);
1046 assert!(result.is_ok());
1047 }
1048
1049 #[test]
1050 fn test_validate_workflow_consistency_task_count_mismatch() {
1051 let mut workflow = Workflow::new();
1052 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1053 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1054
1055 let executor = WorkflowExecutor::new(workflow);
1056 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
1057
1058 let mut validation_workflow = Workflow::new();
1060 validation_workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1061 validation_workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1062 validation_workflow.add_task(Box::new(MockTask::new("task-3", "Task 3"))); let result = validate_workflow_consistency(&validation_workflow, &checkpoint);
1065 assert!(result.is_err());
1066
1067 match result {
1068 Err(crate::workflow::WorkflowError::WorkflowChanged(msg)) => {
1069 assert!(msg.contains("Task count mismatch"));
1070 }
1071 _ => panic!("Expected WorkflowChanged error"),
1072 }
1073 }
1074
1075 #[test]
1076 fn test_validate_workflow_consistency_missing_completed_task() {
1077 let mut workflow = Workflow::new();
1078 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1079 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1080 workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
1081
1082 let mut executor = WorkflowExecutor::new(workflow);
1083
1084 executor.completed_tasks.insert(TaskId::new("task-1"));
1086
1087 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 1);
1089
1090 let mut validation_workflow = Workflow::new();
1092 validation_workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1093 validation_workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
1094 validation_workflow.add_task(Box::new(MockTask::new("task-4", "Task 4")));
1095
1096 let result = validate_workflow_consistency(&validation_workflow, &checkpoint);
1097 assert!(result.is_err());
1098
1099 match result {
1100 Err(crate::workflow::WorkflowError::WorkflowChanged(msg)) => {
1101 assert!(msg.contains("not found in workflow"));
1102 }
1103 _ => panic!("Expected WorkflowChanged error, got: {:?}", result),
1104 }
1105 }
1106
1107 #[test]
1108 fn test_validate_workflow_consistency_invalid_position() {
1109 let mut workflow = Workflow::new();
1110 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1111 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1112
1113 let executor = WorkflowExecutor::new(workflow);
1114
1115 let mut checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
1117 checkpoint.current_position = 5; let mut validation_workflow = Workflow::new();
1120 validation_workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1121 validation_workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1122
1123 let result = validate_workflow_consistency(&validation_workflow, &checkpoint);
1124 assert!(result.is_err());
1125
1126 match result {
1127 Err(crate::workflow::WorkflowError::CheckpointCorrupted(msg)) => {
1128 assert!(msg.contains("Invalid checkpoint position"));
1129 }
1130 _ => panic!("Expected CheckpointCorrupted error"),
1131 }
1132 }
1133
1134 #[test]
1135 fn test_graph_drift_detection() {
1136 let mut workflow = Workflow::new();
1137 workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1138 workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1139 workflow.add_task(Box::new(MockTask::new("task-3", "Task 3")));
1140
1141 let executor = WorkflowExecutor::new(workflow);
1142 let checkpoint = WorkflowCheckpoint::from_executor("workflow-1", 0, &executor, 0);
1143
1144 let mut validation_workflow = Workflow::new();
1146 validation_workflow.add_task(Box::new(MockTask::new("task-1", "Task 1")));
1147 validation_workflow.add_task(Box::new(MockTask::new("task-2", "Task 2")));
1148 validation_workflow.add_task(Box::new(MockTask::new("task-4", "Task 4"))); let result = validate_workflow_consistency(&validation_workflow, &checkpoint);
1151 assert!(result.is_err());
1152
1153 match result {
1154 Err(crate::workflow::WorkflowError::WorkflowChanged(msg)) => {
1155 assert!(msg.contains("task IDs checksum mismatch"));
1156 }
1157 _ => panic!("Expected WorkflowChanged error"),
1158 }
1159 }
1160
1161 #[test]
1162 fn test_task_ids_checksum_deterministic() {
1163 let workflow1 = Workflow::new();
1164 let workflow2 = Workflow::new();
1165
1166 let ids1 = vec![TaskId::new("task-3"), TaskId::new("task-1"), TaskId::new("task-2")];
1167 let ids2 = vec![TaskId::new("task-1"), TaskId::new("task-2"), TaskId::new("task-3")];
1168
1169 let checksum1 = compute_task_ids_checksum(&ids1);
1170 let checksum2 = compute_task_ids_checksum(&ids2);
1171
1172 assert_eq!(checksum1, checksum2);
1174 }
1175
1176 #[test]
1179 fn test_validation_status_variants() {
1180 let passed = ValidationStatus::Passed;
1181 let warning = ValidationStatus::Warning;
1182 let failed = ValidationStatus::Failed;
1183
1184 assert_ne!(passed, warning);
1185 assert_ne!(warning, failed);
1186 assert_ne!(passed, failed);
1187 }
1188
1189 #[test]
1190 fn test_rollback_recommendation_variants() {
1191 let prev = RollbackRecommendation::ToPreviousCheckpoint;
1192 let specific = RollbackRecommendation::SpecificTask(TaskId::new("task-1"));
1193 let full = RollbackRecommendation::FullRollback;
1194 let none = RollbackRecommendation::None;
1195
1196 assert_ne!(prev, full);
1197 assert_ne!(full, none);
1198 assert_eq!(none, RollbackRecommendation::None);
1199 }
1200
1201 #[test]
1202 fn test_validation_result_creation() {
1203 let result = ValidationResult {
1204 confidence: 0.9,
1205 status: ValidationStatus::Passed,
1206 message: "All good".to_string(),
1207 rollback_recommendation: None,
1208 timestamp: Utc::now(),
1209 };
1210
1211 assert_eq!(result.confidence, 0.9);
1212 assert_eq!(result.status, ValidationStatus::Passed);
1213 assert_eq!(result.message, "All good");
1214 assert!(result.rollback_recommendation.is_none());
1215 }
1216
1217 #[test]
1218 fn test_validation_checkpoint_default() {
1219 let config = ValidationCheckpoint::default();
1220
1221 assert_eq!(config.min_confidence, 0.7);
1222 assert_eq!(config.warning_threshold, 0.85);
1223 assert_eq!(config.rollback_on_failure, true);
1224 }
1225
1226 #[test]
1227 fn test_validation_checkpoint_custom() {
1228 let config = ValidationCheckpoint {
1229 min_confidence: 0.5,
1230 warning_threshold: 0.8,
1231 rollback_on_failure: false,
1232 };
1233
1234 assert_eq!(config.min_confidence, 0.5);
1235 assert_eq!(config.warning_threshold, 0.8);
1236 assert_eq!(config.rollback_on_failure, false);
1237 }
1238
1239 #[test]
1240 fn test_validation_result_serialization() {
1241 let result = ValidationResult {
1242 confidence: 0.75,
1243 status: ValidationStatus::Warning,
1244 message: "Low confidence".to_string(),
1245 rollback_recommendation: Some(RollbackRecommendation::None),
1246 timestamp: Utc::now(),
1247 };
1248
1249 let serialized = serde_json::to_string(&result);
1251 assert!(serialized.is_ok());
1252
1253 let deserialized: Result<ValidationResult, _> = serde_json::from_str(&serialized.unwrap());
1255 assert!(deserialized.is_ok());
1256
1257 let restored = deserialized.unwrap();
1258 assert_eq!(restored.confidence, result.confidence);
1259 assert_eq!(restored.status, result.status);
1260 assert_eq!(restored.message, result.message);
1261 }
1262
1263 #[test]
1266 fn test_extract_confidence_success() {
1267 use crate::workflow::task::TaskResult;
1268
1269 let result = TaskResult::Success;
1270 let confidence = extract_confidence(&result);
1271
1272 assert_eq!(confidence, 1.0);
1273 }
1274
1275 #[test]
1276 fn test_extract_confidence_skipped() {
1277 use crate::workflow::task::TaskResult;
1278
1279 let result = TaskResult::Skipped;
1280 let confidence = extract_confidence(&result);
1281
1282 assert_eq!(confidence, 0.5);
1283 }
1284
1285 #[test]
1286 fn test_extract_confidence_failed() {
1287 use crate::workflow::task::TaskResult;
1288
1289 let result = TaskResult::Failed("error".to_string());
1290 let confidence = extract_confidence(&result);
1291
1292 assert_eq!(confidence, 0.0);
1293 }
1294
1295 #[test]
1296 fn test_extract_confidence_with_compensation() {
1297 use crate::workflow::task::{CompensationAction, TaskResult};
1298
1299 let inner = Box::new(TaskResult::Success);
1300 let compensation = CompensationAction::skip("test");
1301 let result = TaskResult::WithCompensation {
1302 result: inner,
1303 compensation,
1304 };
1305
1306 let confidence = extract_confidence(&result);
1307
1308 assert_eq!(confidence, 1.0);
1310 }
1311
1312 #[test]
1313 fn test_extract_confidence_with_compensation_failed() {
1314 use crate::workflow::task::{CompensationAction, TaskResult};
1315
1316 let inner = Box::new(TaskResult::Failed("error".to_string()));
1317 let compensation = CompensationAction::skip("test");
1318 let result = TaskResult::WithCompensation {
1319 result: inner,
1320 compensation,
1321 };
1322
1323 let confidence = extract_confidence(&result);
1324
1325 assert_eq!(confidence, 0.0);
1327 }
1328
1329 #[test]
1332 fn test_validate_checkpoint_passed() {
1333 use crate::workflow::task::TaskResult;
1334
1335 let result = TaskResult::Success;
1336 let config = ValidationCheckpoint::default();
1337
1338 let validation = validate_checkpoint(&result, &config);
1339
1340 assert_eq!(validation.confidence, 1.0);
1341 assert_eq!(validation.status, ValidationStatus::Passed);
1342 assert!(validation.message.contains("100%"));
1343 assert!(validation.rollback_recommendation.is_none());
1344 }
1345
1346 #[test]
1347 fn test_validate_checkpoint_warning() {
1348 use crate::workflow::task::TaskResult;
1349
1350 let result = TaskResult::Skipped;
1354 let config = ValidationCheckpoint {
1355 min_confidence: 0.4,
1356 warning_threshold: 0.6,
1357 rollback_on_failure: true,
1358 };
1359
1360 let validation = validate_checkpoint(&result, &config);
1361
1362 assert_eq!(validation.confidence, 0.5);
1363 assert_eq!(validation.status, ValidationStatus::Warning);
1364 assert!(validation.message.contains("50%"));
1365 assert!(validation.rollback_recommendation.is_none());
1366 }
1367
1368 #[test]
1369 fn test_validate_checkpoint_failed() {
1370 use crate::workflow::task::TaskResult;
1371
1372 let result = TaskResult::Failed("error".to_string());
1373 let config = ValidationCheckpoint::default();
1374
1375 let validation = validate_checkpoint(&result, &config);
1376
1377 assert_eq!(validation.confidence, 0.0);
1378 assert_eq!(validation.status, ValidationStatus::Failed);
1379 assert!(validation.message.contains("0%"));
1380 assert!(validation.rollback_recommendation.is_some());
1381 }
1382
1383 #[test]
1384 fn test_validate_thresholds_custom() {
1385 use crate::workflow::task::TaskResult;
1386
1387 let result = TaskResult::Skipped; let config = ValidationCheckpoint {
1391 min_confidence: 0.4,
1392 warning_threshold: 0.6,
1393 rollback_on_failure: false,
1394 };
1395
1396 let validation = validate_checkpoint(&result, &config);
1397
1398 assert_eq!(validation.status, ValidationStatus::Warning);
1399 assert!(validation.rollback_recommendation.is_none());
1400 }
1401
1402 #[test]
1403 fn test_can_proceed_passed() {
1404 let validation = ValidationResult {
1405 confidence: 0.9,
1406 status: ValidationStatus::Passed,
1407 message: "Good".to_string(),
1408 rollback_recommendation: None,
1409 timestamp: Utc::now(),
1410 };
1411
1412 assert!(can_proceed(&validation));
1413 }
1414
1415 #[test]
1416 fn test_can_proceed_warning() {
1417 let validation = ValidationResult {
1418 confidence: 0.7,
1419 status: ValidationStatus::Warning,
1420 message: "Warning".to_string(),
1421 rollback_recommendation: None,
1422 timestamp: Utc::now(),
1423 };
1424
1425 assert!(can_proceed(&validation));
1426 }
1427
1428 #[test]
1429 fn test_can_proceed_failed() {
1430 let validation = ValidationResult {
1431 confidence: 0.3,
1432 status: ValidationStatus::Failed,
1433 message: "Failed".to_string(),
1434 rollback_recommendation: None,
1435 timestamp: Utc::now(),
1436 };
1437
1438 assert!(!can_proceed(&validation));
1439 }
1440
1441 #[test]
1442 fn test_requires_rollback_true() {
1443 let validation = ValidationResult {
1444 confidence: 0.0,
1445 status: ValidationStatus::Failed,
1446 message: "Failed".to_string(),
1447 rollback_recommendation: Some(RollbackRecommendation::FullRollback),
1448 timestamp: Utc::now(),
1449 };
1450
1451 assert!(requires_rollback(&validation));
1452 }
1453
1454 #[test]
1455 fn test_requires_rollback_false_no_rollback() {
1456 let validation = ValidationResult {
1457 confidence: 0.0,
1458 status: ValidationStatus::Failed,
1459 message: "Failed".to_string(),
1460 rollback_recommendation: None,
1461 timestamp: Utc::now(),
1462 };
1463
1464 assert!(!requires_rollback(&validation));
1465 }
1466
1467 #[test]
1468 fn test_requires_rollback_false_passed() {
1469 let validation = ValidationResult {
1470 confidence: 1.0,
1471 status: ValidationStatus::Passed,
1472 message: "Passed".to_string(),
1473 rollback_recommendation: None,
1474 timestamp: Utc::now(),
1475 };
1476
1477 assert!(!requires_rollback(&validation));
1478 }
1479}