use crate::config::types::*;
use crate::config::{validate, FullConfig};
use crate::db::parse_canonical;
use crate::db::pool::Pool;
use crate::db::Dialect;
use crate::error::AppError;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// Wraps `s` in double quotes so it can be used as a delimited SQL identifier.
///
/// Per the SQL standard (and PostgreSQL / SQLite), an embedded double quote is
/// escaped by doubling it (`"` becomes `""`). Backslashes carry no special
/// meaning inside a quoted identifier, so they are passed through unchanged;
/// backslash-escaping would both alter the identifier and let an embedded `"`
/// terminate it early.
fn quote(s: &str) -> String {
    format!("\"{}\"", s.replace('"', "\"\""))
}
/// Column name `"tenant_id"` exported for row-level-security setup.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably callers pass it as the `rls_tenant_column` argument — confirm.
pub const RLS_TENANT_COLUMN: &str = "tenant_id";
/// Creates the database objects described by `config`.
///
/// Order of work: validate the config, create schemas, create enum types,
/// create tables (plus optional audit/history tables and row-level-security
/// policies), create indexes, and finally add foreign-key constraints.
///
/// * `schema_override` - when set, every object of this package is placed in
///   this schema instead of the configured one; the schema itself is created
///   if the dialect supports schemas.
/// * `rls_tenant_column` - when set and the dialect supports RLS, each table
///   gets this column (added as `TEXT` if it is not a configured column) and
///   one policy per SQL command comparing it with
///   `current_setting('app.tenant_id', true)`.
/// * `cross_package_configs` - looked up by a relationship's `to_package_id`
///   to resolve foreign-key targets that live in another package.
///
/// # Errors
///
/// Returns an error when validation fails, when a referenced schema, table,
/// column or package cannot be found, or when one of the statements executed
/// with `?` fails (schema creation, table creation, audit/history table
/// creation, RLS column/enable/policy creation). Failures of statements
/// executed with `let _ =` (schema comments, enum types, the audit/history
/// indexes, policy drops, configured indexes, foreign keys) are discarded.
pub async fn apply_migrations(
pool: &Pool,
config: &FullConfig,
schema_override: Option<&str>,
rls_tenant_column: Option<&str>,
dialect: &dyn Dialect,
cross_package_configs: &HashMap<String, FullConfig>,
) -> Result<(), AppError> {
validate(config)?;
// The first configured schema is the fallback for objects without a schema_id.
let default_sid = config
.schemas
.first()
.map(|s| s.id.as_str())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::Validation(
"at least one schema required".into(),
))
})?;
// With an override, only the override schema is created.
if dialect.supports_schemas() {
if let Some(s) = schema_override {
let name = quote(s);
sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
.execute(pool)
.await?;
}
}
// Lookup tables keyed by config id.
let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
let tables_by_id: HashMap<_, _> = config.tables.iter().map(|t| (t.id.as_str(), t)).collect();
let columns_by_table: HashMap<_, Vec<&ColumnConfig>> =
config.columns.iter().fold(HashMap::new(), |mut m, c| {
m.entry(c.table_id.as_str()).or_default().push(c);
m
});
// Without an override, create every configured schema.
if schema_override.is_none() && dialect.supports_schemas() {
for s in &config.schemas {
let name = quote(&s.name);
let comment = s
.comment
.as_ref()
.map(|c| format!("COMMENT ON SCHEMA {} IS '{}'", name, c.replace('\'', "''")));
sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
.execute(pool)
.await?;
if let Some(sql) = comment {
// Best effort: a failing COMMENT does not abort the migration.
let _ = sqlx::query(&sql).execute(pool).await;
}
}
}
// Enum types. The schema reference is resolved even when the dialect has no
// named enum types, so a dangling schema_id is still reported.
for e in &config.enums {
let sid = e.schema_id.as_deref().unwrap_or(default_sid);
let schema = schemas_by_id.get(sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: sid.to_string(),
})
})?;
let schema_name = quote(schema_override.unwrap_or(&schema.name));
let type_name = quote(&e.name);
if dialect.supports_named_enum_types() {
let values: Vec<String> = e
.values
.iter()
.map(|v| format!("'{}'", v.replace('\'', "''")))
.collect();
let sql = format!(
"CREATE TYPE {}.{} AS ENUM ({})",
schema_name,
type_name,
values.join(", ")
);
// The statement has no IF NOT EXISTS guard; its result is discarded.
let _ = sqlx::query(&sql).execute(pool).await;
}
}
// Tables, with their audit/history companions and RLS policies.
for t in &config.tables {
let sid = t.schema_id.as_deref().unwrap_or(default_sid);
let schema = schemas_by_id.get(sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: sid.to_string(),
})
})?;
let schema_name = quote(schema_override.unwrap_or(&schema.name));
let table_name = quote(&t.name);
let full_name = format!("{}.{}", schema_name, table_name);
let cols = columns_by_table
.get(t.id.as_str())
.map(|v| v.as_slice())
.unwrap_or(&[]);
let mut col_defs: Vec<String> = Vec::new();
for c in cols {
let typ = dialect.ddl_type(&parse_canonical(&c.type_));
let mut def = format!("{} {}", quote(&c.name), typ);
if !c.nullable {
def.push_str(" NOT NULL");
}
if let Some(ref d) = c.default {
def.push_str(" DEFAULT ");
// Defaults are inserted verbatim, without quoting or escaping.
match d {
ColumnDefaultConfig::Literal(s) => def.push_str(s),
ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
}
}
col_defs.push(def);
}
// System columns are appended unless the config already defines a column
// with the same name.
let config_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
let ts_default = format!(
"{} NOT NULL DEFAULT {}",
dialect.sys_timestamp_type(),
dialect.now_fn()
);
let ts_nullable = dialect.sys_timestamp_type().to_string();
for (name, def_suffix) in [
("created_at", ts_default.as_str()),
("updated_at", ts_default.as_str()),
("archived_at", ts_nullable.as_str()),
("created_by", "TEXT"),
("updated_by", "TEXT"),
] {
if !config_col_names.contains(name) {
col_defs.push(format!("{} {}", quote(name), def_suffix));
}
}
// Table-level constraints: primary key, unique sets, named checks.
let pk_cols = match &t.primary_key {
PrimaryKeyConfig::Single(s) => vec![quote(s)],
PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect::<Vec<_>>(),
};
let pk_def = format!("PRIMARY KEY ({})", pk_cols.join(", "));
col_defs.push(pk_def);
for u in &t.unique {
let cols: Vec<String> = u.iter().map(|s| quote(s)).collect();
col_defs.push(format!("UNIQUE ({})", cols.join(", ")));
}
for ch in &t.check {
col_defs.push(format!(
"CONSTRAINT {} CHECK ({})",
quote(&ch.name),
ch.expression
));
}
let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
full_name,
col_defs.join(",\n ")
);
sqlx::query(&sql).execute(pool).await?;
// Optional "<table>_audit" companion table and its lookup index.
if t.audit_log {
let schema_raw = schema_override.unwrap_or(&schema.name);
let audit_sql = audit_table_ddl(schema_raw, &t.name, cols, dialect);
sqlx::query(&audit_sql).execute(pool).await?;
// For a composite key only the first key column is indexed.
// NOTE(review): `v[0]` panics on an empty composite key; presumably
// `validate` rules that out — confirm.
let pk_col = match &t.primary_key {
PrimaryKeyConfig::Single(s) => s.clone(),
PrimaryKeyConfig::Composite(v) => v[0].clone(),
};
let audit_full = format!(
"{}.{}",
quote(schema_raw),
quote(&format!("{}_audit", t.name))
);
let idx_sql = format!(
"CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
quote(&format!("{}_audit_record_idx", t.name)),
audit_full,
quote(&pk_col),
quote("audit_at")
);
let _ = sqlx::query(&idx_sql).execute(pool).await;
}
// Optional history table when versioning is enabled.
if t.versioning.as_ref().is_some_and(|v| v.enabled) {
let schema_raw = schema_override.unwrap_or(&schema.name);
let pk_col = match &t.primary_key {
PrimaryKeyConfig::Single(s) => s.clone(),
PrimaryKeyConfig::Composite(v) => v[0].clone(),
};
let history_ddl = history_table_ddl(schema_raw, &t.name, &pk_col, cols, dialect);
// Keep only the lines before the first "-- index:" marker line; the
// index is issued separately below.
let create_only = history_ddl
.lines()
.take_while(|l| !l.trim_start().starts_with("-- index:"))
.collect::<Vec<_>>()
.join("\n");
sqlx::query(create_only.trim()).execute(pool).await?;
let idx_sql = history_index_ddl(schema_raw, &t.name, &pk_col);
let _ = sqlx::query(&idx_sql).execute(pool).await;
}
// Row-level security keyed on the tenant column.
if let Some(col) = rls_tenant_column {
if dialect.supports_rls() {
if !config_col_names.contains(col) {
let q_col = quote(col);
let add_col = format!(
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} TEXT",
full_name, q_col
);
sqlx::query(&add_col).execute(pool).await?;
}
let enable_rls = format!("ALTER TABLE {} ENABLE ROW LEVEL SECURITY", full_name);
sqlx::query(&enable_rls).execute(pool).await?;
let q_col = quote(col);
let setting = "current_setting('app.tenant_id', true)";
let cond = format!("{} = {}", q_col, setting);
let policy_prefix = format!("rls_tenant_{}", t.name);
// (name suffix, SQL command, USING condition, WITH CHECK condition)
let policies: &[(&str, &str, Option<&str>, Option<&str>)] = &[
("select", "SELECT", Some(cond.as_str()), None),
("insert", "INSERT", None, Some(cond.as_str())),
("update", "UPDATE", Some(cond.as_str()), Some(cond.as_str())),
("delete", "DELETE", Some(cond.as_str()), None),
];
for (suffix, cmd, using_cond, with_check) in policies.iter() {
let policy_name = format!("{}_{}", policy_prefix, suffix);
// Drop and recreate each policy; the drop result is ignored.
let drop_sql = format!(
"DROP POLICY IF EXISTS {} ON {}",
quote(&policy_name),
full_name
);
let _ = sqlx::query(&drop_sql).execute(pool).await;
let create_sql = match (using_cond, with_check) {
(Some(u), Some(w)) => format!(
"CREATE POLICY {} ON {} FOR {} USING ( {} ) WITH CHECK ( {} )",
quote(&policy_name),
full_name,
cmd,
u,
w
),
(Some(u), None) => format!(
"CREATE POLICY {} ON {} FOR {} USING ( {} )",
quote(&policy_name),
full_name,
cmd,
u
),
(None, Some(w)) => format!(
"CREATE POLICY {} ON {} FOR {} WITH CHECK ( {} )",
quote(&policy_name),
full_name,
cmd,
w
),
(None, None) => continue,
};
sqlx::query(&create_sql).execute(pool).await?;
}
} else {
tracing::warn!(table = %full_name, dialect = %dialect.name(), "RLS requested but not supported by this dialect; skipping");
}
}
}
// Configured indexes; creation failures are ignored.
for idx in &config.indexes {
let sid = idx.schema_id.as_deref().unwrap_or(default_sid);
let schema = schemas_by_id.get(sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: sid.to_string(),
})
})?;
let table = tables_by_id.get(idx.table_id.as_str()).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "table",
id: idx.table_id.clone(),
})
})?;
let schema_name = quote(schema_override.unwrap_or(&schema.name));
let table_name = quote(&table.name);
let full_table = format!("{}.{}", schema_name, table_name);
let index_name = quote(&idx.name);
// Plain names are quoted; expressions are emitted verbatim.
let mut col_parts: Vec<String> = Vec::new();
for col in &idx.columns {
match col {
IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
IndexColumnEntry::Spec {
name, direction, ..
} => {
let dir = direction
.as_deref()
.map(|d| format!(" {}", d.to_uppercase()))
.unwrap_or_default();
col_parts.push(format!("{}{}", quote(name), dir));
}
IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
}
}
let method = idx.method.as_deref().unwrap_or("btree");
let unique = if idx.unique { "UNIQUE " } else { "" };
let include: String = if idx.include.is_empty() {
String::new()
} else {
let inc: Vec<String> = idx.include.iter().map(|s| quote(s)).collect();
format!(" INCLUDE ({})", inc.join(", "))
};
let where_clause: String = idx
.where_
.as_ref()
.map(|w| format!(" WHERE {}", w))
.unwrap_or_default();
let sql = format!(
"CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
unique,
index_name,
full_table,
method,
col_parts.join(", "),
include,
where_clause
);
let _ = sqlx::query(&sql).execute(pool).await;
}
// Foreign keys; the ALTER TABLE result is ignored.
for rel in &config.relationships {
let from_sid = rel.from_schema_id.as_deref().unwrap_or(default_sid);
let from_schema = schemas_by_id.get(from_sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: from_sid.to_string(),
})
})?;
let from_table = tables_by_id
.get(rel.from_table_id.as_str())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "table",
id: rel.from_table_id.clone(),
})
})?;
// Resolve the referenced (schema, table, column) either from another
// package's config or from this one.
let (to_schema_name_owned, to_table_name, to_col_name) = if let Some(pkg_id) =
rel.to_package_id.as_deref()
{
let foreign = cross_package_configs.get(pkg_id).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "cross_package",
id: pkg_id.to_string(),
})
})?;
let foreign_tables: HashMap<_, _> =
foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
let foreign_schemas: HashMap<_, _> =
foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
let to_tbl = foreign_tables
.get(rel.to_table_id.as_str())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "table",
id: rel.to_table_id.clone(),
})
})?;
let foreign_default_sid = foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
let to_sid = rel.to_schema_id.as_deref().unwrap_or(foreign_default_sid);
let to_schema = foreign_schemas.get(to_sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: to_sid.to_string(),
})
})?;
let col_name = foreign
.columns
.iter()
.find(|c| c.id == rel.to_column_id)
.map(|c| c.name.clone())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "column",
id: rel.to_column_id.clone(),
})
})?;
// schema_override is not applied to the foreign package's schema.
(to_schema.name.clone(), to_tbl.name.clone(), col_name)
} else {
let to_sid = rel.to_schema_id.as_deref().unwrap_or(default_sid);
let to_schema = schemas_by_id.get(to_sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: to_sid.to_string(),
})
})?;
let to_table = tables_by_id.get(rel.to_table_id.as_str()).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "table",
id: rel.to_table_id.clone(),
})
})?;
let col_name = config
.columns
.iter()
.find(|c| c.id == rel.to_column_id)
.map(|c| c.name.clone())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "column",
id: rel.to_column_id.clone(),
})
})?;
(
schema_override.unwrap_or(&to_schema.name).to_string(),
to_table.name.clone(),
col_name,
)
};
let from_schema_name = schema_override.unwrap_or(&from_schema.name);
let from_col = config
.columns
.iter()
.find(|c| c.id == rel.from_column_id)
.map(|c| c.name.as_str())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "column",
id: rel.from_column_id.clone(),
})
})?;
let from_full = format!("{}.{}", quote(from_schema_name), quote(&from_table.name));
let to_full = format!("{}.{}", quote(&to_schema_name_owned), quote(&to_table_name));
// The constraint name falls back to the relationship id.
let constraint_name = rel.name.as_deref().unwrap_or(&rel.id);
let on_update = rel.on_update.as_deref().unwrap_or("NO ACTION");
let on_delete = rel.on_delete.as_deref().unwrap_or("NO ACTION");
let sql = format!(
"ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
from_full,
quote(constraint_name),
quote(from_col),
to_full,
quote(&to_col_name),
on_update,
on_delete
);
let _ = sqlx::query(&sql).execute(pool).await;
}
Ok(())
}
pub async fn revert_migrations(
pool: &Pool,
config: &FullConfig,
schema_override: Option<&str>,
) -> Result<(), AppError> {
let default_sid = config
.schemas
.first()
.map(|s| s.id.as_str())
.ok_or_else(|| {
AppError::Config(crate::error::ConfigError::Validation(
"at least one schema required".into(),
))
})?;
let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
for t in &config.tables {
let sid = t.schema_id.as_deref().unwrap_or(default_sid);
let schema = schemas_by_id.get(sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: sid.to_string(),
})
})?;
let schema_raw = schema_override.unwrap_or(&schema.name);
let schema_name = quote(schema_raw);
let table_name = quote(&t.name);
let full_name = format!("{}.{}", schema_name, table_name);
if t.audit_log {
let audit_full = format!("{}.{}", schema_name, quote(&format!("{}_audit", t.name)));
let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", audit_full))
.execute(pool)
.await;
}
if t.versioning.as_ref().is_some_and(|v| v.enabled) {
let history_full = format!("{}.{}", schema_name, quote(&format!("{}_history", t.name)));
let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", history_full))
.execute(pool)
.await;
}
let drop_sql = format!("DROP TABLE IF EXISTS {} CASCADE", full_name);
let _ = sqlx::query(&drop_sql).execute(pool).await;
}
for e in &config.enums {
let sid = e.schema_id.as_deref().unwrap_or(default_sid);
let schema = schemas_by_id.get(sid).ok_or_else(|| {
AppError::Config(crate::error::ConfigError::MissingReference {
kind: "schema",
id: sid.to_string(),
})
})?;
let schema_name = quote(schema_override.unwrap_or(&schema.name));
let type_name = quote(&e.name);
let drop_sql = format!("DROP TYPE IF EXISTS {}.{} CASCADE", schema_name, type_name);
let _ = sqlx::query(&drop_sql).execute(pool).await;
}
if schema_override.is_none() {
for s in &config.schemas {
if s.name.eq_ignore_ascii_case("public") {
continue;
}
let schema_name = quote(&s.name);
let drop_sql = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
let _ = sqlx::query(&drop_sql).execute(pool).await;
}
}
Ok(())
}
/// Kind of change a single [`MigrationStep`] represents.
///
/// Serialized with serde in `snake_case` (e.g. `CreateSchema` becomes
/// `"create_schema"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationOperation {
CreateSchema,
CreateEnum,
DropEnum,
AddEnumValue,
// Used by the enum-rebuild plan as the leading, DDL-less warning step.
RemoveEnumValue,
CreateTable,
DropTable,
AddColumn,
DropColumn,
RenameColumn,
AlterColumnType,
BackfillNulls,
SetNotNull,
DropNotNull,
SetDefault,
DropDefault,
CreateIndex,
DropIndex,
AddForeignKey,
DropForeignKey,
}
/// Renders the operation using its serde name (the `snake_case` string the
/// enum serializes to); falls back to the `Debug` form if serialization does
/// not produce a JSON string.
impl std::fmt::Display for MigrationOperation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = match serde_json::to_value(self) {
            Ok(serde_json::Value::String(name)) => name,
            _ => format!("{:?}", self),
        };
        write!(f, "{}", rendered)
    }
}
/// Safety classification attached to every [`MigrationStep`]; serialized in
/// `snake_case`.
///
/// In the plan builders visible in this file, `WarnOnly` steps are emitted
/// with `ddl: None` (nothing to execute, the step only informs the operator).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationSafety {
Safe,
BestEffort,
WarnOnly,
}
/// Risk category attached to a [`MigrationStep`]; serialized in `snake_case`.
///
/// A free-text explanation, when present, lives in `MigrationStep::risk_detail`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationRisk {
None,
MayFail,
ExistingNullsMustBeAbsent,
DataWillBeModified,
ManualActionRequired,
}
/// One entry of a [`MigrationPlan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStep {
// Builders visible in this file always set 0 here; presumably the plan is
// renumbered afterwards — TODO confirm.
pub step: usize,
pub operation: MigrationOperation,
// Unquoted schema name the step applies to.
pub schema: String,
// Unquoted table name, or None for schema/enum level steps.
pub table: Option<String>,
// Name of the affected object (e.g. enum, table, column or index name).
pub object: String,
// Free-form kind tag; values seen here: "schema", "enum", "enum_value",
// "table", "column", "index".
pub object_type: String,
// Human-readable summary of the step.
pub description: String,
// SQL to run, or None when the step is informational only.
pub ddl: Option<String>,
pub safety: MigrationSafety,
pub risk: MigrationRisk,
// Optional free-text explanation of `risk`.
pub risk_detail: Option<String>,
}
/// Ordered list of migration steps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPlan {
pub steps: Vec<MigrationStep>,
}
/// Step counts of a [`MigrationPlan`], broken down by [`MigrationSafety`].
#[derive(Debug, Clone, Serialize)]
pub struct MigrationSummary {
// Total number of steps in the plan.
pub total: usize,
// Number of steps with safety `Safe`.
pub safe: usize,
// Number of steps with safety `BestEffort`.
pub best_effort: usize,
// Number of steps with safety `WarnOnly`.
pub warn_only: usize,
}
impl MigrationPlan {
    /// Counts the plan's steps overall and per [`MigrationSafety`] level.
    pub fn summary(&self) -> MigrationSummary {
        // Slots: 0 = safe, 1 = best effort, 2 = warn only.
        let [safe, best_effort, warn_only] =
            self.steps.iter().fold([0usize; 3], |mut tally, entry| {
                let slot = match entry.safety {
                    MigrationSafety::Safe => 0,
                    MigrationSafety::BestEffort => 1,
                    MigrationSafety::WarnOnly => 2,
                };
                tally[slot] += 1;
                tally
            });
        MigrationSummary {
            total: self.steps.len(),
            safe,
            best_effort,
            warn_only,
        }
    }
}
/// Outcome counters of executing a migration plan.
///
/// NOTE(review): the code that fills this struct is outside the visible part
/// of the file; field meanings below are read from the names — confirm.
pub struct MigrationExecutionResult {
// presumably the number of steps applied
pub applied: usize,
// presumably the number of steps that produced a warning
pub warned: usize,
// presumably the warning messages collected during execution
pub warnings: Vec<String>,
}
/// Returns the SQL text of a column default, whether it was configured as a
/// literal or as an expression. The text is copied as-is.
fn default_str(d: &ColumnDefaultConfig) -> String {
    match d {
        ColumnDefaultConfig::Literal(text)
        | ColumnDefaultConfig::Expression { expression: text } => text.clone(),
    }
}
/// A table column whose type is a given enum (or an array of it), as
/// collected by `enum_dependent_columns`.
struct EnumColumnRef {
// Unquoted schema name of the owning table (the override when one is set).
schema: String,
// Unquoted table name.
table: String,
// Unquoted column name.
column: String,
// Configured default as SQL text, if any.
default: Option<String>,
// True when the column type is an array of the enum.
is_array: bool,
}
/// Extracts the unqualified custom type name from a canonical type.
///
/// Returns `Some((name, false))` for a custom type and `Some((name, true))`
/// for an array whose element is a custom type; anything else yields `None`.
/// A schema-qualified name such as `a.b` is reduced to the part after the
/// last `.`.
fn enum_type_name(t: &crate::db::CanonicalType) -> Option<(String, bool)> {
    use crate::db::CanonicalType;

    // Keeps only the segment after the final '.', or the whole name.
    fn bare_name(qualified: &str) -> String {
        match qualified.rsplit_once('.') {
            Some((_, tail)) => tail.to_string(),
            None => qualified.to_string(),
        }
    }

    if let CanonicalType::Custom(name) = t {
        return Some((bare_name(name), false));
    }
    if let CanonicalType::Array(element) = t {
        if let CanonicalType::Custom(name) = element.as_ref() {
            return Some((bare_name(name), true));
        }
    }
    None
}
/// Lists every column in `new` whose type is `new_enum` (or an array of it).
///
/// Columns are matched by unqualified type name. Columns whose table id is
/// not present in `new_tables` are skipped. The reported schema is
/// `schema_override` when given, otherwise the name of the table's schema
/// (falling back to the first schema's id, and to the raw schema id when the
/// id is not found in `new_schemas`).
fn enum_dependent_columns(
    new_enum: &EnumConfig,
    new: &FullConfig,
    new_tables: &HashMap<&str, &TableConfig>,
    new_schemas: &HashMap<&str, &SchemaConfig>,
    schema_override: Option<&str>,
) -> Vec<EnumColumnRef> {
    let fallback_sid = new.schemas.first().map_or("", |s| s.id.as_str());

    new.columns
        .iter()
        .filter_map(|col| {
            let (type_name, is_array) = enum_type_name(&parse_canonical(&col.type_))?;
            if type_name != new_enum.name {
                return None;
            }
            let owner = new_tables.get(col.table_id.as_str())?;
            let sid = owner.schema_id.as_deref().unwrap_or(fallback_sid);
            let schema = match schema_override {
                Some(forced) => forced.to_string(),
                None => match new_schemas.get(sid) {
                    Some(cfg) => cfg.name.clone(),
                    None => sid.to_string(),
                },
            };
            Some(EnumColumnRef {
                schema,
                table: owner.name.clone(),
                column: col.name.clone(),
                default: col.default.as_ref().map(default_str),
                is_array,
            })
        })
        .collect()
}
fn recreate_enum_steps(
steps: &mut Vec<MigrationStep>,
schema: &str,
new_enum: &EnumConfig,
removed: &[&str],
dependents: &[EnumColumnRef],
) {
let type_q = format!("{}.{}", quote(schema), quote(&new_enum.name));
let tmp_name = format!("{}__arch_old", new_enum.name);
let values: Vec<String> = new_enum
.values
.iter()
.map(|v| format!("'{}'", v.replace('\'', "''")))
.collect();
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::RemoveEnumValue,
schema: schema.to_string(),
table: None,
object: format!("{}:{}", new_enum.name, removed.join(",")),
object_type: "enum".into(),
description: format!(
"Rebuild enum \"{}\".\"{}\" to remove value(s): {}",
schema,
new_enum.name,
removed.join(", ")
),
ddl: None,
safety: MigrationSafety::WarnOnly,
risk: MigrationRisk::ManualActionRequired,
risk_detail: Some(format!(
"PostgreSQL cannot drop enum values in place. The type is rebuilt and {} dependent \
column(s) are recast via a text cast. Any existing row holding a removed value ({}) \
will make its recast fail — reassign those rows first.",
dependents.len(),
removed.join(", ")
)),
});
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropEnum,
schema: schema.to_string(),
table: None,
object: new_enum.name.clone(),
object_type: "enum".into(),
description: format!(
"Rename enum \"{}\".\"{}\" to \"{}\" before rebuild",
schema, new_enum.name, tmp_name
),
ddl: Some(format!(
"ALTER TYPE {} RENAME TO {}",
type_q,
quote(&tmp_name)
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: None,
});
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateEnum,
schema: schema.to_string(),
table: None,
object: new_enum.name.clone(),
object_type: "enum".into(),
description: format!(
"Recreate enum \"{}\".\"{}\" with {} value(s)",
schema,
new_enum.name,
new_enum.values.len()
),
ddl: Some(format!(
"CREATE TYPE {} AS ENUM ({})",
type_q,
values.join(", ")
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: None,
});
for dep in dependents {
let table_q = format!("{}.{}", quote(&dep.schema), quote(&dep.table));
let col_q = quote(&dep.column);
if dep.default.is_some() {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropDefault,
schema: dep.schema.clone(),
table: Some(dep.table.clone()),
object: dep.column.clone(),
object_type: "column".into(),
description: format!(
"Drop default on {}.{} before enum recast",
dep.table, dep.column
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
table_q, col_q
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: None,
});
}
let (col_type, using) = if dep.is_array {
(
format!("{}[]", type_q),
format!("{}::text[]::{}[]", col_q, type_q),
)
} else {
(type_q.clone(), format!("{}::text::{}", col_q, type_q))
};
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AlterColumnType,
schema: dep.schema.clone(),
table: Some(dep.table.clone()),
object: dep.column.clone(),
object_type: "column".into(),
description: format!(
"Recast {}.{} onto rebuilt enum \"{}\"",
dep.table, dep.column, new_enum.name
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}",
table_q, col_q, col_type, using
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::MayFail,
risk_detail: Some(format!(
"Cast fails if any row holds a removed value ({}).",
removed.join(", ")
)),
});
if let Some(def) = &dep.default {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::SetDefault,
schema: dep.schema.clone(),
table: Some(dep.table.clone()),
object: dep.column.clone(),
object_type: "column".into(),
description: format!(
"Restore default on {}.{} after enum recast",
dep.table, dep.column
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
table_q, col_q, def
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropEnum,
schema: schema.to_string(),
table: None,
object: tmp_name.clone(),
object_type: "enum".into(),
description: format!("Drop superseded enum \"{}\".\"{}\"", schema, tmp_name),
ddl: Some(format!(
"DROP TYPE IF EXISTS {}.{}",
quote(schema),
quote(&tmp_name)
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: None,
});
}
pub fn compute_migration_plan(
old: &FullConfig,
new: &FullConfig,
schema_override: Option<&str>,
_rls_tenant_column: Option<&str>,
dialect: &dyn Dialect,
cross_package_configs: &HashMap<String, FullConfig>,
) -> Result<MigrationPlan, AppError> {
validate(new)?;
let default_old_sid = old.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
let default_new_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
let old_schemas: HashMap<&str, &SchemaConfig> =
old.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
let new_schemas: HashMap<&str, &SchemaConfig> =
new.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
let old_tables: HashMap<&str, &TableConfig> =
old.tables.iter().map(|t| (t.id.as_str(), t)).collect();
let new_tables: HashMap<&str, &TableConfig> =
new.tables.iter().map(|t| (t.id.as_str(), t)).collect();
let old_columns: HashMap<&str, &ColumnConfig> =
old.columns.iter().map(|c| (c.id.as_str(), c)).collect();
let old_enums: HashMap<&str, &EnumConfig> =
old.enums.iter().map(|e| (e.id.as_str(), e)).collect();
let new_enums: HashMap<&str, &EnumConfig> =
new.enums.iter().map(|e| (e.id.as_str(), e)).collect();
let old_indexes: HashMap<&str, &IndexConfig> =
old.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
let new_indexes: HashMap<&str, &IndexConfig> =
new.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
let old_rels: HashMap<&str, &RelationshipConfig> = old
.relationships
.iter()
.map(|r| (r.id.as_str(), r))
.collect();
let new_rels: HashMap<&str, &RelationshipConfig> = new
.relationships
.iter()
.map(|r| (r.id.as_str(), r))
.collect();
let mut steps: Vec<MigrationStep> = Vec::new();
let schema_name_for = |sid: &str, schemas: &HashMap<&str, &SchemaConfig>| -> String {
schema_override.map(String::from).unwrap_or_else(|| {
schemas
.get(sid)
.map(|s| s.name.clone())
.unwrap_or_else(|| sid.to_string())
})
};
if schema_override.is_none() {
for s in &new.schemas {
if !old_schemas.contains_key(s.id.as_str()) {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateSchema,
schema: s.name.clone(),
table: None,
object: s.name.clone(),
object_type: "schema".into(),
description: format!("Create schema \"{}\"", s.name),
ddl: Some(format!("CREATE SCHEMA IF NOT EXISTS {}", quote(&s.name))),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
}
for new_enum in &new.enums {
let sid = new_enum.schema_id.as_deref().unwrap_or(default_new_sid);
let schema = schema_name_for(sid, &new_schemas);
if let Some(old_enum) = old_enums.get(new_enum.id.as_str()) {
let old_vals: HashSet<&str> = old_enum.values.iter().map(String::as_str).collect();
let new_vals: HashSet<&str> = new_enum.values.iter().map(String::as_str).collect();
let removed: Vec<&str> = old_enum
.values
.iter()
.map(String::as_str)
.filter(|v| !new_vals.contains(v))
.collect();
if removed.is_empty() {
for val in new_enum
.values
.iter()
.map(String::as_str)
.filter(|v| !old_vals.contains(v))
{
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AddEnumValue,
schema: schema.clone(),
table: None,
object: format!("{}:{}", new_enum.name, val),
object_type: "enum_value".into(),
description: format!(
"Add value '{}' to enum \"{}\".\"{}\"",
val, schema, new_enum.name
),
ddl: Some(format!(
"ALTER TYPE {}.{} ADD VALUE IF NOT EXISTS '{}'",
quote(&schema),
quote(&new_enum.name),
val.replace('\'', "''")
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
} else {
let dependents = enum_dependent_columns(
new_enum,
new,
&new_tables,
&new_schemas,
schema_override,
);
recreate_enum_steps(&mut steps, &schema, new_enum, &removed, &dependents);
}
} else {
let values: Vec<String> = new_enum
.values
.iter()
.map(|v| format!("'{}'", v.replace('\'', "''")))
.collect();
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateEnum,
schema: schema.clone(),
table: None,
object: new_enum.name.clone(),
object_type: "enum".into(),
description: format!("Create enum type \"{}\".\"{}\"", schema, new_enum.name),
ddl: Some(format!("CREATE TYPE {}.{} AS ENUM ({})", quote(&schema), quote(&new_enum.name), values.join(", "))),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: Some("PostgreSQL has no CREATE TYPE IF NOT EXISTS; ignored if the type already exists.".into()),
});
}
}
for old_enum in &old.enums {
if !new_enums.contains_key(old_enum.id.as_str()) {
let sid = old_enum.schema_id.as_deref().unwrap_or(default_old_sid);
let schema = schema_name_for(sid, &old_schemas);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropEnum,
schema: schema.clone(),
table: None,
object: old_enum.name.clone(),
object_type: "enum".into(),
description: format!("Enum \"{}\".\"{}\" removed from config", schema, old_enum.name),
ddl: None,
safety: MigrationSafety::WarnOnly,
risk: MigrationRisk::ManualActionRequired,
risk_detail: Some("Enum type NOT dropped from database (data safety). Run DROP TYPE manually if intended.".into()),
});
}
}
let added_table_ids: HashSet<&str> = new
.tables
.iter()
.filter(|t| !old_tables.contains_key(t.id.as_str()))
.map(|t| t.id.as_str())
.collect();
let cols_by_table: HashMap<&str, Vec<&ColumnConfig>> =
new.columns.iter().fold(HashMap::new(), |mut m, c| {
m.entry(c.table_id.as_str()).or_default().push(c);
m
});
for new_table in &new.tables {
if !added_table_ids.contains(new_table.id.as_str()) {
continue;
}
let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
let schema = schema_name_for(sid, &new_schemas);
let full = format!("{}.{}", quote(&schema), quote(&new_table.name));
let cols = cols_by_table
.get(new_table.id.as_str())
.map(|v| v.as_slice())
.unwrap_or(&[]);
let mut col_defs: Vec<String> = Vec::new();
for c in cols {
let typ = dialect.ddl_type(&parse_canonical(&c.type_));
let mut def = format!("{} {}", quote(&c.name), typ);
if !c.nullable {
def.push_str(" NOT NULL");
}
if let Some(ref d) = c.default {
def.push_str(" DEFAULT ");
match d {
ColumnDefaultConfig::Literal(s) => def.push_str(s),
ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
}
}
col_defs.push(def);
}
let cfg_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
for (name, suf) in [
("created_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
("updated_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
("archived_at", "TIMESTAMPTZ"),
("created_by", "TEXT"),
("updated_by", "TEXT"),
] {
if !cfg_col_names.contains(name) {
col_defs.push(format!("{} {}", quote(name), suf));
}
}
let pk_cols = match &new_table.primary_key {
PrimaryKeyConfig::Single(s) => vec![quote(s)],
PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect(),
};
col_defs.push(format!("PRIMARY KEY ({})", pk_cols.join(", ")));
for u in &new_table.unique {
col_defs.push(format!(
"UNIQUE ({})",
u.iter().map(|s| quote(s)).collect::<Vec<_>>().join(", ")
));
}
for ch in &new_table.check {
col_defs.push(format!(
"CONSTRAINT {} CHECK ({})",
quote(&ch.name),
ch.expression
));
}
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateTable,
schema: schema.clone(),
table: Some(new_table.name.clone()),
object: new_table.name.clone(),
object_type: "table".into(),
description: format!("Create table \"{}\".\"{}\"", schema, new_table.name),
ddl: Some(format!(
"CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
full,
col_defs.join(",\n ")
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
if new_table.audit_log {
let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateTable,
schema: schema.clone(),
table: Some(format!("{}_audit", new_table.name)),
object: format!("{}_audit", new_table.name),
object_type: "table".into(),
description: format!(
"Create audit table \"{}\".\"{}_audit\"",
schema, new_table.name
),
ddl: Some(audit_ddl),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
if new_table.versioning.as_ref().is_some_and(|v| v.enabled) {
let pk_col = match &new_table.primary_key {
PrimaryKeyConfig::Single(s) => s.clone(),
PrimaryKeyConfig::Composite(v) => v[0].clone(),
};
let history_create = format!(
"CREATE TABLE IF NOT EXISTS {}.{} (\n {}\n)",
quote(&schema),
quote(&format!("{}_history", new_table.name)),
{
let full_ddl =
history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
full_ddl
.lines()
.skip(1) .take_while(|l| !l.trim_start().starts_with("-- index:"))
.collect::<Vec<_>>()
.join("\n")
.trim_end_matches(['\n', ',', ')'])
.to_string()
+ "\n)"
}
);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateTable,
schema: schema.clone(),
table: Some(format!("{}_history", new_table.name)),
object: format!("{}_history", new_table.name),
object_type: "table".into(),
description: format!(
"Create history table \"{}\".\"{}_history\" (versioning)",
schema, new_table.name
),
ddl: Some(history_create),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateIndex,
schema: schema.clone(),
table: Some(format!("{}_history", new_table.name)),
object: format!("{}_history_{}_idx", new_table.name, pk_col),
object_type: "index".into(),
description: format!(
"Create index on history table \"{}\".\"{}\" ({pk_col}, _version DESC)",
schema, new_table.name
),
ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
for new_table in &new.tables {
if added_table_ids.contains(new_table.id.as_str()) {
continue;
}
if let Some(old_table) = old_tables.get(new_table.id.as_str()) {
let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
let schema = schema_name_for(sid, &new_schemas);
let cols = cols_by_table
.get(new_table.id.as_str())
.map(|v| v.as_slice())
.unwrap_or(&[]);
if !old_table.audit_log && new_table.audit_log {
let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateTable,
schema: schema.clone(),
table: Some(format!("{}_audit", new_table.name)),
object: format!("{}_audit", new_table.name),
object_type: "table".into(),
description: format!(
"Enable audit log: create \"{}\".\"{}_audit\"",
schema, new_table.name
),
ddl: Some(audit_ddl),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
let old_versioning_enabled = old_table.versioning.as_ref().is_some_and(|v| v.enabled);
let new_versioning_enabled = new_table.versioning.as_ref().is_some_and(|v| v.enabled);
if !old_versioning_enabled && new_versioning_enabled {
let pk_col = match &new_table.primary_key {
PrimaryKeyConfig::Single(s) => s.clone(),
PrimaryKeyConfig::Composite(v) => v[0].clone(),
};
let history_ddl =
history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
let create_only = history_ddl
.lines()
.take_while(|l| !l.trim_start().starts_with("-- index:"))
.collect::<Vec<_>>()
.join("\n");
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateTable,
schema: schema.clone(),
table: Some(format!("{}_history", new_table.name)),
object: format!("{}_history", new_table.name),
object_type: "table".into(),
description: format!(
"Enable versioning: create \"{}\".\"{}_history\"",
schema, new_table.name
),
ddl: Some(create_only.trim().to_string()),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateIndex,
schema: schema.clone(),
table: Some(format!("{}_history", new_table.name)),
object: format!("{}_history_{}_idx", new_table.name, pk_col),
object_type: "index".into(),
description: format!(
"Create history index on \"{}\".\"{}\"",
schema, new_table.name
),
ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
}
for old_table in &old.tables {
if !new_tables.contains_key(old_table.id.as_str()) {
let sid = old_table.schema_id.as_deref().unwrap_or(default_old_sid);
let schema = schema_name_for(sid, &old_schemas);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropTable,
schema: schema.clone(),
table: Some(old_table.name.clone()),
object: old_table.name.clone(),
object_type: "table".into(),
description: format!("Table \"{}\".\"{}\" removed from config", schema, old_table.name),
ddl: None,
safety: MigrationSafety::WarnOnly,
risk: MigrationRisk::ManualActionRequired,
risk_detail: Some("Table NOT dropped from database (data safety). Run DROP TABLE manually if intended.".into()),
});
}
}
for new_col in &new.columns {
if added_table_ids.contains(new_col.table_id.as_str()) {
continue;
}
let table = match new_tables.get(new_col.table_id.as_str()) {
Some(t) => t,
None => continue,
};
let sid = table.schema_id.as_deref().unwrap_or(default_new_sid);
let schema = schema_name_for(sid, &new_schemas);
let full = format!("{}.{}", quote(&schema), quote(&table.name));
if let Some(old_col) = old_columns.get(new_col.id.as_str()) {
if old_col.table_id != new_col.table_id {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AddColumn,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!("Column \"{}\" (id: {}) appears to have moved tables — manual migration required", new_col.name, new_col.id),
ddl: None,
safety: MigrationSafety::WarnOnly,
risk: MigrationRisk::ManualActionRequired,
risk_detail: Some(format!("Cannot automate column move from table {} to {}.", old_col.table_id, new_col.table_id)),
});
continue;
}
if old_col.name != new_col.name {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::RenameColumn,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!(
"Rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
old_col.name, new_col.name, schema, table.name
),
ddl: Some(format!(
"ALTER TABLE {} RENAME COLUMN {} TO {}",
full,
quote(&old_col.name),
quote(&new_col.name)
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
let old_type = dialect.ddl_type(&parse_canonical(&old_col.type_));
let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
if old_type.to_uppercase() != new_type.to_uppercase() {
let col_name = &new_col.name;
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AlterColumnType,
schema: schema.clone(),
table: Some(table.name.clone()),
object: col_name.clone(),
object_type: "column".into(),
description: format!("Change type of \"{}\".\"{}\".\"{}\": {} → {}", schema, table.name, col_name, old_type, new_type),
ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}", full, quote(col_name), new_type, quote(col_name), new_type)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::MayFail,
risk_detail: Some(format!("USING {}::{} cast may fail for incompatible values. Provide a custom USING expression if needed.", col_name, new_type)),
});
}
if old_col.nullable && !new_col.nullable {
if let Some(ref d) = new_col.default {
let default_val = default_str(d);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::BackfillNulls,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!("Backfill NULLs in \"{}\".\"{}\".\"{}\": SET {} = {} WHERE {} IS NULL", schema, table.name, new_col.name, new_col.name, default_val, new_col.name),
ddl: Some(format!("UPDATE {} SET {} = {} WHERE {} IS NULL", full, quote(&new_col.name), default_val, quote(&new_col.name))),
safety: MigrationSafety::Safe,
risk: MigrationRisk::DataWillBeModified,
risk_detail: Some(format!("Existing NULLs in column \"{}\" will be set to {} before NOT NULL is enforced.", new_col.name, default_val)),
});
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::SetNotNull,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": NULLs pre-filled with default ({})", schema, table.name, new_col.name, default_val),
ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
} else {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::SetNotNull,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": no default configured — will fail if NULLs exist", schema, table.name, new_col.name),
ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::ExistingNullsMustBeAbsent,
risk_detail: Some(format!(
"No default value configured for column \"{}\". Add a default to the config to enable automatic NULL backfill before enforcing NOT NULL.",
new_col.name
)),
});
}
}
if !old_col.nullable && new_col.nullable {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropNotNull,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!(
"Drop NOT NULL on \"{}\".\"{}\".\"{}\": column becomes nullable",
schema, table.name, new_col.name
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
full,
quote(&new_col.name)
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
let old_def = old_col.default.as_ref().map(default_str);
let new_def = new_col.default.as_ref().map(default_str);
if old_def != new_def {
match &new_col.default {
Some(d) => {
let val = default_str(d);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::SetDefault,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!(
"Set DEFAULT {} on \"{}\".\"{}\".\"{}\": was {}",
val,
schema,
table.name,
new_col.name,
old_def.as_deref().unwrap_or("none")
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
full,
quote(&new_col.name),
val
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
None => {
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropDefault,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!(
"Drop DEFAULT on \"{}\".\"{}\".\"{}\": was {}",
schema,
table.name,
new_col.name,
old_def.as_deref().unwrap_or("none")
),
ddl: Some(format!(
"ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
full,
quote(&new_col.name)
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
}
} else {
let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
let mut col_def = format!("{} {}", quote(&new_col.name), new_type);
if !new_col.nullable {
col_def.push_str(" NOT NULL");
}
if let Some(ref d) = new_col.default {
col_def.push_str(" DEFAULT ");
match d {
ColumnDefaultConfig::Literal(s) => col_def.push_str(s),
ColumnDefaultConfig::Expression { expression } => col_def.push_str(expression),
}
}
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AddColumn,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_col.name.clone(),
object_type: "column".into(),
description: format!(
"Add column \"{}\" {} to \"{}\".\"{}\"",
new_col.name, new_type, schema, table.name
),
ddl: Some(format!("ALTER TABLE {} ADD COLUMN {}", full, col_def)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
for old_col in &old.columns {
if new.columns.iter().any(|c| c.id == old_col.id) {
continue;
}
if !new_tables.contains_key(old_col.table_id.as_str()) {
continue;
}
let table_name = old_tables
.get(old_col.table_id.as_str())
.map(|t| t.name.as_str())
.unwrap_or(&old_col.table_id);
let sid = old_tables
.get(old_col.table_id.as_str())
.and_then(|t| t.schema_id.as_deref())
.unwrap_or(default_old_sid);
let schema = schema_name_for(sid, &old_schemas);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropColumn,
schema: schema.clone(),
table: Some(table_name.to_string()),
object: old_col.name.clone(),
object_type: "column".into(),
description: format!("Column \"{}\" removed from config on \"{}\".\"{}\"", old_col.name, schema, table_name),
ddl: None,
safety: MigrationSafety::WarnOnly,
risk: MigrationRisk::ManualActionRequired,
risk_detail: Some("Column NOT dropped from database (data safety). Run ALTER TABLE DROP COLUMN manually if intended.".into()),
});
}
for old_idx in &old.indexes {
if !new_indexes.contains_key(old_idx.id.as_str()) {
let sid = old_idx.schema_id.as_deref().unwrap_or(default_old_sid);
let schema = schema_name_for(sid, &old_schemas);
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropIndex,
schema: schema.clone(),
table: old_tables
.get(old_idx.table_id.as_str())
.map(|t| t.name.clone()),
object: old_idx.name.clone(),
object_type: "index".into(),
description: format!("Drop index \"{}\" in schema \"{}\"", old_idx.name, schema),
ddl: Some(format!(
"DROP INDEX IF EXISTS {}.{}",
quote(&schema),
quote(&old_idx.name)
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
for new_idx in &new.indexes {
if old_indexes.contains_key(new_idx.id.as_str())
|| added_table_ids.contains(new_idx.table_id.as_str())
{
continue;
}
let sid = new_idx.schema_id.as_deref().unwrap_or(default_new_sid);
let schema = match new_schemas.get(sid) {
Some(s) => schema_override.unwrap_or(&s.name).to_string(),
None => continue,
};
let table = match new_tables.get(new_idx.table_id.as_str()) {
Some(t) => t,
None => continue,
};
let full_table = format!("{}.{}", quote(&schema), quote(&table.name));
let mut col_parts: Vec<String> = Vec::new();
for col in &new_idx.columns {
match col {
IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
IndexColumnEntry::Spec {
name, direction, ..
} => {
let dir = direction
.as_deref()
.map(|d| format!(" {}", d.to_uppercase()))
.unwrap_or_default();
col_parts.push(format!("{}{}", quote(name), dir));
}
IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
}
}
let method = new_idx.method.as_deref().unwrap_or("btree");
let unique_kw = if new_idx.unique { "UNIQUE " } else { "" };
let include = if new_idx.include.is_empty() {
String::new()
} else {
format!(
" INCLUDE ({})",
new_idx
.include
.iter()
.map(|s| quote(s))
.collect::<Vec<_>>()
.join(", ")
)
};
let where_clause = new_idx
.where_
.as_ref()
.map(|w| format!(" WHERE {}", w))
.unwrap_or_default();
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::CreateIndex,
schema: schema.clone(),
table: Some(table.name.clone()),
object: new_idx.name.clone(),
object_type: "index".into(),
description: format!(
"Create {}index \"{}\" on \"{}\".\"{}\"",
if new_idx.unique { "unique " } else { "" },
new_idx.name,
schema,
table.name
),
ddl: Some(format!(
"CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
unique_kw,
quote(&new_idx.name),
full_table,
method,
col_parts.join(", "),
include,
where_clause
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
for old_rel in &old.relationships {
if !new_rels.contains_key(old_rel.id.as_str()) {
let from_sid_fallback = old_rel.from_schema_id.as_deref().unwrap_or(default_old_sid);
let from_schema = old_schemas
.get(from_sid_fallback)
.map(|s| s.name.as_str())
.unwrap_or(from_sid_fallback);
let from_table = old_tables
.get(old_rel.from_table_id.as_str())
.map(|t| t.name.as_str())
.unwrap_or(&old_rel.from_table_id);
let constraint = old_rel.name.as_deref().unwrap_or(&old_rel.id);
let schema_q = quote(schema_override.unwrap_or(from_schema));
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::DropForeignKey,
schema: schema_override.unwrap_or(from_schema).to_string(),
table: Some(from_table.to_string()),
object: constraint.to_string(),
object_type: "foreign_key".into(),
description: format!(
"Drop FK \"{}\" from \"{}\".\"{}\"",
constraint,
schema_override.unwrap_or(from_schema),
from_table
),
ddl: Some(format!(
"ALTER TABLE {}.{} DROP CONSTRAINT IF EXISTS {}",
schema_q,
quote(from_table),
quote(constraint)
)),
safety: MigrationSafety::Safe,
risk: MigrationRisk::None,
risk_detail: None,
});
}
}
for new_rel in &new.relationships {
if old_rels.contains_key(new_rel.id.as_str())
|| added_table_ids.contains(new_rel.from_table_id.as_str())
|| added_table_ids.contains(new_rel.to_table_id.as_str())
{
continue;
}
let from_sid = new_rel.from_schema_id.as_deref().unwrap_or(default_new_sid);
let from_schema = match new_schemas.get(from_sid) {
Some(s) => s,
None => continue,
};
let from_table = match new_tables.get(new_rel.from_table_id.as_str()) {
Some(t) => t,
None => continue,
};
let from_col = new
.columns
.iter()
.find(|c| c.id == new_rel.from_column_id)
.map(|c| c.name.clone())
.unwrap_or_else(|| new_rel.from_column_id.clone());
let (to_schema_name, to_table_name, to_col) =
if let Some(pkg_id) = new_rel.to_package_id.as_deref() {
match cross_package_configs.get(pkg_id) {
Some(foreign) => {
let foreign_tables: HashMap<_, _> =
foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
let foreign_schemas: HashMap<_, _> =
foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
let foreign_default_sid =
foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
let to_sid = new_rel
.to_schema_id
.as_deref()
.unwrap_or(foreign_default_sid);
let tbl = match foreign_tables.get(new_rel.to_table_id.as_str()) {
Some(t) => t,
None => continue,
};
let schema = match foreign_schemas.get(to_sid) {
Some(s) => s,
None => continue,
};
let col = foreign
.columns
.iter()
.find(|c| c.id == new_rel.to_column_id)
.map(|c| c.name.clone())
.unwrap_or_else(|| new_rel.to_column_id.clone());
(schema.name.clone(), tbl.name.clone(), col)
}
None => continue,
}
} else {
let to_sid = new_rel.to_schema_id.as_deref().unwrap_or(default_new_sid);
let to_schema = match new_schemas.get(to_sid) {
Some(s) => s,
None => continue,
};
let to_table = match new_tables.get(new_rel.to_table_id.as_str()) {
Some(t) => t,
None => continue,
};
let col = new
.columns
.iter()
.find(|c| c.id == new_rel.to_column_id)
.map(|c| c.name.clone())
.unwrap_or_else(|| new_rel.to_column_id.clone());
(
schema_override.unwrap_or(&to_schema.name).to_string(),
to_table.name.clone(),
col,
)
};
let from_schema_str = schema_override.unwrap_or(&from_schema.name);
let from_q = format!("{}.{}", quote(from_schema_str), quote(&from_table.name));
let to_q = format!("{}.{}", quote(&to_schema_name), quote(&to_table_name));
let constraint = new_rel.name.as_deref().unwrap_or(&new_rel.id);
let on_update = new_rel.on_update.as_deref().unwrap_or("NO ACTION");
let on_delete = new_rel.on_delete.as_deref().unwrap_or("NO ACTION");
steps.push(MigrationStep {
step: 0,
operation: MigrationOperation::AddForeignKey,
schema: from_schema_str.to_string(),
table: Some(from_table.name.clone()),
object: constraint.to_string(),
object_type: "foreign_key".into(),
description: format!(
"Add FK \"{}\" on \"{}\".\"{}\" → \"{}\".\"{}\"",
constraint, from_schema_str, from_table.name, to_schema_name, to_table_name
),
ddl: Some(format!(
"ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
from_q, quote(constraint), quote(&from_col), to_q, quote(&to_col), on_update, on_delete
)),
safety: MigrationSafety::BestEffort,
risk: MigrationRisk::None,
risk_detail: Some("PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; ignored if constraint already exists.".into()),
});
}
for (i, s) in steps.iter_mut().enumerate() {
s.step = i + 1;
}
Ok(MigrationPlan { steps })
}
#[allow(clippy::too_many_arguments)]
pub async fn execute_migration_plan(
migration_pool: &Pool,
config_pool: &Pool,
plan: &MigrationPlan,
migration_plan_id: &str,
package_id: &str,
tenant_id: &str,
from_version: Option<&str>,
to_version: &str,
) -> Result<MigrationExecutionResult, AppError> {
let mut applied = 0usize;
let mut warned = 0usize;
let mut warnings: Vec<String> = Vec::new();
for step in &plan.steps {
let op = step.operation.to_string();
let safety_str = format!("{:?}", step.safety);
let risk_str = format!("{:?}", step.risk);
match step.safety {
MigrationSafety::WarnOnly => {
let msg = step
.risk_detail
.clone()
.unwrap_or_else(|| step.description.clone());
tracing::warn!(step = step.step, %op, "migration plan warning (no DDL)");
warnings.push(format!("[Step {}] {}", step.step, msg));
let _ = crate::store::insert_migration_audit(
config_pool,
migration_plan_id,
package_id,
tenant_id,
from_version,
to_version,
step.step as i32,
&op,
&step.schema,
step.table.as_deref(),
&step.object,
&step.object_type,
&step.description,
step.ddl.as_deref(),
&safety_str,
&risk_str,
"skipped",
None,
)
.await;
warned += 1;
}
MigrationSafety::Safe | MigrationSafety::BestEffort => {
if let Some(ref sql) = step.ddl {
tracing::info!(step = step.step, %op, %sql, "executing migration step");
match sqlx::query(sql).execute(migration_pool).await {
Ok(_) => {
let _ = crate::store::insert_migration_audit(
config_pool,
migration_plan_id,
package_id,
tenant_id,
from_version,
to_version,
step.step as i32,
&op,
&step.schema,
step.table.as_deref(),
&step.object,
&step.object_type,
&step.description,
step.ddl.as_deref(),
&safety_str,
&risk_str,
"applied",
None,
)
.await;
applied += 1;
}
Err(e) => {
let err_str = e.to_string();
if matches!(step.safety, MigrationSafety::BestEffort) {
tracing::warn!(step = step.step, %op, error = %e, "migration step failed (best-effort, continuing)");
let msg = format!(
"[Step {}] {} — Error: {}",
step.step, step.description, err_str
);
warnings.push(msg);
let _ = crate::store::insert_migration_audit(
config_pool,
migration_plan_id,
package_id,
tenant_id,
from_version,
to_version,
step.step as i32,
&op,
&step.schema,
step.table.as_deref(),
&step.object,
&step.object_type,
&step.description,
step.ddl.as_deref(),
&safety_str,
&risk_str,
"warned",
Some(&err_str),
)
.await;
warned += 1;
} else {
let _ = crate::store::insert_migration_audit(
config_pool,
migration_plan_id,
package_id,
tenant_id,
from_version,
to_version,
step.step as i32,
&op,
&step.schema,
step.table.as_deref(),
&step.object,
&step.object_type,
&step.description,
step.ddl.as_deref(),
&safety_str,
&risk_str,
"failed",
Some(&err_str),
)
.await;
return Err(AppError::Db(e));
}
}
}
}
}
}
}
Ok(MigrationExecutionResult {
applied,
warned,
warnings,
})
}
/// Builds the DDL text for the `<table_name>_history` companion table.
///
/// The returned string contains a `CREATE TABLE IF NOT EXISTS` statement followed, on its
/// own final line, by a `-- index: CREATE INDEX IF NOT EXISTS ...` comment describing the
/// index on `(pk_col, _version)`.
///
/// Column order inside the table definition:
/// 1. bookkeeping columns `_history_id`, `_version`, `_operation`, `_recorded_at`,
///    `_valid_from`, `_valid_to`;
/// 2. one untyped-constraint copy (`name type`) of every entry in `source_cols`, with the
///    type rendered by `dialect`;
/// 3. `created_at`, `updated_at`, `archived_at`, `created_by`, `updated_by` — each only
///    when `source_cols` has no column of that name;
/// 4. `PRIMARY KEY ("_history_id")`.
///
/// `pk_col` is used only for the trailing index comment.
///
/// NOTE(review): the index comment lists `_version` without `DESC`, whereas
/// `history_index_ddl` emits `_version DESC` — confirm which ordering is intended.
pub fn history_table_ddl(
    schema_name: &str,
    table_name: &str,
    pk_col: &str,
    source_cols: &[&ColumnConfig],
    dialect: &dyn Dialect,
) -> String {
    let ts_type = dialect.audit_timestamp_type();
    let history_table = format!(
        "{}.{}",
        quote(schema_name),
        quote(&format!("{}_history", table_name))
    );

    // Fixed bookkeeping columns come first.
    let mut definitions: Vec<String> = vec![
        format!(
            "{} {} NOT NULL DEFAULT {}",
            quote("_history_id"),
            "UUID",
            dialect.uuid_default_expr()
        ),
        format!("{} BIGINT NOT NULL", quote("_version")),
        format!("{} TEXT NOT NULL", quote("_operation")),
        format!(
            "{} {} NOT NULL DEFAULT {}",
            quote("_recorded_at"),
            ts_type,
            dialect.now_fn()
        ),
        format!("{} {}", quote("_valid_from"), ts_type),
        format!("{} {}", quote("_valid_to"), ts_type),
    ];

    // Mirror each configured column as plain `name type`.
    definitions.extend(source_cols.iter().map(|column| {
        let sql_type = dialect.ddl_type(&parse_canonical(&column.type_));
        format!("{} {}", quote(&column.name), sql_type)
    }));

    // Standard timestamp/actor columns are appended only when the config lacks them.
    let fallback_columns = [
        ("created_at", ts_type),
        ("updated_at", ts_type),
        ("archived_at", ts_type),
        ("created_by", "TEXT"),
        ("updated_by", "TEXT"),
    ];
    for (name, sql_type) in fallback_columns.iter() {
        let already_configured = source_cols
            .iter()
            .any(|column| column.name.as_str() == *name);
        if already_configured {
            continue;
        }
        definitions.push(format!("{} {}", quote(name), sql_type));
    }

    definitions.push(format!("PRIMARY KEY ({})", quote("_history_id")));

    let index_comment = format!(
        "-- index: CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
        quote(&format!("{}_history_{}_idx", table_name, pk_col)),
        history_table,
        quote(pk_col),
        quote("_version")
    );

    format!(
        "CREATE TABLE IF NOT EXISTS {} (\n {}\n)\n{}",
        history_table,
        definitions.join(",\n "),
        index_comment
    )
}
/// Builds the `CREATE INDEX IF NOT EXISTS` statement for a table's history companion.
///
/// The index is named `<table_name>_history_<pk_col>_idx`, is created on
/// `<schema_name>.<table_name>_history`, and covers `(pk_col, _version DESC)`.
/// Every identifier is passed through `quote`.
fn history_index_ddl(schema_name: &str, table_name: &str, pk_col: &str) -> String {
    let index_ident = quote(&format!("{}_history_{}_idx", table_name, pk_col));
    let schema_ident = quote(schema_name);
    let history_ident = quote(&format!("{}_history", table_name));
    let pk_ident = quote(pk_col);
    let version_ident = quote("_version");

    format!(
        "CREATE INDEX IF NOT EXISTS {} ON {}.{} ({}, {} DESC)",
        index_ident, schema_ident, history_ident, pk_ident, version_ident
    )
}
/// Builds the `CREATE TABLE IF NOT EXISTS` statement for the `<table_name>_audit` table.
///
/// Column order inside the table definition:
/// 1. bookkeeping columns `audit_id`, `audit_action`, `audit_at`, `audit_by`,
///    `changed_fields`;
/// 2. one plain `name type` copy of every entry in `source_cols`, with the type rendered
///    by `dialect`;
/// 3. `created_at`, `updated_at`, `archived_at`, `created_by`, `updated_by` — each only
///    when `source_cols` has no column of that name;
/// 4. `PRIMARY KEY ("audit_id")`.
fn audit_table_ddl(
    schema_name: &str,
    table_name: &str,
    source_cols: &[&ColumnConfig],
    dialect: &dyn Dialect,
) -> String {
    let ts_type = dialect.audit_timestamp_type();
    let audit_table = format!(
        "{}.{}",
        quote(schema_name),
        quote(&format!("{}_audit", table_name))
    );

    // Fixed audit bookkeeping columns come first.
    let mut definitions: Vec<String> = vec![
        format!(
            "{} {} NOT NULL DEFAULT {}",
            quote("audit_id"),
            "UUID",
            dialect.uuid_default_expr()
        ),
        format!("{} TEXT NOT NULL", quote("audit_action")),
        format!(
            "{} {} NOT NULL DEFAULT {}",
            quote("audit_at"),
            ts_type,
            dialect.now_fn()
        ),
        format!("{} TEXT", quote("audit_by")),
        format!("{} {}", quote("changed_fields"), dialect.sys_json_type()),
    ];

    // Mirror each configured column as plain `name type`.
    definitions.extend(source_cols.iter().map(|column| {
        let sql_type = dialect.ddl_type(&parse_canonical(&column.type_));
        format!("{} {}", quote(&column.name), sql_type)
    }));

    // Standard timestamp/actor columns are appended only when the config lacks them.
    let fallback_columns = [
        ("created_at", ts_type),
        ("updated_at", ts_type),
        ("archived_at", ts_type),
        ("created_by", "TEXT"),
        ("updated_by", "TEXT"),
    ];
    for (name, sql_type) in fallback_columns.iter() {
        let already_configured = source_cols
            .iter()
            .any(|column| column.name.as_str() == *name);
        if already_configured {
            continue;
        }
        definitions.push(format!("{} {}", quote(name), sql_type));
    }

    definitions.push(format!("PRIMARY KEY ({})", quote("audit_id")));

    format!(
        "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
        audit_table,
        definitions.join(",\n ")
    )
}
#[cfg(test)]
/// Tests for discovering enum-typed columns and for the enum re-creation step sequence.
mod enum_recreate_tests {
    use super::*;

    /// Schema fixture with no comment.
    fn schema(id: &str, name: &str) -> SchemaConfig {
        SchemaConfig {
            id: id.into(),
            name: name.into(),
            comment: None,
        }
    }

    /// Table fixture: single-column primary key `id`, no constraints, no audit log,
    /// no versioning.
    fn table(id: &str, name: &str, schema_id: &str) -> TableConfig {
        TableConfig {
            id: id.into(),
            schema_id: Some(schema_id.into()),
            name: name.into(),
            comment: None,
            primary_key: PrimaryKeyConfig::Single("id".into()),
            unique: vec![],
            check: vec![],
            audit_log: false,
            versioning: None,
        }
    }

    /// Nullable column fixture with a simple type name and an optional literal default.
    fn col(id: &str, table_id: &str, name: &str, ty: &str, default: Option<&str>) -> ColumnConfig {
        let literal_default = default.map(|d| ColumnDefaultConfig::Literal(d.into()));
        ColumnConfig {
            id: id.into(),
            table_id: table_id.into(),
            name: name.into(),
            type_: ColumnTypeConfig::Simple(ty.into()),
            nullable: true,
            default: literal_default,
            comment: None,
            asset: None,
        }
    }

    /// Enum fixture with the given labels and no comment.
    fn enum_cfg(id: &str, name: &str, schema_id: &str, values: &[&str]) -> EnumConfig {
        EnumConfig {
            id: id.into(),
            schema_id: Some(schema_id.into()),
            name: name.into(),
            values: values.iter().map(|value| (*value).to_owned()).collect(),
            comment: None,
        }
    }

    /// Collects the DDL text of every step that carries one, preserving step order.
    fn ddls(steps: &[MigrationStep]) -> Vec<String> {
        let mut statements = Vec::new();
        for step in steps {
            if let Some(sql) = &step.ddl {
                statements.push(sql.clone());
            }
        }
        statements
    }

    #[test]
    fn finds_scalar_and_array_dependent_columns() {
        let mut cfg = FullConfig::default();
        cfg.schemas = vec![schema("s1", "app")];
        cfg.tables = vec![table("t_orders", "orders", "s1")];
        cfg.columns = vec![
            col(
                "c1",
                "t_orders",
                "status",
                "order_status",
                Some("'pending'"),
            ),
            col("c2", "t_orders", "tags", "order_status[]", None),
            // Plain text column: the assertions below expect it not to be reported.
            col("c3", "t_orders", "name", "text", None),
        ];
        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);

        let mut new_tables: HashMap<&str, &TableConfig> = HashMap::new();
        for t in &cfg.tables {
            new_tables.insert(t.id.as_str(), t);
        }
        let mut new_schemas: HashMap<&str, &SchemaConfig> = HashMap::new();
        for s in &cfg.schemas {
            new_schemas.insert(s.id.as_str(), s);
        }

        let deps = enum_dependent_columns(&e, &cfg, &new_tables, &new_schemas, None);
        assert_eq!(deps.len(), 2);

        let scalar = deps.iter().find(|d| d.column == "status").unwrap();
        assert_eq!(scalar.schema, "app");
        assert_eq!(scalar.table, "orders");
        assert!(!scalar.is_array);
        assert_eq!(scalar.default.as_deref(), Some("'pending'"));

        let arr = deps.iter().find(|d| d.column == "tags").unwrap();
        assert!(arr.is_array);
        assert!(arr.default.is_none());
    }

    #[test]
    fn recreate_sequence_emits_rename_create_recast_drop() {
        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);
        let deps = vec![
            EnumColumnRef {
                schema: "app".into(),
                table: "orders".into(),
                column: "status".into(),
                default: Some("'pending'".into()),
                is_array: false,
            },
            EnumColumnRef {
                schema: "app".into(),
                table: "orders".into(),
                column: "tags".into(),
                default: None,
                is_array: true,
            },
        ];

        let mut steps = Vec::new();
        recreate_enum_steps(&mut steps, "app", &e, &["cancelled"], &deps);
        let sql = ddls(&steps);

        // The first step is a warning-only step without DDL.
        assert!(matches!(steps[0].safety, MigrationSafety::WarnOnly));
        assert!(steps[0].ddl.is_none());

        // Leading DDL statements, in the exact order they must be emitted.
        let expected_head = [
            r#"ALTER TYPE "app"."order_status" RENAME TO "order_status__arch_old""#,
            r#"CREATE TYPE "app"."order_status" AS ENUM ('pending', 'shipped')"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" DROP DEFAULT"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" TYPE "app"."order_status" USING "status"::text::"app"."order_status""#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" SET DEFAULT 'pending'"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "tags" TYPE "app"."order_status"[] USING "tags"::text[]::"app"."order_status"[]"#,
        ];
        for (idx, expected) in expected_head.iter().enumerate() {
            assert_eq!(sql[idx], *expected);
        }

        // The archived type is dropped by the final statement.
        assert_eq!(
            *sql.last().unwrap(),
            r#"DROP TYPE IF EXISTS "app"."order_status__arch_old""#
        );
    }
}