use std::fmt;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompactionError {
ConcurrentModification {
expected_seq: u64,
actual_seq: u64,
direction: Direction,
},
ForwardSwapFailed {
reason: SwapFailureReason,
},
ReverseSwapFailed {
reason: SwapFailureReason,
rollback_successful: bool,
},
CounterReconcileFailed {
active_guards: usize,
forward_swapped: bool,
reverse_swapped: bool,
},
Interrupted {
reason: InterruptReason,
edges_processed: usize,
edges_total: usize,
},
BuildFailed {
direction: Direction,
reason: BuildFailureReason,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Direction {
Forward,
Reverse,
}
impl fmt::Display for Direction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Forward => write!(f, "forward"),
Self::Reverse => write!(f, "reverse"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SwapFailureReason {
ValidationFailed {
message: String,
},
AllocationFailed,
InvariantViolation {
message: String,
},
}
impl fmt::Display for SwapFailureReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ValidationFailed { message } => write!(f, "validation failed: {message}"),
Self::AllocationFailed => write!(f, "memory allocation failed"),
Self::InvariantViolation { message } => write!(f, "invariant violation: {message}"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InterruptReason {
ShutdownRequested,
Cancelled,
CancellationRequested,
Timeout {
elapsed_ms: u64,
limit_ms: u64,
},
}
impl fmt::Display for InterruptReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ShutdownRequested => write!(f, "shutdown requested"),
Self::Cancelled => write!(f, "cancelled"),
Self::CancellationRequested => write!(f, "cancellation requested"),
Self::Timeout {
elapsed_ms,
limit_ms,
} => {
write!(f, "timeout after {elapsed_ms}ms (limit: {limit_ms}ms)")
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildFailureReason {
InsufficientEdges {
count: usize,
minimum: usize,
},
InvalidEdgeData {
message: String,
},
AllocationFailed,
BuilderError {
message: String,
},
}
impl fmt::Display for BuildFailureReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InsufficientEdges { count, minimum } => {
write!(f, "insufficient edges: {count} (minimum: {minimum})")
}
Self::InvalidEdgeData { message } => write!(f, "invalid edge data: {message}"),
Self::AllocationFailed => write!(f, "memory allocation failed"),
Self::BuilderError { message } => write!(f, "builder error: {message}"),
}
}
}
impl fmt::Display for CompactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ConcurrentModification {
expected_seq,
actual_seq,
direction,
} => {
write!(
f,
"concurrent modification in {direction} store: expected seq {expected_seq}, found {actual_seq}"
)
}
Self::ForwardSwapFailed { reason } => {
write!(f, "forward CSR swap failed: {reason}")
}
Self::ReverseSwapFailed {
reason,
rollback_successful,
} => {
let rollback_status = if *rollback_successful {
"rollback succeeded"
} else {
"ROLLBACK FAILED"
};
write!(f, "reverse CSR swap failed: {reason} ({rollback_status})")
}
Self::CounterReconcileFailed {
active_guards,
forward_swapped,
reverse_swapped,
} => {
let swap_status = match (forward_swapped, reverse_swapped) {
(true, true) => "both CSRs swapped",
(true, false) => "forward CSR swapped",
(false, true) => "reverse CSR swapped",
(false, false) => "no CSRs swapped",
};
write!(
f,
"counter reconciliation failed: {active_guards} active guards prevented reset ({swap_status})"
)
}
Self::Interrupted {
reason,
edges_processed,
edges_total,
} => {
write!(
f,
"compaction interrupted: {reason} ({edges_processed}/{edges_total} edges processed)"
)
}
Self::BuildFailed { direction, reason } => {
write!(f, "{direction} CSR build failed: {reason}")
}
}
}
}
impl std::error::Error for CompactionError {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostErrorState {
pub forward_csr: ComponentState,
pub forward_deltas: ComponentState,
pub forward_seq: ComponentState,
pub reverse_csr: ComponentState,
pub reverse_deltas: ComponentState,
pub reverse_seq: ComponentState,
pub committed: ComponentState,
pub reserved: ComponentState,
pub counter_reconciled: CounterReconcileState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentState {
Unchanged,
RolledBack,
Restored,
Success,
Cleared,
Reset,
Stale,
}
impl fmt::Display for ComponentState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Unchanged => write!(f, "UNCHANGED"),
Self::RolledBack => write!(f, "ROLLED BACK"),
Self::Restored => write!(f, "RESTORED"),
Self::Success => write!(f, "SUCCESS"),
Self::Cleared => write!(f, "CLEARED"),
Self::Reset => write!(f, "RESET"),
Self::Stale => write!(f, "STALE"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CounterReconcileState {
NotApplicable,
Yes,
NoLogged,
}
impl fmt::Display for CounterReconcileState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NotApplicable => write!(f, "N/A"),
Self::Yes => write!(f, "YES"),
Self::NoLogged => write!(f, "NO - LOGGED"),
}
}
}
impl CompactionError {
#[must_use]
pub fn post_error_state(&self) -> PostErrorState {
match self {
Self::ConcurrentModification { .. } | Self::ForwardSwapFailed { .. } => {
PostErrorState {
forward_csr: ComponentState::Unchanged,
forward_deltas: ComponentState::Unchanged,
forward_seq: ComponentState::Unchanged,
reverse_csr: ComponentState::Unchanged,
reverse_deltas: ComponentState::Unchanged,
reverse_seq: ComponentState::Unchanged,
committed: ComponentState::Unchanged,
reserved: ComponentState::Unchanged,
counter_reconciled: CounterReconcileState::NotApplicable,
}
}
Self::ReverseSwapFailed { .. } => PostErrorState {
forward_csr: ComponentState::RolledBack,
forward_deltas: ComponentState::RolledBack,
forward_seq: ComponentState::RolledBack,
reverse_csr: ComponentState::Unchanged,
reverse_deltas: ComponentState::Unchanged,
reverse_seq: ComponentState::Unchanged,
committed: ComponentState::Restored,
reserved: ComponentState::Restored,
counter_reconciled: CounterReconcileState::Yes,
},
Self::CounterReconcileFailed { .. } => PostErrorState {
forward_csr: ComponentState::Success,
forward_deltas: ComponentState::Cleared,
forward_seq: ComponentState::Reset,
reverse_csr: ComponentState::Success,
reverse_deltas: ComponentState::Cleared,
reverse_seq: ComponentState::Reset,
committed: ComponentState::Stale,
reserved: ComponentState::Unchanged,
counter_reconciled: CounterReconcileState::NoLogged,
},
Self::Interrupted { .. } | Self::BuildFailed { .. } => PostErrorState {
forward_csr: ComponentState::Unchanged,
forward_deltas: ComponentState::Unchanged,
forward_seq: ComponentState::Unchanged,
reverse_csr: ComponentState::Unchanged,
reverse_deltas: ComponentState::Unchanged,
reverse_seq: ComponentState::Unchanged,
committed: ComponentState::Unchanged,
reserved: ComponentState::Unchanged,
counter_reconciled: CounterReconcileState::NotApplicable,
},
}
}
#[must_use]
pub fn phase(&self) -> CompactionPhase {
match self {
Self::ConcurrentModification { .. } => CompactionPhase::Phase2Start,
Self::ForwardSwapFailed { .. } => CompactionPhase::Phase2Forward,
Self::ReverseSwapFailed { .. } => CompactionPhase::Phase2Reverse,
Self::CounterReconcileFailed { .. } => CompactionPhase::Phase2PostSwap,
Self::Interrupted { .. } | Self::BuildFailed { .. } => CompactionPhase::Phase1,
}
}
#[must_use]
pub fn is_fully_consistent(&self) -> bool {
!matches!(self, Self::CounterReconcileFailed { .. })
}
#[must_use]
pub fn should_retry(&self) -> bool {
matches!(
self,
Self::ConcurrentModification { .. } | Self::Interrupted { .. }
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPhase {
Phase1,
Phase2Start,
Phase2Forward,
Phase2Reverse,
Phase2PostSwap,
}
impl fmt::Display for CompactionPhase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Phase1 => write!(f, "Phase 1 (prepare)"),
Self::Phase2Start => write!(f, "Phase 2 start (lock acquisition)"),
Self::Phase2Forward => write!(f, "Phase 2 forward (CSR swap)"),
Self::Phase2Reverse => write!(f, "Phase 2 reverse (CSR swap)"),
Self::Phase2PostSwap => write!(f, "Phase 2 post-swap (counter reconciliation)"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SwapPreconditions {
pub expected_seq: u64,
pub expected_csr_version: u64,
pub require_deltas: bool,
}
impl SwapPreconditions {
pub fn validate(
&self,
actual_seq: u64,
actual_csr_version: u64,
delta_count: usize,
) -> Result<(), SwapPreconditionError> {
if actual_seq != self.expected_seq {
return Err(SwapPreconditionError::SequenceMismatch {
expected: self.expected_seq,
actual: actual_seq,
});
}
if actual_csr_version != self.expected_csr_version {
return Err(SwapPreconditionError::CsrVersionMismatch {
expected: self.expected_csr_version,
actual: actual_csr_version,
});
}
if self.require_deltas && delta_count == 0 {
return Err(SwapPreconditionError::EmptyDeltaBuffer);
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SwapPreconditionError {
SequenceMismatch {
expected: u64,
actual: u64,
},
CsrVersionMismatch {
expected: u64,
actual: u64,
},
EmptyDeltaBuffer,
}
impl fmt::Display for SwapPreconditionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SequenceMismatch { expected, actual } => {
write!(f, "sequence mismatch: expected {expected}, actual {actual}")
}
Self::CsrVersionMismatch { expected, actual } => {
write!(
f,
"CSR version mismatch: expected {expected}, actual {actual}"
)
}
Self::EmptyDeltaBuffer => write!(f, "delta buffer is empty"),
}
}
}
impl std::error::Error for SwapPreconditionError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_concurrent_modification_error() {
let error = CompactionError::ConcurrentModification {
expected_seq: 100,
actual_seq: 105,
direction: Direction::Forward,
};
assert!(error.to_string().contains("concurrent modification"));
assert!(error.to_string().contains("forward"));
assert!(error.to_string().contains("100"));
assert!(error.to_string().contains("105"));
assert_eq!(error.phase(), CompactionPhase::Phase2Start);
assert!(error.is_fully_consistent());
assert!(error.should_retry());
}
#[test]
fn test_forward_swap_failed_error() {
let error = CompactionError::ForwardSwapFailed {
reason: SwapFailureReason::ValidationFailed {
message: "invalid node count".to_string(),
},
};
assert!(error.to_string().contains("forward CSR swap failed"));
assert!(error.to_string().contains("validation failed"));
assert_eq!(error.phase(), CompactionPhase::Phase2Forward);
assert!(error.is_fully_consistent());
assert!(!error.should_retry());
}
#[test]
fn test_reverse_swap_failed_error() {
let error = CompactionError::ReverseSwapFailed {
reason: SwapFailureReason::AllocationFailed,
rollback_successful: true,
};
assert!(error.to_string().contains("reverse CSR swap failed"));
assert!(error.to_string().contains("rollback succeeded"));
assert_eq!(error.phase(), CompactionPhase::Phase2Reverse);
assert!(error.is_fully_consistent());
assert!(!error.should_retry());
}
#[test]
fn test_reverse_swap_failed_rollback_failed() {
let error = CompactionError::ReverseSwapFailed {
reason: SwapFailureReason::InvariantViolation {
message: "edge count mismatch".to_string(),
},
rollback_successful: false,
};
assert!(error.to_string().contains("ROLLBACK FAILED"));
assert!(error.is_fully_consistent()); }
#[test]
fn test_counter_reconcile_failed_error() {
let error = CompactionError::CounterReconcileFailed {
active_guards: 2,
forward_swapped: true,
reverse_swapped: true,
};
assert!(error.to_string().contains("counter reconciliation failed"));
assert!(error.to_string().contains("2 active guards"));
assert!(error.to_string().contains("both CSRs swapped"));
assert_eq!(error.phase(), CompactionPhase::Phase2PostSwap);
assert!(!error.is_fully_consistent()); assert!(!error.should_retry());
}
#[test]
fn test_interrupted_error() {
let error = CompactionError::Interrupted {
reason: InterruptReason::Timeout {
elapsed_ms: 5000,
limit_ms: 3000,
},
edges_processed: 500,
edges_total: 1000,
};
assert!(error.to_string().contains("compaction interrupted"));
assert!(error.to_string().contains("timeout"));
assert!(error.to_string().contains("500/1000"));
assert_eq!(error.phase(), CompactionPhase::Phase1);
assert!(error.is_fully_consistent());
assert!(error.should_retry());
}
#[test]
fn test_build_failed_error() {
let error = CompactionError::BuildFailed {
direction: Direction::Reverse,
reason: BuildFailureReason::InsufficientEdges {
count: 0,
minimum: 1,
},
};
assert!(error.to_string().contains("reverse CSR build failed"));
assert!(error.to_string().contains("insufficient edges"));
assert_eq!(error.phase(), CompactionPhase::Phase1);
assert!(error.is_fully_consistent());
assert!(!error.should_retry());
}
#[test]
fn test_post_error_state_concurrent_modification() {
let error = CompactionError::ConcurrentModification {
expected_seq: 1,
actual_seq: 2,
direction: Direction::Forward,
};
let state = error.post_error_state();
assert_eq!(state.forward_csr, ComponentState::Unchanged);
assert_eq!(state.reverse_csr, ComponentState::Unchanged);
assert_eq!(state.committed, ComponentState::Unchanged);
assert_eq!(
state.counter_reconciled,
CounterReconcileState::NotApplicable
);
}
#[test]
fn test_post_error_state_reverse_swap_failed() {
let error = CompactionError::ReverseSwapFailed {
reason: SwapFailureReason::AllocationFailed,
rollback_successful: true,
};
let state = error.post_error_state();
assert_eq!(state.forward_csr, ComponentState::RolledBack);
assert_eq!(state.forward_deltas, ComponentState::RolledBack);
assert_eq!(state.reverse_csr, ComponentState::Unchanged);
assert_eq!(state.committed, ComponentState::Restored);
assert_eq!(state.counter_reconciled, CounterReconcileState::Yes);
}
#[test]
fn test_post_error_state_counter_reconcile_failed() {
let error = CompactionError::CounterReconcileFailed {
active_guards: 1,
forward_swapped: true,
reverse_swapped: true,
};
let state = error.post_error_state();
assert_eq!(state.forward_csr, ComponentState::Success);
assert_eq!(state.forward_deltas, ComponentState::Cleared);
assert_eq!(state.forward_seq, ComponentState::Reset);
assert_eq!(state.committed, ComponentState::Stale);
assert_eq!(state.counter_reconciled, CounterReconcileState::NoLogged);
}
#[test]
fn test_direction_display() {
assert_eq!(Direction::Forward.to_string(), "forward");
assert_eq!(Direction::Reverse.to_string(), "reverse");
}
#[test]
fn test_swap_failure_reason_display() {
assert!(
SwapFailureReason::AllocationFailed
.to_string()
.contains("allocation")
);
assert!(
SwapFailureReason::ValidationFailed {
message: "test".to_string()
}
.to_string()
.contains("test")
);
}
#[test]
fn test_interrupt_reason_display() {
assert!(
InterruptReason::ShutdownRequested
.to_string()
.contains("shutdown")
);
assert!(InterruptReason::Cancelled.to_string().contains("cancelled"));
assert!(
InterruptReason::Timeout {
elapsed_ms: 100,
limit_ms: 50
}
.to_string()
.contains("100ms")
);
}
#[test]
fn test_build_failure_reason_display() {
assert!(
BuildFailureReason::AllocationFailed
.to_string()
.contains("allocation")
);
assert!(
BuildFailureReason::InsufficientEdges {
count: 0,
minimum: 1
}
.to_string()
.contains("insufficient")
);
}
#[test]
fn test_compaction_phase_display() {
assert!(CompactionPhase::Phase1.to_string().contains("Phase 1"));
assert!(
CompactionPhase::Phase2Start
.to_string()
.contains("Phase 2 start")
);
assert!(
CompactionPhase::Phase2Forward
.to_string()
.contains("forward")
);
}
#[test]
fn test_component_state_display() {
assert_eq!(ComponentState::Unchanged.to_string(), "UNCHANGED");
assert_eq!(ComponentState::RolledBack.to_string(), "ROLLED BACK");
assert_eq!(ComponentState::Restored.to_string(), "RESTORED");
assert_eq!(ComponentState::Success.to_string(), "SUCCESS");
assert_eq!(ComponentState::Cleared.to_string(), "CLEARED");
assert_eq!(ComponentState::Reset.to_string(), "RESET");
assert_eq!(ComponentState::Stale.to_string(), "STALE");
}
#[test]
fn test_counter_reconcile_state_display() {
assert_eq!(CounterReconcileState::NotApplicable.to_string(), "N/A");
assert_eq!(CounterReconcileState::Yes.to_string(), "YES");
assert_eq!(CounterReconcileState::NoLogged.to_string(), "NO - LOGGED");
}
#[test]
fn test_swap_preconditions_validate_success() {
let preconditions = SwapPreconditions {
expected_seq: 100,
expected_csr_version: 5,
require_deltas: true,
};
assert!(preconditions.validate(100, 5, 10).is_ok());
}
#[test]
fn test_swap_preconditions_validate_seq_mismatch() {
let preconditions = SwapPreconditions {
expected_seq: 100,
expected_csr_version: 5,
require_deltas: false,
};
let result = preconditions.validate(101, 5, 0);
assert!(matches!(
result,
Err(SwapPreconditionError::SequenceMismatch {
expected: 100,
actual: 101
})
));
}
#[test]
fn test_swap_preconditions_validate_csr_version_mismatch() {
let preconditions = SwapPreconditions {
expected_seq: 100,
expected_csr_version: 5,
require_deltas: false,
};
let result = preconditions.validate(100, 6, 0);
assert!(matches!(
result,
Err(SwapPreconditionError::CsrVersionMismatch {
expected: 5,
actual: 6
})
));
}
#[test]
fn test_swap_preconditions_validate_empty_deltas() {
let preconditions = SwapPreconditions {
expected_seq: 100,
expected_csr_version: 5,
require_deltas: true,
};
let result = preconditions.validate(100, 5, 0);
assert!(matches!(
result,
Err(SwapPreconditionError::EmptyDeltaBuffer)
));
}
#[test]
fn test_swap_precondition_error_display() {
assert!(
SwapPreconditionError::SequenceMismatch {
expected: 1,
actual: 2
}
.to_string()
.contains("sequence")
);
assert!(
SwapPreconditionError::CsrVersionMismatch {
expected: 1,
actual: 2
}
.to_string()
.contains("CSR version")
);
assert!(
SwapPreconditionError::EmptyDeltaBuffer
.to_string()
.contains("empty")
);
}
#[test]
fn test_error_is_std_error() {
fn assert_error<E: std::error::Error>() {}
assert_error::<CompactionError>();
assert_error::<SwapPreconditionError>();
}
}