use std::fmt;
use std::str::FromStr;
use std::time::Duration;
use serde_json::Value as JsonValue;
use sqlx::postgres::{PgArguments, PgConnectOptions, PgPoolOptions, PgRow};
use sqlx::query::Query;
use sqlx::{Column, PgConnection, PgPool, Postgres, Row, TypeInfo};
use crate::store::epistemic::EpistemicError;
use crate::store::filter::{self, build_pg_where, FilterError, SqlValue};
const MAX_POOL_CONNECTIONS: u32 = 10;
const ACQUIRE_TIMEOUT_SECS: u64 = 5;
const IDLE_TIMEOUT_SECS: u64 = 300;
#[derive(Debug, Clone, PartialEq)]
pub enum StoreError {
EmptyConnection,
EmptyEnvVarName,
MissingEnvVar { var: String },
PoolInit { dsn_masked: String, source: String },
InvalidIdentifier { kind: &'static str, name: String },
EmptyData { op: &'static str },
Filter(FilterError),
Epistemic(EpistemicError),
Connect { source: String },
Query { op: &'static str, source: String },
UnsupportedColumnType { column: String, pg_type: String },
Decode { column: String, pg_type: String, source: String },
TableNotResolved { table: String },
AmbiguousTable { table: String, schemas: Vec<String> },
SchemaDrift { op: &'static str, sqlstate: String, source: String },
MissingPerTenantSchemaEnv { store: String, var: String },
DeclaredVsLiveDrift { store: String, drift: String },
}
impl fmt::Display for StoreError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StoreError::EmptyConnection => write!(
f,
"axonstore `connection` is empty — expected a DSN or an \
`env:VARNAME` reference"
),
StoreError::EmptyEnvVarName => write!(
f,
"axonstore `connection` is the bare prefix `env:` with no \
variable name"
),
StoreError::MissingEnvVar { var } => write!(
f,
"axonstore `connection: \"env:{var}\"` — environment \
variable `{var}` is not set (or not valid UTF-8)"
),
StoreError::PoolInit { dsn_masked, source } => write!(
f,
"axonstore connection pool could not be initialised for \
`{dsn_masked}`: {source}"
),
StoreError::InvalidIdentifier { kind, name } => write!(
f,
"unsafe {kind} identifier `{name}` — must match \
[A-Za-z_][A-Za-z0-9_]* and be ≤ 63 bytes"
),
StoreError::EmptyData { op } => write!(
f,
"axonstore `{op}` was given no column data"
),
StoreError::Filter(e) => write!(f, "where-expression: {e}"),
StoreError::Epistemic(e) => write!(f, "{e}"),
StoreError::Connect { source } => {
write!(f, "axonstore could not reach the database: {source}")
}
StoreError::Query { op, source } => {
write!(f, "axonstore `{op}` SQL failed: {source}")
}
StoreError::UnsupportedColumnType { column, pg_type } => write!(
f,
"column `{column}` has Postgres type `{pg_type}`, outside \
the v1.30.0 supported catalog"
),
StoreError::Decode { column, pg_type, source } => write!(
f,
"column `{column}` (`{pg_type}`) failed to decode: {source}"
),
StoreError::TableNotResolved { table } => write!(
f,
"axonstore could not resolve table `{table}` to a \
relation in any schema of the database — verify the \
table exists in the target database (a deploy-time \
migration is the usual remedy) and that the configured \
credentials can SELECT from it; the introspection scans \
`pg_catalog` independent of `search_path`, so the table \
is genuinely absent on every schema this role can see"
),
StoreError::AmbiguousTable { table, schemas } => write!(
f,
"axonstore table `{table}` is ambiguous — it exists in \
{} schemas ({}) and the connection's `search_path` does \
not disambiguate it; either narrow the role's \
`search_path` so exactly one of the resolving schemas \
is visible, or declare the target schema explicitly on \
the `axonstore` (the Fase 38 `schema:` declaration, \
incl. `schema: env:VAR` per-tenant)",
schemas.len(),
schemas.join(", "),
),
StoreError::SchemaDrift { op, sqlstate, source } => write!(
f,
"axonstore `{op}` hit live schema drift (SQLSTATE \
{sqlstate}) — the cached schema is stale: {source}"
),
StoreError::MissingPerTenantSchemaEnv { store, var } => write!(
f,
"axon-T806 axonstore `{store}` declares `schema: env:{var}` \
but environment variable `{var}` is not set at deploy \
time. The per-tenant schema namespace is required to \
resolve the store's column manifest entry. Either \
export `{var}` with the SQL schema name (e.g. \
`tenant_42`), or declare the schema differently \
(inline `schema {{ … }}` block, or manifest reference \
`schema: \"qualified.name\"`). Never a silent fallback."
),
StoreError::DeclaredVsLiveDrift { store, drift } => write!(
f,
"axon-T807 axonstore `{store}` declared column schema \
disagrees with the live database: {drift}. The deploy \
fails fail-closed (D8 strengthening). Remedy: run `axon \
store introspect {store}` to refresh the manifest, run \
the missing migration on the database, or fix the \
declared `schema:` block to match the live shape."
),
}
}
}
impl std::error::Error for StoreError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
StoreError::Filter(e) => Some(e),
StoreError::Epistemic(e) => Some(e),
_ => None,
}
}
}
impl StoreError {
pub fn is_schema_drift(&self) -> bool {
matches!(self, StoreError::SchemaDrift { .. })
}
}
impl From<FilterError> for StoreError {
fn from(e: FilterError) -> Self {
StoreError::Filter(e)
}
}
impl From<EpistemicError> for StoreError {
fn from(e: EpistemicError) -> Self {
StoreError::Epistemic(e)
}
}
pub fn resolve_dsn(connection: &str) -> Result<String, StoreError> {
let conn = connection.trim();
if conn.is_empty() {
return Err(StoreError::EmptyConnection);
}
match conn.strip_prefix("env:") {
Some(var) => {
let var = var.trim();
if var.is_empty() {
return Err(StoreError::EmptyEnvVarName);
}
std::env::var(var).map_err(|_| StoreError::MissingEnvVar {
var: var.to_string(),
})
}
None => Ok(conn.to_string()),
}
}
fn mask_dsn(dsn: &str) -> String {
if let Some(at) = dsn.find('@') {
if let Some(colon) = dsn[..at].rfind(':') {
return format!("{}***{}", &dsn[..=colon], &dsn[at..]);
}
}
dsn.to_string()
}
pub fn mask_dsn_pub(dsn: &str) -> String {
mask_dsn(dsn)
}
fn application_name_for(store_name: &str) -> String {
application_name_for_with_namespace(store_name, None)
}
pub(crate) fn application_name_for_with_namespace(
store_name: &str,
namespace: Option<&str>,
) -> String {
const MAX: usize = 63;
let base = if store_name.is_empty() {
"axon-store".to_string()
} else {
format!("axon-store/{store_name}")
};
let full = match namespace {
Some(ns) if !ns.is_empty() => format!("{base}/{ns}"),
_ => base,
};
if full.len() <= MAX {
return full;
}
let mut cut = MAX;
while cut > 0 && !full.is_char_boundary(cut) {
cut -= 1;
}
full[..cut].to_string()
}
fn check_identifier(name: &str, kind: &'static str) -> Result<(), StoreError> {
if filter::is_safe_identifier(name) {
Ok(())
} else {
Err(StoreError::InvalidIdentifier {
kind,
name: name.to_string(),
})
}
}
fn qualified_relation(schema: Option<&str>, table: &str) -> String {
match schema {
Some(s) if filter::is_safe_identifier(s) => {
format!("\"{s}\".\"{table}\"")
}
_ => format!("\"{table}\""),
}
}
pub fn build_select_sql(
table: &str,
schema: Option<&str>,
where_expr: &str,
bindings: &std::collections::HashMap<String, String>,
column_types: &std::collections::HashMap<String, String>,
) -> Result<(String, Vec<SqlValue>), StoreError> {
check_identifier(table, "table")?;
let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
let relation = qualified_relation(schema, table);
Ok((format!("SELECT * FROM {relation} WHERE {clause}"), params))
}
pub fn build_delete_sql(
table: &str,
schema: Option<&str>,
where_expr: &str,
bindings: &std::collections::HashMap<String, String>,
column_types: &std::collections::HashMap<String, String>,
) -> Result<(String, Vec<SqlValue>), StoreError> {
check_identifier(table, "table")?;
let (clause, params) = build_pg_where(where_expr, 0, bindings, column_types)?;
let relation = qualified_relation(schema, table);
Ok((format!("DELETE FROM {relation} WHERE {clause}"), params))
}
#[derive(Debug, Clone)]
pub(crate) struct ResolvedTable {
pub schema: String,
pub column_types: std::collections::HashMap<String, String>,
}
const SCHEMA_CACHE_CAPACITY: usize = 10_000;
struct SchemaCache {
entries: std::collections::HashMap<
(String, String),
(std::sync::Arc<ResolvedTable>, u64),
>,
next_seq: u64,
capacity: usize,
}
impl SchemaCache {
fn new(capacity: usize) -> Self {
Self {
entries: std::collections::HashMap::new(),
next_seq: 0,
capacity,
}
}
fn get(
&self,
key: &(String, String),
) -> Option<std::sync::Arc<ResolvedTable>> {
self.entries.get(key).map(|(arc, _)| std::sync::Arc::clone(arc))
}
fn insert(
&mut self,
key: (String, String),
resolved: std::sync::Arc<ResolvedTable>,
) {
if self.entries.len() >= self.capacity
&& !self.entries.contains_key(&key)
{
let oldest = self
.entries
.iter()
.min_by_key(|item| (item.1).1)
.map(|item| item.0.clone());
if let Some(oldest) = oldest {
self.entries.remove(&oldest);
}
}
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
self.entries.insert(key, (resolved, seq));
}
fn evict(&mut self, key: &(String, String)) {
self.entries.remove(key);
}
}
static SCHEMA_CACHE: std::sync::LazyLock<std::sync::Mutex<SchemaCache>> =
std::sync::LazyLock::new(|| {
std::sync::Mutex::new(SchemaCache::new(SCHEMA_CACHE_CAPACITY))
});
pub fn resolve_from_rows(
table: &str,
rows: Vec<(String, String, String)>,
) -> Result<(String, std::collections::HashMap<String, String>), StoreError> {
let mut by_schema: std::collections::BTreeMap<
String,
std::collections::HashMap<String, String>,
> = std::collections::BTreeMap::new();
for (schema, column, udt) in rows {
by_schema.entry(schema).or_default().insert(column, udt);
}
match by_schema.len() {
0 => Err(StoreError::TableNotResolved {
table: table.to_string(),
}),
1 => Ok(by_schema.into_iter().next().unwrap()),
_ => Err(StoreError::AmbiguousTable {
table: table.to_string(),
schemas: by_schema.into_keys().collect(),
}),
}
}
fn collect_triples(rows: &[PgRow]) -> Vec<(String, String, String)> {
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let schema: String = row.try_get("schema_name").unwrap_or_default();
let column: String = row.try_get("column_name").unwrap_or_default();
let udt: String = row.try_get("type_name").unwrap_or_default();
if !schema.is_empty() && !column.is_empty() && !udt.is_empty() {
out.push((schema, column, udt));
}
}
out
}
pub fn is_schema_drift_sqlstate(code: &str) -> bool {
matches!(code, "42P01" | "42703" | "42804" | "42883")
}
pub(crate) fn classify_sql_error(
op: &'static str,
err: sqlx::Error,
) -> StoreError {
let sqlstate = err
.as_database_error()
.and_then(|db| db.code())
.map(|c| c.into_owned());
match sqlstate {
Some(code) if is_schema_drift_sqlstate(&code) => {
StoreError::SchemaDrift {
op,
sqlstate: code,
source: err.to_string(),
}
}
_ => StoreError::Query { op, source: err.to_string() },
}
}
pub(crate) async fn introspect_conn(
conn: &mut PgConnection,
table: &str,
) -> Result<std::sync::Arc<ResolvedTable>, StoreError> {
let primary = sqlx::query(
"SELECT n.nspname AS schema_name, a.attname AS column_name, \
t.typname AS type_name \
FROM pg_catalog.pg_class c \
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
WHERE c.oid = to_regclass($1) \
AND a.attnum > 0 AND NOT a.attisdropped",
)
.persistent(false)
.bind(format!("\"{table}\""))
.fetch_all(&mut *conn)
.await
.map_err(|e| StoreError::Query {
op: "resolve",
source: e.to_string(),
})?;
let resolution: Result<(String, std::collections::HashMap<String, String>), StoreError> = {
let primary_rows = collect_triples(&primary);
if !primary_rows.is_empty() {
resolve_from_rows(table, primary_rows)
} else {
let scan = sqlx::query(
"SELECT n.nspname AS schema_name, \
a.attname AS column_name, t.typname AS type_name \
FROM pg_catalog.pg_class c \
JOIN pg_catalog.pg_namespace n \
ON n.oid = c.relnamespace \
JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid \
JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
WHERE c.relname = $1 \
AND c.relkind IN ('r', 'v', 'm', 'p', 'f') \
AND left(n.nspname, 3) <> 'pg_' \
AND n.nspname <> 'information_schema' \
AND a.attnum > 0 AND NOT a.attisdropped",
)
.persistent(false)
.bind(table)
.fetch_all(&mut *conn)
.await
.map_err(|e| StoreError::Query {
op: "resolve",
source: e.to_string(),
})?;
resolve_from_rows(table, collect_triples(&scan))
}
};
match resolution {
Ok((schema, column_types)) => {
Ok(std::sync::Arc::new(ResolvedTable { schema, column_types }))
}
Err(err) => {
match &err {
StoreError::TableNotResolved { table } => {
tracing::error!(
target: "axon::store::resolve",
store_table = %table,
kind = "table_not_resolved",
d_letter = "D6",
"axonstore could not resolve `{table}` on any \
schema visible to this role — see StoreError \
Display for the actionable remedy"
);
}
StoreError::AmbiguousTable { table, schemas } => {
tracing::error!(
target: "axon::store::resolve",
store_table = %table,
kind = "ambiguous_table",
schemas = %schemas.join(","),
d_letter = "D6",
"axonstore `{table}` resolved in {n} schemas — \
declare the target schema or narrow \
`search_path`",
n = schemas.len(),
);
}
other => {
tracing::error!(
target: "axon::store::resolve",
store_table = %table,
kind = "resolve_failed",
d_letter = "D6",
"axonstore resolution of `{table}` failed: \
{other}"
);
}
}
Err(err)
}
}
}
fn write_cast(
column_types: &std::collections::HashMap<String, String>,
column: &str,
) -> String {
match column_types.get(column) {
Some(udt) if filter::is_safe_identifier(udt) => format!("::{udt}"),
_ => String::new(),
}
}
pub fn build_insert_sql(
table: &str,
schema: Option<&str>,
data: &[(String, SqlValue)],
column_types: &std::collections::HashMap<String, String>,
) -> Result<(String, Vec<SqlValue>), StoreError> {
check_identifier(table, "table")?;
if data.is_empty() {
return Err(StoreError::EmptyData { op: "insert" });
}
let mut columns: Vec<String> = Vec::with_capacity(data.len());
let mut value_frags: Vec<String> = Vec::with_capacity(data.len());
let mut params: Vec<SqlValue> = Vec::new();
let mut idx = 1usize;
for (col, val) in data {
check_identifier(col, "column")?;
columns.push(format!("\"{col}\""));
match val {
SqlValue::Null => value_frags.push("NULL".to_string()),
bound => {
value_frags.push(format!("${idx}{}", write_cast(column_types, col)));
params.push(bound.clone());
idx += 1;
}
}
}
let sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
qualified_relation(schema, table),
columns.join(", "),
value_frags.join(", "),
);
Ok((sql, params))
}
pub fn build_update_sql(
table: &str,
schema: Option<&str>,
where_expr: &str,
data: &[(String, SqlValue)],
bindings: &std::collections::HashMap<String, String>,
column_types: &std::collections::HashMap<String, String>,
) -> Result<(String, Vec<SqlValue>), StoreError> {
check_identifier(table, "table")?;
if data.is_empty() {
return Err(StoreError::EmptyData { op: "mutate" });
}
let mut set_frags: Vec<String> = Vec::with_capacity(data.len());
let mut params: Vec<SqlValue> = Vec::new();
let mut idx = 1usize;
for (col, val) in data {
check_identifier(col, "column")?;
match val {
SqlValue::Null => set_frags.push(format!("\"{col}\" = NULL")),
bound => {
set_frags.push(format!(
"\"{col}\" = ${idx}{}",
write_cast(column_types, col)
));
params.push(bound.clone());
idx += 1;
}
}
}
let set_param_count = idx - 1;
let (clause, where_params) =
build_pg_where(where_expr, set_param_count, bindings, column_types)?;
params.extend(where_params);
let sql = format!(
"UPDATE {} SET {} WHERE {clause}",
qualified_relation(schema, table),
set_frags.join(", "),
);
Ok((sql, params))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgTypeClass {
Bool,
Int2,
Int4,
Int8,
Float4,
Float8,
Numeric,
Text,
Uuid,
TimestampTz,
Timestamp,
Date,
Time,
Json,
Bytea,
}
pub fn classify_pg_type(pg_type: &str) -> Option<PgTypeClass> {
Some(match pg_type.to_ascii_uppercase().as_str() {
"BOOL" => PgTypeClass::Bool,
"INT2" => PgTypeClass::Int2,
"INT4" => PgTypeClass::Int4,
"INT8" => PgTypeClass::Int8,
"FLOAT4" => PgTypeClass::Float4,
"FLOAT8" => PgTypeClass::Float8,
"NUMERIC" => PgTypeClass::Numeric,
"TEXT" | "VARCHAR" | "BPCHAR" | "NAME" => PgTypeClass::Text,
"UUID" => PgTypeClass::Uuid,
"TIMESTAMPTZ" => PgTypeClass::TimestampTz,
"TIMESTAMP" => PgTypeClass::Timestamp,
"DATE" => PgTypeClass::Date,
"TIME" => PgTypeClass::Time,
"JSON" | "JSONB" => PgTypeClass::Json,
"BYTEA" => PgTypeClass::Bytea,
_ => return None,
})
}
#[derive(Debug, Clone, PartialEq)]
pub struct StoreRow {
pub columns: Vec<(String, JsonValue)>,
}
impl StoreRow {
pub fn get(&self, column: &str) -> Option<&JsonValue> {
self.columns
.iter()
.find(|(name, _)| name == column)
.map(|(_, value)| value)
}
pub fn to_json(&self) -> JsonValue {
JsonValue::Object(self.columns.iter().cloned().collect())
}
}
fn pg_value_to_json(
row: &PgRow,
idx: usize,
column: &str,
pg_type: &str,
) -> Result<JsonValue, StoreError> {
let class = classify_pg_type(pg_type).ok_or_else(|| {
StoreError::UnsupportedColumnType {
column: column.to_string(),
pg_type: pg_type.to_string(),
}
})?;
macro_rules! decode {
($t:ty, $conv:expr) => {{
let opt: Option<$t> = row.try_get(idx).map_err(|e| {
StoreError::Decode {
column: column.to_string(),
pg_type: pg_type.to_string(),
source: e.to_string(),
}
})?;
match opt {
None => JsonValue::Null,
Some(v) => $conv(v),
}
}};
}
Ok(match class {
PgTypeClass::Bool => decode!(bool, JsonValue::Bool),
PgTypeClass::Int2 => decode!(i16, |v| JsonValue::from(v as i64)),
PgTypeClass::Int4 => decode!(i32, |v| JsonValue::from(v as i64)),
PgTypeClass::Int8 => decode!(i64, JsonValue::from),
PgTypeClass::Float4 => decode!(f32, |v| JsonValue::from(v as f64)),
PgTypeClass::Float8 => decode!(f64, JsonValue::from),
PgTypeClass::Numeric => {
decode!(sqlx::types::BigDecimal, |v: sqlx::types::BigDecimal| {
JsonValue::String(v.to_string())
})
}
PgTypeClass::Text => decode!(String, JsonValue::String),
PgTypeClass::Uuid => {
decode!(uuid::Uuid, |v: uuid::Uuid| JsonValue::String(
v.hyphenated().to_string()
))
}
PgTypeClass::TimestampTz => {
decode!(
chrono::DateTime<chrono::Utc>,
|v: chrono::DateTime<chrono::Utc>| JsonValue::String(
v.to_rfc3339()
)
)
}
PgTypeClass::Timestamp => {
decode!(chrono::NaiveDateTime, |v: chrono::NaiveDateTime| {
JsonValue::String(
v.format("%Y-%m-%dT%H:%M:%S%.f").to_string(),
)
})
}
PgTypeClass::Date => {
decode!(chrono::NaiveDate, |v: chrono::NaiveDate| {
JsonValue::String(v.to_string())
})
}
PgTypeClass::Time => {
decode!(chrono::NaiveTime, |v: chrono::NaiveTime| {
JsonValue::String(v.to_string())
})
}
PgTypeClass::Json => decode!(JsonValue, |v| v),
PgTypeClass::Bytea => decode!(Vec<u8>, |v: Vec<u8>| {
use base64::Engine;
JsonValue::String(
base64::engine::general_purpose::STANDARD.encode(v),
)
}),
})
}
pub(crate) fn map_pg_row(row: &PgRow) -> Result<StoreRow, StoreError> {
let mut columns = Vec::with_capacity(row.len());
for (idx, col) in row.columns().iter().enumerate() {
let name = col.name().to_string();
let pg_type = col.type_info().name().to_string();
let value = pg_value_to_json(row, idx, &name, &pg_type)?;
columns.push((name, value));
}
Ok(StoreRow { columns })
}
pub(crate) fn bind_value<'q>(
q: Query<'q, Postgres, PgArguments>,
value: &SqlValue,
) -> Query<'q, Postgres, PgArguments> {
match value {
SqlValue::Text(s) => q.bind(s.clone()),
SqlValue::Integer(n) => q.bind(*n),
SqlValue::Float(x) => q.bind(*x),
SqlValue::Boolean(b) => q.bind(*b),
SqlValue::Null => q.bind(Option::<String>::None),
}
}
#[derive(Clone)]
pub struct PostgresStoreBackend {
dsn: String,
pool: PgPool,
}
impl fmt::Debug for PostgresStoreBackend {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PostgresStoreBackend")
.field("dsn", &mask_dsn(&self.dsn))
.finish()
}
}
impl PostgresStoreBackend {
pub fn connect(connection: &str) -> Result<Self, StoreError> {
Self::connect_named(connection, "")
}
pub fn connect_named(
connection: &str,
store_name: &str,
) -> Result<Self, StoreError> {
Self::connect_named_with_namespace(connection, store_name, None)
}
pub fn connect_named_with_namespace(
connection: &str,
store_name: &str,
namespace: Option<&str>,
) -> Result<Self, StoreError> {
let dsn = resolve_dsn(connection)?;
let opts = PgConnectOptions::from_str(&dsn)
.map_err(|e| StoreError::PoolInit {
dsn_masked: mask_dsn(&dsn),
source: e.to_string(),
})?
.statement_cache_capacity(0)
.application_name(&application_name_for_with_namespace(
store_name, namespace,
));
let pool = PgPoolOptions::new()
.max_connections(MAX_POOL_CONNECTIONS)
.min_connections(0)
.acquire_timeout(Duration::from_secs(ACQUIRE_TIMEOUT_SECS))
.idle_timeout(Duration::from_secs(IDLE_TIMEOUT_SECS))
.after_release(|conn, _meta| Box::pin(async move {
sqlx::query("DEALLOCATE ALL")
.persistent(false)
.execute(&mut *conn)
.await?;
Ok(true)
}))
.connect_lazy_with(opts);
Ok(Self { dsn, pool })
}
pub fn masked_dsn(&self) -> String {
mask_dsn(&self.dsn)
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
pub async fn query(
&self,
table: &str,
where_expr: &str,
bindings: &std::collections::HashMap<String, String>,
) -> Result<Vec<StoreRow>, StoreError> {
if let Some(resolved) = self.cached_schema(table) {
let (sql, params) = build_select_sql(
table,
Some(resolved.schema.as_str()),
where_expr,
bindings,
&resolved.column_types,
)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
match q.fetch_all(&self.pool).await {
Ok(rows) => return rows.iter().map(map_pg_row).collect(),
Err(e) => {
let err = classify_sql_error("retrieve", e);
if !err.is_schema_drift() {
return Err(err);
}
self.evict_schema(table);
}
}
}
let mut tx = self.pool.begin().await.map_err(|e| {
StoreError::Connect { source: e.to_string() }
})?;
let resolved = introspect_conn(&mut tx, table).await;
let no_types = std::collections::HashMap::new();
let (schema, column_types) = match &resolved {
Ok(r) => (Some(r.schema.as_str()), &r.column_types),
Err(e) => {
tracing::warn!(
target: "axon::store",
table = %table,
op = "introspect_in_tx",
error = %e,
d_letter = "D3+38.x.a",
"store introspection failed inside the operation \
transaction; falling back to bare-table SQL — the \
operation will likely fail with the same root cause. \
If the error mentions `prepared statement \"sqlx_s_N\" \
already exists`, the deployment is behind a \
transaction-mode pooler and this axon-lang version \
should already mitigate via `.persistent(false)` (D1) \
+ `DEALLOCATE ALL` after release (D2); check the \
pooler logs."
);
(None, &no_types)
}
};
let (sql, params) =
build_select_sql(table, schema, where_expr, bindings, column_types)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
let rows = q
.fetch_all(&mut *tx)
.await
.map_err(|e| classify_sql_error("retrieve", e))?;
tx.commit().await.map_err(|e| StoreError::Connect {
source: e.to_string(),
})?;
if let Ok(r) = resolved {
self.cache_schema(table, r);
}
rows.iter().map(map_pg_row).collect()
}
pub(crate) fn cached_schema(
&self,
table: &str,
) -> Option<std::sync::Arc<ResolvedTable>> {
let key = (self.dsn.clone(), table.to_string());
SCHEMA_CACHE.lock().unwrap().get(&key)
}
pub(crate) fn cache_schema(
&self,
table: &str,
resolved: std::sync::Arc<ResolvedTable>,
) {
if !resolved.column_types.is_empty() {
let key = (self.dsn.clone(), table.to_string());
SCHEMA_CACHE.lock().unwrap().insert(key, resolved);
}
}
pub(crate) fn evict_schema(&self, table: &str) {
let key = (self.dsn.clone(), table.to_string());
SCHEMA_CACHE.lock().unwrap().evict(&key);
tracing::warn!(
target: "axon::store::cache",
store_table = %table,
masked_dsn = %mask_dsn(&self.dsn),
kind = "schema_drift_evict",
d_letter = "D9",
"axonstore evicted cached schema for `{table}` after a \
schema-drift SQLSTATE — the next operation will \
re-introspect against the live table"
);
}
pub(crate) async fn warm_schema(&self, table: &str) -> Result<(), StoreError> {
if self.cached_schema(table).is_some() {
return Ok(());
}
let mut conn = self.pool.acquire().await.map_err(|e| {
StoreError::Connect { source: e.to_string() }
})?;
let resolved = introspect_conn(&mut conn, table).await?;
self.cache_schema(table, resolved);
Ok(())
}
pub async fn insert(
&self,
table: &str,
data: &[(String, SqlValue)],
) -> Result<u64, StoreError> {
if let Some(resolved) = self.cached_schema(table) {
let (sql, params) = build_insert_sql(
table,
Some(resolved.schema.as_str()),
data,
&resolved.column_types,
)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
match q.execute(&self.pool).await {
Ok(result) => return Ok(result.rows_affected()),
Err(e) => {
let err = classify_sql_error("persist", e);
if !err.is_schema_drift() {
return Err(err);
}
self.evict_schema(table);
}
}
}
let mut tx = self.pool.begin().await.map_err(|e| {
StoreError::Connect { source: e.to_string() }
})?;
let resolved = introspect_conn(&mut tx, table).await;
let no_types = std::collections::HashMap::new();
let (schema, column_types) = match &resolved {
Ok(r) => (Some(r.schema.as_str()), &r.column_types),
Err(e) => {
tracing::warn!(
target: "axon::store",
table = %table,
op = "introspect_in_tx_persist",
error = %e,
d_letter = "D3+38.x.a",
"store introspection failed inside the persist \
transaction; falling back to bare-table INSERT — \
the persist will likely fail with the same root cause."
);
(None, &no_types)
}
};
let (sql, params) = build_insert_sql(table, schema, data, column_types)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
let result = q
.execute(&mut *tx)
.await
.map_err(|e| classify_sql_error("persist", e))?;
tx.commit().await.map_err(|e| StoreError::Connect {
source: e.to_string(),
})?;
if let Ok(r) = resolved {
self.cache_schema(table, r);
}
Ok(result.rows_affected())
}
pub async fn mutate(
&self,
table: &str,
where_expr: &str,
data: &[(String, SqlValue)],
bindings: &std::collections::HashMap<String, String>,
) -> Result<u64, StoreError> {
if let Some(resolved) = self.cached_schema(table) {
let (sql, params) = build_update_sql(
table,
Some(resolved.schema.as_str()),
where_expr,
data,
bindings,
&resolved.column_types,
)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
match q.execute(&self.pool).await {
Ok(result) => return Ok(result.rows_affected()),
Err(e) => {
let err = classify_sql_error("mutate", e);
if !err.is_schema_drift() {
return Err(err);
}
self.evict_schema(table);
}
}
}
let mut tx = self.pool.begin().await.map_err(|e| {
StoreError::Connect { source: e.to_string() }
})?;
let resolved = introspect_conn(&mut tx, table).await;
let no_types = std::collections::HashMap::new();
let (schema, column_types) = match &resolved {
Ok(r) => (Some(r.schema.as_str()), &r.column_types),
Err(e) => {
tracing::warn!(
target: "axon::store",
table = %table,
op = "introspect_in_tx_mutate",
error = %e,
d_letter = "D3+38.x.a",
"store introspection failed inside the mutate \
transaction; falling back to bare-table UPDATE — \
the mutate will likely fail with the same root cause."
);
(None, &no_types)
}
};
let (sql, params) = build_update_sql(
table, schema, where_expr, data, bindings, column_types,
)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
let result = q
.execute(&mut *tx)
.await
.map_err(|e| classify_sql_error("mutate", e))?;
tx.commit().await.map_err(|e| StoreError::Connect {
source: e.to_string(),
})?;
if let Ok(r) = resolved {
self.cache_schema(table, r);
}
Ok(result.rows_affected())
}
pub async fn purge(
&self,
table: &str,
where_expr: &str,
bindings: &std::collections::HashMap<String, String>,
) -> Result<u64, StoreError> {
if let Some(resolved) = self.cached_schema(table) {
let (sql, params) = build_delete_sql(
table,
Some(resolved.schema.as_str()),
where_expr,
bindings,
&resolved.column_types,
)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
match q.execute(&self.pool).await {
Ok(result) => return Ok(result.rows_affected()),
Err(e) => {
let err = classify_sql_error("purge", e);
if !err.is_schema_drift() {
return Err(err);
}
self.evict_schema(table);
}
}
}
let mut tx = self.pool.begin().await.map_err(|e| {
StoreError::Connect { source: e.to_string() }
})?;
let resolved = introspect_conn(&mut tx, table).await;
let no_types = std::collections::HashMap::new();
let (schema, column_types) = match &resolved {
Ok(r) => (Some(r.schema.as_str()), &r.column_types),
Err(e) => {
tracing::warn!(
target: "axon::store",
table = %table,
op = "introspect_in_tx_purge",
error = %e,
d_letter = "D3+38.x.a",
"store introspection failed inside the purge \
transaction; falling back to bare-table DELETE — \
the purge will likely fail with the same root cause."
);
(None, &no_types)
}
};
let (sql, params) =
build_delete_sql(table, schema, where_expr, bindings, column_types)?;
let mut q = sqlx::query(&sql).persistent(false);
for value in ¶ms {
q = bind_value(q, value);
}
let result = q
.execute(&mut *tx)
.await
.map_err(|e| classify_sql_error("purge", e))?;
tx.commit().await.map_err(|e| StoreError::Connect {
source: e.to_string(),
})?;
if let Ok(r) = resolved {
self.cache_schema(table, r);
}
Ok(result.rows_affected())
}
pub async fn ping(&self) -> Result<(), StoreError> {
sqlx::query("SELECT 1")
.persistent(false)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|e| StoreError::Connect { source: e.to_string() })
}
}
#[cfg(test)]
mod tests {
use super::*;
fn txt(s: &str) -> SqlValue {
SqlValue::Text(s.to_string())
}
fn nb() -> std::collections::HashMap<String, String> {
std::collections::HashMap::new()
}
#[test]
fn resolve_empty_connection_errors() {
assert_eq!(resolve_dsn(""), Err(StoreError::EmptyConnection));
assert_eq!(resolve_dsn(" "), Err(StoreError::EmptyConnection));
}
#[test]
fn resolve_literal_dsn_is_returned_verbatim() {
let dsn = "postgresql://u:p@localhost:5432/axon";
assert_eq!(resolve_dsn(dsn), Ok(dsn.to_string()));
}
#[test]
fn resolve_literal_dsn_is_trimmed() {
assert_eq!(
resolve_dsn(" postgresql://h/db "),
Ok("postgresql://h/db".to_string())
);
}
#[test]
fn resolve_bare_env_prefix_errors() {
assert_eq!(resolve_dsn("env:"), Err(StoreError::EmptyEnvVarName));
assert_eq!(resolve_dsn("env: "), Err(StoreError::EmptyEnvVarName));
}
#[test]
fn resolve_missing_env_var_errors() {
match resolve_dsn("env:AXON_NONEXISTENT_VAR_FASE35C") {
Err(StoreError::MissingEnvVar { var }) => {
assert_eq!(var, "AXON_NONEXISTENT_VAR_FASE35C");
}
other => panic!("expected MissingEnvVar, got {other:?}"),
}
}
#[test]
fn resolve_env_var_reads_the_environment() {
let resolved = resolve_dsn("env:PATH").expect("PATH resolves");
assert_eq!(resolved, std::env::var("PATH").unwrap());
assert!(!resolved.is_empty());
}
#[tokio::test]
async fn connect_with_valid_dsn_is_lazy_and_succeeds() {
let backend =
PostgresStoreBackend::connect("postgresql://u:p@localhost:5432/db")
.expect("a well-formed DSN builds a lazy pool");
let _ = format!("{backend:?}");
}
#[tokio::test]
async fn connect_masks_the_password_in_dsn_and_debug() {
let fake_secret = "fakecred0";
let backend = PostgresStoreBackend::connect(&format!(
"postgresql://user:{fake_secret}@localhost:5432/axon"
))
.unwrap();
let masked = backend.masked_dsn();
assert!(!masked.contains(fake_secret), "password must be masked");
assert!(masked.contains("***"));
assert!(!format!("{backend:?}").contains(fake_secret));
}
#[test]
fn connect_empty_connection_errors() {
assert!(matches!(
PostgresStoreBackend::connect(""),
Err(StoreError::EmptyConnection)
));
}
#[test]
fn connect_missing_env_var_errors() {
assert!(matches!(
PostgresStoreBackend::connect("env:AXON_NONEXISTENT_VAR_FASE35C"),
Err(StoreError::MissingEnvVar { .. })
));
}
#[test]
fn connect_malformed_dsn_errors() {
assert!(matches!(
PostgresStoreBackend::connect("not a valid dsn at all"),
Err(StoreError::PoolInit { .. })
));
}
#[tokio::test]
async fn connect_named_with_valid_dsn_is_lazy_and_succeeds() {
let backend = PostgresStoreBackend::connect_named(
"postgresql://u:p@localhost:5432/db",
"claims",
)
.expect("a well-formed DSN builds a lazy pool");
let _ = format!("{backend:?}");
}
#[test]
fn connect_named_malformed_dsn_errors() {
assert!(matches!(
PostgresStoreBackend::connect_named("not a dsn", "claims"),
Err(StoreError::PoolInit { .. })
));
}
#[test]
fn application_name_carries_the_store_name() {
assert_eq!(application_name_for("claims"), "axon-store/claims");
assert_eq!(
application_name_for("tenant_audit_log"),
"axon-store/tenant_audit_log"
);
}
#[test]
fn application_name_empty_store_is_bare() {
assert_eq!(application_name_for(""), "axon-store");
}
#[test]
fn application_name_capped_at_postgres_namedatalen() {
let long = "s".repeat(200);
let name = application_name_for(&long);
assert!(name.len() <= 63, "must fit NAMEDATALEN-1, got {}", name.len());
assert!(name.starts_with("axon-store/s"));
}
#[test]
fn application_name_truncation_respects_char_boundaries() {
let name = application_name_for(&"é".repeat(100));
assert!(name.len() <= 63);
assert!(name.is_char_boundary(name.len()));
}
#[test]
fn select_with_filter() {
let (sql, params) =
build_select_sql("users", None, "id = 1", &nb(), &nb()).unwrap();
assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\"::text = $1");
assert_eq!(params, vec![SqlValue::Integer(1)]);
}
#[test]
fn select_casts_the_filter_value_to_its_introspected_column_type() {
let types = std::collections::HashMap::from([(
"id".to_string(),
"int4".to_string(),
)]);
let (sql, _) =
build_select_sql("users", None, "id = 1", &nb(), &types).unwrap();
assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"id\" = $1::int4");
}
#[test]
fn select_with_empty_filter_renders_where_true() {
let (sql, params) =
build_select_sql("users", None, "", &nb(), &nb()).unwrap();
assert_eq!(sql, "SELECT * FROM \"users\" WHERE TRUE");
assert!(params.is_empty());
}
#[test]
fn select_rejects_unsafe_table_name() {
assert!(matches!(
build_select_sql("users; DROP TABLE x", None, "", &nb(), &nb()),
Err(StoreError::InvalidIdentifier { kind: "table", .. })
));
}
#[test]
fn select_propagates_filter_errors() {
assert!(matches!(
build_select_sql("users", None, "id = 1 AND", &nb(), &nb()),
Err(StoreError::Filter(_))
));
}
#[test]
fn delete_with_filter() {
let (sql, params) =
build_delete_sql("sessions", None, "expired = true", &nb(), &nb())
.unwrap();
assert_eq!(sql, "DELETE FROM \"sessions\" WHERE \"expired\"::text = $1");
assert_eq!(params, vec![SqlValue::Boolean(true)]);
}
#[test]
fn delete_rejects_unsafe_table() {
assert!(matches!(
build_delete_sql("evil\"table", None, "a = 1", &nb(), &nb()),
Err(StoreError::InvalidIdentifier { .. })
));
}
#[test]
fn insert_basic() {
let (sql, params) = build_insert_sql(
"users",
None,
&[("name".into(), txt("Alice")), ("age".into(), SqlValue::Integer(30))],
&nb(),
)
.unwrap();
assert_eq!(
sql,
"INSERT INTO \"users\" (\"name\", \"age\") VALUES ($1, $2)"
);
assert_eq!(params, vec![txt("Alice"), SqlValue::Integer(30)]);
}
#[test]
fn insert_renders_null_inline_consuming_no_placeholder() {
let (sql, params) = build_insert_sql(
"t",
None,
&[
("a".into(), SqlValue::Integer(1)),
("b".into(), SqlValue::Null),
("c".into(), txt("x")),
],
&nb(),
)
.unwrap();
assert_eq!(
sql,
"INSERT INTO \"t\" (\"a\", \"b\", \"c\") VALUES ($1, NULL, $2)"
);
assert_eq!(params, vec![SqlValue::Integer(1), txt("x")]);
}
#[test]
fn insert_empty_data_errors() {
assert_eq!(
build_insert_sql("t", None, &[], &nb()),
Err(StoreError::EmptyData { op: "insert" })
);
}
#[test]
fn insert_rejects_unsafe_column_name() {
assert!(matches!(
build_insert_sql("t", None, &[("a\"; DROP".into(), SqlValue::Integer(1))], &nb()),
Err(StoreError::InvalidIdentifier { kind: "column", .. })
));
}
#[test]
fn insert_rejects_unsafe_table_name() {
assert!(matches!(
build_insert_sql("t t", None, &[("a".into(), SqlValue::Integer(1))], &nb()),
Err(StoreError::InvalidIdentifier { kind: "table", .. })
));
}
#[test]
fn update_basic_where_offset_continues_after_set() {
let (sql, params) = build_update_sql(
"users",
None,
"id = 5",
&[("name".into(), txt("Bob")), ("age".into(), SqlValue::Integer(40))],
&nb(),
&nb(),
)
.unwrap();
assert_eq!(
sql,
"UPDATE \"users\" SET \"name\" = $1, \"age\" = $2 \
WHERE \"id\"::text = $3"
);
assert_eq!(
params,
vec![txt("Bob"), SqlValue::Integer(40), SqlValue::Integer(5)]
);
}
#[test]
fn update_null_set_value_shifts_where_offset_by_non_null_count() {
let (sql, params) = build_update_sql(
"users",
None,
"id = 5",
&[("name".into(), SqlValue::Null), ("age".into(), SqlValue::Integer(40))],
&nb(),
&nb(),
)
.unwrap();
assert_eq!(
sql,
"UPDATE \"users\" SET \"name\" = NULL, \"age\" = $1 \
WHERE \"id\"::text = $2"
);
assert_eq!(params, vec![SqlValue::Integer(40), SqlValue::Integer(5)]);
}
#[test]
fn update_with_empty_where_targets_all_rows() {
let (sql, _) = build_update_sql(
"t",
None,
"",
&[("a".into(), SqlValue::Integer(1))],
&nb(),
&nb(),
)
.unwrap();
assert_eq!(sql, "UPDATE \"t\" SET \"a\" = $1 WHERE TRUE");
}
#[test]
fn update_empty_data_errors() {
assert_eq!(
build_update_sql("t", None, "id = 1", &[], &nb(), &nb()),
Err(StoreError::EmptyData { op: "mutate" })
);
}
#[test]
fn update_rejects_unsafe_column() {
assert!(matches!(
build_update_sql(
"t",
None,
"id = 1",
&[("a-b".into(), SqlValue::Integer(1))],
&nb(),
&nb(),
),
Err(StoreError::InvalidIdentifier { kind: "column", .. })
));
}
#[test]
fn update_propagates_filter_errors() {
assert!(matches!(
build_update_sql(
"t",
None,
"bad ;",
&[("a".into(), SqlValue::Integer(1))],
&nb(),
&nb(),
),
Err(StoreError::Filter(_))
));
}
#[test]
fn insert_casts_each_value_to_its_introspected_column_type() {
let types = std::collections::HashMap::from([
("tenant_id".to_string(), "uuid".to_string()),
("note".to_string(), "text".to_string()),
("n".to_string(), "int4".to_string()),
]);
let (sql, _) = build_insert_sql(
"chat_history",
None,
&[
("tenant_id".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e")),
("note".into(), txt("hi")),
("n".into(), SqlValue::Integer(3)),
],
&types,
)
.unwrap();
assert_eq!(
sql,
"INSERT INTO \"chat_history\" (\"tenant_id\", \"note\", \"n\") \
VALUES ($1::uuid, $2::text, $3::int4)",
"§v1.36.2 — each value placeholder is cast to its column's \
introspected type so a text-bound value writes into a \
uuid / int column"
);
}
#[test]
fn update_set_casts_each_value_to_its_introspected_column_type() {
let types = std::collections::HashMap::from([(
"status".to_string(),
"uuid".to_string(),
)]);
let (sql, _) = build_update_sql(
"t",
None,
"id = 1",
&[("status".into(), txt("83d078e1-b372-42ba-9572-ff8dc521386e"))],
&nb(),
&types,
)
.unwrap();
assert_eq!(
sql,
"UPDATE \"t\" SET \"status\" = $1::uuid WHERE \"id\"::text = $2",
"§v1.36.2 — the SET value is cast to the column type; `id` \
is absent from the type map so §37.x.e (D4) casts the \
WHERE column to `text` for the equality"
);
}
#[test]
fn update_where_value_is_cast_to_its_column_type() {
let types = std::collections::HashMap::from([
("status".to_string(), "text".to_string()),
("id".to_string(), "int8".to_string()),
]);
let (sql, _) = build_update_sql(
"t",
None,
"id = 1",
&[("status".into(), txt("done"))],
&nb(),
&types,
)
.unwrap();
assert_eq!(
sql,
"UPDATE \"t\" SET \"status\" = $1::text WHERE \"id\" = $2::int8"
);
}
#[test]
fn unknown_column_type_falls_back_to_a_bare_placeholder() {
let (sql, _) =
build_insert_sql("t", None, &[("x".into(), txt("v"))], &nb()).unwrap();
assert_eq!(sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)");
}
#[test]
fn an_unsafe_column_type_name_is_not_spliced_into_sql() {
let types = std::collections::HashMap::from([(
"x".to_string(),
"uuid; DROP TABLE t".to_string(),
)]);
let (sql, _) =
build_insert_sql("t", None, &[("x".into(), txt("v"))], &types).unwrap();
assert_eq!(
sql, "INSERT INTO \"t\" (\"x\") VALUES ($1)",
"an unsafe type name yields no cast — never a splice"
);
}
#[test]
fn injection_in_value_position_is_a_bound_parameter() {
let (sql, params) = build_select_sql(
"users",
None,
"name = '; DROP TABLE users; --'",
&nb(),
&nb(),
)
.unwrap();
assert_eq!(sql, "SELECT * FROM \"users\" WHERE \"name\"::text = $1");
assert_eq!(
params,
vec![txt("; DROP TABLE users; --")]
);
}
#[test]
fn injection_in_table_identifier_is_rejected_not_quoted() {
assert!(matches!(
build_select_sql("users\" WHERE 1=1; --", None, "", &nb(), &nb()),
Err(StoreError::InvalidIdentifier { .. })
));
}
#[test]
fn select_with_a_resolved_schema_is_qualified() {
let (sql, _) =
build_select_sql("tenants", Some("public"), "id = 1", &nb(), &nb())
.unwrap();
assert_eq!(
sql,
"SELECT * FROM \"public\".\"tenants\" WHERE \"id\"::text = $1"
);
}
#[test]
fn every_builder_qualifies_with_a_resolved_schema() {
let data = [("v".to_string(), SqlValue::Integer(1))];
let (sel, _) =
build_select_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
let (del, _) =
build_delete_sql("t", Some("app"), "", &nb(), &nb()).unwrap();
let (ins, _) = build_insert_sql("t", Some("app"), &data, &nb()).unwrap();
let (upd, _) =
build_update_sql("t", Some("app"), "", &data, &nb(), &nb()).unwrap();
assert!(sel.contains("FROM \"app\".\"t\""), "SELECT: {sel}");
assert!(del.contains("FROM \"app\".\"t\""), "DELETE: {del}");
assert!(ins.contains("INTO \"app\".\"t\""), "INSERT: {ins}");
assert!(upd.starts_with("UPDATE \"app\".\"t\""), "UPDATE: {upd}");
}
#[test]
fn no_resolved_schema_renders_the_bare_table() {
let (sql, _) = build_select_sql("t", None, "", &nb(), &nb()).unwrap();
assert_eq!(sql, "SELECT * FROM \"t\" WHERE TRUE");
}
#[test]
fn an_unsafe_schema_name_is_not_spliced_and_falls_back_to_bare_table() {
for unsafe_schema in ["a\"; DROP TABLE x", "my schema", "1schema"] {
let (sql, _) =
build_select_sql("t", Some(unsafe_schema), "", &nb(), &nb())
.unwrap();
assert_eq!(
sql, "SELECT * FROM \"t\" WHERE TRUE",
"unsafe schema `{unsafe_schema}` must not be spliced"
);
}
}
#[test]
fn a_qualified_table_still_casts_and_offsets_correctly() {
let types = std::collections::HashMap::from([
("status".to_string(), "uuid".to_string()),
("id".to_string(), "int8".to_string()),
]);
let (sql, _) = build_update_sql(
"t",
Some("public"),
"id = 1",
&[("status".into(), txt("done"))],
&nb(),
&types,
)
.unwrap();
assert_eq!(
sql,
"UPDATE \"public\".\"t\" SET \"status\" = $1::uuid \
WHERE \"id\" = $2::int8"
);
}
#[test]
fn classify_every_supported_type() {
let cases = [
("BOOL", PgTypeClass::Bool),
("INT2", PgTypeClass::Int2),
("INT4", PgTypeClass::Int4),
("INT8", PgTypeClass::Int8),
("FLOAT4", PgTypeClass::Float4),
("FLOAT8", PgTypeClass::Float8),
("NUMERIC", PgTypeClass::Numeric),
("TEXT", PgTypeClass::Text),
("VARCHAR", PgTypeClass::Text),
("BPCHAR", PgTypeClass::Text),
("NAME", PgTypeClass::Text),
("UUID", PgTypeClass::Uuid),
("TIMESTAMPTZ", PgTypeClass::TimestampTz),
("TIMESTAMP", PgTypeClass::Timestamp),
("DATE", PgTypeClass::Date),
("TIME", PgTypeClass::Time),
("JSON", PgTypeClass::Json),
("JSONB", PgTypeClass::Json),
("BYTEA", PgTypeClass::Bytea),
];
for (name, expected) in cases {
assert_eq!(classify_pg_type(name), Some(expected), "type {name}");
}
}
#[test]
fn classify_is_case_insensitive() {
assert_eq!(classify_pg_type("int4"), Some(PgTypeClass::Int4));
assert_eq!(classify_pg_type("TimestampTz"), Some(PgTypeClass::TimestampTz));
}
#[test]
fn classify_unsupported_types_return_none() {
for name in ["INT4[]", "INET", "POINT", "HSTORE", "CIDR", "MONEY", ""] {
assert_eq!(classify_pg_type(name), None, "type {name} unsupported");
}
}
#[test]
fn store_row_get_and_to_json() {
let row = StoreRow {
columns: vec![
("id".into(), JsonValue::from(7)),
("name".into(), JsonValue::String("Eve".into())),
],
};
assert_eq!(row.get("id"), Some(&JsonValue::from(7)));
assert_eq!(row.get("missing"), None);
assert_eq!(
row.to_json(),
serde_json::json!({ "id": 7, "name": "Eve" })
);
}
fn triple(s: &str, c: &str, t: &str) -> (String, String, String) {
(s.to_string(), c.to_string(), t.to_string())
}
#[test]
fn resolve_from_rows_no_rows_is_table_not_resolved() {
match resolve_from_rows("widgets", vec![]) {
Err(StoreError::TableNotResolved { table }) => {
assert_eq!(table, "widgets");
}
other => panic!("expected TableNotResolved, got {other:?}"),
}
}
#[test]
fn resolve_from_rows_one_schema_resolves_with_its_column_map() {
let (schema, types) = resolve_from_rows(
"tenants",
vec![triple("public", "id", "uuid"), triple("public", "n", "int4")],
)
.expect("a single-schema result resolves");
assert_eq!(schema, "public");
assert_eq!(types.get("id"), Some(&"uuid".to_string()));
assert_eq!(types.get("n"), Some(&"int4".to_string()));
assert_eq!(types.len(), 2);
}
#[test]
fn resolve_from_rows_two_schemas_is_ambiguous_with_sorted_schemas() {
match resolve_from_rows(
"widgets",
vec![
triple("tenant_b", "id", "uuid"),
triple("tenant_a", "id", "int4"),
],
) {
Err(StoreError::AmbiguousTable { table, schemas }) => {
assert_eq!(table, "widgets");
assert_eq!(
schemas,
vec!["tenant_a".to_string(), "tenant_b".to_string()]
);
}
other => panic!("expected AmbiguousTable, got {other:?}"),
}
}
#[test]
fn resolve_from_rows_three_schemas_is_still_one_ambiguous_error() {
assert!(matches!(
resolve_from_rows(
"t",
vec![
triple("s1", "a", "text"),
triple("s2", "a", "text"),
triple("s3", "a", "text"),
],
),
Err(StoreError::AmbiguousTable { .. })
));
}
#[tokio::test]
async fn schema_cache_round_trips_a_resolution() {
let backend = PostgresStoreBackend::connect(
"postgresql://u:p@localhost:5432/fase37xd_cache_rt",
)
.unwrap();
let table = "fase37xd_cache_probe";
assert!(
backend.cached_schema(table).is_none(),
"a cold cache is a miss"
);
let resolved = std::sync::Arc::new(ResolvedTable {
schema: "public".to_string(),
column_types: std::collections::HashMap::from([(
"id".to_string(),
"uuid".to_string(),
)]),
});
backend.cache_schema(table, std::sync::Arc::clone(&resolved));
let hit = backend
.cached_schema(table)
.expect("a warm cache is a hit");
assert_eq!(hit.schema, "public");
assert_eq!(hit.column_types.get("id"), Some(&"uuid".to_string()));
}
#[tokio::test]
async fn schema_cache_never_stores_an_empty_resolution() {
let backend = PostgresStoreBackend::connect(
"postgresql://u:p@localhost:5432/fase37xd_cache_empty",
)
.unwrap();
let table = "fase37xd_empty_probe";
backend.cache_schema(
table,
std::sync::Arc::new(ResolvedTable {
schema: "public".to_string(),
column_types: std::collections::HashMap::new(),
}),
);
assert!(
backend.cached_schema(table).is_none(),
"an empty resolution must never be cached"
);
}
#[test]
fn is_schema_drift_sqlstate_recognises_exactly_the_drift_codes() {
for code in ["42P01", "42703", "42804", "42883"] {
assert!(
is_schema_drift_sqlstate(code),
"`{code}` must be a schema-drift SQLSTATE"
);
}
for code in ["23505", "42601", "08006", "23514", "40001", ""] {
assert!(
!is_schema_drift_sqlstate(code),
"`{code}` is NOT schema drift — must not trigger the \
self-heal retry"
);
}
}
#[test]
fn store_error_is_schema_drift_predicate() {
assert!(StoreError::SchemaDrift {
op: "retrieve",
sqlstate: "42883".to_string(),
source: "operator does not exist: text = uuid".to_string(),
}
.is_schema_drift());
assert!(!StoreError::Query {
op: "retrieve",
source: "syntax error".to_string(),
}
.is_schema_drift());
assert!(!StoreError::TableNotResolved { table: "t".into() }
.is_schema_drift());
}
fn rt(schema: &str) -> std::sync::Arc<ResolvedTable> {
std::sync::Arc::new(ResolvedTable {
schema: schema.to_string(),
column_types: std::collections::HashMap::from([(
"id".to_string(),
"uuid".to_string(),
)]),
})
}
#[test]
fn schema_cache_evicts_the_oldest_entry_at_capacity() {
let mut cache = SchemaCache::new(2);
let key = |t: &str| ("dsn".to_string(), t.to_string());
cache.insert(key("a"), rt("s_a"));
cache.insert(key("b"), rt("s_b"));
cache.insert(key("c"), rt("s_c")); assert_eq!(cache.entries.len(), 2, "the cache is bounded at 2");
assert!(
cache.get(&key("a")).is_none(),
"the oldest entry was evicted"
);
assert_eq!(
cache.get(&key("b")).map(|r| r.schema.clone()),
Some("s_b".to_string())
);
assert_eq!(
cache.get(&key("c")).map(|r| r.schema.clone()),
Some("s_c".to_string())
);
}
#[test]
fn schema_cache_evict_drops_a_named_entry() {
let mut cache = SchemaCache::new(10);
let key = ("dsn".to_string(), "t".to_string());
cache.insert(key.clone(), rt("public"));
assert!(cache.get(&key).is_some());
cache.evict(&key);
assert!(cache.get(&key).is_none(), "evict drops the entry");
}
#[test]
fn schema_cache_reinsert_of_a_key_does_not_evict_another() {
let mut cache = SchemaCache::new(2);
let ka = ("dsn".to_string(), "a".to_string());
let kb = ("dsn".to_string(), "b".to_string());
cache.insert(ka.clone(), rt("public"));
cache.insert(kb.clone(), rt("public"));
cache.insert(ka.clone(), rt("public")); assert_eq!(cache.entries.len(), 2);
assert!(cache.get(&ka).is_some());
assert!(cache.get(&kb).is_some(), "the re-insert evicted nothing");
}
#[test]
fn every_store_error_has_a_non_empty_display() {
let errors = [
StoreError::EmptyConnection,
StoreError::EmptyEnvVarName,
StoreError::MissingEnvVar { var: "X".into() },
StoreError::PoolInit {
dsn_masked: "postgresql://u:***@h/db".into(),
source: "bad".into(),
},
StoreError::InvalidIdentifier { kind: "table", name: "x;".into() },
StoreError::EmptyData { op: "insert" },
StoreError::Filter(FilterError::TooManyConditions { limit: 256 }),
StoreError::Connect { source: "refused".into() },
StoreError::Query { op: "retrieve", source: "syntax".into() },
StoreError::UnsupportedColumnType {
column: "geom".into(),
pg_type: "POINT".into(),
},
StoreError::Decode {
column: "ts".into(),
pg_type: "TIMESTAMPTZ".into(),
source: "overflow".into(),
},
StoreError::TableNotResolved { table: "ghost".into() },
StoreError::AmbiguousTable {
table: "dup".into(),
schemas: vec!["a".into(), "b".into()],
},
StoreError::SchemaDrift {
op: "retrieve",
sqlstate: "42883".into(),
source: "operator does not exist: text = uuid".into(),
},
];
for e in errors {
assert!(!e.to_string().is_empty());
}
}
#[test]
fn filter_error_is_a_store_error_source() {
use std::error::Error;
let e = StoreError::Filter(FilterError::TooManyConditions { limit: 256 });
assert!(e.source().is_some());
}
#[test]
fn filter_error_converts_into_store_error() {
let e: StoreError = FilterError::TooManyConditions { limit: 256 }.into();
assert!(matches!(e, StoreError::Filter(_)));
}
#[test]
fn d6_table_not_resolved_display_carries_an_actionable_hint() {
let err = StoreError::TableNotResolved {
table: "claims".into(),
};
let text = err.to_string();
assert!(
text.contains("`claims`"),
"the table name must appear verbatim, got: {text}"
);
assert!(
text.contains("pg_catalog"),
"the message must disclose pg_catalog (so an adopter knows \
`search_path` is not the culprit), got: {text}"
);
assert!(
text.contains("migration") || text.contains("SELECT"),
"the message must name at least one concrete remedy \
(migration / SELECT permission), got: {text}"
);
}
#[test]
fn d6_ambiguous_table_display_points_at_fase_38_schema_declaration() {
let err = StoreError::AmbiguousTable {
table: "rates".into(),
schemas: vec!["finance".into(), "legacy".into()],
};
let text = err.to_string();
assert!(text.contains("`rates`"), "table name must appear");
assert!(
text.contains("finance") && text.contains("legacy"),
"every resolving schema must appear, got: {text}"
);
assert!(
text.contains("search_path"),
"the search_path remedy must appear, got: {text}"
);
assert!(
text.contains("schema:"),
"the Fase 38 `schema:` declaration must be named (the \
genuinely-superior remedy), got: {text}"
);
assert!(
text.contains("Fase 38"),
"the message must anchor the remedy to Fase 38, got: {text}"
);
}
#[test]
fn d6_display_does_not_leak_internal_sqlstates_or_internal_paths() {
let nr = StoreError::TableNotResolved { table: "t".into() }.to_string();
let amb = StoreError::AmbiguousTable {
table: "t".into(),
schemas: vec!["a".into()],
}
.to_string();
for code in ["42P01", "42703", "42804", "42883"] {
assert!(
!nr.contains(code),
"TableNotResolved must not leak SQLSTATE {code}"
);
assert!(
!amb.contains(code),
"AmbiguousTable must not leak SQLSTATE {code}"
);
}
}
}