#![cfg_attr(
feature = "postgres",
doc = r#"
# Example
Create own custom Migrator which only supports postgres and uses own unique
table name instead of default table name
```rust,no_run
use sqlx::{Database, Pool, Postgres};
use sqlx_migrator::error::Error;
use sqlx_migrator::migration::{AppliedMigrationSqlRow, Migration};
use sqlx_migrator::migrator::{DatabaseOperation, Info, Migrate};
use sqlx_migrator::sync::Synchronize;
#[derive(Default)]
pub struct CustomMigrator {
migrations: Vec<Box<dyn Migration<Postgres>>>,
}
impl Info<Postgres> for CustomMigrator {
fn migrations(&self) -> &Vec<Box<dyn Migration<Postgres>>> {
&self.migrations
}
fn migrations_mut(&mut self) -> &mut Vec<Box<dyn Migration<Postgres>>> {
&mut self.migrations
}
}
#[async_trait::async_trait]
impl DatabaseOperation<Postgres> for CustomMigrator {
async fn ensure_migration_table_exists(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS _custom_table_name (
id INT PRIMARY KEY NOT NULL GENERATED ALWAYS AS IDENTITY,
app TEXT NOT NULL,
name TEXT NOT NULL,
applied_time TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (app, name)
)",
)
.execute(connection)
.await?;
Ok(())
}
async fn drop_migration_table_if_exists(
&self,
connection: &mut <Postgres as Database>::Connection,
) -> Result<(), Error> {
sqlx::query("DROP TABLE IF EXISTS _custom_table_name")
.execute(connection)
.await?;
Ok(())
}
async fn add_migration_to_db_table(
&self,
connection: &mut <Postgres as Database>::Connection,
migration: &Box<dyn Migration<Postgres>>,
) -> Result<(), Error> {
sqlx::query("INSERT INTO _custom_table_name(app, name) VALUES ($1, $2)")
.bind(migration.app())
.bind(migration.name())
.execute(connection)
.await?;
Ok(())
}
async fn delete_migration_from_db_table(
&self,
connection: &mut <Postgres as Database>::Connection,
migration: &Box<dyn Migration<Postgres>>,
) -> Result<(), Error> {
sqlx::query("DELETE FROM _custom_table_name WHERE app = $1 AND name = $2")
.bind(migration.app())
.bind(migration.name())
.execute(connection)
.await?;
Ok(())
}
async fn fetch_applied_migration_from_db(
&self,
connection: &mut <Postgres as Database>::Connection,
) -> Result<Vec<AppliedMigrationSqlRow>, Error> {
Ok(sqlx::query_as::<_, AppliedMigrationSqlRow>(
"SELECT id, app, name, applied_time FROM _custom_table_name",
)
.fetch_all(connection)
.await?)
}
async fn lock(
&self,
connection: &mut <Postgres as Database>::Connection,
) -> Result<(), Error> {
let (database_name,): (String,) = sqlx::query_as("SELECT CURRENT_DATABASE()")
.fetch_one(&mut *connection)
.await?;
let lock_id = i64::from(crc32fast::hash(database_name.as_bytes()));
sqlx::query("SELECT pg_advisory_lock($1)")
.bind(lock_id)
.execute(connection)
.await?;
Ok(())
}
async fn unlock(
&self,
connection: &mut <Postgres as Database>::Connection,
) -> Result<(), Error> {
let (database_name,): (String,) = sqlx::query_as("SELECT CURRENT_DATABASE()")
.fetch_one(&mut *connection)
.await?;
let lock_id = i64::from(crc32fast::hash(database_name.as_bytes()));
sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(lock_id)
.execute(connection)
.await?;
Ok(())
}
}
impl Migrate<Postgres> for CustomMigrator {}
impl Synchronize<Postgres> for CustomMigrator {}
```
"#
)]
use std::collections::HashMap;
use sqlx::{Connection, Database};
use crate::error::Error;
use crate::migration::{AppliedMigrationSqlRow, Migration};
#[cfg(all(
any(feature = "postgres", feature = "mysql", feature = "sqlite"),
feature = "any"
))]
mod any;
#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "sqlite")]
mod sqlite;
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(all(test, feature = "sqlite"))]
mod tests;
type BoxMigration<DB> = Box<dyn Migration<DB>>;
type MigrationVec<'migration, DB> = Vec<&'migration BoxMigration<DB>>;
type MigrationVecResult<'migration, DB> = Result<MigrationVec<'migration, DB>, Error>;
#[derive(Debug)]
enum PlanType {
Apply,
Revert,
}
#[derive(Debug)]
pub struct Plan {
#[expect(
clippy::struct_field_names,
reason = "type is a keyword so it cannot be used"
)]
plan_type: PlanType,
app_migration: Option<(String, Option<String>)>,
count: Option<usize>,
fake: bool,
}
impl Plan {
fn new(
plan_type: PlanType,
app_migration: Option<(String, Option<String>)>,
count: Option<usize>,
) -> Self {
Self {
plan_type,
app_migration,
count,
fake: false,
}
}
#[must_use]
pub fn fake(self, fake: bool) -> Self {
let mut plan = self;
plan.fake = fake;
plan
}
#[must_use]
pub fn apply_all() -> Self {
Self::new(PlanType::Apply, None, None)
}
#[must_use]
pub fn apply_name(app: &str, name: &Option<String>) -> Self {
Self::new(PlanType::Apply, Some((app.to_string(), name.clone())), None)
}
#[must_use]
pub fn apply_count(count: usize) -> Self {
Self::new(PlanType::Apply, None, Some(count))
}
#[must_use]
pub fn revert_all() -> Self {
Self::new(PlanType::Revert, None, None)
}
#[must_use]
pub fn revert_name(app: &str, name: &Option<String>) -> Self {
Self::new(
PlanType::Revert,
Some((app.to_string(), name.clone())),
None,
)
}
#[must_use]
pub fn revert_count(count: usize) -> Self {
Self::new(PlanType::Revert, None, Some(count))
}
}
pub trait Info<DB> {
fn migrations(&self) -> &Vec<BoxMigration<DB>>;
fn migrations_mut(&mut self) -> &mut Vec<BoxMigration<DB>>;
fn add_migrations(&mut self, migrations: Vec<BoxMigration<DB>>) -> Result<(), Error> {
for migration in migrations {
self.add_migration(migration)?;
}
Ok(())
}
fn add_migration(&mut self, migration: BoxMigration<DB>) -> Result<(), Error> {
if migration.is_virtual() {
if !migration.parents().is_empty()
|| !migration.operations().is_empty()
|| !migration.replaces().is_empty()
|| !migration.run_before().is_empty()
{
return Err(Error::InvalidVirtualMigration);
}
} else if let Some((migration_index, found_migration)) = self
.migrations()
.iter()
.enumerate()
.find(|(_, elem)| elem == &&migration)
{
if found_migration.is_virtual() {
self.migrations_mut().remove(migration_index);
}
else if found_migration.parents() != migration.parents()
|| found_migration.operations().len() != migration.operations().len()
|| found_migration.replaces() != migration.replaces()
|| found_migration.run_before() != migration.run_before()
|| found_migration.is_atomic() != migration.is_atomic()
{
return Err(Error::InconsistentMigration {
app: migration.app().to_string(),
name: migration.name().to_string(),
});
}
}
if !self.migrations().contains(&migration) {
if migration.is_virtual() {
self.migrations_mut().push(migration);
} else {
let migration_parents = migration.parents();
let migration_replaces = migration.replaces();
let migration_run_before = migration.run_before();
self.migrations_mut().push(migration);
for parent in migration_parents {
self.add_migration(parent)?;
}
for replace in migration_replaces {
self.add_migration(replace)?;
}
for run_before in migration_run_before {
self.add_migration(run_before)?;
}
}
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait DatabaseOperation<DB>
where
DB: Database,
{
async fn ensure_migration_table_exists(
&self,
connection: &mut <DB as Database>::Connection,
) -> Result<(), Error>;
async fn drop_migration_table_if_exists(
&self,
connection: &mut <DB as Database>::Connection,
) -> Result<(), Error>;
async fn add_migration_to_db_table(
&self,
connection: &mut <DB as Database>::Connection,
migration: &BoxMigration<DB>,
) -> Result<(), Error>;
async fn delete_migration_from_db_table(
&self,
connection: &mut <DB as Database>::Connection,
migration: &BoxMigration<DB>,
) -> Result<(), Error>;
async fn fetch_applied_migration_from_db(
&self,
connection: &mut <DB as Database>::Connection,
) -> Result<Vec<AppliedMigrationSqlRow>, Error>;
async fn lock(&self, connection: &mut <DB as Database>::Connection) -> Result<(), Error>;
async fn unlock(&self, connection: &mut <DB as Database>::Connection) -> Result<(), Error>;
}
fn populate_replace_recursive<'populate, DB>(
replace_hash_map: &mut HashMap<&'populate BoxMigration<DB>, Vec<&'populate BoxMigration<DB>>>,
key: &'populate BoxMigration<DB>,
value: &'populate BoxMigration<DB>,
) -> Result<(), Error> {
if key == value {
return Err(Error::PlanError {
message: "two migrations replaces each other".to_string(),
});
}
let replace_hash_map_vec = replace_hash_map.entry(key).or_default();
if !replace_hash_map_vec.contains(&value) {
replace_hash_map_vec.push(value);
}
if let Some(grand_values) = replace_hash_map.clone().get(value) {
for grand_value in grand_values {
populate_replace_recursive(replace_hash_map, key, grand_value)?;
}
}
Ok(())
}
fn get_parent_recursive<DB>(
migration: &BoxMigration<DB>,
original_migration: &[BoxMigration<DB>],
) -> Result<Vec<BoxMigration<DB>>, Error> {
let mut parents: Vec<BoxMigration<DB>> = vec![];
for parent in migration.parents() {
parents.push(Box::new((
parent.app().to_string(),
parent.name().to_string(),
)));
let found_parent = if parent.is_virtual() {
original_migration
.iter()
.find(|&search_parent| search_parent == &parent)
.ok_or(Error::PlanError {
message: "failed to find parent non virtual migration".to_string(),
})?
} else {
&parent
};
parents.extend(get_parent_recursive(found_parent, original_migration)?);
}
Ok(parents)
}
fn get_run_before_recursive<DB>(
migration: &BoxMigration<DB>,
original_migration: &[BoxMigration<DB>],
) -> Result<Vec<BoxMigration<DB>>, Error> {
let mut run_before_list: Vec<BoxMigration<DB>> = vec![];
for run_before in migration.run_before() {
run_before_list.push(Box::new((
run_before.app().to_string(),
run_before.name().to_string(),
)));
let found_run_before = if run_before.is_virtual() {
original_migration
.iter()
.find(|&search_run_before| search_run_before == &run_before)
.ok_or(Error::PlanError {
message: "failed to find run before non virtual migration".to_string(),
})?
} else {
&run_before
};
run_before_list.extend(get_parent_recursive(found_run_before, original_migration)?);
}
Ok(run_before_list)
}
fn only_related_migration<DB>(
migration_list: &mut MigrationVec<DB>,
with_list: Vec<&BoxMigration<DB>>,
plan_type: &PlanType,
original_migration: &[BoxMigration<DB>],
) -> Result<(), Error> {
let mut related_migrations = vec![];
for with in with_list {
if !related_migrations.contains(&with) {
related_migrations.push(with);
match plan_type {
PlanType::Apply => {
let with_parents = get_parent_recursive(with, original_migration)?;
for &migration in migration_list.iter() {
if !related_migrations.contains(&migration)
&& (with_parents.contains(migration)
|| get_run_before_recursive(migration, original_migration)?
.contains(with))
{
related_migrations.push(migration);
}
}
}
PlanType::Revert => {
let with_run_before = get_run_before_recursive(with, original_migration)?;
for &migration in migration_list.iter() {
if !related_migrations.contains(&migration)
&& (with_run_before.contains(migration)
|| get_parent_recursive(migration, original_migration)?
.contains(with))
{
related_migrations.push(migration);
}
}
}
}
}
}
migration_list.retain(|&migration| related_migrations.contains(&migration));
Ok(())
}
fn process_plan<DB>(
migration_list: &mut MigrationVec<DB>,
applied_migrations: &MigrationVec<DB>,
plan: &Plan,
original_migration: &[BoxMigration<DB>],
) -> Result<(), Error>
where
DB: Database,
{
match plan.plan_type {
PlanType::Apply => {
migration_list.retain(|migration| !applied_migrations.contains(migration));
}
PlanType::Revert => {
migration_list.retain(|migration| applied_migrations.contains(migration));
migration_list.reverse();
}
}
if let Some((app, migration_name)) = &plan.app_migration {
let position = if let Some(name) = migration_name {
let Some(pos) = migration_list
.iter()
.rposition(|migration| migration.app() == app && migration.name() == name)
else {
if migration_list
.iter()
.any(|migration| migration.app() == app)
{
return Err(Error::PlanError {
message: format!("migration {app}:{name} doesn't exists for app"),
});
}
return Err(Error::PlanError {
message: format!("app {app} doesn't exists"),
});
};
pos
} else {
let Some(pos) = migration_list
.iter()
.rposition(|migration| migration.app() == app)
else {
return Err(Error::PlanError {
message: format!("app {app} doesn't exists"),
});
};
pos
};
migration_list.truncate(position + 1);
let with_list = if migration_name.is_some() {
vec![migration_list[position]]
} else {
migration_list
.iter()
.filter(|pos_migration| pos_migration.app() == app)
.copied()
.collect::<Vec<_>>()
};
only_related_migration(
migration_list,
with_list,
&plan.plan_type,
original_migration,
)?;
} else if let Some(count) = plan.count {
let actual_len = migration_list.len();
if count > actual_len {
return Err(Error::PlanError {
message: format!(
"passed count value is larger than migration length: {actual_len}"
),
});
}
migration_list.truncate(count);
}
Ok(())
}
fn get_recursive<'get, DB>(
hash_map: &'get HashMap<BoxMigration<DB>, &'get BoxMigration<DB>>,
val: &'get BoxMigration<DB>,
) -> Vec<&'get BoxMigration<DB>> {
let mut recursive_vec = vec![val];
if let Some(&parent) = hash_map.get(val) {
recursive_vec.extend(get_recursive(hash_map, parent));
}
recursive_vec
}
#[async_trait::async_trait]
pub trait Migrate<DB>: Info<DB> + DatabaseOperation<DB> + Send + Sync
where
DB: Database,
{
#[expect(clippy::too_many_lines)]
async fn generate_migration_plan(
&self,
connection: &mut <DB as Database>::Connection,
plan: Option<&Plan>,
) -> MigrationVecResult<DB> {
if self.migrations().is_empty() {
return Err(Error::PlanError {
message: "no migration are added to migration list".to_string(),
});
}
if self
.migrations()
.iter()
.any(|migration| migration.is_virtual())
{
return Err(Error::PlanError {
message: "virtual migrations which is not replaced is present".to_string(),
});
}
tracing::debug!("generating {:?} migration plan", plan);
let mut replaces_child_parent_hash_map = HashMap::new();
for parent_migration in self.migrations() {
for child_migration in parent_migration.replaces() {
let child_name = format!("{}:{}", child_migration.app(), child_migration.name());
if replaces_child_parent_hash_map
.insert(child_migration, parent_migration)
.is_some()
{
return Err(Error::PlanError {
message: format!("migration {child_name} replaced multiple times",),
});
}
}
}
let mut replace_children = HashMap::<_, Vec<_>>::new();
for (child, &parent) in &replaces_child_parent_hash_map {
let children_migration = if child.is_virtual() {
self.migrations()
.iter()
.find(|&search_migration| search_migration == child)
.ok_or(Error::PlanError {
message: "Failed finding non virtual migration for virtual migration"
.to_string(),
})?
} else {
child
};
replace_children
.entry(parent)
.or_default()
.push(children_migration);
}
for (child, &parent) in &replaces_child_parent_hash_map {
let children_migration = if child.is_virtual() {
self.migrations()
.iter()
.find(|&search_migration| search_migration == child)
.ok_or(Error::PlanError {
message: "Failed finding non virtual migration for virtual migration"
.to_string(),
})?
} else {
child
};
populate_replace_recursive(&mut replace_children, parent, children_migration)?;
}
let mut run_before_child_parent_hash_map = HashMap::<_, Vec<_>>::new();
for parent_migration in self.migrations() {
for run_before_migration in parent_migration.run_before() {
run_before_child_parent_hash_map
.entry(run_before_migration)
.or_default()
.push(parent_migration);
}
}
let mut migration_list = Vec::new();
let original_migration_length = self.migrations().len();
while migration_list.len() != original_migration_length {
let loop_initial_migration_list_length = migration_list.len();
for migration in self.migrations() {
let all_required_added = !migration_list.contains(&migration)
&& migration
.parents()
.iter()
.all(|parent_migration| migration_list.contains(&parent_migration))
&& run_before_child_parent_hash_map
.get(migration)
.unwrap_or(&vec![])
.iter()
.all(|run_before_migration| migration_list.contains(run_before_migration))
&& replaces_child_parent_hash_map
.get(migration)
.is_none_or(|replace_migration| migration_list.contains(replace_migration))
&& replace_children.get(migration).is_none_or(|children| {
children.iter().all(|&child| {
child
.parents()
.iter()
.all(|child_parent| migration_list.contains(&child_parent))
&& run_before_child_parent_hash_map
.get(child)
.unwrap_or(&vec![])
.iter()
.all(|run_before_migration| {
migration_list.contains(run_before_migration)
|| children.contains(run_before_migration)
})
})
});
if all_required_added {
migration_list.push(migration);
}
}
if loop_initial_migration_list_length == migration_list.len() {
return Err(Error::PlanError {
message: "reached deadlock stage during plan generation".to_string(),
});
}
}
if let Some(some_plan) = plan {
self.ensure_migration_table_exists(connection).await?;
let applied_migration_sql_rows =
self.fetch_applied_migration_from_db(connection).await?;
let mut applied_migrations = Vec::new();
for migration in self.migrations() {
if applied_migration_sql_rows
.iter()
.any(|sqlx_migration| sqlx_migration == migration)
{
applied_migrations.push(migration);
}
}
for &migration in &applied_migrations {
let mut parents = vec![];
if let Some(run_before_list) = run_before_child_parent_hash_map.get(migration) {
for &run_before in run_before_list {
parents.push(run_before);
}
}
let main_parents = migration.parents();
for parent in &main_parents {
parents.push(parent);
}
for parent in parents {
let recursive_vec = get_recursive(&replaces_child_parent_hash_map, parent);
if !applied_migrations
.iter()
.any(|applied| recursive_vec.contains(applied))
{
return Err(Error::PlanError {
message: format!(
"children migration {}:{} applied before its parent migration \
{}:{}",
migration.app(),
migration.name(),
parent.app(),
parent.name()
),
});
}
}
}
for migration in migration_list.clone() {
if let Some(children) = replace_children.get(&migration) {
let replaces_applied = children
.iter()
.any(|&replace_migration| applied_migrations.contains(&replace_migration));
if replaces_applied {
if applied_migrations.contains(&migration) {
return Err(Error::PlanError {
message: format!(
"migration {}:{} and its replaces are applied together",
migration.app(),
migration.name(),
),
});
}
migration_list.retain(|&plan_migration| migration != plan_migration);
} else {
for replaced_migration in children {
migration_list
.retain(|plan_migration| replaced_migration != plan_migration);
}
}
}
}
process_plan(
&mut migration_list,
&applied_migrations,
some_plan,
self.migrations(),
)?;
}
Ok(migration_list)
}
async fn run(
&self,
connection: &mut <DB as Database>::Connection,
plan: &Plan,
) -> Result<(), Error> {
tracing::debug!("running plan {:?}", plan);
self.lock(connection).await?;
let result = async {
for migration in self.generate_migration_plan(connection, Some(plan)).await? {
match plan.plan_type {
PlanType::Apply => {
tracing::debug!("applying {} : {}", migration.app(), migration.name());
let operations = migration.operations();
if migration.is_atomic() {
let mut transaction = connection.begin().await?;
if !plan.fake {
for operation in operations {
operation.up(&mut transaction).await?;
}
}
self.add_migration_to_db_table(&mut transaction, migration)
.await?;
transaction.commit().await?;
} else {
if !plan.fake {
for operation in operations {
operation.up(connection).await?;
}
}
self.add_migration_to_db_table(connection, migration)
.await?;
}
}
PlanType::Revert => {
tracing::debug!("reverting {} : {}", migration.app(), migration.name());
let mut operations = migration.operations();
operations.reverse();
if migration.is_atomic() {
let mut transaction = connection.begin().await?;
if !plan.fake {
for operation in operations {
operation.down(&mut transaction).await?;
}
}
self.delete_migration_from_db_table(&mut transaction, migration)
.await?;
transaction.commit().await?;
} else {
if !plan.fake {
for operation in operations {
operation.down(connection).await?;
}
}
self.delete_migration_from_db_table(connection, migration)
.await?;
}
}
}
}
Ok(())
}
.await;
self.unlock(connection).await?;
result
}
}
const DEFAULT_TABLE_NAME: &str = "_sqlx_migrator_migrations";
pub struct Migrator<DB> {
migrations: Vec<BoxMigration<DB>>,
table_prefix: Option<String>,
schema: Option<String>,
}
impl<DB> Migrator<DB> {
#[must_use]
pub fn new() -> Self {
Self {
migrations: Vec::default(),
table_prefix: None,
schema: None,
}
}
pub fn set_table_prefix(mut self, prefix: impl Into<String>) -> Result<Self, Error> {
let prefix_str = prefix.into();
if prefix_str.is_empty()
|| !prefix_str
.chars()
.all(|c| char::is_ascii_lowercase(&c) || char::is_numeric(c) || c == '_')
{
return Err(Error::InvalidTablePrefix);
}
self.table_prefix = Some(prefix_str);
Ok(self)
}
pub fn set_schema(mut self, schema: impl Into<String>) -> Result<Self, Error> {
let schema_str = schema.into();
if schema_str.is_empty()
|| !schema_str
.chars()
.next()
.is_some_and(|c| char::is_ascii_lowercase(&c) || c == '_')
|| !schema_str
.chars()
.all(|c| char::is_ascii_lowercase(&c) || char::is_numeric(c) || c == '_')
{
return Err(Error::InvalidSchema);
}
self.schema = Some(schema_str);
Ok(self)
}
#[must_use]
pub fn table_name(&self) -> String {
let mut table_name = DEFAULT_TABLE_NAME.to_string();
if let Some(prefix) = &self.table_prefix {
table_name = format!("_{prefix}{table_name}");
}
if let Some(schema) = &self.schema {
table_name = format!("{schema}.{table_name}");
}
table_name
}
}
impl<DB> Default for Migrator<DB> {
fn default() -> Self {
Self::new()
}
}
impl<DB> Info<DB> for Migrator<DB> {
fn migrations(&self) -> &Vec<BoxMigration<DB>> {
&self.migrations
}
fn migrations_mut(&mut self) -> &mut Vec<BoxMigration<DB>> {
&mut self.migrations
}
}
impl<DB> Migrate<DB> for Migrator<DB>
where
DB: Database,
Self: DatabaseOperation<DB>,
{
}