use std::collections::HashSet;
use std::path::Path;
use crate::core::{inventory, ModelEntry, ModelSchema};
use crate::sql::sqlx::{self, PgPool, Row};
use super::diff::render_changes_split;
use super::file::{self, Migration, Operation};
use super::invert::invert;
use super::snapshot::SchemaSnapshot;
use super::{ddl, MigrateError};
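/// Name of the bookkeeping table that records which migrations have been applied.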
pub const LEDGER_TABLE: &str = "__rustango_migrations__";
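/// Entry point for configuring migration runs; currently the only knob is the
/// name of the ledger table.
///
/// A minimal usage sketch (the module path is an assumption; adjust it to
/// wherever this module is exposed in your crate):
///
/// ```rust,ignore
/// let builder = Builder::new().ledger("my_app_migrations");
/// let newly_applied = builder.migrate(&pool, Path::new("migrations")).await?;
/// ```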
#[derive(Debug, Clone, Copy)]
pub struct Builder {
ledger: &'static str,
}
impl Default for Builder {
fn default() -> Self {
Self {
ledger: LEDGER_TABLE,
}
}
}
impl Builder {
#[must_use]
pub fn new() -> Self {
Self::default()
}
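/// Overrides the ledger table name.
///
/// # Panics
///
/// Panics if `name` is not a valid SQL identifier
/// (`[A-Za-z_][A-Za-z0-9_]*`, at most 63 bytes).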
#[must_use]
pub fn ledger(mut self, name: &'static str) -> Self {
validate_ledger_name(name);
self.ledger = name;
self
}
#[must_use]
pub fn ledger_name(&self) -> &'static str {
self.ledger
}
pub async fn migrate(&self, pool: &PgPool, dir: &Path) -> Result<Vec<Migration>, MigrateError> {
migrate_with_ledger(pool, dir, self.ledger).await
}
pub async fn migrate_to(
&self,
pool: &PgPool,
dir: &Path,
target: &str,
) -> Result<Vec<Migration>, MigrateError> {
migrate_to_with_ledger(pool, dir, target, self.ledger).await
}
pub async fn migrate_embedded(
&self,
pool: &PgPool,
embedded: &[(&str, &str)],
) -> Result<Vec<Migration>, MigrateError> {
migrate_embedded_with_ledger(pool, embedded, self.ledger).await
}
pub async fn migrate_dry_run(
&self,
pool: &PgPool,
dir: &Path,
) -> Result<Vec<MigrationPreview>, MigrateError> {
migrate_dry_run_with_ledger(pool, dir, self.ledger).await
}
pub async fn downgrade(
&self,
pool: &PgPool,
dir: &Path,
steps: usize,
) -> Result<Vec<Migration>, MigrateError> {
downgrade_with_ledger(pool, dir, steps, self.ledger).await
}
pub async fn unapply(
&self,
pool: &PgPool,
dir: &Path,
name: &str,
) -> Result<Migration, MigrateError> {
unapply_with_ledger(pool, dir, name, self.ledger).await
}
pub async fn unapply_force(
&self,
pool: &PgPool,
dir: &Path,
name: &str,
) -> Result<Migration, MigrateError> {
unapply_force_with_ledger(pool, dir, name, self.ledger).await
}
pub async fn applied_set(&self, pool: &PgPool) -> Result<HashSet<String>, MigrateError> {
applied_set_for(pool, self.ledger).await
}
pub async fn ensure_ledger(&self, pool: &PgPool) -> Result<(), MigrateError> {
ensure_ledger_for(pool, self.ledger).await
}
}
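/// The ledger name is interpolated directly into SQL, so it must be a bare
/// identifier; anything else panics at builder-configuration time rather than
/// reaching the database.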
fn validate_ledger_name(name: &str) {
let bytes = name.as_bytes();
let valid = !bytes.is_empty()
&& bytes.len() <= 63
&& (bytes[0].is_ascii_alphabetic() || bytes[0] == b'_')
&& bytes
.iter()
.all(|b| b.is_ascii_alphanumeric() || *b == b'_');
assert!(
valid,
"Builder::ledger({name:?}) is not a valid SQL identifier — \
must match [A-Za-z_][A-Za-z0-9_]* and be ≤ 63 bytes"
);
}
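/// Advisory-lock key shared by all migration entry points; the bytes spell
/// ASCII "RUSTMIGT".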
const MIGRATE_LOCK_KEY: i64 = 0x5255_5354_4d49_4754;
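/// Collects every `ModelSchema` registered through the `inventory`-based model
/// registry.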
#[must_use]
pub fn registered_models() -> Vec<&'static ModelSchema> {
inventory::iter::<ModelEntry>
.into_iter()
.map(|e| e.schema)
.collect()
}
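/// Creates tables for all registered models, then adds constraints in a second
/// pass so that cross-table references already have their target tables.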
pub async fn apply_all(pool: &PgPool) -> Result<(), MigrateError> {
let models = registered_models();
for model in &models {
let sql = ddl::create_table_sql(model);
sqlx::query(&sql).execute(pool).await?;
}
for model in &models {
for sql in ddl::create_constraints_sql(model) {
sqlx::query(&sql).execute(pool).await?;
}
}
Ok(())
}
pub async fn drop_all(pool: &PgPool) -> Result<(), MigrateError> {
for model in registered_models() {
let sql = ddl::drop_table_sql(model, true, true);
sqlx::query(&sql).execute(pool).await?;
}
Ok(())
}
pub async fn apply_all_pool(pool: &crate::sql::Pool) -> Result<(), MigrateError> {
let dialect = pool.dialect();
let models = registered_models();
for model in &models {
let sql = ddl::create_table_sql_with_dialect(dialect, model);
crate::sql::raw_execute_pool(pool, &sql, Vec::new()).await?;
}
for model in &models {
for sql in ddl::create_constraints_sql_with_dialect(dialect, model) {
crate::sql::raw_execute_pool(pool, &sql, Vec::new()).await?;
}
}
Ok(())
}
pub async fn drop_all_pool(pool: &crate::sql::Pool) -> Result<(), MigrateError> {
let dialect = pool.dialect();
let cascade = dialect.name() == "postgres";
for model in registered_models() {
let sql = ddl::drop_table_sql_with_dialect(dialect, model, true, cascade);
crate::sql::raw_execute_pool(pool, &sql, Vec::new()).await?;
}
Ok(())
}
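/// Applies every pending migration in `dir` using the default ledger table,
/// returning the migrations that were newly applied.
///
/// A minimal sketch (the module path is an assumption):
///
/// ```rust,ignore
/// let newly_applied = migrate(&pool, Path::new("migrations")).await?;
/// for m in &newly_applied {
///     println!("applied {}", m.name);
/// }
/// ```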
pub async fn migrate(pool: &PgPool, dir: &Path) -> Result<Vec<Migration>, MigrateError> {
Builder::default().migrate(pool, dir).await
}
async fn migrate_with_ledger(
pool: &PgPool,
dir: &Path,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut newly = Vec::with_capacity(pending.len());
for mig in pending {
apply_one(pool, &mig, ledger).await?;
newly.push(mig);
}
Ok(newly)
})
.await
}
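/// Runs `body` while holding a session-level Postgres advisory lock on a
/// dedicated connection, so concurrent migrators serialize. The `body` future
/// is lazy: it is not polled until the lock has been acquired. Release is
/// best-effort (`let _ =`) because Postgres drops session locks when the
/// connection closes anyway.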
async fn with_migrate_lock<F, R>(pool: &PgPool, body: F) -> Result<R, MigrateError>
where
F: std::future::Future<Output = Result<R, MigrateError>>,
{
use crate::sql::{Dialect as _, Postgres};
let dialect = Postgres;
let mut lock_conn = pool.acquire().await?;
if let Some(acquire_sql) = dialect.acquire_session_lock_sql() {
sqlx::query(&acquire_sql)
.bind(MIGRATE_LOCK_KEY)
.execute(&mut *lock_conn)
.await?;
}
let result = body.await;
if let Some(release_sql) = dialect.release_session_lock_sql() {
let _ = sqlx::query(&release_sql)
.bind(MIGRATE_LOCK_KEY)
.execute(&mut *lock_conn)
.await;
}
result
}
pub async fn applied_set(pool: &PgPool) -> Result<HashSet<String>, MigrateError> {
applied_set_for(pool, LEDGER_TABLE).await
}
async fn applied_set_for(pool: &PgPool, ledger: &str) -> Result<HashSet<String>, MigrateError> {
let rows = sqlx::query(&format!("SELECT name FROM {ledger}"))
.fetch_all(pool)
.await?;
let mut out = HashSet::with_capacity(rows.len());
for row in rows {
out.insert(row.try_get::<String, _>("name")?);
}
Ok(out)
}
pub async fn ensure_ledger(pool: &PgPool) -> Result<(), MigrateError> {
ensure_ledger_for(pool, LEDGER_TABLE).await
}
async fn ensure_ledger_for(pool: &PgPool, ledger: &str) -> Result<(), MigrateError> {
use crate::sql::{Dialect as _, Postgres};
const LOCK_KEY: i64 = 0x5255_5354;
let dialect = Postgres;
let mut tx = pool.begin().await?;
if let Some(xact_lock_sql) = dialect.acquire_xact_lock_sql() {
sqlx::query(&xact_lock_sql)
.bind(LOCK_KEY)
.execute(&mut *tx)
.await?;
}
let create_sql = format!(
"CREATE TABLE IF NOT EXISTS {ledger} (\
name TEXT PRIMARY KEY, \
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW())"
);
sqlx::query(&create_sql).execute(&mut *tx).await?;
tx.commit().await?;
Ok(())
}
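/// A rendered plan for one pending migration: its name, whether it runs inside
/// a transaction, and the SQL statements it would execute, in order.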
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationPreview {
pub name: String,
pub atomic: bool,
pub statements: Vec<String>,
}
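/// Renders the SQL for every pending migration without executing any of it.
///
/// A sketch of printing the plan (the module path is an assumption):
///
/// ```rust,ignore
/// for preview in migrate_dry_run(&pool, Path::new("migrations")).await? {
///     println!("-- {} (atomic: {})", preview.name, preview.atomic);
///     for stmt in &preview.statements {
///         println!("{stmt};");
///     }
/// }
/// ```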
pub async fn migrate_dry_run(
pool: &PgPool,
dir: &Path,
) -> Result<Vec<MigrationPreview>, MigrateError> {
Builder::default().migrate_dry_run(pool, dir).await
}
async fn migrate_dry_run_with_ledger(
pool: &PgPool,
dir: &Path,
ledger: &str,
) -> Result<Vec<MigrationPreview>, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
let all = file::list_dir(dir)?;
let applied = applied_set_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut out = Vec::with_capacity(pending.len());
for mig in pending {
out.push(preview_migration(&mig, ledger)?);
}
Ok(out)
}
fn preview_migration(mig: &Migration, ledger: &str) -> Result<MigrationPreview, MigrateError> {
let mut statements = Vec::new();
let mut deferred_fks: Vec<String> = Vec::new();
if mig.atomic {
statements.push("BEGIN".to_string());
}
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
statements.extend(batch.immediate);
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
statements.push(d.sql.clone());
}
}
}
statements.extend(deferred_fks);
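// The preview inlines the name as a quoted literal (with '' escaping) because
// the output is display-only SQL; real applies bind the name as a parameter.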
statements.push(format!(
"INSERT INTO {ledger} (name) VALUES ('{}')",
mig.name.replace('\'', "''")
));
if mig.atomic {
statements.push("COMMIT".to_string());
}
Ok(MigrationPreview {
name: mig.name.clone(),
atomic: mig.atomic,
statements,
})
}
async fn apply_atomic(pool: &PgPool, mig: &Migration, ledger: &str) -> Result<(), MigrateError> {
tracing::info!(migration = %mig.name, "applying (atomic)");
let mut tx = pool.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("INSERT INTO {ledger} (name) VALUES ($1)"))
.bind(&mig.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
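/// Migrates forward or backward until `target` is the newest applied
/// migration; the special target `"zero"` unapplies everything. Ordering is
/// the lexicographic order of migration names.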
pub async fn migrate_to(
pool: &PgPool,
dir: &Path,
target: &str,
) -> Result<Vec<Migration>, MigrateError> {
Builder::default().migrate_to(pool, dir, target).await
}
async fn migrate_to_with_ledger(
pool: &PgPool,
dir: &Path,
target: &str,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_for(pool, ledger).await?;
if target == "zero" {
return unapply_all_in_order(pool, dir, &all, &applied, ledger).await;
}
if !all.iter().any(|m| m.name == target) {
return Err(MigrateError::Validation(format!(
"target migration `{target}` not found in {}",
dir.display()
)));
}
let head = all
.iter()
.rev()
.find(|m| applied.contains(&m.name))
.map(|m| m.name.clone());
let mut touched = Vec::new();
match head {
None => {
for mig in all.into_iter().filter(|m| m.name.as_str() <= target) {
apply_one(pool, &mig, ledger).await?;
touched.push(mig);
}
}
Some(h) => {
use std::cmp::Ordering;
match target.cmp(h.as_str()) {
Ordering::Equal => {}
Ordering::Greater => {
for mig in all.into_iter().filter(|m| {
m.name.as_str() > h.as_str()
&& m.name.as_str() <= target
&& !applied.contains(&m.name)
}) {
apply_one(pool, &mig, ledger).await?;
touched.push(mig);
}
}
Ordering::Less => {
let mut to_unapply: Vec<Migration> = all
.into_iter()
.filter(|m| {
m.name.as_str() > target
&& m.name.as_str() <= h.as_str()
&& applied.contains(&m.name)
})
.collect();
to_unapply.reverse();
for mig in to_unapply {
unapply_locked(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
}
}
}
}
Ok(touched)
})
.await
}
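/// Applies migrations from `(name, json)` pairs compiled into the binary
/// instead of read from a directory on disk.
///
/// A sketch using `include_str!` (file names are assumptions):
///
/// ```rust,ignore
/// let embedded = [
///     ("0001_initial", include_str!("../migrations/0001_initial.json")),
///     ("0002_add_email", include_str!("../migrations/0002_add_email.json")),
/// ];
/// migrate_embedded(&pool, &embedded).await?;
/// ```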
pub async fn migrate_embedded(
pool: &PgPool,
embedded: &[(&str, &str)],
) -> Result<Vec<Migration>, MigrateError> {
Builder::default().migrate_embedded(pool, embedded).await
}
async fn migrate_embedded_with_ledger(
pool: &PgPool,
embedded: &[(&str, &str)],
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, async {
let mut all: Vec<Migration> = Vec::with_capacity(embedded.len());
for (name, json) in embedded {
let mig = file::parse(json)?;
if mig.name != *name {
return Err(MigrateError::Validation(format!(
"embedded entry key `{name}` doesn't match migration `name` field `{}`",
mig.name,
)));
}
all.push(mig);
}
all.sort_by(|a, b| a.name.cmp(&b.name));
file::validate_chain(&all, "embedded slice")?;
let applied = applied_set_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut newly = Vec::with_capacity(pending.len());
for mig in pending {
apply_one(pool, &mig, ledger).await?;
newly.push(mig);
}
Ok(newly)
})
.await
}
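/// Unapplies the most recently applied `steps` migrations, newest first.
/// `steps == 0` is a no-op, and a count larger than the number of applied
/// migrations is clamped.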
pub async fn downgrade(
pool: &PgPool,
dir: &Path,
steps: usize,
) -> Result<Vec<Migration>, MigrateError> {
Builder::default().downgrade(pool, dir, steps).await
}
async fn downgrade_with_ledger(
pool: &PgPool,
dir: &Path,
steps: usize,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
if steps == 0 {
return Ok(Vec::new());
}
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_for(pool, ledger).await?;
let applied_in_order: Vec<Migration> = all
.into_iter()
.filter(|m| applied.contains(&m.name))
.collect();
if applied_in_order.is_empty() {
return Ok(Vec::new());
}
let n = steps.min(applied_in_order.len());
let to_unapply: Vec<Migration> = applied_in_order.into_iter().rev().take(n).collect();
let mut touched = Vec::with_capacity(to_unapply.len());
for mig in to_unapply {
unapply_locked(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
Ok(touched)
})
.await
}
async fn apply_one(pool: &PgPool, mig: &Migration, ledger: &str) -> Result<(), MigrateError> {
if mig.atomic {
apply_atomic(pool, mig, ledger).await
} else {
apply_nonatomic(pool, mig, ledger).await
}
}
async fn unapply_all_in_order(
pool: &PgPool,
dir: &Path,
all: &[Migration],
applied: &HashSet<String>,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
let mut to_unapply: Vec<Migration> = all
.iter()
.filter(|m| applied.contains(&m.name))
.cloned()
.collect();
to_unapply.reverse();
let mut touched = Vec::with_capacity(to_unapply.len());
for mig in to_unapply {
unapply_locked(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
Ok(touched)
}
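/// Unapplies a single migration by name, refusing to run unless it is the
/// current head; `unapply_force` bypasses that check.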
pub async fn unapply(pool: &PgPool, dir: &Path, name: &str) -> Result<Migration, MigrateError> {
Builder::default().unapply(pool, dir, name).await
}
async fn unapply_with_ledger(
pool: &PgPool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, async {
check_is_head(pool, dir, name, ledger).await?;
unapply_locked(pool, dir, name, ledger).await
})
.await
}
pub async fn unapply_force(
pool: &PgPool,
dir: &Path,
name: &str,
) -> Result<Migration, MigrateError> {
Builder::default().unapply_force(pool, dir, name).await
}
async fn unapply_force_with_ledger(
pool: &PgPool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
ensure_ledger_for(pool, ledger).await?;
with_migrate_lock(pool, unapply_locked(pool, dir, name, ledger)).await
}
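/// Ensures `name` is the newest applied migration. A name that was never
/// applied passes the check, so `unapply_locked` will still execute its
/// inverse operations for it.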
async fn check_is_head(
pool: &PgPool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<(), MigrateError> {
let applied = applied_set_for(pool, ledger).await?;
if !applied.contains(name) {
return Ok(());
}
let all = file::list_dir(dir)?;
let head = all
.iter()
.rev()
.find(|m| applied.contains(&m.name))
.map(|m| m.name.as_str());
match head {
Some(h) if h == name => Ok(()),
Some(h) => Err(MigrateError::Validation(format!(
"refusing to unapply `{name}` out of order: current head is `{h}`. \
Use `downgrade(pool, dir, n)` / `migrate_to(pool, dir, target)` for \
ordered rollback, or `unapply_force` to bypass.",
))),
None => Ok(()),
}
}
async fn unapply_locked(
pool: &PgPool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
let all = file::list_dir(dir)?;
let target = all
.iter()
.find(|m| m.name == name)
.cloned()
.ok_or_else(|| {
MigrateError::Validation(format!("migration `{name}` not found in {}", dir.display()))
})?;
let prev_snapshot = match &target.prev {
None => SchemaSnapshot {
tables: vec![],
m2m_tables: vec![],
indexes: vec![],
checks: vec![],
},
Some(prev_name) => all
.iter()
.find(|m| &m.name == prev_name)
.map(|m| m.snapshot.clone())
.ok_or_else(|| {
MigrateError::Validation(format!(
"migration `{name}` declares prev=`{prev_name}` but that file is missing in {}",
dir.display()
))
})?,
};
let inverted = invert(&target.forward, &prev_snapshot)?;
if target.atomic {
unapply_atomic(pool, &target, &inverted, &prev_snapshot, ledger).await?;
} else {
unapply_nonatomic(pool, &target, &inverted, &prev_snapshot, ledger).await?;
}
Ok(target)
}
async fn unapply_atomic(
pool: &PgPool,
target: &Migration,
inverted: &[Operation],
snapshot: &SchemaSnapshot,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %target.name, "unapplying (atomic)");
let mut tx = pool.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("DELETE FROM {ledger} WHERE name = $1"))
.bind(&target.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
async fn unapply_nonatomic(
pool: &PgPool,
target: &Migration,
inverted: &[Operation],
snapshot: &SchemaSnapshot,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %target.name, "unapplying (non-atomic)");
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(pool).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(pool).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(pool).await?;
}
sqlx::query(&format!("DELETE FROM {ledger} WHERE name = $1"))
.bind(&target.name)
.execute(pool)
.await?;
Ok(())
}
async fn apply_nonatomic(pool: &PgPool, mig: &Migration, ledger: &str) -> Result<(), MigrateError> {
tracing::info!(migration = %mig.name, "applying (non-atomic)");
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(pool).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(pool).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(pool).await?;
}
sqlx::query(&format!("INSERT INTO {ledger} (name) VALUES ($1)"))
.bind(&mig.name)
.execute(pool)
.await?;
Ok(())
}
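/// Creates the ledger table if it does not exist, choosing a timestamp column
/// type per dialect (Postgres, MySQL, or SQLite). The name column is
/// VARCHAR(255) rather than TEXT so the primary key is also valid on MySQL.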
pub async fn ensure_ledger_pool(pool: &crate::sql::Pool) -> Result<(), MigrateError> {
ensure_ledger_pool_for(pool, LEDGER_TABLE).await
}
async fn ensure_ledger_pool_for(pool: &crate::sql::Pool, ledger: &str) -> Result<(), MigrateError> {
let dialect_name = pool.dialect().name();
let timestamp_col = match dialect_name {
"postgres" => "TIMESTAMPTZ NOT NULL DEFAULT NOW()",
"mysql" => "DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)",
"sqlite" => "TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP",
other => {
return Err(MigrateError::Validation(format!(
"ensure_ledger_pool: unrecognized dialect `{other}`"
)));
}
};
let create_sql = format!(
"CREATE TABLE IF NOT EXISTS {ledger} (\
name VARCHAR(255) PRIMARY KEY, \
applied_at {timestamp_col})"
);
crate::sql::raw_execute_pool(pool, &create_sql, Vec::new()).await?;
Ok(())
}
pub async fn applied_set_pool(pool: &crate::sql::Pool) -> Result<HashSet<String>, MigrateError> {
applied_set_pool_for(pool, LEDGER_TABLE).await
}
async fn applied_set_pool_for(
pool: &crate::sql::Pool,
ledger: &str,
) -> Result<HashSet<String>, MigrateError> {
let sql = format!("SELECT name FROM {ledger}");
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let rows = sqlx::query(&sql).fetch_all(pg).await?;
let mut out = HashSet::with_capacity(rows.len());
for row in rows {
out.insert(row.try_get::<String, _>("name")?);
}
Ok(out)
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let rows = sqlx::query(&sql).fetch_all(my).await?;
let mut out = HashSet::with_capacity(rows.len());
for row in rows {
out.insert(row.try_get::<String, _>("name")?);
}
Ok(out)
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let rows = sqlx::query(&sql).fetch_all(sq).await?;
let mut out = HashSet::with_capacity(rows.len());
for row in rows {
out.insert(row.try_get::<String, _>("name")?);
}
Ok(out)
}
}
}
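/// Dialect-generic variant of [`migrate`]: applies every pending migration in
/// `dir` through the unified `Pool` (Postgres, MySQL, or SQLite).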
pub async fn migrate_pool(
pool: &crate::sql::Pool,
dir: &Path,
) -> Result<Vec<Migration>, MigrateError> {
migrate_pool_with_ledger(pool, dir, LEDGER_TABLE).await
}
async fn migrate_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_pool_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut newly = Vec::with_capacity(pending.len());
for mig in pending {
if mig.atomic {
apply_atomic_pool(pool, &mig, ledger).await?;
} else {
apply_nonatomic_pool(pool, &mig, ledger).await?;
}
newly.push(mig);
}
Ok(newly)
})
.await
}
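/// Dialect-aware counterpart of `with_migrate_lock`: Postgres takes a session
/// advisory lock, MySQL takes a named lock via `GET_LOCK` (a negative timeout
/// waits indefinitely), and SQLite takes no explicit lock because its
/// database-file locking already serializes writers.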
async fn with_migrate_lock_pool<F, R>(pool: &crate::sql::Pool, body: F) -> Result<R, MigrateError>
where
F: std::future::Future<Output = Result<R, MigrateError>>,
{
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let mut lock_conn = pg.acquire().await?;
sqlx::query("SELECT pg_advisory_lock($1)")
.bind(MIGRATE_LOCK_KEY)
.execute(&mut *lock_conn)
.await?;
let result = body.await;
let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(MIGRATE_LOCK_KEY)
.execute(&mut *lock_conn)
.await;
result
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let lock_name = format!("rustango_migrate_{:x}", MIGRATE_LOCK_KEY);
let mut lock_conn = my.acquire().await?;
sqlx::query("SELECT GET_LOCK(?, -1)")
.bind(&lock_name)
.execute(&mut *lock_conn)
.await?;
let result = body.await;
let _ = sqlx::query("SELECT RELEASE_LOCK(?)")
.bind(&lock_name)
.execute(&mut *lock_conn)
.await;
result
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(_) => {
body.await
}
}
}
async fn apply_atomic_pool(
pool: &crate::sql::Pool,
mig: &Migration,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %mig.name, "applying (atomic, _pool)");
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let mut tx = pg.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch =
render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("INSERT INTO {ledger} (name) VALUES ($1)"))
.bind(&mig.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let mut tx = my.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch =
render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("INSERT INTO {ledger} (name) VALUES (?)"))
.bind(&mig.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let mut tx = sq.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch =
render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("INSERT INTO {ledger} (name) VALUES (?)"))
.bind(&mig.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
}
Ok(())
}
async fn apply_nonatomic_pool(
pool: &crate::sql::Pool,
mig: &Migration,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %mig.name, "applying (non-atomic, _pool)");
let mut deferred_fks: Vec<String> = Vec::new();
for op in &mig.forward {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), &mig.snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
crate::sql::raw_execute_pool(pool, &stmt, Vec::new()).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
crate::sql::raw_execute_pool(pool, &d.sql, Vec::new()).await?;
}
}
}
for stmt in deferred_fks {
crate::sql::raw_execute_pool(pool, &stmt, Vec::new()).await?;
}
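// The ledger insert goes through the dialect's positional placeholder
// ($1, ?, ...) so the migration name is bound rather than interpolated.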
let placeholder = pool.dialect().placeholder(1);
let insert_sql = format!("INSERT INTO {ledger} (name) VALUES ({placeholder})");
crate::sql::raw_execute_pool(
pool,
&insert_sql,
vec![crate::core::SqlValue::String(mig.name.clone())],
)
.await?;
Ok(())
}
pub async fn migrate_to_pool(
pool: &crate::sql::Pool,
dir: &Path,
target: &str,
) -> Result<Vec<Migration>, MigrateError> {
migrate_to_pool_with_ledger(pool, dir, target, LEDGER_TABLE).await
}
async fn migrate_to_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
target: &str,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_pool_for(pool, ledger).await?;
if target == "zero" {
return unapply_all_in_order_pool(pool, dir, &all, &applied, ledger).await;
}
if !all.iter().any(|m| m.name == target) {
return Err(MigrateError::Validation(format!(
"target migration `{target}` not found in {}",
dir.display()
)));
}
let head = all
.iter()
.rev()
.find(|m| applied.contains(&m.name))
.map(|m| m.name.clone());
let mut touched = Vec::new();
match head {
None => {
for mig in all.into_iter().filter(|m| m.name.as_str() <= target) {
apply_one_pool(pool, &mig, ledger).await?;
touched.push(mig);
}
}
Some(h) => {
use std::cmp::Ordering;
match target.cmp(h.as_str()) {
Ordering::Equal => {}
Ordering::Greater => {
for mig in all.into_iter().filter(|m| {
m.name.as_str() > h.as_str()
&& m.name.as_str() <= target
&& !applied.contains(&m.name)
}) {
apply_one_pool(pool, &mig, ledger).await?;
touched.push(mig);
}
}
Ordering::Less => {
let mut to_unapply: Vec<Migration> = all
.into_iter()
.filter(|m| {
m.name.as_str() > target
&& m.name.as_str() <= h.as_str()
&& applied.contains(&m.name)
})
.collect();
to_unapply.reverse();
for mig in to_unapply {
unapply_locked_pool(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
}
}
}
}
Ok(touched)
})
.await
}
pub async fn downgrade_pool(
pool: &crate::sql::Pool,
dir: &Path,
steps: usize,
) -> Result<Vec<Migration>, MigrateError> {
downgrade_pool_with_ledger(pool, dir, steps, LEDGER_TABLE).await
}
async fn downgrade_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
steps: usize,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
if steps == 0 {
return Ok(Vec::new());
}
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, async {
let all = file::list_dir(dir)?;
let applied = applied_set_pool_for(pool, ledger).await?;
let applied_in_order: Vec<Migration> = all
.into_iter()
.filter(|m| applied.contains(&m.name))
.collect();
if applied_in_order.is_empty() {
return Ok(Vec::new());
}
let n = steps.min(applied_in_order.len());
let to_unapply: Vec<Migration> = applied_in_order.into_iter().rev().take(n).collect();
let mut touched = Vec::with_capacity(to_unapply.len());
for mig in to_unapply {
unapply_locked_pool(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
Ok(touched)
})
.await
}
pub async fn unapply_pool(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
) -> Result<Migration, MigrateError> {
unapply_pool_with_ledger(pool, dir, name, LEDGER_TABLE).await
}
async fn unapply_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, async {
check_is_head_pool(pool, dir, name, ledger).await?;
unapply_locked_pool(pool, dir, name, ledger).await
})
.await
}
pub async fn unapply_force_pool(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
) -> Result<Migration, MigrateError> {
unapply_force_pool_with_ledger(pool, dir, name, LEDGER_TABLE).await
}
async fn unapply_force_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, unapply_locked_pool(pool, dir, name, ledger)).await
}
pub async fn migrate_dry_run_pool(
pool: &crate::sql::Pool,
dir: &Path,
) -> Result<Vec<MigrationPreview>, MigrateError> {
migrate_dry_run_pool_with_ledger(pool, dir, LEDGER_TABLE).await
}
async fn migrate_dry_run_pool_with_ledger(
pool: &crate::sql::Pool,
dir: &Path,
ledger: &str,
) -> Result<Vec<MigrationPreview>, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
let all = file::list_dir(dir)?;
let applied = applied_set_pool_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut out = Vec::with_capacity(pending.len());
for mig in pending {
out.push(preview_migration(&mig, ledger)?);
}
Ok(out)
}
async fn apply_one_pool(
pool: &crate::sql::Pool,
mig: &Migration,
ledger: &str,
) -> Result<(), MigrateError> {
if mig.atomic {
apply_atomic_pool(pool, mig, ledger).await
} else {
apply_nonatomic_pool(pool, mig, ledger).await
}
}
async fn unapply_all_in_order_pool(
pool: &crate::sql::Pool,
dir: &Path,
all: &[Migration],
applied: &HashSet<String>,
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
let mut to_unapply: Vec<Migration> = all
.iter()
.filter(|m| applied.contains(&m.name))
.cloned()
.collect();
to_unapply.reverse();
let mut touched = Vec::with_capacity(to_unapply.len());
for mig in to_unapply {
unapply_locked_pool(pool, dir, &mig.name, ledger).await?;
touched.push(mig);
}
Ok(touched)
}
async fn unapply_locked_pool(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<Migration, MigrateError> {
let all = file::list_dir(dir)?;
let target = all
.iter()
.find(|m| m.name == name)
.cloned()
.ok_or_else(|| {
MigrateError::Validation(format!("migration `{name}` not found in {}", dir.display()))
})?;
let prev_snapshot = match &target.prev {
None => SchemaSnapshot {
tables: vec![],
m2m_tables: vec![],
indexes: vec![],
checks: vec![],
},
Some(prev_name) => all
.iter()
.find(|m| &m.name == prev_name)
.map(|m| m.snapshot.clone())
.ok_or_else(|| {
MigrateError::Validation(format!(
"migration `{name}` declares prev=`{prev_name}` but that file is missing in {}",
dir.display()
))
})?,
};
let inverted = invert(&target.forward, &prev_snapshot)?;
if target.atomic {
unapply_atomic_pool(pool, &target, &inverted, &prev_snapshot, ledger).await?;
} else {
unapply_nonatomic_pool(pool, &target, &inverted, &prev_snapshot, ledger).await?;
}
Ok(target)
}
async fn check_is_head_pool(
pool: &crate::sql::Pool,
dir: &Path,
name: &str,
ledger: &str,
) -> Result<(), MigrateError> {
let applied = applied_set_pool_for(pool, ledger).await?;
if !applied.contains(name) {
return Ok(());
}
let all = file::list_dir(dir)?;
let head = all
.iter()
.rev()
.find(|m| applied.contains(&m.name))
.map(|m| m.name.as_str());
match head {
Some(h) if h == name => Ok(()),
Some(h) => Err(MigrateError::Validation(format!(
"refusing to unapply `{name}` out of order: current head is `{h}`. \
Use `downgrade_pool(pool, dir, n)` / `migrate_to_pool(pool, dir, target)` for \
ordered rollback, or `unapply_force_pool` to bypass.",
))),
None => Ok(()),
}
}
async fn unapply_atomic_pool(
pool: &crate::sql::Pool,
target: &Migration,
inverted: &[Operation],
snapshot: &SchemaSnapshot,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %target.name, "unapplying (atomic, _pool)");
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let mut tx = pg.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("DELETE FROM {ledger} WHERE name = $1"))
.bind(&target.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let mut tx = my.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("DELETE FROM {ledger} WHERE name = ?"))
.bind(&target.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let mut tx = sq.begin().await?;
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
sqlx::query(&d.sql).execute(&mut *tx).await?;
}
}
}
for stmt in deferred_fks {
sqlx::query(&stmt).execute(&mut *tx).await?;
}
sqlx::query(&format!("DELETE FROM {ledger} WHERE name = ?"))
.bind(&target.name)
.execute(&mut *tx)
.await?;
tx.commit().await?;
}
}
Ok(())
}
pub async fn migrate_embedded_pool(
pool: &crate::sql::Pool,
embedded: &[(&str, &str)],
) -> Result<Vec<Migration>, MigrateError> {
migrate_embedded_pool_with_ledger(pool, embedded, LEDGER_TABLE).await
}
async fn migrate_embedded_pool_with_ledger(
pool: &crate::sql::Pool,
embedded: &[(&str, &str)],
ledger: &str,
) -> Result<Vec<Migration>, MigrateError> {
ensure_ledger_pool_for(pool, ledger).await?;
with_migrate_lock_pool(pool, async {
let mut all: Vec<Migration> = Vec::with_capacity(embedded.len());
for (name, json) in embedded {
let mig = file::parse(json)?;
if mig.name != *name {
return Err(MigrateError::Validation(format!(
"embedded entry key `{name}` doesn't match migration `name` field `{}`",
mig.name,
)));
}
all.push(mig);
}
all.sort_by(|a, b| a.name.cmp(&b.name));
file::validate_chain(&all, "embedded slice")?;
let applied = applied_set_pool_for(pool, ledger).await?;
let pending: Vec<Migration> = all
.into_iter()
.filter(|m| !applied.contains(&m.name))
.collect();
let mut newly = Vec::with_capacity(pending.len());
for mig in pending {
apply_one_pool(pool, &mig, ledger).await?;
newly.push(mig);
}
Ok(newly)
})
.await
}
async fn unapply_nonatomic_pool(
pool: &crate::sql::Pool,
target: &Migration,
inverted: &[Operation],
snapshot: &SchemaSnapshot,
ledger: &str,
) -> Result<(), MigrateError> {
tracing::info!(migration = %target.name, "unapplying (non-atomic, _pool)");
let mut deferred_fks: Vec<String> = Vec::new();
for op in inverted {
match op {
Operation::Schema(change) => {
let batch = render_changes_split(std::slice::from_ref(change), snapshot)
.map_err(MigrateError::Validation)?;
for stmt in batch.immediate {
crate::sql::raw_execute_pool(pool, &stmt, Vec::new()).await?;
}
deferred_fks.extend(batch.deferred_fks);
}
Operation::Data(d) => {
crate::sql::raw_execute_pool(pool, &d.sql, Vec::new()).await?;
}
}
}
for stmt in deferred_fks {
crate::sql::raw_execute_pool(pool, &stmt, Vec::new()).await?;
}
let placeholder = pool.dialect().placeholder(1);
let delete_sql = format!("DELETE FROM {ledger} WHERE name = {placeholder}");
crate::sql::raw_execute_pool(
pool,
&delete_sql,
vec![crate::core::SqlValue::String(target.name.clone())],
)
.await?;
Ok(())
}