use chrono::Utc;
use rustrails_support::{database, runtime};
use sea_orm::{
ActiveModelTrait, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter,
};
use sea_orm_migration::{prelude::*, seaql_migrations};
const MIGRATION_TABLE: &str = "seaql_migrations";
pub trait Migration: Send + Sync {
fn name(&self) -> &str;
fn version(&self) -> &str;
async fn up(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;
async fn down(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;
}
pub struct Migrator {
migrations: Vec<Box<dyn MigrationTrait>>,
}
impl Migrator {
pub fn new() -> Self {
Self {
migrations: Vec::new(),
}
}
pub fn add(&mut self, migration: impl MigrationTrait + 'static) {
self.migrations.push(Box::new(migration));
}
pub async fn up(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
let manager = SchemaManager::new(db);
ensure_migration_table(&manager).await?;
let mut applied = applied_versions(db).await?;
for migration in &self.migrations {
if applied.contains(migration.name()) {
continue;
}
run_up_migration(migration.as_ref(), &manager).await?;
record_applied_migration(migration.name(), db).await?;
applied.insert(migration.name().to_owned());
}
Ok(())
}
pub fn up_sync(&self) -> Result<(), sea_orm::DbErr> {
database::with_db(|db| runtime::block_on(self.up(db)))
}
pub async fn down(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
let manager = SchemaManager::new(db);
ensure_migration_table(&manager).await?;
let mut applied = applied_versions(db).await?;
for migration in self.migrations.iter().rev() {
if !applied.contains(migration.name()) {
continue;
}
run_down_migration(migration.as_ref(), &manager).await?;
remove_applied_migration(migration.name(), db).await?;
applied.remove(migration.name());
}
Ok(())
}
pub fn down_sync(&self) -> Result<(), sea_orm::DbErr> {
database::with_db(|db| runtime::block_on(self.down(db)))
}
pub async fn status(&self, db: &DatabaseConnection) -> Vec<MigrationStatus> {
let manager = SchemaManager::new(db);
let applied = match ensure_migration_table(&manager).await {
Ok(()) => applied_versions(db).await.unwrap_or_default(),
Err(_) => std::collections::HashSet::new(),
};
self.migrations
.iter()
.map(|migration| MigrationStatus {
name: migration.name().to_owned(),
applied: applied.contains(migration.name()),
})
.collect()
}
pub fn status_sync(&self) -> Vec<MigrationStatus> {
database::with_db(|db| runtime::block_on(self.status(db)))
}
}
impl Default for Migrator {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStatus {
pub name: String,
pub applied: bool,
}
async fn ensure_migration_table(manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new(MIGRATION_TABLE))
.if_not_exists()
.col(
ColumnDef::new(Alias::new("version"))
.string()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(Alias::new("applied_at"))
.big_integer()
.not_null(),
)
.to_owned(),
)
.await
}
async fn applied_versions(
db: &DatabaseConnection,
) -> Result<std::collections::HashSet<String>, sea_orm::DbErr> {
Ok(seaql_migrations::Entity::find()
.all(db)
.await?
.into_iter()
.map(|model| model.version)
.collect())
}
async fn record_applied_migration(
name: &str,
db: &DatabaseConnection,
) -> Result<(), sea_orm::DbErr> {
seaql_migrations::ActiveModel {
version: Set(name.to_owned()),
applied_at: Set(Utc::now().timestamp()),
}
.insert(db)
.await?;
Ok(())
}
async fn remove_applied_migration(
name: &str,
db: &DatabaseConnection,
) -> Result<(), sea_orm::DbErr> {
seaql_migrations::Entity::delete_many()
.filter(seaql_migrations::Column::Version.eq(name))
.exec(db)
.await?;
Ok(())
}
async fn run_up_migration(
migration: &dyn MigrationTrait,
manager: &SchemaManager<'_>,
) -> Result<(), sea_orm::DbErr> {
if should_wrap_in_transaction(migration, manager.get_database_backend()) {
let tx_manager = manager.begin().await?;
migration.up(&tx_manager).await?;
tx_manager.commit().await?;
Ok(())
} else {
migration.up(manager).await
}
}
async fn run_down_migration(
migration: &dyn MigrationTrait,
manager: &SchemaManager<'_>,
) -> Result<(), sea_orm::DbErr> {
if should_wrap_in_transaction(migration, manager.get_database_backend()) {
let tx_manager = manager.begin().await?;
migration.down(&tx_manager).await?;
tx_manager.commit().await?;
Ok(())
} else {
migration.down(manager).await
}
}
fn should_wrap_in_transaction(migration: &dyn MigrationTrait, backend: sea_orm::DbBackend) -> bool {
migration
.use_transaction()
.unwrap_or(matches!(backend, sea_orm::DbBackend::Postgres))
}
#[cfg(test)]
mod tests {
use rustrails_support::{database, runtime};
use sea_orm::{ConnectionTrait, Database, DbBackend, EntityTrait, Statement};
use sea_orm_migration::{prelude::*, seaql_migrations};
use super::{
MigrationStatus, Migrator, applied_versions, ensure_migration_table,
record_applied_migration, remove_applied_migration, should_wrap_in_transaction,
};
fn run_sync_migration_test(test: impl FnOnce() + Send + 'static) {
std::thread::spawn(move || {
let _rt = runtime::init_runtime();
database::establish("sqlite::memory:")
.expect("sqlite in-memory connection should succeed");
test();
})
.join()
.unwrap();
}
struct CreateWidgets;
impl MigrationName for CreateWidgets {
fn name(&self) -> &str {
"create_widgets"
}
}
#[async_trait::async_trait]
impl MigrationTrait for CreateWidgets {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new("widgets"))
.if_not_exists()
.col(
ColumnDef::new(Alias::new("id"))
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Alias::new("name")).string().not_null())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(Alias::new("widgets"))
.if_exists()
.to_owned(),
)
.await
}
}
struct CreateGadgets;
impl MigrationName for CreateGadgets {
fn name(&self) -> &str {
"create_gadgets"
}
}
#[async_trait::async_trait]
impl MigrationTrait for CreateGadgets {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new("gadgets"))
.if_not_exists()
.col(
ColumnDef::new(Alias::new("id"))
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Alias::new("title")).string().not_null())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(Alias::new("gadgets"))
.if_exists()
.to_owned(),
)
.await
}
}
struct AddWidgetDescription;
impl MigrationName for AddWidgetDescription {
fn name(&self) -> &str {
"add_widget_description"
}
}
#[async_trait::async_trait]
impl MigrationTrait for AddWidgetDescription {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets ADD COLUMN description TEXT")
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets DROP COLUMN description")
.await?;
Ok(())
}
}
struct RemoveWidgetName;
impl MigrationName for RemoveWidgetName {
fn name(&self) -> &str {
"remove_widget_name"
}
}
#[async_trait::async_trait]
impl MigrationTrait for RemoveWidgetName {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"ALTER TABLE widgets ADD COLUMN name VARCHAR NOT NULL DEFAULT ''",
)
.await?;
Ok(())
}
}
struct AddWidgetNameIndex;
impl MigrationName for AddWidgetNameIndex {
fn name(&self) -> &str {
"add_widget_name_index"
}
}
#[async_trait::async_trait]
impl MigrationTrait for AddWidgetNameIndex {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Index::create()
.name("idx_widgets_name")
.table(Alias::new("widgets"))
.col(Alias::new("name"))
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.name("idx_widgets_name")
.table(Alias::new("widgets"))
.to_owned(),
)
.await
}
}
struct RemoveWidgetNameIndex;
impl MigrationName for RemoveWidgetNameIndex {
fn name(&self) -> &str {
"remove_widget_name_index"
}
}
#[async_trait::async_trait]
impl MigrationTrait for RemoveWidgetNameIndex {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.name("idx_widgets_name")
.table(Alias::new("widgets"))
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Index::create()
.name("idx_widgets_name")
.table(Alias::new("widgets"))
.col(Alias::new("name"))
.to_owned(),
)
.await
}
}
struct RenameWidgetNameToTitle;
impl MigrationName for RenameWidgetNameToTitle {
fn name(&self) -> &str {
"rename_widget_name_to_title"
}
}
#[async_trait::async_trait]
impl MigrationTrait for RenameWidgetNameToTitle {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets RENAME COLUMN name TO title")
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets RENAME COLUMN title TO name")
.await?;
Ok(())
}
}
struct ChangeWidgetNameToText;
impl MigrationName for ChangeWidgetNameToText {
fn name(&self) -> &str {
"change_widget_name_to_text"
}
}
#[async_trait::async_trait]
impl MigrationTrait for ChangeWidgetNameToText {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"ALTER TABLE widgets ADD COLUMN name_text TEXT NOT NULL DEFAULT ''",
)
.await?;
manager
.get_connection()
.execute_unprepared("UPDATE widgets SET name_text = name")
.await?;
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
.await?;
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_text TO name")
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"ALTER TABLE widgets ADD COLUMN name_string VARCHAR NOT NULL DEFAULT ''",
)
.await?;
manager
.get_connection()
.execute_unprepared("UPDATE widgets SET name_string = name")
.await?;
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
.await?;
manager
.get_connection()
.execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_string TO name")
.await?;
Ok(())
}
}
struct FirstRollbackStep;
impl MigrationName for FirstRollbackStep {
fn name(&self) -> &str {
"first_rollback_step"
}
}
#[async_trait::async_trait]
impl MigrationTrait for FirstRollbackStep {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new("first_step"))
.if_not_exists()
.col(
ColumnDef::new(Alias::new("id"))
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("INSERT INTO rollback_log (step) VALUES ('first')")
.await?;
manager
.drop_table(
Table::drop()
.table(Alias::new("first_step"))
.if_exists()
.to_owned(),
)
.await
}
}
struct SecondRollbackStep;
impl MigrationName for SecondRollbackStep {
fn name(&self) -> &str {
"second_rollback_step"
}
}
#[async_trait::async_trait]
impl MigrationTrait for SecondRollbackStep {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new("second_step"))
.if_not_exists()
.col(
ColumnDef::new(Alias::new("id"))
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("INSERT INTO rollback_log (step) VALUES ('second')")
.await?;
manager
.drop_table(
Table::drop()
.table(Alias::new("second_step"))
.if_exists()
.to_owned(),
)
.await
}
}
struct InsertAuditThenFailWrapped;
impl MigrationName for InsertAuditThenFailWrapped {
fn name(&self) -> &str {
"insert_audit_then_fail_wrapped"
}
}
#[async_trait::async_trait]
impl MigrationTrait for InsertAuditThenFailWrapped {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("INSERT INTO audit_entries (message) VALUES ('wrapped')")
.await?;
Err(DbErr::Custom("wrapped migration failure".to_owned()))
}
async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
fn use_transaction(&self) -> Option<bool> {
Some(true)
}
}
struct InsertAuditThenFailUnwrapped;
impl MigrationName for InsertAuditThenFailUnwrapped {
fn name(&self) -> &str {
"insert_audit_then_fail_unwrapped"
}
}
#[async_trait::async_trait]
impl MigrationTrait for InsertAuditThenFailUnwrapped {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("INSERT INTO audit_entries (message) VALUES ('unwrapped')")
.await?;
Err(DbErr::Custom("unwrapped migration failure".to_owned()))
}
async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
fn use_transaction(&self) -> Option<bool> {
Some(false)
}
}
struct TransactionPreferenceMigration {
preference: Option<bool>,
}
impl MigrationName for TransactionPreferenceMigration {
fn name(&self) -> &str {
"transaction_preference_migration"
}
}
#[async_trait::async_trait]
impl MigrationTrait for TransactionPreferenceMigration {
async fn up(&self, _: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
fn use_transaction(&self) -> Option<bool> {
self.preference
}
}
async fn setup_db() -> sea_orm::DatabaseConnection {
Database::connect("sqlite::memory:")
.await
.expect("in-memory sqlite connection should succeed")
}
async fn table_exists(
db: &sea_orm::DatabaseConnection,
table_name: &str,
) -> Result<bool, sea_orm::DbErr> {
Ok(db
.query_one_raw(Statement::from_sql_and_values(
db.get_database_backend(),
"SELECT name FROM sqlite_master WHERE type = ? AND name = ?",
["table".into(), table_name.into()],
))
.await?
.is_some())
}
async fn column_exists(
db: &sea_orm::DatabaseConnection,
table_name: &str,
column_name: &str,
) -> bool {
db.query_one_raw(Statement::from_string(
db.get_database_backend(),
format!(
"SELECT name FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
),
))
.await
.expect("column existence query should work")
.is_some()
}
async fn column_type(
db: &sea_orm::DatabaseConnection,
table_name: &str,
column_name: &str,
) -> Option<String> {
db.query_one_raw(Statement::from_string(
db.get_database_backend(),
format!(
"SELECT type FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
),
))
.await
.expect("column type query should work")
.map(|row| row.try_get("", "type").expect("type should be readable"))
}
async fn index_exists(
db: &sea_orm::DatabaseConnection,
table_name: &str,
index_name: &str,
) -> bool {
db.query_one_raw(Statement::from_string(
db.get_database_backend(),
format!(
"SELECT name FROM pragma_index_list('{table_name}') WHERE name = '{index_name}'"
),
))
.await
.expect("index existence query should work")
.is_some()
}
async fn table_row_count(db: &sea_orm::DatabaseConnection, table_name: &str) -> i64 {
db.query_one_raw(Statement::from_string(
db.get_database_backend(),
format!("SELECT COUNT(*) AS count FROM {table_name}"),
))
.await
.expect("row count query should work")
.expect("count row should exist")
.try_get("", "count")
.expect("count should be readable")
}
async fn rollback_steps(db: &sea_orm::DatabaseConnection) -> Vec<String> {
db.query_all_raw(Statement::from_string(
db.get_database_backend(),
"SELECT step FROM rollback_log ORDER BY id".to_owned(),
))
.await
.expect("rollback log query should work")
.into_iter()
.map(|row| row.try_get("", "step").expect("step should be readable"))
.collect()
}
#[tokio::test]
async fn new_migrator_starts_empty() {
let migrator = Migrator::new();
let db = setup_db().await;
let statuses = migrator.status(&db).await;
assert!(statuses.is_empty());
}
#[tokio::test]
async fn up_applies_registered_migrations() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up(&db).await.expect("migration should apply");
assert!(
table_exists(&db, "widgets")
.await
.expect("table existence query should work")
);
}
#[test]
fn up_sync_applies_registered_migrations() {
run_sync_migration_test(|| {
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up_sync().expect("migration should apply");
let widgets_exist = runtime::block_on(async {
let db = database::db();
table_exists(&db, "widgets")
.await
.expect("table existence query should work")
});
assert!(widgets_exist);
});
}
#[tokio::test]
async fn down_rolls_back_applied_migrations() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up(&db).await.expect("migration should apply");
migrator.down(&db).await.expect("migration should rollback");
assert!(
!table_exists(&db, "widgets")
.await
.expect("table existence query should work")
);
}
#[test]
fn down_sync_rolls_back_applied_migrations() {
run_sync_migration_test(|| {
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up_sync().expect("migration should apply");
migrator.down_sync().expect("migration should rollback");
let widgets_exist = runtime::block_on(async {
let db = database::db();
table_exists(&db, "widgets")
.await
.expect("table existence query should work")
});
assert!(!widgets_exist);
});
}
#[tokio::test]
async fn status_marks_pending_and_applied_migrations() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
let before = migrator.status(&db).await;
assert_eq!(
before,
vec![MigrationStatus {
name: CreateWidgets.name().to_owned(),
applied: false,
}]
);
migrator.up(&db).await.expect("migration should apply");
let after = migrator.status(&db).await;
assert_eq!(
after,
vec![MigrationStatus {
name: CreateWidgets.name().to_owned(),
applied: true,
}]
);
}
#[test]
fn status_sync_marks_pending_and_applied_migrations() {
run_sync_migration_test(|| {
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
assert_eq!(
migrator.status_sync(),
vec![MigrationStatus {
name: CreateWidgets.name().to_owned(),
applied: false,
}]
);
migrator.up_sync().expect("migration should apply");
assert_eq!(
migrator.status_sync(),
vec![MigrationStatus {
name: CreateWidgets.name().to_owned(),
applied: true,
}]
);
});
}
#[tokio::test]
async fn repeated_up_skips_already_applied_migrations() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up(&db).await.expect("first up should succeed");
migrator.up(&db).await.expect("second up should be a no-op");
let applied = seaql_migrations::Entity::find()
.all(&db)
.await
.expect("migration rows should load");
assert_eq!(applied.len(), 1);
}
#[tokio::test]
async fn default_migrator_matches_new() {
let db = setup_db().await;
let migrator = Migrator::default();
assert_eq!(
migrator.status(&db).await,
Migrator::new().status(&db).await
);
}
#[tokio::test]
async fn ensure_migration_table_creates_tracking_table() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should be created");
assert!(
table_exists(&db, "seaql_migrations")
.await
.expect("table check should work")
);
}
#[tokio::test]
async fn ensure_migration_table_is_idempotent() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("first create should succeed");
ensure_migration_table(&manager)
.await
.expect("second create should succeed");
assert!(
table_exists(&db, "seaql_migrations")
.await
.expect("table check should work")
);
}
#[tokio::test]
async fn record_applied_migration_inserts_row_with_timestamp() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should exist");
record_applied_migration("create_widgets", &db)
.await
.expect("tracking row should insert");
let row = seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
.one(&db)
.await
.expect("tracking row should load")
.expect("tracking row should exist");
assert_eq!(row.version, "create_widgets");
assert!(row.applied_at > 0);
}
#[tokio::test]
async fn remove_applied_migration_deletes_tracking_row() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should exist");
record_applied_migration("create_widgets", &db)
.await
.expect("tracking row should insert");
remove_applied_migration("create_widgets", &db)
.await
.expect("tracking row should delete");
assert!(
seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
.one(&db)
.await
.expect("tracking row query should succeed")
.is_none()
);
}
#[tokio::test]
async fn applied_versions_returns_recorded_names() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should exist");
record_applied_migration("create_widgets", &db)
.await
.expect("first row should insert");
record_applied_migration("create_gadgets", &db)
.await
.expect("second row should insert");
let versions = applied_versions(&db)
.await
.expect("applied versions should load");
assert_eq!(
versions,
std::collections::HashSet::from([
"create_widgets".to_owned(),
"create_gadgets".to_owned(),
])
);
}
#[tokio::test]
async fn status_marks_mixed_pending_and_applied_migrations() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should exist");
record_applied_migration(CreateWidgets.name(), &db)
.await
.expect("tracking row should insert");
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(CreateGadgets);
assert_eq!(
migrator.status(&db).await,
vec![
MigrationStatus {
name: CreateWidgets.name().to_owned(),
applied: true,
},
MigrationStatus {
name: CreateGadgets.name().to_owned(),
applied: false,
},
]
);
}
#[tokio::test]
async fn create_table_migration_creates_expected_columns() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.up(&db).await.expect("migration should apply");
assert!(column_exists(&db, "widgets", "id").await);
assert!(column_exists(&db, "widgets", "name").await);
}
#[tokio::test]
async fn up_applies_multiple_registered_migrations() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(CreateGadgets);
migrator.up(&db).await.expect("migrations should apply");
assert!(
table_exists(&db, "widgets")
.await
.expect("widgets table check should work")
);
assert!(
table_exists(&db, "gadgets")
.await
.expect("gadgets table check should work")
);
}
#[tokio::test]
async fn up_skips_previously_applied_migration_rows() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
ensure_migration_table(&manager)
.await
.expect("tracking table should exist");
record_applied_migration(CreateWidgets.name(), &db)
.await
.expect("tracking row should insert");
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator
.up(&db)
.await
.expect("migration should skip cleanly");
assert!(
!table_exists(&db, "widgets")
.await
.expect("widgets table check should work")
);
}
#[tokio::test]
async fn add_column_migration_adds_column() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(AddWidgetDescription);
migrator.up(&db).await.expect("migrations should apply");
assert!(column_exists(&db, "widgets", "description").await);
}
#[tokio::test]
async fn remove_column_migration_removes_column() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(RemoveWidgetName);
migrator.up(&db).await.expect("migrations should apply");
assert!(!column_exists(&db, "widgets", "name").await);
}
#[tokio::test]
async fn add_index_migration_creates_index() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(AddWidgetNameIndex);
migrator.up(&db).await.expect("migrations should apply");
assert!(index_exists(&db, "widgets", "idx_widgets_name").await);
}
#[tokio::test]
async fn remove_index_migration_drops_index() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(AddWidgetNameIndex);
migrator.add(RemoveWidgetNameIndex);
migrator.up(&db).await.expect("migrations should apply");
assert!(!index_exists(&db, "widgets", "idx_widgets_name").await);
}
#[tokio::test]
async fn rename_column_migration_renames_column() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(RenameWidgetNameToTitle);
migrator.up(&db).await.expect("migrations should apply");
assert!(!column_exists(&db, "widgets", "name").await);
assert!(column_exists(&db, "widgets", "title").await);
}
#[tokio::test]
async fn change_column_type_migration_updates_declared_type() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(ChangeWidgetNameToText);
migrator.up(&db).await.expect("migrations should apply");
let declared_type = column_type(&db, "widgets", "name")
.await
.expect("name column should exist")
.to_uppercase();
assert!(declared_type.contains("TEXT"));
}
#[tokio::test]
async fn down_runs_migrations_in_reverse_order() {
let db = setup_db().await;
db.execute_unprepared(
"CREATE TABLE rollback_log (id INTEGER PRIMARY KEY AUTOINCREMENT, step TEXT NOT NULL)",
)
.await
.expect("rollback log table should be created");
let mut migrator = Migrator::new();
migrator.add(FirstRollbackStep);
migrator.add(SecondRollbackStep);
migrator.up(&db).await.expect("migrations should apply");
migrator
.down(&db)
.await
.expect("migrations should rollback");
assert_eq!(rollback_steps(&db).await, vec!["second", "first"]);
}
#[tokio::test]
async fn down_removes_tracking_rows_after_rollback() {
let db = setup_db().await;
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(CreateGadgets);
migrator.up(&db).await.expect("migrations should apply");
migrator
.down(&db)
.await
.expect("migrations should rollback");
let applied = seaql_migrations::Entity::find()
.all(&db)
.await
.expect("tracking rows should load");
assert!(applied.is_empty());
}
#[tokio::test]
async fn wrapped_migration_rolls_back_side_effects_on_error() {
let db = setup_db().await;
db.execute_unprepared(
"CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
)
.await
.expect("audit table should be created");
let mut migrator = Migrator::new();
migrator.add(InsertAuditThenFailWrapped);
let error = migrator.up(&db).await.expect_err("migration should fail");
assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
assert_eq!(table_row_count(&db, "audit_entries").await, 0);
}
#[tokio::test]
async fn unwrapped_migration_leaves_side_effects_on_error() {
let db = setup_db().await;
db.execute_unprepared(
"CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
)
.await
.expect("audit table should be created");
let mut migrator = Migrator::new();
migrator.add(InsertAuditThenFailUnwrapped);
let error = migrator.up(&db).await.expect_err("migration should fail");
assert!(
matches!(error, DbErr::Custom(message) if message == "unwrapped migration failure")
);
assert_eq!(table_row_count(&db, "audit_entries").await, 1);
}
#[tokio::test]
async fn reversible_rename_migration_restores_original_schema_on_down() {
let db = setup_db().await;
let manager = SchemaManager::new(&db);
CreateWidgets
.up(&manager)
.await
.expect("base table should be created");
let mut migrator = Migrator::new();
migrator.add(RenameWidgetNameToTitle);
migrator
.up(&db)
.await
.expect("rename migration should apply");
assert!(column_exists(&db, "widgets", "title").await);
migrator
.down(&db)
.await
.expect("rename migration should rollback");
assert!(column_exists(&db, "widgets", "name").await);
assert!(!column_exists(&db, "widgets", "title").await);
}
#[test]
fn should_wrap_in_transaction_honors_backend_defaults_and_overrides() {
let postgres_default = TransactionPreferenceMigration { preference: None };
let always_wrap = TransactionPreferenceMigration {
preference: Some(true),
};
let never_wrap = TransactionPreferenceMigration {
preference: Some(false),
};
assert!(should_wrap_in_transaction(
&postgres_default,
DbBackend::Postgres
));
assert!(!should_wrap_in_transaction(
&postgres_default,
DbBackend::Sqlite
));
assert!(should_wrap_in_transaction(&always_wrap, DbBackend::Sqlite));
assert!(!should_wrap_in_transaction(
&never_wrap,
DbBackend::Postgres
));
}
#[tokio::test]
async fn status_creates_tracking_table_without_recording_rows() {
let db = setup_db().await;
let migrator = Migrator::new();
let statuses = migrator.status(&db).await;
assert!(statuses.is_empty());
assert!(
table_exists(&db, "seaql_migrations")
.await
.expect("tracking table check should work")
);
assert!(
applied_versions(&db)
.await
.expect("applied versions should load")
.is_empty()
);
}
#[tokio::test]
async fn down_skips_pending_migrations_and_rolls_back_only_applied_entries() {
let db = setup_db().await;
let mut up_migrator = Migrator::new();
up_migrator.add(CreateWidgets);
up_migrator
.up(&db)
.await
.expect("base migration should apply");
let mut down_migrator = Migrator::new();
down_migrator.add(CreateWidgets);
down_migrator.add(CreateGadgets);
down_migrator
.down(&db)
.await
.expect("rollback should skip pending migrations");
assert!(
!table_exists(&db, "widgets")
.await
.expect("widgets table check should work")
);
assert!(
!table_exists(&db, "gadgets")
.await
.expect("gadgets table check should work")
);
assert!(
applied_versions(&db)
.await
.expect("applied versions should load")
.is_empty()
);
}
#[tokio::test]
async fn up_stops_after_first_failure_and_skips_later_migrations() {
let db = setup_db().await;
db.execute_unprepared(
"CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
)
.await
.expect("audit table should be created");
let mut migrator = Migrator::new();
migrator.add(CreateWidgets);
migrator.add(InsertAuditThenFailWrapped);
migrator.add(CreateGadgets);
let error = migrator
.up(&db)
.await
.expect_err("migration batch should fail");
assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
assert!(
table_exists(&db, "widgets")
.await
.expect("widgets table check should work")
);
assert!(
!table_exists(&db, "gadgets")
.await
.expect("gadgets table check should work")
);
assert_eq!(table_row_count(&db, "audit_entries").await, 0);
assert_eq!(
applied_versions(&db)
.await
.expect("applied versions should load"),
std::collections::HashSet::from([CreateWidgets.name().to_owned()])
);
}
}