axon/store/postgres_backend.rs
1//! §Fase 35.c (v1.30.0) — `PostgresStoreBackend`, the SQL substrate of
2//! the `axonstore` cognitive data plane.
3//!
4//! This module makes `axonstore { backend: postgresql }` real: the four
5//! store operations — `query` (retrieve), `insert` (persist), `mutate`,
6//! `purge` — execute parameterized SQL against a `sqlx::PgPool` instead
7//! of the key-value path. It is the substrate the four pillars (35.g-j)
8//! enrich.
9//!
10//! # D6 — connection resolution
11//!
12//! [`resolve_dsn`] honors `connection: "env:VAR"` (resolve the named
13//! environment variable) and a literal DSN. A missing env var is a
14//! named [`StoreError::MissingEnvVar`] — never a panic, never a silent
15//! fallback to the key-value store.
16//!
17//! # D7 — pooling + honest typed failure surface
18//!
19//! [`PostgresStoreBackend::connect`] builds ONE lazy, bounded
20//! `sqlx::PgPool` (`connect_lazy_with` — no connection is opened until
21//! the first operation). Every failure path — empty connection, missing
22//! env var, malformed DSN, connect failure, SQL error, an unsupported
23//! column type, a decode failure — surfaces as a typed [`StoreError`].
24//! No panic; no silent empty result masking a failed query.
25//!
26//! # Gap 3 (v1.36.3) — transaction-mode pooler safety
27//!
28//! The pool's `PgConnectOptions` set `statement_cache_capacity(0)`
29//! unconditionally. sqlx otherwise caches server-side prepared
30//! statements under generated names (`sqlx_s_1`, …); behind a
31//! transaction-mode pooler — PgBouncer `pool_mode=transaction`,
32//! Supabase Supavisor (`:6543`), Neon, RDS Proxy — successive
33//! operations land on different physical sessions, so a name minted on
34//! one collides on the next (`prepared statement "sqlx_s_1" already
35//! exists`). Capacity 0 routes every query through the *unnamed*
36//! prepared statement — collision-free by construction, harmless on a
37//! direct/session-mode connection. An axonstore DSN is pooler-agnostic
38//! with no knob to misconfigure. Each connection also carries an
39//! `application_name` of `axon-store/<store>` so every session is
40//! attributable to its declaration in `pg_stat_activity`, pooler logs
41//! and DBA dashboards.
42//!
43//! # §Fase 37.x.b — search-path-independent table resolution
44//!
45//! [`introspect_conn`] resolves a store table to
46//! its schema + column types against `pg_catalog` — NOT via the
47//! ambient `search_path`, which a transaction-mode pooler does not
48//! preserve across checkouts. `to_regclass` is the search-path-correct
49//! primary; a `pg_catalog` scan keyed on `relname` is the
50//! search-path-independent fallback. An unresolved or cross-schema-
51//! ambiguous table is a typed [`StoreError`].
52//!
53//! # D4 — injection-proof, identifiers included
54//!
55//! Values flow through 35.b's [`build_pg_where`] as `$N` bind
56//! placeholders. The *identifier* surface — table names and
57//! `insert`/`mutate` column names, which ARE interpolated into SQL
58//! text — is validated against [`filter::is_safe_identifier`]
59//! (`[A-Za-z_]\w*`, ≤ 63 bytes) before being double-quoted. No
60//! untrusted identifier reaches SQL.
61//!
62//! # Architecture — pure builders + thin async execution
63//!
64//! SQL construction ([`build_select_sql`], [`build_insert_sql`],
65//! [`build_update_sql`], [`build_delete_sql`]) is **pure and total** —
66//! no I/O — and therefore exhaustively unit-tested here without a
67//! database. The async methods are thin: build → bind → execute. The
68//! row-decode path and live execution are proven against a real
69//! Postgres in 35.l (the integration harness).
70//!
71//! # Honest scope (D12)
72//!
73//! No DDL: `IRAxonStore` carries no column schema, so v1.30.0 operates
74//! against existing tables (no `CREATE TABLE` / `migrate` / index). Each
75//! operation is a single-statement autocommit; the multi-statement
76//! `transact { … }` block is a documented future fase. The supported
77//! column-type catalog is [`classify_pg_type`]; a column outside it is
78//! a clear [`StoreError::UnsupportedColumnType`], not a silent miss.
79
80use std::fmt;
81use std::str::FromStr;
82use std::time::Duration;
83
84use serde_json::Value as JsonValue;
85use sqlx::postgres::{PgArguments, PgConnectOptions, PgPoolOptions, PgRow};
86use sqlx::query::Query;
87use sqlx::{Column, PgConnection, PgPool, Postgres, Row, TypeInfo};
88
89use crate::store::epistemic::EpistemicError;
90use crate::store::filter::{self, build_pg_where, FilterError, SqlValue};
91
92/// Upper bound on pooled connections per backend (D7 — bounded).
93const MAX_POOL_CONNECTIONS: u32 = 10;
94/// How long to wait to acquire a pooled connection before failing.
95const ACQUIRE_TIMEOUT_SECS: u64 = 5;
96/// How long an idle pooled connection is kept before being reaped.
97const IDLE_TIMEOUT_SECS: u64 = 300;
98
99// ════════════════════════════════════════════════════════════════════
100// Error catalog (typed, total — D7)
101// ════════════════════════════════════════════════════════════════════
102
103/// Every way an `axonstore` SQL operation can fail. The backend is
104/// total: it returns one of these or a result — never a panic, never a
105/// silent empty result masking a failure.
106#[derive(Debug, Clone, PartialEq)]
107pub enum StoreError {
108 /// `connection` was empty or whitespace-only.
109 EmptyConnection,
110 /// `connection` was the bare prefix `env:` with no variable name.
111 EmptyEnvVarName,
112 /// `connection: "env:VAR"` and `VAR` is unset (or not UTF-8).
113 MissingEnvVar { var: String },
114 /// The resolved DSN is malformed — `connect_lazy` rejected it.
115 PoolInit { dsn_masked: String, source: String },
116 /// A table or column identifier failed the `[A-Za-z_]\w*` / 63-byte
117 /// safety check (D4 — no untrusted identifier reaches SQL).
118 InvalidIdentifier { kind: &'static str, name: String },
119 /// `insert` / `mutate` was called with no column data.
120 EmptyData { op: &'static str },
121 /// The `where` expression did not compile (delegates to 35.b).
122 Filter(FilterError),
123 /// A `confidence_floor` violation — a sub-floor or un-elevated
124 /// `persist` (delegates to 35.g's Pillar I epistemic data plane).
125 Epistemic(EpistemicError),
126 /// A live connection could not be acquired / the ping failed.
127 Connect { source: String },
128 /// A SQL statement failed at execution time.
129 Query { op: &'static str, source: String },
130 /// A retrieved column has a type outside the supported catalog
131 /// ([`classify_pg_type`]). Honest scope, not a silent miss.
132 UnsupportedColumnType { column: String, pg_type: String },
133 /// A retrieved column of a supported type failed to decode.
134 Decode { column: String, pg_type: String, source: String },
135 /// §Fase 37.x.b (D1) — the table named by a store operation could
136 /// not be resolved to a relation in ANY schema of the database.
137 TableNotResolved { table: String },
138 /// §Fase 37.x.b (D1) — the table name resolves to a relation in
139 /// more than one schema and the connection's `search_path` does not
140 /// disambiguate it. Carries the schemas found, sorted.
141 AmbiguousTable { table: String, schemas: Vec<String> },
142 /// §Fase 37.x.f (D9) — a store SQL statement failed with a
143 /// schema-drift SQLSTATE: the cached schema no longer matches the
144 /// live table (an `ALTER TABLE` ran since the cache was populated).
145 /// `42P01` undefined_table, `42703` undefined_column, `42804`
146 /// datatype_mismatch (a stale write cast), `42883` undefined
147 /// operator (a stale read cast). Triggers the D9 self-heal — the
148 /// `(dsn, table)` cache entry is evicted and the operation retried
149 /// once against fresh introspection. Safe: every one is a
150 /// parse/plan-time rejection, so the failed statement had ZERO side
151 /// effects (a retried `persist`/`mutate` cannot double-write).
152 SchemaDrift { op: &'static str, sqlstate: String, source: String },
153 /// §Fase 38.f (D3) — `axon-T806`. A `postgresql` store declared
154 /// `schema: env:VAR` and the named env var is unset at deploy
155 /// time. Never falls back silently — the deploy fails, the
156 /// operator either exports the var or fixes the declaration.
157 MissingPerTenantSchemaEnv { store: String, var: String },
158 /// §Fase 38.f (D8 strengthening) — `axon-T807`. A declared column
159 /// schema and the live introspected columns disagree at deploy
160 /// time. Carries a human-readable drift summary (which columns
161 /// are missing on the live DB, which have a type mismatch). The
162 /// remedy is named in the message: run `axon store introspect
163 /// <store>` to refresh the manifest, run the missing migration,
164 /// or fix the declaration.
165 DeclaredVsLiveDrift { store: String, drift: String },
166}
167
168impl fmt::Display for StoreError {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 match self {
171 StoreError::EmptyConnection => write!(
172 f,
173 "axonstore `connection` is empty — expected a DSN or an \
174 `env:VARNAME` reference"
175 ),
176 StoreError::EmptyEnvVarName => write!(
177 f,
178 "axonstore `connection` is the bare prefix `env:` with no \
179 variable name"
180 ),
181 StoreError::MissingEnvVar { var } => write!(
182 f,
183 "axonstore `connection: \"env:{var}\"` — environment \
184 variable `{var}` is not set (or not valid UTF-8)"
185 ),
186 StoreError::PoolInit { dsn_masked, source } => write!(
187 f,
188 "axonstore connection pool could not be initialised for \
189 `{dsn_masked}`: {source}"
190 ),
191 StoreError::InvalidIdentifier { kind, name } => write!(
192 f,
193 "unsafe {kind} identifier `{name}` — must match \
194 [A-Za-z_][A-Za-z0-9_]* and be ≤ 63 bytes"
195 ),
196 StoreError::EmptyData { op } => write!(
197 f,
198 "axonstore `{op}` was given no column data"
199 ),
200 StoreError::Filter(e) => write!(f, "where-expression: {e}"),
201 StoreError::Epistemic(e) => write!(f, "{e}"),
202 StoreError::Connect { source } => {
203 write!(f, "axonstore could not reach the database: {source}")
204 }
205 StoreError::Query { op, source } => {
206 write!(f, "axonstore `{op}` SQL failed: {source}")
207 }
208 StoreError::UnsupportedColumnType { column, pg_type } => write!(
209 f,
210 "column `{column}` has Postgres type `{pg_type}`, outside \
211 the v1.30.0 supported catalog"
212 ),
213 StoreError::Decode { column, pg_type, source } => write!(
214 f,
215 "column `{column}` (`{pg_type}`) failed to decode: {source}"
216 ),
217 StoreError::TableNotResolved { table } => write!(
218 f,
219 "axonstore could not resolve table `{table}` to a \
220 relation in any schema of the database — verify the \
221 table exists in the target database (a deploy-time \
222 migration is the usual remedy) and that the configured \
223 credentials can SELECT from it; the introspection scans \
224 `pg_catalog` independent of `search_path`, so the table \
225 is genuinely absent on every schema this role can see"
226 ),
227 StoreError::AmbiguousTable { table, schemas } => write!(
228 f,
229 "axonstore table `{table}` is ambiguous — it exists in \
230 {} schemas ({}) and the connection's `search_path` does \
231 not disambiguate it; either narrow the role's \
232 `search_path` so exactly one of the resolving schemas \
233 is visible, or declare the target schema explicitly on \
234 the `axonstore` (the Fase 38 `schema:` declaration, \
235 incl. `schema: env:VAR` per-tenant)",
236 schemas.len(),
237 schemas.join(", "),
238 ),
239 StoreError::SchemaDrift { op, sqlstate, source } => write!(
240 f,
241 "axonstore `{op}` hit live schema drift (SQLSTATE \
242 {sqlstate}) — the cached schema is stale: {source}"
243 ),
244 StoreError::MissingPerTenantSchemaEnv { store, var } => write!(
245 f,
246 "axon-T806 axonstore `{store}` declares `schema: env:{var}` \
247 but environment variable `{var}` is not set at deploy \
248 time. The per-tenant schema namespace is required to \
249 resolve the store's column manifest entry. Either \
250 export `{var}` with the SQL schema name (e.g. \
251 `tenant_42`), or declare the schema differently \
252 (inline `schema {{ … }}` block, or manifest reference \
253 `schema: \"qualified.name\"`). Never a silent fallback."
254 ),
255 StoreError::DeclaredVsLiveDrift { store, drift } => write!(
256 f,
257 "axon-T807 axonstore `{store}` declared column schema \
258 disagrees with the live database: {drift}. The deploy \
259 fails fail-closed (D8 strengthening). Remedy: run `axon \
260 store introspect {store}` to refresh the manifest, run \
261 the missing migration on the database, or fix the \
262 declared `schema:` block to match the live shape."
263 ),
264 }
265 }
266}
267
268impl std::error::Error for StoreError {
269 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
270 match self {
271 StoreError::Filter(e) => Some(e),
272 StoreError::Epistemic(e) => Some(e),
273 _ => None,
274 }
275 }
276}
277
278impl StoreError {
279 /// §Fase 37.x.f (D9) — `true` iff this is a schema-drift failure
280 /// ([`StoreError::SchemaDrift`]) — the signal that triggers the
281 /// `(dsn, table)` cache self-heal (evict + retry once).
282 pub fn is_schema_drift(&self) -> bool {
283 matches!(self, StoreError::SchemaDrift { .. })
284 }
285}
286
287impl From<FilterError> for StoreError {
288 fn from(e: FilterError) -> Self {
289 StoreError::Filter(e)
290 }
291}
292
293impl From<EpistemicError> for StoreError {
294 fn from(e: EpistemicError) -> Self {
295 StoreError::Epistemic(e)
296 }
297}
298
299// ════════════════════════════════════════════════════════════════════
300// D6 — connection resolution
301// ════════════════════════════════════════════════════════════════════
302
303/// Resolve an `axonstore` `connection` string into a concrete DSN.
304///
305/// - `"env:VAR"` → the value of environment variable `VAR`.
306/// - any other non-empty value → a literal DSN, returned verbatim.
307///
308/// Leading/trailing whitespace is trimmed. An empty connection, a bare
309/// `env:`, or a missing environment variable is a typed [`StoreError`]
310/// — never a panic, never a silent fallback.
311pub fn resolve_dsn(connection: &str) -> Result<String, StoreError> {
312 let conn = connection.trim();
313 if conn.is_empty() {
314 return Err(StoreError::EmptyConnection);
315 }
316 match conn.strip_prefix("env:") {
317 Some(var) => {
318 let var = var.trim();
319 if var.is_empty() {
320 return Err(StoreError::EmptyEnvVarName);
321 }
322 std::env::var(var).map_err(|_| StoreError::MissingEnvVar {
323 var: var.to_string(),
324 })
325 }
326 None => Ok(conn.to_string()),
327 }
328}
329
330/// Mask the password segment of a DSN for safe logging / `Debug`.
331fn mask_dsn(dsn: &str) -> String {
332 if let Some(at) = dsn.find('@') {
333 if let Some(colon) = dsn[..at].rfind(':') {
334 return format!("{}***{}", &dsn[..=colon], &dsn[at..]);
335 }
336 }
337 dsn.to_string()
338}
339
340/// §Fase 38.h — public alias of [`mask_dsn`] so the introspection
341/// CLI (`store::introspect_cli`) can render error messages with
342/// masked credentials without re-implementing the routine.
343pub fn mask_dsn_pub(dsn: &str) -> String {
344 mask_dsn(dsn)
345}
346
347/// The `application_name` stamped on an axonstore's Postgres
348/// connections (Gap 3 bonus, v1.36.3).
349///
350/// `axon-store/<store_name>` makes every session attributable to its
351/// `axonstore` declaration in `pg_stat_activity`, pooler logs and DBA
352/// dashboards; a bare `axon-store` when no store name is available.
353///
354/// Total and bounded: Postgres caps `application_name` at
355/// `NAMEDATALEN - 1` (63 bytes) and *silently truncates* a longer
356/// value. This caps it ourselves on a UTF-8 char boundary so the
357/// stamped name is exactly what a DBA sees — never a server-mangled
358/// suffix.
359fn application_name_for(store_name: &str) -> String {
360 application_name_for_with_namespace(store_name, None)
361}
362
363/// §Fase 38.f (D3) — `application_name` stamping that optionally
364/// carries a resolved per-tenant schema namespace (Gap-3 inheritance):
365///
366/// * `application_name_for_with_namespace("claims", None)` →
367/// `"axon-store/claims"` (the existing v1.36.3 shape — preserved
368/// byte-for-byte for non-namespace stores).
369/// * `application_name_for_with_namespace("claims", Some("tenant_42"))`
370/// → `"axon-store/claims/tenant_42"`.
371///
372/// A DBA reading `pg_stat_activity` or pooler logs sees both the
373/// `axonstore` declaration AND the tenant namespace at a glance —
374/// triaging a multi-tenant slow query stops requiring a join through
375/// adopter telemetry.
376///
377/// Total + bounded: caps the result at `NAMEDATALEN - 1` (63 bytes)
378/// on a UTF-8 char boundary, as v1.36.3 already does, so the stamped
379/// name is exactly what Postgres records.
380pub(crate) fn application_name_for_with_namespace(
381 store_name: &str,
382 namespace: Option<&str>,
383) -> String {
384 const MAX: usize = 63;
385 let base = if store_name.is_empty() {
386 "axon-store".to_string()
387 } else {
388 format!("axon-store/{store_name}")
389 };
390 let full = match namespace {
391 Some(ns) if !ns.is_empty() => format!("{base}/{ns}"),
392 _ => base,
393 };
394 if full.len() <= MAX {
395 return full;
396 }
397 let mut cut = MAX;
398 while cut > 0 && !full.is_char_boundary(cut) {
399 cut -= 1;
400 }
401 full[..cut].to_string()
402}
403
404/// Validate a table / column identifier, mapping a failure to a typed
405/// [`StoreError::InvalidIdentifier`] (D4).
406fn check_identifier(name: &str, kind: &'static str) -> Result<(), StoreError> {
407 if filter::is_safe_identifier(name) {
408 Ok(())
409 } else {
410 Err(StoreError::InvalidIdentifier {
411 kind,
412 name: name.to_string(),
413 })
414 }
415}
416
417/// §Fase 37.x.c (D2) — render the SCHEMA-QUALIFIED relation reference
418/// for an operation's SQL: `"schema"."table"` when the schema resolved
419/// to a safe identifier, the bare `"table"` otherwise.
420///
421/// A schema-qualified reference resolves on ANY session regardless of
422/// the ambient `search_path` — the D2 guarantee. The schema name is
423/// discovered from `pg_catalog` (37.x.b's `resolve_table`); it is
424/// validated with [`filter::is_safe_identifier`] before being
425/// double-quoted (D4 — no untrusted identifier reaches SQL), exactly
426/// as the table name is. When the schema is absent (`None` — the
427/// resolution failed) or is not a safe identifier (an exotic quoted
428/// schema name `pg_catalog` could yield), the reference falls back to
429/// the bare `"table"` — never an unsafe splice, never a false error;
430/// `search_path` then resolves it as in the pre-37.x behaviour. The
431/// `table` is assumed already [`check_identifier`]-validated.
432fn qualified_relation(schema: Option<&str>, table: &str) -> String {
433 match schema {
434 Some(s) if filter::is_safe_identifier(s) => {
435 format!("\"{s}\".\"{table}\"")
436 }
437 _ => format!("\"{table}\""),
438 }
439}
440
441// ════════════════════════════════════════════════════════════════════
442// Pure SQL builders (no I/O — exhaustively unit-tested)
443// ════════════════════════════════════════════════════════════════════
444
445/// Build a parameterized `SELECT * FROM "schema"."table" WHERE …`
446/// statement.
447///
448/// §Fase 37.x.c (D2) — `schema` is the table's resolved schema (from
449/// [`introspect_conn`]); when `Some` and a safe
450/// identifier the relation is emitted SCHEMA-QUALIFIED so it resolves
451/// on any session regardless of the ambient `search_path`. `None`
452/// renders the bare `"table"` (the pre-37.x form — D5).
453/// §Fase 37.d (D3) — `bindings` resolves `${name}` placeholders in the
454/// `where` expression to `$N` bind parameters (never string-spliced).
455/// §v1.36.4 — `column_types` (the `column → udt_name` map) lets
456/// [`build_pg_where`] cast each `where`-clause value to its column's
457/// Postgres type. Pass an empty map when the schema is unknown — the
458/// filter then renders bare `$N` placeholders.
459pub fn build_select_sql(
460 table: &str,
461 schema: Option<&str>,
462 where_expr: &str,
463 bindings: &std::collections::HashMap<String, String>,
464 column_types: &std::collections::HashMap<String, String>,
465) -> Result<(String, Vec<SqlValue>), StoreError> {
466 check_identifier(table, "table")?;
467 let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
468 let relation = qualified_relation(schema, table);
469 Ok((format!("SELECT * FROM {relation} WHERE {clause}"), params))
470}
471
472/// Build a parameterized `DELETE FROM "schema"."table" WHERE …`
473/// statement.
474///
475/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation (see
476/// [`build_select_sql`]). §v1.36.4 — `column_types` drives the
477/// `where`-clause value cast.
478pub fn build_delete_sql(
479 table: &str,
480 schema: Option<&str>,
481 where_expr: &str,
482 bindings: &std::collections::HashMap<String, String>,
483 column_types: &std::collections::HashMap<String, String>,
484) -> Result<(String, Vec<SqlValue>), StoreError> {
485 check_identifier(table, "table")?;
486 let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
487 let relation = qualified_relation(schema, table);
488 Ok((format!("DELETE FROM {relation} WHERE {clause}"), params))
489}
490
491/// §Fase 37.x.b (D1) — a store table resolved against `pg_catalog`,
492/// independent of the ambient `search_path`. The product of
493/// [`introspect_conn`].
494#[derive(Debug, Clone)]
495pub(crate) struct ResolvedTable {
496 /// The schema the table resolves to (e.g. `public`). §37.x.c (D2)
497 /// emits the schema-qualified `"schema"."table"` so an operation
498 /// stops depending on the connection's `search_path`.
499 pub schema: String,
500 /// The `column → udt_name` map driving the `$N::<type>` cast on
501 /// both the write side (`build_insert_sql` / `build_update_sql`)
502 /// and the read side (`build_pg_where`).
503 pub column_types: std::collections::HashMap<String, String>,
504}
505
506/// §Fase 37.x.f (D9) — capacity bound on the schema cache. A many-
507/// table / many-DSN / multi-tenant adopter cannot grow it unbounded; at
508/// the bound the OLDEST entry (smallest insertion sequence) is evicted.
509/// 10k matches the idempotency / replay store bound.
510const SCHEMA_CACHE_CAPACITY: usize = 10_000;
511
512/// §Fase 37.x.f (D9) — the process-global schema cache:
513/// `(dsn, table) → ResolvedTable`, capacity-bounded + self-healing.
514///
515/// A table's schema + column types are stable for a process lifetime,
516/// so one resolution per `(connection, table)` suffices — but the table
517/// CAN drift (a live `ALTER TABLE`). D9 makes the cache self-heal: an
518/// operation that fails with a schema-drift SQLSTATE evicts the
519/// `(dsn, table)` entry ([`PostgresStoreBackend::evict_schema`]) and is
520/// retried once against fresh introspection. The cache is also
521/// capacity-bounded ([`SCHEMA_CACHE_CAPACITY`]) so it cannot grow
522/// without limit. Only a successfully-resolved, non-empty entry is
523/// cached (the §v1.36.5 don't-cache-failures rule, preserved).
524struct SchemaCache {
525 /// `(dsn, table)` → the resolution + its insertion sequence.
526 entries: std::collections::HashMap<
527 (String, String),
528 (std::sync::Arc<ResolvedTable>, u64),
529 >,
530 /// Monotonic insertion counter — drives oldest-first eviction.
531 next_seq: u64,
532 /// The capacity bound. A field (not a hard-coded constant) so the
533 /// eviction logic is unit-testable with a small bound.
534 capacity: usize,
535}
536
537impl SchemaCache {
538 fn new(capacity: usize) -> Self {
539 Self {
540 entries: std::collections::HashMap::new(),
541 next_seq: 0,
542 capacity,
543 }
544 }
545
546 /// The cached resolution for `key`, or `None` on a miss.
547 fn get(
548 &self,
549 key: &(String, String),
550 ) -> Option<std::sync::Arc<ResolvedTable>> {
551 self.entries.get(key).map(|(arc, _)| std::sync::Arc::clone(arc))
552 }
553
554 /// Insert (or refresh) a resolution. §D9 — at capacity the oldest
555 /// entry (smallest sequence) is evicted first; a linear scan,
556 /// acceptable at the 10k bound (the idempotency store's approach).
557 fn insert(
558 &mut self,
559 key: (String, String),
560 resolved: std::sync::Arc<ResolvedTable>,
561 ) {
562 if self.entries.len() >= self.capacity
563 && !self.entries.contains_key(&key)
564 {
565 // Linear scan for the smallest insertion sequence.
566 let oldest = self
567 .entries
568 .iter()
569 .min_by_key(|item| (item.1).1)
570 .map(|item| item.0.clone());
571 if let Some(oldest) = oldest {
572 self.entries.remove(&oldest);
573 }
574 }
575 let seq = self.next_seq;
576 self.next_seq = self.next_seq.wrapping_add(1);
577 self.entries.insert(key, (resolved, seq));
578 }
579
580 /// §D9 — drop `key` so the next operation re-introspects.
581 fn evict(&mut self, key: &(String, String)) {
582 self.entries.remove(key);
583 }
584}
585
586static SCHEMA_CACHE: std::sync::LazyLock<std::sync::Mutex<SchemaCache>> =
587 std::sync::LazyLock::new(|| {
588 std::sync::Mutex::new(SchemaCache::new(SCHEMA_CACHE_CAPACITY))
589 });
590
591/// §Fase 37.x.b (D1) — the pure resolution core: group a flat
592/// `(schema, column, udt)` introspection result by schema and decide.
593///
594/// - 0 schemas → [`StoreError::TableNotResolved`].
595/// - exactly 1 schema → `Ok((schema, column → udt map))`.
596/// - 2+ schemas → [`StoreError::AmbiguousTable`] (the schemas sorted).
597///
598/// Pure + total — exhaustively unit-tested without a database. Both the
599/// search-path-correct primary resolution and the search-path-
600/// independent `pg_catalog` fallback feed their rows through this one
601/// function, so the resolution verdict is computed identically.
602/// `pub` so 37.x.i's property/fuzz pack can drive it across arbitrary
603/// schema topologies — same exposure rationale as [`build_pg_where`] /
604/// [`build_select_sql`] / [`classify_pg_type`] (pure totals worth
605/// exhaustive external test).
606pub fn resolve_from_rows(
607 table: &str,
608 rows: Vec<(String, String, String)>,
609) -> Result<(String, std::collections::HashMap<String, String>), StoreError> {
610 let mut by_schema: std::collections::BTreeMap<
611 String,
612 std::collections::HashMap<String, String>,
613 > = std::collections::BTreeMap::new();
614 for (schema, column, udt) in rows {
615 by_schema.entry(schema).or_default().insert(column, udt);
616 }
617 match by_schema.len() {
618 0 => Err(StoreError::TableNotResolved {
619 table: table.to_string(),
620 }),
621 // A `BTreeMap` of length 1 — `into_iter().next()` is total.
622 1 => Ok(by_schema.into_iter().next().unwrap()),
623 // `BTreeMap` keys iterate sorted — a deterministic schema list.
624 _ => Err(StoreError::AmbiguousTable {
625 table: table.to_string(),
626 schemas: by_schema.into_keys().collect(),
627 }),
628 }
629}
630
631/// §Fase 37.x.b — decode a `pg_catalog` introspection result into the
632/// flat `(schema, column, udt)` triples [`resolve_from_rows`] groups. A
633/// row missing any field is skipped (defensive — the resolution
634/// queries always project all three).
635fn collect_triples(rows: &[PgRow]) -> Vec<(String, String, String)> {
636 let mut out = Vec::with_capacity(rows.len());
637 for row in rows {
638 let schema: String = row.try_get("schema_name").unwrap_or_default();
639 let column: String = row.try_get("column_name").unwrap_or_default();
640 let udt: String = row.try_get("type_name").unwrap_or_default();
641 if !schema.is_empty() && !column.is_empty() && !udt.is_empty() {
642 out.push((schema, column, udt));
643 }
644 }
645 out
646}
647
648/// §Fase 37.x.f (D9) — `true` iff `code` is a schema-drift SQLSTATE: a
649/// store SQL statement that fails with one has hit a STALE cache.
650///
651/// - `42P01` undefined_table — the table was dropped / renamed / had
652/// its schema changed since the resolution was cached.
653/// - `42703` undefined_column — a column was dropped / renamed.
654/// - `42804` datatype_mismatch — a stale WRITE cast (`$N::<old>` into
655/// a column whose type changed).
656/// - `42883` undefined_function — a stale READ cast (`"col" = $N::<old>`
657/// whose operator no longer exists, e.g. `text = uuid`).
658///
659/// Every one is a PARSE / PLAN-time rejection — the statement never
660/// executed, so the failed operation had ZERO side effects and the D9
661/// retry cannot double-write. `pub` so 37.x.i's property/fuzz pack
662/// drives it across all ASCII inputs (the closed-set membership test
663/// must be total + never panic).
664pub fn is_schema_drift_sqlstate(code: &str) -> bool {
665 matches!(code, "42P01" | "42703" | "42804" | "42883")
666}
667
668/// §Fase 37.x.f (D9) — classify a failed store SQL statement: a
669/// schema-drift SQLSTATE ([`is_schema_drift_sqlstate`]) yields
670/// [`StoreError::SchemaDrift`] (which triggers the cache self-heal);
671/// anything else is a plain [`StoreError::Query`]. `pub(crate)` so the
672/// `row_stream` cursor maps its errors through the same classifier.
673pub(crate) fn classify_sql_error(
674 op: &'static str,
675 err: sqlx::Error,
676) -> StoreError {
677 let sqlstate = err
678 .as_database_error()
679 .and_then(|db| db.code())
680 .map(|c| c.into_owned());
681 match sqlstate {
682 Some(code) if is_schema_drift_sqlstate(&code) => {
683 StoreError::SchemaDrift {
684 op,
685 sqlstate: code,
686 source: err.to_string(),
687 }
688 }
689 _ => StoreError::Query { op, source: err.to_string() },
690 }
691}
692
693/// §Fase 37.x.b/d (D1/D3) — the two-stage `pg_catalog` table resolution
694/// run on a CALLER-PROVIDED connection, so it shares the operation's
695/// transaction (D3 — one coherent introspect-and-operate session).
696///
697/// 1. **Primary — search-path-correct.** `to_regclass($1)` (the
698/// double-quoted table name) resolves the table exactly as an
699/// unqualified `SELECT * FROM "table"` would; the same query
700/// introspects its columns.
701/// 2. **Fallback — search-path-INDEPENDENT.** When `to_regclass`
702/// yields NULL (the table is not on this session's `search_path`),
703/// a `pg_class` + `pg_namespace` scan keyed on `relname` finds the
704/// table in ANY user schema (system schemas excluded; only real
705/// relations — `relkind` table / view / matview / partitioned /
706/// foreign).
707///
708/// Exactly one schema resolves the table; zero is
709/// [`StoreError::TableNotResolved`], two or more is
710/// [`StoreError::AmbiguousTable`]. `pub(crate)` so `row_stream`'s
711/// cursor drain runs it inside the cursor's own transaction.
712pub(crate) async fn introspect_conn(
713 conn: &mut PgConnection,
714 table: &str,
715) -> Result<std::sync::Arc<ResolvedTable>, StoreError> {
716 // — Stage 1: primary, search-path-correct via `to_regclass`. —
717 // §Fase 38.x.a (D1) — `.persistent(false)` issues an UNNAMED PARSE
718 // (empty name `""`), which Postgres auto-discards/replaces on the
719 // next unnamed PARSE — structurally collision-free behind every
720 // transaction-mode pooler. Setting `statement_cache_capacity(0)`
721 // on the pool's `PgConnectOptions` is necessary but NOT sufficient;
722 // sqlx's named PARSE protocol (`sqlx_s_N`) leaks across logical
723 // sessions when the physical conn behind the pooler is reused.
724 let primary = sqlx::query(
725 "SELECT n.nspname AS schema_name, a.attname AS column_name, \
726 t.typname AS type_name \
727 FROM pg_catalog.pg_class c \
728 JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
729 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
730 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
731 WHERE c.oid = to_regclass($1) \
732 AND a.attnum > 0 AND NOT a.attisdropped",
733 )
734 .persistent(false)
735 .bind(format!("\"{table}\""))
736 .fetch_all(&mut *conn)
737 .await
738 .map_err(|e| StoreError::Query {
739 op: "resolve",
740 source: e.to_string(),
741 })?;
742
743 let resolution: Result<(String, std::collections::HashMap<String, String>), StoreError> = {
744 let primary_rows = collect_triples(&primary);
745 if !primary_rows.is_empty() {
746 // `to_regclass` resolved — one relation, one schema.
747 resolve_from_rows(table, primary_rows)
748 } else {
749 // — Stage 2: fallback, search-path-INDEPENDENT scan. —
750 let scan = sqlx::query(
751 "SELECT n.nspname AS schema_name, \
752 a.attname AS column_name, t.typname AS type_name \
753 FROM pg_catalog.pg_class c \
754 JOIN pg_catalog.pg_namespace n \
755 ON n.oid = c.relnamespace \
756 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
757 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
758 WHERE c.relname = $1 \
759 AND c.relkind IN ('r', 'v', 'm', 'p', 'f') \
760 AND left(n.nspname, 3) <> 'pg_' \
761 AND n.nspname <> 'information_schema' \
762 AND a.attnum > 0 AND NOT a.attisdropped",
763 )
764 .persistent(false)
765 .bind(table)
766 .fetch_all(&mut *conn)
767 .await
768 .map_err(|e| StoreError::Query {
769 op: "resolve",
770 source: e.to_string(),
771 })?;
772 resolve_from_rows(table, collect_triples(&scan))
773 }
774 };
775 // §Fase 37.x.h (D6) — every resolution failure logs as a structured
776 // `tracing::error!` so an adopter's operator can SEE it in production
777 // logs / journald, not only in the propagated `StoreError`. The
778 // Display hint (the `TableNotResolved` / `AmbiguousTable` arms above)
779 // is the actionable line; the structured fields here are the index
780 // for log search.
781 match resolution {
782 Ok((schema, column_types)) => {
783 Ok(std::sync::Arc::new(ResolvedTable { schema, column_types }))
784 }
785 Err(err) => {
786 match &err {
787 StoreError::TableNotResolved { table } => {
788 tracing::error!(
789 target: "axon::store::resolve",
790 store_table = %table,
791 kind = "table_not_resolved",
792 d_letter = "D6",
793 "axonstore could not resolve `{table}` on any \
794 schema visible to this role — see StoreError \
795 Display for the actionable remedy"
796 );
797 }
798 StoreError::AmbiguousTable { table, schemas } => {
799 tracing::error!(
800 target: "axon::store::resolve",
801 store_table = %table,
802 kind = "ambiguous_table",
803 schemas = %schemas.join(","),
804 d_letter = "D6",
805 "axonstore `{table}` resolved in {n} schemas — \
806 declare the target schema or narrow \
807 `search_path`",
808 n = schemas.len(),
809 );
810 }
811 other => {
812 tracing::error!(
813 target: "axon::store::resolve",
814 store_table = %table,
815 kind = "resolve_failed",
816 d_letter = "D6",
817 "axonstore resolution of `{table}` failed: \
818 {other}"
819 );
820 }
821 }
822 Err(err)
823 }
824 }
825}
826
827/// §v1.36.2 — the `::<type>` cast suffix for a `$N` value placeholder.
828///
829/// axon's runtime carries no column schema (D12), so a `text`-bound
830/// value cannot reach a `uuid` / `int` / `timestamptz` column: Postgres
831/// has no cross-type operator. The cure is to cast the VALUE to the
832/// column's type — `$N::uuid` is a valid explicit cast over the bound
833/// parameter (`'83d0…'::uuid` parses the text). v1.36.2 applies it to
834/// every WRITE placeholder (`INSERT` values, `UPDATE … SET`); §v1.36.4
835/// applies the identical cure to the read side via [`build_pg_where`]
836/// (`"col" {op} $N::<type>`). The column's Postgres type name comes
837/// from a cached `to_regclass` + `pg_catalog` introspection
838/// ([`introspect_conn`]).
839///
840/// Empty when the column type is unknown (introspection missed the
841/// column, or ran against a table outside `current_schema()`) or the
842/// type name is not a safe identifier — the builder then emits a bare
843/// `$N`: a `text` column still works, a typed column fails LOUDLY (no
844/// regression, no silent-wrong write).
845fn write_cast(
846 column_types: &std::collections::HashMap<String, String>,
847 column: &str,
848) -> String {
849 match column_types.get(column) {
850 Some(udt) if filter::is_safe_identifier(udt) => format!("::{udt}"),
851 _ => String::new(),
852 }
853}
854
855/// Build a parameterized `INSERT INTO "table" (…) VALUES (…)`.
856///
857/// A `NULL` data value renders as the inline `NULL` keyword (a fixed
858/// SQL token, injection-safe) and consumes no `$N` placeholder — the
859/// same discipline 35.b applies to `NULL` in a `where` clause. Postgres
860/// infers the column type for an inline `NULL`.
861///
862/// §v1.36.2 — each `$N` value placeholder is cast to its column's
863/// introspected type (`column_types`) so a `text`-bound value writes
864/// into a `uuid` / `int` / `timestamptz` column. An empty
865/// `column_types` map emits bare `$N` (the pre-1.36.2 behaviour).
866/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation
867/// (`INSERT INTO "schema"."table"`); `None` renders the bare `"table"`.
868pub fn build_insert_sql(
869 table: &str,
870 schema: Option<&str>,
871 data: &[(String, SqlValue)],
872 column_types: &std::collections::HashMap<String, String>,
873) -> Result<(String, Vec<SqlValue>), StoreError> {
874 check_identifier(table, "table")?;
875 if data.is_empty() {
876 return Err(StoreError::EmptyData { op: "insert" });
877 }
878
879 let mut columns: Vec<String> = Vec::with_capacity(data.len());
880 let mut value_frags: Vec<String> = Vec::with_capacity(data.len());
881 let mut params: Vec<SqlValue> = Vec::new();
882 let mut idx = 1usize;
883
884 for (col, val) in data {
885 check_identifier(col, "column")?;
886 columns.push(format!("\"{col}\""));
887 match val {
888 SqlValue::Null => value_frags.push("NULL".to_string()),
889 bound => {
890 value_frags.push(format!("${idx}{}", write_cast(column_types, col)));
891 params.push(bound.clone());
892 idx += 1;
893 }
894 }
895 }
896
897 let sql = format!(
898 "INSERT INTO {} ({}) VALUES ({})",
899 qualified_relation(schema, table),
900 columns.join(", "),
901 value_frags.join(", "),
902 );
903 Ok((sql, params))
904}
905
906/// Build a parameterized `UPDATE "table" SET … WHERE …`.
907///
908/// The `WHERE` placeholders continue the numbering after the `SET`
909/// placeholders **actually emitted** — not after the column count.
910/// Because a `NULL` `SET` value renders inline (no placeholder), the
911/// offset is the count of non-`NULL` `SET` values. (The frozen Python
912/// reference offsets by column count and so mis-numbers the moment a
913/// `SET` value is `NULL`.)
914///
915/// §v1.36.2 — each `SET` value placeholder is cast to its column's
916/// introspected type (`column_types`), the same `$N::<type>` cure
917/// `build_insert_sql` applies, so a `text`-bound value writes into a
918/// non-`text` column. §v1.36.4 — the same `column_types` map is now
919/// threaded into the `WHERE` side too, so a `where`-clause value is
920/// cast to its column's type (`"col" {op} $N::<type>`).
921/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation
922/// (`UPDATE "schema"."table"`); `None` renders the bare `"table"`.
923pub fn build_update_sql(
924 table: &str,
925 schema: Option<&str>,
926 where_expr: &str,
927 data: &[(String, SqlValue)],
928 bindings: &std::collections::HashMap<String, String>,
929 column_types: &std::collections::HashMap<String, String>,
930) -> Result<(String, Vec<SqlValue>), StoreError> {
931 check_identifier(table, "table")?;
932 if data.is_empty() {
933 return Err(StoreError::EmptyData { op: "mutate" });
934 }
935
936 let mut set_frags: Vec<String> = Vec::with_capacity(data.len());
937 let mut params: Vec<SqlValue> = Vec::new();
938 let mut idx = 1usize;
939
940 for (col, val) in data {
941 check_identifier(col, "column")?;
942 match val {
943 SqlValue::Null => set_frags.push(format!("\"{col}\" = NULL")),
944 bound => {
945 set_frags.push(format!(
946 "\"{col}\" = ${idx}{}",
947 write_cast(column_types, col)
948 ));
949 params.push(bound.clone());
950 idx += 1;
951 }
952 }
953 }
954
955 // `idx - 1` SET placeholders were emitted; WHERE continues there.
956 let set_param_count = idx - 1;
957 let (clause, where_params) =
958 build_pg_where(where_expr, set_param_count, bindings, column_types)?;
959 params.extend(where_params);
960
961 let sql = format!(
962 "UPDATE {} SET {} WHERE {clause}",
963 qualified_relation(schema, table),
964 set_frags.join(", "),
965 );
966 Ok((sql, params))
967}
968
969// ════════════════════════════════════════════════════════════════════
970// Column-type catalog (D12 honest scope) + row → JSON mapping
971// ════════════════════════════════════════════════════════════════════
972
/// The supported Postgres column-type classes. A column whose type is
/// outside this closed catalog is a [`StoreError::UnsupportedColumnType`]
/// — an honest, documented boundary rather than a silent miss.
///
/// `Hash` is derived alongside `Eq`, so a class can be used as a
/// `HashMap` / `HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PgTypeClass {
    /// `BOOL`
    Bool,
    /// `INT2` (smallint)
    Int2,
    /// `INT4` (integer)
    Int4,
    /// `INT8` (bigint)
    Int8,
    /// `FLOAT4` (real)
    Float4,
    /// `FLOAT8` (double precision)
    Float8,
    /// `NUMERIC` / `DECIMAL` — JSON-encoded as a string (precision-safe)
    Numeric,
    /// `TEXT` / `VARCHAR` / `BPCHAR` / `NAME`
    Text,
    /// `UUID` — JSON-encoded as a hyphenated string
    Uuid,
    /// `TIMESTAMPTZ` — JSON-encoded as an RFC 3339 string
    TimestampTz,
    /// `TIMESTAMP` — JSON-encoded as an ISO 8601 (no-zone) string
    Timestamp,
    /// `DATE` — JSON-encoded as a `YYYY-MM-DD` string
    Date,
    /// `TIME` — JSON-encoded as a `HH:MM:SS` string
    Time,
    /// `JSON` / `JSONB` — passed through as the JSON value
    Json,
    /// `BYTEA` — JSON-encoded as a base64 string
    Bytea,
}

/// Classify a Postgres type name into a [`PgTypeClass`]. Returns `None`
/// when the type is outside the v1.30.0 supported catalog. Pure + total.
///
/// Names are compared ASCII-case-insensitively, so `int4`, `INT4` and
/// `Int4` classify identically. Anything else, including array names
/// such as `INT4[]`, is `None`.
///
/// The comparison uses `eq_ignore_ascii_case` and does not build an
/// uppercased copy, so no allocation happens per call. This matters
/// because the function runs once per decoded cell.
pub fn classify_pg_type(pg_type: &str) -> Option<PgTypeClass> {
    const CATALOG: &[(&str, PgTypeClass)] = &[
        ("BOOL", PgTypeClass::Bool),
        ("INT2", PgTypeClass::Int2),
        ("INT4", PgTypeClass::Int4),
        ("INT8", PgTypeClass::Int8),
        ("FLOAT4", PgTypeClass::Float4),
        ("FLOAT8", PgTypeClass::Float8),
        ("NUMERIC", PgTypeClass::Numeric),
        ("TEXT", PgTypeClass::Text),
        ("VARCHAR", PgTypeClass::Text),
        ("BPCHAR", PgTypeClass::Text),
        ("NAME", PgTypeClass::Text),
        ("UUID", PgTypeClass::Uuid),
        ("TIMESTAMPTZ", PgTypeClass::TimestampTz),
        ("TIMESTAMP", PgTypeClass::Timestamp),
        ("DATE", PgTypeClass::Date),
        ("TIME", PgTypeClass::Time),
        ("JSON", PgTypeClass::Json),
        ("JSONB", PgTypeClass::Json),
        ("BYTEA", PgTypeClass::Bytea),
    ];
    CATALOG
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case(pg_type))
        .map(|&(_, class)| class)
}
1032
1033/// A single retrieved row, as JSON-safe column → value pairs in column
1034/// order. Every value is `serde_json`-representable — UUID, TIMESTAMPTZ
1035/// and NUMERIC are pre-mapped to strings, so an adopter never has to
1036/// monkey-patch a JSON encoder (the kivi-reported Python pain).
1037#[derive(Debug, Clone, PartialEq)]
1038pub struct StoreRow {
1039 /// Column name → JSON value, in `SELECT` column order.
1040 pub columns: Vec<(String, JsonValue)>,
1041}
1042
1043impl StoreRow {
1044 /// Look up a column's value by name.
1045 pub fn get(&self, column: &str) -> Option<&JsonValue> {
1046 self.columns
1047 .iter()
1048 .find(|(name, _)| name == column)
1049 .map(|(_, value)| value)
1050 }
1051
1052 /// Render the row as a JSON object.
1053 pub fn to_json(&self) -> JsonValue {
1054 JsonValue::Object(self.columns.iter().cloned().collect())
1055 }
1056}
1057
1058/// Decode one column of a `PgRow` into a JSON-safe value.
1059fn pg_value_to_json(
1060 row: &PgRow,
1061 idx: usize,
1062 column: &str,
1063 pg_type: &str,
1064) -> Result<JsonValue, StoreError> {
1065 let class = classify_pg_type(pg_type).ok_or_else(|| {
1066 StoreError::UnsupportedColumnType {
1067 column: column.to_string(),
1068 pg_type: pg_type.to_string(),
1069 }
1070 })?;
1071
1072 // Each branch decodes as `Option<T>` so a SQL `NULL` maps to
1073 // `JsonValue::Null` rather than failing the decode.
1074 macro_rules! decode {
1075 ($t:ty, $conv:expr) => {{
1076 let opt: Option<$t> = row.try_get(idx).map_err(|e| {
1077 StoreError::Decode {
1078 column: column.to_string(),
1079 pg_type: pg_type.to_string(),
1080 source: e.to_string(),
1081 }
1082 })?;
1083 match opt {
1084 None => JsonValue::Null,
1085 Some(v) => $conv(v),
1086 }
1087 }};
1088 }
1089
1090 Ok(match class {
1091 PgTypeClass::Bool => decode!(bool, JsonValue::Bool),
1092 PgTypeClass::Int2 => decode!(i16, |v| JsonValue::from(v as i64)),
1093 PgTypeClass::Int4 => decode!(i32, |v| JsonValue::from(v as i64)),
1094 PgTypeClass::Int8 => decode!(i64, JsonValue::from),
1095 PgTypeClass::Float4 => decode!(f32, |v| JsonValue::from(v as f64)),
1096 PgTypeClass::Float8 => decode!(f64, JsonValue::from),
1097 PgTypeClass::Numeric => {
1098 decode!(sqlx::types::BigDecimal, |v: sqlx::types::BigDecimal| {
1099 JsonValue::String(v.to_string())
1100 })
1101 }
1102 PgTypeClass::Text => decode!(String, JsonValue::String),
1103 PgTypeClass::Uuid => {
1104 decode!(uuid::Uuid, |v: uuid::Uuid| JsonValue::String(
1105 v.hyphenated().to_string()
1106 ))
1107 }
1108 PgTypeClass::TimestampTz => {
1109 decode!(
1110 chrono::DateTime<chrono::Utc>,
1111 |v: chrono::DateTime<chrono::Utc>| JsonValue::String(
1112 v.to_rfc3339()
1113 )
1114 )
1115 }
1116 PgTypeClass::Timestamp => {
1117 decode!(chrono::NaiveDateTime, |v: chrono::NaiveDateTime| {
1118 JsonValue::String(
1119 v.format("%Y-%m-%dT%H:%M:%S%.f").to_string(),
1120 )
1121 })
1122 }
1123 PgTypeClass::Date => {
1124 decode!(chrono::NaiveDate, |v: chrono::NaiveDate| {
1125 JsonValue::String(v.to_string())
1126 })
1127 }
1128 PgTypeClass::Time => {
1129 decode!(chrono::NaiveTime, |v: chrono::NaiveTime| {
1130 JsonValue::String(v.to_string())
1131 })
1132 }
1133 PgTypeClass::Json => decode!(JsonValue, |v| v),
1134 PgTypeClass::Bytea => decode!(Vec<u8>, |v: Vec<u8>| {
1135 use base64::Engine;
1136 JsonValue::String(
1137 base64::engine::general_purpose::STANDARD.encode(v),
1138 )
1139 }),
1140 })
1141}
1142
1143/// Map a whole `PgRow` to a [`StoreRow`]. `pub(crate)` so 35.i's
1144/// `row_stream` cursor drain shares one row-decode path with `query`.
1145pub(crate) fn map_pg_row(row: &PgRow) -> Result<StoreRow, StoreError> {
1146 let mut columns = Vec::with_capacity(row.len());
1147 for (idx, col) in row.columns().iter().enumerate() {
1148 let name = col.name().to_string();
1149 let pg_type = col.type_info().name().to_string();
1150 let value = pg_value_to_json(row, idx, &name, &pg_type)?;
1151 columns.push((name, value));
1152 }
1153 Ok(StoreRow { columns })
1154}
1155
1156/// Bind one [`SqlValue`] onto a query. `NULL` is rendered inline by the
1157/// builders and so never reaches this function in practice; the `Null`
1158/// arm binds a typed NULL defensively to keep the function total.
1159/// `pub(crate)` so 35.i's `row_stream` binds cursor-query params
1160/// through the same path.
1161pub(crate) fn bind_value<'q>(
1162 q: Query<'q, Postgres, PgArguments>,
1163 value: &SqlValue,
1164) -> Query<'q, Postgres, PgArguments> {
1165 match value {
1166 SqlValue::Text(s) => q.bind(s.clone()),
1167 SqlValue::Integer(n) => q.bind(*n),
1168 SqlValue::Float(x) => q.bind(*x),
1169 SqlValue::Boolean(b) => q.bind(*b),
1170 SqlValue::Null => q.bind(Option::<String>::None),
1171 }
1172}
1173
1174// ════════════════════════════════════════════════════════════════════
1175// PostgresStoreBackend
1176// ════════════════════════════════════════════════════════════════════
1177
1178/// A Postgres-backed `axonstore`. Holds one lazy, bounded `PgPool`.
1179/// Cheap to [`Clone`] (the pool is internally reference-counted).
1180#[derive(Clone)]
1181pub struct PostgresStoreBackend {
1182 /// The resolved DSN — masked whenever surfaced (`Debug`, errors).
1183 dsn: String,
1184 pool: PgPool,
1185}
1186
1187impl fmt::Debug for PostgresStoreBackend {
1188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1189 // Never expose the DSN password through `Debug`.
1190 f.debug_struct("PostgresStoreBackend")
1191 .field("dsn", &mask_dsn(&self.dsn))
1192 .finish()
1193 }
1194}
1195
1196impl PostgresStoreBackend {
1197 /// Resolve `connection` and build a lazy, bounded connection pool.
1198 ///
1199 /// Equivalent to [`connect_named`](Self::connect_named) with no
1200 /// store name — the connection's `application_name` is the bare
1201 /// `axon-store`. Prefer `connect_named` so each session is
1202 /// attributable to its declaring `axonstore`.
1203 pub fn connect(connection: &str) -> Result<Self, StoreError> {
1204 Self::connect_named(connection, "")
1205 }
1206
1207 /// Resolve `connection` and build a lazy, bounded connection pool,
1208 /// stamping each connection's `application_name` with `store_name`.
1209 ///
1210 /// Synchronous and cheap: the DSN is parsed into a
1211 /// [`PgConnectOptions`] (a malformed DSN is a typed
1212 /// [`StoreError::PoolInit`]) but `connect_lazy_with` opens **no**
1213 /// connection — the first real connection is made on the first
1214 /// operation (D7 — lazy).
1215 ///
1216 /// Two production-grade properties are set on every connection:
1217 ///
1218 /// - **`statement_cache_capacity(0)`** (Gap 3) — disables sqlx's
1219 /// named server-side prepared-statement cache so the backend is
1220 /// safe behind a transaction-mode pooler (PgBouncer
1221 /// `pool_mode=transaction`, Supabase Supavisor `:6543`, Neon, RDS
1222 /// Proxy), where a cached name minted on one physical session
1223 /// collides on the next (`prepared statement "sqlx_s_1" already
1224 /// exists`). Applied unconditionally — harmless on a direct
1225 /// connection, and there is no knob to misconfigure.
1226 /// - **`application_name`** — `axon-store/<store_name>` (bare
1227 /// `axon-store` when `store_name` is empty), capped at the
1228 /// Postgres 63-byte `NAMEDATALEN-1` limit on a char boundary, so
1229 /// every axon-owned session is identifiable in `pg_stat_activity`,
1230 /// pooler logs and DBA dashboards.
1231 ///
1232 /// Must be called within a Tokio runtime context: a well-formed DSN
1233 /// registers a background connection reaper. In production this is
1234 /// always satisfied — the registry (35.d) is built while the axum
1235 /// server's runtime is live.
1236 pub fn connect_named(
1237 connection: &str,
1238 store_name: &str,
1239 ) -> Result<Self, StoreError> {
1240 Self::connect_named_with_namespace(connection, store_name, None)
1241 }
1242
1243 /// §Fase 38.f (D3) — same as [`Self::connect_named`] but stamps an
1244 /// OPTIONAL per-tenant schema namespace into `application_name`.
1245 ///
1246 /// `connect_named_with_namespace("env:DB", "claims", Some("tenant_42"))`
1247 /// produces a pool whose every session's `application_name` reads
1248 /// `axon-store/claims/tenant_42` — so a DBA reading
1249 /// `pg_stat_activity`, pooler logs, or RDS Performance Insights
1250 /// sees both the `axonstore` declaration AND the resolved tenant.
1251 ///
1252 /// `None` for `namespace` is the pre-38 shape (`axon-store/<store>`,
1253 /// byte-identical to `connect_named`).
1254 pub fn connect_named_with_namespace(
1255 connection: &str,
1256 store_name: &str,
1257 namespace: Option<&str>,
1258 ) -> Result<Self, StoreError> {
1259 let dsn = resolve_dsn(connection)?;
1260 let opts = PgConnectOptions::from_str(&dsn)
1261 .map_err(|e| StoreError::PoolInit {
1262 dsn_masked: mask_dsn(&dsn),
1263 source: e.to_string(),
1264 })?
1265 .statement_cache_capacity(0)
1266 .application_name(&application_name_for_with_namespace(
1267 store_name, namespace,
1268 ));
1269 // §Fase 38.x.a (D2) — `DEALLOCATE ALL` on every released conn.
1270 //
1271 // This is the SECOND layer of pooler-coherent transaction safety,
1272 // composing with the per-query `.persistent(false)` (D1). If a
1273 // future code path accidentally omits `.persistent(false)`, the
1274 // named prepared statement it allocated would otherwise survive
1275 // on the physical Postgres conn across logical sessions through a
1276 // transaction-mode pooler (Supabase Supavisor `:6543`, PgBouncer
1277 // `pool_mode=transaction`, Neon, RDS Proxy). The next logical
1278 // session that lands on the same physical conn would collide on
1279 // `PARSE sqlx_s_N` → Postgres `42710` `duplicate_prepared_statement`.
1280 //
1281 // Running `DEALLOCATE ALL` on `after_release` wipes every prepared
1282 // statement (named + unnamed) from the physical conn BEFORE the
1283 // pooler returns it to its pool. Belt-and-suspenders: D1 prevents
1284 // the bug at the source; D2 catches anything that slips past.
1285 //
1286 // The cleanup query itself uses `.persistent(false)` — the
1287 // meta-invariant: even the cleanup is unnamed, so no prepared
1288 // statement can ever survive a connection release.
1289 let pool = PgPoolOptions::new()
1290 .max_connections(MAX_POOL_CONNECTIONS)
1291 .min_connections(0)
1292 .acquire_timeout(Duration::from_secs(ACQUIRE_TIMEOUT_SECS))
1293 .idle_timeout(Duration::from_secs(IDLE_TIMEOUT_SECS))
1294 .after_release(|conn, _meta| Box::pin(async move {
1295 // `DEALLOCATE ALL` clears every prepared statement —
1296 // named (`sqlx_s_N`) AND unnamed (`""`) — from the
1297 // physical Postgres connection. Cheap (<1ms typically).
1298 sqlx::query("DEALLOCATE ALL")
1299 .persistent(false)
1300 .execute(&mut *conn)
1301 .await?;
1302 // `Ok(true)` keeps the conn alive in the pool for reuse.
1303 // We never drop a conn on `DEALLOCATE` failure because a
1304 // failure here means something more fundamental is wrong
1305 // (lost socket, server-side crash); the next acquire will
1306 // surface the real error.
1307 Ok(true)
1308 }))
1309 .connect_lazy_with(opts);
1310 Ok(Self { dsn, pool })
1311 }
1312
1313 /// The resolved DSN with its password masked — safe to log.
1314 pub fn masked_dsn(&self) -> String {
1315 mask_dsn(&self.dsn)
1316 }
1317
1318 /// The underlying pool — 35.i's `Stream<Row>` borrows it.
1319 pub fn pool(&self) -> &PgPool {
1320 &self.pool
1321 }
1322
1323 /// §Fase 37.x.j (D1) — Acquire ONE physical Postgres connection
1324 /// from the pool to be held for the duration of a flow execution
1325 /// ([`crate::runner::ExecContext`] for the sync path,
1326 /// [`crate::flow_dispatcher::DispatchCtx`] for the async streaming
1327 /// path).
1328 ///
1329 /// The returned [`sqlx::pool::PoolConnection`] is wrapped by the
1330 /// caller in [`crate::store::store_conn::StoreConn::Pinned`] and
1331 /// passed to every operation (`query` / `insert` / `mutate` /
1332 /// `purge` / `ping`) against this axonstore for the flow lifetime.
1333 /// Because every op runs against the same physical Postgres backend
1334 /// connection, a transaction-mode pooler (Supabase Supavisor,
1335 /// PgBouncer, Neon, RDS Proxy) cannot swap the backend between
1336 /// queries — the D3 "unnamed prepared statement does not exist"
1337 /// race that Fase 37.x.j closes.
1338 ///
1339 /// The connection is released back to the pool on `Drop` of the
1340 /// returned `PoolConnection`. The existing
1341 /// `after_release(DEALLOCATE ALL)` hook (Fase 38.x.a D2) wipes any
1342 /// prepared statements before the conn is reused — composing
1343 /// cleanly with the per-flow pinning of 37.x.j.
1344 ///
1345 /// Failure modes:
1346 /// - `StoreError::Connect` if the pool's `acquire_timeout`
1347 /// elapses (no conn becomes available — pool exhausted or
1348 /// Postgres unreachable).
1349 /// - `StoreError::Connect` if the pool is in a permanently-bad
1350 /// state (TLS handshake failure, DNS resolution failure, etc.).
1351 pub async fn acquire_pin(
1352 &self,
1353 ) -> Result<sqlx::pool::PoolConnection<sqlx::Postgres>, StoreError> {
1354 self.pool
1355 .acquire()
1356 .await
1357 .map_err(|e| StoreError::Connect { source: e.to_string() })
1358 }
1359
1360 /// `retrieve` — run `SELECT * FROM "schema"."table" WHERE …` and map
1361 /// every row to a JSON-safe [`StoreRow`].
1362 ///
1363 /// §Fase 37.x.d (D3) — on a cache MISS the schema introspection and
1364 /// the `SELECT` execute inside ONE transaction, so a
1365 /// transaction-mode pooler pins one physical backend for both —
1366 /// they cannot split across sessions. A cache HIT needs no
1367 /// transaction: the cached resolution is already correct and the
1368 /// `SELECT` is schema-qualified, so it resolves on any session.
1369 ///
1370 /// v1.30.0 materializes the full result (`fetch_all`); 35.i adds the
1371 /// backpressured `Stream<Row>` variant (Pillar III).
1372 pub async fn query(
1373 &self,
1374 // §Fase 37.x.j (D1) — the connection source for this op.
1375 // `StoreConn::Pool(&self.pool)` for legacy callers (the
1376 // v1.38.5 and earlier behavior); `StoreConn::Pinned(conn)` for
1377 // 37.x.j flow-pinned execution where the caller acquired a
1378 // `PoolConnection` at flow start via `acquire_pin()`. Both
1379 // variants run the cache-HIT SELECT + cache-MISS introspect+
1380 // SELECT-in-tx paths identically; the pinned variant
1381 // additionally guarantees the same physical Postgres backend
1382 // services every op against this store for the flow lifetime.
1383 conn: &mut crate::store::store_conn::StoreConn<'_>,
1384 table: &str,
1385 where_expr: &str,
1386 bindings: &std::collections::HashMap<String, String>,
1387 ) -> Result<Vec<StoreRow>, StoreError> {
1388 // — cache HIT: operate with the cached resolution; no
1389 // transaction. §37.x.f (D9) self-heals a stale cache. —
1390 if let Some(resolved) = self.cached_schema(table) {
1391 let (sql, params) = build_select_sql(
1392 table,
1393 Some(resolved.schema.as_str()),
1394 where_expr,
1395 bindings,
1396 &resolved.column_types,
1397 )?;
1398 // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
1399 let mut q = sqlx::query(&sql).persistent(false);
1400 for value in ¶ms {
1401 q = bind_value(q, value);
1402 }
1403 // §Fase 37.x.j (D1) — dispatch through the StoreConn so
1404 // a pinned variant routes through the same physical conn
1405 // as every other op against this store for the flow.
1406 match conn.fetch_all(q).await {
1407 Ok(rows) => return rows.iter().map(map_pg_row).collect(),
1408 Err(e) => {
1409 let err = classify_sql_error("retrieve", e);
1410 if !err.is_schema_drift() {
1411 return Err(err);
1412 }
1413 // §37.x.f (D9) — the cached schema is STALE; evict
1414 // and fall through to the miss path: the single
1415 // retry, with fresh introspection.
1416 self.evict_schema(table);
1417 }
1418 }
1419 }
1420 // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
1421 // operate in ONE transaction (D3). —
1422 // §Fase 37.x.j (D1) — `conn.begin()` borrows the `StoreConn`
1423 // mutably for the transaction's lifetime; on the Pinned variant
1424 // the transaction runs on the same physical backend as the
1425 // cache-HIT attempt above (D3 invariant preserved).
1426 let mut tx = conn.begin().await.map_err(|e| {
1427 StoreError::Connect { source: e.to_string() }
1428 })?;
1429 // §Fase 37.x.j.11 (POST-CLOSE HOTFIX 2026-05-21) — ROLLBACK +
1430 // propagate the introspect error directly. Pre-hotfix the
1431 // code fell through to bare-table SQL with `(None, &no_types)`
1432 // inside the SAME (now poisoned) transaction. The cascade
1433 // error (`25P02 in_failed_sql_transaction` / `42703 column
1434 // does not exist`) was returned to the application layer,
1435 // masking the actual root cause from any caller that didn't
1436 // filter the `axon::store` tracing target.
1437 //
1438 // Honest scope cut: adopters whose introspect privileges
1439 // differ from query privileges (rare in practice — same DB
1440 // user) no longer get the fall-through. If real adopter
1441 // demand surfaces, a future fase can add an opt-in
1442 // `unsafe_skip_introspect` flag.
1443 let resolved = match introspect_conn(&mut tx, table).await {
1444 Ok(r) => r,
1445 Err(introspect_err) => {
1446 tracing::warn!(
1447 target: "axon::store",
1448 table = %table,
1449 op = "introspect_in_tx",
1450 error = %introspect_err,
1451 d_letter = "37.x.j.11",
1452 "store introspection failed; rolling back the \
1453 transaction and returning the primary error \
1454 directly. Pre-37.x.j.11 the runtime fell through \
1455 to bare-table SQL inside the poisoned tx → \
1456 cascade error masked the root cause."
1457 );
1458 let _ = tx.rollback().await;
1459 return Err(introspect_err);
1460 }
1461 };
1462 let (sql, params) = build_select_sql(
1463 table,
1464 Some(resolved.schema.as_str()),
1465 where_expr,
1466 bindings,
1467 &resolved.column_types,
1468 )?;
1469 // §Fase 38.x.a (D1) — `.persistent(false)` is mandatory inside the
1470 // `pool.begin()` transaction: the named PARSE protocol leaks across
1471 // logical sessions when the physical conn behind the pooler is
1472 // reused. See `introspect_conn` for the full rationale.
1473 let mut q = sqlx::query(&sql).persistent(false);
1474 for value in ¶ms {
1475 q = bind_value(q, value);
1476 }
1477 let rows = q
1478 .fetch_all(&mut *tx)
1479 .await
1480 .map_err(|e| classify_sql_error("retrieve", e))?;
1481 tx.commit().await.map_err(|e| StoreError::Connect {
1482 source: e.to_string(),
1483 })?;
1484 self.cache_schema(table, resolved);
1485 rows.iter().map(map_pg_row).collect()
1486 }
1487
1488 /// §Fase 37.x.d (D3) — the cached `(schema, column_types)`
1489 /// resolution for `table`, or `None` on a cache miss. Pure — no
1490 /// I/O. A HIT lets an operation skip the transaction; a MISS makes
1491 /// the caller introspect ([`introspect_conn`]) inside the
1492 /// operation's own transaction, so a transaction-mode pooler pins
1493 /// one backend for resolution + operation.
1494 pub(crate) fn cached_schema(
1495 &self,
1496 table: &str,
1497 ) -> Option<std::sync::Arc<ResolvedTable>> {
1498 let key = (self.dsn.clone(), table.to_string());
1499 SCHEMA_CACHE.lock().unwrap().get(&key)
1500 }
1501
1502 /// §Fase 37.x.d (D3) — store a successful resolution in the
1503 /// process-global `(dsn, table)` cache.
1504 ///
1505 /// §v1.36.5 rule preserved — an EMPTY resolution is NEVER cached: a
1506 /// real relation always has at least one column, so an empty map is
1507 /// a transient failure that must be retried, never a poisoned
1508 /// entry. §Fase 37.x.f (D9) adds the bounded-LRU + drift eviction.
1509 pub(crate) fn cache_schema(
1510 &self,
1511 table: &str,
1512 resolved: std::sync::Arc<ResolvedTable>,
1513 ) {
1514 if !resolved.column_types.is_empty() {
1515 let key = (self.dsn.clone(), table.to_string());
1516 SCHEMA_CACHE.lock().unwrap().insert(key, resolved);
1517 }
1518 }
1519
1520 /// §Fase 37.x.f (D9) — evict `table`'s cached resolution so the
1521 /// next operation re-introspects. Called by the self-heal path when
1522 /// a store SQL statement fails with a schema-drift SQLSTATE — the
1523 /// live table has drifted from the cached schema. `pub(crate)` so
1524 /// the `row_stream` cursor drain shares the self-heal.
1525 pub(crate) fn evict_schema(&self, table: &str) {
1526 let key = (self.dsn.clone(), table.to_string());
1527 SCHEMA_CACHE.lock().unwrap().evict(&key);
1528 // §Fase 37.x.h (D6) — observability of the D9 self-heal. A live
1529 // `ALTER TABLE` is the expected trigger; a flood of these from
1530 // one `(masked_dsn, table)` means a misconfiguration (a migration
1531 // never finished, two services racing against the same table) and
1532 // an operator needs to SEE the eviction. The masked DSN gives the
1533 // physical-store context without ever leaking a credential.
1534 tracing::warn!(
1535 target: "axon::store::cache",
1536 store_table = %table,
1537 masked_dsn = %mask_dsn(&self.dsn),
1538 kind = "schema_drift_evict",
1539 d_letter = "D9",
1540 "axonstore evicted cached schema for `{table}` after a \
1541 schema-drift SQLSTATE — the next operation will \
1542 re-introspect against the live table"
1543 );
1544 }
1545
1546 /// §Fase 37.x.g (D8) — EAGERLY resolve + introspect `table` against
1547 /// the live database, populating the process-global schema cache.
1548 /// The deploy-time verification entry point: a resolution failure
1549 /// surfaces at DEPLOY, not at the first production request.
1550 ///
1551 /// A cache hit is a no-op `Ok`. Otherwise one connection is
1552 /// acquired and the two-stage [`introspect_conn`] resolution runs;
1553 /// the result is cached so the first runtime operation hits a warm
1554 /// cache. The caller distinguishes the `Err`: a `TableNotResolved`
1555 /// / `AmbiguousTable` means the table genuinely does not resolve on
1556 /// a reachable store (a fatal deploy error); a `Connect` means the
1557 /// store is unreachable (a non-fatal deploy warning — the D9
1558 /// runtime resolution still applies).
1559 pub(crate) async fn warm_schema(&self, table: &str) -> Result<(), StoreError> {
1560 if self.cached_schema(table).is_some() {
1561 return Ok(());
1562 }
1563 let mut conn = self.pool.acquire().await.map_err(|e| {
1564 StoreError::Connect { source: e.to_string() }
1565 })?;
1566 let resolved = introspect_conn(&mut conn, table).await?;
1567 self.cache_schema(table, resolved);
1568 Ok(())
1569 }
1570
1571 /// `persist` — run `INSERT INTO "schema"."table" (…) VALUES (…)`.
1572 /// Returns the number of rows inserted. §Fase 37.x.d (D3) — on a
1573 /// cache MISS the resolution + the `INSERT` execute in ONE
1574 /// transaction; a cache HIT needs no transaction.
1575 pub async fn insert(
1576 &self,
1577 // §Fase 37.x.j (D1) — see `query()` for the rationale on the
1578 // `StoreConn` connection-source parameter.
1579 conn: &mut crate::store::store_conn::StoreConn<'_>,
1580 table: &str,
1581 data: &[(String, SqlValue)],
1582 ) -> Result<u64, StoreError> {
1583 // — cache HIT: operate with the cached resolution; no
1584 // transaction. §37.x.f (D9) self-heals a stale cache. —
1585 if let Some(resolved) = self.cached_schema(table) {
1586 let (sql, params) = build_insert_sql(
1587 table,
1588 Some(resolved.schema.as_str()),
1589 data,
1590 &resolved.column_types,
1591 )?;
1592 // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
1593 let mut q = sqlx::query(&sql).persistent(false);
1594 for value in ¶ms {
1595 q = bind_value(q, value);
1596 }
1597 // §Fase 37.x.j (D1) — dispatch through StoreConn.
1598 match conn.execute(q).await {
1599 Ok(result) => return Ok(result.rows_affected()),
1600 Err(e) => {
1601 let err = classify_sql_error("persist", e);
1602 if !err.is_schema_drift() {
1603 return Err(err);
1604 }
1605 // §37.x.f (D9) — stale cache: evict + fall through
1606 // (the single retry). Safe — a drift SQLSTATE is a
1607 // parse/plan-time rejection, so this `INSERT` wrote
1608 // zero rows; the retry cannot double-write.
1609 self.evict_schema(table);
1610 }
1611 }
1612 }
1613 // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
1614 // operate in ONE transaction (D3). —
1615 // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
1616 let mut tx = conn.begin().await.map_err(|e| {
1617 StoreError::Connect { source: e.to_string() }
1618 })?;
1619 // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
1620 // directly. See `query()` above for the full rationale.
1621 let resolved = match introspect_conn(&mut tx, table).await {
1622 Ok(r) => r,
1623 Err(introspect_err) => {
1624 tracing::warn!(
1625 target: "axon::store",
1626 table = %table,
1627 op = "introspect_in_tx_persist",
1628 error = %introspect_err,
1629 d_letter = "37.x.j.11",
1630 "persist introspection failed; rolling back the \
1631 transaction and returning the primary error \
1632 directly."
1633 );
1634 let _ = tx.rollback().await;
1635 return Err(introspect_err);
1636 }
1637 };
1638 let (sql, params) = build_insert_sql(
1639 table,
1640 Some(resolved.schema.as_str()),
1641 data,
1642 &resolved.column_types,
1643 )?;
1644 // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
1645 let mut q = sqlx::query(&sql).persistent(false);
1646 for value in ¶ms {
1647 q = bind_value(q, value);
1648 }
1649 let result = q
1650 .execute(&mut *tx)
1651 .await
1652 .map_err(|e| classify_sql_error("persist", e))?;
1653 tx.commit().await.map_err(|e| StoreError::Connect {
1654 source: e.to_string(),
1655 })?;
1656 self.cache_schema(table, resolved);
1657 Ok(result.rows_affected())
1658 }
1659
1660 /// `mutate` — run `UPDATE "schema"."table" SET … WHERE …`. Returns
1661 /// the number of rows affected. §Fase 37.x.d (D3) — on a cache MISS
1662 /// the resolution + the `UPDATE` execute in ONE transaction; a
1663 /// cache HIT needs no transaction.
1664 pub async fn mutate(
1665 &self,
1666 // §Fase 37.x.j (D1) — see `query()` for the rationale.
1667 conn: &mut crate::store::store_conn::StoreConn<'_>,
1668 table: &str,
1669 where_expr: &str,
1670 data: &[(String, SqlValue)],
1671 bindings: &std::collections::HashMap<String, String>,
1672 ) -> Result<u64, StoreError> {
1673 // — cache HIT: operate with the cached resolution; no
1674 // transaction. §37.x.f (D9) self-heals a stale cache. —
1675 if let Some(resolved) = self.cached_schema(table) {
1676 let (sql, params) = build_update_sql(
1677 table,
1678 Some(resolved.schema.as_str()),
1679 where_expr,
1680 data,
1681 bindings,
1682 &resolved.column_types,
1683 )?;
1684 // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
1685 let mut q = sqlx::query(&sql).persistent(false);
1686 for value in ¶ms {
1687 q = bind_value(q, value);
1688 }
1689 // §Fase 37.x.j (D1) — dispatch through StoreConn.
1690 match conn.execute(q).await {
1691 Ok(result) => return Ok(result.rows_affected()),
1692 Err(e) => {
1693 let err = classify_sql_error("mutate", e);
1694 if !err.is_schema_drift() {
1695 return Err(err);
1696 }
1697 // §37.x.f (D9) — stale cache: evict + fall through
1698 // (the single retry). Safe — a drift SQLSTATE is a
1699 // parse/plan-time rejection, so this `UPDATE`
1700 // modified zero rows; the retry cannot double-write.
1701 self.evict_schema(table);
1702 }
1703 }
1704 }
1705 // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
1706 // operate in ONE transaction (D3). —
1707 // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
1708 let mut tx = conn.begin().await.map_err(|e| {
1709 StoreError::Connect { source: e.to_string() }
1710 })?;
1711 // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
1712 // directly. See `query()` above for the full rationale.
1713 let resolved = match introspect_conn(&mut tx, table).await {
1714 Ok(r) => r,
1715 Err(introspect_err) => {
1716 tracing::warn!(
1717 target: "axon::store",
1718 table = %table,
1719 op = "introspect_in_tx_mutate",
1720 error = %introspect_err,
1721 d_letter = "37.x.j.11",
1722 "mutate introspection failed; rolling back the \
1723 transaction and returning the primary error \
1724 directly."
1725 );
1726 let _ = tx.rollback().await;
1727 return Err(introspect_err);
1728 }
1729 };
1730 let (sql, params) = build_update_sql(
1731 table,
1732 Some(resolved.schema.as_str()),
1733 where_expr,
1734 data,
1735 bindings,
1736 &resolved.column_types,
1737 )?;
1738 // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
1739 let mut q = sqlx::query(&sql).persistent(false);
1740 for value in ¶ms {
1741 q = bind_value(q, value);
1742 }
1743 let result = q
1744 .execute(&mut *tx)
1745 .await
1746 .map_err(|e| classify_sql_error("mutate", e))?;
1747 tx.commit().await.map_err(|e| StoreError::Connect {
1748 source: e.to_string(),
1749 })?;
1750 self.cache_schema(table, resolved);
1751 Ok(result.rows_affected())
1752 }
1753
1754 /// `purge` — run `DELETE FROM "schema"."table" WHERE …`. Returns the
1755 /// number of rows deleted. §Fase 37.x.d (D3) — on a cache MISS the
1756 /// resolution + the `DELETE` execute in ONE transaction; a cache
1757 /// HIT needs no transaction.
1758 pub async fn purge(
1759 &self,
1760 // §Fase 37.x.j (D1) — see `query()` for the rationale.
1761 conn: &mut crate::store::store_conn::StoreConn<'_>,
1762 table: &str,
1763 where_expr: &str,
1764 bindings: &std::collections::HashMap<String, String>,
1765 ) -> Result<u64, StoreError> {
1766 // — cache HIT: operate with the cached resolution; no
1767 // transaction. §37.x.f (D9) self-heals a stale cache. —
1768 if let Some(resolved) = self.cached_schema(table) {
1769 let (sql, params) = build_delete_sql(
1770 table,
1771 Some(resolved.schema.as_str()),
1772 where_expr,
1773 bindings,
1774 &resolved.column_types,
1775 )?;
1776 // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
1777 let mut q = sqlx::query(&sql).persistent(false);
1778 for value in ¶ms {
1779 q = bind_value(q, value);
1780 }
1781 // §Fase 37.x.j (D1) — dispatch through StoreConn.
1782 match conn.execute(q).await {
1783 Ok(result) => return Ok(result.rows_affected()),
1784 Err(e) => {
1785 let err = classify_sql_error("purge", e);
1786 if !err.is_schema_drift() {
1787 return Err(err);
1788 }
1789 // §37.x.f (D9) — stale cache: evict + fall through
1790 // (the single retry). Safe — a drift SQLSTATE is a
1791 // parse/plan-time rejection, so this `DELETE`
1792 // removed zero rows; the retry cannot double-delete.
1793 self.evict_schema(table);
1794 }
1795 }
1796 }
1797 // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
1798 // operate in ONE transaction (D3). —
1799 // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
1800 let mut tx = conn.begin().await.map_err(|e| {
1801 StoreError::Connect { source: e.to_string() }
1802 })?;
1803 // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
1804 // directly. See `query()` above for the full rationale.
1805 let resolved = match introspect_conn(&mut tx, table).await {
1806 Ok(r) => r,
1807 Err(introspect_err) => {
1808 tracing::warn!(
1809 target: "axon::store",
1810 table = %table,
1811 op = "introspect_in_tx_purge",
1812 error = %introspect_err,
1813 d_letter = "37.x.j.11",
1814 "purge introspection failed; rolling back the \
1815 transaction and returning the primary error \
1816 directly."
1817 );
1818 let _ = tx.rollback().await;
1819 return Err(introspect_err);
1820 }
1821 };
1822 let (sql, params) = build_delete_sql(
1823 table,
1824 Some(resolved.schema.as_str()),
1825 where_expr,
1826 bindings,
1827 &resolved.column_types,
1828 )?;
1829 // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
1830 let mut q = sqlx::query(&sql).persistent(false);
1831 for value in ¶ms {
1832 q = bind_value(q, value);
1833 }
1834 let result = q
1835 .execute(&mut *tx)
1836 .await
1837 .map_err(|e| classify_sql_error("purge", e))?;
1838 tx.commit().await.map_err(|e| StoreError::Connect {
1839 source: e.to_string(),
1840 })?;
1841 self.cache_schema(table, resolved);
1842 Ok(result.rows_affected())
1843 }
1844
1845 /// Verify database reachability with `SELECT 1`.
1846 pub async fn ping(&self) -> Result<(), StoreError> {
1847 // §Fase 38.x.a (D1) — even the trivial reachability probe carries
1848 // `.persistent(false)`: a `SELECT 1` PARSE collision is rare but
1849 // possible behind an aggressive transaction-mode pooler, and the
1850 // grep §-assertion in `fase38x_a_pooler_prepared_statement_regression.rs`
1851 // enforces the invariant uniformly.
1852 sqlx::query("SELECT 1")
1853 .persistent(false)
1854 .execute(&self.pool)
1855 .await
1856 .map(|_| ())
1857 .map_err(|e| StoreError::Connect { source: e.to_string() })
1858 }
1859}
1860
1861// ════════════════════════════════════════════════════════════════════
1862// Unit tests — pure surface (no database)
1863// ════════════════════════════════════════════════════════════════════
1864
1865#[cfg(test)]
1866mod tests {
1867 use super::*;
1868
1869 fn txt(s: &str) -> SqlValue {
1870 SqlValue::Text(s.to_string())
1871 }
1872
1873 /// Empty bindings — these `build_*_sql` tests pin the pre-37.d
1874 /// behaviour (no `${name}` resolution). The §Fase 37.d resolution
1875 /// is exercised by `tests/fase37_d_*` and `store::filter`.
1876 fn nb() -> std::collections::HashMap<String, String> {
1877 std::collections::HashMap::new()
1878 }
1879
1880 // ── resolve_dsn ──────────────────────────────────────────────────
1881
1882 #[test]
1883 fn resolve_empty_connection_errors() {
1884 assert_eq!(resolve_dsn(""), Err(StoreError::EmptyConnection));
1885 assert_eq!(resolve_dsn(" "), Err(StoreError::EmptyConnection));
1886 }
1887
1888 #[test]
1889 fn resolve_literal_dsn_is_returned_verbatim() {
1890 let dsn = "postgresql://u:p@localhost:5432/axon";
1891 assert_eq!(resolve_dsn(dsn), Ok(dsn.to_string()));
1892 }
1893
1894 #[test]
1895 fn resolve_literal_dsn_is_trimmed() {
1896 assert_eq!(
1897 resolve_dsn(" postgresql://h/db "),
1898 Ok("postgresql://h/db".to_string())
1899 );
1900 }
1901
1902 #[test]
1903 fn resolve_bare_env_prefix_errors() {
1904 assert_eq!(resolve_dsn("env:"), Err(StoreError::EmptyEnvVarName));
1905 assert_eq!(resolve_dsn("env: "), Err(StoreError::EmptyEnvVarName));
1906 }
1907
1908 #[test]
1909 fn resolve_missing_env_var_errors() {
1910 match resolve_dsn("env:AXON_NONEXISTENT_VAR_FASE35C") {
1911 Err(StoreError::MissingEnvVar { var }) => {
1912 assert_eq!(var, "AXON_NONEXISTENT_VAR_FASE35C");
1913 }
1914 other => panic!("expected MissingEnvVar, got {other:?}"),
1915 }
1916 }
1917
1918 #[test]
1919 fn resolve_env_var_reads_the_environment() {
1920 // `PATH` is set on every supported OS — exercise the success
1921 // path without mutating the process environment.
1922 let resolved = resolve_dsn("env:PATH").expect("PATH resolves");
1923 assert_eq!(resolved, std::env::var("PATH").unwrap());
1924 assert!(!resolved.is_empty());
1925 }
1926
1927 // ── connect / masking ────────────────────────────────────────────
1928
1929 #[tokio::test]
1930 async fn connect_with_valid_dsn_is_lazy_and_succeeds() {
1931 // `connect_lazy` opens no connection — a well-formed DSN to a
1932 // host that may not exist still yields Ok.
1933 let backend =
1934 PostgresStoreBackend::connect("postgresql://u:p@localhost:5432/db")
1935 .expect("a well-formed DSN builds a lazy pool");
1936 let _ = format!("{backend:?}");
1937 }
1938
1939 #[tokio::test]
1940 async fn connect_masks_the_password_in_dsn_and_debug() {
1941 // A deliberately fake credential — this test asserts the
1942 // backend never surfaces a DSN password.
1943 let fake_secret = "fakecred0";
1944 let backend = PostgresStoreBackend::connect(&format!(
1945 "postgresql://user:{fake_secret}@localhost:5432/axon"
1946 ))
1947 .unwrap();
1948 let masked = backend.masked_dsn();
1949 assert!(!masked.contains(fake_secret), "password must be masked");
1950 assert!(masked.contains("***"));
1951 assert!(!format!("{backend:?}").contains(fake_secret));
1952 }
1953
1954 #[test]
1955 fn connect_empty_connection_errors() {
1956 assert!(matches!(
1957 PostgresStoreBackend::connect(""),
1958 Err(StoreError::EmptyConnection)
1959 ));
1960 }
1961
1962 #[test]
1963 fn connect_missing_env_var_errors() {
1964 assert!(matches!(
1965 PostgresStoreBackend::connect("env:AXON_NONEXISTENT_VAR_FASE35C"),
1966 Err(StoreError::MissingEnvVar { .. })
1967 ));
1968 }
1969
1970 #[test]
1971 fn connect_malformed_dsn_errors() {
1972 assert!(matches!(
1973 PostgresStoreBackend::connect("not a valid dsn at all"),
1974 Err(StoreError::PoolInit { .. })
1975 ));
1976 }
1977
1978 // ── Gap 3 (v1.36.3) — pooler safety + application_name ───────────
1979
1980 #[tokio::test]
1981 async fn connect_named_with_valid_dsn_is_lazy_and_succeeds() {
1982 // `connect_named` builds the same lazy pool — Gap 3 only adds
1983 // `statement_cache_capacity(0)` + `application_name`, neither of
1984 // which opens a connection.
1985 let backend = PostgresStoreBackend::connect_named(
1986 "postgresql://u:p@localhost:5432/db",
1987 "claims",
1988 )
1989 .expect("a well-formed DSN builds a lazy pool");
1990 let _ = format!("{backend:?}");
1991 }
1992
1993 #[test]
1994 fn connect_named_malformed_dsn_errors() {
1995 assert!(matches!(
1996 PostgresStoreBackend::connect_named("not a dsn", "claims"),
1997 Err(StoreError::PoolInit { .. })
1998 ));
1999 }
2000
2001 #[test]
2002 fn application_name_carries_the_store_name() {
2003 assert_eq!(application_name_for("claims"), "axon-store/claims");
2004 assert_eq!(
2005 application_name_for("tenant_audit_log"),
2006 "axon-store/tenant_audit_log"
2007 );
2008 }
2009
2010 #[test]
2011 fn application_name_empty_store_is_bare() {
2012 // `connect` delegates with no store name — the bare label, with
2013 // no dangling slash.
2014 assert_eq!(application_name_for(""), "axon-store");
2015 }
2016
2017 #[test]
2018 fn application_name_capped_at_postgres_namedatalen() {
2019 // Postgres silently truncates `application_name` past 63 bytes;
2020 // we cap it ourselves so the stamped name is exactly observed.
2021 let long = "s".repeat(200);
2022 let name = application_name_for(&long);
2023 assert!(name.len() <= 63, "must fit NAMEDATALEN-1, got {}", name.len());
2024 assert!(name.starts_with("axon-store/s"));
2025 }
2026
2027 #[test]
2028 fn application_name_truncation_respects_char_boundaries() {
2029 // A multi-byte tail must never be cut mid-codepoint — the result
2030 // is always valid UTF-8 (`String` guarantees it, but the cut
2031 // must land on a boundary or the slice panics).
2032 let name = application_name_for(&"é".repeat(100));
2033 assert!(name.len() <= 63);
2034 assert!(name.is_char_boundary(name.len()));
2035 }
2036
2037 // ── build_select_sql ─────────────────────────────────────────────
2038
2039 #[test]
2040 fn select_with_filter() {
2041 let (sql, params) =
2042 build_select_sql("users", None, "id = 1", &nb(), &nb()).unwrap();
2043 // §37.x.e (D4) — unknown column type + equality → `::text`.
2044 assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\"::text = $1");
2045 assert_eq!(params, vec![SqlValue::Integer(1)]);
2046 }
2047
2048 #[test]
2049 fn select_casts_the_filter_value_to_its_introspected_column_type() {
2050 // §v1.36.4 — a known column type casts the WHERE value, so the
2051 // comparison uses the native operator (`int4 = int4`).
2052 let types = std::collections::HashMap::from([(
2053 "id".to_string(),
2054 "int4".to_string(),
2055 )]);
2056 let (sql, _) =
2057 build_select_sql("users", None, "id = 1", &nb(), &types).unwrap();
2058 assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\" = $1::int4");
2059 }
2060
2061 #[test]
2062 fn select_with_empty_filter_renders_where_true() {
2063 let (sql, params) =
2064 build_select_sql("users", None, "", &nb(), &nb()).unwrap();
2065 assert_eq!(sql, "SELECT * FROM \"users\" WHERE TRUE");
2066 assert!(params.is_empty());
2067 }
2068
2069 #[test]
2070 fn select_rejects_unsafe_table_name() {
2071 assert!(matches!(
2072 build_select_sql("users; DROP TABLE x", None, "", &nb(), &nb()),
2073 Err(StoreError::InvalidIdentifier { kind: "table", .. })
2074 ));
2075 }
2076
2077 #[test]
2078 fn select_propagates_filter_errors() {
2079 assert!(matches!(
2080 build_select_sql("users", None, "id = 1 AND", &nb(), &nb()),
2081 Err(StoreError::Filter(_))
2082 ));
2083 }
2084
2085 // ── build_delete_sql ─────────────────────────────────────────────
2086
2087 #[test]
2088 fn delete_with_filter() {
2089 let (sql, params) =
2090 build_delete_sql("sessions", None, "expired = true", &nb(), &nb())
2091 .unwrap();
2092 assert_eq!(sql, "DELETE FROM \"sessions\" WHERE \"expired\"::text = $1");
2093 assert_eq!(params, vec![SqlValue::Boolean(true)]);
2094 }
2095
2096 #[test]
2097 fn delete_rejects_unsafe_table() {
2098 assert!(matches!(
2099 build_delete_sql("evil\"table", None, "a = 1", &nb(), &nb()),
2100 Err(StoreError::InvalidIdentifier { .. })
2101 ));
2102 }
2103
2104 // ── build_insert_sql ─────────────────────────────────────────────
2105
2106 #[test]
2107 fn insert_basic() {
2108 let (sql, params) = build_insert_sql(
2109 "users",
2110 None,
2111 &[("name".into(), txt("Alice")), ("age".into(), SqlValue::Integer(30))],
2112 &nb(),
2113 )
2114 .unwrap();
2115 assert_eq!(
2116 sql,
2117 "INSERT INTO \"users\" (\"name\", \"age\") VALUES ($1, $2)"
2118 );
2119 assert_eq!(params, vec![txt("Alice"), SqlValue::Integer(30)]);
2120 }
2121
2122 #[test]
2123 fn insert_renders_null_inline_consuming_no_placeholder() {
2124 let (sql, params) = build_insert_sql(
2125 "t",
2126 None,
2127 &[
2128 ("a".into(), SqlValue::Integer(1)),
2129 ("b".into(), SqlValue::Null),
2130 ("c".into(), txt("x")),
2131 ],
2132 &nb(),
2133 )
2134 .unwrap();
2135 assert_eq!(
2136 sql,
2137 "INSERT INTO \"t\" (\"a\", \"b\", \"c\") VALUES ($1, NULL, $2)"
2138 );
2139 assert_eq!(params, vec![SqlValue::Integer(1), txt("x")]);
2140 }
2141
2142 #[test]
2143 fn insert_empty_data_errors() {
2144 assert_eq!(
2145 build_insert_sql("t", None, &[], &nb()),
2146 Err(StoreError::EmptyData { op: "insert" })
2147 );
2148 }
2149
2150 #[test]
2151 fn insert_rejects_unsafe_column_name() {
2152 assert!(matches!(
2153 build_insert_sql("t", None, &[("a\"; DROP".into(), SqlValue::Integer(1))], &nb()),
2154 Err(StoreError::InvalidIdentifier { kind: "column", .. })
2155 ));
2156 }
2157
2158 #[test]
2159 fn insert_rejects_unsafe_table_name() {
2160 assert!(matches!(
2161 build_insert_sql("t t", None, &[("a".into(), SqlValue::Integer(1))], &nb()),
2162 Err(StoreError::InvalidIdentifier { kind: "table", .. })
2163 ));
2164 }
2165
2166 // ── build_update_sql ─────────────────────────────────────────────
2167
2168 #[test]
2169 fn update_basic_where_offset_continues_after_set() {
2170 let (sql, params) = build_update_sql(
2171 "users",
2172 None,
2173 "id = 5",
2174 &[("name".into(), txt("Bob")), ("age".into(), SqlValue::Integer(40))],
2175 &nb(),
2176 &nb(),
2177 )
2178 .unwrap();
2179 assert_eq!(
2180 sql,
2181 "UPDATE \"users\" SET \"name\" = $1, \"age\" = $2 \
2182 WHERE \"id\"::text = $3"
2183 );
2184 assert_eq!(
2185 params,
2186 vec![txt("Bob"), SqlValue::Integer(40), SqlValue::Integer(5)]
2187 );
2188 }
2189
2190 #[test]
2191 fn update_null_set_value_shifts_where_offset_by_non_null_count() {
2192 // The defect the Python reference has: a NULL SET value renders
2193 // inline, so the WHERE offset is the NON-NULL set count (1),
2194 // not the column count (2). `id` must be $2, not $3.
2195 let (sql, params) = build_update_sql(
2196 "users",
2197 None,
2198 "id = 5",
2199 &[("name".into(), SqlValue::Null), ("age".into(), SqlValue::Integer(40))],
2200 &nb(),
2201 &nb(),
2202 )
2203 .unwrap();
2204 assert_eq!(
2205 sql,
2206 "UPDATE \"users\" SET \"name\" = NULL, \"age\" = $1 \
2207 WHERE \"id\"::text = $2"
2208 );
2209 assert_eq!(params, vec![SqlValue::Integer(40), SqlValue::Integer(5)]);
2210 }
2211
2212 #[test]
2213 fn update_with_empty_where_targets_all_rows() {
2214 let (sql, _) = build_update_sql(
2215 "t",
2216 None,
2217 "",
2218 &[("a".into(), SqlValue::Integer(1))],
2219 &nb(),
2220 &nb(),
2221 )
2222 .unwrap();
2223 assert_eq!(sql, "UPDATE \"t\" SET \"a\" = $1 WHERE TRUE");
2224 }
2225
2226 #[test]
2227 fn update_empty_data_errors() {
2228 assert_eq!(
2229 build_update_sql("t", None, "id = 1", &[], &nb(), &nb()),
2230 Err(StoreError::EmptyData { op: "mutate" })
2231 );
2232 }
2233
2234 #[test]
2235 fn update_rejects_unsafe_column() {
2236 assert!(matches!(
2237 build_update_sql(
2238 "t",
2239 None,
2240 "id = 1",
2241 &[("a-b".into(), SqlValue::Integer(1))],
2242 &nb(),
2243 &nb(),
2244 ),
2245 Err(StoreError::InvalidIdentifier { kind: "column", .. })
2246 ));
2247 }
2248
2249 #[test]
2250 fn update_propagates_filter_errors() {
2251 assert!(matches!(
2252 build_update_sql(
2253 "t",
2254 None,
2255 "bad ;",
2256 &[("a".into(), SqlValue::Integer(1))],
2257 &nb(),
2258 &nb(),
2259 ),
2260 Err(StoreError::Filter(_))
2261 ));
2262 }
2263
2264 // ── §v1.36.2 — typed-column write cast ───────────────────────────
2265
2266 #[test]
2267 fn insert_casts_each_value_to_its_introspected_column_type() {
2268 let types = std::collections::HashMap::from([
2269 ("tenant_id".to_string(), "uuid".to_string()),
2270 ("note".to_string(), "text".to_string()),
2271 ("n".to_string(), "int4".to_string()),
2272 ]);
2273 let (sql, _) = build_insert_sql(
2274 "chat_history",
2275 None,
2276 &[
2277 ("tenant_id".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e")),
2278 ("note".into(), txt("hi")),
2279 ("n".into(), SqlValue::Integer(3)),
2280 ],
2281 &types,
2282 )
2283 .unwrap();
2284 assert_eq!(
2285 sql,
2286 "INSERT INTO \"chat_history\" (\"tenant_id\", \"note\", \"n\") \
2287 VALUES ($1::uuid, $2::text, $3::int4)",
2288 "§v1.36.2 — each value placeholder is cast to its column's \
2289 introspected type so a text-bound value writes into a \
2290 uuid / int column"
2291 );
2292 }
2293
2294 #[test]
2295 fn update_set_casts_each_value_to_its_introspected_column_type() {
2296 let types = std::collections::HashMap::from([(
2297 "status".to_string(),
2298 "uuid".to_string(),
2299 )]);
2300 let (sql, _) = build_update_sql(
2301 "t",
2302 None,
2303 "id = 1",
2304 &[("status".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e"))],
2305 &nb(),
2306 &types,
2307 )
2308 .unwrap();
2309 assert_eq!(
2310 sql,
2311 "UPDATE \"t\" SET \"status\" = $1::uuid WHERE \"id\"::text = $2",
2312 "§v1.36.2 — the SET value is cast to the column type; `id` \
2313 is absent from the type map so §37.x.e (D4) casts the \
2314 WHERE column to `text` for the equality"
2315 );
2316 }
2317
2318 #[test]
2319 fn update_where_value_is_cast_to_its_column_type() {
2320 // §v1.36.4 — when the WHERE column's type IS known, its value
2321 // placeholder is cast too (the SET-side cure applied to WHERE).
2322 let types = std::collections::HashMap::from([
2323 ("status".to_string(), "text".to_string()),
2324 ("id".to_string(), "int8".to_string()),
2325 ]);
2326 let (sql, _) = build_update_sql(
2327 "t",
2328 None,
2329 "id = 1",
2330 &[("status".into(), txt("done"))],
2331 &nb(),
2332 &types,
2333 )
2334 .unwrap();
2335 assert_eq!(
2336 sql,
2337 "UPDATE \"t\" SET \"status\" = $1::text WHERE \"id\" = $2::int8"
2338 );
2339 }
2340
2341 #[test]
2342 fn unknown_column_type_falls_back_to_a_bare_placeholder() {
2343 // An empty type map (introspection missed the table / column)
2344 // → bare `$N`, the pre-1.36.2 behaviour: a `text` column still
2345 // works, a typed column fails LOUDLY — no regression, no
2346 // silent-wrong write.
2347 let (sql, _) =
2348 build_insert_sql("t", None, &[("x".into(), txt("v"))], &nb()).unwrap();
2349 assert_eq!(sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)");
2350 }
2351
2352 #[test]
2353 fn an_unsafe_column_type_name_is_not_spliced_into_sql() {
2354 // Defense in depth: `udt_name` comes from Postgres, but a type
2355 // name that is not a safe identifier is never spliced — the
2356 // builder falls back to a bare `$N`.
2357 let types = std::collections::HashMap::from([(
2358 "x".to_string(),
2359 "uuid; DROP TABLE t".to_string(),
2360 )]);
2361 let (sql, _) =
2362 build_insert_sql("t", None, &[("x".into(), txt("v"))], &types).unwrap();
2363 assert_eq!(
2364 sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)",
2365 "an unsafe type name yields no cast — never a splice"
2366 );
2367 }
2368
2369 // ── D4 — injection resistance, end to end ────────────────────────
2370
2371 #[test]
2372 fn injection_in_value_position_is_a_bound_parameter() {
2373 let (sql, params) = build_select_sql(
2374 "users",
2375 None,
2376 "name = '; DROP TABLE users; --'",
2377 &nb(),
2378 &nb(),
2379 )
2380 .unwrap();
2381 assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"name\"::text = $1");
2382 assert_eq!(
2383 params,
2384 vec![txt("; DROP TABLE users; --")]
2385 );
2386 }
2387
2388 #[test]
2389 fn injection_in_table_identifier_is_rejected_not_quoted() {
2390 assert!(matches!(
2391 build_select_sql("users\" WHERE 1=1; --", None, "", &nb(), &nb()),
2392 Err(StoreError::InvalidIdentifier { .. })
2393 ));
2394 }
2395
2396 // ── §Fase 37.x.c — schema-anchored relation (D2) ─────────────────
2397
2398 #[test]
2399 fn select_with_a_resolved_schema_is_qualified() {
2400 // §37.x.c (D2) — a resolved schema renders `"schema"."table"`,
2401 // so the SELECT resolves on any session regardless of the
2402 // ambient `search_path`.
2403 let (sql, _) =
2404 build_select_sql("tenants", Some("public"), "id = 1", &nb(), &nb())
2405 .unwrap();
2406 assert_eq!(
2407 sql,
2408 "SELECT * FROM \"public\".\"tenants\" WHERE \"id\"::text = $1"
2409 );
2410 }
2411
2412 #[test]
2413 fn every_builder_qualifies_with_a_resolved_schema() {
2414 // D2 must flip ALL FOUR builders, not three.
2415 let data = [("v".to_string(), SqlValue::Integer(1))];
2416 let (sel, _) =
2417 build_select_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
2418 let (del, _) =
2419 build_delete_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
2420 let (ins, _) = build_insert_sql("t", Some("app"), &data, &nb()).unwrap();
2421 let (upd, _) =
2422 build_update_sql("t", Some("app"), "", &data, &nb(), &nb()).unwrap();
2423 assert!(sel.contains("FROM \"app\".\"t\""), "SELECT: {sel}");
2424 assert!(del.contains("FROM \"app\".\"t\""), "DELETE: {del}");
2425 assert!(ins.contains("INTO \"app\".\"t\""), "INSERT: {ins}");
2426 assert!(upd.starts_with("UPDATE \"app\".\"t\""), "UPDATE: {upd}");
2427 }
2428
2429 #[test]
2430 fn no_resolved_schema_renders_the_bare_table() {
2431 // D5 backwards-compat — `schema = None` (resolution failed or
2432 // not attempted) renders the pre-37.x un-qualified `"table"`.
2433 let (sql, _) = build_select_sql("t", None, "", &nb(), &nb()).unwrap();
2434 assert_eq!(sql, "SELECT * FROM \"t\" WHERE TRUE");
2435 }
2436
2437 #[test]
2438 fn an_unsafe_schema_name_is_not_spliced_and_falls_back_to_bare_table() {
2439 // Defense in depth (D4) — a schema name from `pg_catalog` that
2440 // is not a safe identifier is NEVER spliced; the builder falls
2441 // back to the bare `"table"` (search_path resolves it), exactly
2442 // as an unsafe `udt_name` yields no cast.
2443 for unsafe_schema in ["a\"; DROP TABLE x", "my schema", "1schema"] {
2444 let (sql, _) =
2445 build_select_sql("t", Some(unsafe_schema), "", &nb(), &nb())
2446 .unwrap();
2447 assert_eq!(
2448 sql, "SELECT * FROM \"t\" WHERE TRUE",
2449 "unsafe schema `{unsafe_schema}` must not be spliced"
2450 );
2451 }
2452 }
2453
2454 #[test]
2455 fn a_qualified_table_still_casts_and_offsets_correctly() {
2456 // §37.x.c composes with §v1.36.2/§v1.36.4 — schema-qualification
2457 // is orthogonal to the value cast + the WHERE param offset.
2458 let types = std::collections::HashMap::from([
2459 ("status".to_string(), "uuid".to_string()),
2460 ("id".to_string(), "int8".to_string()),
2461 ]);
2462 let (sql, _) = build_update_sql(
2463 "t",
2464 Some("public"),
2465 "id = 1",
2466 &[("status".into(), txt("done"))],
2467 &nb(),
2468 &types,
2469 )
2470 .unwrap();
2471 assert_eq!(
2472 sql,
2473 "UPDATE \"public\".\"t\" SET \"status\" = $1::uuid \
2474 WHERE \"id\" = $2::int8"
2475 );
2476 }
2477
2478 // ── classify_pg_type ─────────────────────────────────────────────
2479
2480 #[test]
2481 fn classify_every_supported_type() {
2482 let cases = [
2483 ("BOOL", PgTypeClass::Bool),
2484 ("INT2", PgTypeClass::Int2),
2485 ("INT4", PgTypeClass::Int4),
2486 ("INT8", PgTypeClass::Int8),
2487 ("FLOAT4", PgTypeClass::Float4),
2488 ("FLOAT8", PgTypeClass::Float8),
2489 ("NUMERIC", PgTypeClass::Numeric),
2490 ("TEXT", PgTypeClass::Text),
2491 ("VARCHAR", PgTypeClass::Text),
2492 ("BPCHAR", PgTypeClass::Text),
2493 ("NAME", PgTypeClass::Text),
2494 ("UUID", PgTypeClass::Uuid),
2495 ("TIMESTAMPTZ", PgTypeClass::TimestampTz),
2496 ("TIMESTAMP", PgTypeClass::Timestamp),
2497 ("DATE", PgTypeClass::Date),
2498 ("TIME", PgTypeClass::Time),
2499 ("JSON", PgTypeClass::Json),
2500 ("JSONB", PgTypeClass::Json),
2501 ("BYTEA", PgTypeClass::Bytea),
2502 ];
2503 for (name, expected) in cases {
2504 assert_eq!(classify_pg_type(name), Some(expected), "type {name}");
2505 }
2506 }
2507
2508 #[test]
2509 fn classify_is_case_insensitive() {
2510 assert_eq!(classify_pg_type("int4"), Some(PgTypeClass::Int4));
2511 assert_eq!(classify_pg_type("TimestampTz"), Some(PgTypeClass::TimestampTz));
2512 }
2513
2514 #[test]
2515 fn classify_unsupported_types_return_none() {
2516 for name in ["INT4[]", "INET", "POINT", "HSTORE", "CIDR", "MONEY", ""] {
2517 assert_eq!(classify_pg_type(name), None, "type {name} unsupported");
2518 }
2519 }
2520
2521 // ── StoreRow ─────────────────────────────────────────────────────
2522
2523 #[test]
2524 fn store_row_get_and_to_json() {
2525 let row = StoreRow {
2526 columns: vec![
2527 ("id".into(), JsonValue::from(7)),
2528 ("name".into(), JsonValue::String("Eve".into())),
2529 ],
2530 };
2531 assert_eq!(row.get("id"), Some(&JsonValue::from(7)));
2532 assert_eq!(row.get("missing"), None);
2533 assert_eq!(
2534 row.to_json(),
2535 serde_json::json!({ "id": 7, "name": "Eve" })
2536 );
2537 }
2538
2539 // ── §Fase 37.x.b — resolve_from_rows (D1 pure resolution core) ───
2540
2541 fn triple(s: &str, c: &str, t: &str) -> (String, String, String) {
2542 (s.to_string(), c.to_string(), t.to_string())
2543 }
2544
2545 #[test]
2546 fn resolve_from_rows_no_rows_is_table_not_resolved() {
2547 match resolve_from_rows("widgets", vec![]) {
2548 Err(StoreError::TableNotResolved { table }) => {
2549 assert_eq!(table, "widgets");
2550 }
2551 other => panic!("expected TableNotResolved, got {other:?}"),
2552 }
2553 }
2554
2555 #[test]
2556 fn resolve_from_rows_one_schema_resolves_with_its_column_map() {
2557 let (schema, types) = resolve_from_rows(
2558 "tenants",
2559 vec![triple("public", "id", "uuid"), triple("public", "n", "int4")],
2560 )
2561 .expect("a single-schema result resolves");
2562 assert_eq!(schema, "public");
2563 assert_eq!(types.get("id"), Some(&"uuid".to_string()));
2564 assert_eq!(types.get("n"), Some(&"int4".to_string()));
2565 assert_eq!(types.len(), 2);
2566 }
2567
2568 #[test]
2569 fn resolve_from_rows_two_schemas_is_ambiguous_with_sorted_schemas() {
2570 match resolve_from_rows(
2571 "widgets",
2572 vec![
2573 triple("tenant_b", "id", "uuid"),
2574 triple("tenant_a", "id", "int4"),
2575 ],
2576 ) {
2577 Err(StoreError::AmbiguousTable { table, schemas }) => {
2578 assert_eq!(table, "widgets");
2579 // `BTreeMap` keys iterate sorted — a deterministic list.
2580 assert_eq!(
2581 schemas,
2582 vec!["tenant_a".to_string(), "tenant_b".to_string()]
2583 );
2584 }
2585 other => panic!("expected AmbiguousTable, got {other:?}"),
2586 }
2587 }
2588
2589 #[test]
2590 fn resolve_from_rows_three_schemas_is_still_one_ambiguous_error() {
2591 assert!(matches!(
2592 resolve_from_rows(
2593 "t",
2594 vec![
2595 triple("s1", "a", "text"),
2596 triple("s2", "a", "text"),
2597 triple("s3", "a", "text"),
2598 ],
2599 ),
2600 Err(StoreError::AmbiguousTable { .. })
2601 ));
2602 }
2603
2604 // ── §Fase 37.x.d — schema cache (D3) ─────────────────────────────
2605
2606 #[tokio::test]
2607 async fn schema_cache_round_trips_a_resolution() {
2608 // The cache surface that lets a coherent-session operation skip
2609 // the transaction on a hit. `connect` is lazy — no database.
2610 let backend = PostgresStoreBackend::connect(
2611 "postgresql://u:p@localhost:5432/fase37xd_cache_rt",
2612 )
2613 .unwrap();
2614 let table = "fase37xd_cache_probe";
2615 assert!(
2616 backend.cached_schema(table).is_none(),
2617 "a cold cache is a miss"
2618 );
2619 let resolved = std::sync::Arc::new(ResolvedTable {
2620 schema: "public".to_string(),
2621 column_types: std::collections::HashMap::from([(
2622 "id".to_string(),
2623 "uuid".to_string(),
2624 )]),
2625 });
2626 backend.cache_schema(table, std::sync::Arc::clone(&resolved));
2627 let hit = backend
2628 .cached_schema(table)
2629 .expect("a warm cache is a hit");
2630 assert_eq!(hit.schema, "public");
2631 assert_eq!(hit.column_types.get("id"), Some(&"uuid".to_string()));
2632 }
2633
2634 #[tokio::test]
2635 async fn schema_cache_never_stores_an_empty_resolution() {
2636 // §v1.36.5 rule preserved — a real relation always has ≥ 1
2637 // column, so an empty map is a transient failure to retry,
2638 // never a poisoned cache entry.
2639 let backend = PostgresStoreBackend::connect(
2640 "postgresql://u:p@localhost:5432/fase37xd_cache_empty",
2641 )
2642 .unwrap();
2643 let table = "fase37xd_empty_probe";
2644 backend.cache_schema(
2645 table,
2646 std::sync::Arc::new(ResolvedTable {
2647 schema: "public".to_string(),
2648 column_types: std::collections::HashMap::new(),
2649 }),
2650 );
2651 assert!(
2652 backend.cached_schema(table).is_none(),
2653 "an empty resolution must never be cached"
2654 );
2655 }
2656
2657 // ── §Fase 37.x.f — D9 self-healing bounded cache ─────────────────
2658
2659 #[test]
2660 fn is_schema_drift_sqlstate_recognises_exactly_the_drift_codes() {
2661 // The four parse/plan-time rejections that signal a stale cache.
2662 for code in ["42P01", "42703", "42804", "42883"] {
2663 assert!(
2664 is_schema_drift_sqlstate(code),
2665 "`{code}` must be a schema-drift SQLSTATE"
2666 );
2667 }
2668 // Non-drift samples — unique-violation, syntax error, connection
2669 // failure, check-violation, serialization failure, empty.
2670 for code in ["23505", "42601", "08006", "23514", "40001", ""] {
2671 assert!(
2672 !is_schema_drift_sqlstate(code),
2673 "`{code}` is NOT schema drift — must not trigger the \
2674 self-heal retry"
2675 );
2676 }
2677 }
2678
2679 #[test]
2680 fn store_error_is_schema_drift_predicate() {
2681 assert!(StoreError::SchemaDrift {
2682 op: "retrieve",
2683 sqlstate: "42883".to_string(),
2684 source: "operator does not exist: text = uuid".to_string(),
2685 }
2686 .is_schema_drift());
2687 assert!(!StoreError::Query {
2688 op: "retrieve",
2689 source: "syntax error".to_string(),
2690 }
2691 .is_schema_drift());
2692 assert!(!StoreError::TableNotResolved { table: "t".into() }
2693 .is_schema_drift());
2694 }
2695
2696 /// A small `ResolvedTable` for the cache tests.
2697 fn rt(schema: &str) -> std::sync::Arc<ResolvedTable> {
2698 std::sync::Arc::new(ResolvedTable {
2699 schema: schema.to_string(),
2700 column_types: std::collections::HashMap::from([(
2701 "id".to_string(),
2702 "uuid".to_string(),
2703 )]),
2704 })
2705 }
2706
2707 #[test]
2708 fn schema_cache_evicts_the_oldest_entry_at_capacity() {
2709 // §D9 — the bound: a many-table adopter cannot grow the cache
2710 // without limit; at capacity the OLDEST insertion is evicted.
2711 let mut cache = SchemaCache::new(2);
2712 let key = |t: &str| ("dsn".to_string(), t.to_string());
2713 cache.insert(key("a"), rt("s_a"));
2714 cache.insert(key("b"), rt("s_b"));
2715 cache.insert(key("c"), rt("s_c")); // over capacity → evict `a`.
2716 assert_eq!(cache.entries.len(), 2, "the cache is bounded at 2");
2717 assert!(
2718 cache.get(&key("a")).is_none(),
2719 "the oldest entry was evicted"
2720 );
2721 assert_eq!(
2722 cache.get(&key("b")).map(|r| r.schema.clone()),
2723 Some("s_b".to_string())
2724 );
2725 assert_eq!(
2726 cache.get(&key("c")).map(|r| r.schema.clone()),
2727 Some("s_c".to_string())
2728 );
2729 }
2730
2731 #[test]
2732 fn schema_cache_evict_drops_a_named_entry() {
2733 // §D9 — the self-heal eviction primitive.
2734 let mut cache = SchemaCache::new(10);
2735 let key = ("dsn".to_string(), "t".to_string());
2736 cache.insert(key.clone(), rt("public"));
2737 assert!(cache.get(&key).is_some());
2738 cache.evict(&key);
2739 assert!(cache.get(&key).is_none(), "evict drops the entry");
2740 }
2741
2742 #[test]
2743 fn schema_cache_reinsert_of_a_key_does_not_evict_another() {
2744 // Re-inserting an EXISTING key (a self-heal re-introspection)
2745 // refreshes it in place — it must not evict another entry.
2746 let mut cache = SchemaCache::new(2);
2747 let ka = ("dsn".to_string(), "a".to_string());
2748 let kb = ("dsn".to_string(), "b".to_string());
2749 cache.insert(ka.clone(), rt("public"));
2750 cache.insert(kb.clone(), rt("public"));
2751 cache.insert(ka.clone(), rt("public")); // re-insert — no eviction.
2752 assert_eq!(cache.entries.len(), 2);
2753 assert!(cache.get(&ka).is_some());
2754 assert!(cache.get(&kb).is_some(), "the re-insert evicted nothing");
2755 }
2756
2757 // ── StoreError display ───────────────────────────────────────────
2758
2759 #[test]
2760 fn every_store_error_has_a_non_empty_display() {
2761 let errors = [
2762 StoreError::EmptyConnection,
2763 StoreError::EmptyEnvVarName,
2764 StoreError::MissingEnvVar { var: "X".into() },
2765 StoreError::PoolInit {
2766 dsn_masked: "postgresql://u:***@h/db".into(),
2767 source: "bad".into(),
2768 },
2769 StoreError::InvalidIdentifier { kind: "table", name: "x;".into() },
2770 StoreError::EmptyData { op: "insert" },
2771 StoreError::Filter(FilterError::TooManyConditions { limit: 256 }),
2772 StoreError::Connect { source: "refused".into() },
2773 StoreError::Query { op: "retrieve", source: "syntax".into() },
2774 StoreError::UnsupportedColumnType {
2775 column: "geom".into(),
2776 pg_type: "POINT".into(),
2777 },
2778 StoreError::Decode {
2779 column: "ts".into(),
2780 pg_type: "TIMESTAMPTZ".into(),
2781 source: "overflow".into(),
2782 },
2783 StoreError::TableNotResolved { table: "ghost".into() },
2784 StoreError::AmbiguousTable {
2785 table: "dup".into(),
2786 schemas: vec!["a".into(), "b".into()],
2787 },
2788 StoreError::SchemaDrift {
2789 op: "retrieve",
2790 sqlstate: "42883".into(),
2791 source: "operator does not exist: text = uuid".into(),
2792 },
2793 ];
2794 for e in errors {
2795 assert!(!e.to_string().is_empty());
2796 }
2797 }
2798
2799 #[test]
2800 fn filter_error_is_a_store_error_source() {
2801 use std::error::Error;
2802 let e = StoreError::Filter(FilterError::TooManyConditions { limit: 256 });
2803 assert!(e.source().is_some());
2804 }
2805
2806 #[test]
2807 fn filter_error_converts_into_store_error() {
2808 let e: StoreError = FilterError::TooManyConditions { limit: 256 }.into();
2809 assert!(matches!(e, StoreError::Filter(_)));
2810 }
2811
2812 // ── §Fase 37.x.h — D6 honest, actionable failure ─────────────────
2813
2814 #[test]
2815 fn d6_table_not_resolved_display_carries_an_actionable_hint() {
2816 // The Display of `TableNotResolved` is the user-facing surface of
2817 // the D1 resolution failure — it MUST tell an adopter (a) what
2818 // happened, (b) the table involved, and (c) at least one concrete
2819 // remedy. A bare "could not resolve" is the *un-actionable* form
2820 // 37.x.h replaces.
2821 let err = StoreError::TableNotResolved {
2822 table: "claims".into(),
2823 };
2824 let text = err.to_string();
2825 assert!(
2826 text.contains("`claims`"),
2827 "the table name must appear verbatim, got: {text}"
2828 );
2829 assert!(
2830 text.contains("pg_catalog"),
2831 "the message must disclose pg_catalog (so an adopter knows \
2832 `search_path` is not the culprit), got: {text}"
2833 );
2834 assert!(
2835 text.contains("migration") || text.contains("SELECT"),
2836 "the message must name at least one concrete remedy \
2837 (migration / SELECT permission), got: {text}"
2838 );
2839 }
2840
2841 #[test]
2842 fn d6_ambiguous_table_display_points_at_fase_38_schema_declaration() {
2843 // The Display of `AmbiguousTable` MUST tell an adopter both the
2844 // schemas the table resolved into AND the two real remedies —
2845 // narrow `search_path` OR declare the target schema explicitly
2846 // (the Fase 38 `schema:` declaration the gap report names).
2847 let err = StoreError::AmbiguousTable {
2848 table: "rates".into(),
2849 schemas: vec!["finance".into(), "legacy".into()],
2850 };
2851 let text = err.to_string();
2852 assert!(text.contains("`rates`"), "table name must appear");
2853 assert!(
2854 text.contains("finance") && text.contains("legacy"),
2855 "every resolving schema must appear, got: {text}"
2856 );
2857 assert!(
2858 text.contains("search_path"),
2859 "the search_path remedy must appear, got: {text}"
2860 );
2861 assert!(
2862 text.contains("schema:"),
2863 "the Fase 38 `schema:` declaration must be named (the \
2864 genuinely-superior remedy), got: {text}"
2865 );
2866 assert!(
2867 text.contains("Fase 38"),
2868 "the message must anchor the remedy to Fase 38, got: {text}"
2869 );
2870 }
2871
2872 #[test]
2873 fn d6_display_does_not_leak_internal_sqlstates_or_internal_paths() {
2874 // The Display is operator-facing prose, not a stack trace —
2875 // SQLSTATE codes belong on `SchemaDrift` (where they ARE the
2876 // diagnostic), not on a resolution failure. A regression would
2877 // be code spilling into the friendly arms.
2878 let nr = StoreError::TableNotResolved { table: "t".into() }.to_string();
2879 let amb = StoreError::AmbiguousTable {
2880 table: "t".into(),
2881 schemas: vec!["a".into()],
2882 }
2883 .to_string();
2884 for code in ["42P01", "42703", "42804", "42883"] {
2885 assert!(
2886 !nr.contains(code),
2887 "TableNotResolved must not leak SQLSTATE {code}"
2888 );
2889 assert!(
2890 !amb.contains(code),
2891 "AmbiguousTable must not leak SQLSTATE {code}"
2892 );
2893 }
2894 }
2895}