use std::fmt::Write as _;
use serde::{Deserialize, Serialize};
use super::snapshot::{FieldSnapshot, SchemaSnapshot, TableSnapshot};
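/// A single difference between two schema snapshots, expressed as an abstract
/// operation. `detect_changes` produces these and the render functions below
/// turn them into SQL DDL statements.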
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SchemaChange {
CreateTable(String),
DropTable(String),
AddColumn {
table: String,
column: String,
},
DropColumn {
table: String,
column: String,
},
AlterColumnType {
table: String,
column: String,
from: String,
to: String,
},
AlterColumnNullable {
table: String,
column: String,
nullable: bool,
},
AlterColumnDefault {
table: String,
column: String,
from: Option<String>,
to: Option<String>,
},
AlterColumnMaxLength {
table: String,
column: String,
from: Option<u32>,
to: Option<u32>,
},
RenameTable {
old_name: String,
new_name: String,
},
RenameColumn {
table: String,
old_column: String,
new_column: String,
},
AlterColumnUnique {
table: String,
column: String,
unique: bool,
},
CreateIndex {
name: String,
table: String,
columns: Vec<String>,
unique: bool,
},
DropIndex {
name: String,
},
AddCheckConstraint {
name: String,
table: String,
expr: String,
},
DropCheckConstraint {
name: String,
table: String,
},
CreateM2MTable {
through: String,
src_table: String,
src_col: String,
dst_table: String,
dst_col: String,
},
DropM2MTable {
through: String,
},
AddCompositeFk {
table: String,
name: String,
to: String,
from: Vec<String>,
on: Vec<String>,
},
DropCompositeFk {
table: String,
name: String,
},
}
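/// Diffs `prev` against `current` and returns the `SchemaChange`s required to
/// bring the database in line with `current`: new tables, new columns, column
/// alterations, dropped columns and tables, then index, check-constraint,
/// M2M-table, and composite-FK differences. Renames are not inferred; a
/// renamed table or column shows up as a drop plus a create.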
#[must_use]
pub fn detect_changes(prev: &SchemaSnapshot, current: &SchemaSnapshot) -> Vec<SchemaChange> {
let mut changes = Vec::new();
for t in &current.tables {
if prev.table(&t.name).is_none() {
changes.push(SchemaChange::CreateTable(t.name.clone()));
}
}
for t in &current.tables {
let Some(pt) = prev.table(&t.name) else {
continue;
};
for f in &t.fields {
if pt.field(&f.column).is_none() {
changes.push(SchemaChange::AddColumn {
table: t.name.clone(),
column: f.column.clone(),
});
}
}
}
for ct in &current.tables {
let Some(pt) = prev.table(&ct.name) else {
continue;
};
for cf in &ct.fields {
let Some(pf) = pt.field(&cf.column) else {
continue;
};
push_alter_changes(&ct.name, pf, cf, &mut changes);
}
}
for pt in &prev.tables {
let Some(t) = current.table(&pt.name) else {
continue;
};
for f in &pt.fields {
if t.field(&f.column).is_none() {
changes.push(SchemaChange::DropColumn {
table: pt.name.clone(),
column: f.column.clone(),
});
}
}
}
for pt in &prev.tables {
if current.table(&pt.name).is_none() {
changes.push(SchemaChange::DropTable(pt.name.clone()));
}
}
for idx in &current.indexes {
if prev.index(&idx.name).is_none() {
changes.push(SchemaChange::CreateIndex {
name: idx.name.clone(),
table: idx.table.clone(),
columns: idx.columns.clone(),
unique: idx.unique,
});
}
}
for idx in &prev.indexes {
if current.index(&idx.name).is_none() {
changes.push(SchemaChange::DropIndex {
name: idx.name.clone(),
});
}
}
for idx in &current.indexes {
if let Some(prev_idx) = prev.index(&idx.name) {
if prev_idx.columns != idx.columns
|| prev_idx.unique != idx.unique
|| prev_idx.table != idx.table
{
changes.push(SchemaChange::DropIndex {
name: idx.name.clone(),
});
changes.push(SchemaChange::CreateIndex {
name: idx.name.clone(),
table: idx.table.clone(),
columns: idx.columns.clone(),
unique: idx.unique,
});
}
}
}
for c in &current.checks {
if prev.check(&c.name).is_none() {
changes.push(SchemaChange::AddCheckConstraint {
name: c.name.clone(),
table: c.table.clone(),
expr: c.expr.clone(),
});
}
}
for c in &prev.checks {
if current.check(&c.name).is_none() {
changes.push(SchemaChange::DropCheckConstraint {
name: c.name.clone(),
table: c.table.clone(),
});
}
}
for mt in &current.m2m_tables {
if prev.m2m_table(&mt.through).is_none() {
changes.push(SchemaChange::CreateM2MTable {
through: mt.through.clone(),
src_table: mt.src_table.clone(),
src_col: mt.src_col.clone(),
dst_table: mt.dst_table.clone(),
dst_col: mt.dst_col.clone(),
});
}
}
for mt in &prev.m2m_tables {
if current.m2m_table(&mt.through).is_none() {
changes.push(SchemaChange::DropM2MTable {
through: mt.through.clone(),
});
}
}
for ct in &current.tables {
let prev_fks: &[_] = prev
.table(&ct.name)
.map(|t| t.composite_fks.as_slice())
.unwrap_or(&[]);
for cf in &ct.composite_fks {
if !prev_fks.iter().any(|p| p.name == cf.name) {
changes.push(SchemaChange::AddCompositeFk {
table: ct.name.clone(),
name: cf.name.clone(),
to: cf.to.clone(),
from: cf.from.clone(),
on: cf.on.clone(),
});
}
}
}
for pt in &prev.tables {
let Some(ct) = current.table(&pt.name) else {
continue;
};
for pf in &pt.composite_fks {
if !ct.composite_fks.iter().any(|c| c.name == pf.name) {
changes.push(SchemaChange::DropCompositeFk {
table: pt.name.clone(),
name: pf.name.clone(),
});
}
}
}
changes
}
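// Emits column-level ALTER changes (type, nullability, default, max_length,
// unique) for a field that is present in both snapshots.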
fn push_alter_changes(
table: &str,
pf: &FieldSnapshot,
cf: &FieldSnapshot,
out: &mut Vec<SchemaChange>,
) {
if pf.ty != cf.ty {
out.push(SchemaChange::AlterColumnType {
table: table.to_owned(),
column: cf.column.clone(),
from: pf.ty.clone(),
to: cf.ty.clone(),
});
}
if pf.nullable != cf.nullable {
out.push(SchemaChange::AlterColumnNullable {
table: table.to_owned(),
column: cf.column.clone(),
nullable: cf.nullable,
});
}
if pf.default != cf.default {
out.push(SchemaChange::AlterColumnDefault {
table: table.to_owned(),
column: cf.column.clone(),
from: pf.default.clone(),
to: cf.default.clone(),
});
}
if pf.max_length != cf.max_length {
out.push(SchemaChange::AlterColumnMaxLength {
table: table.to_owned(),
column: cf.column.clone(),
from: pf.max_length,
to: cf.max_length,
});
}
if pf.unique != cf.unique {
out.push(SchemaChange::AlterColumnUnique {
table: table.to_owned(),
column: cf.column.clone(),
unique: cf.unique,
});
}
}
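/// Reports field-level differences that `detect_changes` does not translate
/// into SQL (primary_key, min, max, fk, auto) as human-readable messages, so
/// callers can surface them instead of silently ignoring them.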
#[must_use]
pub fn detect_unsupported_field_changes(
prev: &SchemaSnapshot,
current: &SchemaSnapshot,
) -> Vec<String> {
let mut out = Vec::new();
for ct in &current.tables {
let Some(pt) = prev.table(&ct.name) else {
continue;
};
for cf in &ct.fields {
let Some(pf) = pt.field(&cf.column) else {
continue;
};
push_field_diffs(&ct.name, pf, cf, &mut out);
}
}
out
}
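// Formats one message per unsupported difference on a single field.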
fn push_field_diffs(table: &str, pf: &FieldSnapshot, cf: &FieldSnapshot, out: &mut Vec<String>) {
let col = &cf.column;
if pf.primary_key != cf.primary_key {
out.push(format!(
"`{table}.{col}` primary_key changed: {} → {}",
pf.primary_key, cf.primary_key
));
}
if pf.min != cf.min {
out.push(format!(
"`{table}.{col}` min changed: {:?} → {:?}",
pf.min, cf.min
));
}
if pf.max != cf.max {
out.push(format!(
"`{table}.{col}` max changed: {:?} → {:?}",
pf.max, cf.max
));
}
if pf.fk != cf.fk {
out.push(format!(
"`{table}.{col}` fk changed: {:?} → {:?}",
pf.fk, cf.fk
));
}
if pf.auto != cf.auto {
out.push(format!(
"`{table}.{col}` auto changed: {} → {}",
pf.auto, cf.auto
));
}
}
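/// Renders `changes` to SQL with the default Postgres dialect, appending the
/// deferred foreign-key statements after the immediate ones.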
pub fn render_changes(
changes: &[SchemaChange],
current: &SchemaSnapshot,
) -> Result<Vec<String>, String> {
let RenderedBatch {
mut immediate,
deferred_fks,
} = render_changes_split(changes, current)?;
immediate.extend(deferred_fks);
Ok(immediate)
}
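/// SQL statements split into two phases: `immediate` statements run first;
/// `deferred_fks` holds foreign-key constraints that must wait until every
/// referenced table exists.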
#[derive(Debug, Default)]
pub struct RenderedBatch {
pub immediate: Vec<String>,
pub deferred_fks: Vec<String>,
}
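/// Renders `changes` with the default Postgres dialect, keeping immediate and
/// deferred statements separate.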
pub fn render_changes_split(
changes: &[SchemaChange],
current: &SchemaSnapshot,
) -> Result<RenderedBatch, String> {
render_changes_split_with_dialect(changes, current, &crate::sql::Postgres)
}
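/// Same as `render_changes_split`, but with an explicit SQL dialect.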
pub fn render_changes_split_with_dialect(
changes: &[SchemaChange],
current: &SchemaSnapshot,
dialect: &dyn crate::sql::Dialect,
) -> Result<RenderedBatch, String> {
render_changes_split_inner(changes, current, dialect)
}
fn render_changes_split_inner(
changes: &[SchemaChange],
current: &SchemaSnapshot,
dialect: &dyn crate::sql::Dialect,
) -> Result<RenderedBatch, String> {
let mut out = RenderedBatch::default();
for change in changes {
match change {
SchemaChange::CreateTable(name) => {
let table = current.table(name).ok_or_else(|| {
format!("CreateTable for `{name}` but no snapshot entry for it")
})?;
out.immediate
.push(create_table_sql_from_snapshot_with_dialect(table, dialect));
out.deferred_fks
.extend(constraints_sql_from_snapshot(table));
}
SchemaChange::DropColumn { table, column } => {
out.immediate
.push(format!(r#"ALTER TABLE "{table}" DROP COLUMN "{column}""#,));
}
SchemaChange::AddColumn { table, column } => {
let t = current.table(table).ok_or_else(|| {
format!("AddColumn for `{table}.{column}` but table missing in snapshot")
})?;
let f = t.field(column).ok_or_else(|| {
format!("AddColumn for `{table}.{column}` but field missing in snapshot")
})?;
if !f.nullable && f.default.is_none() {
return Err(format!(
"AddColumn `{table}.{column}` is NOT NULL with no `default` — \
Postgres can't backfill existing rows. Pick one:\n \
(1) Make the field `Option<…>` — column becomes nullable and existing \
rows get NULL.\n \
(2) Set `#[rustango(default = \"…\")]` so existing rows get the \
default backfill.\n \
(3) (dev iteration / fresh table only) Delete the pending migration \
JSON that emitted this `AddColumn`, then re-run `makemigrations` so \
`{column}` lands in the original `CreateTable` for `{table}` — \
see #84 in the backlog for the full `migrate --squash` proposal.\n \
Note: option (3) requires the column to NOT exist in the database \
yet (i.e. the `CreateTable` migration hasn't been applied, OR you're \
willing to drop and recreate the table). Option (1) or (2) is the \
right fix for any table that has production data.",
));
}
out.immediate.push(add_column_sql(table, f));
}
SchemaChange::DropTable(name) => {
out.immediate
.push(format!(r#"DROP TABLE "{name}" CASCADE"#));
}
SchemaChange::AlterColumnType {
table,
column,
from: _,
to,
} => {
let pg_to = pg_type_for_ty_name(to);
out.immediate.push(format!(
r#"ALTER TABLE "{table}" ALTER COLUMN "{column}" TYPE {pg_to} USING "{column}"::{pg_to}"#,
));
}
SchemaChange::AlterColumnNullable {
table,
column,
nullable,
} => {
let action = if *nullable {
"DROP NOT NULL"
} else {
"SET NOT NULL"
};
out.immediate.push(format!(
r#"ALTER TABLE "{table}" ALTER COLUMN "{column}" {action}"#,
));
}
SchemaChange::AlterColumnDefault {
table,
column,
from: _,
to,
} => match to {
Some(expr) => out.immediate.push(format!(
r#"ALTER TABLE "{table}" ALTER COLUMN "{column}" SET DEFAULT {expr}"#,
)),
None => out.immediate.push(format!(
r#"ALTER TABLE "{table}" ALTER COLUMN "{column}" DROP DEFAULT"#,
)),
},
SchemaChange::AlterColumnMaxLength {
table,
column,
from: _,
to,
} => {
let pg_to = match to {
Some(n) => format!("VARCHAR({n})"),
None => "TEXT".into(),
};
out.immediate.push(format!(
r#"ALTER TABLE "{table}" ALTER COLUMN "{column}" TYPE {pg_to} USING "{column}"::{pg_to}"#,
));
}
SchemaChange::AlterColumnUnique {
table,
column,
unique,
} => {
if *unique {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" ADD CONSTRAINT "{table}_{column}_key" UNIQUE ("{column}")"#,
));
} else {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" DROP CONSTRAINT "{table}_{column}_key""#,
));
}
}
SchemaChange::RenameTable { old_name, new_name } => {
out.immediate.push(format!(
r#"ALTER TABLE "{old_name}" RENAME TO "{new_name}""#,
));
}
SchemaChange::RenameColumn {
table,
old_column,
new_column,
} => {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" RENAME COLUMN "{old_column}" TO "{new_column}""#,
));
}
SchemaChange::CreateIndex {
name,
table,
columns,
unique,
} => {
let unique_kw = if *unique { "UNIQUE " } else { "" };
let cols = columns
.iter()
.map(|c| format!(r#""{c}""#))
.collect::<Vec<_>>()
.join(", ");
out.immediate.push(format!(
r#"CREATE {unique_kw}INDEX IF NOT EXISTS "{name}" ON "{table}" ({cols})"#,
));
}
SchemaChange::DropIndex { name } => {
out.immediate
.push(format!(r#"DROP INDEX IF EXISTS "{name}""#));
}
SchemaChange::AddCheckConstraint { name, table, expr } => {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" ADD CONSTRAINT "{name}" CHECK ({expr})"#,
));
}
SchemaChange::DropCheckConstraint { name, table } => {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" DROP CONSTRAINT IF EXISTS "{name}""#,
));
}
SchemaChange::CreateM2MTable {
through,
src_table,
src_col,
dst_table,
dst_col,
} => {
out.immediate.push(format!(
r#"CREATE TABLE "{through}" ("{src_col}" BIGINT NOT NULL, "{dst_col}" BIGINT NOT NULL, PRIMARY KEY ("{src_col}", "{dst_col}"))"#,
));
out.deferred_fks.push(format!(
r#"ALTER TABLE "{through}" ADD CONSTRAINT "{through}_{src_col}_fkey" FOREIGN KEY ("{src_col}") REFERENCES "{src_table}" ("id") ON DELETE CASCADE"#,
));
out.deferred_fks.push(format!(
r#"ALTER TABLE "{through}" ADD CONSTRAINT "{through}_{dst_col}_fkey" FOREIGN KEY ("{dst_col}") REFERENCES "{dst_table}" ("id") ON DELETE CASCADE"#,
));
}
SchemaChange::DropM2MTable { through } => {
out.immediate
.push(format!(r#"DROP TABLE IF EXISTS "{through}" CASCADE"#));
}
SchemaChange::AddCompositeFk {
table,
name,
to,
from,
on,
} => {
let from_cols = from
.iter()
.map(|c| format!(r#""{c}""#))
.collect::<Vec<_>>()
.join(", ");
let on_cols = on
.iter()
.map(|c| format!(r#""{c}""#))
.collect::<Vec<_>>()
.join(", ");
out.deferred_fks.push(format!(
r#"ALTER TABLE "{table}" ADD CONSTRAINT "{name}" FOREIGN KEY ({from_cols}) REFERENCES "{to}" ({on_cols})"#,
));
}
SchemaChange::DropCompositeFk { table, name } => {
out.immediate.push(format!(
r#"ALTER TABLE "{table}" DROP CONSTRAINT IF EXISTS "{name}""#,
));
}
}
}
Ok(out)
}
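// Maps a snapshot type name to the Postgres type used in `ALTER COLUMN ... TYPE`;
// unknown names fall back to their uppercased form.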
fn pg_type_for_ty_name(ty: &str) -> String {
match ty {
"i16" => "SMALLINT".into(),
"i32" => "INTEGER".into(),
"i64" => "BIGINT".into(),
"f32" => "REAL".into(),
"f64" => "DOUBLE PRECISION".into(),
"bool" => "BOOLEAN".into(),
"string" => "TEXT".into(),
"datetime" => "TIMESTAMPTZ".into(),
"date" => "DATE".into(),
"uuid" => "UUID".into(),
"json" => "JSONB".into(),
other => other.to_uppercase(),
}
}
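// Convenience wrapper around `create_table_sql_from_snapshot_with_dialect`
// using the default Postgres dialect.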
fn create_table_sql_from_snapshot(t: &TableSnapshot) -> String {
create_table_sql_from_snapshot_with_dialect(t, &crate::sql::Postgres)
}
fn create_table_sql_from_snapshot_with_dialect(
t: &TableSnapshot,
dialect: &dyn crate::sql::Dialect,
) -> String {
let mut sql = format!("CREATE TABLE {} (", dialect.quote_ident(&t.name));
let mut first = true;
for f in &t.fields {
if !first {
sql.push_str(", ");
}
first = false;
let _ = write!(
sql,
"{} {}",
dialect.quote_ident(&f.column),
sql_type_with_dialect(f, dialect)
);
if let Some(expr) = &f.default {
let _ = write!(sql, " DEFAULT {expr}");
}
if !f.nullable {
sql.push_str(" NOT NULL");
}
let serial_pk_inline = f.auto
&& matches!(f.ty.as_str(), "i16" | "i32" | "i64")
&& dialect.serial_type_includes_primary_key();
if f.primary_key && !serial_pk_inline {
sql.push_str(" PRIMARY KEY");
}
if f.unique && !f.primary_key {
sql.push_str(" UNIQUE");
}
if f.min.is_some() || f.max.is_some() {
sql.push_str(" CHECK (");
let mut wrote = false;
if let Some(min) = f.min {
let _ = write!(sql, "{} >= {}", dialect.quote_ident(&f.column), min);
wrote = true;
}
if let Some(max) = f.max {
if wrote {
sql.push_str(" AND ");
}
let _ = write!(sql, "{} <= {}", dialect.quote_ident(&f.column), max);
}
sql.push(')');
}
}
sql.push(')');
sql
}
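// Emits the FK constraints for a table (single-column and composite) as
// separate ALTER TABLE statements, so they can be deferred until every
// referenced table exists.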
fn constraints_sql_from_snapshot(t: &TableSnapshot) -> Vec<String> {
let mut out: Vec<String> = t
.fields
.iter()
.filter_map(|f| {
f.fk.as_ref().map(|rel| {
format!(
r#"ALTER TABLE "{}" ADD CONSTRAINT "{}_{}_fkey" FOREIGN KEY ("{}") REFERENCES "{}" ("{}")"#,
t.name, t.name, f.column, f.column, rel.to, rel.on,
)
})
})
.collect();
for cf in &t.composite_fks {
let from_cols = cf
.from
.iter()
.map(|c| format!(r#""{c}""#))
.collect::<Vec<_>>()
.join(", ");
let on_cols = cf
.on
.iter()
.map(|c| format!(r#""{c}""#))
.collect::<Vec<_>>()
.join(", ");
out.push(format!(
r#"ALTER TABLE "{}" ADD CONSTRAINT "{}" FOREIGN KEY ({}) REFERENCES "{}" ({})"#,
t.name, cf.name, from_cols, cf.to, on_cols,
));
}
out
}
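// Builds the `ALTER TABLE ... ADD COLUMN` statement for a new field, including
// DEFAULT, NOT NULL, and a min/max CHECK clause when present.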
fn add_column_sql(table: &str, f: &FieldSnapshot) -> String {
let mut sql = format!(
r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#,
table,
f.column,
sql_type(f)
);
if let Some(expr) = &f.default {
let _ = write!(sql, " DEFAULT {expr}");
}
if !f.nullable {
sql.push_str(" NOT NULL");
}
if f.min.is_some() || f.max.is_some() {
sql.push_str(" CHECK (");
let mut wrote = false;
if let Some(min) = f.min {
let _ = write!(sql, r#""{}" >= {}"#, f.column, min);
wrote = true;
}
if let Some(max) = f.max {
if wrote {
sql.push_str(" AND ");
}
let _ = write!(sql, r#""{}" <= {}"#, f.column, max);
}
sql.push(')');
}
sql
}
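// Postgres-dialect shorthand for `sql_type_with_dialect`.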
fn sql_type(f: &FieldSnapshot) -> String {
sql_type_with_dialect(f, &crate::sql::Postgres)
}
fn sql_type_with_dialect(f: &FieldSnapshot, dialect: &dyn crate::sql::Dialect) -> String {
use crate::core::FieldType;
let ty = match f.ty.as_str() {
"i16" => Some(FieldType::I16),
"i32" => Some(FieldType::I32),
"i64" => Some(FieldType::I64),
"f32" => Some(FieldType::F32),
"f64" => Some(FieldType::F64),
"bool" => Some(FieldType::Bool),
"string" => Some(FieldType::String),
"datetime" => Some(FieldType::DateTime),
"date" => Some(FieldType::Date),
"uuid" => Some(FieldType::Uuid),
"json" => Some(FieldType::Json),
_ => None,
};
if f.auto {
if let Some(t) = ty {
if matches!(t, FieldType::I16 | FieldType::I32 | FieldType::I64) {
return dialect.serial_type(t).to_owned();
}
}
}
if let Some(t) = ty {
return dialect.column_type(t, f.max_length);
}
f.ty.to_uppercase()
}
#[cfg(test)]
mod sql_type_tests {
use super::*;
use crate::migrate::snapshot::FieldSnapshot;
fn fs(ty: &str, auto: bool) -> FieldSnapshot {
FieldSnapshot {
name: "x".into(),
column: "x".into(),
ty: ty.into(),
nullable: false,
primary_key: false,
max_length: None,
min: None,
max: None,
default: None,
auto,
unique: false,
fk: None,
}
}
#[test]
fn auto_integer_emits_serial() {
assert_eq!(sql_type(&fs("i32", true)), "SERIAL");
assert_eq!(sql_type(&fs("i64", true)), "BIGSERIAL");
}
#[test]
fn auto_non_integer_falls_through_to_real_type() {
assert_eq!(sql_type(&fs("datetime", true)), "TIMESTAMPTZ");
assert_eq!(sql_type(&fs("date", true)), "DATE");
assert_eq!(sql_type(&fs("uuid", true)), "UUID");
assert_eq!(sql_type(&fs("bool", true)), "BOOLEAN");
assert_eq!(sql_type(&fs("string", true)), "TEXT");
}
#[test]
fn non_auto_passes_through_normally() {
assert_eq!(sql_type(&fs("i64", false)), "BIGINT");
assert_eq!(sql_type(&fs("datetime", false)), "TIMESTAMPTZ");
}
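// Sketch of an extra test for the private `pg_type_for_ty_name` helper used by
// `AlterColumnType`; the expected values mirror its match arms.
#[test]
fn alter_type_names_map_to_postgres_types() {
assert_eq!(pg_type_for_ty_name("i64"), "BIGINT");
assert_eq!(pg_type_for_ty_name("json"), "JSONB");
assert_eq!(pg_type_for_ty_name("my_enum"), "MY_ENUM");
}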
}
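// Sketch of an additional test module for `add_column_sql`. The `FieldSnapshot`
// literal mirrors the field set used by `fs` in `sql_type_tests` above; the
// table and column names are illustrative only.
#[cfg(test)]
mod add_column_sql_tests {
use super::*;
use crate::migrate::snapshot::FieldSnapshot;
#[test]
fn add_column_not_null_bigint_has_no_default_or_check() {
let f = FieldSnapshot {
name: "age".into(),
column: "age".into(),
ty: "i64".into(),
nullable: false,
primary_key: false,
max_length: None,
min: None,
max: None,
default: None,
auto: false,
unique: false,
fk: None,
};
assert_eq!(
add_column_sql("users", &f),
r#"ALTER TABLE "users" ADD COLUMN "age" BIGINT NOT NULL"#
);
}
}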