sea-orm-migration 2.0.0-rc.38

Migration utility for SeaORM
Documentation
use std::collections::HashSet;
#[cfg(not(feature = "with-time"))]
use std::time::SystemTime;
use tracing::info;

use super::{Migration, MigrationStatus, queries::*};
use crate::{SchemaManager, seaql_migrations};
use sea_orm::sea_query::{
    Alias, Expr, ExprTrait, ForeignKey, IntoIden, Order, Query, Table, extension::postgres::Type,
};
use sea_orm::{
    ActiveValue, ConnectionTrait, DbBackend, DbErr, DynIden, EntityTrait, FromQueryResult,
    Iterable, QueryFilter, Schema, Statement, TransactionSession, TransactionTrait,
};

pub async fn get_migration_models<C>(
    db: &C,
    migration_table_name: DynIden,
) -> Result<Vec<seaql_migrations::Model>, DbErr>
where
    C: ConnectionTrait,
{
    let stmt = Query::select()
        .table_name(migration_table_name)
        .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
        .order_by(seaql_migrations::Column::Version, Order::Asc)
        .take();

    db.query_all(&stmt)
        .await?
        .into_iter()
        .map(|row| seaql_migrations::Model::from_query_result(&row, ""))
        .collect()
}

pub fn get_migration_with_status(
    migration_files: Vec<Migration>,
    migration_models: Vec<seaql_migrations::Model>,
) -> Result<Vec<Migration>, DbErr> {
    let mut migration_files = migration_files;

    let migration_in_db: HashSet<String> = migration_models
        .into_iter()
        .map(|model| model.version)
        .collect();
    let migration_in_fs: HashSet<String> = migration_files
        .iter()
        .map(|file| file.migration.name().to_string())
        .collect();

    let pending_migrations = &migration_in_fs - &migration_in_db;
    for migration_file in migration_files.iter_mut() {
        if !pending_migrations.contains(migration_file.migration.name()) {
            migration_file.status = MigrationStatus::Applied;
        }
    }

    let missing_migrations_in_fs = &migration_in_db - &migration_in_fs;
    let errors: Vec<String> = missing_migrations_in_fs
            .iter()
            .map(|missing_migration| {
                format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
            }).collect();

    if !errors.is_empty() {
        Err(DbErr::Custom(errors.join("\n")))
    } else {
        Ok(migration_files)
    }
}

pub async fn install<C>(db: &C, migration_table_name: DynIden) -> Result<(), DbErr>
where
    C: ConnectionTrait,
{
    let builder = db.get_database_backend();
    let schema = Schema::new(builder);
    let mut stmt = schema
        .create_table_from_entity(seaql_migrations::Entity)
        .table_name(migration_table_name);
    stmt.if_not_exists();
    db.execute(&stmt).await?;
    Ok(())
}

pub async fn uninstall(
    manager: &SchemaManager<'_>,
    migration_table_name: DynIden,
) -> Result<(), DbErr> {
    let mut stmt = Table::drop();
    stmt.table(migration_table_name).if_exists().cascade();
    manager.drop_table(stmt).await?;
    Ok(())
}

pub async fn drop_everything<C: ConnectionTrait + TransactionTrait>(db: &C) -> Result<(), DbErr> {
    if db.get_database_backend() == DbBackend::Postgres {
        let transaction = db.begin().await?;
        drop_everything_impl(&transaction).await?;
        transaction.commit().await
    } else {
        drop_everything_impl(db).await
    }
}

async fn drop_everything_impl<C: ConnectionTrait>(db: &C) -> Result<(), DbErr> {
    let db_backend = db.get_database_backend();

    // Temporarily disable the foreign key check
    if db_backend == DbBackend::Sqlite {
        info!("Disabling foreign key check");
        db.execute_raw(Statement::from_string(
            db_backend,
            "PRAGMA foreign_keys = OFF".to_owned(),
        ))
        .await?;
        info!("Foreign key check disabled");
    }

    // Drop all foreign keys
    if db_backend == DbBackend::MySql {
        info!("Dropping all foreign keys");
        let stmt = query_mysql_foreign_keys(db);
        let rows = db.query_all(&stmt).await?;
        for row in rows.into_iter() {
            let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
            let table_name: String = row.try_get("", "TABLE_NAME")?;
            info!(
                "Dropping foreign key '{}' from table '{}'",
                constraint_name, table_name
            );
            let mut stmt = ForeignKey::drop();
            stmt.table(Alias::new(table_name.as_str()))
                .name(constraint_name.as_str());
            db.execute(&stmt).await?;
            info!("Foreign key '{}' has been dropped", constraint_name);
        }
        info!("All foreign keys dropped");
    }

    // Drop all tables
    let stmt = query_tables(db)?;
    let rows = db.query_all(&stmt).await?;
    for row in rows.into_iter() {
        let table_name: String = row.try_get("", "table_name")?;
        info!("Dropping table '{}'", table_name);
        let mut stmt = Table::drop();
        stmt.table(Alias::new(table_name.as_str()))
            .if_exists()
            .cascade();
        db.execute(&stmt).await?;
        info!("Table '{}' has been dropped", table_name);
    }

    // Drop all types
    if db_backend == DbBackend::Postgres {
        info!("Dropping all types");
        let stmt = query_pg_types(db);
        let rows = db.query_all(&stmt).await?;
        for row in rows {
            let type_name: String = row.try_get("", "typname")?;
            info!("Dropping type '{}'", type_name);
            let mut stmt = Type::drop();
            stmt.name(Alias::new(&type_name));
            db.execute(&stmt).await?;
            info!("Type '{}' has been dropped", type_name);
        }
    }

    // Restore the foreign key check
    if db_backend == DbBackend::Sqlite {
        info!("Restoring foreign key check");
        db.execute_raw(Statement::from_string(
            db_backend,
            "PRAGMA foreign_keys = ON".to_owned(),
        ))
        .await?;
        info!("Foreign key check restored");
    }

    Ok(())
}

fn should_use_transaction(migration: &dyn crate::MigrationTrait, backend: DbBackend) -> bool {
    match migration.use_transaction() {
        Some(v) => v,
        None => backend == DbBackend::Postgres,
    }
}

async fn insert_migration_record<C: ConnectionTrait>(
    db: &C,
    name: &str,
    migration_table_name: DynIden,
) -> Result<(), DbErr> {
    #[cfg(not(feature = "with-time"))]
    let applied_at = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("SystemTime before UNIX EPOCH!")
        .as_secs() as i64;
    #[cfg(feature = "with-time")]
    let applied_at = sea_orm::prelude::TimeDateTimeWithTimeZone::now_utc().unix_timestamp();
    seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
        version: ActiveValue::Set(name.to_owned()),
        applied_at: ActiveValue::Set(applied_at),
    })
    .table_name(migration_table_name)
    .exec(db)
    .await?;
    Ok(())
}

async fn delete_migration_record<C: ConnectionTrait>(
    db: &C,
    name: &str,
    migration_table_name: DynIden,
) -> Result<(), DbErr> {
    seaql_migrations::Entity::delete_many()
        .filter(Expr::col(seaql_migrations::Column::Version).eq(name))
        .table_name(migration_table_name)
        .exec(db)
        .await?;
    Ok(())
}

pub async fn exec_up_with(
    manager: &SchemaManager<'_>,
    mut steps: Option<u32>,
    pending_migrations: Vec<Migration>,
    migration_table_name: DynIden,
) -> Result<(), DbErr> {
    let db = manager.get_connection();

    if let Some(steps) = steps {
        info!("Applying {} pending migrations", steps);
    } else {
        info!("Applying all pending migrations");
    }
    if pending_migrations.is_empty() {
        info!("No pending migrations");
    }

    for Migration { migration, .. } in pending_migrations {
        if let Some(steps) = steps.as_mut() {
            if steps == &0 {
                break;
            }
            *steps -= 1;
        }

        let use_txn = should_use_transaction(migration.as_ref(), db.get_database_backend());
        info!("Applying migration '{}'", migration.name());

        if use_txn {
            let transaction = db.begin().await?;
            let txn_manager = SchemaManager::new(&transaction);
            migration.up(&txn_manager).await?;
            info!("Migration '{}' has been applied", migration.name());
            insert_migration_record(&transaction, migration.name(), migration_table_name.clone())
                .await?;
            transaction.commit().await?;
        } else {
            migration.up(manager).await?;
            info!("Migration '{}' has been applied", migration.name());
            insert_migration_record(db, migration.name(), migration_table_name.clone()).await?;
        }
    }

    Ok(())
}

pub async fn exec_down_with(
    manager: &SchemaManager<'_>,
    mut steps: Option<u32>,
    applied_migrations: Vec<Migration>,
    migration_table_name: DynIden,
) -> Result<(), DbErr> {
    let db = manager.get_connection();

    if let Some(steps) = steps {
        info!("Rolling back {} applied migrations", steps);
    } else {
        info!("Rolling back all applied migrations");
    }
    if applied_migrations.is_empty() {
        info!("No applied migrations");
    }

    for Migration { migration, .. } in applied_migrations.into_iter().rev() {
        if let Some(steps) = steps.as_mut() {
            if steps == &0 {
                break;
            }
            *steps -= 1;
        }

        let use_txn = should_use_transaction(migration.as_ref(), db.get_database_backend());
        info!("Rolling back migration '{}'", migration.name());

        if use_txn {
            let transaction = db.begin().await?;
            let txn_manager = SchemaManager::new(&transaction);
            migration.down(&txn_manager).await?;
            info!("Migration '{}' has been rolled back", migration.name());
            delete_migration_record(&transaction, migration.name(), migration_table_name.clone())
                .await?;
            transaction.commit().await?;
        } else {
            migration.down(manager).await?;
            info!("Migration '{}' has been rolled back", migration.name());
            delete_migration_record(db, migration.name(), migration_table_name.clone()).await?;
        }
    }

    Ok(())
}