Skip to main content

mempill_sqlite/
store.rs

1//! `SqlitePersistenceStore` — impl of `PersistencePort` for mempill-sqlite.
2//!
3//! # Append-only
4//!
5//! Every write method is an INSERT.  No UPDATE or DELETE paths exist in this file.
6//! Attempts to update or delete data must be rejected at the application layer.
7//!
8//! # Atomic commit unit
9//!
10//! The store does NOT manage transaction lifecycle — the application use-case does.
11//! `begin_atomic` moves the connection into a `SqliteTxn`; `commit` and `rollback` return
12//! it.  This guarantees that {claim + validity assertion + ledger entry + edge} land in one
13//! SQLite transaction or not at all.
14//!
15//! # Single-writer per agent_id
16//!
//! v0.1 is single-process embedded.  The `AgentWriteLockMap` in mempill-core coordinates
//! per-agent_id writes at the async boundary.  Reads take only the short-lived connection-slot
//! mutex (no per-agent write lock) and return `TxnAlreadyOpen` while a transaction holds the
//! connection; writes are serialised by the application layer.
20//!
21//! # Connection ownership model
22//!
23//! The store owns `Box<Connection>` inside a `std::cell::Cell`-like hand-off: `begin_atomic`
24//! takes it out; `commit`/`rollback` put it back.  We use `Arc<Mutex<Option<Box<Connection>>>>`
25//! so the store is `Send + Sync` and can be shared across `spawn_blocking` calls.
26//! The `Option` is always `Some` except during the window between `begin_atomic` and
27//! `commit`/`rollback`.  Calling `begin_atomic` while a txn is already open returns an error.
28
29use std::sync::{Arc, Mutex};
30
31use mempill_core::ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow};
32use mempill_core::ports::persistence::PersistencePort;
33use mempill_types::{
34    claim::{Cardinality, Claim, Confidence, Criticality, Fact},
35    edge::{ClaimEdge, EdgeKind},
36    identity::{AgentId, ClaimRef},
37    ledger::{LedgerEntry, LedgerEventKind},
38    provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
39    time::{TransactionTime, ValidTime},
40    validity::{AssertionKind, ValidityAssertion},
41};
42use rusqlite::Connection;
43
44use crate::{txn::SqliteTxn, SqliteStoreError};
45
46// ── SqlitePersistenceStore ────────────────────────────────────────────────────
47
/// The SQLite-backed implementation of `PersistencePort`.
///
/// Construct via `SqlitePersistenceStore::new(conn)` where `conn` is a fully-initialised
/// rusqlite `Connection` (PRAGMAs applied, migrations run — use `connection::open` or
/// `connection::open_in_memory`).
///
/// The connection lives in a shared slot: `begin_atomic` moves it out into the returned
/// `SqliteTxn`, and `commit` / `rollback` put it back.  While the slot is empty, the read
/// methods and a second `begin_atomic` return `SqliteStoreError::TxnAlreadyOpen`.
pub struct SqlitePersistenceStore {
    /// Connection slot.  `None` only while a `SqliteTxn` is active.
    ///
    /// Wrapped in an `Arc` so `pending_store` can hand the same slot to a
    /// `SqlitePendingStore`.
    conn: Arc<Mutex<Option<Box<Connection>>>>,
}
57
58impl SqlitePersistenceStore {
59    /// Create a store wrapping an already-initialised `Connection`.
60    pub fn new(conn: Connection) -> Self {
61        Self {
62            conn: Arc::new(Mutex::new(Some(Box::new(conn)))),
63        }
64    }
65
66    /// Return a `SqlitePendingStore` that shares the same SQLite connection.
67    ///
68    /// This is the standard way to construct the pending-adjudication adapter:
69    /// ```rust,ignore
70    /// let store = SqlitePersistenceStore::new(conn);
71    /// let pending = store.pending_store();
72    /// ```
73    /// Both `SqlitePersistenceStore` and `SqlitePendingStore` share the connection Arc,
74    /// so the pending insert is serialized with the claim transaction by the EngineHandle
75    /// write lock — not by a shared rusqlite transaction.
76    pub fn pending_store(&self) -> SqlitePendingStore {
77        SqlitePendingStore::new(Arc::clone(&self.conn))
78    }
79}
80
81// SAFETY: Connection is Send (rusqlite guarantees this); Mutex makes it Sync.
82unsafe impl Send for SqlitePersistenceStore {}
83unsafe impl Sync for SqlitePersistenceStore {}
84
85// ── Domain-type ↔ column mapping helpers ─────────────────────────────────────
86
87/// Serialize `ProvenanceLabel` to the TEXT column value used in the schema (§5).
88/// Format: `'ModelDerived'`, `'RecallReEntry'`, `'External_UserAsserted'`,
89/// `'External_ExternalFirstHand'`.
90fn provenance_to_str(p: &ProvenanceLabel) -> &'static str {
91    match p {
92        ProvenanceLabel::ModelDerived => "ModelDerived",
93        ProvenanceLabel::RecallReEntry => "RecallReEntry",
94        ProvenanceLabel::External(ExternalKind::UserAsserted) => "External_UserAsserted",
95        ProvenanceLabel::External(ExternalKind::ExternalFirstHand) => "External_ExternalFirstHand",
96        // ProvenanceLabel is #[non_exhaustive]; future variants will be caught here at compile time.
97        _ => "Unknown",
98    }
99}
100
101/// Deserialize the TEXT column value back to `ProvenanceLabel`.
102/// Used by the read path.
103fn str_to_provenance(s: &str) -> Result<ProvenanceLabel, SqliteStoreError> {
104    match s {
105        "ModelDerived" => Ok(ProvenanceLabel::ModelDerived),
106        "RecallReEntry" => Ok(ProvenanceLabel::RecallReEntry),
107        "External_UserAsserted" => {
108            Ok(ProvenanceLabel::External(ExternalKind::UserAsserted))
109        }
110        "External_ExternalFirstHand" => {
111            Ok(ProvenanceLabel::External(ExternalKind::ExternalFirstHand))
112        }
113        other => Err(SqliteStoreError::Mapping(format!(
114            "unknown provenance_label value: {other}"
115        ))),
116    }
117}
118
119fn cardinality_to_str(c: &Cardinality) -> &'static str {
120    match c {
121        Cardinality::Functional => "Functional",
122        Cardinality::SetValued => "SetValued",
123        Cardinality::Unknown => "Unknown",
124    }
125}
126
127fn str_to_cardinality(s: &str) -> Result<Cardinality, SqliteStoreError> {
128    match s {
129        "Functional" => Ok(Cardinality::Functional),
130        "SetValued" => Ok(Cardinality::SetValued),
131        "Unknown" => Ok(Cardinality::Unknown),
132        other => Err(SqliteStoreError::Mapping(format!(
133            "unknown cardinality value: {other}"
134        ))),
135    }
136}
137
138fn criticality_to_str(c: &Criticality) -> &'static str {
139    match c {
140        Criticality::Low => "Low",
141        Criticality::Medium => "Medium",
142        Criticality::High => "High",
143        Criticality::Critical => "Critical",
144    }
145}
146
147fn str_to_criticality(s: &str) -> Result<Criticality, SqliteStoreError> {
148    match s {
149        "Low" => Ok(Criticality::Low),
150        "Medium" => Ok(Criticality::Medium),
151        "High" => Ok(Criticality::High),
152        "Critical" => Ok(Criticality::Critical),
153        other => Err(SqliteStoreError::Mapping(format!(
154            "unknown criticality value: {other}"
155        ))),
156    }
157}
158
159fn edge_kind_to_str(k: &EdgeKind) -> &'static str {
160    match k {
161        EdgeKind::DerivedFrom => "DerivedFrom",
162        EdgeKind::Supersedes => "Supersedes",
163        EdgeKind::DependsOn => "DependsOn",
164        EdgeKind::MutualExclusion => "MutualExclusion",
165        // EdgeKind is #[non_exhaustive] — future variants stored as "Unknown".
166        _ => "Unknown",
167    }
168}
169
170fn str_to_edge_kind(s: &str) -> Result<EdgeKind, SqliteStoreError> {
171    match s {
172        "DerivedFrom" => Ok(EdgeKind::DerivedFrom),
173        "Supersedes" => Ok(EdgeKind::Supersedes),
174        "DependsOn" => Ok(EdgeKind::DependsOn),
175        "MutualExclusion" => Ok(EdgeKind::MutualExclusion),
176        other => Err(SqliteStoreError::Mapping(format!(
177            "unknown edge_kind value: {other}"
178        ))),
179    }
180}
181
182fn ledger_event_kind_to_str(k: &LedgerEventKind) -> &'static str {
183    // AdjudicationExpired maps to "AdjudicationExpired" for the TTL sweep and lazy expiry path.
184    match k {
185        LedgerEventKind::ClaimCommitted => "ClaimCommitted",
186        LedgerEventKind::ValidityAsserted => "ValidityAsserted",
187        LedgerEventKind::AdjudicationRequested => "AdjudicationRequested",
188        LedgerEventKind::AdjudicationResolved => "AdjudicationResolved",
189        LedgerEventKind::RecallReEntryDetected => "RecallReEntryDetected",
190        LedgerEventKind::Quarantined => "Quarantined",
191        LedgerEventKind::DependentFlaggedPendingReview => "DependentFlaggedPendingReview",
192        LedgerEventKind::ServedAsInjected => "ServedAsInjected",
193        LedgerEventKind::AdjudicationExpired => "AdjudicationExpired",
194        // LedgerEventKind is #[non_exhaustive] — future variants stored as "Unknown".
195        _ => "Unknown",
196    }
197}
198
199fn str_to_ledger_event_kind(s: &str) -> Result<LedgerEventKind, SqliteStoreError> {
200    match s {
201        "ClaimCommitted" => Ok(LedgerEventKind::ClaimCommitted),
202        "ValidityAsserted" => Ok(LedgerEventKind::ValidityAsserted),
203        "AdjudicationRequested" => Ok(LedgerEventKind::AdjudicationRequested),
204        "AdjudicationResolved" => Ok(LedgerEventKind::AdjudicationResolved),
205        "RecallReEntryDetected" => Ok(LedgerEventKind::RecallReEntryDetected),
206        "Quarantined" => Ok(LedgerEventKind::Quarantined),
207        "DependentFlaggedPendingReview" => Ok(LedgerEventKind::DependentFlaggedPendingReview),
208        "ServedAsInjected" => Ok(LedgerEventKind::ServedAsInjected),
209        "AdjudicationExpired" => Ok(LedgerEventKind::AdjudicationExpired),
210        other => Err(SqliteStoreError::Mapping(format!(
211            "unknown ledger event_kind value: {other}"
212        ))),
213    }
214}
215
216fn disposition_to_str(d: &mempill_types::disposition::Disposition) -> &'static str {
217    use mempill_types::disposition::Disposition;
218    match d {
219        Disposition::CommittedCheap => "CommittedCheap",
220        Disposition::CommittedInferred => "CommittedInferred",
221        Disposition::QueuedForAdjudication => "QueuedForAdjudication",
222        Disposition::Contested => "Contested",
223        Disposition::PendingConflict => "PendingConflict",
224        Disposition::PendingReview => "PendingReview",
225        Disposition::PendingLowConfidence => "PendingLowConfidence",
226        Disposition::Quarantined => "Quarantined",
227        Disposition::Superseded => "Superseded",
228        Disposition::Invalidated => "Invalidated",
229        Disposition::Reinstated => "Reinstated",
230        Disposition::Rejected => "Rejected",
231        // Disposition is #[non_exhaustive] — future variants stored as "Unknown".
232        _ => "Unknown",
233    }
234}
235
236fn str_to_disposition(s: &str) -> Result<mempill_types::disposition::Disposition, SqliteStoreError> {
237    use mempill_types::disposition::Disposition;
238    match s {
239        "CommittedCheap" => Ok(Disposition::CommittedCheap),
240        "CommittedInferred" => Ok(Disposition::CommittedInferred),
241        "QueuedForAdjudication" => Ok(Disposition::QueuedForAdjudication),
242        "Contested" => Ok(Disposition::Contested),
243        "PendingConflict" => Ok(Disposition::PendingConflict),
244        "PendingReview" => Ok(Disposition::PendingReview),
245        "PendingLowConfidence" => Ok(Disposition::PendingLowConfidence),
246        "Quarantined" => Ok(Disposition::Quarantined),
247        "Superseded" => Ok(Disposition::Superseded),
248        "Invalidated" => Ok(Disposition::Invalidated),
249        "Reinstated" => Ok(Disposition::Reinstated),
250        "Rejected" => Ok(Disposition::Rejected),
251        other => Err(SqliteStoreError::Mapping(format!(
252            "unknown disposition value: {other}"
253        ))),
254    }
255}
256
257// ── Row-to-domain-type mapping helpers ───────────────────────────────────────
258
259/// Map a rusqlite `Row` from the `claims` table to a `Claim` domain type.
260///
261/// Column order (must match every SELECT that feeds this function):
262///   0  claim_id
263///   1  agent_id
264///   2  subject
265///   3  predicate
266///   4  value  (JSON text)
267///   5  cardinality
268///   6  provenance_label
269///   7  nearest_external_anchor_id  (nullable TEXT)
270///   8  derivation_depth
271///   9  tx_time
272///  10  valid_time_start  (nullable)
273///  11  valid_time_end    (nullable)
274///  12  valid_time_confidence
275///  13  value_confidence
276///  14  criticality
277///  15  derived_from  (JSON array of UUID strings)
278///  16  metadata      (nullable JSON text)
279///  17  snapshot_schema_version  (nullable INTEGER)
280fn row_to_claim(row: &rusqlite::Row<'_>) -> Result<Claim, rusqlite::Error> {
281    // We map rusqlite errors to SqliteStoreError in the caller; use rusqlite::Error here
282    // so this fn can be used directly as a row-mapper closure.
283    let claim_id_str: String = row.get(0)?;
284    let agent_id_str: String = row.get(1)?;
285    let subject: String = row.get(2)?;
286    let predicate: String = row.get(3)?;
287    let value_json: String = row.get(4)?;
288    let cardinality_str: String = row.get(5)?;
289    let provenance_str: String = row.get(6)?;
290    let nearest_anchor_str: Option<String> = row.get(7)?;
291    let derivation_depth: i64 = row.get(8)?;
292    let tx_time_str: String = row.get(9)?;
293    let valid_time_start_str: Option<String> = row.get(10)?;
294    let valid_time_end_str: Option<String> = row.get(11)?;
295    let valid_time_confidence: f64 = row.get(12)?;
296    let value_confidence: f64 = row.get(13)?;
297    let criticality_str: String = row.get(14)?;
298    let derived_from_json: String = row.get(15)?;
299    let metadata_json: Option<String> = row.get(16)?;
300    let snapshot_schema_version_raw: Option<i64> = row.get(17)?;
301
302    // These mapping errors cannot be expressed as rusqlite::Error cleanly; use
303    // rusqlite::Error::InvalidColumnType as a carrier — callers convert to SqliteStoreError.
304    let to_rusqlite_err = |msg: String| rusqlite::Error::InvalidColumnType(
305        0,
306        msg,
307        rusqlite::types::Type::Text,
308    );
309
310    let claim_id = uuid::Uuid::parse_str(&claim_id_str)
311        .map_err(|e| to_rusqlite_err(format!("claim_id UUID parse: {e}")))?;
312
313    let value: serde_json::Value = serde_json::from_str(&value_json)
314        .map_err(|e| to_rusqlite_err(format!("value JSON parse: {e}")))?;
315
316    let cardinality = str_to_cardinality(&cardinality_str)
317        .map_err(|e| to_rusqlite_err(e.to_string()))?;
318
319    let provenance = str_to_provenance(&provenance_str)
320        .map_err(|e| to_rusqlite_err(e.to_string()))?;
321
322    let nearest_external_anchor: Option<ClaimRef> = nearest_anchor_str
323        .map(|s| {
324            uuid::Uuid::parse_str(&s)
325                .map(ClaimRef)
326                .map_err(|e| to_rusqlite_err(format!("anchor UUID parse: {e}")))
327        })
328        .transpose()?;
329
330    let tx_time = chrono::DateTime::parse_from_rfc3339(&tx_time_str)
331        .map(|dt| dt.with_timezone(&chrono::Utc))
332        .map_err(|e| to_rusqlite_err(format!("tx_time parse: {e}")))?;
333
334    let valid_time_start = valid_time_start_str
335        .map(|s| {
336            chrono::DateTime::parse_from_rfc3339(&s)
337                .map(|dt| dt.with_timezone(&chrono::Utc))
338                .map_err(|e| to_rusqlite_err(format!("valid_time_start parse: {e}")))
339        })
340        .transpose()?;
341
342    let valid_time_end = valid_time_end_str
343        .map(|s| {
344            chrono::DateTime::parse_from_rfc3339(&s)
345                .map(|dt| dt.with_timezone(&chrono::Utc))
346                .map_err(|e| to_rusqlite_err(format!("valid_time_end parse: {e}")))
347        })
348        .transpose()?;
349
350    let criticality = str_to_criticality(&criticality_str)
351        .map_err(|e| to_rusqlite_err(e.to_string()))?;
352
353    let derived_from_uuids: Vec<String> = serde_json::from_str(&derived_from_json)
354        .map_err(|e| to_rusqlite_err(format!("derived_from JSON parse: {e}")))?;
355
356    let derived_from: Vec<ClaimRef> = derived_from_uuids
357        .iter()
358        .map(|s| {
359            uuid::Uuid::parse_str(s)
360                .map(ClaimRef)
361                .map_err(|e| to_rusqlite_err(format!("derived_from UUID parse: {e}")))
362        })
363        .collect::<Result<_, _>>()?;
364
365    let metadata: Option<serde_json::Value> = metadata_json
366        .map(|s| {
367            serde_json::from_str(&s)
368                .map_err(|e| to_rusqlite_err(format!("metadata JSON parse: {e}")))
369        })
370        .transpose()?;
371
372    let snapshot_schema_version: Option<u32> =
373        snapshot_schema_version_raw.map(|v| v as u32);
374
375    Ok(Claim::new(
376        ClaimRef(claim_id),
377        AgentId(agent_id_str),
378        Fact { subject, predicate, value },
379        cardinality,
380        provenance,
381        ExternalAnchor {
382            nearest_external_anchor,
383            derivation_depth: derivation_depth as u32,
384        },
385        TransactionTime(tx_time),
386        ValidTime {
387            start: valid_time_start,
388            end: valid_time_end,
389            valid_time_confidence: valid_time_confidence as f32,
390        },
391        Confidence {
392            value_confidence: value_confidence as f32,
393            valid_time_confidence: valid_time_confidence as f32,
394        },
395        criticality,
396        derived_from,
397        metadata,
398        snapshot_schema_version,
399    ))
400}
401
/// The SELECT column list that must be used with `row_to_claim`.
/// Columns must be in the exact order defined in `row_to_claim`, which reads them by
/// position (index 0 = `claim_id` … index 17 = `snapshot_schema_version`).
///
/// The read queries splice this list into their SQL with `format!`.  It does not select
/// `embedding_model_id`, the extra column `append_claim` writes as `NULL`.
const CLAIM_SELECT_COLS: &str = "
    claim_id, agent_id, subject, predicate, value, cardinality,
    provenance_label, nearest_external_anchor_id, derivation_depth,
    tx_time, valid_time_start, valid_time_end, valid_time_confidence,
    value_confidence, criticality, derived_from,
    metadata, snapshot_schema_version
";
411
412/// Map a rusqlite `Row` from the `claim_edges` table to a `ClaimEdge` domain type.
413fn row_to_edge(row: &rusqlite::Row<'_>) -> Result<ClaimEdge, rusqlite::Error> {
414    let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
415        0, msg, rusqlite::types::Type::Text,
416    );
417
418    let edge_id_str: String = row.get(0)?;
419    let agent_id_str: String = row.get(1)?;
420    let from_claim_str: String = row.get(2)?;
421    let to_claim_str: String = row.get(3)?;
422    let kind_str: String = row.get(4)?;
423    let created_at_str: String = row.get(5)?;
424
425    let edge_id = uuid::Uuid::parse_str(&edge_id_str)
426        .map_err(|e| to_err(format!("edge_id UUID: {e}")))?;
427    let from_claim = uuid::Uuid::parse_str(&from_claim_str)
428        .map(ClaimRef)
429        .map_err(|e| to_err(format!("from_claim UUID: {e}")))?;
430    let to_claim = uuid::Uuid::parse_str(&to_claim_str)
431        .map(ClaimRef)
432        .map_err(|e| to_err(format!("to_claim UUID: {e}")))?;
433    let kind = str_to_edge_kind(&kind_str)
434        .map_err(|e| to_err(e.to_string()))?;
435    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
436        .map(|dt| dt.with_timezone(&chrono::Utc))
437        .map_err(|e| to_err(format!("created_at parse: {e}")))?;
438
439    Ok(ClaimEdge {
440        edge_id,
441        agent_id: AgentId(agent_id_str),
442        from_claim,
443        to_claim,
444        kind,
445        created_at: TransactionTime(created_at),
446    })
447}
448
449// ── PersistencePort impl ──────────────────────────────────────────────────────
450
451impl PersistencePort for SqlitePersistenceStore {
452    type Transaction = SqliteTxn;
453    type Error = SqliteStoreError;
454
455    // ── Transaction lifecycle ─────────────────────────────────────────────────
456
457    /// Open an explicit `BEGIN DEFERRED` transaction scoped to `agent_id`.
458    ///
459    /// The connection is moved into the returned `SqliteTxn`.  Calling `begin_atomic`
460    /// again before `commit`/`rollback` returns `SqliteStoreError::TxnAlreadyOpen`.
461    fn begin_atomic(&self, agent_id: &AgentId) -> Result<SqliteTxn, SqliteStoreError> {
462        let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
463        let conn = slot.take().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
464        SqliteTxn::begin(agent_id.clone(), conn)
465    }
466
467    /// Commit the transaction and return the connection to the store.
468    fn commit(&self, txn: SqliteTxn) -> Result<(), SqliteStoreError> {
469        let conn = txn.commit_and_return()?;
470        let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
471        *slot = Some(conn);
472        Ok(())
473    }
474
475    /// Rollback the transaction and return the connection to the store.
476    /// On rollback all rows appended within the txn are discarded (all-or-nothing atomicity).
477    fn rollback(&self, txn: SqliteTxn) -> Result<(), SqliteStoreError> {
478        let conn = txn.rollback_and_return()?;
479        let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
480        *slot = Some(conn);
481        Ok(())
482    }
483
484    // ── Write methods (INSERT-only, I1) ───────────────────────────────────────
485
486    /// Append a claim row within the open transaction.
487    ///
488    /// Column mapping (§5):
489    /// - `claim_id` ← `claim.claim_ref().0` (UUID → TEXT)
490    /// - `agent_id` ← `claim.agent_id().0`
491    /// - `provenance_label` ← `provenance_to_str(claim.provenance())` (NOT NULL; bi-temporal provenance column)
492    /// - `nearest_external_anchor_id` ← `ExternalAnchor.nearest_external_anchor` (nullable)
493    /// - `derived_from` ← JSON array of ClaimRef UUIDs
494    fn append_claim(
495        &self,
496        txn: &mut SqliteTxn,
497        claim: &Claim,
498    ) -> Result<ClaimRef, SqliteStoreError> {
499        let conn = txn.conn();
500
501        let claim_id = claim.claim_ref().0.to_string();
502        let agent_id = claim.agent_id().0.as_str();
503        let fact = claim.fact();
504        let value_json = serde_json::to_string(&fact.value)
505            .map_err(|e| SqliteStoreError::Mapping(format!("value serialization: {e}")))?;
506        let cardinality = cardinality_to_str(claim.cardinality());
507        let provenance = provenance_to_str(claim.provenance());
508        let anchor = claim.external_anchor();
509        let nearest_anchor: Option<String> =
510            anchor.nearest_external_anchor.as_ref().map(|r| r.0.to_string());
511        let derivation_depth = anchor.derivation_depth as i64;
512        let tx_time = claim.transaction_time().0.to_rfc3339();
513        let vt = claim.valid_time();
514        let valid_time_start: Option<String> = vt.start.map(|dt| dt.to_rfc3339());
515        let valid_time_end: Option<String> = vt.end.map(|dt| dt.to_rfc3339());
516        let valid_time_confidence = vt.valid_time_confidence as f64;
517        let conf = claim.confidence();
518        let value_confidence = conf.value_confidence as f64;
519        let criticality = criticality_to_str(claim.criticality());
520        let derived_from_refs: Vec<String> =
521            claim.derived_from().iter().map(|r| r.0.to_string()).collect();
522        let derived_from_json = serde_json::to_string(&derived_from_refs)
523            .map_err(|e| SqliteStoreError::Mapping(format!("derived_from serialization: {e}")))?;
524        let metadata: Option<String> = claim
525            .metadata()
526            .map(|v| {
527                serde_json::to_string(v)
528                    .map_err(|e| SqliteStoreError::Mapping(format!("metadata serialization: {e}")))
529            })
530            .transpose()?;
531        let snapshot_schema_version: Option<i64> =
532            claim.snapshot_schema_version().map(|v| v as i64);
533
534        conn.execute(
535            "INSERT INTO claims (
536                claim_id, agent_id, subject, predicate, value, cardinality,
537                provenance_label, nearest_external_anchor_id, derivation_depth,
538                tx_time, valid_time_start, valid_time_end, valid_time_confidence,
539                value_confidence, criticality, derived_from,
540                metadata, snapshot_schema_version, embedding_model_id
541            ) VALUES (
542                ?1,  ?2,  ?3,  ?4,  ?5,  ?6,
543                ?7,  ?8,  ?9,
544                ?10, ?11, ?12, ?13,
545                ?14, ?15, ?16,
546                ?17, ?18, NULL
547            )",
548            rusqlite::params![
549                claim_id,
550                agent_id,
551                fact.subject.as_str(),
552                fact.predicate.as_str(),
553                value_json.as_str(),
554                cardinality,
555                provenance,
556                nearest_anchor,
557                derivation_depth,
558                tx_time.as_str(),
559                valid_time_start,
560                valid_time_end,
561                valid_time_confidence,
562                value_confidence,
563                criticality,
564                derived_from_json.as_str(),
565                metadata,
566                snapshot_schema_version,
567            ],
568        )?;
569
570        Ok(claim.claim_ref().clone())
571    }
572
573    /// Append a validity assertion row (Bound or Reopen) within the open transaction.
574    fn append_validity_assertion(
575        &self,
576        txn: &mut SqliteTxn,
577        assertion: &ValidityAssertion,
578    ) -> Result<(), SqliteStoreError> {
579        let conn = txn.conn();
580
581        let assertion_id = assertion.assertion_ref.to_string();
582        let agent_id = assertion.agent_id.0.as_str();
583        let target_claim_id = assertion.target_claim.0.to_string();
584        let provenance = provenance_to_str(&assertion.provenance);
585        let value_confidence = assertion.confidence.value_confidence as f64;
586        let valid_time_confidence = assertion.confidence.valid_time_confidence as f64;
587        let asserted_at = assertion.asserted_at.0.to_rfc3339();
588
589        let (assertion_kind, bound_at, reopen_at): (&str, Option<String>, Option<String>) =
590            match &assertion.kind {
591                AssertionKind::Bound { bound_at } => {
592                    ("Bound", Some(bound_at.to_rfc3339()), None)
593                }
594                AssertionKind::Reopen { reopen_at } => {
595                    ("Reopen", None, Some(reopen_at.to_rfc3339()))
596                }
597                // AssertionKind is #[non_exhaustive] — future kinds stored as "Unknown" (no-op).
598                _ => ("Unknown", None, None),
599            };
600
601        conn.execute(
602            "INSERT INTO validity_assertions (
603                assertion_id, agent_id, target_claim_id,
604                assertion_kind, bound_at, reopen_at,
605                provenance_label, value_confidence, valid_time_confidence, asserted_at
606            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
607            rusqlite::params![
608                assertion_id.as_str(),
609                agent_id,
610                target_claim_id.as_str(),
611                assertion_kind,
612                bound_at,
613                reopen_at,
614                provenance,
615                value_confidence,
616                valid_time_confidence,
617                asserted_at.as_str(),
618            ],
619        )?;
620
621        Ok(())
622    }
623
624    /// Append a ledger entry row within the open transaction.
625    fn append_ledger_entry(
626        &self,
627        txn: &mut SqliteTxn,
628        entry: &LedgerEntry,
629    ) -> Result<(), SqliteStoreError> {
630        let conn = txn.conn();
631
632        let entry_id = entry.entry_id.to_string();
633        let agent_id = entry.agent_id.0.as_str();
634        let claim_id = entry.claim_ref.0.to_string();
635        let event_kind = ledger_event_kind_to_str(&entry.event_kind);
636        let disposition = disposition_to_str(&entry.disposition);
637        let rationale: Option<String> = entry
638            .rationale
639            .as_ref()
640            .map(|v| {
641                serde_json::to_string(v)
642                    .map_err(|e| SqliteStoreError::Mapping(format!("rationale serialization: {e}")))
643            })
644            .transpose()?;
645        let recorded_at = entry.recorded_at.0.to_rfc3339();
646
647        conn.execute(
648            "INSERT INTO ledger_entries (
649                entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
650            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
651            rusqlite::params![
652                entry_id.as_str(),
653                agent_id,
654                claim_id.as_str(),
655                event_kind,
656                disposition,
657                rationale,
658                recorded_at.as_str(),
659            ],
660        )?;
661
662        Ok(())
663    }
664
665    /// Append a claim edge row within the open transaction.
666    fn append_claim_edge(
667        &self,
668        txn: &mut SqliteTxn,
669        edge: &ClaimEdge,
670    ) -> Result<(), SqliteStoreError> {
671        let conn = txn.conn();
672
673        let edge_id = edge.edge_id.to_string();
674        let agent_id = edge.agent_id.0.as_str();
675        let from_claim_id = edge.from_claim.0.to_string();
676        let to_claim_id = edge.to_claim.0.to_string();
677        let edge_kind = edge_kind_to_str(&edge.kind);
678        let created_at = edge.created_at.0.to_rfc3339();
679
680        conn.execute(
681            "INSERT INTO claim_edges (
682                edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
683            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
684            rusqlite::params![
685                edge_id.as_str(),
686                agent_id,
687                from_claim_id.as_str(),
688                to_claim_id.as_str(),
689                edge_kind,
690                created_at.as_str(),
691            ],
692        )?;
693
694        Ok(())
695    }
696
697    // ── Read methods (non-mutating; lock connection slot directly) ───────────
698
699    /// Load all claims on the given (agent_id, subject, predicate) subject-line,
700    /// ordered by tx_time ASC (oldest first — callers fold in tx_time order).
701    ///
702    /// Uses `idx_claims_subject_line` covering index (§5).
703    fn load_subject_line(
704        &self,
705        agent_id: &AgentId,
706        subject: &str,
707        predicate: &str,
708    ) -> Result<Vec<Claim>, SqliteStoreError> {
709        let slot = self.conn.lock().expect("mutex poisoned");
710        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
711
712        let sql = format!(
713            "SELECT {CLAIM_SELECT_COLS} FROM claims
714             WHERE agent_id = ?1 AND subject = ?2 AND predicate = ?3
715             ORDER BY tx_time ASC"
716        );
717        let mut stmt = conn.prepare(&sql)?;
718        let rows = stmt.query_map(
719            rusqlite::params![agent_id.0.as_str(), subject, predicate],
720            row_to_claim,
721        )?;
722
723        let mut claims = Vec::new();
724        for row in rows {
725            claims.push(row?);
726        }
727        Ok(claims)
728    }
729
730    /// Load a single claim by its `ClaimRef`. Returns `None` if not found.
731    fn load_claim(
732        &self,
733        agent_id: &AgentId,
734        claim_ref: &ClaimRef,
735    ) -> Result<Option<Claim>, SqliteStoreError> {
736        let slot = self.conn.lock().expect("mutex poisoned");
737        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
738
739        let sql = format!(
740            "SELECT {CLAIM_SELECT_COLS} FROM claims
741             WHERE agent_id = ?1 AND claim_id = ?2"
742        );
743        let mut stmt = conn.prepare(&sql)?;
744        let mut rows = stmt.query_map(
745            rusqlite::params![agent_id.0.as_str(), claim_ref.0.to_string()],
746            row_to_claim,
747        )?;
748
749        match rows.next() {
750            None => Ok(None),
751            Some(row) => Ok(Some(row?)),
752        }
753    }
754
755    /// Load all validity assertions targeting a claim, ordered by asserted_at ASC.
756    ///
757    /// Uses `idx_validity_assertions_target` index (§5).
758    fn load_validity_assertions_for(
759        &self,
760        agent_id: &AgentId,
761        claim_ref: &ClaimRef,
762    ) -> Result<Vec<ValidityAssertion>, SqliteStoreError> {
763        let slot = self.conn.lock().expect("mutex poisoned");
764        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
765
766        let mut stmt = conn.prepare(
767            "SELECT assertion_id, agent_id, target_claim_id,
768                    assertion_kind, bound_at, reopen_at,
769                    provenance_label, value_confidence, valid_time_confidence, asserted_at
770             FROM validity_assertions
771             WHERE agent_id = ?1 AND target_claim_id = ?2
772             ORDER BY asserted_at ASC",
773        )?;
774
775        let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
776            0, msg, rusqlite::types::Type::Text,
777        );
778
779        let rows = stmt.query_map(
780            rusqlite::params![agent_id.0.as_str(), claim_ref.0.to_string()],
781            |row| {
782                let assertion_id_str: String = row.get(0)?;
783                let agent_id_str: String = row.get(1)?;
784                let target_claim_str: String = row.get(2)?;
785                let kind_str: String = row.get(3)?;
786                let bound_at_str: Option<String> = row.get(4)?;
787                let reopen_at_str: Option<String> = row.get(5)?;
788                let prov_str: String = row.get(6)?;
789                let value_confidence: f64 = row.get(7)?;
790                let valid_time_confidence: f64 = row.get(8)?;
791                let asserted_at_str: String = row.get(9)?;
792
793                let assertion_ref = uuid::Uuid::parse_str(&assertion_id_str)
794                    .map_err(|e| to_err(format!("assertion_id UUID: {e}")))?;
795                let target_claim = uuid::Uuid::parse_str(&target_claim_str)
796                    .map(ClaimRef)
797                    .map_err(|e| to_err(format!("target_claim UUID: {e}")))?;
798                let provenance = str_to_provenance(&prov_str)
799                    .map_err(|e| to_err(e.to_string()))?;
800                let asserted_at = chrono::DateTime::parse_from_rfc3339(&asserted_at_str)
801                    .map(|dt| dt.with_timezone(&chrono::Utc))
802                    .map_err(|e| to_err(format!("asserted_at parse: {e}")))?;
803
804                let kind = match kind_str.as_str() {
805                    "Bound" => {
806                        let s = bound_at_str.ok_or_else(|| to_err("bound_at is NULL for Bound assertion".into()))?;
807                        let dt = chrono::DateTime::parse_from_rfc3339(&s)
808                            .map(|dt| dt.with_timezone(&chrono::Utc))
809                            .map_err(|e| to_err(format!("bound_at parse: {e}")))?;
810                        AssertionKind::Bound { bound_at: dt }
811                    }
812                    "Reopen" => {
813                        let s = reopen_at_str.ok_or_else(|| to_err("reopen_at is NULL for Reopen assertion".into()))?;
814                        let dt = chrono::DateTime::parse_from_rfc3339(&s)
815                            .map(|dt| dt.with_timezone(&chrono::Utc))
816                            .map_err(|e| to_err(format!("reopen_at parse: {e}")))?;
817                        AssertionKind::Reopen { reopen_at: dt }
818                    }
819                    other => return Err(to_err(format!("unknown assertion_kind: {other}"))),
820                };
821
822                Ok(ValidityAssertion {
823                    assertion_ref,
824                    agent_id: AgentId(agent_id_str),
825                    target_claim,
826                    kind,
827                    provenance,
828                    confidence: Confidence {
829                        value_confidence: value_confidence as f32,
830                        valid_time_confidence: valid_time_confidence as f32,
831                    },
832                    asserted_at: TransactionTime(asserted_at),
833                })
834            },
835        )?;
836
837        let mut assertions = Vec::new();
838        for row in rows {
839            assertions.push(row?);
840        }
841        Ok(assertions)
842    }
843
844    /// Load ledger entries for an agent, optionally starting from `from` (inclusive),
845    /// limited to `limit` rows, ordered by recorded_at ASC.
846    ///
847    /// Uses `idx_ledger_agent_time` index (§5). `from = None` returns from the beginning.
848    fn load_ledger(
849        &self,
850        agent_id: &AgentId,
851        from: Option<&TransactionTime>,
852        limit: usize,
853    ) -> Result<Vec<LedgerEntry>, SqliteStoreError> {
854        let slot = self.conn.lock().expect("mutex poisoned");
855        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
856
857        let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
858            0, msg, rusqlite::types::Type::Text,
859        );
860
861        let from_str: Option<String> = from.map(|t| t.0.to_rfc3339());
862        let limit_i64 = limit as i64;
863
864        let map_row = |row: &rusqlite::Row<'_>| {
865            let entry_id_str: String = row.get(0)?;
866            let agent_id_str: String = row.get(1)?;
867            let claim_id_str: String = row.get(2)?;
868            let event_kind_str: String = row.get(3)?;
869            let disposition_str: String = row.get(4)?;
870            let rationale_json: Option<String> = row.get(5)?;
871            let recorded_at_str: String = row.get(6)?;
872
873            let entry_id = uuid::Uuid::parse_str(&entry_id_str)
874                .map_err(|e| to_err(format!("entry_id UUID: {e}")))?;
875            let claim_id = uuid::Uuid::parse_str(&claim_id_str)
876                .map(ClaimRef)
877                .map_err(|e| to_err(format!("claim_id UUID: {e}")))?;
878            let event_kind = str_to_ledger_event_kind(&event_kind_str)
879                .map_err(|e| to_err(e.to_string()))?;
880            let disposition = str_to_disposition(&disposition_str)
881                .map_err(|e| to_err(e.to_string()))?;
882            let rationale: Option<serde_json::Value> = rationale_json
883                .map(|s| serde_json::from_str(&s).map_err(|e| to_err(format!("rationale JSON: {e}"))))
884                .transpose()?;
885            let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
886                .map(|dt| dt.with_timezone(&chrono::Utc))
887                .map_err(|e| to_err(format!("recorded_at parse: {e}")))?;
888
889            Ok(LedgerEntry {
890                entry_id,
891                agent_id: AgentId(agent_id_str),
892                claim_ref: claim_id,
893                event_kind,
894                disposition,
895                rationale,
896                recorded_at: TransactionTime(recorded_at),
897            })
898        };
899
900        let mut entries = Vec::new();
901
902        if let Some(ref from_val) = from_str {
903            let mut stmt = conn.prepare(
904                "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
905                 FROM ledger_entries
906                 WHERE agent_id = ?1 AND recorded_at >= ?2
907                 ORDER BY recorded_at ASC
908                 LIMIT ?3",
909            )?;
910            let rows = stmt.query_map(
911                rusqlite::params![agent_id.0.as_str(), from_val.as_str(), limit_i64],
912                map_row,
913            )?;
914            for row in rows {
915                entries.push(row?);
916            }
917        } else {
918            let mut stmt = conn.prepare(
919                "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
920                 FROM ledger_entries
921                 WHERE agent_id = ?1
922                 ORDER BY recorded_at ASC
923                 LIMIT ?2",
924            )?;
925            let rows = stmt.query_map(
926                rusqlite::params![agent_id.0.as_str(), limit_i64],
927                map_row,
928            )?;
929            for row in rows {
930                entries.push(row?);
931            }
932        }
933
934        Ok(entries)
935    }
936
937    /// Load ALL ledger entries for the given claim refs, no row cap.
938    ///
939    /// SQLite limits bound parameters to ~999 per statement (SQLITE_LIMIT_VARIABLE_NUMBER).
940    /// Chunks the IN list into batches of 900 and concatenates results so this method is
941    /// safe for any slice size.
942    fn load_ledger_for_claims(
943        &self,
944        agent_id: &AgentId,
945        claim_refs: &[ClaimRef],
946    ) -> Result<Vec<LedgerEntry>, SqliteStoreError> {
947        if claim_refs.is_empty() {
948            return Ok(vec![]);
949        }
950
951        let slot = self.conn.lock().expect("mutex poisoned");
952        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
953
954        let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
955            0, msg, rusqlite::types::Type::Text,
956        );
957
958        let map_row = |row: &rusqlite::Row<'_>| {
959            let entry_id_str: String = row.get(0)?;
960            let agent_id_str: String = row.get(1)?;
961            let claim_id_str: String = row.get(2)?;
962            let event_kind_str: String = row.get(3)?;
963            let disposition_str: String = row.get(4)?;
964            let rationale_json: Option<String> = row.get(5)?;
965            let recorded_at_str: String = row.get(6)?;
966
967            let entry_id = uuid::Uuid::parse_str(&entry_id_str)
968                .map_err(|e| to_err(format!("entry_id UUID: {e}")))?;
969            let claim_id = uuid::Uuid::parse_str(&claim_id_str)
970                .map(ClaimRef)
971                .map_err(|e| to_err(format!("claim_id UUID: {e}")))?;
972            let event_kind = str_to_ledger_event_kind(&event_kind_str)
973                .map_err(|e| to_err(e.to_string()))?;
974            let disposition = str_to_disposition(&disposition_str)
975                .map_err(|e| to_err(e.to_string()))?;
976            let rationale: Option<serde_json::Value> = rationale_json
977                .map(|s| serde_json::from_str(&s).map_err(|e| to_err(format!("rationale JSON: {e}"))))
978                .transpose()?;
979            let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
980                .map(|dt| dt.with_timezone(&chrono::Utc))
981                .map_err(|e| to_err(format!("recorded_at parse: {e}")))?;
982
983            Ok(LedgerEntry {
984                entry_id,
985                agent_id: AgentId(agent_id_str),
986                claim_ref: claim_id,
987                event_kind,
988                disposition,
989                rationale,
990                recorded_at: TransactionTime(recorded_at),
991            })
992        };
993
994        let mut all_entries = Vec::new();
995        // SQLite's default SQLITE_LIMIT_VARIABLE_NUMBER is 999; use 900 to leave headroom
996        // for the agent_id parameter.
997        const CHUNK: usize = 900;
998
999        for chunk in claim_refs.chunks(CHUNK) {
1000            let placeholders: Vec<String> = (2..=chunk.len() + 1)
1001                .map(|i| format!("?{i}"))
1002                .collect();
1003            let sql = format!(
1004                "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
1005                 FROM ledger_entries
1006                 WHERE agent_id = ?1 AND claim_id IN ({})
1007                 ORDER BY recorded_at ASC",
1008                placeholders.join(", ")
1009            );
1010
1011            let mut stmt = conn.prepare(&sql)?;
1012            // Build params: agent_id first, then each claim_ref UUID string.
1013            let agent_str = agent_id.0.as_str();
1014            let id_strings: Vec<String> = chunk.iter().map(|r| r.0.to_string()).collect();
1015
1016            // rusqlite requires a Vec<&dyn ToSql> when params are heterogeneous.
1017            let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(1 + id_strings.len());
1018            params.push(&agent_str);
1019            for s in &id_strings {
1020                params.push(s);
1021            }
1022
1023            let rows = stmt.query_map(params.as_slice(), map_row)?;
1024            for row in rows {
1025                all_entries.push(row?);
1026            }
1027        }
1028
1029        Ok(all_entries)
1030    }
1031
1032    /// Load all edges where `claim_ref` is either the from or to end, for this agent.
1033    /// Ordered by `created_at ASC` (deterministic cascade — required by convention).
1034    ///
1035    /// Uses `idx_edges_from` and `idx_edges_to` indexes (§5).
1036    fn load_edges_for(
1037        &self,
1038        agent_id: &AgentId,
1039        claim_ref: &ClaimRef,
1040    ) -> Result<Vec<ClaimEdge>, SqliteStoreError> {
1041        let slot = self.conn.lock().expect("mutex poisoned");
1042        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1043
1044        let claim_id_str = claim_ref.0.to_string();
1045
1046        let mut stmt = conn.prepare(
1047            "SELECT edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
1048             FROM claim_edges
1049             WHERE agent_id = ?1
1050               AND (from_claim_id = ?2 OR to_claim_id = ?2)
1051             ORDER BY created_at ASC",
1052        )?;
1053
1054        let rows = stmt.query_map(
1055            rusqlite::params![agent_id.0.as_str(), claim_id_str.as_str()],
1056            row_to_edge,
1057        )?;
1058
1059        let mut edges = Vec::new();
1060        for row in rows {
1061            edges.push(row?);
1062        }
1063        Ok(edges)
1064    }
1065
1066    /// Load the set of ClaimRefs served as injected claims for this agent (used by the Amplification Guard).
1067    ///
1068    /// Scans `ledger_entries` for `event_kind = 'ServedAsInjected'` and returns
1069    /// the distinct set of claim IDs, ordered by recorded_at ASC.
1070    fn load_injected_claims(
1071        &self,
1072        agent_id: &AgentId,
1073    ) -> Result<Vec<ClaimRef>, SqliteStoreError> {
1074        let slot = self.conn.lock().expect("mutex poisoned");
1075        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1076
1077        let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1078            0, msg, rusqlite::types::Type::Text,
1079        );
1080
1081        let mut stmt = conn.prepare(
1082            "SELECT claim_id
1083             FROM ledger_entries
1084             WHERE agent_id = ?1 AND event_kind = 'ServedAsInjected'
1085             GROUP BY claim_id
1086             ORDER BY MIN(recorded_at) ASC",
1087        )?;
1088
1089        let rows = stmt.query_map(
1090            rusqlite::params![agent_id.0.as_str()],
1091            |row| {
1092                let claim_id_str: String = row.get(0)?;
1093                uuid::Uuid::parse_str(&claim_id_str)
1094                    .map(ClaimRef)
1095                    .map_err(|e| to_err(format!("claim_id UUID: {e}")))
1096            },
1097        )?;
1098
1099        let mut refs = Vec::new();
1100        for row in rows {
1101            refs.push(row?);
1102        }
1103        Ok(refs)
1104    }
1105
1106    /// Recursive CTE lineage traversal.
1107    ///
1108    /// Traverses `DerivedFrom` edges upward (from `claim_ref` to its ancestors),
1109    /// returning all `ClaimEdge` rows in the lineage sub-graph, ordered by depth
1110    /// (shallowest first, then by `created_at ASC` within the same depth level).
1111    ///
1112    /// The CTE is bounded by `max_depth = 64` to prevent runaway on pathological graphs.
1113    fn load_lineage(
1114        &self,
1115        agent_id: &AgentId,
1116        claim_ref: &ClaimRef,
1117    ) -> Result<Vec<ClaimEdge>, SqliteStoreError> {
1118        let slot = self.conn.lock().expect("mutex poisoned");
1119        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1120
1121        let start_id = claim_ref.0.to_string();
1122
1123        // Recursive CTE: start from claim_ref and follow DerivedFrom edges upward.
1124        // Each step follows edges where the current node is the `from_claim_id`
1125        // (meaning: this claim was DerivedFrom to_claim_id, so ancestor is to_claim_id).
1126        let mut stmt = conn.prepare(
1127            "WITH RECURSIVE lineage(edge_id, depth) AS (
1128                -- Base case: all DerivedFrom edges leaving from our starting claim
1129                SELECT ce.edge_id, 1
1130                FROM claim_edges ce
1131                WHERE ce.agent_id = ?1
1132                  AND ce.from_claim_id = ?2
1133                  AND ce.edge_kind = 'DerivedFrom'
1134                UNION ALL
1135                -- Recursive case: follow the to_claim of the previous edge onward
1136                SELECT ce2.edge_id, l.depth + 1
1137                FROM claim_edges ce2
1138                JOIN lineage l ON ce2.from_claim_id = (
1139                    SELECT to_claim_id FROM claim_edges WHERE edge_id = l.edge_id
1140                )
1141                WHERE ce2.agent_id = ?1
1142                  AND ce2.edge_kind = 'DerivedFrom'
1143                  AND l.depth < 64
1144            )
1145            SELECT ce.edge_id, ce.agent_id, ce.from_claim_id, ce.to_claim_id,
1146                   ce.edge_kind, ce.created_at,
1147                   l.depth
1148            FROM claim_edges ce
1149            JOIN lineage l ON ce.edge_id = l.edge_id
1150            ORDER BY l.depth ASC, ce.created_at ASC",
1151        )?;
1152
1153        let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1154            0, msg, rusqlite::types::Type::Text,
1155        );
1156
1157        let rows = stmt.query_map(
1158            rusqlite::params![agent_id.0.as_str(), start_id.as_str()],
1159            |row| {
1160                let edge_id_str: String = row.get(0)?;
1161                let agent_id_str: String = row.get(1)?;
1162                let from_claim_str: String = row.get(2)?;
1163                let to_claim_str: String = row.get(3)?;
1164                let kind_str: String = row.get(4)?;
1165                let created_at_str: String = row.get(5)?;
1166                // col 6 = depth (used only for ordering; not part of ClaimEdge)
1167
1168                let edge_id = uuid::Uuid::parse_str(&edge_id_str)
1169                    .map_err(|e| to_err(format!("edge_id UUID: {e}")))?;
1170                let from_claim = uuid::Uuid::parse_str(&from_claim_str)
1171                    .map(ClaimRef)
1172                    .map_err(|e| to_err(format!("from_claim UUID: {e}")))?;
1173                let to_claim = uuid::Uuid::parse_str(&to_claim_str)
1174                    .map(ClaimRef)
1175                    .map_err(|e| to_err(format!("to_claim UUID: {e}")))?;
1176                let kind = str_to_edge_kind(&kind_str)
1177                    .map_err(|e| to_err(e.to_string()))?;
1178                let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1179                    .map(|dt| dt.with_timezone(&chrono::Utc))
1180                    .map_err(|e| to_err(format!("created_at parse: {e}")))?;
1181
1182                Ok(ClaimEdge {
1183                    edge_id,
1184                    agent_id: AgentId(agent_id_str),
1185                    from_claim,
1186                    to_claim,
1187                    kind,
1188                    created_at: TransactionTime(created_at),
1189                })
1190            },
1191        )?;
1192
1193        let mut edges = Vec::new();
1194        for row in rows {
1195            edges.push(row?);
1196        }
1197        Ok(edges)
1198    }
1199}
1200
1201// ── SqlitePendingStore ────────────────────────────────────────────────────────
1202
/// SQLite-backed `PendingAdjudicationPort` implementation.
///
/// Shares the same connection mutex as `SqlitePersistenceStore` but operates OUTSIDE
/// the claim transaction — reads and writes go directly on the connection (no BEGIN/COMMIT
/// wrapping). The per-agent write lock held by `EngineHandle` ensures these writes are
/// serialized with the claim txn commit.
///
/// Because the connection slot is shared, every method on this store returns
/// `SqliteStoreError::TxnAlreadyOpen` while the slot is empty.
///
/// Construct via `SqlitePendingStore::new(conn_arc)` sharing the same connection Arc
/// as the `SqlitePersistenceStore`.
pub struct SqlitePendingStore {
    // Shared connection slot. Per the module docs the `Option` is `Some` except while
    // a transaction has taken the connection out (between `begin_atomic` and
    // `commit`/`rollback`).
    conn: Arc<Mutex<Option<Box<Connection>>>>,
}
1215
1216impl SqlitePendingStore {
1217    /// Create a pending store sharing the connection with a `SqlitePersistenceStore`.
1218    pub fn new(conn: Arc<Mutex<Option<Box<Connection>>>>) -> Self {
1219        Self { conn }
1220    }
1221}
1222
1223// SAFETY: Connection is Send; Mutex makes it Sync.
1224unsafe impl Send for SqlitePendingStore {}
1225unsafe impl Sync for SqlitePendingStore {}
1226
1227impl PendingAdjudicationPort for SqlitePendingStore {
1228    type Error = SqliteStoreError;
1229
1230    fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), SqliteStoreError> {
1231        let slot = self.conn.lock().expect("mutex poisoned");
1232        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1233
1234        let request_payload = serde_json::to_string(&row.request_payload)
1235            .map_err(|e| SqliteStoreError::Mapping(format!("request_payload serialization: {e}")))?;
1236        let queued_at = row.queued_at.to_rfc3339();
1237        let expires_at: Option<String> = row.expires_at.map(|dt| dt.to_rfc3339());
1238
1239        conn.execute(
1240            "INSERT INTO pending_adjudications (
1241                handle_id, agent_id, subject, predicate,
1242                challenger_claim_ref, incumbent_claim_ref,
1243                request_payload, queued_at, expires_at, status
1244            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
1245            rusqlite::params![
1246                row.handle_id.to_string(),
1247                row.agent_id.0.as_str(),
1248                row.subject.as_str(),
1249                row.predicate.as_str(),
1250                row.challenger_claim_ref.0.to_string(),
1251                row.incumbent_claim_ref.0.to_string(),
1252                request_payload.as_str(),
1253                queued_at.as_str(),
1254                expires_at,
1255                row.status.as_str(),
1256            ],
1257        )?;
1258        Ok(())
1259    }
1260
1261    fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, SqliteStoreError> {
1262        let slot = self.conn.lock().expect("mutex poisoned");
1263        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1264
1265        let mut stmt = conn.prepare(
1266            "SELECT handle_id, agent_id, subject, predicate,
1267                    challenger_claim_ref, incumbent_claim_ref,
1268                    request_payload, queued_at, expires_at, status
1269             FROM pending_adjudications
1270             WHERE handle_id = ?1",
1271        )?;
1272
1273        let mut rows = stmt.query_map(
1274            rusqlite::params![handle_id.to_string()],
1275            row_to_pending,
1276        )?;
1277
1278        match rows.next() {
1279            None => Ok(None),
1280            Some(row) => Ok(Some(row.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?)),
1281        }
1282    }
1283
1284    fn list_pending(&self, agent_id: Option<&mempill_types::AgentId>) -> Result<Vec<PendingAdjudicationRow>, SqliteStoreError> {
1285        let slot = self.conn.lock().expect("mutex poisoned");
1286        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1287
1288        let rows = if let Some(aid) = agent_id {
1289            let mut stmt = conn.prepare(
1290                "SELECT handle_id, agent_id, subject, predicate,
1291                        challenger_claim_ref, incumbent_claim_ref,
1292                        request_payload, queued_at, expires_at, status
1293                 FROM pending_adjudications
1294                 WHERE agent_id = ?1 AND status = 'pending'
1295                 ORDER BY queued_at ASC",
1296            )?;
1297            let mapped = stmt.query_map(rusqlite::params![aid.0.as_str()], row_to_pending)?;
1298            let mut result = Vec::new();
1299            for r in mapped {
1300                result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1301            }
1302            result
1303        } else {
1304            let mut stmt = conn.prepare(
1305                "SELECT handle_id, agent_id, subject, predicate,
1306                        challenger_claim_ref, incumbent_claim_ref,
1307                        request_payload, queued_at, expires_at, status
1308                 FROM pending_adjudications
1309                 WHERE status = 'pending'
1310                 ORDER BY queued_at ASC",
1311            )?;
1312            let mapped = stmt.query_map([], row_to_pending)?;
1313            let mut result = Vec::new();
1314            for r in mapped {
1315                result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1316            }
1317            result
1318        };
1319        Ok(rows)
1320    }
1321
1322    fn list_expired(&self, now: chrono::DateTime<chrono::Utc>) -> Result<Vec<PendingAdjudicationRow>, SqliteStoreError> {
1323        let slot = self.conn.lock().expect("mutex poisoned");
1324        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1325
1326        let now_str = now.to_rfc3339();
1327        let mut stmt = conn.prepare(
1328            "SELECT handle_id, agent_id, subject, predicate,
1329                    challenger_claim_ref, incumbent_claim_ref,
1330                    request_payload, queued_at, expires_at, status
1331             FROM pending_adjudications
1332             WHERE expires_at IS NOT NULL AND expires_at <= ?1 AND status = 'pending'
1333             ORDER BY expires_at ASC",
1334        )?;
1335        let mapped = stmt.query_map(rusqlite::params![now_str.as_str()], row_to_pending)?;
1336        let mut result = Vec::new();
1337        for r in mapped {
1338            result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1339        }
1340        Ok(result)
1341    }
1342
1343    fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), SqliteStoreError> {
1344        let slot = self.conn.lock().expect("mutex poisoned");
1345        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1346
1347        conn.execute(
1348            "UPDATE pending_adjudications SET status = 'resolved' WHERE handle_id = ?1",
1349            rusqlite::params![handle_id.to_string()],
1350        )?;
1351        Ok(())
1352    }
1353
1354    fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), SqliteStoreError> {
1355        let slot = self.conn.lock().expect("mutex poisoned");
1356        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1357
1358        conn.execute(
1359            "UPDATE pending_adjudications SET status = 'expired' WHERE handle_id = ?1",
1360            rusqlite::params![handle_id.to_string()],
1361        )?;
1362        Ok(())
1363    }
1364
1365    /// Detect QueuedForAdjudication claims (by latest ledger disposition) with no matching
1366    /// pending row (status = 'pending').
1367    ///
1368    /// Approach: find claim_ids whose most-recent ledger entry has disposition =
1369    /// 'QueuedForAdjudication' via a subquery on max(recorded_at), then check for absence
1370    /// of a matching pending_adjudications row. Returns orphaned claim refs with
1371    /// agent_id, subject, predicate, and best-guess incumbent.
1372    ///
1373    /// NOTE: The schema uses `claim_id` (not `claim_ref`) in `ledger_entries` and `claims`.
1374    fn list_queued_orphan_claims(
1375        &self,
1376    ) -> Result<Vec<mempill_core::ports::pending_adjudication::OrphanedQueuedClaim>, SqliteStoreError> {
1377        let slot = self.conn.lock().expect("mutex poisoned");
1378        let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1379
1380        // Step 1: Find all (agent_id, claim_id) pairs whose latest ledger disposition is
1381        // 'QueuedForAdjudication' with no matching pending_adjudications row (status='pending').
1382        let mut stmt = conn.prepare(
1383            "SELECT l.agent_id, l.claim_id, c.subject, c.predicate
1384             FROM ledger_entries l
1385             JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1386             WHERE l.disposition = 'QueuedForAdjudication'
1387               AND l.recorded_at = (
1388                   SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1389                   WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1390               )
1391               AND NOT EXISTS (
1392                   SELECT 1 FROM pending_adjudications pa
1393                   WHERE pa.challenger_claim_ref = l.claim_id
1394                     AND pa.agent_id = l.agent_id
1395                     AND pa.status = 'pending'
1396               )",
1397        )?;
1398
1399        let orphan_rows: Vec<(String, String, String, String)> = stmt
1400            .query_map([], |row| {
1401                Ok((
1402                    row.get::<_, String>(0)?,
1403                    row.get::<_, String>(1)?,
1404                    row.get::<_, String>(2)?,
1405                    row.get::<_, String>(3)?,
1406                ))
1407            })?
1408            .filter_map(|r| r.ok())
1409            .collect();
1410
1411        let mut results = Vec::new();
1412        for (agent_id_str, challenger_str, subject, predicate) in orphan_rows {
1413            use mempill_types::ClaimRef;
1414
1415            let challenger_ref = uuid::Uuid::parse_str(&challenger_str)
1416                .map(ClaimRef)
1417                .map_err(|e| SqliteStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1418
1419            // Step 2: Find the incumbent (CommittedCheap) on the same subject line.
1420            let incumbent_ref = find_committed_cheap_claim(conn, &agent_id_str, &subject, &predicate)?;
1421
1422            results.push(mempill_core::ports::pending_adjudication::OrphanedQueuedClaim {
1423                agent_id: mempill_types::AgentId(agent_id_str),
1424                challenger_claim_ref: challenger_ref,
1425                incumbent_claim_ref: incumbent_ref,
1426                subject,
1427                predicate,
1428            });
1429        }
1430
1431        Ok(results)
1432    }
1433}
1434
1435/// Map a rusqlite `Row` from `pending_adjudications` to a `PendingAdjudicationRow`.
1436///
1437/// Column order (must match every SELECT):
1438///   0  handle_id
1439///   1  agent_id
1440///   2  subject
1441///   3  predicate
1442///   4  challenger_claim_ref
1443///   5  incumbent_claim_ref
1444///   6  request_payload  (JSON text)
1445///   7  queued_at        (ISO-8601)
1446///   8  expires_at       (ISO-8601, nullable)
1447///   9  status
1448fn row_to_pending(row: &rusqlite::Row<'_>) -> Result<PendingAdjudicationRow, rusqlite::Error> {
1449    let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1450        0, msg, rusqlite::types::Type::Text,
1451    );
1452
1453    let handle_id_str: String = row.get(0)?;
1454    let agent_id_str: String = row.get(1)?;
1455    let subject: String = row.get(2)?;
1456    let predicate: String = row.get(3)?;
1457    let challenger_str: String = row.get(4)?;
1458    let incumbent_str: String = row.get(5)?;
1459    let payload_json: String = row.get(6)?;
1460    let queued_at_str: String = row.get(7)?;
1461    let expires_at_str: Option<String> = row.get(8)?;
1462    let status: String = row.get(9)?;
1463
1464    let handle_id = uuid::Uuid::parse_str(&handle_id_str)
1465        .map_err(|e| to_err(format!("handle_id UUID: {e}")))?;
1466    let challenger_claim_ref = uuid::Uuid::parse_str(&challenger_str)
1467        .map(ClaimRef)
1468        .map_err(|e| to_err(format!("challenger_claim_ref UUID: {e}")))?;
1469    let incumbent_claim_ref = uuid::Uuid::parse_str(&incumbent_str)
1470        .map(ClaimRef)
1471        .map_err(|e| to_err(format!("incumbent_claim_ref UUID: {e}")))?;
1472    let request_payload: mempill_types::AdjudicationRequest =
1473        serde_json::from_str(&payload_json)
1474            .map_err(|e| to_err(format!("request_payload JSON: {e}")))?;
1475    let queued_at = chrono::DateTime::parse_from_rfc3339(&queued_at_str)
1476        .map(|dt| dt.with_timezone(&chrono::Utc))
1477        .map_err(|e| to_err(format!("queued_at parse: {e}")))?;
1478    let expires_at = expires_at_str
1479        .map(|s| {
1480            chrono::DateTime::parse_from_rfc3339(&s)
1481                .map(|dt| dt.with_timezone(&chrono::Utc))
1482                .map_err(|e| to_err(format!("expires_at parse: {e}")))
1483        })
1484        .transpose()?;
1485
1486    Ok(PendingAdjudicationRow {
1487        handle_id,
1488        agent_id: AgentId(agent_id_str),
1489        subject,
1490        predicate,
1491        challenger_claim_ref,
1492        incumbent_claim_ref,
1493        request_payload,
1494        queued_at,
1495        expires_at,
1496        status,
1497    })
1498}
1499
1500/// Find the most recent CommittedCheap claim on the same (agent_id, subject, predicate)
1501/// subject line, used to identify the incumbent during orphan recovery.
1502///
1503/// Returns `None` if no CommittedCheap claim exists (sweep will skip reverting such orphans
1504/// — they cannot be surfaced as Contested without a known incumbent).
1505///
1506/// NOTE: The schema uses `claim_id` (not `claim_ref`) in both `claims` and `ledger_entries`.
1507fn find_committed_cheap_claim(
1508    conn: &Connection,
1509    agent_id: &str,
1510    subject: &str,
1511    predicate: &str,
1512) -> Result<Option<mempill_types::ClaimRef>, SqliteStoreError> {
1513    // Find the claim_id from the same subject line whose latest ledger entry is CommittedCheap.
1514    let mut stmt = conn.prepare(
1515        "SELECT l.claim_id
1516         FROM ledger_entries l
1517         JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1518         WHERE l.agent_id = ?1
1519           AND c.subject = ?2
1520           AND c.predicate = ?3
1521           AND l.disposition = 'CommittedCheap'
1522           AND l.recorded_at = (
1523               SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1524               WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1525           )
1526         ORDER BY l.recorded_at DESC
1527         LIMIT 1",
1528    )?;
1529
1530    let mut rows = stmt.query_map(rusqlite::params![agent_id, subject, predicate], |row| {
1531        row.get::<_, String>(0)
1532    })?;
1533
1534    if let Some(Ok(ref_str)) = rows.next() {
1535        let claim_ref = uuid::Uuid::parse_str(&ref_str)
1536            .map(mempill_types::ClaimRef)
1537            .map_err(|e| SqliteStoreError::Mapping(format!("incumbent_claim_ref UUID: {e}")))?;
1538        Ok(Some(claim_ref))
1539    } else {
1540        Ok(None)
1541    }
1542}
1543
1544// ── Tests ─────────────────────────────────────────────────────────────────────
1545
1546#[cfg(test)]
1547mod tests {
1548    use super::*;
1549    use crate::connection::open_in_memory;
1550    use chrono::Utc;
1551    use mempill_types::{
1552        claim::{Cardinality, Claim, Confidence, Criticality, Fact},
1553        disposition::Disposition,
1554        identity::AgentId,
1555        ledger::LedgerEventKind,
1556        provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
1557        time::{TransactionTime, ValidTime},
1558        validity::AssertionKind,
1559    };
1560    use uuid::Uuid;
1561
1562    fn make_store() -> SqlitePersistenceStore {
1563        let conn = open_in_memory().expect("in-memory connection should open");
1564        SqlitePersistenceStore::new(conn)
1565    }
1566
1567    fn make_agent() -> AgentId {
1568        AgentId("test-agent-1".into())
1569    }
1570
1571    fn make_claim(agent_id: &AgentId) -> Claim {
1572        Claim::new(
1573            ClaimRef(Uuid::new_v4()),
1574            agent_id.clone(),
1575            Fact {
1576                subject: "user".into(),
1577                predicate: "favourite_colour".into(),
1578                value: serde_json::json!("blue"),
1579            },
1580            Cardinality::Functional,
1581            ProvenanceLabel::External(ExternalKind::UserAsserted),
1582            ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
1583            TransactionTime(Utc::now()),
1584            ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
1585            Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
1586            Criticality::Low,
1587            vec![],
1588            None,
1589            None,
1590        )
1591    }
1592
1593    fn make_ledger_entry(
1594        agent_id: &AgentId,
1595        claim_ref: &ClaimRef,
1596    ) -> LedgerEntry {
1597        LedgerEntry {
1598            entry_id: Uuid::new_v4(),
1599            agent_id: agent_id.clone(),
1600            claim_ref: claim_ref.clone(),
1601            event_kind: LedgerEventKind::ClaimCommitted,
1602            disposition: Disposition::CommittedCheap,
1603            rationale: None,
1604            recorded_at: TransactionTime(Utc::now()),
1605        }
1606    }
1607
1608    // ── WRITE ROUND-TRIP ──────────────────────────────────────────────────────
1609
1610    /// Append a claim within a Txn, commit, then verify the row exists via raw SELECT.
1611    /// (We use raw SQL here for direct verification without the typed read path.)
1612    #[test]
1613    fn write_round_trip_claim_persists_after_commit() {
1614        let store = make_store();
1615        let agent = make_agent();
1616        let claim = make_claim(&agent);
1617        let claim_id = claim.claim_ref().0.to_string();
1618
1619        let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1620        store.append_claim(&mut txn, &claim).expect("append_claim should succeed");
1621        store.commit(txn).expect("commit should succeed");
1622
1623        // Re-acquire the connection to verify via raw SQL.
1624        let slot = store.conn.lock().unwrap();
1625        let conn = slot.as_ref().expect("connection must be back after commit");
1626        let count: i64 = conn
1627            .query_row(
1628                "SELECT COUNT(*) FROM claims WHERE claim_id = ?1",
1629                [claim_id.as_str()],
1630                |r| r.get(0),
1631            )
1632            .expect("SELECT should succeed");
1633        assert_eq!(count, 1, "claim row must exist after commit");
1634    }
1635
1636    /// Append a claim and verify all provenance columns are stored correctly.
1637    #[test]
1638    fn write_round_trip_provenance_not_null() {
1639        let store = make_store();
1640        let agent = make_agent();
1641        let claim = make_claim(&agent);
1642        let claim_id = claim.claim_ref().0.to_string();
1643
1644        let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1645        store.append_claim(&mut txn, &claim).expect("append_claim should succeed");
1646        store.commit(txn).expect("commit should succeed");
1647
1648        let slot = store.conn.lock().unwrap();
1649        let conn = slot.as_ref().unwrap();
1650
1651        // provenance_label must be non-NULL (I2 — NOT NULL constraint in schema).
1652        let prov: String = conn
1653            .query_row(
1654                "SELECT provenance_label FROM claims WHERE claim_id = ?1",
1655                [claim_id.as_str()],
1656                |r| r.get(0),
1657            )
1658            .expect("provenance_label must be selectable");
1659        assert_eq!(
1660            prov, "External_UserAsserted",
1661            "provenance_label column must be non-NULL and correct"
1662        );
1663
1664        // tx_time must be non-NULL (I2).
1665        let tx_time: String = conn
1666            .query_row(
1667                "SELECT tx_time FROM claims WHERE claim_id = ?1",
1668                [claim_id.as_str()],
1669                |r| r.get(0),
1670            )
1671            .expect("tx_time must be selectable");
1672        assert!(!tx_time.is_empty(), "tx_time must be non-NULL");
1673    }
1674
1675    // ── ATOMICITY ─────────────────────────────────────────────────────────────
1676
1677    /// Begin a Txn, append {claim + validity assertion + ledger entry}, force rollback.
1678    /// All three rows must be absent after rollback — all-or-nothing atomicity.
1679    #[test]
1680    fn atomicity_rollback_leaves_zero_rows() {
1681        let store = make_store();
1682        let agent = make_agent();
1683        let claim = make_claim(&agent);
1684        let claim_ref = claim.claim_ref().clone();
1685        let claim_id = claim_ref.0.to_string();
1686
1687        let assertion = mempill_types::validity::ValidityAssertion {
1688            assertion_ref: Uuid::new_v4(),
1689            agent_id: agent.clone(),
1690            target_claim: claim_ref.clone(),
1691            kind: AssertionKind::Bound { bound_at: Utc::now() },
1692            provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1693            confidence: mempill_types::claim::Confidence {
1694                value_confidence: 0.9,
1695                valid_time_confidence: 0.9,
1696            },
1697            asserted_at: TransactionTime(Utc::now()),
1698        };
1699        let assertion_id = assertion.assertion_ref.to_string();
1700
1701        let ledger_entry = make_ledger_entry(&agent, &claim_ref);
1702        let entry_id = ledger_entry.entry_id.to_string();
1703
1704        let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1705        store.append_claim(&mut txn, &claim).expect("append_claim in txn should succeed");
1706        store
1707            .append_validity_assertion(&mut txn, &assertion)
1708            .expect("append_validity_assertion in txn should succeed");
1709        store
1710            .append_ledger_entry(&mut txn, &ledger_entry)
1711            .expect("append_ledger_entry in txn should succeed");
1712
1713        // Force rollback — must leave zero rows.
1714        store.rollback(txn).expect("rollback should succeed");
1715
1716        let slot = store.conn.lock().unwrap();
1717        let conn = slot.as_ref().expect("connection must be back after rollback");
1718
1719        let claim_count: i64 = conn
1720            .query_row(
1721                "SELECT COUNT(*) FROM claims WHERE claim_id = ?1",
1722                [claim_id.as_str()],
1723                |r| r.get(0),
1724            )
1725            .unwrap();
1726        let assertion_count: i64 = conn
1727            .query_row(
1728                "SELECT COUNT(*) FROM validity_assertions WHERE assertion_id = ?1",
1729                [assertion_id.as_str()],
1730                |r| r.get(0),
1731            )
1732            .unwrap();
1733        let ledger_count: i64 = conn
1734            .query_row(
1735                "SELECT COUNT(*) FROM ledger_entries WHERE entry_id = ?1",
1736                [entry_id.as_str()],
1737                |r| r.get(0),
1738            )
1739            .unwrap();
1740
1741        assert_eq!(claim_count, 0, "claim row must not exist after rollback");
1742        assert_eq!(assertion_count, 0, "validity_assertion row must not exist after rollback");
1743        assert_eq!(ledger_count, 0, "ledger_entry row must not exist after rollback");
1744    }
1745
1746    // ── VALIDITY ASSERTION ROUND-TRIP ─────────────────────────────────────────
1747
1748    #[test]
1749    fn write_round_trip_validity_assertion() {
1750        let store = make_store();
1751        let agent = make_agent();
1752        let claim = make_claim(&agent);
1753        let claim_ref = claim.claim_ref().clone();
1754
1755        let assertion = mempill_types::validity::ValidityAssertion {
1756            assertion_ref: Uuid::new_v4(),
1757            agent_id: agent.clone(),
1758            target_claim: claim_ref.clone(),
1759            kind: AssertionKind::Bound { bound_at: Utc::now() },
1760            provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1761            confidence: mempill_types::claim::Confidence {
1762                value_confidence: 0.95,
1763                valid_time_confidence: 0.8,
1764            },
1765            asserted_at: TransactionTime(Utc::now()),
1766        };
1767        let assertion_id = assertion.assertion_ref.to_string();
1768
1769        let mut txn = store.begin_atomic(&agent).unwrap();
1770        store.append_claim(&mut txn, &claim).unwrap();
1771        store.append_validity_assertion(&mut txn, &assertion).unwrap();
1772        store.commit(txn).unwrap();
1773
1774        let slot = store.conn.lock().unwrap();
1775        let conn = slot.as_ref().unwrap();
1776        let count: i64 = conn
1777            .query_row(
1778                "SELECT COUNT(*) FROM validity_assertions WHERE assertion_id = ?1",
1779                [assertion_id.as_str()],
1780                |r| r.get(0),
1781            )
1782            .unwrap();
1783        assert_eq!(count, 1, "validity_assertion row must exist after commit");
1784    }
1785
1786    // ── LEDGER ENTRY ROUND-TRIP ───────────────────────────────────────────────
1787
1788    #[test]
1789    fn write_round_trip_ledger_entry() {
1790        let store = make_store();
1791        let agent = make_agent();
1792        let claim = make_claim(&agent);
1793        let claim_ref = claim.claim_ref().clone();
1794        let entry = make_ledger_entry(&agent, &claim_ref);
1795        let entry_id = entry.entry_id.to_string();
1796
1797        let mut txn = store.begin_atomic(&agent).unwrap();
1798        store.append_claim(&mut txn, &claim).unwrap();
1799        store.append_ledger_entry(&mut txn, &entry).unwrap();
1800        store.commit(txn).unwrap();
1801
1802        let slot = store.conn.lock().unwrap();
1803        let conn = slot.as_ref().unwrap();
1804        let count: i64 = conn
1805            .query_row(
1806                "SELECT COUNT(*) FROM ledger_entries WHERE entry_id = ?1",
1807                [entry_id.as_str()],
1808                |r| r.get(0),
1809            )
1810            .unwrap();
1811        assert_eq!(count, 1, "ledger_entry row must exist after commit");
1812    }
1813
1814    // ── CLAIM EDGE ROUND-TRIP ─────────────────────────────────────────────────
1815
1816    #[test]
1817    fn write_round_trip_claim_edge() {
1818        let store = make_store();
1819        let agent = make_agent();
1820        let from_claim = make_claim(&agent);
1821        let to_claim = make_claim(&agent);
1822        let from_ref = from_claim.claim_ref().clone();
1823        let to_ref = to_claim.claim_ref().clone();
1824
1825        let edge = ClaimEdge {
1826            edge_id: Uuid::new_v4(),
1827            agent_id: agent.clone(),
1828            from_claim: from_ref.clone(),
1829            to_claim: to_ref.clone(),
1830            kind: EdgeKind::DerivedFrom,
1831            created_at: TransactionTime(Utc::now()),
1832        };
1833        let edge_id = edge.edge_id.to_string();
1834
1835        let mut txn = store.begin_atomic(&agent).unwrap();
1836        store.append_claim(&mut txn, &from_claim).unwrap();
1837        store.append_claim(&mut txn, &to_claim).unwrap();
1838        store.append_claim_edge(&mut txn, &edge).unwrap();
1839        store.commit(txn).unwrap();
1840
1841        let slot = store.conn.lock().unwrap();
1842        let conn = slot.as_ref().unwrap();
1843        let count: i64 = conn
1844            .query_row(
1845                "SELECT COUNT(*) FROM claim_edges WHERE edge_id = ?1",
1846                [edge_id.as_str()],
1847                |r| r.get(0),
1848            )
1849            .unwrap();
1850        assert_eq!(count, 1, "claim_edge row must exist after commit");
1851    }
1852
1853    // ── READ PATH TESTS ───────────────────────────────────────────────────────
1854
1855    /// Write a claim then load_claim returns it with all fields intact (round-trip).
1856    #[test]
1857    fn read_load_claim_round_trip() {
1858        let store = make_store();
1859        let agent = make_agent();
1860        let claim = make_claim(&agent);
1861        let claim_ref = claim.claim_ref().clone();
1862
1863        let mut txn = store.begin_atomic(&agent).unwrap();
1864        store.append_claim(&mut txn, &claim).unwrap();
1865        store.commit(txn).unwrap();
1866
1867        let loaded = store.load_claim(&agent, &claim_ref).unwrap();
1868        assert!(loaded.is_some(), "load_claim must return Some for existing claim");
1869        let loaded = loaded.unwrap();
1870        assert_eq!(loaded.claim_ref(), &claim_ref);
1871        assert_eq!(loaded.agent_id(), &agent);
1872        assert_eq!(loaded.fact().subject, "user");
1873        assert_eq!(loaded.fact().predicate, "favourite_colour");
1874        assert_eq!(loaded.fact().value, serde_json::json!("blue"));
1875        assert_eq!(loaded.provenance(), claim.provenance());
1876        assert_eq!(loaded.cardinality(), claim.cardinality());
1877        assert_eq!(loaded.criticality(), claim.criticality());
1878    }
1879
1880    /// load_claim returns None for a non-existent ClaimRef.
1881    #[test]
1882    fn read_load_claim_missing_returns_none() {
1883        let store = make_store();
1884        let agent = make_agent();
1885        let missing_ref = ClaimRef(Uuid::new_v4());
1886        let result = store.load_claim(&agent, &missing_ref).unwrap();
1887        assert!(result.is_none(), "load_claim must return None for unknown claim_ref");
1888    }
1889
1890    /// Write a claim then load_subject_line returns it.
1891    #[test]
1892    fn read_load_subject_line_round_trip() {
1893        let store = make_store();
1894        let agent = make_agent();
1895        let claim = make_claim(&agent);
1896        let claim_ref = claim.claim_ref().clone();
1897
1898        let mut txn = store.begin_atomic(&agent).unwrap();
1899        store.append_claim(&mut txn, &claim).unwrap();
1900        store.commit(txn).unwrap();
1901
1902        let claims = store.load_subject_line(&agent, "user", "favourite_colour").unwrap();
1903        assert_eq!(claims.len(), 1, "load_subject_line must return the single written claim");
1904        assert_eq!(claims[0].claim_ref(), &claim_ref);
1905    }
1906
1907    /// load_subject_line returns empty vec when nothing matches.
1908    #[test]
1909    fn read_load_subject_line_empty_when_no_match() {
1910        let store = make_store();
1911        let agent = make_agent();
1912        let claims = store.load_subject_line(&agent, "nonexistent", "pred").unwrap();
1913        assert!(claims.is_empty(), "load_subject_line must return empty vec for unknown subject-line");
1914    }
1915
1916    /// Write a validity assertion then load_validity_assertions_for returns it.
1917    #[test]
1918    fn read_load_validity_assertions_round_trip() {
1919        let store = make_store();
1920        let agent = make_agent();
1921        let claim = make_claim(&agent);
1922        let claim_ref = claim.claim_ref().clone();
1923
1924        let assertion = mempill_types::validity::ValidityAssertion {
1925            assertion_ref: Uuid::new_v4(),
1926            agent_id: agent.clone(),
1927            target_claim: claim_ref.clone(),
1928            kind: AssertionKind::Bound { bound_at: Utc::now() },
1929            provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1930            confidence: mempill_types::claim::Confidence {
1931                value_confidence: 0.9,
1932                valid_time_confidence: 0.8,
1933            },
1934            asserted_at: TransactionTime(Utc::now()),
1935        };
1936
1937        let mut txn = store.begin_atomic(&agent).unwrap();
1938        store.append_claim(&mut txn, &claim).unwrap();
1939        store.append_validity_assertion(&mut txn, &assertion).unwrap();
1940        store.commit(txn).unwrap();
1941
1942        let loaded = store.load_validity_assertions_for(&agent, &claim_ref).unwrap();
1943        assert_eq!(loaded.len(), 1, "must return one validity assertion");
1944        assert_eq!(loaded[0].assertion_ref, assertion.assertion_ref);
1945        assert_eq!(loaded[0].target_claim, claim_ref);
1946        assert!(matches!(loaded[0].kind, AssertionKind::Bound { .. }));
1947    }
1948
1949    /// load_validity_assertions_for returns empty when no assertions exist.
1950    #[test]
1951    fn read_load_validity_assertions_empty_when_none() {
1952        let store = make_store();
1953        let agent = make_agent();
1954        let claim = make_claim(&agent);
1955        let claim_ref = claim.claim_ref().clone();
1956
1957        let mut txn = store.begin_atomic(&agent).unwrap();
1958        store.append_claim(&mut txn, &claim).unwrap();
1959        store.commit(txn).unwrap();
1960
1961        let loaded = store.load_validity_assertions_for(&agent, &claim_ref).unwrap();
1962        assert!(loaded.is_empty(), "must return empty vec when no assertions");
1963    }
1964
1965    /// Write a ledger entry and load_ledger returns it.
1966    #[test]
1967    fn read_load_ledger_round_trip() {
1968        let store = make_store();
1969        let agent = make_agent();
1970        let claim = make_claim(&agent);
1971        let claim_ref = claim.claim_ref().clone();
1972        let entry = make_ledger_entry(&agent, &claim_ref);
1973
1974        let mut txn = store.begin_atomic(&agent).unwrap();
1975        store.append_claim(&mut txn, &claim).unwrap();
1976        store.append_ledger_entry(&mut txn, &entry).unwrap();
1977        store.commit(txn).unwrap();
1978
1979        let loaded = store.load_ledger(&agent, None, 100).unwrap();
1980        assert_eq!(loaded.len(), 1, "must return one ledger entry");
1981        assert_eq!(loaded[0].entry_id, entry.entry_id);
1982        assert_eq!(loaded[0].claim_ref, claim_ref);
1983        assert_eq!(loaded[0].event_kind, LedgerEventKind::ClaimCommitted);
1984    }
1985
1986    /// load_ledger respects the `from` bound — entries before `from` are excluded.
1987    #[test]
1988    fn read_load_ledger_from_bound_filters_earlier_entries() {
1989        let store = make_store();
1990        let agent = make_agent();
1991
1992        // Two claims: early and late
1993        let claim_early = make_claim(&agent);
1994        let claim_late = make_claim(&agent);
1995        let ref_early = claim_early.claim_ref().clone();
1996        let ref_late = claim_late.claim_ref().clone();
1997
1998        let t_early = TransactionTime(Utc::now() - chrono::Duration::seconds(10));
1999        let t_late = TransactionTime(Utc::now());
2000
2001        let entry_early = mempill_types::ledger::LedgerEntry {
2002            entry_id: Uuid::new_v4(),
2003            agent_id: agent.clone(),
2004            claim_ref: ref_early.clone(),
2005            event_kind: LedgerEventKind::ClaimCommitted,
2006            disposition: mempill_types::disposition::Disposition::CommittedCheap,
2007            rationale: None,
2008            recorded_at: t_early.clone(),
2009        };
2010        let entry_late = mempill_types::ledger::LedgerEntry {
2011            entry_id: Uuid::new_v4(),
2012            agent_id: agent.clone(),
2013            claim_ref: ref_late.clone(),
2014            event_kind: LedgerEventKind::ClaimCommitted,
2015            disposition: mempill_types::disposition::Disposition::CommittedCheap,
2016            rationale: None,
2017            recorded_at: t_late.clone(),
2018        };
2019
2020        let mut txn = store.begin_atomic(&agent).unwrap();
2021        store.append_claim(&mut txn, &claim_early).unwrap();
2022        store.append_claim(&mut txn, &claim_late).unwrap();
2023        store.append_ledger_entry(&mut txn, &entry_early).unwrap();
2024        store.append_ledger_entry(&mut txn, &entry_late).unwrap();
2025        store.commit(txn).unwrap();
2026
2027        // Load from t_late — should only see the late entry
2028        let loaded = store.load_ledger(&agent, Some(&t_late), 100).unwrap();
2029        assert_eq!(loaded.len(), 1, "only the late entry must be returned when from=t_late");
2030        assert_eq!(loaded[0].entry_id, entry_late.entry_id);
2031    }
2032
2033    /// load_ledger returns empty when agent has no entries.
2034    #[test]
2035    fn read_load_ledger_empty_when_none() {
2036        let store = make_store();
2037        let agent = make_agent();
2038        let loaded = store.load_ledger(&agent, None, 100).unwrap();
2039        assert!(loaded.is_empty(), "must return empty vec when no ledger entries");
2040    }
2041
2042    /// load_edges_for returns edges and they are ordered by created_at ASC (deterministic).
2043    #[test]
2044    fn read_load_edges_for_ordering_created_at_asc() {
2045        let store = make_store();
2046        let agent = make_agent();
2047
2048        let claim_a = make_claim(&agent);
2049        let claim_b = make_claim(&agent);
2050        let claim_c = make_claim(&agent);
2051        let ref_a = claim_a.claim_ref().clone();
2052        let ref_b = claim_b.claim_ref().clone();
2053        let ref_c = claim_c.claim_ref().clone();
2054
2055        // Edge A→B created first, A→C created second (microsecond gap guaranteed by sleep or offset)
2056        let t1 = TransactionTime(Utc::now() - chrono::Duration::seconds(5));
2057        let t2 = TransactionTime(Utc::now());
2058
2059        let edge_ab = ClaimEdge {
2060            edge_id: Uuid::new_v4(),
2061            agent_id: agent.clone(),
2062            from_claim: ref_a.clone(),
2063            to_claim: ref_b.clone(),
2064            kind: EdgeKind::DependsOn,
2065            created_at: t1,
2066        };
2067        let edge_ac = ClaimEdge {
2068            edge_id: Uuid::new_v4(),
2069            agent_id: agent.clone(),
2070            from_claim: ref_a.clone(),
2071            to_claim: ref_c.clone(),
2072            kind: EdgeKind::DependsOn,
2073            created_at: t2,
2074        };
2075
2076        let mut txn = store.begin_atomic(&agent).unwrap();
2077        // Insert in reverse order to prove ORDER BY drives the result
2078        store.append_claim(&mut txn, &claim_a).unwrap();
2079        store.append_claim(&mut txn, &claim_b).unwrap();
2080        store.append_claim(&mut txn, &claim_c).unwrap();
2081        store.append_claim_edge(&mut txn, &edge_ac).unwrap(); // insert late edge first
2082        store.append_claim_edge(&mut txn, &edge_ab).unwrap(); // insert early edge second
2083        store.commit(txn).unwrap();
2084
2085        let loaded = store.load_edges_for(&agent, &ref_a).unwrap();
2086        assert_eq!(loaded.len(), 2, "must return both edges");
2087        // Verify ASC ordering: AB (earlier created_at) must come before AC
2088        assert_eq!(loaded[0].to_claim, ref_b, "earlier edge (A→B) must be first");
2089        assert_eq!(loaded[1].to_claim, ref_c, "later edge (A→C) must be second");
2090    }
2091
2092    /// load_edges_for returns empty when no edges exist for the claim.
2093    #[test]
2094    fn read_load_edges_for_empty_when_none() {
2095        let store = make_store();
2096        let agent = make_agent();
2097        let claim = make_claim(&agent);
2098        let claim_ref = claim.claim_ref().clone();
2099
2100        let mut txn = store.begin_atomic(&agent).unwrap();
2101        store.append_claim(&mut txn, &claim).unwrap();
2102        store.commit(txn).unwrap();
2103
2104        let loaded = store.load_edges_for(&agent, &claim_ref).unwrap();
2105        assert!(loaded.is_empty(), "must return empty vec when no edges");
2106    }
2107
2108    /// load_injected_claims returns ClaimRefs from ServedAsInjected ledger entries.
2109    #[test]
2110    fn read_load_injected_claims_round_trip() {
2111        use mempill_types::disposition::Disposition;
2112
2113        let store = make_store();
2114        let agent = make_agent();
2115        let claim = make_claim(&agent);
2116        let claim_ref = claim.claim_ref().clone();
2117
2118        let injected_entry = mempill_types::ledger::LedgerEntry {
2119            entry_id: Uuid::new_v4(),
2120            agent_id: agent.clone(),
2121            claim_ref: claim_ref.clone(),
2122            event_kind: LedgerEventKind::ServedAsInjected,
2123            disposition: Disposition::CommittedCheap,
2124            rationale: None,
2125            recorded_at: TransactionTime(Utc::now()),
2126        };
2127
2128        let mut txn = store.begin_atomic(&agent).unwrap();
2129        store.append_claim(&mut txn, &claim).unwrap();
2130        store.append_ledger_entry(&mut txn, &injected_entry).unwrap();
2131        store.commit(txn).unwrap();
2132
2133        let loaded = store.load_injected_claims(&agent).unwrap();
2134        assert_eq!(loaded.len(), 1, "must return one injected claim ref");
2135        assert_eq!(loaded[0], claim_ref);
2136    }
2137
2138    /// load_injected_claims returns empty when no ServedAsInjected entries exist.
2139    #[test]
2140    fn read_load_injected_claims_empty_when_none() {
2141        let store = make_store();
2142        let agent = make_agent();
2143        let loaded = store.load_injected_claims(&agent).unwrap();
2144        assert!(loaded.is_empty(), "must return empty vec when no injected claims");
2145    }
2146
/// LINEAGE CTE: a two-hop DerivedFrom chain (A→B→C) is walked end to end.
#[test]
fn read_load_lineage_multi_hop_derived_from() {
    let store = make_store();
    let agent = make_agent();

    // Chain under test: A is derived from B, and B is derived from C, so
    // load_lineage(A) has to report both edges (A→B first, then B→C).
    let claim_a = make_claim(&agent);
    let claim_b = make_claim(&agent);
    let claim_c = make_claim(&agent);
    let (ref_a, ref_b, ref_c) = (
        claim_a.claim_ref().clone(),
        claim_b.claim_ref().clone(),
        claim_c.claim_ref().clone(),
    );

    // Builds a DerivedFrom edge whose created_at lies `age_secs` seconds in the past.
    let derived_from = |from, to, age_secs| ClaimEdge {
        edge_id: Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: from,
        to_claim: to,
        kind: EdgeKind::DerivedFrom,
        created_at: TransactionTime(Utc::now() - chrono::Duration::seconds(age_secs)),
    };
    let edge_ab = derived_from(ref_a.clone(), ref_b.clone(), 2);
    let edge_bc = derived_from(ref_b.clone(), ref_c.clone(), 1);

    // Persist the three claims and both edges inside a single atomic unit.
    let mut txn = store.begin_atomic(&agent).unwrap();
    for claim in [&claim_a, &claim_b, &claim_c] {
        store.append_claim(&mut txn, claim).unwrap();
    }
    for edge in [&edge_ab, &edge_bc] {
        store.append_claim_edge(&mut txn, edge).unwrap();
    }
    store.commit(txn).unwrap();

    let lineage = store.load_lineage(&agent, &ref_a).unwrap();
    assert_eq!(lineage.len(), 2, "lineage must contain both DerivedFrom hops A→B and B→C");

    let (near_hop, far_hop) = (&lineage[0], &lineage[1]);

    // Depth 1 comes first: the A→B edge.
    assert_eq!(near_hop.from_claim, ref_a, "first edge must start from A");
    assert_eq!(near_hop.to_claim, ref_b, "first edge must point to B");
    // Depth 2 follows: the B→C edge.
    assert_eq!(far_hop.from_claim, ref_b, "second edge must start from B");
    assert_eq!(far_hop.to_claim, ref_c, "second edge must point to C");
}
2197
/// A claim stored without any edges has no lineage: `load_lineage` yields an empty result.
#[test]
fn read_load_lineage_empty_when_no_derived_from_edges() {
    let store = make_store();
    let agent = make_agent();
    let lone_claim = make_claim(&agent);
    let lone_ref = lone_claim.claim_ref().clone();

    // Commit only the claim itself — no edges are appended.
    let mut txn = store.begin_atomic(&agent).unwrap();
    store.append_claim(&mut txn, &lone_claim).unwrap();
    store.commit(txn).unwrap();

    let edges = store.load_lineage(&agent, &lone_ref).unwrap();
    let no_edges = edges.is_empty();
    assert!(no_edges, "load_lineage must return empty vec when no DerivedFrom edges");
}
2213
2214    // ── TXN ALREADY OPEN guard ────────────────────────────────────────────────
2215
/// While one transaction is still open, a second `begin_atomic` on the same store
/// must be refused with `SqliteStoreError::TxnAlreadyOpen`.
#[test]
fn begin_atomic_while_txn_open_returns_error() {
    let store = make_store();
    let agent = make_agent();

    // Keep the first transaction bound (and therefore open) until the end of the test.
    let _open_txn = store.begin_atomic(&agent).expect("first begin_atomic should succeed");

    let second_attempt = store.begin_atomic(&agent);
    let rejected = matches!(second_attempt, Err(SqliteStoreError::TxnAlreadyOpen));
    assert!(rejected, "second begin_atomic must return TxnAlreadyOpen");
}
2228
2229    // ── FULL ATOMIC UNIT (I9 positive path) ───────────────────────────────────
2230
/// Writes two claims plus one validity assertion, one ledger entry and one edge in a
/// single transaction, commits, and then counts the rows in each table to confirm
/// that everything landed.
#[test]
fn atomic_unit_all_four_rows_on_commit() {
    let store = make_store();
    let agent = make_agent();
    let claim_a = make_claim(&agent);
    let claim_b = make_claim(&agent);
    let (claim_ref_a, claim_ref_b) = (claim_a.claim_ref().clone(), claim_b.claim_ref().clone());

    // Validity assertion targeting claim A.
    let assertion = mempill_types::validity::ValidityAssertion {
        assertion_ref: Uuid::new_v4(),
        agent_id: agent.clone(),
        target_claim: claim_ref_a.clone(),
        kind: AssertionKind::Bound { bound_at: Utc::now() },
        provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
        confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.9 },
        asserted_at: TransactionTime(Utc::now()),
    };
    // Ledger entry for claim A.
    let ledger = make_ledger_entry(&agent, &claim_ref_a);
    // Supersedes edge from claim A to claim B.
    let edge = ClaimEdge {
        edge_id: Uuid::new_v4(),
        agent_id: agent.clone(),
        from_claim: claim_ref_a.clone(),
        to_claim: claim_ref_b.clone(),
        kind: EdgeKind::Supersedes,
        created_at: TransactionTime(Utc::now()),
    };

    // One transaction carries the whole unit.
    let mut txn = store.begin_atomic(&agent).unwrap();
    for claim in [&claim_a, &claim_b] {
        store.append_claim(&mut txn, claim).unwrap();
    }
    store.append_validity_assertion(&mut txn, &assertion).unwrap();
    store.append_ledger_entry(&mut txn, &ledger).unwrap();
    store.append_claim_edge(&mut txn, &edge).unwrap();
    store.commit(txn).unwrap();

    // Inspect the tables directly through the store's connection slot.
    let slot = store.conn.lock().unwrap();
    let conn = slot.as_ref().unwrap();
    let count_rows = |sql: &str| -> i64 { conn.query_row(sql, [], |r| r.get(0)).unwrap() };

    let claims = count_rows("SELECT COUNT(*) FROM claims");
    let assertions = count_rows("SELECT COUNT(*) FROM validity_assertions");
    let ledger_count = count_rows("SELECT COUNT(*) FROM ledger_entries");
    let edges = count_rows("SELECT COUNT(*) FROM claim_edges");

    assert_eq!(claims, 2, "two claim rows must exist");
    assert_eq!(assertions, 1, "one validity_assertion row must exist");
    assert_eq!(ledger_count, 1, "one ledger_entry row must exist");
    assert_eq!(edges, 1, "one claim_edge row must exist");
}
2293
2294    // ── SqlitePendingStore tests ──────────────────────────────────────────────
2295
2296    use mempill_core::ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow};
2297    use mempill_types::{
2298        AdjudicationRequest, Belief, CurrencySignal, CurrencyState, OverturnReason, SubjectLineRef,
2299    };
2300
2301    fn make_adj_request(agent: &AgentId) -> AdjudicationRequest {
2302        let claim_ref = ClaimRef(Uuid::new_v4());
2303        let now = TransactionTime(Utc::now());
2304        AdjudicationRequest {
2305            subject_line: SubjectLineRef {
2306                agent_id: agent.clone(),
2307                subject: "user".into(),
2308                predicate: "city".into(),
2309            },
2310            incumbent: Belief {
2311                claim_ref: claim_ref.clone(),
2312                fact: mempill_types::Fact {
2313                    subject: "user".into(),
2314                    predicate: "city".into(),
2315                    value: serde_json::json!("Berlin"),
2316                },
2317                provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
2318                valid_time: ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
2319                transaction_time: now.clone(),
2320                confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
2321                currency_signal: CurrencySignal {
2322                    last_refreshed_at: now.clone(),
2323                    state: CurrencyState::Fresh,
2324                    corroboration_count: 0,
2325                },
2326                criticality: Criticality::Low,
2327            },
2328            challenger: make_claim(agent),
2329            criticality: Criticality::Low,
2330            reason: OverturnReason::ExternalContradiction,
2331        }
2332    }
2333
2334    fn make_pending_row(agent: &AgentId) -> PendingAdjudicationRow {
2335        PendingAdjudicationRow {
2336            handle_id: Uuid::new_v4(),
2337            agent_id: agent.clone(),
2338            subject: "user".into(),
2339            predicate: "city".into(),
2340            challenger_claim_ref: ClaimRef(Uuid::new_v4()),
2341            incumbent_claim_ref: ClaimRef(Uuid::new_v4()),
2342            request_payload: make_adj_request(agent),
2343            queued_at: Utc::now(),
2344            expires_at: None,
2345            status: "pending".to_string(),
2346        }
2347    }
2348
/// Round trip: a row written with `insert_pending` comes back from `get_pending`
/// with the same identifying fields, status "pending", and no expiry.
#[test]
fn w3_sqlite_pending_insert_and_get_round_trip() {
    let store = make_store();
    let pending = store.pending_store();
    let agent = make_agent();
    let row = make_pending_row(&agent);
    let handle_id = row.handle_id;

    pending.insert_pending(&row).expect("insert_pending must succeed");

    // First expect: the lookup itself worked; second expect: a row was found.
    let fetched = pending
        .get_pending(handle_id)
        .expect("get_pending must succeed")
        .expect("row must be present");

    // Identity fields.
    assert_eq!(fetched.handle_id, handle_id);
    assert_eq!(fetched.agent_id, agent);
    // Subject line.
    assert_eq!(fetched.subject, "user");
    assert_eq!(fetched.predicate, "city");
    // Claim references.
    assert_eq!(fetched.challenger_claim_ref, row.challenger_claim_ref);
    assert_eq!(fetched.incumbent_claim_ref, row.incumbent_claim_ref);
    // Lifecycle fields.
    assert_eq!(fetched.status, "pending");
    assert!(fetched.expires_at.is_none());
}
2371
/// Looking up a handle id that was never inserted is not an error: `get_pending`
/// answers `Ok(None)`.
#[test]
fn w3_sqlite_pending_get_nonexistent_returns_none() {
    let store = make_store();
    let pending = store.pending_store();

    let unknown_handle = Uuid::new_v4();
    let lookup = pending.get_pending(unknown_handle).expect("get_pending must not error");
    let absent = lookup.is_none();
    assert!(absent, "unknown handle_id must return None");
}
2380
/// list_pending returns only pending rows for the given agent.
///
/// Three rows are inserted: two for `agent`, one for a second agent. The filtered
/// listing must contain exactly the two rows of `agent` (checked by count, by owner,
/// and by handle id), while the unfiltered listing must contain all three.
#[test]
fn w3_sqlite_pending_list_pending_by_agent() {
    let store = make_store();
    let pending = store.pending_store();
    let agent = make_agent();
    let agent2 = AgentId("other-agent".into());

    let row1 = make_pending_row(&agent);
    let row2 = make_pending_row(&agent);
    let row3 = make_pending_row(&agent2);

    pending.insert_pending(&row1).unwrap();
    pending.insert_pending(&row2).unwrap();
    pending.insert_pending(&row3).unwrap();

    let agent_rows = pending.list_pending(Some(&agent)).unwrap();
    assert_eq!(agent_rows.len(), 2, "must return exactly 2 rows for agent");
    // A bare count cannot tell *which* rows came back; pin down ownership and identity too.
    assert!(
        agent_rows.iter().all(|r| r.agent_id == agent),
        "list_pending(Some(agent)) must not return rows owned by another agent"
    );
    for expected in [&row1, &row2] {
        assert!(
            agent_rows.iter().any(|r| r.handle_id == expected.handle_id),
            "list_pending(Some(agent)) must include every row inserted for that agent"
        );
    }

    let all_rows = pending.list_pending(None).unwrap();
    assert_eq!(all_rows.len(), 3, "list_pending(None) must return all 3 rows");
    assert!(
        all_rows.iter().any(|r| r.handle_id == row3.handle_id),
        "list_pending(None) must include the other agent's row"
    );
}
2403
/// After `mark_resolved`, the row is still retrievable by handle id with status
/// "resolved", but it no longer shows up in `list_pending` for its agent.
#[test]
fn w3_sqlite_pending_mark_resolved() {
    let store = make_store();
    let pending = store.pending_store();
    let agent = make_agent();
    let row = make_pending_row(&agent);
    let handle_id = row.handle_id;

    pending.insert_pending(&row).unwrap();
    pending.mark_resolved(handle_id).unwrap();

    // Direct lookup still sees the row, now flagged as resolved.
    let after_resolve = pending.get_pending(handle_id).unwrap().unwrap();
    assert_eq!(after_resolve.status, "resolved", "status must be 'resolved' after mark_resolved");

    // The pending listing for the agent must have dropped it.
    let still_pending = pending.list_pending(Some(&agent)).unwrap();
    let listing_is_empty = still_pending.is_empty();
    assert!(listing_is_empty, "resolved row must not appear in list_pending");
}
2424
/// Durability across handles: insert a pending row through one pending-store handle,
/// drop that handle, obtain a second handle from the same `SqlitePersistenceStore`,
/// and confirm `get_pending` still finds the row.
///
/// Only the pending-store handle is dropped here; the persistence store itself stays
/// alive for the whole test.
///
/// NOTE: true file-backed durability (drop + reopen file) is tested in lib.rs integration.
#[test]
fn w3_sqlite_pending_durability_shared_arc() {
    let conn = open_in_memory().expect("in-memory connection must open");
    let persistence = SqlitePersistenceStore::new(conn);
    let agent = make_agent();
    let row = make_pending_row(&agent);
    let handle_id = row.handle_id;

    {
        // First handle lives only for this scope and is dropped at the closing brace.
        let first_handle = persistence.pending_store();
        first_handle.insert_pending(&row).unwrap();
    }

    // A brand-new handle obtained from the same persistence store.
    let second_handle = persistence.pending_store();
    let survived = second_handle.get_pending(handle_id).unwrap();
    assert!(survived.is_some(), "pending row must survive store handle drop (durability via shared Arc)");

    let survivor = survived.unwrap();
    assert_eq!(survivor.handle_id, handle_id);
}
2448}