use async_trait::async_trait;
use std::collections::HashMap;
use crate::connection::Connection;
use crate::error::Result;
use crate::types::{ColumnMetadata, TableMetadata};
#[async_trait]
pub trait SchemaProvider: Send + Sync {
async fn list_schemas(&self) -> Result<Vec<String>>;
async fn list_tables(&self, schema: Option<&str>) -> Result<Vec<String>>;
async fn get_table(&self, schema: Option<&str>, table: &str) -> Result<Option<TableMetadata>>;
async fn table_exists(&self, schema: Option<&str>, table: &str) -> Result<bool> {
Ok(self.get_table(schema, table).await?.is_some())
}
async fn get_primary_key(
&self,
schema: Option<&str>,
table: &str,
) -> Result<Vec<ColumnMetadata>> {
let meta = self.get_table(schema, table).await?;
Ok(meta
.map(|t| t.primary_key_columns().into_iter().cloned().collect())
.unwrap_or_default())
}
async fn get_columns(&self, schema: Option<&str>, table: &str) -> Result<Vec<ColumnMetadata>> {
let meta = self.get_table(schema, table).await?;
Ok(meta.map(|t| t.columns).unwrap_or_default())
}
async fn get_column(
&self,
schema: Option<&str>,
table: &str,
column: &str,
) -> Result<Option<ColumnMetadata>> {
let meta = self.get_table(schema, table).await?;
Ok(meta.and_then(|t| t.column(column).cloned()))
}
async fn list_indexes(&self, schema: Option<&str>, table: &str) -> Result<Vec<IndexMetadata>>;
async fn list_foreign_keys(
&self,
schema: Option<&str>,
table: &str,
) -> Result<Vec<ForeignKeyMetadata>>;
}
#[async_trait]
pub trait SchemaManager: SchemaProvider {
async fn create_table(&self, table: &TableMetadata) -> Result<()>;
async fn create_table_if_not_exists(&self, table: &TableMetadata) -> Result<bool> {
if self
.table_exists(table.schema.as_deref(), &table.name)
.await?
{
return Ok(false);
}
self.create_table(table).await?;
Ok(true)
}
async fn drop_table(&self, schema: Option<&str>, table: &str) -> Result<()>;
async fn drop_table_if_exists(&self, schema: Option<&str>, table: &str) -> Result<bool> {
if !self.table_exists(schema, table).await? {
return Ok(false);
}
self.drop_table(schema, table).await?;
Ok(true)
}
async fn add_column(
&self,
schema: Option<&str>,
table: &str,
column: &ColumnMetadata,
) -> Result<()>;
async fn drop_column(&self, schema: Option<&str>, table: &str, column: &str) -> Result<()>;
async fn alter_column(
&self,
schema: Option<&str>,
table: &str,
column: &ColumnMetadata,
) -> Result<()>;
async fn rename_table(
&self,
schema: Option<&str>,
old_name: &str,
new_name: &str,
) -> Result<()>;
async fn rename_column(
&self,
schema: Option<&str>,
table: &str,
old_name: &str,
new_name: &str,
) -> Result<()>;
async fn create_index(&self, index: &IndexMetadata) -> Result<()>;
async fn drop_index(&self, schema: Option<&str>, index_name: &str) -> Result<()>;
async fn evolve_schema(
&self,
target: &TableMetadata,
mode: SchemaEvolutionMode,
) -> Result<SchemaEvolutionResult>;
}
#[derive(Debug, Clone)]
pub struct IndexMetadata {
pub schema: Option<String>,
pub table: String,
pub name: String,
pub columns: Vec<String>,
pub unique: bool,
pub primary: bool,
pub index_type: Option<String>,
pub predicate: Option<String>,
}
impl IndexMetadata {
pub fn new(table: impl Into<String>, name: impl Into<String>, columns: Vec<String>) -> Self {
Self {
schema: None,
table: table.into(),
name: name.into(),
columns,
unique: false,
primary: false,
index_type: None,
predicate: None,
}
}
pub fn unique(mut self) -> Self {
self.unique = true;
self
}
pub fn primary(mut self) -> Self {
self.primary = true;
self.unique = true;
self
}
}
#[derive(Debug, Clone)]
pub struct ForeignKeyMetadata {
pub name: String,
pub source_schema: Option<String>,
pub source_table: String,
pub source_columns: Vec<String>,
pub target_schema: Option<String>,
pub target_table: String,
pub target_columns: Vec<String>,
pub on_delete: ForeignKeyAction,
pub on_update: ForeignKeyAction,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ForeignKeyAction {
#[default]
NoAction,
Restrict,
Cascade,
SetNull,
SetDefault,
}
impl ForeignKeyAction {
pub fn to_sql(&self) -> &'static str {
match self {
Self::NoAction => "NO ACTION",
Self::Restrict => "RESTRICT",
Self::Cascade => "CASCADE",
Self::SetNull => "SET NULL",
Self::SetDefault => "SET DEFAULT",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionMode {
None,
#[default]
AddColumnsOnly,
AddAndWiden,
Full,
}
#[derive(Debug, Clone, Default)]
pub struct SchemaEvolutionResult {
pub columns_added: Vec<String>,
pub columns_altered: Vec<String>,
pub columns_dropped: Vec<String>,
pub indexes_added: Vec<String>,
pub indexes_dropped: Vec<String>,
pub table_created: bool,
pub changed: bool,
}
impl SchemaEvolutionResult {
pub fn table_created() -> Self {
Self {
table_created: true,
changed: true,
..Default::default()
}
}
pub fn has_changes(&self) -> bool {
self.changed
|| self.table_created
|| !self.columns_added.is_empty()
|| !self.columns_altered.is_empty()
|| !self.columns_dropped.is_empty()
|| !self.indexes_added.is_empty()
|| !self.indexes_dropped.is_empty()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AutoDdlMode {
#[default]
None,
Create,
CreateAndEvolve,
}
pub trait TableNamingStrategy: Send + Sync {
fn transform(&self, schema: Option<&str>, table: &str) -> (Option<String>, String);
}
#[derive(Debug, Clone, Default)]
pub struct IdentityNaming;
impl TableNamingStrategy for IdentityNaming {
fn transform(&self, schema: Option<&str>, table: &str) -> (Option<String>, String) {
(schema.map(String::from), table.to_string())
}
}
#[derive(Debug, Clone)]
pub struct PrefixNaming {
prefix: String,
}
impl PrefixNaming {
pub fn new(prefix: impl Into<String>) -> Self {
Self {
prefix: prefix.into(),
}
}
}
impl TableNamingStrategy for PrefixNaming {
fn transform(&self, schema: Option<&str>, table: &str) -> (Option<String>, String) {
(
schema.map(String::from),
format!("{}{}", self.prefix, table),
)
}
}
#[derive(Debug, Clone)]
pub struct SuffixNaming {
suffix: String,
}
impl SuffixNaming {
pub fn new(suffix: impl Into<String>) -> Self {
Self {
suffix: suffix.into(),
}
}
}
impl TableNamingStrategy for SuffixNaming {
fn transform(&self, schema: Option<&str>, table: &str) -> (Option<String>, String) {
(
schema.map(String::from),
format!("{}{}", table, self.suffix),
)
}
}
#[derive(Debug, Clone)]
pub struct SchemaMapping {
mappings: HashMap<String, String>,
default_schema: Option<String>,
}
impl SchemaMapping {
pub fn new() -> Self {
Self {
mappings: HashMap::new(),
default_schema: None,
}
}
pub fn map(mut self, from: impl Into<String>, to: impl Into<String>) -> Self {
self.mappings.insert(from.into(), to.into());
self
}
pub fn default_schema(mut self, schema: impl Into<String>) -> Self {
self.default_schema = Some(schema.into());
self
}
}
impl Default for SchemaMapping {
fn default() -> Self {
Self::new()
}
}
impl TableNamingStrategy for SchemaMapping {
fn transform(&self, schema: Option<&str>, table: &str) -> (Option<String>, String) {
let target_schema = schema
.and_then(|s| self.mappings.get(s).cloned())
.or_else(|| self.default_schema.clone())
.or_else(|| schema.map(String::from));
(target_schema, table.to_string())
}
}
pub fn schema_provider(_conn: &dyn Connection) -> Result<Box<dyn SchemaProvider>> {
Err(crate::error::Error::Unsupported {
message: "Schema provider must be created through a backend-specific connection. \
Use the connection's schema_provider() method directly."
.into(),
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_identity_naming() {
let naming = IdentityNaming;
let (schema, table) = naming.transform(Some("public"), "users");
assert_eq!(schema, Some("public".to_string()));
assert_eq!(table, "users");
}
#[test]
fn test_prefix_naming() {
let naming = PrefixNaming::new("cdc_");
let (_, table) = naming.transform(None, "users");
assert_eq!(table, "cdc_users");
}
#[test]
fn test_suffix_naming() {
let naming = SuffixNaming::new("_replica");
let (_, table) = naming.transform(None, "orders");
assert_eq!(table, "orders_replica");
}
#[test]
fn test_schema_mapping() {
let naming = SchemaMapping::new()
.map("source_db", "target_db")
.default_schema("default_target");
let (schema, _) = naming.transform(Some("source_db"), "users");
assert_eq!(schema, Some("target_db".into()));
let (schema, _) = naming.transform(Some("unknown"), "users");
assert_eq!(schema, Some("default_target".into()));
let (schema, _) = naming.transform(None, "users");
assert_eq!(schema, Some("default_target".into()));
}
#[test]
fn test_schema_evolution_result() {
let result = SchemaEvolutionResult::table_created();
assert!(result.has_changes());
assert!(result.table_created);
let mut result = SchemaEvolutionResult::default();
assert!(!result.has_changes());
result.columns_added.push("new_col".into());
assert!(result.has_changes());
}
#[test]
fn test_foreign_key_action_sql() {
assert_eq!(ForeignKeyAction::Cascade.to_sql(), "CASCADE");
assert_eq!(ForeignKeyAction::SetNull.to_sql(), "SET NULL");
}
#[test]
fn test_index_metadata() {
let idx = IndexMetadata::new("users", "idx_users_email", vec!["email".into()]).unique();
assert_eq!(idx.table, "users");
assert_eq!(idx.name, "idx_users_email");
assert!(idx.unique);
assert!(!idx.primary);
}
}