1use std::fmt;
37
38#[derive(Debug, Clone, PartialEq, Eq)]
42pub enum CompactionError {
43 ConcurrentModification {
52 expected_seq: u64,
54 actual_seq: u64,
56 direction: Direction,
58 },
59
60 ForwardSwapFailed {
69 reason: SwapFailureReason,
71 },
72
73 ReverseSwapFailed {
83 reason: SwapFailureReason,
85 rollback_successful: bool,
87 },
88
89 CounterReconcileFailed {
104 active_guards: usize,
107 forward_swapped: bool,
109 reverse_swapped: bool,
111 },
112
113 Interrupted {
122 reason: InterruptReason,
124 edges_processed: usize,
126 edges_total: usize,
128 },
129
130 BuildFailed {
139 direction: Direction,
141 reason: BuildFailureReason,
143 },
144}
145
146#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
148pub enum Direction {
149 Forward,
151 Reverse,
153}
154
155impl fmt::Display for Direction {
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 match self {
158 Self::Forward => write!(f, "forward"),
159 Self::Reverse => write!(f, "reverse"),
160 }
161 }
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
166pub enum SwapFailureReason {
167 ValidationFailed {
169 message: String,
171 },
172 AllocationFailed,
174 InvariantViolation {
176 message: String,
178 },
179}
180
181impl fmt::Display for SwapFailureReason {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 match self {
184 Self::ValidationFailed { message } => write!(f, "validation failed: {message}"),
185 Self::AllocationFailed => write!(f, "memory allocation failed"),
186 Self::InvariantViolation { message } => write!(f, "invariant violation: {message}"),
187 }
188 }
189}
190
191#[derive(Debug, Clone, PartialEq, Eq)]
193pub enum InterruptReason {
194 ShutdownRequested,
196 Cancelled,
198 CancellationRequested,
200 Timeout {
202 elapsed_ms: u64,
204 limit_ms: u64,
206 },
207}
208
209impl fmt::Display for InterruptReason {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 match self {
212 Self::ShutdownRequested => write!(f, "shutdown requested"),
213 Self::Cancelled => write!(f, "cancelled"),
214 Self::CancellationRequested => write!(f, "cancellation requested"),
215 Self::Timeout {
216 elapsed_ms,
217 limit_ms,
218 } => {
219 write!(f, "timeout after {elapsed_ms}ms (limit: {limit_ms}ms)")
220 }
221 }
222 }
223}
224
225#[derive(Debug, Clone, PartialEq, Eq)]
227pub enum BuildFailureReason {
228 InsufficientEdges {
230 count: usize,
232 minimum: usize,
234 },
235 InvalidEdgeData {
237 message: String,
239 },
240 AllocationFailed,
242 BuilderError {
244 message: String,
246 },
247}
248
249impl fmt::Display for BuildFailureReason {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 match self {
252 Self::InsufficientEdges { count, minimum } => {
253 write!(f, "insufficient edges: {count} (minimum: {minimum})")
254 }
255 Self::InvalidEdgeData { message } => write!(f, "invalid edge data: {message}"),
256 Self::AllocationFailed => write!(f, "memory allocation failed"),
257 Self::BuilderError { message } => write!(f, "builder error: {message}"),
258 }
259 }
260}
261
262impl fmt::Display for CompactionError {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 match self {
265 Self::ConcurrentModification {
266 expected_seq,
267 actual_seq,
268 direction,
269 } => {
270 write!(
271 f,
272 "concurrent modification in {direction} store: expected seq {expected_seq}, found {actual_seq}"
273 )
274 }
275 Self::ForwardSwapFailed { reason } => {
276 write!(f, "forward CSR swap failed: {reason}")
277 }
278 Self::ReverseSwapFailed {
279 reason,
280 rollback_successful,
281 } => {
282 let rollback_status = if *rollback_successful {
283 "rollback succeeded"
284 } else {
285 "ROLLBACK FAILED"
286 };
287 write!(f, "reverse CSR swap failed: {reason} ({rollback_status})")
288 }
289 Self::CounterReconcileFailed {
290 active_guards,
291 forward_swapped,
292 reverse_swapped,
293 } => {
294 let swap_status = match (forward_swapped, reverse_swapped) {
295 (true, true) => "both CSRs swapped",
296 (true, false) => "forward CSR swapped",
297 (false, true) => "reverse CSR swapped",
298 (false, false) => "no CSRs swapped",
299 };
300 write!(
301 f,
302 "counter reconciliation failed: {active_guards} active guards prevented reset ({swap_status})"
303 )
304 }
305 Self::Interrupted {
306 reason,
307 edges_processed,
308 edges_total,
309 } => {
310 write!(
311 f,
312 "compaction interrupted: {reason} ({edges_processed}/{edges_total} edges processed)"
313 )
314 }
315 Self::BuildFailed { direction, reason } => {
316 write!(f, "{direction} CSR build failed: {reason}")
317 }
318 }
319 }
320}
321
322impl std::error::Error for CompactionError {}
323
324#[derive(Debug, Clone, PartialEq, Eq)]
329pub struct PostErrorState {
330 pub forward_csr: ComponentState,
332 pub forward_deltas: ComponentState,
334 pub forward_seq: ComponentState,
336 pub reverse_csr: ComponentState,
338 pub reverse_deltas: ComponentState,
340 pub reverse_seq: ComponentState,
342 pub committed: ComponentState,
344 pub reserved: ComponentState,
346 pub counter_reconciled: CounterReconcileState,
348}
349
350#[derive(Debug, Clone, Copy, PartialEq, Eq)]
352pub enum ComponentState {
353 Unchanged,
355 RolledBack,
357 Restored,
359 Success,
361 Cleared,
363 Reset,
365 Stale,
367}
368
369impl fmt::Display for ComponentState {
370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371 match self {
372 Self::Unchanged => write!(f, "UNCHANGED"),
373 Self::RolledBack => write!(f, "ROLLED BACK"),
374 Self::Restored => write!(f, "RESTORED"),
375 Self::Success => write!(f, "SUCCESS"),
376 Self::Cleared => write!(f, "CLEARED"),
377 Self::Reset => write!(f, "RESET"),
378 Self::Stale => write!(f, "STALE"),
379 }
380 }
381}
382
383#[derive(Debug, Clone, Copy, PartialEq, Eq)]
385pub enum CounterReconcileState {
386 NotApplicable,
388 Yes,
390 NoLogged,
392}
393
394impl fmt::Display for CounterReconcileState {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 match self {
397 Self::NotApplicable => write!(f, "N/A"),
398 Self::Yes => write!(f, "YES"),
399 Self::NoLogged => write!(f, "NO - LOGGED"),
400 }
401 }
402}
403
404impl CompactionError {
405 #[must_use]
410 pub fn post_error_state(&self) -> PostErrorState {
411 match self {
412 Self::ConcurrentModification { .. } | Self::ForwardSwapFailed { .. } => {
413 PostErrorState {
414 forward_csr: ComponentState::Unchanged,
415 forward_deltas: ComponentState::Unchanged,
416 forward_seq: ComponentState::Unchanged,
417 reverse_csr: ComponentState::Unchanged,
418 reverse_deltas: ComponentState::Unchanged,
419 reverse_seq: ComponentState::Unchanged,
420 committed: ComponentState::Unchanged,
421 reserved: ComponentState::Unchanged,
422 counter_reconciled: CounterReconcileState::NotApplicable,
423 }
424 }
425 Self::ReverseSwapFailed { .. } => PostErrorState {
426 forward_csr: ComponentState::RolledBack,
427 forward_deltas: ComponentState::RolledBack,
428 forward_seq: ComponentState::RolledBack,
429 reverse_csr: ComponentState::Unchanged,
430 reverse_deltas: ComponentState::Unchanged,
431 reverse_seq: ComponentState::Unchanged,
432 committed: ComponentState::Restored,
433 reserved: ComponentState::Restored,
434 counter_reconciled: CounterReconcileState::Yes,
435 },
436 Self::CounterReconcileFailed { .. } => PostErrorState {
437 forward_csr: ComponentState::Success,
438 forward_deltas: ComponentState::Cleared,
439 forward_seq: ComponentState::Reset,
440 reverse_csr: ComponentState::Success,
441 reverse_deltas: ComponentState::Cleared,
442 reverse_seq: ComponentState::Reset,
443 committed: ComponentState::Stale,
444 reserved: ComponentState::Unchanged,
445 counter_reconciled: CounterReconcileState::NoLogged,
446 },
447 Self::Interrupted { .. } | Self::BuildFailed { .. } => PostErrorState {
448 forward_csr: ComponentState::Unchanged,
449 forward_deltas: ComponentState::Unchanged,
450 forward_seq: ComponentState::Unchanged,
451 reverse_csr: ComponentState::Unchanged,
452 reverse_deltas: ComponentState::Unchanged,
453 reverse_seq: ComponentState::Unchanged,
454 committed: ComponentState::Unchanged,
455 reserved: ComponentState::Unchanged,
456 counter_reconciled: CounterReconcileState::NotApplicable,
457 },
458 }
459 }
460
461 #[must_use]
463 pub fn phase(&self) -> CompactionPhase {
464 match self {
465 Self::ConcurrentModification { .. } => CompactionPhase::Phase2Start,
466 Self::ForwardSwapFailed { .. } => CompactionPhase::Phase2Forward,
467 Self::ReverseSwapFailed { .. } => CompactionPhase::Phase2Reverse,
468 Self::CounterReconcileFailed { .. } => CompactionPhase::Phase2PostSwap,
469 Self::Interrupted { .. } | Self::BuildFailed { .. } => CompactionPhase::Phase1,
470 }
471 }
472
473 #[must_use]
478 pub fn is_fully_consistent(&self) -> bool {
479 !matches!(self, Self::CounterReconcileFailed { .. })
481 }
482
483 #[must_use]
485 pub fn should_retry(&self) -> bool {
486 matches!(
487 self,
488 Self::ConcurrentModification { .. } | Self::Interrupted { .. }
489 )
490 }
491}
492
493#[derive(Debug, Clone, Copy, PartialEq, Eq)]
495pub enum CompactionPhase {
496 Phase1,
498 Phase2Start,
500 Phase2Forward,
502 Phase2Reverse,
504 Phase2PostSwap,
506}
507
508impl fmt::Display for CompactionPhase {
509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510 match self {
511 Self::Phase1 => write!(f, "Phase 1 (prepare)"),
512 Self::Phase2Start => write!(f, "Phase 2 start (lock acquisition)"),
513 Self::Phase2Forward => write!(f, "Phase 2 forward (CSR swap)"),
514 Self::Phase2Reverse => write!(f, "Phase 2 reverse (CSR swap)"),
515 Self::Phase2PostSwap => write!(f, "Phase 2 post-swap (counter reconciliation)"),
516 }
517 }
518}
519
520#[derive(Debug, Clone, PartialEq, Eq)]
524pub struct SwapPreconditions {
525 pub expected_seq: u64,
527 pub expected_csr_version: u64,
529 pub require_deltas: bool,
531}
532
533impl SwapPreconditions {
534 pub fn validate(
543 &self,
544 actual_seq: u64,
545 actual_csr_version: u64,
546 delta_count: usize,
547 ) -> Result<(), SwapPreconditionError> {
548 if actual_seq != self.expected_seq {
549 return Err(SwapPreconditionError::SequenceMismatch {
550 expected: self.expected_seq,
551 actual: actual_seq,
552 });
553 }
554
555 if actual_csr_version != self.expected_csr_version {
556 return Err(SwapPreconditionError::CsrVersionMismatch {
557 expected: self.expected_csr_version,
558 actual: actual_csr_version,
559 });
560 }
561
562 if self.require_deltas && delta_count == 0 {
563 return Err(SwapPreconditionError::EmptyDeltaBuffer);
564 }
565
566 Ok(())
567 }
568}
569
570#[derive(Debug, Clone, PartialEq, Eq)]
572pub enum SwapPreconditionError {
573 SequenceMismatch {
575 expected: u64,
577 actual: u64,
579 },
580 CsrVersionMismatch {
582 expected: u64,
584 actual: u64,
586 },
587 EmptyDeltaBuffer,
589}
590
591impl fmt::Display for SwapPreconditionError {
592 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593 match self {
594 Self::SequenceMismatch { expected, actual } => {
595 write!(f, "sequence mismatch: expected {expected}, actual {actual}")
596 }
597 Self::CsrVersionMismatch { expected, actual } => {
598 write!(
599 f,
600 "CSR version mismatch: expected {expected}, actual {actual}"
601 )
602 }
603 Self::EmptyDeltaBuffer => write!(f, "delta buffer is empty"),
604 }
605 }
606}
607
608impl std::error::Error for SwapPreconditionError {}
609
610#[cfg(test)]
611mod tests {
612 use super::*;
613
614 #[test]
615 fn test_concurrent_modification_error() {
616 let error = CompactionError::ConcurrentModification {
617 expected_seq: 100,
618 actual_seq: 105,
619 direction: Direction::Forward,
620 };
621
622 assert!(error.to_string().contains("concurrent modification"));
623 assert!(error.to_string().contains("forward"));
624 assert!(error.to_string().contains("100"));
625 assert!(error.to_string().contains("105"));
626 assert_eq!(error.phase(), CompactionPhase::Phase2Start);
627 assert!(error.is_fully_consistent());
628 assert!(error.should_retry());
629 }
630
631 #[test]
632 fn test_forward_swap_failed_error() {
633 let error = CompactionError::ForwardSwapFailed {
634 reason: SwapFailureReason::ValidationFailed {
635 message: "invalid node count".to_string(),
636 },
637 };
638
639 assert!(error.to_string().contains("forward CSR swap failed"));
640 assert!(error.to_string().contains("validation failed"));
641 assert_eq!(error.phase(), CompactionPhase::Phase2Forward);
642 assert!(error.is_fully_consistent());
643 assert!(!error.should_retry());
644 }
645
646 #[test]
647 fn test_reverse_swap_failed_error() {
648 let error = CompactionError::ReverseSwapFailed {
649 reason: SwapFailureReason::AllocationFailed,
650 rollback_successful: true,
651 };
652
653 assert!(error.to_string().contains("reverse CSR swap failed"));
654 assert!(error.to_string().contains("rollback succeeded"));
655 assert_eq!(error.phase(), CompactionPhase::Phase2Reverse);
656 assert!(error.is_fully_consistent());
657 assert!(!error.should_retry());
658 }
659
660 #[test]
661 fn test_reverse_swap_failed_rollback_failed() {
662 let error = CompactionError::ReverseSwapFailed {
663 reason: SwapFailureReason::InvariantViolation {
664 message: "edge count mismatch".to_string(),
665 },
666 rollback_successful: false,
667 };
668
669 assert!(error.to_string().contains("ROLLBACK FAILED"));
670 assert!(error.is_fully_consistent()); }
672
673 #[test]
674 fn test_counter_reconcile_failed_error() {
675 let error = CompactionError::CounterReconcileFailed {
676 active_guards: 2,
677 forward_swapped: true,
678 reverse_swapped: true,
679 };
680
681 assert!(error.to_string().contains("counter reconciliation failed"));
682 assert!(error.to_string().contains("2 active guards"));
683 assert!(error.to_string().contains("both CSRs swapped"));
684 assert_eq!(error.phase(), CompactionPhase::Phase2PostSwap);
685 assert!(!error.is_fully_consistent()); assert!(!error.should_retry());
687 }
688
689 #[test]
690 fn test_interrupted_error() {
691 let error = CompactionError::Interrupted {
692 reason: InterruptReason::Timeout {
693 elapsed_ms: 5000,
694 limit_ms: 3000,
695 },
696 edges_processed: 500,
697 edges_total: 1000,
698 };
699
700 assert!(error.to_string().contains("compaction interrupted"));
701 assert!(error.to_string().contains("timeout"));
702 assert!(error.to_string().contains("500/1000"));
703 assert_eq!(error.phase(), CompactionPhase::Phase1);
704 assert!(error.is_fully_consistent());
705 assert!(error.should_retry());
706 }
707
708 #[test]
709 fn test_build_failed_error() {
710 let error = CompactionError::BuildFailed {
711 direction: Direction::Reverse,
712 reason: BuildFailureReason::InsufficientEdges {
713 count: 0,
714 minimum: 1,
715 },
716 };
717
718 assert!(error.to_string().contains("reverse CSR build failed"));
719 assert!(error.to_string().contains("insufficient edges"));
720 assert_eq!(error.phase(), CompactionPhase::Phase1);
721 assert!(error.is_fully_consistent());
722 assert!(!error.should_retry());
723 }
724
725 #[test]
726 fn test_post_error_state_concurrent_modification() {
727 let error = CompactionError::ConcurrentModification {
728 expected_seq: 1,
729 actual_seq: 2,
730 direction: Direction::Forward,
731 };
732
733 let state = error.post_error_state();
734 assert_eq!(state.forward_csr, ComponentState::Unchanged);
735 assert_eq!(state.reverse_csr, ComponentState::Unchanged);
736 assert_eq!(state.committed, ComponentState::Unchanged);
737 assert_eq!(
738 state.counter_reconciled,
739 CounterReconcileState::NotApplicable
740 );
741 }
742
743 #[test]
744 fn test_post_error_state_reverse_swap_failed() {
745 let error = CompactionError::ReverseSwapFailed {
746 reason: SwapFailureReason::AllocationFailed,
747 rollback_successful: true,
748 };
749
750 let state = error.post_error_state();
751 assert_eq!(state.forward_csr, ComponentState::RolledBack);
752 assert_eq!(state.forward_deltas, ComponentState::RolledBack);
753 assert_eq!(state.reverse_csr, ComponentState::Unchanged);
754 assert_eq!(state.committed, ComponentState::Restored);
755 assert_eq!(state.counter_reconciled, CounterReconcileState::Yes);
756 }
757
758 #[test]
759 fn test_post_error_state_counter_reconcile_failed() {
760 let error = CompactionError::CounterReconcileFailed {
761 active_guards: 1,
762 forward_swapped: true,
763 reverse_swapped: true,
764 };
765
766 let state = error.post_error_state();
767 assert_eq!(state.forward_csr, ComponentState::Success);
768 assert_eq!(state.forward_deltas, ComponentState::Cleared);
769 assert_eq!(state.forward_seq, ComponentState::Reset);
770 assert_eq!(state.committed, ComponentState::Stale);
771 assert_eq!(state.counter_reconciled, CounterReconcileState::NoLogged);
772 }
773
774 #[test]
775 fn test_direction_display() {
776 assert_eq!(Direction::Forward.to_string(), "forward");
777 assert_eq!(Direction::Reverse.to_string(), "reverse");
778 }
779
780 #[test]
781 fn test_swap_failure_reason_display() {
782 assert!(
783 SwapFailureReason::AllocationFailed
784 .to_string()
785 .contains("allocation")
786 );
787 assert!(
788 SwapFailureReason::ValidationFailed {
789 message: "test".to_string()
790 }
791 .to_string()
792 .contains("test")
793 );
794 }
795
796 #[test]
797 fn test_interrupt_reason_display() {
798 assert!(
799 InterruptReason::ShutdownRequested
800 .to_string()
801 .contains("shutdown")
802 );
803 assert!(InterruptReason::Cancelled.to_string().contains("cancelled"));
804 assert!(
805 InterruptReason::Timeout {
806 elapsed_ms: 100,
807 limit_ms: 50
808 }
809 .to_string()
810 .contains("100ms")
811 );
812 }
813
814 #[test]
815 fn test_build_failure_reason_display() {
816 assert!(
817 BuildFailureReason::AllocationFailed
818 .to_string()
819 .contains("allocation")
820 );
821 assert!(
822 BuildFailureReason::InsufficientEdges {
823 count: 0,
824 minimum: 1
825 }
826 .to_string()
827 .contains("insufficient")
828 );
829 }
830
831 #[test]
832 fn test_compaction_phase_display() {
833 assert!(CompactionPhase::Phase1.to_string().contains("Phase 1"));
834 assert!(
835 CompactionPhase::Phase2Start
836 .to_string()
837 .contains("Phase 2 start")
838 );
839 assert!(
840 CompactionPhase::Phase2Forward
841 .to_string()
842 .contains("forward")
843 );
844 }
845
846 #[test]
847 fn test_component_state_display() {
848 assert_eq!(ComponentState::Unchanged.to_string(), "UNCHANGED");
849 assert_eq!(ComponentState::RolledBack.to_string(), "ROLLED BACK");
850 assert_eq!(ComponentState::Restored.to_string(), "RESTORED");
851 assert_eq!(ComponentState::Success.to_string(), "SUCCESS");
852 assert_eq!(ComponentState::Cleared.to_string(), "CLEARED");
853 assert_eq!(ComponentState::Reset.to_string(), "RESET");
854 assert_eq!(ComponentState::Stale.to_string(), "STALE");
855 }
856
857 #[test]
858 fn test_counter_reconcile_state_display() {
859 assert_eq!(CounterReconcileState::NotApplicable.to_string(), "N/A");
860 assert_eq!(CounterReconcileState::Yes.to_string(), "YES");
861 assert_eq!(CounterReconcileState::NoLogged.to_string(), "NO - LOGGED");
862 }
863
864 #[test]
865 fn test_swap_preconditions_validate_success() {
866 let preconditions = SwapPreconditions {
867 expected_seq: 100,
868 expected_csr_version: 5,
869 require_deltas: true,
870 };
871
872 assert!(preconditions.validate(100, 5, 10).is_ok());
873 }
874
875 #[test]
876 fn test_swap_preconditions_validate_seq_mismatch() {
877 let preconditions = SwapPreconditions {
878 expected_seq: 100,
879 expected_csr_version: 5,
880 require_deltas: false,
881 };
882
883 let result = preconditions.validate(101, 5, 0);
884 assert!(matches!(
885 result,
886 Err(SwapPreconditionError::SequenceMismatch {
887 expected: 100,
888 actual: 101
889 })
890 ));
891 }
892
893 #[test]
894 fn test_swap_preconditions_validate_csr_version_mismatch() {
895 let preconditions = SwapPreconditions {
896 expected_seq: 100,
897 expected_csr_version: 5,
898 require_deltas: false,
899 };
900
901 let result = preconditions.validate(100, 6, 0);
902 assert!(matches!(
903 result,
904 Err(SwapPreconditionError::CsrVersionMismatch {
905 expected: 5,
906 actual: 6
907 })
908 ));
909 }
910
911 #[test]
912 fn test_swap_preconditions_validate_empty_deltas() {
913 let preconditions = SwapPreconditions {
914 expected_seq: 100,
915 expected_csr_version: 5,
916 require_deltas: true,
917 };
918
919 let result = preconditions.validate(100, 5, 0);
920 assert!(matches!(
921 result,
922 Err(SwapPreconditionError::EmptyDeltaBuffer)
923 ));
924 }
925
926 #[test]
927 fn test_swap_precondition_error_display() {
928 assert!(
929 SwapPreconditionError::SequenceMismatch {
930 expected: 1,
931 actual: 2
932 }
933 .to_string()
934 .contains("sequence")
935 );
936 assert!(
937 SwapPreconditionError::CsrVersionMismatch {
938 expected: 1,
939 actual: 2
940 }
941 .to_string()
942 .contains("CSR version")
943 );
944 assert!(
945 SwapPreconditionError::EmptyDeltaBuffer
946 .to_string()
947 .contains("empty")
948 );
949 }
950
951 #[test]
952 fn test_error_is_std_error() {
953 fn assert_error<E: std::error::Error>() {}
954 assert_error::<CompactionError>();
955 assert_error::<SwapPreconditionError>();
956 }
957}