#![cfg_attr(
feature = "postgres",
doc = r##"
# Example
Create your own custom migrator that supports only Postgres and uses its own
unique table name instead of the default table name.
```rust,no_run
use sqlx::{Pool, Postgres};
use sqlx_migrator::error::Error;
use sqlx_migrator::migration::{AppliedMigrationSqlRow, Migration};
use sqlx_migrator::migrator::{DatabaseOperation, Info, Migrate};
#[derive(Default)]
pub struct CustomMigrator {
migrations: Vec<Box<dyn Migration<Postgres>>>,
}
impl Info<Postgres, ()> for CustomMigrator {
fn migrations(&self) -> &Vec<Box<dyn Migration<Postgres>>> {
&self.migrations
}
fn migrations_mut(&mut self) -> &mut Vec<Box<dyn Migration<Postgres>>> {
&mut self.migrations
}
fn state(&self) -> &() {
&()
}
}
#[async_trait::async_trait]
impl DatabaseOperation<Postgres, ()> for CustomMigrator {
async fn ensure_migration_table_exists(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS _custom_table_name (
id INT PRIMARY KEY NOT NULL GENERATED ALWAYS AS IDENTITY,
app TEXT NOT NULL,
name TEXT NOT NULL,
applied_time TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (app, name)
)",
)
.execute(connection)
.await?;
Ok(())
}
async fn drop_migration_table_if_exists(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
sqlx::query("DROP TABLE IF EXISTS _custom_table_name")
.execute(connection)
.await?;
Ok(())
}
async fn add_migration_to_db_table(
&self,
migration: &Box<dyn Migration<Postgres>>,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
sqlx::query("INSERT INTO _custom_table_name(app, name) VALUES ($1, $2)")
.bind(migration.app())
.bind(migration.name())
.execute(connection)
.await?;
Ok(())
}
async fn delete_migration_from_db_table(
&self,
migration: &Box<dyn Migration<Postgres>>,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
sqlx::query("DELETE FROM _custom_table_name WHERE app = $1 AND name = $2")
.bind(migration.app())
.bind(migration.name())
.execute(connection)
.await?;
Ok(())
}
async fn fetch_applied_migration_from_db(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<Vec<AppliedMigrationSqlRow>, Error> {
Ok(sqlx::query_as::<_, AppliedMigrationSqlRow>(
"SELECT id, app, name, applied_time FROM _custom_table_name",
)
.fetch_all(connection)
.await?)
}
async fn lock(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
let (database_name,): (String,) = sqlx::query_as("SELECT CURRENT_DATABASE()")
.fetch_one(&mut *connection)
.await?;
let lock_id = i64::from(crc32fast::hash(database_name.as_bytes()));
sqlx::query("SELECT pg_advisory_lock($1)")
.bind(lock_id)
.execute(connection)
.await?;
Ok(())
}
async fn unlock(
&self,
connection: &mut <Postgres as sqlx::Database>::Connection,
) -> Result<(), Error> {
let (database_name,): (String,) = sqlx::query_as("SELECT CURRENT_DATABASE()")
.fetch_one(&mut *connection)
.await?;
let lock_id = i64::from(crc32fast::hash(database_name.as_bytes()));
sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(lock_id)
.execute(connection)
.await?;
Ok(())
}
}
impl Migrate<Postgres, ()> for CustomMigrator {}
```
"##
)]
use std::collections::HashMap;
use sqlx::Connection;
use crate::error::Error;
use crate::migration::{AppliedMigrationSqlRow, Migration};
// Compiled only when the `any` feature is enabled together with at least one
// of the `postgres`, `mysql` or `sqlite` features.
#[cfg(all(
any(feature = "postgres", feature = "mysql", feature = "sqlite"),
feature = "any"
))]
mod any;
// Compiled only when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
mod mysql;
// Compiled only when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
mod sqlite;
// Compiled only when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
mod postgres;
// Test module; built only for test builds that also enable `sqlite`.
#[cfg(all(test, feature = "sqlite"))]
mod tests;
/// Boxed, dynamically dispatched migration for database `DB` and state `State`.
type BoxMigration<DB, State> = Box<dyn Migration<DB, State>>;
/// List of borrowed boxed migrations.
type MigrationVec<'migration, DB, State> = Vec<&'migration BoxMigration<DB, State>>;
/// Either a [`MigrationVec`] or the crate [`Error`].
type MigrationVecResult<'migration, DB, State> = Result<MigrationVec<'migration, DB, State>, Error>;
/// Direction of a [`Plan`].
#[derive(Debug)]
enum PlanType {
/// Keep only migrations that are not yet applied and run their `up` operations.
Apply,
/// Keep only applied migrations, in reverse order, and run their `down` operations.
Revert,
}
/// Describes which migrations should be applied or reverted.
///
/// Instances are built through the `apply_*` / `revert_*` constructors.
#[derive(Debug)]
pub struct Plan {
// Whether the plan applies or reverts migrations.
plan_type: PlanType,
// Optional `(app, migration name)` target; a `None` name targets the whole app.
app_migration: Option<(String, Option<String>)>,
// Optional maximum number of migrations; only consulted when no target is set.
count: Option<usize>,
}
impl Plan {
fn new(
plan_type: PlanType,
app_migration: Option<(String, Option<String>)>,
count: Option<usize>,
) -> Self {
Self {
plan_type,
app_migration,
count,
}
}
#[must_use]
pub fn apply_all() -> Self {
Self::new(PlanType::Apply, None, None)
}
#[must_use]
pub fn apply_name(app: &str, name: &Option<String>) -> Self {
Self::new(PlanType::Apply, Some((app.to_string(), name.clone())), None)
}
#[must_use]
pub fn apply_count(count: usize) -> Self {
Self::new(PlanType::Apply, None, Some(count))
}
#[must_use]
pub fn revert_all() -> Self {
Self::new(PlanType::Revert, None, None)
}
#[must_use]
pub fn revert_name(app: &str, name: &Option<String>) -> Self {
Self::new(
PlanType::Revert,
Some((app.to_string(), name.clone())),
None,
)
}
#[must_use]
pub fn revert_count(count: usize) -> Self {
Self::new(PlanType::Revert, None, Some(count))
}
}
/// Access to the migrations registered on a migrator and to its shared state.
pub trait Info<DB, State> {
    /// State value handed to every operation when a migration runs.
    fn state(&self) -> &State;

    /// Registered migrations, in registration order.
    fn migrations(&self) -> &Vec<BoxMigration<DB, State>>;

    /// Mutable access to the registered migrations.
    fn migrations_mut(&mut self) -> &mut Vec<BoxMigration<DB, State>>;

    /// Registers each migration of `migrations` through [`Info::add_migration`].
    fn add_migrations(&mut self, migrations: Vec<BoxMigration<DB, State>>) {
        migrations
            .into_iter()
            .for_each(|migration| self.add_migration(migration));
    }

    /// Registers `migration` unless an equal one is already present, then
    /// registers its parents, the migrations it replaces and the migrations it
    /// must run before, in that order.
    fn add_migration(&mut self, migration: BoxMigration<DB, State>) {
        // Collected up front because `migration` is moved into the list below.
        let parents = migration.parents();
        let replaced = migration.replaces();
        let successors = migration.run_before();
        if self.migrations().contains(&migration) {
            return;
        }
        self.migrations_mut().push(migration);
        for related in parents.into_iter().chain(replaced).chain(successors) {
            self.add_migration(related);
        }
    }
}
/// Database-specific bookkeeping needed by a migrator.
///
/// Implementors decide how the table of applied migrations is stored and how
/// concurrent runs are serialized; the module-level example shows a Postgres
/// implementation.
#[async_trait::async_trait]
pub trait DatabaseOperation<DB, State>
where
DB: sqlx::Database,
{
/// Makes sure the table that records applied migrations exists.
///
/// Called at the start of plan generation, before applied migrations are
/// fetched.
async fn ensure_migration_table_exists(
&self,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<(), Error>;
/// Drops the table that records applied migrations if it exists.
async fn drop_migration_table_if_exists(
&self,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<(), Error>;
/// Records `migration` as applied.
///
/// Called after the `up` operations of the migration have run; for atomic
/// migrations `connection` is the open transaction.
#[allow(clippy::borrowed_box)]
async fn add_migration_to_db_table(
&self,
migration: &BoxMigration<DB, State>,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<(), Error>;
/// Removes the record that `migration` is applied.
///
/// Called after the `down` operations of the migration have run; for atomic
/// migrations `connection` is the open transaction.
#[allow(clippy::borrowed_box)]
async fn delete_migration_from_db_table(
&self,
migration: &BoxMigration<DB, State>,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<(), Error>;
/// Returns the rows describing every migration recorded as applied.
async fn fetch_applied_migration_from_db(
&self,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<Vec<AppliedMigrationSqlRow>, Error>;
/// Acquires the lock that is taken before a plan is run.
async fn lock(&self, connection: &mut <DB as sqlx::Database>::Connection) -> Result<(), Error>;
/// Releases the lock taken by [`DatabaseOperation::lock`].
async fn unlock(
&self,
connection: &mut <DB as sqlx::Database>::Connection,
) -> Result<(), Error>;
}
/// Adds `value` and every migration reachable from it through
/// `populate_hash_map` to the list stored under `key`.
///
/// `value` is always expanded, even when it is already listed under `key`.
/// Callers may pre-seed direct children before calling this function, so
/// "already listed" must not be read as "descendants already added".
/// Each migration is expanded at most once per call, so cyclic data cannot
/// make the function loop forever.
///
/// The order in which descendants are appended is unspecified.
fn populate_recursive<'populate, DB, State>(
    populate_hash_map: &mut HashMap<
        &'populate BoxMigration<DB, State>,
        Vec<&'populate BoxMigration<DB, State>>,
    >,
    key: &'populate BoxMigration<DB, State>,
    value: &'populate BoxMigration<DB, State>,
) {
    // Explicit worklist instead of recursion: no whole-map clone per call and
    // no risk of exhausting the stack on long replace chains.
    let mut pending = vec![value];
    let mut expanded: Vec<&'populate BoxMigration<DB, State>> = Vec::new();
    while let Some(current) = pending.pop() {
        if expanded.contains(&current) {
            continue;
        }
        expanded.push(current);

        let descendants = populate_hash_map.entry(key).or_default();
        if !descendants.contains(&current) {
            descendants.push(current);
        }

        if let Some(grand_values) = populate_hash_map.get(current) {
            pending.extend(grand_values.iter().copied());
        }
    }
}
/// Returns the direct parents of `with` followed by the recursive parents of
/// each of them, in declaration order.
///
/// A migration reachable through several paths appears once per path.
fn get_parent_recursive<DB, State>(with: &BoxMigration<DB, State>) -> Vec<BoxMigration<DB, State>> {
    // `parents()` builds a fresh vector of boxed migrations on every call, so
    // it is called once and the result reused for both output and recursion.
    let mut parents = with.parents();
    let ancestors: Vec<_> = parents.iter().flat_map(get_parent_recursive).collect();
    parents.extend(ancestors);
    parents
}
/// Returns the migrations `with` must run before, followed by the recursive
/// `run_before` lists of each of them, in declaration order.
///
/// A migration reachable through several paths appears once per path.
fn get_run_before_recursive<DB, State>(
    with: &BoxMigration<DB, State>,
) -> Vec<BoxMigration<DB, State>> {
    // `run_before()` builds a fresh vector on every call, so it is called once
    // and the result reused for both output and recursion.
    let mut run_before_list = with.run_before();
    let transitive: Vec<_> = run_before_list
        .iter()
        .flat_map(get_run_before_recursive)
        .collect();
    run_before_list.extend(transitive);
    run_before_list
}
/// Reports whether `migration` reaches `with` by following `replaces` or
/// `run_before` links, directly or through intermediate migrations.
///
/// The `replaces` links are explored before the `run_before` links.
fn is_apply_related<DB, State>(
    with: &BoxMigration<DB, State>,
    migration: &BoxMigration<DB, State>,
) -> bool {
    for replaced in migration.replaces() {
        if &replaced == with || is_apply_related(with, &replaced) {
            return true;
        }
    }
    for successor in migration.run_before() {
        if &successor == with || is_apply_related(with, &successor) {
            return true;
        }
    }
    false
}
/// Reports whether `with` is a direct or indirect parent of `migration`.
fn is_revert_related<DB, State>(
    with: &BoxMigration<DB, State>,
    migration: &BoxMigration<DB, State>,
) -> bool {
    get_parent_recursive(migration)
        .iter()
        .any(|ancestor| ancestor == with)
}
/// Shrinks `migration_list` to the targets in `with_list` plus the migrations
/// related to them, preserving the list's order.
///
/// For an apply plan a migration is related to a target when it is one of the
/// target's recursive parents or reaches the target through `replaces` /
/// `run_before` links. For a revert plan it is related when the target must
/// (recursively) run before it or when the target is one of its recursive
/// parents.
fn only_related_migration<DB, State>(
    migration_list: &mut MigrationVec<DB, State>,
    with_list: Vec<&BoxMigration<DB, State>>,
    plan_type: &PlanType,
) {
    let mut keep = Vec::new();
    for target in with_list {
        if keep.contains(&target) {
            continue;
        }
        keep.push(target);
        match plan_type {
            PlanType::Apply => {
                let ancestors = get_parent_recursive(target);
                for &candidate in migration_list.iter() {
                    if keep.contains(&candidate) {
                        continue;
                    }
                    if ancestors.contains(candidate) || is_apply_related(target, candidate) {
                        keep.push(candidate);
                    }
                }
            }
            PlanType::Revert => {
                let successors = get_run_before_recursive(target);
                for &candidate in migration_list.iter() {
                    if keep.contains(&candidate) {
                        continue;
                    }
                    if successors.contains(candidate) || is_revert_related(target, candidate) {
                        keep.push(candidate);
                    }
                }
            }
        }
    }
    migration_list.retain(|&migration| keep.contains(&migration));
}
fn process_plan<DB, State>(
migration_list: &mut MigrationVec<DB, State>,
applied_migrations: &MigrationVec<DB, State>,
plan: &Plan,
) -> Result<(), Error>
where
DB: sqlx::Database,
{
match plan.plan_type {
PlanType::Apply => {
migration_list.retain(|migration| !applied_migrations.contains(migration));
}
PlanType::Revert => {
migration_list.retain(|migration| applied_migrations.contains(migration));
migration_list.reverse();
}
}
if let Some((app, migration_name)) = &plan.app_migration {
let position = if let Some(name) = migration_name {
let Some(pos) = migration_list
.iter()
.rposition(|migration| migration.app() == app && migration.name() == name)
else {
if migration_list
.iter()
.any(|migration| migration.app() == app)
{
return Err(Error::MigrationNameNotExists {
app: app.to_string(),
migration: name.to_string(),
});
}
return Err(Error::AppNameNotExists {
app: app.to_string(),
});
};
pos
} else {
let Some(pos) = migration_list
.iter()
.rposition(|migration| migration.app() == app)
else {
return Err(Error::AppNameNotExists {
app: app.to_string(),
});
};
pos
};
migration_list.truncate(position + 1);
let with_list = if migration_name.is_some() {
vec![migration_list[position]]
} else {
migration_list
.iter()
.filter(|pos_migration| pos_migration.app() == app)
.copied()
.collect::<Vec<_>>()
};
only_related_migration(migration_list, with_list, &plan.plan_type);
} else if let Some(count) = plan.count {
let actual_len = migration_list.len();
if count > actual_len {
return Err(Error::CountGreater { actual_len, count });
}
migration_list.truncate(count);
}
Ok(())
}
/// Follows `hash_map` from `val` for as long as the current migration has an
/// entry, returning `val` followed by every migration visited on the way.
fn get_recursive<'get, DB, State>(
    hash_map: &'get HashMap<BoxMigration<DB, State>, &'get BoxMigration<DB, State>>,
    val: &'get BoxMigration<DB, State>,
) -> Vec<&'get BoxMigration<DB, State>> {
    let mut chain = vec![val];
    let mut current = val;
    while let Some(&next) = hash_map.get(current) {
        chain.push(next);
        current = next;
    }
    chain
}
#[async_trait::async_trait]
pub trait Migrate<DB, State>: Info<DB, State> + DatabaseOperation<DB, State> + Send + Sync
where
DB: sqlx::Database,
State: Send + Sync,
{
/// Orders every registered migration and, when `plan` is given, narrows the
/// result to the migrations that plan has to run.
///
/// Ordering rules: a migration is placed only after its parents, after every
/// migration that lists it in `run_before`, and after the migration that
/// replaces it (if any). A replacing migration additionally waits for the
/// parents and `run_before` prerequisites of the migrations it replaces,
/// unless such a prerequisite is itself one of the replaced migrations.
///
/// With `plan == None` the full ordered list is returned. Otherwise applied
/// migrations are fetched, replaced/replacing migrations are resolved against
/// what is applied, and the list is handed to `process_plan`.
///
/// # Errors
/// * [`Error::MigrationReplacedMultipleTimes`] when two migrations replace
///   the same migration.
/// * [`Error::FailedToCreateMigrationPlan`] when a full pass cannot place any
///   further migration (for example with cyclic requirements).
/// * [`Error::ParentIsNotApplied`] when an applied migration has a
///   prerequisite for which neither it nor any migration replacing it is
///   applied.
/// * [`Error::BothMigrationTypeApplied`] when a replacing migration and one of
///   the migrations it replaces are both applied.
/// * Any error returned by the database operations or by `process_plan`.
#[allow(clippy::too_many_lines)]
async fn generate_migration_plan(
    &self,
    plan: Option<&Plan>,
    connection: &mut <DB as sqlx::Database>::Connection,
) -> MigrationVecResult<DB, State> {
    tracing::debug!("fetching applied migrations");
    self.ensure_migration_table_exists(connection).await?;
    tracing::debug!("generating {:?} migration plan", plan);

    // Replaced migration -> the single migration that replaces it.
    let mut parent_due_to_replaces = HashMap::new();
    for parent_migration in self.migrations() {
        for child_migration in parent_migration.replaces() {
            if parent_due_to_replaces
                .insert(child_migration, parent_migration)
                .is_some()
            {
                return Err(Error::MigrationReplacedMultipleTimes);
            }
        }
    }

    // Replacing migration -> the migrations it replaces.
    let mut replace_children = HashMap::<_, Vec<_>>::new();
    for (child, &parent) in &parent_due_to_replaces {
        replace_children.entry(parent).or_default().push(child);
    }
    for (child, &parent) in &parent_due_to_replaces {
        populate_recursive(&mut replace_children, parent, child);
    }

    // Migration -> migrations that declared it in their `run_before` list and
    // therefore have to be placed before it.
    let mut parents_due_to_run_before = HashMap::<_, Vec<_>>::new();
    for parent_migration in self.migrations() {
        for run_before_migration in parent_migration.run_before() {
            parents_due_to_run_before
                .entry(run_before_migration)
                .or_default()
                .push(parent_migration);
        }
    }

    // Repeatedly sweep over the registered migrations, placing each one as
    // soon as everything it requires has been placed.
    let mut migration_list = Vec::new();
    let original_migration_length = self.migrations().len();
    while migration_list.len() != original_migration_length {
        let loop_initial_migration_list_length = migration_list.len();
        for migration in self.migrations() {
            let all_required_added = !migration_list.contains(&migration)
                && migration
                    .parents()
                    .iter()
                    .all(|parent_migration| migration_list.contains(&parent_migration))
                && parents_due_to_run_before
                    .get(migration)
                    .unwrap_or(&vec![])
                    .iter()
                    .all(|run_before_migration| migration_list.contains(run_before_migration))
                && parent_due_to_replaces
                    .get(migration)
                    .map_or(true, |replace_migration| {
                        migration_list.contains(replace_migration)
                    })
                && replace_children.get(migration).map_or(true, |children| {
                    // A replacing migration stands in for its children, so
                    // their prerequisites must be placed first. A prerequisite
                    // that is itself a replaced child is exempt: it can only
                    // be placed after this migration.
                    children.iter().all(|&child| {
                        child.parents().iter().all(|child_parent| {
                            migration_list.contains(&child_parent)
                                || children.contains(&child_parent)
                        }) && parents_due_to_run_before
                            .get(child)
                            .unwrap_or(&vec![])
                            .iter()
                            .all(|run_before_migration| {
                                migration_list.contains(run_before_migration)
                                    || children.contains(run_before_migration)
                            })
                    })
                });
            if all_required_added {
                migration_list.push(migration);
            }
        }
        // A sweep that placed nothing can never make progress again.
        if loop_initial_migration_list_length == migration_list.len() {
            return Err(Error::FailedToCreateMigrationPlan);
        }
    }

    if let Some(some_plan) = plan {
        let applied_migration_sql_rows =
            self.fetch_applied_migration_from_db(connection).await?;
        let mut applied_migrations = Vec::new();
        for migration in self.migrations() {
            if applied_migration_sql_rows
                .iter()
                .any(|sqlx_migration| sqlx_migration == migration)
            {
                applied_migrations.push(migration);
            }
        }

        // Every prerequisite of an applied migration must be applied itself,
        // or be covered by an applied migration further up its replace chain.
        for &migration in &applied_migrations {
            let mut parents = vec![];
            if let Some(run_before_list) = parents_due_to_run_before.get(migration) {
                for &run_before in run_before_list {
                    parents.push(run_before);
                }
            }
            let main_parents = migration.parents();
            for parent in &main_parents {
                parents.push(parent);
            }
            for parent in parents {
                let recursive_vec = get_recursive(&parent_due_to_replaces, parent);
                if !applied_migrations
                    .iter()
                    .any(|applied| recursive_vec.contains(applied))
                {
                    return Err(Error::ParentIsNotApplied);
                }
            }
        }

        // Keep either a replacing migration or the migrations it replaces,
        // depending on which side is already applied.
        for migration in migration_list.clone() {
            if let Some(children) = replace_children.get(&migration) {
                let replaces_applied = children
                    .iter()
                    .any(|&replace_migration| applied_migrations.contains(&replace_migration));
                if replaces_applied {
                    if applied_migrations.contains(&migration) {
                        return Err(Error::BothMigrationTypeApplied);
                    }
                    migration_list.retain(|&plan_migration| migration != plan_migration);
                } else {
                    for replaced_migration in children {
                        migration_list
                            .retain(|plan_migration| replaced_migration != plan_migration);
                    }
                }
            }
        }
        process_plan(&mut migration_list, &applied_migrations, some_plan)?;
    }
    Ok(migration_list)
}
async fn run(
&self,
connection: &mut <DB as sqlx::Database>::Connection,
plan: &Plan,
) -> Result<(), Error> {
tracing::debug!("running plan {:?}", plan);
self.lock(connection).await?;
for migration in self.generate_migration_plan(Some(plan), connection).await? {
match plan.plan_type {
PlanType::Apply => {
tracing::debug!("applying {} : {}", migration.app(), migration.name());
if migration.is_atomic() {
let mut transaction = connection.begin().await?;
for operation in migration.operations() {
operation.up(&mut transaction, self.state()).await?;
}
self.add_migration_to_db_table(migration, &mut transaction)
.await?;
transaction.commit().await?;
} else {
for operation in migration.operations() {
operation.up(connection, self.state()).await?;
}
self.add_migration_to_db_table(migration, connection)
.await?;
}
}
PlanType::Revert => {
tracing::debug!("reverting {} : {}", migration.app(), migration.name());
let mut operations = migration.operations();
operations.reverse();
if migration.is_atomic() {
let mut transaction = connection.begin().await?;
for operation in operations {
operation.down(&mut transaction, self.state()).await?;
}
self.delete_migration_from_db_table(migration, &mut transaction)
.await?;
transaction.commit().await?;
} else {
for operation in operations {
operation.down(connection, self.state()).await?;
}
self.delete_migration_from_db_table(migration, connection)
.await?;
}
}
};
}
self.unlock(connection).await?;
Ok(())
}
}
/// Table name a [`Migrator`] starts with; `with_prefix` prepends to it.
const DEFAULT_TABLE_NAME: &str = "_sqlx_migrator_migrations";
/// Migrator that stores its migrations, a table name and a shared state value.
pub struct Migrator<DB, State> {
// Migrations registered through the `Info` trait.
migrations: Vec<BoxMigration<DB, State>>,
// Table name exposed by `table_name()`; presumably used by the backend
// modules for bookkeeping — confirm there.
table_name: String,
// State handed to every operation when a migration runs.
state: State,
}
impl<DB, State> Migrator<DB, State> {
    /// Creates a migrator without migrations that uses the default table name
    /// and carries `state`.
    pub fn new(state: State) -> Self {
        let table_name = DEFAULT_TABLE_NAME.to_string();
        Self {
            migrations: Vec::default(),
            table_name,
            state,
        }
    }

    /// Switches the table name to `_<prefix>` followed by the default table
    /// name.
    ///
    /// # Errors
    /// Returns [`Error::NonAsciiAlphaNumeric`] when `prefix` contains a
    /// character other than an ASCII letter, an ASCII digit or `_`.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Result<Self, Error> {
        let prefix_str = prefix.into();
        let has_invalid_char = prefix_str
            .chars()
            .any(|c| !(c.is_ascii_alphanumeric() || c == '_'));
        if has_invalid_char {
            return Err(Error::NonAsciiAlphaNumeric);
        }
        self.table_name = format!("_{prefix_str}{DEFAULT_TABLE_NAME}");
        Ok(self)
    }

    /// Returns the table name currently configured on this migrator.
    #[must_use]
    pub fn table_name(&self) -> &str {
        self.table_name.as_str()
    }
}
/// A default migrator carries `State::default()` as its state.
impl<DB, State> Default for Migrator<DB, State>
where
    State: Default,
{
    fn default() -> Self {
        let initial_state = State::default();
        Self::new(initial_state)
    }
}
// Plain accessors over the migrator's own fields; `add_migration` and
// `add_migrations` come from the trait's provided methods.
impl<DB, State> Info<DB, State> for Migrator<DB, State> {
fn state(&self) -> &State {
&self.state
}
fn migrations(&self) -> &Vec<BoxMigration<DB, State>> {
&self.migrations
}
fn migrations_mut(&mut self) -> &mut Vec<BoxMigration<DB, State>> {
&mut self.migrations
}
}
// `Migrator` uses the provided methods of `Migrate` unchanged. The impl only
// applies for a `DB` for which `Migrator` also implements `DatabaseOperation`;
// those impls are not in this file (presumably in the backend modules).
impl<DB, State> Migrate<DB, State> for Migrator<DB, State>
where
DB: sqlx::Database,
Self: DatabaseOperation<DB, State>,
State: Send + Sync,
{
}