pub struct PostgresStoreBackend { /* private fields */ }
A Postgres-backed axonstore. Holds one lazy, bounded PgPool.
Cheap to Clone (the pool is internally reference-counted).
Implementations
impl PostgresStoreBackend
pub fn connect(connection: &str) -> Result<Self, StoreError>
Resolve connection and build a lazy, bounded connection pool.
Equivalent to connect_named with no
store name — the connection’s application_name is the bare
axon-store. Prefer connect_named so each session is
attributable to its declaring axonstore.
pub fn connect_named(connection: &str, store_name: &str) -> Result<Self, StoreError>
Resolve connection and build a lazy, bounded connection pool,
stamping each connection’s application_name with store_name.
Synchronous and cheap: the DSN is parsed into a
PgConnectOptions (a malformed DSN is a typed
StoreError::PoolInit) but connect_lazy_with opens no
connection — the first real connection is made on the first
operation (D7 — lazy).
Two production-grade properties are set on every connection:
- statement_cache_capacity(0) (Gap 3): disables sqlx’s named server-side prepared-statement cache so the backend is safe behind a transaction-mode pooler (PgBouncer pool_mode=transaction, Supabase Supavisor on port 6543, Neon, RDS Proxy), where a cached name minted on one physical session collides on the next (prepared statement "sqlx_s_1" already exists). Applied unconditionally: harmless on a direct connection, and there is no knob to misconfigure.
- application_name: axon-store/<store_name> (bare axon-store when store_name is empty), capped at the Postgres 63-byte NAMEDATALEN-1 limit on a char boundary, so every axon-owned session is identifiable in pg_stat_activity, pooler logs and DBA dashboards.
Must be called within a Tokio runtime context: a well-formed DSN registers a background connection reaper. In production this is always satisfied — the registry (35.d) is built while the axum server’s runtime is live.
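The application_name rule above (prefix, bare fallback, 63-byte cap on a char boundary) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's actual code: the function name application_name and the constant MAX_APP_NAME_BYTES are hypothetical.

```rust
/// Postgres truncates identifiers to NAMEDATALEN - 1 = 63 bytes.
const MAX_APP_NAME_BYTES: usize = 63;

/// Hypothetical helper: builds `axon-store/<store_name>`, or the bare
/// `axon-store` for an empty name, then caps the result at 63 bytes
/// without splitting a multi-byte UTF-8 character.
fn application_name(store_name: &str) -> String {
    let mut name = if store_name.is_empty() {
        String::from("axon-store")
    } else {
        format!("axon-store/{store_name}")
    };
    if name.len() > MAX_APP_NAME_BYTES {
        // Walk back from the byte limit to the nearest char boundary;
        // index 0 is always a boundary, so the loop terminates.
        let mut end = MAX_APP_NAME_BYTES;
        while !name.is_char_boundary(end) {
            end -= 1;
        }
        name.truncate(end);
    }
    name
}
```

With a store name made of 3-byte characters the cap lands at 62 bytes rather than 63, because cutting at byte 63 would split a character.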
pub fn connect_named_with_namespace(
    connection: &str,
    store_name: &str,
    namespace: Option<&str>,
) -> Result<Self, StoreError>
§Fase 38.f (D3) — same as Self::connect_named but stamps an
OPTIONAL per-tenant schema namespace into application_name.
connect_named_with_namespace("env:DB", "claims", Some("tenant_42"))
produces a pool whose every session’s application_name reads
axon-store/claims/tenant_42 — so a DBA reading
pg_stat_activity, pooler logs, or RDS Performance Insights
sees both the axonstore declaration AND the resolved tenant.
None for namespace is the pre-38 shape (axon-store/<store>,
byte-identical to connect_named).
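A minimal sketch of the naming shape described here, assuming a hypothetical free function application_name_with_namespace. The 63-byte cap and the empty-store-name case are deliberately left out, and how the real method treats an empty namespace string is not stated in the source.

```rust
/// Hypothetical helper mirroring the documented naming shape only.
/// The length cap and empty-name handling are omitted for brevity.
fn application_name_with_namespace(store_name: &str, namespace: Option<&str>) -> String {
    match namespace {
        // Tenant-aware shape: axon-store/<store>/<namespace>.
        Some(ns) => format!("axon-store/{store_name}/{ns}"),
        // Pre-38 shape: axon-store/<store>.
        None => format!("axon-store/{store_name}"),
    }
}
```

Calling it with ("claims", Some("tenant_42")) yields axon-store/claims/tenant_42, and with ("claims", None) it yields axon-store/claims.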
pub fn masked_dsn(&self) -> String
The resolved DSN with its password masked — safe to log.
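One way such masking could work for a URL-style DSN is sketched below. The function mask_password and the `***` placeholder are assumptions for illustration; the source does not say how masked_dsn is implemented or what the mask looks like. The sketch also assumes that special characters in the password are percent-encoded, as URL syntax requires.

```rust
/// Hypothetical helper: replaces the password in a URL-style DSN with `***`.
/// DSNs without a password, or not in `scheme://` form, are returned unchanged.
fn mask_password(dsn: &str) -> String {
    // Skip past the scheme separator, e.g. "postgres://".
    let Some(scheme_end) = dsn.find("://").map(|i| i + 3) else {
        return dsn.to_string();
    };
    let rest = &dsn[scheme_end..];
    // The authority ends at the first '/', '?' or '#'.
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // Userinfo is everything before the last '@' in the authority.
    let Some(at) = authority.rfind('@') else {
        return dsn.to_string();
    };
    let userinfo = &authority[..at];
    // The password starts after the first ':' in the userinfo.
    let Some(colon) = userinfo.find(':') else {
        return dsn.to_string();
    };
    // Keep scheme and user, mask the password, keep '@host...' as-is.
    format!("{}{}:***{}", &dsn[..scheme_end], &userinfo[..colon], &rest[at..])
}
```

Only the password segment changes; host, port, database and query parameters pass through untouched, which keeps the logged value useful for diagnosing connection problems.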
pub async fn acquire_pin(&self) -> Result<PoolConnection<Postgres>, StoreError>
§Fase 37.x.j (D1) — Acquire ONE physical Postgres connection
from the pool to be held for the duration of a flow execution
(crate::runner::ExecContext for the sync path,
crate::flow_dispatcher::DispatchCtx for the async streaming
path).
The returned sqlx::pool::PoolConnection is wrapped by the
caller in crate::store::store_conn::StoreConn::Pinned and
passed to every operation (query / insert / mutate /
purge / ping) against this axonstore for the flow lifetime.
Because every op runs against the same physical Postgres backend
connection, a transaction-mode pooler (Supabase Supavisor,
PgBouncer, Neon, RDS Proxy) cannot swap the backend between
queries — the D3 “unnamed prepared statement does not exist”
race that Fase 37.x.j closes.
The connection is released back to the pool on Drop of the
returned PoolConnection. The existing
after_release(DEALLOCATE ALL) hook (Fase 38.x.a D2) wipes any
prepared statements before the conn is reused — composing
cleanly with the per-flow pinning of 37.x.j.
Failure modes:
- StoreError::Connect if the pool’s acquire_timeout elapses (no conn becomes available: pool exhausted or Postgres unreachable).
- StoreError::Connect if the pool is in a permanently-bad state (TLS handshake failure, DNS resolution failure, etc.).
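The pin-then-release-on-Drop lifecycle can be illustrated with a toy, synchronous pool built only on the standard library. Everything here (ToyPool, Conn, Pinned, run) is hypothetical and stands in for the sqlx pool and PoolConnection; the real method is async and reports exhaustion as StoreError::Connect after a timeout rather than returning None.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a physical backend connection (hypothetical).
struct Conn {
    id: u32,
    prepared: Vec<String>,
}

/// Toy pool: a shared stack of idle connections.
#[derive(Clone)]
struct ToyPool {
    idle: Arc<Mutex<Vec<Conn>>>,
}

/// Guard holding one connection for a whole "flow".
struct Pinned {
    conn: Option<Conn>,
    pool: ToyPool,
}

impl ToyPool {
    fn new(size: u32) -> Self {
        let idle = (0..size).map(|id| Conn { id, prepared: Vec::new() }).collect();
        ToyPool { idle: Arc::new(Mutex::new(idle)) }
    }

    /// Returns None when no idle connection is left.
    fn acquire_pin(&self) -> Option<Pinned> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(Pinned { conn: Some(conn), pool: self.clone() })
    }

    fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Pinned {
    /// Every operation in the flow runs on the same connection id.
    fn run(&mut self, sql: &str) -> u32 {
        let conn = self.conn.as_mut().expect("connection present until drop");
        conn.prepared.push(sql.to_string());
        conn.id
    }
}

impl Drop for Pinned {
    fn drop(&mut self) {
        if let Some(mut conn) = self.conn.take() {
            // Mirrors the documented after-release cleanup: forget
            // prepared statements before the connection is reused.
            conn.prepared.clear();
            self.pool.idle.lock().unwrap().push(conn);
        }
    }
}
```

Holding the guard for the whole flow is what guarantees that consecutive operations see the same backend; dropping it is the only step needed to hand the connection back.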
pub async fn query(
    &self,
    conn: &mut StoreConn<'_>,
    table: &str,
    where_expr: &str,
    bindings: &HashMap<String, String>,
) -> Result<Vec<StoreRow>, StoreError>
retrieve — run SELECT * FROM "schema"."table" WHERE … and map
every row to a JSON-safe StoreRow.
§Fase 37.x.d (D3) — on a cache MISS the schema introspection and
the SELECT execute inside ONE transaction, so a
transaction-mode pooler pins one physical backend for both —
they cannot split across sessions. A cache HIT needs no
transaction: the cached resolution is already correct and the
SELECT is schema-qualified, so it resolves on any session.
v1.30.0 materializes the full result (fetch_all); 35.i adds the
backpressured Stream<Row> variant (Pillar III).
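The cache MISS versus HIT behaviour can be sketched as a function that returns the statements a retrieve would send. This is a simplified model, not the backend's code: SchemaCache, plan_select and the introspect callback are hypothetical, the WHERE clause and bindings are omitted, and the introspection query is shown only as a placeholder comment line.

```rust
use std::collections::HashMap;

/// Hypothetical cache from a bare table name to its resolved schema.
struct SchemaCache {
    resolved: HashMap<String, String>,
}

impl SchemaCache {
    /// Returns the statements a retrieve would send, in order.
    /// `introspect` stands in for the real schema lookup.
    fn plan_select(&mut self, table: &str, introspect: impl Fn(&str) -> String) -> Vec<String> {
        if let Some(schema) = self.resolved.get(table) {
            // HIT: schema-qualified SELECT, no transaction needed.
            return vec![format!("SELECT * FROM \"{schema}\".\"{table}\"")];
        }
        // MISS: introspection and SELECT share one transaction, so a
        // transaction-mode pooler keeps them on one physical backend.
        let schema = introspect(table);
        self.resolved.insert(table.to_string(), schema.clone());
        vec![
            "BEGIN".to_string(),
            "-- schema introspection".to_string(),
            format!("SELECT * FROM \"{schema}\".\"{table}\""),
            "COMMIT".to_string(),
        ]
    }
}
```

The first call for a table produces the four-step transactional plan and fills the cache; later calls for the same table produce the single schema-qualified SELECT.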
pub async fn insert(
    &self,
    conn: &mut StoreConn<'_>,
    table: &str,
    data: &[(String, SqlValue)],
) -> Result<u64, StoreError>
persist — run INSERT INTO "schema"."table" (…) VALUES (…).
Returns the number of rows inserted. §Fase 37.x.d (D3) — on a
cache MISS the resolution + the INSERT execute in ONE
transaction; a cache HIT needs no transaction.
pub async fn mutate(
    &self,
    conn: &mut StoreConn<'_>,
    table: &str,
    where_expr: &str,
    data: &[(String, SqlValue)],
    bindings: &HashMap<String, String>,
) -> Result<u64, StoreError>
mutate — run UPDATE "schema"."table" SET … WHERE …. Returns
the number of rows affected. §Fase 37.x.d (D3) — on a cache MISS
the resolution + the UPDATE execute in ONE transaction; a
cache HIT needs no transaction.
pub async fn purge(
    &self,
    conn: &mut StoreConn<'_>,
    table: &str,
    where_expr: &str,
    bindings: &HashMap<String, String>,
) -> Result<u64, StoreError>
purge — run DELETE FROM "schema"."table" WHERE …. Returns the
number of rows deleted. §Fase 37.x.d (D3) — on a cache MISS the
resolution + the DELETE execute in ONE transaction; a cache
HIT needs no transaction.
pub async fn ping(&self) -> Result<(), StoreError>
Verify database reachability with SELECT 1.
Trait Implementations
impl Clone for PostgresStoreBackend
fn clone(&self) -> PostgresStoreBackend
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations
impl !RefUnwindSafe for PostgresStoreBackend
impl !UnwindSafe for PostgresStoreBackend
impl Freeze for PostgresStoreBackend
impl Send for PostgresStoreBackend
impl Sync for PostgresStoreBackend
impl Unpin for PostgresStoreBackend
impl UnsafeUnpin for PostgresStoreBackend
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.