Skip to main content

mempill_core/application/
submit_adjudication.rs

1#![allow(missing_docs)]
2//! SubmitAdjudicationUseCase — atomic oracle verdict apply.
3//!
4//! This is the RESOLUTION path: an oracle verdict arrives asynchronously and this use-case
5//! applies the transition atomically. All reads happen BEFORE begin_atomic; all writes
6//! happen inside a single begin_atomic/commit unit.
7//!
8//! # Disposition transitions
9//!
10//! | Verdict | Challenger | Incumbent | Ledger entries |
11//! |---------|-----------|-----------|----------------|
12//! | Affirm  | → CommittedCheap (new ledger w/ External provenance) | → Superseded (ValidityAssertion Bound + ledger) | 2 |
13//! | Deny    | → Superseded (ValidityAssertion Bound + ledger) | stays committed (no change) | 1 |
14//! | Unknown | → Contested (ledger abstain entry) | → Contested (ledger abstain entry) | 2 (one per claim) |
15//!
16//! # Lock invariant
17//!
18//! This use-case is SYNC — it runs inside spawn_blocking. The EngineHandle acquires
19//! the store write lock then per-agent write lock BEFORE spawn_blocking, exactly mirroring ingest.
20//! This use-case must NOT acquire locks itself.
21//!
22//! # Transaction discipline
23//!
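//! Steps 1 and 3 (reads) execute BEFORE begin_atomic. The ledger and validity-assertion
//! writes execute inside one begin_atomic/commit unit. On any error inside the unit:
//! rollback is called and Err is returned — no partial writes. The pending row is marked
//! resolved only after a successful commit, outside the transaction.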
27
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use mempill_types::{
32    AgentId, AdjudicationOutcome, AdjudicationVerdict, AssertionKind, Disposition,
33    LedgerEntry, LedgerEventKind, TransactionTime, ValidityAssertion,
34};
35
36use crate::{
37    engine_handle::ErasedPendingStore,
38    error::MemError,
39    ports::PersistencePort,
40};
41
/// Sync use-case: apply an oracle verdict atomically.
///
/// Invoked via `spawn_blocking` by `EngineHandle::submit_adjudication`.
pub struct SubmitAdjudicationUseCase<P>
where
    P: PersistencePort + Send + Sync + 'static,
{
    /// Persistence port: ledger/edge reads and every transactional write go through it.
    persistence: Arc<P>,
    /// Type-erased pending-adjudication store, used to look up the handle's row and to
    /// mark it resolved or expired.
    pending_store: Arc<dyn ErasedPendingStore>,
}
52
53impl<P> SubmitAdjudicationUseCase<P>
54where
55    P: PersistencePort + Send + Sync + 'static,
56{
57    pub fn new(persistence: Arc<P>, pending_store: Arc<dyn ErasedPendingStore>) -> Self {
58        Self { persistence, pending_store }
59    }
60
61    /// Execute the verdict-apply algorithm.
62    ///
63    /// `now` is engine-stamped at the async boundary and passed in (DETERMINISM).
64    pub fn execute(
65        &self,
66        handle_id: uuid::Uuid,
67        response: mempill_types::AdjudicationResponse,
68        now: DateTime<Utc>,
69    ) -> Result<AdjudicationOutcome, MemError> {
70        let tx_time = TransactionTime(now);
71
72        // ── Step 1: Look up the pending row in the DB (DB-authoritative) ──
73        let row = self.pending_store
74            .get_pending_erased(handle_id)
75            .map_err(|e| MemError::PendingStore { source: e })?
76            .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
77
78        // ── Lazy expiry: if TTL has elapsed, revert challenger → Contested + ledger ──
79        // Do NOT apply the verdict on an expired handle. Revert atomically then return
80        // AdjudicationHandleNotFound so the caller knows the handle is no longer active.
81        if let Some(expires_at) = row.expires_at {
82            if expires_at <= now {
83                // Revert: write Contested ledger entry for the challenger, mark row expired.
84                let agent_id_exp: AgentId = row.agent_id.clone();
85                let challenger_ref_exp = row.challenger_claim_ref.clone();
86                let handle_id_exp = handle_id;
87
88                // Load ledger to verify challenger is still QueuedForAdjudication.
89                let ledger_check = self.persistence
90                    .load_ledger(&agent_id_exp, None, 10_000)
91                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
92                let challenger_disp = latest_disposition_from_ledger(&ledger_check, &challenger_ref_exp);
93
94                if challenger_disp == Some(Disposition::QueuedForAdjudication) {
95                    // Write Contested ledger entry inside a transaction.
96                    let mut txn = self.persistence
97                        .begin_atomic(&agent_id_exp)
98                        .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
99
100                    let expired_entry = mempill_types::LedgerEntry {
101                        entry_id: uuid::Uuid::new_v4(),
102                        agent_id: agent_id_exp.clone(),
103                        claim_ref: challenger_ref_exp.clone(),
104                        event_kind: LedgerEventKind::AdjudicationExpired,
105                        disposition: Disposition::Contested,
106                        rationale: Some(serde_json::json!({
107                            "event": "adjudication_ttl_expired_lazy",
108                            "handle_id": handle_id_exp.to_string(),
109                            "expired_at": expires_at.to_rfc3339(),
110                            "incumbent_claim_ref": row.incumbent_claim_ref.0.to_string(),
111                        })),
112                        recorded_at: tx_time.clone(),
113                    };
114
115                    match self.persistence.append_ledger_entry(&mut txn, &expired_entry) {
116                        Ok(()) => {
117                            if let Err(e) = self.persistence.commit(txn) {
118                                return Err(MemError::Persistence { source: Box::new(e) });
119                            }
120                            // Mark pending row expired outside txn (within write lock).
121                            let _ = self.pending_store.mark_expired_erased(handle_id_exp);
122                        }
123                        Err(e) => {
124                            let _ = self.persistence.rollback(txn);
125                            return Err(MemError::Persistence { source: Box::new(e) });
126                        }
127                    }
128                }
129
130                return Err(MemError::AdjudicationHandleNotFound { handle_id });
131            }
132        }
133
134        let agent_id: AgentId = row.agent_id.clone();
135        let challenger_ref = row.challenger_claim_ref.clone();
136        let incumbent_ref = row.incumbent_claim_ref.clone();
137
138        // ── Step 3: State guard — load latest dispositions BEFORE begin_atomic ────
139        // Check that both claims are still in QueuedForAdjudication (idempotency guard R6).
140        // We read the ledger to get the latest disposition per claim.
141        let ledger = self.persistence
142            .load_ledger(&agent_id, None, 10_000)
143            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
144
145        let challenger_disp = latest_disposition_from_ledger(&ledger, &challenger_ref);
146
147        // State guard: the challenger must still be QueuedForAdjudication.
148        // The incumbent is NOT checked here — it was never moved to QueuedForAdjudication
149        // (only the challenger is gated there). Checking only the challenger is the correct
150        // idempotency guard: a duplicate submit finds the challenger already resolved
151        // (CommittedCheap / Superseded / Contested) and returns AdjudicationHandleNotFound.
152        if challenger_disp != Some(Disposition::QueuedForAdjudication) {
153            return Err(MemError::AdjudicationHandleNotFound { handle_id });
154        }
155
156        // Pre-load edges for supersession BEFORE begin_atomic to avoid reads inside an open transaction.
157        // For Affirm: need to bound the incumbent → load its edges.
158        // For Deny: need to bound the challenger → load its edges.
159        let incumbent_edges = self.persistence
160            .load_edges_for(&agent_id, &incumbent_ref)
161            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
162        let challenger_edges = self.persistence
163            .load_edges_for(&agent_id, &challenger_ref)
164            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
165
166        // ── Step 2 / Step 4–6: begin_atomic + apply verdict + mark resolved + commit ──
167        let mut txn = self.persistence
168            .begin_atomic(&agent_id)
169            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
170
171        let result = self.apply_verdict_within_txn(
172            &response.verdict,
173            &response.evidence_provenance,
174            &agent_id,
175            &challenger_ref,
176            &incumbent_ref,
177            tx_time.clone(),
178            &incumbent_edges,
179            &challenger_edges,
180            &mut txn,
181        );
182
183        match result {
184            Ok(()) => {
185                self.persistence
186                    .commit(txn)
187                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
188
189                // ── Step 5: Mark the pending row resolved (outside txn, within write lock) ──
190                self.pending_store
191                    .mark_resolved_erased(handle_id)
192                    .map_err(|e| MemError::PendingStore { source: e })?;
193
194                // ── Step 7: Return AdjudicationOutcome ───────────────────────────────
195                let (outcome_claim_ref, final_disposition) = match &response.verdict {
196                    AdjudicationVerdict::Affirm => {
197                        (challenger_ref, Disposition::CommittedCheap)
198                    }
199                    AdjudicationVerdict::Deny => {
200                        (challenger_ref, Disposition::Superseded)
201                    }
202                    AdjudicationVerdict::Unknown => {
203                        // Return the challenger with its new Contested disposition.
204                        (challenger_ref, Disposition::Contested)
205                    }
206                    // AdjudicationVerdict is #[non_exhaustive] — future verdicts treated as Unknown.
207                    _ => (challenger_ref, Disposition::Contested),
208                };
209
210                Ok(AdjudicationOutcome {
211                    handle_id,
212                    disposition: final_disposition,
213                    claim_ref: outcome_claim_ref,
214                })
215            }
216            Err(e) => {
217                let _ = self.persistence.rollback(txn);
218                Err(e)
219            }
220        }
221    }
222
223    /// Apply the verdict inside the already-open transaction.
224    ///
225    /// Returns `Ok(())` on success; caller derives the outcome disposition from the verdict.
226    #[allow(clippy::too_many_arguments)]
227    fn apply_verdict_within_txn(
228        &self,
229        verdict: &AdjudicationVerdict,
230        evidence_provenance: &mempill_types::ProvenanceLabel,
231        agent_id: &AgentId,
232        challenger_ref: &mempill_types::ClaimRef,
233        incumbent_ref: &mempill_types::ClaimRef,
234        tx_time: TransactionTime,
235        incumbent_edges: &[mempill_types::ClaimEdge],
236        challenger_edges: &[mempill_types::ClaimEdge],
237        txn: &mut P::Transaction,
238    ) -> Result<(), MemError> {
239        match verdict {
240            AdjudicationVerdict::Affirm => {
241                // Affirm: challenger wins.
242                // 1. Bound the incumbent (→ Superseded) + ledger entry.
243                self.bound_claim(
244                    agent_id,
245                    incumbent_ref,
246                    challenger_ref,
247                    tx_time.clone(),
248                    incumbent_edges,
249                    txn,
250                )?;
251                // 2. Write ledger entry for challenger → CommittedCheap with External provenance.
252                let affirm_entry = LedgerEntry {
253                    entry_id: uuid::Uuid::new_v4(),
254                    agent_id: agent_id.clone(),
255                    claim_ref: challenger_ref.clone(),
256                    event_kind: LedgerEventKind::AdjudicationResolved,
257                    disposition: Disposition::CommittedCheap,
258                    rationale: Some(serde_json::json!({
259                        "event": "oracle_affirm",
260                        "verdict": "Affirm",
261                        "evidence_provenance": serde_json::to_value(evidence_provenance).ok(),
262                    })),
263                    recorded_at: tx_time.clone(),
264                };
265                self.persistence
266                    .append_ledger_entry(txn, &affirm_entry)
267                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
268                Ok(())
269            }
270
271            AdjudicationVerdict::Deny => {
272                // Deny: incumbent stands.
273                // 1. Bound the challenger (→ Superseded) + ledger entry.
274                self.bound_claim(
275                    agent_id,
276                    challenger_ref,
277                    incumbent_ref,
278                    tx_time.clone(),
279                    challenger_edges,
280                    txn,
281                )?;
282                // Incumbent disposition is unchanged — no ledger entry needed for it.
283                Ok(())
284            }
285
286            AdjudicationVerdict::Unknown => {
287                // Unknown: abstain — no supersession.
288                // Transition both claims from QueuedForAdjudication → Contested.
289                // Write one abstain ledger entry per claim.
290                let rationale = serde_json::json!({
291                    "event": "oracle_abstain",
292                    "verdict": "Unknown",
293                });
294                let challenger_entry = LedgerEntry {
295                    entry_id: uuid::Uuid::new_v4(),
296                    agent_id: agent_id.clone(),
297                    claim_ref: challenger_ref.clone(),
298                    event_kind: LedgerEventKind::AdjudicationResolved,
299                    disposition: Disposition::Contested,
300                    rationale: Some(rationale.clone()),
301                    recorded_at: tx_time.clone(),
302                };
303                self.persistence
304                    .append_ledger_entry(txn, &challenger_entry)
305                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
306
307                let incumbent_entry = LedgerEntry {
308                    entry_id: uuid::Uuid::new_v4(),
309                    agent_id: agent_id.clone(),
310                    claim_ref: incumbent_ref.clone(),
311                    event_kind: LedgerEventKind::AdjudicationResolved,
312                    disposition: Disposition::Contested,
313                    rationale: Some(rationale),
314                    recorded_at: tx_time.clone(),
315                };
316                self.persistence
317                    .append_ledger_entry(txn, &incumbent_entry)
318                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
319                Ok(())
320            }
321            // AdjudicationVerdict is #[non_exhaustive] — future verdicts treated as Unknown (abstain).
322            _ => Ok(()),
323        }
324    }
325
326    /// Write a Bound ValidityAssertion + Superseded ledger entry for `target_ref`
327    /// inside the open transaction (reuses the supersession.rs pattern).
328    ///
329    /// `overturning_ref` — the claim that caused the bounding (for rationale).
330    /// `preloaded_edges` — DependsOn edges for `target_ref` (loaded before begin_atomic).
331    fn bound_claim(
332        &self,
333        agent_id: &AgentId,
334        target_ref: &mempill_types::ClaimRef,
335        overturning_ref: &mempill_types::ClaimRef,
336        tx_time: TransactionTime,
337        preloaded_edges: &[mempill_types::ClaimEdge],
338        txn: &mut P::Transaction,
339    ) -> Result<(), MemError> {
340        use mempill_types::{EdgeKind, ExternalKind, Confidence};
341
342        // Step A: Bound ValidityAssertion (closes the claim's valid window).
343        let assertion = ValidityAssertion {
344            assertion_ref: uuid::Uuid::new_v4(),
345            agent_id: agent_id.clone(),
346            target_claim: target_ref.clone(),
347            kind: AssertionKind::Bound { bound_at: tx_time.0 },
348            provenance: mempill_types::ProvenanceLabel::External(
349                ExternalKind::ExternalFirstHand,
350            ),
351            confidence: Confidence {
352                value_confidence: 1.0,
353                valid_time_confidence: 1.0,
354            },
355            asserted_at: tx_time.clone(),
356        };
357        self.persistence
358            .append_validity_assertion(txn, &assertion)
359            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
360
361        // Step B: Superseded ledger entry for the bounded claim.
362        let ledger_entry = LedgerEntry {
363            entry_id: uuid::Uuid::new_v4(),
364            agent_id: agent_id.clone(),
365            claim_ref: target_ref.clone(),
366            event_kind: LedgerEventKind::ValidityAsserted,
367            disposition: Disposition::Superseded,
368            rationale: Some(serde_json::json!({
369                "event": "oracle_supersession",
370                "overturning_claim": overturning_ref.0.to_string(),
371                "bound_at": tx_time.0.to_rfc3339(),
372            })),
373            recorded_at: tx_time.clone(),
374        };
375        self.persistence
376            .append_ledger_entry(txn, &ledger_entry)
377            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
378
379        // Step C: DependsOn cascade — flag direct dependents of the bounded claim.
380        // (Same A26 cascade pattern as supersession.rs — but we inline it here to
381        // avoid making supersession::execute pub. Uses the same deduplication logic.)
382        use std::collections::HashSet;
383        let mut seen: HashSet<mempill_types::ClaimRef> = HashSet::new();
384        for edge in preloaded_edges {
385            if edge.kind == EdgeKind::DependsOn && edge.to_claim == *target_ref {
386                if !seen.insert(edge.from_claim.clone()) {
387                    continue;
388                }
389                let flag_entry = LedgerEntry {
390                    entry_id: uuid::Uuid::new_v4(),
391                    agent_id: agent_id.clone(),
392                    claim_ref: edge.from_claim.clone(),
393                    event_kind: LedgerEventKind::DependentFlaggedPendingReview,
394                    disposition: Disposition::PendingReview,
395                    rationale: Some(serde_json::json!({
396                        "event": "depends_on_cascade",
397                        "superseded_parent": target_ref.0.to_string(),
398                        "overturning_claim": overturning_ref.0.to_string(),
399                    })),
400                    recorded_at: tx_time.clone(),
401                };
402                self.persistence
403                    .append_ledger_entry(txn, &flag_entry)
404                    .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
405            }
406        }
407        Ok(())
408    }
409}
410
411/// Extract the latest `Disposition` for a given `ClaimRef` from a ledger slice.
412/// Returns `None` if the claim has no ledger entries.
413fn latest_disposition_from_ledger(
414    ledger: &[LedgerEntry],
415    target: &mempill_types::ClaimRef,
416) -> Option<Disposition> {
417    ledger
418        .iter()
419        .filter(|e| &e.claim_ref == target)
420        .max_by_key(|e| e.recorded_at.0)
421        .map(|e| e.disposition.clone())
422}
423
424// ── Tests ─────────────────────────────────────────────────────────────────────
425
426#[cfg(test)]
427mod tests {
428    use super::*;
429    use crate::engine_handle::{ErasedPendingStore, ErasedPendingStoreAdapter};
430    use crate::ports::{
431        PendingAdjudicationPort, PendingAdjudicationRow, PersistencePort, Txn as TxnTrait,
432    };
433    use mempill_types::{
434        AgentId, AdjudicationRequest, AdjudicationResponse, AdjudicationVerdict,
435        Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality, CurrencySignal,
436        CurrencyState, Disposition, ExternalAnchor, ExternalKind, Fact, LedgerEntry,
437        ProvenanceLabel, TransactionTime, ValidTime, ValidityAssertion,
438    };
439    use std::sync::Mutex;
440
441    // ── Mock Txn ──────────────────────────────────────────────────────────────
442
    /// Minimal transaction stand-in: it carries only the agent it was opened for.
    struct MockTxn(AgentId);
    impl TxnTrait for MockTxn {
        /// The agent id this transaction was created with.
        fn agent_id(&self) -> &AgentId { &self.0 }
    }
447
448    // ── Mock error ────────────────────────────────────────────────────────────
449
    /// Unit error type used as the `Error` of both mock ports; it carries no detail.
    #[derive(Debug, thiserror::Error)]
    #[error("mock error")]
    struct MockErr;
453
454    // ── Mock persistence (in-memory, append-tracking) ────────────────────────
455
    /// In-memory persistence double. Writes are pushed straight into these vectors
    /// (there is no transactional buffering), so tests can inspect what was appended.
    #[derive(Default)]
    struct MockStore {
        /// Every claim passed to `append_claim`.
        claims: Mutex<Vec<Claim>>,
        /// Every ledger entry that was appended successfully, plus any seeded by tests.
        ledger: Mutex<Vec<LedgerEntry>>,
        /// Every assertion passed to `append_validity_assertion`.
        validity_assertions: Mutex<Vec<ValidityAssertion>>,
        /// When set, causes `append_ledger_entry` to fail on the Nth call (1-indexed).
        fail_on_ledger_write: Mutex<Option<usize>>,
        /// Number of `append_ledger_entry` calls so far, including a failed one.
        ledger_write_count: Mutex<usize>,
        /// Set to `true` once `rollback` has been called.
        rollback_called: Mutex<bool>,
    }
466
467    impl PersistencePort for MockStore {
468        type Transaction = MockTxn;
469        type Error = MockErr;
470
471        fn begin_atomic(&self, agent_id: &AgentId) -> Result<MockTxn, MockErr> {
472            Ok(MockTxn(agent_id.clone()))
473        }
474
475        fn append_claim(&self, _: &mut MockTxn, claim: &Claim) -> Result<ClaimRef, MockErr> {
476            self.claims.lock().unwrap().push(claim.clone());
477            Ok(claim.claim_ref().clone())
478        }
479
480        fn append_validity_assertion(
481            &self,
482            _: &mut MockTxn,
483            a: &ValidityAssertion,
484        ) -> Result<(), MockErr> {
485            self.validity_assertions.lock().unwrap().push(a.clone());
486            Ok(())
487        }
488
489        fn append_ledger_entry(
490            &self,
491            _: &mut MockTxn,
492            e: &LedgerEntry,
493        ) -> Result<(), MockErr> {
494            let mut count = self.ledger_write_count.lock().unwrap();
495            *count += 1;
496            let fail_on = *self.fail_on_ledger_write.lock().unwrap();
497            if fail_on == Some(*count) {
498                return Err(MockErr);
499            }
500            self.ledger.lock().unwrap().push(e.clone());
501            Ok(())
502        }
503
504        fn append_claim_edge(&self, _: &mut MockTxn, _: &ClaimEdge) -> Result<(), MockErr> {
505            Ok(())
506        }
507
508        fn commit(&self, _: MockTxn) -> Result<(), MockErr> { Ok(()) }
509
510        fn rollback(&self, _: MockTxn) -> Result<(), MockErr> {
511            *self.rollback_called.lock().unwrap() = true;
512            Ok(())
513        }
514
515        fn load_subject_line(&self, _: &AgentId, _: &str, _: &str) -> Result<Vec<Claim>, MockErr> {
516            Ok(self.claims.lock().unwrap().clone())
517        }
518
519        fn load_claim(&self, _: &AgentId, r: &ClaimRef) -> Result<Option<Claim>, MockErr> {
520            Ok(self.claims.lock().unwrap().iter().find(|c| c.claim_ref() == r).cloned())
521        }
522
523        fn load_validity_assertions_for(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ValidityAssertion>, MockErr> {
524            Ok(vec![])
525        }
526
527        fn load_ledger(&self, _: &AgentId, _: Option<&TransactionTime>, _: usize) -> Result<Vec<LedgerEntry>, MockErr> {
528            Ok(self.ledger.lock().unwrap().clone())
529        }
530
531        fn load_ledger_for_claims(&self, _: &AgentId, _refs: &[ClaimRef]) -> Result<Vec<LedgerEntry>, MockErr> {
532            Ok(vec![])
533        }
534
535        fn load_edges_for(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
536            Ok(vec![])
537        }
538
539        fn load_injected_claims(&self, _: &AgentId) -> Result<Vec<ClaimRef>, MockErr> { Ok(vec![]) }
540
541        fn load_lineage(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
542    }
543
544    // ── Mock PendingStore ─────────────────────────────────────────────────────
545
    /// In-memory pending-adjudication store: a flat list of rows whose `status` string
    /// is rewritten in place by `mark_resolved` / `mark_expired`.
    #[derive(Default)]
    struct MockPendingStore {
        /// All rows ever seeded or inserted, in insertion order.
        rows: Mutex<Vec<PendingAdjudicationRow>>,
    }
550
551    impl MockPendingStore {
552        fn seed(&self, row: PendingAdjudicationRow) {
553            self.rows.lock().unwrap().push(row);
554        }
555
556        fn is_resolved(&self, handle_id: uuid::Uuid) -> bool {
557            self.rows.lock().unwrap().iter()
558                .any(|r| r.handle_id == handle_id && r.status == "resolved")
559        }
560    }
561
562    impl PendingAdjudicationPort for MockPendingStore {
563        type Error = MockErr;
564
565        fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), MockErr> {
566            self.rows.lock().unwrap().push(row.clone());
567            Ok(())
568        }
569
570        fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, MockErr> {
571            Ok(self.rows.lock().unwrap().iter().find(|r| r.handle_id == handle_id).cloned())
572        }
573
574        fn list_pending(&self, agent_id: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, MockErr> {
575            Ok(self.rows.lock().unwrap().iter()
576                .filter(|r| agent_id.is_none_or(|a| r.agent_id == *a) && r.status == "pending")
577                .cloned()
578                .collect())
579        }
580
581        fn list_expired(&self, now: chrono::DateTime<Utc>) -> Result<Vec<PendingAdjudicationRow>, MockErr> {
582            Ok(self.rows.lock().unwrap().iter()
583                .filter(|r| r.status == "pending" && r.expires_at.is_some_and(|e| e <= now))
584                .cloned()
585                .collect())
586        }
587
588        fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), MockErr> {
589            for r in self.rows.lock().unwrap().iter_mut() {
590                if r.handle_id == handle_id {
591                    r.status = "resolved".to_string();
592                }
593            }
594            Ok(())
595        }
596
597        fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), MockErr> {
598            for r in self.rows.lock().unwrap().iter_mut() {
599                if r.handle_id == handle_id {
600                    r.status = "expired".to_string();
601                }
602            }
603            Ok(())
604        }
605
606        fn list_queued_orphan_claims(&self) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, MockErr> {
607            Ok(vec![])
608        }
609    }
610
611    // ── Test helpers ──────────────────────────────────────────────────────────
612
613    fn make_agent() -> AgentId { AgentId("test-agent".into()) }
614
615    fn make_claim(agent: &AgentId) -> Claim {
616        Claim::new(
617            ClaimRef::new_random(),
618            agent.clone(),
619            Fact { subject: "user".into(), predicate: "city".into(), value: serde_json::json!("Berlin") },
620            Cardinality::Functional,
621            ProvenanceLabel::External(ExternalKind::UserAsserted),
622            ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
623            TransactionTime(Utc::now()),
624            ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
625            Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
626            Criticality::Medium,
627            vec![],
628            None,
629            None,
630        )
631    }
632
633    fn make_dummy_adj_request(agent: &AgentId) -> AdjudicationRequest {
634        AdjudicationRequest {
635            subject_line: mempill_types::SubjectLineRef {
636                agent_id: agent.clone(),
637                subject: "user".into(),
638                predicate: "city".into(),
639            },
640            incumbent: mempill_types::Belief {
641                claim_ref: ClaimRef::new_random(),
642                fact: Fact { subject: "user".into(), predicate: "city".into(), value: serde_json::json!("Berlin") },
643                provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
644                valid_time: ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
645                transaction_time: TransactionTime(Utc::now()),
646                confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
647                currency_signal: CurrencySignal {
648                    last_refreshed_at: TransactionTime(Utc::now()),
649                    state: CurrencyState::Fresh,
650                    corroboration_count: 0,
651                },
652                criticality: Criticality::Medium,
653            },
654            challenger: make_claim(agent),
655            criticality: Criticality::Medium,
656            reason: mempill_types::OverturnReason::ExternalContradiction,
657        }
658    }
659
660    /// Build a pending row with both claims at QueuedForAdjudication in the ledger.
661    fn setup_queued_scenario(
662        store: &MockStore,
663        pending: &MockPendingStore,
664        handle_id: uuid::Uuid,
665    ) -> (ClaimRef, ClaimRef) {
666        let agent = make_agent();
667        let challenger = make_claim(&agent);
668        let incumbent = make_claim(&agent);
669        let now = Utc::now();
670
671        // The challenger is QueuedForAdjudication; the incumbent is CommittedCheap.
672        // (Only the challenger is gated to QueuedForAdjudication by the engine.)
673        store.ledger.lock().unwrap().push(LedgerEntry {
674            entry_id: uuid::Uuid::new_v4(),
675            agent_id: agent.clone(),
676            claim_ref: challenger.claim_ref().clone(),
677            event_kind: LedgerEventKind::ClaimCommitted,
678            disposition: Disposition::QueuedForAdjudication,
679            rationale: None,
680            recorded_at: TransactionTime(now - chrono::Duration::seconds(5)),
681        });
682        store.ledger.lock().unwrap().push(LedgerEntry {
683            entry_id: uuid::Uuid::new_v4(),
684            agent_id: agent.clone(),
685            claim_ref: incumbent.claim_ref().clone(),
686            event_kind: LedgerEventKind::ClaimCommitted,
687            disposition: Disposition::CommittedCheap, // incumbent stays committed
688            rationale: None,
689            recorded_at: TransactionTime(now - chrono::Duration::seconds(10)),
690        });
691
692        // Seed the pending row.
693        pending.seed(PendingAdjudicationRow {
694            handle_id,
695            agent_id: agent.clone(),
696            subject: "user".into(),
697            predicate: "city".into(),
698            challenger_claim_ref: challenger.claim_ref().clone(),
699            incumbent_claim_ref: incumbent.claim_ref().clone(),
700            request_payload: make_dummy_adj_request(&agent),
701            queued_at: now - chrono::Duration::seconds(10),
702            expires_at: None,
703            status: "pending".to_string(),
704        });
705
706        (challenger.claim_ref().clone(), incumbent.claim_ref().clone())
707    }
708
709    fn build_use_case(
710        store: Arc<MockStore>,
711        pending: Arc<MockPendingStore>,
712    ) -> SubmitAdjudicationUseCase<MockStore> {
713        let erased: Arc<dyn ErasedPendingStore> =
714            Arc::new(ErasedPendingStoreAdapter::new({
715                struct Delegate(Arc<MockPendingStore>);
716                impl PendingAdjudicationPort for Delegate {
717                    type Error = MockErr;
718                    fn insert_pending(&self, r: &PendingAdjudicationRow) -> Result<(), MockErr> { self.0.insert_pending(r) }
719                    fn get_pending(&self, h: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, MockErr> { self.0.get_pending(h) }
720                    fn list_pending(&self, a: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, MockErr> { self.0.list_pending(a) }
721                    fn list_expired(&self, n: chrono::DateTime<Utc>) -> Result<Vec<PendingAdjudicationRow>, MockErr> { self.0.list_expired(n) }
722                    fn mark_resolved(&self, h: uuid::Uuid) -> Result<(), MockErr> { self.0.mark_resolved(h) }
723                    fn mark_expired(&self, h: uuid::Uuid) -> Result<(), MockErr> { self.0.mark_expired(h) }
724                    fn list_queued_orphan_claims(&self) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, MockErr> { self.0.list_queued_orphan_claims() }
725                }
726                Delegate(Arc::clone(&pending))
727            }));
728        SubmitAdjudicationUseCase::new(store, erased)
729    }
730
731    // ── Test: unknown handle → AdjudicationHandleNotFound ────────────────────
732
733    #[test]
734    fn unknown_handle_returns_handle_not_found() {
735        let store = Arc::new(MockStore::default());
736        let pending = Arc::new(MockPendingStore::default());
737        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
738
739        let response = AdjudicationResponse {
740            handle_id: uuid::Uuid::new_v4(),
741            verdict: AdjudicationVerdict::Affirm,
742            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
743        };
744        let result = uc.execute(response.handle_id, response, Utc::now());
745        assert!(matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })));
746    }
747
748    // ── Test: Affirm — challenger CommittedCheap, incumbent Superseded, 2 ledger entries ──
749
750    #[test]
751    fn affirm_challenger_committed_cheap_incumbent_superseded_two_ledger_entries() {
752        let store = Arc::new(MockStore::default());
753        let pending = Arc::new(MockPendingStore::default());
754        let handle_id = uuid::Uuid::new_v4();
755        let (challenger_ref, incumbent_ref) =
756            setup_queued_scenario(&store, &pending, handle_id);
757
758        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
759        let response = AdjudicationResponse {
760            handle_id,
761            verdict: AdjudicationVerdict::Affirm,
762            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
763        };
764        let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
765
766        assert_eq!(outcome.handle_id, handle_id);
767        assert_eq!(outcome.disposition, Disposition::CommittedCheap);
768        assert_eq!(outcome.claim_ref, challenger_ref);
769
770        let ledger = store.ledger.lock().unwrap();
771        // Filter only the entries written by this use-case (AdjudicationResolved + ValidityAsserted).
772        let resolution_entries: Vec<_> = ledger.iter()
773            .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
774                || e.event_kind == LedgerEventKind::ValidityAsserted)
775            .collect();
776        assert_eq!(resolution_entries.len(), 2, "Affirm must write exactly 2 ledger entries");
777
778        // Challenger entry: AdjudicationResolved + CommittedCheap.
779        let challenger_entry = resolution_entries.iter()
780            .find(|e| e.claim_ref == challenger_ref && e.event_kind == LedgerEventKind::AdjudicationResolved)
781            .expect("challenger AdjudicationResolved entry must exist");
782        assert_eq!(challenger_entry.disposition, Disposition::CommittedCheap);
783
784        // Incumbent entry: ValidityAsserted + Superseded.
785        let incumbent_entry = resolution_entries.iter()
786            .find(|e| e.claim_ref == incumbent_ref)
787            .expect("incumbent ValidityAsserted entry must exist");
788        assert_eq!(incumbent_entry.disposition, Disposition::Superseded);
789
790        // Validity assertion (Bound) written for incumbent.
791        let assertions = store.validity_assertions.lock().unwrap();
792        assert_eq!(assertions.len(), 1, "one Bound assertion for incumbent");
793        assert_eq!(assertions[0].target_claim, incumbent_ref);
794
795        // Pending row marked resolved.
796        assert!(pending.is_resolved(handle_id), "pending row must be resolved");
797    }
798
799    // ── Test: Affirm — External provenance stamped on challenger ledger entry ──
800
801    #[test]
802    fn affirm_challenger_entry_has_external_provenance_in_rationale() {
803        let store = Arc::new(MockStore::default());
804        let pending = Arc::new(MockPendingStore::default());
805        let handle_id = uuid::Uuid::new_v4();
806        setup_queued_scenario(&store, &pending, handle_id);
807
808        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
809        let evidence = ProvenanceLabel::External(ExternalKind::ExternalFirstHand);
810        let response = AdjudicationResponse {
811            handle_id,
812            verdict: AdjudicationVerdict::Affirm,
813            evidence_provenance: evidence.clone(),
814        };
815        uc.execute(handle_id, response, Utc::now()).unwrap();
816
817        let ledger = store.ledger.lock().unwrap();
818        let affirm_entry = ledger.iter()
819            .find(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
820                && e.disposition == Disposition::CommittedCheap)
821            .expect("affirm ledger entry must exist");
822        let rationale = affirm_entry.rationale.as_ref().expect("rationale must be present");
823        let rationale_str = rationale.to_string();
824        assert!(rationale_str.contains("Affirm"), "rationale must mention Affirm verdict");
825        assert!(rationale_str.contains("ExternalFirstHand"), "rationale must include evidence provenance");
826    }
827
828    // ── Test: Deny — challenger Superseded, 1 ledger entry ───────────────────
829
830    #[test]
831    fn deny_challenger_superseded_one_ledger_entry() {
832        let store = Arc::new(MockStore::default());
833        let pending = Arc::new(MockPendingStore::default());
834        let handle_id = uuid::Uuid::new_v4();
835        let (challenger_ref, incumbent_ref) =
836            setup_queued_scenario(&store, &pending, handle_id);
837
838        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
839        let response = AdjudicationResponse {
840            handle_id,
841            verdict: AdjudicationVerdict::Deny,
842            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
843        };
844        let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
845
846        assert_eq!(outcome.disposition, Disposition::Superseded);
847        assert_eq!(outcome.claim_ref, challenger_ref);
848
849        let ledger = store.ledger.lock().unwrap();
850        let resolution_entries: Vec<_> = ledger.iter()
851            .filter(|e| e.event_kind == LedgerEventKind::ValidityAsserted)
852            .collect();
853        assert_eq!(resolution_entries.len(), 1, "Deny must write exactly 1 ValidityAsserted entry");
854        assert_eq!(resolution_entries[0].claim_ref, challenger_ref);
855        assert_eq!(resolution_entries[0].disposition, Disposition::Superseded);
856
857        // Validity assertion (Bound) written for challenger.
858        let assertions = store.validity_assertions.lock().unwrap();
859        assert_eq!(assertions.len(), 1, "one Bound assertion for challenger");
860        assert_eq!(assertions[0].target_claim, challenger_ref);
861
862        // Incumbent has NO new entry — it remains committed.
863        let incumbent_resolution = ledger.iter()
864            .filter(|e| e.claim_ref == incumbent_ref
865                && (e.event_kind == LedgerEventKind::AdjudicationResolved
866                    || e.event_kind == LedgerEventKind::ValidityAsserted))
867            .count();
868        assert_eq!(incumbent_resolution, 0, "Deny must not touch the incumbent");
869
870        assert!(pending.is_resolved(handle_id));
871    }
872
873    // ── Test: Unknown — both Contested, 2 ledger entries, no Bound assertion ──
874
875    #[test]
876    fn unknown_both_contested_two_ledger_entries_no_bound_assertion() {
877        let store = Arc::new(MockStore::default());
878        let pending = Arc::new(MockPendingStore::default());
879        let handle_id = uuid::Uuid::new_v4();
880        let (challenger_ref, incumbent_ref) =
881            setup_queued_scenario(&store, &pending, handle_id);
882
883        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
884        let response = AdjudicationResponse {
885            handle_id,
886            verdict: AdjudicationVerdict::Unknown,
887            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
888        };
889        let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
890
891        assert_eq!(outcome.disposition, Disposition::Contested);
892        assert_eq!(outcome.claim_ref, challenger_ref);
893
894        let ledger = store.ledger.lock().unwrap();
895        let abstain_entries: Vec<_> = ledger.iter()
896            .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved)
897            .collect();
898        assert_eq!(abstain_entries.len(), 2, "Unknown must write 2 AdjudicationResolved entries (one per claim)");
899
900        let ch_entry = abstain_entries.iter().find(|e| e.claim_ref == challenger_ref).unwrap();
901        let inc_entry = abstain_entries.iter().find(|e| e.claim_ref == incumbent_ref).unwrap();
902        assert_eq!(ch_entry.disposition, Disposition::Contested);
903        assert_eq!(inc_entry.disposition, Disposition::Contested);
904
905        // No Bound assertions for Unknown.
906        let assertions = store.validity_assertions.lock().unwrap();
907        assert_eq!(assertions.len(), 0, "Unknown must not write any Bound assertions");
908
909        assert!(pending.is_resolved(handle_id));
910    }
911
912    // ── Test: duplicate submit → AdjudicationHandleNotFound ──────────────────
913
914    #[test]
915    fn duplicate_submit_returns_handle_not_found() {
916        let store = Arc::new(MockStore::default());
917        let pending = Arc::new(MockPendingStore::default());
918        let handle_id = uuid::Uuid::new_v4();
919        setup_queued_scenario(&store, &pending, handle_id);
920
921        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
922        let mk_response = || AdjudicationResponse {
923            handle_id,
924            verdict: AdjudicationVerdict::Deny,
925            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
926        };
927
928        // First submit succeeds.
929        uc.execute(handle_id, mk_response(), Utc::now()).unwrap();
930
931        // After first submit the state guard finds claims are no longer QueuedForAdjudication,
932        // so second submit returns AdjudicationHandleNotFound.
933        let result = uc.execute(handle_id, mk_response(), Utc::now());
934        assert!(
935            matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
936            "duplicate submit must return AdjudicationHandleNotFound"
937        );
938    }
939
940    // ── Test: stale (challenger no longer QueuedForAdjudication) → HandleNotFound ─
941    //
942    // The state guard checks only the challenger's disposition. If the challenger has
943    // already been resolved (CommittedCheap / Superseded / Contested), it's stale.
944    // The incumbent's disposition is NOT checked here — it was never QueuedForAdjudication.
945
946    #[test]
947    fn stale_challenger_not_queued_returns_handle_not_found() {
948        let store = Arc::new(MockStore::default());
949        let pending = Arc::new(MockPendingStore::default());
950        let handle_id = uuid::Uuid::new_v4();
951        let agent = make_agent();
952        let challenger = make_claim(&agent);
953        let incumbent = make_claim(&agent);
954        let now = Utc::now();
955
956        // Seed challenger ledger with CommittedCheap (already resolved — stale).
957        store.ledger.lock().unwrap().push(LedgerEntry {
958            entry_id: uuid::Uuid::new_v4(),
959            agent_id: agent.clone(),
960            claim_ref: challenger.claim_ref().clone(),
961            event_kind: LedgerEventKind::ClaimCommitted,
962            disposition: Disposition::CommittedCheap, // NOT queued — already resolved
963            rationale: None,
964            recorded_at: TransactionTime(now),
965        });
966        // Incumbent stays CommittedCheap (normal state before adjudication).
967        store.ledger.lock().unwrap().push(LedgerEntry {
968            entry_id: uuid::Uuid::new_v4(),
969            agent_id: agent.clone(),
970            claim_ref: incumbent.claim_ref().clone(),
971            event_kind: LedgerEventKind::ClaimCommitted,
972            disposition: Disposition::CommittedCheap,
973            rationale: None,
974            recorded_at: TransactionTime(now),
975        });
976
977        pending.seed(PendingAdjudicationRow {
978            handle_id,
979            agent_id: agent.clone(),
980            subject: "user".into(),
981            predicate: "city".into(),
982            challenger_claim_ref: challenger.claim_ref().clone(),
983            incumbent_claim_ref: incumbent.claim_ref().clone(),
984            request_payload: make_dummy_adj_request(&agent),
985            queued_at: now,
986            expires_at: None,
987            status: "pending".to_string(),
988        });
989
990        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
991        let response = AdjudicationResponse {
992            handle_id,
993            verdict: AdjudicationVerdict::Affirm,
994            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
995        };
996        let result = uc.execute(handle_id, response, Utc::now());
997        assert!(
998            matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
999            "stale challenger (not QueuedForAdjudication) must return AdjudicationHandleNotFound"
1000        );
1001    }
1002
1003    // ── Test: expired handle → AdjudicationHandleNotFound ────────────────────
1004
1005    #[test]
1006    fn expired_handle_returns_handle_not_found() {
1007        let store = Arc::new(MockStore::default());
1008        let pending = Arc::new(MockPendingStore::default());
1009        let handle_id = uuid::Uuid::new_v4();
1010        let agent = make_agent();
1011        let challenger = make_claim(&agent);
1012        let incumbent = make_claim(&agent);
1013        let past = Utc::now() - chrono::Duration::hours(2);
1014
1015        // Seed with expires_at in the past.
1016        pending.seed(PendingAdjudicationRow {
1017            handle_id,
1018            agent_id: agent.clone(),
1019            subject: "user".into(),
1020            predicate: "city".into(),
1021            challenger_claim_ref: challenger.claim_ref().clone(),
1022            incumbent_claim_ref: incumbent.claim_ref().clone(),
1023            request_payload: make_dummy_adj_request(&agent),
1024            queued_at: past - chrono::Duration::hours(1),
1025            expires_at: Some(past), // expired
1026            status: "pending".to_string(),
1027        });
1028
1029        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
1030        let response = AdjudicationResponse {
1031            handle_id,
1032            verdict: AdjudicationVerdict::Affirm,
1033            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
1034        };
1035        let result = uc.execute(handle_id, response, Utc::now());
1036        assert!(
1037            matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
1038            "expired handle must return AdjudicationHandleNotFound"
1039        );
1040    }
1041
1042    // ── Test: atomicity — failure mid-apply → no partial state ───────────────
1043
1044    #[test]
1045    fn atomicity_failure_mid_apply_no_partial_state() {
1046        let store = Arc::new(MockStore::default());
1047        let pending = Arc::new(MockPendingStore::default());
1048        let handle_id = uuid::Uuid::new_v4();
1049        setup_queued_scenario(&store, &pending, handle_id);
1050
1051        // Make the 2nd ledger write fail (after the Bound assertion but during the first ledger entry).
1052        *store.fail_on_ledger_write.lock().unwrap() = Some(1);
1053
1054        let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
1055        let response = AdjudicationResponse {
1056            handle_id,
1057            verdict: AdjudicationVerdict::Affirm,
1058            evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
1059        };
1060        let result = uc.execute(handle_id, response, Utc::now());
1061        assert!(result.is_err(), "must propagate the injected failure");
1062
1063        // After failure + rollback: only the seeded QueuedForAdjudication entries remain.
1064        let ledger = store.ledger.lock().unwrap();
1065        let resolution_entries: Vec<_> = ledger.iter()
1066            .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
1067                || e.event_kind == LedgerEventKind::ValidityAsserted)
1068            .collect();
1069        assert_eq!(
1070            resolution_entries.len(), 0,
1071            "no resolution ledger entries must remain after mid-apply failure"
1072        );
1073        assert!(*store.rollback_called.lock().unwrap(), "rollback must be called");
1074    }
1075}