1use std::sync::{Arc, Mutex};
30
31use mempill_core::ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow};
32use mempill_core::ports::persistence::PersistencePort;
33use mempill_types::{
34 claim::{Cardinality, Claim, Confidence, Criticality, Fact},
35 edge::{ClaimEdge, EdgeKind},
36 identity::{AgentId, ClaimRef},
37 ledger::{LedgerEntry, LedgerEventKind},
38 provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
39 time::{TransactionTime, ValidTime},
40 validity::{AssertionKind, ValidityAssertion},
41};
42use rusqlite::Connection;
43
44use crate::{txn::SqliteTxn, SqliteStoreError};
45
46pub struct SqlitePersistenceStore {
54 conn: Arc<Mutex<Option<Box<Connection>>>>,
56}
57
58impl SqlitePersistenceStore {
59 pub fn new(conn: Connection) -> Self {
61 Self {
62 conn: Arc::new(Mutex::new(Some(Box::new(conn)))),
63 }
64 }
65
66 pub fn pending_store(&self) -> SqlitePendingStore {
77 SqlitePendingStore::new(Arc::clone(&self.conn))
78 }
79}
80
81unsafe impl Send for SqlitePersistenceStore {}
83unsafe impl Sync for SqlitePersistenceStore {}
84
85fn provenance_to_str(p: &ProvenanceLabel) -> &'static str {
91 match p {
92 ProvenanceLabel::ModelDerived => "ModelDerived",
93 ProvenanceLabel::RecallReEntry => "RecallReEntry",
94 ProvenanceLabel::External(ExternalKind::UserAsserted) => "External_UserAsserted",
95 ProvenanceLabel::External(ExternalKind::ExternalFirstHand) => "External_ExternalFirstHand",
96 _ => "Unknown",
98 }
99}
100
101fn str_to_provenance(s: &str) -> Result<ProvenanceLabel, SqliteStoreError> {
104 match s {
105 "ModelDerived" => Ok(ProvenanceLabel::ModelDerived),
106 "RecallReEntry" => Ok(ProvenanceLabel::RecallReEntry),
107 "External_UserAsserted" => {
108 Ok(ProvenanceLabel::External(ExternalKind::UserAsserted))
109 }
110 "External_ExternalFirstHand" => {
111 Ok(ProvenanceLabel::External(ExternalKind::ExternalFirstHand))
112 }
113 other => Err(SqliteStoreError::Mapping(format!(
114 "unknown provenance_label value: {other}"
115 ))),
116 }
117}
118
119fn cardinality_to_str(c: &Cardinality) -> &'static str {
120 match c {
121 Cardinality::Functional => "Functional",
122 Cardinality::SetValued => "SetValued",
123 Cardinality::Unknown => "Unknown",
124 }
125}
126
127fn str_to_cardinality(s: &str) -> Result<Cardinality, SqliteStoreError> {
128 match s {
129 "Functional" => Ok(Cardinality::Functional),
130 "SetValued" => Ok(Cardinality::SetValued),
131 "Unknown" => Ok(Cardinality::Unknown),
132 other => Err(SqliteStoreError::Mapping(format!(
133 "unknown cardinality value: {other}"
134 ))),
135 }
136}
137
138fn criticality_to_str(c: &Criticality) -> &'static str {
139 match c {
140 Criticality::Low => "Low",
141 Criticality::Medium => "Medium",
142 Criticality::High => "High",
143 Criticality::Critical => "Critical",
144 }
145}
146
147fn str_to_criticality(s: &str) -> Result<Criticality, SqliteStoreError> {
148 match s {
149 "Low" => Ok(Criticality::Low),
150 "Medium" => Ok(Criticality::Medium),
151 "High" => Ok(Criticality::High),
152 "Critical" => Ok(Criticality::Critical),
153 other => Err(SqliteStoreError::Mapping(format!(
154 "unknown criticality value: {other}"
155 ))),
156 }
157}
158
159fn edge_kind_to_str(k: &EdgeKind) -> &'static str {
160 match k {
161 EdgeKind::DerivedFrom => "DerivedFrom",
162 EdgeKind::Supersedes => "Supersedes",
163 EdgeKind::DependsOn => "DependsOn",
164 EdgeKind::MutualExclusion => "MutualExclusion",
165 _ => "Unknown",
167 }
168}
169
170fn str_to_edge_kind(s: &str) -> Result<EdgeKind, SqliteStoreError> {
171 match s {
172 "DerivedFrom" => Ok(EdgeKind::DerivedFrom),
173 "Supersedes" => Ok(EdgeKind::Supersedes),
174 "DependsOn" => Ok(EdgeKind::DependsOn),
175 "MutualExclusion" => Ok(EdgeKind::MutualExclusion),
176 other => Err(SqliteStoreError::Mapping(format!(
177 "unknown edge_kind value: {other}"
178 ))),
179 }
180}
181
182fn ledger_event_kind_to_str(k: &LedgerEventKind) -> &'static str {
183 match k {
185 LedgerEventKind::ClaimCommitted => "ClaimCommitted",
186 LedgerEventKind::ValidityAsserted => "ValidityAsserted",
187 LedgerEventKind::AdjudicationRequested => "AdjudicationRequested",
188 LedgerEventKind::AdjudicationResolved => "AdjudicationResolved",
189 LedgerEventKind::RecallReEntryDetected => "RecallReEntryDetected",
190 LedgerEventKind::Quarantined => "Quarantined",
191 LedgerEventKind::DependentFlaggedPendingReview => "DependentFlaggedPendingReview",
192 LedgerEventKind::ServedAsInjected => "ServedAsInjected",
193 LedgerEventKind::AdjudicationExpired => "AdjudicationExpired",
194 _ => "Unknown",
196 }
197}
198
199fn str_to_ledger_event_kind(s: &str) -> Result<LedgerEventKind, SqliteStoreError> {
200 match s {
201 "ClaimCommitted" => Ok(LedgerEventKind::ClaimCommitted),
202 "ValidityAsserted" => Ok(LedgerEventKind::ValidityAsserted),
203 "AdjudicationRequested" => Ok(LedgerEventKind::AdjudicationRequested),
204 "AdjudicationResolved" => Ok(LedgerEventKind::AdjudicationResolved),
205 "RecallReEntryDetected" => Ok(LedgerEventKind::RecallReEntryDetected),
206 "Quarantined" => Ok(LedgerEventKind::Quarantined),
207 "DependentFlaggedPendingReview" => Ok(LedgerEventKind::DependentFlaggedPendingReview),
208 "ServedAsInjected" => Ok(LedgerEventKind::ServedAsInjected),
209 "AdjudicationExpired" => Ok(LedgerEventKind::AdjudicationExpired),
210 other => Err(SqliteStoreError::Mapping(format!(
211 "unknown ledger event_kind value: {other}"
212 ))),
213 }
214}
215
216fn disposition_to_str(d: &mempill_types::disposition::Disposition) -> &'static str {
217 use mempill_types::disposition::Disposition;
218 match d {
219 Disposition::CommittedCheap => "CommittedCheap",
220 Disposition::CommittedInferred => "CommittedInferred",
221 Disposition::QueuedForAdjudication => "QueuedForAdjudication",
222 Disposition::Contested => "Contested",
223 Disposition::PendingConflict => "PendingConflict",
224 Disposition::PendingReview => "PendingReview",
225 Disposition::PendingLowConfidence => "PendingLowConfidence",
226 Disposition::Quarantined => "Quarantined",
227 Disposition::Superseded => "Superseded",
228 Disposition::Invalidated => "Invalidated",
229 Disposition::Reinstated => "Reinstated",
230 Disposition::Rejected => "Rejected",
231 _ => "Unknown",
233 }
234}
235
236fn str_to_disposition(s: &str) -> Result<mempill_types::disposition::Disposition, SqliteStoreError> {
237 use mempill_types::disposition::Disposition;
238 match s {
239 "CommittedCheap" => Ok(Disposition::CommittedCheap),
240 "CommittedInferred" => Ok(Disposition::CommittedInferred),
241 "QueuedForAdjudication" => Ok(Disposition::QueuedForAdjudication),
242 "Contested" => Ok(Disposition::Contested),
243 "PendingConflict" => Ok(Disposition::PendingConflict),
244 "PendingReview" => Ok(Disposition::PendingReview),
245 "PendingLowConfidence" => Ok(Disposition::PendingLowConfidence),
246 "Quarantined" => Ok(Disposition::Quarantined),
247 "Superseded" => Ok(Disposition::Superseded),
248 "Invalidated" => Ok(Disposition::Invalidated),
249 "Reinstated" => Ok(Disposition::Reinstated),
250 "Rejected" => Ok(Disposition::Rejected),
251 other => Err(SqliteStoreError::Mapping(format!(
252 "unknown disposition value: {other}"
253 ))),
254 }
255}
256
257fn row_to_claim(row: &rusqlite::Row<'_>) -> Result<Claim, rusqlite::Error> {
281 let claim_id_str: String = row.get(0)?;
284 let agent_id_str: String = row.get(1)?;
285 let subject: String = row.get(2)?;
286 let predicate: String = row.get(3)?;
287 let value_json: String = row.get(4)?;
288 let cardinality_str: String = row.get(5)?;
289 let provenance_str: String = row.get(6)?;
290 let nearest_anchor_str: Option<String> = row.get(7)?;
291 let derivation_depth: i64 = row.get(8)?;
292 let tx_time_str: String = row.get(9)?;
293 let valid_time_start_str: Option<String> = row.get(10)?;
294 let valid_time_end_str: Option<String> = row.get(11)?;
295 let valid_time_confidence: f64 = row.get(12)?;
296 let value_confidence: f64 = row.get(13)?;
297 let criticality_str: String = row.get(14)?;
298 let derived_from_json: String = row.get(15)?;
299 let metadata_json: Option<String> = row.get(16)?;
300 let snapshot_schema_version_raw: Option<i64> = row.get(17)?;
301
302 let to_rusqlite_err = |msg: String| rusqlite::Error::InvalidColumnType(
305 0,
306 msg,
307 rusqlite::types::Type::Text,
308 );
309
310 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
311 .map_err(|e| to_rusqlite_err(format!("claim_id UUID parse: {e}")))?;
312
313 let value: serde_json::Value = serde_json::from_str(&value_json)
314 .map_err(|e| to_rusqlite_err(format!("value JSON parse: {e}")))?;
315
316 let cardinality = str_to_cardinality(&cardinality_str)
317 .map_err(|e| to_rusqlite_err(e.to_string()))?;
318
319 let provenance = str_to_provenance(&provenance_str)
320 .map_err(|e| to_rusqlite_err(e.to_string()))?;
321
322 let nearest_external_anchor: Option<ClaimRef> = nearest_anchor_str
323 .map(|s| {
324 uuid::Uuid::parse_str(&s)
325 .map(ClaimRef)
326 .map_err(|e| to_rusqlite_err(format!("anchor UUID parse: {e}")))
327 })
328 .transpose()?;
329
330 let tx_time = chrono::DateTime::parse_from_rfc3339(&tx_time_str)
331 .map(|dt| dt.with_timezone(&chrono::Utc))
332 .map_err(|e| to_rusqlite_err(format!("tx_time parse: {e}")))?;
333
334 let valid_time_start = valid_time_start_str
335 .map(|s| {
336 chrono::DateTime::parse_from_rfc3339(&s)
337 .map(|dt| dt.with_timezone(&chrono::Utc))
338 .map_err(|e| to_rusqlite_err(format!("valid_time_start parse: {e}")))
339 })
340 .transpose()?;
341
342 let valid_time_end = valid_time_end_str
343 .map(|s| {
344 chrono::DateTime::parse_from_rfc3339(&s)
345 .map(|dt| dt.with_timezone(&chrono::Utc))
346 .map_err(|e| to_rusqlite_err(format!("valid_time_end parse: {e}")))
347 })
348 .transpose()?;
349
350 let criticality = str_to_criticality(&criticality_str)
351 .map_err(|e| to_rusqlite_err(e.to_string()))?;
352
353 let derived_from_uuids: Vec<String> = serde_json::from_str(&derived_from_json)
354 .map_err(|e| to_rusqlite_err(format!("derived_from JSON parse: {e}")))?;
355
356 let derived_from: Vec<ClaimRef> = derived_from_uuids
357 .iter()
358 .map(|s| {
359 uuid::Uuid::parse_str(s)
360 .map(ClaimRef)
361 .map_err(|e| to_rusqlite_err(format!("derived_from UUID parse: {e}")))
362 })
363 .collect::<Result<_, _>>()?;
364
365 let metadata: Option<serde_json::Value> = metadata_json
366 .map(|s| {
367 serde_json::from_str(&s)
368 .map_err(|e| to_rusqlite_err(format!("metadata JSON parse: {e}")))
369 })
370 .transpose()?;
371
372 let snapshot_schema_version: Option<u32> =
373 snapshot_schema_version_raw.map(|v| v as u32);
374
375 Ok(Claim::new(
376 ClaimRef(claim_id),
377 AgentId(agent_id_str),
378 Fact { subject, predicate, value },
379 cardinality,
380 provenance,
381 ExternalAnchor {
382 nearest_external_anchor,
383 derivation_depth: derivation_depth as u32,
384 },
385 TransactionTime(tx_time),
386 ValidTime {
387 start: valid_time_start,
388 end: valid_time_end,
389 valid_time_confidence: valid_time_confidence as f32,
390 },
391 Confidence {
392 value_confidence: value_confidence as f32,
393 valid_time_confidence: valid_time_confidence as f32,
394 },
395 criticality,
396 derived_from,
397 metadata,
398 snapshot_schema_version,
399 ))
400}
401
402const CLAIM_SELECT_COLS: &str = "
405 claim_id, agent_id, subject, predicate, value, cardinality,
406 provenance_label, nearest_external_anchor_id, derivation_depth,
407 tx_time, valid_time_start, valid_time_end, valid_time_confidence,
408 value_confidence, criticality, derived_from,
409 metadata, snapshot_schema_version
410";
411
412fn row_to_edge(row: &rusqlite::Row<'_>) -> Result<ClaimEdge, rusqlite::Error> {
414 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
415 0, msg, rusqlite::types::Type::Text,
416 );
417
418 let edge_id_str: String = row.get(0)?;
419 let agent_id_str: String = row.get(1)?;
420 let from_claim_str: String = row.get(2)?;
421 let to_claim_str: String = row.get(3)?;
422 let kind_str: String = row.get(4)?;
423 let created_at_str: String = row.get(5)?;
424
425 let edge_id = uuid::Uuid::parse_str(&edge_id_str)
426 .map_err(|e| to_err(format!("edge_id UUID: {e}")))?;
427 let from_claim = uuid::Uuid::parse_str(&from_claim_str)
428 .map(ClaimRef)
429 .map_err(|e| to_err(format!("from_claim UUID: {e}")))?;
430 let to_claim = uuid::Uuid::parse_str(&to_claim_str)
431 .map(ClaimRef)
432 .map_err(|e| to_err(format!("to_claim UUID: {e}")))?;
433 let kind = str_to_edge_kind(&kind_str)
434 .map_err(|e| to_err(e.to_string()))?;
435 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
436 .map(|dt| dt.with_timezone(&chrono::Utc))
437 .map_err(|e| to_err(format!("created_at parse: {e}")))?;
438
439 Ok(ClaimEdge {
440 edge_id,
441 agent_id: AgentId(agent_id_str),
442 from_claim,
443 to_claim,
444 kind,
445 created_at: TransactionTime(created_at),
446 })
447}
448
449impl PersistencePort for SqlitePersistenceStore {
452 type Transaction = SqliteTxn;
453 type Error = SqliteStoreError;
454
455 fn begin_atomic(&self, agent_id: &AgentId) -> Result<SqliteTxn, SqliteStoreError> {
462 let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
463 let conn = slot.take().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
464 SqliteTxn::begin(agent_id.clone(), conn)
465 }
466
467 fn commit(&self, txn: SqliteTxn) -> Result<(), SqliteStoreError> {
469 let conn = txn.commit_and_return()?;
470 let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
471 *slot = Some(conn);
472 Ok(())
473 }
474
475 fn rollback(&self, txn: SqliteTxn) -> Result<(), SqliteStoreError> {
478 let conn = txn.rollback_and_return()?;
479 let mut slot = self.conn.lock().expect("SqlitePersistenceStore: mutex poisoned");
480 *slot = Some(conn);
481 Ok(())
482 }
483
484 fn append_claim(
495 &self,
496 txn: &mut SqliteTxn,
497 claim: &Claim,
498 ) -> Result<ClaimRef, SqliteStoreError> {
499 let conn = txn.conn();
500
501 let claim_id = claim.claim_ref().0.to_string();
502 let agent_id = claim.agent_id().0.as_str();
503 let fact = claim.fact();
504 let value_json = serde_json::to_string(&fact.value)
505 .map_err(|e| SqliteStoreError::Mapping(format!("value serialization: {e}")))?;
506 let cardinality = cardinality_to_str(claim.cardinality());
507 let provenance = provenance_to_str(claim.provenance());
508 let anchor = claim.external_anchor();
509 let nearest_anchor: Option<String> =
510 anchor.nearest_external_anchor.as_ref().map(|r| r.0.to_string());
511 let derivation_depth = anchor.derivation_depth as i64;
512 let tx_time = claim.transaction_time().0.to_rfc3339();
513 let vt = claim.valid_time();
514 let valid_time_start: Option<String> = vt.start.map(|dt| dt.to_rfc3339());
515 let valid_time_end: Option<String> = vt.end.map(|dt| dt.to_rfc3339());
516 let valid_time_confidence = vt.valid_time_confidence as f64;
517 let conf = claim.confidence();
518 let value_confidence = conf.value_confidence as f64;
519 let criticality = criticality_to_str(claim.criticality());
520 let derived_from_refs: Vec<String> =
521 claim.derived_from().iter().map(|r| r.0.to_string()).collect();
522 let derived_from_json = serde_json::to_string(&derived_from_refs)
523 .map_err(|e| SqliteStoreError::Mapping(format!("derived_from serialization: {e}")))?;
524 let metadata: Option<String> = claim
525 .metadata()
526 .map(|v| {
527 serde_json::to_string(v)
528 .map_err(|e| SqliteStoreError::Mapping(format!("metadata serialization: {e}")))
529 })
530 .transpose()?;
531 let snapshot_schema_version: Option<i64> =
532 claim.snapshot_schema_version().map(|v| v as i64);
533
534 conn.execute(
535 "INSERT INTO claims (
536 claim_id, agent_id, subject, predicate, value, cardinality,
537 provenance_label, nearest_external_anchor_id, derivation_depth,
538 tx_time, valid_time_start, valid_time_end, valid_time_confidence,
539 value_confidence, criticality, derived_from,
540 metadata, snapshot_schema_version, embedding_model_id
541 ) VALUES (
542 ?1, ?2, ?3, ?4, ?5, ?6,
543 ?7, ?8, ?9,
544 ?10, ?11, ?12, ?13,
545 ?14, ?15, ?16,
546 ?17, ?18, NULL
547 )",
548 rusqlite::params![
549 claim_id,
550 agent_id,
551 fact.subject.as_str(),
552 fact.predicate.as_str(),
553 value_json.as_str(),
554 cardinality,
555 provenance,
556 nearest_anchor,
557 derivation_depth,
558 tx_time.as_str(),
559 valid_time_start,
560 valid_time_end,
561 valid_time_confidence,
562 value_confidence,
563 criticality,
564 derived_from_json.as_str(),
565 metadata,
566 snapshot_schema_version,
567 ],
568 )?;
569
570 Ok(claim.claim_ref().clone())
571 }
572
573 fn append_validity_assertion(
575 &self,
576 txn: &mut SqliteTxn,
577 assertion: &ValidityAssertion,
578 ) -> Result<(), SqliteStoreError> {
579 let conn = txn.conn();
580
581 let assertion_id = assertion.assertion_ref.to_string();
582 let agent_id = assertion.agent_id.0.as_str();
583 let target_claim_id = assertion.target_claim.0.to_string();
584 let provenance = provenance_to_str(&assertion.provenance);
585 let value_confidence = assertion.confidence.value_confidence as f64;
586 let valid_time_confidence = assertion.confidence.valid_time_confidence as f64;
587 let asserted_at = assertion.asserted_at.0.to_rfc3339();
588
589 let (assertion_kind, bound_at, reopen_at): (&str, Option<String>, Option<String>) =
590 match &assertion.kind {
591 AssertionKind::Bound { bound_at } => {
592 ("Bound", Some(bound_at.to_rfc3339()), None)
593 }
594 AssertionKind::Reopen { reopen_at } => {
595 ("Reopen", None, Some(reopen_at.to_rfc3339()))
596 }
597 _ => ("Unknown", None, None),
599 };
600
601 conn.execute(
602 "INSERT INTO validity_assertions (
603 assertion_id, agent_id, target_claim_id,
604 assertion_kind, bound_at, reopen_at,
605 provenance_label, value_confidence, valid_time_confidence, asserted_at
606 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
607 rusqlite::params![
608 assertion_id.as_str(),
609 agent_id,
610 target_claim_id.as_str(),
611 assertion_kind,
612 bound_at,
613 reopen_at,
614 provenance,
615 value_confidence,
616 valid_time_confidence,
617 asserted_at.as_str(),
618 ],
619 )?;
620
621 Ok(())
622 }
623
624 fn append_ledger_entry(
626 &self,
627 txn: &mut SqliteTxn,
628 entry: &LedgerEntry,
629 ) -> Result<(), SqliteStoreError> {
630 let conn = txn.conn();
631
632 let entry_id = entry.entry_id.to_string();
633 let agent_id = entry.agent_id.0.as_str();
634 let claim_id = entry.claim_ref.0.to_string();
635 let event_kind = ledger_event_kind_to_str(&entry.event_kind);
636 let disposition = disposition_to_str(&entry.disposition);
637 let rationale: Option<String> = entry
638 .rationale
639 .as_ref()
640 .map(|v| {
641 serde_json::to_string(v)
642 .map_err(|e| SqliteStoreError::Mapping(format!("rationale serialization: {e}")))
643 })
644 .transpose()?;
645 let recorded_at = entry.recorded_at.0.to_rfc3339();
646
647 conn.execute(
648 "INSERT INTO ledger_entries (
649 entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
650 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
651 rusqlite::params![
652 entry_id.as_str(),
653 agent_id,
654 claim_id.as_str(),
655 event_kind,
656 disposition,
657 rationale,
658 recorded_at.as_str(),
659 ],
660 )?;
661
662 Ok(())
663 }
664
665 fn append_claim_edge(
667 &self,
668 txn: &mut SqliteTxn,
669 edge: &ClaimEdge,
670 ) -> Result<(), SqliteStoreError> {
671 let conn = txn.conn();
672
673 let edge_id = edge.edge_id.to_string();
674 let agent_id = edge.agent_id.0.as_str();
675 let from_claim_id = edge.from_claim.0.to_string();
676 let to_claim_id = edge.to_claim.0.to_string();
677 let edge_kind = edge_kind_to_str(&edge.kind);
678 let created_at = edge.created_at.0.to_rfc3339();
679
680 conn.execute(
681 "INSERT INTO claim_edges (
682 edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
683 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
684 rusqlite::params![
685 edge_id.as_str(),
686 agent_id,
687 from_claim_id.as_str(),
688 to_claim_id.as_str(),
689 edge_kind,
690 created_at.as_str(),
691 ],
692 )?;
693
694 Ok(())
695 }
696
697 fn load_subject_line(
704 &self,
705 agent_id: &AgentId,
706 subject: &str,
707 predicate: &str,
708 ) -> Result<Vec<Claim>, SqliteStoreError> {
709 let slot = self.conn.lock().expect("mutex poisoned");
710 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
711
712 let sql = format!(
713 "SELECT {CLAIM_SELECT_COLS} FROM claims
714 WHERE agent_id = ?1 AND subject = ?2 AND predicate = ?3
715 ORDER BY tx_time ASC"
716 );
717 let mut stmt = conn.prepare(&sql)?;
718 let rows = stmt.query_map(
719 rusqlite::params![agent_id.0.as_str(), subject, predicate],
720 row_to_claim,
721 )?;
722
723 let mut claims = Vec::new();
724 for row in rows {
725 claims.push(row?);
726 }
727 Ok(claims)
728 }
729
730 fn load_claim(
732 &self,
733 agent_id: &AgentId,
734 claim_ref: &ClaimRef,
735 ) -> Result<Option<Claim>, SqliteStoreError> {
736 let slot = self.conn.lock().expect("mutex poisoned");
737 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
738
739 let sql = format!(
740 "SELECT {CLAIM_SELECT_COLS} FROM claims
741 WHERE agent_id = ?1 AND claim_id = ?2"
742 );
743 let mut stmt = conn.prepare(&sql)?;
744 let mut rows = stmt.query_map(
745 rusqlite::params![agent_id.0.as_str(), claim_ref.0.to_string()],
746 row_to_claim,
747 )?;
748
749 match rows.next() {
750 None => Ok(None),
751 Some(row) => Ok(Some(row?)),
752 }
753 }
754
755 fn load_validity_assertions_for(
759 &self,
760 agent_id: &AgentId,
761 claim_ref: &ClaimRef,
762 ) -> Result<Vec<ValidityAssertion>, SqliteStoreError> {
763 let slot = self.conn.lock().expect("mutex poisoned");
764 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
765
766 let mut stmt = conn.prepare(
767 "SELECT assertion_id, agent_id, target_claim_id,
768 assertion_kind, bound_at, reopen_at,
769 provenance_label, value_confidence, valid_time_confidence, asserted_at
770 FROM validity_assertions
771 WHERE agent_id = ?1 AND target_claim_id = ?2
772 ORDER BY asserted_at ASC",
773 )?;
774
775 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
776 0, msg, rusqlite::types::Type::Text,
777 );
778
779 let rows = stmt.query_map(
780 rusqlite::params![agent_id.0.as_str(), claim_ref.0.to_string()],
781 |row| {
782 let assertion_id_str: String = row.get(0)?;
783 let agent_id_str: String = row.get(1)?;
784 let target_claim_str: String = row.get(2)?;
785 let kind_str: String = row.get(3)?;
786 let bound_at_str: Option<String> = row.get(4)?;
787 let reopen_at_str: Option<String> = row.get(5)?;
788 let prov_str: String = row.get(6)?;
789 let value_confidence: f64 = row.get(7)?;
790 let valid_time_confidence: f64 = row.get(8)?;
791 let asserted_at_str: String = row.get(9)?;
792
793 let assertion_ref = uuid::Uuid::parse_str(&assertion_id_str)
794 .map_err(|e| to_err(format!("assertion_id UUID: {e}")))?;
795 let target_claim = uuid::Uuid::parse_str(&target_claim_str)
796 .map(ClaimRef)
797 .map_err(|e| to_err(format!("target_claim UUID: {e}")))?;
798 let provenance = str_to_provenance(&prov_str)
799 .map_err(|e| to_err(e.to_string()))?;
800 let asserted_at = chrono::DateTime::parse_from_rfc3339(&asserted_at_str)
801 .map(|dt| dt.with_timezone(&chrono::Utc))
802 .map_err(|e| to_err(format!("asserted_at parse: {e}")))?;
803
804 let kind = match kind_str.as_str() {
805 "Bound" => {
806 let s = bound_at_str.ok_or_else(|| to_err("bound_at is NULL for Bound assertion".into()))?;
807 let dt = chrono::DateTime::parse_from_rfc3339(&s)
808 .map(|dt| dt.with_timezone(&chrono::Utc))
809 .map_err(|e| to_err(format!("bound_at parse: {e}")))?;
810 AssertionKind::Bound { bound_at: dt }
811 }
812 "Reopen" => {
813 let s = reopen_at_str.ok_or_else(|| to_err("reopen_at is NULL for Reopen assertion".into()))?;
814 let dt = chrono::DateTime::parse_from_rfc3339(&s)
815 .map(|dt| dt.with_timezone(&chrono::Utc))
816 .map_err(|e| to_err(format!("reopen_at parse: {e}")))?;
817 AssertionKind::Reopen { reopen_at: dt }
818 }
819 other => return Err(to_err(format!("unknown assertion_kind: {other}"))),
820 };
821
822 Ok(ValidityAssertion {
823 assertion_ref,
824 agent_id: AgentId(agent_id_str),
825 target_claim,
826 kind,
827 provenance,
828 confidence: Confidence {
829 value_confidence: value_confidence as f32,
830 valid_time_confidence: valid_time_confidence as f32,
831 },
832 asserted_at: TransactionTime(asserted_at),
833 })
834 },
835 )?;
836
837 let mut assertions = Vec::new();
838 for row in rows {
839 assertions.push(row?);
840 }
841 Ok(assertions)
842 }
843
844 fn load_ledger(
849 &self,
850 agent_id: &AgentId,
851 from: Option<&TransactionTime>,
852 limit: usize,
853 ) -> Result<Vec<LedgerEntry>, SqliteStoreError> {
854 let slot = self.conn.lock().expect("mutex poisoned");
855 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
856
857 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
858 0, msg, rusqlite::types::Type::Text,
859 );
860
861 let from_str: Option<String> = from.map(|t| t.0.to_rfc3339());
862 let limit_i64 = limit as i64;
863
864 let map_row = |row: &rusqlite::Row<'_>| {
865 let entry_id_str: String = row.get(0)?;
866 let agent_id_str: String = row.get(1)?;
867 let claim_id_str: String = row.get(2)?;
868 let event_kind_str: String = row.get(3)?;
869 let disposition_str: String = row.get(4)?;
870 let rationale_json: Option<String> = row.get(5)?;
871 let recorded_at_str: String = row.get(6)?;
872
873 let entry_id = uuid::Uuid::parse_str(&entry_id_str)
874 .map_err(|e| to_err(format!("entry_id UUID: {e}")))?;
875 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
876 .map(ClaimRef)
877 .map_err(|e| to_err(format!("claim_id UUID: {e}")))?;
878 let event_kind = str_to_ledger_event_kind(&event_kind_str)
879 .map_err(|e| to_err(e.to_string()))?;
880 let disposition = str_to_disposition(&disposition_str)
881 .map_err(|e| to_err(e.to_string()))?;
882 let rationale: Option<serde_json::Value> = rationale_json
883 .map(|s| serde_json::from_str(&s).map_err(|e| to_err(format!("rationale JSON: {e}"))))
884 .transpose()?;
885 let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
886 .map(|dt| dt.with_timezone(&chrono::Utc))
887 .map_err(|e| to_err(format!("recorded_at parse: {e}")))?;
888
889 Ok(LedgerEntry {
890 entry_id,
891 agent_id: AgentId(agent_id_str),
892 claim_ref: claim_id,
893 event_kind,
894 disposition,
895 rationale,
896 recorded_at: TransactionTime(recorded_at),
897 })
898 };
899
900 let mut entries = Vec::new();
901
902 if let Some(ref from_val) = from_str {
903 let mut stmt = conn.prepare(
904 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
905 FROM ledger_entries
906 WHERE agent_id = ?1 AND recorded_at >= ?2
907 ORDER BY recorded_at ASC
908 LIMIT ?3",
909 )?;
910 let rows = stmt.query_map(
911 rusqlite::params![agent_id.0.as_str(), from_val.as_str(), limit_i64],
912 map_row,
913 )?;
914 for row in rows {
915 entries.push(row?);
916 }
917 } else {
918 let mut stmt = conn.prepare(
919 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
920 FROM ledger_entries
921 WHERE agent_id = ?1
922 ORDER BY recorded_at ASC
923 LIMIT ?2",
924 )?;
925 let rows = stmt.query_map(
926 rusqlite::params![agent_id.0.as_str(), limit_i64],
927 map_row,
928 )?;
929 for row in rows {
930 entries.push(row?);
931 }
932 }
933
934 Ok(entries)
935 }
936
937 fn load_ledger_for_claims(
943 &self,
944 agent_id: &AgentId,
945 claim_refs: &[ClaimRef],
946 ) -> Result<Vec<LedgerEntry>, SqliteStoreError> {
947 if claim_refs.is_empty() {
948 return Ok(vec![]);
949 }
950
951 let slot = self.conn.lock().expect("mutex poisoned");
952 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
953
954 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
955 0, msg, rusqlite::types::Type::Text,
956 );
957
958 let map_row = |row: &rusqlite::Row<'_>| {
959 let entry_id_str: String = row.get(0)?;
960 let agent_id_str: String = row.get(1)?;
961 let claim_id_str: String = row.get(2)?;
962 let event_kind_str: String = row.get(3)?;
963 let disposition_str: String = row.get(4)?;
964 let rationale_json: Option<String> = row.get(5)?;
965 let recorded_at_str: String = row.get(6)?;
966
967 let entry_id = uuid::Uuid::parse_str(&entry_id_str)
968 .map_err(|e| to_err(format!("entry_id UUID: {e}")))?;
969 let claim_id = uuid::Uuid::parse_str(&claim_id_str)
970 .map(ClaimRef)
971 .map_err(|e| to_err(format!("claim_id UUID: {e}")))?;
972 let event_kind = str_to_ledger_event_kind(&event_kind_str)
973 .map_err(|e| to_err(e.to_string()))?;
974 let disposition = str_to_disposition(&disposition_str)
975 .map_err(|e| to_err(e.to_string()))?;
976 let rationale: Option<serde_json::Value> = rationale_json
977 .map(|s| serde_json::from_str(&s).map_err(|e| to_err(format!("rationale JSON: {e}"))))
978 .transpose()?;
979 let recorded_at = chrono::DateTime::parse_from_rfc3339(&recorded_at_str)
980 .map(|dt| dt.with_timezone(&chrono::Utc))
981 .map_err(|e| to_err(format!("recorded_at parse: {e}")))?;
982
983 Ok(LedgerEntry {
984 entry_id,
985 agent_id: AgentId(agent_id_str),
986 claim_ref: claim_id,
987 event_kind,
988 disposition,
989 rationale,
990 recorded_at: TransactionTime(recorded_at),
991 })
992 };
993
994 let mut all_entries = Vec::new();
995 const CHUNK: usize = 900;
998
999 for chunk in claim_refs.chunks(CHUNK) {
1000 let placeholders: Vec<String> = (2..=chunk.len() + 1)
1001 .map(|i| format!("?{i}"))
1002 .collect();
1003 let sql = format!(
1004 "SELECT entry_id, agent_id, claim_id, event_kind, disposition, rationale, recorded_at
1005 FROM ledger_entries
1006 WHERE agent_id = ?1 AND claim_id IN ({})
1007 ORDER BY recorded_at ASC",
1008 placeholders.join(", ")
1009 );
1010
1011 let mut stmt = conn.prepare(&sql)?;
1012 let agent_str = agent_id.0.as_str();
1014 let id_strings: Vec<String> = chunk.iter().map(|r| r.0.to_string()).collect();
1015
1016 let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(1 + id_strings.len());
1018 params.push(&agent_str);
1019 for s in &id_strings {
1020 params.push(s);
1021 }
1022
1023 let rows = stmt.query_map(params.as_slice(), map_row)?;
1024 for row in rows {
1025 all_entries.push(row?);
1026 }
1027 }
1028
1029 Ok(all_entries)
1030 }
1031
1032 fn load_edges_for(
1037 &self,
1038 agent_id: &AgentId,
1039 claim_ref: &ClaimRef,
1040 ) -> Result<Vec<ClaimEdge>, SqliteStoreError> {
1041 let slot = self.conn.lock().expect("mutex poisoned");
1042 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1043
1044 let claim_id_str = claim_ref.0.to_string();
1045
1046 let mut stmt = conn.prepare(
1047 "SELECT edge_id, agent_id, from_claim_id, to_claim_id, edge_kind, created_at
1048 FROM claim_edges
1049 WHERE agent_id = ?1
1050 AND (from_claim_id = ?2 OR to_claim_id = ?2)
1051 ORDER BY created_at ASC",
1052 )?;
1053
1054 let rows = stmt.query_map(
1055 rusqlite::params![agent_id.0.as_str(), claim_id_str.as_str()],
1056 row_to_edge,
1057 )?;
1058
1059 let mut edges = Vec::new();
1060 for row in rows {
1061 edges.push(row?);
1062 }
1063 Ok(edges)
1064 }
1065
1066 fn load_injected_claims(
1071 &self,
1072 agent_id: &AgentId,
1073 ) -> Result<Vec<ClaimRef>, SqliteStoreError> {
1074 let slot = self.conn.lock().expect("mutex poisoned");
1075 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1076
1077 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1078 0, msg, rusqlite::types::Type::Text,
1079 );
1080
1081 let mut stmt = conn.prepare(
1082 "SELECT claim_id
1083 FROM ledger_entries
1084 WHERE agent_id = ?1 AND event_kind = 'ServedAsInjected'
1085 GROUP BY claim_id
1086 ORDER BY MIN(recorded_at) ASC",
1087 )?;
1088
1089 let rows = stmt.query_map(
1090 rusqlite::params![agent_id.0.as_str()],
1091 |row| {
1092 let claim_id_str: String = row.get(0)?;
1093 uuid::Uuid::parse_str(&claim_id_str)
1094 .map(ClaimRef)
1095 .map_err(|e| to_err(format!("claim_id UUID: {e}")))
1096 },
1097 )?;
1098
1099 let mut refs = Vec::new();
1100 for row in rows {
1101 refs.push(row?);
1102 }
1103 Ok(refs)
1104 }
1105
1106 fn load_lineage(
1114 &self,
1115 agent_id: &AgentId,
1116 claim_ref: &ClaimRef,
1117 ) -> Result<Vec<ClaimEdge>, SqliteStoreError> {
1118 let slot = self.conn.lock().expect("mutex poisoned");
1119 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1120
1121 let start_id = claim_ref.0.to_string();
1122
1123 let mut stmt = conn.prepare(
1127 "WITH RECURSIVE lineage(edge_id, depth) AS (
1128 -- Base case: all DerivedFrom edges leaving from our starting claim
1129 SELECT ce.edge_id, 1
1130 FROM claim_edges ce
1131 WHERE ce.agent_id = ?1
1132 AND ce.from_claim_id = ?2
1133 AND ce.edge_kind = 'DerivedFrom'
1134 UNION ALL
1135 -- Recursive case: follow the to_claim of the previous edge onward
1136 SELECT ce2.edge_id, l.depth + 1
1137 FROM claim_edges ce2
1138 JOIN lineage l ON ce2.from_claim_id = (
1139 SELECT to_claim_id FROM claim_edges WHERE edge_id = l.edge_id
1140 )
1141 WHERE ce2.agent_id = ?1
1142 AND ce2.edge_kind = 'DerivedFrom'
1143 AND l.depth < 64
1144 )
1145 SELECT ce.edge_id, ce.agent_id, ce.from_claim_id, ce.to_claim_id,
1146 ce.edge_kind, ce.created_at,
1147 l.depth
1148 FROM claim_edges ce
1149 JOIN lineage l ON ce.edge_id = l.edge_id
1150 ORDER BY l.depth ASC, ce.created_at ASC",
1151 )?;
1152
1153 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1154 0, msg, rusqlite::types::Type::Text,
1155 );
1156
1157 let rows = stmt.query_map(
1158 rusqlite::params![agent_id.0.as_str(), start_id.as_str()],
1159 |row| {
1160 let edge_id_str: String = row.get(0)?;
1161 let agent_id_str: String = row.get(1)?;
1162 let from_claim_str: String = row.get(2)?;
1163 let to_claim_str: String = row.get(3)?;
1164 let kind_str: String = row.get(4)?;
1165 let created_at_str: String = row.get(5)?;
1166 let edge_id = uuid::Uuid::parse_str(&edge_id_str)
1169 .map_err(|e| to_err(format!("edge_id UUID: {e}")))?;
1170 let from_claim = uuid::Uuid::parse_str(&from_claim_str)
1171 .map(ClaimRef)
1172 .map_err(|e| to_err(format!("from_claim UUID: {e}")))?;
1173 let to_claim = uuid::Uuid::parse_str(&to_claim_str)
1174 .map(ClaimRef)
1175 .map_err(|e| to_err(format!("to_claim UUID: {e}")))?;
1176 let kind = str_to_edge_kind(&kind_str)
1177 .map_err(|e| to_err(e.to_string()))?;
1178 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1179 .map(|dt| dt.with_timezone(&chrono::Utc))
1180 .map_err(|e| to_err(format!("created_at parse: {e}")))?;
1181
1182 Ok(ClaimEdge {
1183 edge_id,
1184 agent_id: AgentId(agent_id_str),
1185 from_claim,
1186 to_claim,
1187 kind,
1188 created_at: TransactionTime(created_at),
1189 })
1190 },
1191 )?;
1192
1193 let mut edges = Vec::new();
1194 for row in rows {
1195 edges.push(row?);
1196 }
1197 Ok(edges)
1198 }
1199}
1200
1201pub struct SqlitePendingStore {
1213 conn: Arc<Mutex<Option<Box<Connection>>>>,
1214}
1215
1216impl SqlitePendingStore {
1217 pub fn new(conn: Arc<Mutex<Option<Box<Connection>>>>) -> Self {
1219 Self { conn }
1220 }
1221}
1222
1223unsafe impl Send for SqlitePendingStore {}
1225unsafe impl Sync for SqlitePendingStore {}
1226
1227impl PendingAdjudicationPort for SqlitePendingStore {
1228 type Error = SqliteStoreError;
1229
1230 fn insert_pending(&self, row: &PendingAdjudicationRow) -> Result<(), SqliteStoreError> {
1231 let slot = self.conn.lock().expect("mutex poisoned");
1232 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1233
1234 let request_payload = serde_json::to_string(&row.request_payload)
1235 .map_err(|e| SqliteStoreError::Mapping(format!("request_payload serialization: {e}")))?;
1236 let queued_at = row.queued_at.to_rfc3339();
1237 let expires_at: Option<String> = row.expires_at.map(|dt| dt.to_rfc3339());
1238
1239 conn.execute(
1240 "INSERT INTO pending_adjudications (
1241 handle_id, agent_id, subject, predicate,
1242 challenger_claim_ref, incumbent_claim_ref,
1243 request_payload, queued_at, expires_at, status
1244 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
1245 rusqlite::params![
1246 row.handle_id.to_string(),
1247 row.agent_id.0.as_str(),
1248 row.subject.as_str(),
1249 row.predicate.as_str(),
1250 row.challenger_claim_ref.0.to_string(),
1251 row.incumbent_claim_ref.0.to_string(),
1252 request_payload.as_str(),
1253 queued_at.as_str(),
1254 expires_at,
1255 row.status.as_str(),
1256 ],
1257 )?;
1258 Ok(())
1259 }
1260
1261 fn get_pending(&self, handle_id: uuid::Uuid) -> Result<Option<PendingAdjudicationRow>, SqliteStoreError> {
1262 let slot = self.conn.lock().expect("mutex poisoned");
1263 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1264
1265 let mut stmt = conn.prepare(
1266 "SELECT handle_id, agent_id, subject, predicate,
1267 challenger_claim_ref, incumbent_claim_ref,
1268 request_payload, queued_at, expires_at, status
1269 FROM pending_adjudications
1270 WHERE handle_id = ?1",
1271 )?;
1272
1273 let mut rows = stmt.query_map(
1274 rusqlite::params![handle_id.to_string()],
1275 row_to_pending,
1276 )?;
1277
1278 match rows.next() {
1279 None => Ok(None),
1280 Some(row) => Ok(Some(row.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?)),
1281 }
1282 }
1283
1284 fn list_pending(&self, agent_id: Option<&mempill_types::AgentId>) -> Result<Vec<PendingAdjudicationRow>, SqliteStoreError> {
1285 let slot = self.conn.lock().expect("mutex poisoned");
1286 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1287
1288 let rows = if let Some(aid) = agent_id {
1289 let mut stmt = conn.prepare(
1290 "SELECT handle_id, agent_id, subject, predicate,
1291 challenger_claim_ref, incumbent_claim_ref,
1292 request_payload, queued_at, expires_at, status
1293 FROM pending_adjudications
1294 WHERE agent_id = ?1 AND status = 'pending'
1295 ORDER BY queued_at ASC",
1296 )?;
1297 let mapped = stmt.query_map(rusqlite::params![aid.0.as_str()], row_to_pending)?;
1298 let mut result = Vec::new();
1299 for r in mapped {
1300 result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1301 }
1302 result
1303 } else {
1304 let mut stmt = conn.prepare(
1305 "SELECT handle_id, agent_id, subject, predicate,
1306 challenger_claim_ref, incumbent_claim_ref,
1307 request_payload, queued_at, expires_at, status
1308 FROM pending_adjudications
1309 WHERE status = 'pending'
1310 ORDER BY queued_at ASC",
1311 )?;
1312 let mapped = stmt.query_map([], row_to_pending)?;
1313 let mut result = Vec::new();
1314 for r in mapped {
1315 result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1316 }
1317 result
1318 };
1319 Ok(rows)
1320 }
1321
1322 fn list_expired(&self, now: chrono::DateTime<chrono::Utc>) -> Result<Vec<PendingAdjudicationRow>, SqliteStoreError> {
1323 let slot = self.conn.lock().expect("mutex poisoned");
1324 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1325
1326 let now_str = now.to_rfc3339();
1327 let mut stmt = conn.prepare(
1328 "SELECT handle_id, agent_id, subject, predicate,
1329 challenger_claim_ref, incumbent_claim_ref,
1330 request_payload, queued_at, expires_at, status
1331 FROM pending_adjudications
1332 WHERE expires_at IS NOT NULL AND expires_at <= ?1 AND status = 'pending'
1333 ORDER BY expires_at ASC",
1334 )?;
1335 let mapped = stmt.query_map(rusqlite::params![now_str.as_str()], row_to_pending)?;
1336 let mut result = Vec::new();
1337 for r in mapped {
1338 result.push(r.map_err(|e| SqliteStoreError::Mapping(e.to_string()))?);
1339 }
1340 Ok(result)
1341 }
1342
1343 fn mark_resolved(&self, handle_id: uuid::Uuid) -> Result<(), SqliteStoreError> {
1344 let slot = self.conn.lock().expect("mutex poisoned");
1345 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1346
1347 conn.execute(
1348 "UPDATE pending_adjudications SET status = 'resolved' WHERE handle_id = ?1",
1349 rusqlite::params![handle_id.to_string()],
1350 )?;
1351 Ok(())
1352 }
1353
1354 fn mark_expired(&self, handle_id: uuid::Uuid) -> Result<(), SqliteStoreError> {
1355 let slot = self.conn.lock().expect("mutex poisoned");
1356 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1357
1358 conn.execute(
1359 "UPDATE pending_adjudications SET status = 'expired' WHERE handle_id = ?1",
1360 rusqlite::params![handle_id.to_string()],
1361 )?;
1362 Ok(())
1363 }
1364
1365 fn list_queued_orphan_claims(
1375 &self,
1376 ) -> Result<Vec<mempill_core::ports::pending_adjudication::OrphanedQueuedClaim>, SqliteStoreError> {
1377 let slot = self.conn.lock().expect("mutex poisoned");
1378 let conn = slot.as_ref().ok_or(SqliteStoreError::TxnAlreadyOpen)?;
1379
1380 let mut stmt = conn.prepare(
1383 "SELECT l.agent_id, l.claim_id, c.subject, c.predicate
1384 FROM ledger_entries l
1385 JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1386 WHERE l.disposition = 'QueuedForAdjudication'
1387 AND l.recorded_at = (
1388 SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1389 WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1390 )
1391 AND NOT EXISTS (
1392 SELECT 1 FROM pending_adjudications pa
1393 WHERE pa.challenger_claim_ref = l.claim_id
1394 AND pa.agent_id = l.agent_id
1395 AND pa.status = 'pending'
1396 )",
1397 )?;
1398
1399 let orphan_rows: Vec<(String, String, String, String)> = stmt
1400 .query_map([], |row| {
1401 Ok((
1402 row.get::<_, String>(0)?,
1403 row.get::<_, String>(1)?,
1404 row.get::<_, String>(2)?,
1405 row.get::<_, String>(3)?,
1406 ))
1407 })?
1408 .filter_map(|r| r.ok())
1409 .collect();
1410
1411 let mut results = Vec::new();
1412 for (agent_id_str, challenger_str, subject, predicate) in orphan_rows {
1413 use mempill_types::ClaimRef;
1414
1415 let challenger_ref = uuid::Uuid::parse_str(&challenger_str)
1416 .map(ClaimRef)
1417 .map_err(|e| SqliteStoreError::Mapping(format!("challenger_claim_ref UUID: {e}")))?;
1418
1419 let incumbent_ref = find_committed_cheap_claim(conn, &agent_id_str, &subject, &predicate)?;
1421
1422 results.push(mempill_core::ports::pending_adjudication::OrphanedQueuedClaim {
1423 agent_id: mempill_types::AgentId(agent_id_str),
1424 challenger_claim_ref: challenger_ref,
1425 incumbent_claim_ref: incumbent_ref,
1426 subject,
1427 predicate,
1428 });
1429 }
1430
1431 Ok(results)
1432 }
1433}
1434
1435fn row_to_pending(row: &rusqlite::Row<'_>) -> Result<PendingAdjudicationRow, rusqlite::Error> {
1449 let to_err = |msg: String| rusqlite::Error::InvalidColumnType(
1450 0, msg, rusqlite::types::Type::Text,
1451 );
1452
1453 let handle_id_str: String = row.get(0)?;
1454 let agent_id_str: String = row.get(1)?;
1455 let subject: String = row.get(2)?;
1456 let predicate: String = row.get(3)?;
1457 let challenger_str: String = row.get(4)?;
1458 let incumbent_str: String = row.get(5)?;
1459 let payload_json: String = row.get(6)?;
1460 let queued_at_str: String = row.get(7)?;
1461 let expires_at_str: Option<String> = row.get(8)?;
1462 let status: String = row.get(9)?;
1463
1464 let handle_id = uuid::Uuid::parse_str(&handle_id_str)
1465 .map_err(|e| to_err(format!("handle_id UUID: {e}")))?;
1466 let challenger_claim_ref = uuid::Uuid::parse_str(&challenger_str)
1467 .map(ClaimRef)
1468 .map_err(|e| to_err(format!("challenger_claim_ref UUID: {e}")))?;
1469 let incumbent_claim_ref = uuid::Uuid::parse_str(&incumbent_str)
1470 .map(ClaimRef)
1471 .map_err(|e| to_err(format!("incumbent_claim_ref UUID: {e}")))?;
1472 let request_payload: mempill_types::AdjudicationRequest =
1473 serde_json::from_str(&payload_json)
1474 .map_err(|e| to_err(format!("request_payload JSON: {e}")))?;
1475 let queued_at = chrono::DateTime::parse_from_rfc3339(&queued_at_str)
1476 .map(|dt| dt.with_timezone(&chrono::Utc))
1477 .map_err(|e| to_err(format!("queued_at parse: {e}")))?;
1478 let expires_at = expires_at_str
1479 .map(|s| {
1480 chrono::DateTime::parse_from_rfc3339(&s)
1481 .map(|dt| dt.with_timezone(&chrono::Utc))
1482 .map_err(|e| to_err(format!("expires_at parse: {e}")))
1483 })
1484 .transpose()?;
1485
1486 Ok(PendingAdjudicationRow {
1487 handle_id,
1488 agent_id: AgentId(agent_id_str),
1489 subject,
1490 predicate,
1491 challenger_claim_ref,
1492 incumbent_claim_ref,
1493 request_payload,
1494 queued_at,
1495 expires_at,
1496 status,
1497 })
1498}
1499
1500fn find_committed_cheap_claim(
1508 conn: &Connection,
1509 agent_id: &str,
1510 subject: &str,
1511 predicate: &str,
1512) -> Result<Option<mempill_types::ClaimRef>, SqliteStoreError> {
1513 let mut stmt = conn.prepare(
1515 "SELECT l.claim_id
1516 FROM ledger_entries l
1517 JOIN claims c ON c.claim_id = l.claim_id AND c.agent_id = l.agent_id
1518 WHERE l.agent_id = ?1
1519 AND c.subject = ?2
1520 AND c.predicate = ?3
1521 AND l.disposition = 'CommittedCheap'
1522 AND l.recorded_at = (
1523 SELECT MAX(l2.recorded_at) FROM ledger_entries l2
1524 WHERE l2.claim_id = l.claim_id AND l2.agent_id = l.agent_id
1525 )
1526 ORDER BY l.recorded_at DESC
1527 LIMIT 1",
1528 )?;
1529
1530 let mut rows = stmt.query_map(rusqlite::params![agent_id, subject, predicate], |row| {
1531 row.get::<_, String>(0)
1532 })?;
1533
1534 if let Some(Ok(ref_str)) = rows.next() {
1535 let claim_ref = uuid::Uuid::parse_str(&ref_str)
1536 .map(mempill_types::ClaimRef)
1537 .map_err(|e| SqliteStoreError::Mapping(format!("incumbent_claim_ref UUID: {e}")))?;
1538 Ok(Some(claim_ref))
1539 } else {
1540 Ok(None)
1541 }
1542}
1543
1544#[cfg(test)]
1547mod tests {
1548 use super::*;
1549 use crate::connection::open_in_memory;
1550 use chrono::Utc;
1551 use mempill_types::{
1552 claim::{Cardinality, Claim, Confidence, Criticality, Fact},
1553 disposition::Disposition,
1554 identity::AgentId,
1555 ledger::LedgerEventKind,
1556 provenance::{ExternalAnchor, ExternalKind, ProvenanceLabel},
1557 time::{TransactionTime, ValidTime},
1558 validity::AssertionKind,
1559 };
1560 use uuid::Uuid;
1561
1562 fn make_store() -> SqlitePersistenceStore {
1563 let conn = open_in_memory().expect("in-memory connection should open");
1564 SqlitePersistenceStore::new(conn)
1565 }
1566
1567 fn make_agent() -> AgentId {
1568 AgentId("test-agent-1".into())
1569 }
1570
1571 fn make_claim(agent_id: &AgentId) -> Claim {
1572 Claim::new(
1573 ClaimRef(Uuid::new_v4()),
1574 agent_id.clone(),
1575 Fact {
1576 subject: "user".into(),
1577 predicate: "favourite_colour".into(),
1578 value: serde_json::json!("blue"),
1579 },
1580 Cardinality::Functional,
1581 ProvenanceLabel::External(ExternalKind::UserAsserted),
1582 ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
1583 TransactionTime(Utc::now()),
1584 ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
1585 Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
1586 Criticality::Low,
1587 vec![],
1588 None,
1589 None,
1590 )
1591 }
1592
1593 fn make_ledger_entry(
1594 agent_id: &AgentId,
1595 claim_ref: &ClaimRef,
1596 ) -> LedgerEntry {
1597 LedgerEntry {
1598 entry_id: Uuid::new_v4(),
1599 agent_id: agent_id.clone(),
1600 claim_ref: claim_ref.clone(),
1601 event_kind: LedgerEventKind::ClaimCommitted,
1602 disposition: Disposition::CommittedCheap,
1603 rationale: None,
1604 recorded_at: TransactionTime(Utc::now()),
1605 }
1606 }
1607
1608 #[test]
1613 fn write_round_trip_claim_persists_after_commit() {
1614 let store = make_store();
1615 let agent = make_agent();
1616 let claim = make_claim(&agent);
1617 let claim_id = claim.claim_ref().0.to_string();
1618
1619 let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1620 store.append_claim(&mut txn, &claim).expect("append_claim should succeed");
1621 store.commit(txn).expect("commit should succeed");
1622
1623 let slot = store.conn.lock().unwrap();
1625 let conn = slot.as_ref().expect("connection must be back after commit");
1626 let count: i64 = conn
1627 .query_row(
1628 "SELECT COUNT(*) FROM claims WHERE claim_id = ?1",
1629 [claim_id.as_str()],
1630 |r| r.get(0),
1631 )
1632 .expect("SELECT should succeed");
1633 assert_eq!(count, 1, "claim row must exist after commit");
1634 }
1635
1636 #[test]
1638 fn write_round_trip_provenance_not_null() {
1639 let store = make_store();
1640 let agent = make_agent();
1641 let claim = make_claim(&agent);
1642 let claim_id = claim.claim_ref().0.to_string();
1643
1644 let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1645 store.append_claim(&mut txn, &claim).expect("append_claim should succeed");
1646 store.commit(txn).expect("commit should succeed");
1647
1648 let slot = store.conn.lock().unwrap();
1649 let conn = slot.as_ref().unwrap();
1650
1651 let prov: String = conn
1653 .query_row(
1654 "SELECT provenance_label FROM claims WHERE claim_id = ?1",
1655 [claim_id.as_str()],
1656 |r| r.get(0),
1657 )
1658 .expect("provenance_label must be selectable");
1659 assert_eq!(
1660 prov, "External_UserAsserted",
1661 "provenance_label column must be non-NULL and correct"
1662 );
1663
1664 let tx_time: String = conn
1666 .query_row(
1667 "SELECT tx_time FROM claims WHERE claim_id = ?1",
1668 [claim_id.as_str()],
1669 |r| r.get(0),
1670 )
1671 .expect("tx_time must be selectable");
1672 assert!(!tx_time.is_empty(), "tx_time must be non-NULL");
1673 }
1674
1675 #[test]
1680 fn atomicity_rollback_leaves_zero_rows() {
1681 let store = make_store();
1682 let agent = make_agent();
1683 let claim = make_claim(&agent);
1684 let claim_ref = claim.claim_ref().clone();
1685 let claim_id = claim_ref.0.to_string();
1686
1687 let assertion = mempill_types::validity::ValidityAssertion {
1688 assertion_ref: Uuid::new_v4(),
1689 agent_id: agent.clone(),
1690 target_claim: claim_ref.clone(),
1691 kind: AssertionKind::Bound { bound_at: Utc::now() },
1692 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1693 confidence: mempill_types::claim::Confidence {
1694 value_confidence: 0.9,
1695 valid_time_confidence: 0.9,
1696 },
1697 asserted_at: TransactionTime(Utc::now()),
1698 };
1699 let assertion_id = assertion.assertion_ref.to_string();
1700
1701 let ledger_entry = make_ledger_entry(&agent, &claim_ref);
1702 let entry_id = ledger_entry.entry_id.to_string();
1703
1704 let mut txn = store.begin_atomic(&agent).expect("begin_atomic should succeed");
1705 store.append_claim(&mut txn, &claim).expect("append_claim in txn should succeed");
1706 store
1707 .append_validity_assertion(&mut txn, &assertion)
1708 .expect("append_validity_assertion in txn should succeed");
1709 store
1710 .append_ledger_entry(&mut txn, &ledger_entry)
1711 .expect("append_ledger_entry in txn should succeed");
1712
1713 store.rollback(txn).expect("rollback should succeed");
1715
1716 let slot = store.conn.lock().unwrap();
1717 let conn = slot.as_ref().expect("connection must be back after rollback");
1718
1719 let claim_count: i64 = conn
1720 .query_row(
1721 "SELECT COUNT(*) FROM claims WHERE claim_id = ?1",
1722 [claim_id.as_str()],
1723 |r| r.get(0),
1724 )
1725 .unwrap();
1726 let assertion_count: i64 = conn
1727 .query_row(
1728 "SELECT COUNT(*) FROM validity_assertions WHERE assertion_id = ?1",
1729 [assertion_id.as_str()],
1730 |r| r.get(0),
1731 )
1732 .unwrap();
1733 let ledger_count: i64 = conn
1734 .query_row(
1735 "SELECT COUNT(*) FROM ledger_entries WHERE entry_id = ?1",
1736 [entry_id.as_str()],
1737 |r| r.get(0),
1738 )
1739 .unwrap();
1740
1741 assert_eq!(claim_count, 0, "claim row must not exist after rollback");
1742 assert_eq!(assertion_count, 0, "validity_assertion row must not exist after rollback");
1743 assert_eq!(ledger_count, 0, "ledger_entry row must not exist after rollback");
1744 }
1745
1746 #[test]
1749 fn write_round_trip_validity_assertion() {
1750 let store = make_store();
1751 let agent = make_agent();
1752 let claim = make_claim(&agent);
1753 let claim_ref = claim.claim_ref().clone();
1754
1755 let assertion = mempill_types::validity::ValidityAssertion {
1756 assertion_ref: Uuid::new_v4(),
1757 agent_id: agent.clone(),
1758 target_claim: claim_ref.clone(),
1759 kind: AssertionKind::Bound { bound_at: Utc::now() },
1760 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1761 confidence: mempill_types::claim::Confidence {
1762 value_confidence: 0.95,
1763 valid_time_confidence: 0.8,
1764 },
1765 asserted_at: TransactionTime(Utc::now()),
1766 };
1767 let assertion_id = assertion.assertion_ref.to_string();
1768
1769 let mut txn = store.begin_atomic(&agent).unwrap();
1770 store.append_claim(&mut txn, &claim).unwrap();
1771 store.append_validity_assertion(&mut txn, &assertion).unwrap();
1772 store.commit(txn).unwrap();
1773
1774 let slot = store.conn.lock().unwrap();
1775 let conn = slot.as_ref().unwrap();
1776 let count: i64 = conn
1777 .query_row(
1778 "SELECT COUNT(*) FROM validity_assertions WHERE assertion_id = ?1",
1779 [assertion_id.as_str()],
1780 |r| r.get(0),
1781 )
1782 .unwrap();
1783 assert_eq!(count, 1, "validity_assertion row must exist after commit");
1784 }
1785
1786 #[test]
1789 fn write_round_trip_ledger_entry() {
1790 let store = make_store();
1791 let agent = make_agent();
1792 let claim = make_claim(&agent);
1793 let claim_ref = claim.claim_ref().clone();
1794 let entry = make_ledger_entry(&agent, &claim_ref);
1795 let entry_id = entry.entry_id.to_string();
1796
1797 let mut txn = store.begin_atomic(&agent).unwrap();
1798 store.append_claim(&mut txn, &claim).unwrap();
1799 store.append_ledger_entry(&mut txn, &entry).unwrap();
1800 store.commit(txn).unwrap();
1801
1802 let slot = store.conn.lock().unwrap();
1803 let conn = slot.as_ref().unwrap();
1804 let count: i64 = conn
1805 .query_row(
1806 "SELECT COUNT(*) FROM ledger_entries WHERE entry_id = ?1",
1807 [entry_id.as_str()],
1808 |r| r.get(0),
1809 )
1810 .unwrap();
1811 assert_eq!(count, 1, "ledger_entry row must exist after commit");
1812 }
1813
1814 #[test]
1817 fn write_round_trip_claim_edge() {
1818 let store = make_store();
1819 let agent = make_agent();
1820 let from_claim = make_claim(&agent);
1821 let to_claim = make_claim(&agent);
1822 let from_ref = from_claim.claim_ref().clone();
1823 let to_ref = to_claim.claim_ref().clone();
1824
1825 let edge = ClaimEdge {
1826 edge_id: Uuid::new_v4(),
1827 agent_id: agent.clone(),
1828 from_claim: from_ref.clone(),
1829 to_claim: to_ref.clone(),
1830 kind: EdgeKind::DerivedFrom,
1831 created_at: TransactionTime(Utc::now()),
1832 };
1833 let edge_id = edge.edge_id.to_string();
1834
1835 let mut txn = store.begin_atomic(&agent).unwrap();
1836 store.append_claim(&mut txn, &from_claim).unwrap();
1837 store.append_claim(&mut txn, &to_claim).unwrap();
1838 store.append_claim_edge(&mut txn, &edge).unwrap();
1839 store.commit(txn).unwrap();
1840
1841 let slot = store.conn.lock().unwrap();
1842 let conn = slot.as_ref().unwrap();
1843 let count: i64 = conn
1844 .query_row(
1845 "SELECT COUNT(*) FROM claim_edges WHERE edge_id = ?1",
1846 [edge_id.as_str()],
1847 |r| r.get(0),
1848 )
1849 .unwrap();
1850 assert_eq!(count, 1, "claim_edge row must exist after commit");
1851 }
1852
1853 #[test]
1857 fn read_load_claim_round_trip() {
1858 let store = make_store();
1859 let agent = make_agent();
1860 let claim = make_claim(&agent);
1861 let claim_ref = claim.claim_ref().clone();
1862
1863 let mut txn = store.begin_atomic(&agent).unwrap();
1864 store.append_claim(&mut txn, &claim).unwrap();
1865 store.commit(txn).unwrap();
1866
1867 let loaded = store.load_claim(&agent, &claim_ref).unwrap();
1868 assert!(loaded.is_some(), "load_claim must return Some for existing claim");
1869 let loaded = loaded.unwrap();
1870 assert_eq!(loaded.claim_ref(), &claim_ref);
1871 assert_eq!(loaded.agent_id(), &agent);
1872 assert_eq!(loaded.fact().subject, "user");
1873 assert_eq!(loaded.fact().predicate, "favourite_colour");
1874 assert_eq!(loaded.fact().value, serde_json::json!("blue"));
1875 assert_eq!(loaded.provenance(), claim.provenance());
1876 assert_eq!(loaded.cardinality(), claim.cardinality());
1877 assert_eq!(loaded.criticality(), claim.criticality());
1878 }
1879
1880 #[test]
1882 fn read_load_claim_missing_returns_none() {
1883 let store = make_store();
1884 let agent = make_agent();
1885 let missing_ref = ClaimRef(Uuid::new_v4());
1886 let result = store.load_claim(&agent, &missing_ref).unwrap();
1887 assert!(result.is_none(), "load_claim must return None for unknown claim_ref");
1888 }
1889
1890 #[test]
1892 fn read_load_subject_line_round_trip() {
1893 let store = make_store();
1894 let agent = make_agent();
1895 let claim = make_claim(&agent);
1896 let claim_ref = claim.claim_ref().clone();
1897
1898 let mut txn = store.begin_atomic(&agent).unwrap();
1899 store.append_claim(&mut txn, &claim).unwrap();
1900 store.commit(txn).unwrap();
1901
1902 let claims = store.load_subject_line(&agent, "user", "favourite_colour").unwrap();
1903 assert_eq!(claims.len(), 1, "load_subject_line must return the single written claim");
1904 assert_eq!(claims[0].claim_ref(), &claim_ref);
1905 }
1906
1907 #[test]
1909 fn read_load_subject_line_empty_when_no_match() {
1910 let store = make_store();
1911 let agent = make_agent();
1912 let claims = store.load_subject_line(&agent, "nonexistent", "pred").unwrap();
1913 assert!(claims.is_empty(), "load_subject_line must return empty vec for unknown subject-line");
1914 }
1915
1916 #[test]
1918 fn read_load_validity_assertions_round_trip() {
1919 let store = make_store();
1920 let agent = make_agent();
1921 let claim = make_claim(&agent);
1922 let claim_ref = claim.claim_ref().clone();
1923
1924 let assertion = mempill_types::validity::ValidityAssertion {
1925 assertion_ref: Uuid::new_v4(),
1926 agent_id: agent.clone(),
1927 target_claim: claim_ref.clone(),
1928 kind: AssertionKind::Bound { bound_at: Utc::now() },
1929 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
1930 confidence: mempill_types::claim::Confidence {
1931 value_confidence: 0.9,
1932 valid_time_confidence: 0.8,
1933 },
1934 asserted_at: TransactionTime(Utc::now()),
1935 };
1936
1937 let mut txn = store.begin_atomic(&agent).unwrap();
1938 store.append_claim(&mut txn, &claim).unwrap();
1939 store.append_validity_assertion(&mut txn, &assertion).unwrap();
1940 store.commit(txn).unwrap();
1941
1942 let loaded = store.load_validity_assertions_for(&agent, &claim_ref).unwrap();
1943 assert_eq!(loaded.len(), 1, "must return one validity assertion");
1944 assert_eq!(loaded[0].assertion_ref, assertion.assertion_ref);
1945 assert_eq!(loaded[0].target_claim, claim_ref);
1946 assert!(matches!(loaded[0].kind, AssertionKind::Bound { .. }));
1947 }
1948
1949 #[test]
1951 fn read_load_validity_assertions_empty_when_none() {
1952 let store = make_store();
1953 let agent = make_agent();
1954 let claim = make_claim(&agent);
1955 let claim_ref = claim.claim_ref().clone();
1956
1957 let mut txn = store.begin_atomic(&agent).unwrap();
1958 store.append_claim(&mut txn, &claim).unwrap();
1959 store.commit(txn).unwrap();
1960
1961 let loaded = store.load_validity_assertions_for(&agent, &claim_ref).unwrap();
1962 assert!(loaded.is_empty(), "must return empty vec when no assertions");
1963 }
1964
1965 #[test]
1967 fn read_load_ledger_round_trip() {
1968 let store = make_store();
1969 let agent = make_agent();
1970 let claim = make_claim(&agent);
1971 let claim_ref = claim.claim_ref().clone();
1972 let entry = make_ledger_entry(&agent, &claim_ref);
1973
1974 let mut txn = store.begin_atomic(&agent).unwrap();
1975 store.append_claim(&mut txn, &claim).unwrap();
1976 store.append_ledger_entry(&mut txn, &entry).unwrap();
1977 store.commit(txn).unwrap();
1978
1979 let loaded = store.load_ledger(&agent, None, 100).unwrap();
1980 assert_eq!(loaded.len(), 1, "must return one ledger entry");
1981 assert_eq!(loaded[0].entry_id, entry.entry_id);
1982 assert_eq!(loaded[0].claim_ref, claim_ref);
1983 assert_eq!(loaded[0].event_kind, LedgerEventKind::ClaimCommitted);
1984 }
1985
1986 #[test]
1988 fn read_load_ledger_from_bound_filters_earlier_entries() {
1989 let store = make_store();
1990 let agent = make_agent();
1991
1992 let claim_early = make_claim(&agent);
1994 let claim_late = make_claim(&agent);
1995 let ref_early = claim_early.claim_ref().clone();
1996 let ref_late = claim_late.claim_ref().clone();
1997
1998 let t_early = TransactionTime(Utc::now() - chrono::Duration::seconds(10));
1999 let t_late = TransactionTime(Utc::now());
2000
2001 let entry_early = mempill_types::ledger::LedgerEntry {
2002 entry_id: Uuid::new_v4(),
2003 agent_id: agent.clone(),
2004 claim_ref: ref_early.clone(),
2005 event_kind: LedgerEventKind::ClaimCommitted,
2006 disposition: mempill_types::disposition::Disposition::CommittedCheap,
2007 rationale: None,
2008 recorded_at: t_early.clone(),
2009 };
2010 let entry_late = mempill_types::ledger::LedgerEntry {
2011 entry_id: Uuid::new_v4(),
2012 agent_id: agent.clone(),
2013 claim_ref: ref_late.clone(),
2014 event_kind: LedgerEventKind::ClaimCommitted,
2015 disposition: mempill_types::disposition::Disposition::CommittedCheap,
2016 rationale: None,
2017 recorded_at: t_late.clone(),
2018 };
2019
2020 let mut txn = store.begin_atomic(&agent).unwrap();
2021 store.append_claim(&mut txn, &claim_early).unwrap();
2022 store.append_claim(&mut txn, &claim_late).unwrap();
2023 store.append_ledger_entry(&mut txn, &entry_early).unwrap();
2024 store.append_ledger_entry(&mut txn, &entry_late).unwrap();
2025 store.commit(txn).unwrap();
2026
2027 let loaded = store.load_ledger(&agent, Some(&t_late), 100).unwrap();
2029 assert_eq!(loaded.len(), 1, "only the late entry must be returned when from=t_late");
2030 assert_eq!(loaded[0].entry_id, entry_late.entry_id);
2031 }
2032
2033 #[test]
2035 fn read_load_ledger_empty_when_none() {
2036 let store = make_store();
2037 let agent = make_agent();
2038 let loaded = store.load_ledger(&agent, None, 100).unwrap();
2039 assert!(loaded.is_empty(), "must return empty vec when no ledger entries");
2040 }
2041
2042 #[test]
2044 fn read_load_edges_for_ordering_created_at_asc() {
2045 let store = make_store();
2046 let agent = make_agent();
2047
2048 let claim_a = make_claim(&agent);
2049 let claim_b = make_claim(&agent);
2050 let claim_c = make_claim(&agent);
2051 let ref_a = claim_a.claim_ref().clone();
2052 let ref_b = claim_b.claim_ref().clone();
2053 let ref_c = claim_c.claim_ref().clone();
2054
2055 let t1 = TransactionTime(Utc::now() - chrono::Duration::seconds(5));
2057 let t2 = TransactionTime(Utc::now());
2058
2059 let edge_ab = ClaimEdge {
2060 edge_id: Uuid::new_v4(),
2061 agent_id: agent.clone(),
2062 from_claim: ref_a.clone(),
2063 to_claim: ref_b.clone(),
2064 kind: EdgeKind::DependsOn,
2065 created_at: t1,
2066 };
2067 let edge_ac = ClaimEdge {
2068 edge_id: Uuid::new_v4(),
2069 agent_id: agent.clone(),
2070 from_claim: ref_a.clone(),
2071 to_claim: ref_c.clone(),
2072 kind: EdgeKind::DependsOn,
2073 created_at: t2,
2074 };
2075
2076 let mut txn = store.begin_atomic(&agent).unwrap();
2077 store.append_claim(&mut txn, &claim_a).unwrap();
2079 store.append_claim(&mut txn, &claim_b).unwrap();
2080 store.append_claim(&mut txn, &claim_c).unwrap();
2081 store.append_claim_edge(&mut txn, &edge_ac).unwrap(); store.append_claim_edge(&mut txn, &edge_ab).unwrap(); store.commit(txn).unwrap();
2084
2085 let loaded = store.load_edges_for(&agent, &ref_a).unwrap();
2086 assert_eq!(loaded.len(), 2, "must return both edges");
2087 assert_eq!(loaded[0].to_claim, ref_b, "earlier edge (A→B) must be first");
2089 assert_eq!(loaded[1].to_claim, ref_c, "later edge (A→C) must be second");
2090 }
2091
2092 #[test]
2094 fn read_load_edges_for_empty_when_none() {
2095 let store = make_store();
2096 let agent = make_agent();
2097 let claim = make_claim(&agent);
2098 let claim_ref = claim.claim_ref().clone();
2099
2100 let mut txn = store.begin_atomic(&agent).unwrap();
2101 store.append_claim(&mut txn, &claim).unwrap();
2102 store.commit(txn).unwrap();
2103
2104 let loaded = store.load_edges_for(&agent, &claim_ref).unwrap();
2105 assert!(loaded.is_empty(), "must return empty vec when no edges");
2106 }
2107
2108 #[test]
2110 fn read_load_injected_claims_round_trip() {
2111 use mempill_types::disposition::Disposition;
2112
2113 let store = make_store();
2114 let agent = make_agent();
2115 let claim = make_claim(&agent);
2116 let claim_ref = claim.claim_ref().clone();
2117
2118 let injected_entry = mempill_types::ledger::LedgerEntry {
2119 entry_id: Uuid::new_v4(),
2120 agent_id: agent.clone(),
2121 claim_ref: claim_ref.clone(),
2122 event_kind: LedgerEventKind::ServedAsInjected,
2123 disposition: Disposition::CommittedCheap,
2124 rationale: None,
2125 recorded_at: TransactionTime(Utc::now()),
2126 };
2127
2128 let mut txn = store.begin_atomic(&agent).unwrap();
2129 store.append_claim(&mut txn, &claim).unwrap();
2130 store.append_ledger_entry(&mut txn, &injected_entry).unwrap();
2131 store.commit(txn).unwrap();
2132
2133 let loaded = store.load_injected_claims(&agent).unwrap();
2134 assert_eq!(loaded.len(), 1, "must return one injected claim ref");
2135 assert_eq!(loaded[0], claim_ref);
2136 }
2137
2138 #[test]
2140 fn read_load_injected_claims_empty_when_none() {
2141 let store = make_store();
2142 let agent = make_agent();
2143 let loaded = store.load_injected_claims(&agent).unwrap();
2144 assert!(loaded.is_empty(), "must return empty vec when no injected claims");
2145 }
2146
2147 #[test]
2149 fn read_load_lineage_multi_hop_derived_from() {
2150 let store = make_store();
2151 let agent = make_agent();
2152
2153 let claim_a = make_claim(&agent);
2156 let claim_b = make_claim(&agent);
2157 let claim_c = make_claim(&agent);
2158 let ref_a = claim_a.claim_ref().clone();
2159 let ref_b = claim_b.claim_ref().clone();
2160 let ref_c = claim_c.claim_ref().clone();
2161
2162 let edge_ab = ClaimEdge {
2163 edge_id: Uuid::new_v4(),
2164 agent_id: agent.clone(),
2165 from_claim: ref_a.clone(),
2166 to_claim: ref_b.clone(),
2167 kind: EdgeKind::DerivedFrom,
2168 created_at: TransactionTime(Utc::now() - chrono::Duration::seconds(2)),
2169 };
2170 let edge_bc = ClaimEdge {
2171 edge_id: Uuid::new_v4(),
2172 agent_id: agent.clone(),
2173 from_claim: ref_b.clone(),
2174 to_claim: ref_c.clone(),
2175 kind: EdgeKind::DerivedFrom,
2176 created_at: TransactionTime(Utc::now() - chrono::Duration::seconds(1)),
2177 };
2178
2179 let mut txn = store.begin_atomic(&agent).unwrap();
2180 store.append_claim(&mut txn, &claim_a).unwrap();
2181 store.append_claim(&mut txn, &claim_b).unwrap();
2182 store.append_claim(&mut txn, &claim_c).unwrap();
2183 store.append_claim_edge(&mut txn, &edge_ab).unwrap();
2184 store.append_claim_edge(&mut txn, &edge_bc).unwrap();
2185 store.commit(txn).unwrap();
2186
2187 let lineage = store.load_lineage(&agent, &ref_a).unwrap();
2188 assert_eq!(lineage.len(), 2, "lineage must contain both DerivedFrom hops A→B and B→C");
2189
2190 assert_eq!(lineage[0].from_claim, ref_a, "first edge must start from A");
2192 assert_eq!(lineage[0].to_claim, ref_b, "first edge must point to B");
2193 assert_eq!(lineage[1].from_claim, ref_b, "second edge must start from B");
2195 assert_eq!(lineage[1].to_claim, ref_c, "second edge must point to C");
2196 }
2197
2198 #[test]
2200 fn read_load_lineage_empty_when_no_derived_from_edges() {
2201 let store = make_store();
2202 let agent = make_agent();
2203 let claim = make_claim(&agent);
2204 let claim_ref = claim.claim_ref().clone();
2205
2206 let mut txn = store.begin_atomic(&agent).unwrap();
2207 store.append_claim(&mut txn, &claim).unwrap();
2208 store.commit(txn).unwrap();
2209
2210 let lineage = store.load_lineage(&agent, &claim_ref).unwrap();
2211 assert!(lineage.is_empty(), "load_lineage must return empty vec when no DerivedFrom edges");
2212 }
2213
2214 #[test]
2217 fn begin_atomic_while_txn_open_returns_error() {
2218 let store = make_store();
2219 let agent = make_agent();
2220
2221 let _txn = store.begin_atomic(&agent).expect("first begin_atomic should succeed");
2222 let result = store.begin_atomic(&agent);
2223 assert!(
2224 matches!(result, Err(SqliteStoreError::TxnAlreadyOpen)),
2225 "second begin_atomic must return TxnAlreadyOpen"
2226 );
2227 }
2228
2229 #[test]
2234 fn atomic_unit_all_four_rows_on_commit() {
2235 let store = make_store();
2236 let agent = make_agent();
2237 let claim_a = make_claim(&agent);
2238 let claim_b = make_claim(&agent);
2239 let claim_ref_a = claim_a.claim_ref().clone();
2240 let claim_ref_b = claim_b.claim_ref().clone();
2241
2242 let assertion = mempill_types::validity::ValidityAssertion {
2243 assertion_ref: Uuid::new_v4(),
2244 agent_id: agent.clone(),
2245 target_claim: claim_ref_a.clone(),
2246 kind: AssertionKind::Bound { bound_at: Utc::now() },
2247 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
2248 confidence: mempill_types::claim::Confidence {
2249 value_confidence: 0.9,
2250 valid_time_confidence: 0.9,
2251 },
2252 asserted_at: TransactionTime(Utc::now()),
2253 };
2254 let ledger = make_ledger_entry(&agent, &claim_ref_a);
2255 let edge = ClaimEdge {
2256 edge_id: Uuid::new_v4(),
2257 agent_id: agent.clone(),
2258 from_claim: claim_ref_a.clone(),
2259 to_claim: claim_ref_b.clone(),
2260 kind: EdgeKind::Supersedes,
2261 created_at: TransactionTime(Utc::now()),
2262 };
2263
2264 let mut txn = store.begin_atomic(&agent).unwrap();
2265 store.append_claim(&mut txn, &claim_a).unwrap();
2266 store.append_claim(&mut txn, &claim_b).unwrap();
2267 store.append_validity_assertion(&mut txn, &assertion).unwrap();
2268 store.append_ledger_entry(&mut txn, &ledger).unwrap();
2269 store.append_claim_edge(&mut txn, &edge).unwrap();
2270 store.commit(txn).unwrap();
2271
2272 let slot = store.conn.lock().unwrap();
2273 let conn = slot.as_ref().unwrap();
2274
2275 let claims: i64 = conn
2276 .query_row("SELECT COUNT(*) FROM claims", [], |r| r.get(0))
2277 .unwrap();
2278 let assertions: i64 = conn
2279 .query_row("SELECT COUNT(*) FROM validity_assertions", [], |r| r.get(0))
2280 .unwrap();
2281 let ledger_count: i64 = conn
2282 .query_row("SELECT COUNT(*) FROM ledger_entries", [], |r| r.get(0))
2283 .unwrap();
2284 let edges: i64 = conn
2285 .query_row("SELECT COUNT(*) FROM claim_edges", [], |r| r.get(0))
2286 .unwrap();
2287
2288 assert_eq!(claims, 2, "two claim rows must exist");
2289 assert_eq!(assertions, 1, "one validity_assertion row must exist");
2290 assert_eq!(ledger_count, 1, "one ledger_entry row must exist");
2291 assert_eq!(edges, 1, "one claim_edge row must exist");
2292 }
2293
2294 use mempill_core::ports::pending_adjudication::{PendingAdjudicationPort, PendingAdjudicationRow};
2297 use mempill_types::{
2298 AdjudicationRequest, Belief, CurrencySignal, CurrencyState, OverturnReason, SubjectLineRef,
2299 };
2300
2301 fn make_adj_request(agent: &AgentId) -> AdjudicationRequest {
2302 let claim_ref = ClaimRef(Uuid::new_v4());
2303 let now = TransactionTime(Utc::now());
2304 AdjudicationRequest {
2305 subject_line: SubjectLineRef {
2306 agent_id: agent.clone(),
2307 subject: "user".into(),
2308 predicate: "city".into(),
2309 },
2310 incumbent: Belief {
2311 claim_ref: claim_ref.clone(),
2312 fact: mempill_types::Fact {
2313 subject: "user".into(),
2314 predicate: "city".into(),
2315 value: serde_json::json!("Berlin"),
2316 },
2317 provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
2318 valid_time: ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
2319 transaction_time: now.clone(),
2320 confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
2321 currency_signal: CurrencySignal {
2322 last_refreshed_at: now.clone(),
2323 state: CurrencyState::Fresh,
2324 corroboration_count: 0,
2325 },
2326 criticality: Criticality::Low,
2327 },
2328 challenger: make_claim(agent),
2329 criticality: Criticality::Low,
2330 reason: OverturnReason::ExternalContradiction,
2331 }
2332 }
2333
2334 fn make_pending_row(agent: &AgentId) -> PendingAdjudicationRow {
2335 PendingAdjudicationRow {
2336 handle_id: Uuid::new_v4(),
2337 agent_id: agent.clone(),
2338 subject: "user".into(),
2339 predicate: "city".into(),
2340 challenger_claim_ref: ClaimRef(Uuid::new_v4()),
2341 incumbent_claim_ref: ClaimRef(Uuid::new_v4()),
2342 request_payload: make_adj_request(agent),
2343 queued_at: Utc::now(),
2344 expires_at: None,
2345 status: "pending".to_string(),
2346 }
2347 }
2348
2349 #[test]
2351 fn w3_sqlite_pending_insert_and_get_round_trip() {
2352 let store = make_store();
2353 let pending = store.pending_store();
2354 let agent = make_agent();
2355 let row = make_pending_row(&agent);
2356 let handle_id = row.handle_id;
2357
2358 pending.insert_pending(&row).expect("insert_pending must succeed");
2359
2360 let fetched = pending.get_pending(handle_id).expect("get_pending must succeed");
2361 let fetched = fetched.expect("row must be present");
2362 assert_eq!(fetched.handle_id, handle_id);
2363 assert_eq!(fetched.agent_id, agent);
2364 assert_eq!(fetched.subject, "user");
2365 assert_eq!(fetched.predicate, "city");
2366 assert_eq!(fetched.challenger_claim_ref, row.challenger_claim_ref);
2367 assert_eq!(fetched.incumbent_claim_ref, row.incumbent_claim_ref);
2368 assert_eq!(fetched.status, "pending");
2369 assert!(fetched.expires_at.is_none());
2370 }
2371
2372 #[test]
2374 fn w3_sqlite_pending_get_nonexistent_returns_none() {
2375 let store = make_store();
2376 let pending = store.pending_store();
2377 let result = pending.get_pending(Uuid::new_v4()).expect("get_pending must not error");
2378 assert!(result.is_none(), "unknown handle_id must return None");
2379 }
2380
2381 #[test]
2383 fn w3_sqlite_pending_list_pending_by_agent() {
2384 let store = make_store();
2385 let pending = store.pending_store();
2386 let agent = make_agent();
2387 let agent2 = AgentId("other-agent".into());
2388
2389 let row1 = make_pending_row(&agent);
2390 let row2 = make_pending_row(&agent);
2391 let row3 = make_pending_row(&agent2);
2392
2393 pending.insert_pending(&row1).unwrap();
2394 pending.insert_pending(&row2).unwrap();
2395 pending.insert_pending(&row3).unwrap();
2396
2397 let agent_rows = pending.list_pending(Some(&agent)).unwrap();
2398 assert_eq!(agent_rows.len(), 2, "must return exactly 2 rows for agent");
2399
2400 let all_rows = pending.list_pending(None).unwrap();
2401 assert_eq!(all_rows.len(), 3, "list_pending(None) must return all 3 rows");
2402 }
2403
2404 #[test]
2406 fn w3_sqlite_pending_mark_resolved() {
2407 let store = make_store();
2408 let pending = store.pending_store();
2409 let agent = make_agent();
2410 let row = make_pending_row(&agent);
2411 let handle_id = row.handle_id;
2412
2413 pending.insert_pending(&row).unwrap();
2414 pending.mark_resolved(handle_id).unwrap();
2415
2416 let fetched = pending.get_pending(handle_id).unwrap().unwrap();
2418 assert_eq!(fetched.status, "resolved", "status must be 'resolved' after mark_resolved");
2419
2420 let pending_rows = pending.list_pending(Some(&agent)).unwrap();
2422 assert!(pending_rows.is_empty(), "resolved row must not appear in list_pending");
2423 }
2424
2425 #[test]
2431 fn w3_sqlite_pending_durability_shared_arc() {
2432 let conn = open_in_memory().expect("in-memory connection must open");
2433 let persistence = SqlitePersistenceStore::new(conn);
2434 let pending = persistence.pending_store();
2435 let agent = make_agent();
2436 let row = make_pending_row(&agent);
2437 let handle_id = row.handle_id;
2438
2439 pending.insert_pending(&row).unwrap();
2440 drop(pending); let pending2 = persistence.pending_store();
2444 let fetched = pending2.get_pending(handle_id).unwrap();
2445 assert!(fetched.is_some(), "pending row must survive store handle drop (durability via shared Arc)");
2446 assert_eq!(fetched.unwrap().handle_id, handle_id);
2447 }
2448}