use std::collections::HashSet;
use mempill_types::{
AgentId, AssertionKind, ClaimRef, Disposition, EdgeKind, LedgerEntry, LedgerEventKind,
TransactionTime, ValidityAssertion,
};
use crate::ports::persistence::PersistencePort;
/// Input describing one supersession: `overturning_ref` overturns `superseded_ref`.
///
/// Consumed by `execute`, which copies these values into the records it appends.
#[derive(Debug, Clone)]
pub(crate) struct SupersessionRequest {
/// Agent recorded on the assertion and on every ledger entry written for this request.
pub agent_id: AgentId,
/// Claim being superseded; it is the target of the `Bound` assertion.
pub superseded_ref: ClaimRef,
/// Claim that overturns the superseded one; recorded in the ledger rationales.
pub overturning_ref: ClaimRef,
/// Instant carried in `AssertionKind::Bound` and, as RFC 3339 text, in the supersession rationale.
pub bound_at: chrono::DateTime<chrono::Utc>,
/// Transaction time stamped on the assertion (`asserted_at`) and on every ledger entry.
pub recorded_at: TransactionTime,
}
/// Records the supersession of `req.superseded_ref` and flags its direct dependents.
///
/// The following records are appended through `port` into the caller-owned `txn`, in order:
///
/// 1. a `Bound` validity assertion targeting the superseded claim;
/// 2. a `ValidityAsserted` / `Superseded` ledger entry for that claim;
/// 3. one `DependentFlaggedPendingReview` / `PendingReview` ledger entry per distinct claim
///    that has a `DependsOn` edge pointing at the superseded claim.
///
/// `preloaded_edges` is filtered here, so it may contain edges of other kinds or edges that
/// point elsewhere; those are ignored. Only direct dependents are flagged (no transitive
/// walk), in the order their first matching edge appears in the slice. A dependent reached
/// through several edges is flagged once. A `DependsOn` edge from the superseded claim to
/// itself is ignored, so the claim never receives a `PendingReview` entry on top of its own
/// `Superseded` entry.
///
/// This function neither commits nor rolls back `txn`.
///
/// # Errors
///
/// Returns the first error reported by `port`. Writes made before the failing one are still
/// staged in `txn`; discarding them is left to the caller.
///
/// # Returns
///
/// The number of dependents flagged for review.
pub(crate) fn execute<P: PersistencePort>(
    port: &P,
    txn: &mut P::Transaction,
    req: &SupersessionRequest,
    preloaded_edges: &[mempill_types::ClaimEdge],
) -> Result<usize, P::Error> {
    let assertion = ValidityAssertion {
        assertion_ref: uuid::Uuid::new_v4(),
        agent_id: req.agent_id.clone(),
        target_claim: req.superseded_ref.clone(),
        kind: AssertionKind::Bound { bound_at: req.bound_at },
        provenance: mempill_types::ProvenanceLabel::External(
            mempill_types::ExternalKind::ExternalFirstHand,
        ),
        confidence: mempill_types::Confidence {
            value_confidence: 1.0,
            valid_time_confidence: 1.0,
        },
        asserted_at: req.recorded_at.clone(),
    };
    port.append_validity_assertion(txn, &assertion)?;

    let ledger_supersession = LedgerEntry {
        entry_id: uuid::Uuid::new_v4(),
        agent_id: req.agent_id.clone(),
        claim_ref: req.superseded_ref.clone(),
        event_kind: LedgerEventKind::ValidityAsserted,
        disposition: Disposition::Superseded,
        rationale: Some(serde_json::json!({
            "event": "supersession",
            "overturning_claim": req.overturning_ref.0.to_string(),
            "bound_at": req.bound_at.to_rfc3339(),
        })),
        recorded_at: req.recorded_at.clone(),
    };
    port.append_ledger_entry(txn, &ledger_supersession)?;

    // Direct dependents: sources of `DependsOn` edges that point at the superseded claim.
    // A self-edge is dropped so the superseded claim cannot be flagged as its own dependent.
    let direct_dependents = preloaded_edges
        .iter()
        .filter(|edge| edge.kind == EdgeKind::DependsOn && edge.to_claim == req.superseded_ref)
        .map(|edge| &edge.from_claim)
        .filter(|dependent| **dependent != req.superseded_ref);

    // Borrowed keys are enough for de-duplication; the edges outlive this set.
    let mut seen_dependents: HashSet<&ClaimRef> = HashSet::new();
    let mut cascade_count = 0usize;
    for dependent in direct_dependents {
        // `insert` returns false when the dependent was already flagged via an earlier edge.
        if !seen_dependents.insert(dependent) {
            continue;
        }
        let flag_entry = LedgerEntry {
            entry_id: uuid::Uuid::new_v4(),
            agent_id: req.agent_id.clone(),
            claim_ref: dependent.clone(),
            event_kind: LedgerEventKind::DependentFlaggedPendingReview,
            disposition: Disposition::PendingReview,
            rationale: Some(serde_json::json!({
                "event": "depends_on_cascade",
                "superseded_parent": req.superseded_ref.0.to_string(),
                "overturning_claim": req.overturning_ref.0.to_string(),
            })),
            recorded_at: req.recorded_at.clone(),
        };
        port.append_ledger_entry(txn, &flag_entry)?;
        cascade_count += 1;
    }
    Ok(cascade_count)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ports::persistence::Txn;
use mempill_types::{
AgentId, AssertionKind, Cardinality, Claim, ClaimEdge, ClaimRef, Criticality,
EdgeKind, ExternalAnchor, ExternalKind, Fact, LedgerEntry, LedgerEventKind,
ProvenanceLabel, TransactionTime, ValidTime, ValidityAssertion,
};
use chrono::Utc;
use std::sync::{Arc, Mutex};
/// A write staged in a `MockTxn`, kept in the order it was appended.
#[derive(Debug)]
enum MockAppend {
/// Staged by the mock's `append_validity_assertion`.
Assertion(ValidityAssertion),
/// Staged by the mock's `append_ledger_entry`.
Ledger(LedgerEntry),
}
/// In-memory transaction used by the tests: buffers appends and can fail a chosen write.
struct MockTxn {
/// Agent returned by the `Txn::agent_id` implementation.
agent_id: AgentId,
/// Writes staged so far, in append order.
appends: Vec<MockAppend>,
/// Number of write attempts seen by `next_write`, including one that failed.
write_count: usize,
/// When `Some(n)`, the n-th write attempt (1-based) returns `MockError::InjectedFailure`.
fail_on_nth_write: Option<usize>,
/// Set by `MockPort::rollback` just before the transaction is dropped.
rolled_back: bool,
}
impl MockTxn {
    /// Creates an empty transaction for `agent_id` with no failure injection.
    fn new(agent_id: AgentId) -> Self {
        Self {
            agent_id,
            appends: Vec::new(),
            write_count: 0,
            fail_on_nth_write: None,
            rolled_back: false,
        }
    }

    /// Builder step: makes the `n`-th write attempt (1-based) fail.
    fn with_fail_on(self, n: usize) -> Self {
        Self {
            fail_on_nth_write: Some(n),
            ..self
        }
    }

    /// Counts one write attempt and reports whether it is the one chosen to fail.
    fn next_write(&mut self) -> Result<(), MockError> {
        self.write_count += 1;
        match self.fail_on_nth_write {
            Some(nth) if nth == self.write_count => Err(MockError::InjectedFailure),
            _ => Ok(()),
        }
    }

    /// Number of staged validity assertions.
    fn assertion_count(&self) -> usize {
        self.appends
            .iter()
            .filter(|staged| matches!(staged, MockAppend::Assertion(_)))
            .count()
    }

    /// Number of staged ledger entries of any kind.
    fn ledger_count(&self) -> usize {
        self.appends
            .iter()
            .filter(|staged| matches!(staged, MockAppend::Ledger(_)))
            .count()
    }

    /// Number of staged ledger entries whose `event_kind` equals `kind`.
    fn ledger_entries_by_kind(&self, kind: &LedgerEventKind) -> usize {
        self.appends
            .iter()
            .filter(|staged| {
                matches!(staged, MockAppend::Ledger(entry) if &entry.event_kind == kind)
            })
            .count()
    }
}
/// Exposes the mock transaction's agent through the `Txn` trait.
impl Txn for MockTxn {
/// Returns the agent this transaction was created for.
fn agent_id(&self) -> &AgentId {
&self.agent_id
}
}
/// Error type of the mock port and mock transaction.
#[derive(Debug, thiserror::Error)]
enum MockError {
/// Returned for a write selected via `MockTxn::with_fail_on`, and by the mock's `load_*`
/// methods while a transaction is open.
#[error("injected failure")]
InjectedFailure,
}
/// Records that have been moved out of a transaction by `MockPort::commit`.
#[derive(Default)]
struct CommittedState {
/// Committed validity assertions, in commit order.
assertions: Vec<ValidityAssertion>,
/// Committed ledger entries, in commit order.
ledger: Vec<LedgerEntry>,
}
/// Test double for `PersistencePort` backed by in-memory state.
struct MockPort {
/// Stored at construction; not read by any code visible in this module.
agent_id: AgentId,
/// Edge fixture served (after filtering) by `load_edges_for`.
edges: Vec<ClaimEdge>,
/// Records moved here by `commit`; the tests inspect this to see what was committed.
committed: Arc<Mutex<CommittedState>>,
/// True between `begin_atomic` and `commit`/`rollback`; `load_*` methods fail while it is set.
txn_open: Mutex<bool>,
}
impl MockPort {
    /// Creates a port with the given edge fixture, nothing committed and no open transaction.
    fn new(agent_id: AgentId, edges: Vec<ClaimEdge>) -> Self {
        let committed = Arc::new(Mutex::new(CommittedState::default()));
        let txn_open = Mutex::new(false);
        Self {
            agent_id,
            edges,
            committed,
            txn_open,
        }
    }

    /// Number of committed validity assertions.
    fn committed_assertions(&self) -> usize {
        let state = self.committed.lock().unwrap();
        state.assertions.len()
    }

    /// Number of committed ledger entries of any kind.
    fn committed_ledger(&self) -> usize {
        let state = self.committed.lock().unwrap();
        state.ledger.len()
    }

    /// Number of committed ledger entries whose `event_kind` equals `kind`.
    fn committed_ledger_by_kind(&self, kind: &LedgerEventKind) -> usize {
        let state = self.committed.lock().unwrap();
        state
            .ledger
            .iter()
            .filter(|entry| entry.event_kind == *kind)
            .count()
    }
}
impl PersistencePort for MockPort {
type Transaction = MockTxn;
type Error = MockError;
fn begin_atomic(&self, agent_id: &AgentId) -> Result<MockTxn, MockError> {
*self.txn_open.lock().unwrap() = true;
Ok(MockTxn::new(agent_id.clone()))
}
fn append_validity_assertion(
&self,
txn: &mut MockTxn,
assertion: &ValidityAssertion,
) -> Result<(), MockError> {
txn.next_write()?;
txn.appends.push(MockAppend::Assertion(assertion.clone()));
Ok(())
}
fn append_ledger_entry(
&self,
txn: &mut MockTxn,
entry: &LedgerEntry,
) -> Result<(), MockError> {
txn.next_write()?;
txn.appends.push(MockAppend::Ledger(entry.clone()));
Ok(())
}
fn append_claim(
&self,
_txn: &mut MockTxn,
_claim: &Claim,
) -> Result<ClaimRef, MockError> {
unimplemented!("not needed in supersession tests")
}
fn append_claim_edge(
&self,
_txn: &mut MockTxn,
_edge: &ClaimEdge,
) -> Result<(), MockError> {
unimplemented!("not needed in supersession tests")
}
fn commit(&self, txn: MockTxn) -> Result<(), MockError> {
*self.txn_open.lock().unwrap() = false;
let mut state = self.committed.lock().unwrap();
for append in txn.appends {
match append {
MockAppend::Assertion(a) => state.assertions.push(a),
MockAppend::Ledger(e) => state.ledger.push(e),
}
}
Ok(())
}
fn rollback(&self, mut txn: MockTxn) -> Result<(), MockError> {
*self.txn_open.lock().unwrap() = false;
txn.rolled_back = true;
Ok(())
}
fn load_edges_for(
&self,
_agent_id: &AgentId,
claim_ref: &ClaimRef,
) -> Result<Vec<ClaimEdge>, MockError> {
if *self.txn_open.lock().unwrap() {
return Err(MockError::InjectedFailure);
}
Ok(self.edges.iter()
.filter(|e| e.to_claim == *claim_ref && e.kind == EdgeKind::DependsOn)
.cloned()
.collect())
}
fn load_subject_line(
&self, _: &AgentId, _: &str, _: &str,
) -> Result<Vec<Claim>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
fn load_claim(
&self, _: &AgentId, _: &ClaimRef,
) -> Result<Option<Claim>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(None)
}
fn load_validity_assertions_for(
&self, _: &AgentId, _: &ClaimRef,
) -> Result<Vec<ValidityAssertion>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
fn load_ledger(
&self, _: &AgentId, _: Option<&TransactionTime>, _: usize,
) -> Result<Vec<LedgerEntry>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
fn load_ledger_for_claims(
&self, _: &AgentId, _refs: &[ClaimRef],
) -> Result<Vec<LedgerEntry>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
fn load_injected_claims(&self, _: &AgentId) -> Result<Vec<ClaimRef>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
fn load_lineage(
&self, _: &AgentId, _: &ClaimRef,
) -> Result<Vec<ClaimEdge>, MockError> {
if *self.txn_open.lock().unwrap() { return Err(MockError::InjectedFailure); }
Ok(vec![])
}
}
/// Returns the fixed agent id (`"test-agent"`) shared by the tests.
fn make_agent() -> AgentId {
AgentId("test-agent".into())
}
/// Builds a request for `agent_id` with fresh random claim refs and "now" timestamps.
fn make_req(agent_id: &AgentId) -> SupersessionRequest {
    let superseded_ref = ClaimRef::new_random();
    let overturning_ref = ClaimRef::new_random();
    let bound_at = Utc::now();
    let recorded_at = TransactionTime::now();
    SupersessionRequest {
        agent_id: agent_id.clone(),
        superseded_ref,
        overturning_ref,
        bound_at,
        recorded_at,
    }
}
/// Builds a `DependsOn` edge `from -> to` for `agent_id` with a fresh id and "now" timestamp.
fn depends_on_edge(from: ClaimRef, to: ClaimRef, agent_id: &AgentId) -> ClaimEdge {
    let edge_id = uuid::Uuid::new_v4();
    let owner = agent_id.clone();
    let created_at = TransactionTime::now();
    ClaimEdge {
        edge_id,
        agent_id: owner,
        from_claim: from,
        to_claim: to,
        kind: EdgeKind::DependsOn,
        created_at,
    }
}
/// Builds a `Claim` fixture for `agent_id` stating `user` / `name` = `"Alice"`.
///
/// The arguments are passed to `Claim::new` positionally; the meaning of the trailing
/// `vec![]`, `None`, `None` arguments is not visible from this file.
fn make_claim(agent_id: &AgentId) -> Claim {
    let claim_ref = ClaimRef::new_random();
    let owner = agent_id.clone();
    let fact = Fact {
        subject: "user".into(),
        predicate: "name".into(),
        value: serde_json::json!("Alice"),
    };
    let provenance = ProvenanceLabel::External(ExternalKind::UserAsserted);
    let anchor = ExternalAnchor {
        nearest_external_anchor: None,
        derivation_depth: 0,
    };
    let recorded_at = TransactionTime::now();
    let valid_time = ValidTime {
        start: None,
        end: None,
        valid_time_confidence: 0.0,
    };
    let confidence = mempill_types::Confidence {
        value_confidence: 1.0,
        valid_time_confidence: 1.0,
    };
    Claim::new(
        claim_ref,
        owner,
        fact,
        Cardinality::Functional,
        provenance,
        anchor,
        recorded_at,
        valid_time,
        confidence,
        Criticality::Low,
        vec![],
        None,
        None,
    )
}
/// I9: the `Bound` assertion, the supersession entry and three dependent flags all become
/// visible together after a single `commit`.
#[test]
fn atomicity_i9_three_dependents_committed_as_one_unit() {
    let agent = make_agent();
    let req = make_req(&agent);
    let edges: Vec<ClaimEdge> = (0..3)
        .map(|_| depends_on_edge(ClaimRef::new_random(), req.superseded_ref.clone(), &agent))
        .collect();
    let port = MockPort::new(agent.clone(), edges);

    // Edges are read before the transaction opens; the mock rejects loads while one is open.
    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let cascade_n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    let supersession_entries = port.committed_ledger_by_kind(&LedgerEventKind::ValidityAsserted);
    let pending_review_entries =
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview);

    assert_eq!(cascade_n, 3, "cascade count");
    assert_eq!(port.committed_assertions(), 1, "bound assertion count");
    assert_eq!(supersession_entries, 1, "supersession ledger entry");
    assert_eq!(pending_review_entries, 3, "dependent pending-review entries");
    assert_eq!(port.committed_ledger(), 4, "total ledger entries (1 + 3)");
}
/// I9: when the second write fails and the transaction is rolled back, nothing is committed.
#[test]
fn atomicity_i9_rollback_on_failure_zero_committed() {
    let agent = make_agent();
    let req = make_req(&agent);
    let dependent_edge =
        depends_on_edge(ClaimRef::new_random(), req.superseded_ref.clone(), &agent);
    let port = MockPort::new(agent.clone(), vec![dependent_edge]);
    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();

    // Write 2 is the supersession ledger entry, so the assertion is already staged by then.
    let mut txn = MockTxn::new(agent.clone()).with_fail_on(2);
    let outcome = execute(&port, &mut txn, &req, &preloaded);
    assert!(outcome.is_err(), "expected error from injected failure");

    port.rollback(txn).unwrap();
    let committed_assertions = port.committed_assertions();
    let committed_ledger = port.committed_ledger();
    assert_eq!(committed_assertions, 0, "no assertions after rollback");
    assert_eq!(committed_ledger, 0, "no ledger entries after rollback");
}
/// A26: each of three direct dependents receives exactly one pending-review entry.
#[test]
fn cascade_a26_three_dependents_each_get_pending_review() {
    let agent = make_agent();
    let req = make_req(&agent);
    let mut dep_refs: Vec<ClaimRef> = Vec::with_capacity(3);
    for _ in 0..3 {
        dep_refs.push(ClaimRef::new_random());
    }
    let mut edges: Vec<ClaimEdge> = Vec::with_capacity(dep_refs.len());
    for dependent in &dep_refs {
        edges.push(depends_on_edge(dependent.clone(), req.superseded_ref.clone(), &agent));
    }
    let port = MockPort::new(agent.clone(), edges);

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 3);
    let total_flags =
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview);
    assert_eq!(total_flags, 3);

    let committed = port.committed.lock().unwrap();
    for dep_ref in &dep_refs {
        let flagged = committed
            .ledger
            .iter()
            .filter(|entry| entry.event_kind == LedgerEventKind::DependentFlaggedPendingReview)
            .filter(|entry| entry.claim_ref == *dep_ref)
            .count();
        assert_eq!(flagged, 1, "each dependent flagged exactly once: {dep_ref:?}");
    }
}
/// A26: with no edges, the `Bound` assertion is still written but nothing is flagged.
#[test]
fn cascade_a26_zero_dependents_no_pending_review() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    let pending_review_entries =
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview);
    assert_eq!(n, 0, "no cascade when no dependents");
    assert_eq!(port.committed_assertions(), 1, "still writes the Bound assertion");
    assert_eq!(pending_review_entries, 0);
}
/// I1: everything `execute` stages in the transaction is an assertion or ledger append.
#[test]
fn non_destruction_i1_only_appends_no_deletes() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    execute(&port, &mut txn, &req, &preloaded).unwrap();

    // Inspect the staged writes before the transaction is consumed by `commit`.
    let has_only_appends = !txn
        .appends
        .iter()
        .any(|staged| !matches!(staged, MockAppend::Assertion(_) | MockAppend::Ledger(_)));
    assert!(has_only_appends, "only append operations present (I1)");

    port.commit(txn).unwrap();
}
/// Two runs over identical inputs yield the same cascade count and the same numbers of
/// committed assertions and ledger entries.
#[test]
fn determinism_same_inputs_same_append_sequence() {
    let agent = make_agent();
    let superseded = ClaimRef::new_random();
    let overturning = ClaimRef::new_random();
    let dep = ClaimRef::new_random();
    let bound_at = Utc::now();
    let recorded_at = TransactionTime::now();
    let edges = vec![depends_on_edge(dep, superseded.clone(), &agent)];

    // Each run gets its own port, so no committed state leaks between runs.
    let run = || -> (usize, usize, usize) {
        let port = MockPort::new(agent.clone(), edges.clone());
        let req = SupersessionRequest {
            agent_id: agent.clone(),
            superseded_ref: superseded.clone(),
            overturning_ref: overturning.clone(),
            bound_at,
            recorded_at: recorded_at.clone(),
        };
        let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
        let mut txn = port.begin_atomic(&agent).unwrap();
        let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
        port.commit(txn).unwrap();
        (n, port.committed_assertions(), port.committed_ledger())
    };

    let first = run();
    let second = run();
    assert_eq!(first, second, "deterministic across two runs");
}
/// `execute` must ignore an inbound edge whose kind is not `DependsOn`.
///
/// The edge list is handed to `execute` directly. `MockPort::load_edges_for` keeps only
/// `DependsOn` edges, so loading through it would hide the `DerivedFrom` edge from
/// `execute` and the test would pass even if `execute` had no kind filter at all.
#[test]
fn only_depends_on_edges_trigger_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let other_claim = ClaimRef::new_random();
    let unrelated_edge = ClaimEdge {
        edge_id: uuid::Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: other_claim,
        to_claim: req.superseded_ref.clone(),
        kind: EdgeKind::DerivedFrom,
        created_at: TransactionTime::now(),
    };
    let edges = vec![unrelated_edge];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 0, "DerivedFrom edge must not trigger DependsOn cascade");
    assert_eq!(
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview),
        0
    );
}
/// The single committed assertion is a `Bound` assertion aimed at the superseded claim.
#[test]
fn bound_assertion_targets_superseded_claim_ref() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    let state = port.committed.lock().unwrap();
    assert_eq!(state.assertions.len(), 1);
    let committed_assertion = &state.assertions[0];
    assert_eq!(committed_assertion.target_claim, req.superseded_ref);
    let is_bound = matches!(committed_assertion.kind, AssertionKind::Bound { .. });
    assert!(is_bound, "assertion kind must be Bound");
}
/// A26 diamond (B -> A, C -> A, D -> B, D -> C): superseding A flags B and C but not D.
///
/// All four edges are handed to `execute` directly. `MockPort::load_edges_for` only returns
/// edges that point at the requested claim, so loading through it would remove the D edges
/// before `execute` ran and its own `to_claim` check would go untested.
#[test]
fn cascade_a26_diamond_direct_dependents_only_no_transitive_walk() {
    let agent = make_agent();
    let req = make_req(&agent);
    let claim_b = ClaimRef::new_random();
    let claim_c = ClaimRef::new_random();
    let claim_d = ClaimRef::new_random();
    let edges = vec![
        depends_on_edge(claim_b.clone(), req.superseded_ref.clone(), &agent),
        depends_on_edge(claim_c.clone(), req.superseded_ref.clone(), &agent),
        depends_on_edge(claim_d.clone(), claim_b.clone(), &agent),
        depends_on_edge(claim_d.clone(), claim_c.clone(), &agent),
    ];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 2, "only B and C are direct dependents of A; D must not be flagged");
    let committed = port.committed.lock().unwrap();
    let flagged_refs: Vec<&ClaimRef> = committed
        .ledger
        .iter()
        .filter(|e| e.event_kind == LedgerEventKind::DependentFlaggedPendingReview)
        .map(|e| &e.claim_ref)
        .collect();
    assert!(flagged_refs.contains(&&claim_b), "B must be flagged");
    assert!(flagged_refs.contains(&&claim_c), "C must be flagged");
    assert!(
        !flagged_refs.contains(&&claim_d),
        "D must NOT be flagged (not a direct dependent of A)"
    );
}
/// A26: two different dependents of the superseded claim are each flagged exactly once.
#[test]
fn cascade_a26_two_distinct_dependents_both_flagged() {
    let agent = make_agent();
    let req = make_req(&agent);
    let dep_x = ClaimRef::new_random();
    let dep_y = ClaimRef::new_random();
    let edges: Vec<ClaimEdge> = [&dep_x, &dep_y]
        .into_iter()
        .map(|dependent| depends_on_edge(dependent.clone(), req.superseded_ref.clone(), &agent))
        .collect();
    let port = MockPort::new(agent.clone(), edges);

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();
    assert_eq!(n, 2);

    let committed = port.committed.lock().unwrap();
    // Counts the pending-review entries recorded against one claim.
    let flags_for = |target: &ClaimRef| {
        committed
            .ledger
            .iter()
            .filter(|entry| {
                entry.claim_ref == *target
                    && entry.event_kind == LedgerEventKind::DependentFlaggedPendingReview
            })
            .count()
    };
    let x_flags = flags_for(&dep_x);
    let y_flags = flags_for(&dep_y);
    assert_eq!(x_flags, 1, "dep_x must be flagged exactly once");
    assert_eq!(y_flags, 1, "dep_y must be flagged exactly once");
}
/// A26 idempotency: two `DependsOn` edges from the same dependent produce a single flag.
#[test]
fn cascade_a26_duplicate_edges_same_dependent_flagged_exactly_once() {
    let agent = make_agent();
    let req = make_req(&agent);
    let dep_d = ClaimRef::new_random();
    // Same endpoints twice; only the generated edge ids and timestamps differ.
    let duplicate_edges: Vec<ClaimEdge> = (0..2)
        .map(|_| depends_on_edge(dep_d.clone(), req.superseded_ref.clone(), &agent))
        .collect();
    let port = MockPort::new(agent.clone(), duplicate_edges);

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 1,
        "DEFECT: duplicate DependsOn edges caused double-flagging of the same dependent (A26 idempotency violated)");

    let committed = port.committed.lock().unwrap();
    let d_flags = committed
        .ledger
        .iter()
        .filter(|entry| entry.event_kind == LedgerEventKind::DependentFlaggedPendingReview)
        .filter(|entry| entry.claim_ref == dep_d)
        .count();
    assert_eq!(d_flags, 1,
        "DEFECT: dependent D was flagged {d_flags} times instead of 1 (A26 idempotency violated)");
}
/// A26: a `DependsOn` edge leaving the superseded claim must not flag the claim it points at.
///
/// The edge is handed to `execute` directly. `MockPort::load_edges_for` only returns edges
/// pointing at the requested claim, so loading through it would drop this outbound edge and
/// leave the direction check inside `execute` untested.
#[test]
fn cascade_a26_outbound_depends_on_from_superseded_does_not_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let other_claim = ClaimRef::new_random();
    let wrong_direction_edge = ClaimEdge {
        edge_id: uuid::Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: req.superseded_ref.clone(),
        to_claim: other_claim,
        kind: EdgeKind::DependsOn,
        created_at: TransactionTime::now(),
    };
    let edges = vec![wrong_direction_edge];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 0, "outbound DependsOn from superseded claim must not trigger cascade");
    assert_eq!(
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview),
        0,
        "other_claim must NOT be flagged when it is the TO of superseded's outbound edge"
    );
}
/// A26: an inbound `DerivedFrom` edge must not flag its source for review.
///
/// The edge is handed to `execute` directly. `MockPort::load_edges_for` keeps only
/// `DependsOn` edges, so loading through it would remove this edge and leave the kind check
/// inside `execute` untested.
#[test]
fn cascade_a26_derived_from_inbound_does_not_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let derived_claim = ClaimRef::new_random();
    let derived_from_edge = ClaimEdge {
        edge_id: uuid::Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: derived_claim,
        to_claim: req.superseded_ref.clone(),
        kind: EdgeKind::DerivedFrom,
        created_at: TransactionTime::now(),
    };
    let edges = vec![derived_from_edge];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 0, "DerivedFrom edge must not cascade to PendingReview (A26 DependsOn-only)");
    assert_eq!(
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview),
        0
    );
}
/// A26: an inbound `MutualExclusion` edge must not flag its source for review.
///
/// The edge is handed to `execute` directly. `MockPort::load_edges_for` keeps only
/// `DependsOn` edges, so loading through it would remove this edge and leave the kind check
/// inside `execute` untested.
#[test]
fn cascade_a26_mutual_exclusion_edge_does_not_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let peer_claim = ClaimRef::new_random();
    let mutex_edge = ClaimEdge {
        edge_id: uuid::Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: peer_claim,
        to_claim: req.superseded_ref.clone(),
        kind: EdgeKind::MutualExclusion,
        created_at: TransactionTime::now(),
    };
    let edges = vec![mutex_edge];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 0, "MutualExclusion edge must not cascade");
    assert_eq!(
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview),
        0
    );
}
/// A26: an inbound `Supersedes` edge must not flag its source for review.
///
/// The edge is handed to `execute` directly. `MockPort::load_edges_for` keeps only
/// `DependsOn` edges, so loading through it would remove this edge and leave the kind check
/// inside `execute` untested.
#[test]
fn cascade_a26_supersedes_edge_does_not_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let successor_claim = ClaimRef::new_random();
    let supersedes_edge = ClaimEdge {
        edge_id: uuid::Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: successor_claim,
        to_claim: req.superseded_ref.clone(),
        kind: EdgeKind::Supersedes,
        created_at: TransactionTime::now(),
    };
    let edges = vec![supersedes_edge];
    let port = MockPort::new(agent.clone(), edges.clone());

    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &edges).unwrap();
    port.commit(txn).unwrap();

    assert_eq!(n, 0, "Supersedes edge must not cascade to PendingReview");
    assert_eq!(
        port.committed_ledger_by_kind(&LedgerEventKind::DependentFlaggedPendingReview),
        0
    );
}
/// A26 / G1: for one fixed edge list, two runs flag the dependents in the same order.
#[test]
fn cascade_a26_ordering_is_deterministic_for_fixed_edge_set() {
    let agent = make_agent();
    let superseded = ClaimRef::new_random();
    let overturning = ClaimRef::new_random();
    let bound_at = Utc::now();
    let recorded_at = TransactionTime::now();
    let edges: Vec<ClaimEdge> = (0..3)
        .map(|_| depends_on_edge(ClaimRef::new_random(), superseded.clone(), &agent))
        .collect();

    // Runs one supersession on a fresh port and returns the flagged refs in commit order.
    let run_and_collect_order = || -> Vec<ClaimRef> {
        let req = SupersessionRequest {
            agent_id: agent.clone(),
            superseded_ref: superseded.clone(),
            overturning_ref: overturning.clone(),
            bound_at,
            recorded_at: recorded_at.clone(),
        };
        let port = MockPort::new(agent.clone(), edges.clone());
        let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
        let mut txn = port.begin_atomic(&agent).unwrap();
        execute(&port, &mut txn, &req, &preloaded).unwrap();
        port.commit(txn).unwrap();

        let state = port.committed.lock().unwrap();
        let mut flagged_in_order = Vec::new();
        for entry in &state.ledger {
            if entry.event_kind == LedgerEventKind::DependentFlaggedPendingReview {
                flagged_in_order.push(entry.claim_ref.clone());
            }
        }
        flagged_in_order
    };

    let order1 = run_and_collect_order();
    let order2 = run_and_collect_order();
    assert_eq!(order1, order2,
        "cascade ordering must be deterministic for a fixed edge set (A26, G1)");
}
/// I9: a failure on the very first write (the `Bound` assertion) leaves nothing committed.
#[test]
fn atomicity_i9_fail_on_first_write_zero_committed() {
    let agent = make_agent();
    let req = make_req(&agent);
    let dependent_edge =
        depends_on_edge(ClaimRef::new_random(), req.superseded_ref.clone(), &agent);
    let port = MockPort::new(agent.clone(), vec![dependent_edge]);
    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();

    let mut txn = MockTxn::new(agent.clone()).with_fail_on(1);
    let outcome = execute(&port, &mut txn, &req, &preloaded);
    assert!(outcome.is_err(), "should fail on write 1");

    port.rollback(txn).unwrap();
    let committed_assertions = port.committed_assertions();
    let committed_ledger = port.committed_ledger();
    assert_eq!(committed_assertions, 0, "no assertions committed after first-write failure");
    assert_eq!(committed_ledger, 0, "no ledger entries committed after first-write failure");
}
/// I9: a failure on the final cascade write leaves nothing committed after rollback.
#[test]
fn atomicity_i9_fail_on_last_cascade_write_zero_committed() {
    let agent = make_agent();
    let req = make_req(&agent);
    let edges: Vec<ClaimEdge> = (0..3)
        .map(|_| depends_on_edge(ClaimRef::new_random(), req.superseded_ref.clone(), &agent))
        .collect();
    let port = MockPort::new(agent.clone(), edges);
    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();

    // Writes: 1 = assertion, 2 = supersession entry, 3..=5 = the three dependent flags.
    let mut txn = MockTxn::new(agent.clone()).with_fail_on(5);
    let outcome = execute(&port, &mut txn, &req, &preloaded);
    assert!(outcome.is_err(), "should fail on the last cascade write (write 5)");

    port.rollback(txn).unwrap();
    let committed_assertions = port.committed_assertions();
    let committed_ledger = port.committed_ledger();
    assert_eq!(committed_assertions, 0,
        "I9 violated: partial assertion committed despite last-write failure");
    assert_eq!(committed_ledger, 0,
        "I9 violated: partial ledger committed despite last-write failure");
}
/// I9: with no dependents, exactly the assertion and the supersession entry are committed.
#[test]
fn atomicity_i9_zero_dependents_two_writes_atomic() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    let n = execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    let validity_entries = port.committed_ledger_by_kind(&LedgerEventKind::ValidityAsserted);
    assert_eq!(n, 0);
    assert_eq!(port.committed_assertions(), 1, "Bound assertion must be committed");
    assert_eq!(port.committed_ledger(), 1, "ValidityAsserted entry must be committed");
    assert_eq!(validity_entries, 1);
}
/// I9: with no dependents, a first-write failure followed by rollback commits nothing.
#[test]
fn atomicity_i9_zero_dependents_fail_first_write_zero_committed() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());
    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();

    let mut txn = MockTxn::new(agent.clone()).with_fail_on(1);
    let outcome = execute(&port, &mut txn, &req, &preloaded);
    assert!(outcome.is_err());

    port.rollback(txn).unwrap();
    let committed_assertions = port.committed_assertions();
    let committed_ledger = port.committed_ledger();
    assert_eq!(committed_assertions, 0);
    assert_eq!(committed_ledger, 0);
}
/// I1 / A26: a cascade over two dependents stages exactly four appends, and the superseded
/// claim itself never shows up as a flagged dependent.
#[test]
fn non_destruction_i1_incumbent_not_deleted_with_cascade() {
    let agent = make_agent();
    let req = make_req(&agent);
    let edges: Vec<ClaimEdge> = (0..2)
        .map(|_| depends_on_edge(ClaimRef::new_random(), req.superseded_ref.clone(), &agent))
        .collect();
    let port = MockPort::new(agent.clone(), edges);

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    execute(&port, &mut txn, &req, &preloaded).unwrap();

    // Inspect the staged writes before `commit` consumes the transaction.
    let all_append_typed = !txn
        .appends
        .iter()
        .any(|staged| !matches!(staged, MockAppend::Assertion(_) | MockAppend::Ledger(_)));
    assert!(all_append_typed, "only additive Assertion/Ledger appends present (I1)");
    assert_eq!(txn.appends.len(), 4, "write count must be exactly 4 for 2 dependents");

    port.commit(txn).unwrap();

    let committed = port.committed.lock().unwrap();
    let incumbent_flagged_as_dependent = committed
        .ledger
        .iter()
        .filter(|entry| entry.event_kind == LedgerEventKind::DependentFlaggedPendingReview)
        .any(|entry| entry.claim_ref == req.superseded_ref);
    assert!(!incumbent_flagged_as_dependent,
        "I1 / A26: the superseded claim itself must not appear as a dependent in cascade entries");
}
/// I10: supersession appends one new `Bound` assertion and one `ValidityAsserted` /
/// `Superseded` ledger entry for the incumbent.
#[test]
fn monotonicity_i10_supersession_is_append_not_rewrite() {
    let agent = make_agent();
    let req = make_req(&agent);
    let port = MockPort::new(agent.clone(), Vec::new());

    let preloaded = port.load_edges_for(&agent, &req.superseded_ref).unwrap();
    let mut txn = port.begin_atomic(&agent).unwrap();
    execute(&port, &mut txn, &req, &preloaded).unwrap();
    port.commit(txn).unwrap();

    let state = port.committed.lock().unwrap();

    // Assertion side: exactly one fresh Bound row aimed at the incumbent.
    assert_eq!(state.assertions.len(), 1, "one Bound assertion appended (not zero, not two)");
    let assertion = &state.assertions[0];
    assert_eq!(assertion.target_claim, req.superseded_ref,
        "Bound assertion must target the superseded claim ref (I10: append, not rewrite)");
    assert_ne!(assertion.assertion_ref, uuid::Uuid::nil(),
        "Bound assertion must have a freshly generated assertion_ref");
    assert_ne!(assertion.assertion_ref.as_u128(), req.superseded_ref.0.as_u128(),
        "Bound assertion ref must differ from the superseded claim ref (I10: new row)");
    let is_bound = matches!(assertion.kind, AssertionKind::Bound { .. });
    assert!(is_bound, "I10: supersession records a Bound assertion, not any other kind");

    // Ledger side: the first entry recorded against the incumbent marks it as superseded.
    let incumbent_entry = state
        .ledger
        .iter()
        .find(|entry| entry.claim_ref == req.superseded_ref)
        .expect("ledger entry for incumbent must exist");
    assert_eq!(incumbent_entry.event_kind, LedgerEventKind::ValidityAsserted,
        "I10: incumbent ledger event must be ValidityAsserted (append path), not a delete event");
    assert_eq!(incumbent_entry.disposition, Disposition::Superseded,
        "I10: incumbent disposition must be Superseded (marking only, not deletion)");
}
}