use std::sync::Arc;
use mempill_core::application::{AuditQueryRequest, IngestClaimRequest, QueryMemoryRequest};
use mempill_core::ports::persistence::PersistencePort;
use mempill_sqlite::{
connection::open_in_memory, SqlitePersistenceStore,
};
use mempill_types::{
AgentId, BeliefStatus, Cardinality, Claim, ClaimRef, Confidence, Criticality,
Disposition, ExternalKind, Fact, LedgerEntry, LedgerEventKind, ProvenanceLabel,
TransactionTime, ValidTime, ValidityAssertion,
};
use chrono::Utc;
use uuid::Uuid;
/// Returns the fixed agent identity used by the tests that call this helper.
fn agent() -> AgentId {
    const AGENT_NAME: &str = "i9-agent";
    AgentId(AGENT_NAME.into())
}
/// Builds a `Claim` owned by `agent_id` that states `subject` / `predicate` = `value`.
///
/// The claim receives a freshly generated `ClaimRef` and is stamped with the
/// current UTC time. Everything else is a fixed test default: functional
/// cardinality, user-asserted external provenance, no external anchor at
/// derivation depth 0, a valid time with neither start nor end, a value
/// confidence of 0.9 and medium criticality.
fn make_claim(agent_id: &AgentId, subject: &str, predicate: &str, value: &str) -> Claim {
    let fresh_ref = ClaimRef(Uuid::new_v4());
    let fact = Fact {
        subject: subject.into(),
        predicate: predicate.into(),
        // The raw string is wrapped as a JSON value.
        value: serde_json::json!(value),
    };
    let provenance = ProvenanceLabel::External(ExternalKind::UserAsserted);
    let anchor = mempill_types::ExternalAnchor {
        nearest_external_anchor: None,
        derivation_depth: 0,
    };
    let tx_time = TransactionTime(Utc::now());
    let valid_time = ValidTime {
        start: None,
        end: None,
        valid_time_confidence: 0.0,
    };
    let confidence = Confidence {
        value_confidence: 0.9,
        valid_time_confidence: 0.0,
    };

    Claim::new(
        fresh_ref,
        agent_id.clone(),
        fact,
        Cardinality::Functional,
        provenance,
        anchor,
        tx_time,
        valid_time,
        confidence,
        Criticality::Medium,
        // The trailing arguments are passed empty / unset; their meaning is
        // defined by `Claim::new`, which is not visible from this file.
        vec![],
        None,
        None,
    )
}
/// Builds a `Bound` validity assertion by `agent_id` that targets `claim_ref`.
///
/// The assertion gets a fresh UUID; both the bound instant and the assertion's
/// transaction time are taken from the current UTC clock (two separate reads,
/// bound instant first). Provenance is user-asserted external and the value
/// confidence is 0.9.
fn make_validity_assertion(agent_id: &AgentId, claim_ref: &ClaimRef) -> ValidityAssertion {
    let assertion_ref = Uuid::new_v4();
    let kind = mempill_types::AssertionKind::Bound {
        bound_at: Utc::now(),
    };
    let provenance = ProvenanceLabel::External(ExternalKind::UserAsserted);
    let confidence = Confidence {
        value_confidence: 0.9,
        valid_time_confidence: 0.0,
    };
    let asserted_at = TransactionTime(Utc::now());

    ValidityAssertion {
        assertion_ref,
        agent_id: agent_id.clone(),
        target_claim: claim_ref.clone(),
        kind,
        provenance,
        confidence,
        asserted_at,
    }
}
/// Builds a `ClaimCommitted` ledger entry for `claim_ref` on behalf of `agent_id`.
///
/// The entry gets a fresh UUID, the `CommittedCheap` disposition, no rationale,
/// and is stamped with the current UTC time.
fn make_ledger_entry(agent_id: &AgentId, claim_ref: &ClaimRef) -> LedgerEntry {
    let entry_id = Uuid::new_v4();
    let recorded_at = TransactionTime(Utc::now());

    LedgerEntry {
        entry_id,
        agent_id: agent_id.clone(),
        claim_ref: claim_ref.clone(),
        event_kind: LedgerEventKind::ClaimCommitted,
        disposition: Disposition::CommittedCheap,
        rationale: None,
        recorded_at,
    }
}
/// I9 at store level: rows appended inside a transaction that is rolled back
/// must not be observable through the store's read methods.
///
/// A claim, a validity assertion and a ledger entry are appended in a single
/// transaction, the transaction is rolled back, and each of the three read
/// paths is then checked for the absence of the corresponding row.
#[tokio::test]
async fn i9_rollback_leaves_zero_rows_verified_via_public_read_api() {
    let store = {
        let connection = open_in_memory().expect("in-memory connection must open");
        Arc::new(SqlitePersistenceStore::new(connection))
    };
    let agent_id = agent();
    let claim = make_claim(&agent_id, "user", "role", "admin");
    let claim_ref = claim.claim_ref().clone();

    // Baseline: the subject line is empty before anything is written.
    let baseline = store
        .load_subject_line(&agent_id, "user", "role")
        .expect("load_subject_line must succeed on empty store");
    assert!(baseline.is_empty(), "baseline: no claims before any write");

    let assertion = make_validity_assertion(&agent_id, &claim_ref);
    let ledger_entry = make_ledger_entry(&agent_id, &claim_ref);
    // Remember the id so the ledger can be searched for it after the rollback.
    let rolled_back_entry_id = ledger_entry.entry_id;

    // Write all three rows inside one transaction, then abandon it.
    let mut txn = store
        .begin_atomic(&agent_id)
        .expect("begin_atomic must succeed");
    store
        .append_claim(&mut txn, &claim)
        .expect("append_claim must succeed in txn");
    store
        .append_validity_assertion(&mut txn, &assertion)
        .expect("append_validity_assertion must succeed in txn");
    store
        .append_ledger_entry(&mut txn, &ledger_entry)
        .expect("append_ledger_entry must succeed in txn");
    store.rollback(txn).expect("rollback must succeed");

    // Claim row must be gone.
    let claims_after = store
        .load_subject_line(&agent_id, "user", "role")
        .expect("load_subject_line after rollback must succeed");
    assert!(
        claims_after.is_empty(),
        "I9: claim row MUST NOT be visible after rollback (load_subject_line returned {} claims)",
        claims_after.len()
    );

    // Validity assertion must be gone.
    let assertions_after = store
        .load_validity_assertions_for(&agent_id, &claim_ref)
        .expect("load_validity_assertions_for after rollback must succeed");
    assert!(
        assertions_after.is_empty(),
        "I9: validity_assertion MUST NOT be visible after rollback (got {} assertions)",
        assertions_after.len()
    );

    // Ledger entry must be gone: no loaded entry may carry the rolled-back id.
    let ledger_after = store
        .load_ledger(&agent_id, None, 100)
        .expect("load_ledger after rollback must succeed");
    let entry_absent = ledger_after
        .iter()
        .all(|entry| entry.entry_id != rolled_back_entry_id);
    assert!(
        entry_absent,
        "I9: ledger_entry MUST NOT be visible after rollback (entry_id present in load_ledger result)"
    );
}
/// Positive control for the rollback test: after `commit`, the claim and the
/// ledger entry appended in the transaction are visible through the store's
/// read methods.
#[tokio::test]
async fn i9_commit_makes_rows_visible_via_public_read_api() {
    let store = {
        let connection = open_in_memory().expect("in-memory connection must open");
        Arc::new(SqlitePersistenceStore::new(connection))
    };
    let agent_id = agent();
    let claim = make_claim(&agent_id, "user", "status", "active");
    let claim_ref = claim.claim_ref().clone();
    let ledger_entry = make_ledger_entry(&agent_id, &claim_ref);
    // Remember the id so the ledger can be searched for it after the commit.
    let committed_entry_id = ledger_entry.entry_id;

    // Append both rows in one transaction and commit it.
    let mut txn = store
        .begin_atomic(&agent_id)
        .expect("begin_atomic must succeed");
    store
        .append_claim(&mut txn, &claim)
        .expect("append_claim must succeed");
    store
        .append_ledger_entry(&mut txn, &ledger_entry)
        .expect("append_ledger_entry must succeed");
    store.commit(txn).expect("commit must succeed");

    // Exactly the committed claim is on the subject line.
    let visible_claims = store
        .load_subject_line(&agent_id, "user", "status")
        .expect("load_subject_line after commit must succeed");
    assert_eq!(
        visible_claims.len(),
        1,
        "after commit, exactly 1 claim must be visible"
    );
    assert_eq!(
        visible_claims[0].claim_ref(),
        &claim_ref,
        "the committed claim_ref must match"
    );

    // The committed ledger entry can be found by its id.
    let visible_ledger = store
        .load_ledger(&agent_id, None, 100)
        .expect("load_ledger after commit must succeed");
    assert!(
        visible_ledger
            .iter()
            .any(|entry| entry.entry_id == committed_entry_id),
        "after commit, ledger_entry must be visible via load_ledger"
    );
}
/// I9 end-to-end: two ingests on the same subject but different predicates
/// both commit on the cheap path, get distinct claim refs, and each predicate's
/// query returns exactly the claim that was ingested for it.
#[tokio::test]
async fn i9_engine_two_non_conflicting_ingests_leave_no_phantom_rows() {
    let engine = mempill_sqlite::open_default_in_memory()
        .expect("in-memory DefaultEngine must open");
    let agent_id = AgentId("i9-e2e-agent".into());

    // Both ingests differ only in predicate and value; everything else is shared.
    let ingest_request = |predicate: &str, value: &str| IngestClaimRequest {
        agent_id: agent_id.clone(),
        subject: "session".into(),
        predicate: predicate.into(),
        value: serde_json::json!(value),
        provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
        cardinality: Cardinality::Functional,
        valid_time: None,
        confidence: Confidence {
            value_confidence: 0.9,
            valid_time_confidence: 0.0,
        },
        criticality: Criticality::Low,
        derived_from: vec![],
    };
    // Both queries read the current state of the "session" subject.
    let memory_query = |predicate: &str| QueryMemoryRequest {
        agent_id: agent_id.clone(),
        subject: "session".into(),
        predicate: predicate.into(),
        as_of_tx_time: None,
    };

    let start_time_ingest = engine
        .ingest_claim(ingest_request("start_time", "2026-01-01T00:00:00Z"))
        .await
        .expect("first ingest (start_time) must succeed");
    let user_id_ingest = engine
        .ingest_claim(ingest_request("user_id", "user-42"))
        .await
        .expect("second ingest (user_id) must succeed");

    assert_eq!(
        start_time_ingest.disposition,
        Disposition::CommittedCheap,
        "start_time must be CommittedCheap"
    );
    assert_eq!(
        user_id_ingest.disposition,
        Disposition::CommittedCheap,
        "user_id must be CommittedCheap"
    );
    assert_ne!(
        start_time_ingest.claim_ref, user_id_ingest.claim_ref,
        "two ingests must produce distinct claim_refs"
    );

    let start_time_memory = engine
        .query_memory(memory_query("start_time"))
        .await
        .expect("query start_time must succeed");
    let user_id_memory = engine
        .query_memory(memory_query("user_id"))
        .await
        .expect("query user_id must succeed");

    let start_time_primary = start_time_memory
        .belief
        .primary
        .as_ref()
        .expect("I9 e2e: start_time belief must have primary");
    let user_id_primary = user_id_memory
        .belief
        .primary
        .as_ref()
        .expect("I9 e2e: user_id belief must have primary");

    // Each predicate must resolve to its own claim, not the other one's.
    assert_eq!(
        start_time_primary.claim_ref, start_time_ingest.claim_ref,
        "I9 e2e: start_time query must return the correct claim_ref (no phantom rows)"
    );
    assert_eq!(
        user_id_primary.claim_ref, user_id_ingest.claim_ref,
        "I9 e2e: user_id query must return the correct claim_ref (no phantom rows)"
    );
}
/// I9 on the non-cheap path: ingesting a second, conflicting value for a
/// functional `user` / `email` fact.
///
/// Checks, in order:
/// 1. the first ingest is `CommittedCheap`;
/// 2. the conflicting ingest succeeds and is *not* `CommittedCheap`;
/// 3. the audit trail for claim A holds exactly one `ClaimCommitted` entry and
///    no `ValidityAsserted` entry;
/// 4. the belief is `Contested`, with both e-mail values present among the
///    primary and alternative entries.
#[tokio::test]
async fn i9_heavypath_supersession_commits_atomically() {
    const OLD_EMAIL: &str = "old@example.com";
    const NEW_EMAIL: &str = "new@example.com";

    let engine = mempill_sqlite::open_default_in_memory()
        .expect("in-memory DefaultEngine must open");
    let agent_id = AgentId("i9-heavypath-agent".into());

    // Incumbent and challenger differ only in the e-mail value.
    let email_request = |value: &str| IngestClaimRequest {
        agent_id: agent_id.clone(),
        subject: "user".into(),
        predicate: "email".into(),
        value: serde_json::json!(value),
        provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
        cardinality: Cardinality::Functional,
        valid_time: None,
        confidence: Confidence {
            value_confidence: 0.9,
            valid_time_confidence: 0.0,
        },
        criticality: Criticality::High,
        derived_from: vec![],
    };

    // Claim A: the incumbent, nothing to conflict with yet.
    let resp_a = engine
        .ingest_claim(email_request(OLD_EMAIL))
        .await
        .expect("ingest A (incumbent) must succeed");
    assert_eq!(
        resp_a.disposition,
        Disposition::CommittedCheap,
        "I9 HeavyPath: first ingest must be CommittedCheap (no conflict)"
    );
    let claim_ref_a = resp_a.claim_ref.clone();

    // Claim B: same subject/predicate, different value.
    let resp_b = engine
        .ingest_claim(email_request(NEW_EMAIL))
        .await
        .expect("I9 HeavyPath: ingest B (conflicting) must succeed (DEFECT-1 fixed)");
    assert_ne!(
        resp_b.disposition,
        Disposition::CommittedCheap,
        "I9 HeavyPath: conflicting ingest must not be CommittedCheap (HeavyPath must have fired)"
    );

    // Inspect the audit trail recorded for claim A.
    let audit = engine
        .query_audit(AuditQueryRequest {
            agent_id: agent_id.clone(),
            claim_ref: Some(claim_ref_a.clone()),
            from_tx_time: None,
            limit: 50,
        })
        .await
        .expect("audit query for claim A must succeed after B11 contested ingest");
    let entries_for_a: Vec<_> = audit
        .entries
        .iter()
        .filter(|entry| entry.claim_ref == claim_ref_a)
        .collect();

    let committed_count = entries_for_a
        .iter()
        .filter(|entry| matches!(entry.event_kind, LedgerEventKind::ClaimCommitted))
        .count();
    assert_eq!(
        committed_count, 1,
        "I9 HeavyPath atomicity: ClaimCommitted entry for claim A MUST be present after B11 \
         contested ingest (append-only — incumbent retained). Found: {committed_count}"
    );

    let validity_asserted_count = entries_for_a
        .iter()
        .filter(|entry| matches!(entry.event_kind, LedgerEventKind::ValidityAsserted))
        .count();
    assert_eq!(
        validity_asserted_count, 0,
        "TASK-9-W4-W5-FIX: ValidityAsserted for claim A MUST NOT be present at ingest time. \
         The incumbent is not superseded during ingest (only at submit_adjudication Affirm). \
         Found: {validity_asserted_count} (expected 0)"
    );

    // The current belief must expose both claims as contested.
    let memory = engine
        .query_memory(QueryMemoryRequest {
            agent_id: agent_id.clone(),
            subject: "user".into(),
            predicate: "email".into(),
            as_of_tx_time: None,
        })
        .await
        .expect("query after B11 Contested ingest must succeed");
    let belief = &memory.belief;
    assert_eq!(
        belief.status,
        BeliefStatus::Contested,
        "I9 HeavyPath: belief after B11 contested ingest MUST be Contested (both claims live). \
         Got {:?}",
        belief.status
    );

    // Gather the value of the primary entry (if any) followed by every alternative.
    let mut all_values: Vec<_> = belief
        .primary
        .iter()
        .map(|entry| entry.fact.value.clone())
        .collect();
    all_values.extend(
        belief
            .alternatives
            .iter()
            .map(|entry| entry.fact.value.clone()),
    );

    assert!(
        all_values.contains(&serde_json::json!(OLD_EMAIL)),
        "I9 HeavyPath: 'old@example.com' (claim A / incumbent) MUST be visible in Contested. Got: {all_values:?}"
    );
    assert!(
        all_values.contains(&serde_json::json!(NEW_EMAIL)),
        "I9 HeavyPath: 'new@example.com' (claim B / challenger) MUST be visible in Contested. Got: {all_values:?}"
    );
}