1use std::sync::Arc;
29
30use mempill_core::{
31 ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow},
32 ports::persistence::PersistencePort,
33 EngineConfig, EngineHandle,
34};
35use mempill_types::{
36 claim::{Cardinality, Claim, Confidence, Criticality, Fact},
37 disposition::Disposition,
38 edge::{ClaimEdge, EdgeKind},
39 identity::{AgentId, ClaimRef},
40 ledger::{LedgerEntry, LedgerEventKind},
41 provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
42 time::{TransactionTime, ValidTime},
43 validity::{AssertionKind, ValidityAssertion},
44};
45
46use crate::{
47 connection::{PostgresPersistenceStore, PostgresStoreError},
48 txn::PostgresTxn,
49};
50
51fn provenance_to_str(p: &ProvenanceLabel) -> &'static str {
54 match p {
55 ProvenanceLabel::ModelDerived => "ModelDerived",
56 ProvenanceLabel::RecallReEntry => "RecallReEntry",
57 ProvenanceLabel::External(ExternalKind::UserAsserted) => "External_UserAsserted",
58 ProvenanceLabel::External(ExternalKind::ExternalFirstHand) => "External_ExternalFirstHand",
59 _ => "Unknown",
60 }
61}
62
63fn str_to_provenance(s: &str) -> Result<ProvenanceLabel, PostgresStoreError> {
64 match s {
65 "ModelDerived" => Ok(ProvenanceLabel::ModelDerived),
66 "RecallReEntry" => Ok(ProvenanceLabel::RecallReEntry),
67 "External_UserAsserted" => Ok(ProvenanceLabel::External(ExternalKind::UserAsserted)),
68 "External_ExternalFirstHand" => Ok(ProvenanceLabel::External(ExternalKind::ExternalFirstHand)),
69 other => Err(PostgresStoreError::Mapping(format!("unknown provenance_label: {other}"))),
70 }
71}
72
73fn cardinality_to_str(c: &Cardinality) -> &'static str {
74 match c {
75 Cardinality::Functional => "Functional",
76 Cardinality::SetValued => "SetValued",
77 Cardinality::Unknown => "Unknown",
78 }
79}
80
81fn str_to_cardinality(s: &str) -> Result<Cardinality, PostgresStoreError> {
82 match s {
83 "Functional" => Ok(Cardinality::Functional),
84 "SetValued" => Ok(Cardinality::SetValued),
85 "Unknown" => Ok(Cardinality::Unknown),
86 other => Err(PostgresStoreError::Mapping(format!("unknown cardinality: {other}"))),
87 }
88}
89
90fn criticality_to_str(c: &Criticality) -> &'static str {
91 match c {
92 Criticality::Low => "Low",
93 Criticality::Medium => "Medium",
94 Criticality::High => "High",
95 Criticality::Critical => "Critical",
96 }
97}
98
99fn str_to_criticality(s: &str) -> Result<Criticality, PostgresStoreError> {
100 match s {
101 "Low" => Ok(Criticality::Low),
102 "Medium" => Ok(Criticality::Medium),
103 "High" => Ok(Criticality::High),
104 "Critical" => Ok(Criticality::Critical),
105 other => Err(PostgresStoreError::Mapping(format!("unknown criticality: {other}"))),
106 }
107}
108
109fn edge_kind_to_str(k: &EdgeKind) -> &'static str {
110 match k {
111 EdgeKind::DerivedFrom => "DerivedFrom",
112 EdgeKind::Supersedes => "Supersedes",
113 EdgeKind::DependsOn => "DependsOn",
114 EdgeKind::MutualExclusion => "MutualExclusion",
115 _ => "Unknown",
117 }
118}
119
120fn str_to_edge_kind(s: &str) -> Result<EdgeKind, PostgresStoreError> {
121 match s {
122 "DerivedFrom" => Ok(EdgeKind::DerivedFrom),
123 "Supersedes" => Ok(EdgeKind::Supersedes),
124 "DependsOn" => Ok(EdgeKind::DependsOn),
125 "MutualExclusion" => Ok(EdgeKind::MutualExclusion),
126 other => Err(PostgresStoreError::Mapping(format!("unknown edge_kind: {other}"))),
127 }
128}
129
130fn ledger_event_kind_to_str(k: &LedgerEventKind) -> &'static str {
131 match k {
132 LedgerEventKind::ClaimCommitted => "ClaimCommitted",
133 LedgerEventKind::ValidityAsserted => "ValidityAsserted",
134 LedgerEventKind::AdjudicationRequested => "AdjudicationRequested",
135 LedgerEventKind::AdjudicationResolved => "AdjudicationResolved",
136 LedgerEventKind::RecallReEntryDetected => "RecallReEntryDetected",
137 LedgerEventKind::Quarantined => "Quarantined",
138 LedgerEventKind::DependentFlaggedPendingReview => "DependentFlaggedPendingReview",
139 LedgerEventKind::ServedAsInjected => "ServedAsInjected",
140 LedgerEventKind::AdjudicationExpired => "AdjudicationExpired",
141 _ => "Unknown",
143 }
144}
145
146fn str_to_ledger_event_kind(s: &str) -> Result<LedgerEventKind, PostgresStoreError> {
147 match s {
148 "ClaimCommitted" => Ok(LedgerEventKind::ClaimCommitted),
149 "ValidityAsserted" => Ok(LedgerEventKind::ValidityAsserted),
150 "AdjudicationRequested" => Ok(LedgerEventKind::AdjudicationRequested),
151 "AdjudicationResolved" => Ok(LedgerEventKind::AdjudicationResolved),
152 "RecallReEntryDetected" => Ok(LedgerEventKind::RecallReEntryDetected),
153 "Quarantined" => Ok(LedgerEventKind::Quarantined),
154 "DependentFlaggedPendingReview" => Ok(LedgerEventKind::DependentFlaggedPendingReview),
155 "ServedAsInjected" => Ok(LedgerEventKind::ServedAsInjected),
156 "AdjudicationExpired" => Ok(LedgerEventKind::AdjudicationExpired),
157 other => Err(PostgresStoreError::Mapping(format!("unknown ledger event_kind: {other}"))),
158 }
159}
160
161fn disposition_to_str(d: &Disposition) -> &'static str {
162 match d {
163 Disposition::CommittedCheap => "CommittedCheap",
164 Disposition::CommittedInferred => "CommittedInferred",
165 Disposition::QueuedForAdjudication => "QueuedForAdjudication",
166 Disposition::Contested => "Contested",
167 Disposition::PendingConflict => "PendingConflict",
168 Disposition::PendingReview => "PendingReview",
169 Disposition::PendingLowConfidence => "PendingLowConfidence",
170 Disposition::Quarantined => "Quarantined",
171 Disposition::Superseded => "Superseded",
172 Disposition::Invalidated => "Invalidated",
173 Disposition::Reinstated => "Reinstated",
174 Disposition::Rejected => "Rejected",
175 _ => "Unknown",
177 }
178}
179
180fn str_to_disposition(s: &str) -> Result<Disposition, PostgresStoreError> {
181 match s {
182 "CommittedCheap" => Ok(Disposition::CommittedCheap),
183 "CommittedInferred" => Ok(Disposition::CommittedInferred),
184 "QueuedForAdjudication" => Ok(Disposition::QueuedForAdjudication),
185 "Contested" => Ok(Disposition::Contested),
186 "PendingConflict" => Ok(Disposition::PendingConflict),
187 "PendingReview" => Ok(Disposition::PendingReview),
188 "PendingLowConfidence" => Ok(Disposition::PendingLowConfidence),
189 "Quarantined" => Ok(Disposition::Quarantined),
190 "Superseded" => Ok(Disposition::Superseded),
191 "Invalidated" => Ok(Disposition::Invalidated),
192 "Reinstated" => Ok(Disposition::Reinstated),
193 "Rejected" => Ok(Disposition::Rejected),
194 other => Err(PostgresStoreError::Mapping(format!("unknown disposition: {other}"))),
195 }
196}
197
/// Column list interpolated into the `SELECT` statements whose rows are decoded by
/// `row_to_claim`.
///
/// The column order must stay aligned with the positional `row.get(n)` reads in
/// `row_to_claim`. `value` and `metadata` are cast to `text` here and read back as JSON
/// strings there.
const CLAIM_SELECT_COLS: &str = "
    claim_id, agent_id, subject, predicate, value::text, cardinality,
    provenance_label, nearest_external_anchor_id, derivation_depth,
    tx_time, valid_time_start, valid_time_end, valid_time_confidence,
    value_confidence, criticality, derived_from,
    metadata::text, snapshot_schema_version
";
214
215fn row_to_claim(row: &postgres::Row) -> Result<Claim, PostgresStoreError> {
237 let claim_id_str: String = row.get(0);
238 let agent_id_str: String = row.get(1);
239 let subject: String = row.get(2);
240 let predicate: String = row.get(3);
241 let value_json: String = row.get(4);
242 let cardinality_str: String = row.get(5);
243 let provenance_str: String = row.get(6);
244 let nearest_anchor_str: Option<String> = row.get(7);
245 let derivation_depth: i32 = row.get(8);
246 let tx_time_str: String = row.get(9);
247 let valid_time_start_str: Option<String> = row.get(10);
248 let valid_time_end_str: Option<String> = row.get(11);
249 let valid_time_confidence: f64 = row.get(12);
250 let value_confidence: f64 = row.get(13);
251 let criticality_str: String = row.get(14);
252 let derived_from_json: String = row.get(15);
253 let metadata_json: Option<String> = row.get(16);
254 let snapshot_schema_version_raw: Option<i32> = row.get(17);
255
256 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
257 .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
258
259 let value: serde_json::Value = serde_json::from_str(&value_json)
260 .map_err(|e| PostgresStoreError::Mapping(format!("value JSON: {e}")))?;
261
262 let cardinality = str_to_cardinality(&cardinality_str)?;
263 let provenance = str_to_provenance(&provenance_str)?;
264
265 let nearest_external_anchor: Option<ClaimRef> = nearest_anchor_str
266 .map(|s| {
267 uuid::Uuid::parse_str(&s)
268 .map(ClaimRef)
269 .map_err(|e| PostgresStoreError::Mapping(format!("anchor UUID: {e}")))
270 })
271 .transpose()?;
272
273 let tx_time = chrono::DateTime::parse_from_rfc3339(&tx_time_str)
274 .map(|dt| dt.with_timezone(&chrono::Utc))
275 .map_err(|e| PostgresStoreError::Mapping(format!("tx_time parse: {e}")))?;
276
277 let valid_time_start = valid_time_start_str
278 .map(|s| {
279 chrono::DateTime::parse_from_rfc3339(&s)
280 .map(|dt| dt.with_timezone(&chrono::Utc))
281 .map_err(|e| PostgresStoreError::Mapping(format!("valid_time_start: {e}")))
282 })
283 .transpose()?;
284
285 let valid_time_end = valid_time_end_str
286 .map(|s| {
287 chrono::DateTime::parse_from_rfc3339(&s)
288 .map(|dt| dt.with_timezone(&chrono::Utc))
289 .map_err(|e| PostgresStoreError::Mapping(format!("valid_time_end: {e}")))
290 })
291 .transpose()?;
292
293 let criticality = str_to_criticality(&criticality_str)?;
294
295 let derived_from_uuids: Vec<String> = serde_json::from_str(&derived_from_json)
296 .map_err(|e| PostgresStoreError::Mapping(format!("derived_from JSON: {e}")))?;
297
298 let derived_from: Vec<ClaimRef> = derived_from_uuids
299 .iter()
300 .map(|s| {
301 uuid::Uuid::parse_str(s)
302 .map(ClaimRef)
303 .map_err(|e| PostgresStoreError::Mapping(format!("derived_from UUID: {e}")))
304 })
305 .collect::<Result<_, _>>()?;
306
307 let metadata: Option<serde_json::Value> = metadata_json
308 .map(|s| {
309 serde_json::from_str(&s)
310 .map_err(|e| PostgresStoreError::Mapping(format!("metadata JSON: {e}")))
311 })
312 .transpose()?;
313
314 let snapshot_schema_version: Option<u32> = snapshot_schema_version_raw.map(|v| v as u32);
315
316 Ok(Claim::new(
317 ClaimRef(claim_id),
318 AgentId(agent_id_str),
319 Fact { subject, predicate, value },
320 cardinality,
321 provenance,
322 ExternalAnchor {
323 nearest_external_anchor,
324 derivation_depth: derivation_depth as u32,
325 },
326 TransactionTime(tx_time),
327 ValidTime {
328 start: valid_time_start,
329 end: valid_time_end,
330 valid_time_confidence: valid_time_confidence as f32,
331 },
332 Confidence {
333 value_confidence: value_confidence as f32,
334 valid_time_confidence: valid_time_confidence as f32,
335 },
336 criticality,
337 derived_from,
338 metadata,
339 snapshot_schema_version,
340 ))
341}
342
343fn row_to_edge(row: &postgres::Row) -> Result<ClaimEdge, PostgresStoreError> {
345 let edge_id_str: String = row.get(0);
346 let agent_id_str: String = row.get(1);
347 let from_claim_str: String = row.get(2);
348 let to_claim_str: String = row.get(3);
349 let kind_str: String = row.get(4);
350 let created_at_str: String = row.get(5);
351
352 let edge_id = uuid::Uuid::parse_str(&edge_id_str)
353 .map_err(|e| PostgresStoreError::Mapping(format!("edge_id UUID: {e}")))?;
354 let from_claim = uuid::Uuid::parse_str(&from_claim_str)
355 .map(ClaimRef)
356 .map_err(|e| PostgresStoreError::Mapping(format!("from_claim UUID: {e}")))?;
357 let to_claim = uuid::Uuid::parse_str(&to_claim_str)
358 .map(ClaimRef)
359 .map_err(|e| PostgresStoreError::Mapping(format!("to_claim UUID: {e}")))?;
360 let kind = str_to_edge_kind(&kind_str)?;
361 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
362 .map(|dt| dt.with_timezone(&chrono::Utc))
363 .map_err(|e| PostgresStoreError::Mapping(format!("created_at parse: {e}")))?;
364
365 Ok(ClaimEdge {
366 edge_id,
367 agent_id: AgentId(agent_id_str),
368 from_claim,
369 to_claim,
370 kind,
371 created_at: TransactionTime(created_at),
372 })
373}
374
375impl PostgresPersistenceStore {
378 pub fn pending_store(&self) -> PostgresPendingStore {
384 PostgresPendingStore::new(self.pool.clone())
385 }
386}
387
388impl PersistencePort for PostgresPersistenceStore {
389 type Transaction = PostgresTxn;
390 type Error = PostgresStoreError;
391
392 fn begin_atomic(&self, agent_id: &AgentId) -> Result<PostgresTxn, PostgresStoreError> {
399 let conn = self.pool.get()?;
400 PostgresTxn::begin(agent_id.clone(), conn)
401 }
402
403 fn commit(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError> {
405 txn.commit_and_drop()
406 }
407
408 fn rollback(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError> {
410 txn.rollback_and_drop()
411 }
412
413 fn append_claim(
419 &self,
420 txn: &mut PostgresTxn,
421 claim: &Claim,
422 ) -> Result<ClaimRef, PostgresStoreError> {
423 let claim_id = claim.claim_ref().0.to_string();
424 let agent_id = claim.agent_id().0.clone();
425 let fact = claim.fact();
426 let value_jsonb: &serde_json::Value = &fact.value;
431 let cardinality = cardinality_to_str(claim.cardinality()).to_owned();
432 let provenance = provenance_to_str(claim.provenance()).to_owned();
433 let anchor = claim.external_anchor();
434 let nearest_anchor: Option<String> =
435 anchor.nearest_external_anchor.as_ref().map(|r| r.0.to_string());
436 let derivation_depth = anchor.derivation_depth as i32;
437 let tx_time = claim.transaction_time().0.to_rfc3339();
438 let vt = claim.valid_time();
439 let valid_time_start: Option<String> = vt.start.map(|dt| dt.to_rfc3339());
440 let valid_time_end: Option<String> = vt.end.map(|dt| dt.to_rfc3339());
441 let valid_time_confidence = vt.valid_time_confidence as f64;
442 let conf = claim.confidence();
443 let value_confidence = conf.value_confidence as f64;
444 let criticality = criticality_to_str(claim.criticality()).to_owned();
445 let derived_from_refs: Vec<String> =
446 claim.derived_from().iter().map(|r| r.0.to_string()).collect();
447 let derived_from_json = serde_json::to_string(&derived_from_refs)
448 .map_err(|e| PostgresStoreError::Mapping(format!("derived_from serialization: {e}")))?;
449 let metadata_jsonb: Option<serde_json::Value> = claim.metadata().cloned();
451 let snapshot_schema_version: Option<i32> =
452 claim.snapshot_schema_version().map(|v| v as i32);
453
454 txn.client().execute(
455 "INSERT INTO claims (
456 claim_id, agent_id, subject, predicate, value, cardinality,
457 provenance_label, nearest_external_anchor_id, derivation_depth,
458 tx_time, valid_time_start, valid_time_end, valid_time_confidence,
459 value_confidence, criticality, derived_from,
460 metadata, snapshot_schema_version, embedding_model_id
461 ) VALUES (
462 $1, $2, $3, $4, $5, $6,
463 $7, $8, $9,
464 $10, $11, $12, $13,
465 $14, $15, $16,
466 $17, $18, NULL
467 )",
468 &[
469 &claim_id,
470 &agent_id,
471 &fact.subject.as_str(),
472 &fact.predicate.as_str(),
473 &value_jsonb,
474 &cardinality,
475 &provenance,
476 &nearest_anchor,
477 &derivation_depth,
478 &tx_time,
479 &valid_time_start,
480 &valid_time_end,
481 &valid_time_confidence,
482 &value_confidence,
483 &criticality,
484 &derived_from_json,
485 &metadata_jsonb,
486 &snapshot_schema_version,
487 ],
488 )?;
489
490 Ok(claim.claim_ref().clone())
491 }
492
493 fn append_validity_assertion(
495 &self,
496 txn: &mut PostgresTxn,
497 assertion: &ValidityAssertion,
498 ) -> Result<(), PostgresStoreError> {
499 let assertion_id = assertion.assertion_ref.to_string();
500 let agent_id = assertion.agent_id.0.clone();
501 let target_claim_id = assertion.target_claim.0.to_string();
502 let provenance = provenance_to_str(&assertion.provenance).to_owned();
503 let value_confidence = assertion.confidence.value_confidence as f64;
504 let valid_time_confidence = assertion.confidence.valid_time_confidence as f64;
505 let asserted_at = assertion.asserted_at.0.to_rfc3339();
506
507 let (assertion_kind, bound_at, reopen_at): (&str, Option<String>, Option<String>) =
508 match &assertion.kind {
509 AssertionKind::Bound { bound_at } => ("Bound", Some(bound_at.to_rfc3339()), None),
510 AssertionKind::Reopen { reopen_at } => ("Reopen", None, Some(reopen_at.to_rfc3339())),
511 _ => ("Unknown", None, None),
513 };
514
515 txn.client().execute(
516 "INSERT INTO validity_assertions (
517 assertion_id, agent_id, target_claim_id,
518 assertion_kind, bound_at, reopen_at,
519 provenance_label, value_confidence, valid_time_confidence, asserted_at
520 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
521 &[
522 &assertion_id,
523 &agent_id,
524 &target_claim_id,
525 &assertion_kind,
526 &bound_at,
527 &reopen_at,
528 &provenance,
529 &value_confidence,
530 &valid_time_confidence,
531 &asserted_at,
532 ],
533 )?;
534
535 Ok(())
536 }
537
538 fn append_ledger_entry(
547 &self,
548 txn: &mut PostgresTxn,
549 entry: &LedgerEntry,
550 ) -> Result<(), PostgresStoreError> {
551 let entry_id = entry.entry_id.to_string();
552 let agent_id = entry.agent_id.0.clone();
553 let claim_id = entry.claim_ref.0.to_string();
554 let event_kind = ledger_event_kind_to_str(&entry.event_kind).to_owned();
555 let disposition = disposition_to_str(&entry.disposition).to_owned();
556 let rationale_jsonb: Option<serde_json::Value> = entry.rationale.clone();
559 let recorded_at = entry.recorded_at.0.to_rfc3339();
560
561 let row = txn.client().query_one(
563 "SELECT COALESCE(MAX(stream_seq), 0) + 1 FROM ledger_entries WHERE agent_id = $1",
564 &[&agent_id],
565 )?;
566 let stream_seq: i64 = row.get(0);
567
568 txn.client().execute(
569 "INSERT INTO ledger_entries (
570 entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at, stream_seq
571 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
572 &[
573 &entry_id,
574 &agent_id,
575 &claim_id,
576 &event_kind,
577 &disposition,
578 &rationale_jsonb,
579 &recorded_at,
580 &stream_seq,
581 ],
582 )?;
583
584 Ok(())
585 }
586
587 fn append_claim_edge(
589 &self,
590 txn: &mut PostgresTxn,
591 edge: &ClaimEdge,
592 ) -> Result<(), PostgresStoreError> {
593 let edge_id = edge.edge_id.to_string();
594 let agent_id = edge.agent_id.0.clone();
595 let from_claim_id = edge.from_claim.0.to_string();
596 let to_claim_id = edge.to_claim.0.to_string();
597 let edge_kind = edge_kind_to_str(&edge.kind).to_owned();
598 let created_at = edge.created_at.0.to_rfc3339();
599
600 txn.client().execute(
601 "INSERT INTO claim_edges (
602 edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
603 ) VALUES ($1, $2, $3, $4, $5, $6)",
604 &[
605 &edge_id,
606 &agent_id,
607 &from_claim_id,
608 &to_claim_id,
609 &edge_kind,
610 &created_at,
611 ],
612 )?;
613
614 Ok(())
615 }
616
617 fn load_subject_line(
621 &self,
622 agent_id: &AgentId,
623 subject: &str,
624 predicate: &str,
625 ) -> Result<Vec<Claim>, PostgresStoreError> {
626 let mut conn = self.pool.get()?;
627 let sql = format!(
628 "SELECT {CLAIM_SELECT_COLS} FROM claims
629 WHERE agent_id = $1 AND subject = $2 AND predicate = $3
630 ORDER BY tx_time ASC"
631 );
632 let rows = conn.query(
633 &sql,
634 &[&agent_id.0.as_str(), &subject, &predicate],
635 )?;
636 rows.iter().map(row_to_claim).collect()
637 }
638
639 fn load_claim(
641 &self,
642 agent_id: &AgentId,
643 claim_ref: &ClaimRef,
644 ) -> Result<Option<Claim>, PostgresStoreError> {
645 let mut conn = self.pool.get()?;
646 let claim_id_str = claim_ref.0.to_string();
647 let sql = format!(
648 "SELECT {CLAIM_SELECT_COLS} FROM claims WHERE agent_id = $1 AND claim_id = $2"
649 );
650 let rows = conn.query(&sql, &[&agent_id.0.as_str(), &claim_id_str.as_str()])?;
651 match rows.first() {
652 None => Ok(None),
653 Some(row) => Ok(Some(row_to_claim(row)?)),
654 }
655 }
656
657 fn load_validity_assertions_for(
659 &self,
660 agent_id: &AgentId,
661 claim_ref: &ClaimRef,
662 ) -> Result<Vec<ValidityAssertion>, PostgresStoreError> {
663 let mut conn = self.pool.get()?;
664 let claim_id_str = claim_ref.0.to_string();
665 let rows = conn.query(
666 "SELECT assertion_id, agent_id, target_claim_id,
667 assertion_kind, bound_at, reopen_at,
668 provenance_label, value_confidence, valid_time_confidence, asserted_at
669 FROM validity_assertions
670 WHERE agent_id = $1 AND target_claim_id = $2
671 ORDER BY asserted_at ASC",
672 &[&agent_id.0.as_str(), &claim_id_str.as_str()],
673 )?;
674
675 rows.iter()
676 .map(|row| {
677 let assertion_id_str: String = row.get(0);
678 let agent_id_str: String = row.get(1);
679 let target_claim_str: String = row.get(2);
680 let kind_str: String = row.get(3);
681 let bound_at_str: Option<String> = row.get(4);
682 let reopen_at_str: Option<String> = row.get(5);
683 let prov_str: String = row.get(6);
684 let value_confidence: f64 = row.get(7);
685 let valid_time_confidence: f64 = row.get(8);
686 let asserted_at_str: String = row.get(9);
687
688 let assertion_ref = uuid::Uuid::parse_str(&assertion_id_str)
689 .map_err(|e| PostgresStoreError::Mapping(format!("assertion_id UUID: {e}")))?;
690 let target_claim = uuid::Uuid::parse_str(&target_claim_str)
691 .map(ClaimRef)
692 .map_err(|e| PostgresStoreError::Mapping(format!("target_claim UUID: {e}")))?;
693 let provenance = str_to_provenance(&prov_str)?;
694 let asserted_at = chrono::DateTime::parse_from_rfc3339(&asserted_at_str)
695 .map(|dt| dt.with_timezone(&chrono::Utc))
696 .map_err(|e| PostgresStoreError::Mapping(format!("asserted_at: {e}")))?;
697
698 let kind = match kind_str.as_str() {
699 "Bound" => {
700 let s = bound_at_str.ok_or_else(|| {
701 PostgresStoreError::Mapping("bound_at is NULL for Bound assertion".into())
702 })?;
703 let dt = chrono::DateTime::parse_from_rfc3339(&s)
704 .map(|dt| dt.with_timezone(&chrono::Utc))
705 .map_err(|e| PostgresStoreError::Mapping(format!("bound_at: {e}")))?;
706 AssertionKind::Bound { bound_at: dt }
707 }
708 "Reopen" => {
709 let s = reopen_at_str.ok_or_else(|| {
710 PostgresStoreError::Mapping("reopen_at is NULL for Reopen assertion".into())
711 })?;
712 let dt = chrono::DateTime::parse_from_rfc3339(&s)
713 .map(|dt| dt.with_timezone(&chrono::Utc))
714 .map_err(|e| PostgresStoreError::Mapping(format!("reopen_at: {e}")))?;
715 AssertionKind::Reopen { reopen_at: dt }
716 }
717 other => {
718 return Err(PostgresStoreError::Mapping(format!(
719 "unknown assertion_kind: {other}"
720 )))
721 }
722 };
723
724 Ok(ValidityAssertion {
725 assertion_ref,
726 agent_id: AgentId(agent_id_str),
727 target_claim,
728 kind,
729 provenance,
730 confidence: Confidence {
731 value_confidence: value_confidence as f32,
732 valid_time_confidence: valid_time_confidence as f32,
733 },
734 asserted_at: TransactionTime(asserted_at),
735 })
736 })
737 .collect()
738 }
739
740 fn load_ledger(
743 &self,
744 agent_id: &AgentId,
745 from: Option<&TransactionTime>,
746 limit: usize,
747 ) -> Result<Vec<LedgerEntry>, PostgresStoreError> {
748 let mut conn = self.pool.get()?;
749 let limit_i64 = limit as i64;
750
751 let map_row = |row: &postgres::Row| -> Result<LedgerEntry, PostgresStoreError> {
752 let entry_id_str: String = row.get(0);
753 let agent_id_str: String = row.get(1);
754 let claim_id_str: String = row.get(2);
755 let event_kind_str: String = row.get(3);
756 let disposition_str: String = row.get(4);
757 let rationale_json: Option<String> = row.get(5);
758 let recorded_at_str: String = row.get(6);
759
760 let entry_id = uuid::Uuid::parse_str(&entry_id_str)
761 .map_err(|e| PostgresStoreError::Mapping(format!("entry_id UUID: {e}")))?;
762 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
763 .map(ClaimRef)
764 .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
765 let event_kind = str_to_ledger_event_kind(&event_kind_str)?;
766 let disposition = str_to_disposition(&disposition_str)?;
767 let rationale: Option<serde_json::Value> = rationale_json
768 .map(|s| {
769 serde_json::from_str(&s)
770 .map_err(|e| PostgresStoreError::Mapping(format!("rationale JSON: {e}")))
771 })
772 .transpose()?;
773 let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
774 .map(|dt| dt.with_timezone(&chrono::Utc))
775 .map_err(|e| PostgresStoreError::Mapping(format!("recorded_at: {e}")))?;
776
777 Ok(LedgerEntry {
778 entry_id,
779 agent_id: AgentId(agent_id_str),
780 claim_ref: claim_id,
781 event_kind,
782 disposition,
783 rationale,
784 recorded_at: TransactionTime(recorded_at),
785 })
786 };
787
788 let rows = if let Some(from_time) = from {
789 let from_str = from_time.0.to_rfc3339();
790 conn.query(
791 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
792 FROM ledger_entries
793 WHERE agent_id = $1 AND recorded_at >= $2
794 ORDER BY recorded_at ASC
795 LIMIT $3",
796 &[&agent_id.0.as_str(), &from_str.as_str(), &limit_i64],
797 )?
798 } else {
799 conn.query(
800 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
801 FROM ledger_entries
802 WHERE agent_id = $1
803 ORDER BY recorded_at ASC
804 LIMIT $2",
805 &[&agent_id.0.as_str(), &limit_i64],
806 )?
807 };
808
809 rows.iter().map(map_row).collect()
810 }
811
812 fn load_ledger_for_claims(
816 &self,
817 agent_id: &AgentId,
818 claim_refs: &[ClaimRef],
819 ) -> Result<Vec<LedgerEntry>, PostgresStoreError> {
820 if claim_refs.is_empty() {
821 return Ok(vec![]);
822 }
823
824 let mut conn = self.pool.get()?;
825
826 let map_row = |row: &postgres::Row| -> Result<LedgerEntry, PostgresStoreError> {
827 let entry_id_str: String = row.get(0);
828 let agent_id_str: String = row.get(1);
829 let claim_id_str: String = row.get(2);
830 let event_kind_str: String = row.get(3);
831 let disposition_str: String = row.get(4);
832 let rationale_json: Option<String> = row.get(5);
833 let recorded_at_str: String = row.get(6);
834
835 let entry_id = uuid::Uuid::parse_str(&entry_id_str)
836 .map_err(|e| PostgresStoreError::Mapping(format!("entry_id UUID: {e}")))?;
837 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
838 .map(ClaimRef)
839 .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))?;
840 let event_kind = str_to_ledger_event_kind(&event_kind_str)?;
841 let disposition = str_to_disposition(&disposition_str)?;
842 let rationale: Option<serde_json::Value> = rationale_json
843 .map(|s| {
844 serde_json::from_str(&s)
845 .map_err(|e| PostgresStoreError::Mapping(format!("rationale JSON: {e}")))
846 })
847 .transpose()?;
848 let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
849 .map(|dt| dt.with_timezone(&chrono::Utc))
850 .map_err(|e| PostgresStoreError::Mapping(format!("recorded_at: {e}")))?;
851
852 Ok(LedgerEntry {
853 entry_id,
854 agent_id: AgentId(agent_id_str),
855 claim_ref: claim_id,
856 event_kind,
857 disposition,
858 rationale,
859 recorded_at: TransactionTime(recorded_at),
860 })
861 };
862
863 let id_strings: Vec<String> = claim_refs.iter().map(|r| r.0.to_string()).collect();
865 let ids_ref: Vec<&str> = id_strings.iter().map(|s| s.as_str()).collect();
866
867 let rows = conn.query(
868 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale::text, recorded_at
869 FROM ledger_entries
870 WHERE agent_id = $1 AND claim_id = ANY($2)
871 ORDER BY recorded_at ASC",
872 &[&agent_id.0.as_str(), &ids_ref.as_slice()],
873 )?;
874
875 rows.iter().map(map_row).collect()
876 }
877
878 fn load_edges_for(
880 &self,
881 agent_id: &AgentId,
882 claim_ref: &ClaimRef,
883 ) -> Result<Vec<ClaimEdge>, PostgresStoreError> {
884 let mut conn = self.pool.get()?;
885 let claim_id_str = claim_ref.0.to_string();
886
887 let rows = conn.query(
888 "SELECT edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
889 FROM claim_edges
890 WHERE agent_id = $1
891 AND (from_claim_id = $2 OR to_claim_id = $2)
892 ORDER BY created_at ASC",
893 &[&agent_id.0.as_str(), &claim_id_str.as_str()],
894 )?;
895
896 rows.iter().map(row_to_edge).collect()
897 }
898
899 fn load_injected_claims(
901 &self,
902 agent_id: &AgentId,
903 ) -> Result<Vec<ClaimRef>, PostgresStoreError> {
904 let mut conn = self.pool.get()?;
905
906 let rows = conn.query(
907 "SELECT claim_id
908 FROM ledger_entries
909 WHERE agent_id = $1 AND event_kind = 'ServedAsInjected'
910 GROUP BY claim_id
911 ORDER BY MIN(recorded_at) ASC",
912 &[&agent_id.0.as_str()],
913 )?;
914
915 rows.iter()
916 .map(|row| {
917 let claim_id_str: String = row.get(0);
918 uuid::Uuid::parse_str(&claim_id_str)
919 .map(ClaimRef)
920 .map_err(|e| PostgresStoreError::Mapping(format!("claim_id UUID: {e}")))
921 })
922 .collect()
923 }
924
925 fn load_lineage(
931 &self,
932 agent_id: &AgentId,
933 claim_ref: &ClaimRef,
934 ) -> Result<Vec<ClaimEdge>, PostgresStoreError> {
935 let mut conn = self.pool.get()?;
936 let start_id = claim_ref.0.to_string();
937
938 let rows = conn.query(
939 "WITH RECURSIVE lineage(edge_id, depth) AS (
940 -- Base case: all DerivedFrom edges leaving from our starting claim
941 SELECT ce.edge_id, 1
942 FROM claim_edges ce
943 WHERE ce.agent_id = $1
944 AND ce.from_claim_id = $2
945 AND ce.edge_kind = 'DerivedFrom'
946 UNION ALL
947 -- Recursive case: follow the to_claim of the previous edge onward
948 SELECT ce2.edge_id, l.depth + 1
949 FROM claim_edges ce2
950 JOIN lineage l ON ce2.from_claim_id = (
951 SELECT to_claim_id FROM claim_edges WHERE edge_id = l.edge_id
952 )
953 WHERE ce2.agent_id = $1
954 AND ce2.edge_kind = 'DerivedFrom'
955 AND l.depth < 64
956 )
957 SELECT ce.edge_id, ce.agent_id, ce.from_claim_id, ce.to_claim_id,
958 ce.edge_kind, ce.created_at,
959 l.depth
960 FROM claim_edges ce
961 JOIN lineage l ON ce.edge_id = l.edge_id
962 ORDER BY l.depth ASC, ce.created_at ASC",
963 &[&agent_id.0.as_str(), &start_id.as_str()],
964 )?;
965
966 rows.iter()
967 .map(|row| {
968 let edge_id_str: String = row.get(0);
969 let agent_id_str: String = row.get(1);
970 let from_claim_str: String = row.get(2);
971 let to_claim_str: String = row.get(3);
972 let kind_str: String = row.get(4);
973 let created_at_str: String = row.get(5);
974 let edge_id = uuid::Uuid::parse_str(&edge_id_str)
977 .map_err(|e| PostgresStoreError::Mapping(format!("edge_id UUID: {e}")))?;
978 let from_claim = uuid::Uuid::parse_str(&from_claim_str)
979 .map(ClaimRef)
980 .map_err(|e| PostgresStoreError::Mapping(format!("from_claim UUID: {e}")))?;
981 let to_claim = uuid::Uuid::parse_str(&to_claim_str)
982 .map(ClaimRef)
983 .map_err(|e| PostgresStoreError::Mapping(format!("to_claim UUID: {e}")))?;
984 let kind = str_to_edge_kind(&kind_str)?;
985 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
986 .map(|dt| dt.with_timezone(&chrono::Utc))
987 .map_err(|e| PostgresStoreError::Mapping(format!("created_at: {e}")))?;
988
989 Ok(ClaimEdge {
990 edge_id,
991 agent_id: AgentId(agent_id_str),
992 from_claim,
993 to_claim,
994 kind,
995 created_at: TransactionTime(created_at),
996 })
997 })
998 .collect()
999 }
1000
1001 fn requires_global_write_serialization(&self) -> bool {
1003 false
1004 }
1005}
1006
1007pub struct PostgresPendingStore {
1015 pool: r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::NoTls>>,
1016}
1017
1018impl PostgresPendingStore {
1019 pub fn new(pool: r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::NoTls>>) -> Self {
1021 Self { pool }
1022 }
1023}
1024
1025impl PendingAdjudicationPort for PostgresPendingStore {
1026 type Error = PostgresStoreError;
1027
1028 fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), PostgresStoreError> {
1029 let mut conn = self.pool.get()?;
1030 let request_payload = serde_json::to_string(&row.request_payload)
1031 .map_err(|e| PostgresStoreError::Mapping(format!("request_payload serialization: {e}")))?;
1032 let queued_at: chrono::DateTime<chrono::Utc> = row.queued_at;
1036 let expires_at: Option<chrono::DateTime<chrono::Utc>> = row.expires_at;
1037 conn.execute(
1038 "INSERT INTO pending_adjudications (
1039 handle_id, agent_id, subject, predicate,
1040 challenger_claim_ref, incumbent_claim_ref,
1041 request_payload, queued_at, expires_at, status
1042 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1043 &[
1044 &row.handle_id.to_string(),
1045 &row.agent_id.0.as_str(),
1046 &row.subject.as_str(),
1047 &row.predicate.as_str(),
1048 &row.challenger_claim_ref.0.to_string(),
1049 &row.incumbent_claim_ref.0.to_string(),
1050 &request_payload.as_str(),
1051 &queued_at,
1052 &expires_at,
1053 &row.status.as_str(),
1054 ],
1055 )?;
1056 Ok(())
1057 }
1058
1059 fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, PostgresStoreError> {
1060 let mut conn = self.pool.get()?;
1061 let rows = conn.query(
1062 "SELECT handle_id, agent_id, subject, predicate,
1063 challenger_claim_ref, incumbent_claim_ref,
1064 request_payload, queued_at, expires_at, status
1065 FROM pending_adjudications
1066 WHERE handle_id = $1",
1067 &[&handle_id.to_string()],
1068 )?;
1069 match rows.into_iter().next() {
1070 None => Ok(None),
1071 Some(row) => Ok(Some(pg_row_to_pending(&row)?)),
1072 }
1073 }
1074
1075 fn list_pending(&self, agent_id: Option<&AgentId>) -> Result<Vec<PendingAdjudicationRow>, PostgresStoreError> {
1076 let mut conn = self.pool.get()?;
1077 let rows = if let Some(aid) = agent_id {
1078 conn.query(
1079 "SELECT handle_id, agent_id, subject, predicate,
1080 challenger_claim_ref, incumbent_claim_ref,
1081 request_payload, queued_at, expires_at, status
1082 FROM pending_adjudications
1083 WHERE agent_id = $1 AND status = 'pending'
1084 ORDER BY queued_at ASC",
1085 &[&aid.0.as_str()],
1086 )?
1087 } else {
1088 conn.query(
1089 "SELECT handle_id, agent_id, subject, predicate,
1090 challenger_claim_ref, incumbent_claim_ref,
1091 request_payload, queued_at, expires_at, status
1092 FROM pending_adjudications
1093 WHERE status = 'pending'
1094 ORDER BY queued_at ASC",
1095 &[],
1096 )?
1097 };
1098 rows.iter().map(pg_row_to_pending).collect()
1099 }
1100
1101 fn list_expired(&self, now: chrono::DateTime<chrono::Utc>) -> Result<Vec<PendingAdjudicationRow>, PostgresStoreError> {
1102 let mut conn = self.pool.get()?;
1103 let rows = conn.query(
1105 "SELECT handle_id, agent_id, subject, predicate,
1106 challenger_claim_ref, incumbent_claim_ref,
1107 request_payload, queued_at, expires_at, status
1108 FROM pending_adjudications
1109 WHERE expires_at IS NOT NULL AND expires_at <= $1 AND status = 'pending'
1110 ORDER BY expires_at ASC",
1111 &[&now],
1112 )?;
1113 rows.iter().map(pg_row_to_pending).collect()
1114 }
1115
1116 fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), PostgresStoreError> {
1117 let mut conn = self.pool.get()?;
1118 conn.execute(
1119 "UPDATE pending_adjudications SET status = 'resolved' WHERE handle_id = $1",
1120 &[&handle_id.to_string()],
1121 )?;
1122 Ok(())
1123 }
1124
1125 fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), PostgresStoreError> {
1126 let mut conn = self.pool.get()?;
1127 conn.execute(
1128 "UPDATE pending_adjudications SET status = 'expired' WHERE handle_id = $1",
1129 &[&handle_id.to_string()],
1130 )?;
1131 Ok(())
1132 }
1133
1134 fn list_queued_orphan_claims(
1138 &self,
1139 ) -> Result<Vec<mempill_core::ports::pending_adjudication::OrphanedQueuedClaim>, PostgresStoreError> {
1140 let mut conn = self.pool.get()?;
1141
1142 let orphan_rows = conn.query(
1146 "SELECT l.agent_id, l.claim_id, c.subject, c.predicate
1147 FROM ledger_entries l
1148 JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1149 WHERE l.disposition = 'QueuedForAdjudication'
1150 AND l.recorded_at = (
1151 SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1152 WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1153 )
1154 AND NOT EXISTS (
1155 SELECT 1 FROM pending_adjudications pa
1156 WHERE pa.challenger_claim_ref = l.claim_id
1157 AND pa.agent_id = l.agent_id
1158 AND pa.status = 'pending'
1159 )",
1160 &[],
1161 )?;
1162
1163 let mut results = Vec::new();
1164 for row in &orphan_rows {
1165 let agent_id_str: String = row.get(0);
1166 let challenger_str: String = row.get(1);
1167 let subject: String = row.get(2);
1168 let predicate: String = row.get(3);
1169
1170 let challenger_ref = uuid::Uuid::parse_str(&challenger_str)
1171 .map(mempill_types::ClaimRef)
1172 .map_err(|e| PostgresStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1173
1174 let incumbent_rows = conn.query(
1177 "SELECT l.claim_id
1178 FROM ledger_entries l
1179 JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1180 WHERE l.agent_id = $1
1181 AND c.subject = $2
1182 AND c.predicate = $3
1183 AND l.disposition = 'CommittedCheap'
1184 AND l.recorded_at = (
1185 SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1186 WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1187 )
1188 ORDER BY l.recorded_at DESC
1189 LIMIT 1",
1190 &[&agent_id_str.as_str(), &subject.as_str(), &predicate.as_str()],
1191 )?;
1192
1193 let incumbent_ref = incumbent_rows.first()
1194 .map(|ir| {
1195 let ref_str: String = ir.get(0);
1196 uuid::Uuid::parse_str(&ref_str)
1197 .map(mempill_types::ClaimRef)
1198 .map_err(|e| PostgresStoreError::Mapping(format!("incumbent UUID: {e}")))
1199 })
1200 .transpose()?;
1201
1202 results.push(mempill_core::ports::pending_adjudication::OrphanedQueuedClaim {
1203 agent_id: mempill_types::AgentId(agent_id_str),
1204 challenger_claim_ref: challenger_ref,
1205 incumbent_claim_ref: incumbent_ref,
1206 subject,
1207 predicate,
1208 });
1209 }
1210
1211 Ok(results)
1212 }
1213}
1214
1215fn pg_row_to_pending(row: &postgres::Row) -> Result<PendingAdjudicationRow, PostgresStoreError> {
1221 let handle_id_str: String = row.get(0);
1222 let agent_id_str: String = row.get(1);
1223 let subject: String = row.get(2);
1224 let predicate: String = row.get(3);
1225 let challenger_str: String = row.get(4);
1226 let incumbent_str: String = row.get(5);
1227 let payload_json: String = row.get(6);
1228 let queued_at: chrono::DateTime<chrono::Utc> = row.get(7);
1230 let expires_at: Option<chrono::DateTime<chrono::Utc>> = row.get(8);
1231 let status: String = row.get(9);
1232
1233 let handle_id = uuid::Uuid::parse_str(&handle_id_str)
1234 .map_err(|e| PostgresStoreError::Mapping(format!("handle_id UUID: {e}")))?;
1235 let challenger_claim_ref = uuid::Uuid::parse_str(&challenger_str)
1236 .map(ClaimRef)
1237 .map_err(|e| PostgresStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1238 let incumbent_claim_ref = uuid::Uuid::parse_str(&incumbent_str)
1239 .map(ClaimRef)
1240 .map_err(|e| PostgresStoreError::Mapping(format!("incumbent_claim_ref UUID: {e}")))?;
1241 let request_payload: mempill_types::AdjudicationRequest =
1242 serde_json::from_str(&payload_json)
1243 .map_err(|e| PostgresStoreError::Mapping(format!("request_payload JSON: {e}")))?;
1244
1245 Ok(PendingAdjudicationRow {
1246 handle_id,
1247 agent_id: AgentId(agent_id_str),
1248 subject,
1249 predicate,
1250 challenger_claim_ref,
1251 incumbent_claim_ref,
1252 request_payload,
1253 queued_at,
1254 expires_at,
1255 status,
1256 })
1257}
1258
1259pub fn open_postgres<O, V>(
1266 connection_string: &str,
1267 oracle: Option<Arc<O>>,
1268 vector: Option<Arc<V>>,
1269 config: EngineConfig,
1270) -> Result<EngineHandle<PostgresPersistenceStore, O, V>, PostgresStoreError>
1271where
1272 O: mempill_core::ports::OraclePort + Send + Sync + 'static,
1273 V: mempill_core::ports::VectorPort + Send + Sync + 'static,
1274{
1275 let store = PostgresPersistenceStore::new(connection_string)?;
1276 Ok(EngineHandle::new(Arc::new(store), oracle, vector, config))
1277}
1278
1279pub fn open_postgres_with_oracle<O, V>(
1288 connection_string: &str,
1289 oracle: Arc<O>,
1290 vector: Option<Arc<V>>,
1291 config: EngineConfig,
1292) -> Result<EngineHandle<PostgresPersistenceStore, O, V>, PostgresStoreError>
1293where
1294 O: mempill_core::ports::OraclePort + Send + Sync + 'static,
1295 V: mempill_core::ports::VectorPort + Send + Sync + 'static,
1296{
1297 let store = PostgresPersistenceStore::new(connection_string)?;
1298 let store_arc = Arc::new(store);
1299 let pending_store: Arc<dyn mempill_core::ErasedPendingStore> = Arc::new(
1300 mempill_core::ErasedPendingStoreAdapter::new(store_arc.pending_store()),
1301 );
1302 Ok(EngineHandle::new_with_pending_store::<()>(
1303 store_arc,
1304 Some(oracle),
1305 vector,
1306 pending_store,
1307 config,
1308 ))
1309}