use crate::types::{
issue_codes, CaseSensitivity, ColumnSchema, Dialect, Issue, SchemaMetadata, SchemaOrigin,
SchemaTable, TableConstraintInfo,
};
use chrono::{DateTime, Utc};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
/// Size limit shared by the registry's memoization caches: once a cache holds
/// this many entries it is cleared before the next entry is inserted.
const MAX_CACHE_ENTRIES: usize = 10_000;
use super::helpers::{
extract_simple_name, is_quoted_identifier, split_qualified_identifiers, unquote_identifier,
};
/// Argument bundle consumed by `SchemaRegistry::register_implied_internal`.
pub(crate) struct RegisterImpliedParams<'a> {
/// Key under which the table is tracked in the registry's sets and maps.
pub(crate) canonical: &'a str,
/// Columns to store for the table; an empty list means no entry is stored.
pub(crate) columns: Vec<ColumnSchema>,
/// Table-level constraints stored alongside the columns.
pub(crate) constraints: Vec<TableConstraintInfo>,
/// Copied into the stored entry's `temporary` flag.
pub(crate) is_temporary: bool,
/// Statement label interpolated into the schema-conflict warning text.
pub(crate) statement_type: &'a str,
/// Statement index attached to warnings and recorded on the stored entry.
pub(crate) statement_index: usize,
/// Whether a mismatch with an imported table may produce a warning.
pub(crate) emit_warnings: bool,
/// When true the table is added to the forward-declared set; otherwise it
/// is removed from that set.
pub(crate) is_seed: bool,
}
/// One table tracked by the registry, together with bookkeeping about where
/// the definition came from.
#[derive(Debug, Clone)]
pub(crate) struct SchemaTableEntry {
/// The table definition (catalog, schema, name and columns).
pub(crate) table: SchemaTable,
/// `Imported` for tables loaded from metadata, `Implied` for tables
/// registered through the `register_implied*` / seed methods.
pub(crate) origin: SchemaOrigin,
/// Index of the statement that produced an implied entry; `None` for
/// imported tables.
pub(crate) source_statement_idx: Option<usize>,
/// Timestamp taken (`Utc::now()`) when the entry was created.
pub(crate) updated_at: DateTime<Utc>,
/// Whether the table was registered as temporary (always `false` for
/// imported tables).
pub(crate) temporary: bool,
/// Table-level constraints; empty for imported tables.
pub(crate) constraints: Vec<TableConstraintInfo>,
}
/// One namespace consulted when resolving an unqualified table name.
#[derive(Debug, Clone)]
pub(crate) struct SearchPathEntry {
/// Optional catalog; when present candidates are `catalog.schema.table`.
pub(crate) catalog: Option<String>,
/// Schema name; without a catalog candidates are `schema.table`.
pub(crate) schema: String,
}
/// Result of canonicalizing a table reference.
#[derive(Debug, Clone)]
pub(crate) struct TableResolution {
/// The dot-joined canonical name chosen for the reference.
pub(crate) canonical: String,
/// Whether `canonical` was found among the registry's known tables.
pub(crate) matched_schema: bool,
}
/// Tracks the tables known during analysis (imported from metadata or
/// registered from statements) and resolves table references against them.
pub(crate) struct SchemaRegistry {
/// Canonical names of every table the registry has been told about.
pub(crate) known_tables: HashSet<String>,
/// Tables registered as seeds or via `mark_table_known`; a later non-seed
/// registration removes the table from this set.
forward_declared_tables: HashSet<String>,
/// Tables flagged through `mark_as_ddl_seeded` or the seed method.
ddl_seeded_tables: HashSet<String>,
/// Canonical names of tables loaded from the supplied metadata.
pub(crate) imported_tables: HashSet<String>,
/// Stored table definitions keyed by canonical name.
pub(crate) schema_tables: HashMap<String, SchemaTableEntry>,
/// Normalized default catalog from the metadata, if any.
pub(crate) default_catalog: Option<String>,
/// Normalized default schema from the metadata, if any.
pub(crate) default_schema: Option<String>,
/// Namespaces tried, in order, for unqualified table names.
pub(crate) search_path: Vec<SearchPathEntry>,
/// Case-handling setting; resolved against `dialect` when normalizing.
case_sensitivity: CaseSensitivity,
/// SQL dialect supplied at construction.
dialect: Dialect,
/// Whether implied (statement-derived) schemas may be stored.
allow_implied: bool,
/// Memoizes `normalize_identifier` results (raw input -> normalized).
identifier_cache: RefCell<HashMap<String, String>>,
/// Memoizes `canonicalize_table_reference` results; cleared whenever the
/// set of known tables changes.
table_resolution_cache: RefCell<HashMap<String, TableResolution>>,
}
impl SchemaRegistry {
/// Builds a registry for `dialect`, optionally pre-populated from `schema`.
///
/// Returns the registry together with whatever issues
/// `initialize_from_metadata` reported while loading the metadata.
pub(crate) fn new(schema: Option<&SchemaMetadata>, dialect: Dialect) -> (Self, Vec<Issue>) {
    // Start empty, with dialect-driven case handling and implied schemas
    // enabled; metadata (when supplied) overrides these during initialization.
    let mut this = Self {
        dialect,
        case_sensitivity: CaseSensitivity::Dialect,
        allow_implied: true,
        default_catalog: None,
        default_schema: None,
        search_path: Vec::new(),
        known_tables: HashSet::default(),
        forward_declared_tables: HashSet::default(),
        ddl_seeded_tables: HashSet::default(),
        imported_tables: HashSet::default(),
        schema_tables: HashMap::default(),
        identifier_cache: RefCell::default(),
        table_resolution_cache: RefCell::default(),
    };
    let startup_issues = this.initialize_from_metadata(schema);
    (this, startup_issues)
}
/// Loads settings and tables from `schema` into the registry.
///
/// With `None` nothing is changed. Otherwise the case-sensitivity and
/// implied-schema settings are copied, default catalog/schema and search
/// path are normalized, and every table is recorded as imported.
///
/// The returned issue list is currently always empty.
fn initialize_from_metadata(&mut self, schema: Option<&SchemaMetadata>) -> Vec<Issue> {
    let issues = Vec::new();
    let metadata = match schema {
        Some(metadata) => metadata,
        None => return issues,
    };

    // Case handling must be set first: every normalization below depends on it.
    self.case_sensitivity = metadata
        .case_sensitivity
        .unwrap_or(CaseSensitivity::Dialect);
    self.allow_implied = metadata.allow_implied;

    let catalog = metadata
        .default_catalog
        .as_ref()
        .map(|raw| self.normalize_identifier(raw));
    self.default_catalog = catalog;
    let default_schema = metadata
        .default_schema
        .as_ref()
        .map(|raw| self.normalize_identifier(raw));
    self.default_schema = default_schema;

    match metadata.search_path.as_ref() {
        Some(hints) => {
            let mut entries = Vec::new();
            for hint in hints.iter() {
                let catalog = hint.catalog.as_ref().map(|raw| self.normalize_identifier(raw));
                let schema = self.normalize_identifier(&hint.schema);
                entries.push(SearchPathEntry { catalog, schema });
            }
            self.search_path = entries;
        }
        None => {
            // No explicit search path: fall back to the default namespace, if any.
            let fallback = self.default_schema.as_ref().map(|schema_name| SearchPathEntry {
                catalog: self.default_catalog.clone(),
                schema: schema_name.clone(),
            });
            if let Some(entry) = fallback {
                self.search_path = vec![entry];
            }
        }
    }

    for table in &metadata.tables {
        let key = self.schema_table_key(table);
        self.known_tables.insert(key.clone());
        self.imported_tables.insert(key.clone());
        let entry = SchemaTableEntry {
            table: table.clone(),
            origin: SchemaOrigin::Imported,
            source_statement_idx: None,
            updated_at: Utc::now(),
            temporary: false,
            constraints: Vec::new(),
        };
        self.schema_tables.insert(key, entry);
    }
    issues
}
/// Returns whether implied (statement-derived) schemas may be stored.
pub(crate) fn allow_implied(&self) -> bool {
self.allow_implied
}
/// Drops every memoized table resolution. Called after the set of known
/// tables changes, since cached resolutions may no longer be accurate.
fn invalidate_resolution_cache(&self) {
self.table_resolution_cache.borrow_mut().clear();
}
/// Returns the stored entry for `canonical`, if one exists. The key is
/// looked up verbatim; no normalization is applied here.
pub(crate) fn get(&self, canonical: &str) -> Option<&SchemaTableEntry> {
self.schema_tables.get(canonical)
}
/// Returns whether `canonical` is in the set of known tables.
pub(crate) fn is_known(&self, canonical: &str) -> bool {
self.known_tables.contains(canonical)
}
/// Returns whether `canonical` was loaded from the supplied metadata.
pub(crate) fn is_imported(&self, canonical: &str) -> bool {
self.imported_tables.contains(canonical)
}
/// Returns whether `canonical` has been flagged as DDL-seeded.
pub(crate) fn is_ddl_seeded(&self, canonical: &str) -> bool {
self.ddl_seeded_tables.contains(canonical)
}
/// Flags `canonical` as DDL-seeded. Only the seeded set is touched; the
/// table is not added to the known tables by this call.
pub(crate) fn mark_as_ddl_seeded(&mut self, canonical: &str) {
self.ddl_seeded_tables.insert(canonical.to_string());
}
/// Forgets a non-imported table entirely.
///
/// Imported tables are left untouched. For any other name the stored entry
/// and all set memberships are removed and cached resolutions are dropped.
pub(crate) fn remove_implied(&mut self, canonical: &str) {
    // Metadata-provided tables are authoritative and never removed.
    if self.imported_tables.contains(canonical) {
        return;
    }
    self.schema_tables.remove(canonical);
    self.known_tables.remove(canonical);
    self.forward_declared_tables.remove(canonical);
    self.ddl_seeded_tables.remove(canonical);
    self.invalidate_resolution_cache();
}
/// Records `canonical` as a known table and, when permitted, stores an
/// implied schema entry for it.
///
/// Steps, in order:
/// 1. The table is added to the known set and cached resolutions are dropped.
/// 2. Seed registrations add the table to the forward-declared set; any
///    other registration removes it from that set.
/// 3. If the table was imported from metadata, the imported definition wins
///    and nothing is stored. When `emit_warnings` is set and the column-name
///    sets differ, a `SCHEMA_CONFLICT` warning is returned.
/// 4. Otherwise, if implied schemas are allowed and `columns` is non-empty,
///    an `Implied` entry is stored under `canonical`.
///
/// Column names are compared after `normalize_identifier`, the same way
/// `validate_column` and `lookup_column_type` match columns, so names that
/// differ only in case folding or quoting are not reported as a conflict.
///
/// Returns `Some(issue)` only for the schema-conflict warning.
fn register_implied_internal(&mut self, params: RegisterImpliedParams<'_>) -> Option<Issue> {
    let RegisterImpliedParams {
        canonical,
        columns,
        constraints,
        is_temporary,
        statement_type,
        statement_index,
        emit_warnings,
        is_seed,
    } = params;

    self.known_tables.insert(canonical.to_string());
    self.invalidate_resolution_cache();
    if is_seed {
        self.forward_declared_tables.insert(canonical.to_string());
    } else {
        self.forward_declared_tables.remove(canonical);
    }

    if self.imported_tables.contains(canonical) {
        // Imported definitions are authoritative; at most report a mismatch.
        if !emit_warnings {
            return None;
        }
        let imported_entry = self.schema_tables.get(canonical)?;
        let imported_cols: HashSet<String> = imported_entry
            .table
            .columns
            .iter()
            .map(|c| self.normalize_identifier(&c.name))
            .collect();
        let ddl_cols: HashSet<String> = columns
            .iter()
            .map(|c| self.normalize_identifier(&c.name))
            .collect();
        if imported_cols == ddl_cols {
            return None;
        }
        return Some(
            Issue::warning(
                issue_codes::SCHEMA_CONFLICT,
                format!(
                    "{} for '{}' conflicts with imported schema. Using imported schema (imported has {} columns, {} has {} columns)",
                    statement_type,
                    canonical,
                    imported_cols.len(),
                    statement_type,
                    ddl_cols.len()
                ),
            )
            .with_statement(statement_index),
        );
    }

    // Without columns there is nothing worth storing; the table stays "known" only.
    if !self.allow_implied || columns.is_empty() {
        return None;
    }

    // Split the canonical key back into catalog / schema / table components.
    let parts = split_qualified_identifiers(canonical);
    let (catalog, schema, table_name) = match parts.as_slice() {
        [catalog_part, schema_part, table] => (
            Some(catalog_part.clone()),
            Some(schema_part.clone()),
            table.clone(),
        ),
        [schema_part, table] => (None, Some(schema_part.clone()), table.clone()),
        [table] => (None, None, table.clone()),
        _ => (None, None, extract_simple_name(canonical)),
    };
    self.schema_tables.insert(
        canonical.to_string(),
        SchemaTableEntry {
            table: SchemaTable {
                catalog,
                schema,
                name: table_name,
                columns,
            },
            origin: SchemaOrigin::Implied,
            source_statement_idx: Some(statement_index),
            updated_at: Utc::now(),
            temporary: is_temporary,
            constraints,
        },
    );
    None
}
pub(crate) fn register_implied(
&mut self,
canonical: &str,
columns: Vec<ColumnSchema>,
is_temporary: bool,
statement_type: &str,
statement_index: usize,
) -> Option<Issue> {
self.register_implied_internal(RegisterImpliedParams {
canonical,
columns,
constraints: Vec::new(),
is_temporary,
statement_type,
statement_index,
emit_warnings: true,
is_seed: false,
})
}
/// Registers a statement-derived table together with its constraints.
///
/// This is a non-seed registration with warnings enabled. Returns the
/// schema-conflict warning produced by `register_implied_internal`, if any.
pub(crate) fn register_implied_with_constraints(
    &mut self,
    canonical: &str,
    columns: Vec<ColumnSchema>,
    constraints: Vec<TableConstraintInfo>,
    is_temporary: bool,
    statement_type: &str,
    statement_index: usize,
) -> Option<Issue> {
    let request = RegisterImpliedParams {
        emit_warnings: true,
        is_seed: false,
        canonical,
        statement_type,
        statement_index,
        is_temporary,
        columns,
        constraints,
    };
    self.register_implied_internal(request)
}
/// Marks `canonical` as known without storing any column information.
///
/// The table is also placed in the forward-declared set, and cached
/// resolutions are dropped.
pub(crate) fn mark_table_known(&mut self, canonical: &str) {
    let key = canonical.to_owned();
    self.known_tables.insert(key.clone());
    self.forward_declared_tables.insert(key);
    self.invalidate_resolution_cache();
}
/// Seeds the registry with a table definition.
///
/// The table is flagged as DDL-seeded and then registered as a seed with
/// warnings disabled, using the fixed statement label `"seed"`.
pub(crate) fn seed_implied_schema_with_constraints(
    &mut self,
    canonical: &str,
    columns: Vec<ColumnSchema>,
    constraints: Vec<TableConstraintInfo>,
    is_temporary: bool,
    statement_index: usize,
) {
    self.ddl_seeded_tables.insert(canonical.to_owned());
    let seed_request = RegisterImpliedParams {
        statement_type: "seed",
        emit_warnings: false,
        is_seed: true,
        canonical,
        statement_index,
        is_temporary,
        columns,
        constraints,
    };
    // The result is intentionally discarded.
    let _ = self.register_implied_internal(seed_request);
}
/// Computes the canonical registry key for `table`.
///
/// The present components (catalog, schema, name — in that order) are
/// joined with `.` and the result is passed through `normalize_table_name`.
pub(crate) fn schema_table_key(&self, table: &SchemaTable) -> String {
    let qualified = table
        .catalog
        .iter()
        .chain(table.schema.iter())
        .map(String::as_str)
        .chain(std::iter::once(table.name.as_str()))
        .collect::<Vec<_>>()
        .join(".");
    self.normalize_table_name(&qualified)
}
/// Resolves a table reference to its canonical name, memoizing the result.
///
/// Results are cached by the raw `name`. When the cache has reached
/// `MAX_CACHE_ENTRIES` it is emptied before the new result is stored.
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(input = name)))]
pub(crate) fn canonicalize_table_reference(&self, name: &str) -> TableResolution {
    // Clone the hit out so the cache borrow ends before any further work.
    let cached = self.table_resolution_cache.borrow().get(name).cloned();
    if let Some(hit) = cached {
        return hit;
    }

    let resolved = self.canonicalize_table_reference_uncached(name);

    let mut store = self.table_resolution_cache.borrow_mut();
    if store.len() >= MAX_CACHE_ENTRIES {
        store.clear();
    }
    store.insert(name.to_owned(), resolved.clone());
    resolved
}
/// Resolves a table reference without consulting the resolution cache.
///
/// The name is split into identifier parts and each part is normalized.
/// Resolution then depends on the number of parts:
/// - none: an empty, unmatched resolution;
/// - one: the bare name if known, then the search path, then the default
///   schema (prefixed by the default catalog when set), else the bare name;
/// - two: the name as given if known, else prefixed with the default catalog
///   when that combination is known, else the name as given (unmatched);
/// - three or more: the joined name, matched only if it is known.
fn canonicalize_table_reference_uncached(&self, name: &str) -> TableResolution {
    let segments: Vec<String> = split_qualified_identifiers(name)
        .into_iter()
        .map(|segment| self.normalize_identifier(&segment))
        .collect();
    let resolved = |canonical: String, matched_schema: bool| TableResolution {
        canonical,
        matched_schema,
    };

    match segments.as_slice() {
        [] => resolved(String::new(), false),
        [table_only] => {
            if self.known_tables.contains(table_only) {
                return resolved(table_only.clone(), true);
            }
            if let Some(candidate) = self.resolve_via_search_path(table_only) {
                return resolved(candidate, true);
            }
            match (&self.default_schema, &self.default_catalog) {
                (Some(schema), Some(catalog)) => {
                    let canonical = format!("{catalog}.{schema}.{table_only}");
                    let matched = self.known_tables.contains(&canonical);
                    resolved(canonical, matched)
                }
                (Some(schema), None) => {
                    let canonical = format!("{schema}.{table_only}");
                    let matched = self.known_tables.contains(&canonical);
                    resolved(canonical, matched)
                }
                (None, _) => {
                    let matched = self.known_tables.contains(table_only);
                    resolved(table_only.clone(), matched)
                }
            }
        }
        [_, _] => {
            let canonical = segments.join(".");
            if self.known_tables.contains(&canonical) {
                return resolved(canonical, true);
            }
            // Retry with the default catalog in front, if one is configured.
            let with_catalog = self
                .default_catalog
                .as_ref()
                .map(|default_catalog| format!("{default_catalog}.{canonical}"))
                .filter(|candidate| self.known_tables.contains(candidate));
            match with_catalog {
                Some(candidate) => resolved(candidate, true),
                None => resolved(canonical, false),
            }
        }
        _ => {
            let canonical = segments.join(".");
            let matched = self.known_tables.contains(&canonical);
            resolved(canonical, matched)
        }
    }
}
/// Finds the first search-path namespace that contains `table`.
///
/// Entries are tried in order; each candidate is `catalog.schema.table`
/// when the entry has a catalog and `schema.table` otherwise. Returns the
/// first candidate that is a known table, or `None`.
pub(crate) fn resolve_via_search_path(&self, table: &str) -> Option<String> {
    self.search_path
        .iter()
        .map(|entry| {
            let schema = &entry.schema;
            match &entry.catalog {
                Some(catalog) => format!("{catalog}.{schema}.{table}"),
                None => format!("{schema}.{table}"),
            }
        })
        .find(|candidate| self.known_tables.contains(candidate))
}
/// Normalizes a single identifier, memoizing the result by raw input.
///
/// Quoted identifiers are unquoted and otherwise kept as written; unquoted
/// identifiers go through the case strategy resolved for the dialect. When
/// the cache has reached `MAX_CACHE_ENTRIES` it is emptied before the new
/// result is stored.
pub(crate) fn normalize_identifier(&self, name: &str) -> String {
    // Clone the hit out so the cache borrow ends immediately.
    let cached = self.identifier_cache.borrow().get(name).cloned();
    if let Some(hit) = cached {
        return hit;
    }

    let strategy = self.case_sensitivity.resolve(self.dialect);
    let normalized = match is_quoted_identifier(name) {
        true => unquote_identifier(name),
        false => strategy.apply(name).into_owned(),
    };

    let mut store = self.identifier_cache.borrow_mut();
    if store.len() >= MAX_CACHE_ENTRIES {
        store.clear();
    }
    store.insert(name.to_owned(), normalized.clone());
    normalized
}
/// Normalizes every part of a possibly qualified table name.
///
/// The name is split into identifier parts; quoted parts are unquoted,
/// unquoted parts go through the dialect's case strategy, and the results
/// are re-joined with `.`. A name with no parts yields an empty string.
/// This path does not use the identifier cache.
pub(crate) fn normalize_table_name(&self, name: &str) -> String {
    let strategy = self.case_sensitivity.resolve(self.dialect);
    let mut joined = String::new();
    for (position, part) in split_qualified_identifiers(name).into_iter().enumerate() {
        if position > 0 {
            joined.push('.');
        }
        let piece = if is_quoted_identifier(&part) {
            unquote_identifier(&part)
        } else {
            strategy.apply(&part).into_owned()
        };
        joined.push_str(&piece);
    }
    joined
}
/// Returns the declared data type of `column` in table `canonical`.
///
/// Column names are compared after normalization. `None` is returned when
/// the table has no stored entry, the column is not found, or the first
/// matching column has no data type recorded.
pub(crate) fn lookup_column_type(&self, canonical: &str, column: &str) -> Option<String> {
    let columns = &self.schema_tables.get(canonical)?.table.columns;
    let wanted = self.normalize_identifier(column);
    for candidate in columns.iter() {
        if self.normalize_identifier(&candidate.name) == wanted {
            return candidate.data_type.clone();
        }
    }
    None
}
/// Checks that `column` exists in table `canonical`.
///
/// Returns `None` when the table has no stored entry (nothing to validate
/// against) or when the column is found after normalization. Otherwise an
/// `UNKNOWN_COLUMN` warning listing the table's columns is returned, tagged
/// with `statement_index`.
pub(crate) fn validate_column(
    &self,
    canonical: &str,
    column: &str,
    statement_index: usize,
) -> Option<Issue> {
    let entry = self.schema_tables.get(canonical)?;
    let wanted = self.normalize_identifier(column);
    let columns = &entry.table.columns;

    let found = columns
        .iter()
        .any(|candidate| self.normalize_identifier(&candidate.name) == wanted);
    if found {
        return None;
    }

    let available = columns
        .iter()
        .map(|candidate| candidate.name.as_str())
        .collect::<Vec<_>>()
        .join(", ");
    let message = format!(
        "Column '{}' not found in table '{}'. Available columns: {}",
        column, canonical, available
    );
    Some(Issue::warning(issue_codes::UNKNOWN_COLUMN, message).with_statement(statement_index))
}
/// Iterates over every stored table entry (imported and implied), in the
/// map's iteration order.
pub(crate) fn all_entries(&self) -> impl Iterator<Item = &SchemaTableEntry> {
self.schema_tables.values()
}
/// Returns whether no table entries are stored. Tables that are merely
/// "known" without a stored entry do not count.
pub(crate) fn is_empty(&self) -> bool {
self.schema_tables.is_empty()
}
/// Returns whether the registry knows no tables beyond forward declarations.
///
/// True only when nothing was imported and every known table is also in
/// the forward-declared set (trivially true when no tables are known).
pub(crate) fn has_no_known_tables(&self) -> bool {
    self.imported_tables.is_empty()
        && self.known_tables.is_subset(&self.forward_declared_tables)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::SchemaNamespaceHint;
// Unquoted identifiers are folded to lowercase for the Postgres dialect.
#[test]
fn test_normalize_identifier_lowercase() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
assert_eq!(registry.normalize_identifier("MyTable"), "mytable");
}
// Quoted identifiers lose their quotes but keep their original case.
#[test]
fn test_normalize_identifier_quoted() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
assert_eq!(registry.normalize_identifier("\"MyTable\""), "MyTable");
}
// With `CaseSensitivity::Upper`, unquoted identifiers are folded to uppercase.
#[test]
fn test_normalize_identifier_uppercase_dialect() {
let schema = SchemaMetadata {
tables: vec![],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: Some(CaseSensitivity::Upper),
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Snowflake);
assert_eq!(registry.normalize_identifier("MyTable"), "MYTABLE");
}
// With `CaseSensitivity::Exact`, identifiers are left exactly as written.
#[test]
fn test_normalize_identifier_exact() {
let schema = SchemaMetadata {
tables: vec![],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: Some(CaseSensitivity::Exact),
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert_eq!(registry.normalize_identifier("MyTable"), "MyTable");
}
// A bare table name resolves into the default schema when the table exists there.
#[test]
fn test_canonicalize_simple_name() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("users");
assert_eq!(resolution.canonical, "public.users");
assert!(resolution.matched_schema);
}
// A schema-qualified name matches directly, regardless of the default schema.
#[test]
fn test_canonicalize_qualified_name() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("analytics".to_string()),
name: "events".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("analytics.events");
assert_eq!(resolution.canonical, "analytics.events");
assert!(resolution.matched_schema);
}
// Bare names are resolved through the explicit search path: each table is
// found in whichever listed schema contains it.
#[test]
fn test_canonicalize_with_search_path() {
let schema = SchemaMetadata {
tables: vec![
SchemaTable {
catalog: None,
schema: Some("staging".to_string()),
name: "users".to_string(),
columns: vec![],
},
SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "orders".to_string(),
columns: vec![],
},
],
default_catalog: None,
default_schema: None,
search_path: Some(vec![
SchemaNamespaceHint {
catalog: None,
schema: "staging".to_string(),
},
SchemaNamespaceHint {
catalog: None,
schema: "public".to_string(),
},
]),
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("users");
assert_eq!(resolution.canonical, "staging.users");
assert!(resolution.matched_schema);
let resolution = registry.canonicalize_table_reference("orders");
assert_eq!(resolution.canonical, "public.orders");
assert!(resolution.matched_schema);
}
// Without metadata, an unknown bare name is returned unchanged and unmatched.
#[test]
fn test_canonicalize_unknown_table() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("unknown_table");
assert_eq!(resolution.canonical, "unknown_table");
assert!(!resolution.matched_schema);
}
// Registering an implied table makes it known (but not imported) and stores
// its columns with the `Implied` origin.
#[test]
fn test_register_implied_schema() {
let (mut registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let columns = vec![
ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
},
ColumnSchema {
name: "name".to_string(),
data_type: Some("text".to_string()),
is_primary_key: None,
foreign_key: None,
},
];
let issue = registry.register_implied("public.users", columns, false, "CREATE TABLE", 0);
assert!(issue.is_none());
assert!(registry.is_known("public.users"));
assert!(!registry.is_imported("public.users"));
let entry = registry.get("public.users").unwrap();
assert_eq!(entry.table.columns.len(), 2);
assert_eq!(entry.origin, SchemaOrigin::Implied);
}
// Registering columns that differ from an imported table yields a
// schema-conflict warning.
#[test]
fn test_register_implied_conflict_with_imported() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (mut registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let columns = vec![
ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
},
ColumnSchema {
name: "email".to_string(),
data_type: Some("text".to_string()),
is_primary_key: None,
foreign_key: None,
},
];
let issue = registry.register_implied("public.users", columns, false, "CREATE TABLE", 0);
assert!(issue.is_some());
let issue = issue.unwrap();
assert!(issue.message.contains("conflicts with imported schema"));
}
// An implied table is no longer known after `remove_implied`.
#[test]
fn test_remove_implied_schema() {
let (mut registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let columns = vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}];
registry.register_implied("public.temp", columns, false, "CREATE TABLE", 0);
assert!(registry.is_known("public.temp"));
registry.remove_implied("public.temp");
assert!(!registry.is_known("public.temp"));
}
// `remove_implied` leaves imported tables in place.
#[test]
fn test_remove_does_not_affect_imported() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (mut registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert!(registry.is_known("public.users"));
registry.remove_implied("public.users");
assert!(registry.is_known("public.users"));
}
// Existing columns validate cleanly; a missing column produces a warning.
#[test]
fn test_validate_column_exists() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![
ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
},
ColumnSchema {
name: "email".to_string(),
data_type: Some("text".to_string()),
is_primary_key: None,
foreign_key: None,
},
],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let issue = registry.validate_column("public.users", "id", 0);
assert!(issue.is_none());
let issue = registry.validate_column("public.users", "nonexistent", 0);
assert!(issue.is_some());
let issue = issue.unwrap();
assert!(issue.message.contains("not found in table"));
}
// Column validation compares normalized names, so case differences are ignored.
#[test]
fn test_validate_column_case_insensitive() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "UserName".to_string(),
data_type: Some("text".to_string()),
is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let issue = registry.validate_column("public.users", "username", 0);
assert!(issue.is_none());
}
// Every part of a two- or three-part name is normalized independently.
#[test]
fn test_normalize_table_name_qualified() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
assert_eq!(
registry.normalize_table_name("Schema.TableName"),
"schema.tablename"
);
assert_eq!(
registry.normalize_table_name("Catalog.Schema.Table"),
"catalog.schema.table"
);
}
// With implied schemas disabled, a registered table becomes known but no
// entry is stored for it.
#[test]
fn test_allow_implied_disabled() {
let schema = SchemaMetadata {
tables: vec![],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: false,
};
let (mut registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert!(!registry.allow_implied());
let columns = vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}];
registry.register_implied("public.users", columns, false, "CREATE TABLE", 0);
assert!(registry.is_known("public.users"));
assert!(registry.get("public.users").is_none());
}
// An empty reference resolves to an empty, unmatched canonical name.
#[test]
fn test_empty_table_reference() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("");
assert_eq!(resolution.canonical, "");
assert!(!resolution.matched_schema);
}
// A fully qualified catalog.schema.table name matches an imported table directly.
#[test]
fn test_canonicalize_three_part_name() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: Some("mydb".to_string()),
schema: Some("myschema".to_string()),
name: "mytable".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("mydb.myschema.mytable");
assert_eq!(resolution.canonical, "mydb.myschema.mytable");
assert!(resolution.matched_schema);
}
// A schema.table reference is completed with the default catalog when that
// three-part name is a known table.
#[test]
fn test_canonicalize_two_part_with_default_catalog() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: Some("defaultdb".to_string()),
schema: Some("myschema".to_string()),
name: "mytable".to_string(),
columns: vec![],
}],
default_catalog: Some("defaultdb".to_string()),
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("myschema.mytable");
assert_eq!(resolution.canonical, "defaultdb.myschema.mytable");
assert!(resolution.matched_schema);
}
// When several search-path schemas contain the table, the earliest entry wins.
#[test]
fn test_search_path_priority() {
let schema = SchemaMetadata {
tables: vec![
SchemaTable {
catalog: None,
schema: Some("schema_a".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "a_col".to_string(),
data_type: None,
is_primary_key: None,
foreign_key: None,
}],
},
SchemaTable {
catalog: None,
schema: Some("schema_b".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "b_col".to_string(),
data_type: None,
is_primary_key: None,
foreign_key: None,
}],
},
],
default_catalog: None,
default_schema: None,
search_path: Some(vec![
SchemaNamespaceHint {
catalog: None,
schema: "schema_b".to_string(),
},
SchemaNamespaceHint {
catalog: None,
schema: "schema_a".to_string(),
},
]),
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let resolution = registry.canonicalize_table_reference("users");
assert_eq!(resolution.canonical, "schema_b.users");
assert!(resolution.matched_schema);
}
// Registering with no columns marks the table known but stores no entry.
#[test]
fn test_register_implied_with_empty_columns() {
let (mut registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let issue =
registry.register_implied("public.empty_table", vec![], false, "CREATE TABLE", 0);
assert!(issue.is_none());
assert!(registry.is_known("public.empty_table"));
assert!(registry.get("public.empty_table").is_none());
}
// Column validation against a table with no stored entry reports nothing.
#[test]
fn test_validate_column_unknown_table() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let issue = registry.validate_column("unknown.table", "any_column", 0);
assert!(issue.is_none());
}
// A freshly created registry without metadata has no known tables.
#[test]
fn test_has_no_known_tables_initially() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
assert!(registry.has_no_known_tables());
}
// Importing at least one table makes `has_no_known_tables` return false.
#[test]
fn test_has_known_tables_after_import() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert!(!registry.has_no_known_tables());
}
// `all_entries` yields one entry per imported table.
#[test]
fn test_all_entries_iteration() {
let schema = SchemaMetadata {
tables: vec![
SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![],
},
SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "orders".to_string(),
columns: vec![],
},
],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let entries: Vec<_> = registry.all_entries().collect();
assert_eq!(entries.len(), 2);
}
// `is_empty` is true without metadata and false once a table is imported.
#[test]
fn test_is_empty() {
let (registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
assert!(registry.is_empty());
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: None,
name: "test".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry_with_tables, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert!(!registry_with_tables.is_empty());
}
// Registering the same column set as the imported table produces no warning.
#[test]
fn test_register_implied_identical_to_imported() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (mut registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let columns = vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}];
let issue = registry.register_implied("public.users", columns, false, "CREATE TABLE", 0);
assert!(issue.is_none());
}
// With uppercase folding, a lowercase reference matches an uppercase imported table.
#[test]
fn test_snowflake_uppercase_normalization() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("PUBLIC".to_string()),
name: "USERS".to_string(),
columns: vec![],
}],
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: Some(CaseSensitivity::Upper),
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Snowflake);
let resolution = registry.canonicalize_table_reference("public.users");
assert_eq!(resolution.canonical, "PUBLIC.USERS");
assert!(resolution.matched_schema);
}
// The primary-key flag on an imported column is preserved in the stored entry.
#[test]
fn test_column_with_primary_key() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: Some(true),
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let entry = registry.get("public.users").unwrap();
assert_eq!(entry.table.columns[0].is_primary_key, Some(true));
}
// The foreign-key reference on an imported column is preserved in the stored entry.
#[test]
fn test_column_with_foreign_key() {
use crate::types::ForeignKeyRef;
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "orders".to_string(),
columns: vec![ColumnSchema {
name: "user_id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: Some(ForeignKeyRef {
table: "public.users".to_string(),
column: "id".to_string(),
}),
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
let entry = registry.get("public.orders").unwrap();
let fk = entry.table.columns[0].foreign_key.as_ref().unwrap();
assert_eq!(fk.table, "public.users");
assert_eq!(fk.column, "id");
}
// An implied registration keeps column-level key info and the supplied
// table-level constraints on the stored entry.
#[test]
fn test_implied_schema_with_constraints() {
use crate::types::{ConstraintType, ForeignKeyRef};
let (mut registry, _) = SchemaRegistry::new(None, Dialect::Postgres);
let columns = vec![
ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: Some(true),
foreign_key: None,
},
ColumnSchema {
name: "order_id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: Some(ForeignKeyRef {
table: "orders".to_string(),
column: "id".to_string(),
}),
},
];
let constraints = vec![TableConstraintInfo {
constraint_type: ConstraintType::ForeignKey,
columns: vec!["order_id".to_string()],
referenced_table: Some("orders".to_string()),
referenced_columns: Some(vec!["id".to_string()]),
}];
registry.register_implied_with_constraints(
"public.order_items",
columns,
constraints,
false,
"CREATE TABLE",
0,
);
let entry = registry.get("public.order_items").unwrap();
assert_eq!(entry.table.columns.len(), 2);
assert_eq!(entry.table.columns[0].is_primary_key, Some(true));
assert!(entry.table.columns[1].foreign_key.is_some());
assert_eq!(entry.constraints.len(), 1);
assert_eq!(
entry.constraints[0].constraint_type,
ConstraintType::ForeignKey
);
}
// `lookup_column_type` returns the declared type for each existing column.
#[test]
fn test_lookup_column_type_found() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![
ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
},
ColumnSchema {
name: "email".to_string(),
data_type: Some("varchar".to_string()),
is_primary_key: None,
foreign_key: None,
},
ColumnSchema {
name: "created_at".to_string(),
data_type: Some("timestamp".to_string()),
is_primary_key: None,
foreign_key: None,
},
],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert_eq!(
registry.lookup_column_type("public.users", "id"),
Some("integer".to_string())
);
assert_eq!(
registry.lookup_column_type("public.users", "email"),
Some("varchar".to_string())
);
assert_eq!(
registry.lookup_column_type("public.users", "created_at"),
Some("timestamp".to_string())
);
}
// `lookup_column_type` returns `None` for a missing column or a missing table.
#[test]
fn test_lookup_column_type_not_found() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "id".to_string(),
data_type: Some("integer".to_string()),
is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert_eq!(
registry.lookup_column_type("public.users", "nonexistent"),
None
);
assert_eq!(
registry.lookup_column_type("public.unknown_table", "id"),
None
);
}
// Column type lookup compares normalized names, so any casing of the
// column name finds the same column.
#[test]
fn test_lookup_column_type_case_insensitive() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "UserName".to_string(),
data_type: Some("text".to_string()),
is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert_eq!(
registry.lookup_column_type("public.users", "username"),
Some("text".to_string())
);
assert_eq!(
registry.lookup_column_type("public.users", "USERNAME"),
Some("text".to_string())
);
}
// A column that exists but has no declared type yields `None`.
#[test]
fn test_lookup_column_type_no_type_defined() {
let schema = SchemaMetadata {
tables: vec![SchemaTable {
catalog: None,
schema: Some("public".to_string()),
name: "users".to_string(),
columns: vec![ColumnSchema {
name: "id".to_string(),
data_type: None, is_primary_key: None,
foreign_key: None,
}],
}],
default_catalog: None,
default_schema: Some("public".to_string()),
search_path: None,
case_sensitivity: None,
allow_implied: true,
};
let (registry, _) = SchemaRegistry::new(Some(&schema), Dialect::Postgres);
assert_eq!(registry.lookup_column_type("public.users", "id"), None);
}
}