pub struct PostgresPersistenceStore { /* private fields */ }Expand description
The PostgreSQL-backed PersistencePort implementation.
Construct via PostgresPersistenceStore::new.
Clone-friendly: the inner r2d2::Pool is Arc-wrapped by r2d2.
Implementations§
Source§impl PostgresPersistenceStore
impl PostgresPersistenceStore
Sourcepub fn new(connection_string: &str) -> Result<Self, PostgresStoreError>
pub fn new(connection_string: &str) -> Result<Self, PostgresStoreError>
Bootstrap entry point: run refinery migrations on a dedicated connection, then build the r2d2 connection pool.
§Errors
Returns PostgresStoreError if the connection string is invalid, the DB
is unreachable, migrations fail, or the pool cannot be built.
Source§impl PostgresPersistenceStore
impl PostgresPersistenceStore
Sourcepub fn pending_store(&self) -> PostgresPendingStore
pub fn pending_store(&self) -> PostgresPendingStore
Return a PostgresPendingStore that shares the same r2d2 connection pool.
Both PostgresPersistenceStore and PostgresPendingStore acquire connections
from the same pool. The per-agent write lock held by EngineHandle ensures that
the pending insert is serialized with the claim transaction commit.
Trait Implementations§
Source§impl PersistencePort for PostgresPersistenceStore
impl PersistencePort for PostgresPersistenceStore
Source§fn begin_atomic(
&self,
agent_id: &AgentId,
) -> Result<PostgresTxn, PostgresStoreError>
fn begin_atomic( &self, agent_id: &AgentId, ) -> Result<PostgresTxn, PostgresStoreError>
Open an explicit BEGIN transaction scoped to agent_id.
Acquires a connection from the r2d2 pool, issues BEGIN, then acquires the
per-agent_id advisory lock: SELECT pg_advisory_xact_lock(hashtext($1)::bigint).
Source§fn commit(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError>
fn commit(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError>
Commit the transaction. The pooled connection returns to the r2d2 pool.
Source§fn rollback(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError>
fn rollback(&self, txn: PostgresTxn) -> Result<(), PostgresStoreError>
Rollback the transaction. The pooled connection returns to the r2d2 pool.
Source§fn append_claim(
&self,
txn: &mut PostgresTxn,
claim: &Claim,
) -> Result<ClaimRef, PostgresStoreError>
fn append_claim( &self, txn: &mut PostgresTxn, claim: &Claim, ) -> Result<ClaimRef, PostgresStoreError>
Append a claim row within the open transaction.
value and metadata are cast to JSONB via $n::jsonb SQL cast (§2 JSONB note).
Source§fn append_validity_assertion(
&self,
txn: &mut PostgresTxn,
assertion: &ValidityAssertion,
) -> Result<(), PostgresStoreError>
fn append_validity_assertion( &self, txn: &mut PostgresTxn, assertion: &ValidityAssertion, ) -> Result<(), PostgresStoreError>
Append a validity assertion row within the open transaction.
Source§fn append_ledger_entry(
&self,
txn: &mut PostgresTxn,
entry: &LedgerEntry,
) -> Result<(), PostgresStoreError>
fn append_ledger_entry( &self, txn: &mut PostgresTxn, entry: &LedgerEntry, ) -> Result<(), PostgresStoreError>
Append a ledger entry row within the open transaction.
stream_seq is assigned via:
SELECT COALESCE(MAX(stream_seq), 0) + 1 FROM ledger_entries WHERE agent_id = $1
within the same transaction, under the per-agent advisory lock.
INVARIANT: this MAX+1 assignment is safe ONLY under pg_advisory_xact_lock.
If the advisory lock is ever removed, replace with a Postgres SEQUENCE object.
Source§fn append_claim_edge(
&self,
txn: &mut PostgresTxn,
edge: &ClaimEdge,
) -> Result<(), PostgresStoreError>
fn append_claim_edge( &self, txn: &mut PostgresTxn, edge: &ClaimEdge, ) -> Result<(), PostgresStoreError>
Append a claim edge row within the open transaction.
Source§fn load_subject_line(
&self,
agent_id: &AgentId,
subject: &str,
predicate: &str,
) -> Result<Vec<Claim>, PostgresStoreError>
fn load_subject_line( &self, agent_id: &AgentId, subject: &str, predicate: &str, ) -> Result<Vec<Claim>, PostgresStoreError>
Load all claims on (agent_id, subject, predicate), ordered by tx_time ASC.
Source§fn load_claim(
&self,
agent_id: &AgentId,
claim_ref: &ClaimRef,
) -> Result<Option<Claim>, PostgresStoreError>
fn load_claim( &self, agent_id: &AgentId, claim_ref: &ClaimRef, ) -> Result<Option<Claim>, PostgresStoreError>
Load a single claim by ClaimRef. Returns None if not found.
Source§fn load_validity_assertions_for(
&self,
agent_id: &AgentId,
claim_ref: &ClaimRef,
) -> Result<Vec<ValidityAssertion>, PostgresStoreError>
fn load_validity_assertions_for( &self, agent_id: &AgentId, claim_ref: &ClaimRef, ) -> Result<Vec<ValidityAssertion>, PostgresStoreError>
Load all validity assertions targeting a claim, ordered by asserted_at ASC.
Source§fn load_ledger(
&self,
agent_id: &AgentId,
from: Option<&TransactionTime>,
limit: usize,
) -> Result<Vec<LedgerEntry>, PostgresStoreError>
fn load_ledger( &self, agent_id: &AgentId, from: Option<&TransactionTime>, limit: usize, ) -> Result<Vec<LedgerEntry>, PostgresStoreError>
Load ledger entries for an agent, optionally starting from from (inclusive),
limited to limit rows, ordered by recorded_at ASC.
Source§fn load_ledger_for_claims(
&self,
agent_id: &AgentId,
claim_refs: &[ClaimRef],
) -> Result<Vec<LedgerEntry>, PostgresStoreError>
fn load_ledger_for_claims( &self, agent_id: &AgentId, claim_refs: &[ClaimRef], ) -> Result<Vec<LedgerEntry>, PostgresStoreError>
Load ALL ledger entries for the given claim refs, no row cap.
Uses claim_id = ANY($2::text[]) to avoid per-parameter binding limits.
Source§fn load_edges_for(
&self,
agent_id: &AgentId,
claim_ref: &ClaimRef,
) -> Result<Vec<ClaimEdge>, PostgresStoreError>
fn load_edges_for( &self, agent_id: &AgentId, claim_ref: &ClaimRef, ) -> Result<Vec<ClaimEdge>, PostgresStoreError>
Load all edges where claim_ref is either the from or to end, ordered by created_at ASC.
Source§fn load_injected_claims(
&self,
agent_id: &AgentId,
) -> Result<Vec<ClaimRef>, PostgresStoreError>
fn load_injected_claims( &self, agent_id: &AgentId, ) -> Result<Vec<ClaimRef>, PostgresStoreError>
Load the set of ClaimRefs served as injected claims for this agent (used by the Amplification Guard).
Source§fn load_lineage(
&self,
agent_id: &AgentId,
claim_ref: &ClaimRef,
) -> Result<Vec<ClaimEdge>, PostgresStoreError>
fn load_lineage( &self, agent_id: &AgentId, claim_ref: &ClaimRef, ) -> Result<Vec<ClaimEdge>, PostgresStoreError>
Recursive CTE lineage traversal — identical SQL to the SQLite adapter.
Traverses DerivedFrom edges upward from claim_ref, returning all ClaimEdge
rows in the lineage sub-graph ordered by depth ASC, then created_at ASC within depth.
Bounded at depth 64 to prevent runaway on pathological graphs.
Source§fn requires_global_write_serialization(&self) -> bool
fn requires_global_write_serialization(&self) -> bool
Postgres uses a pool + per-agent advisory lock — no global write lock is needed.