1#![allow(missing_docs)]
2use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use mempill_types::{
32 AgentId, AdjudicationOutcome, AdjudicationVerdict, AssertionKind, Disposition,
33 LedgerEntry, LedgerEventKind, TransactionTime, ValidityAssertion,
34};
35
36use crate::{
37 engine_handle::ErasedPendingStore,
38 error::MemError,
39 ports::PersistencePort,
40};
41
42pub struct SubmitAdjudicationUseCase<P>
46where
47 P: PersistencePort + Send + Sync + 'static,
48{
49 persistence: Arc<P>,
50 pending_store: Arc<dyn ErasedPendingStore>,
51}
52
53impl<P> SubmitAdjudicationUseCase<P>
54where
55 P: PersistencePort + Send + Sync + 'static,
56{
57 pub fn new(persistence: Arc<P>, pending_store: Arc<dyn ErasedPendingStore>) -> Self {
58 Self { persistence, pending_store }
59 }
60
61 pub fn execute(
65 &self,
66 handle_id: uuid::Uuid,
67 response: mempill_types::AdjudicationResponse,
68 now: DateTime<Utc>,
69 ) -> Result<AdjudicationOutcome, MemError> {
70 let tx_time = TransactionTime(now);
71
72 let row = self.pending_store
74 .get_pending_erased(handle_id)
75 .map_err(|e| MemError::PendingStore { source: e })?
76 .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
77
78 if let Some(expires_at) = row.expires_at {
82 if expires_at <= now {
83 let agent_id_exp: AgentId = row.agent_id.clone();
85 let challenger_ref_exp = row.challenger_claim_ref.clone();
86 let handle_id_exp = handle_id;
87
88 let ledger_check = self.persistence
90 .load_ledger(&agent_id_exp, None, 10_000)
91 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
92 let challenger_disp = latest_disposition_from_ledger(&ledger_check, &challenger_ref_exp);
93
94 if challenger_disp == Some(Disposition::QueuedForAdjudication) {
95 let mut txn = self.persistence
97 .begin_atomic(&agent_id_exp)
98 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
99
100 let expired_entry = mempill_types::LedgerEntry {
101 entry_id: uuid::Uuid::new_v4(),
102 agent_id: agent_id_exp.clone(),
103 claim_ref: challenger_ref_exp.clone(),
104 event_kind: LedgerEventKind::AdjudicationExpired,
105 disposition: Disposition::Contested,
106 rationale: Some(serde_json::json!({
107 "event": "adjudication_ttl_expired_lazy",
108 "handle_id": handle_id_exp.to_string(),
109 "expired_at": expires_at.to_rfc3339(),
110 "incumbent_claim_ref": row.incumbent_claim_ref.0.to_string(),
111 })),
112 recorded_at: tx_time.clone(),
113 };
114
115 match self.persistence.append_ledger_entry(&mut txn, &expired_entry) {
116 Ok(()) => {
117 if let Err(e) = self.persistence.commit(txn) {
118 return Err(MemError::Persistence { source: Box::new(e) });
119 }
120 let _ = self.pending_store.mark_expired_erased(handle_id_exp);
122 }
123 Err(e) => {
124 let _ = self.persistence.rollback(txn);
125 return Err(MemError::Persistence { source: Box::new(e) });
126 }
127 }
128 }
129
130 return Err(MemError::AdjudicationHandleNotFound { handle_id });
131 }
132 }
133
134 let agent_id: AgentId = row.agent_id.clone();
135 let challenger_ref = row.challenger_claim_ref.clone();
136 let incumbent_ref = row.incumbent_claim_ref.clone();
137
138 let ledger = self.persistence
142 .load_ledger(&agent_id, None, 10_000)
143 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
144
145 let challenger_disp = latest_disposition_from_ledger(&ledger, &challenger_ref);
146
147 if challenger_disp != Some(Disposition::QueuedForAdjudication) {
153 return Err(MemError::AdjudicationHandleNotFound { handle_id });
154 }
155
156 let incumbent_edges = self.persistence
160 .load_edges_for(&agent_id, &incumbent_ref)
161 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
162 let challenger_edges = self.persistence
163 .load_edges_for(&agent_id, &challenger_ref)
164 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
165
166 let mut txn = self.persistence
168 .begin_atomic(&agent_id)
169 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
170
171 let result = self.apply_verdict_within_txn(
172 &response.verdict,
173 &response.evidence_provenance,
174 &agent_id,
175 &challenger_ref,
176 &incumbent_ref,
177 tx_time.clone(),
178 &incumbent_edges,
179 &challenger_edges,
180 &mut txn,
181 );
182
183 match result {
184 Ok(()) => {
185 self.persistence
186 .commit(txn)
187 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
188
189 self.pending_store
191 .mark_resolved_erased(handle_id)
192 .map_err(|e| MemError::PendingStore { source: e })?;
193
194 let (outcome_claim_ref, final_disposition) = match &response.verdict {
196 AdjudicationVerdict::Affirm => {
197 (challenger_ref, Disposition::CommittedCheap)
198 }
199 AdjudicationVerdict::Deny => {
200 (challenger_ref, Disposition::Superseded)
201 }
202 AdjudicationVerdict::Unknown => {
203 (challenger_ref, Disposition::Contested)
205 }
206 _ => (challenger_ref, Disposition::Contested),
208 };
209
210 Ok(AdjudicationOutcome {
211 handle_id,
212 disposition: final_disposition,
213 claim_ref: outcome_claim_ref,
214 })
215 }
216 Err(e) => {
217 let _ = self.persistence.rollback(txn);
218 Err(e)
219 }
220 }
221 }
222
223 #[allow(clippy::too_many_arguments)]
227 fn apply_verdict_within_txn(
228 &self,
229 verdict: &AdjudicationVerdict,
230 evidence_provenance: &mempill_types::ProvenanceLabel,
231 agent_id: &AgentId,
232 challenger_ref: &mempill_types::ClaimRef,
233 incumbent_ref: &mempill_types::ClaimRef,
234 tx_time: TransactionTime,
235 incumbent_edges: &[mempill_types::ClaimEdge],
236 challenger_edges: &[mempill_types::ClaimEdge],
237 txn: &mut P::Transaction,
238 ) -> Result<(), MemError> {
239 match verdict {
240 AdjudicationVerdict::Affirm => {
241 self.bound_claim(
244 agent_id,
245 incumbent_ref,
246 challenger_ref,
247 tx_time.clone(),
248 incumbent_edges,
249 txn,
250 )?;
251 let affirm_entry = LedgerEntry {
253 entry_id: uuid::Uuid::new_v4(),
254 agent_id: agent_id.clone(),
255 claim_ref: challenger_ref.clone(),
256 event_kind: LedgerEventKind::AdjudicationResolved,
257 disposition: Disposition::CommittedCheap,
258 rationale: Some(serde_json::json!({
259 "event": "oracle_affirm",
260 "verdict": "Affirm",
261 "evidence_provenance": serde_json::to_value(evidence_provenance).ok(),
262 })),
263 recorded_at: tx_time.clone(),
264 };
265 self.persistence
266 .append_ledger_entry(txn, &affirm_entry)
267 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
268 Ok(())
269 }
270
271 AdjudicationVerdict::Deny => {
272 self.bound_claim(
275 agent_id,
276 challenger_ref,
277 incumbent_ref,
278 tx_time.clone(),
279 challenger_edges,
280 txn,
281 )?;
282 Ok(())
284 }
285
286 AdjudicationVerdict::Unknown => {
287 let rationale = serde_json::json!({
291 "event": "oracle_abstain",
292 "verdict": "Unknown",
293 });
294 let challenger_entry = LedgerEntry {
295 entry_id: uuid::Uuid::new_v4(),
296 agent_id: agent_id.clone(),
297 claim_ref: challenger_ref.clone(),
298 event_kind: LedgerEventKind::AdjudicationResolved,
299 disposition: Disposition::Contested,
300 rationale: Some(rationale.clone()),
301 recorded_at: tx_time.clone(),
302 };
303 self.persistence
304 .append_ledger_entry(txn, &challenger_entry)
305 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
306
307 let incumbent_entry = LedgerEntry {
308 entry_id: uuid::Uuid::new_v4(),
309 agent_id: agent_id.clone(),
310 claim_ref: incumbent_ref.clone(),
311 event_kind: LedgerEventKind::AdjudicationResolved,
312 disposition: Disposition::Contested,
313 rationale: Some(rationale),
314 recorded_at: tx_time.clone(),
315 };
316 self.persistence
317 .append_ledger_entry(txn, &incumbent_entry)
318 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
319 Ok(())
320 }
321 _ => Ok(()),
323 }
324 }
325
326 fn bound_claim(
332 &self,
333 agent_id: &AgentId,
334 target_ref: &mempill_types::ClaimRef,
335 overturning_ref: &mempill_types::ClaimRef,
336 tx_time: TransactionTime,
337 preloaded_edges: &[mempill_types::ClaimEdge],
338 txn: &mut P::Transaction,
339 ) -> Result<(), MemError> {
340 use mempill_types::{EdgeKind, ExternalKind, Confidence};
341
342 let assertion = ValidityAssertion {
344 assertion_ref: uuid::Uuid::new_v4(),
345 agent_id: agent_id.clone(),
346 target_claim: target_ref.clone(),
347 kind: AssertionKind::Bound { bound_at: tx_time.0 },
348 provenance: mempill_types::ProvenanceLabel::External(
349 ExternalKind::ExternalFirstHand,
350 ),
351 confidence: Confidence {
352 value_confidence: 1.0,
353 valid_time_confidence: 1.0,
354 },
355 asserted_at: tx_time.clone(),
356 };
357 self.persistence
358 .append_validity_assertion(txn, &assertion)
359 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
360
361 let ledger_entry = LedgerEntry {
363 entry_id: uuid::Uuid::new_v4(),
364 agent_id: agent_id.clone(),
365 claim_ref: target_ref.clone(),
366 event_kind: LedgerEventKind::ValidityAsserted,
367 disposition: Disposition::Superseded,
368 rationale: Some(serde_json::json!({
369 "event": "oracle_supersession",
370 "overturning_claim": overturning_ref.0.to_string(),
371 "bound_at": tx_time.0.to_rfc3339(),
372 })),
373 recorded_at: tx_time.clone(),
374 };
375 self.persistence
376 .append_ledger_entry(txn, &ledger_entry)
377 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
378
379 use std::collections::HashSet;
383 let mut seen: HashSet<mempill_types::ClaimRef> = HashSet::new();
384 for edge in preloaded_edges {
385 if edge.kind == EdgeKind::DependsOn && edge.to_claim == *target_ref {
386 if !seen.insert(edge.from_claim.clone()) {
387 continue;
388 }
389 let flag_entry = LedgerEntry {
390 entry_id: uuid::Uuid::new_v4(),
391 agent_id: agent_id.clone(),
392 claim_ref: edge.from_claim.clone(),
393 event_kind: LedgerEventKind::DependentFlaggedPendingReview,
394 disposition: Disposition::PendingReview,
395 rationale: Some(serde_json::json!({
396 "event": "depends_on_cascade",
397 "superseded_parent": target_ref.0.to_string(),
398 "overturning_claim": overturning_ref.0.to_string(),
399 })),
400 recorded_at: tx_time.clone(),
401 };
402 self.persistence
403 .append_ledger_entry(txn, &flag_entry)
404 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
405 }
406 }
407 Ok(())
408 }
409}
410
411fn latest_disposition_from_ledger(
414 ledger: &[LedgerEntry],
415 target: &mempill_types::ClaimRef,
416) -> Option<Disposition> {
417 ledger
418 .iter()
419 .filter(|e| &e.claim_ref == target)
420 .max_by_key(|e| e.recorded_at.0)
421 .map(|e| e.disposition.clone())
422}
423
424#[cfg(test)]
427mod tests {
428 use super::*;
429 use crate::engine_handle::{ErasedPendingStore, ErasedPendingStoreAdapter};
430 use crate::ports::{
431 PendingAdjudicationPort, PendingAdjudicationRow, PersistencePort, Txn as TxnTrait,
432 };
433 use mempill_types::{
434 AgentId, AdjudicationRequest, AdjudicationResponse, AdjudicationVerdict,
435 Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality, CurrencySignal,
436 CurrencyState, Disposition, ExternalAnchor, ExternalKind, Fact, LedgerEntry,
437 ProvenanceLabel, TransactionTime, ValidTime, ValidityAssertion,
438 };
439 use std::sync::Mutex;
440
441 struct MockTxn(AgentId);
444 impl TxnTrait for MockTxn {
445 fn agent_id(&self) -> &AgentId { &self.0 }
446 }
447
    /// Unit error type shared by both mock ports; its display text is
    /// `mock error`.
    #[derive(Debug, thiserror::Error)]
    #[error("mock error")]
    struct MockErr;
453
    /// In-memory stand-in for the persistence backend.
    ///
    /// Writes are pushed straight into the vectors below; there is no real
    /// transaction isolation, so `rollback` only records that it was called.
    #[derive(Default)]
    struct MockStore {
        // Claims appended through `append_claim`.
        claims: Mutex<Vec<Claim>>,
        // Ledger entries, both seeded by tests and appended by the use case.
        ledger: Mutex<Vec<LedgerEntry>>,
        // Validity assertions appended through `append_validity_assertion`.
        validity_assertions: Mutex<Vec<ValidityAssertion>>,
        // When `Some(n)`, the n-th call to `append_ledger_entry` (1-based) fails.
        fail_on_ledger_write: Mutex<Option<usize>>,
        // Number of `append_ledger_entry` calls seen so far, failures included.
        ledger_write_count: Mutex<usize>,
        // Set to `true` once `rollback` has been invoked.
        rollback_called: Mutex<bool>,
    }
466
467 impl PersistencePort for MockStore {
468 type Transaction = MockTxn;
469 type Error = MockErr;
470
471 fn begin_atomic(&self, agent_id: &AgentId) -> Result<MockTxn, MockErr> {
472 Ok(MockTxn(agent_id.clone()))
473 }
474
475 fn append_claim(&self, _: &mut MockTxn, claim: &Claim) -> Result<ClaimRef, MockErr> {
476 self.claims.lock().unwrap().push(claim.clone());
477 Ok(claim.claim_ref().clone())
478 }
479
480 fn append_validity_assertion(
481 &self,
482 _: &mut MockTxn,
483 a: &ValidityAssertion,
484 ) -> Result<(), MockErr> {
485 self.validity_assertions.lock().unwrap().push(a.clone());
486 Ok(())
487 }
488
489 fn append_ledger_entry(
490 &self,
491 _: &mut MockTxn,
492 e: &LedgerEntry,
493 ) -> Result<(), MockErr> {
494 let mut count = self.ledger_write_count.lock().unwrap();
495 *count += 1;
496 let fail_on = *self.fail_on_ledger_write.lock().unwrap();
497 if fail_on == Some(*count) {
498 return Err(MockErr);
499 }
500 self.ledger.lock().unwrap().push(e.clone());
501 Ok(())
502 }
503
504 fn append_claim_edge(&self, _: &mut MockTxn, _: &ClaimEdge) -> Result<(), MockErr> {
505 Ok(())
506 }
507
508 fn commit(&self, _: MockTxn) -> Result<(), MockErr> { Ok(()) }
509
510 fn rollback(&self, _: MockTxn) -> Result<(), MockErr> {
511 *self.rollback_called.lock().unwrap() = true;
512 Ok(())
513 }
514
515 fn load_subject_line(&self, _: &AgentId, _: &str, _: &str) -> Result<Vec<Claim>, MockErr> {
516 Ok(self.claims.lock().unwrap().clone())
517 }
518
519 fn load_claim(&self, _: &AgentId, r: &ClaimRef) -> Result<Option<Claim>, MockErr> {
520 Ok(self.claims.lock().unwrap().iter().find(|c| c.claim_ref() == r).cloned())
521 }
522
523 fn load_validity_assertions_for(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ValidityAssertion>, MockErr> {
524 Ok(vec![])
525 }
526
527 fn load_ledger(&self, _: &AgentId, _: Option<&TransactionTime>, _: usize) -> Result<Vec<LedgerEntry>, MockErr> {
528 Ok(self.ledger.lock().unwrap().clone())
529 }
530
531 fn load_ledger_for_claims(&self, _: &AgentId, _refs: &[ClaimRef]) -> Result<Vec<LedgerEntry>, MockErr> {
532 Ok(vec![])
533 }
534
535 fn load_edges_for(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
536 Ok(vec![])
537 }
538
539 fn load_injected_claims(&self, _: &AgentId) -> Result<Vec<ClaimRef>, MockErr> { Ok(vec![]) }
540
541 fn load_lineage(&self, _: &AgentId, _: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
542 }
543
    /// In-memory stand-in for the pending-adjudication store.
    #[derive(Default)]
    struct MockPendingStore {
        // Every row ever seeded or inserted, whatever its current status.
        rows: Mutex<Vec<PendingAdjudicationRow>>,
    }
550
551 impl MockPendingStore {
552 fn seed(&self, row: PendingAdjudicationRow) {
553 self.rows.lock().unwrap().push(row);
554 }
555
556 fn is_resolved(&self, handle_id: uuid::Uuid) -> bool {
557 self.rows.lock().unwrap().iter()
558 .any(|r| r.handle_id == handle_id && r.status == "resolved")
559 }
560 }
561
562 impl PendingAdjudicationPort for MockPendingStore {
563 type Error = MockErr;
564
565 fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), MockErr> {
566 self.rows.lock().unwrap().push(row.clone());
567 Ok(())
568 }
569
570 fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, MockErr> {
571 Ok(self.rows.lock().unwrap().iter().find(|r| r.handle_id == handle_id).cloned())
572 }
573
574 fn list_pending(&self, agent_id: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, MockErr> {
575 Ok(self.rows.lock().unwrap().iter()
576 .filter(|r| agent_id.is_none_or(|a| r.agent_id == *a) && r.status == "pending")
577 .cloned()
578 .collect())
579 }
580
581 fn list_expired(&self, now: chrono::DateTime<Utc>) -> Result<Vec<PendingAdjudicationRow>, MockErr> {
582 Ok(self.rows.lock().unwrap().iter()
583 .filter(|r| r.status == "pending" && r.expires_at.is_some_and(|e| e <= now))
584 .cloned()
585 .collect())
586 }
587
588 fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), MockErr> {
589 for r in self.rows.lock().unwrap().iter_mut() {
590 if r.handle_id == handle_id {
591 r.status = "resolved".to_string();
592 }
593 }
594 Ok(())
595 }
596
597 fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), MockErr> {
598 for r in self.rows.lock().unwrap().iter_mut() {
599 if r.handle_id == handle_id {
600 r.status = "expired".to_string();
601 }
602 }
603 Ok(())
604 }
605
606 fn list_queued_orphan_claims(&self) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, MockErr> {
607 Ok(vec![])
608 }
609 }
610
611 fn make_agent() -> AgentId { AgentId("test-agent".into()) }
614
615 fn make_claim(agent: &AgentId) -> Claim {
616 Claim::new(
617 ClaimRef::new_random(),
618 agent.clone(),
619 Fact { subject: "user".into(), predicate: "city".into(), value: serde_json::json!("Berlin") },
620 Cardinality::Functional,
621 ProvenanceLabel::External(ExternalKind::UserAsserted),
622 ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
623 TransactionTime(Utc::now()),
624 ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
625 Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
626 Criticality::Medium,
627 vec![],
628 None,
629 None,
630 )
631 }
632
633 fn make_dummy_adj_request(agent: &AgentId) -> AdjudicationRequest {
634 AdjudicationRequest {
635 subject_line: mempill_types::SubjectLineRef {
636 agent_id: agent.clone(),
637 subject: "user".into(),
638 predicate: "city".into(),
639 },
640 incumbent: mempill_types::Belief {
641 claim_ref: ClaimRef::new_random(),
642 fact: Fact { subject: "user".into(), predicate: "city".into(), value: serde_json::json!("Berlin") },
643 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
644 valid_time: ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
645 transaction_time: TransactionTime(Utc::now()),
646 confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
647 currency_signal: CurrencySignal {
648 last_refreshed_at: TransactionTime(Utc::now()),
649 state: CurrencyState::Fresh,
650 corroboration_count: 0,
651 },
652 criticality: Criticality::Medium,
653 },
654 challenger: make_claim(agent),
655 criticality: Criticality::Medium,
656 reason: mempill_types::OverturnReason::ExternalContradiction,
657 }
658 }
659
660 fn setup_queued_scenario(
662 store: &MockStore,
663 pending: &MockPendingStore,
664 handle_id: uuid::Uuid,
665 ) -> (ClaimRef, ClaimRef) {
666 let agent = make_agent();
667 let challenger = make_claim(&agent);
668 let incumbent = make_claim(&agent);
669 let now = Utc::now();
670
671 store.ledger.lock().unwrap().push(LedgerEntry {
674 entry_id: uuid::Uuid::new_v4(),
675 agent_id: agent.clone(),
676 claim_ref: challenger.claim_ref().clone(),
677 event_kind: LedgerEventKind::ClaimCommitted,
678 disposition: Disposition::QueuedForAdjudication,
679 rationale: None,
680 recorded_at: TransactionTime(now - chrono::Duration::seconds(5)),
681 });
682 store.ledger.lock().unwrap().push(LedgerEntry {
683 entry_id: uuid::Uuid::new_v4(),
684 agent_id: agent.clone(),
685 claim_ref: incumbent.claim_ref().clone(),
686 event_kind: LedgerEventKind::ClaimCommitted,
687 disposition: Disposition::CommittedCheap, rationale: None,
689 recorded_at: TransactionTime(now - chrono::Duration::seconds(10)),
690 });
691
692 pending.seed(PendingAdjudicationRow {
694 handle_id,
695 agent_id: agent.clone(),
696 subject: "user".into(),
697 predicate: "city".into(),
698 challenger_claim_ref: challenger.claim_ref().clone(),
699 incumbent_claim_ref: incumbent.claim_ref().clone(),
700 request_payload: make_dummy_adj_request(&agent),
701 queued_at: now - chrono::Duration::seconds(10),
702 expires_at: None,
703 status: "pending".to_string(),
704 });
705
706 (challenger.claim_ref().clone(), incumbent.claim_ref().clone())
707 }
708
709 fn build_use_case(
710 store: Arc<MockStore>,
711 pending: Arc<MockPendingStore>,
712 ) -> SubmitAdjudicationUseCase<MockStore> {
713 let erased: Arc<dyn ErasedPendingStore> =
714 Arc::new(ErasedPendingStoreAdapter::new({
715 struct Delegate(Arc<MockPendingStore>);
716 impl PendingAdjudicationPort for Delegate {
717 type Error = MockErr;
718 fn insert_pending(&self, r: &PendingAdjudicationRow) -> Result<(), MockErr> { self.0.insert_pending(r) }
719 fn get_pending(&self, h: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, MockErr> { self.0.get_pending(h) }
720 fn list_pending(&self, a: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, MockErr> { self.0.list_pending(a) }
721 fn list_expired(&self, n: chrono::DateTime<Utc>) -> Result<Vec<PendingAdjudicationRow>, MockErr> { self.0.list_expired(n) }
722 fn mark_resolved(&self, h: uuid::Uuid) -> Result<(), MockErr> { self.0.mark_resolved(h) }
723 fn mark_expired(&self, h: uuid::Uuid) -> Result<(), MockErr> { self.0.mark_expired(h) }
724 fn list_queued_orphan_claims(&self) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, MockErr> { self.0.list_queued_orphan_claims() }
725 }
726 Delegate(Arc::clone(&pending))
727 }));
728 SubmitAdjudicationUseCase::new(store, erased)
729 }
730
731 #[test]
734 fn unknown_handle_returns_handle_not_found() {
735 let store = Arc::new(MockStore::default());
736 let pending = Arc::new(MockPendingStore::default());
737 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
738
739 let response = AdjudicationResponse {
740 handle_id: uuid::Uuid::new_v4(),
741 verdict: AdjudicationVerdict::Affirm,
742 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
743 };
744 let result = uc.execute(response.handle_id, response, Utc::now());
745 assert!(matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })));
746 }
747
748 #[test]
751 fn affirm_challenger_committed_cheap_incumbent_superseded_two_ledger_entries() {
752 let store = Arc::new(MockStore::default());
753 let pending = Arc::new(MockPendingStore::default());
754 let handle_id = uuid::Uuid::new_v4();
755 let (challenger_ref, incumbent_ref) =
756 setup_queued_scenario(&store, &pending, handle_id);
757
758 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
759 let response = AdjudicationResponse {
760 handle_id,
761 verdict: AdjudicationVerdict::Affirm,
762 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
763 };
764 let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
765
766 assert_eq!(outcome.handle_id, handle_id);
767 assert_eq!(outcome.disposition, Disposition::CommittedCheap);
768 assert_eq!(outcome.claim_ref, challenger_ref);
769
770 let ledger = store.ledger.lock().unwrap();
771 let resolution_entries: Vec<_> = ledger.iter()
773 .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
774 || e.event_kind == LedgerEventKind::ValidityAsserted)
775 .collect();
776 assert_eq!(resolution_entries.len(), 2, "Affirm must write exactly 2 ledger entries");
777
778 let challenger_entry = resolution_entries.iter()
780 .find(|e| e.claim_ref == challenger_ref && e.event_kind == LedgerEventKind::AdjudicationResolved)
781 .expect("challenger AdjudicationResolved entry must exist");
782 assert_eq!(challenger_entry.disposition, Disposition::CommittedCheap);
783
784 let incumbent_entry = resolution_entries.iter()
786 .find(|e| e.claim_ref == incumbent_ref)
787 .expect("incumbent ValidityAsserted entry must exist");
788 assert_eq!(incumbent_entry.disposition, Disposition::Superseded);
789
790 let assertions = store.validity_assertions.lock().unwrap();
792 assert_eq!(assertions.len(), 1, "one Bound assertion for incumbent");
793 assert_eq!(assertions[0].target_claim, incumbent_ref);
794
795 assert!(pending.is_resolved(handle_id), "pending row must be resolved");
797 }
798
799 #[test]
802 fn affirm_challenger_entry_has_external_provenance_in_rationale() {
803 let store = Arc::new(MockStore::default());
804 let pending = Arc::new(MockPendingStore::default());
805 let handle_id = uuid::Uuid::new_v4();
806 setup_queued_scenario(&store, &pending, handle_id);
807
808 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
809 let evidence = ProvenanceLabel::External(ExternalKind::ExternalFirstHand);
810 let response = AdjudicationResponse {
811 handle_id,
812 verdict: AdjudicationVerdict::Affirm,
813 evidence_provenance: evidence.clone(),
814 };
815 uc.execute(handle_id, response, Utc::now()).unwrap();
816
817 let ledger = store.ledger.lock().unwrap();
818 let affirm_entry = ledger.iter()
819 .find(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
820 && e.disposition == Disposition::CommittedCheap)
821 .expect("affirm ledger entry must exist");
822 let rationale = affirm_entry.rationale.as_ref().expect("rationale must be present");
823 let rationale_str = rationale.to_string();
824 assert!(rationale_str.contains("Affirm"), "rationale must mention Affirm verdict");
825 assert!(rationale_str.contains("ExternalFirstHand"), "rationale must include evidence provenance");
826 }
827
828 #[test]
831 fn deny_challenger_superseded_one_ledger_entry() {
832 let store = Arc::new(MockStore::default());
833 let pending = Arc::new(MockPendingStore::default());
834 let handle_id = uuid::Uuid::new_v4();
835 let (challenger_ref, incumbent_ref) =
836 setup_queued_scenario(&store, &pending, handle_id);
837
838 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
839 let response = AdjudicationResponse {
840 handle_id,
841 verdict: AdjudicationVerdict::Deny,
842 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
843 };
844 let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
845
846 assert_eq!(outcome.disposition, Disposition::Superseded);
847 assert_eq!(outcome.claim_ref, challenger_ref);
848
849 let ledger = store.ledger.lock().unwrap();
850 let resolution_entries: Vec<_> = ledger.iter()
851 .filter(|e| e.event_kind == LedgerEventKind::ValidityAsserted)
852 .collect();
853 assert_eq!(resolution_entries.len(), 1, "Deny must write exactly 1 ValidityAsserted entry");
854 assert_eq!(resolution_entries[0].claim_ref, challenger_ref);
855 assert_eq!(resolution_entries[0].disposition, Disposition::Superseded);
856
857 let assertions = store.validity_assertions.lock().unwrap();
859 assert_eq!(assertions.len(), 1, "one Bound assertion for challenger");
860 assert_eq!(assertions[0].target_claim, challenger_ref);
861
862 let incumbent_resolution = ledger.iter()
864 .filter(|e| e.claim_ref == incumbent_ref
865 && (e.event_kind == LedgerEventKind::AdjudicationResolved
866 || e.event_kind == LedgerEventKind::ValidityAsserted))
867 .count();
868 assert_eq!(incumbent_resolution, 0, "Deny must not touch the incumbent");
869
870 assert!(pending.is_resolved(handle_id));
871 }
872
873 #[test]
876 fn unknown_both_contested_two_ledger_entries_no_bound_assertion() {
877 let store = Arc::new(MockStore::default());
878 let pending = Arc::new(MockPendingStore::default());
879 let handle_id = uuid::Uuid::new_v4();
880 let (challenger_ref, incumbent_ref) =
881 setup_queued_scenario(&store, &pending, handle_id);
882
883 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
884 let response = AdjudicationResponse {
885 handle_id,
886 verdict: AdjudicationVerdict::Unknown,
887 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
888 };
889 let outcome = uc.execute(handle_id, response, Utc::now()).unwrap();
890
891 assert_eq!(outcome.disposition, Disposition::Contested);
892 assert_eq!(outcome.claim_ref, challenger_ref);
893
894 let ledger = store.ledger.lock().unwrap();
895 let abstain_entries: Vec<_> = ledger.iter()
896 .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved)
897 .collect();
898 assert_eq!(abstain_entries.len(), 2, "Unknown must write 2 AdjudicationResolved entries (one per claim)");
899
900 let ch_entry = abstain_entries.iter().find(|e| e.claim_ref == challenger_ref).unwrap();
901 let inc_entry = abstain_entries.iter().find(|e| e.claim_ref == incumbent_ref).unwrap();
902 assert_eq!(ch_entry.disposition, Disposition::Contested);
903 assert_eq!(inc_entry.disposition, Disposition::Contested);
904
905 let assertions = store.validity_assertions.lock().unwrap();
907 assert_eq!(assertions.len(), 0, "Unknown must not write any Bound assertions");
908
909 assert!(pending.is_resolved(handle_id));
910 }
911
912 #[test]
915 fn duplicate_submit_returns_handle_not_found() {
916 let store = Arc::new(MockStore::default());
917 let pending = Arc::new(MockPendingStore::default());
918 let handle_id = uuid::Uuid::new_v4();
919 setup_queued_scenario(&store, &pending, handle_id);
920
921 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
922 let mk_response = || AdjudicationResponse {
923 handle_id,
924 verdict: AdjudicationVerdict::Deny,
925 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
926 };
927
928 uc.execute(handle_id, mk_response(), Utc::now()).unwrap();
930
931 let result = uc.execute(handle_id, mk_response(), Utc::now());
934 assert!(
935 matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
936 "duplicate submit must return AdjudicationHandleNotFound"
937 );
938 }
939
940 #[test]
947 fn stale_challenger_not_queued_returns_handle_not_found() {
948 let store = Arc::new(MockStore::default());
949 let pending = Arc::new(MockPendingStore::default());
950 let handle_id = uuid::Uuid::new_v4();
951 let agent = make_agent();
952 let challenger = make_claim(&agent);
953 let incumbent = make_claim(&agent);
954 let now = Utc::now();
955
956 store.ledger.lock().unwrap().push(LedgerEntry {
958 entry_id: uuid::Uuid::new_v4(),
959 agent_id: agent.clone(),
960 claim_ref: challenger.claim_ref().clone(),
961 event_kind: LedgerEventKind::ClaimCommitted,
962 disposition: Disposition::CommittedCheap, rationale: None,
964 recorded_at: TransactionTime(now),
965 });
966 store.ledger.lock().unwrap().push(LedgerEntry {
968 entry_id: uuid::Uuid::new_v4(),
969 agent_id: agent.clone(),
970 claim_ref: incumbent.claim_ref().clone(),
971 event_kind: LedgerEventKind::ClaimCommitted,
972 disposition: Disposition::CommittedCheap,
973 rationale: None,
974 recorded_at: TransactionTime(now),
975 });
976
977 pending.seed(PendingAdjudicationRow {
978 handle_id,
979 agent_id: agent.clone(),
980 subject: "user".into(),
981 predicate: "city".into(),
982 challenger_claim_ref: challenger.claim_ref().clone(),
983 incumbent_claim_ref: incumbent.claim_ref().clone(),
984 request_payload: make_dummy_adj_request(&agent),
985 queued_at: now,
986 expires_at: None,
987 status: "pending".to_string(),
988 });
989
990 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
991 let response = AdjudicationResponse {
992 handle_id,
993 verdict: AdjudicationVerdict::Affirm,
994 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
995 };
996 let result = uc.execute(handle_id, response, Utc::now());
997 assert!(
998 matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
999 "stale challenger (not QueuedForAdjudication) must return AdjudicationHandleNotFound"
1000 );
1001 }
1002
1003 #[test]
1006 fn expired_handle_returns_handle_not_found() {
1007 let store = Arc::new(MockStore::default());
1008 let pending = Arc::new(MockPendingStore::default());
1009 let handle_id = uuid::Uuid::new_v4();
1010 let agent = make_agent();
1011 let challenger = make_claim(&agent);
1012 let incumbent = make_claim(&agent);
1013 let past = Utc::now() - chrono::Duration::hours(2);
1014
1015 pending.seed(PendingAdjudicationRow {
1017 handle_id,
1018 agent_id: agent.clone(),
1019 subject: "user".into(),
1020 predicate: "city".into(),
1021 challenger_claim_ref: challenger.claim_ref().clone(),
1022 incumbent_claim_ref: incumbent.claim_ref().clone(),
1023 request_payload: make_dummy_adj_request(&agent),
1024 queued_at: past - chrono::Duration::hours(1),
1025 expires_at: Some(past), status: "pending".to_string(),
1027 });
1028
1029 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
1030 let response = AdjudicationResponse {
1031 handle_id,
1032 verdict: AdjudicationVerdict::Affirm,
1033 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
1034 };
1035 let result = uc.execute(handle_id, response, Utc::now());
1036 assert!(
1037 matches!(result, Err(MemError::AdjudicationHandleNotFound { .. })),
1038 "expired handle must return AdjudicationHandleNotFound"
1039 );
1040 }
1041
1042 #[test]
1045 fn atomicity_failure_mid_apply_no_partial_state() {
1046 let store = Arc::new(MockStore::default());
1047 let pending = Arc::new(MockPendingStore::default());
1048 let handle_id = uuid::Uuid::new_v4();
1049 setup_queued_scenario(&store, &pending, handle_id);
1050
1051 *store.fail_on_ledger_write.lock().unwrap() = Some(1);
1053
1054 let uc = build_use_case(Arc::clone(&store), Arc::clone(&pending));
1055 let response = AdjudicationResponse {
1056 handle_id,
1057 verdict: AdjudicationVerdict::Affirm,
1058 evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
1059 };
1060 let result = uc.execute(handle_id, response, Utc::now());
1061 assert!(result.is_err(), "must propagate the injected failure");
1062
1063 let ledger = store.ledger.lock().unwrap();
1065 let resolution_entries: Vec<_> = ledger.iter()
1066 .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved
1067 || e.event_kind == LedgerEventKind::ValidityAsserted)
1068 .collect();
1069 assert_eq!(
1070 resolution_entries.len(), 0,
1071 "no resolution ledger entries must remain after mid-apply failure"
1072 );
1073 assert!(*store.rollback_called.lock().unwrap(), "rollback must be called");
1074 }
1075}