use std::collections::HashMap;
use std::fmt;
use std::sync::Mutex;
use crate::ir_nodes::{IRAxonStore, IRStoreColumnSchema};
use crate::store::postgres_backend::{
resolve_dsn, PostgresStoreBackend, StoreError,
};
use crate::store_schema::StoreColumnType;
use crate::store_schema_manifest::{Manifest, ManifestStore};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreBackendKind {
InMemory,
Postgresql,
}
impl StoreBackendKind {
pub fn as_str(self) -> &'static str {
match self {
StoreBackendKind::InMemory => "in_memory",
StoreBackendKind::Postgresql => "postgresql",
}
}
}
impl fmt::Display for StoreBackendKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
pub fn classify_backend(backend: &str) -> Option<StoreBackendKind> {
match backend.trim().to_ascii_lowercase().as_str() {
"" | "in_memory" => Some(StoreBackendKind::InMemory),
"postgresql" => Some(StoreBackendKind::Postgresql),
_ => None,
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum RegistryError {
UnknownBackend { store: String, backend: String },
DuplicateStore { store: String },
}
impl fmt::Display for RegistryError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RegistryError::UnknownBackend { store, backend } => write!(
f,
"axonstore `{store}` declares unknown backend `{backend}` \
— the v1.30.0 closed catalog is {{in_memory, postgresql}} \
(sqlite is a documented future fase)"
),
RegistryError::DuplicateStore { store } => write!(
f,
"axonstore `{store}` is declared more than once — store \
names must be unique"
),
}
}
}
impl std::error::Error for RegistryError {}
#[derive(Debug, Clone)]
pub enum StoreHandle {
InMemory,
Postgres(PostgresStoreBackend),
}
impl StoreHandle {
pub fn is_in_memory(&self) -> bool {
matches!(self, StoreHandle::InMemory)
}
pub fn is_postgres(&self) -> bool {
matches!(self, StoreHandle::Postgres(_))
}
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct SchemaVerifyReport {
pub verified: Vec<String>,
pub missing: Vec<(String, String)>,
pub unreachable: Vec<(String, String)>,
}
impl SchemaVerifyReport {
pub fn has_fatal(&self) -> bool {
!self.missing.is_empty()
}
pub fn fatal_summary(&self) -> String {
if self.missing.is_empty() {
return String::new();
}
let detail = self
.missing
.iter()
.map(|(store, diag)| format!("`{store}` — {diag}"))
.collect::<Vec<_>>()
.join("; ");
format!(
"deploy-time store-schema verification failed: {} declared \
postgresql store table(s) do not resolve on a reachable \
database: {detail}",
self.missing.len()
)
}
}
#[derive(Debug, Clone)]
struct RegisteredStore {
spec: IRAxonStore,
kind: StoreBackendKind,
}
pub struct StoreRegistry {
stores: HashMap<String, RegisteredStore>,
pool_cache: Mutex<HashMap<String, PostgresStoreBackend>>,
}
impl fmt::Debug for StoreRegistry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut kinds: Vec<(&str, StoreBackendKind)> = self
.stores
.iter()
.map(|(name, r)| (name.as_str(), r.kind))
.collect();
kinds.sort_by(|a, b| a.0.cmp(b.0));
f.debug_struct("StoreRegistry")
.field("stores", &kinds)
.field("cached_pools", &self.cached_pool_count())
.finish()
}
}
impl StoreRegistry {
pub fn build(specs: &[IRAxonStore]) -> Result<StoreRegistry, RegistryError> {
let mut stores: HashMap<String, RegisteredStore> =
HashMap::with_capacity(specs.len());
for spec in specs {
let kind = classify_backend(&spec.backend).ok_or_else(|| {
RegistryError::UnknownBackend {
store: spec.name.clone(),
backend: spec.backend.clone(),
}
})?;
if stores.contains_key(&spec.name) {
return Err(RegistryError::DuplicateStore {
store: spec.name.clone(),
});
}
stores.insert(
spec.name.clone(),
RegisteredStore { spec: spec.clone(), kind },
);
}
Ok(StoreRegistry {
stores,
pool_cache: Mutex::new(HashMap::new()),
})
}
pub fn empty() -> StoreRegistry {
StoreRegistry {
stores: HashMap::new(),
pool_cache: Mutex::new(HashMap::new()),
}
}
pub fn resolve(&self, store_name: &str) -> Result<StoreHandle, StoreError> {
let registered = match self.stores.get(store_name) {
None => return Ok(StoreHandle::InMemory),
Some(r) => r,
};
match registered.kind {
StoreBackendKind::InMemory => Ok(StoreHandle::InMemory),
StoreBackendKind::Postgresql => {
let dsn = resolve_dsn(®istered.spec.connection)?;
let mut cache = self.lock_cache();
if let Some(backend) = cache.get(&dsn) {
return Ok(StoreHandle::Postgres(backend.clone()));
}
let backend = PostgresStoreBackend::connect_named(
®istered.spec.connection,
store_name,
)?;
cache.insert(dsn, backend.clone());
Ok(StoreHandle::Postgres(backend))
}
}
}
pub async fn verify_postgres_schemas(&self) -> SchemaVerifyReport {
self.verify_postgres_schemas_with_manifest(None).await
}
pub async fn verify_postgres_schemas_with_manifest(
&self,
manifest: Option<&Manifest>,
) -> SchemaVerifyReport {
let mut report = SchemaVerifyReport::default();
let mut pg_stores: Vec<&str> = self
.stores
.iter()
.filter(|(_, r)| r.kind == StoreBackendKind::Postgresql)
.map(|(name, _)| name.as_str())
.collect();
pg_stores.sort_unstable();
for name in pg_stores {
let column_schema = self
.stores
.get(name)
.and_then(|r| r.spec.column_schema.clone());
let resolved_namespace = match &column_schema {
Some(IRStoreColumnSchema::EnvVar { var_name }) => {
match std::env::var(var_name) {
Ok(v) if !v.trim().is_empty() => Some(v),
_ => {
let err = StoreError::MissingPerTenantSchemaEnv {
store: name.to_string(),
var: var_name.clone(),
};
report
.missing
.push((name.to_string(), err.to_string()));
continue;
}
}
}
_ => None,
};
if let Some(ns) = &resolved_namespace {
if let Err(e) = self.restamp_backend_with_namespace(name, ns) {
report.missing.push((name.to_string(), e.to_string()));
continue;
}
}
match self.resolve(name) {
Ok(StoreHandle::Postgres(backend)) => {
let masked = backend.masked_dsn();
match backend.warm_schema(name).await {
Ok(()) => {
if let Some(drift) = verify_declared_columns(
name,
&backend,
column_schema.as_ref(),
resolved_namespace.as_deref(),
manifest,
&masked,
) {
report.missing.push((name.to_string(), drift));
} else {
report.verified.push(name.to_string());
}
}
Err(
e @ (StoreError::TableNotResolved { .. }
| StoreError::AmbiguousTable { .. }),
) => {
report.missing.push((
name.to_string(),
format!("{e} (database: {masked})"),
));
}
Err(e) => {
report.unreachable.push((
name.to_string(),
format!("{e} (database: {masked})"),
));
}
}
}
Ok(StoreHandle::InMemory) => {}
Err(e) => {
report
.unreachable
.push((name.to_string(), e.to_string()));
}
}
}
report
}
fn restamp_backend_with_namespace(
&self,
store_name: &str,
namespace: &str,
) -> Result<(), StoreError> {
let registered = self.stores.get(store_name).ok_or_else(|| {
StoreError::Query {
op: "verify",
source: format!("axonstore `{store_name}` is not declared"),
}
})?;
let dsn = resolve_dsn(®istered.spec.connection)?;
let backend = PostgresStoreBackend::connect_named_with_namespace(
®istered.spec.connection,
store_name,
Some(namespace),
)?;
let mut cache = self.lock_cache();
cache.insert(dsn, backend);
Ok(())
}
pub fn spec(&self, store_name: &str) -> Option<&IRAxonStore> {
self.stores.get(store_name).map(|r| &r.spec)
}
pub fn backend_kind(&self, store_name: &str) -> Option<StoreBackendKind> {
self.stores.get(store_name).map(|r| r.kind)
}
pub fn len(&self) -> usize {
self.stores.len()
}
pub fn is_empty(&self) -> bool {
self.stores.is_empty()
}
pub fn cached_pool_count(&self) -> usize {
self.lock_cache().len()
}
fn lock_cache(
&self,
) -> std::sync::MutexGuard<'_, HashMap<String, PostgresStoreBackend>> {
self.pool_cache
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
}
fn declared_columns_for(
store_name: &str,
column_schema: Option<&IRStoreColumnSchema>,
resolved_namespace: Option<&str>,
manifest: Option<&Manifest>,
) -> Result<Option<std::collections::BTreeMap<String, StoreColumnType>>, String> {
let Some(schema) = column_schema else {
return Ok(None);
};
match schema {
IRStoreColumnSchema::Inline { columns } => {
let mut out = std::collections::BTreeMap::new();
for col in columns {
let Some(ty) = StoreColumnType::from_token(&col.col_type) else {
return Err(format!(
"axonstore `{store_name}` inline schema column \
`{}` declares unknown type `{}` — the closed \
catalog is {{{}}}",
col.name,
col.col_type,
StoreColumnType::all_canonical_names().join(", ")
));
};
out.insert(col.name.clone(), ty);
}
Ok(Some(out))
}
IRStoreColumnSchema::ManifestRef { qualified_name } => {
let Some(m) = manifest else {
return Ok(None);
};
let Some(store) = m.lookup(qualified_name) else {
return Err(format!(
"axonstore `{store_name}` declares `schema: \
\"{qualified_name}\"` but no manifest entry \
matches that qualified name. Available manifest \
entries: {{{}}}.",
m.stores.keys().cloned().collect::<Vec<_>>().join(", ")
));
};
Ok(Some(manifest_store_to_btreemap(store)))
}
IRStoreColumnSchema::EnvVar { .. } => {
let Some(m) = manifest else {
return Ok(None);
};
let ns = resolved_namespace.unwrap_or("");
let key = format!("{ns}.{store_name}");
if let Some(store) = m.lookup(&key) {
return Ok(Some(manifest_store_to_btreemap(store)));
}
let suffix = format!(".{store_name}");
for (k, s) in &m.stores {
if k.ends_with(&suffix) {
return Ok(Some(manifest_store_to_btreemap(s)));
}
}
Ok(None)
}
}
}
fn manifest_store_to_btreemap(
s: &ManifestStore,
) -> std::collections::BTreeMap<String, StoreColumnType> {
let mut out = std::collections::BTreeMap::new();
for (col_name, col) in &s.columns {
out.insert(col_name.clone(), col.col_type);
}
out
}
fn verify_declared_columns(
store_name: &str,
backend: &PostgresStoreBackend,
column_schema: Option<&IRStoreColumnSchema>,
resolved_namespace: Option<&str>,
manifest: Option<&Manifest>,
masked_dsn: &str,
) -> Option<String> {
let declared = match declared_columns_for(
store_name,
column_schema,
resolved_namespace,
manifest,
) {
Ok(Some(d)) => d,
Ok(None) => return None, Err(msg) => return Some(format!("{msg} (database: {masked_dsn})")),
};
let cached = backend.cached_schema(store_name);
let Some(resolved) = cached else {
return None;
};
let live = &resolved.column_types;
let mut missing_cols: Vec<String> = Vec::new();
let mut type_drifts: Vec<String> = Vec::new();
for (col_name, declared_type) in &declared {
match live.get(col_name) {
None => missing_cols.push(col_name.clone()),
Some(pg_udt) => {
if !pg_udt_matches_catalog_type(pg_udt, *declared_type) {
type_drifts.push(format!(
"`{col_name}` declared as `{}` but live type is `{pg_udt}`",
declared_type.canonical_name()
));
}
}
}
}
if missing_cols.is_empty() && type_drifts.is_empty() {
return None;
}
let mut parts: Vec<String> = Vec::new();
if !missing_cols.is_empty() {
parts.push(format!(
"missing on live database: {{{}}}",
missing_cols.join(", ")
));
}
if !type_drifts.is_empty() {
parts.push(format!("type mismatches: {{{}}}", type_drifts.join("; ")));
}
let drift = parts.join("; ");
let err = StoreError::DeclaredVsLiveDrift {
store: store_name.to_string(),
drift,
};
Some(format!("{err} (database: {masked_dsn})"))
}
fn pg_udt_matches_catalog_type(udt: &str, declared: StoreColumnType) -> bool {
let u = udt.to_ascii_lowercase();
use StoreColumnType as C;
match declared {
C::Uuid => u == "uuid",
C::Text => matches!(u.as_str(), "text" | "varchar" | "bpchar" | "name"),
C::Int => matches!(u.as_str(), "int4" | "integer"),
C::BigInt => matches!(u.as_str(), "int8" | "bigint"),
C::Float => matches!(u.as_str(), "float4" | "real"),
C::Double => matches!(u.as_str(), "float8" | "double precision"),
C::Bool => u == "bool" || u == "boolean",
C::Timestamptz => u == "timestamptz",
C::Timestamp => u == "timestamp",
C::Date => u == "date",
C::Time => u == "time",
C::Jsonb => u == "jsonb",
C::Json => u == "json",
C::Bytea => u == "bytea",
C::Numeric => matches!(u.as_str(), "numeric" | "decimal"),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn spec(name: &str, backend: &str, connection: &str) -> IRAxonStore {
IRAxonStore {
node_type: "axonstore",
source_line: 0,
source_column: 0,
name: name.to_string(),
backend: backend.to_string(),
connection: connection.to_string(),
confidence_floor: None,
isolation: String::new(),
on_breach: String::new(),
capability: String::new(),
column_schema: None,
}
}
#[test]
fn classify_postgresql() {
assert_eq!(
classify_backend("postgresql"),
Some(StoreBackendKind::Postgresql)
);
}
#[test]
fn classify_in_memory_and_empty_default() {
assert_eq!(
classify_backend("in_memory"),
Some(StoreBackendKind::InMemory)
);
assert_eq!(classify_backend(""), Some(StoreBackendKind::InMemory));
}
#[test]
fn classify_is_trimmed_and_case_insensitive() {
assert_eq!(
classify_backend(" PostgreSQL "),
Some(StoreBackendKind::Postgresql)
);
assert_eq!(
classify_backend("IN_MEMORY"),
Some(StoreBackendKind::InMemory)
);
}
#[test]
fn classify_unknown_backends_are_none() {
for backend in ["sqlite", "mysql", "postgres", "mongodb", "redis"] {
assert_eq!(classify_backend(backend), None, "backend {backend}");
}
}
#[test]
fn build_accepts_valid_specs() {
let specs = [
spec("cache", "in_memory", ""),
spec("tenants", "postgresql", "env:DATABASE_URL"),
spec("scratch", "", ""),
];
let registry = StoreRegistry::build(&specs).unwrap();
assert_eq!(registry.len(), 3);
assert!(!registry.is_empty());
}
#[test]
fn build_rejects_unknown_backend() {
let specs = [spec("legacy", "sqlite", "file:./db.sqlite")];
match StoreRegistry::build(&specs) {
Err(RegistryError::UnknownBackend { store, backend }) => {
assert_eq!(store, "legacy");
assert_eq!(backend, "sqlite");
}
other => panic!("expected UnknownBackend, got {other:?}"),
}
}
#[test]
fn build_rejects_duplicate_store_name() {
let specs = [
spec("tenants", "in_memory", ""),
spec("tenants", "postgresql", "env:DB"),
];
match StoreRegistry::build(&specs) {
Err(RegistryError::DuplicateStore { store }) => {
assert_eq!(store, "tenants");
}
other => panic!("expected DuplicateStore, got {other:?}"),
}
}
#[test]
fn build_empty_specs_yields_empty_registry() {
let registry = StoreRegistry::build(&[]).unwrap();
assert!(registry.is_empty());
assert_eq!(registry.len(), 0);
}
#[test]
fn empty_constructor_is_empty() {
assert!(StoreRegistry::empty().is_empty());
}
#[test]
fn resolve_undeclared_store_is_in_memory() {
let registry = StoreRegistry::empty();
let handle = registry.resolve("never_declared").unwrap();
assert!(handle.is_in_memory());
}
#[test]
fn resolve_declared_in_memory_store() {
let registry =
StoreRegistry::build(&[spec("cache", "in_memory", "")]).unwrap();
assert!(registry.resolve("cache").unwrap().is_in_memory());
}
#[test]
fn resolve_empty_backend_store_is_in_memory() {
let registry = StoreRegistry::build(&[spec("s", "", "")]).unwrap();
assert!(registry.resolve("s").unwrap().is_in_memory());
}
#[test]
fn resolve_empty_store_name_is_in_memory() {
assert!(StoreRegistry::empty().resolve("").unwrap().is_in_memory());
}
#[test]
fn resolve_postgres_with_missing_env_var_errors_not_kv_fallback() {
let registry = StoreRegistry::build(&[spec(
"tenants",
"postgresql",
"env:AXON_NONEXISTENT_VAR_FASE35D",
)])
.unwrap();
match registry.resolve("tenants") {
Err(StoreError::MissingEnvVar { var }) => {
assert_eq!(var, "AXON_NONEXISTENT_VAR_FASE35D");
}
other => panic!("expected MissingEnvVar, got {other:?}"),
}
}
#[test]
fn resolve_postgres_with_empty_connection_errors() {
let registry =
StoreRegistry::build(&[spec("t", "postgresql", "")]).unwrap();
assert!(matches!(
registry.resolve("t"),
Err(StoreError::EmptyConnection)
));
}
#[tokio::test]
async fn resolve_postgres_store_yields_a_postgres_handle() {
let registry = StoreRegistry::build(&[spec(
"tenants",
"postgresql",
"postgresql://u:p@localhost:5432/axon",
)])
.unwrap();
assert!(registry.resolve("tenants").unwrap().is_postgres());
}
#[tokio::test]
async fn resolving_one_store_twice_reuses_one_pool() {
let registry = StoreRegistry::build(&[spec(
"tenants",
"postgresql",
"postgresql://u:p@localhost:5432/axon",
)])
.unwrap();
assert_eq!(registry.cached_pool_count(), 0);
registry.resolve("tenants").unwrap();
registry.resolve("tenants").unwrap();
assert_eq!(
registry.cached_pool_count(),
1,
"the second resolve must hit the cache, not reconnect"
);
}
#[tokio::test]
async fn two_stores_sharing_a_dsn_share_one_pool() {
let dsn = "postgresql://u:p@localhost:5432/shared";
let registry = StoreRegistry::build(&[
spec("alpha", "postgresql", dsn),
spec("beta", "postgresql", dsn),
])
.unwrap();
registry.resolve("alpha").unwrap();
registry.resolve("beta").unwrap();
assert_eq!(
registry.cached_pool_count(),
1,
"stores on the same DSN must share one pool"
);
}
#[tokio::test]
async fn two_stores_with_distinct_dsns_get_distinct_pools() {
let registry = StoreRegistry::build(&[
spec("alpha", "postgresql", "postgresql://u:p@localhost/db_a"),
spec("beta", "postgresql", "postgresql://u:p@localhost/db_b"),
])
.unwrap();
registry.resolve("alpha").unwrap();
registry.resolve("beta").unwrap();
assert_eq!(registry.cached_pool_count(), 2);
}
#[tokio::test]
async fn malformed_dsn_errors_and_is_not_cached() {
let registry = StoreRegistry::build(&[spec(
"broken",
"postgresql",
"this is not a dsn",
)])
.unwrap();
assert!(matches!(
registry.resolve("broken"),
Err(StoreError::PoolInit { .. })
));
assert_eq!(
registry.cached_pool_count(),
0,
"a failed connect must not populate the cache"
);
}
#[test]
fn spec_accessor_returns_the_declaration() {
let registry = StoreRegistry::build(&[spec(
"tenants",
"postgresql",
"env:DB",
)])
.unwrap();
let s = registry.spec("tenants").unwrap();
assert_eq!(s.name, "tenants");
assert_eq!(s.backend, "postgresql");
assert!(registry.spec("absent").is_none());
}
#[test]
fn backend_kind_accessor() {
let registry = StoreRegistry::build(&[
spec("kv", "in_memory", ""),
spec("pg", "postgresql", "env:DB"),
])
.unwrap();
assert_eq!(
registry.backend_kind("kv"),
Some(StoreBackendKind::InMemory)
);
assert_eq!(
registry.backend_kind("pg"),
Some(StoreBackendKind::Postgresql)
);
assert_eq!(registry.backend_kind("absent"), None);
}
#[test]
fn store_handle_predicates() {
assert!(StoreHandle::InMemory.is_in_memory());
assert!(!StoreHandle::InMemory.is_postgres());
}
#[test]
fn backend_kind_display() {
assert_eq!(StoreBackendKind::InMemory.to_string(), "in_memory");
assert_eq!(StoreBackendKind::Postgresql.to_string(), "postgresql");
}
#[test]
fn registry_debug_does_not_leak_connection_strings() {
let registry = StoreRegistry::build(&[spec(
"tenants",
"postgresql",
"postgresql://user:fakecred0@localhost/db",
)])
.unwrap();
let debug = format!("{registry:?}");
assert!(!debug.contains("fakecred0"), "Debug must not leak the DSN");
assert!(debug.contains("tenants"));
assert!(debug.to_lowercase().contains("postgresql"));
}
#[test]
fn registry_errors_have_non_empty_display() {
let errors = [
RegistryError::UnknownBackend {
store: "s".into(),
backend: "mysql".into(),
},
RegistryError::DuplicateStore { store: "s".into() },
];
for e in errors {
assert!(!e.to_string().is_empty());
}
}
#[test]
fn schema_verify_report_has_fatal_iff_a_table_is_missing() {
let mut report = SchemaVerifyReport::default();
assert!(!report.has_fatal(), "an empty report is not fatal");
assert!(report.fatal_summary().is_empty());
report.unreachable.push(("s".into(), "down".into()));
assert!(
!report.has_fatal(),
"an unreachable store is a non-fatal warning"
);
report.missing.push(("t".into(), "no such table".into()));
assert!(report.has_fatal(), "a missing table is fatal");
assert!(report.fatal_summary().contains("`t`"));
}
#[tokio::test]
async fn verify_postgres_schemas_skips_in_memory_and_warns_on_unreachable() {
let registry = StoreRegistry::build(&[
spec("cache", "in_memory", ""),
spec("tenants", "postgresql", "this is not a dsn"),
])
.unwrap();
let report = registry.verify_postgres_schemas().await;
assert!(report.verified.is_empty());
assert!(
report.missing.is_empty(),
"an unreachable store must not be a fatal `missing` entry"
);
assert_eq!(report.unreachable.len(), 1);
assert_eq!(report.unreachable[0].0, "tenants");
assert!(
!report.has_fatal(),
"an unreachable store must NOT fail the deploy"
);
}
#[tokio::test]
async fn verify_postgres_schemas_empty_registry_is_clean() {
let report = StoreRegistry::empty().verify_postgres_schemas().await;
assert!(report.verified.is_empty());
assert!(report.missing.is_empty());
assert!(!report.has_fatal());
}
fn spec_with_schema(
name: &str,
connection: &str,
schema: crate::ir_nodes::IRStoreColumnSchema,
) -> IRAxonStore {
IRAxonStore {
node_type: "axonstore",
source_line: 0,
source_column: 0,
name: name.to_string(),
backend: "postgresql".to_string(),
connection: connection.to_string(),
confidence_floor: None,
isolation: String::new(),
on_breach: String::new(),
capability: String::new(),
column_schema: Some(schema),
}
}
#[tokio::test]
async fn t806_missing_per_tenant_env_var_fails_deploy_with_named_code() {
let var_name = "AXON_T806_FASE38F_UNSET_VAR_XYZ_DO_NOT_SET";
std::env::remove_var(var_name);
let registry = StoreRegistry::build(&[spec_with_schema(
"tenants",
"postgresql://u:p@localhost:5432/axon",
crate::ir_nodes::IRStoreColumnSchema::EnvVar {
var_name: var_name.to_string(),
},
)])
.unwrap();
let report = registry.verify_postgres_schemas_with_manifest(None).await;
assert!(report.has_fatal(), "T806 must fail-close the deploy");
let (store, diag) = &report.missing[0];
assert_eq!(store, "tenants");
assert!(diag.contains("axon-T806"), "diag must carry T806 slug: {diag}");
assert!(diag.contains(var_name), "diag must name the env var: {diag}");
}
#[tokio::test]
async fn t806_empty_string_env_var_also_fails_t806() {
let var_name = "AXON_T806_FASE38F_EMPTY_VAR";
std::env::set_var(var_name, "");
let registry = StoreRegistry::build(&[spec_with_schema(
"tenants",
"postgresql://u:p@localhost:5432/axon",
crate::ir_nodes::IRStoreColumnSchema::EnvVar {
var_name: var_name.to_string(),
},
)])
.unwrap();
let report = registry.verify_postgres_schemas_with_manifest(None).await;
std::env::remove_var(var_name);
assert!(report.has_fatal(), "empty-string env var must fail-close");
assert!(report.missing[0].1.contains("axon-T806"));
}
#[tokio::test]
async fn three_tenants_each_get_their_namespace_resolved_independently() {
for (var, value) in [
("AXON_T806_FASE38F_T1", "tenant_a"),
("AXON_T806_FASE38F_T2", "tenant_b"),
("AXON_T806_FASE38F_T3", "tenant_c"),
] {
std::env::set_var(var, value);
}
let specs: Vec<IRAxonStore> = ["AXON_T806_FASE38F_T1", "AXON_T806_FASE38F_T2", "AXON_T806_FASE38F_T3"]
.iter()
.enumerate()
.map(|(i, v)| {
spec_with_schema(
&format!("tenants_{i}"),
"postgresql://u:p@localhost:5432/axon",
crate::ir_nodes::IRStoreColumnSchema::EnvVar {
var_name: (*v).to_string(),
},
)
})
.collect();
let registry = StoreRegistry::build(&specs).unwrap();
let _ = registry.verify_postgres_schemas_with_manifest(None).await;
for var in ["AXON_T806_FASE38F_T1", "AXON_T806_FASE38F_T2", "AXON_T806_FASE38F_T3"] {
std::env::remove_var(var);
}
assert!(registry.cached_pool_count() <= 1);
}
#[test]
fn application_name_stamping_includes_resolved_namespace() {
use crate::store::postgres_backend::application_name_for_with_namespace;
assert_eq!(
application_name_for_with_namespace("claims", None),
"axon-store/claims"
);
assert_eq!(
application_name_for_with_namespace("claims", Some("tenant_42")),
"axon-store/claims/tenant_42"
);
assert_eq!(
application_name_for_with_namespace("claims", Some("")),
"axon-store/claims"
);
assert_eq!(
application_name_for_with_namespace("", Some("tenant_42")),
"axon-store/tenant_42"
);
}
#[test]
fn application_name_stamping_truncates_at_namedatalen_with_char_boundary() {
use crate::store::postgres_backend::application_name_for_with_namespace;
let long_store = "s".repeat(50);
let long_ns = "é".repeat(50);
let stamped = application_name_for_with_namespace(&long_store, Some(&long_ns));
assert!(stamped.len() <= 63, "got {}: {stamped}", stamped.len());
assert!(stamped.is_char_boundary(stamped.len()));
assert!(stamped.starts_with("axon-store/"));
}
#[test]
fn pg_udt_matches_catalog_type_recognises_text_class_aliases() {
for udt in ["text", "varchar", "bpchar", "name", "TEXT", "VARCHAR"] {
assert!(
pg_udt_matches_catalog_type(udt, StoreColumnType::Text),
"Text class must accept `{udt}`"
);
}
for udt in ["int4", "integer", "INT4"] {
assert!(pg_udt_matches_catalog_type(udt, StoreColumnType::Int));
}
for udt in ["int8", "bigint"] {
assert!(pg_udt_matches_catalog_type(udt, StoreColumnType::BigInt));
}
}
#[test]
fn pg_udt_matches_catalog_type_rejects_off_class_udts() {
assert!(!pg_udt_matches_catalog_type("int4", StoreColumnType::Text));
assert!(!pg_udt_matches_catalog_type("uuid", StoreColumnType::Int));
assert!(!pg_udt_matches_catalog_type("varchar", StoreColumnType::Uuid));
assert!(!pg_udt_matches_catalog_type("bool", StoreColumnType::Numeric));
}
#[test]
fn verify_declared_columns_no_schema_means_nothing_to_prove() {
let result = declared_columns_for("tenants", None, None, None);
assert!(matches!(result, Ok(None)));
}
#[test]
fn declared_columns_for_inline_returns_btreemap_keyed_on_column_names() {
let schema = crate::ir_nodes::IRStoreColumnSchema::Inline {
columns: vec![
crate::ir_nodes::IRStoreColumn {
name: "tenant_id".to_string(),
col_type: "Uuid".to_string(),
primary_key: true,
auto_increment: false,
not_null: false,
unique: false,
default_value: String::new(),
identity: false,
},
crate::ir_nodes::IRStoreColumn {
name: "tier".to_string(),
col_type: "Text".to_string(),
primary_key: false,
auto_increment: false,
not_null: true,
unique: false,
default_value: String::new(),
identity: false,
},
],
};
let cols = declared_columns_for("tenants", Some(&schema), None, None)
.unwrap()
.unwrap();
assert_eq!(cols.len(), 2);
assert_eq!(cols.get("tenant_id").copied(), Some(StoreColumnType::Uuid));
assert_eq!(cols.get("tier").copied(), Some(StoreColumnType::Text));
}
#[test]
fn declared_columns_for_inline_unknown_type_returns_a_named_error() {
let schema = crate::ir_nodes::IRStoreColumnSchema::Inline {
columns: vec![crate::ir_nodes::IRStoreColumn {
name: "loc".to_string(),
col_type: "Geometry".to_string(),
primary_key: false,
auto_increment: false,
not_null: false,
unique: false,
default_value: String::new(),
identity: false,
}],
};
let result = declared_columns_for("tenants", Some(&schema), None, None);
match result {
Err(msg) => {
assert!(msg.contains("Geometry"));
assert!(msg.contains("closed catalog"));
}
other => panic!("expected named error, got {other:?}"),
}
}
#[test]
fn declared_columns_for_manifest_ref_returns_none_when_no_manifest_in_scope() {
let schema = crate::ir_nodes::IRStoreColumnSchema::ManifestRef {
qualified_name: "public.tenants".to_string(),
};
let result = declared_columns_for("tenants", Some(&schema), None, None);
assert!(matches!(result, Ok(None)));
}
#[test]
fn declared_columns_for_env_var_no_manifest_returns_none() {
let schema = crate::ir_nodes::IRStoreColumnSchema::EnvVar {
var_name: "TENANT_SCHEMA".to_string(),
};
let result = declared_columns_for("tenants", Some(&schema), Some("tenant_42"), None);
assert!(matches!(result, Ok(None)));
}
#[test]
fn declared_columns_for_manifest_ref_resolves_against_provided_manifest() {
let m = Manifest::parse_json(
r#"{
"version": 1,
"stores": {
"public.tenants": {
"columns": {
"tenant_id": { "type": "Uuid", "primary_key": true },
"tier": { "type": "Text", "not_null": true }
}
}
}
}"#,
)
.unwrap();
let schema = crate::ir_nodes::IRStoreColumnSchema::ManifestRef {
qualified_name: "public.tenants".to_string(),
};
let cols = declared_columns_for("tenants", Some(&schema), None, Some(&m))
.unwrap()
.unwrap();
assert_eq!(cols.len(), 2);
assert_eq!(cols.get("tenant_id").copied(), Some(StoreColumnType::Uuid));
}
#[test]
fn declared_columns_for_env_var_uses_first_match_heuristic_at_deploy() {
let m = Manifest::parse_json(
r#"{
"version": 1,
"stores": {
"tenant_42.events": {
"columns": {
"event_id": { "type": "Uuid" }
}
}
}
}"#,
)
.unwrap();
let schema = crate::ir_nodes::IRStoreColumnSchema::EnvVar {
var_name: "TENANT_SCHEMA".to_string(),
};
let cols = declared_columns_for("events", Some(&schema), Some("tenant_99"), Some(&m))
.unwrap()
.unwrap();
assert!(cols.contains_key("event_id"));
}
#[test]
fn declared_columns_for_manifest_ref_missing_entry_is_a_named_error() {
let m = Manifest::parse_json(
r#"{"version":1,"stores":{"public.other":{"columns":{"x":{"type":"Uuid"}}}}}"#,
)
.unwrap();
let schema = crate::ir_nodes::IRStoreColumnSchema::ManifestRef {
qualified_name: "public.tenants".to_string(),
};
let result = declared_columns_for("tenants", Some(&schema), None, Some(&m));
match result {
Err(msg) => {
assert!(msg.contains("public.tenants"));
assert!(msg.contains("Available manifest entries"));
}
other => panic!("expected named missing-entry error, got {other:?}"),
}
}
#[test]
fn store_error_t806_and_t807_display_carries_the_slug_and_remedy() {
let t806 = StoreError::MissingPerTenantSchemaEnv {
store: "tenants".to_string(),
var: "TENANT_SCHEMA".to_string(),
};
let msg = t806.to_string();
assert!(msg.contains("axon-T806"));
assert!(msg.contains("TENANT_SCHEMA"));
assert!(msg.contains("Never a silent fallback"));
let t807 = StoreError::DeclaredVsLiveDrift {
store: "tenants".to_string(),
drift: "missing on live database: {tier}".to_string(),
};
let msg = t807.to_string();
assert!(msg.contains("axon-T807"));
assert!(msg.contains("tenants"));
assert!(msg.contains("axon store introspect"), "remedy must point at the CLI: {msg}");
}
}