1use crate::runtime::state::{
65 ObligationStateSnapshot, RegionStateSnapshot, RuntimeSnapshot, TaskSnapshot, TaskStateSnapshot,
66};
67use crate::runtime::RuntimeState;
68use crate::types::Time;
69use serde::{Deserialize, Serialize};
70use std::collections::{HashMap, HashSet};
71use std::fmt;
72
/// A structural problem that makes a [`RestorableSnapshot`] unsafe to restore.
///
/// All identifiers are raw arena indices (`IdSnapshot::index`) taken from the
/// snapshot being validated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RestoreError {
    /// A task belongs to a region that is not present in the snapshot.
    OrphanTask {
        /// Index of the offending task.
        task_id: u32,
        /// Region index the task refers to.
        region_id: u32,
    },
    /// An obligation is held by a task that is not present in the snapshot.
    OrphanObligation {
        /// Index of the offending obligation.
        obligation_id: u32,
        /// Holder task index the obligation refers to.
        task_id: u32,
    },
    /// A region names a parent region that is not present in the snapshot.
    InvalidParent {
        /// Index of the offending region.
        region_id: u32,
        /// Parent index the region refers to.
        parent_id: u32,
    },
    /// Following parent links revisits a region, so the regions do not form a tree.
    CyclicRegionTree {
        /// Region indices on the cycle, in parent-link order.
        cycle: Vec<u32>,
    },
    /// A region is marked closed while it still has live work under it.
    NonQuiescentClosure {
        /// Index of the closed region.
        region_id: u32,
        /// Indices of direct child regions that are not closed.
        live_children: Vec<u32>,
        /// Indices of tasks in the region that are not terminal.
        live_tasks: Vec<u32>,
    },
    /// An entity's timestamp is inconsistent with the snapshot's timestamp.
    InvalidTimestamp {
        /// Timestamp of the snapshot.
        snapshot_time: u64,
        /// Timestamp recorded on the entity.
        entity_time: u64,
        /// Human-readable name of the entity, used in the error message.
        entity: String,
    },
    /// The same index appears more than once for one kind of entity.
    DuplicateId {
        /// Entity kind, e.g. `"region"` or `"task"`.
        kind: &'static str,
        /// The repeated index.
        id: u32,
    },
}

impl fmt::Display for RestoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::OrphanTask { region_id, task_id } => write!(
                f,
                "task {task_id} references non-existent region {region_id}"
            ),
            Self::OrphanObligation {
                task_id,
                obligation_id,
            } => write!(
                f,
                "obligation {obligation_id} references non-existent task {task_id}"
            ),
            Self::InvalidParent {
                parent_id,
                region_id,
            } => write!(
                f,
                "region {region_id} references non-existent parent {parent_id}"
            ),
            Self::CyclicRegionTree { cycle } => write!(f, "region tree contains cycle: {cycle:?}"),
            Self::NonQuiescentClosure {
                region_id,
                live_children,
                live_tasks,
            } => {
                // Only the counts are rendered; the id lists stay available via the fields.
                let child_count = live_children.len();
                let task_count = live_tasks.len();
                write!(
                    f,
                    "closed region {region_id} has {child_count} live children and {task_count} live tasks"
                )
            }
            Self::InvalidTimestamp {
                entity,
                entity_time,
                snapshot_time,
            } => write!(
                f,
                "timestamp inconsistency: snapshot={snapshot_time}, {entity}={entity_time}"
            ),
            Self::DuplicateId { id, kind } => write!(f, "duplicate {kind} ID: {id}"),
        }
    }
}

impl std::error::Error for RestoreError {}
189
/// Outcome of `RestorableSnapshot::validate`.
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// `true` exactly when `errors` is empty.
    pub is_valid: bool,
    /// Every problem found, in the order the checks ran (validation does not stop early).
    pub errors: Vec<RestoreError>,
    /// Summary counts gathered while validating.
    pub stats: SnapshotStats,
}
200
/// Summary counts computed by `RestorableSnapshot::validate`.
#[derive(Debug, Clone, Default)]
pub struct SnapshotStats {
    /// Number of region entries in the snapshot (duplicates included).
    pub region_count: usize,
    /// Number of task entries in the snapshot (duplicates included).
    pub task_count: usize,
    /// Number of obligation entries in the snapshot (duplicates included).
    pub obligation_count: usize,
    /// Length of the longest chain of parent links, counted in regions
    /// (a lone root region has depth 1).
    pub max_depth: usize,
    /// Number of tasks whose state is terminal (`Completed`).
    pub terminal_task_count: usize,
    /// Number of obligations that are `Committed`, `Aborted` or `Leaked`.
    pub resolved_obligation_count: usize,
    /// Number of regions whose state is `Closed`.
    pub closed_region_count: usize,
}
219
/// A [`RuntimeSnapshot`] bundled with a schema version and a checksum so it
/// can be serialized, stored, and checked again before being restored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestorableSnapshot {
    /// The captured runtime state.
    pub snapshot: RuntimeSnapshot,
    /// Schema version recorded when the snapshot was wrapped (`SCHEMA_VERSION` for `new`).
    pub schema_version: u32,
    /// Checksum recorded by `new` and re-checked by `verify_integrity`.
    pub content_hash: u64,
}
232
233impl RestorableSnapshot {
234 pub const SCHEMA_VERSION: u32 = 1;
236
237 #[must_use]
239 pub fn new(snapshot: RuntimeSnapshot) -> Self {
240 let content_hash = Self::compute_hash(&snapshot);
241 Self {
242 snapshot,
243 schema_version: Self::SCHEMA_VERSION,
244 content_hash,
245 }
246 }
247
248 fn compute_hash(snapshot: &RuntimeSnapshot) -> u64 {
250 const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
252 const FNV_PRIME: u64 = 0x0100_0000_01b3;
253
254 let mut hash = FNV_OFFSET;
255
256 for byte in snapshot.timestamp.to_le_bytes() {
258 hash ^= u64::from(byte);
259 hash = hash.wrapping_mul(FNV_PRIME);
260 }
261
262 for byte in (snapshot.regions.len() as u64).to_le_bytes() {
264 hash ^= u64::from(byte);
265 hash = hash.wrapping_mul(FNV_PRIME);
266 }
267 for byte in (snapshot.tasks.len() as u64).to_le_bytes() {
268 hash ^= u64::from(byte);
269 hash = hash.wrapping_mul(FNV_PRIME);
270 }
271 for byte in (snapshot.obligations.len() as u64).to_le_bytes() {
272 hash ^= u64::from(byte);
273 hash = hash.wrapping_mul(FNV_PRIME);
274 }
275
276 hash
277 }
278
279 #[must_use]
288 #[allow(clippy::too_many_lines)]
289 pub fn validate(&self) -> ValidationResult {
290 let mut errors = Vec::new();
291 let mut stats = SnapshotStats::default();
292
293 let region_ids: HashSet<u32> = self.snapshot.regions.iter().map(|r| r.id.index).collect();
295 let task_ids: HashSet<u32> = self.snapshot.tasks.iter().map(|t| t.id.index).collect();
296
297 stats.region_count = self.snapshot.regions.len();
298 stats.task_count = self.snapshot.tasks.len();
299 stats.obligation_count = self.snapshot.obligations.len();
300
301 if region_ids.len() != self.snapshot.regions.len() {
303 let mut seen = HashSet::new();
305 for region in &self.snapshot.regions {
306 if !seen.insert(region.id.index) {
307 errors.push(RestoreError::DuplicateId {
308 kind: "region",
309 id: region.id.index,
310 });
311 }
312 }
313 }
314
315 if task_ids.len() != self.snapshot.tasks.len() {
317 let mut seen = HashSet::new();
318 for task in &self.snapshot.tasks {
319 if !seen.insert(task.id.index) {
320 errors.push(RestoreError::DuplicateId {
321 kind: "task",
322 id: task.id.index,
323 });
324 }
325 }
326 }
327
328 for task in &self.snapshot.tasks {
330 if !region_ids.contains(&task.region_id.index) {
331 errors.push(RestoreError::OrphanTask {
332 task_id: task.id.index,
333 region_id: task.region_id.index,
334 });
335 }
336 if is_task_terminal(&task.state) {
337 stats.terminal_task_count += 1;
338 }
339 }
340
341 for obligation in &self.snapshot.obligations {
343 if !task_ids.contains(&obligation.holder_task.index) {
344 errors.push(RestoreError::OrphanObligation {
345 obligation_id: obligation.id.index,
346 task_id: obligation.holder_task.index,
347 });
348 }
349 if is_obligation_resolved(&obligation.state) {
350 stats.resolved_obligation_count += 1;
351 }
352 }
353
354 let mut parent_map: HashMap<u32, Option<u32>> = HashMap::new();
356 for region in &self.snapshot.regions {
357 parent_map.insert(region.id.index, region.parent_id.map(|p| p.index));
358 if let Some(parent_id) = ®ion.parent_id {
359 if !region_ids.contains(&parent_id.index) {
360 errors.push(RestoreError::InvalidParent {
361 region_id: region.id.index,
362 parent_id: parent_id.index,
363 });
364 }
365 }
366 if is_region_closed(®ion.state) {
367 stats.closed_region_count += 1;
368 }
369 }
370
371 if let Some(cycle) = detect_cycle(&parent_map) {
373 errors.push(RestoreError::CyclicRegionTree { cycle });
374 }
375
376 stats.max_depth = compute_max_depth(&parent_map);
378
379 let mut region_tasks: HashMap<u32, Vec<&TaskSnapshot>> = HashMap::new();
381 for task in &self.snapshot.tasks {
382 region_tasks
383 .entry(task.region_id.index)
384 .or_default()
385 .push(task);
386 }
387
388 let mut region_children: HashMap<u32, Vec<u32>> = HashMap::new();
389 for region in &self.snapshot.regions {
390 if let Some(parent_id) = region.parent_id {
391 region_children
392 .entry(parent_id.index)
393 .or_default()
394 .push(region.id.index);
395 }
396 }
397
398 for region in &self.snapshot.regions {
400 if is_region_closed(®ion.state) {
401 let live_children: Vec<u32> = region_children
402 .get(®ion.id.index)
403 .map(|children| {
404 children
405 .iter()
406 .filter(|&&child_id| {
407 self.snapshot
408 .regions
409 .iter()
410 .find(|r| r.id.index == child_id)
411 .is_some_and(|r| !is_region_closed(&r.state))
412 })
413 .copied()
414 .collect()
415 })
416 .unwrap_or_default();
417
418 let live_tasks: Vec<u32> = region_tasks
419 .get(®ion.id.index)
420 .map(|tasks| {
421 tasks
422 .iter()
423 .filter(|t| !is_task_terminal(&t.state))
424 .map(|t| t.id.index)
425 .collect()
426 })
427 .unwrap_or_default();
428
429 if !live_children.is_empty() || !live_tasks.is_empty() {
430 errors.push(RestoreError::NonQuiescentClosure {
431 region_id: region.id.index,
432 live_children,
433 live_tasks,
434 });
435 }
436 }
437 }
438
439 ValidationResult {
440 is_valid: errors.is_empty(),
441 errors,
442 stats,
443 }
444 }
445
446 #[must_use]
448 pub fn verify_integrity(&self) -> bool {
449 Self::compute_hash(&self.snapshot) == self.content_hash
450 }
451
452 #[must_use]
454 pub fn timestamp(&self) -> Time {
455 Time::from_nanos(self.snapshot.timestamp)
456 }
457}
458
459fn is_task_terminal(state: &TaskStateSnapshot) -> bool {
461 matches!(state, TaskStateSnapshot::Completed { .. })
462}
463
464fn is_obligation_resolved(state: &ObligationStateSnapshot) -> bool {
466 matches!(
467 state,
468 ObligationStateSnapshot::Committed
469 | ObligationStateSnapshot::Aborted
470 | ObligationStateSnapshot::Leaked
471 )
472}
473
474fn is_region_closed(state: &RegionStateSnapshot) -> bool {
476 matches!(state, RegionStateSnapshot::Closed)
477}
478
/// Finds a cycle in the region parent graph, if there is one.
///
/// `parent_map` maps each region index to its parent index (`None` for roots).
/// A parent that is not itself a key ends the walk; that case is reported
/// separately as an invalid parent.
///
/// Starting points are visited in ascending index order, so the result is
/// deterministic no matter how the `HashMap` iterates. The returned cycle
/// begins at the first revisited region and follows parent links. Regions whose
/// ancestor chain is already known to be acyclic are not walked again, which
/// keeps the whole search linear in the number of regions.
fn detect_cycle(parent_map: &HashMap<u32, Option<u32>>) -> Option<Vec<u32>> {
    let mut starts: Vec<u32> = parent_map.keys().copied().collect();
    starts.sort_unstable();

    // Regions proven to lead to a root (or a missing parent) without cycling.
    let mut acyclic: HashSet<u32> = HashSet::new();
    let mut path: Vec<u32> = Vec::new();
    let mut on_path: HashSet<u32> = HashSet::new();

    for start in starts {
        if acyclic.contains(&start) {
            continue;
        }
        path.clear();
        on_path.clear();

        let mut current = Some(start);
        while let Some(node) = current {
            if acyclic.contains(&node) {
                break;
            }
            if !on_path.insert(node) {
                // `path` and `on_path` hold the same nodes, so the position exists.
                let pos = path.iter().position(|&n| n == node).unwrap_or(0);
                return Some(path[pos..].to_vec());
            }
            path.push(node);
            current = parent_map.get(&node).copied().flatten();
        }
        acyclic.extend(path.iter().copied());
    }
    None
}
500
/// Length, in regions, of the longest ancestor chain in `parent_map`.
///
/// A root region alone has depth 1. A parent index that is not a key still
/// counts as one level before the walk ends. Each walk stops when it reaches a
/// region it has already visited, so cyclic input (which `validate` reports
/// separately) terminates. In that case the depth is the number of distinct
/// regions on the walk.
fn compute_max_depth(parent_map: &HashMap<u32, Option<u32>>) -> usize {
    let mut max_depth = 0;
    // Reused buffer of regions seen on the current ancestor walk.
    let mut chain: HashSet<u32> = HashSet::new();

    for &start in parent_map.keys() {
        chain.clear();
        let mut current = Some(start);
        while let Some(node) = current {
            if !chain.insert(node) {
                // Revisited a region: parent links form a cycle.
                break;
            }
            current = parent_map.get(&node).copied().flatten();
        }
        max_depth = max_depth.max(chain.len());
    }
    max_depth
}
515
/// Types that can capture their state as a [`RestorableSnapshot`].
pub trait SnapshotRestore {
    /// Captures the current state wrapped with schema version and content hash.
    fn restorable_snapshot(&self) -> RestorableSnapshot;
}
521
522impl SnapshotRestore for RuntimeState {
523 fn restorable_snapshot(&self) -> RestorableSnapshot {
524 RestorableSnapshot::new(self.snapshot())
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::state::IdSnapshot;
    use crate::runtime::state::{
        BudgetSnapshot, ObligationKindSnapshot, ObligationSnapshot, RegionSnapshot,
    };

    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    /// Builds a region with generation 0 and a fixed default budget.
    fn make_region(id: u32, parent: Option<u32>, state: RegionStateSnapshot) -> RegionSnapshot {
        RegionSnapshot {
            id: IdSnapshot {
                index: id,
                generation: 0,
            },
            parent_id: parent.map(|p| IdSnapshot {
                index: p,
                generation: 0,
            }),
            state,
            budget: BudgetSnapshot {
                deadline: None,
                poll_quota: 1000,
                cost_quota: None,
                priority: 100,
            },
            child_count: 0,
            task_count: 0,
            name: None,
        }
    }

    /// Builds a task with generation 0 in region `region_id`.
    fn make_task(id: u32, region_id: u32, state: TaskStateSnapshot) -> TaskSnapshot {
        TaskSnapshot {
            id: IdSnapshot {
                index: id,
                generation: 0,
            },
            region_id: IdSnapshot {
                index: region_id,
                generation: 0,
            },
            state,
            name: None,
            poll_count: 0,
            created_at: 0,
            obligations: Vec::new(),
        }
    }

    /// Builds a `SendPermit` obligation held by `task_id`, owned by region 0.
    fn make_obligation(
        id: u32,
        task_id: u32,
        state: ObligationStateSnapshot,
    ) -> ObligationSnapshot {
        ObligationSnapshot {
            id: IdSnapshot {
                index: id,
                generation: 0,
            },
            kind: ObligationKindSnapshot::SendPermit,
            state,
            holder_task: IdSnapshot {
                index: task_id,
                generation: 0,
            },
            owning_region: IdSnapshot {
                index: 0,
                generation: 0,
            },
            created_at: 0,
        }
    }

    /// Wraps the given entities in a snapshot taken at timestamp 1000.
    fn make_snapshot(
        regions: Vec<RegionSnapshot>,
        tasks: Vec<TaskSnapshot>,
        obligations: Vec<ObligationSnapshot>,
    ) -> RestorableSnapshot {
        RestorableSnapshot::new(RuntimeSnapshot {
            timestamp: 1000,
            regions,
            tasks,
            obligations,
            recent_events: Vec::new(),
        })
    }

    #[test]
    fn empty_snapshot_is_valid() {
        init_test("empty_snapshot_is_valid");
        let snapshot = make_snapshot(Vec::new(), Vec::new(), Vec::new());
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        let errors_empty = result.errors.is_empty();
        crate::assert_with_log!(errors_empty, "errors empty", true, errors_empty);
        crate::test_complete!("empty_snapshot_is_valid");
    }

    #[test]
    fn single_region_is_valid() {
        init_test("single_region_is_valid");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            Vec::new(),
            Vec::new(),
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.region_count == 1,
            "region_count",
            1,
            result.stats.region_count
        );
        crate::test_complete!("single_region_is_valid");
    }

    #[test]
    fn task_with_valid_region_is_valid() {
        init_test("task_with_valid_region_is_valid");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            Vec::new(),
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.task_count == 1,
            "task_count",
            1,
            result.stats.task_count
        );
        crate::test_complete!("task_with_valid_region_is_valid");
    }

    #[test]
    fn orphan_task_detected() {
        init_test("orphan_task_detected");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            // Region 99 does not exist.
            vec![make_task(0, 99, TaskStateSnapshot::Running)],
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::OrphanTask { .. }));
        crate::assert_with_log!(has_error, "has OrphanTask error", true, has_error);
        crate::test_complete!("orphan_task_detected");
    }

    #[test]
    fn orphan_obligation_detected() {
        init_test("orphan_obligation_detected");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            // Task 99 does not exist.
            vec![make_obligation(0, 99, ObligationStateSnapshot::Reserved)],
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::OrphanObligation { .. }));
        crate::assert_with_log!(has_error, "has OrphanObligation error", true, has_error);
        crate::test_complete!("orphan_obligation_detected");
    }

    #[test]
    fn invalid_parent_detected() {
        init_test("invalid_parent_detected");
        let snapshot = make_snapshot(
            vec![
                make_region(0, None, RegionStateSnapshot::Open),
                // Parent region 99 does not exist.
                make_region(1, Some(99), RegionStateSnapshot::Open),
            ],
            Vec::new(),
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::InvalidParent { .. }));
        crate::assert_with_log!(has_error, "has InvalidParent error", true, has_error);
        crate::test_complete!("invalid_parent_detected");
    }

    #[test]
    fn closed_region_with_live_task_detected() {
        init_test("closed_region_with_live_task_detected");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Closed)],
            // A running task inside a closed region violates quiescence.
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::NonQuiescentClosure { .. }));
        crate::assert_with_log!(has_error, "has NonQuiescentClosure error", true, has_error);
        crate::test_complete!("closed_region_with_live_task_detected");
    }

    #[test]
    fn closed_region_with_open_child_detected() {
        init_test("closed_region_with_open_child_detected");
        let snapshot = make_snapshot(
            vec![
                make_region(0, None, RegionStateSnapshot::Closed),
                make_region(1, Some(0), RegionStateSnapshot::Open),
            ],
            Vec::new(),
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let reports_child = result.errors.iter().any(|e| {
            matches!(
                e,
                RestoreError::NonQuiescentClosure {
                    region_id: 0,
                    live_children,
                    live_tasks,
                } if *live_children == [1] && live_tasks.is_empty()
            )
        });
        crate::assert_with_log!(reports_child, "reports live child 1", true, reports_child);
        crate::test_complete!("closed_region_with_open_child_detected");
    }

    #[test]
    fn quiescent_closed_tree_is_valid() {
        init_test("quiescent_closed_tree_is_valid");
        let snapshot = make_snapshot(
            vec![
                make_region(0, None, RegionStateSnapshot::Closed),
                make_region(1, Some(0), RegionStateSnapshot::Closed),
            ],
            vec![make_task(
                0,
                1,
                TaskStateSnapshot::Completed {
                    outcome: crate::runtime::state::OutcomeSnapshot::Ok,
                },
            )],
            Vec::new(),
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.closed_region_count == 2,
            "closed_region_count",
            2,
            result.stats.closed_region_count
        );
        crate::test_complete!("quiescent_closed_tree_is_valid");
    }

    #[test]
    fn nested_regions_valid() {
        init_test("nested_regions_valid");
        let snapshot = make_snapshot(
            vec![
                make_region(0, None, RegionStateSnapshot::Open),
                make_region(1, Some(0), RegionStateSnapshot::Open),
                make_region(2, Some(1), RegionStateSnapshot::Open),
            ],
            Vec::new(),
            Vec::new(),
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.max_depth == 3,
            "max_depth",
            3,
            result.stats.max_depth
        );
        crate::test_complete!("nested_regions_valid");
    }

    #[test]
    fn terminal_task_stats_computed() {
        init_test("terminal_task_stats_computed");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![
                make_task(0, 0, TaskStateSnapshot::Running),
                make_task(
                    1,
                    0,
                    TaskStateSnapshot::Completed {
                        outcome: crate::runtime::state::OutcomeSnapshot::Ok,
                    },
                ),
            ],
            Vec::new(),
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.terminal_task_count == 1,
            "terminal_task_count",
            1,
            result.stats.terminal_task_count
        );
        crate::test_complete!("terminal_task_stats_computed");
    }

    #[test]
    fn content_hash_deterministic() {
        init_test("content_hash_deterministic");
        let snapshot1 = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            Vec::new(),
        );
        let snapshot2 = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            Vec::new(),
        );

        crate::assert_with_log!(
            snapshot1.content_hash == snapshot2.content_hash,
            "hashes equal",
            snapshot1.content_hash,
            snapshot2.content_hash
        );
        crate::test_complete!("content_hash_deterministic");
    }

    #[test]
    fn integrity_verification_works() {
        init_test("integrity_verification_works");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            Vec::new(),
            Vec::new(),
        );

        let valid = snapshot.verify_integrity();
        crate::assert_with_log!(valid, "integrity valid", true, valid);

        // Corrupt the stored hash; verification must now fail.
        let mut tampered = snapshot;
        tampered.content_hash ^= 1;
        let invalid = !tampered.verify_integrity();
        crate::assert_with_log!(invalid, "tampered invalid", true, invalid);

        crate::test_complete!("integrity_verification_works");
    }

    #[test]
    fn duplicate_region_id_detected() {
        init_test("duplicate_region_id_detected");
        let snapshot = make_snapshot(
            vec![
                make_region(0, None, RegionStateSnapshot::Open),
                make_region(0, None, RegionStateSnapshot::Open),
            ],
            Vec::new(),
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::DuplicateId { kind: "region", .. }));
        crate::assert_with_log!(has_error, "has DuplicateId error", true, has_error);
        crate::test_complete!("duplicate_region_id_detected");
    }

    #[test]
    fn duplicate_task_id_detected() {
        init_test("duplicate_task_id_detected");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![
                make_task(0, 0, TaskStateSnapshot::Running),
                make_task(0, 0, TaskStateSnapshot::Running),
            ],
            Vec::new(),
        );
        let result = snapshot.validate();

        let not_valid = !result.is_valid;
        crate::assert_with_log!(not_valid, "not valid", true, not_valid);
        let has_error = result
            .errors
            .iter()
            .any(|e| matches!(e, RestoreError::DuplicateId { kind: "task", id: 0 }));
        crate::assert_with_log!(has_error, "has task DuplicateId error", true, has_error);
        crate::test_complete!("duplicate_task_id_detected");
    }

    #[test]
    fn resolved_obligation_stats_computed() {
        init_test("resolved_obligation_stats_computed");
        let snapshot = make_snapshot(
            vec![make_region(0, None, RegionStateSnapshot::Open)],
            vec![make_task(0, 0, TaskStateSnapshot::Running)],
            vec![
                make_obligation(0, 0, ObligationStateSnapshot::Reserved),
                make_obligation(1, 0, ObligationStateSnapshot::Committed),
                make_obligation(2, 0, ObligationStateSnapshot::Aborted),
            ],
        );
        let result = snapshot.validate();

        crate::assert_with_log!(result.is_valid, "is_valid", true, result.is_valid);
        crate::assert_with_log!(
            result.stats.resolved_obligation_count == 2,
            "resolved_obligation_count",
            2,
            result.stats.resolved_obligation_count
        );
        crate::test_complete!("resolved_obligation_stats_computed");
    }
}