Skip to main content

mempill_core/testing/
oracle_conformance.rs

1//! Generic oracle-resolution conformance suite.
2//!
3//! `run_oracle_conformance` exercises every observable oracle-resolution behavior
4//! and panics on any deviation from the expected contract.
5//!
6//! Both `mempill-sqlite` and `mempill-postgres` activate `mempill-core/test-support`
7//! in dev-dependencies and call the exported scenario functions to verify that the
8//! SAME assertions pass on SQLite (in-memory + file-backed) and PG 16 + PG 18.
9//!
10//! # Design
11//!
12//! The harness is built from standalone async scenario functions.  Each function
13//! receives either:
14//! - A reference to an already-constructed `EngineHandle` (for most scenarios), OR
15//! - A pair of factory closures (for the reopen / durability scenario which drops the
16//!   first engine and opens a second one over the same durable backing store).
17//!
18//! Scenarios use a dedicated `AgentId` per-scenario so they are safe to run
19//! sequentially against a shared store without cross-contamination.
20//!
21//! # Scenario catalogue
22//!
23//! | Sub-test | Function |
24//! |----------|----------|
25//! | 1  | `scenario_affirm_challenger_wins` |
26//! | 2  | `scenario_deny_incumbent_stands` |
27//! | 3  | `scenario_unknown_stays_contested` |
28//! | 4  | `scenario_queued_surfaces_contested` |
29//! | 5  | `scenario_stale_handle_not_found` |
30//! | 6  | `scenario_duplicate_submit_not_found` |
31//! | 7  | `scenario_ttl_expiry_reverts_contested` |
32//! | 8a | `scenario_sweep_reverts_expired` |
33//! | 8b | `scenario_sweep_recovers_orphan` |
34//! | 9  | `scenario_durable_store_survives_reopen` |
35//! | 10 | `scenario_atomicity_no_torn_write` |
36//! | 11 | `scenario_ledger_entry_expectations` |
37//! | 12 | `scenario_b11_oracle_absent_contested` |
38
39#[cfg(any(test, feature = "test-support"))]
40use std::time::Duration;
41
42#[cfg(any(test, feature = "test-support"))]
43use mempill_types::{
44    AdjudicationResponse, AdjudicationVerdict, AgentId, BeliefStatus, Cardinality, Confidence,
45    Criticality, Disposition, ExternalKind, LedgerEventKind, ProvenanceLabel,
46};
47
48#[cfg(any(test, feature = "test-support"))]
49use crate::{
50    application::{AuditQueryRequest, IngestClaimRequest, QueryMemoryRequest},
51    ports::OraclePort,
52    EngineConfig, EngineHandle,
53};
54
55// ── Internal TestOracle ───────────────────────────────────────────────────────
56
57/// Deterministic oracle that always returns the caller-supplied `fixed_uuid` as the
58/// adjudication handle.  `handle_to_uuid` is the identity function on `uuid::Uuid`.
59///
60/// Both adapter test files use this type (via the re-exported `build_oracle_engine` helper)
61/// so that both adapters exercise **identical oracle behavior**.
62#[cfg(any(test, feature = "test-support"))]
63pub struct TestOracle {
64    /// The UUID returned by every `request_adjudication` call, allowing callers to predict
65    /// the handle before submitting an adjudication response.
66    pub fixed_uuid: uuid::Uuid,
67}
68
69#[cfg(any(test, feature = "test-support"))]
70impl OraclePort for TestOracle {
71    type Error = crate::noop::NoOpError;
72    type Handle = uuid::Uuid;
73
74    fn request_adjudication(
75        &self,
76        _agent_id: &AgentId,
77        _request: mempill_types::AdjudicationRequest,
78    ) -> Result<Self::Handle, Self::Error> {
79        Ok(self.fixed_uuid)
80    }
81
82    fn handle_to_uuid(handle: &Self::Handle) -> uuid::Uuid {
83        *handle
84    }
85}
86
87// ── Common request builders ───────────────────────────────────────────────────
88
89/// Builds a minimal `IngestClaimRequest` for `agent` with the given `value`.
90/// All other fields are set to conformance-stable defaults (External/UserAsserted, Functional, High).
91#[cfg(any(test, feature = "test-support"))]
92pub fn ingest_req(agent: &AgentId, value: &str) -> IngestClaimRequest {
93    IngestClaimRequest {
94        agent_id: agent.clone(),
95        subject: "subject".into(),
96        predicate: "predicate".into(),
97        value: serde_json::json!(value),
98        provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
99        cardinality: Cardinality::Functional,
100        valid_time: None,
101        confidence: Confidence { value_confidence: 0.95, valid_time_confidence: 0.0 },
102        criticality: Criticality::High,
103        derived_from: vec![],
104    }
105}
106
107/// Builds a `QueryMemoryRequest` targeting the fixed `(subject, predicate)` pair used by all
108/// conformance scenarios, scoped to `agent`.
109#[cfg(any(test, feature = "test-support"))]
110pub fn query_req(agent: &AgentId) -> QueryMemoryRequest {
111    QueryMemoryRequest {
112        agent_id: agent.clone(),
113        subject: "subject".into(),
114        predicate: "predicate".into(),
115        as_of_tx_time: None,
116    }
117}
118
119#[cfg(any(test, feature = "test-support"))]
120fn adj_response(
121    handle_id: uuid::Uuid,
122    verdict: AdjudicationVerdict,
123) -> AdjudicationResponse {
124    AdjudicationResponse {
125        handle_id,
126        verdict,
127        evidence_provenance: ProvenanceLabel::External(ExternalKind::ExternalFirstHand),
128    }
129}
130
131// ── Scenario 1: Affirm — challenger wins ─────────────────────────────────────
132
133/// Scenario 1 (Affirm): challenger CommittedCheap, incumbent Superseded,
134/// ledger entry has External provenance, query_memory surfaces challenger.
135///
136/// Callers pass `handle_id` matching the UUID used when building the engine's `TestOracle`.
137#[cfg(any(test, feature = "test-support"))]
138#[cfg(any(test, feature = "test-support"))]
139pub async fn scenario_affirm_challenger_wins_with_handle<P, O, V>(
140    engine: &EngineHandle<P, O, V>,
141    handle_id: uuid::Uuid,
142) where
143    P: crate::ports::PersistencePort + Send + Sync + 'static,
144    P::Error: std::fmt::Debug,
145    O: OraclePort + Send + Sync + 'static,
146    V: crate::ports::VectorPort + Send + Sync + 'static,
147{
148    let agent = AgentId("conformance-affirm-agent".into());
149
150    let resp_inc = engine.ingest_claim(ingest_req(&agent, "incumbent-value")).await
151        .expect("conformance[affirm]: ingest incumbent must succeed");
152    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap,
153        "conformance[affirm]: incumbent must be CommittedCheap");
154
155    let resp_ch = engine.ingest_claim(ingest_req(&agent, "challenger-value")).await
156        .expect("conformance[affirm]: ingest challenger must succeed");
157    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication,
158        "conformance[affirm]: challenger with oracle present must be QueuedForAdjudication");
159
160    let challenger_ref = resp_ch.claim_ref.clone();
161    let incumbent_ref = resp_inc.claim_ref.clone();
162
163    // Submit Affirm.
164    let outcome = engine.submit_adjudication(
165        handle_id,
166        adj_response(handle_id, AdjudicationVerdict::Affirm),
167    ).await.expect("conformance[affirm]: Affirm submit must succeed");
168
169    assert_eq!(outcome.disposition, Disposition::CommittedCheap,
170        "conformance[affirm]: challenger must be CommittedCheap after Affirm");
171    assert_eq!(outcome.claim_ref, challenger_ref,
172        "conformance[affirm]: outcome.claim_ref must be challenger");
173
174    // query_memory must surface challenger.
175    let qr = engine.query_memory(query_req(&agent)).await
176        .expect("conformance[affirm]: query must succeed");
177    let primary_val = qr.belief.primary.as_ref().map(|b| b.fact.value.clone());
178    assert_ne!(qr.belief.status, BeliefStatus::Contested,
179        "conformance[affirm]: must NOT be Contested after Affirm");
180    assert_ne!(qr.belief.status, BeliefStatus::NoBelief,
181        "conformance[affirm]: must NOT be NoBelief after Affirm");
182    assert_eq!(primary_val, Some(serde_json::json!("challenger-value")),
183        "conformance[affirm]: challenger must be surfaced as primary belief");
184
185    // Ledger must have AdjudicationResolved + External provenance for challenger.
186    let audit = engine.query_audit(AuditQueryRequest {
187        agent_id: agent.clone(),
188        claim_ref: None,
189        from_tx_time: None,
190        limit: 100,
191    }).await.expect("conformance[affirm]: audit must succeed");
192
193    let ch_entry = audit.entries.iter()
194        .find(|e| e.claim_ref == challenger_ref && e.event_kind == LedgerEventKind::AdjudicationResolved)
195        .expect("conformance[affirm]: AdjudicationResolved entry for challenger must exist");
196    assert_eq!(ch_entry.disposition, Disposition::CommittedCheap,
197        "conformance[affirm]: ledger entry disposition must be CommittedCheap");
198    let rationale = ch_entry.rationale.as_ref().map(|r| r.to_string()).unwrap_or_default();
199    assert!(rationale.contains("ExternalFirstHand"),
200        "conformance[affirm]: Affirm rationale must contain ExternalFirstHand provenance");
201
202    // Incumbent must have a Superseded entry (written during ingest heavy-path).
203    let inc_entry = audit.entries.iter()
204        .find(|e| e.claim_ref == incumbent_ref && e.disposition == Disposition::Superseded)
205        .expect("conformance[affirm]: incumbent Superseded entry must exist");
206    assert_eq!(inc_entry.disposition, Disposition::Superseded,
207        "conformance[affirm]: incumbent must be Superseded");
208}
209
210// ── Scenario 2: Deny — incumbent stands ──────────────────────────────────────
211
212/// Scenario 2 (Deny): after `AdjudicationVerdict::Deny` the incumbent remains the primary belief
213/// and the challenger is Superseded.  Verifies that `query_memory` surfaces the incumbent value.
214#[cfg(any(test, feature = "test-support"))]
215pub async fn scenario_deny_incumbent_stands<P, O, V>(
216    engine: &EngineHandle<P, O, V>,
217    handle_id: uuid::Uuid,
218) where
219    P: crate::ports::PersistencePort + Send + Sync + 'static,
220    P::Error: std::fmt::Debug,
221    O: OraclePort + Send + Sync + 'static,
222    V: crate::ports::VectorPort + Send + Sync + 'static,
223{
224    let agent = AgentId("conformance-deny-agent".into());
225
226    let resp_inc = engine.ingest_claim(ingest_req(&agent, "incumbent-deny")).await
227        .expect("conformance[deny]: ingest incumbent");
228    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
229
230    let resp_ch = engine.ingest_claim(ingest_req(&agent, "challenger-deny")).await
231        .expect("conformance[deny]: ingest challenger");
232    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication);
233
234    let challenger_ref = resp_ch.claim_ref.clone();
235
236    let outcome = engine.submit_adjudication(
237        handle_id,
238        adj_response(handle_id, AdjudicationVerdict::Deny),
239    ).await.expect("conformance[deny]: Deny submit must succeed");
240
241    assert_eq!(outcome.disposition, Disposition::Superseded,
242        "conformance[deny]: challenger must be Superseded after Deny");
243    assert_eq!(outcome.claim_ref, challenger_ref,
244        "conformance[deny]: outcome.claim_ref must be challenger");
245
246    // query_memory must surface incumbent.
247    let qr = engine.query_memory(query_req(&agent)).await
248        .expect("conformance[deny]: query must succeed");
249    let primary_val = qr.belief.primary.as_ref().map(|b| b.fact.value.clone());
250    assert_ne!(qr.belief.status, BeliefStatus::Contested,
251        "conformance[deny]: must NOT be Contested after Deny");
252    assert_ne!(qr.belief.status, BeliefStatus::NoBelief,
253        "conformance[deny]: must NOT be NoBelief after Deny");
254    assert_eq!(primary_val, Some(serde_json::json!("incumbent-deny")),
255        "conformance[deny]: incumbent must be surfaced after Deny");
256}
257
258// ── Scenario 3: Unknown — stays Contested ────────────────────────────────────
259
260/// Scenario 3 (Unknown): `AdjudicationVerdict::Unknown` leaves both claims in `Contested` state.
261/// Verifies that a second submit on the now-consumed handle returns `AdjudicationHandleNotFound`.
262#[cfg(any(test, feature = "test-support"))]
263pub async fn scenario_unknown_stays_contested<P, O, V>(
264    engine: &EngineHandle<P, O, V>,
265    handle_id: uuid::Uuid,
266) where
267    P: crate::ports::PersistencePort + Send + Sync + 'static,
268    P::Error: std::fmt::Debug,
269    O: OraclePort + Send + Sync + 'static,
270    V: crate::ports::VectorPort + Send + Sync + 'static,
271{
272    let agent = AgentId("conformance-unknown-agent".into());
273
274    let resp_inc = engine.ingest_claim(ingest_req(&agent, "incumbent-unknown")).await
275        .expect("conformance[unknown]: ingest incumbent");
276    let incumbent_ref = resp_inc.claim_ref.clone();
277    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
278
279    let resp_ch = engine.ingest_claim(ingest_req(&agent, "challenger-unknown")).await
280        .expect("conformance[unknown]: ingest challenger");
281    let challenger_ref = resp_ch.claim_ref.clone();
282    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication);
283
284    let outcome = engine.submit_adjudication(
285        handle_id,
286        adj_response(handle_id, AdjudicationVerdict::Unknown),
287    ).await.expect("conformance[unknown]: Unknown submit must succeed");
288
289    assert_eq!(outcome.disposition, Disposition::Contested,
290        "conformance[unknown]: outcome must be Contested after Unknown");
291    assert_eq!(outcome.claim_ref, challenger_ref);
292
293    // query_memory must surface Contested[both].
294    let qr = engine.query_memory(query_req(&agent)).await
295        .expect("conformance[unknown]: query must succeed");
296    assert_eq!(qr.belief.status, BeliefStatus::Contested,
297        "conformance[unknown]: must be Contested after Unknown");
298    let all_vals: Vec<_> = qr.belief.primary.iter()
299        .map(|b| b.fact.value.clone())
300        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
301        .collect();
302    assert!(all_vals.contains(&serde_json::json!("incumbent-unknown")),
303        "conformance[unknown]: incumbent must be visible in Contested");
304    assert!(all_vals.contains(&serde_json::json!("challenger-unknown")),
305        "conformance[unknown]: challenger must be visible in Contested");
306
307    // Handle must be consumed — second submit must fail.
308    let second = engine.submit_adjudication(
309        handle_id,
310        adj_response(handle_id, AdjudicationVerdict::Unknown),
311    ).await;
312    assert!(
313        matches!(second, Err(crate::error::MemError::AdjudicationHandleNotFound { .. })),
314        "conformance[unknown]: second submit on consumed handle must be AdjudicationHandleNotFound; got {second:?}"
315    );
316
317    // Audit: 2 AdjudicationResolved entries (one per claim).
318    let audit = engine.query_audit(AuditQueryRequest {
319        agent_id: agent.clone(),
320        claim_ref: None,
321        from_tx_time: None,
322        limit: 100,
323    }).await.expect("conformance[unknown]: audit must succeed");
324    let resolved: Vec<_> = audit.entries.iter()
325        .filter(|e| e.event_kind == LedgerEventKind::AdjudicationResolved)
326        .collect();
327    assert_eq!(resolved.len(), 2,
328        "conformance[unknown]: Unknown must produce 2 AdjudicationResolved entries");
329    let has_inc = resolved.iter().any(|e| e.claim_ref == incumbent_ref && e.disposition == Disposition::Contested);
330    let has_ch  = resolved.iter().any(|e| e.claim_ref == challenger_ref && e.disposition == Disposition::Contested);
331    assert!(has_inc, "conformance[unknown]: incumbent AdjudicationResolved/Contested must exist");
332    assert!(has_ch,  "conformance[unknown]: challenger AdjudicationResolved/Contested must exist");
333}
334
335// ── Scenario 4: Queued — BEFORE submit surfaces Contested ─────────────────────
336
337/// Scenario 4 (Queued): before any adjudication is submitted, `query_memory` must surface
338/// `BeliefStatus::Contested` for both the incumbent and the queued challenger (invariant I7).
339#[cfg(any(test, feature = "test-support"))]
340pub async fn scenario_queued_surfaces_contested<P, O, V>(
341    engine: &EngineHandle<P, O, V>,
342) where
343    P: crate::ports::PersistencePort + Send + Sync + 'static,
344    P::Error: std::fmt::Debug,
345    O: OraclePort + Send + Sync + 'static,
346    V: crate::ports::VectorPort + Send + Sync + 'static,
347{
348    let agent = AgentId("conformance-queued-agent".into());
349
350    let resp_inc = engine.ingest_claim(ingest_req(&agent, "queued-incumbent")).await
351        .expect("conformance[queued]: ingest incumbent");
352    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
353
354    let resp_ch = engine.ingest_claim(ingest_req(&agent, "queued-challenger")).await
355        .expect("conformance[queued]: ingest challenger");
356    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication,
357        "conformance[queued]: challenger with oracle must be QueuedForAdjudication");
358
359    // BEFORE submit: query_memory must surface Contested.
360    let qr = engine.query_memory(query_req(&agent)).await
361        .expect("conformance[queued]: query must succeed");
362    assert_eq!(qr.belief.status, BeliefStatus::Contested,
363        "conformance[queued]: BEFORE any submit, belief must be Contested (I7)");
364    let all_vals: Vec<_> = qr.belief.primary.iter()
365        .map(|b| b.fact.value.clone())
366        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
367        .collect();
368    assert!(all_vals.contains(&serde_json::json!("queued-incumbent")),
369        "conformance[queued]: incumbent must be visible in pre-submit Contested");
370    assert!(all_vals.contains(&serde_json::json!("queued-challenger")),
371        "conformance[queued]: challenger must be visible in pre-submit Contested");
372}
373
374// ── Scenario 5: Stale handle → AdjudicationHandleNotFound ────────────────────
375
376/// Scenario 5 (Stale handle): submitting adjudication with a random/unknown UUID must return
377/// `MemError::AdjudicationHandleNotFound`, proving the engine rejects phantom handles.
378#[cfg(any(test, feature = "test-support"))]
379pub async fn scenario_stale_handle_not_found<P, O, V>(
380    engine: &EngineHandle<P, O, V>,
381) where
382    P: crate::ports::PersistencePort + Send + Sync + 'static,
383    P::Error: std::fmt::Debug,
384    O: OraclePort + Send + Sync + 'static,
385    V: crate::ports::VectorPort + Send + Sync + 'static,
386{
387    let random_handle = uuid::Uuid::new_v4();
388    let result = engine.submit_adjudication(
389        random_handle,
390        adj_response(random_handle, AdjudicationVerdict::Affirm),
391    ).await;
392    assert!(
393        matches!(result, Err(crate::error::MemError::AdjudicationHandleNotFound { .. })),
394        "conformance[stale-handle]: random/unknown handle must return AdjudicationHandleNotFound; got {result:?}"
395    );
396}
397
398// ── Scenario 6: Duplicate submit → AdjudicationHandleNotFound ─────────────────
399
400/// Scenario 6 (Duplicate submit): after a successful first submit the handle is consumed;
401/// a second submit with the same `handle_id` must return `MemError::AdjudicationHandleNotFound`.
402#[cfg(any(test, feature = "test-support"))]
403pub async fn scenario_duplicate_submit_not_found<P, O, V>(
404    engine: &EngineHandle<P, O, V>,
405    handle_id: uuid::Uuid,
406) where
407    P: crate::ports::PersistencePort + Send + Sync + 'static,
408    P::Error: std::fmt::Debug,
409    O: OraclePort + Send + Sync + 'static,
410    V: crate::ports::VectorPort + Send + Sync + 'static,
411{
412    let agent = AgentId("conformance-dup-agent".into());
413
414    engine.ingest_claim(ingest_req(&agent, "dup-incumbent")).await
415        .expect("conformance[dup]: ingest incumbent");
416    engine.ingest_claim(ingest_req(&agent, "dup-challenger")).await
417        .expect("conformance[dup]: ingest challenger");
418
419    // First submit succeeds.
420    engine.submit_adjudication(handle_id, adj_response(handle_id, AdjudicationVerdict::Affirm)).await
421        .expect("conformance[dup]: first submit must succeed");
422
423    // Second submit must fail.
424    let second = engine.submit_adjudication(handle_id, adj_response(handle_id, AdjudicationVerdict::Affirm)).await;
425    assert!(
426        matches!(second, Err(crate::error::MemError::AdjudicationHandleNotFound { .. })),
427        "conformance[dup]: duplicate submit must return AdjudicationHandleNotFound; got {second:?}"
428    );
429}
430
431// ── Scenario 7: TTL expiry → AdjudicationHandleNotFound + Contested ──────────
432
433/// TTL expiry via a 1-ns TTL so the row expires immediately.
434/// Caller must supply an engine built with `EngineConfig { default_adjudication_ttl: Some(1ns), .. }`.
435#[cfg(any(test, feature = "test-support"))]
436pub async fn scenario_ttl_expiry_reverts_contested<P, O, V>(
437    engine: &EngineHandle<P, O, V>,
438    handle_id: uuid::Uuid,
439) where
440    P: crate::ports::PersistencePort + Send + Sync + 'static,
441    P::Error: std::fmt::Debug,
442    O: OraclePort + Send + Sync + 'static,
443    V: crate::ports::VectorPort + Send + Sync + 'static,
444{
445    let agent = AgentId("conformance-ttl-agent".into());
446
447    let resp_inc = engine.ingest_claim(ingest_req(&agent, "ttl-incumbent")).await
448        .expect("conformance[ttl]: ingest incumbent");
449    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
450
451    let resp_ch = engine.ingest_claim(ingest_req(&agent, "ttl-challenger")).await
452        .expect("conformance[ttl]: ingest challenger");
453    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication);
454
455    // Sleep a tiny bit to ensure the 1-ns TTL has elapsed.
456    tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
457
458    // submit on the expired handle → AdjudicationHandleNotFound.
459    let result = engine.submit_adjudication(
460        handle_id,
461        adj_response(handle_id, AdjudicationVerdict::Affirm),
462    ).await;
463    assert!(
464        matches!(result, Err(crate::error::MemError::AdjudicationHandleNotFound { .. })),
465        "conformance[ttl]: expired handle must return AdjudicationHandleNotFound; got {result:?}"
466    );
467
468    // query_memory must surface Contested[both] after lazy expiry.
469    let qr = engine.query_memory(query_req(&agent)).await
470        .expect("conformance[ttl]: query must succeed");
471    assert_eq!(qr.belief.status, BeliefStatus::Contested,
472        "conformance[ttl]: after TTL expiry, must be Contested");
473    let all_vals: Vec<_> = qr.belief.primary.iter()
474        .map(|b| b.fact.value.clone())
475        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
476        .collect();
477    assert!(all_vals.contains(&serde_json::json!("ttl-incumbent")),
478        "conformance[ttl]: incumbent must be visible in Contested after expiry");
479    assert!(all_vals.contains(&serde_json::json!("ttl-challenger")),
480        "conformance[ttl]: challenger must be visible in Contested after expiry");
481
482    // Audit must contain a TTL/expiry-related ledger entry for the challenger.
483    let audit = engine.query_audit(AuditQueryRequest {
484        agent_id: agent.clone(),
485        claim_ref: None,
486        from_tx_time: None,
487        limit: 100,
488    }).await.expect("conformance[ttl]: audit must succeed");
489    // The engine writes an AdjudicationExpired or AdjudicationResolved entry on expiry.
490    let has_expiry_entry = audit.entries.iter().any(|e| {
491        e.claim_ref == resp_ch.claim_ref
492            && (e.event_kind == LedgerEventKind::AdjudicationExpired
493                || e.disposition == Disposition::Contested)
494    });
495    assert!(has_expiry_entry,
496        "conformance[ttl]: ledger must have an expiry entry for the challenger; entries={:?}",
497        audit.entries.iter().map(|e| (&e.claim_ref, &e.event_kind, &e.disposition)).collect::<Vec<_>>()
498    );
499}
500
501// ── Scenario 8a: Sweep reverts expired ────────────────────────────────────────
502
503/// Sweep test: an already-past TTL row is reverted to Contested by sweep.
504/// Caller must supply an engine built with `default_adjudication_ttl: Some(1ns)`.
505#[cfg(any(test, feature = "test-support"))]
506pub async fn scenario_sweep_reverts_expired<P, O, V>(
507    engine: &EngineHandle<P, O, V>,
508) where
509    P: crate::ports::PersistencePort + Send + Sync + 'static,
510    P::Error: std::fmt::Debug,
511    O: OraclePort + Send + Sync + 'static,
512    V: crate::ports::VectorPort + Send + Sync + 'static,
513{
514    let agent = AgentId("conformance-sweep-exp-agent".into());
515
516    engine.ingest_claim(ingest_req(&agent, "sweep-exp-incumbent")).await
517        .expect("conformance[sweep-exp]: ingest incumbent");
518    let resp_ch = engine.ingest_claim(ingest_req(&agent, "sweep-exp-challenger")).await
519        .expect("conformance[sweep-exp]: ingest challenger");
520    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication);
521
522    tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
523
524    let swept = engine.sweep_expired_adjudications().await
525        .expect("conformance[sweep-exp]: sweep must succeed");
526    assert!(swept >= 1,
527        "conformance[sweep-exp]: sweep must revert at least 1 expired row; got {swept}");
528
529    let qr = engine.query_memory(query_req(&agent)).await
530        .expect("conformance[sweep-exp]: query must succeed");
531    assert_eq!(qr.belief.status, BeliefStatus::Contested,
532        "conformance[sweep-exp]: after sweep, must be Contested");
533    let all_vals: Vec<_> = qr.belief.primary.iter()
534        .map(|b| b.fact.value.clone())
535        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
536        .collect();
537    assert!(all_vals.contains(&serde_json::json!("sweep-exp-incumbent")),
538        "conformance[sweep-exp]: incumbent must be visible after sweep");
539    assert!(all_vals.contains(&serde_json::json!("sweep-exp-challenger")),
540        "conformance[sweep-exp]: challenger must be visible after sweep");
541}
542
543// ── Scenario 8b: Sweep recovers orphan ────────────────────────────────────────
544
545/// Orphan recovery: a QueuedForAdjudication claim with no pending row is reverted by sweep.
546///
547/// The orphan is seeded directly via the persistence port, bypassing the engine ingest path.
548/// The engine passed to this function must have an accessible persistence store.
549/// Because `EngineHandle` does not expose the store, callers must seed the orphan externally
550/// and then call this function.  The scenario verifies the post-sweep state.
551///
552/// This function takes two lambdas:
553/// - `seed_orphan`: inserts the orphan claim + ledger entry directly, returns
554///   `(incumbent_agent_id, challenger_value, incumbent_value)`.
555/// - The engine reference (the EngineHandle built on the same store as seed_orphan touches).
556#[cfg(any(test, feature = "test-support"))]
557pub async fn scenario_sweep_recovers_orphan<P, O, V>(
558    engine: &EngineHandle<P, O, V>,
559    agent_name: &str,
560) where
561    P: crate::ports::PersistencePort + Send + Sync + 'static,
562    P::Error: std::fmt::Debug,
563    O: OraclePort + Send + Sync + 'static,
564    V: crate::ports::VectorPort + Send + Sync + 'static,
565{
566    // After the adapter test has seeded the orphan, we simply call sweep and verify.
567    let agent = AgentId(agent_name.into());
568
569    let swept = engine.sweep_expired_adjudications().await
570        .expect("conformance[sweep-orphan]: sweep must succeed");
571    assert!(swept >= 1,
572        "conformance[sweep-orphan]: sweep must recover at least 1 orphaned claim; got {swept}");
573
574    let qr = engine.query_memory(query_req(&agent)).await
575        .expect("conformance[sweep-orphan]: query must succeed");
576    assert_eq!(qr.belief.status, BeliefStatus::Contested,
577        "conformance[sweep-orphan]: after orphan recovery, must be Contested");
578    let all_vals: Vec<_> = qr.belief.primary.iter()
579        .map(|b| b.fact.value.clone())
580        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
581        .collect();
582    assert!(all_vals.contains(&serde_json::json!("orphan-incumbent")),
583        "conformance[sweep-orphan]: incumbent must be visible; got {all_vals:?}");
584    assert!(all_vals.contains(&serde_json::json!("orphan-challenger")),
585        "conformance[sweep-orphan]: orphaned challenger must be visible; got {all_vals:?}");
586}
587
588// ── Scenario 9: Durable store survives reopen ─────────────────────────────────
589
590/// After queuing a conflict on engine-1, drop it and open engine-2 on the SAME backing
591/// store, then submit Affirm on the pre-restart handle.  This proves the pending row
592/// (Amendment-1) survives engine restart.
593///
594/// Callers supply two engines built over the same durable backing store and the
595/// handle_id used by the oracle.
596#[cfg(any(test, feature = "test-support"))]
597pub async fn scenario_durable_store_survives_reopen<P, O, V>(
598    engine1: EngineHandle<P, O, V>,
599    build_engine2: impl FnOnce() -> EngineHandle<P, O, V>,
600    handle_id: uuid::Uuid,
601) where
602    P: crate::ports::PersistencePort + Send + Sync + 'static,
603    P::Error: std::fmt::Debug,
604    O: OraclePort + Send + Sync + 'static,
605    V: crate::ports::VectorPort + Send + Sync + 'static,
606{
607    let agent = AgentId("conformance-reopen-agent".into());
608
609    // Engine 1: ingest conflict.
610    let resp_inc = engine1.ingest_claim(ingest_req(&agent, "reopen-incumbent")).await
611        .expect("conformance[reopen]: ingest incumbent on engine-1");
612    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
613
614    let resp_ch = engine1.ingest_claim(ingest_req(&agent, "reopen-challenger")).await
615        .expect("conformance[reopen]: ingest challenger on engine-1");
616    assert_eq!(resp_ch.disposition, Disposition::QueuedForAdjudication);
617
618    let challenger_ref = resp_ch.claim_ref.clone();
619
620    // Drop engine 1, simulating restart.
621    drop(engine1);
622
623    // Engine 2: open on same backing store.
624    let engine2 = build_engine2();
625
626    // Submit Affirm on the pre-restart handle — must resolve (proves pending row durability).
627    let outcome = engine2.submit_adjudication(
628        handle_id,
629        adj_response(handle_id, AdjudicationVerdict::Affirm),
630    ).await.expect("conformance[reopen]: Affirm on pre-restart handle must succeed");
631    assert_eq!(outcome.disposition, Disposition::CommittedCheap,
632        "conformance[reopen]: challenger must be CommittedCheap after cross-restart Affirm");
633    assert_eq!(outcome.claim_ref, challenger_ref,
634        "conformance[reopen]: outcome.claim_ref must be challenger");
635
636    // Query on engine 2 must surface challenger.
637    let qr = engine2.query_memory(query_req(&agent)).await
638        .expect("conformance[reopen]: query on engine-2 must succeed");
639    let primary_val = qr.belief.primary.as_ref().map(|b| b.fact.value.clone());
640    assert_eq!(primary_val, Some(serde_json::json!("reopen-challenger")),
641        "conformance[reopen]: challenger must be surfaced after cross-restart Affirm");
642}
643
644// ── Scenario 10: Atomicity — no torn write ────────────────────────────────────
645
646/// After a successful Affirm submit, the ledger + disposition + pending-row-resolved
647/// are all consistent (no partial state).
648///
649/// Full mid-apply failure injection is not feasible without engine-level hooks; we
650/// verify the observable post-success consistency guarantee instead.
651#[cfg(any(test, feature = "test-support"))]
652pub async fn scenario_atomicity_no_torn_write<P, O, V>(
653    engine: &EngineHandle<P, O, V>,
654    handle_id: uuid::Uuid,
655) where
656    P: crate::ports::PersistencePort + Send + Sync + 'static,
657    P::Error: std::fmt::Debug,
658    O: OraclePort + Send + Sync + 'static,
659    V: crate::ports::VectorPort + Send + Sync + 'static,
660{
661    let agent = AgentId("conformance-atomicity-agent".into());
662
663    engine.ingest_claim(ingest_req(&agent, "atom-incumbent")).await
664        .expect("conformance[atomicity]: ingest incumbent");
665    let resp_ch = engine.ingest_claim(ingest_req(&agent, "atom-challenger")).await
666        .expect("conformance[atomicity]: ingest challenger");
667    let challenger_ref = resp_ch.claim_ref.clone();
668
669    let outcome = engine.submit_adjudication(
670        handle_id,
671        adj_response(handle_id, AdjudicationVerdict::Affirm),
672    ).await.expect("conformance[atomicity]: Affirm submit must succeed");
673
674    // Disposition check (ledger).
675    assert_eq!(outcome.disposition, Disposition::CommittedCheap,
676        "conformance[atomicity]: challenger disposition must be CommittedCheap");
677    assert_eq!(outcome.claim_ref, challenger_ref);
678
679    // Pending row must be consumed (handle gone).
680    let second = engine.submit_adjudication(
681        handle_id,
682        adj_response(handle_id, AdjudicationVerdict::Affirm),
683    ).await;
684    assert!(
685        matches!(second, Err(crate::error::MemError::AdjudicationHandleNotFound { .. })),
686        "conformance[atomicity]: pending row must be consumed (not found on second submit)"
687    );
688
689    // query_memory consistent: challenger surfaced, not Contested.
690    let qr = engine.query_memory(query_req(&agent)).await
691        .expect("conformance[atomicity]: query must succeed");
692    assert_ne!(qr.belief.status, BeliefStatus::Contested,
693        "conformance[atomicity]: after Affirm, must NOT be Contested");
694    assert_ne!(qr.belief.status, BeliefStatus::NoBelief,
695        "conformance[atomicity]: after Affirm, must NOT be NoBelief");
696
697    // Ledger consistent: AdjudicationResolved entry present.
698    let audit = engine.query_audit(AuditQueryRequest {
699        agent_id: agent.clone(),
700        claim_ref: None,
701        from_tx_time: None,
702        limit: 100,
703    }).await.expect("conformance[atomicity]: audit must succeed");
704    let resolved = audit.entries.iter()
705        .find(|e| e.claim_ref == challenger_ref && e.event_kind == LedgerEventKind::AdjudicationResolved)
706        .expect("conformance[atomicity]: AdjudicationResolved ledger entry must exist");
707    assert_eq!(resolved.disposition, Disposition::CommittedCheap,
708        "conformance[atomicity]: ledger entry must be CommittedCheap");
709}
710
711// ── Scenario 11: Ledger entry expectations consistent across adapters ──────────
712
713/// Verify that the ledger entry kinds and dispositions for each verdict
714/// are consistent (same invariants) across adapters.
715/// This is an aggregated check — sub-assertions from scenarios 1, 2, 3 are reused
716/// here as an explicit cross-check.
717#[cfg(any(test, feature = "test-support"))]
718pub async fn scenario_ledger_entry_expectations<P, O, V>(
719    engine: &EngineHandle<P, O, V>,
720    handle_id: uuid::Uuid,
721    verdict: AdjudicationVerdict,
722    expected_ch_disposition: Disposition,
723    expected_ch_event_kind: LedgerEventKind,
724) where
725    P: crate::ports::PersistencePort + Send + Sync + 'static,
726    P::Error: std::fmt::Debug,
727    O: OraclePort + Send + Sync + 'static,
728    V: crate::ports::VectorPort + Send + Sync + 'static,
729{
730    let label = format!("{verdict:?}");
731    let agent = AgentId(format!("conformance-ledger-{label}-agent"));
732
733    engine.ingest_claim(ingest_req(&agent, "ledger-incumbent")).await
734        .expect("conformance[ledger]: ingest incumbent");
735    let resp_ch = engine.ingest_claim(ingest_req(&agent, "ledger-challenger")).await
736        .expect("conformance[ledger]: ingest challenger");
737    let challenger_ref = resp_ch.claim_ref.clone();
738
739    engine.submit_adjudication(handle_id, adj_response(handle_id, verdict)).await
740        .expect("conformance[ledger]: submit must succeed");
741
742    let audit = engine.query_audit(AuditQueryRequest {
743        agent_id: agent.clone(),
744        claim_ref: None,
745        from_tx_time: None,
746        limit: 100,
747    }).await.expect("conformance[ledger]: audit must succeed");
748
749    // Find the resolution entry for the challenger.
750    let ch_entry = audit.entries.iter()
751        .find(|e| e.claim_ref == challenger_ref && e.event_kind == expected_ch_event_kind)
752        .unwrap_or_else(|| panic!(
753            "conformance[ledger/{label}]: expected {:?} event kind for challenger; entries={:?}",
754            expected_ch_event_kind,
755            audit.entries.iter().map(|e| (&e.event_kind, &e.disposition)).collect::<Vec<_>>()
756        ));
757    assert_eq!(ch_entry.disposition, expected_ch_disposition,
758        "conformance[ledger/{label}]: challenger disposition must be {expected_ch_disposition:?}");
759}
760
761// ── Scenario 12: B11 oracle-absent → Contested ────────────────────────────────
762
763/// With no oracle, conflicting External claims must immediately surface as Contested.
764/// Caller must pass a no-oracle engine (DefaultEngine / `open_default_in_memory` variant).
765#[cfg(any(test, feature = "test-support"))]
766pub async fn scenario_b11_oracle_absent_contested<P, O, V>(
767    engine: &EngineHandle<P, O, V>,
768) where
769    P: crate::ports::PersistencePort + Send + Sync + 'static,
770    P::Error: std::fmt::Debug,
771    O: OraclePort + Send + Sync + 'static,
772    V: crate::ports::VectorPort + Send + Sync + 'static,
773{
774    let agent = AgentId("conformance-b11-agent".into());
775
776    let resp_inc = engine.ingest_claim(ingest_req(&agent, "b11-incumbent")).await
777        .expect("conformance[b11]: ingest incumbent");
778    assert_eq!(resp_inc.disposition, Disposition::CommittedCheap);
779
780    let resp_ch = engine.ingest_claim(ingest_req(&agent, "b11-challenger")).await
781        .expect("conformance[b11]: ingest challenger");
782    assert_eq!(resp_ch.disposition, Disposition::Contested,
783        "conformance[b11]: oracle-absent External conflict MUST be Contested immediately");
784
785    let qr = engine.query_memory(query_req(&agent)).await
786        .expect("conformance[b11]: query must succeed");
787    assert_eq!(qr.belief.status, BeliefStatus::Contested,
788        "conformance[b11]: query_memory after oracle-absent conflict must be Contested");
789    let all_vals: Vec<_> = qr.belief.primary.iter()
790        .map(|b| b.fact.value.clone())
791        .chain(qr.belief.alternatives.iter().map(|b| b.fact.value.clone()))
792        .collect();
793    assert!(all_vals.contains(&serde_json::json!("b11-incumbent")),
794        "conformance[b11]: incumbent must be visible in Contested");
795    assert!(all_vals.contains(&serde_json::json!("b11-challenger")),
796        "conformance[b11]: challenger must be visible in Contested");
797}
798
799// ── Public entry-point helpers ─────────────────────────────────────────────────
800
801/// Build a fresh `EngineConfig` with a 1-nanosecond adjudication TTL.
802/// Used by adapter tests for TTL/sweep scenarios.
803#[cfg(any(test, feature = "test-support"))]
804pub fn tiny_ttl_config() -> EngineConfig {
805    EngineConfig {
806        default_adjudication_ttl: Some(Duration::from_nanos(1)),
807        ..EngineConfig::default()
808    }
809}