use crate::record::ObligationKind;
use crate::remote::NodeId;
use crate::trace::distributed::crdt::Merge;
use crate::trace::distributed::lattice::LatticeState;
use crate::types::ObligationId;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrdtObligationEntry {
pub state: LatticeState,
pub witnesses: BTreeMap<NodeId, LatticeState>,
repair_nodes: BTreeSet<NodeId>,
pub kind: Option<ObligationKind>,
acquire_counts: BTreeMap<NodeId, u64>,
resolve_counts: BTreeMap<NodeId, u64>,
}
impl CrdtObligationEntry {
fn new() -> Self {
Self {
state: LatticeState::Unknown,
witnesses: BTreeMap::new(),
repair_nodes: BTreeSet::new(),
kind: None,
acquire_counts: BTreeMap::new(),
resolve_counts: BTreeMap::new(),
}
}
#[must_use]
pub fn total_acquires(&self) -> u64 {
self.acquire_counts.values().sum()
}
#[must_use]
pub fn total_resolves(&self) -> u64 {
self.resolve_counts.values().sum()
}
#[must_use]
pub fn is_linear(&self) -> bool {
let acq = self.total_acquires();
let res = self.total_resolves();
acq <= 1 && res <= acq
}
#[must_use]
pub fn is_terminal(&self) -> bool {
self.state.is_terminal()
}
#[must_use]
pub fn is_conflict(&self) -> bool {
self.state.is_conflict()
}
fn merge_entry(&mut self, other: &Self) {
self.state = self.state.join(other.state);
for (node, &other_state) in &other.witnesses {
let entry = self
.witnesses
.entry(node.clone())
.or_insert(LatticeState::Unknown);
*entry = entry.join(other_state);
}
match (self.kind, other.kind) {
(None, rhs) => {
self.kind = rhs;
}
(Some(lhs), Some(rhs)) if lhs != rhs => {
self.state = self.state.join(LatticeState::Conflict);
self.kind = Some(lhs.min(rhs));
}
_ => {}
}
for (node, &count) in &other.acquire_counts {
let entry = self.acquire_counts.entry(node.clone()).or_insert(0);
*entry = (*entry).max(count);
}
for (node, &count) in &other.resolve_counts {
let entry = self.resolve_counts.entry(node.clone()).or_insert(0);
*entry = (*entry).max(count);
}
self.repair_nodes.extend(other.repair_nodes.iter().cloned());
self.normalize_repair_tombstone();
}
fn repair_owner(&self) -> Option<&NodeId> {
self.repair_nodes.iter().next()
}
fn is_repair_tombstone(&self) -> bool {
let Some(owner) = self.repair_owner() else {
return false;
};
self.state == LatticeState::Aborted
&& self.witnesses.len() == 1
&& self.witnesses.get(owner).copied() == Some(LatticeState::Aborted)
&& self.acquire_counts.len() == 1
&& self.acquire_counts.get(owner).copied() == Some(1)
&& self.resolve_counts.len() == 1
&& self.resolve_counts.get(owner).copied() == Some(1)
}
fn normalize_repair_tombstone(&mut self) {
let Some(owner) = self.repair_owner().cloned() else {
return;
};
self.state = LatticeState::Aborted;
self.witnesses.clear();
self.witnesses.insert(owner.clone(), LatticeState::Aborted);
self.acquire_counts.clear();
self.acquire_counts.insert(owner.clone(), 1);
self.resolve_counts.clear();
self.resolve_counts.insert(owner, 1);
}
fn is_compact_tombstone_for(&self, local_node: &NodeId) -> bool {
if !self.repair_nodes.is_empty() {
return self.is_repair_tombstone();
}
let witness_ok = self.witnesses.len() == 1
&& self.witnesses.get(local_node).copied() == Some(self.state);
let acquire_ok = self.acquire_counts.len() == 1
&& self.acquire_counts.get(local_node).copied() == Some(1);
let resolve_ok = self.resolve_counts.len() == 1
&& self.resolve_counts.get(local_node).copied() == Some(1);
witness_ok && acquire_ok && resolve_ok
}
fn compact_terminal_tombstone(&mut self, local_node: &NodeId) -> bool {
if self.is_compact_tombstone_for(local_node) {
return false;
}
if !self.repair_nodes.is_empty() {
self.normalize_repair_tombstone();
return true;
}
self.witnesses.clear();
self.witnesses.insert(local_node.clone(), self.state);
self.acquire_counts.clear();
self.resolve_counts.clear();
self.acquire_counts.insert(local_node.clone(), 1);
self.resolve_counts.insert(local_node.clone(), 1);
true
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrdtObligationLedger {
local_node: NodeId,
entries: BTreeMap<ObligationId, CrdtObligationEntry>,
}
impl CrdtObligationLedger {
#[must_use]
pub fn new(local_node: NodeId) -> Self {
Self {
local_node,
entries: BTreeMap::new(),
}
}
pub fn record_acquire(&mut self, id: ObligationId, kind: ObligationKind) -> LatticeState {
let entry = self
.entries
.entry(id)
.or_insert_with(CrdtObligationEntry::new);
if entry.is_terminal() {
return entry.state;
}
match entry.kind {
None => {
entry.kind = Some(kind);
}
Some(existing_kind) if existing_kind != kind => {
entry.state = entry.state.join(LatticeState::Conflict);
let witness = entry
.witnesses
.entry(self.local_node.clone())
.or_insert(LatticeState::Unknown);
*witness = witness.join(LatticeState::Conflict);
return entry.state;
}
Some(_) => {}
}
*entry
.acquire_counts
.entry(self.local_node.clone())
.or_insert(0) += 1;
entry.state = entry.state.join(LatticeState::Reserved);
let witness = entry
.witnesses
.entry(self.local_node.clone())
.or_insert(LatticeState::Unknown);
*witness = witness.join(LatticeState::Reserved);
entry.state
}
pub fn record_commit(&mut self, id: ObligationId) -> LatticeState {
self.record_resolve(id, LatticeState::Committed)
}
pub fn record_abort(&mut self, id: ObligationId) -> LatticeState {
self.record_resolve(id, LatticeState::Aborted)
}
pub fn force_abort_repair(&mut self, id: ObligationId) {
let Some(entry) = self.entries.get_mut(&id) else {
return;
};
if !entry.is_conflict() && entry.is_linear() {
return;
}
entry.repair_nodes.insert(self.local_node.clone());
entry.normalize_repair_tombstone();
}
fn record_resolve(&mut self, id: ObligationId, terminal: LatticeState) -> LatticeState {
let entry = self
.entries
.entry(id)
.or_insert_with(CrdtObligationEntry::new);
if !entry.repair_nodes.is_empty() {
return entry.state;
}
entry.state = entry.state.join(terminal);
let witness = entry
.witnesses
.entry(self.local_node.clone())
.or_insert(LatticeState::Unknown);
*witness = witness.join(terminal);
*entry
.resolve_counts
.entry(self.local_node.clone())
.or_insert(0) += 1;
entry.normalize_repair_tombstone();
entry.state
}
#[must_use]
pub fn get(&self, id: &ObligationId) -> LatticeState {
self.entries
.get(id)
.map_or(LatticeState::Unknown, |e| e.state)
}
#[must_use]
pub fn get_entry(&self, id: &ObligationId) -> Option<&CrdtObligationEntry> {
self.entries.get(id)
}
#[must_use]
pub fn local_node(&self) -> &NodeId {
&self.local_node
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
#[must_use]
pub fn pending(&self) -> Vec<ObligationId> {
self.entries
.iter()
.filter(|(_, e)| e.state == LatticeState::Reserved)
.map(|(id, _)| *id)
.collect()
}
#[must_use]
pub fn conflicts(&self) -> Vec<(ObligationId, &CrdtObligationEntry)> {
self.conflicts_iter().collect()
}
pub fn conflicts_iter(&self) -> impl Iterator<Item = (ObligationId, &CrdtObligationEntry)> {
self.entries
.iter()
.filter(|(_, e)| e.state.is_conflict())
.map(|(id, e)| (*id, e))
}
#[must_use]
pub fn linearity_violations(&self) -> Vec<LinearityViolation> {
self.linearity_violations_iter().collect()
}
pub fn linearity_violations_iter(&self) -> impl Iterator<Item = LinearityViolation> + '_ {
self.entries
.iter()
.filter(|(_, e)| !e.is_linear())
.map(|(id, e)| LinearityViolation {
id: *id,
total_acquires: e.total_acquires(),
total_resolves: e.total_resolves(),
witnesses: e.witnesses.clone(),
})
}
#[must_use]
pub fn is_sound(&self) -> bool {
self.entries
.values()
.all(|e| e.is_linear() && !e.is_conflict())
}
pub fn compact(&mut self) -> usize {
let mut compacted = 0;
for entry in self.entries.values_mut() {
if entry.is_terminal()
&& entry.is_linear()
&& !entry.is_conflict()
&& entry.compact_terminal_tombstone(&self.local_node)
{
compacted += 1;
}
}
compacted
}
#[must_use]
pub fn snapshot(&self) -> LedgerSnapshot {
let total = self.entries.len();
let pending = self
.entries
.values()
.filter(|e| e.state == LatticeState::Reserved)
.count();
let committed = self
.entries
.values()
.filter(|e| e.state == LatticeState::Committed)
.count();
let aborted = self
.entries
.values()
.filter(|e| e.state == LatticeState::Aborted)
.count();
let conflicts = self.entries.values().filter(|e| e.is_conflict()).count();
let linearity_violations = self.entries.values().filter(|e| !e.is_linear()).count();
LedgerSnapshot {
node: self.local_node.clone(),
total,
pending,
committed,
aborted,
conflicts,
linearity_violations,
}
}
}
impl Merge for CrdtObligationLedger {
fn merge(&mut self, other: &Self) {
for (id, other_entry) in &other.entries {
let entry = self
.entries
.entry(*id)
.or_insert_with(CrdtObligationEntry::new);
entry.merge_entry(other_entry);
}
}
}
#[derive(Debug, Clone)]
pub struct LinearityViolation {
pub id: ObligationId,
pub total_acquires: u64,
pub total_resolves: u64,
pub witnesses: BTreeMap<NodeId, LatticeState>,
}
impl fmt::Display for LinearityViolation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"obligation {:?}: acquires={}, resolves={}, witnesses={:?}",
self.id, self.total_acquires, self.total_resolves, self.witnesses
)
}
}
#[derive(Debug, Clone)]
pub struct LedgerSnapshot {
pub node: NodeId,
pub total: usize,
pub pending: usize,
pub committed: usize,
pub aborted: usize,
pub conflicts: usize,
pub linearity_violations: usize,
}
impl fmt::Display for LedgerSnapshot {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"[{}] total={} pending={} committed={} aborted={} conflicts={} violations={}",
self.node,
self.total,
self.pending,
self.committed,
self.aborted,
self.conflicts,
self.linearity_violations
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::remote::NodeId;
use crate::types::ObligationId;
use proptest::prelude::*;
fn oid(index: u32) -> ObligationId {
ObligationId::new_for_test(index, 0)
}
fn node(name: &str) -> NodeId {
NodeId::new(name)
}
#[derive(Clone, Copy, Debug)]
struct TraceOp {
id_index: u8,
action: u8,
}
fn apply_trace(ledger: &mut CrdtObligationLedger, ops: &[TraceOp]) {
for op in ops {
let id = oid(u32::from(op.id_index % 6) + 1);
match op.action % 7 {
0 => {
let kind = match op.id_index % 4 {
0 => ObligationKind::SendPermit,
1 => ObligationKind::Ack,
2 => ObligationKind::Lease,
_ => ObligationKind::IoOp,
};
let _ = ledger.record_acquire(id, kind);
}
1 => {
let _ = ledger.record_commit(id);
}
2 => {
let _ = ledger.record_abort(id);
}
3 => {
let _ = ledger.record_acquire(id, ObligationKind::SendPermit);
}
4 => {
let _ = ledger.record_acquire(id, ObligationKind::Lease);
}
5 => {
ledger.force_abort_repair(id);
}
_ => {
let _ = ledger.record_acquire(id, ObligationKind::Ack);
let _ = ledger.record_commit(id);
}
}
}
}
fn ledger_signature(
ledger: &CrdtObligationLedger,
) -> Vec<(
ObligationId,
LatticeState,
Option<ObligationKind>,
BTreeMap<NodeId, LatticeState>,
BTreeSet<NodeId>,
BTreeMap<NodeId, u64>,
BTreeMap<NodeId, u64>,
)> {
ledger
.entries
.iter()
.map(|(id, entry)| {
(
*id,
entry.state,
entry.kind,
entry.witnesses.clone(),
entry.repair_nodes.clone(),
entry.acquire_counts.clone(),
entry.resolve_counts.clone(),
)
})
.collect()
}
#[test]
fn acquire_sets_reserved() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let state = ledger.record_acquire(oid(1), ObligationKind::SendPermit);
assert_eq!(state, LatticeState::Reserved);
assert_eq!(ledger.get(&oid(1)), LatticeState::Reserved);
}
#[test]
fn commit_sets_committed() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::Ack);
let state = ledger.record_commit(oid(1));
assert_eq!(state, LatticeState::Committed);
}
#[test]
fn abort_sets_aborted() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::Lease);
let state = ledger.record_abort(oid(1));
assert_eq!(state, LatticeState::Aborted);
}
#[test]
fn unknown_obligation_returns_unknown() {
let ledger = CrdtObligationLedger::new(node("A"));
assert_eq!(ledger.get(&oid(99)), LatticeState::Unknown);
}
#[test]
fn single_acquire_resolve_is_linear() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
let entry = ledger.get_entry(&oid(1)).unwrap();
assert!(entry.is_linear());
assert_eq!(entry.total_acquires(), 1);
assert_eq!(entry.total_resolves(), 1);
}
#[test]
fn double_acquire_on_same_node_violates_linearity() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let entry = ledger.get_entry(&oid(1)).unwrap();
assert!(!entry.is_linear());
assert_eq!(entry.total_acquires(), 2);
}
#[test]
fn double_resolve_violates_linearity() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
ledger.record_commit(oid(1));
let entry = ledger.get_entry(&oid(1)).unwrap();
assert!(!entry.is_linear());
assert_eq!(entry.total_resolves(), 2);
}
#[test]
fn linearity_violations_reported() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let violations = ledger.linearity_violations();
assert_eq!(violations.len(), 1);
assert_eq!(violations[0].id, oid(1));
}
#[test]
fn merge_two_replicas_converges() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(2), ObligationKind::Ack);
b.record_abort(oid(2));
a.merge(&b);
assert_eq!(a.get(&oid(1)), LatticeState::Committed);
assert_eq!(a.get(&oid(2)), LatticeState::Aborted);
assert_eq!(a.len(), 2);
}
#[test]
fn merge_is_commutative() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
let mut ab = a.clone();
ab.merge(&b);
let mut ba = b.clone();
ba.merge(&a);
assert_eq!(ab.get(&oid(1)), ba.get(&oid(1)));
assert_eq!(ab.get(&oid(1)), LatticeState::Committed);
}
proptest! {
#[test]
fn metamorphic_merge_trace_commutativity(
a_ops in prop::collection::vec((0u8..12, 0u8..7), 0..24),
b_ops in prop::collection::vec((0u8..12, 0u8..7), 0..24),
) {
let a_ops: Vec<TraceOp> = a_ops
.into_iter()
.map(|(id_index, action)| TraceOp { id_index, action })
.collect();
let b_ops: Vec<TraceOp> = b_ops
.into_iter()
.map(|(id_index, action)| TraceOp { id_index, action })
.collect();
let mut a = CrdtObligationLedger::new(node("A"));
let mut b = CrdtObligationLedger::new(node("B"));
apply_trace(&mut a, &a_ops);
apply_trace(&mut b, &b_ops);
let mut ab = a.clone();
ab.merge(&b);
let mut ba = b.clone();
ba.merge(&a);
prop_assert_eq!(ledger_signature(&ab), ledger_signature(&ba));
prop_assert_eq!(ab.pending(), ba.pending());
prop_assert_eq!(ab.conflicts().len(), ba.conflicts().len());
prop_assert_eq!(
ab.linearity_violations().len(),
ba.linearity_violations().len()
);
}
}
#[test]
fn merge_is_associative() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_commit(oid(1));
let mut c = CrdtObligationLedger::new(node("C"));
c.record_acquire(oid(2), ObligationKind::Lease);
let mut ab_c = a.clone();
ab_c.merge(&b);
ab_c.merge(&c);
let mut bc = b.clone();
bc.merge(&c);
let mut a_bc = a.clone();
a_bc.merge(&bc);
assert_eq!(ab_c.get(&oid(1)), a_bc.get(&oid(1)));
assert_eq!(ab_c.get(&oid(2)), a_bc.get(&oid(2)));
}
#[test]
fn merge_is_idempotent() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let before = a.clone();
a.merge(&before);
assert_eq!(a, before);
}
#[test]
fn conflict_detected_on_commit_abort_merge() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_abort(oid(1));
a.merge(&b);
assert_eq!(a.get(&oid(1)), LatticeState::Conflict);
assert!(!a.is_sound());
let conflicts = a.conflicts();
assert_eq!(conflicts.len(), 1);
}
#[test]
fn terminal_state_absorbs_reserved() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut stale = CrdtObligationLedger::new(node("B"));
stale.record_acquire(oid(1), ObligationKind::SendPermit);
a.merge(&stale);
assert_eq!(a.get(&oid(1)), LatticeState::Committed);
}
#[test]
fn compact_tombstones_terminal_linear_entries() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
ledger.record_acquire(oid(2), ObligationKind::Ack);
let compacted = ledger.compact();
assert_eq!(compacted, 0);
assert_eq!(ledger.len(), 2);
assert_eq!(ledger.get(&oid(1)), LatticeState::Committed); assert_eq!(ledger.get(&oid(2)), LatticeState::Reserved); let entry = ledger.get_entry(&oid(1)).expect("entry should exist");
assert!(entry.is_terminal());
assert!(entry.is_linear());
assert_eq!(entry.total_acquires(), 1);
assert_eq!(entry.total_resolves(), 1);
assert_eq!(
*entry.witnesses.get(&node("A")).expect("local witness"),
LatticeState::Committed
);
}
#[test]
fn compact_preserves_conflicts() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(1), ObligationKind::SendPermit);
b.record_abort(oid(1));
a.merge(&b);
assert!(a.get(&oid(1)).is_conflict());
let compacted = a.compact();
assert_eq!(compacted, 0); assert_eq!(a.len(), 1);
}
#[test]
fn compact_preserves_linearity_violations() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
let compacted = ledger.compact();
assert_eq!(compacted, 0); }
#[test]
fn compact_prevents_stale_reserved_resurrection() {
let id = oid(11);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(id, ObligationKind::SendPermit);
a.record_commit(id);
let compacted = a.compact();
assert_eq!(compacted, 0);
assert_eq!(a.get(&id), LatticeState::Committed);
let mut stale = CrdtObligationLedger::new(node("B"));
stale.record_acquire(id, ObligationKind::SendPermit);
assert_eq!(stale.get(&id), LatticeState::Reserved);
a.merge(&stale);
assert_eq!(a.get(&id), LatticeState::Committed);
}
#[test]
fn pending_returns_only_reserved() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(2), ObligationKind::Ack);
ledger.record_commit(oid(2));
let pending = ledger.pending();
assert_eq!(pending, vec![oid(1)]);
}
#[test]
fn snapshot_reflects_state() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(2), ObligationKind::Ack);
ledger.record_commit(oid(2));
ledger.record_acquire(oid(3), ObligationKind::Lease);
ledger.record_abort(oid(3));
let snap = ledger.snapshot();
assert_eq!(snap.total, 3);
assert_eq!(snap.pending, 1);
assert_eq!(snap.committed, 1);
assert_eq!(snap.aborted, 1);
assert_eq!(snap.conflicts, 0);
assert_eq!(snap.linearity_violations, 0);
}
#[test]
fn is_sound_with_clean_ledger() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_commit(oid(1));
assert!(ledger.is_sound());
}
#[test]
fn three_node_ring_gossip_converges() {
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(oid(1), ObligationKind::SendPermit);
a.record_commit(oid(1));
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(oid(2), ObligationKind::Ack);
b.record_abort(oid(2));
let mut c = CrdtObligationLedger::new(node("C"));
c.record_acquire(oid(3), ObligationKind::Lease);
a.merge(&b);
b.merge(&c);
c.merge(&a);
a.merge(&c);
b.merge(&a);
for id in [oid(1), oid(2), oid(3)] {
assert_eq!(
a.get(&id),
b.get(&id),
"divergence on {id:?} between A and B"
);
assert_eq!(
b.get(&id),
c.get(&id),
"divergence on {id:?} between B and C"
);
}
assert_eq!(a.get(&oid(1)), LatticeState::Committed);
assert_eq!(a.get(&oid(2)), LatticeState::Aborted);
assert_eq!(a.get(&oid(3)), LatticeState::Reserved);
}
#[test]
fn snapshot_display_is_readable() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let snap = ledger.snapshot();
let display = format!("{snap}");
assert!(display.contains("total=1"));
assert!(display.contains("pending=1"));
}
#[test]
fn linearity_violation_display() {
let mut ledger = CrdtObligationLedger::new(node("A"));
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
ledger.record_acquire(oid(1), ObligationKind::SendPermit);
let violations = ledger.linearity_violations();
let display = format!("{}", violations[0]);
assert!(display.contains("acquires=2"));
}
#[test]
fn acquire_mismatched_kind_marks_conflict() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(42);
ledger.record_acquire(id, ObligationKind::SendPermit);
let state = ledger.record_acquire(id, ObligationKind::Lease);
let entry = ledger.get_entry(&id).expect("entry should exist");
assert_eq!(state, LatticeState::Conflict);
assert_eq!(entry.state, LatticeState::Conflict);
assert_eq!(entry.kind, Some(ObligationKind::SendPermit));
assert_eq!(entry.total_acquires(), 1);
}
#[test]
fn acquire_after_terminal_preserves_terminal_witness() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(43);
ledger.record_acquire(id, ObligationKind::Ack);
ledger.record_commit(id);
let before = ledger
.get_entry(&id)
.expect("entry should exist")
.witnesses
.get(&node("A"))
.copied();
let state = ledger.record_acquire(id, ObligationKind::Lease);
let entry = ledger.get_entry(&id).expect("entry should exist");
let after = entry.witnesses.get(&node("A")).copied();
assert_eq!(state, LatticeState::Committed);
assert_eq!(before, Some(LatticeState::Committed));
assert_eq!(after, Some(LatticeState::Committed));
assert_eq!(entry.total_acquires(), 1);
assert_eq!(entry.total_resolves(), 1);
}
#[test]
fn merge_mismatched_kind_marks_conflict() {
let id = oid(44);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(id, ObligationKind::SendPermit);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(id, ObligationKind::Lease);
a.merge(&b);
let entry = a.get_entry(&id).expect("entry should exist");
assert_eq!(entry.state, LatticeState::Conflict);
assert!(entry.is_conflict());
}
#[test]
fn force_abort_repair_skips_healthy_pending_entry() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(45);
ledger.record_acquire(id, ObligationKind::Ack);
ledger.force_abort_repair(id);
let entry = ledger.get_entry(&id).expect("entry should exist");
assert_eq!(entry.state, LatticeState::Reserved);
assert_eq!(entry.total_acquires(), 1);
assert_eq!(entry.total_resolves(), 0);
}
#[test]
fn force_abort_repair_missing_id_is_noop() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(145);
ledger.force_abort_repair(id);
assert!(ledger.get_entry(&id).is_none());
assert!(ledger.is_empty());
}
#[test]
fn force_abort_repair_collapses_conflict_to_linear_aborted() {
let id = oid(46);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(id, ObligationKind::SendPermit);
a.record_commit(id);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(id, ObligationKind::SendPermit);
b.record_abort(id);
a.merge(&b);
let conflicted = a.get_entry(&id).expect("entry should exist");
assert!(conflicted.is_conflict());
assert!(!conflicted.is_linear());
a.force_abort_repair(id);
let repaired = a.get_entry(&id).expect("entry should exist");
assert_eq!(repaired.state, LatticeState::Aborted);
assert!(repaired.is_linear());
assert_eq!(repaired.total_acquires(), 1);
assert_eq!(repaired.total_resolves(), 1);
assert_eq!(repaired.witnesses.len(), 1);
assert_eq!(
repaired.witnesses.get(&node("A")).copied(),
Some(LatticeState::Aborted)
);
assert_eq!(repaired.repair_nodes.len(), 1);
assert!(repaired.repair_nodes.contains(&node("A")));
}
#[test]
fn force_abort_repair_survives_merge_with_stale_conflict() {
let id = oid(48);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(id, ObligationKind::SendPermit);
a.record_commit(id);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(id, ObligationKind::SendPermit);
b.record_abort(id);
a.merge(&b);
let stale_conflict = a.clone();
assert!(a.get(&id).is_conflict());
a.force_abort_repair(id);
a.merge(&stale_conflict);
let repaired = a.get_entry(&id).expect("entry should exist");
assert_eq!(repaired.state, LatticeState::Aborted);
assert!(repaired.is_linear());
assert!(!repaired.is_conflict());
assert_eq!(repaired.total_acquires(), 1);
assert_eq!(repaired.total_resolves(), 1);
assert_eq!(
repaired.witnesses.get(&node("A")).copied(),
Some(LatticeState::Aborted)
);
}
#[test]
fn force_abort_repair_converges_after_independent_repairs() {
let id = oid(49);
let mut a = CrdtObligationLedger::new(node("A"));
a.record_acquire(id, ObligationKind::SendPermit);
a.record_commit(id);
let mut b = CrdtObligationLedger::new(node("B"));
b.record_acquire(id, ObligationKind::SendPermit);
b.record_abort(id);
a.merge(&b);
b.merge(&a);
assert!(a.get(&id).is_conflict());
assert!(b.get(&id).is_conflict());
a.force_abort_repair(id);
b.force_abort_repair(id);
a.merge(&b);
let repaired = a.get_entry(&id).expect("entry should exist");
assert_eq!(repaired.state, LatticeState::Aborted);
assert!(repaired.is_linear());
assert_eq!(repaired.total_acquires(), 1);
assert_eq!(repaired.total_resolves(), 1);
assert_eq!(repaired.witnesses.len(), 1);
assert_eq!(repaired.repair_nodes.len(), 2);
assert_eq!(
repaired.witnesses.get(&node("A")).copied(),
Some(LatticeState::Aborted)
);
}
#[test]
fn compact_rewrites_non_minimal_terminal_metadata() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(47);
ledger.record_acquire(id, ObligationKind::Lease);
ledger.record_abort(id);
let entry = ledger.entries.get_mut(&id).expect("entry should exist");
entry.witnesses.insert(node("B"), LatticeState::Unknown);
entry.acquire_counts.insert(node("B"), 0);
entry.resolve_counts.insert(node("B"), 0);
assert!(entry.is_terminal());
assert!(entry.is_linear());
let compacted = ledger.compact();
assert_eq!(compacted, 1);
let compacted_entry = ledger.get_entry(&id).expect("entry should exist");
assert_eq!(compacted_entry.witnesses.len(), 1);
assert_eq!(compacted_entry.total_acquires(), 1);
assert_eq!(compacted_entry.total_resolves(), 1);
assert_eq!(
compacted_entry.witnesses.get(&node("A")).copied(),
Some(LatticeState::Aborted)
);
}
#[test]
fn crdt_obligation_entry_debug_clone_eq() {
let mut ledger = CrdtObligationLedger::new(node("X"));
let id = ObligationId::new_for_test(50, 0);
ledger.record_acquire(id, ObligationKind::SendPermit);
let entry = ledger.get_entry(&id).unwrap();
let entry2 = entry.clone();
assert_eq!(entry, &entry2);
let dbg = format!("{entry:?}");
assert!(dbg.contains("CrdtObligationEntry"));
}
#[test]
fn crdt_obligation_ledger_debug_clone_eq() {
let ledger = CrdtObligationLedger::new(node("Y"));
let ledger2 = ledger.clone();
assert_eq!(ledger, ledger2);
let dbg = format!("{ledger:?}");
assert!(dbg.contains("CrdtObligationLedger"));
}
#[test]
fn linearity_violation_debug_clone() {
let v = LinearityViolation {
id: ObligationId::new_for_test(1, 0),
total_acquires: 2,
total_resolves: 1,
witnesses: BTreeMap::new(),
};
let v2 = v;
assert_eq!(v2.total_acquires, 2);
let dbg = format!("{v2:?}");
assert!(dbg.contains("LinearityViolation"));
}
#[test]
fn ledger_snapshot_debug_clone() {
let s = LedgerSnapshot {
node: node("Z"),
total: 10,
pending: 3,
committed: 5,
aborted: 1,
conflicts: 1,
linearity_violations: 0,
};
let s2 = s;
assert_eq!(s2.total, 10);
assert_eq!(s2.pending, 3);
let dbg = format!("{s2:?}");
assert!(dbg.contains("LedgerSnapshot"));
}
#[test]
fn record_resolve_joins_witness_instead_of_overwriting() {
let mut ledger = CrdtObligationLedger::new(node("A"));
let id = oid(99);
ledger.record_acquire(id, ObligationKind::Ack);
ledger.record_commit(id);
let entry = ledger.get_entry(&id).expect("entry exists");
assert_eq!(
*entry.witnesses.get(&node("A")).unwrap(),
LatticeState::Committed,
);
ledger.record_abort(id);
let entry = ledger.get_entry(&id).expect("entry exists");
let witness = *entry.witnesses.get(&node("A")).unwrap();
assert_eq!(witness, LatticeState::Conflict);
assert_eq!(entry.state, LatticeState::Conflict);
}
}