mod common;
use std::sync::Arc;
use chrono::Utc;
use mempill_core::ports::{
pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow},
PersistencePort,
};
use mempill_postgres::{PostgresPendingStore, PostgresPersistenceStore};
use mempill_types::{
AdjudicationRequest, AgentId, Belief, Cardinality, Claim, ClaimRef, Confidence, Criticality,
CurrencySignal, CurrencyState, ExternalAnchor, ExternalKind, Fact, LedgerEntry,
LedgerEventKind, OverturnReason, ProvenanceLabel, SubjectLineRef, TransactionTime, ValidTime,
};
use uuid::Uuid;
fn agent(name: &str) -> AgentId {
AgentId(name.into())
}
fn make_adj_request(ag: &AgentId) -> AdjudicationRequest {
let now = TransactionTime(Utc::now());
AdjudicationRequest {
subject_line: SubjectLineRef {
agent_id: ag.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
},
incumbent: Belief {
claim_ref: ClaimRef(Uuid::new_v4()),
fact: Fact {
subject: "acme".into(),
predicate: "ceo".into(),
value: serde_json::json!("alice"),
},
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
valid_time: ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
transaction_time: now.clone(),
confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
currency_signal: CurrencySignal {
last_refreshed_at: now.clone(),
state: CurrencyState::Fresh,
corroboration_count: 0,
},
criticality: Criticality::High,
},
challenger: Claim::new(
ClaimRef(Uuid::new_v4()),
ag.clone(),
Fact {
subject: "acme".into(),
predicate: "ceo".into(),
value: serde_json::json!("bob"),
},
Cardinality::Functional,
ProvenanceLabel::External(ExternalKind::UserAsserted),
ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
TransactionTime(Utc::now()),
ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
Criticality::High,
vec![],
None,
None,
),
criticality: Criticality::High,
reason: OverturnReason::ExternalContradiction,
}
}
fn make_pending_row(ag: &AgentId, challenger_ref: ClaimRef, incumbent_ref: ClaimRef) -> PendingAdjudicationRow {
PendingAdjudicationRow {
handle_id: Uuid::new_v4(),
agent_id: ag.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
challenger_claim_ref: challenger_ref,
incumbent_claim_ref: incumbent_ref,
request_payload: make_adj_request(ag),
queued_at: Utc::now(),
expires_at: None,
status: "pending".into(),
}
}
fn seed_claim_with_disposition(
store: &PostgresPersistenceStore,
ag: &AgentId,
claim: &Claim,
disposition: mempill_types::Disposition,
event_kind: LedgerEventKind,
) {
let mut txn = store
.begin_atomic(ag)
.expect("begin_atomic must succeed");
store
.append_claim(&mut txn, claim)
.expect("append_claim must succeed");
store
.append_ledger_entry(
&mut txn,
&LedgerEntry {
entry_id: Uuid::new_v4(),
agent_id: ag.clone(),
claim_ref: claim.claim_ref().clone(),
event_kind,
disposition,
rationale: None,
recorded_at: TransactionTime(claim.transaction_time().0),
},
)
.expect("append_ledger_entry must succeed");
store.commit(txn).expect("commit must succeed");
}
fn make_claim(ag: &AgentId, value: &str, ts_offset_secs: i64) -> Claim {
let now = Utc::now() + chrono::Duration::seconds(ts_offset_secs);
Claim::new(
ClaimRef(Uuid::new_v4()),
ag.clone(),
Fact {
subject: "acme".into(),
predicate: "ceo".into(),
value: serde_json::json!(value),
},
Cardinality::Functional,
ProvenanceLabel::External(ExternalKind::UserAsserted),
ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
TransactionTime(now),
ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
Criticality::High,
vec![],
None,
None,
)
}
fn run_mark_expired_sets_status(store: Arc<PostgresPersistenceStore>) {
let pending: PostgresPendingStore = store.pending_store();
let ag = agent("w6-mark-expired-agent");
let challenger_ref = ClaimRef(Uuid::new_v4());
let incumbent_ref = ClaimRef(Uuid::new_v4());
let row = make_pending_row(&ag, challenger_ref, incumbent_ref);
let handle_id = row.handle_id;
pending.insert_pending(&row).expect("insert_pending must succeed");
let before = pending.list_pending(Some(&ag)).expect("list_pending must succeed");
assert_eq!(before.len(), 1, "must have 1 pending row before mark_expired");
assert_eq!(before[0].status, "pending");
pending.mark_expired(handle_id).expect("mark_expired must succeed");
let fetched = pending
.get_pending(handle_id)
.expect("get_pending must not error")
.expect("row must still exist after mark_expired");
assert_eq!(fetched.status, "expired", "status must be 'expired' after mark_expired");
let after = pending.list_pending(Some(&ag)).expect("list_pending must succeed");
assert!(
after.is_empty(),
"expired row must not appear in list_pending (status='pending' filter)"
);
let expired = pending
.list_expired(Utc::now())
.expect("list_expired must succeed");
assert!(
expired.is_empty(),
"already-expired row must not re-appear in list_expired"
);
}
fn run_list_queued_orphan_present(store: Arc<PostgresPersistenceStore>) {
let pending: PostgresPendingStore = store.pending_store();
let ag = agent("w6-orphan-present-agent");
let incumbent_claim = make_claim(&ag, "alice", -10);
let incumbent_ref = incumbent_claim.claim_ref().clone();
seed_claim_with_disposition(
&store,
&ag,
&incumbent_claim,
mempill_types::Disposition::CommittedCheap,
LedgerEventKind::ClaimCommitted,
);
let challenger_claim = make_claim(&ag, "bob", 0);
let challenger_ref = challenger_claim.claim_ref().clone();
seed_claim_with_disposition(
&store,
&ag,
&challenger_claim,
mempill_types::Disposition::QueuedForAdjudication,
LedgerEventKind::ClaimCommitted,
);
let orphans = pending
.list_queued_orphan_claims()
.expect("list_queued_orphan_claims must succeed");
assert_eq!(
orphans.len(),
1,
"must detect exactly 1 orphaned QueuedForAdjudication claim; got {:?}",
orphans.iter().map(|o| &o.challenger_claim_ref).collect::<Vec<_>>()
);
let orphan = &orphans[0];
assert_eq!(
orphan.challenger_claim_ref, challenger_ref,
"orphan challenger_claim_ref must match the seeded challenger"
);
assert_eq!(
orphan.incumbent_claim_ref,
Some(incumbent_ref),
"orphan incumbent_claim_ref must identify the CommittedCheap incumbent"
);
assert_eq!(orphan.agent_id, ag);
assert_eq!(orphan.subject, "acme");
assert_eq!(orphan.predicate, "ceo");
}
fn run_list_queued_orphan_not_present(store: Arc<PostgresPersistenceStore>) {
let pending: PostgresPendingStore = store.pending_store();
let ag = agent("w6-orphan-absent-agent");
let incumbent_claim = make_claim(&ag, "alice", -10);
let incumbent_ref = incumbent_claim.claim_ref().clone();
seed_claim_with_disposition(
&store,
&ag,
&incumbent_claim,
mempill_types::Disposition::CommittedCheap,
LedgerEventKind::ClaimCommitted,
);
let challenger_claim = make_claim(&ag, "bob", 0);
let challenger_ref = challenger_claim.claim_ref().clone();
seed_claim_with_disposition(
&store,
&ag,
&challenger_claim,
mempill_types::Disposition::QueuedForAdjudication,
LedgerEventKind::ClaimCommitted,
);
let row = make_pending_row(&ag, challenger_ref.clone(), incumbent_ref.clone());
pending.insert_pending(&row).expect("insert_pending must succeed");
let orphans = pending
.list_queued_orphan_claims()
.expect("list_queued_orphan_claims must succeed");
assert!(
orphans.is_empty(),
"QueuedForAdjudication claim WITH a matching pending row must NOT appear as orphan; got {:?}",
orphans.iter().map(|o| &o.challenger_claim_ref).collect::<Vec<_>>()
);
let ag2 = agent("w6-orphan-contested-agent");
let contested_claim = make_claim(&ag2, "carol", 0);
seed_claim_with_disposition(
&store,
&ag2,
&contested_claim,
mempill_types::Disposition::Contested,
LedgerEventKind::AdjudicationResolved,
);
let orphans2 = pending
.list_queued_orphan_claims()
.expect("list_queued_orphan_claims must succeed (second check)");
let contested_appears = orphans2
.iter()
.any(|o| o.challenger_claim_ref == *contested_claim.claim_ref());
assert!(
!contested_appears,
"Contested claim must NOT appear in list_queued_orphan_claims"
);
}
#[test]
fn w6_pg16_mark_expired_sets_status() {
common::with_pg("16", run_mark_expired_sets_status);
}
#[test]
fn w6_pg16_list_queued_orphan_present() {
common::with_pg("16", run_list_queued_orphan_present);
}
#[test]
fn w6_pg16_list_queued_orphan_not_present() {
common::with_pg("16", run_list_queued_orphan_not_present);
}
#[test]
fn w6_pg18_mark_expired_sets_status() {
common::with_pg("18", run_mark_expired_sets_status);
}
#[test]
fn w6_pg18_list_queued_orphan_present() {
common::with_pg("18", run_list_queued_orphan_present);
}
#[test]
fn w6_pg18_list_queued_orphan_not_present() {
common::with_pg("18", run_list_queued_orphan_not_present);
}