Skip to main content

axon/store/
postgres_backend.rs

1//! §Fase 35.c (v1.30.0) — `PostgresStoreBackend`, the SQL substrate of
2//! the `axonstore` cognitive data plane.
3//!
4//! This module makes `axonstore { backend: postgresql }` real: the four
5//! store operations — `query` (retrieve), `insert` (persist), `mutate`,
6//! `purge` — execute parameterized SQL against a `sqlx::PgPool` instead
7//! of the key-value path. It is the substrate the four pillars (35.g-j)
8//! enrich.
9//!
10//! # D6 — connection resolution
11//!
12//! [`resolve_dsn`] honors `connection: "env:VAR"` (resolve the named
13//! environment variable) and a literal DSN. A missing env var is a
14//! named [`StoreError::MissingEnvVar`] — never a panic, never a silent
15//! fallback to the key-value store.
16//!
17//! # D7 — pooling + honest typed failure surface
18//!
19//! [`PostgresStoreBackend::connect`] builds ONE lazy, bounded
20//! `sqlx::PgPool` (`connect_lazy_with` — no connection is opened until
21//! the first operation). Every failure path — empty connection, missing
22//! env var, malformed DSN, connect failure, SQL error, an unsupported
23//! column type, a decode failure — surfaces as a typed [`StoreError`].
24//! No panic; no silent empty result masking a failed query.
25//!
26//! # Gap 3 (v1.36.3) — transaction-mode pooler safety
27//!
28//! The pool's `PgConnectOptions` set `statement_cache_capacity(0)`
29//! unconditionally. sqlx otherwise caches server-side prepared
30//! statements under generated names (`sqlx_s_1`, …); behind a
31//! transaction-mode pooler — PgBouncer `pool_mode=transaction`,
32//! Supabase Supavisor (`:6543`), Neon, RDS Proxy — successive
33//! operations land on different physical sessions, so a name minted on
34//! one collides on the next (`prepared statement "sqlx_s_1" already
35//! exists`). Capacity 0 routes every query through the *unnamed*
36//! prepared statement — collision-free by construction, harmless on a
37//! direct/session-mode connection. An axonstore DSN is pooler-agnostic
38//! with no knob to misconfigure. Each connection also carries an
39//! `application_name` of `axon-store/<store>` so every session is
40//! attributable to its declaration in `pg_stat_activity`, pooler logs
41//! and DBA dashboards.
42//!
43//! # §Fase 37.x.b — search-path-independent table resolution
44//!
45//! [`introspect_conn`] resolves a store table to
46//! its schema + column types against `pg_catalog` — NOT via the
47//! ambient `search_path`, which a transaction-mode pooler does not
48//! preserve across checkouts. `to_regclass` is the search-path-correct
49//! primary; a `pg_catalog` scan keyed on `relname` is the
50//! search-path-independent fallback. An unresolved or cross-schema-
51//! ambiguous table is a typed [`StoreError`].
52//!
53//! # D4 — injection-proof, identifiers included
54//!
55//! Values flow through 35.b's [`build_pg_where`] as `$N` bind
56//! placeholders. The *identifier* surface — table names and
57//! `insert`/`mutate` column names, which ARE interpolated into SQL
58//! text — is validated against [`filter::is_safe_identifier`]
59//! (`[A-Za-z_]\w*`, ≤ 63 bytes) before being double-quoted. No
60//! untrusted identifier reaches SQL.
61//!
62//! # Architecture — pure builders + thin async execution
63//!
64//! SQL construction ([`build_select_sql`], [`build_insert_sql`],
65//! [`build_update_sql`], [`build_delete_sql`]) is **pure and total** —
66//! no I/O — and therefore exhaustively unit-tested here without a
67//! database. The async methods are thin: build → bind → execute. The
68//! row-decode path and live execution are proven against a real
69//! Postgres in 35.l (the integration harness).
70//!
71//! # Honest scope (D12)
72//!
73//! No DDL: `IRAxonStore` carries no column schema, so v1.30.0 operates
74//! against existing tables (no `CREATE TABLE` / `migrate` / index). Each
75//! operation is a single-statement autocommit; the multi-statement
76//! `transact { … }` block is a documented future fase. The supported
77//! column-type catalog is [`classify_pg_type`]; a column outside it is
78//! a clear [`StoreError::UnsupportedColumnType`], not a silent miss.
79
80use std::fmt;
81use std::str::FromStr;
82use std::time::Duration;
83
84use serde_json::Value as JsonValue;
85use sqlx::postgres::{PgArguments, PgConnectOptions, PgPoolOptions, PgRow};
86use sqlx::query::Query;
87use sqlx::{Column, PgConnection, PgPool, Postgres, Row, TypeInfo};
88
89use crate::store::epistemic::EpistemicError;
90use crate::store::filter::{self, build_pg_where, FilterError, SqlValue};
91
/// D7 (bounded pooling) — the most connections a single backend's pool
/// may hold.
const MAX_POOL_CONNECTIONS: u32 = 10;
/// Seconds to wait for a pooled connection to become available before
/// the acquire fails.
const ACQUIRE_TIMEOUT_SECS: u64 = 5;
/// Seconds a pooled connection may sit idle before it is reaped
/// (five minutes).
const IDLE_TIMEOUT_SECS: u64 = 5 * 60;
98
99// ════════════════════════════════════════════════════════════════════
100//  Error catalog (typed, total — D7)
101// ════════════════════════════════════════════════════════════════════
102
103/// Every way an `axonstore` SQL operation can fail. The backend is
104/// total: it returns one of these or a result — never a panic, never a
105/// silent empty result masking a failure.
106#[derive(Debug, Clone, PartialEq)]
107pub enum StoreError {
108    /// `connection` was empty or whitespace-only.
109    EmptyConnection,
110    /// `connection` was the bare prefix `env:` with no variable name.
111    EmptyEnvVarName,
112    /// `connection: "env:VAR"` and `VAR` is unset (or not UTF-8).
113    MissingEnvVar { var: String },
114    /// The resolved DSN is malformed — `connect_lazy` rejected it.
115    PoolInit { dsn_masked: String, source: String },
116    /// A table or column identifier failed the `[A-Za-z_]\w*` / 63-byte
117    /// safety check (D4 — no untrusted identifier reaches SQL).
118    InvalidIdentifier { kind: &'static str, name: String },
119    /// `insert` / `mutate` was called with no column data.
120    EmptyData { op: &'static str },
121    /// The `where` expression did not compile (delegates to 35.b).
122    Filter(FilterError),
123    /// A `confidence_floor` violation — a sub-floor or un-elevated
124    /// `persist` (delegates to 35.g's Pillar I epistemic data plane).
125    Epistemic(EpistemicError),
126    /// A live connection could not be acquired / the ping failed.
127    Connect { source: String },
128    /// A SQL statement failed at execution time.
129    Query { op: &'static str, source: String },
130    /// A retrieved column has a type outside the supported catalog
131    /// ([`classify_pg_type`]). Honest scope, not a silent miss.
132    UnsupportedColumnType { column: String, pg_type: String },
133    /// A retrieved column of a supported type failed to decode.
134    Decode { column: String, pg_type: String, source: String },
135    /// §Fase 37.x.b (D1) — the table named by a store operation could
136    /// not be resolved to a relation in ANY schema of the database.
137    TableNotResolved { table: String },
138    /// §Fase 37.x.b (D1) — the table name resolves to a relation in
139    /// more than one schema and the connection's `search_path` does not
140    /// disambiguate it. Carries the schemas found, sorted.
141    AmbiguousTable { table: String, schemas: Vec<String> },
142    /// §Fase 37.x.f (D9) — a store SQL statement failed with a
143    /// schema-drift SQLSTATE: the cached schema no longer matches the
144    /// live table (an `ALTER TABLE` ran since the cache was populated).
145    /// `42P01` undefined_table, `42703` undefined_column, `42804`
146    /// datatype_mismatch (a stale write cast), `42883` undefined
147    /// operator (a stale read cast). Triggers the D9 self-heal — the
148    /// `(dsn, table)` cache entry is evicted and the operation retried
149    /// once against fresh introspection. Safe: every one is a
150    /// parse/plan-time rejection, so the failed statement had ZERO side
151    /// effects (a retried `persist`/`mutate` cannot double-write).
152    SchemaDrift { op: &'static str, sqlstate: String, source: String },
153    /// §Fase 38.f (D3) — `axon-T806`. A `postgresql` store declared
154    /// `schema: env:VAR` and the named env var is unset at deploy
155    /// time. Never falls back silently — the deploy fails, the
156    /// operator either exports the var or fixes the declaration.
157    MissingPerTenantSchemaEnv { store: String, var: String },
158    /// §Fase 38.f (D8 strengthening) — `axon-T807`. A declared column
159    /// schema and the live introspected columns disagree at deploy
160    /// time. Carries a human-readable drift summary (which columns
161    /// are missing on the live DB, which have a type mismatch). The
162    /// remedy is named in the message: run `axon store introspect
163    /// <store>` to refresh the manifest, run the missing migration,
164    /// or fix the declaration.
165    DeclaredVsLiveDrift { store: String, drift: String },
166}
167
168impl fmt::Display for StoreError {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        match self {
171            StoreError::EmptyConnection => write!(
172                f,
173                "axonstore `connection` is empty — expected a DSN or an \
174                 `env:VARNAME` reference"
175            ),
176            StoreError::EmptyEnvVarName => write!(
177                f,
178                "axonstore `connection` is the bare prefix `env:` with no \
179                 variable name"
180            ),
181            StoreError::MissingEnvVar { var } => write!(
182                f,
183                "axonstore `connection: \"env:{var}\"` — environment \
184                 variable `{var}` is not set (or not valid UTF-8)"
185            ),
186            StoreError::PoolInit { dsn_masked, source } => write!(
187                f,
188                "axonstore connection pool could not be initialised for \
189                 `{dsn_masked}`: {source}"
190            ),
191            StoreError::InvalidIdentifier { kind, name } => write!(
192                f,
193                "unsafe {kind} identifier `{name}` — must match \
194                 [A-Za-z_][A-Za-z0-9_]* and be ≤ 63 bytes"
195            ),
196            StoreError::EmptyData { op } => write!(
197                f,
198                "axonstore `{op}` was given no column data"
199            ),
200            StoreError::Filter(e) => write!(f, "where-expression: {e}"),
201            StoreError::Epistemic(e) => write!(f, "{e}"),
202            StoreError::Connect { source } => {
203                write!(f, "axonstore could not reach the database: {source}")
204            }
205            StoreError::Query { op, source } => {
206                write!(f, "axonstore `{op}` SQL failed: {source}")
207            }
208            StoreError::UnsupportedColumnType { column, pg_type } => write!(
209                f,
210                "column `{column}` has Postgres type `{pg_type}`, outside \
211                 the v1.30.0 supported catalog"
212            ),
213            StoreError::Decode { column, pg_type, source } => write!(
214                f,
215                "column `{column}` (`{pg_type}`) failed to decode: {source}"
216            ),
217            StoreError::TableNotResolved { table } => write!(
218                f,
219                "axonstore could not resolve table `{table}` to a \
220                 relation in any schema of the database — verify the \
221                 table exists in the target database (a deploy-time \
222                 migration is the usual remedy) and that the configured \
223                 credentials can SELECT from it; the introspection scans \
224                 `pg_catalog` independent of `search_path`, so the table \
225                 is genuinely absent on every schema this role can see"
226            ),
227            StoreError::AmbiguousTable { table, schemas } => write!(
228                f,
229                "axonstore table `{table}` is ambiguous — it exists in \
230                 {} schemas ({}) and the connection's `search_path` does \
231                 not disambiguate it; either narrow the role's \
232                 `search_path` so exactly one of the resolving schemas \
233                 is visible, or declare the target schema explicitly on \
234                 the `axonstore` (the Fase 38 `schema:` declaration, \
235                 incl. `schema: env:VAR` per-tenant)",
236                schemas.len(),
237                schemas.join(", "),
238            ),
239            StoreError::SchemaDrift { op, sqlstate, source } => write!(
240                f,
241                "axonstore `{op}` hit live schema drift (SQLSTATE \
242                 {sqlstate}) — the cached schema is stale: {source}"
243            ),
244            StoreError::MissingPerTenantSchemaEnv { store, var } => write!(
245                f,
246                "axon-T806 axonstore `{store}` declares `schema: env:{var}` \
247                 but environment variable `{var}` is not set at deploy \
248                 time. The per-tenant schema namespace is required to \
249                 resolve the store's column manifest entry. Either \
250                 export `{var}` with the SQL schema name (e.g. \
251                 `tenant_42`), or declare the schema differently \
252                 (inline `schema {{ … }}` block, or manifest reference \
253                 `schema: \"qualified.name\"`). Never a silent fallback."
254            ),
255            StoreError::DeclaredVsLiveDrift { store, drift } => write!(
256                f,
257                "axon-T807 axonstore `{store}` declared column schema \
258                 disagrees with the live database: {drift}. The deploy \
259                 fails fail-closed (D8 strengthening). Remedy: run `axon \
260                 store introspect {store}` to refresh the manifest, run \
261                 the missing migration on the database, or fix the \
262                 declared `schema:` block to match the live shape."
263            ),
264        }
265    }
266}
267
268impl std::error::Error for StoreError {
269    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
270        match self {
271            StoreError::Filter(e) => Some(e),
272            StoreError::Epistemic(e) => Some(e),
273            _ => None,
274        }
275    }
276}
277
278impl StoreError {
279    /// §Fase 37.x.f (D9) — `true` iff this is a schema-drift failure
280    /// ([`StoreError::SchemaDrift`]) — the signal that triggers the
281    /// `(dsn, table)` cache self-heal (evict + retry once).
282    pub fn is_schema_drift(&self) -> bool {
283        matches!(self, StoreError::SchemaDrift { .. })
284    }
285}
286
287impl From<FilterError> for StoreError {
288    fn from(e: FilterError) -> Self {
289        StoreError::Filter(e)
290    }
291}
292
293impl From<EpistemicError> for StoreError {
294    fn from(e: EpistemicError) -> Self {
295        StoreError::Epistemic(e)
296    }
297}
298
299// ════════════════════════════════════════════════════════════════════
300//  D6 — connection resolution
301// ════════════════════════════════════════════════════════════════════
302
303/// Resolve an `axonstore` `connection` string into a concrete DSN.
304///
305/// - `"env:VAR"` → the value of environment variable `VAR`.
306/// - any other non-empty value → a literal DSN, returned verbatim.
307///
308/// Leading/trailing whitespace is trimmed. An empty connection, a bare
309/// `env:`, or a missing environment variable is a typed [`StoreError`]
310/// — never a panic, never a silent fallback.
311pub fn resolve_dsn(connection: &str) -> Result<String, StoreError> {
312    let conn = connection.trim();
313    if conn.is_empty() {
314        return Err(StoreError::EmptyConnection);
315    }
316    match conn.strip_prefix("env:") {
317        Some(var) => {
318            let var = var.trim();
319            if var.is_empty() {
320                return Err(StoreError::EmptyEnvVarName);
321            }
322            std::env::var(var).map_err(|_| StoreError::MissingEnvVar {
323                var: var.to_string(),
324            })
325        }
326        None => Ok(conn.to_string()),
327    }
328}
329
/// Mask the password segment of a URL-style DSN for safe logging /
/// `Debug`.
///
/// The password is taken to be everything between the FIRST `:` that
/// follows the scheme separator (`://`) and the LAST `@` in the
/// string, and is replaced with `***`. Anchoring on those two
/// delimiters means no byte of the password can survive, whatever
/// characters it contains:
///
/// - a password containing `:` (`user:pa:ss@host`) is fully masked —
///   splitting on the last `:` would leak the `pa` prefix;
/// - a password containing `@` (`user:p@ss@host`) is fully masked —
///   stopping at the first `@` would leak the `ss` suffix;
/// - a DSN with no password (`postgres://user@host`) is returned
///   unchanged — the scheme's own `:` is never mistaken for the
///   user/password separator.
///
/// The trade-off is deliberate over-masking: when an `@` appears later
/// in the path or query string, everything up to it is hidden too. A
/// DSN with no `@`, or no `:` before the last `@`, is returned
/// verbatim.
fn mask_dsn(dsn: &str) -> String {
    // Skip the scheme only when the FIRST colon is the `://` separator,
    // so a `://` occurring inside a scheme-less DSN's password is not
    // mistaken for one.
    let authority_start = match dsn.find(':') {
        Some(idx) if dsn[idx..].starts_with("://") => idx + "://".len(),
        _ => 0,
    };
    let tail = &dsn[authority_start..];

    let Some(at) = tail.rfind('@') else {
        return dsn.to_string();
    };
    let Some(colon) = tail[..at].find(':') else {
        return dsn.to_string();
    };

    // Both delimiters are single-byte ASCII, so these offsets are
    // always valid char boundaries.
    let password_start = authority_start + colon + 1;
    let password_end = authority_start + at;
    format!("{}***{}", &dsn[..password_start], &dsn[password_end..])
}
339
340/// §Fase 38.h — public alias of [`mask_dsn`] so the introspection
341/// CLI (`store::introspect_cli`) can render error messages with
342/// masked credentials without re-implementing the routine.
343pub fn mask_dsn_pub(dsn: &str) -> String {
344    mask_dsn(dsn)
345}
346
347/// The `application_name` stamped on an axonstore's Postgres
348/// connections (Gap 3 bonus, v1.36.3).
349///
350/// `axon-store/<store_name>` makes every session attributable to its
351/// `axonstore` declaration in `pg_stat_activity`, pooler logs and DBA
352/// dashboards; a bare `axon-store` when no store name is available.
353///
354/// Total and bounded: Postgres caps `application_name` at
355/// `NAMEDATALEN - 1` (63 bytes) and *silently truncates* a longer
356/// value. This caps it ourselves on a UTF-8 char boundary so the
357/// stamped name is exactly what a DBA sees — never a server-mangled
358/// suffix.
359fn application_name_for(store_name: &str) -> String {
360    application_name_for_with_namespace(store_name, None)
361}
362
/// §Fase 38.f (D3) — build the `application_name` for a store,
/// optionally suffixed with its resolved per-tenant schema namespace
/// (Gap-3 inheritance).
///
/// Shapes produced:
///
///   * empty `store_name`, no namespace → `"axon-store"`;
///   * `("claims", None)` → `"axon-store/claims"` — the v1.36.3 shape,
///     unchanged for stores without a namespace;
///   * `("claims", Some("tenant_42"))` →
///     `"axon-store/claims/tenant_42"`.
///
/// An empty namespace (`Some("")`) is treated like `None`.
///
/// With the namespace in the name, a DBA reading `pg_stat_activity` or
/// pooler logs sees both the `axonstore` declaration and the tenant in
/// one place, without joining through adopter telemetry.
///
/// Total + bounded: the result never exceeds 63 bytes
/// (`NAMEDATALEN - 1`). A longer name is cut at the largest UTF-8 char
/// boundary at or below that limit, so a multi-byte character is never
/// split.
pub(crate) fn application_name_for_with_namespace(
    store_name: &str,
    namespace: Option<&str>,
) -> String {
    const MAX: usize = 63;

    // Assemble `axon-store[/<store>][/<namespace>]` in place.
    let mut name = String::from("axon-store");
    if !store_name.is_empty() {
        name.push('/');
        name.push_str(store_name);
    }
    if let Some(ns) = namespace.filter(|ns| !ns.is_empty()) {
        name.push('/');
        name.push_str(ns);
    }

    if name.len() > MAX {
        // Walk down from the limit to the nearest char boundary; index
        // 0 is always a boundary, so the search cannot come up empty.
        let cut = (0..=MAX)
            .rev()
            .find(|&idx| name.is_char_boundary(idx))
            .unwrap_or(0);
        name.truncate(cut);
    }
    name
}
403
404/// Validate a table / column identifier, mapping a failure to a typed
405/// [`StoreError::InvalidIdentifier`] (D4).
406fn check_identifier(name: &str, kind: &'static str) -> Result<(), StoreError> {
407    if filter::is_safe_identifier(name) {
408        Ok(())
409    } else {
410        Err(StoreError::InvalidIdentifier {
411            kind,
412            name: name.to_string(),
413        })
414    }
415}
416
417/// §Fase 37.x.c (D2) — render the SCHEMA-QUALIFIED relation reference
418/// for an operation's SQL: `"schema"."table"` when the schema resolved
419/// to a safe identifier, the bare `"table"` otherwise.
420///
421/// A schema-qualified reference resolves on ANY session regardless of
422/// the ambient `search_path` — the D2 guarantee. The schema name is
423/// discovered from `pg_catalog` (37.x.b's `resolve_table`); it is
424/// validated with [`filter::is_safe_identifier`] before being
425/// double-quoted (D4 — no untrusted identifier reaches SQL), exactly
426/// as the table name is. When the schema is absent (`None` — the
427/// resolution failed) or is not a safe identifier (an exotic quoted
428/// schema name `pg_catalog` could yield), the reference falls back to
429/// the bare `"table"` — never an unsafe splice, never a false error;
430/// `search_path` then resolves it as in the pre-37.x behaviour. The
431/// `table` is assumed already [`check_identifier`]-validated.
432fn qualified_relation(schema: Option<&str>, table: &str) -> String {
433    match schema {
434        Some(s) if filter::is_safe_identifier(s) => {
435            format!("\"{s}\".\"{table}\"")
436        }
437        _ => format!("\"{table}\""),
438    }
439}
440
441// ════════════════════════════════════════════════════════════════════
442//  Pure SQL builders (no I/O — exhaustively unit-tested)
443// ════════════════════════════════════════════════════════════════════
444
445/// Build a parameterized `SELECT * FROM "schema"."table" WHERE …`
446/// statement.
447///
448/// §Fase 37.x.c (D2) — `schema` is the table's resolved schema (from
449/// [`introspect_conn`]); when `Some` and a safe
450/// identifier the relation is emitted SCHEMA-QUALIFIED so it resolves
451/// on any session regardless of the ambient `search_path`. `None`
452/// renders the bare `"table"` (the pre-37.x form — D5).
453/// §Fase 37.d (D3) — `bindings` resolves `${name}` placeholders in the
454/// `where` expression to `$N` bind parameters (never string-spliced).
455/// §v1.36.4 — `column_types` (the `column → udt_name` map) lets
456/// [`build_pg_where`] cast each `where`-clause value to its column's
457/// Postgres type. Pass an empty map when the schema is unknown — the
458/// filter then renders bare `$N` placeholders.
459pub fn build_select_sql(
460    table: &str,
461    schema: Option<&str>,
462    where_expr: &str,
463    bindings: &std::collections::HashMap<String, String>,
464    column_types: &std::collections::HashMap<String, String>,
465) -> Result<(String, Vec<SqlValue>), StoreError> {
466    check_identifier(table, "table")?;
467    let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
468    let relation = qualified_relation(schema, table);
469    Ok((format!("SELECT * FROM {relation} WHERE {clause}"), params))
470}
471
472/// Build a parameterized `DELETE FROM "schema"."table" WHERE …`
473/// statement.
474///
475/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation (see
476/// [`build_select_sql`]). §v1.36.4 — `column_types` drives the
477/// `where`-clause value cast.
478pub fn build_delete_sql(
479    table: &str,
480    schema: Option<&str>,
481    where_expr: &str,
482    bindings: &std::collections::HashMap<String, String>,
483    column_types: &std::collections::HashMap<String, String>,
484) -> Result<(String, Vec<SqlValue>), StoreError> {
485    check_identifier(table, "table")?;
486    let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
487    let relation = qualified_relation(schema, table);
488    Ok((format!("DELETE FROM {relation} WHERE {clause}"), params))
489}
490
/// §Fase 37.x.b (D1) — the outcome of resolving a store table against
/// `pg_catalog` rather than the ambient `search_path`. Produced by
/// [`introspect_conn`].
#[derive(Debug, Clone)]
pub(crate) struct ResolvedTable {
    /// Name of the schema the table lives in (e.g. `public`).
    /// §37.x.c (D2) uses it to emit `"schema"."table"`, so an
    /// operation no longer depends on the connection's `search_path`.
    pub schema: String,
    /// `column → udt_name` for the table's columns. It drives the
    /// `$N::<type>` cast on the write side (`build_insert_sql` /
    /// `build_update_sql`) and on the read side (`build_pg_where`).
    pub column_types: std::collections::HashMap<String, String>,
}
505
/// §Fase 37.x.f (D9) — the most entries the schema cache may hold.
///
/// Keeps a many-table / many-DSN / multi-tenant adopter from growing
/// the cache without limit: once the bound is reached, the OLDEST
/// entry (the one with the smallest insertion sequence) is evicted to
/// make room. 10k mirrors the bound used by the idempotency / replay
/// store.
const SCHEMA_CACHE_CAPACITY: usize = 10_000;
511
512/// §Fase 37.x.f (D9) — the process-global schema cache:
513/// `(dsn, table) → ResolvedTable`, capacity-bounded + self-healing.
514///
515/// A table's schema + column types are stable for a process lifetime,
516/// so one resolution per `(connection, table)` suffices — but the table
517/// CAN drift (a live `ALTER TABLE`). D9 makes the cache self-heal: an
518/// operation that fails with a schema-drift SQLSTATE evicts the
519/// `(dsn, table)` entry ([`PostgresStoreBackend::evict_schema`]) and is
520/// retried once against fresh introspection. The cache is also
521/// capacity-bounded ([`SCHEMA_CACHE_CAPACITY`]) so it cannot grow
522/// without limit. Only a successfully-resolved, non-empty entry is
523/// cached (the §v1.36.5 don't-cache-failures rule, preserved).
524struct SchemaCache {
525    /// `(dsn, table)` → the resolution + its insertion sequence.
526    entries: std::collections::HashMap<
527        (String, String),
528        (std::sync::Arc<ResolvedTable>, u64),
529    >,
530    /// Monotonic insertion counter — drives oldest-first eviction.
531    next_seq: u64,
532    /// The capacity bound. A field (not a hard-coded constant) so the
533    /// eviction logic is unit-testable with a small bound.
534    capacity: usize,
535}
536
537impl SchemaCache {
538    fn new(capacity: usize) -> Self {
539        Self {
540            entries: std::collections::HashMap::new(),
541            next_seq: 0,
542            capacity,
543        }
544    }
545
546    /// The cached resolution for `key`, or `None` on a miss.
547    fn get(
548        &self,
549        key: &(String, String),
550    ) -> Option<std::sync::Arc<ResolvedTable>> {
551        self.entries.get(key).map(|(arc, _)| std::sync::Arc::clone(arc))
552    }
553
554    /// Insert (or refresh) a resolution. §D9 — at capacity the oldest
555    /// entry (smallest sequence) is evicted first; a linear scan,
556    /// acceptable at the 10k bound (the idempotency store's approach).
557    fn insert(
558        &mut self,
559        key: (String, String),
560        resolved: std::sync::Arc<ResolvedTable>,
561    ) {
562        if self.entries.len() >= self.capacity
563            && !self.entries.contains_key(&key)
564        {
565            // Linear scan for the smallest insertion sequence.
566            let oldest = self
567                .entries
568                .iter()
569                .min_by_key(|item| (item.1).1)
570                .map(|item| item.0.clone());
571            if let Some(oldest) = oldest {
572                self.entries.remove(&oldest);
573            }
574        }
575        let seq = self.next_seq;
576        self.next_seq = self.next_seq.wrapping_add(1);
577        self.entries.insert(key, (resolved, seq));
578    }
579
580    /// §D9 — drop `key` so the next operation re-introspects.
581    fn evict(&mut self, key: &(String, String)) {
582        self.entries.remove(key);
583    }
584}
585
586static SCHEMA_CACHE: std::sync::LazyLock<std::sync::Mutex<SchemaCache>> =
587    std::sync::LazyLock::new(|| {
588        std::sync::Mutex::new(SchemaCache::new(SCHEMA_CACHE_CAPACITY))
589    });
590
591/// §Fase 37.x.b (D1) — the pure resolution core: group a flat
592/// `(schema, column, udt)` introspection result by schema and decide.
593///
594/// - 0 schemas → [`StoreError::TableNotResolved`].
595/// - exactly 1 schema → `Ok((schema, column → udt map))`.
596/// - 2+ schemas → [`StoreError::AmbiguousTable`] (the schemas sorted).
597///
598/// Pure + total — exhaustively unit-tested without a database. Both the
599/// search-path-correct primary resolution and the search-path-
600/// independent `pg_catalog` fallback feed their rows through this one
601/// function, so the resolution verdict is computed identically.
602/// `pub` so 37.x.i's property/fuzz pack can drive it across arbitrary
603/// schema topologies — same exposure rationale as [`build_pg_where`] /
604/// [`build_select_sql`] / [`classify_pg_type`] (pure totals worth
605/// exhaustive external test).
606pub fn resolve_from_rows(
607    table: &str,
608    rows: Vec<(String, String, String)>,
609) -> Result<(String, std::collections::HashMap<String, String>), StoreError> {
610    let mut by_schema: std::collections::BTreeMap<
611        String,
612        std::collections::HashMap<String, String>,
613    > = std::collections::BTreeMap::new();
614    for (schema, column, udt) in rows {
615        by_schema.entry(schema).or_default().insert(column, udt);
616    }
617    match by_schema.len() {
618        0 => Err(StoreError::TableNotResolved {
619            table: table.to_string(),
620        }),
621        // A `BTreeMap` of length 1 — `into_iter().next()` is total.
622        1 => Ok(by_schema.into_iter().next().unwrap()),
623        // `BTreeMap` keys iterate sorted — a deterministic schema list.
624        _ => Err(StoreError::AmbiguousTable {
625            table: table.to_string(),
626            schemas: by_schema.into_keys().collect(),
627        }),
628    }
629}
630
631/// §Fase 37.x.b — decode a `pg_catalog` introspection result into the
632/// flat `(schema, column, udt)` triples [`resolve_from_rows`] groups. A
633/// row missing any field is skipped (defensive — the resolution
634/// queries always project all three).
635fn collect_triples(rows: &[PgRow]) -> Vec<(String, String, String)> {
636    let mut out = Vec::with_capacity(rows.len());
637    for row in rows {
638        let schema: String = row.try_get("schema_name").unwrap_or_default();
639        let column: String = row.try_get("column_name").unwrap_or_default();
640        let udt: String = row.try_get("type_name").unwrap_or_default();
641        if !schema.is_empty() && !column.is_empty() && !udt.is_empty() {
642            out.push((schema, column, udt));
643        }
644    }
645    out
646}
647
/// §Fase 37.x.f (D9) — whether `code` is one of the schema-drift
/// SQLSTATEs, i.e. the failing statement ran against a STALE cache
/// entry.
///
/// The closed set:
///
///  - `42P01` undefined_table — the table was dropped, renamed, or
///    moved to another schema after the resolution was cached.
///  - `42703` undefined_column — a column was dropped or renamed.
///  - `42804` datatype_mismatch — a stale WRITE cast (`$N::<old>` into
///    a column whose type has since changed).
///  - `42883` undefined_function — a stale READ cast
///    (`"col" = $N::<old>` whose operator no longer exists, e.g.
///    `text = uuid`).
///
/// All four are PARSE / PLAN-time rejections: the statement never
/// executed, so the failed operation had ZERO side effects and the D9
/// retry cannot double-write.
///
/// The match is exact and case-sensitive; any other string — the
/// empty string included — returns `false`. `pub` so 37.x.i's
/// property/fuzz pack can drive it over arbitrary ASCII input (the
/// membership test must be total and never panic).
pub fn is_schema_drift_sqlstate(code: &str) -> bool {
    const DRIFT_SQLSTATES: [&str; 4] = ["42P01", "42703", "42804", "42883"];
    DRIFT_SQLSTATES.contains(&code)
}
667
668/// §Fase 37.x.f (D9) — classify a failed store SQL statement: a
669/// schema-drift SQLSTATE ([`is_schema_drift_sqlstate`]) yields
670/// [`StoreError::SchemaDrift`] (which triggers the cache self-heal);
671/// anything else is a plain [`StoreError::Query`]. `pub(crate)` so the
672/// `row_stream` cursor maps its errors through the same classifier.
673pub(crate) fn classify_sql_error(
674    op: &'static str,
675    err: sqlx::Error,
676) -> StoreError {
677    let sqlstate = err
678        .as_database_error()
679        .and_then(|db| db.code())
680        .map(|c| c.into_owned());
681    match sqlstate {
682        Some(code) if is_schema_drift_sqlstate(&code) => {
683            StoreError::SchemaDrift {
684                op,
685                sqlstate: code,
686                source: err.to_string(),
687            }
688        }
689        _ => StoreError::Query { op, source: err.to_string() },
690    }
691}
692
693/// §Fase 37.x.b/d (D1/D3) — the two-stage `pg_catalog` table resolution
694/// run on a CALLER-PROVIDED connection, so it shares the operation's
695/// transaction (D3 — one coherent introspect-and-operate session).
696///
697///  1. **Primary — search-path-correct.** `to_regclass($1)` (the
698///     double-quoted table name) resolves the table exactly as an
699///     unqualified `SELECT * FROM "table"` would; the same query
700///     introspects its columns.
701///  2. **Fallback — search-path-INDEPENDENT.** When `to_regclass`
702///     yields NULL (the table is not on this session's `search_path`),
703///     a `pg_class` + `pg_namespace` scan keyed on `relname` finds the
704///     table in ANY user schema (system schemas excluded; only real
705///     relations — `relkind` table / view / matview / partitioned /
706///     foreign).
707///
708/// Exactly one schema resolves the table; zero is
709/// [`StoreError::TableNotResolved`], two or more is
710/// [`StoreError::AmbiguousTable`]. `pub(crate)` so `row_stream`'s
711/// cursor drain runs it inside the cursor's own transaction.
712pub(crate) async fn introspect_conn(
713    conn: &mut PgConnection,
714    table: &str,
715) -> Result<std::sync::Arc<ResolvedTable>, StoreError> {
716    // — Stage 1: primary, search-path-correct via `to_regclass`. —
717    // §Fase 38.x.a (D1) — `.persistent(false)` issues an UNNAMED PARSE
718    // (empty name `""`), which Postgres auto-discards/replaces on the
719    // next unnamed PARSE — structurally collision-free behind every
720    // transaction-mode pooler. Setting `statement_cache_capacity(0)`
721    // on the pool's `PgConnectOptions` is necessary but NOT sufficient;
722    // sqlx's named PARSE protocol (`sqlx_s_N`) leaks across logical
723    // sessions when the physical conn behind the pooler is reused.
724    let primary = sqlx::query(
725        "SELECT n.nspname AS schema_name, a.attname AS column_name, \
726         t.typname AS type_name \
727         FROM pg_catalog.pg_class c \
728         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
729         JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
730         JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
731         WHERE c.oid = to_regclass($1) \
732           AND a.attnum > 0 AND NOT a.attisdropped",
733    )
734    .persistent(false)
735    .bind(format!("\"{table}\""))
736    .fetch_all(&mut *conn)
737    .await
738    .map_err(|e| StoreError::Query {
739        op: "resolve",
740        source: e.to_string(),
741    })?;
742
743    let resolution: Result<(String, std::collections::HashMap<String, String>), StoreError> = {
744        let primary_rows = collect_triples(&primary);
745        if !primary_rows.is_empty() {
746            // `to_regclass` resolved — one relation, one schema.
747            resolve_from_rows(table, primary_rows)
748        } else {
749            // — Stage 2: fallback, search-path-INDEPENDENT scan. —
750            let scan = sqlx::query(
751                "SELECT n.nspname AS schema_name, \
752                 a.attname AS column_name, t.typname AS type_name \
753                 FROM pg_catalog.pg_class c \
754                 JOIN pg_catalog.pg_namespace n \
755                   ON n.oid = c.relnamespace \
756                 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
757                 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
758                 WHERE c.relname = $1 \
759                   AND c.relkind IN ('r', 'v', 'm', 'p', 'f') \
760                   AND left(n.nspname, 3) <> 'pg_' \
761                   AND n.nspname <> 'information_schema' \
762                   AND a.attnum > 0 AND NOT a.attisdropped",
763            )
764            .persistent(false)
765            .bind(table)
766            .fetch_all(&mut *conn)
767            .await
768            .map_err(|e| StoreError::Query {
769                op: "resolve",
770                source: e.to_string(),
771            })?;
772            resolve_from_rows(table, collect_triples(&scan))
773        }
774    };
775    // §Fase 37.x.h (D6) — every resolution failure logs as a structured
776    // `tracing::error!` so an adopter's operator can SEE it in production
777    // logs / journald, not only in the propagated `StoreError`. The
778    // Display hint (the `TableNotResolved` / `AmbiguousTable` arms above)
779    // is the actionable line; the structured fields here are the index
780    // for log search.
781    match resolution {
782        Ok((schema, column_types)) => {
783            Ok(std::sync::Arc::new(ResolvedTable { schema, column_types }))
784        }
785        Err(err) => {
786            match &err {
787                StoreError::TableNotResolved { table } => {
788                    tracing::error!(
789                        target: "axon::store::resolve",
790                        store_table = %table,
791                        kind = "table_not_resolved",
792                        d_letter = "D6",
793                        "axonstore could not resolve `{table}` on any \
794                         schema visible to this role — see StoreError \
795                         Display for the actionable remedy"
796                    );
797                }
798                StoreError::AmbiguousTable { table, schemas } => {
799                    tracing::error!(
800                        target: "axon::store::resolve",
801                        store_table = %table,
802                        kind = "ambiguous_table",
803                        schemas = %schemas.join(","),
804                        d_letter = "D6",
805                        "axonstore `{table}` resolved in {n} schemas — \
806                         declare the target schema or narrow \
807                         `search_path`",
808                        n = schemas.len(),
809                    );
810                }
811                other => {
812                    tracing::error!(
813                        target: "axon::store::resolve",
814                        store_table = %table,
815                        kind = "resolve_failed",
816                        d_letter = "D6",
817                        "axonstore resolution of `{table}` failed: \
818                         {other}"
819                    );
820                }
821            }
822            Err(err)
823        }
824    }
825}
826
827/// §v1.36.2 — the `::<type>` cast suffix for a `$N` value placeholder.
828///
829/// axon's runtime carries no column schema (D12), so a `text`-bound
830/// value cannot reach a `uuid` / `int` / `timestamptz` column: Postgres
831/// has no cross-type operator. The cure is to cast the VALUE to the
832/// column's type — `$N::uuid` is a valid explicit cast over the bound
833/// parameter (`'83d0…'::uuid` parses the text). v1.36.2 applies it to
834/// every WRITE placeholder (`INSERT` values, `UPDATE … SET`); §v1.36.4
835/// applies the identical cure to the read side via [`build_pg_where`]
836/// (`"col" {op} $N::<type>`). The column's Postgres type name comes
837/// from a cached `to_regclass` + `pg_catalog` introspection
838/// ([`introspect_conn`]).
839///
840/// Empty when the column type is unknown (introspection missed the
841/// column, or ran against a table outside `current_schema()`) or the
842/// type name is not a safe identifier — the builder then emits a bare
843/// `$N`: a `text` column still works, a typed column fails LOUDLY (no
844/// regression, no silent-wrong write).
845fn write_cast(
846    column_types: &std::collections::HashMap<String, String>,
847    column: &str,
848) -> String {
849    match column_types.get(column) {
850        Some(udt) if filter::is_safe_identifier(udt) => format!("::{udt}"),
851        _ => String::new(),
852    }
853}
854
855/// Build a parameterized `INSERT INTO "table" (…) VALUES (…)`.
856///
857/// A `NULL` data value renders as the inline `NULL` keyword (a fixed
858/// SQL token, injection-safe) and consumes no `$N` placeholder — the
859/// same discipline 35.b applies to `NULL` in a `where` clause. Postgres
860/// infers the column type for an inline `NULL`.
861///
862/// §v1.36.2 — each `$N` value placeholder is cast to its column's
863/// introspected type (`column_types`) so a `text`-bound value writes
864/// into a `uuid` / `int` / `timestamptz` column. An empty
865/// `column_types` map emits bare `$N` (the pre-1.36.2 behaviour).
866/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation
867/// (`INSERT INTO "schema"."table"`); `None` renders the bare `"table"`.
868pub fn build_insert_sql(
869    table: &str,
870    schema: Option<&str>,
871    data: &[(String, SqlValue)],
872    column_types: &std::collections::HashMap<String, String>,
873) -> Result<(String, Vec<SqlValue>), StoreError> {
874    check_identifier(table, "table")?;
875    if data.is_empty() {
876        return Err(StoreError::EmptyData { op: "insert" });
877    }
878
879    let mut columns: Vec<String> = Vec::with_capacity(data.len());
880    let mut value_frags: Vec<String> = Vec::with_capacity(data.len());
881    let mut params: Vec<SqlValue> = Vec::new();
882    let mut idx = 1usize;
883
884    for (col, val) in data {
885        check_identifier(col, "column")?;
886        columns.push(format!("\"{col}\""));
887        match val {
888            SqlValue::Null => value_frags.push("NULL".to_string()),
889            bound => {
890                value_frags.push(format!("${idx}{}", write_cast(column_types, col)));
891                params.push(bound.clone());
892                idx += 1;
893            }
894        }
895    }
896
897    let sql = format!(
898        "INSERT INTO {} ({}) VALUES ({})",
899        qualified_relation(schema, table),
900        columns.join(", "),
901        value_frags.join(", "),
902    );
903    Ok((sql, params))
904}
905
906/// Build a parameterized `UPDATE "table" SET … WHERE …`.
907///
908/// The `WHERE` placeholders continue the numbering after the `SET`
909/// placeholders **actually emitted** — not after the column count.
910/// Because a `NULL` `SET` value renders inline (no placeholder), the
911/// offset is the count of non-`NULL` `SET` values. (The frozen Python
912/// reference offsets by column count and so mis-numbers the moment a
913/// `SET` value is `NULL`.)
914///
915/// §v1.36.2 — each `SET` value placeholder is cast to its column's
916/// introspected type (`column_types`), the same `$N::<type>` cure
917/// `build_insert_sql` applies, so a `text`-bound value writes into a
918/// non-`text` column. §v1.36.4 — the same `column_types` map is now
919/// threaded into the `WHERE` side too, so a `where`-clause value is
920/// cast to its column's type (`"col" {op} $N::<type>`).
921/// §Fase 37.x.c (D2) — `schema` schema-qualifies the relation
922/// (`UPDATE "schema"."table"`); `None` renders the bare `"table"`.
923pub fn build_update_sql(
924    table: &str,
925    schema: Option<&str>,
926    where_expr: &str,
927    data: &[(String, SqlValue)],
928    bindings: &std::collections::HashMap<String, String>,
929    column_types: &std::collections::HashMap<String, String>,
930) -> Result<(String, Vec<SqlValue>), StoreError> {
931    check_identifier(table, "table")?;
932    if data.is_empty() {
933        return Err(StoreError::EmptyData { op: "mutate" });
934    }
935
936    let mut set_frags: Vec<String> = Vec::with_capacity(data.len());
937    let mut params: Vec<SqlValue> = Vec::new();
938    let mut idx = 1usize;
939
940    for (col, val) in data {
941        check_identifier(col, "column")?;
942        match val {
943            SqlValue::Null => set_frags.push(format!("\"{col}\" = NULL")),
944            bound => {
945                set_frags.push(format!(
946                    "\"{col}\" = ${idx}{}",
947                    write_cast(column_types, col)
948                ));
949                params.push(bound.clone());
950                idx += 1;
951            }
952        }
953    }
954
955    // `idx - 1` SET placeholders were emitted; WHERE continues there.
956    let set_param_count = idx - 1;
957    let (clause, where_params) =
958        build_pg_where(where_expr, set_param_count, bindings, column_types)?;
959    params.extend(where_params);
960
961    let sql = format!(
962        "UPDATE {} SET {} WHERE {clause}",
963        qualified_relation(schema, table),
964        set_frags.join(", "),
965    );
966    Ok((sql, params))
967}
968
969// ════════════════════════════════════════════════════════════════════
970//  Column-type catalog (D12 honest scope) + row → JSON mapping
971// ════════════════════════════════════════════════════════════════════
972
/// The supported Postgres column-type classes. A column whose type is
/// outside this closed catalog is a [`StoreError::UnsupportedColumnType`]
/// — an honest, documented boundary rather than a silent miss.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgTypeClass {
    /// `BOOL`
    Bool,
    /// `INT2` (smallint)
    Int2,
    /// `INT4` (integer)
    Int4,
    /// `INT8` (bigint)
    Int8,
    /// `FLOAT4` (real)
    Float4,
    /// `FLOAT8` (double precision)
    Float8,
    /// `NUMERIC` / `DECIMAL` — JSON-encoded as a string (precision-safe)
    Numeric,
    /// `TEXT` / `VARCHAR` / `BPCHAR` (also reported as `CHAR`) / `NAME`
    Text,
    /// `UUID` — JSON-encoded as a hyphenated string
    Uuid,
    /// `TIMESTAMPTZ` — JSON-encoded as an RFC 3339 string
    TimestampTz,
    /// `TIMESTAMP` — JSON-encoded as an ISO 8601 (no-zone) string
    Timestamp,
    /// `DATE` — JSON-encoded as a `YYYY-MM-DD` string
    Date,
    /// `TIME` — JSON-encoded as a `HH:MM:SS` string
    Time,
    /// `JSON` / `JSONB` — passed through as the JSON value
    Json,
    /// `BYTEA` — JSON-encoded as a base64 string
    Bytea,
}

/// Classify a Postgres type name into a [`PgTypeClass`], or `None` if
/// the type is outside the supported catalog. Pure + total: any input
/// string yields a result, nothing panics.
///
/// Matching is ASCII-case-insensitive and exact otherwise (no
/// trimming; array names such as `INT4[]` are not in the catalog).
///
/// The text class accepts `CHAR` in addition to `BPCHAR`: the row
/// mapper classifies the name reported by the driver's
/// `type_info().name()`, and sqlx reports a blank-padded
/// `character(n)` column under the display name `CHAR`. Without that
/// alias such a column would be rejected as unsupported even though
/// `BPCHAR` is listed here. NOTE(review): confirm the display name
/// against the pinned sqlx version. The quoted `"CHAR"` name (the
/// internal one-byte type) is deliberately NOT accepted.
pub fn classify_pg_type(pg_type: &str) -> Option<PgTypeClass> {
    let class = match pg_type.to_ascii_uppercase().as_str() {
        "BOOL" => PgTypeClass::Bool,
        "INT2" => PgTypeClass::Int2,
        "INT4" => PgTypeClass::Int4,
        "INT8" => PgTypeClass::Int8,
        "FLOAT4" => PgTypeClass::Float4,
        "FLOAT8" => PgTypeClass::Float8,
        "NUMERIC" => PgTypeClass::Numeric,
        "TEXT" | "VARCHAR" | "BPCHAR" | "CHAR" | "NAME" => PgTypeClass::Text,
        "UUID" => PgTypeClass::Uuid,
        "TIMESTAMPTZ" => PgTypeClass::TimestampTz,
        "TIMESTAMP" => PgTypeClass::Timestamp,
        "DATE" => PgTypeClass::Date,
        "TIME" => PgTypeClass::Time,
        "JSON" | "JSONB" => PgTypeClass::Json,
        "BYTEA" => PgTypeClass::Bytea,
        _ => return None,
    };
    Some(class)
}
1032
1033/// A single retrieved row, as JSON-safe column → value pairs in column
1034/// order. Every value is `serde_json`-representable — UUID, TIMESTAMPTZ
1035/// and NUMERIC are pre-mapped to strings, so an adopter never has to
1036/// monkey-patch a JSON encoder (the kivi-reported Python pain).
1037#[derive(Debug, Clone, PartialEq)]
1038pub struct StoreRow {
1039    /// Column name → JSON value, in `SELECT` column order.
1040    pub columns: Vec<(String, JsonValue)>,
1041}
1042
1043impl StoreRow {
1044    /// Look up a column's value by name.
1045    pub fn get(&self, column: &str) -> Option<&JsonValue> {
1046        self.columns
1047            .iter()
1048            .find(|(name, _)| name == column)
1049            .map(|(_, value)| value)
1050    }
1051
1052    /// Render the row as a JSON object.
1053    pub fn to_json(&self) -> JsonValue {
1054        JsonValue::Object(self.columns.iter().cloned().collect())
1055    }
1056}
1057
1058/// Decode one column of a `PgRow` into a JSON-safe value.
1059fn pg_value_to_json(
1060    row: &PgRow,
1061    idx: usize,
1062    column: &str,
1063    pg_type: &str,
1064) -> Result<JsonValue, StoreError> {
1065    let class = classify_pg_type(pg_type).ok_or_else(|| {
1066        StoreError::UnsupportedColumnType {
1067            column: column.to_string(),
1068            pg_type: pg_type.to_string(),
1069        }
1070    })?;
1071
1072    // Each branch decodes as `Option<T>` so a SQL `NULL` maps to
1073    // `JsonValue::Null` rather than failing the decode.
1074    macro_rules! decode {
1075        ($t:ty, $conv:expr) => {{
1076            let opt: Option<$t> = row.try_get(idx).map_err(|e| {
1077                StoreError::Decode {
1078                    column: column.to_string(),
1079                    pg_type: pg_type.to_string(),
1080                    source: e.to_string(),
1081                }
1082            })?;
1083            match opt {
1084                None => JsonValue::Null,
1085                Some(v) => $conv(v),
1086            }
1087        }};
1088    }
1089
1090    Ok(match class {
1091        PgTypeClass::Bool => decode!(bool, JsonValue::Bool),
1092        PgTypeClass::Int2 => decode!(i16, |v| JsonValue::from(v as i64)),
1093        PgTypeClass::Int4 => decode!(i32, |v| JsonValue::from(v as i64)),
1094        PgTypeClass::Int8 => decode!(i64, JsonValue::from),
1095        PgTypeClass::Float4 => decode!(f32, |v| JsonValue::from(v as f64)),
1096        PgTypeClass::Float8 => decode!(f64, JsonValue::from),
1097        PgTypeClass::Numeric => {
1098            decode!(sqlx::types::BigDecimal, |v: sqlx::types::BigDecimal| {
1099                JsonValue::String(v.to_string())
1100            })
1101        }
1102        PgTypeClass::Text => decode!(String, JsonValue::String),
1103        PgTypeClass::Uuid => {
1104            decode!(uuid::Uuid, |v: uuid::Uuid| JsonValue::String(
1105                v.hyphenated().to_string()
1106            ))
1107        }
1108        PgTypeClass::TimestampTz => {
1109            decode!(
1110                chrono::DateTime<chrono::Utc>,
1111                |v: chrono::DateTime<chrono::Utc>| JsonValue::String(
1112                    v.to_rfc3339()
1113                )
1114            )
1115        }
1116        PgTypeClass::Timestamp => {
1117            decode!(chrono::NaiveDateTime, |v: chrono::NaiveDateTime| {
1118                JsonValue::String(
1119                    v.format("%Y-%m-%dT%H:%M:%S%.f").to_string(),
1120                )
1121            })
1122        }
1123        PgTypeClass::Date => {
1124            decode!(chrono::NaiveDate, |v: chrono::NaiveDate| {
1125                JsonValue::String(v.to_string())
1126            })
1127        }
1128        PgTypeClass::Time => {
1129            decode!(chrono::NaiveTime, |v: chrono::NaiveTime| {
1130                JsonValue::String(v.to_string())
1131            })
1132        }
1133        PgTypeClass::Json => decode!(JsonValue, |v| v),
1134        PgTypeClass::Bytea => decode!(Vec<u8>, |v: Vec<u8>| {
1135            use base64::Engine;
1136            JsonValue::String(
1137                base64::engine::general_purpose::STANDARD.encode(v),
1138            )
1139        }),
1140    })
1141}
1142
1143/// Map a whole `PgRow` to a [`StoreRow`]. `pub(crate)` so 35.i's
1144/// `row_stream` cursor drain shares one row-decode path with `query`.
1145pub(crate) fn map_pg_row(row: &PgRow) -> Result<StoreRow, StoreError> {
1146    let mut columns = Vec::with_capacity(row.len());
1147    for (idx, col) in row.columns().iter().enumerate() {
1148        let name = col.name().to_string();
1149        let pg_type = col.type_info().name().to_string();
1150        let value = pg_value_to_json(row, idx, &name, &pg_type)?;
1151        columns.push((name, value));
1152    }
1153    Ok(StoreRow { columns })
1154}
1155
1156/// Bind one [`SqlValue`] onto a query. `NULL` is rendered inline by the
1157/// builders and so never reaches this function in practice; the `Null`
1158/// arm binds a typed NULL defensively to keep the function total.
1159/// `pub(crate)` so 35.i's `row_stream` binds cursor-query params
1160/// through the same path.
1161pub(crate) fn bind_value<'q>(
1162    q: Query<'q, Postgres, PgArguments>,
1163    value: &SqlValue,
1164) -> Query<'q, Postgres, PgArguments> {
1165    match value {
1166        SqlValue::Text(s) => q.bind(s.clone()),
1167        SqlValue::Integer(n) => q.bind(*n),
1168        SqlValue::Float(x) => q.bind(*x),
1169        SqlValue::Boolean(b) => q.bind(*b),
1170        SqlValue::Null => q.bind(Option::<String>::None),
1171    }
1172}
1173
1174// ════════════════════════════════════════════════════════════════════
1175//  PostgresStoreBackend
1176// ════════════════════════════════════════════════════════════════════
1177
1178/// A Postgres-backed `axonstore`. Holds one lazy, bounded `PgPool`.
1179/// Cheap to [`Clone`] (the pool is internally reference-counted).
1180#[derive(Clone)]
1181pub struct PostgresStoreBackend {
1182    /// The resolved DSN — masked whenever surfaced (`Debug`, errors).
1183    dsn: String,
1184    pool: PgPool,
1185}
1186
1187impl fmt::Debug for PostgresStoreBackend {
1188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1189        // Never expose the DSN password through `Debug`.
1190        f.debug_struct("PostgresStoreBackend")
1191            .field("dsn", &mask_dsn(&self.dsn))
1192            .finish()
1193    }
1194}
1195
1196impl PostgresStoreBackend {
1197    /// Resolve `connection` and build a lazy, bounded connection pool.
1198    ///
1199    /// Equivalent to [`connect_named`](Self::connect_named) with no
1200    /// store name — the connection's `application_name` is the bare
1201    /// `axon-store`. Prefer `connect_named` so each session is
1202    /// attributable to its declaring `axonstore`.
1203    pub fn connect(connection: &str) -> Result<Self, StoreError> {
1204        Self::connect_named(connection, "")
1205    }
1206
1207    /// Resolve `connection` and build a lazy, bounded connection pool,
1208    /// stamping each connection's `application_name` with `store_name`.
1209    ///
1210    /// Synchronous and cheap: the DSN is parsed into a
1211    /// [`PgConnectOptions`] (a malformed DSN is a typed
1212    /// [`StoreError::PoolInit`]) but `connect_lazy_with` opens **no**
1213    /// connection — the first real connection is made on the first
1214    /// operation (D7 — lazy).
1215    ///
1216    /// Two production-grade properties are set on every connection:
1217    ///
1218    /// - **`statement_cache_capacity(0)`** (Gap 3) — disables sqlx's
1219    ///   named server-side prepared-statement cache so the backend is
1220    ///   safe behind a transaction-mode pooler (PgBouncer
1221    ///   `pool_mode=transaction`, Supabase Supavisor `:6543`, Neon, RDS
1222    ///   Proxy), where a cached name minted on one physical session
1223    ///   collides on the next (`prepared statement "sqlx_s_1" already
1224    ///   exists`). Applied unconditionally — harmless on a direct
1225    ///   connection, and there is no knob to misconfigure.
1226    /// - **`application_name`** — `axon-store/<store_name>` (bare
1227    ///   `axon-store` when `store_name` is empty), capped at the
1228    ///   Postgres 63-byte `NAMEDATALEN-1` limit on a char boundary, so
1229    ///   every axon-owned session is identifiable in `pg_stat_activity`,
1230    ///   pooler logs and DBA dashboards.
1231    ///
1232    /// Must be called within a Tokio runtime context: a well-formed DSN
1233    /// registers a background connection reaper. In production this is
1234    /// always satisfied — the registry (35.d) is built while the axum
1235    /// server's runtime is live.
1236    pub fn connect_named(
1237        connection: &str,
1238        store_name: &str,
1239    ) -> Result<Self, StoreError> {
1240        Self::connect_named_with_namespace(connection, store_name, None)
1241    }
1242
1243    /// §Fase 38.f (D3) — same as [`Self::connect_named`] but stamps an
1244    /// OPTIONAL per-tenant schema namespace into `application_name`.
1245    ///
1246    /// `connect_named_with_namespace("env:DB", "claims", Some("tenant_42"))`
1247    /// produces a pool whose every session's `application_name` reads
1248    /// `axon-store/claims/tenant_42` — so a DBA reading
1249    /// `pg_stat_activity`, pooler logs, or RDS Performance Insights
1250    /// sees both the `axonstore` declaration AND the resolved tenant.
1251    ///
1252    /// `None` for `namespace` is the pre-38 shape (`axon-store/<store>`,
1253    /// byte-identical to `connect_named`).
1254    pub fn connect_named_with_namespace(
1255        connection: &str,
1256        store_name: &str,
1257        namespace: Option<&str>,
1258    ) -> Result<Self, StoreError> {
1259        let dsn = resolve_dsn(connection)?;
1260        let opts = PgConnectOptions::from_str(&dsn)
1261            .map_err(|e| StoreError::PoolInit {
1262                dsn_masked: mask_dsn(&dsn),
1263                source: e.to_string(),
1264            })?
1265            .statement_cache_capacity(0)
1266            .application_name(&application_name_for_with_namespace(
1267                store_name, namespace,
1268            ));
1269        // §Fase 38.x.a (D2) — `DEALLOCATE ALL` on every released conn.
1270        //
1271        // This is the SECOND layer of pooler-coherent transaction safety,
1272        // composing with the per-query `.persistent(false)` (D1). If a
1273        // future code path accidentally omits `.persistent(false)`, the
1274        // named prepared statement it allocated would otherwise survive
1275        // on the physical Postgres conn across logical sessions through a
1276        // transaction-mode pooler (Supabase Supavisor `:6543`, PgBouncer
1277        // `pool_mode=transaction`, Neon, RDS Proxy). The next logical
1278        // session that lands on the same physical conn would collide on
1279        // `PARSE sqlx_s_N` → Postgres `42710` `duplicate_prepared_statement`.
1280        //
1281        // Running `DEALLOCATE ALL` on `after_release` wipes every prepared
1282        // statement (named + unnamed) from the physical conn BEFORE the
1283        // pooler returns it to its pool. Belt-and-suspenders: D1 prevents
1284        // the bug at the source; D2 catches anything that slips past.
1285        //
1286        // The cleanup query itself uses `.persistent(false)` — the
1287        // meta-invariant: even the cleanup is unnamed, so no prepared
1288        // statement can ever survive a connection release.
1289        let pool = PgPoolOptions::new()
1290            .max_connections(MAX_POOL_CONNECTIONS)
1291            .min_connections(0)
1292            .acquire_timeout(Duration::from_secs(ACQUIRE_TIMEOUT_SECS))
1293            .idle_timeout(Duration::from_secs(IDLE_TIMEOUT_SECS))
1294            .after_release(|conn, _meta| Box::pin(async move {
1295                // `DEALLOCATE ALL` clears every prepared statement —
1296                // named (`sqlx_s_N`) AND unnamed (`""`) — from the
1297                // physical Postgres connection. Cheap (<1ms typically).
1298                sqlx::query("DEALLOCATE ALL")
1299                    .persistent(false)
1300                    .execute(&mut *conn)
1301                    .await?;
1302                // `Ok(true)` keeps the conn alive in the pool for reuse.
1303                // We never drop a conn on `DEALLOCATE` failure because a
1304                // failure here means something more fundamental is wrong
1305                // (lost socket, server-side crash); the next acquire will
1306                // surface the real error.
1307                Ok(true)
1308            }))
1309            .connect_lazy_with(opts);
1310        Ok(Self { dsn, pool })
1311    }
1312
1313    /// The resolved DSN with its password masked — safe to log.
1314    pub fn masked_dsn(&self) -> String {
1315        mask_dsn(&self.dsn)
1316    }
1317
1318    /// The underlying pool — 35.i's `Stream<Row>` borrows it.
1319    pub fn pool(&self) -> &PgPool {
1320        &self.pool
1321    }
1322
1323    /// §Fase 37.x.j (D1) — Acquire ONE physical Postgres connection
1324    /// from the pool to be held for the duration of a flow execution
1325    /// ([`crate::runner::ExecContext`] for the sync path,
1326    /// [`crate::flow_dispatcher::DispatchCtx`] for the async streaming
1327    /// path).
1328    ///
1329    /// The returned [`sqlx::pool::PoolConnection`] is wrapped by the
1330    /// caller in [`crate::store::store_conn::StoreConn::Pinned`] and
1331    /// passed to every operation (`query` / `insert` / `mutate` /
1332    /// `purge` / `ping`) against this axonstore for the flow lifetime.
1333    /// Because every op runs against the same physical Postgres backend
1334    /// connection, a transaction-mode pooler (Supabase Supavisor,
1335    /// PgBouncer, Neon, RDS Proxy) cannot swap the backend between
1336    /// queries — the D3 "unnamed prepared statement does not exist"
1337    /// race that Fase 37.x.j closes.
1338    ///
1339    /// The connection is released back to the pool on `Drop` of the
1340    /// returned `PoolConnection`. The existing
1341    /// `after_release(DEALLOCATE ALL)` hook (Fase 38.x.a D2) wipes any
1342    /// prepared statements before the conn is reused — composing
1343    /// cleanly with the per-flow pinning of 37.x.j.
1344    ///
1345    /// Failure modes:
1346    ///   - `StoreError::Connect` if the pool's `acquire_timeout`
1347    ///     elapses (no conn becomes available — pool exhausted or
1348    ///     Postgres unreachable).
1349    ///   - `StoreError::Connect` if the pool is in a permanently-bad
1350    ///     state (TLS handshake failure, DNS resolution failure, etc.).
1351    pub async fn acquire_pin(
1352        &self,
1353    ) -> Result<sqlx::pool::PoolConnection<sqlx::Postgres>, StoreError> {
1354        self.pool
1355            .acquire()
1356            .await
1357            .map_err(|e| StoreError::Connect { source: e.to_string() })
1358    }
1359
    /// `retrieve` — run `SELECT * FROM "schema"."table" WHERE …` and map
    /// every row to a JSON-safe [`StoreRow`].
    ///
    /// §Fase 37.x.d (D3) — on a cache MISS the schema introspection and
    /// the `SELECT` execute inside ONE transaction, so a
    /// transaction-mode pooler pins one physical backend for both —
    /// they cannot split across sessions. A cache HIT needs no
    /// transaction: the cached resolution is already correct and the
    /// `SELECT` is schema-qualified, so it resolves on any session.
    ///
    /// v1.30.0 materializes the full result (`fetch_all`); 35.i adds the
    /// backpressured `Stream<Row>` variant (Pillar III).
    ///
    /// # Arguments
    ///
    /// * `conn` — the connection source every statement of this call is
    ///   dispatched through (see the inline note on the parameter).
    /// * `table` — the table to read; together with the backend's DSN
    ///   it forms the schema-cache key.
    /// * `where_expr`, `bindings` — the filter expression and its
    ///   name → value map, passed unchanged to `build_select_sql`.
    ///
    /// # Errors
    ///
    /// * Any error returned by `build_select_sql`, propagated unchanged.
    /// * [`StoreError::Connect`] when `conn.begin()` or the final
    ///   `commit()` fails.
    /// * The error returned by [`introspect_conn`], after the
    ///   transaction has been rolled back.
    /// * The result of `classify_sql_error("retrieve", …)` when the
    ///   `SELECT` itself fails. On the cache-HIT path an error for which
    ///   `is_schema_drift()` is true is not returned: it evicts the
    ///   cached schema and triggers the single introspect-and-retry.
    /// * Any error `map_pg_row` reports while converting a row.
    pub async fn query(
        &self,
        // §Fase 37.x.j (D1) — the connection source for this op.
        // `StoreConn::Pool(&self.pool)` for legacy callers (the
        // v1.38.5 and earlier behavior); `StoreConn::Pinned(conn)` for
        // 37.x.j flow-pinned execution where the caller acquired a
        // `PoolConnection` at flow start via `acquire_pin()`. Both
        // variants run the cache-HIT SELECT + cache-MISS introspect+
        // SELECT-in-tx paths identically; the pinned variant
        // additionally guarantees the same physical Postgres backend
        // services every op against this store for the flow lifetime.
        conn: &mut crate::store::store_conn::StoreConn<'_>,
        table: &str,
        where_expr: &str,
        bindings: &std::collections::HashMap<String, String>,
    ) -> Result<Vec<StoreRow>, StoreError> {
        // — cache HIT: operate with the cached resolution; no
        //   transaction. §37.x.f (D9) self-heals a stale cache. —
        if let Some(resolved) = self.cached_schema(table) {
            let (sql, params) = build_select_sql(
                table,
                Some(resolved.schema.as_str()),
                where_expr,
                bindings,
                &resolved.column_types,
            )?;
            // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
            let mut q = sqlx::query(&sql).persistent(false);
            // Bind the positional parameters in the order the builder
            // produced them.
            for value in &params {
                q = bind_value(q, value);
            }
            // §Fase 37.x.j (D1) — dispatch through the StoreConn so
            // a pinned variant routes through the same physical conn
            // as every other op against this store for the flow.
            match conn.fetch_all(q).await {
                Ok(rows) => return rows.iter().map(map_pg_row).collect(),
                Err(e) => {
                    let err = classify_sql_error("retrieve", e);
                    // Anything other than schema drift is a genuine
                    // failure — surface it without retrying.
                    if !err.is_schema_drift() {
                        return Err(err);
                    }
                    // §37.x.f (D9) — the cached schema is STALE; evict
                    // and fall through to the miss path: the single
                    // retry, with fresh introspection.
                    self.evict_schema(table);
                }
            }
        }
        // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
        //   operate in ONE transaction (D3). —
        // §Fase 37.x.j (D1) — `conn.begin()` borrows the `StoreConn`
        // mutably for the transaction's lifetime; on the Pinned variant
        // the transaction runs on the same physical backend as the
        // cache-HIT attempt above (D3 invariant preserved).
        let mut tx = conn.begin().await.map_err(|e| {
            StoreError::Connect { source: e.to_string() }
        })?;
        // §Fase 37.x.j.11 (POST-CLOSE HOTFIX 2026-05-21) — ROLLBACK +
        // propagate the introspect error directly. Pre-hotfix the
        // code fell through to bare-table SQL with `(None, &no_types)`
        // inside the SAME (now poisoned) transaction. The cascade
        // error (`25P02 in_failed_sql_transaction` / `42703 column
        // does not exist`) was returned to the application layer,
        // masking the actual root cause from any caller that didn't
        // filter the `axon::store` tracing target.
        //
        // Honest scope cut: adopters whose introspect privileges
        // differ from query privileges (rare in practice — same DB
        // user) no longer get the fall-through. If real adopter
        // demand surfaces, a future fase can add an opt-in
        // `unsafe_skip_introspect` flag.
        let resolved = match introspect_conn(&mut tx, table).await {
            Ok(r) => r,
            Err(introspect_err) => {
                tracing::warn!(
                    target: "axon::store",
                    table = %table,
                    op = "introspect_in_tx",
                    error = %introspect_err,
                    d_letter = "37.x.j.11",
                    "store introspection failed; rolling back the \
                     transaction and returning the primary error \
                     directly. Pre-37.x.j.11 the runtime fell through \
                     to bare-table SQL inside the poisoned tx → \
                     cascade error masked the root cause."
                );
                // The rollback outcome is deliberately discarded: the
                // introspection error is the one the caller must see.
                let _ = tx.rollback().await;
                return Err(introspect_err);
            }
        };
        let (sql, params) = build_select_sql(
            table,
            Some(resolved.schema.as_str()),
            where_expr,
            bindings,
            &resolved.column_types,
        )?;
        // §Fase 38.x.a (D1) — `.persistent(false)` is mandatory inside the
        // `conn.begin()` transaction: the named PARSE protocol leaks across
        // logical sessions when the physical conn behind the pooler is
        // reused. See `introspect_conn` for the full rationale.
        let mut q = sqlx::query(&sql).persistent(false);
        for value in &params {
            q = bind_value(q, value);
        }
        let rows = q
            .fetch_all(&mut *tx)
            .await
            .map_err(|e| classify_sql_error("retrieve", e))?;
        tx.commit().await.map_err(|e| StoreError::Connect {
            source: e.to_string(),
        })?;
        // Cache the resolution only after the transaction committed, so
        // a failed attempt never populates the cache.
        self.cache_schema(table, resolved);
        rows.iter().map(map_pg_row).collect()
    }
1487
1488    /// §Fase 37.x.d (D3) — the cached `(schema, column_types)`
1489    /// resolution for `table`, or `None` on a cache miss. Pure — no
1490    /// I/O. A HIT lets an operation skip the transaction; a MISS makes
1491    /// the caller introspect ([`introspect_conn`]) inside the
1492    /// operation's own transaction, so a transaction-mode pooler pins
1493    /// one backend for resolution + operation.
1494    pub(crate) fn cached_schema(
1495        &self,
1496        table: &str,
1497    ) -> Option<std::sync::Arc<ResolvedTable>> {
1498        let key = (self.dsn.clone(), table.to_string());
1499        SCHEMA_CACHE.lock().unwrap().get(&key)
1500    }
1501
1502    /// §Fase 37.x.d (D3) — store a successful resolution in the
1503    /// process-global `(dsn, table)` cache.
1504    ///
1505    /// §v1.36.5 rule preserved — an EMPTY resolution is NEVER cached: a
1506    /// real relation always has at least one column, so an empty map is
1507    /// a transient failure that must be retried, never a poisoned
1508    /// entry. §Fase 37.x.f (D9) adds the bounded-LRU + drift eviction.
1509    pub(crate) fn cache_schema(
1510        &self,
1511        table: &str,
1512        resolved: std::sync::Arc<ResolvedTable>,
1513    ) {
1514        if !resolved.column_types.is_empty() {
1515            let key = (self.dsn.clone(), table.to_string());
1516            SCHEMA_CACHE.lock().unwrap().insert(key, resolved);
1517        }
1518    }
1519
1520    /// §Fase 37.x.f (D9) — evict `table`'s cached resolution so the
1521    /// next operation re-introspects. Called by the self-heal path when
1522    /// a store SQL statement fails with a schema-drift SQLSTATE — the
1523    /// live table has drifted from the cached schema. `pub(crate)` so
1524    /// the `row_stream` cursor drain shares the self-heal.
1525    pub(crate) fn evict_schema(&self, table: &str) {
1526        let key = (self.dsn.clone(), table.to_string());
1527        SCHEMA_CACHE.lock().unwrap().evict(&key);
1528        // §Fase 37.x.h (D6) — observability of the D9 self-heal. A live
1529        // `ALTER TABLE` is the expected trigger; a flood of these from
1530        // one `(masked_dsn, table)` means a misconfiguration (a migration
1531        // never finished, two services racing against the same table) and
1532        // an operator needs to SEE the eviction. The masked DSN gives the
1533        // physical-store context without ever leaking a credential.
1534        tracing::warn!(
1535            target: "axon::store::cache",
1536            store_table = %table,
1537            masked_dsn = %mask_dsn(&self.dsn),
1538            kind = "schema_drift_evict",
1539            d_letter = "D9",
1540            "axonstore evicted cached schema for `{table}` after a \
1541             schema-drift SQLSTATE — the next operation will \
1542             re-introspect against the live table"
1543        );
1544    }
1545
1546    /// §Fase 37.x.g (D8) — EAGERLY resolve + introspect `table` against
1547    /// the live database, populating the process-global schema cache.
1548    /// The deploy-time verification entry point: a resolution failure
1549    /// surfaces at DEPLOY, not at the first production request.
1550    ///
1551    /// A cache hit is a no-op `Ok`. Otherwise one connection is
1552    /// acquired and the two-stage [`introspect_conn`] resolution runs;
1553    /// the result is cached so the first runtime operation hits a warm
1554    /// cache. The caller distinguishes the `Err`: a `TableNotResolved`
1555    /// / `AmbiguousTable` means the table genuinely does not resolve on
1556    /// a reachable store (a fatal deploy error); a `Connect` means the
1557    /// store is unreachable (a non-fatal deploy warning — the D9
1558    /// runtime resolution still applies).
1559    pub(crate) async fn warm_schema(&self, table: &str) -> Result<(), StoreError> {
1560        if self.cached_schema(table).is_some() {
1561            return Ok(());
1562        }
1563        let mut conn = self.pool.acquire().await.map_err(|e| {
1564            StoreError::Connect { source: e.to_string() }
1565        })?;
1566        let resolved = introspect_conn(&mut conn, table).await?;
1567        self.cache_schema(table, resolved);
1568        Ok(())
1569    }
1570
    /// `persist` — run `INSERT INTO "schema"."table" (…) VALUES (…)`.
    /// Returns the number of rows inserted. §Fase 37.x.d (D3) — on a
    /// cache MISS the resolution + the `INSERT` execute in ONE
    /// transaction; a cache HIT needs no transaction.
    ///
    /// # Arguments
    ///
    /// * `conn` — the connection source every statement of this call is
    ///   dispatched through.
    /// * `table` — the target table; together with the backend's DSN it
    ///   forms the schema-cache key.
    /// * `data` — `(column, value)` pairs, passed unchanged to
    ///   `build_insert_sql`.
    ///
    /// # Errors
    ///
    /// * Any error returned by `build_insert_sql`, propagated unchanged.
    /// * [`StoreError::Connect`] when `conn.begin()` or the final
    ///   `commit()` fails.
    /// * The error returned by [`introspect_conn`], after the
    ///   transaction has been rolled back.
    /// * The result of `classify_sql_error("persist", …)` when the
    ///   `INSERT` itself fails. On the cache-HIT path an error for which
    ///   `is_schema_drift()` is true is not returned: it evicts the
    ///   cached schema and triggers the single introspect-and-retry.
    pub async fn insert(
        &self,
        // §Fase 37.x.j (D1) — see `query()` for the rationale on the
        // `StoreConn` connection-source parameter.
        conn: &mut crate::store::store_conn::StoreConn<'_>,
        table: &str,
        data: &[(String, SqlValue)],
    ) -> Result<u64, StoreError> {
        // — cache HIT: operate with the cached resolution; no
        //   transaction. §37.x.f (D9) self-heals a stale cache. —
        if let Some(resolved) = self.cached_schema(table) {
            let (sql, params) = build_insert_sql(
                table,
                Some(resolved.schema.as_str()),
                data,
                &resolved.column_types,
            )?;
            // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
            let mut q = sqlx::query(&sql).persistent(false);
            // Bind the positional parameters in the order the builder
            // produced them.
            for value in &params {
                q = bind_value(q, value);
            }
            // §Fase 37.x.j (D1) — dispatch through StoreConn.
            match conn.execute(q).await {
                Ok(result) => return Ok(result.rows_affected()),
                Err(e) => {
                    let err = classify_sql_error("persist", e);
                    // Anything other than schema drift is a genuine
                    // failure — surface it without retrying.
                    if !err.is_schema_drift() {
                        return Err(err);
                    }
                    // §37.x.f (D9) — stale cache: evict + fall through
                    // (the single retry). Safe — a drift SQLSTATE is a
                    // parse/plan-time rejection, so this `INSERT` wrote
                    // zero rows; the retry cannot double-write.
                    self.evict_schema(table);
                }
            }
        }
        // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
        //   operate in ONE transaction (D3). —
        // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
        let mut tx = conn.begin().await.map_err(|e| {
            StoreError::Connect { source: e.to_string() }
        })?;
        // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
        // directly. See `query()` above for the full rationale.
        let resolved = match introspect_conn(&mut tx, table).await {
            Ok(r) => r,
            Err(introspect_err) => {
                tracing::warn!(
                    target: "axon::store",
                    table = %table,
                    op = "introspect_in_tx_persist",
                    error = %introspect_err,
                    d_letter = "37.x.j.11",
                    "persist introspection failed; rolling back the \
                     transaction and returning the primary error \
                     directly."
                );
                // The rollback outcome is deliberately discarded: the
                // introspection error is the one the caller must see.
                let _ = tx.rollback().await;
                return Err(introspect_err);
            }
        };
        let (sql, params) = build_insert_sql(
            table,
            Some(resolved.schema.as_str()),
            data,
            &resolved.column_types,
        )?;
        // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
        let mut q = sqlx::query(&sql).persistent(false);
        for value in &params {
            q = bind_value(q, value);
        }
        let result = q
            .execute(&mut *tx)
            .await
            .map_err(|e| classify_sql_error("persist", e))?;
        tx.commit().await.map_err(|e| StoreError::Connect {
            source: e.to_string(),
        })?;
        // Cache the resolution only after the transaction committed, so
        // a failed attempt never populates the cache.
        self.cache_schema(table, resolved);
        Ok(result.rows_affected())
    }
1659
    /// `mutate` — run `UPDATE "schema"."table" SET … WHERE …`. Returns
    /// the number of rows affected. §Fase 37.x.d (D3) — on a cache MISS
    /// the resolution + the `UPDATE` execute in ONE transaction; a
    /// cache HIT needs no transaction.
    ///
    /// # Arguments
    ///
    /// * `conn` — the connection source every statement of this call is
    ///   dispatched through.
    /// * `table` — the target table; together with the backend's DSN it
    ///   forms the schema-cache key.
    /// * `where_expr`, `bindings` — the filter expression and its
    ///   name → value map, passed unchanged to `build_update_sql`.
    /// * `data` — `(column, value)` pairs for the `SET` clause, passed
    ///   unchanged to `build_update_sql`.
    ///
    /// # Errors
    ///
    /// * Any error returned by `build_update_sql`, propagated unchanged.
    /// * [`StoreError::Connect`] when `conn.begin()` or the final
    ///   `commit()` fails.
    /// * The error returned by [`introspect_conn`], after the
    ///   transaction has been rolled back.
    /// * The result of `classify_sql_error("mutate", …)` when the
    ///   `UPDATE` itself fails. On the cache-HIT path an error for which
    ///   `is_schema_drift()` is true is not returned: it evicts the
    ///   cached schema and triggers the single introspect-and-retry.
    pub async fn mutate(
        &self,
        // §Fase 37.x.j (D1) — see `query()` for the rationale.
        conn: &mut crate::store::store_conn::StoreConn<'_>,
        table: &str,
        where_expr: &str,
        data: &[(String, SqlValue)],
        bindings: &std::collections::HashMap<String, String>,
    ) -> Result<u64, StoreError> {
        // — cache HIT: operate with the cached resolution; no
        //   transaction. §37.x.f (D9) self-heals a stale cache. —
        if let Some(resolved) = self.cached_schema(table) {
            let (sql, params) = build_update_sql(
                table,
                Some(resolved.schema.as_str()),
                where_expr,
                data,
                bindings,
                &resolved.column_types,
            )?;
            // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
            let mut q = sqlx::query(&sql).persistent(false);
            // Bind the positional parameters in the order the builder
            // produced them.
            for value in &params {
                q = bind_value(q, value);
            }
            // §Fase 37.x.j (D1) — dispatch through StoreConn.
            match conn.execute(q).await {
                Ok(result) => return Ok(result.rows_affected()),
                Err(e) => {
                    let err = classify_sql_error("mutate", e);
                    // Anything other than schema drift is a genuine
                    // failure — surface it without retrying.
                    if !err.is_schema_drift() {
                        return Err(err);
                    }
                    // §37.x.f (D9) — stale cache: evict + fall through
                    // (the single retry). Safe — a drift SQLSTATE is a
                    // parse/plan-time rejection, so this `UPDATE`
                    // modified zero rows; the retry cannot double-write.
                    self.evict_schema(table);
                }
            }
        }
        // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
        //   operate in ONE transaction (D3). —
        // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
        let mut tx = conn.begin().await.map_err(|e| {
            StoreError::Connect { source: e.to_string() }
        })?;
        // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
        // directly. See `query()` above for the full rationale.
        let resolved = match introspect_conn(&mut tx, table).await {
            Ok(r) => r,
            Err(introspect_err) => {
                tracing::warn!(
                    target: "axon::store",
                    table = %table,
                    op = "introspect_in_tx_mutate",
                    error = %introspect_err,
                    d_letter = "37.x.j.11",
                    "mutate introspection failed; rolling back the \
                     transaction and returning the primary error \
                     directly."
                );
                // The rollback outcome is deliberately discarded: the
                // introspection error is the one the caller must see.
                let _ = tx.rollback().await;
                return Err(introspect_err);
            }
        };
        let (sql, params) = build_update_sql(
            table,
            Some(resolved.schema.as_str()),
            where_expr,
            data,
            bindings,
            &resolved.column_types,
        )?;
        // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
        let mut q = sqlx::query(&sql).persistent(false);
        for value in &params {
            q = bind_value(q, value);
        }
        let result = q
            .execute(&mut *tx)
            .await
            .map_err(|e| classify_sql_error("mutate", e))?;
        tx.commit().await.map_err(|e| StoreError::Connect {
            source: e.to_string(),
        })?;
        // Cache the resolution only after the transaction committed, so
        // a failed attempt never populates the cache.
        self.cache_schema(table, resolved);
        Ok(result.rows_affected())
    }
1753
    /// `purge` — run `DELETE FROM "schema"."table" WHERE …`. Returns the
    /// number of rows deleted. §Fase 37.x.d (D3) — on a cache MISS the
    /// resolution + the `DELETE` execute in ONE transaction; a cache
    /// HIT needs no transaction.
    ///
    /// # Arguments
    ///
    /// * `conn` — the connection source every statement of this call is
    ///   dispatched through.
    /// * `table` — the target table; together with the backend's DSN it
    ///   forms the schema-cache key.
    /// * `where_expr`, `bindings` — the filter expression and its
    ///   name → value map, passed unchanged to `build_delete_sql`.
    ///
    /// # Errors
    ///
    /// * Any error returned by `build_delete_sql`, propagated unchanged.
    /// * [`StoreError::Connect`] when `conn.begin()` or the final
    ///   `commit()` fails.
    /// * The error returned by [`introspect_conn`], after the
    ///   transaction has been rolled back.
    /// * The result of `classify_sql_error("purge", …)` when the
    ///   `DELETE` itself fails. On the cache-HIT path an error for which
    ///   `is_schema_drift()` is true is not returned: it evicts the
    ///   cached schema and triggers the single introspect-and-retry.
    pub async fn purge(
        &self,
        // §Fase 37.x.j (D1) — see `query()` for the rationale.
        conn: &mut crate::store::store_conn::StoreConn<'_>,
        table: &str,
        where_expr: &str,
        bindings: &std::collections::HashMap<String, String>,
    ) -> Result<u64, StoreError> {
        // — cache HIT: operate with the cached resolution; no
        //   transaction. §37.x.f (D9) self-heals a stale cache. —
        if let Some(resolved) = self.cached_schema(table) {
            let (sql, params) = build_delete_sql(
                table,
                Some(resolved.schema.as_str()),
                where_expr,
                bindings,
                &resolved.column_types,
            )?;
            // §Fase 38.x.a (D1) — see `introspect_conn` for the full rationale.
            let mut q = sqlx::query(&sql).persistent(false);
            // Bind the positional parameters in the order the builder
            // produced them.
            for value in &params {
                q = bind_value(q, value);
            }
            // §Fase 37.x.j (D1) — dispatch through StoreConn.
            match conn.execute(q).await {
                Ok(result) => return Ok(result.rows_affected()),
                Err(e) => {
                    let err = classify_sql_error("purge", e);
                    // Anything other than schema drift is a genuine
                    // failure — surface it without retrying.
                    if !err.is_schema_drift() {
                        return Err(err);
                    }
                    // §37.x.f (D9) — stale cache: evict + fall through
                    // (the single retry). Safe — a drift SQLSTATE is a
                    // parse/plan-time rejection, so this `DELETE`
                    // removed zero rows; the retry cannot double-delete.
                    self.evict_schema(table);
                }
            }
        }
        // — cache MISS, or the §37.x.f (D9) self-heal retry: resolve +
        //   operate in ONE transaction (D3). —
        // §Fase 37.x.j (D1) — see `query()` for the begin() rationale.
        let mut tx = conn.begin().await.map_err(|e| {
            StoreError::Connect { source: e.to_string() }
        })?;
        // §Fase 37.x.j.11 — ROLLBACK + propagate introspect error
        // directly. See `query()` above for the full rationale.
        let resolved = match introspect_conn(&mut tx, table).await {
            Ok(r) => r,
            Err(introspect_err) => {
                tracing::warn!(
                    target: "axon::store",
                    table = %table,
                    op = "introspect_in_tx_purge",
                    error = %introspect_err,
                    d_letter = "37.x.j.11",
                    "purge introspection failed; rolling back the \
                     transaction and returning the primary error \
                     directly."
                );
                // The rollback outcome is deliberately discarded: the
                // introspection error is the one the caller must see.
                let _ = tx.rollback().await;
                return Err(introspect_err);
            }
        };
        let (sql, params) = build_delete_sql(
            table,
            Some(resolved.schema.as_str()),
            where_expr,
            bindings,
            &resolved.column_types,
        )?;
        // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
        let mut q = sqlx::query(&sql).persistent(false);
        for value in &params {
            q = bind_value(q, value);
        }
        let result = q
            .execute(&mut *tx)
            .await
            .map_err(|e| classify_sql_error("purge", e))?;
        tx.commit().await.map_err(|e| StoreError::Connect {
            source: e.to_string(),
        })?;
        // Cache the resolution only after the transaction committed, so
        // a failed attempt never populates the cache.
        self.cache_schema(table, resolved);
        Ok(result.rows_affected())
    }
1844
1845    /// Verify database reachability with `SELECT 1`.
1846    pub async fn ping(&self) -> Result<(), StoreError> {
1847        // §Fase 38.x.a (D1) — even the trivial reachability probe carries
1848        // `.persistent(false)`: a `SELECT 1` PARSE collision is rare but
1849        // possible behind an aggressive transaction-mode pooler, and the
1850        // grep §-assertion in `fase38x_a_pooler_prepared_statement_regression.rs`
1851        // enforces the invariant uniformly.
1852        sqlx::query("SELECT 1")
1853            .persistent(false)
1854            .execute(&self.pool)
1855            .await
1856            .map(|_| ())
1857            .map_err(|e| StoreError::Connect { source: e.to_string() })
1858    }
1859}
1860
1861// ════════════════════════════════════════════════════════════════════
1862//  Unit tests — pure surface (no database)
1863// ════════════════════════════════════════════════════════════════════
1864
1865#[cfg(test)]
1866mod tests {
1867    use super::*;
1868
1869    fn txt(s: &str) -> SqlValue {
1870        SqlValue::Text(s.to_string())
1871    }
1872
1873    /// Empty bindings — these `build_*_sql` tests pin the pre-37.d
1874    /// behaviour (no `${name}` resolution). The §Fase 37.d resolution
1875    /// is exercised by `tests/fase37_d_*` and `store::filter`.
1876    fn nb() -> std::collections::HashMap<String, String> {
1877        std::collections::HashMap::new()
1878    }
1879
1880    // ── resolve_dsn ──────────────────────────────────────────────────
1881
1882    #[test]
1883    fn resolve_empty_connection_errors() {
1884        assert_eq!(resolve_dsn(""), Err(StoreError::EmptyConnection));
1885        assert_eq!(resolve_dsn("    "), Err(StoreError::EmptyConnection));
1886    }
1887
1888    #[test]
1889    fn resolve_literal_dsn_is_returned_verbatim() {
1890        let dsn = "postgresql://u:p@localhost:5432/axon";
1891        assert_eq!(resolve_dsn(dsn), Ok(dsn.to_string()));
1892    }
1893
1894    #[test]
1895    fn resolve_literal_dsn_is_trimmed() {
1896        assert_eq!(
1897            resolve_dsn("  postgresql://h/db  "),
1898            Ok("postgresql://h/db".to_string())
1899        );
1900    }
1901
1902    #[test]
1903    fn resolve_bare_env_prefix_errors() {
1904        assert_eq!(resolve_dsn("env:"), Err(StoreError::EmptyEnvVarName));
1905        assert_eq!(resolve_dsn("env:   "), Err(StoreError::EmptyEnvVarName));
1906    }
1907
1908    #[test]
1909    fn resolve_missing_env_var_errors() {
1910        match resolve_dsn("env:AXON_NONEXISTENT_VAR_FASE35C") {
1911            Err(StoreError::MissingEnvVar { var }) => {
1912                assert_eq!(var, "AXON_NONEXISTENT_VAR_FASE35C");
1913            }
1914            other => panic!("expected MissingEnvVar, got {other:?}"),
1915        }
1916    }
1917
1918    #[test]
1919    fn resolve_env_var_reads_the_environment() {
1920        // `PATH` is set on every supported OS — exercise the success
1921        // path without mutating the process environment.
1922        let resolved = resolve_dsn("env:PATH").expect("PATH resolves");
1923        assert_eq!(resolved, std::env::var("PATH").unwrap());
1924        assert!(!resolved.is_empty());
1925    }
1926
1927    // ── connect / masking ────────────────────────────────────────────
1928
1929    #[tokio::test]
1930    async fn connect_with_valid_dsn_is_lazy_and_succeeds() {
1931        // `connect_lazy` opens no connection — a well-formed DSN to a
1932        // host that may not exist still yields Ok.
1933        let backend =
1934            PostgresStoreBackend::connect("postgresql://u:p@localhost:5432/db")
1935                .expect("a well-formed DSN builds a lazy pool");
1936        let _ = format!("{backend:?}");
1937    }
1938
1939    #[tokio::test]
1940    async fn connect_masks_the_password_in_dsn_and_debug() {
1941        // A deliberately fake credential — this test asserts the
1942        // backend never surfaces a DSN password.
1943        let fake_secret = "fakecred0";
1944        let backend = PostgresStoreBackend::connect(&format!(
1945            "postgresql://user:{fake_secret}@localhost:5432/axon"
1946        ))
1947        .unwrap();
1948        let masked = backend.masked_dsn();
1949        assert!(!masked.contains(fake_secret), "password must be masked");
1950        assert!(masked.contains("***"));
1951        assert!(!format!("{backend:?}").contains(fake_secret));
1952    }
1953
1954    #[test]
1955    fn connect_empty_connection_errors() {
1956        assert!(matches!(
1957            PostgresStoreBackend::connect(""),
1958            Err(StoreError::EmptyConnection)
1959        ));
1960    }
1961
1962    #[test]
1963    fn connect_missing_env_var_errors() {
1964        assert!(matches!(
1965            PostgresStoreBackend::connect("env:AXON_NONEXISTENT_VAR_FASE35C"),
1966            Err(StoreError::MissingEnvVar { .. })
1967        ));
1968    }
1969
1970    #[test]
1971    fn connect_malformed_dsn_errors() {
1972        assert!(matches!(
1973            PostgresStoreBackend::connect("not a valid dsn at all"),
1974            Err(StoreError::PoolInit { .. })
1975        ));
1976    }
1977
1978    // ── Gap 3 (v1.36.3) — pooler safety + application_name ───────────
1979
1980    #[tokio::test]
1981    async fn connect_named_with_valid_dsn_is_lazy_and_succeeds() {
1982        // `connect_named` builds the same lazy pool — Gap 3 only adds
1983        // `statement_cache_capacity(0)` + `application_name`, neither of
1984        // which opens a connection.
1985        let backend = PostgresStoreBackend::connect_named(
1986            "postgresql://u:p@localhost:5432/db",
1987            "claims",
1988        )
1989        .expect("a well-formed DSN builds a lazy pool");
1990        let _ = format!("{backend:?}");
1991    }
1992
1993    #[test]
1994    fn connect_named_malformed_dsn_errors() {
1995        assert!(matches!(
1996            PostgresStoreBackend::connect_named("not a dsn", "claims"),
1997            Err(StoreError::PoolInit { .. })
1998        ));
1999    }
2000
2001    #[test]
2002    fn application_name_carries_the_store_name() {
2003        assert_eq!(application_name_for("claims"), "axon-store/claims");
2004        assert_eq!(
2005            application_name_for("tenant_audit_log"),
2006            "axon-store/tenant_audit_log"
2007        );
2008    }
2009
2010    #[test]
2011    fn application_name_empty_store_is_bare() {
2012        // `connect` delegates with no store name — the bare label, with
2013        // no dangling slash.
2014        assert_eq!(application_name_for(""), "axon-store");
2015    }
2016
2017    #[test]
2018    fn application_name_capped_at_postgres_namedatalen() {
2019        // Postgres silently truncates `application_name` past 63 bytes;
2020        // we cap it ourselves so the stamped name is exactly observed.
2021        let long = "s".repeat(200);
2022        let name = application_name_for(&long);
2023        assert!(name.len() <= 63, "must fit NAMEDATALEN-1, got {}", name.len());
2024        assert!(name.starts_with("axon-store/s"));
2025    }
2026
2027    #[test]
2028    fn application_name_truncation_respects_char_boundaries() {
2029        // A multi-byte tail must never be cut mid-codepoint — the result
2030        // is always valid UTF-8 (`String` guarantees it, but the cut
2031        // must land on a boundary or the slice panics).
2032        let name = application_name_for(&"é".repeat(100));
2033        assert!(name.len() <= 63);
2034        assert!(name.is_char_boundary(name.len()));
2035    }
2036
2037    // ── build_select_sql ─────────────────────────────────────────────
2038
2039    #[test]
2040    fn select_with_filter() {
2041        let (sql, params) =
2042            build_select_sql("users", None, "id = 1", &nb(), &nb()).unwrap();
2043        // §37.x.e (D4) — unknown column type + equality → `::text`.
2044        assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\"::text = $1");
2045        assert_eq!(params, vec![SqlValue::Integer(1)]);
2046    }
2047
2048    #[test]
2049    fn select_casts_the_filter_value_to_its_introspected_column_type() {
2050        // §v1.36.4 — a known column type casts the WHERE value, so the
2051        // comparison uses the native operator (`int4 = int4`).
2052        let types = std::collections::HashMap::from([(
2053            "id".to_string(),
2054            "int4".to_string(),
2055        )]);
2056        let (sql, _) =
2057            build_select_sql("users", None, "id = 1", &nb(), &types).unwrap();
2058        assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\" = $1::int4");
2059    }
2060
2061    #[test]
2062    fn select_with_empty_filter_renders_where_true() {
2063        let (sql, params) =
2064            build_select_sql("users", None, "", &nb(), &nb()).unwrap();
2065        assert_eq!(sql, "SELECT * FROM \"users\" WHERE TRUE");
2066        assert!(params.is_empty());
2067    }
2068
2069    #[test]
2070    fn select_rejects_unsafe_table_name() {
2071        assert!(matches!(
2072            build_select_sql("users; DROP TABLE x", None, "", &nb(), &nb()),
2073            Err(StoreError::InvalidIdentifier { kind: "table", .. })
2074        ));
2075    }
2076
2077    #[test]
2078    fn select_propagates_filter_errors() {
2079        assert!(matches!(
2080            build_select_sql("users", None, "id = 1 AND", &nb(), &nb()),
2081            Err(StoreError::Filter(_))
2082        ));
2083    }
2084
2085    // ── build_delete_sql ─────────────────────────────────────────────
2086
2087    #[test]
2088    fn delete_with_filter() {
2089        let (sql, params) =
2090            build_delete_sql("sessions", None, "expired = true", &nb(), &nb())
2091                .unwrap();
2092        assert_eq!(sql, "DELETE FROM \"sessions\" WHERE \"expired\"::text = $1");
2093        assert_eq!(params, vec![SqlValue::Boolean(true)]);
2094    }
2095
2096    #[test]
2097    fn delete_rejects_unsafe_table() {
2098        assert!(matches!(
2099            build_delete_sql("evil\"table", None, "a = 1", &nb(), &nb()),
2100            Err(StoreError::InvalidIdentifier { .. })
2101        ));
2102    }
2103
2104    // ── build_insert_sql ─────────────────────────────────────────────
2105
2106    #[test]
2107    fn insert_basic() {
2108        let (sql, params) = build_insert_sql(
2109            "users",
2110            None,
2111            &[("name".into(), txt("Alice")), ("age".into(), SqlValue::Integer(30))],
2112            &nb(),
2113        )
2114        .unwrap();
2115        assert_eq!(
2116            sql,
2117            "INSERT INTO \"users\" (\"name\", \"age\") VALUES ($1, $2)"
2118        );
2119        assert_eq!(params, vec![txt("Alice"), SqlValue::Integer(30)]);
2120    }
2121
2122    #[test]
2123    fn insert_renders_null_inline_consuming_no_placeholder() {
2124        let (sql, params) = build_insert_sql(
2125            "t",
2126            None,
2127            &[
2128                ("a".into(), SqlValue::Integer(1)),
2129                ("b".into(), SqlValue::Null),
2130                ("c".into(), txt("x")),
2131            ],
2132            &nb(),
2133        )
2134        .unwrap();
2135        assert_eq!(
2136            sql,
2137            "INSERT INTO \"t\" (\"a\", \"b\", \"c\") VALUES ($1, NULL, $2)"
2138        );
2139        assert_eq!(params, vec![SqlValue::Integer(1), txt("x")]);
2140    }
2141
2142    #[test]
2143    fn insert_empty_data_errors() {
2144        assert_eq!(
2145            build_insert_sql("t", None, &[], &nb()),
2146            Err(StoreError::EmptyData { op: "insert" })
2147        );
2148    }
2149
2150    #[test]
2151    fn insert_rejects_unsafe_column_name() {
2152        assert!(matches!(
2153            build_insert_sql("t", None, &[("a\"; DROP".into(), SqlValue::Integer(1))], &nb()),
2154            Err(StoreError::InvalidIdentifier { kind: "column", .. })
2155        ));
2156    }
2157
2158    #[test]
2159    fn insert_rejects_unsafe_table_name() {
2160        assert!(matches!(
2161            build_insert_sql("t t", None, &[("a".into(), SqlValue::Integer(1))], &nb()),
2162            Err(StoreError::InvalidIdentifier { kind: "table", .. })
2163        ));
2164    }
2165
2166    // ── build_update_sql ─────────────────────────────────────────────
2167
2168    #[test]
2169    fn update_basic_where_offset_continues_after_set() {
2170        let (sql, params) = build_update_sql(
2171            "users",
2172            None,
2173            "id = 5",
2174            &[("name".into(), txt("Bob")), ("age".into(), SqlValue::Integer(40))],
2175            &nb(),
2176            &nb(),
2177        )
2178        .unwrap();
2179        assert_eq!(
2180            sql,
2181            "UPDATE \"users\" SET \"name\" = $1, \"age\" = $2 \
2182             WHERE \"id\"::text = $3"
2183        );
2184        assert_eq!(
2185            params,
2186            vec![txt("Bob"), SqlValue::Integer(40), SqlValue::Integer(5)]
2187        );
2188    }
2189
2190    #[test]
2191    fn update_null_set_value_shifts_where_offset_by_non_null_count() {
2192        // The defect the Python reference has: a NULL SET value renders
2193        // inline, so the WHERE offset is the NON-NULL set count (1),
2194        // not the column count (2). `id` must be $2, not $3.
2195        let (sql, params) = build_update_sql(
2196            "users",
2197            None,
2198            "id = 5",
2199            &[("name".into(), SqlValue::Null), ("age".into(), SqlValue::Integer(40))],
2200            &nb(),
2201            &nb(),
2202        )
2203        .unwrap();
2204        assert_eq!(
2205            sql,
2206            "UPDATE \"users\" SET \"name\" = NULL, \"age\" = $1 \
2207             WHERE \"id\"::text = $2"
2208        );
2209        assert_eq!(params, vec![SqlValue::Integer(40), SqlValue::Integer(5)]);
2210    }
2211
2212    #[test]
2213    fn update_with_empty_where_targets_all_rows() {
2214        let (sql, _) = build_update_sql(
2215            "t",
2216            None,
2217            "",
2218            &[("a".into(), SqlValue::Integer(1))],
2219            &nb(),
2220            &nb(),
2221        )
2222        .unwrap();
2223        assert_eq!(sql, "UPDATE \"t\" SET \"a\" = $1 WHERE TRUE");
2224    }
2225
2226    #[test]
2227    fn update_empty_data_errors() {
2228        assert_eq!(
2229            build_update_sql("t", None, "id = 1", &[], &nb(), &nb()),
2230            Err(StoreError::EmptyData { op: "mutate" })
2231        );
2232    }
2233
2234    #[test]
2235    fn update_rejects_unsafe_column() {
2236        assert!(matches!(
2237            build_update_sql(
2238                "t",
2239                None,
2240                "id = 1",
2241                &[("a-b".into(), SqlValue::Integer(1))],
2242                &nb(),
2243                &nb(),
2244            ),
2245            Err(StoreError::InvalidIdentifier { kind: "column", .. })
2246        ));
2247    }
2248
2249    #[test]
2250    fn update_propagates_filter_errors() {
2251        assert!(matches!(
2252            build_update_sql(
2253                "t",
2254                None,
2255                "bad ;",
2256                &[("a".into(), SqlValue::Integer(1))],
2257                &nb(),
2258                &nb(),
2259            ),
2260            Err(StoreError::Filter(_))
2261        ));
2262    }
2263
2264    // ── §v1.36.2 — typed-column write cast ───────────────────────────
2265
2266    #[test]
2267    fn insert_casts_each_value_to_its_introspected_column_type() {
2268        let types = std::collections::HashMap::from([
2269            ("tenant_id".to_string(), "uuid".to_string()),
2270            ("note".to_string(), "text".to_string()),
2271            ("n".to_string(), "int4".to_string()),
2272        ]);
2273        let (sql, _) = build_insert_sql(
2274            "chat_history",
2275            None,
2276            &[
2277                ("tenant_id".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e")),
2278                ("note".into(), txt("hi")),
2279                ("n".into(), SqlValue::Integer(3)),
2280            ],
2281            &types,
2282        )
2283        .unwrap();
2284        assert_eq!(
2285            sql,
2286            "INSERT INTO \"chat_history\" (\"tenant_id\", \"note\", \"n\") \
2287             VALUES ($1::uuid, $2::text, $3::int4)",
2288            "§v1.36.2 — each value placeholder is cast to its column's \
2289             introspected type so a text-bound value writes into a \
2290             uuid / int column"
2291        );
2292    }
2293
2294    #[test]
2295    fn update_set_casts_each_value_to_its_introspected_column_type() {
2296        let types = std::collections::HashMap::from([(
2297            "status".to_string(),
2298            "uuid".to_string(),
2299        )]);
2300        let (sql, _) = build_update_sql(
2301            "t",
2302            None,
2303            "id = 1",
2304            &[("status".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e"))],
2305            &nb(),
2306            &types,
2307        )
2308        .unwrap();
2309        assert_eq!(
2310            sql,
2311            "UPDATE \"t\" SET \"status\" = $1::uuid WHERE \"id\"::text = $2",
2312            "§v1.36.2 — the SET value is cast to the column type; `id` \
2313             is absent from the type map so §37.x.e (D4) casts the \
2314             WHERE column to `text` for the equality"
2315        );
2316    }
2317
2318    #[test]
2319    fn update_where_value_is_cast_to_its_column_type() {
2320        // §v1.36.4 — when the WHERE column's type IS known, its value
2321        // placeholder is cast too (the SET-side cure applied to WHERE).
2322        let types = std::collections::HashMap::from([
2323            ("status".to_string(), "text".to_string()),
2324            ("id".to_string(), "int8".to_string()),
2325        ]);
2326        let (sql, _) = build_update_sql(
2327            "t",
2328            None,
2329            "id = 1",
2330            &[("status".into(), txt("done"))],
2331            &nb(),
2332            &types,
2333        )
2334        .unwrap();
2335        assert_eq!(
2336            sql,
2337            "UPDATE \"t\" SET \"status\" = $1::text WHERE \"id\" = $2::int8"
2338        );
2339    }
2340
2341    #[test]
2342    fn unknown_column_type_falls_back_to_a_bare_placeholder() {
2343        // An empty type map (introspection missed the table / column)
2344        // → bare `$N`, the pre-1.36.2 behaviour: a `text` column still
2345        // works, a typed column fails LOUDLY — no regression, no
2346        // silent-wrong write.
2347        let (sql, _) =
2348            build_insert_sql("t", None, &[("x".into(), txt("v"))], &nb()).unwrap();
2349        assert_eq!(sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)");
2350    }
2351
2352    #[test]
2353    fn an_unsafe_column_type_name_is_not_spliced_into_sql() {
2354        // Defense in depth: `udt_name` comes from Postgres, but a type
2355        // name that is not a safe identifier is never spliced — the
2356        // builder falls back to a bare `$N`.
2357        let types = std::collections::HashMap::from([(
2358            "x".to_string(),
2359            "uuid; DROP TABLE t".to_string(),
2360        )]);
2361        let (sql, _) =
2362            build_insert_sql("t", None, &[("x".into(), txt("v"))], &types).unwrap();
2363        assert_eq!(
2364            sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)",
2365            "an unsafe type name yields no cast — never a splice"
2366        );
2367    }
2368
2369    // ── D4 — injection resistance, end to end ────────────────────────
2370
2371    #[test]
2372    fn injection_in_value_position_is_a_bound_parameter() {
2373        let (sql, params) = build_select_sql(
2374            "users",
2375            None,
2376            "name = '; DROP TABLE users; --'",
2377            &nb(),
2378            &nb(),
2379        )
2380        .unwrap();
2381        assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"name\"::text = $1");
2382        assert_eq!(
2383            params,
2384            vec![txt("; DROP TABLE users; --")]
2385        );
2386    }
2387
2388    #[test]
2389    fn injection_in_table_identifier_is_rejected_not_quoted() {
2390        assert!(matches!(
2391            build_select_sql("users\" WHERE 1=1; --", None, "", &nb(), &nb()),
2392            Err(StoreError::InvalidIdentifier { .. })
2393        ));
2394    }
2395
2396    // ── §Fase 37.x.c — schema-anchored relation (D2) ─────────────────
2397
2398    #[test]
2399    fn select_with_a_resolved_schema_is_qualified() {
2400        // §37.x.c (D2) — a resolved schema renders `"schema"."table"`,
2401        // so the SELECT resolves on any session regardless of the
2402        // ambient `search_path`.
2403        let (sql, _) =
2404            build_select_sql("tenants", Some("public"), "id = 1", &nb(), &nb())
2405                .unwrap();
2406        assert_eq!(
2407            sql,
2408            "SELECT * FROM \"public\".\"tenants\" WHERE \"id\"::text = $1"
2409        );
2410    }
2411
2412    #[test]
2413    fn every_builder_qualifies_with_a_resolved_schema() {
2414        // D2 must flip ALL FOUR builders, not three.
2415        let data = [("v".to_string(), SqlValue::Integer(1))];
2416        let (sel, _) =
2417            build_select_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
2418        let (del, _) =
2419            build_delete_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
2420        let (ins, _) = build_insert_sql("t", Some("app"), &data, &nb()).unwrap();
2421        let (upd, _) =
2422            build_update_sql("t", Some("app"), "", &data, &nb(), &nb()).unwrap();
2423        assert!(sel.contains("FROM \"app\".\"t\""), "SELECT: {sel}");
2424        assert!(del.contains("FROM \"app\".\"t\""), "DELETE: {del}");
2425        assert!(ins.contains("INTO \"app\".\"t\""), "INSERT: {ins}");
2426        assert!(upd.starts_with("UPDATE \"app\".\"t\""), "UPDATE: {upd}");
2427    }
2428
2429    #[test]
2430    fn no_resolved_schema_renders_the_bare_table() {
2431        // D5 backwards-compat — `schema = None` (resolution failed or
2432        // not attempted) renders the pre-37.x un-qualified `"table"`.
2433        let (sql, _) = build_select_sql("t", None, "", &nb(), &nb()).unwrap();
2434        assert_eq!(sql, "SELECT * FROM \"t\" WHERE TRUE");
2435    }
2436
2437    #[test]
2438    fn an_unsafe_schema_name_is_not_spliced_and_falls_back_to_bare_table() {
2439        // Defense in depth (D4) — a schema name from `pg_catalog` that
2440        // is not a safe identifier is NEVER spliced; the builder falls
2441        // back to the bare `"table"` (search_path resolves it), exactly
2442        // as an unsafe `udt_name` yields no cast.
2443        for unsafe_schema in ["a\"; DROP TABLE x", "my schema", "1schema"] {
2444            let (sql, _) =
2445                build_select_sql("t", Some(unsafe_schema), "", &nb(), &nb())
2446                    .unwrap();
2447            assert_eq!(
2448                sql, "SELECT * FROM \"t\" WHERE TRUE",
2449                "unsafe schema `{unsafe_schema}` must not be spliced"
2450            );
2451        }
2452    }
2453
2454    #[test]
2455    fn a_qualified_table_still_casts_and_offsets_correctly() {
2456        // §37.x.c composes with §v1.36.2/§v1.36.4 — schema-qualification
2457        // is orthogonal to the value cast + the WHERE param offset.
2458        let types = std::collections::HashMap::from([
2459            ("status".to_string(), "uuid".to_string()),
2460            ("id".to_string(), "int8".to_string()),
2461        ]);
2462        let (sql, _) = build_update_sql(
2463            "t",
2464            Some("public"),
2465            "id = 1",
2466            &[("status".into(), txt("done"))],
2467            &nb(),
2468            &types,
2469        )
2470        .unwrap();
2471        assert_eq!(
2472            sql,
2473            "UPDATE \"public\".\"t\" SET \"status\" = $1::uuid \
2474             WHERE \"id\" = $2::int8"
2475        );
2476    }
2477
2478    // ── classify_pg_type ─────────────────────────────────────────────
2479
2480    #[test]
2481    fn classify_every_supported_type() {
2482        let cases = [
2483            ("BOOL", PgTypeClass::Bool),
2484            ("INT2", PgTypeClass::Int2),
2485            ("INT4", PgTypeClass::Int4),
2486            ("INT8", PgTypeClass::Int8),
2487            ("FLOAT4", PgTypeClass::Float4),
2488            ("FLOAT8", PgTypeClass::Float8),
2489            ("NUMERIC", PgTypeClass::Numeric),
2490            ("TEXT", PgTypeClass::Text),
2491            ("VARCHAR", PgTypeClass::Text),
2492            ("BPCHAR", PgTypeClass::Text),
2493            ("NAME", PgTypeClass::Text),
2494            ("UUID", PgTypeClass::Uuid),
2495            ("TIMESTAMPTZ", PgTypeClass::TimestampTz),
2496            ("TIMESTAMP", PgTypeClass::Timestamp),
2497            ("DATE", PgTypeClass::Date),
2498            ("TIME", PgTypeClass::Time),
2499            ("JSON", PgTypeClass::Json),
2500            ("JSONB", PgTypeClass::Json),
2501            ("BYTEA", PgTypeClass::Bytea),
2502        ];
2503        for (name, expected) in cases {
2504            assert_eq!(classify_pg_type(name), Some(expected), "type {name}");
2505        }
2506    }
2507
2508    #[test]
2509    fn classify_is_case_insensitive() {
2510        assert_eq!(classify_pg_type("int4"), Some(PgTypeClass::Int4));
2511        assert_eq!(classify_pg_type("TimestampTz"), Some(PgTypeClass::TimestampTz));
2512    }
2513
2514    #[test]
2515    fn classify_unsupported_types_return_none() {
2516        for name in ["INT4[]", "INET", "POINT", "HSTORE", "CIDR", "MONEY", ""] {
2517            assert_eq!(classify_pg_type(name), None, "type {name} unsupported");
2518        }
2519    }
2520
2521    // ── StoreRow ─────────────────────────────────────────────────────
2522
2523    #[test]
2524    fn store_row_get_and_to_json() {
2525        let row = StoreRow {
2526            columns: vec![
2527                ("id".into(), JsonValue::from(7)),
2528                ("name".into(), JsonValue::String("Eve".into())),
2529            ],
2530        };
2531        assert_eq!(row.get("id"), Some(&JsonValue::from(7)));
2532        assert_eq!(row.get("missing"), None);
2533        assert_eq!(
2534            row.to_json(),
2535            serde_json::json!({ "id": 7, "name": "Eve" })
2536        );
2537    }
2538
2539    // ── §Fase 37.x.b — resolve_from_rows (D1 pure resolution core) ───
2540
2541    fn triple(s: &str, c: &str, t: &str) -> (String, String, String) {
2542        (s.to_string(), c.to_string(), t.to_string())
2543    }
2544
2545    #[test]
2546    fn resolve_from_rows_no_rows_is_table_not_resolved() {
2547        match resolve_from_rows("widgets", vec![]) {
2548            Err(StoreError::TableNotResolved { table }) => {
2549                assert_eq!(table, "widgets");
2550            }
2551            other => panic!("expected TableNotResolved, got {other:?}"),
2552        }
2553    }
2554
2555    #[test]
2556    fn resolve_from_rows_one_schema_resolves_with_its_column_map() {
2557        let (schema, types) = resolve_from_rows(
2558            "tenants",
2559            vec![triple("public", "id", "uuid"), triple("public", "n", "int4")],
2560        )
2561        .expect("a single-schema result resolves");
2562        assert_eq!(schema, "public");
2563        assert_eq!(types.get("id"), Some(&"uuid".to_string()));
2564        assert_eq!(types.get("n"), Some(&"int4".to_string()));
2565        assert_eq!(types.len(), 2);
2566    }
2567
2568    #[test]
2569    fn resolve_from_rows_two_schemas_is_ambiguous_with_sorted_schemas() {
2570        match resolve_from_rows(
2571            "widgets",
2572            vec![
2573                triple("tenant_b", "id", "uuid"),
2574                triple("tenant_a", "id", "int4"),
2575            ],
2576        ) {
2577            Err(StoreError::AmbiguousTable { table, schemas }) => {
2578                assert_eq!(table, "widgets");
2579                // `BTreeMap` keys iterate sorted — a deterministic list.
2580                assert_eq!(
2581                    schemas,
2582                    vec!["tenant_a".to_string(), "tenant_b".to_string()]
2583                );
2584            }
2585            other => panic!("expected AmbiguousTable, got {other:?}"),
2586        }
2587    }
2588
2589    #[test]
2590    fn resolve_from_rows_three_schemas_is_still_one_ambiguous_error() {
2591        assert!(matches!(
2592            resolve_from_rows(
2593                "t",
2594                vec![
2595                    triple("s1", "a", "text"),
2596                    triple("s2", "a", "text"),
2597                    triple("s3", "a", "text"),
2598                ],
2599            ),
2600            Err(StoreError::AmbiguousTable { .. })
2601        ));
2602    }
2603
2604    // ── §Fase 37.x.d — schema cache (D3) ─────────────────────────────
2605
2606    #[tokio::test]
2607    async fn schema_cache_round_trips_a_resolution() {
2608        // The cache surface that lets a coherent-session operation skip
2609        // the transaction on a hit. `connect` is lazy — no database.
2610        let backend = PostgresStoreBackend::connect(
2611            "postgresql://u:p@localhost:5432/fase37xd_cache_rt",
2612        )
2613        .unwrap();
2614        let table = "fase37xd_cache_probe";
2615        assert!(
2616            backend.cached_schema(table).is_none(),
2617            "a cold cache is a miss"
2618        );
2619        let resolved = std::sync::Arc::new(ResolvedTable {
2620            schema: "public".to_string(),
2621            column_types: std::collections::HashMap::from([(
2622                "id".to_string(),
2623                "uuid".to_string(),
2624            )]),
2625        });
2626        backend.cache_schema(table, std::sync::Arc::clone(&resolved));
2627        let hit = backend
2628            .cached_schema(table)
2629            .expect("a warm cache is a hit");
2630        assert_eq!(hit.schema, "public");
2631        assert_eq!(hit.column_types.get("id"), Some(&"uuid".to_string()));
2632    }
2633
2634    #[tokio::test]
2635    async fn schema_cache_never_stores_an_empty_resolution() {
2636        // §v1.36.5 rule preserved — a real relation always has ≥ 1
2637        // column, so an empty map is a transient failure to retry,
2638        // never a poisoned cache entry.
2639        let backend = PostgresStoreBackend::connect(
2640            "postgresql://u:p@localhost:5432/fase37xd_cache_empty",
2641        )
2642        .unwrap();
2643        let table = "fase37xd_empty_probe";
2644        backend.cache_schema(
2645            table,
2646            std::sync::Arc::new(ResolvedTable {
2647                schema: "public".to_string(),
2648                column_types: std::collections::HashMap::new(),
2649            }),
2650        );
2651        assert!(
2652            backend.cached_schema(table).is_none(),
2653            "an empty resolution must never be cached"
2654        );
2655    }
2656
2657    // ── §Fase 37.x.f — D9 self-healing bounded cache ─────────────────
2658
2659    #[test]
2660    fn is_schema_drift_sqlstate_recognises_exactly_the_drift_codes() {
2661        // The four parse/plan-time rejections that signal a stale cache.
2662        for code in ["42P01", "42703", "42804", "42883"] {
2663            assert!(
2664                is_schema_drift_sqlstate(code),
2665                "`{code}` must be a schema-drift SQLSTATE"
2666            );
2667        }
2668        // Non-drift samples — unique-violation, syntax error, connection
2669        // failure, check-violation, serialization failure, empty.
2670        for code in ["23505", "42601", "08006", "23514", "40001", ""] {
2671            assert!(
2672                !is_schema_drift_sqlstate(code),
2673                "`{code}` is NOT schema drift — must not trigger the \
2674                 self-heal retry"
2675            );
2676        }
2677    }
2678
2679    #[test]
2680    fn store_error_is_schema_drift_predicate() {
2681        assert!(StoreError::SchemaDrift {
2682            op: "retrieve",
2683            sqlstate: "42883".to_string(),
2684            source: "operator does not exist: text = uuid".to_string(),
2685        }
2686        .is_schema_drift());
2687        assert!(!StoreError::Query {
2688            op: "retrieve",
2689            source: "syntax error".to_string(),
2690        }
2691        .is_schema_drift());
2692        assert!(!StoreError::TableNotResolved { table: "t".into() }
2693            .is_schema_drift());
2694    }
2695
2696    /// A small `ResolvedTable` for the cache tests.
2697    fn rt(schema: &str) -> std::sync::Arc<ResolvedTable> {
2698        std::sync::Arc::new(ResolvedTable {
2699            schema: schema.to_string(),
2700            column_types: std::collections::HashMap::from([(
2701                "id".to_string(),
2702                "uuid".to_string(),
2703            )]),
2704        })
2705    }
2706
2707    #[test]
2708    fn schema_cache_evicts_the_oldest_entry_at_capacity() {
2709        // §D9 — the bound: a many-table adopter cannot grow the cache
2710        // without limit; at capacity the OLDEST insertion is evicted.
2711        let mut cache = SchemaCache::new(2);
2712        let key = |t: &str| ("dsn".to_string(), t.to_string());
2713        cache.insert(key("a"), rt("s_a"));
2714        cache.insert(key("b"), rt("s_b"));
2715        cache.insert(key("c"), rt("s_c")); // over capacity → evict `a`.
2716        assert_eq!(cache.entries.len(), 2, "the cache is bounded at 2");
2717        assert!(
2718            cache.get(&key("a")).is_none(),
2719            "the oldest entry was evicted"
2720        );
2721        assert_eq!(
2722            cache.get(&key("b")).map(|r| r.schema.clone()),
2723            Some("s_b".to_string())
2724        );
2725        assert_eq!(
2726            cache.get(&key("c")).map(|r| r.schema.clone()),
2727            Some("s_c".to_string())
2728        );
2729    }
2730
2731    #[test]
2732    fn schema_cache_evict_drops_a_named_entry() {
2733        // §D9 — the self-heal eviction primitive.
2734        let mut cache = SchemaCache::new(10);
2735        let key = ("dsn".to_string(), "t".to_string());
2736        cache.insert(key.clone(), rt("public"));
2737        assert!(cache.get(&key).is_some());
2738        cache.evict(&key);
2739        assert!(cache.get(&key).is_none(), "evict drops the entry");
2740    }
2741
2742    #[test]
2743    fn schema_cache_reinsert_of_a_key_does_not_evict_another() {
2744        // Re-inserting an EXISTING key (a self-heal re-introspection)
2745        // refreshes it in place — it must not evict another entry.
2746        let mut cache = SchemaCache::new(2);
2747        let ka = ("dsn".to_string(), "a".to_string());
2748        let kb = ("dsn".to_string(), "b".to_string());
2749        cache.insert(ka.clone(), rt("public"));
2750        cache.insert(kb.clone(), rt("public"));
2751        cache.insert(ka.clone(), rt("public")); // re-insert — no eviction.
2752        assert_eq!(cache.entries.len(), 2);
2753        assert!(cache.get(&ka).is_some());
2754        assert!(cache.get(&kb).is_some(), "the re-insert evicted nothing");
2755    }
2756
2757    // ── StoreError display ───────────────────────────────────────────
2758
2759    #[test]
2760    fn every_store_error_has_a_non_empty_display() {
2761        let errors = [
2762            StoreError::EmptyConnection,
2763            StoreError::EmptyEnvVarName,
2764            StoreError::MissingEnvVar { var: "X".into() },
2765            StoreError::PoolInit {
2766                dsn_masked: "postgresql://u:***@h/db".into(),
2767                source: "bad".into(),
2768            },
2769            StoreError::InvalidIdentifier { kind: "table", name: "x;".into() },
2770            StoreError::EmptyData { op: "insert" },
2771            StoreError::Filter(FilterError::TooManyConditions { limit: 256 }),
2772            StoreError::Connect { source: "refused".into() },
2773            StoreError::Query { op: "retrieve", source: "syntax".into() },
2774            StoreError::UnsupportedColumnType {
2775                column: "geom".into(),
2776                pg_type: "POINT".into(),
2777            },
2778            StoreError::Decode {
2779                column: "ts".into(),
2780                pg_type: "TIMESTAMPTZ".into(),
2781                source: "overflow".into(),
2782            },
2783            StoreError::TableNotResolved { table: "ghost".into() },
2784            StoreError::AmbiguousTable {
2785                table: "dup".into(),
2786                schemas: vec!["a".into(), "b".into()],
2787            },
2788            StoreError::SchemaDrift {
2789                op: "retrieve",
2790                sqlstate: "42883".into(),
2791                source: "operator does not exist: text = uuid".into(),
2792            },
2793        ];
2794        for e in errors {
2795            assert!(!e.to_string().is_empty());
2796        }
2797    }
2798
2799    #[test]
2800    fn filter_error_is_a_store_error_source() {
2801        use std::error::Error;
2802        let e = StoreError::Filter(FilterError::TooManyConditions { limit: 256 });
2803        assert!(e.source().is_some());
2804    }
2805
2806    #[test]
2807    fn filter_error_converts_into_store_error() {
2808        let e: StoreError = FilterError::TooManyConditions { limit: 256 }.into();
2809        assert!(matches!(e, StoreError::Filter(_)));
2810    }
2811
2812    // ── §Fase 37.x.h — D6 honest, actionable failure ─────────────────
2813
2814    #[test]
2815    fn d6_table_not_resolved_display_carries_an_actionable_hint() {
2816        // The Display of `TableNotResolved` is the user-facing surface of
2817        // the D1 resolution failure — it MUST tell an adopter (a) what
2818        // happened, (b) the table involved, and (c) at least one concrete
2819        // remedy. A bare "could not resolve" is the *un-actionable* form
2820        // 37.x.h replaces.
2821        let err = StoreError::TableNotResolved {
2822            table: "claims".into(),
2823        };
2824        let text = err.to_string();
2825        assert!(
2826            text.contains("`claims`"),
2827            "the table name must appear verbatim, got: {text}"
2828        );
2829        assert!(
2830            text.contains("pg_catalog"),
2831            "the message must disclose pg_catalog (so an adopter knows \
2832             `search_path` is not the culprit), got: {text}"
2833        );
2834        assert!(
2835            text.contains("migration") || text.contains("SELECT"),
2836            "the message must name at least one concrete remedy \
2837             (migration / SELECT permission), got: {text}"
2838        );
2839    }
2840
2841    #[test]
2842    fn d6_ambiguous_table_display_points_at_fase_38_schema_declaration() {
2843        // The Display of `AmbiguousTable` MUST tell an adopter both the
2844        // schemas the table resolved into AND the two real remedies —
2845        // narrow `search_path` OR declare the target schema explicitly
2846        // (the Fase 38 `schema:` declaration the gap report names).
2847        let err = StoreError::AmbiguousTable {
2848            table: "rates".into(),
2849            schemas: vec!["finance".into(), "legacy".into()],
2850        };
2851        let text = err.to_string();
2852        assert!(text.contains("`rates`"), "table name must appear");
2853        assert!(
2854            text.contains("finance") && text.contains("legacy"),
2855            "every resolving schema must appear, got: {text}"
2856        );
2857        assert!(
2858            text.contains("search_path"),
2859            "the search_path remedy must appear, got: {text}"
2860        );
2861        assert!(
2862            text.contains("schema:"),
2863            "the Fase 38 `schema:` declaration must be named (the \
2864             genuinely-superior remedy), got: {text}"
2865        );
2866        assert!(
2867            text.contains("Fase 38"),
2868            "the message must anchor the remedy to Fase 38, got: {text}"
2869        );
2870    }
2871
2872    #[test]
2873    fn d6_display_does_not_leak_internal_sqlstates_or_internal_paths() {
2874        // The Display is operator-facing prose, not a stack trace —
2875        // SQLSTATE codes belong on `SchemaDrift` (where they ARE the
2876        // diagnostic), not on a resolution failure. A regression would
2877        // be code spilling into the friendly arms.
2878        let nr = StoreError::TableNotResolved { table: "t".into() }.to_string();
2879        let amb = StoreError::AmbiguousTable {
2880            table: "t".into(),
2881            schemas: vec!["a".into()],
2882        }
2883        .to_string();
2884        for code in ["42P01", "42703", "42804", "42883"] {
2885            assert!(
2886                !nr.contains(code),
2887                "TableNotResolved must not leak SQLSTATE {code}"
2888            );
2889            assert!(
2890                !amb.contains(code),
2891                "AmbiguousTable must not leak SQLSTATE {code}"
2892            );
2893        }
2894    }
2895}