use crate::obligation::crdt::{CrdtObligationLedger, LinearityViolation};
use crate::trace::distributed::lattice::LatticeState;
use crate::types::ObligationId;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
pub stale_timeout_ns: u64,
pub max_resolutions_per_tick: usize,
pub auto_resolve_conflicts: bool,
pub auto_abort_violations: bool,
}
impl RecoveryConfig {
#[must_use]
pub fn default_for_test() -> Self {
Self {
stale_timeout_ns: 5_000_000_000, max_resolutions_per_tick: 100,
auto_resolve_conflicts: true,
auto_abort_violations: true,
}
}
}
impl Default for RecoveryConfig {
fn default() -> Self {
Self {
stale_timeout_ns: 30_000_000_000, max_resolutions_per_tick: 50,
auto_resolve_conflicts: true,
auto_abort_violations: true,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryPhase {
Idle,
Scanning,
Resolving,
}
impl fmt::Display for RecoveryPhase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Idle => write!(f, "Idle"),
Self::Scanning => write!(f, "Scanning"),
Self::Resolving => write!(f, "Resolving"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryAction {
StaleAbort {
id: ObligationId,
age_ns: u64,
},
ConflictResolved {
id: ObligationId,
},
ViolationAborted {
id: ObligationId,
total_acquires: u64,
total_resolves: u64,
},
Flagged {
id: ObligationId,
reason: String,
},
}
impl fmt::Display for RecoveryAction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::StaleAbort { id, age_ns } => {
write!(f, "stale-abort {id:?} (age={age_ns}ns)")
}
Self::ConflictResolved { id } => {
write!(f, "conflict-resolved {id:?}")
}
Self::ViolationAborted {
id,
total_acquires,
total_resolves,
} => {
write!(
f,
"violation-aborted {id:?} (acquires={total_acquires}, resolves={total_resolves})"
)
}
Self::Flagged { id, reason } => {
write!(f, "flagged {id:?}: {reason}")
}
}
}
}
#[derive(Debug, Clone)]
pub struct RecoveryTickResult {
pub actions: Vec<RecoveryAction>,
pub remaining_pending: usize,
pub remaining_conflicts: usize,
pub remaining_violations: usize,
pub is_quiescent: bool,
}
impl RecoveryTickResult {
#[must_use]
pub fn action_count(&self) -> usize {
self.actions.len()
}
}
#[derive(Debug)]
pub struct RecoveryGovernor {
config: RecoveryConfig,
phase: RecoveryPhase,
first_seen_reserved: BTreeMap<ObligationId, u64>,
total_ticks: u64,
total_actions: u64,
}
impl RecoveryGovernor {
#[must_use]
pub fn new(config: RecoveryConfig) -> Self {
Self {
config,
phase: RecoveryPhase::Idle,
first_seen_reserved: BTreeMap::new(),
total_ticks: 0,
total_actions: 0,
}
}
#[must_use]
pub fn phase(&self) -> RecoveryPhase {
self.phase
}
#[must_use]
pub fn total_ticks(&self) -> u64 {
self.total_ticks
}
#[must_use]
pub fn total_actions(&self) -> u64 {
self.total_actions
}
pub fn tick(&mut self, ledger: &mut CrdtObligationLedger, now_ns: u64) -> RecoveryTickResult {
self.total_ticks += 1;
self.phase = RecoveryPhase::Scanning;
let mut actions = Vec::new();
let mut budget = self.config.max_resolutions_per_tick;
self.update_first_seen(ledger, now_ns);
self.phase = RecoveryPhase::Resolving;
if budget > 0 {
let stale_ids = self.find_stale(now_ns, budget);
for id in stale_ids {
if budget == 0 {
break;
}
ledger.record_abort(id);
let age = now_ns
.saturating_sub(self.first_seen_reserved.get(&id).copied().unwrap_or(now_ns));
actions.push(RecoveryAction::StaleAbort { id, age_ns: age });
self.first_seen_reserved.remove(&id);
budget -= 1;
}
}
let mut unresolved_conflicts = BTreeSet::new();
if budget > 0 {
let conflicts: Vec<ObligationId> = ledger
.conflicts_iter()
.take(budget)
.map(|(id, _)| id)
.collect();
for id in conflicts {
if budget == 0 {
break;
}
if self.config.auto_resolve_conflicts {
ledger.force_abort_repair(id);
actions.push(RecoveryAction::ConflictResolved { id });
} else {
unresolved_conflicts.insert(id);
actions.push(RecoveryAction::Flagged {
id,
reason: "conflict: Committed ⊔ Aborted".to_string(),
});
}
budget -= 1;
}
}
if budget > 0 {
let violations: Vec<LinearityViolation> =
ledger.linearity_violations_iter().take(budget).collect();
for v in violations {
if budget == 0 {
break;
}
if unresolved_conflicts.contains(&v.id) {
continue;
}
if self.config.auto_abort_violations {
ledger.force_abort_repair(v.id);
actions.push(RecoveryAction::ViolationAborted {
id: v.id,
total_acquires: v.total_acquires,
total_resolves: v.total_resolves,
});
} else {
actions.push(RecoveryAction::Flagged {
id: v.id,
reason: format!(
"linearity: acquires={}, resolves={}",
v.total_acquires, v.total_resolves
),
});
}
budget -= 1;
}
}
self.total_actions += actions.len() as u64;
self.phase = RecoveryPhase::Idle;
let remaining_pending = ledger.pending().len();
let remaining_conflicts = ledger.conflicts().len();
let remaining_violations = ledger.linearity_violations().len();
let is_quiescent =
remaining_pending == 0 && remaining_conflicts == 0 && remaining_violations == 0;
RecoveryTickResult {
actions,
remaining_pending,
remaining_conflicts,
remaining_violations,
is_quiescent,
}
}
fn update_first_seen(&mut self, ledger: &CrdtObligationLedger, now_ns: u64) {
self.first_seen_reserved
.retain(|id, _| ledger.get(id) == LatticeState::Reserved);
for id in ledger.pending() {
self.first_seen_reserved.entry(id).or_insert(now_ns);
}
}
fn find_stale(&self, now_ns: u64, limit: usize) -> Vec<ObligationId> {
self.first_seen_reserved
.iter()
.filter(|(_, first_seen)| {
now_ns.saturating_sub(**first_seen) >= self.config.stale_timeout_ns
})
.take(limit)
.map(|(id, _)| *id)
.collect()
}
pub fn reset(&mut self) {
self.phase = RecoveryPhase::Idle;
self.first_seen_reserved.clear();
self.total_ticks = 0;
self.total_actions = 0;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::obligation::crdt::CrdtObligationLedger;
use crate::record::ObligationKind;
use crate::remote::NodeId;
use crate::trace::distributed::crdt::Merge;
use crate::types::ObligationId;
fn oid(index: u32) -> ObligationId {
ObligationId::new_for_test(index, 0)
}
fn node(name: &str) -> NodeId {
NodeId::new(name)
}
fn test_config() -> RecoveryConfig {
RecoveryConfig {
stale_timeout_ns: 1000,
max_resolutions_per_tick: 100,
auto_resolve_conflicts: true,
auto_abort_violations: true,
}
}
#[test]
fn governor_starts_idle() {
let gov = RecoveryGovernor::new(test_config());
assert_eq!(gov.phase(), RecoveryPhase::Idle);
assert_eq!(gov.total_ticks(), 0);
}
#[test]
fn clean_ledger_is_quiescent() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
let result = gov.tick(&mut ledger, 0);
assert!(result.is_quiescent);
assert_eq!(result.action_count(), 0);
}
#[test]
fn pending_obligation_not_stale_yet() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let result = gov.tick(&mut ledger, 0);
assert!(!result.is_quiescent);
assert_eq!(result.action_count(), 0);
assert_eq!(result.remaining_pending, 1);
let result = gov.tick(&mut ledger, 500);
assert_eq!(result.action_count(), 0);
}
#[test]
fn stale_obligation_is_aborted() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
gov.tick(&mut ledger, 0);
let result = gov.tick(&mut ledger, 2000);
assert_eq!(result.action_count(), 1);
assert!(matches!(
&result.actions[0],
RecoveryAction::StaleAbort { id, age_ns } if *id == oid(1) && *age_ns >= 1000
));
assert_eq!(ledger.get(&oid(1)), LatticeState::Aborted);
}
#[test]
fn resolved_obligation_not_considered_stale() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
gov.tick(&mut ledger, 0);
ledger.record_commit(oid(1));
let result = gov.tick(&mut ledger, 2000);
assert_eq!(result.action_count(), 0);
assert_eq!(ledger.get(&oid(1)), LatticeState::Committed);
}
#[test]
fn conflict_auto_resolved_by_abort() {
let mut gov = RecoveryGovernor::new(test_config());
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_abort(oid(1));
a.merge(&b);
assert_eq!(a.get(&oid(1)), LatticeState::Conflict);
let result = gov.tick(&mut a, 0);
assert!(
result
.actions
.iter()
.any(|a| matches!(a, RecoveryAction::ConflictResolved { .. }))
);
}
#[test]
fn conflict_repair_survives_stale_merge() {
let mut gov = RecoveryGovernor::new(test_config());
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_abort(oid(1));
a.merge(&b);
let stale_conflict = a.clone();
assert_eq!(a.get(&oid(1)), LatticeState::Conflict);
let result = gov.tick(&mut a, 0);
assert!(
result
.actions
.iter()
.any(|a| matches!(a, RecoveryAction::ConflictResolved { .. }))
);
a.merge(&stale_conflict);
let repaired = a.get_entry(&oid(1)).expect("entry should exist");
assert_eq!(repaired.state, LatticeState::Aborted);
assert!(repaired.is_linear());
assert!(!repaired.is_conflict());
}
#[test]
fn conflict_flagged_when_auto_resolve_disabled() {
let mut config = test_config();
config.auto_resolve_conflicts = false;
let mut gov = RecoveryGovernor::new(config);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_abort(oid(1));
a.merge(&b);
let result = gov.tick(&mut a, 0);
assert_eq!(result.action_count(), 1);
assert!(matches!(
&result.actions[0],
RecoveryAction::Flagged { id, reason }
if *id == oid(1) && reason == "conflict: Committed ⊔ Aborted"
));
assert!(result.remaining_conflicts > 0);
assert_eq!(a.get(&oid(1)), LatticeState::Conflict);
}
#[test]
fn linearity_violation_auto_aborted() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let result = gov.tick(&mut ledger, 0);
assert!(
result
.actions
.iter()
.any(|a| matches!(a, RecoveryAction::ViolationAborted { .. }))
);
}
#[test]
fn linearity_violation_flagged_when_auto_disabled() {
let mut config = test_config();
config.auto_abort_violations = false;
let mut gov = RecoveryGovernor::new(config);
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let result = gov.tick(&mut ledger, 0);
assert!(
result
.actions
.iter()
.any(|a| matches!(a, RecoveryAction::Flagged { .. }))
);
}
#[test]
fn repeated_ticks_converge_to_quiescence() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit); ledger.record_acquire(oid(2), ObligationKind::Ack);
ledger.record_acquire(oid(2), ObligationKind::Ack);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(3), ObligationKind::Lease);
b.record_commit(oid(3));
let mut c = CrdtObligationLedger::new(node("C"));
c.record_acquire(oid(3), ObligationKind::Lease);
c.record_abort(oid(3));
b.merge(&c); ledger.merge(&b);
let r1 = gov.tick(&mut ledger, 0);
assert!(r1.action_count() > 0);
let _r2 = gov.tick(&mut ledger, 2000);
let r3 = gov.tick(&mut ledger, 3000);
assert!(
r3.is_quiescent,
"not quiescent: pending={}, conflicts={}, violations={}",
r3.remaining_pending, r3.remaining_conflicts, r3.remaining_violations
);
}
#[test]
fn convergence_is_monotonic() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
for i in 0..5 {
ledger.record_acquire(oid(i), ObligationKind::SendPermit);
}
gov.tick(&mut ledger, 0);
let result = gov.tick(&mut ledger, 2000);
assert_eq!(result.remaining_pending, 0);
let result2 = gov.tick(&mut ledger, 3000);
assert_eq!(result2.action_count(), 0);
assert!(result2.is_quiescent);
}
#[test]
fn max_resolutions_per_tick_respected() {
let mut config = test_config();
config.max_resolutions_per_tick = 2;
let mut gov = RecoveryGovernor::new(config);
let mut ledger = CrdtObligationLedger::new(node("A"));
for i in 0..5 {
ledger.record_acquire(oid(i), ObligationKind::SendPermit);
}
gov.tick(&mut ledger, 0);
let result = gov.tick(&mut ledger, 2000);
assert_eq!(result.action_count(), 2);
assert_eq!(result.remaining_pending, 3);
let result2 = gov.tick(&mut ledger, 3000);
assert_eq!(result2.action_count(), 2);
assert_eq!(result2.remaining_pending, 1);
}
#[test]
fn reset_clears_state() {
let mut gov = RecoveryGovernor::new(test_config());
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
gov.tick(&mut ledger, 0);
gov.reset();
assert_eq!(gov.phase(), RecoveryPhase::Idle);
assert_eq!(gov.total_ticks(), 0);
assert_eq!(gov.total_actions(), 0);
}
#[test]
fn recovery_action_display() {
let action = RecoveryAction::StaleAbort {
id: oid(1),
age_ns: 5000,
};
let display = format!("{action}");
assert!(display.contains("stale-abort"));
assert!(display.contains("5000"));
}
#[test]
fn recovery_phase_display() {
assert_eq!(format!("{}", RecoveryPhase::Idle), "Idle");
assert_eq!(format!("{}", RecoveryPhase::Scanning), "Scanning");
assert_eq!(format!("{}", RecoveryPhase::Resolving), "Resolving");
}
#[test]
fn partition_heal_converges() {
let mut gov = RecoveryGovernor::new(test_config());
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
gov.tick(&mut b, 0);
let _result = gov.tick(&mut b, 2000);
assert_eq!(b.get(&oid(1)), LatticeState::Aborted);
a.merge(&b);
assert_eq!(a.get(&oid(1)), LatticeState::Conflict);
let mut gov2 = RecoveryGovernor::new(test_config());
let result = gov2.tick(&mut a, 0);
assert!(
result
.actions
.iter()
.any(|a| matches!(a, RecoveryAction::ConflictResolved { .. }))
);
}
#[test]
fn recovery_config_debug_clone_default() {
let c = RecoveryConfig::default();
let dbg = format!("{c:?}");
assert!(dbg.contains("RecoveryConfig"));
let c2 = c;
assert_eq!(c2.stale_timeout_ns, 30_000_000_000);
assert_eq!(c2.max_resolutions_per_tick, 50);
let c3 = RecoveryConfig::default_for_test();
assert_eq!(c3.stale_timeout_ns, 5_000_000_000);
}
#[test]
fn recovery_phase_debug_clone_copy_eq() {
let p = RecoveryPhase::Idle;
let dbg = format!("{p:?}");
assert!(dbg.contains("Idle"));
let p2 = p;
assert_eq!(p, p2);
let p3 = p;
assert_eq!(p, p3);
assert_ne!(RecoveryPhase::Idle, RecoveryPhase::Scanning);
}
#[test]
fn recovery_action_debug_clone_eq() {
let a = RecoveryAction::ConflictResolved { id: oid(42) };
let dbg = format!("{a:?}");
assert!(dbg.contains("ConflictResolved"));
let a2 = a.clone();
assert_eq!(a, a2);
let a3 = RecoveryAction::Flagged {
id: oid(1),
reason: "test".into(),
};
assert_ne!(a, a3);
}
#[test]
fn recovery_tick_result_debug_clone() {
let r = RecoveryTickResult {
actions: vec![],
remaining_pending: 0,
remaining_conflicts: 0,
remaining_violations: 0,
is_quiescent: true,
};
let dbg = format!("{r:?}");
assert!(dbg.contains("RecoveryTickResult"));
let r2 = r;
assert!(r2.is_quiescent);
assert!(r2.actions.is_empty());
}
}