Skip to main content

mempill_postgres/
store.rs

1//! `PostgresPersistenceStore` — impl of `PersistencePort` for mempill-postgres.
2//!
3//! # Append-only
4//!
5//! Every write method is an INSERT. No UPDATE or DELETE paths exist in this file.
6//!
7//! # Atomic commit unit
8//!
9//! `begin_atomic` acquires a pooled connection and opens a `BEGIN` transaction.
10//! `commit`/`rollback` close the transaction; the connection returns to the r2d2 pool.
11//!
12//! # JSONB handling
13//!
14//! `value` and `metadata` are JSONB columns in Postgres (TEXT in SQLite).
15//! On INSERT: serialized to JSON string, cast with `$n::jsonb` in SQL.
16//! On SELECT: cast back to `::text` in `CLAIM_SELECT_COLS` → `serde_json::from_str`.
17//! This confines the JSONB divergence to the INSERT SQL; all row mapping code is identical
18//! to the SQLite path.
19//!
20//! # stream_seq (monotone per-agent sequence number)
21//!
22//! `append_ledger_entry` assigns `stream_seq` via:
23//! `SELECT COALESCE(MAX(stream_seq), 0) + 1 FROM ledger_entries WHERE agent_id = $1`
24//! within the same transaction, under the advisory lock.
25//! INVARIANT: safe only under `pg_advisory_xact_lock`; replace with a Postgres SEQUENCE
26//! if the advisory lock is ever removed.
27
28use std::sync::Arc;
29
30use mempill_core::{
31    ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow},
32    ports::persistence::PersistencePort,
33    EngineConfig, EngineHandle,
34};
35use mempill_types::{
36    claim::{Cardinality, Claim, Confidence, Criticality, Fact},
37    disposition::Disposition,
38    edge::{ClaimEdge, EdgeKind},
39    identity::{AgentId, ClaimRef},
40    ledger::{LedgerEntry, LedgerEventKind},
41    provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
42    time::{TransactionTime, ValidTime},
43    validity::{AssertionKind, ValidityAssertion},
44};
45
46use crate::{
47    connection::{PostgresPersistenceStore, PostgresStoreError},
48    txn::PostgresTxn,
49};
50
51// ── Domain-type ↔ column mapping helpers (mirrors mempill-sqlite/src/store.rs) ──
52
53fn provenance_to_str(p: &ProvenanceLabel) -> &'static str {
54    match p {
55        ProvenanceLabel::ModelDerived => "ModelDerived",
56        ProvenanceLabel::RecallReEntry => "RecallReEntry",
57        ProvenanceLabel::External(ExternalKind::UserAsserted) => "External_UserAsserted",
58        ProvenanceLabel::External(ExternalKind::ExternalFirstHand) => "External_ExternalFirstHand",
59        _ => "Unknown",
60    }
61}
62
63fn str_to_provenance(s: &str) -> Result<ProvenanceLabel, PostgresStoreError> {
64    match s {
65        "ModelDerived" => Ok(ProvenanceLabel::ModelDerived),
66        "RecallReEntry" => Ok(ProvenanceLabel::RecallReEntry),
67        "External_UserAsserted" => Ok(ProvenanceLabel::External(ExternalKind::UserAsserted)),
68        "External_ExternalFirstHand" => Ok(ProvenanceLabel::External(ExternalKind::ExternalFirstHand)),
69        other => Err(PostgresStoreError::Mapping(format!("unknown provenance_label: {other}"))),
70    }
71}
72
73fn cardinality_to_str(c: &Cardinality) -> &'static str {
74    match c {
75        Cardinality::Functional => "Functional",
76        Cardinality::SetValued => "SetValued",
77        Cardinality::Unknown => "Unknown",
78    }
79}
80
81fn str_to_cardinality(s: &str) -> Result<Cardinality, PostgresStoreError> {
82    match s {
83        "Functional" => Ok(Cardinality::Functional),
84        "SetValued" => Ok(Cardinality::SetValued),
85        "Unknown" => Ok(Cardinality::Unknown),
86        other => Err(PostgresStoreError::Mapping(format!("unknown cardinality: {other}"))),
87    }
88}
89
90fn criticality_to_str(c: &Criticality) -> &'static str {
91    match c {
92        Criticality::Low => "Low",
93        Criticality::Medium => "Medium",
94        Criticality::High => "High",
95        Criticality::Critical => "Critical",
96    }
97}
98
99fn str_to_criticality(s: &str) -> Result<Criticality, PostgresStoreError> {
100    match s {
101        "Low" => Ok(Criticality::Low),
102        "Medium" => Ok(Criticality::Medium),
103        "High" => Ok(Criticality::High),
104        "Critical" => Ok(Criticality::Critical),
105        other => Err(PostgresStoreError::Mapping(format!("unknown criticality: {other}"))),
106    }
107}
108
109fn edge_kind_to_str(k: &EdgeKind) -> &'static str {
110    match k {
111        EdgeKind::DerivedFrom => "DerivedFrom",
112        EdgeKind::Supersedes => "Supersedes",
113        EdgeKind::DependsOn => "DependsOn",
114        EdgeKind::MutualExclusion => "MutualExclusion",
115        // EdgeKind is #[non_exhaustive] — future variants stored as "Unknown".
116        _ => "Unknown",
117    }
118}
119
120fn str_to_edge_kind(s: &str) -> Result<EdgeKind, PostgresStoreError> {
121    match s {
122        "DerivedFrom" => Ok(EdgeKind::DerivedFrom),
123        "Supersedes" => Ok(EdgeKind::Supersedes),
124        "DependsOn" => Ok(EdgeKind::DependsOn),
125        "MutualExclusion" => Ok(EdgeKind::MutualExclusion),
126        other => Err(PostgresStoreError::Mapping(format!("unknown edge_kind: {other}"))),
127    }
128}
129
130fn ledger_event_kind_to_str(k: &LedgerEventKind) -> &'static str {
131    match k {
132        LedgerEventKind::ClaimCommitted => "ClaimCommitted",
133        LedgerEventKind::ValidityAsserted => "ValidityAsserted",
134        LedgerEventKind::AdjudicationRequested => "AdjudicationRequested",
135        LedgerEventKind::AdjudicationResolved => "AdjudicationResolved",
136        LedgerEventKind::RecallReEntryDetected => "RecallReEntryDetected",
137        LedgerEventKind::Quarantined => "Quarantined",
138        LedgerEventKind::DependentFlaggedPendingReview => "DependentFlaggedPendingReview",
139        LedgerEventKind::ServedAsInjected => "ServedAsInjected",
140        LedgerEventKind::AdjudicationExpired => "AdjudicationExpired",
141        // LedgerEventKind is #[non_exhaustive] — future variants stored as "Unknown".
142        _ => "Unknown",
143    }
144}
145
146fn str_to_ledger_event_kind(s: &str) -> Result<LedgerEventKind, PostgresStoreError> {
147    match s {
148        "ClaimCommitted" => Ok(LedgerEventKind::ClaimCommitted),
149        "ValidityAsserted" => Ok(LedgerEventKind::ValidityAsserted),
150        "AdjudicationRequested" => Ok(LedgerEventKind::AdjudicationRequested),
151        "AdjudicationResolved" => Ok(LedgerEventKind::AdjudicationResolved),
152        "RecallReEntryDetected" => Ok(LedgerEventKind::RecallReEntryDetected),
153        "Quarantined" => Ok(LedgerEventKind::Quarantined),
154        "DependentFlaggedPendingReview" => Ok(LedgerEventKind::DependentFlaggedPendingReview),
155        "ServedAsInjected" => Ok(LedgerEventKind::ServedAsInjected),
156        "AdjudicationExpired" => Ok(LedgerEventKind::AdjudicationExpired),
157        other => Err(PostgresStoreError::Mapping(format!("unknown ledger event_kind: {other}"))),
158    }
159}
160
161fn disposition_to_str(d: &Disposition) -> &'static str {
162    match d {
163        Disposition::CommittedCheap => "CommittedCheap",
164        Disposition::CommittedInferred => "CommittedInferred",
165        Disposition::QueuedForAdjudication => "QueuedForAdjudication",
166        Disposition::Contested => "Contested",
167        Disposition::PendingConflict => "PendingConflict",
168        Disposition::PendingReview => "PendingReview",
169        Disposition::PendingLowConfidence => "PendingLowConfidence",
170        Disposition::Quarantined => "Quarantined",
171        Disposition::Superseded => "Superseded",
172        Disposition::Invalidated => "Invalidated",
173        Disposition::Reinstated => "Reinstated",
174        Disposition::Rejected => "Rejected",
175        // Disposition is #[non_exhaustive] — future variants stored as "Unknown".
176        _ => "Unknown",
177    }
178}
179
180fn str_to_disposition(s: &str) -> Result<Disposition, PostgresStoreError> {
181    match s {
182        "CommittedCheap" => Ok(Disposition::CommittedCheap),
183        "CommittedInferred" => Ok(Disposition::CommittedInferred),
184        "QueuedForAdjudication" => Ok(Disposition::QueuedForAdjudication),
185        "Contested" => Ok(Disposition::Contested),
186        "PendingConflict" => Ok(Disposition::PendingConflict),
187        "PendingReview" => Ok(Disposition::PendingReview),
188        "PendingLowConfidence" => Ok(Disposition::PendingLowConfidence),
189        "Quarantined" => Ok(Disposition::Quarantined),
190        "Superseded" => Ok(Disposition::Superseded),
191        "Invalidated" => Ok(Disposition::Invalidated),
192        "Reinstated" => Ok(Disposition::Reinstated),
193        "Rejected" => Ok(Disposition::Rejected),
194        other => Err(PostgresStoreError::Mapping(format!("unknown disposition: {other}"))),
195    }
196}
197
198// ── Row-to-domain-type mapping helpers ───────────────────────────────────────
199
200/// The SELECT column list for `claims` table.
201///
202/// Note: `value::text` and `metadata::text` cast JSONB → TEXT at read time so
203/// `row_to_claim` can call `serde_json::from_str` identically to the SQLite path.
204/// This confines the JSONB divergence to the INSERT path only (§2 CLAIM_SELECT_COLS note).
205///
206/// Column order must exactly match `row_to_claim` indices below.
207const CLAIM_SELECT_COLS: &str = "
208    claim_id, agent_id, subject, predicate, value::text, cardinality,
209    provenance_label, nearest_external_anchor_id, derivation_depth,
210    tx_time, valid_time_start, valid_time_end, valid_time_confidence,
211    value_confidence, criticality, derived_from,
212    metadata::text, snapshot_schema_version
213";
214
215/// Map a postgres `Row` from the `claims` table to a `Claim` domain type.
216///
217/// Column order (must match `CLAIM_SELECT_COLS`):
218///   0  claim_id
219///   1  agent_id
220///   2  subject
221///   3  predicate
222///   4  value::text  (JSONB cast to TEXT)
223///   5  cardinality
224///   6  provenance_label
225///   7  nearest_external_anchor_id  (nullable)
226///   8  derivation_depth
227///   9  tx_time
228///  10  valid_time_start  (nullable)
229///  11  valid_time_end    (nullable)
230///  12  valid_time_confidence
231///  13  value_confidence
232///  14  criticality
233///  15  derived_from  (JSON array TEXT)
234///  16  metadata::text (nullable JSONB cast to TEXT)
235///  17  snapshot_schema_version (nullable INTEGER)
236fn row_to_claim(row: &postgres::Row) -> Result<Claim, PostgresStoreError> {
237    let claim_id_str: String = row.get(0);
238    let agent_id_str: String = row.get(1);
239    let subject: String = row.get(2);
240    let predicate: String = row.get(3);
241    let value_json: String = row.get(4);
242    let cardinality_str: String = row.get(5);
243    let provenance_str: String = row.get(6);
244    let nearest_anchor_str: Option<String> = row.get(7);
245    let derivation_depth: i32 = row.get(8);
246    let tx_time_str: String = row.get(9);
247    let valid_time_start_str: Option<String> = row.get(10);
248    let valid_time_end_str: Option<String> = row.get(11);
249    let valid_time_confidence: f64 = row.get(12);
250    let value_confidence: f64 = row.get(13);
251    let criticality_str: String = row.get(14);
252    let derived_from_json: String = row.get(15);
253    let metadata_json: Option<String> = row.get(16);
254    let snapshot_schema_version_raw: Option<i32> = row.get(17);
255
256    let claim_id = uuid::Uuid::parse_str(&claim_id_str)
257        .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
258
259    let value: serde_json::Value = serde_json::from_str(&value_json)
260        .map_err(|e| PostgresStoreError::Mapping(format!("value JSON: {e}")))?;
261
262    let cardinality = str_to_cardinality(&cardinality_str)?;
263    let provenance = str_to_provenance(&provenance_str)?;
264
265    let nearest_external_anchor: Option<ClaimRef> = nearest_anchor_str
266        .map(|s| {
267            uuid::Uuid::parse_str(&s)
268                .map(ClaimRef)
269                .map_err(|e| PostgresStoreError::Mapping(format!("anchor UUID: {e}")))
270        })
271        .transpose()?;
272
273    let tx_time = chrono::DateTime::parse_from_rfc3339(&tx_time_str)
274        .map(|dt| dt.with_timezone(&chrono::Utc))
275        .map_err(|e| PostgresStoreError::Mapping(format!("tx_time parse: {e}")))?;
276
277    let valid_time_start = valid_time_start_str
278        .map(|s| {
279            chrono::DateTime::parse_from_rfc3339(&s)
280                .map(|dt| dt.with_timezone(&chrono::Utc))
281                .map_err(|e| PostgresStoreError::Mapping(format!("valid_time_start: {e}")))
282        })
283        .transpose()?;
284
285    let valid_time_end = valid_time_end_str
286        .map(|s| {
287            chrono::DateTime::parse_from_rfc3339(&s)
288                .map(|dt| dt.with_timezone(&chrono::Utc))
289                .map_err(|e| PostgresStoreError::Mapping(format!("valid_time_end: {e}")))
290        })
291        .transpose()?;
292
293    let criticality = str_to_criticality(&criticality_str)?;
294
295    let derived_from_uuids: Vec<String> = serde_json::from_str(&derived_from_json)
296        .map_err(|e| PostgresStoreError::Mapping(format!("derived_from JSON: {e}")))?;
297
298    let derived_from: Vec<ClaimRef> = derived_from_uuids
299        .iter()
300        .map(|s| {
301            uuid::Uuid::parse_str(s)
302                .map(ClaimRef)
303                .map_err(|e| PostgresStoreError::Mapping(format!("derived_from UUID: {e}")))
304        })
305        .collect::<Result<_, _>>()?;
306
307    let metadata: Option<serde_json::Value> = metadata_json
308        .map(|s| {
309            serde_json::from_str(&s)
310                .map_err(|e| PostgresStoreError::Mapping(format!("metadata JSON: {e}")))
311        })
312        .transpose()?;
313
314    let snapshot_schema_version: Option<u32> = snapshot_schema_version_raw.map(|v| v as u32);
315
316    Ok(Claim::new(
317        ClaimRef(claim_id),
318        AgentId(agent_id_str),
319        Fact { subject, predicate, value },
320        cardinality,
321        provenance,
322        ExternalAnchor {
323            nearest_external_anchor,
324            derivation_depth: derivation_depth as u32,
325        },
326        TransactionTime(tx_time),
327        ValidTime {
328            start: valid_time_start,
329            end: valid_time_end,
330            valid_time_confidence: valid_time_confidence as f32,
331        },
332        Confidence {
333            value_confidence: value_confidence as f32,
334            valid_time_confidence: valid_time_confidence as f32,
335        },
336        criticality,
337        derived_from,
338        metadata,
339        snapshot_schema_version,
340    ))
341}
342
343/// Map a postgres `Row` from the `claim_edges` table to a `ClaimEdge` domain type.
344fn row_to_edge(row: &postgres::Row) -> Result<ClaimEdge, PostgresStoreError> {
345    let edge_id_str: String = row.get(0);
346    let agent_id_str: String = row.get(1);
347    let from_claim_str: String = row.get(2);
348    let to_claim_str: String = row.get(3);
349    let kind_str: String = row.get(4);
350    let created_at_str: String = row.get(5);
351
352    let edge_id = uuid::Uuid::parse_str(&edge_id_str)
353        .map_err(|e| PostgresStoreError::Mapping(format!("edge_id UUID: {e}")))?;
354    let from_claim = uuid::Uuid::parse_str(&from_claim_str)
355        .map(ClaimRef)
356        .map_err(|e| PostgresStoreError::Mapping(format!("from_claim UUID: {e}")))?;
357    let to_claim = uuid::Uuid::parse_str(&to_claim_str)
358        .map(ClaimRef)
359        .map_err(|e| PostgresStoreError::Mapping(format!("to_claim UUID: {e}")))?;
360    let kind = str_to_edge_kind(&kind_str)?;
361    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
362        .map(|dt| dt.with_timezone(&chrono::Utc))
363        .map_err(|e| PostgresStoreError::Mapping(format!("created_at parse: {e}")))?;
364
365    Ok(ClaimEdge {
366        edge_id,
367        agent_id: AgentId(agent_id_str),
368        from_claim,
369        to_claim,
370        kind,
371        created_at: TransactionTime(created_at),
372    })
373}
374
375// ── PersistencePort impl ──────────────────────────────────────────────────────
376
377impl PostgresPersistenceStore {
378    /// Return a `PostgresPendingStore` that shares the same r2d2 connection pool.
379    ///
380    /// Both `PostgresPersistenceStore` and `PostgresPendingStore` acquire connections
381    /// from the same pool. The per-agent write lock held by `EngineHandle` ensures that
382    /// the pending insert is serialized with the claim transaction commit.
383    pub fn pending_store(&self) -> PostgresPendingStore {
384        PostgresPendingStore::new(self.pool.clone())
385    }
386}
387
388impl PersistencePort for PostgresPersistenceStore {
389    type Transaction = PostgresTxn;
390    type Error = PostgresStoreError;
391
392    // ── Transaction lifecycle ─────────────────────────────────────────────────
393
394    /// Open an explicit `BEGIN` transaction scoped to `agent_id`.
395    ///
396    /// Acquires a connection from the r2d2 pool, issues `BEGIN`, then acquires the
397    /// per-agent_id advisory lock: `SELECT pg_advisory_xact_lock(hashtext($1)::bigint)`.
398    fn begin_atomic(&self, agent_id: &AgentId) -> Result<PostgresTxn, PostgresStoreError> {
399        let conn = self.pool.get()?;
400        PostgresTxn::begin(agent_id.clone(), conn)
401    }
402
403    /// Commit the transaction. The pooled connection returns to the r2d2 pool.
404    fn commit(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError> {
405        txn.commit_and_drop()
406    }
407
408    /// Rollback the transaction. The pooled connection returns to the r2d2 pool.
409    fn rollback(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError> {
410        txn.rollback_and_drop()
411    }
412
413    // ── Write methods (INSERT-only, I1) ───────────────────────────────────────
414
415    /// Append a claim row within the open transaction.
416    ///
417    /// `value` and `metadata` are cast to JSONB via `$n::jsonb` SQL cast (§2 JSONB note).
418    fn append_claim(
419        &self,
420        txn: &mut PostgresTxn,
421        claim: &Claim,
422    ) -> Result<ClaimRef, PostgresStoreError> {
423        let claim_id = claim.claim_ref().0.to_string();
424        let agent_id = claim.agent_id().0.clone();
425        let fact = claim.fact();
426        // Pass value and metadata as serde_json::Value so the postgres driver can
427        // encode them as JSONB binary directly (requires feature "with-serde_json-1").
428        // A String with `$n::jsonb` SQL cast does NOT work — the driver type-checks
429        // the Rust type against the declared column OID before the cast runs.
430        let value_jsonb: &serde_json::Value = &fact.value;
431        let cardinality = cardinality_to_str(claim.cardinality()).to_owned();
432        let provenance = provenance_to_str(claim.provenance()).to_owned();
433        let anchor = claim.external_anchor();
434        let nearest_anchor: Option<String> =
435            anchor.nearest_external_anchor.as_ref().map(|r| r.0.to_string());
436        let derivation_depth = anchor.derivation_depth as i32;
437        let tx_time = claim.transaction_time().0.to_rfc3339();
438        let vt = claim.valid_time();
439        let valid_time_start: Option<String> = vt.start.map(|dt| dt.to_rfc3339());
440        let valid_time_end: Option<String> = vt.end.map(|dt| dt.to_rfc3339());
441        let valid_time_confidence = vt.valid_time_confidence as f64;
442        let conf = claim.confidence();
443        let value_confidence = conf.value_confidence as f64;
444        let criticality = criticality_to_str(claim.criticality()).to_owned();
445        let derived_from_refs: Vec<String> =
446            claim.derived_from().iter().map(|r| r.0.to_string()).collect();
447        let derived_from_json = serde_json::to_string(&derived_from_refs)
448            .map_err(|e| PostgresStoreError::Mapping(format!("derived_from serialization: {e}")))?;
449        // metadata is Option<serde_json::Value>: pass as Option<&serde_json::Value>
450        let metadata_jsonb: Option<serde_json::Value> = claim.metadata().cloned();
451        let snapshot_schema_version: Option<i32> =
452            claim.snapshot_schema_version().map(|v| v as i32);
453
454        txn.client().execute(
455            "INSERT INTO claims (
456                claim_id, agent_id, subject, predicate, value, cardinality,
457                provenance_label, nearest_external_anchor_id, derivation_depth,
458                tx_time, valid_time_start, valid_time_end, valid_time_confidence,
459                value_confidence, criticality, derived_from,
460                metadata, snapshot_schema_version, embedding_model_id
461            ) VALUES (
462                $1,  $2,  $3,  $4,  $5,  $6,
463                $7,  $8,  $9,
464                $10, $11, $12, $13,
465                $14, $15, $16,
466                $17, $18, NULL
467            )",
468            &[
469                &claim_id,
470                &agent_id,
471                &fact.subject.as_str(),
472                &fact.predicate.as_str(),
473                &value_jsonb,
474                &cardinality,
475                &provenance,
476                &nearest_anchor,
477                &derivation_depth,
478                &tx_time,
479                &valid_time_start,
480                &valid_time_end,
481                &valid_time_confidence,
482                &value_confidence,
483                &criticality,
484                &derived_from_json,
485                &metadata_jsonb,
486                &snapshot_schema_version,
487            ],
488        )?;
489
490        Ok(claim.claim_ref().clone())
491    }
492
493    /// Append a validity assertion row within the open transaction.
494    fn append_validity_assertion(
495        &self,
496        txn: &mut PostgresTxn,
497        assertion: &ValidityAssertion,
498    ) -> Result<(), PostgresStoreError> {
499        let assertion_id = assertion.assertion_ref.to_string();
500        let agent_id = assertion.agent_id.0.clone();
501        let target_claim_id = assertion.target_claim.0.to_string();
502        let provenance = provenance_to_str(&assertion.provenance).to_owned();
503        let value_confidence = assertion.confidence.value_confidence as f64;
504        let valid_time_confidence = assertion.confidence.valid_time_confidence as f64;
505        let asserted_at = assertion.asserted_at.0.to_rfc3339();
506
507        let (assertion_kind, bound_at, reopen_at): (&str, Option<String>, Option<String>) =
508            match &assertion.kind {
509                AssertionKind::Bound { bound_at } => ("Bound", Some(bound_at.to_rfc3339()), None),
510                AssertionKind::Reopen { reopen_at } => ("Reopen", None, Some(reopen_at.to_rfc3339())),
511                // AssertionKind is #[non_exhaustive] — future kinds stored as "Unknown" (no-op).
512                _ => ("Unknown", None, None),
513            };
514
515        txn.client().execute(
516            "INSERT INTO validity_assertions (
517                assertion_id, agent_id, target_claim_id,
518                assertion_kind, bound_at, reopen_at,
519                provenance_label, value_confidence, valid_time_confidence, asserted_at
520            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
521            &[
522                &assertion_id,
523                &agent_id,
524                &target_claim_id,
525                &assertion_kind,
526                &bound_at,
527                &reopen_at,
528                &provenance,
529                &value_confidence,
530                &valid_time_confidence,
531                &asserted_at,
532            ],
533        )?;
534
535        Ok(())
536    }
537
538    /// Append a ledger entry row within the open transaction.
539    ///
540    /// `stream_seq` is assigned via:
541    /// `SELECT COALESCE(MAX(stream_seq), 0) + 1 FROM ledger_entries WHERE agent_id = $1`
542    /// within the same transaction, under the per-agent advisory lock.
543    ///
544    /// INVARIANT: this MAX+1 assignment is safe ONLY under `pg_advisory_xact_lock`.
545    /// If the advisory lock is ever removed, replace with a Postgres SEQUENCE object.
546    fn append_ledger_entry(
547        &self,
548        txn: &mut PostgresTxn,
549        entry: &LedgerEntry,
550    ) -> Result<(), PostgresStoreError> {
551        let entry_id = entry.entry_id.to_string();
552        let agent_id = entry.agent_id.0.clone();
553        let claim_id = entry.claim_ref.0.to_string();
554        let event_kind = ledger_event_kind_to_str(&entry.event_kind).to_owned();
555        let disposition = disposition_to_str(&entry.disposition).to_owned();
556        // Pass rationale as Option<serde_json::Value> so the driver encodes it as JSONB.
557        // A String with `$6::jsonb` cast does NOT work — see append_claim note above.
558        let rationale_jsonb: Option<serde_json::Value> = entry.rationale.clone();
559        let recorded_at = entry.recorded_at.0.to_rfc3339();
560
561        // INVARIANT: safe only under pg_advisory_xact_lock; replace with a SEQUENCE if the lock is ever removed.
562        let row = txn.client().query_one(
563            "SELECT COALESCE(MAX(stream_seq), 0) + 1 FROM ledger_entries WHERE agent_id = $1",
564            &[&agent_id],
565        )?;
566        let stream_seq: i64 = row.get(0);
567
568        txn.client().execute(
569            "INSERT INTO ledger_entries (
570                entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at, stream_seq
571            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
572            &[
573                &entry_id,
574                &agent_id,
575                &claim_id,
576                &event_kind,
577                &disposition,
578                &rationale_jsonb,
579                &recorded_at,
580                &stream_seq,
581            ],
582        )?;
583
584        Ok(())
585    }
586
587    /// Append a claim edge row within the open transaction.
588    fn append_claim_edge(
589        &self,
590        txn: &mut PostgresTxn,
591        edge: &ClaimEdge,
592    ) -> Result<(), PostgresStoreError> {
593        let edge_id = edge.edge_id.to_string();
594        let agent_id = edge.agent_id.0.clone();
595        let from_claim_id = edge.from_claim.0.to_string();
596        let to_claim_id = edge.to_claim.0.to_string();
597        let edge_kind = edge_kind_to_str(&edge.kind).to_owned();
598        let created_at = edge.created_at.0.to_rfc3339();
599
600        txn.client().execute(
601            "INSERT INTO claim_edges (
602                edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
603            ) VALUES ($1, $2, $3, $4, $5, $6)",
604            &[
605                &edge_id,
606                &agent_id,
607                &from_claim_id,
608                &to_claim_id,
609                &edge_kind,
610                &created_at,
611            ],
612        )?;
613
614        Ok(())
615    }
616
617    // ── Read methods (pool.get() per call; non-mutating) ─────────────────────
618
619    /// Load all claims on (agent_id, subject, predicate), ordered by tx_time ASC.
620    fn load_subject_line(
621        &self,
622        agent_id: &AgentId,
623        subject: &str,
624        predicate: &str,
625    ) -> Result<Vec<Claim>, PostgresStoreError> {
626        let mut conn = self.pool.get()?;
627        let sql = format!(
628            "SELECT {CLAIM_SELECT_COLS} FROM claims
629             WHERE agent_id = $1 AND subject = $2 AND predicate = $3
630             ORDER BY tx_time ASC"
631        );
632        let rows = conn.query(
633            &sql,
634            &[&agent_id.0.as_str(), &subject, &predicate],
635        )?;
636        rows.iter().map(row_to_claim).collect()
637    }
638
639    /// Load a single claim by `ClaimRef`. Returns `None` if not found.
640    fn load_claim(
641        &self,
642        agent_id: &AgentId,
643        claim_ref: &ClaimRef,
644    ) -> Result<Option<Claim>, PostgresStoreError> {
645        let mut conn = self.pool.get()?;
646        let claim_id_str = claim_ref.0.to_string();
647        let sql = format!(
648            "SELECT {CLAIM_SELECT_COLS} FROM claims WHERE agent_id = $1 AND claim_id = $2"
649        );
650        let rows = conn.query(&sql, &[&agent_id.0.as_str(), &claim_id_str.as_str()])?;
651        match rows.first() {
652            None => Ok(None),
653            Some(row) => Ok(Some(row_to_claim(row)?)),
654        }
655    }
656
657    /// Load all validity assertions targeting a claim, ordered by asserted_at ASC.
658    fn load_validity_assertions_for(
659        &self,
660        agent_id: &AgentId,
661        claim_ref: &ClaimRef,
662    ) -> Result<Vec<ValidityAssertion>, PostgresStoreError> {
663        let mut conn = self.pool.get()?;
664        let claim_id_str = claim_ref.0.to_string();
665        let rows = conn.query(
666            "SELECT assertion_id, agent_id, target_claim_id,
667                    assertion_kind, bound_at, reopen_at,
668                    provenance_label, value_confidence, valid_time_confidence, asserted_at
669             FROM validity_assertions
670             WHERE agent_id = $1 AND target_claim_id = $2
671             ORDER BY asserted_at ASC",
672            &[&agent_id.0.as_str(), &claim_id_str.as_str()],
673        )?;
674
675        rows.iter()
676            .map(|row| {
677                let assertion_id_str: String = row.get(0);
678                let agent_id_str: String = row.get(1);
679                let target_claim_str: String = row.get(2);
680                let kind_str: String = row.get(3);
681                let bound_at_str: Option<String> = row.get(4);
682                let reopen_at_str: Option<String> = row.get(5);
683                let prov_str: String = row.get(6);
684                let value_confidence: f64 = row.get(7);
685                let valid_time_confidence: f64 = row.get(8);
686                let asserted_at_str: String = row.get(9);
687
688                let assertion_ref = uuid::Uuid::parse_str(&assertion_id_str)
689                    .map_err(|e| PostgresStoreError::Mapping(format!("assertion_id UUID: {e}")))?;
690                let target_claim = uuid::Uuid::parse_str(&target_claim_str)
691                    .map(ClaimRef)
692                    .map_err(|e| PostgresStoreError::Mapping(format!("target_claim UUID: {e}")))?;
693                let provenance = str_to_provenance(&prov_str)?;
694                let asserted_at = chrono::DateTime::parse_from_rfc3339(&asserted_at_str)
695                    .map(|dt| dt.with_timezone(&chrono::Utc))
696                    .map_err(|e| PostgresStoreError::Mapping(format!("asserted_at: {e}")))?;
697
698                let kind = match kind_str.as_str() {
699                    "Bound" => {
700                        let s = bound_at_str.ok_or_else(|| {
701                            PostgresStoreError::Mapping("bound_at is NULL for Bound assertion".into())
702                        })?;
703                        let dt = chrono::DateTime::parse_from_rfc3339(&s)
704                            .map(|dt| dt.with_timezone(&chrono::Utc))
705                            .map_err(|e| PostgresStoreError::Mapping(format!("bound_at: {e}")))?;
706                        AssertionKind::Bound { bound_at: dt }
707                    }
708                    "Reopen" => {
709                        let s = reopen_at_str.ok_or_else(|| {
710                            PostgresStoreError::Mapping("reopen_at is NULL for Reopen assertion".into())
711                        })?;
712                        let dt = chrono::DateTime::parse_from_rfc3339(&s)
713                            .map(|dt| dt.with_timezone(&chrono::Utc))
714                            .map_err(|e| PostgresStoreError::Mapping(format!("reopen_at: {e}")))?;
715                        AssertionKind::Reopen { reopen_at: dt }
716                    }
717                    other => {
718                        return Err(PostgresStoreError::Mapping(format!(
719                            "unknown assertion_kind: {other}"
720                        )))
721                    }
722                };
723
724                Ok(ValidityAssertion {
725                    assertion_ref,
726                    agent_id: AgentId(agent_id_str),
727                    target_claim,
728                    kind,
729                    provenance,
730                    confidence: Confidence {
731                        value_confidence: value_confidence as f32,
732                        valid_time_confidence: valid_time_confidence as f32,
733                    },
734                    asserted_at: TransactionTime(asserted_at),
735                })
736            })
737            .collect()
738    }
739
740    /// Load ledger entries for an agent, optionally starting from `from` (inclusive),
741    /// limited to `limit` rows, ordered by recorded_at ASC.
742    fn load_ledger(
743        &self,
744        agent_id: &AgentId,
745        from: Option<&TransactionTime>,
746        limit: usize,
747    ) -> Result<Vec<LedgerEntry>, PostgresStoreError> {
748        let mut conn = self.pool.get()?;
749        let limit_i64 = limit as i64;
750
751        let map_row = |row: &postgres::Row| -> Result<LedgerEntry, PostgresStoreError> {
752            let entry_id_str: String = row.get(0);
753            let agent_id_str: String = row.get(1);
754            let claim_id_str: String = row.get(2);
755            let event_kind_str: String = row.get(3);
756            let disposition_str: String = row.get(4);
757            let rationale_json: Option<String> = row.get(5);
758            let recorded_at_str: String = row.get(6);
759
760            let entry_id = uuid::Uuid::parse_str(&entry_id_str)
761                .map_err(|e| PostgresStoreError::Mapping(format!("entry_id UUID: {e}")))?;
762            let claim_id = uuid::Uuid::parse_str(&claim_id_str)
763                .map(ClaimRef)
764                .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
765            let event_kind = str_to_ledger_event_kind(&event_kind_str)?;
766            let disposition = str_to_disposition(&disposition_str)?;
767            let rationale: Option<serde_json::Value> = rationale_json
768                .map(|s| {
769                    serde_json::from_str(&s)
770                        .map_err(|e| PostgresStoreError::Mapping(format!("rationale JSON: {e}")))
771                })
772                .transpose()?;
773            let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
774                .map(|dt| dt.with_timezone(&chrono::Utc))
775                .map_err(|e| PostgresStoreError::Mapping(format!("recorded_at: {e}")))?;
776
777            Ok(LedgerEntry {
778                entry_id,
779                agent_id: AgentId(agent_id_str),
780                claim_ref: claim_id,
781                event_kind,
782                disposition,
783                rationale,
784                recorded_at: TransactionTime(recorded_at),
785            })
786        };
787
788        let rows = if let Some(from_time) = from {
789            let from_str = from_time.0.to_rfc3339();
790            conn.query(
791                "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
792                 FROM ledger_entries
793                 WHERE agent_id = $1 AND recorded_at >= $2
794                 ORDER BY recorded_at ASC
795                 LIMIT $3",
796                &[&agent_id.0.as_str(), &from_str.as_str(), &limit_i64],
797            )?
798        } else {
799            conn.query(
800                "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
801                 FROM ledger_entries
802                 WHERE agent_id = $1
803                 ORDER BY recorded_at ASC
804                 LIMIT $2",
805                &[&agent_id.0.as_str(), &limit_i64],
806            )?
807        };
808
809        rows.iter().map(map_row).collect()
810    }
811
812    /// Load ALL ledger entries for the given claim refs, no row cap.
813    ///
814    /// Uses `claim_id = ANY($2::text[])` to avoid per-parameter binding limits.
815    fn load_ledger_for_claims(
816        &self,
817        agent_id: &AgentId,
818        claim_refs: &[ClaimRef],
819    ) -> Result<Vec<LedgerEntry>, PostgresStoreError> {
820        if claim_refs.is_empty() {
821            return Ok(vec![]);
822        }
823
824        let mut conn = self.pool.get()?;
825
826        let map_row = |row: &postgres::Row| -> Result<LedgerEntry, PostgresStoreError> {
827            let entry_id_str: String = row.get(0);
828            let agent_id_str: String = row.get(1);
829            let claim_id_str: String = row.get(2);
830            let event_kind_str: String = row.get(3);
831            let disposition_str: String = row.get(4);
832            let rationale_json: Option<String> = row.get(5);
833            let recorded_at_str: String = row.get(6);
834
835            let entry_id = uuid::Uuid::parse_str(&entry_id_str)
836                .map_err(|e| PostgresStoreError::Mapping(format!("entry_id UUID: {e}")))?;
837            let claim_id = uuid::Uuid::parse_str(&claim_id_str)
838                .map(ClaimRef)
839                .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
840            let event_kind = str_to_ledger_event_kind(&event_kind_str)?;
841            let disposition = str_to_disposition(&disposition_str)?;
842            let rationale: Option<serde_json::Value> = rationale_json
843                .map(|s| {
844                    serde_json::from_str(&s)
845                        .map_err(|e| PostgresStoreError::Mapping(format!("rationale JSON: {e}")))
846                })
847                .transpose()?;
848            let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
849                .map(|dt| dt.with_timezone(&chrono::Utc))
850                .map_err(|e| PostgresStoreError::Mapping(format!("recorded_at: {e}")))?;
851
852            Ok(LedgerEntry {
853                entry_id,
854                agent_id: AgentId(agent_id_str),
855                claim_ref: claim_id,
856                event_kind,
857                disposition,
858                rationale,
859                recorded_at: TransactionTime(recorded_at),
860            })
861        };
862
863        // Pass the claim refs as a Postgres text array; ANY avoids per-param binding limits.
864        let id_strings: Vec<String> = claim_refs.iter().map(|r| r.0.to_string()).collect();
865        let ids_ref: Vec<&str> = id_strings.iter().map(|s| s.as_str()).collect();
866
867        let rows = conn.query(
868            "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
869             FROM ledger_entries
870             WHERE agent_id = $1 AND claim_id = ANY($2)
871             ORDER BY recorded_at ASC",
872            &[&agent_id.0.as_str(), &ids_ref.as_slice()],
873        )?;
874
875        rows.iter().map(map_row).collect()
876    }
877
878    /// Load all edges where `claim_ref` is either the from or to end, ordered by created_at ASC.
879    fn load_edges_for(
880        &self,
881        agent_id: &AgentId,
882        claim_ref: &ClaimRef,
883    ) -> Result<Vec<ClaimEdge>, PostgresStoreError> {
884        let mut conn = self.pool.get()?;
885        let claim_id_str = claim_ref.0.to_string();
886
887        let rows = conn.query(
888            "SELECT edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
889             FROM claim_edges
890             WHERE agent_id = $1
891               AND (from_claim_id = $2 OR to_claim_id = $2)
892             ORDER BY created_at ASC",
893            &[&agent_id.0.as_str(), &claim_id_str.as_str()],
894        )?;
895
896        rows.iter().map(row_to_edge).collect()
897    }
898
899    /// Load the set of ClaimRefs served as injected claims for this agent (used by the Amplification Guard).
900    fn load_injected_claims(
901        &self,
902        agent_id: &AgentId,
903    ) -> Result<Vec<ClaimRef>, PostgresStoreError> {
904        let mut conn = self.pool.get()?;
905
906        let rows = conn.query(
907            "SELECT claim_id
908             FROM ledger_entries
909             WHERE agent_id = $1 AND event_kind = 'ServedAsInjected'
910             GROUP BY claim_id
911             ORDER BY MIN(recorded_at) ASC",
912            &[&agent_id.0.as_str()],
913        )?;
914
915        rows.iter()
916            .map(|row| {
917                let claim_id_str: String = row.get(0);
918                uuid::Uuid::parse_str(&claim_id_str)
919                    .map(ClaimRef)
920                    .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))
921            })
922            .collect()
923    }
924
925    /// Recursive CTE lineage traversal — identical SQL to the SQLite adapter.
926    ///
927    /// Traverses `DerivedFrom` edges upward from `claim_ref`, returning all `ClaimEdge`
928    /// rows in the lineage sub-graph ordered by depth ASC, then created_at ASC within depth.
929    /// Bounded at depth 64 to prevent runaway on pathological graphs.
930    fn load_lineage(
931        &self,
932        agent_id: &AgentId,
933        claim_ref: &ClaimRef,
934    ) -> Result<Vec<ClaimEdge>, PostgresStoreError> {
935        let mut conn = self.pool.get()?;
936        let start_id = claim_ref.0.to_string();
937
938        let rows = conn.query(
939            "WITH RECURSIVE lineage(edge_id, depth) AS (
940                -- Base case: all DerivedFrom edges leaving from our starting claim
941                SELECT ce.edge_id, 1
942                FROM claim_edges ce
943                WHERE ce.agent_id = $1
944                  AND ce.from_claim_id = $2
945                  AND ce.edge_kind = 'DerivedFrom'
946                UNION ALL
947                -- Recursive case: follow the to_claim of the previous edge onward
948                SELECT ce2.edge_id, l.depth + 1
949                FROM claim_edges ce2
950                JOIN lineage l ON ce2.from_claim_id = (
951                    SELECT to_claim_id FROM claim_edges WHERE edge_id = l.edge_id
952                )
953                WHERE ce2.agent_id = $1
954                  AND ce2.edge_kind = 'DerivedFrom'
955                  AND l.depth < 64
956            )
957            SELECT ce.edge_id, ce.agent_id, ce.from_claim_id, ce.to_claim_id,
958                   ce.edge_kind, ce.created_at,
959                   l.depth
960            FROM claim_edges ce
961            JOIN lineage l ON ce.edge_id = l.edge_id
962            ORDER BY l.depth ASC, ce.created_at ASC",
963            &[&agent_id.0.as_str(), &start_id.as_str()],
964        )?;
965
966        rows.iter()
967            .map(|row| {
968                let edge_id_str: String = row.get(0);
969                let agent_id_str: String = row.get(1);
970                let from_claim_str: String = row.get(2);
971                let to_claim_str: String = row.get(3);
972                let kind_str: String = row.get(4);
973                let created_at_str: String = row.get(5);
974                // col 6 = depth (ordering only; not part of ClaimEdge)
975
976                let edge_id = uuid::Uuid::parse_str(&edge_id_str)
977                    .map_err(|e| PostgresStoreError::Mapping(format!("edge_id UUID: {e}")))?;
978                let from_claim = uuid::Uuid::parse_str(&from_claim_str)
979                    .map(ClaimRef)
980                    .map_err(|e| PostgresStoreError::Mapping(format!("from_claim UUID: {e}")))?;
981                let to_claim = uuid::Uuid::parse_str(&to_claim_str)
982                    .map(ClaimRef)
983                    .map_err(|e| PostgresStoreError::Mapping(format!("to_claim UUID: {e}")))?;
984                let kind = str_to_edge_kind(&kind_str)?;
985                let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
986                    .map(|dt| dt.with_timezone(&chrono::Utc))
987                    .map_err(|e| PostgresStoreError::Mapping(format!("created_at: {e}")))?;
988
989                Ok(ClaimEdge {
990                    edge_id,
991                    agent_id: AgentId(agent_id_str),
992                    from_claim,
993                    to_claim,
994                    kind,
995                    created_at: TransactionTime(created_at),
996                })
997            })
998            .collect()
999    }
1000
1001    /// Postgres uses a pool + per-agent advisory lock — no global write lock is needed.
1002    fn requires_global_write_serialization(&self) -> bool {
1003        false
1004    }
1005}
1006
1007// ── PostgresPendingStore ──────────────────────────────────────────────────────
1008
1009/// PostgreSQL-backed `PendingAdjudicationPort` implementation.
1010///
1011/// Uses the same r2d2 pool as `PostgresPersistenceStore`. Each method borrows a pooled
1012/// connection for the duration of a single non-transactional statement (auto-commit).
1013/// Serialization is provided by the per-agent write lock in `EngineHandle`.
1014pub struct PostgresPendingStore {
1015    pool: r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::NoTls>>,
1016}
1017
1018impl PostgresPendingStore {
1019    /// Create a pending store sharing the same connection pool.
1020    pub fn new(pool: r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::NoTls>>) -> Self {
1021        Self { pool }
1022    }
1023}
1024
1025impl PendingAdjudicationPort for PostgresPendingStore {
1026    type Error = PostgresStoreError;
1027
1028    fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), PostgresStoreError> {
1029        let mut conn = self.pool.get()?;
1030        let request_payload = serde_json::to_string(&row.request_payload)
1031            .map_err(|e| PostgresStoreError::Mapping(format!("request_payload serialization: {e}")))?;
1032        // queued_at and expires_at are TIMESTAMPTZ columns — pass as chrono::DateTime<Utc>
1033        // directly (requires postgres feature "with-chrono-0_4"). Passing as String caused
1034        // WrongType { postgres: Timestamptz, rust: "alloc::string::String" } errors.
1035        let queued_at: chrono::DateTime<chrono::Utc> = row.queued_at;
1036        let expires_at: Option<chrono::DateTime<chrono::Utc>> = row.expires_at;
1037        conn.execute(
1038            "INSERT INTO pending_adjudications (
1039                handle_id, agent_id, subject, predicate,
1040                challenger_claim_ref, incumbent_claim_ref,
1041                request_payload, queued_at, expires_at, status
1042            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1043            &[
1044                &row.handle_id.to_string(),
1045                &row.agent_id.0.as_str(),
1046                &row.subject.as_str(),
1047                &row.predicate.as_str(),
1048                &row.challenger_claim_ref.0.to_string(),
1049                &row.incumbent_claim_ref.0.to_string(),
1050                &request_payload.as_str(),
1051                &queued_at,
1052                &expires_at,
1053                &row.status.as_str(),
1054            ],
1055        )?;
1056        Ok(())
1057    }
1058
1059    fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, PostgresStoreError> {
1060        let mut conn = self.pool.get()?;
1061        let rows = conn.query(
1062            "SELECT handle_id, agent_id, subject, predicate,
1063                    challenger_claim_ref, incumbent_claim_ref,
1064                    request_payload, queued_at, expires_at, status
1065             FROM pending_adjudications
1066             WHERE handle_id = $1",
1067            &[&handle_id.to_string()],
1068        )?;
1069        match rows.into_iter().next() {
1070            None => Ok(None),
1071            Some(row) => Ok(Some(pg_row_to_pending(&row)?)),
1072        }
1073    }
1074
1075    fn list_pending(&self, agent_id: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, PostgresStoreError> {
1076        let mut conn = self.pool.get()?;
1077        let rows = if let Some(aid) = agent_id {
1078            conn.query(
1079                "SELECT handle_id, agent_id, subject, predicate,
1080                        challenger_claim_ref, incumbent_claim_ref,
1081                        request_payload, queued_at, expires_at, status
1082                 FROM pending_adjudications
1083                 WHERE agent_id = $1 AND status = 'pending'
1084                 ORDER BY queued_at ASC",
1085                &[&aid.0.as_str()],
1086            )?
1087        } else {
1088            conn.query(
1089                "SELECT handle_id, agent_id, subject, predicate,
1090                        challenger_claim_ref, incumbent_claim_ref,
1091                        request_payload, queued_at, expires_at, status
1092                 FROM pending_adjudications
1093                 WHERE status = 'pending'
1094                 ORDER BY queued_at ASC",
1095                &[],
1096            )?
1097        };
1098        rows.iter().map(pg_row_to_pending).collect()
1099    }
1100
1101    fn list_expired(&self, now: chrono::DateTime<chrono::Utc>) -> Result<Vec<PendingAdjudicationRow>, PostgresStoreError> {
1102        let mut conn = self.pool.get()?;
1103        // Pass now as chrono::DateTime<Utc> directly — expires_at is TIMESTAMPTZ.
1104        let rows = conn.query(
1105            "SELECT handle_id, agent_id, subject, predicate,
1106                    challenger_claim_ref, incumbent_claim_ref,
1107                    request_payload, queued_at, expires_at, status
1108             FROM pending_adjudications
1109             WHERE expires_at IS NOT NULL AND expires_at <= $1 AND status = 'pending'
1110             ORDER BY expires_at ASC",
1111            &[&now],
1112        )?;
1113        rows.iter().map(pg_row_to_pending).collect()
1114    }
1115
1116    fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), PostgresStoreError> {
1117        let mut conn = self.pool.get()?;
1118        conn.execute(
1119            "UPDATE pending_adjudications SET status = 'resolved' WHERE handle_id = $1",
1120            &[&handle_id.to_string()],
1121        )?;
1122        Ok(())
1123    }
1124
1125    fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), PostgresStoreError> {
1126        let mut conn = self.pool.get()?;
1127        conn.execute(
1128            "UPDATE pending_adjudications SET status = 'expired' WHERE handle_id = $1",
1129            &[&handle_id.to_string()],
1130        )?;
1131        Ok(())
1132    }
1133
1134    /// Detect QueuedForAdjudication claims with no matching pending row (Postgres adapter).
1135    ///
1136    /// Same approach as SQLite: cross-table query joining ledger_entries + claims + pending_adjudications.
1137    fn list_queued_orphan_claims(
1138        &self,
1139    ) -> Result<Vec<mempill_core::ports::pending_adjudication::OrphanedQueuedClaim>, PostgresStoreError> {
1140        let mut conn = self.pool.get()?;
1141
1142        // Phase 1: find orphaned QueuedForAdjudication claim refs.
1143        // NOTE: the schema column is `claim_id` in both `ledger_entries` and `claims`
1144        // (not `claim_ref` — that was the original bug caught by live PG tests).
1145        let orphan_rows = conn.query(
1146            "SELECT l.agent_id, l.claim_id, c.subject, c.predicate
1147             FROM ledger_entries l
1148             JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1149             WHERE l.disposition = 'QueuedForAdjudication'
1150               AND l.recorded_at = (
1151                   SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1152                   WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1153               )
1154               AND NOT EXISTS (
1155                   SELECT 1 FROM pending_adjudications pa
1156                   WHERE pa.challenger_claim_ref = l.claim_id
1157                     AND pa.agent_id = l.agent_id
1158                     AND pa.status = 'pending'
1159               )",
1160            &[],
1161        )?;
1162
1163        let mut results = Vec::new();
1164        for row in &orphan_rows {
1165            let agent_id_str: String = row.get(0);
1166            let challenger_str: String = row.get(1);
1167            let subject: String = row.get(2);
1168            let predicate: String = row.get(3);
1169
1170            let challenger_ref = uuid::Uuid::parse_str(&challenger_str)
1171                .map(mempill_types::ClaimRef)
1172                .map_err(|e| PostgresStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1173
1174            // Phase 2: find incumbent CommittedCheap claim on the same subject line.
1175            // NOTE: schema column is `claim_id` (not `claim_ref`) in both tables.
1176            let incumbent_rows = conn.query(
1177                "SELECT l.claim_id
1178                 FROM ledger_entries l
1179                 JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1180                 WHERE l.agent_id = $1
1181                   AND c.subject = $2
1182                   AND c.predicate = $3
1183                   AND l.disposition = 'CommittedCheap'
1184                   AND l.recorded_at = (
1185                       SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1186                       WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1187                   )
1188                 ORDER BY l.recorded_at DESC
1189                 LIMIT 1",
1190                &[&agent_id_str.as_str(), &subject.as_str(), &predicate.as_str()],
1191            )?;
1192
1193            let incumbent_ref = incumbent_rows.first()
1194                .map(|ir| {
1195                    let ref_str: String = ir.get(0);
1196                    uuid::Uuid::parse_str(&ref_str)
1197                        .map(mempill_types::ClaimRef)
1198                        .map_err(|e| PostgresStoreError::Mapping(format!("incumbent UUID: {e}")))
1199                })
1200                .transpose()?;
1201
1202            results.push(mempill_core::ports::pending_adjudication::OrphanedQueuedClaim {
1203                agent_id: mempill_types::AgentId(agent_id_str),
1204                challenger_claim_ref: challenger_ref,
1205                incumbent_claim_ref: incumbent_ref,
1206                subject,
1207                predicate,
1208            });
1209        }
1210
1211        Ok(results)
1212    }
1213}
1214
1215/// Map a Postgres `Row` from `pending_adjudications` to a `PendingAdjudicationRow`.
1216///
1217/// `queued_at` and `expires_at` are `TIMESTAMPTZ` columns; we read them as
1218/// `chrono::DateTime<chrono::Utc>` directly via the `with-chrono-0_4` postgres feature.
1219/// All other UUID-like columns are stored as TEXT and parsed manually.
1220fn pg_row_to_pending(row: &postgres::Row) -> Result<PendingAdjudicationRow, PostgresStoreError> {
1221    let handle_id_str: String = row.get(0);
1222    let agent_id_str: String = row.get(1);
1223    let subject: String = row.get(2);
1224    let predicate: String = row.get(3);
1225    let challenger_str: String = row.get(4);
1226    let incumbent_str: String = row.get(5);
1227    let payload_json: String = row.get(6);
1228    // TIMESTAMPTZ columns — read as native chrono type (with-chrono-0_4 feature).
1229    let queued_at: chrono::DateTime<chrono::Utc> = row.get(7);
1230    let expires_at: Option<chrono::DateTime<chrono::Utc>> = row.get(8);
1231    let status: String = row.get(9);
1232
1233    let handle_id = uuid::Uuid::parse_str(&handle_id_str)
1234        .map_err(|e| PostgresStoreError::Mapping(format!("handle_id UUID: {e}")))?;
1235    let challenger_claim_ref = uuid::Uuid::parse_str(&challenger_str)
1236        .map(ClaimRef)
1237        .map_err(|e| PostgresStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1238    let incumbent_claim_ref = uuid::Uuid::parse_str(&incumbent_str)
1239        .map(ClaimRef)
1240        .map_err(|e| PostgresStoreError::Mapping(format!("incumbent_claim_ref UUID: {e}")))?;
1241    let request_payload: mempill_types::AdjudicationRequest =
1242        serde_json::from_str(&payload_json)
1243            .map_err(|e| PostgresStoreError::Mapping(format!("request_payload JSON: {e}")))?;
1244
1245    Ok(PendingAdjudicationRow {
1246        handle_id,
1247        agent_id: AgentId(agent_id_str),
1248        subject,
1249        predicate,
1250        challenger_claim_ref,
1251        incumbent_claim_ref,
1252        request_payload,
1253        queued_at,
1254        expires_at,
1255        status,
1256    })
1257}
1258
1259// ── Constructor ───────────────────────────────────────────────────────────────
1260
1261/// Convenience constructor: build a `PostgresEngine<O, V>` (an `EngineHandle` backed
1262/// by `PostgresPersistenceStore`) from a connection string.
1263///
1264/// This is the recommended entry point for callers that want the full async EngineHandle.
1265pub fn open_postgres<O, V>(
1266    connection_string: &str,
1267    oracle: Option<Arc<O>>,
1268    vector: Option<Arc<V>>,
1269    config: EngineConfig,
1270) -> Result<EngineHandle<PostgresPersistenceStore, O, V>, PostgresStoreError>
1271where
1272    O: mempill_core::ports::OraclePort + Send + Sync + 'static,
1273    V: mempill_core::ports::VectorPort + Send + Sync + 'static,
1274{
1275    let store = PostgresPersistenceStore::new(connection_string)?;
1276    Ok(EngineHandle::new(Arc::new(store), oracle, vector, config))
1277}
1278
1279/// Convenience constructor: build a `PostgresEngine<O, V>` wired with a real oracle
1280/// and the Postgres-backed pending-adjudication store.
1281///
1282/// Mirrors `open_postgres` but calls `EngineHandle::new_with_pending_store` so that
1283/// `QueuedForAdjudication` rows are persisted and verdicts can be delivered via
1284/// `EngineHandle::submit_adjudication`.
1285///
1286/// `open_postgres` (no-oracle variant) is left UNCHANGED.
1287pub fn open_postgres_with_oracle<O, V>(
1288    connection_string: &str,
1289    oracle: Arc<O>,
1290    vector: Option<Arc<V>>,
1291    config: EngineConfig,
1292) -> Result<EngineHandle<PostgresPersistenceStore, O, V>, PostgresStoreError>
1293where
1294    O: mempill_core::ports::OraclePort + Send + Sync + 'static,
1295    V: mempill_core::ports::VectorPort + Send + Sync + 'static,
1296{
1297    let store = PostgresPersistenceStore::new(connection_string)?;
1298    let store_arc = Arc::new(store);
1299    let pending_store: Arc<dyn mempill_core::ErasedPendingStore> = Arc::new(
1300        mempill_core::ErasedPendingStoreAdapter::new(store_arc.pending_store()),
1301    );
1302    Ok(EngineHandle::new_with_pending_store::<()>(
1303        store_arc,
1304        Some(oracle),
1305        vector,
1306        pending_store,
1307        config,
1308    ))
1309}