use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use serde::{Deserialize, Serialize};
use crate::backend::DatabaseBackend;
use crate::orm::{FieldSpec, Model, SqlType};
/// Process-wide model registry: one `(plugin name, model metadata)` pair per model.
/// Set exactly once by `init_plugins`; the lookup helpers in this file read from it.
static REGISTRY: OnceLock<Vec<(String, ModelMeta)>> = OnceLock::new();
/// Populates the global model registry from a per-plugin map of model metadata.
///
/// The map is flattened into `(plugin, model)` pairs ordered by plugin name, so the
/// registry order does not depend on `HashMap` iteration order. Within one plugin the
/// models keep the order they had in their `Vec`.
///
/// # Panics
///
/// Panics if the registry has already been initialised.
pub(crate) fn init_plugins(per_plugin: std::collections::HashMap<String, Vec<ModelMeta>>) {
    // The map is owned, so consume it rather than cloning every key and model list.
    let mut by_plugin: Vec<(String, Vec<ModelMeta>)> = per_plugin.into_iter().collect();
    by_plugin.sort_by(|a, b| a.0.cmp(&b.0));

    let flat: Vec<(String, ModelMeta)> = by_plugin
        .into_iter()
        .flat_map(|(plugin, models)| models.into_iter().map(move |meta| (plugin.clone(), meta)))
        .collect();

    REGISTRY
        .set(flat)
        .expect("umbral::migrate::init_plugins called more than once");
}
/// Returns an owned copy of every registered model's metadata, in registry order.
///
/// # Panics
///
/// Panics if the registry has not been initialised.
pub fn registered_models() -> Vec<ModelMeta> {
    let registry = REGISTRY
        .get()
        .expect("umbral: model registry not initialised — did you call App::build()?");
    let mut models = Vec::with_capacity(registry.len());
    for (_, meta) in registry {
        models.push(meta.clone());
    }
    models
}
/// Reports whether the model registry has been set.
pub fn is_initialised() -> bool {
    matches!(REGISTRY.get(), Some(_))
}
/// Looks up the primary-key column name and SQL type for `table`.
///
/// Returns `None` when the registry is not initialised, when no registered model uses
/// that table, or when the model has no primary-key column. The table → PK index is
/// built on the first call made after initialisation and reused afterwards.
pub fn pk_meta_for_table(table: &str) -> Option<(String, crate::orm::SqlType)> {
    type PkIndex = std::collections::HashMap<String, (String, crate::orm::SqlType)>;
    static CACHE: std::sync::OnceLock<PkIndex> = std::sync::OnceLock::new();

    // Bail out before touching the cache so an empty index is never memoised.
    if !is_initialised() {
        return None;
    }
    let index = CACHE.get_or_init(|| {
        registered_models()
            .into_iter()
            .filter_map(|meta| {
                let pk = meta.pk_column()?;
                Some((meta.table.clone(), (pk.name.clone(), pk.ty)))
            })
            .collect()
    });
    index.get(table).cloned()
}
/// Returns a clone of the metadata for the model stored in `table`.
///
/// Yields `None` if the registry is not initialised or no model uses that table. The
/// table → metadata index is built on the first call after initialisation.
pub fn model_meta_for_table(table: &str) -> Option<ModelMeta> {
    static CACHE: std::sync::OnceLock<std::collections::HashMap<String, ModelMeta>> =
        std::sync::OnceLock::new();

    if !is_initialised() {
        return None;
    }
    let by_table = CACHE.get_or_init(|| {
        let mut index = std::collections::HashMap::new();
        for meta in registered_models() {
            index.insert(meta.table.clone(), meta);
        }
        index
    });
    by_table.get(table).cloned()
}
/// Resolves the concrete SQL type of a column.
///
/// Non-foreign-key columns return their own type. A foreign-key column returns the
/// primary-key type of its target table, falling back to `BigInt` when the column has
/// no target or the target's primary key cannot be looked up.
pub fn fk_effective_type(col: &Column) -> crate::orm::SqlType {
    if !matches!(col.ty, crate::orm::SqlType::ForeignKey) {
        return col.ty;
    }
    let target_pk = col.fk_target.as_deref().and_then(pk_meta_for_table);
    match target_pk {
        Some((_, ty)) => ty,
        None => crate::orm::SqlType::BigInt,
    }
}
/// Returns the distinct plugin names present in the registry, sorted ascending.
///
/// # Panics
///
/// Panics if the registry has not been initialised.
pub fn registered_plugins() -> Vec<String> {
    let registry = REGISTRY
        .get()
        .expect("umbral: model registry not initialised — did you call App::build()?");
    // A BTreeSet yields sorted, de-duplicated names in one pass.
    let unique: std::collections::BTreeSet<String> =
        registry.iter().map(|(plugin, _)| plugin.clone()).collect();
    unique.into_iter().collect()
}
/// Explicit plugin ordering, set once by `init_plugin_order`. When unset,
/// `plugin_order` falls back to `registered_plugins`.
static PLUGIN_ORDER: OnceLock<Vec<String>> = OnceLock::new();
/// Map consulted by `model_alias`, keyed by model name; set once by
/// `init_model_aliases`. `table_alias` treats a missing entry as `"default"`.
static MODEL_ALIASES: OnceLock<std::collections::HashMap<String, String>> = OnceLock::new();
/// Records the explicit plugin order returned by `plugin_order`.
///
/// # Panics
///
/// Panics if an order has already been recorded.
pub(crate) fn init_plugin_order(order: Vec<String>) {
    let outcome = PLUGIN_ORDER.set(order);
    outcome.expect("umbral::migrate::init_plugin_order called more than once");
}
pub fn plugin_order() -> Vec<String> {
PLUGIN_ORDER
.get()
.cloned()
.unwrap_or_else(registered_plugins)
}
/// API endpoints stored by `init_api_endpoints`; read by `registered_api_endpoints`.
static API_ENDPOINTS: OnceLock<Vec<crate::plugin::ApiEndpoint>> = OnceLock::new();
/// Stores the API endpoint list.
///
/// Unlike the other `init_*` functions in this file, a second call does not panic:
/// the first stored value wins and later ones are silently dropped.
pub(crate) fn init_api_endpoints(endpoints: Vec<crate::plugin::ApiEndpoint>) {
    if API_ENDPOINTS.set(endpoints).is_err() {
        // Already initialised: keep the existing list.
    }
}
/// Returns a copy of the stored API endpoints, or an empty list if none were stored.
pub fn registered_api_endpoints() -> Vec<crate::plugin::ApiEndpoint> {
    match API_ENDPOINTS.get() {
        Some(endpoints) => endpoints.clone(),
        None => Vec::new(),
    }
}
/// Records the model-name → alias map consulted by `model_alias`.
///
/// # Panics
///
/// Panics if a map has already been recorded.
pub(crate) fn init_model_aliases(map: std::collections::HashMap<String, String>) {
    let outcome = MODEL_ALIASES.set(map);
    outcome.expect("umbral::migrate::init_model_aliases called more than once");
}
pub fn table_alias(table_name: &str) -> String {
for meta in registered_models() {
if meta.table == table_name {
return model_alias(&meta.name).unwrap_or_else(|| "default".to_string());
}
}
"default".to_string()
}
/// Returns the alias recorded for `model_name`, if the alias map has been set and
/// contains an entry for it.
pub fn model_alias(model_name: &str) -> Option<String> {
    let aliases = MODEL_ALIASES.get()?;
    aliases.get(model_name).cloned()
}
/// Model-name → metadata index, filled on first use by `model_meta_ref`.
static MODEL_META_BY_NAME: OnceLock<std::collections::HashMap<String, ModelMeta>> = OnceLock::new();
/// Returns a `'static` reference to the metadata of the model called `name`.
///
/// Yields `None` if the registry is not initialised or no model has that name. The
/// name index is built on the first call after initialisation.
pub fn model_meta_ref(name: &str) -> Option<&'static ModelMeta> {
    if !is_initialised() {
        return None;
    }
    let by_name = MODEL_META_BY_NAME.get_or_init(|| {
        let mut index = std::collections::HashMap::new();
        for meta in registered_models() {
            index.insert(meta.name.clone(), meta);
        }
        index
    });
    by_name.get(name)
}
/// Returns copies of the metadata for every model registered under `plugin`.
///
/// # Panics
///
/// Panics if the registry has not been initialised.
pub fn models_for_plugin(plugin: &str) -> Vec<ModelMeta> {
    let registry = REGISTRY
        .get()
        .expect("umbral: model registry not initialised — did you call App::build()?");
    let mut models = Vec::new();
    for (owner, meta) in registry {
        if owner.as_str() == plugin {
            models.push(meta.clone());
        }
    }
    models
}
/// Serializable description of one model, built from a `Model` impl by
/// `ModelMeta::for_` and embedded in `Snapshot`s.
///
/// Most optional fields carry `#[serde(default)]`, so JSON that lacks them still
/// deserializes; fields with `skip_serializing_if` are left out of the output when
/// they hold their empty/false value. Field order is part of the serialized form
/// (and therefore of `Snapshot::hash`), so keep it stable.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModelMeta {
// Filled from `Model::NAME`.
pub name: String,
// Filled from `Model::TABLE`; the key used by the per-table lookups in this file.
pub table: String,
// One `Column` per entry in `Model::FIELDS`.
pub fields: Vec<Column>,
// Filled from `Model::DISPLAY`.
#[serde(default)]
pub display: String,
// Filled from `Model::ICON`; defaults to "database" when absent from JSON.
#[serde(default = "default_icon")]
pub icon: String,
// Filled from `Model::DATABASE`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database: Option<String>,
// Filled from `Model::SINGLETON`.
#[serde(default, skip_serializing_if = "is_false")]
pub singleton: bool,
// Groups of column names, from `Model::UNIQUE_TOGETHER`.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub unique_together: Vec<Vec<String>>,
// Groups of column names, from `Model::INDEXES`.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub indexes: Vec<Vec<String>>,
// `(column, flag)` pairs from `Model::ORDERING`; `for_` names the flag `desc`.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub ordering: Vec<(String, bool)>,
// Copied from `Model::M2M_RELATIONS`.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub m2m_relations: Vec<M2MRelation>,
// Filled from `Model::SOFT_DELETE`.
#[serde(default, skip_serializing_if = "is_false")]
pub soft_delete: bool,
// Filled from `Model::APP_LABEL`; defaults to "app" when absent from JSON.
#[serde(default = "default_app_label")]
pub app_label: String,
}
/// Serde default for `ModelMeta::app_label`.
fn default_app_label() -> String {
    String::from("app")
}
impl Default for ModelMeta {
    /// Builds an empty model description using the same `icon` and `app_label`
    /// defaults that serde applies when those keys are missing.
    fn default() -> Self {
        let icon = default_icon();
        let app_label = default_app_label();
        Self {
            // Identity
            name: String::default(),
            table: String::default(),
            display: String::default(),
            icon,
            app_label,
            database: None,
            // Schema
            fields: Vec::default(),
            unique_together: Vec::default(),
            indexes: Vec::default(),
            ordering: Vec::default(),
            m2m_relations: Vec::default(),
            // Flags
            singleton: false,
            soft_delete: false,
        }
    }
}
/// Serializable copy of one entry of `Model::M2M_RELATIONS` (see `ModelMeta::for_`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct M2MRelation {
// The three fields are copied verbatim from the relation's fields of the same names.
pub field_name: String,
pub target_table: String,
pub target_name: String,
}
/// Serde default for `ModelMeta::icon`.
fn default_icon() -> String {
    String::from("database")
}
fn default_bigint() -> crate::orm::SqlType {
crate::orm::SqlType::BigInt
}
impl ModelMeta {
    /// Returns the first column flagged as primary key, if any.
    pub fn pk_column(&self) -> Option<&Column> {
        for column in &self.fields {
            if column.primary_key {
                return Some(column);
            }
        }
        None
    }

    /// Builds the owned metadata for model type `T` from its associated constants.
    pub fn for_<T: Model>() -> Self {
        let fields = T::FIELDS.iter().map(Column::from).collect();

        let unique_together = T::UNIQUE_TOGETHER
            .iter()
            .map(|group| group.iter().map(|s| s.to_string()).collect())
            .collect();
        let indexes = T::INDEXES
            .iter()
            .map(|group| group.iter().map(|s| s.to_string()).collect())
            .collect();
        let ordering = T::ORDERING
            .iter()
            .map(|(col, desc)| (col.to_string(), *desc))
            .collect();

        let mut m2m_relations = Vec::new();
        for relation in T::M2M_RELATIONS.iter() {
            m2m_relations.push(M2MRelation {
                field_name: relation.field_name.to_string(),
                target_table: relation.target_table.to_string(),
                target_name: relation.target_name.to_string(),
            });
        }

        Self {
            name: T::NAME.to_string(),
            table: T::TABLE.to_string(),
            fields,
            display: T::DISPLAY.to_string(),
            icon: T::ICON.to_string(),
            database: T::DATABASE.map(|s| s.to_string()),
            singleton: T::SINGLETON,
            unique_together,
            indexes,
            ordering,
            m2m_relations,
            soft_delete: T::SOFT_DELETE,
            app_label: T::APP_LABEL.to_string(),
        }
    }
}
/// A set of model descriptions. Each `MigrationFile` stores one as `snapshot_after`,
/// and `make_in` diffs the most recent one against the current registry.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Snapshot {
// `Snapshot::current` / `current_for` store these sorted by model name.
pub models: Vec<ModelMeta>,
}
impl Snapshot {
    /// Captures every registered model, sorted by model name.
    ///
    /// # Panics
    ///
    /// Panics if the model registry has not been initialised.
    pub fn current() -> Self {
        // `registered_models()` already returns an owned Vec; no second copy needed.
        let mut models = registered_models();
        models.sort_by(|a, b| a.name.cmp(&b.name));
        Self { models }
    }

    /// Captures the models registered under `plugin`, sorted by model name.
    ///
    /// # Panics
    ///
    /// Panics if the model registry has not been initialised.
    pub fn current_for(plugin: &str) -> Self {
        let mut models = models_for_plugin(plugin);
        models.sort_by(|a, b| a.name.cmp(&b.name));
        Self { models }
    }

    /// Returns the lowercase hex SHA-256 digest of this snapshot's compact JSON form.
    pub fn hash(&self) -> String {
        use sha2::{Digest, Sha256};
        let json = serde_json::to_string(self).expect("Snapshot serializes");
        let digest = Sha256::digest(json.as_bytes());
        hex(&digest[..])
    }
}
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        let high = usize::from(byte >> 4);
        let low = usize::from(byte & 0x0f);
        encoded.push(char::from(DIGITS[high]));
        encoded.push(char::from(DIGITS[low]));
    }
    encoded
}
/// One schema-change step stored in a migration file.
///
/// Serialized as an internally tagged enum: the variant name is written under the
/// `"kind"` key alongside the variant's fields. Optional fields are omitted from
/// the output when empty/`None` and default when missing on input.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum Operation {
CreateTable {
table: String,
columns: Vec<Column>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
unique_together: Vec<Vec<String>>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
indexes: Vec<Vec<String>>,
},
DropTable { table: String },
AddColumn { table: String, column: Column },
DropColumn { table: String, column: String },
AlterColumn {
table: String,
column: String,
new_columns: Vec<Column>,
// NOTE(review): presumably the pre-change column set, kept so the alter can be
// reversed — confirm against the renderer, which is not visible here.
#[serde(default, skip_serializing_if = "Option::is_none")]
prev_columns: Option<Vec<Column>>,
},
// `Operation::table_name` reports `from` for this variant.
RenameTable { from: String, to: String },
CreateM2MTable {
junction_table: String,
parent_table: String,
parent_col: String,
child_table: String,
child_col: String,
// Both key types fall back to `BigInt` when the JSON does not carry them.
#[serde(default = "default_bigint")]
parent_ty: crate::orm::SqlType,
#[serde(default = "default_bigint")]
child_ty: crate::orm::SqlType,
},
DropM2MTable { junction_table: String },
RenameColumn {
table: String,
from: String,
to: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
column: Option<Column>,
},
// Raw SQL step; `Operation::table_name` reports an empty table name for it.
RunSql {
sql: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
reverse_sql: Option<String>,
},
}
impl Operation {
    /// Returns the table this operation is attributed to.
    ///
    /// Renames report the old (`from`) name, many-to-many operations report the
    /// junction table, and `RunSql` reports an empty string.
    pub fn table_name(&self) -> &str {
        match self {
            Operation::RunSql { .. } => "",
            Operation::RenameTable { from, .. } => from,
            Operation::CreateM2MTable { junction_table, .. } => junction_table,
            Operation::DropM2MTable { junction_table } => junction_table,
            Operation::CreateTable { table, .. } => table,
            Operation::DropTable { table } => table,
            Operation::AddColumn { table, .. } => table,
            Operation::DropColumn { table, .. } => table,
            Operation::AlterColumn { table, .. } => table,
            Operation::RenameColumn { table, .. } => table,
        }
    }
}
/// Serializable description of one model column, converted from a `FieldSpec` by
/// `From<&FieldSpec> for Column` (each field is copied from the spec field of the
/// same name).
///
/// `name`, `ty`, `primary_key` and `nullable` are always required in JSON. Every
/// other field defaults when missing; those with `skip_serializing_if` are also
/// left out of the output when they hold their default value.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Column {
pub name: String,
pub ty: SqlType,
// `ModelMeta::pk_column` returns the first column with this flag set.
pub primary_key: bool,
pub nullable: bool,
// Target table of a foreign key; `fk_effective_type` resolves that table's PK type.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fk_target: Option<String>,
#[serde(default)]
pub noform: bool,
// Defaults to `true` and is only written out when `false`.
#[serde(default = "default_true", skip_serializing_if = "is_true")]
pub db_constraint: bool,
#[serde(default)]
pub noedit: bool,
#[serde(default)]
pub is_string_repr: bool,
#[serde(default)]
pub max_length: u32,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub choices: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub choice_labels: Vec<String>,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub default: String,
#[serde(default, skip_serializing_if = "is_false")]
pub is_multichoice: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub unique: bool,
// The two FK actions are omitted from JSON when they are `FkAction::NoAction`;
// `fk_action_suffix` renders them as ON DELETE / ON UPDATE clauses.
#[serde(default, skip_serializing_if = "is_no_action")]
pub on_delete: crate::orm::FkAction,
#[serde(default, skip_serializing_if = "is_no_action")]
pub on_update: crate::orm::FkAction,
#[serde(default, skip_serializing_if = "is_false")]
pub index: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub auto_now_add: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub auto_now: bool,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub help: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub example: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub widget: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub supported_backends: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub min: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub text_format: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub slug_from: Option<String>,
}
/// Serde `skip_serializing_if` predicate: true when the action is `NoAction`.
fn is_no_action(a: &crate::orm::FkAction) -> bool {
    if let crate::orm::FkAction::NoAction = a {
        true
    } else {
        false
    }
}
/// Builds a `CREATE INDEX IF NOT EXISTS` statement for a single column.
///
/// The index is named `idx_<table>_<column>` with any double quotes removed from
/// both parts; the table and column identifiers themselves are double-quoted with
/// embedded quotes doubled.
fn create_index_stmt(table: &str, column: &str) -> String {
    let bare_table = table.replace('"', "");
    let bare_column = column.replace('"', "");
    let quoted_table = table.replace('"', "\"\"");
    let quoted_column = column.replace('"', "\"\"");
    format!(
        "CREATE INDEX IF NOT EXISTS \"idx_{table}_{column}\" ON \"{t}\" (\"{c}\")",
        table = bare_table,
        column = bare_column,
        t = quoted_table,
        c = quoted_column,
    )
}
/// Builds a `CREATE INDEX IF NOT EXISTS ... USING GIN` statement for one column.
///
/// The index is named `idx_<table>_<column>_gin` with double quotes removed from
/// both parts; the table and column identifiers are double-quoted with embedded
/// quotes doubled.
fn create_gin_index_stmt(table: &str, column: &str) -> String {
    let bare_table = table.replace('"', "");
    let bare_column = column.replace('"', "");
    let quoted_table = table.replace('"', "\"\"");
    let quoted_column = column.replace('"', "\"\"");
    format!(
        "CREATE INDEX IF NOT EXISTS \"idx_{table}_{column}_gin\" ON \"{t}\" USING GIN (\"{c}\")",
        table = bare_table,
        column = bare_column,
        t = quoted_table,
        c = quoted_column,
    )
}
/// Builds a `CREATE INDEX IF NOT EXISTS` statement covering several columns.
///
/// Returns an empty string when `columns` is empty. The index is named
/// `idx_<table>_<col1>_<col2>...` with double quotes removed from every part. The
/// table and column identifiers are double-quoted with embedded quotes doubled,
/// matching `create_index_stmt`.
fn create_multi_index_stmt(table: &str, columns: &[String]) -> String {
    if columns.is_empty() {
        return String::new();
    }
    // Two forms of the table name: stripped for the index name, escaped for the
    // `ON` target. Deriving the escaped form from the stripped one would silently
    // point the index at a different table whenever the name contains a quote.
    let bare_table = table.replace('"', "");
    let quoted_table = table.replace('"', "\"\"");
    let name_suffix = columns
        .iter()
        .map(|c| c.replace('"', ""))
        .collect::<Vec<_>>()
        .join("_");
    let col_list = columns
        .iter()
        .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(", ");
    format!(
        "CREATE INDEX IF NOT EXISTS \"idx_{bare_table}_{name_suffix}\" ON \"{quoted_table}\" ({col_list})"
    )
}
/// Maps a key type to the SQLite column type name used for it: `INTEGER` for the
/// integer-like types (including `ForeignKey`), `TEXT` for everything else.
fn m2m_pk_sql_type_sqlite(ty: crate::orm::SqlType) -> &'static str {
    use crate::orm::SqlType;
    let integer_like = matches!(
        ty,
        SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey
    );
    if integer_like {
        "INTEGER"
    } else {
        "TEXT"
    }
}
/// Maps a key type to the Postgres column type name used for it. `ForeignKey` is
/// treated as `BIGINT`; any type without a dedicated mapping becomes `TEXT`.
fn m2m_pk_sql_type_postgres(ty: crate::orm::SqlType) -> &'static str {
    use crate::orm::SqlType;
    if matches!(ty, SqlType::BigInt | SqlType::ForeignKey) {
        return "BIGINT";
    }
    match ty {
        SqlType::SmallInt => "SMALLINT",
        SqlType::Integer => "INTEGER",
        SqlType::Uuid => "UUID",
        // `Text` and every remaining type share the TEXT fallback.
        _ => "TEXT",
    }
}
/// Renders the ` ON DELETE ...` / ` ON UPDATE ...` clauses for a column.
///
/// A clause is emitted only when the corresponding action yields an SQL keyword;
/// the result is empty when neither does.
fn fk_action_suffix(col: &Column) -> String {
    let clauses = [
        (" ON DELETE ", col.on_delete.sql_keyword()),
        (" ON UPDATE ", col.on_update.sql_keyword()),
    ];
    let mut suffix = String::new();
    for (lead, keyword) in clauses {
        if let Some(keyword) = keyword {
            suffix.push_str(lead);
            suffix.push_str(keyword);
        }
    }
    suffix
}
/// Serde `skip_serializing_if` predicate: true when the flag is `false`.
fn is_false(b: &bool) -> bool {
    !is_true(b)
}

/// Serde default for flags that are on unless stated otherwise.
fn default_true() -> bool {
    !false
}

/// Serde `skip_serializing_if` predicate: true when the flag is `true`.
fn is_true(b: &bool) -> bool {
    matches!(b, true)
}
impl From<&FieldSpec> for Column {
    /// Converts a field spec into its owned, serializable `Column` form. Every
    /// field is copied from the spec field of the same name; borrowed strings and
    /// slices become owned `String`s / `Vec<String>`s.
    fn from(spec: &FieldSpec) -> Self {
        // Borrowed collections first, so the struct literal below stays flat.
        let choices: Vec<String> = spec.choices.iter().map(|s| s.to_string()).collect();
        let choice_labels: Vec<String> =
            spec.choice_labels.iter().map(|s| s.to_string()).collect();
        let supported_backends: Vec<String> =
            spec.supported_backends.iter().map(|s| s.to_string()).collect();

        // Optional strings.
        let fk_target = spec.fk_target.map(|s| s.to_string());
        let widget = spec.widget.map(|s| s.to_string());
        let text_format = spec.text_format.map(|s| s.to_string());
        let slug_from = spec.slug_from.map(|s| s.to_string());

        Self {
            name: spec.name.to_string(),
            ty: spec.ty,
            primary_key: spec.primary_key,
            nullable: spec.nullable,
            fk_target,
            noform: spec.noform,
            db_constraint: spec.db_constraint,
            noedit: spec.noedit,
            is_string_repr: spec.is_string_repr,
            max_length: spec.max_length,
            choices,
            choice_labels,
            default: spec.default.to_string(),
            is_multichoice: spec.is_multichoice,
            unique: spec.unique,
            on_delete: spec.on_delete,
            on_update: spec.on_update,
            index: spec.index,
            auto_now_add: spec.auto_now_add,
            auto_now: spec.auto_now,
            help: spec.help.to_string(),
            example: spec.example.to_string(),
            widget,
            supported_backends,
            min: spec.min,
            max: spec.max,
            text_format,
            slug_from,
        }
    }
}
/// On-disk form of one migration, written as pretty-printed JSON by `make_in` and
/// `make_empty_in`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MigrationFile {
// `<4-digit sequence>_<suffix>`; also used as the file stem and as the `name`
// column of the tracking table.
pub id: String,
// Plugin that owns the migration; the `plugin` column of the tracking table.
pub plugin: String,
// Written as an empty list by the generators in this file.
#[serde(default)]
pub depends_on: Vec<MigrationRef>,
pub operations: Vec<Operation>,
// Snapshot recorded with the migration; the next `make_in` diffs against it.
pub snapshot_after: Snapshot,
}
/// Reference to another migration, identified by a plugin and a migration string;
/// element type of `MigrationFile::depends_on`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MigrationRef {
pub plugin: String,
pub migration: String,
}
/// NOTE(review): presumably the plugin name used for the application's own models;
/// it matches `default_app_label` but is not referenced in the visible code.
pub const APP_PLUGIN_NAME: &str = "app";
/// Relative directory the path-less entry points (`make`, `run`, ...) operate on;
/// each plugin's migrations live in a sub-directory named after the plugin.
pub const MIGRATIONS_DIR: &str = "migrations";
/// Classification of one migration in a `DriftReport`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationStatus {
// NOTE(review): presumably recorded in the tracking table and present on disk —
// the classification logic is not visible in this chunk.
Applied,
// Recorded in the tracking table but the file is missing from disk. The checked
// runners treat this as critical drift.
AppliedButMissing,
// On disk but positioned before the last applied migration of its plugin; the
// checked runners only warn about it.
OutOfOrder,
// NOTE(review): presumably on disk and not yet applied — confirm in `detect_drift`.
Pending,
}
/// One migration, identified by plugin and name, together with its drift status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationEntry {
pub plugin: String,
pub name: String,
pub status: MigrationStatus,
}
/// Result of comparing the tracking table with the migration files on disk, as a
/// list of classified migrations.
#[derive(Debug, Clone, Default)]
pub struct DriftReport {
pub entries: Vec<MigrationEntry>,
}
impl DriftReport {
    /// True when at least one entry is `AppliedButMissing`.
    pub fn has_critical_drift(&self) -> bool {
        !self.missing_on_disk().is_empty()
    }

    /// Returns the entries whose status is `AppliedButMissing`, in report order.
    pub fn missing_on_disk(&self) -> Vec<&MigrationEntry> {
        let mut missing = Vec::new();
        for entry in &self.entries {
            if entry.status == MigrationStatus::AppliedButMissing {
                missing.push(entry);
            }
        }
        missing
    }
}
/// Errors produced by the migration generator and runner.
#[derive(Debug)]
pub enum MigrateError {
// Wrapped lower-level failures; created through the `From` impls so `?` works.
Io(std::io::Error),
Json(serde_json::Error),
Sqlx(sqlx::Error),
// `make_in` found nothing to write for any plugin.
NoChanges,
// A change that cannot be expressed as a migration; the payload is the detail
// shown after the fixed message prefix.
UnsupportedChange(String),
// A column change judged unsafe; the message asks for a hand-written migration.
UnsafeAlter {
model: String,
column: String,
reason: String,
},
// `(plugin, name)` pairs recorded in the tracking table whose files are missing.
DriftDetected { missing: Vec<(String, String)> },
// A schema-scoped migration was requested while the pool is SQLite.
SchemaUnsupportedOnSqlite { schema: String },
// `make_empty_in` was given a plugin that is not in `plugin_order()`.
UnknownPlugin { requested: String, known: Vec<String> },
}
/// User-facing messages. Each one starts with the command name (`umbral migrate`,
/// or `umbral makemigrations --empty` for `UnknownPlugin`).
impl std::fmt::Display for MigrateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
// Wrapped errors: prefix plus the inner error's own message.
MigrateError::Io(e) => write!(f, "umbral migrate: io: {e}"),
MigrateError::Json(e) => write!(f, "umbral migrate: json: {e}"),
MigrateError::Sqlx(e) => write!(f, "umbral migrate: sqlx: {e}"),
MigrateError::NoChanges => write!(
f,
"umbral migrate: no changes detected; declare or change a model first"
),
MigrateError::UnsupportedChange(msg) => {
write!(f, "umbral migrate: unsupported change at M5 v1: {msg}")
}
MigrateError::UnsafeAlter {
model,
column,
reason,
} => write!(
f,
"umbral migrate: unsafe column change on `{model}.{column}`: {reason}; \
hand-write the migration with a data-preserving step"
),
MigrateError::DriftDetected { missing } => {
// One `plugin/name` line per missing migration.
let names: Vec<String> = missing
.iter()
.map(|(plugin, name)| format!("{plugin}/{name}"))
.collect();
write!(
f,
"umbral migrate: drift detected — the following migrations are recorded in \
the tracking table but their files are missing from disk:\n {}\n\
Restore the files from VCS or run `umbral migrate --allow-drift` to \
proceed despite the inconsistency.",
names.join("\n ")
)
}
MigrateError::SchemaUnsupportedOnSqlite { schema } => write!(
f,
"umbral migrate: schema-per-tenant migration into `{schema}` requires \
Postgres; SQLite has no schemas. Point the app at a Postgres pool."
),
MigrateError::UnknownPlugin { requested, known } => write!(
f,
"umbral makemigrations --empty: no registered plugin named `{requested}`. \
Known plugins: {}",
known.join(", ")
),
}
}
}
impl std::error::Error for MigrateError {}
impl From<std::io::Error> for MigrateError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
impl From<serde_json::Error> for MigrateError {
fn from(e: serde_json::Error) -> Self {
Self::Json(e)
}
}
impl From<sqlx::Error> for MigrateError {
fn from(e: sqlx::Error) -> Self {
Self::Sqlx(e)
}
}
/// Generates migrations for every plugin under `MIGRATIONS_DIR`; see `make_in`.
pub async fn make() -> Result<Vec<PathBuf>, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    make_in(root).await
}
/// Generates one new migration file per plugin whose models changed.
///
/// For each plugin in `plugin_order()`, the snapshot stored in the plugin's last
/// migration file (or an empty snapshot if it has none) is diffed against the
/// currently registered models. A non-empty diff is written to
/// `<dir>/<plugin>/<seq>_<suffix>.json`, where `seq` is the number of existing
/// files plus one, zero-padded to four digits.
///
/// Returns the paths written, in plugin order.
///
/// # Errors
///
/// * `MigrateError::NoChanges` if no plugin produced any operation.
/// * Any error from listing or reading existing files, diffing, serializing, or
///   writing the new file.
pub async fn make_in(dir: &Path) -> Result<Vec<PathBuf>, MigrateError> {
    let mut written: Vec<PathBuf> = Vec::new();
    for plugin in plugin_order() {
        let plugin_dir = dir.join(&plugin);
        let existing = list_migration_files(&plugin_dir)?;
        // Baseline = state recorded by the most recent migration, if there is one.
        let previous = match existing.last() {
            Some(path) => read_migration_file(path)?.snapshot_after,
            None => Snapshot::default(),
        };
        let current = Snapshot::current_for(&plugin);
        let operations = diff(&previous, &current)?;
        if operations.is_empty() {
            continue;
        }
        let seq = (existing.len() + 1) as u32;
        let suffix = suffix_for(&operations);
        let id = format!("{seq:04}_{suffix}");
        let filename = format!("{id}.json");
        let file = MigrationFile {
            id: id.clone(),
            plugin: plugin.clone(),
            depends_on: Vec::new(),
            operations,
            snapshot_after: current,
        };
        std::fs::create_dir_all(&plugin_dir)?;
        let path = plugin_dir.join(filename);
        let json = serde_json::to_string_pretty(&file)?;
        std::fs::write(&path, json)?;
        written.push(path);
    }
    if written.is_empty() {
        return Err(MigrateError::NoChanges);
    }
    Ok(written)
}
/// Creates an operation-less migration for `plugin` under `MIGRATIONS_DIR`; see
/// `make_empty_in`.
pub async fn make_empty(plugin: &str) -> Result<PathBuf, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    make_empty_in(root, plugin).await
}
/// Writes a migration with no operations for `plugin` and returns its path.
///
/// The file is named `<seq>_empty.json`, where `seq` is the number of existing
/// files for the plugin plus one. Its `snapshot_after` is carried over from the
/// plugin's last migration, or taken from the currently registered models when the
/// plugin has no migrations yet.
///
/// # Errors
///
/// * `MigrateError::UnknownPlugin` if `plugin` is not in `plugin_order()`.
/// * Any error from listing/reading existing files, serializing, or writing.
pub async fn make_empty_in(dir: &Path, plugin: &str) -> Result<PathBuf, MigrateError> {
    let known = plugin_order();
    let is_known = known.iter().any(|candidate| candidate == plugin);
    if !is_known {
        return Err(MigrateError::UnknownPlugin {
            requested: plugin.to_string(),
            known,
        });
    }

    let plugin_dir = dir.join(plugin);
    let existing = list_migration_files(&plugin_dir)?;
    // NOTE(review): with no prior migration this records the *current* models while
    // emitting no operations, whereas `make_in` starts from an empty snapshot —
    // confirm that a later `make_in` is still expected to see a diff.
    let snapshot_after = match existing.last() {
        Some(latest) => read_migration_file(latest)?.snapshot_after,
        None => Snapshot::current_for(plugin),
    };

    let seq = (existing.len() + 1) as u32;
    let id = format!("{seq:04}_empty");
    let path = plugin_dir.join(format!("{id}.json"));
    let file = MigrationFile {
        id,
        plugin: plugin.to_string(),
        depends_on: Vec::new(),
        operations: Vec::new(),
        snapshot_after,
    };

    std::fs::create_dir_all(&plugin_dir)?;
    let json = serde_json::to_string_pretty(&file)?;
    std::fs::write(&path, json)?;
    Ok(path)
}
/// Applies pending migrations with drift checking enabled; drift is not tolerated.
pub async fn run() -> Result<u64, MigrateError> {
    let allow_drift = false;
    run_checked(allow_drift).await
}
/// Applies pending migrations from `MIGRATIONS_DIR` after a drift check; see
/// `run_checked_in`.
pub async fn run_checked(allow_drift: bool) -> Result<u64, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    run_checked_in(root, allow_drift).await
}
/// Runs the drift-checked migration pass against every registered database alias
/// and returns the total number of migrations applied.
///
/// Stops at the first alias that fails and returns its error.
pub async fn run_checked_in(dir: &Path, allow_drift: bool) -> Result<u64, MigrateError> {
    let mut applied_total: u64 = 0;
    for alias in crate::db::registered_aliases() {
        let applied = match crate::db::pool_for_dispatched(&alias) {
            crate::db::DbPool::Sqlite(pool) => {
                run_in_sqlite_checked(dir, pool, allow_drift, &alias).await?
            }
            crate::db::DbPool::Postgres(pool) => {
                run_in_postgres_checked(dir, pool, allow_drift, &alias).await?
            }
        };
        applied_total += applied;
    }
    Ok(applied_total)
}
/// Applies pending migrations from `dir` to every registered database alias without
/// a drift check, for all plugins, and returns the total number applied.
pub async fn run_in(dir: &Path) -> Result<u64, MigrateError> {
    let mut applied_total: u64 = 0;
    for alias in crate::db::registered_aliases() {
        let applied = match crate::db::pool_for_dispatched(&alias) {
            crate::db::DbPool::Sqlite(pool) => {
                run_in_sqlite_for_alias(dir, &alias, pool, None).await?
            }
            crate::db::DbPool::Postgres(pool) => {
                run_in_postgres_for_alias(dir, &alias, pool, None).await?
            }
        };
        applied_total += applied;
    }
    Ok(applied_total)
}
/// Applies pending migrations of the given shared plugins from `MIGRATIONS_DIR`;
/// see `run_shared_in`.
pub async fn run_shared(shared_apps: &std::collections::HashSet<String>) -> Result<u64, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    run_shared_in(root, shared_apps).await
}
/// Applies pending migrations to every registered database alias, restricted to
/// the plugins listed in `shared_apps`, and returns the total number applied.
pub async fn run_shared_in(
    dir: &Path,
    shared_apps: &std::collections::HashSet<String>,
) -> Result<u64, MigrateError> {
    let mut applied_total: u64 = 0;
    for alias in crate::db::registered_aliases() {
        let applied = match crate::db::pool_for_dispatched(&alias) {
            crate::db::DbPool::Sqlite(pool) => {
                run_in_sqlite_for_alias(dir, &alias, pool, Some(shared_apps)).await?
            }
            crate::db::DbPool::Postgres(pool) => {
                run_in_postgres_for_alias(dir, &alias, pool, Some(shared_apps)).await?
            }
        };
        applied_total += applied;
    }
    Ok(applied_total)
}
/// Decides whether `op` should run against the database registered as `alias`.
///
/// The operation's table must resolve to `alias` via `table_alias`. If the table
/// belongs to a registered model, the router's `allow_migrate` has the final say;
/// operations on tables with no registered model are allowed.
fn op_targets_alias(op: &Operation, alias: &str) -> bool {
    let table = op.table_name();
    if table_alias(table) != alias {
        return false;
    }
    model_meta_for_table(table)
        .map_or(true, |meta| crate::db::router::router().allow_migrate(alias, &meta))
}
/// Applies pending migrations to one SQLite database and returns how many were
/// applied.
///
/// When `shared_only` is `Some`, only plugins contained in the set are considered.
/// For each unapplied migration file, the operations that target `alias` (see
/// `op_targets_alias`) are executed and the tracking row inserted inside a single
/// transaction. A file with no operation for this alias is skipped without being
/// recorded.
async fn run_in_sqlite_for_alias(
    dir: &Path,
    alias: &str,
    pool: &sqlx::SqlitePool,
    shared_only: Option<&std::collections::HashSet<String>>,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_sqlite(pool).await?;
    let already_applied = applied_names_sqlite(pool).await?;
    let mut newly_applied: u64 = 0;

    for plugin in plugin_order() {
        let excluded = matches!(shared_only, Some(shared) if !shared.contains(&plugin));
        if excluded {
            continue;
        }
        let plugin_dir = dir.join(&plugin);
        for path in list_migration_files(&plugin_dir)? {
            let migration = read_migration_file(&path)?;
            let key = (migration.plugin.clone(), migration.id.clone());
            if already_applied.contains(&key) {
                continue;
            }
            let relevant_ops: Vec<&Operation> = migration
                .operations
                .iter()
                .filter(|op| op_targets_alias(op, alias))
                .collect();
            if relevant_ops.is_empty() {
                continue;
            }

            // Schema changes and the tracking row commit (or roll back) together.
            let mut tx = pool.begin().await?;
            for op in &relevant_ops {
                for statement in render_operation(op) {
                    sqlx::query(&statement).execute(&mut *tx).await?;
                }
            }
            let snapshot_hash = migration.snapshot_after.hash();
            let applied_at = chrono::Utc::now().to_rfc3339();
            sqlx::query(
                "INSERT INTO umbral_migrations (plugin, name, applied_at, snapshot_hash) \
                 VALUES (?, ?, ?, ?)",
            )
            .bind(&migration.plugin)
            .bind(&migration.id)
            .bind(&applied_at)
            .bind(&snapshot_hash)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            newly_applied += 1;
        }
    }
    Ok(newly_applied)
}
/// Applies pending migrations to one Postgres database and returns how many were
/// applied.
///
/// When `shared_only` is `Some`, only plugins contained in the set are considered.
/// For each unapplied migration file, the operations that target `alias` (see
/// `op_targets_alias`) are executed and the tracking row inserted inside a single
/// transaction. A file with no operation for this alias is skipped without being
/// recorded.
async fn run_in_postgres_for_alias(
    dir: &Path,
    alias: &str,
    pool: &sqlx::PgPool,
    shared_only: Option<&std::collections::HashSet<String>>,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_postgres(pool).await?;
    let already_applied = applied_names_postgres(pool).await?;
    let mut newly_applied: u64 = 0;

    for plugin in plugin_order() {
        let excluded = matches!(shared_only, Some(shared) if !shared.contains(&plugin));
        if excluded {
            continue;
        }
        let plugin_dir = dir.join(&plugin);
        for path in list_migration_files(&plugin_dir)? {
            let migration = read_migration_file(&path)?;
            let key = (migration.plugin.clone(), migration.id.clone());
            if already_applied.contains(&key) {
                continue;
            }
            let relevant_ops: Vec<&Operation> = migration
                .operations
                .iter()
                .filter(|op| op_targets_alias(op, alias))
                .collect();
            if relevant_ops.is_empty() {
                continue;
            }

            // Schema changes and the tracking row commit (or roll back) together.
            let mut tx = pool.begin().await?;
            for op in &relevant_ops {
                for statement in render_operation(op) {
                    sqlx::query(&statement).execute(&mut *tx).await?;
                }
            }
            let snapshot_hash = migration.snapshot_after.hash();
            let applied_at = chrono::Utc::now().to_rfc3339();
            sqlx::query(
                "INSERT INTO umbral_migrations (plugin, name, applied_at, snapshot_hash) \
                 VALUES ($1, $2, $3, $4)",
            )
            .bind(&migration.plugin)
            .bind(&migration.id)
            .bind(&applied_at)
            .bind(&snapshot_hash)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            newly_applied += 1;
        }
    }
    Ok(newly_applied)
}
/// Applies the non-shared plugins' migrations from `MIGRATIONS_DIR` into `schema`;
/// see `run_for_schema_in`.
pub async fn run_for_schema(
    schema: &crate::db::Schema,
    shared_apps: &std::collections::HashSet<String>,
) -> Result<u64, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    run_for_schema_in(root, schema, shared_apps).await
}
/// Applies the non-shared plugins' migrations into a Postgres schema.
///
/// # Errors
///
/// Returns `MigrateError::SchemaUnsupportedOnSqlite` when the dispatched pool is
/// SQLite; otherwise forwards any error from the schema run.
pub async fn run_for_schema_in(
    dir: &Path,
    schema: &crate::db::Schema,
    shared_apps: &std::collections::HashSet<String>,
) -> Result<u64, MigrateError> {
    match crate::db::pool_dispatched() {
        crate::db::DbPool::Sqlite(_) => {
            let schema = schema.as_str().to_string();
            Err(MigrateError::SchemaUnsupportedOnSqlite { schema })
        }
        crate::db::DbPool::Postgres(pool) => {
            run_tenant_apps_in_postgres_schema(dir, schema, shared_apps, pool).await
        }
    }
}
/// Applies the migrations of every plugin *not* in `shared_apps` inside the given
/// Postgres schema and returns how many were applied.
///
/// Steps:
/// 1. Create the schema if it does not exist.
/// 2. Create the tracking table inside it (transaction-local `search_path`).
/// 3. Read the set of already-applied `(plugin, name)` pairs from that table.
/// 4. For each unapplied migration, run all of its operations (rendered for
///    Postgres) and insert the tracking row in one transaction whose
///    `search_path` is the schema followed by `public`.
async fn run_tenant_apps_in_postgres_schema(
    dir: &Path,
    schema: &crate::db::Schema,
    shared_apps: &std::collections::HashSet<String>,
    pool: &sqlx::PgPool,
) -> Result<u64, MigrateError> {
    // The name is spliced into SQL text as a quoted identifier, so double any
    // embedded quote. This is a no-op for names that contain none.
    let quoted = format!("\"{}\"", schema.as_str().replace('"', "\"\""));
    sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {quoted}"))
        .execute(pool)
        .await?;
    {
        // `SET LOCAL` only lasts for the surrounding transaction.
        let mut tx = pool.begin().await?;
        sqlx::query(&format!("SET LOCAL search_path TO {quoted}"))
            .execute(&mut *tx)
            .await?;
        ensure_tracking_table_pg_conn(&mut tx).await?;
        tx.commit().await?;
    }
    let applied = {
        let mut tx = pool.begin().await?;
        sqlx::query(&format!("SET LOCAL search_path TO {quoted}"))
            .execute(&mut *tx)
            .await?;
        let rows: Vec<(String, String)> =
            sqlx::query_as("SELECT plugin, name FROM umbral_migrations")
                .fetch_all(&mut *tx)
                .await?;
        tx.commit().await?;
        rows.into_iter().collect::<std::collections::HashSet<_>>()
    };
    let mut applied_count: u64 = 0;
    for plugin in plugin_order() {
        if shared_apps.contains(&plugin) {
            continue;
        }
        let plugin_dir = dir.join(&plugin);
        let paths = list_migration_files(&plugin_dir)?;
        for path in paths {
            let file = read_migration_file(&path)?;
            if applied.contains(&(file.plugin.clone(), file.id.clone())) {
                continue;
            }
            // The file's own `plugin` field may differ from its directory name.
            if shared_apps.contains(&file.plugin) {
                continue;
            }
            let mut tx = pool.begin().await?;
            sqlx::query(&format!("SET LOCAL search_path TO {quoted}, public"))
                .execute(&mut *tx)
                .await?;
            for op in &file.operations {
                for sql in render_operation_for(op, "postgres") {
                    sqlx::query(&sql).execute(&mut *tx).await?;
                }
            }
            let snapshot_hash = file.snapshot_after.hash();
            let applied_at = chrono::Utc::now().to_rfc3339();
            sqlx::query(
                "INSERT INTO umbral_migrations (plugin, name, applied_at, snapshot_hash) \
                 VALUES ($1, $2, $3, $4)",
            )
            .bind(&file.plugin)
            .bind(&file.id)
            .bind(&applied_at)
            .bind(&snapshot_hash)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            applied_count += 1;
        }
    }
    Ok(applied_count)
}
/// Applies the non-shared plugins' migrations from `MIGRATIONS_DIR` to the pool
/// registered as `alias`; see `migrate_apps_into_pool_in`.
pub async fn migrate_apps_into_pool(
    alias: &str,
    shared_apps: &std::collections::HashSet<String>,
) -> Result<u64, MigrateError> {
    let root = Path::new(MIGRATIONS_DIR);
    migrate_apps_into_pool_in(root, alias, shared_apps).await
}
/// Applies the migrations of every plugin not in `shared_apps` to the pool
/// registered as `alias`, choosing the SQLite or Postgres path by pool type.
pub async fn migrate_apps_into_pool_in(
    dir: &Path,
    alias: &str,
    shared_apps: &std::collections::HashSet<String>,
) -> Result<u64, MigrateError> {
    let target = crate::db::pool_for_dispatched(alias);
    match target {
        crate::db::DbPool::Sqlite(pool) => {
            migrate_tenant_apps_into_sqlite_pool(dir, shared_apps, pool).await
        }
        crate::db::DbPool::Postgres(pool) => {
            migrate_tenant_apps_into_pg_pool(dir, shared_apps, pool).await
        }
    }
}
/// Applies every unapplied migration of the plugins not in `shared_apps` to a
/// Postgres pool and returns how many were applied.
///
/// Unlike the per-alias runner, all operations of a migration are executed; there
/// is no alias or router filtering. Each migration and its tracking row share one
/// transaction.
async fn migrate_tenant_apps_into_pg_pool(
    dir: &Path,
    shared_apps: &std::collections::HashSet<String>,
    pool: &sqlx::PgPool,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_postgres(pool).await?;
    let already_applied = applied_names_postgres(pool).await?;
    let mut newly_applied: u64 = 0;

    for plugin in plugin_order() {
        if shared_apps.contains(&plugin) {
            continue;
        }
        let plugin_dir = dir.join(&plugin);
        for path in list_migration_files(&plugin_dir)? {
            let migration = read_migration_file(&path)?;
            let key = (migration.plugin.clone(), migration.id.clone());
            // Skip what is already recorded, and files that declare a shared plugin
            // even though they sit in a tenant plugin's directory.
            if already_applied.contains(&key) || shared_apps.contains(&migration.plugin) {
                continue;
            }

            let mut tx = pool.begin().await?;
            for op in &migration.operations {
                for statement in render_operation_for(op, "postgres") {
                    sqlx::query(&statement).execute(&mut *tx).await?;
                }
            }
            let snapshot_hash = migration.snapshot_after.hash();
            let applied_at = chrono::Utc::now().to_rfc3339();
            sqlx::query(
                "INSERT INTO umbral_migrations (plugin, name, applied_at, snapshot_hash) \
                 VALUES ($1, $2, $3, $4)",
            )
            .bind(&migration.plugin)
            .bind(&migration.id)
            .bind(&applied_at)
            .bind(&snapshot_hash)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            newly_applied += 1;
        }
    }
    Ok(newly_applied)
}
/// Applies every unapplied migration of the plugins not in `shared_apps` to a
/// SQLite pool and returns how many were applied.
///
/// Unlike the per-alias runner, all operations of a migration are executed; there
/// is no alias or router filtering. Each migration and its tracking row share one
/// transaction.
async fn migrate_tenant_apps_into_sqlite_pool(
    dir: &Path,
    shared_apps: &std::collections::HashSet<String>,
    pool: &sqlx::SqlitePool,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_sqlite(pool).await?;
    let already_applied = applied_names_sqlite(pool).await?;
    let mut newly_applied: u64 = 0;

    for plugin in plugin_order() {
        if shared_apps.contains(&plugin) {
            continue;
        }
        let plugin_dir = dir.join(&plugin);
        for path in list_migration_files(&plugin_dir)? {
            let migration = read_migration_file(&path)?;
            let key = (migration.plugin.clone(), migration.id.clone());
            // Skip what is already recorded, and files that declare a shared plugin
            // even though they sit in a tenant plugin's directory.
            if already_applied.contains(&key) || shared_apps.contains(&migration.plugin) {
                continue;
            }

            let mut tx = pool.begin().await?;
            for op in &migration.operations {
                for statement in render_operation_for(op, "sqlite") {
                    sqlx::query(&statement).execute(&mut *tx).await?;
                }
            }
            let snapshot_hash = migration.snapshot_after.hash();
            let applied_at = chrono::Utc::now().to_rfc3339();
            sqlx::query(
                "INSERT INTO umbral_migrations (plugin, name, applied_at, snapshot_hash) \
                 VALUES (?, ?, ?, ?)",
            )
            .bind(&migration.plugin)
            .bind(&migration.id)
            .bind(&applied_at)
            .bind(&snapshot_hash)
            .execute(&mut *tx)
            .await?;
            tx.commit().await?;
            newly_applied += 1;
        }
    }
    Ok(newly_applied)
}
/// Creates the `umbral_migrations` tracking table on the given Postgres connection
/// if it does not exist yet. The table is unqualified, so it is created wherever
/// the connection's current `search_path` points.
async fn ensure_tracking_table_pg_conn(
    conn: &mut sqlx::PgConnection,
) -> Result<(), MigrateError> {
    let ddl = "CREATE TABLE IF NOT EXISTS umbral_migrations (
plugin TEXT NOT NULL,
name TEXT NOT NULL,
applied_at TEXT NOT NULL,
snapshot_hash TEXT NOT NULL,
PRIMARY KEY (plugin, name)
)";
    sqlx::query(ddl).execute(conn).await?;
    Ok(())
}
/// Runs a drift check against a SQLite database, then applies pending migrations
/// for `alias`.
///
/// Migrations recorded as applied but missing on disk abort the run with
/// `MigrateError::DriftDetected`, unless `allow_drift` is set, in which case each
/// one is reported on stderr and the run continues. Out-of-order files only ever
/// produce a warning.
async fn run_in_sqlite_checked(
    dir: &Path,
    pool: &sqlx::SqlitePool,
    allow_drift: bool,
    alias: &str,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_sqlite(pool).await?;
    let applied = applied_names_sqlite(pool).await?;
    let report = detect_all_drift(&applied, dir)?;

    let missing = report.missing_on_disk();
    if !missing.is_empty() {
        if !allow_drift {
            let missing: Vec<(String, String)> = missing
                .iter()
                .map(|e| (e.plugin.clone(), e.name.clone()))
                .collect();
            return Err(MigrateError::DriftDetected { missing });
        }
        for entry in &missing {
            eprintln!(
                "warning: umbral migrate --allow-drift: migration {}/{} is recorded in \
                 the tracking table but the file is missing from disk; proceeding.",
                entry.plugin, entry.name
            );
        }
    }

    for entry in &report.entries {
        if entry.status != MigrationStatus::OutOfOrder {
            continue;
        }
        eprintln!(
            "warning: umbral migrate: migration {}/{} is on disk but appears before the \
             last applied migration for this plugin; it looks like a file was restored \
             after a teammate already applied later ones.",
            entry.plugin, entry.name
        );
    }

    run_in_sqlite_for_alias(dir, alias, pool, None).await
}
/// Runs a drift check against a Postgres database, then applies pending migrations
/// for `alias`.
///
/// Migrations recorded as applied but missing on disk abort the run with
/// `MigrateError::DriftDetected`, unless `allow_drift` is set, in which case each
/// one is reported on stderr and the run continues. Out-of-order files only ever
/// produce a warning.
async fn run_in_postgres_checked(
    dir: &Path,
    pool: &sqlx::PgPool,
    allow_drift: bool,
    alias: &str,
) -> Result<u64, MigrateError> {
    ensure_tracking_table_postgres(pool).await?;
    let applied = applied_names_postgres(pool).await?;
    let report = detect_all_drift(&applied, dir)?;

    let missing = report.missing_on_disk();
    if !missing.is_empty() {
        if !allow_drift {
            let missing: Vec<(String, String)> = missing
                .iter()
                .map(|e| (e.plugin.clone(), e.name.clone()))
                .collect();
            return Err(MigrateError::DriftDetected { missing });
        }
        for entry in &missing {
            eprintln!(
                "warning: umbral migrate --allow-drift: migration {}/{} is recorded in \
                 the tracking table but the file is missing from disk; proceeding.",
                entry.plugin, entry.name
            );
        }
    }

    for entry in &report.entries {
        if entry.status != MigrationStatus::OutOfOrder {
            continue;
        }
        eprintln!(
            "warning: umbral migrate: migration {}/{} is on disk but appears before the \
             last applied migration for this plugin; it looks like a file was restored \
             after a teammate already applied later ones.",
            entry.plugin, entry.name
        );
    }

    run_in_postgres_for_alias(dir, alias, pool, None).await
}
/// Records `plugin/name` as applied in the `umbral_migrations` tracking table.
///
/// The tracking table is created first if it does not exist. The row is
/// stamped with the current UTC time (RFC 3339) and `snapshot_hash`. The
/// insert ignores conflicts on `(plugin, name)`, so recording the same
/// migration twice leaves the existing row untouched.
///
/// # Errors
/// Propagates any database error from creating the table or inserting the row.
pub async fn record_applied(
    plugin: &str,
    name: &str,
    snapshot_hash: &str,
) -> Result<(), MigrateError> {
    // Taken once up front so both backends store the same kind of timestamp.
    let stamp = chrono::Utc::now().to_rfc3339();

    match crate::db::pool_dispatched() {
        crate::db::DbPool::Postgres(pg) => {
            ensure_tracking_table_postgres(pg).await?;
            let insert = sqlx::query(
                "INSERT INTO umbral_migrations \
                 (plugin, name, applied_at, snapshot_hash) \
                 VALUES ($1, $2, $3, $4) \
                 ON CONFLICT (plugin, name) DO NOTHING",
            )
            .bind(plugin)
            .bind(name)
            .bind(&stamp)
            .bind(snapshot_hash);
            insert.execute(pg).await?;
        }
        crate::db::DbPool::Sqlite(lite) => {
            ensure_tracking_table_sqlite(lite).await?;
            let insert = sqlx::query(
                "INSERT OR IGNORE INTO umbral_migrations \
                 (plugin, name, applied_at, snapshot_hash) \
                 VALUES (?, ?, ?, ?)",
            )
            .bind(plugin)
            .bind(name)
            .bind(&stamp)
            .bind(snapshot_hash);
            insert.execute(lite).await?;
        }
    }

    Ok(())
}
/// Classifies every migration of one plugin by comparing disk with the tracking set.
///
/// * `plugin` — plugin whose migrations are inspected.
/// * `applied` — all `(plugin, name)` pairs recorded as applied (any plugin).
/// * `plugin_dir` — directory holding this plugin's migration files.
///
/// Each on-disk migration (identified by the `id` stored inside the file) is
/// `Applied` when recorded, `OutOfOrder` when it is unrecorded but its leading
/// numeric prefix is not greater than the highest applied prefix, and
/// `Pending` otherwise. Recorded names with no file on disk are reported as
/// `AppliedButMissing`. The result is sorted by migration name.
///
/// # Errors
/// Propagates failures from listing the directory or reading a migration file.
pub fn detect_drift(
    plugin: &str,
    applied: &std::collections::HashSet<(String, String)>,
    plugin_dir: &Path,
) -> Result<Vec<MigrationEntry>, MigrateError> {
    /// Parses the numeric prefix before the first `_`, e.g. `0003_add_x` -> 3.
    fn leading_seq(name: &str) -> Option<u32> {
        name.split('_').next()?.parse::<u32>().ok()
    }

    // Migration ids as declared inside the files, in sorted-path order.
    let on_disk: Vec<String> = list_migration_files(plugin_dir)?
        .iter()
        .map(|path| read_migration_file(path).map(|file| file.id))
        .collect::<Result<_, _>>()?;

    // Names recorded as applied for this plugin only.
    let mut plugin_applied: Vec<&str> = Vec::new();
    for (owner, applied_name) in applied {
        if owner.as_str() == plugin {
            plugin_applied.push(applied_name.as_str());
        }
    }

    // Highest applied sequence number; 0 means "nothing parseable applied".
    let max_applied_seq: u32 = plugin_applied
        .iter()
        .copied()
        .filter_map(|applied_name| leading_seq(applied_name))
        .max()
        .unwrap_or(0);

    let on_disk_set: std::collections::HashSet<&str> =
        on_disk.iter().map(String::as_str).collect();

    let mut entries: Vec<MigrationEntry> = Vec::new();
    for disk_name in &on_disk {
        let recorded = applied.contains(&(plugin.to_string(), disk_name.clone()));
        let status = if recorded {
            MigrationStatus::Applied
        } else if max_applied_seq > 0 && leading_seq(disk_name).unwrap_or(0) <= max_applied_seq {
            // Unapplied file sitting at or below an already-applied sequence number.
            MigrationStatus::OutOfOrder
        } else {
            MigrationStatus::Pending
        };
        entries.push(MigrationEntry {
            plugin: plugin.to_string(),
            name: disk_name.clone(),
            status,
        });
    }

    // Recorded in the database but no longer present on disk.
    for &applied_name in &plugin_applied {
        if on_disk_set.contains(applied_name) {
            continue;
        }
        entries.push(MigrationEntry {
            plugin: plugin.to_string(),
            name: applied_name.to_string(),
            status: MigrationStatus::AppliedButMissing,
        });
    }

    entries.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
    Ok(entries)
}
/// Builds a [`DriftReport`] covering every plugin returned by `plugin_order()`.
///
/// Each known plugin is checked with [`detect_drift`] against `dir/<plugin>`.
/// Applied rows whose plugin is not in `plugin_order()` are appended as
/// `AppliedButMissing`, so records of unknown plugins are not silently lost.
///
/// # Errors
/// Propagates the first error returned by [`detect_drift`].
pub fn detect_all_drift(
    applied: &std::collections::HashSet<(String, String)>,
    dir: &Path,
) -> Result<DriftReport, MigrateError> {
    let mut entries: Vec<MigrationEntry> = Vec::new();
    let mut known_plugins: std::collections::HashSet<String> = std::collections::HashSet::new();

    for plugin in plugin_order() {
        let plugin_dir = dir.join(&plugin);
        entries.extend(detect_drift(&plugin, applied, &plugin_dir)?);
        known_plugins.insert(plugin);
    }

    // Applied rows that belong to a plugin the app no longer knows about.
    let orphaned = applied
        .iter()
        .filter(|(plugin, _)| !known_plugins.contains(plugin.as_str()))
        .map(|(plugin, name)| MigrationEntry {
            plugin: plugin.clone(),
            name: name.clone(),
            status: MigrationStatus::AppliedButMissing,
        });
    entries.extend(orphaned);

    Ok(DriftReport { entries })
}
/// Marks `plugin/name` as applied without running it, reading the migration
/// file from the default `MIGRATIONS_DIR`.
///
/// Thin wrapper around [`fake_apply_in`].
pub async fn fake_apply(plugin: &str, name: &str) -> Result<(), MigrateError> {
    let default_dir = Path::new(MIGRATIONS_DIR);
    fake_apply_in(plugin, name, default_dir).await
}
/// Marks `plugin/name` as applied without executing its operations.
///
/// Reads `dir/<plugin>/<name>.json`, hashes its `snapshot_after`, and records
/// the migration through [`record_applied`].
///
/// # Errors
/// Fails if the file cannot be read or parsed, or if recording fails.
pub async fn fake_apply_in(plugin: &str, name: &str, dir: &Path) -> Result<(), MigrateError> {
    let file_name = format!("{name}.json");
    let migration = read_migration_file(&dir.join(plugin).join(file_name))?;
    let hash = migration.snapshot_after.hash();
    record_applied(plugin, name, &hash).await
}
/// Runs [`fake_initial_in`] against the default `MIGRATIONS_DIR`.
pub async fn fake_initial() -> Result<u64, MigrateError> {
    let default_dir = Path::new(MIGRATIONS_DIR);
    fake_initial_in(default_dir).await
}
pub async fn fake_initial_in(dir: &Path) -> Result<u64, MigrateError> {
match crate::db::pool_dispatched() {
crate::db::DbPool::Sqlite(pool) => fake_initial_sqlite(dir, pool).await,
crate::db::DbPool::Postgres(pool) => fake_initial_postgres(dir, pool).await,
}
}
/// SQLite implementation of "fake initial".
///
/// For each plugin, looks at the first migration file (sorted path order).
/// If it is not yet recorded and every table it creates via `CreateTable`
/// already exists in `sqlite_master`, the migration is recorded as applied
/// without being executed.
///
/// Returns the number of migrations recorded this way.
///
/// # Errors
/// Propagates file-system, parse and database errors.
async fn fake_initial_sqlite(dir: &Path, pool: &sqlx::SqlitePool) -> Result<u64, MigrateError> {
    ensure_tracking_table_sqlite(pool).await?;
    let applied = applied_names_sqlite(pool).await?;
    let mut recorded: u64 = 0;

    'plugins: for plugin in plugin_order() {
        let paths = list_migration_files(&dir.join(&plugin))?;
        let Some(initial_path) = paths.first() else {
            continue;
        };
        let file = read_migration_file(initial_path)?;
        if applied.contains(&(file.plugin.clone(), file.id.clone())) {
            continue;
        }

        let mut expected_tables: Vec<&str> = Vec::new();
        for op in &file.operations {
            if let Operation::CreateTable { table, .. } = op {
                expected_tables.push(table.as_str());
            }
        }
        // Nothing to verify against the database: leave the migration alone.
        if expected_tables.is_empty() {
            continue;
        }

        // Stop at the first missing table; the migration still has to run.
        for &table in &expected_tables {
            let found: Option<(String,)> =
                sqlx::query_as("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?")
                    .bind(table)
                    .fetch_optional(pool)
                    .await?;
            if found.is_none() {
                continue 'plugins;
            }
        }

        let snapshot_hash = file.snapshot_after.hash();
        let applied_at = chrono::Utc::now().to_rfc3339();
        sqlx::query(
            "INSERT OR IGNORE INTO umbral_migrations \
             (plugin, name, applied_at, snapshot_hash) VALUES (?, ?, ?, ?)",
        )
        .bind(&file.plugin)
        .bind(&file.id)
        .bind(&applied_at)
        .bind(&snapshot_hash)
        .execute(pool)
        .await?;
        recorded += 1;
    }

    Ok(recorded)
}
/// Postgres implementation of "fake initial".
///
/// For each plugin, looks at the first migration file (sorted path order).
/// If it is not yet recorded and every table it creates via `CreateTable`
/// already exists in `information_schema.tables` under the `public` schema,
/// the migration is recorded as applied without being executed.
///
/// Returns the number of migrations recorded this way.
///
/// # Errors
/// Propagates file-system, parse and database errors.
async fn fake_initial_postgres(dir: &Path, pool: &sqlx::PgPool) -> Result<u64, MigrateError> {
    ensure_tracking_table_postgres(pool).await?;
    let applied = applied_names_postgres(pool).await?;
    let mut recorded: u64 = 0;

    'plugins: for plugin in plugin_order() {
        let paths = list_migration_files(&dir.join(&plugin))?;
        let Some(initial_path) = paths.first() else {
            continue;
        };
        let file = read_migration_file(initial_path)?;
        if applied.contains(&(file.plugin.clone(), file.id.clone())) {
            continue;
        }

        let mut expected_tables: Vec<&str> = Vec::new();
        for op in &file.operations {
            if let Operation::CreateTable { table, .. } = op {
                expected_tables.push(table.as_str());
            }
        }
        // Nothing to verify against the database: leave the migration alone.
        if expected_tables.is_empty() {
            continue;
        }

        // Stop at the first missing table; the migration still has to run.
        for &table in &expected_tables {
            let found: Option<(String,)> = sqlx::query_as(
                "SELECT table_name FROM information_schema.tables \
                 WHERE table_schema = 'public' AND table_name = $1",
            )
            .bind(table)
            .fetch_optional(pool)
            .await?;
            if found.is_none() {
                continue 'plugins;
            }
        }

        let snapshot_hash = file.snapshot_after.hash();
        let applied_at = chrono::Utc::now().to_rfc3339();
        sqlx::query(
            "INSERT INTO umbral_migrations \
             (plugin, name, applied_at, snapshot_hash) VALUES ($1, $2, $3, $4) \
             ON CONFLICT (plugin, name) DO NOTHING",
        )
        .bind(&file.plugin)
        .bind(&file.id)
        .bind(&applied_at)
        .bind(&snapshot_hash)
        .execute(pool)
        .await?;
        recorded += 1;
    }

    Ok(recorded)
}
/// Runs [`show_in`] against the default `MIGRATIONS_DIR`.
pub async fn show() -> Result<u64, MigrateError> {
    let default_dir = Path::new(MIGRATIONS_DIR);
    show_in(default_dir).await
}
/// Prints the status of every migration, grouped by plugin, to stdout.
///
/// Markers: `[X]` applied, `[ ]` pending, `[!]` applied but missing on disk,
/// `[?]` out of order. Plugins are printed in sorted order.
///
/// Returns the number of pending migrations.
///
/// # Errors
/// Propagates database errors and errors from the drift check.
pub async fn show_in(dir: &Path) -> Result<u64, MigrateError> {
    use std::collections::BTreeMap;

    let applied = match crate::db::pool_dispatched() {
        crate::db::DbPool::Postgres(pg) => {
            ensure_tracking_table_postgres(pg).await?;
            applied_names_postgres(pg).await?
        }
        crate::db::DbPool::Sqlite(lite) => {
            ensure_tracking_table_sqlite(lite).await?;
            applied_names_sqlite(lite).await?
        }
    };
    let report = detect_all_drift(&applied, dir)?;

    // BTreeMap keeps the plugin sections in a stable, sorted order.
    let mut grouped: BTreeMap<&str, Vec<&MigrationEntry>> = BTreeMap::new();
    report.entries.iter().for_each(|entry| {
        grouped.entry(entry.plugin.as_str()).or_default().push(entry);
    });

    let mut pending: u64 = 0;
    for (plugin, members) in grouped.iter().filter(|(_, members)| !members.is_empty()) {
        println!("# plugin: {plugin}");
        for entry in members {
            if entry.status == MigrationStatus::Pending {
                pending += 1;
            }
            let marker = match entry.status {
                MigrationStatus::Applied => "[X]",
                MigrationStatus::Pending => "[ ]",
                MigrationStatus::AppliedButMissing => "[!]",
                MigrationStatus::OutOfOrder => "[?]",
            };
            println!("{marker} {}/{}", entry.plugin, entry.name);
        }
    }

    Ok(pending)
}
/// Deploy-safety verdict for a single migration operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpSafety {
    /// No concern was identified.
    Safe,
    /// Needs attention; the payload explains why.
    Warning(String),
    /// Considered unsafe; the payload explains why.
    Unsafe(String),
}

impl OpSafety {
    /// Returns the explanation attached to the verdict, or `""` for `Safe`.
    pub fn reason(&self) -> &str {
        match self {
            Self::Warning(why) => why.as_str(),
            Self::Unsafe(why) => why.as_str(),
            Self::Safe => "",
        }
    }

    /// `true` only for the `Unsafe` variant.
    pub fn is_unsafe(&self) -> bool {
        match self {
            Self::Unsafe(_) => true,
            Self::Safe | Self::Warning(_) => false,
        }
    }

    /// `true` only for the `Warning` variant.
    pub fn is_warning(&self) -> bool {
        match self {
            Self::Warning(_) => true,
            Self::Safe | Self::Unsafe(_) => false,
        }
    }
}
/// One operation of a pending migration together with its safety verdict,
/// as produced by `check_pending_safety_in`.
#[derive(Debug, Clone)]
pub struct ClassifiedOp {
/// Plugin that owns the migration.
pub plugin: String,
/// Name of the migration the operation belongs to.
pub migration: String,
/// The operation itself (a clone of the one read from the migration file).
pub op: Operation,
/// Result of `classify_operation` for `op`.
pub safety: OpSafety,
}
/// Assigns a deploy-safety verdict to a single migration operation.
///
/// * `Safe` — table creation (plain and M2M) and `AddColumn` when the column
///   is nullable or has a default.
/// * `Warning` — `AddColumn` of a NOT NULL column without a default, renames,
///   `AlterColumn` and `RunSql`.
/// * `Unsafe` — dropping a table, an M2M junction table or a column.
///
/// The verdict carries a human-readable explanation with a suggested rollout.
pub fn classify_operation(op: &Operation) -> OpSafety {
    match op {
        // Must precede the catch-all `AddColumn` arm below.
        Operation::AddColumn { table, column }
            if !column.nullable && column.default.is_empty() =>
        {
            let why = format!(
                "adds NOT NULL column `{}.{}` with no default — old code inserting without it will fail. Add it nullable (or with a default), backfill, then tighten",
                table, column.name
            );
            OpSafety::Warning(why)
        }
        Operation::AddColumn { .. }
        | Operation::CreateTable { .. }
        | Operation::CreateM2MTable { .. } => OpSafety::Safe,
        Operation::DropTable { table } => {
            let why = format!(
                "drops table `{table}` and every row in it — irreversible, and old code still reading it breaks. Stop using it, deploy, then drop in a later migration"
            );
            OpSafety::Unsafe(why)
        }
        Operation::DropM2MTable { junction_table } => {
            let why = format!(
                "drops join table `{junction_table}` and every row in it — irreversible"
            );
            OpSafety::Unsafe(why)
        }
        Operation::DropColumn { table, column } => {
            let why = format!(
                "drops column `{table}.{column}` and its data — old code reading it breaks. Expand-contract: stop writing it, deploy, then drop"
            );
            OpSafety::Unsafe(why)
        }
        Operation::RenameTable { from, to } => {
            let why = format!(
                "renames table `{from}` → `{to}` — not atomic with a code deploy; old code references `{from}`. Expand-contract: add `{to}`, dual-write, switch, then drop `{from}`"
            );
            OpSafety::Warning(why)
        }
        Operation::RenameColumn {
            table, from, to, ..
        } => {
            let why = format!(
                "renames column `{table}.{from}` → `{to}` — old code references `{from}`. Expand-contract: add `{to}`, backfill, switch reads, then drop `{from}`"
            );
            OpSafety::Warning(why)
        }
        Operation::AlterColumn { table, column, .. } => {
            let why = format!(
                "alters column `{table}.{column}` — a type change rewrites the column (locks the table on large data) and a NOT NULL tightening fails on existing NULLs; verify against production data first"
            );
            OpSafety::Warning(why)
        }
        Operation::RunSql { .. } => OpSafety::Warning(
            "runs a hand-authored data migration (raw SQL) — review its row impact, ensure it's idempotent or guarded, and verify it against production data first".to_string(),
        ),
    }
}
/// Runs [`check_pending_safety_in`] against the default `MIGRATIONS_DIR`.
pub async fn check_pending_safety() -> Result<Vec<ClassifiedOp>, MigrateError> {
    let default_dir = Path::new(MIGRATIONS_DIR);
    check_pending_safety_in(default_dir).await
}
/// Classifies every operation of every pending migration under `dir`.
///
/// Only entries the drift report marks `Pending` are inspected; each one is
/// re-read from `dir/<plugin>/<name>.json` and its operations are passed
/// through [`classify_operation`], preserving report and file order.
///
/// # Errors
/// Propagates database errors, drift-check errors and file read/parse errors.
pub async fn check_pending_safety_in(dir: &Path) -> Result<Vec<ClassifiedOp>, MigrateError> {
    let applied = match crate::db::pool_dispatched() {
        crate::db::DbPool::Postgres(pg) => {
            ensure_tracking_table_postgres(pg).await?;
            applied_names_postgres(pg).await?
        }
        crate::db::DbPool::Sqlite(lite) => {
            ensure_tracking_table_sqlite(lite).await?;
            applied_names_sqlite(lite).await?
        }
    };
    let report = detect_all_drift(&applied, dir)?;

    let mut classified: Vec<ClassifiedOp> = Vec::new();
    let pending = report
        .entries
        .iter()
        .filter(|entry| entry.status == MigrationStatus::Pending);
    for entry in pending {
        let file_name = format!("{}.json", entry.name);
        let file = read_migration_file(&dir.join(&entry.plugin).join(file_name))?;
        classified.extend(file.operations.iter().map(|op| ClassifiedOp {
            plugin: entry.plugin.clone(),
            migration: entry.name.clone(),
            op: op.clone(),
            safety: classify_operation(op),
        }));
    }

    Ok(classified)
}
/// Lists the `.json` files directly inside `plugin_dir`, sorted by path.
///
/// A directory that does not exist yields an empty list rather than an error.
///
/// # Errors
/// Propagates I/O errors from reading the directory or one of its entries.
fn list_migration_files(plugin_dir: &Path) -> Result<Vec<PathBuf>, MigrateError> {
    if !plugin_dir.exists() {
        return Ok(Vec::new());
    }

    let mut found: Vec<PathBuf> = Vec::new();
    for dir_entry in std::fs::read_dir(plugin_dir)? {
        let candidate = dir_entry?.path();
        let is_json = matches!(
            candidate.extension().and_then(|ext| ext.to_str()),
            Some("json")
        );
        if is_json {
            found.push(candidate);
        }
    }

    // Sorted path order is what the callers treat as migration order.
    found.sort();
    Ok(found)
}
/// Reads `path` as UTF-8 text and deserializes it from JSON into a
/// [`MigrationFile`].
///
/// # Errors
/// Propagates the I/O error or the JSON parse error.
fn read_migration_file(path: &Path) -> Result<MigrationFile, MigrateError> {
    let raw = std::fs::read_to_string(path)?;
    let parsed = serde_json::from_str::<MigrationFile>(&raw)?;
    Ok(parsed)
}
/// Computes the operations that turn the `previous` snapshot into `current`.
///
/// Models are matched by their `name`:
///
/// * same name, different `table` — emits `RenameTable` followed by the
///   column-level diff;
/// * same name, identical metadata — emits nothing;
/// * same name, other differences — emits the column-level diff;
/// * name only in `current` — a create candidate;
/// * name only in `previous` — a drop candidate.
///
/// A create candidate and a drop candidate with an identical column shape
/// (see `column_shape`) are paired into a `RenameTable`. The remaining
/// creates are ordered so that a table referenced by a foreign key from
/// another table in the same batch is created first. Unpaired drops become
/// `DropTable`. Finally, many-to-many relations that appeared or disappeared
/// become `CreateM2MTable` / `DropM2MTable`.
///
/// All rename notices are diagnostics and are written to stderr.
///
/// # Errors
/// Propagates errors from `diff_columns` and `build_create_m2m_op`.
pub fn diff(previous: &Snapshot, current: &Snapshot) -> Result<Vec<Operation>, MigrateError> {
    use std::collections::{BTreeMap, HashSet};
    let prev_by_name: BTreeMap<&str, &ModelMeta> = previous
        .models
        .iter()
        .map(|m| (m.name.as_str(), m))
        .collect();
    let curr_by_name: BTreeMap<&str, &ModelMeta> = current
        .models
        .iter()
        .map(|m| (m.name.as_str(), m))
        .collect();
    let mut ops: Vec<Operation> = Vec::new();
    let mut drop_candidates: Vec<&ModelMeta> = Vec::new();
    let mut create_candidates: Vec<&ModelMeta> = Vec::new();
    for (name, curr) in &curr_by_name {
        match prev_by_name.get(name) {
            None => {
                create_candidates.push(curr);
            }
            Some(prev) if prev.table != curr.table => {
                // Diagnostic goes to stderr, like the other rename notices.
                eprintln!(
                    "umbral makemigrations: rename detected (struct-name match): \
                     table `{}` → `{}`",
                    prev.table, curr.table
                );
                ops.push(Operation::RenameTable {
                    from: prev.table.clone(),
                    to: curr.table.clone(),
                });
                let col_ops = diff_columns(name, prev, curr)?;
                ops.extend(col_ops);
            }
            Some(prev) if prev == curr => {}
            Some(prev) => {
                ops.extend(diff_columns(name, prev, curr)?);
            }
        }
    }
    for (name, prev) in &prev_by_name {
        if !curr_by_name.contains_key(name) {
            drop_candidates.push(prev);
        }
    }

    // Pair each created model with the first unpaired dropped model that has
    // the same column shape, and treat the pair as a table rename.
    let mut paired_drop_tables: HashSet<&str> = HashSet::new();
    let mut paired_create_tables: HashSet<&str> = HashSet::new();
    for create in &create_candidates {
        let create_shape = column_shape(&create.fields);
        for drop in &drop_candidates {
            if paired_drop_tables.contains(drop.table.as_str()) {
                continue;
            }
            let drop_shape = column_shape(&drop.fields);
            if create_shape == drop_shape {
                eprintln!(
                    "umbral makemigrations: rename detected (column-shape match): \
                     `{}` → `{}` — please verify this is a rename and not a coincidental \
                     column-shape match between two unrelated models",
                    drop.table, create.table
                );
                ops.push(Operation::RenameTable {
                    from: drop.table.clone(),
                    to: create.table.clone(),
                });
                paired_drop_tables.insert(drop.table.as_str());
                paired_create_tables.insert(create.table.as_str());
                break;
            }
        }
    }

    // Order the remaining creates by their in-batch foreign-key dependencies.
    let creates: Vec<&&ModelMeta> = create_candidates
        .iter()
        .filter(|c| !paired_create_tables.contains(c.table.as_str()))
        .collect();
    let batch_tables: HashSet<&str> = creates.iter().map(|c| c.table.as_str()).collect();
    let mut deps: BTreeMap<&str, HashSet<&str>> = BTreeMap::new();
    for create in &creates {
        let mut in_batch: HashSet<&str> = HashSet::new();
        for col in &create.fields {
            // Self-references and targets outside this batch impose no order.
            if let Some(target) = col.fk_target.as_deref()
                && target != create.table.as_str()
                && batch_tables.contains(target)
            {
                in_batch.insert(target);
            }
        }
        deps.insert(create.table.as_str(), in_batch);
    }
    let mut ordered: Vec<&&ModelMeta> = Vec::with_capacity(creates.len());
    while !deps.is_empty() {
        let ready: Vec<&str> = deps
            .iter()
            .filter(|(_, d)| d.is_empty())
            .map(|(t, _)| *t)
            .collect();
        if ready.is_empty() {
            // Dependency cycle: emit whatever is left in candidate order.
            for create in &creates {
                if deps.contains_key(create.table.as_str()) {
                    ordered.push(create);
                }
            }
            break;
        }
        for t in &ready {
            if let Some(create) = creates.iter().find(|c| c.table.as_str() == *t) {
                ordered.push(create);
            }
            deps.remove(t);
        }
        for (_, set) in deps.iter_mut() {
            for t in &ready {
                set.remove(t);
            }
        }
    }
    for create in ordered {
        ops.push(Operation::CreateTable {
            table: create.table.clone(),
            columns: create.fields.clone(),
            unique_together: create.unique_together.clone(),
            indexes: create.indexes.clone(),
        });
    }
    for drop in &drop_candidates {
        if !paired_drop_tables.contains(drop.table.as_str()) {
            ops.push(Operation::DropTable {
                table: drop.table.clone(),
            });
        }
    }

    // Many-to-many junction tables, keyed by (parent table, field name).
    let prev_m2m = collect_m2m_pairs(previous);
    let curr_m2m = collect_m2m_pairs(current);
    for (key, spec) in &curr_m2m {
        if prev_m2m.contains_key(key) {
            continue;
        }
        ops.push(build_create_m2m_op(spec, current)?);
    }
    for (key, spec) in &prev_m2m {
        if curr_m2m.contains_key(key) {
            continue;
        }
        ops.push(Operation::DropM2MTable {
            junction_table: spec.junction_table.clone(),
        });
    }
    Ok(ops)
}
/// One many-to-many relation found in a snapshot, as collected by
/// `collect_m2m_pairs`.
#[derive(Debug, Clone)]
struct M2MPair {
/// Table of the model that declares the relation.
parent_table: String,
/// Name of the parent's primary-key column (`"id"` when the model has none flagged).
parent_pk: String,
/// Name of the relation field on the parent model.
field_name: String,
/// Table the relation points at.
target_table: String,
/// Junction table name, built as `<parent_table>_<field_name>`.
junction_table: String,
}
/// Collects every many-to-many relation declared in `snap`.
///
/// The result is keyed by `(parent table, relation field name)`. The
/// junction table is named `<parent table>_<field name>`, and the parent
/// primary-key column falls back to `"id"` when no field is flagged as the
/// primary key.
fn collect_m2m_pairs(snap: &Snapshot) -> std::collections::BTreeMap<(String, String), M2MPair> {
    let mut pairs = std::collections::BTreeMap::new();

    for model in &snap.models {
        let pk_name = match model.fields.iter().find(|col| col.primary_key) {
            Some(col) => col.name.clone(),
            None => "id".to_string(),
        };

        pairs.extend(model.m2m_relations.iter().map(|rel| {
            let key = (model.table.clone(), rel.field_name.clone());
            let pair = M2MPair {
                parent_table: model.table.clone(),
                parent_pk: pk_name.clone(),
                field_name: rel.field_name.clone(),
                target_table: rel.target_table.clone(),
                junction_table: format!("{}_{}", model.table, rel.field_name),
            };
            (key, pair)
        }));
    }

    pairs
}
fn build_create_m2m_op(spec: &M2MPair, current: &Snapshot) -> Result<Operation, MigrateError> {
let pk_col_and_ty = |m: &ModelMeta| -> (String, crate::orm::SqlType) {
let pk = m.fields.iter().find(|c| c.primary_key);
(
pk.map(|c| c.name.clone()).unwrap_or_else(|| "id".to_string()),
pk.map(|c| c.ty).unwrap_or(crate::orm::SqlType::BigInt),
)
};
let (child_pk_col, child_ty) = current
.models
.iter()
.find(|m| m.table == spec.target_table)
.map(|m| pk_col_and_ty(m))
.or_else(|| {
REGISTRY.get().and_then(|reg| {
reg.iter()
.find(|(_, m)| m.table == spec.target_table)
.map(|(_, m)| pk_col_and_ty(m))
})
})
.ok_or_else(|| {
MigrateError::UnsupportedChange(format!(
"M2M `{}.{}` targets table `{}` which is not registered \
anywhere — register the target model via \
`AppBuilder::model::<{}>()` or its owning plugin.",
spec.parent_table, spec.field_name, spec.target_table, spec.target_table,
))
})?;
let parent_model = current
.models
.iter()
.find(|m| m.table == spec.parent_table)
.expect("parent model exists in snapshot — collect_m2m_pairs iterated it");
let parent_ty = parent_model
.fields
.iter()
.find(|c| c.primary_key)
.map(|c| c.ty)
.unwrap_or(crate::orm::SqlType::BigInt);
Ok(Operation::CreateM2MTable {
junction_table: spec.junction_table.clone(),
parent_table: spec.parent_table.clone(),
parent_col: spec.parent_pk.clone(),
child_table: spec.target_table.clone(),
child_col: child_pk_col,
parent_ty,
child_ty,
})
}
/// Reduces a column list to a name-sorted `(name, type, nullable, fk_target)`
/// signature, so two tables can be compared independently of field order.
fn column_shape(fields: &[Column]) -> Vec<(String, SqlType, bool, Option<String>)> {
    let mut signature = Vec::with_capacity(fields.len());
    for col in fields {
        signature.push((col.name.clone(), col.ty, col.nullable, col.fk_target.clone()));
    }
    signature.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    signature
}
fn is_safe_cast(from: SqlType, to: SqlType) -> bool {
use SqlType::*;
if from == to {
return true;
}
match (from, to) {
(
SmallInt | Integer | BigInt | Real | Double | Boolean | Date | Time | Timestamptz
| Uuid | Inet | Cidr | MacAddr | ForeignKey,
Text,
) => true,
(SmallInt, Integer | BigInt) => true,
(Integer, BigInt) => true,
(Real, Double) => true,
(ForeignKey, BigInt) => true,
(BigInt, ForeignKey) => true,
_ => false,
}
}
fn postgres_type_name(ty: SqlType) -> &'static str {
use SqlType::*;
match ty {
SmallInt => "smallint",
Integer => "integer",
BigInt | ForeignKey => "bigint",
Real => "real",
Double => "double precision",
Boolean => "boolean",
Text => "text",
Date => "date",
Time => "time",
Timestamptz => "timestamp with time zone",
Uuid => "uuid",
Json => "jsonb",
Inet => "inet",
Cidr => "cidr",
MacAddr => "macaddr",
Xml => "xml",
Ltree => "ltree",
Bit => "bit varying",
FullText => "tsvector",
Bytes => "bytea",
Decimal => "numeric(19, 4)",
Array(_) => "text[]",
}
}
/// Computes the column-level operations between two versions of one model.
///
/// * `model` — model name, used only in error values.
/// * `previous` / `current` — the model's metadata before and after.
///
/// Columns present in both versions are validated first. A change is
/// rejected with [`MigrateError::UnsafeAlter`] when it flips the primary-key
/// flag, changes the type outside the safe-cast whitelist, tightens
/// nullable → NOT NULL without a default, or adds UNIQUE. Any other attribute
/// difference yields an `AlterColumn` carrying the full new and previous
/// column lists.
///
/// If exactly one column was dropped and exactly one was added, and the two
/// match on every attribute but the name, they are paired into a
/// `RenameColumn`. All other dropped columns become `DropColumn`; all other
/// added columns become `AddColumn`, except that a NOT NULL column with no
/// default is rejected unless it is `auto_now`, `auto_now_add` or a primary
/// key.
///
/// Operations are emitted in this order: alters, rename, drops, adds.
///
/// # Errors
/// Returns [`MigrateError::UnsafeAlter`] for the cases described above.
fn diff_columns(
    model: &str,
    previous: &ModelMeta,
    current: &ModelMeta,
) -> Result<Vec<Operation>, MigrateError> {
    use std::collections::BTreeMap;
    let prev_cols: BTreeMap<&str, &Column> = previous
        .fields
        .iter()
        .map(|c| (c.name.as_str(), c))
        .collect();
    let curr_cols: BTreeMap<&str, &Column> = current
        .fields
        .iter()
        .map(|c| (c.name.as_str(), c))
        .collect();

    // Validate columns that exist on both sides and note which ones changed.
    let mut alter_columns: Vec<&str> = Vec::new();
    for (name, prev_col) in &prev_cols {
        if let Some(curr_col) = curr_cols.get(name) {
            if prev_col.primary_key != curr_col.primary_key {
                return Err(MigrateError::UnsafeAlter {
                    model: model.to_string(),
                    column: (*name).to_string(),
                    reason: "primary-key flips need a manual data-preserving migration".to_string(),
                });
            }
            let type_changed = prev_col.ty != curr_col.ty;
            if type_changed && !is_safe_cast(prev_col.ty, curr_col.ty) {
                return Err(MigrateError::UnsafeAlter {
                    model: model.to_string(),
                    column: (*name).to_string(),
                    reason: format!(
                        "type change {prev_ty:?} -> {curr_ty:?} is not in the safe-cast whitelist — write a data-preserving migration by hand",
                        prev_ty = prev_col.ty,
                        curr_ty = curr_col.ty,
                    ),
                });
            }
            if prev_col.nullable && !curr_col.nullable && curr_col.default.is_empty() {
                return Err(MigrateError::UnsafeAlter {
                    model: model.to_string(),
                    column: (*name).to_string(),
                    reason: "nullable → NOT NULL requires a default/backfill before tightening; otherwise existing NULL rows abort the migration".to_string(),
                });
            }
            if !prev_col.unique && curr_col.unique {
                return Err(MigrateError::UnsafeAlter {
                    model: model.to_string(),
                    column: (*name).to_string(),
                    reason: "adding UNIQUE to an existing column requires a duplicate pre-check/backfill migration; otherwise existing duplicate values abort the migration".to_string(),
                });
            }
            if type_changed
                || prev_col.nullable != curr_col.nullable
                || prev_col.fk_target != curr_col.fk_target
                || prev_col.unique != curr_col.unique
                || prev_col.default != curr_col.default
                || prev_col.choices != curr_col.choices
                || prev_col.choice_labels != curr_col.choice_labels
                || prev_col.on_delete != curr_col.on_delete
                || prev_col.on_update != curr_col.on_update
                || prev_col.index != curr_col.index
            {
                alter_columns.push(*name);
            }
        }
    }

    // Every AlterColumn carries the complete before/after column lists.
    let mut ops: Vec<Operation> = Vec::new();
    let new_columns: Vec<Column> = current.fields.clone();
    let prev_columns_snapshot: Vec<Column> = previous.fields.clone();
    for name in alter_columns {
        ops.push(Operation::AlterColumn {
            table: current.table.clone(),
            column: name.to_string(),
            new_columns: new_columns.clone(),
            prev_columns: Some(prev_columns_snapshot.clone()),
        });
    }

    let mut dropped: Vec<&Column> = Vec::new();
    let mut added: Vec<&Column> = Vec::new();
    for (name, prev_col) in &prev_cols {
        if !curr_cols.contains_key(name) {
            dropped.push(prev_col);
        }
    }
    // Iterate the field list (not the map) so additions keep declaration order.
    for col in &current.fields {
        if !prev_cols.contains_key(col.name.as_str()) {
            added.push(col);
        }
    }

    // A single drop plus a single add of the same shape is treated as a rename.
    let mut paired_drop: Option<&str> = None;
    let mut paired_add: Option<&str> = None;
    if dropped.len() == 1 && added.len() == 1 {
        let d = dropped[0];
        let a = added[0];
        if column_shape_matches(d, a) {
            eprintln!(
                "umbral makemigrations: column rename detected on `{}`: \
                 `{}` → `{}` — verify this is a rename and not a coincidental \
                 shape match; edit the migration file if it's wrong",
                current.table, d.name, a.name,
            );
            ops.push(Operation::RenameColumn {
                table: current.table.clone(),
                from: d.name.clone(),
                to: a.name.clone(),
                column: Some(a.clone()),
            });
            paired_drop = Some(d.name.as_str());
            paired_add = Some(a.name.as_str());
        }
    }
    for col in &dropped {
        if Some(col.name.as_str()) == paired_drop {
            continue;
        }
        ops.push(Operation::DropColumn {
            table: current.table.clone(),
            column: col.name.clone(),
        });
    }
    for col in &added {
        if Some(col.name.as_str()) == paired_add {
            continue;
        }
        // A NOT NULL column needs some source of values for existing rows.
        if !col.nullable
            && col.default.is_empty()
            && !col.auto_now_add
            && !col.auto_now
            && !col.primary_key
        {
            return Err(MigrateError::UnsafeAlter {
                model: model.to_string(),
                column: col.name.clone(),
                reason: format!(
                    "adding NOT NULL column `{}` without a default to existing \
                     table `{}` would fail on every populated row. Pick one: \
                     (a) make the field `Option<T>`, (b) add `#[umbral(default = \
                     \"...\")]` so the migration backfills, or (c) add \
                     `#[umbral(auto_now_add)]` for timestamp columns",
                    col.name, current.table,
                ),
            });
        }
        ops.push(Operation::AddColumn {
            table: current.table.clone(),
            column: (*col).clone(),
        });
    }
    Ok(ops)
}
/// Reports whether two columns agree on every compared attribute other than
/// the name — the test used to recognise a column rename.
fn column_shape_matches(a: &Column, b: &Column) -> bool {
    // One flag per compared attribute; the name is deliberately absent.
    let checks = [
        a.ty == b.ty,
        a.primary_key == b.primary_key,
        a.nullable == b.nullable,
        a.fk_target == b.fk_target,
        a.choices == b.choices,
        a.choice_labels == b.choice_labels,
        a.default == b.default,
        a.is_multichoice == b.is_multichoice,
        a.unique == b.unique,
        a.on_delete == b.on_delete,
        a.on_update == b.on_update,
        a.index == b.index,
        a.auto_now_add == b.auto_now_add,
        a.auto_now == b.auto_now,
        a.min == b.min,
        a.max == b.max,
        a.text_format == b.text_format,
    ];
    checks.iter().all(|same| *same)
}
/// Derives a descriptive name suffix from a list of operations.
///
/// Only a list holding exactly one operation of a recognised kind gets a
/// descriptive suffix (for example `create_<table>`); everything else,
/// including an empty list, yields `"auto"`.
fn suffix_for(ops: &[Operation]) -> String {
    let [only] = ops else {
        return "auto".to_string();
    };

    match only {
        Operation::CreateTable { table, .. } => format!("create_{table}"),
        Operation::DropTable { table } => format!("drop_{table}"),
        Operation::AddColumn { table, column } => format!("add_{}_{}", table, column.name),
        Operation::DropColumn { table, column } => format!("drop_{table}_{column}"),
        Operation::AlterColumn { table, column, .. } => format!("alter_{table}_{column}"),
        Operation::RenameTable { from, to } => format!("rename_{from}_to_{to}"),
        Operation::RenameColumn {
            table, from, to, ..
        } => format!("rename_{table}_{from}_to_{to}"),
        Operation::RunSql { .. } => "run_sql".to_string(),
        _ => "auto".to_string(),
    }
}
/// Creates the `umbral_migrations` tracking table on SQLite if it is absent.
///
/// # Errors
/// Propagates the database error if the statement fails.
async fn ensure_tracking_table_sqlite(pool: &sqlx::SqlitePool) -> Result<(), MigrateError> {
    // Statement text is kept verbatim, including its line breaks.
    const CREATE_TRACKING_TABLE: &str = "CREATE TABLE IF NOT EXISTS umbral_migrations (
plugin TEXT NOT NULL,
name TEXT NOT NULL,
applied_at TEXT NOT NULL,
snapshot_hash TEXT NOT NULL,
PRIMARY KEY (plugin, name)
)";

    let statement = sqlx::query(CREATE_TRACKING_TABLE);
    statement.execute(pool).await?;
    Ok(())
}
/// Creates the `umbral_migrations` tracking table on Postgres if it is absent.
///
/// # Errors
/// Propagates the database error if the statement fails.
async fn ensure_tracking_table_postgres(pool: &sqlx::PgPool) -> Result<(), MigrateError> {
    // Statement text is kept verbatim, including its line breaks.
    const CREATE_TRACKING_TABLE: &str = "CREATE TABLE IF NOT EXISTS umbral_migrations (
plugin TEXT NOT NULL,
name TEXT NOT NULL,
applied_at TEXT NOT NULL,
snapshot_hash TEXT NOT NULL,
PRIMARY KEY (plugin, name)
)";

    let statement = sqlx::query(CREATE_TRACKING_TABLE);
    statement.execute(pool).await?;
    Ok(())
}
/// Loads every `(plugin, name)` pair recorded in `umbral_migrations` (SQLite).
///
/// # Errors
/// Propagates the database error if the query fails.
async fn applied_names_sqlite(
    pool: &sqlx::SqlitePool,
) -> Result<std::collections::HashSet<(String, String)>, MigrateError> {
    let recorded = sqlx::query_as::<_, (String, String)>(
        "SELECT plugin, name FROM umbral_migrations",
    )
    .fetch_all(pool)
    .await?;

    let mut applied = std::collections::HashSet::new();
    for pair in recorded {
        applied.insert(pair);
    }
    Ok(applied)
}
/// Loads every `(plugin, name)` pair recorded in `umbral_migrations` (Postgres).
///
/// # Errors
/// Propagates the database error if the query fails.
async fn applied_names_postgres(
    pool: &sqlx::PgPool,
) -> Result<std::collections::HashSet<(String, String)>, MigrateError> {
    let recorded = sqlx::query_as::<_, (String, String)>(
        "SELECT plugin, name FROM umbral_migrations",
    )
    .fetch_all(pool)
    .await?;

    let mut applied = std::collections::HashSet::new();
    for pair in recorded {
        applied.insert(pair);
    }
    Ok(applied)
}
/// Renders `op` to SQL statements for the currently active backend.
fn render_operation(op: &Operation) -> Vec<String> {
    let backend = crate::backend::active();
    render_operation_for(op, backend.name())
}
/// Decides whether a column gets its own index statement.
///
/// Primary-key and unique columns never do. Otherwise a column qualifies
/// when it is flagged `index`, is a foreign key, or is named `deleted_at`.
fn should_emit_btree_index(col: &Column) -> bool {
    if col.primary_key || col.unique {
        return false;
    }
    col.index || matches!(col.ty, SqlType::ForeignKey) || col.name == "deleted_at"
}
pub fn render_operation_for(op: &Operation, backend_name: &str) -> Vec<String> {
match backend_name {
"sqlite" => render_operation_sqlite(op),
"postgres" => render_operation_postgres(op),
other => panic!(
"umbral::migrate: no DDL renderer for backend `{other}`; \
Phase 2 ships sqlite and postgres only"
),
}
}
/// Renders one migration operation to SQLite statements.
///
/// Most operations go through `sea_query`; the NOT NULL timestamp backfill,
/// M2M junction tables and column renames are assembled by hand. Every
/// hand-assembled identifier is escaped by doubling embedded `"` characters,
/// so all hand-built paths in this function quote identifiers the same way.
///
/// Returns the statements in the order they must be executed.
fn render_operation_sqlite(op: &Operation) -> Vec<String> {
    use sea_query::{Alias, SqliteQueryBuilder, Table};

    /// Escapes an identifier for use inside a double-quoted SQL name.
    fn escape_ident(ident: &str) -> String {
        ident.replace('"', "\"\"")
    }

    match op {
        Operation::CreateTable {
            table,
            columns,
            unique_together,
            indexes,
        } => {
            let mut stmt = Table::create();
            stmt.table(Alias::new(table));
            for col in columns {
                let mut def = build_column_def_sqlite(col);
                stmt.col(&mut def);
            }
            for group in unique_together {
                let mut idx = sea_query::Index::create().unique().to_owned();
                for col in group {
                    idx.col(Alias::new(col));
                }
                stmt.index(&mut idx);
            }
            // Index statements follow the CREATE TABLE they depend on.
            let mut stmts = vec![stmt.build(SqliteQueryBuilder)];
            for col in columns {
                if should_emit_btree_index(col) {
                    stmts.push(create_index_stmt(table, &col.name));
                }
            }
            for group in indexes {
                stmts.push(create_multi_index_stmt(table, group));
            }
            stmts
        }
        Operation::DropTable { table } => vec![
            Table::drop()
                .table(Alias::new(table))
                .build(SqliteQueryBuilder),
        ],
        Operation::AddColumn { table, column } => {
            // A NOT NULL auto-timestamp column is added as nullable and then
            // backfilled, so existing rows receive a value.
            let needs_backfill = (column.auto_now || column.auto_now_add)
                && !column.nullable
                && matches!(
                    column.ty,
                    SqlType::Timestamptz | SqlType::Date | SqlType::Time
                );
            let mut stmts = if needs_backfill {
                let mut nullable_col = column.clone();
                nullable_col.nullable = true;
                let mut stmt = Table::alter();
                stmt.table(Alias::new(table));
                let mut def = build_column_def_sqlite(&nullable_col);
                stmt.add_column(&mut def);
                let add_sql = stmt.build(SqliteQueryBuilder);
                let table_quoted = escape_ident(table);
                let col_quoted = escape_ident(&column.name);
                let backfill_sql = format!(
                    "UPDATE \"{table_quoted}\" SET \"{col_quoted}\" = datetime('now') \
                     WHERE \"{col_quoted}\" IS NULL"
                );
                vec![add_sql, backfill_sql]
            } else {
                let mut stmt = Table::alter();
                stmt.table(Alias::new(table));
                let mut def = build_column_def_sqlite(column);
                stmt.add_column(&mut def);
                vec![stmt.build(SqliteQueryBuilder)]
            };
            if should_emit_btree_index(column) {
                stmts.push(create_index_stmt(table, &column.name));
            }
            stmts
        }
        Operation::DropColumn { table, column } => vec![
            Table::alter()
                .table(Alias::new(table))
                .drop_column(Alias::new(column))
                .build(SqliteQueryBuilder),
        ],
        // The SQLite alter path only needs the table and the new column list.
        Operation::AlterColumn {
            table,
            column: _,
            new_columns,
            prev_columns: _,
        } => render_alter_column_dance_sqlite(table, new_columns),
        Operation::CreateM2MTable {
            junction_table,
            parent_table,
            parent_col,
            child_table,
            child_col,
            parent_ty,
            child_ty,
        } => {
            vec![format!(
                r#"CREATE TABLE "{jt}" (
"parent_id" {pty} NOT NULL REFERENCES "{pt}"("{pc}") ON DELETE CASCADE,
"child_id" {cty} NOT NULL REFERENCES "{ct}"("{cc}") ON DELETE CASCADE,
PRIMARY KEY ("parent_id", "child_id")
)"#,
                jt = escape_ident(junction_table),
                pt = escape_ident(parent_table),
                pc = escape_ident(parent_col),
                ct = escape_ident(child_table),
                cc = escape_ident(child_col),
                pty = m2m_pk_sql_type_sqlite(*parent_ty),
                cty = m2m_pk_sql_type_sqlite(*child_ty),
            )]
        }
        Operation::DropM2MTable { junction_table } => vec![
            Table::drop()
                .table(Alias::new(junction_table))
                .build(SqliteQueryBuilder),
        ],
        Operation::RenameTable { from, to } => vec![
            Table::rename()
                .table(Alias::new(from.as_str()), Alias::new(to.as_str()))
                .build(SqliteQueryBuilder),
        ],
        Operation::RenameColumn {
            table, from, to, ..
        } => {
            let t = escape_ident(table);
            let f = escape_ident(from);
            let tn = escape_ident(to);
            vec![format!(
                "ALTER TABLE \"{t}\" RENAME COLUMN \"{f}\" TO \"{tn}\""
            )]
        }
        // Hand-authored SQL is passed through untouched.
        Operation::RunSql { sql, .. } => vec![sql.clone()],
    }
}
/// Renders one migration operation to Postgres statements.
///
/// Most operations go through `sea_query`; M2M junction tables and column
/// renames are assembled by hand. Every hand-assembled identifier is escaped
/// by doubling embedded `"` characters, so both hand-built paths in this
/// function quote identifiers the same way.
///
/// `FullText` columns get a GIN index statement; other columns get an index
/// statement when `should_emit_btree_index` says so.
///
/// Returns the statements in the order they must be executed.
fn render_operation_postgres(op: &Operation) -> Vec<String> {
    use sea_query::{Alias, PostgresQueryBuilder, Table};

    /// Escapes an identifier for use inside a double-quoted SQL name.
    fn escape_ident(ident: &str) -> String {
        ident.replace('"', "\"\"")
    }

    match op {
        Operation::CreateTable {
            table,
            columns,
            unique_together,
            indexes,
        } => {
            let mut stmt = Table::create();
            stmt.table(Alias::new(table));
            for col in columns {
                let mut def = build_column_def_postgres(col);
                stmt.col(&mut def);
            }
            for group in unique_together {
                let mut idx = sea_query::Index::create().unique().to_owned();
                for col in group {
                    idx.col(Alias::new(col));
                }
                stmt.index(&mut idx);
            }
            // Index statements follow the CREATE TABLE they depend on.
            let mut stmts = vec![stmt.build(PostgresQueryBuilder)];
            for col in columns {
                if matches!(col.ty, crate::orm::SqlType::FullText) {
                    stmts.push(create_gin_index_stmt(table, &col.name));
                } else if should_emit_btree_index(col) {
                    stmts.push(create_index_stmt(table, &col.name));
                }
            }
            for group in indexes {
                stmts.push(create_multi_index_stmt(table, group));
            }
            stmts
        }
        Operation::DropTable { table } => vec![
            Table::drop()
                .table(Alias::new(table))
                .build(PostgresQueryBuilder),
        ],
        Operation::AddColumn { table, column } => {
            let mut stmt = Table::alter();
            stmt.table(Alias::new(table));
            let mut def = build_column_def_postgres(column);
            stmt.add_column(&mut def);
            let mut stmts = vec![stmt.build(PostgresQueryBuilder)];
            if matches!(column.ty, crate::orm::SqlType::FullText) {
                stmts.push(create_gin_index_stmt(table, &column.name));
            } else if should_emit_btree_index(column) {
                stmts.push(create_index_stmt(table, &column.name));
            }
            stmts
        }
        Operation::DropColumn { table, column } => vec![
            Table::alter()
                .table(Alias::new(table))
                .drop_column(Alias::new(column))
                .build(PostgresQueryBuilder),
        ],
        Operation::AlterColumn {
            table,
            column,
            new_columns,
            prev_columns,
        } => render_alter_column_postgres(table, column, new_columns, prev_columns.as_deref()),
        Operation::CreateM2MTable {
            junction_table,
            parent_table,
            parent_col,
            child_table,
            child_col,
            parent_ty,
            child_ty,
        } => {
            vec![format!(
                r#"CREATE TABLE "{jt}" (
"parent_id" {pty} NOT NULL REFERENCES "{pt}"("{pc}") ON DELETE CASCADE,
"child_id" {cty} NOT NULL REFERENCES "{ct}"("{cc}") ON DELETE CASCADE,
PRIMARY KEY ("parent_id", "child_id")
)"#,
                jt = escape_ident(junction_table),
                pt = escape_ident(parent_table),
                pc = escape_ident(parent_col),
                ct = escape_ident(child_table),
                cc = escape_ident(child_col),
                pty = m2m_pk_sql_type_postgres(*parent_ty),
                cty = m2m_pk_sql_type_postgres(*child_ty),
            )]
        }
        Operation::DropM2MTable { junction_table } => vec![
            Table::drop()
                .table(Alias::new(junction_table))
                .build(PostgresQueryBuilder),
        ],
        Operation::RenameTable { from, to } => vec![
            Table::rename()
                .table(Alias::new(from.as_str()), Alias::new(to.as_str()))
                .build(PostgresQueryBuilder),
        ],
        Operation::RenameColumn {
            table, from, to, ..
        } => {
            let t = escape_ident(table);
            let f = escape_ident(from);
            let tn = escape_ident(to);
            vec![format!(
                "ALTER TABLE \"{t}\" RENAME COLUMN \"{f}\" TO \"{tn}\""
            )]
        }
        // Hand-authored SQL is passed through untouched.
        Operation::RunSql { sql, .. } => vec![sql.clone()],
    }
}
/// Renders the SQLite table-rebuild sequence used to alter a column.
///
/// Emits four statements, in order:
/// 1. `CREATE TABLE "_umbral_new_<table>"` with the definitions of `new_columns`,
/// 2. `INSERT INTO … SELECT …` copying every column named in `new_columns`
///    from the existing table,
/// 3. `DROP TABLE <table>`,
/// 4. rename of the temporary table to `<table>`.
///
/// The copy selects the `new_columns` names from the old table, so each of
/// those columns must already exist there.
fn render_alter_column_dance_sqlite(table: &str, new_columns: &[Column]) -> Vec<String> {
    use sea_query::{Alias, SqliteQueryBuilder, Table};

    // Double-quote an identifier for the hand-written INSERT, doubling embedded quotes.
    let quote = |ident: &str| format!("\"{}\"", ident.replace('"', "\"\""));

    let tmp = format!("_umbral_new_{table}");
    let mut create = Table::create();
    create.table(Alias::new(&tmp));
    for col in new_columns {
        let mut def = build_column_def_sqlite(col);
        create.col(&mut def);
    }

    let column_list = new_columns
        .iter()
        .map(|c| quote(&c.name))
        .collect::<Vec<_>>()
        .join(", ");
    // Table names are escaped the same way as column names so a quote in either
    // cannot terminate the identifier early.
    let insert_sql = format!(
        "INSERT INTO {} ({column_list}) SELECT {column_list} FROM {}",
        quote(&tmp),
        quote(table),
    );
    let drop_sql = Table::drop()
        .table(Alias::new(table))
        .build(SqliteQueryBuilder);
    let rename_sql = Table::rename()
        .table(Alias::new(&tmp), Alias::new(table))
        .build(SqliteQueryBuilder);

    vec![
        create.build(SqliteQueryBuilder),
        insert_sql,
        drop_sql,
        rename_sql,
    ]
}
fn render_alter_column_postgres(
table: &str,
column: &str,
new_columns: &[Column],
prev_columns: Option<&[Column]>,
) -> Vec<String> {
let new = new_columns.iter().find(|c| c.name == column).expect(
"umbral::migrate: AlterColumn op references a column missing from new_columns; \
this is a bug in `diff_columns`",
);
let prev = prev_columns.and_then(|cols| cols.iter().find(|c| c.name == column));
let q_table = quote_pg_ident(table);
let q_column = quote_pg_ident(column);
let mut stmts: Vec<String> = Vec::new();
if let Some(prev_col) = prev {
if prev_col.ty != new.ty && is_safe_cast(prev_col.ty, new.ty) {
let new_ty_sql = postgres_type_name(new.ty);
stmts.push(format!(
"ALTER TABLE {q_table} ALTER COLUMN {q_column} TYPE {new_ty_sql} USING {q_column}::{new_ty_sql}"
));
}
}
let nullable_changed = match prev {
Some(prev_col) => prev_col.nullable != new.nullable,
None => true,
};
if nullable_changed {
let clause = if new.nullable {
"DROP NOT NULL"
} else {
"SET NOT NULL"
};
stmts.push(format!(
"ALTER TABLE {q_table} ALTER COLUMN {q_column} {clause}"
));
}
if let Some(prev_col) = prev {
if prev_col.unique != new.unique {
let cname = format!("{table}_{column}_key");
if new.unique {
stmts.push(format!(
"ALTER TABLE {q_table} ADD CONSTRAINT \"{cname}\" UNIQUE ({q_column})"
));
} else {
stmts.push(format!(
"ALTER TABLE {q_table} DROP CONSTRAINT IF EXISTS \"{cname}\""
));
}
}
if prev_col.default != new.default {
if new.default.is_empty() {
stmts.push(format!(
"ALTER TABLE {q_table} ALTER COLUMN {q_column} DROP DEFAULT"
));
} else {
let escaped = new.default.replace('\'', "''");
stmts.push(format!(
"ALTER TABLE {q_table} ALTER COLUMN {q_column} SET DEFAULT '{escaped}'"
));
}
}
let fk_changed = prev_col.fk_target != new.fk_target
|| prev_col.on_delete != new.on_delete
|| prev_col.on_update != new.on_update;
if fk_changed && matches!(new.ty, SqlType::ForeignKey) {
let cname = format!("{table}_{column}_fkey");
stmts.push(format!(
"ALTER TABLE {q_table} DROP CONSTRAINT IF EXISTS \"{cname}\""
));
if let Some(target) = &new.fk_target
&& new.db_constraint
{
let q_target = quote_pg_ident(target);
let on_delete_clause = new
.on_delete
.sql_keyword()
.map(|k| format!(" ON DELETE {k}"))
.unwrap_or_default();
let on_update_clause = new
.on_update
.sql_keyword()
.map(|k| format!(" ON UPDATE {k}"))
.unwrap_or_default();
stmts.push(format!(
"ALTER TABLE {q_table} ADD CONSTRAINT \"{cname}\" \
FOREIGN KEY ({q_column}) REFERENCES {q_target}(\"id\")\
{on_delete_clause}{on_update_clause}"
));
}
}
if prev_col.choices != new.choices && !new.is_multichoice {
let cname = format!("{table}_{column}_check");
stmts.push(format!(
"ALTER TABLE {q_table} DROP CONSTRAINT IF EXISTS \"{cname}\""
));
if !new.choices.is_empty() {
let values_sql = new
.choices
.iter()
.map(|v| format!("'{}'", v.replace('\'', "''")))
.collect::<Vec<_>>()
.join(", ");
stmts.push(format!(
"ALTER TABLE {q_table} ADD CONSTRAINT \"{cname}\" \
CHECK ({q_column} IN ({values_sql}))"
));
}
}
}
if stmts.is_empty() {
let clause = if new.nullable {
"DROP NOT NULL"
} else {
"SET NOT NULL"
};
stmts.push(format!(
"ALTER TABLE {q_table} ALTER COLUMN {q_column} {clause}"
));
}
stmts
}
/// Wraps `ident` in double quotes, doubling every embedded double quote.
fn quote_pg_ident(ident: &str) -> String {
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('"');
    for ch in ident.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Looks up the primary-key column of the table a foreign key points at.
///
/// `fk_target_table` is the quote-escaped table name; doubled double quotes
/// are collapsed before it is compared with the registered table names.
///
/// Returns the primary-key column name and the `sea_query` column type the
/// referencing column should use. Falls back to `("id", BigInteger)` when the
/// registry is not initialised or no registered model with that table name
/// declares a primary key.
fn fk_target_pk(fk_target_table: &str) -> (String, sea_query::ColumnType) {
    use sea_query::ColumnType;

    let fallback = || ("id".to_string(), ColumnType::BigInteger);
    let Some(registry) = REGISTRY.get() else {
        return fallback();
    };

    let wanted = fk_target_table.replace("\"\"", "\"");
    registry
        .iter()
        .filter(|(_, meta)| meta.table == wanted)
        // First primary key among the matching models wins.
        .find_map(|(_, meta)| meta.fields.iter().find(|field| field.primary_key))
        .map(|pk| {
            let column_type = match pk.ty {
                SqlType::SmallInt => ColumnType::SmallInteger,
                SqlType::Text => ColumnType::Text,
                SqlType::Uuid => ColumnType::Uuid,
                SqlType::BigInt | SqlType::Integer => ColumnType::BigInteger,
                _ => ColumnType::BigInteger,
            };
            (pk.name.clone(), column_type)
        })
        .unwrap_or_else(fallback)
}
/// Builds the `sea_query` column definition for `col` on SQLite.
///
/// Foreign-key columns take the type of the target's primary key (via
/// `fk_target_pk`) and, when `db_constraint` is set, an inline `REFERENCES`
/// clause. Integer / big-integer primary keys are declared as `Integer` with
/// auto-increment. Boolean defaults are converted to `0` / `1`.
fn build_column_def_sqlite(col: &Column) -> sea_query::ColumnDef {
    use sea_query::{Alias, ColumnDef, ColumnType};

    if matches!(col.ty, SqlType::ForeignKey) {
        let escaped_target = col
            .fk_target
            .as_deref()
            .unwrap_or("_unknown_")
            .replace('"', "\"\"");
        let (target_pk, target_pk_type) = fk_target_pk(&escaped_target);
        let mut fk_def = ColumnDef::new_with_type(Alias::new(&col.name), target_pk_type);
        if !col.nullable {
            fk_def.not_null();
        }
        if col.unique {
            fk_def.unique_key();
        }
        if col.db_constraint {
            let actions = fk_action_suffix(col);
            fk_def.extra(format!(
                "REFERENCES \"{escaped_target}\"(\"{target_pk}\"){actions}"
            ));
        }
        return fk_def;
    }

    let integer_pk = col.primary_key && matches!(col.ty, SqlType::Integer | SqlType::BigInt);
    let sqlite_type = if integer_pk {
        ColumnType::Integer
    } else {
        crate::backend::SqliteBackend.map_column(col)
    };

    let mut def = ColumnDef::new_with_type(Alias::new(&col.name), sqlite_type);
    if !col.nullable {
        def.not_null();
    }
    if col.primary_key {
        def.primary_key();
        if integer_pk {
            def.auto_increment();
        }
    } else if col.unique {
        // A primary key never carries an extra UNIQUE.
        def.unique_key();
    }
    if let Some(check) = check_min_max_sql(col) {
        def.extra(check);
    }

    if col.default.is_empty() {
        return def;
    }
    match col.ty {
        SqlType::Boolean => def.default(sqlite_bool_default(&col.default)),
        _ => def.default(col.default.clone()),
    };
    def
}
/// Converts a textual boolean default into the integer literal SQLite stores.
///
/// After trimming and ASCII-lowercasing, `true`, `1`, `t` and `yes` map to `1`;
/// every other input maps to `0`.
fn sqlite_bool_default(raw: &str) -> i32 {
    const TRUTHY: [&str; 4] = ["true", "1", "t", "yes"];
    let normalized = raw.trim().to_ascii_lowercase();
    i32::from(TRUTHY.contains(&normalized.as_str()))
}
/// Builds the inline `CHECK (...)` clause enforcing a column's `min` / `max`.
///
/// Returns `None` when neither bound is set or when the column is not one of
/// the numeric types (`SmallInt`, `Integer`, `BigInt`, `Real`, `Double`).
/// When both bounds are present they are joined with `AND`, lower bound first.
fn check_min_max_sql(col: &Column) -> Option<String> {
    let is_numeric = matches!(
        col.ty,
        SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::Real | SqlType::Double
    );
    let has_bound = col.min.is_some() || col.max.is_some();
    if !(is_numeric && has_bound) {
        return None;
    }

    let quoted = format!("\"{}\"", col.name.replace('"', "\"\""));
    let lower = col.min.map(|n| format!("{quoted} >= {n}"));
    let upper = col.max.map(|n| format!("{quoted} <= {n}"));
    let conditions: Vec<String> = lower.into_iter().chain(upper).collect();
    Some(format!("CHECK ({})", conditions.join(" AND ")))
}
/// Builds the `sea_query` column definition for `col` on PostgreSQL.
///
/// Foreign-key columns take the type of the target's primary key (via
/// `fk_target_pk`) and, when `db_constraint` is set, an inline `REFERENCES`
/// clause. Integer-typed primary keys get auto-increment. Single-choice
/// columns with choices get an inline `CHECK (... IN (...))`. Without an
/// explicit default, `auto_now` / `auto_now_add` columns of a temporal type
/// default to `now()`.
fn build_column_def_postgres(col: &Column) -> sea_query::ColumnDef {
    use sea_query::{Alias, ColumnDef};

    if matches!(col.ty, SqlType::ForeignKey) {
        let escaped_target = col
            .fk_target
            .as_deref()
            .unwrap_or("_unknown_")
            .replace('"', "\"\"");
        let (target_pk, target_pk_type) = fk_target_pk(&escaped_target);
        let mut fk_def = ColumnDef::new_with_type(Alias::new(&col.name), target_pk_type);
        if !col.nullable {
            fk_def.not_null();
        }
        if col.unique {
            fk_def.unique_key();
        }
        if col.db_constraint {
            let actions = fk_action_suffix(col);
            fk_def.extra(format!(
                "REFERENCES \"{escaped_target}\"(\"{target_pk}\"){actions}"
            ));
        }
        return fk_def;
    }

    let pg_type = crate::backend::PostgresBackend.map_column(col);
    let mut def = ColumnDef::new_with_type(Alias::new(&col.name), pg_type);
    if !col.nullable {
        def.not_null();
    }
    if col.primary_key {
        def.primary_key();
        let integer_like = matches!(
            col.ty,
            SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
        );
        if integer_like {
            def.auto_increment();
        }
    } else if col.unique {
        // A primary key never carries an extra UNIQUE.
        def.unique_key();
    }
    if let Some(check) = check_min_max_sql(col) {
        def.extra(check);
    }

    let single_choice = !col.is_multichoice && !col.choices.is_empty();
    if single_choice {
        let quoted_values: Vec<String> = col
            .choices
            .iter()
            .map(|v| format!("'{}'", v.replace('\'', "''")))
            .collect();
        let values_sql = quoted_values.join(", ");
        let col_name_escaped = col.name.replace('"', "\"\"");
        def.extra(format!("CHECK (\"{col_name_escaped}\" IN ({values_sql}))"));
    }

    let temporal_auto = (col.auto_now || col.auto_now_add)
        && matches!(col.ty, SqlType::Timestamptz | SqlType::Date | SqlType::Time);
    if !col.default.is_empty() {
        def.default(col.default.clone());
    } else if temporal_auto {
        def.default(sea_query::Expr::cust("now()"));
    }
    def
}
#[cfg(test)]
mod tests {
use super::*;
// Registers two plugins in non-alphabetical order and checks that
// `plugin_order()` returns them sorted by name and equal to
// `registered_plugins()`.
//
// Note: `init_plugins` panics when called a second time in the same process.
#[test]
fn plugin_order_falls_back_to_registered_plugins_when_unpublished() {
    // Field-less model whose display name equals its model name.
    fn stub_model(name: &str, table: &str) -> ModelMeta {
        ModelMeta {
            name: name.to_string(),
            table: table.to_string(),
            fields: Vec::new(),
            display: name.to_string(),
            icon: "database".to_string(),
            database: None,
            singleton: false,
            unique_together: Vec::new(),
            indexes: Vec::new(),
            ordering: Vec::new(),
            m2m_relations: Vec::new(),
            soft_delete: false,
            app_label: "app".to_string(),
        }
    }

    let per_plugin: std::collections::HashMap<String, Vec<ModelMeta>> = [
        ("zeta", stub_model("ZetaModel", "zeta")),
        ("alpha", stub_model("AlphaModel", "alpha")),
    ]
    .into_iter()
    .map(|(plugin, model)| (plugin.to_string(), vec![model]))
    .collect();

    init_plugins(per_plugin);

    let order = plugin_order();
    assert_eq!(
        order,
        vec!["alpha".to_string(), "zeta".to_string()],
        "fallback should sort by name; got {order:?}",
    );
    assert_eq!(
        order,
        registered_plugins(),
        "fallback should exactly equal registered_plugins()",
    );
}
// A `unique` non-PK column must render UNIQUE, a non-unique column must not,
// and a primary key flagged `unique` must render PRIMARY KEY without UNIQUE.
// Checked against both column builders.
#[test]
fn unique_column_emits_unique_keyword_on_both_backends() {
    use sea_query::{Alias, PostgresQueryBuilder, SqliteQueryBuilder, Table};

    // Text between the first occurrence of `quoted_name` and the next comma.
    fn column_clause<'a>(sql: &'a str, quoted_name: &str) -> &'a str {
        sql.split(quoted_name)
            .nth(1)
            .unwrap_or_default()
            .split(',')
            .next()
            .unwrap_or_default()
    }

    let id = Column {
        name: "id".into(),
        ty: SqlType::BigInt,
        primary_key: true,
        nullable: false,
        fk_target: None,
        noform: false,
        db_constraint: true,
        noedit: false,
        is_string_repr: false,
        max_length: 0,
        choices: vec![],
        choice_labels: vec![],
        default: String::new(),
        is_multichoice: false,
        unique: true,
        on_delete: crate::orm::FkAction::NoAction,
        on_update: crate::orm::FkAction::NoAction,
        index: false,
        auto_now_add: false,
        auto_now: false,
        help: String::new(),
        example: String::new(),
        widget: None,
        supported_backends: Vec::new(),
        min: None,
        max: None,
        text_format: ::core::option::Option::None,
        slug_from: ::core::option::Option::None,
    };
    // Same as `id` except for name, type and primary-key flag (still unique).
    let username = Column {
        name: "username".into(),
        ty: SqlType::Text,
        primary_key: false,
        ..id.clone()
    };
    // Same as `username` but not unique.
    let email = Column {
        name: "email".into(),
        unique: false,
        ..username.clone()
    };

    for backend in ["sqlite", "postgres"] {
        let on_sqlite = backend == "sqlite";

        let mut stmt = Table::create();
        stmt.table(Alias::new("u"));
        for col in [&id, &username, &email] {
            let mut def = if on_sqlite {
                build_column_def_sqlite(col)
            } else {
                build_column_def_postgres(col)
            };
            stmt.col(&mut def);
        }
        let sql = if on_sqlite {
            stmt.to_string(SqliteQueryBuilder)
        } else {
            stmt.to_string(PostgresQueryBuilder)
        };

        assert!(
            sql.contains("\"username\"") && sql.to_uppercase().contains("UNIQUE"),
            "{backend}: expected UNIQUE on username; got: {sql}",
        );

        let email_clause = column_clause(&sql, "\"email\"");
        assert!(
            !email_clause.to_uppercase().contains("UNIQUE"),
            "{backend}: email should not be UNIQUE; clause: {email_clause}",
        );

        let id_clause = column_clause(&sql, "\"id\"");
        let id_upper = id_clause.to_uppercase();
        assert!(
            id_upper.contains("PRIMARY KEY"),
            "{backend}: id should still be PRIMARY KEY; clause: {id_clause}",
        );
        assert!(
            !id_upper.contains("UNIQUE"),
            "{backend}: PK column should not also carry UNIQUE; clause: {id_clause}",
        );
    }
}
// Foreign-key `on_delete` / `on_update` actions must show up as
// `ON DELETE …` / `ON UPDATE …` after `REFERENCES`, and `NoAction` must emit
// no action clause. Checked against both column builders.
#[test]
fn fk_action_lifts_to_references_clause_on_both_backends() {
    // Renders a one-column `CREATE TABLE "t"` for `col` on the given backend.
    fn render_fk(col: &Column, backend: &str) -> String {
        use sea_query::{Alias, PostgresQueryBuilder, SqliteQueryBuilder, Table};
        let on_sqlite = backend == "sqlite";
        let mut stmt = Table::create();
        stmt.table(Alias::new("t"));
        let mut def = if on_sqlite {
            build_column_def_sqlite(col)
        } else {
            build_column_def_postgres(col)
        };
        stmt.col(&mut def);
        if on_sqlite {
            stmt.to_string(SqliteQueryBuilder)
        } else {
            stmt.to_string(PostgresQueryBuilder)
        }
    }

    let plain_fk = Column {
        name: "owner_id".into(),
        ty: SqlType::ForeignKey,
        primary_key: false,
        nullable: false,
        fk_target: Some("post".into()),
        noform: false,
        db_constraint: true,
        noedit: false,
        is_string_repr: false,
        max_length: 0,
        choices: vec![],
        choice_labels: vec![],
        default: String::new(),
        is_multichoice: false,
        unique: false,
        on_delete: crate::orm::FkAction::NoAction,
        on_update: crate::orm::FkAction::NoAction,
        index: false,
        auto_now_add: false,
        auto_now: false,
        help: String::new(),
        example: String::new(),
        widget: None,
        supported_backends: Vec::new(),
        min: None,
        max: None,
        text_format: ::core::option::Option::None,
        slug_from: ::core::option::Option::None,
    };
    let cascade_fk = Column {
        on_delete: crate::orm::FkAction::Cascade,
        on_update: crate::orm::FkAction::Cascade,
        ..plain_fk.clone()
    };
    let restrict_fk = Column {
        on_delete: crate::orm::FkAction::Restrict,
        ..plain_fk.clone()
    };
    let set_null_fk = Column {
        nullable: true,
        on_delete: crate::orm::FkAction::SetNull,
        ..plain_fk.clone()
    };

    for backend in ["sqlite", "postgres"] {
        let sql = render_fk(&plain_fk, backend);
        let upper = sql.to_uppercase();
        assert!(
            sql.contains("REFERENCES")
                && !upper.contains("ON DELETE")
                && !upper.contains("ON UPDATE"),
            "{backend}: NoAction should emit REFERENCES alone; got: {sql}",
        );

        let sql = render_fk(&cascade_fk, backend);
        let upper = sql.to_uppercase();
        assert!(
            upper.contains("ON DELETE CASCADE") && upper.contains("ON UPDATE CASCADE"),
            "{backend}: Cascade should emit both clauses; got: {sql}",
        );

        let sql = render_fk(&restrict_fk, backend);
        let upper = sql.to_uppercase();
        assert!(
            upper.contains("ON DELETE RESTRICT"),
            "{backend}: Restrict missing; got: {sql}",
        );
        assert!(
            !upper.contains("ON UPDATE"),
            "{backend}: ON UPDATE shouldn't appear for NoAction; got: {sql}",
        );

        let sql = render_fk(&set_null_fk, backend);
        assert!(
            sql.to_uppercase().contains("ON DELETE SET NULL"),
            "{backend}: SET NULL missing; got: {sql}",
        );
    }
}
// Changing `default`, `choices` or `nullable` on a column must make
// `diff_columns` emit an `AlterColumn` for it, while adding `unique` must be
// rejected with `MigrateError::UnsafeAlter` mentioning UNIQUE.
#[test]
fn diff_detects_all_schema_meaningful_field_changes() {
    // Plain NOT NULL text column `x` with every optional attribute off.
    fn baseline() -> Column {
        Column {
            name: "x".into(),
            ty: SqlType::Text,
            primary_key: false,
            nullable: false,
            fk_target: None,
            noform: false,
            db_constraint: true,
            noedit: false,
            is_string_repr: false,
            max_length: 0,
            choices: vec![],
            choice_labels: vec![],
            default: String::new(),
            is_multichoice: false,
            unique: false,
            on_delete: crate::orm::FkAction::NoAction,
            on_update: crate::orm::FkAction::NoAction,
            index: false,
            auto_now_add: false,
            auto_now: false,
            help: String::new(),
            example: String::new(),
            widget: None,
            supported_backends: Vec::new(),
            min: None,
            max: None,
            text_format: ::core::option::Option::None,
            slug_from: ::core::option::Option::None,
        }
    }
    // Model `M` on table `m` holding only `col`.
    fn meta_with(col: Column) -> ModelMeta {
        ModelMeta {
            name: "M".into(),
            table: "m".into(),
            fields: vec![col],
            display: "M".into(),
            icon: "database".into(),
            database: None,
            singleton: false,
            unique_together: Vec::new(),
            indexes: Vec::new(),
            ordering: Vec::new(),
            m2m_relations: Vec::new(),
            soft_delete: false,
            app_label: "app".into(),
        }
    }

    let prev = meta_with(baseline());
    let safe_mutations: Vec<(&str, fn(&mut Column))> = vec![
        ("default", |c| c.default = "hello".into()),
        ("choices", |c| {
            c.choices = vec!["a".into(), "b".into()];
            c.choice_labels = vec!["A".into(), "B".into()];
        }),
        ("nullable", |c| c.nullable = true),
    ];
    for (label, mutate) in safe_mutations {
        let mut col = baseline();
        mutate(&mut col);
        let current = meta_with(col);
        let ops = diff_columns("M", &prev, &current).expect("diff should succeed");
        assert!(
            !ops.is_empty(),
            "{label}: diff should produce at least one op; got none",
        );
        assert!(
            ops.iter()
                .any(|op| matches!(op, Operation::AlterColumn { column, .. } if column == "x")),
            "{label}: expected AlterColumn on `x`; got: {ops:?}",
        );
    }

    // Adding UNIQUE is expected to be refused rather than diffed.
    let mut col = baseline();
    col.unique = true;
    let current = meta_with(col);
    match diff_columns("M", &prev, &current) {
        Err(MigrateError::UnsafeAlter { column, reason, .. }) => {
            assert_eq!(column, "x");
            assert!(
                reason.contains("UNIQUE"),
                "unsafe-alter reason should mention UNIQUE; got: {reason}",
            );
        }
        other => panic!("unique add should be an UnsafeAlter guard; got: {other:?}"),
    }
}
// `render_alter_column_postgres` must emit the matching constraint statement
// for a UNIQUE add / drop, a default set / drop, and a foreign-key action
// change.
#[test]
fn postgres_alter_column_renders_constraint_changes() {
    // Renders the alter of column `x` on table `m` from `prev` to `new`,
    // joined with newlines.
    fn alter_sql(new: Column, prev: Column) -> String {
        render_alter_column_postgres("m", "x", &[new], Some(&[prev])).join("\n")
    }

    let baseline = Column {
        name: "x".into(),
        ty: SqlType::Text,
        primary_key: false,
        nullable: false,
        fk_target: None,
        noform: false,
        db_constraint: true,
        noedit: false,
        is_string_repr: false,
        max_length: 0,
        choices: vec![],
        choice_labels: vec![],
        default: String::new(),
        is_multichoice: false,
        unique: false,
        on_delete: crate::orm::FkAction::NoAction,
        on_update: crate::orm::FkAction::NoAction,
        index: false,
        auto_now_add: false,
        auto_now: false,
        help: String::new(),
        example: String::new(),
        widget: None,
        supported_backends: Vec::new(),
        min: None,
        max: None,
        text_format: ::core::option::Option::None,
        slug_from: ::core::option::Option::None,
    };

    // UNIQUE added.
    let with_unique = Column {
        unique: true,
        ..baseline.clone()
    };
    let joined = alter_sql(with_unique.clone(), baseline.clone());
    assert!(
        joined.contains("ADD CONSTRAINT") && joined.contains("UNIQUE"),
        "unique add: expected ADD CONSTRAINT UNIQUE; got: {joined}",
    );

    // UNIQUE removed.
    let joined = alter_sql(baseline.clone(), with_unique);
    assert!(
        joined.contains("DROP CONSTRAINT IF EXISTS"),
        "unique drop: expected DROP CONSTRAINT IF EXISTS; got: {joined}",
    );

    // Default added.
    let with_default = Column {
        default: "hello".into(),
        ..baseline.clone()
    };
    let joined = alter_sql(with_default.clone(), baseline.clone());
    assert!(
        joined.contains("SET DEFAULT 'hello'"),
        "default set: expected SET DEFAULT; got: {joined}",
    );

    // Default removed.
    let joined = alter_sql(baseline.clone(), with_default);
    assert!(
        joined.contains("DROP DEFAULT"),
        "default drop: expected DROP DEFAULT; got: {joined}",
    );

    // Foreign key gains ON DELETE CASCADE.
    let fk_baseline = Column {
        ty: SqlType::ForeignKey,
        fk_target: Some("other".into()),
        ..baseline.clone()
    };
    let fk_cascade = Column {
        on_delete: crate::orm::FkAction::Cascade,
        ..fk_baseline.clone()
    };
    let joined = alter_sql(fk_cascade, fk_baseline);
    assert!(
        joined.contains("DROP CONSTRAINT IF EXISTS")
            && joined.contains("FOREIGN KEY")
            && joined.contains("ON DELETE CASCADE"),
        "FK cascade add: expected drop+readd with ON DELETE CASCADE; got: {joined}",
    );
}
// On SQLite a boolean default of "true" / "false" must render as
// `DEFAULT 1` / `DEFAULT 0`, while a text default stays a quoted string.
#[test]
fn sqlite_bool_default_translates_to_integer_literal() {
    // Renders a one-column `CREATE TABLE "t"` for `col` with the SQLite builder.
    fn create_table_sql(col: &Column) -> String {
        use sea_query::{Alias, SqliteQueryBuilder, Table};
        let mut stmt = Table::create();
        stmt.table(Alias::new("t"));
        let mut def = build_column_def_sqlite(col);
        stmt.col(&mut def);
        stmt.to_string(SqliteQueryBuilder)
    }

    let bool_col = Column {
        name: "is_active".into(),
        ty: SqlType::Boolean,
        primary_key: false,
        nullable: false,
        fk_target: None,
        noform: false,
        db_constraint: true,
        noedit: false,
        is_string_repr: false,
        max_length: 0,
        choices: vec![],
        choice_labels: vec![],
        default: "true".into(),
        is_multichoice: false,
        unique: false,
        on_delete: crate::orm::FkAction::NoAction,
        on_update: crate::orm::FkAction::NoAction,
        index: false,
        auto_now_add: false,
        auto_now: false,
        help: String::new(),
        example: String::new(),
        widget: None,
        supported_backends: Vec::new(),
        min: None,
        max: None,
        text_format: ::core::option::Option::None,
        slug_from: ::core::option::Option::None,
    };
    let sql = create_table_sql(&bool_col);
    assert!(
        sql.contains("DEFAULT 1") && !sql.contains("DEFAULT 'true'"),
        "bool default 'true' on sqlite should render as DEFAULT 1; got: {sql}",
    );

    let bool_col_false = Column {
        default: "false".into(),
        ..bool_col.clone()
    };
    let sql = create_table_sql(&bool_col_false);
    assert!(
        sql.contains("DEFAULT 0") && !sql.contains("DEFAULT 'false'"),
        "bool default 'false' on sqlite should render as DEFAULT 0; got: {sql}",
    );

    let text_col = Column {
        name: "label".into(),
        ty: SqlType::Text,
        default: "hello".into(),
        ..bool_col.clone()
    };
    let sql = create_table_sql(&text_col);
    assert!(
        sql.contains("DEFAULT 'hello'"),
        "text default should stay quoted; got: {sql}",
    );
}
// A `CreateTable` with an indexed primary key, an indexed `slug` and an
// unindexed `title` must render one CREATE TABLE plus exactly one idempotent
// CREATE INDEX, targeting `slug`, on both backends.
#[test]
fn index_attribute_emits_create_index_alongside_create_table() {
    let id = Column {
        name: "id".into(),
        ty: SqlType::BigInt,
        primary_key: true,
        nullable: false,
        fk_target: None,
        noform: false,
        db_constraint: true,
        noedit: false,
        is_string_repr: false,
        max_length: 0,
        choices: vec![],
        choice_labels: vec![],
        default: String::new(),
        is_multichoice: false,
        unique: false,
        on_delete: crate::orm::FkAction::NoAction,
        on_update: crate::orm::FkAction::NoAction,
        index: true,
        auto_now_add: false,
        auto_now: false,
        help: String::new(),
        example: String::new(),
        widget: None,
        supported_backends: Vec::new(),
        min: None,
        max: None,
        text_format: ::core::option::Option::None,
        slug_from: ::core::option::Option::None,
    };
    // Indexed, non-PK text column.
    let slug = Column {
        name: "slug".into(),
        ty: SqlType::Text,
        primary_key: false,
        ..id.clone()
    };
    // Same as `slug` but without the index flag.
    let title = Column {
        name: "title".into(),
        index: false,
        ..slug.clone()
    };

    let op = Operation::CreateTable {
        table: "post".into(),
        columns: vec![id, slug, title],
        unique_together: Vec::new(),
        indexes: Vec::new(),
    };

    for backend in ["sqlite", "postgres"] {
        let stmts = render_operation_for(&op, backend);

        let has_create_table = stmts
            .iter()
            .any(|s| s.to_uppercase().contains("CREATE TABLE"));
        assert!(
            has_create_table,
            "{backend}: expected a CREATE TABLE; got: {stmts:?}",
        );

        let index_stmts: Vec<_> = stmts
            .iter()
            .filter(|s| s.to_uppercase().contains("CREATE INDEX"))
            .collect();
        assert_eq!(
            index_stmts.len(),
            1,
            "{backend}: expected exactly one CREATE INDEX (on `slug`); got {index_stmts:?}",
        );

        let ix = index_stmts[0];
        assert!(
            ix.contains("\"idx_post_slug\"") && ix.contains("(\"slug\")"),
            "{backend}: index should target post(slug); got: {ix}",
        );
        assert!(
            ix.to_uppercase().contains("IF NOT EXISTS"),
            "{backend}: should be idempotent via IF NOT EXISTS; got: {ix}",
        );
    }
}
// Adding a NOT NULL `auto_now` / `auto_now_add` timestamp column:
// SQLite must render a nullable, default-less ADD COLUMN followed by a
// backfill UPDATE; Postgres must render a single ALTER keeping NOT NULL with
// `DEFAULT now()`.
#[test]
fn auto_now_add_column_renders_safe_backfill_per_backend() {
    let cases = [("auto_now", true, false), ("auto_now_add", false, true)];
    for (label, auto_now, auto_now_add) in cases {
        let col = Column {
            name: "updated_at".to_string(),
            ty: SqlType::Timestamptz,
            primary_key: false,
            nullable: false,
            fk_target: None,
            noform: false,
            db_constraint: true,
            noedit: false,
            is_string_repr: false,
            max_length: 0,
            choices: Vec::new(),
            choice_labels: Vec::new(),
            default: String::new(),
            is_multichoice: false,
            unique: false,
            on_delete: crate::orm::FkAction::NoAction,
            on_update: crate::orm::FkAction::NoAction,
            index: false,
            auto_now_add,
            auto_now,
            help: String::new(),
            example: String::new(),
            widget: None,
            supported_backends: Vec::new(),
            min: None,
            max: None,
            text_format: None,
            slug_from: None,
        };
        let op = Operation::AddColumn {
            table: "customer".to_string(),
            column: col.clone(),
        };

        // SQLite: ADD COLUMN followed by a backfill.
        let stmts = render_operation_sqlite(&op);
        assert_eq!(
            stmts.len(),
            2,
            "{label} SQLite: must emit ADD + UPDATE, got: {stmts:?}",
        );
        let add_sql = stmts[0].to_uppercase();
        assert!(
            add_sql.contains("ADD COLUMN"),
            "{label} SQLite: first stmt must be ADD COLUMN, got: {}",
            stmts[0],
        );
        assert!(
            !add_sql.contains("NOT NULL"),
            "{label} SQLite: ADD COLUMN must be nullable (NOT NULL = SQLite reject), got: {}",
            stmts[0],
        );
        assert!(
            !add_sql.contains("DEFAULT"),
            "{label} SQLite: ADD COLUMN must omit DEFAULT (non-constant = SQLite reject), got: {}",
            stmts[0],
        );
        let backfill_sql = &stmts[1];
        let is_backfill =
            backfill_sql.contains("UPDATE") && backfill_sql.contains("datetime('now')");
        assert!(
            is_backfill,
            "{label} SQLite: second stmt must be backfill UPDATE, got: {backfill_sql}",
        );

        // Postgres: a single ALTER carries both NOT NULL and the default.
        let pstmts = render_operation_postgres(&op);
        assert_eq!(
            pstmts.len(),
            1,
            "{label} Postgres: single statement suffices, got: {pstmts:?}",
        );
        let p = &pstmts[0];
        assert!(
            p.to_lowercase().contains("default now()"),
            "{label} Postgres: expected DEFAULT now() in ALTER, got: {p}",
        );
        assert!(
            p.to_uppercase().contains("NOT NULL"),
            "{label} Postgres: keeps NOT NULL (Postgres allows non-constant defaults), got: {p}",
        );
    }
}
}