use crate::remote::NodeId;
use crate::types::ObligationId;
use std::collections::{BTreeMap, BTreeSet};
use super::lattice::LatticeState;
/// One node's local view of obligation states.
///
/// Each snapshot pairs a node identifier with the lattice state that node
/// recorded for every obligation it has an entry for.
#[derive(Clone, Debug)]
pub struct NodeSnapshot {
/// Identifier of the node this snapshot belongs to.
pub node: NodeId,
/// State recorded by this node, keyed by obligation. An obligation with no
/// entry here simply has no recorded state on this node.
pub states: BTreeMap<ObligationId, LatticeState>,
}
impl NodeSnapshot {
    /// Builds a snapshot for `node` with no observations recorded yet.
    #[must_use]
    pub fn new(node: NodeId) -> Self {
        let states = BTreeMap::new();
        Self { node, states }
    }

    /// Records `state` as this node's view of `obligation`.
    ///
    /// Calling this again for the same obligation replaces the earlier entry;
    /// the two states are not joined.
    pub fn observe(&mut self, obligation: ObligationId, state: LatticeState) {
        // Any previously stored state for this obligation is discarded on purpose.
        let _replaced = self.states.insert(obligation, state);
    }
}
/// A constraint over a group of obligations, evaluated by
/// `SagaConsistencyChecker`.
#[derive(Clone, Debug)]
pub enum SagaConstraint {
/// The listed obligations must agree on their terminal outcome.
///
/// The checker reports a violation when the merged states of the
/// obligations include more than one terminal state, or when a terminal
/// state exists but no single node snapshot shows every listed obligation
/// in that state.
AllOrNothing {
/// Name copied into any `ConstraintViolation` reported for this constraint.
name: String,
/// The obligations covered by this constraint.
obligations: BTreeSet<ObligationId>,
},
}
/// Result of `SagaConsistencyChecker::check`: one list per analysis.
#[derive(Clone, Debug)]
pub struct ConsistencyReport {
/// Pairs of nodes whose states for the same obligation join to a conflict.
pub pairwise_conflicts: Vec<PairwiseConflict>,
/// Obligations whose merged state is terminal (and not a conflict) yet was
/// not observed by any individual node.
pub phantom_states: Vec<PhantomState>,
/// Saga constraints that the snapshots fail to satisfy.
pub constraint_violations: Vec<ConstraintViolation>,
}
impl ConsistencyReport {
    /// Returns `true` when at least one of the three finding lists is non-empty.
    #[must_use]
    pub fn has_issues(&self) -> bool {
        let Self {
            pairwise_conflicts,
            phantom_states,
            constraint_violations,
        } = self;
        let all_clear = pairwise_conflicts.is_empty()
            && phantom_states.is_empty()
            && constraint_violations.is_empty();
        !all_clear
    }

    /// Returns `true` when there are phantom states or constraint violations.
    ///
    /// Pairwise conflicts are deliberately not considered here.
    #[must_use]
    pub fn has_sheaf_issues(&self) -> bool {
        let sheaf_clear = self.phantom_states.is_empty() && self.constraint_violations.is_empty();
        !sheaf_clear
    }
}
/// Two nodes whose recorded states for the same obligation join to a
/// conflict (`state_a.join(state_b).is_conflict()`).
#[derive(Clone, Debug)]
pub struct PairwiseConflict {
/// The obligation both nodes recorded a state for.
pub obligation: ObligationId,
/// First node of the pair (the one appearing earlier in the snapshot list).
pub node_a: NodeId,
/// State recorded by `node_a`.
pub state_a: LatticeState,
/// Second node of the pair.
pub node_b: NodeId,
/// State recorded by `node_b`.
pub state_b: LatticeState,
}
/// An obligation whose merged state is terminal and not a conflict, but
/// which no individual node recorded.
#[derive(Clone, Debug)]
pub struct PhantomState {
/// The affected obligation.
pub obligation: ObligationId,
/// Join of every node's recorded state for the obligation.
pub merged_state: LatticeState,
/// The individual per-node states that were joined.
pub node_observations: BTreeMap<NodeId, LatticeState>,
}
/// A saga constraint that the snapshots fail to satisfy.
#[derive(Clone, Debug)]
pub struct ConstraintViolation {
/// Name of the violated constraint, as given in the `SagaConstraint`.
pub constraint_name: String,
/// Merged and per-node states for every obligation the constraint covers.
pub obligation_states: BTreeMap<ObligationId, ObligationDetail>,
/// Human-readable description of why the constraint is violated.
pub explanation: String,
}
/// Merged and per-node state of a single obligation, attached to a
/// `ConstraintViolation` as supporting evidence.
#[derive(Clone, Debug)]
pub struct ObligationDetail {
/// Join of all per-node states, starting from `LatticeState::Unknown`.
pub merged: LatticeState,
/// State recorded by each node that has an entry for the obligation.
pub per_node: BTreeMap<NodeId, LatticeState>,
}
/// Checks a set of node snapshots for pairwise conflicts, phantom states,
/// and saga-constraint violations.
pub struct SagaConsistencyChecker {
// Per-node views to compare against each other.
snapshots: Vec<NodeSnapshot>,
// Constraints evaluated against the snapshots.
constraints: Vec<SagaConstraint>,
}
impl SagaConsistencyChecker {
    /// Creates a checker over the given node snapshots and saga constraints.
    #[must_use]
    pub fn new(snapshots: Vec<NodeSnapshot>, constraints: Vec<SagaConstraint>) -> Self {
        Self {
            snapshots,
            constraints,
        }
    }

    /// Runs every analysis and bundles the findings into one report.
    #[must_use]
    pub fn check(&self) -> ConsistencyReport {
        ConsistencyReport {
            pairwise_conflicts: self.find_pairwise_conflicts(),
            phantom_states: self.find_phantom_states(),
            constraint_violations: self.find_constraint_violations(),
        }
    }

    /// Reports every pair of snapshots whose states for the same obligation
    /// join to a conflict.
    ///
    /// Pairs are emitted per obligation, in snapshot order (`node_a` always
    /// precedes `node_b` in the snapshot list).
    fn find_pairwise_conflicts(&self) -> Vec<PairwiseConflict> {
        let mut conflicts = Vec::new();
        for obligation in self.all_obligations() {
            // Borrow the node ids here; they are cloned only for pairs that
            // actually conflict.
            let observations: Vec<(&NodeId, LatticeState)> = self
                .snapshots
                .iter()
                .filter_map(|snap| {
                    snap.states
                        .get(&obligation)
                        .map(|&state| (&snap.node, state))
                })
                .collect();

            for (i, &(node_a, state_a)) in observations.iter().enumerate() {
                // `i + 1 <= len`, so this slice is at worst empty.
                for &(node_b, state_b) in &observations[i + 1..] {
                    if state_a.join(state_b).is_conflict() {
                        conflicts.push(PairwiseConflict {
                            obligation,
                            node_a: node_a.clone(),
                            state_a,
                            node_b: node_b.clone(),
                            state_b,
                        });
                    }
                }
            }
        }
        conflicts
    }

    /// Reports obligations whose merged state is terminal and not a conflict,
    /// yet equals none of the individual node observations.
    fn find_phantom_states(&self) -> Vec<PhantomState> {
        self.all_obligations()
            .into_iter()
            .filter_map(|obligation| {
                let (node_observations, merged_state) = self.merge_observations(obligation);
                let is_phantom = merged_state.is_terminal()
                    && !merged_state.is_conflict()
                    && !node_observations.values().any(|&seen| seen == merged_state);
                is_phantom.then(|| PhantomState {
                    obligation,
                    merged_state,
                    node_observations,
                })
            })
            .collect()
    }

    /// Evaluates every configured constraint and collects the violations.
    fn find_constraint_violations(&self) -> Vec<ConstraintViolation> {
        self.constraints
            .iter()
            .filter_map(|constraint| match constraint {
                SagaConstraint::AllOrNothing { name, obligations } => {
                    self.check_all_or_nothing(name, obligations)
                }
            })
            .collect()
    }

    /// Checks one all-or-nothing constraint.
    ///
    /// Returns a violation when
    /// 1. the merged states of `obligations` contain more than one distinct
    ///    terminal state, or
    /// 2. exactly one terminal state exists but no single snapshot shows
    ///    every obligation in that state (an obligation missing from a
    ///    snapshot counts as `LatticeState::Unknown`).
    ///
    /// Returns `None` when no merged state is terminal or when some snapshot
    /// witnesses the shared terminal state for all obligations.
    fn check_all_or_nothing(
        &self,
        name: &str,
        obligations: &BTreeSet<ObligationId>,
    ) -> Option<ConstraintViolation> {
        let obligation_states: BTreeMap<ObligationId, ObligationDetail> = obligations
            .iter()
            .map(|&obligation| {
                let (per_node, merged) = self.merge_observations(obligation);
                (obligation, ObligationDetail { merged, per_node })
            })
            .collect();

        // Distinct terminal merged states, in first-seen order. `Vec::dedup`
        // only removes *adjacent* repeats, so on this unsorted sequence it
        // could leave the same state listed several times in the explanation
        // (e.g. `[Committed, Aborted, Committed]`).
        let mut terminal_states: Vec<LatticeState> = Vec::new();
        for detail in obligation_states.values() {
            if detail.merged.is_terminal() && !terminal_states.contains(&detail.merged) {
                terminal_states.push(detail.merged);
            }
        }

        if terminal_states.len() > 1 {
            return Some(ConstraintViolation {
                constraint_name: name.to_string(),
                obligation_states,
                explanation: format!(
                    "Merged states disagree: {terminal_states:?}. \
                     All-or-nothing requires uniform terminal state."
                ),
            });
        }

        // No terminal state at all: nothing to violate yet.
        let terminal = terminal_states.first().copied()?;

        let any_node_witnesses_all = self.snapshots.iter().any(|snap| {
            obligations.iter().all(|oid| {
                snap.states
                    .get(oid)
                    .copied()
                    .unwrap_or(LatticeState::Unknown)
                    == terminal
            })
        });
        if any_node_witnesses_all {
            return None;
        }

        Some(ConstraintViolation {
            constraint_name: name.to_string(),
            obligation_states,
            explanation: format!(
                "No single node observed all obligations as {terminal}. \
                 The global '{terminal}' state is a phantom — \
                 it exists in the pairwise merge but not in any node's view. \
                 (H¹ ≠ 0: local sections do not glue into a global section.)"
            ),
        })
    }

    /// Gathers every snapshot's recorded state for `obligation`.
    ///
    /// Returns the per-node states together with their join, folded in
    /// snapshot order starting from `LatticeState::Unknown`. If two snapshots
    /// carry the same node id, the later one wins in the map while both
    /// still contribute to the join.
    fn merge_observations(
        &self,
        obligation: ObligationId,
    ) -> (BTreeMap<NodeId, LatticeState>, LatticeState) {
        let mut per_node = BTreeMap::new();
        let mut merged = LatticeState::Unknown;
        for snap in &self.snapshots {
            if let Some(&state) = snap.states.get(&obligation) {
                per_node.insert(snap.node.clone(), state);
                merged = merged.join(state);
            }
        }
        (per_node, merged)
    }

    /// Returns the set of obligations that appear in at least one snapshot.
    fn all_obligations(&self) -> BTreeSet<ObligationId> {
        self.snapshots
            .iter()
            .flat_map(|snap| snap.states.keys().copied())
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a node id from a name.
    fn node(name: &str) -> NodeId {
        NodeId::new(name)
    }

    /// Shorthand for building a test obligation id from an index.
    fn oid(index: u32) -> ObligationId {
        ObligationId::new_for_test(index, 0)
    }

    // ---- checker behaviour -------------------------------------------------

    #[test]
    fn detects_pairwise_conflict() {
        let o1 = oid(1);
        let mut snap_a = NodeSnapshot::new(node("A"));
        snap_a.observe(o1, LatticeState::Committed);
        let mut snap_b = NodeSnapshot::new(node("B"));
        snap_b.observe(o1, LatticeState::Aborted);

        let checker = SagaConsistencyChecker::new(vec![snap_a, snap_b], vec![]);
        let report = checker.check();

        assert!(!report.pairwise_conflicts.is_empty());
        assert!(report.has_issues());
    }

    #[test]
    fn no_conflict_when_compatible() {
        let o1 = oid(1);
        let mut snap_a = NodeSnapshot::new(node("A"));
        snap_a.observe(o1, LatticeState::Reserved);
        let mut snap_b = NodeSnapshot::new(node("B"));
        snap_b.observe(o1, LatticeState::Committed);

        let checker = SagaConsistencyChecker::new(vec![snap_a, snap_b], vec![]);
        let report = checker.check();

        assert!(report.pairwise_conflicts.is_empty());
    }

    // Despite the name, this pins the *absence* of a phantom: the test expects
    // no phantom state to be reported when one node recorded `Committed`
    // itself and the other recorded `Reserved`.
    #[test]
    fn detects_phantom_committed() {
        let o1 = oid(1);
        let mut snap_a = NodeSnapshot::new(node("A"));
        snap_a.observe(o1, LatticeState::Committed);
        let mut snap_b = NodeSnapshot::new(node("B"));
        snap_b.observe(o1, LatticeState::Reserved);

        let checker = SagaConsistencyChecker::new(vec![snap_a, snap_b], vec![]);
        let report = checker.check();

        assert!(report.phantom_states.is_empty());
    }

    // Every obligation is seen as `Committed` by two of the three nodes, but
    // no single node sees all three as `Committed`.
    #[test]
    fn sheaf_detects_phantom_global_commit() {
        let o1 = oid(1);
        let o2 = oid(2);
        let o3 = oid(3);

        let mut snap_a = NodeSnapshot::new(node("A"));
        snap_a.observe(o1, LatticeState::Committed);
        snap_a.observe(o2, LatticeState::Committed);
        snap_a.observe(o3, LatticeState::Reserved);

        let mut snap_b = NodeSnapshot::new(node("B"));
        snap_b.observe(o1, LatticeState::Committed);
        snap_b.observe(o2, LatticeState::Reserved);
        snap_b.observe(o3, LatticeState::Committed);

        let mut snap_c = NodeSnapshot::new(node("C"));
        snap_c.observe(o1, LatticeState::Reserved);
        snap_c.observe(o2, LatticeState::Committed);
        snap_c.observe(o3, LatticeState::Committed);

        let constraint = SagaConstraint::AllOrNothing {
            name: "test-saga".into(),
            obligations: [o1, o2, o3].into_iter().collect(),
        };
        let checker = SagaConsistencyChecker::new(vec![snap_a, snap_b, snap_c], vec![constraint]);
        let report = checker.check();

        assert!(report.pairwise_conflicts.is_empty());
        assert!(report.has_sheaf_issues());
        assert_eq!(report.constraint_violations.len(), 1);

        let violation = &report.constraint_violations[0];
        assert_eq!(violation.constraint_name, "test-saga");
        assert!(violation.explanation.contains("No single node"));
        assert!(violation.explanation.contains("H¹ ≠ 0"));
    }

    #[test]
    fn sheaf_passes_when_one_node_witnesses_all() {
        let o1 = oid(1);
        let o2 = oid(2);
        let o3 = oid(3);

        // Node A alone sees every obligation as `Committed`.
        let mut snap_a = NodeSnapshot::new(node("A"));
        snap_a.observe(o1, LatticeState::Committed);
        snap_a.observe(o2, LatticeState::Committed);
        snap_a.observe(o3, LatticeState::Committed);

        let mut snap_b = NodeSnapshot::new(node("B"));
        snap_b.observe(o1, LatticeState::Reserved);
        snap_b.observe(o2, LatticeState::Committed);
        snap_b.observe(o3, LatticeState::Reserved);

        let constraint = SagaConstraint::AllOrNothing {
            name: "test-saga".into(),
            obligations: [o1, o2, o3].into_iter().collect(),
        };
        let checker = SagaConsistencyChecker::new(vec![snap_a, snap_b], vec![constraint]);
        let report = checker.check();

        assert!(!report.has_sheaf_issues());
        assert!(report.constraint_violations.is_empty());
    }

    #[test]
    fn detects_mixed_terminal_states() {
        let o1 = oid(1);
        let o2 = oid(2);
        let mut snap = NodeSnapshot::new(node("A"));
        snap.observe(o1, LatticeState::Committed);
        snap.observe(o2, LatticeState::Aborted);

        let constraint = SagaConstraint::AllOrNothing {
            name: "mixed-saga".into(),
            obligations: [o1, o2].into_iter().collect(),
        };
        let checker = SagaConsistencyChecker::new(vec![snap], vec![constraint]);
        let report = checker.check();

        assert!(report.has_issues());
        assert_eq!(report.constraint_violations.len(), 1);
        assert!(
            report.constraint_violations[0]
                .explanation
                .contains("Merged states disagree")
        );
    }

    #[test]
    fn empty_snapshots_no_issues() {
        let checker = SagaConsistencyChecker::new(vec![], vec![]);
        let report = checker.check();
        assert!(!report.has_issues());
    }

    #[test]
    fn constraint_with_no_observations_is_fine() {
        let o1 = oid(1);
        let constraint = SagaConstraint::AllOrNothing {
            name: "empty-saga".into(),
            obligations: std::iter::once(o1).collect(),
        };
        let snap = NodeSnapshot::new(node("A"));
        let checker = SagaConsistencyChecker::new(vec![snap], vec![constraint]);
        let report = checker.check();
        assert!(!report.has_issues());
    }

    // ---- NodeSnapshot ------------------------------------------------------

    #[test]
    fn node_snapshot_debug_clone() {
        let snap = NodeSnapshot::new(node("X"));
        let dbg = format!("{snap:?}");
        assert!(dbg.contains("NodeSnapshot"));

        // Exercise the derived `Clone` (a plain move would not) and check the
        // copy matches the original.
        let snap2 = snap.clone();
        assert!(snap2.states.is_empty());
        assert_eq!(format!("{snap2:?}"), format!("{snap:?}"));
    }

    #[test]
    fn node_snapshot_observe_inserts() {
        let mut snap = NodeSnapshot::new(node("A"));
        snap.observe(oid(1), LatticeState::Reserved);
        snap.observe(oid(2), LatticeState::Committed);
        assert_eq!(snap.states.len(), 2);
        assert_eq!(snap.states[&oid(1)], LatticeState::Reserved);
        assert_eq!(snap.states[&oid(2)], LatticeState::Committed);
    }

    #[test]
    fn node_snapshot_observe_overwrites() {
        let mut snap = NodeSnapshot::new(node("A"));
        snap.observe(oid(1), LatticeState::Reserved);
        snap.observe(oid(1), LatticeState::Committed);
        assert_eq!(snap.states.len(), 1);
        assert_eq!(snap.states[&oid(1)], LatticeState::Committed);
    }

    // ---- derived Debug / Clone on the report types -------------------------

    #[test]
    fn saga_constraint_debug_clone() {
        let c = SagaConstraint::AllOrNothing {
            name: "test".into(),
            obligations: [oid(1), oid(2)].into_iter().collect(),
        };
        let dbg = format!("{c:?}");
        assert!(dbg.contains("AllOrNothing"));

        let c2 = c.clone();
        let dbg2 = format!("{c2:?}");
        assert!(dbg2.contains("test"));
        assert_eq!(dbg2, format!("{c:?}"));
    }

    #[test]
    fn consistency_report_debug_clone() {
        let report = ConsistencyReport {
            pairwise_conflicts: vec![],
            phantom_states: vec![],
            constraint_violations: vec![],
        };
        let dbg = format!("{report:?}");
        assert!(dbg.contains("ConsistencyReport"));

        let r2 = report.clone();
        assert!(!r2.has_issues());
        assert!(!r2.has_sheaf_issues());
        assert_eq!(format!("{r2:?}"), format!("{report:?}"));
    }

    #[test]
    fn consistency_report_has_issues_with_pairwise() {
        let report = ConsistencyReport {
            pairwise_conflicts: vec![PairwiseConflict {
                obligation: oid(1),
                node_a: node("A"),
                state_a: LatticeState::Committed,
                node_b: node("B"),
                state_b: LatticeState::Aborted,
            }],
            phantom_states: vec![],
            constraint_violations: vec![],
        };
        // A pairwise conflict is an issue, but not a sheaf issue.
        assert!(report.has_issues());
        assert!(!report.has_sheaf_issues());
    }

    #[test]
    fn pairwise_conflict_debug_clone() {
        let c = PairwiseConflict {
            obligation: oid(1),
            node_a: node("A"),
            state_a: LatticeState::Committed,
            node_b: node("B"),
            state_b: LatticeState::Aborted,
        };
        let dbg = format!("{c:?}");
        assert!(dbg.contains("PairwiseConflict"));

        let c2 = c.clone();
        assert_eq!(c2.state_a, LatticeState::Committed);
        assert_eq!(format!("{c2:?}"), format!("{c:?}"));
    }

    #[test]
    fn phantom_state_debug_clone() {
        let p = PhantomState {
            obligation: oid(1),
            merged_state: LatticeState::Committed,
            node_observations: BTreeMap::new(),
        };
        let dbg = format!("{p:?}");
        assert!(dbg.contains("PhantomState"));

        let p2 = p.clone();
        assert!(p2.node_observations.is_empty());
        assert_eq!(format!("{p2:?}"), format!("{p:?}"));
    }

    #[test]
    fn constraint_violation_debug_clone() {
        let cv = ConstraintViolation {
            constraint_name: "saga-1".into(),
            obligation_states: BTreeMap::new(),
            explanation: "test violation".into(),
        };
        let dbg = format!("{cv:?}");
        assert!(dbg.contains("ConstraintViolation"));

        let cv2 = cv.clone();
        assert_eq!(cv2.constraint_name, "saga-1");
        assert_eq!(cv2.explanation, "test violation");
        assert_eq!(format!("{cv2:?}"), format!("{cv:?}"));
    }

    #[test]
    fn obligation_detail_debug_clone() {
        let od = ObligationDetail {
            merged: LatticeState::Reserved,
            per_node: BTreeMap::new(),
        };
        let dbg = format!("{od:?}");
        assert!(dbg.contains("ObligationDetail"));

        let od2 = od.clone();
        assert_eq!(od2.merged, LatticeState::Reserved);
        assert_eq!(format!("{od2:?}"), format!("{od:?}"));
    }

    // ---- ConsistencyReport predicates --------------------------------------

    #[test]
    fn consistency_report_has_sheaf_issues_with_phantom() {
        let report = ConsistencyReport {
            pairwise_conflicts: vec![],
            phantom_states: vec![PhantomState {
                obligation: oid(1),
                merged_state: LatticeState::Committed,
                node_observations: BTreeMap::new(),
            }],
            constraint_violations: vec![],
        };
        assert!(report.has_issues());
        assert!(report.has_sheaf_issues());
    }

    #[test]
    fn consistency_report_has_sheaf_issues_with_violation() {
        let report = ConsistencyReport {
            pairwise_conflicts: vec![],
            phantom_states: vec![],
            constraint_violations: vec![ConstraintViolation {
                constraint_name: "test".into(),
                obligation_states: BTreeMap::new(),
                explanation: "broken".into(),
            }],
        };
        assert!(report.has_issues());
        assert!(report.has_sheaf_issues());
    }
}