use sea_orm::{
ConnectionTrait, DatabaseBackend, DbErr, ExecResult, FromQueryResult, Statement,
TransactionTrait,
};
use sea_orm_migration::MigrationTrait;
use std::collections::HashSet;
use thiserror::Error;
use tracing::{debug, info};
use xxhash_rust::xxh3::xxh3_64;
/// Errors produced while running or inspecting per-module migrations.
///
/// Every variant carries the caller-supplied module name; the database-backed
/// variants also keep the underlying [`DbErr`] in their `source` field.
#[derive(Debug, Error)]
pub enum MigrationError {
/// The per-module migration history table could not be created.
#[error("failed to create migration table for module '{module}': {source}")]
CreateTable { module: String, source: DbErr },
/// Reading the migration history table, or probing whether it exists, failed.
#[error("failed to query migration history for module '{module}': {source}")]
QueryHistory { module: String, source: DbErr },
/// Applying a migration failed. The runner uses this variant for a failure
/// to open the transaction, a failure of the migration's `up` step, and a
/// failure to commit.
#[error("migration '{migration}' failed for module '{module}': {source}")]
MigrationFailed {
module: String,
migration: String,
source: DbErr,
},
/// The migration ran, but inserting its name into the history table failed.
#[error("failed to record migration '{migration}' for module '{module}': {source}")]
RecordFailed {
module: String,
migration: String,
source: DbErr,
},
/// Two migrations handed to the runner for the same module share a name.
#[error("duplicate migration name '{name}' for module '{module}'")]
DuplicateMigrationName { module: String, name: String },
}
/// Summary of one migration run for a single module.
#[derive(Debug, Clone)]
pub struct MigrationResult {
/// Number of migrations applied during this run.
pub applied: usize,
/// Number of migrations skipped because they were already recorded as applied.
pub skipped: usize,
/// Names of the migrations applied during this run, in the order they ran.
pub applied_names: Vec<String>,
}
/// One row of a per-module migration history table, as read back by
/// `SELECT version FROM ...`.
#[derive(Debug, FromQueryResult)]
struct MigrationRecord {
// Name of an applied migration (the table's primary key column).
version: String,
}
/// Maps a module name onto the character set `[A-Za-z0-9_]`.
///
/// Each character that is not an ASCII letter, an ASCII digit or an
/// underscore is replaced by a single `_`, so the result has exactly one
/// character per input character. An empty input yields `"_"`.
fn sanitize_module_name(name: &str) -> String {
    if name.is_empty() {
        return "_".to_owned();
    }
    name.chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
/// Derives the name of the migration history table for `module_name`.
///
/// The name has the shape `modkit_migrations__<sanitized>__<hash8>`, where
/// `<sanitized>` is the sanitized module name (cut short if necessary) and
/// `<hash8>` is the first eight hex digits of the xxh3-64 hash of the
/// *original* module name. Because the hash is taken before sanitizing,
/// module names that sanitize to the same text still map to different tables
/// unless their hash prefixes collide. The result never exceeds 63 bytes.
fn migration_table_name(module_name: &str) -> String {
    const PREFIX: &str = "modkit_migrations__";
    const SEP: &str = "__";
    const HASH_LEN: usize = 8;
    const PG_IDENT_MAX: usize = 63;

    // Bytes left for the readable part once the fixed pieces are accounted for.
    let budget = PG_IDENT_MAX.saturating_sub(PREFIX.len() + SEP.len() + HASH_LEN);

    // The sanitized name is pure ASCII, so truncating at a byte offset can
    // never split a character.
    let mut readable = sanitize_module_name(module_name);
    readable.truncate(budget);

    let digest = format!("{:016x}", xxh3_64(module_name.as_bytes()));
    let suffix = &digest[..HASH_LEN];

    format!("{PREFIX}{readable}{SEP}{suffix}")
}
/// Creates the migration history table `table_name` if it does not exist yet.
///
/// The table has a `version` primary-key column and an `applied_at` column
/// that defaults to the current time; the exact column types and identifier
/// quoting depend on the connection's backend.
///
/// # Errors
///
/// Returns [`MigrationError::CreateTable`] (tagged with `module_name`) when
/// the `CREATE TABLE` statement fails.
async fn ensure_migration_table(
    conn: &impl ConnectionTrait,
    table_name: &str,
    module_name: &str,
) -> Result<(), MigrationError> {
    let backend = conn.get_database_backend();

    // The raw strings are kept flush-left so the SQL text sent to the
    // database is unchanged.
    let ddl = match backend {
        DatabaseBackend::Sqlite => format!(
            r#"
CREATE TABLE IF NOT EXISTS "{table_name}" (
version TEXT PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"#
        ),
        DatabaseBackend::MySql => format!(
            r"
CREATE TABLE IF NOT EXISTS `{table_name}` (
version VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"
        ),
        DatabaseBackend::Postgres => format!(
            r#"
CREATE TABLE IF NOT EXISTS "{table_name}" (
version VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"#
        ),
    };

    match conn.execute(Statement::from_string(backend, ddl)).await {
        Ok(_) => Ok(()),
        Err(source) => Err(MigrationError::CreateTable {
            module: module_name.to_owned(),
            source,
        }),
    }
}
/// Loads the names of all migrations recorded in `table_name`.
///
/// # Errors
///
/// Returns [`MigrationError::QueryHistory`] (tagged with `module_name`) when
/// the `SELECT` fails, for example because the table does not exist.
async fn get_applied_migrations(
    conn: &impl ConnectionTrait,
    table_name: &str,
    module_name: &str,
) -> Result<HashSet<String>, MigrationError> {
    let backend = conn.get_database_backend();

    // MySQL quotes identifiers with backticks, the other backends with
    // double quotes.
    let query = match backend {
        DatabaseBackend::MySql => format!(r"SELECT version FROM `{table_name}`"),
        DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
            format!(r#"SELECT version FROM "{table_name}""#)
        }
    };

    let fetched = MigrationRecord::find_by_statement(Statement::from_string(backend, query))
        .all(conn)
        .await;

    match fetched {
        Ok(rows) => Ok(rows.into_iter().map(|row| row.version).collect()),
        Err(source) => Err(MigrationError::QueryHistory {
            module: module_name.to_owned(),
            source,
        }),
    }
}
/// Inserts `migration_name` into the history table `table_name`.
///
/// The name is passed as a bound parameter; only the table name is spliced
/// into the SQL text.
///
/// # Errors
///
/// Returns [`MigrationError::RecordFailed`] (tagged with the module and
/// migration names) when the `INSERT` fails.
async fn record_migration(
    conn: &impl ConnectionTrait,
    table_name: &str,
    module_name: &str,
    migration_name: &str,
) -> Result<ExecResult, MigrationError> {
    let backend = conn.get_database_backend();

    // Placeholder syntax and identifier quoting differ between MySQL and the
    // other two backends.
    let insert = match backend {
        DatabaseBackend::MySql => format!(r"INSERT INTO `{table_name}` (version) VALUES (?)"),
        DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
            format!(r#"INSERT INTO "{table_name}" (version) VALUES ($1)"#)
        }
    };

    let statement = Statement::from_sql_and_values(backend, &insert, [migration_name.into()]);

    conn.execute(statement)
        .await
        .map_err(|source| MigrationError::RecordFailed {
            module: module_name.to_owned(),
            migration: migration_name.to_owned(),
            source,
        })
}
/// Applies the not-yet-applied `migrations` of `module_name` against `db`.
///
/// This is the public entry point: it takes the connection returned by
/// `db.sea_internal()` and delegates to the internal runner, which tracks
/// history in a table dedicated to `module_name`.
///
/// # Errors
///
/// Returns a [`MigrationError`] when duplicate migration names are supplied
/// or when any database step fails.
pub async fn run_migrations_for_module(
    db: &crate::Db,
    module_name: &str,
    migrations: Vec<Box<dyn MigrationTrait>>,
) -> Result<MigrationResult, MigrationError> {
    let connection = db.sea_internal();
    let outcome = run_module_migrations(&connection, module_name, migrations).await;
    outcome
}
async fn run_module_migrations<C>(
conn: &C,
module_name: &str,
migrations: Vec<Box<dyn MigrationTrait>>,
) -> Result<MigrationResult, MigrationError>
where
C: ConnectionTrait + TransactionTrait,
{
if migrations.is_empty() {
debug!(module = module_name, "No migrations to run");
return Ok(MigrationResult {
applied: 0,
skipped: 0,
applied_names: vec![],
});
}
let mut seen = HashSet::new();
for m in &migrations {
let n = m.name().to_owned();
if !seen.insert(n.clone()) {
return Err(MigrationError::DuplicateMigrationName {
module: module_name.to_owned(),
name: n,
});
}
}
let table_name = migration_table_name(module_name);
ensure_migration_table(conn, &table_name, module_name).await?;
let applied = get_applied_migrations(conn, &table_name, module_name).await?;
let mut sorted_migrations: Vec<_> = migrations.into_iter().collect();
sorted_migrations.sort_by(|a, b| a.name().cmp(b.name()));
let mut result = MigrationResult {
applied: 0,
skipped: 0,
applied_names: vec![],
};
for migration in sorted_migrations {
let name = migration.name().to_owned();
if applied.contains(&name) {
debug!(
module = module_name,
migration = %name,
"Migration already applied, skipping"
);
result.skipped += 1;
continue;
}
info!(
module = module_name,
migration = %name,
"Applying migration"
);
let txn = conn
.begin()
.await
.map_err(|e| MigrationError::MigrationFailed {
module: module_name.to_owned(),
migration: name.clone(),
source: e,
})?;
let manager = sea_orm_migration::SchemaManager::new(&txn);
let res: Result<(), MigrationError> = (async {
migration
.up(&manager)
.await
.map_err(|e| MigrationError::MigrationFailed {
module: module_name.to_owned(),
migration: name.clone(),
source: e,
})?;
record_migration(&txn, &table_name, module_name, &name).await?;
Ok(())
})
.await;
match res {
Ok(()) => {
txn.commit()
.await
.map_err(|e| MigrationError::MigrationFailed {
module: module_name.to_owned(),
migration: name.clone(),
source: e,
})?;
}
Err(err) => {
_ = txn.rollback().await;
return Err(err);
}
}
info!(
module = module_name,
migration = %name,
"Migration applied successfully"
);
result.applied += 1;
result.applied_names.push(name);
}
info!(
module = module_name,
applied = result.applied,
skipped = result.skipped,
"Migration run complete"
);
Ok(result)
}
/// Applies `migrations` against `db` under the fixed module name `"_test"`.
///
/// Behaves like [`run_migrations_for_module`] with that module name, so the
/// history is kept in the table derived from `"_test"`.
///
/// # Errors
///
/// Returns a [`MigrationError`] when duplicate migration names are supplied
/// or when any database step fails.
pub async fn run_migrations_for_testing(
    db: &crate::Db,
    migrations: Vec<Box<dyn MigrationTrait>>,
) -> Result<MigrationResult, MigrationError> {
    let connection = db.sea_internal();
    let outcome = run_module_migrations(&connection, "_test", migrations).await;
    outcome
}
/// Lists the names of the `migrations` that are not yet recorded as applied
/// for `module_name`, without applying anything.
///
/// Takes the connection returned by `db.sea_internal()` and delegates to the
/// internal lookup.
///
/// # Errors
///
/// Returns [`MigrationError::QueryHistory`] when the history table cannot be
/// probed or read.
pub async fn get_pending_migrations(
    db: &crate::Db,
    module_name: &str,
    migrations: &[Box<dyn MigrationTrait>],
) -> Result<Vec<String>, MigrationError> {
    let connection = db.sea_internal();
    let pending = get_pending_migrations_internal(&connection, module_name, migrations).await;
    pending
}
/// Reports whether the migration history table `table_name` exists in the
/// database/schema that unqualified statements on `conn` operate on.
///
/// The probe is scoped per backend so that a same-named table elsewhere on
/// the server is not mistaken for this one:
/// * Postgres: only `current_schema()`, which is where an unqualified
///   `CREATE TABLE` places the table.
/// * MySQL: only the connection's default database (`DATABASE()`);
///   `information_schema.tables` otherwise lists every database visible to
///   the user.
/// * SQLite: `sqlite_master`.
///
/// A missing row or an undecodable value is treated as "does not exist".
///
/// # Errors
///
/// Returns [`MigrationError::QueryHistory`] when the probe query itself fails.
async fn migration_table_exists(
    conn: &impl ConnectionTrait,
    table_name: &str,
    module_name: &str,
) -> Result<bool, MigrationError> {
    let backend = conn.get_database_backend();

    // `table_name` comes from `migration_table_name`, which only emits
    // `[A-Za-z0-9_]`, so splicing it into a string literal is safe.
    let probe = match backend {
        DatabaseBackend::Postgres => format!(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = '{table_name}')"
        ),
        DatabaseBackend::MySql => format!(
            "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = '{table_name}'"
        ),
        DatabaseBackend::Sqlite => format!(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_name}'"
        ),
    };

    let row = conn
        .query_one(Statement::from_string(backend, probe))
        .await
        .map_err(|source| MigrationError::QueryHistory {
            module: module_name.to_owned(),
            source,
        })?;

    let Some(row) = row else {
        return Ok(false);
    };

    // Each backend returns a different scalar type for its probe.
    let exists = match backend {
        DatabaseBackend::Postgres => row.try_get_by_index::<bool>(0).unwrap_or(false),
        DatabaseBackend::MySql => row.try_get_by_index::<i64>(0).is_ok_and(|count| count > 0),
        DatabaseBackend::Sqlite => row.try_get_by_index::<i32>(0).is_ok_and(|count| count > 0),
    };
    Ok(exists)
}

/// Returns the names of the `migrations` that are not recorded as applied for
/// `module_name`, in the order they appear in `migrations`.
///
/// This is read-only: the history table is never created here. When it does
/// not exist, every supplied migration is reported as pending. An empty
/// `migrations` slice returns an empty list without querying the database.
///
/// # Errors
///
/// Returns [`MigrationError::QueryHistory`] when probing for the history
/// table or reading it fails.
async fn get_pending_migrations_internal(
    conn: &impl ConnectionTrait,
    module_name: &str,
    migrations: &[Box<dyn MigrationTrait>],
) -> Result<Vec<String>, MigrationError> {
    if migrations.is_empty() {
        return Ok(vec![]);
    }

    let table_name = migration_table_name(module_name);

    if !migration_table_exists(conn, &table_name, module_name).await? {
        return Ok(migrations.iter().map(|m| m.name().to_owned()).collect());
    }

    let applied = get_applied_migrations(conn, &table_name, module_name).await?;
    Ok(migrations
        .iter()
        .filter(|m| !applied.contains(m.name()))
        .map(|m| m.name().to_owned())
        .collect())
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::DatabaseBackend;
/// Checks pass-through of allowed characters, replacement of separators and
/// the empty-input fallback.
#[test]
fn test_sanitize_module_name() {
    let cases = [
        ("my_module", "my_module"),
        ("my-module", "my_module"),
        ("MyModule123", "MyModule123"),
        ("my.module", "my_module"),
        ("my/module", "my_module"),
        ("", "_"),
    ];
    for (input, expected) in cases {
        assert_eq!(sanitize_module_name(input), expected);
    }
}
/// Checks that table names are deterministic, carry the fixed prefix, embed
/// the sanitized module name and stay within 63 bytes.
#[test]
fn test_migration_table_name() {
    let first = migration_table_name("users-info");
    let second = migration_table_name("users-info");
    assert_eq!(first, second, "deterministic");
    assert!(first.starts_with("modkit_migrations__"));
    assert!(first.len() <= 63);

    let settings = migration_table_name("simple-user-settings");
    assert!(settings.contains("simple_user_settings"));
    assert!(settings.len() <= 63);
}
/// Minimal migration used by the tests: `up` creates a table named after the
/// migration and `down` does nothing.
// `dead_code` is allowed because, in this file, the struct is only
// constructed inside the `sqlite`-feature-gated tests.
#[allow(dead_code)]
struct TestMigration {
// Name reported through `MigrationName::name`.
name: String,
}
/// Exposes the stored `name` field as the migration's name.
impl MigrationName for TestMigration {
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
#[async_trait::async_trait]
impl MigrationTrait for TestMigration {
    /// Creates (if missing) a one-column table called `test_<name>`, with any
    /// `-` in the migration name replaced by `_`.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let table_name = format!("test_{}", self.name.replace('-', "_"));
        let backend = manager.get_database_backend();

        // Auto-increment syntax and identifier quoting differ per backend.
        let ddl = match backend {
            DatabaseBackend::Postgres => {
                format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id SERIAL PRIMARY KEY)")
            }
            DatabaseBackend::MySql => format!(
                "CREATE TABLE IF NOT EXISTS `{table_name}` (id INT AUTO_INCREMENT PRIMARY KEY)"
            ),
            DatabaseBackend::Sqlite => {
                format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id INTEGER PRIMARY KEY)")
            }
        };

        let statement = Statement::from_string(backend, ddl);
        manager
            .get_connection()
            .execute(statement)
            .await
            .map(|_| ())
    }

    /// Rolling back is a no-op for the test migration.
    async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
        Ok(())
    }
}
/// End-to-end tests of the migration runner against an in-memory SQLite
/// database; only built when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
mod sqlite_tests {
    use super::*;
    use crate::{ConnectOpts, Db, connect_db};

    /// Connects to a fresh `sqlite::memory:` database for a single test.
    async fn setup_test_db() -> Db {
        connect_db("sqlite::memory:", ConnectOpts::default())
            .await
            .expect("Failed to create test database")
    }

    /// Boxes one [`TestMigration`] per name, keeping the given order.
    fn test_migrations(names: &[&str]) -> Vec<Box<dyn MigrationTrait>> {
        names
            .iter()
            .map(|name| {
                Box::new(TestMigration {
                    name: (*name).to_owned(),
                }) as Box<dyn MigrationTrait>
            })
            .collect()
    }

    /// Counts the `sqlite_master` entries of type `table` named `table`.
    async fn sqlite_table_count(conn: &impl ConnectionTrait, table: &str) -> i32 {
        let backend = conn.get_database_backend();
        let row = conn
            .query_one(Statement::from_string(
                backend,
                format!("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table}'"),
            ))
            .await
            .expect("Query should succeed")
            .expect("Result should exist");
        row.try_get_by_index::<i32>(0).expect("Should get count")
    }

    /// An empty migration list succeeds and reports nothing applied or skipped.
    #[tokio::test]
    async fn test_run_module_migrations_empty() {
        let db = setup_test_db().await;

        let outcome = run_migrations_for_module(&db, "test_module", vec![])
            .await
            .expect("Migration should succeed");

        assert_eq!(outcome.applied, 0);
        assert_eq!(outcome.skipped, 0);
        assert!(outcome.applied_names.is_empty());
    }

    /// A single new migration is applied and reported by name.
    #[tokio::test]
    async fn test_run_module_migrations_single() {
        let db = setup_test_db().await;

        let outcome = run_migrations_for_module(
            &db,
            "test_module_single",
            test_migrations(&["m001_initial"]),
        )
        .await
        .expect("Migration should succeed");

        assert_eq!(outcome.applied, 1);
        assert_eq!(outcome.skipped, 0);
        assert_eq!(outcome.applied_names, vec!["m001_initial"]);
    }

    /// Running the same migration twice applies it once and skips it once.
    #[tokio::test]
    async fn test_run_module_migrations_idempotent() {
        let db = setup_test_db().await;
        let module_name = "test_module_idempotent";

        let first_run =
            run_migrations_for_module(&db, module_name, test_migrations(&["m001_initial"]))
                .await
                .expect("First migration run should succeed");
        assert_eq!(first_run.applied, 1);

        let second_run =
            run_migrations_for_module(&db, module_name, test_migrations(&["m001_initial"]))
                .await
                .expect("Second migration run should succeed");
        assert_eq!(second_run.applied, 0);
        assert_eq!(second_run.skipped, 1);
    }

    /// Migrations run in name order regardless of the order they were given in.
    #[tokio::test]
    async fn test_run_module_migrations_deterministic_ordering() {
        let db = setup_test_db().await;
        let shuffled = test_migrations(&["m003_third", "m001_first", "m002_second"]);

        let outcome = run_migrations_for_module(&db, "test_ordering", shuffled)
            .await
            .expect("Migration should succeed");

        assert_eq!(
            outcome.applied_names,
            vec!["m001_first", "m002_second", "m003_third"]
        );
    }

    /// Two modules with identically named migrations get separate history tables.
    #[tokio::test]
    async fn test_per_module_table_separation() {
        let db = setup_test_db().await;

        let result_a =
            run_migrations_for_module(&db, "module_a", test_migrations(&["m001_initial"]))
                .await
                .expect("Module A migration should succeed");
        assert_eq!(result_a.applied, 1);

        let result_b =
            run_migrations_for_module(&db, "module_b", test_migrations(&["m001_initial"]))
                .await
                .expect("Module B migration should succeed");
        assert_eq!(result_b.applied, 1);

        let table_a = migration_table_name("module_a");
        let table_b = migration_table_name("module_b");
        let conn = db.sea_internal();

        let count_a = sqlite_table_count(&conn, &table_a).await;
        assert_eq!(count_a, 1);

        let count_b = sqlite_table_count(&conn, &table_b).await;
        assert_eq!(count_b, 1);
    }

    /// Supplying the same migration name twice is rejected with a dedicated error.
    #[tokio::test]
    async fn test_duplicate_migration_name_rejected() {
        let db = setup_test_db().await;
        let duplicated = test_migrations(&["m001_dup", "m001_dup"]);

        let err = run_migrations_for_module(&db, "dup_module", duplicated)
            .await
            .unwrap_err();

        match err {
            MigrationError::DuplicateMigrationName { module, name } => {
                assert_eq!(module, "dup_module");
                assert_eq!(name, "m001_dup");
            }
            other => panic!("expected DuplicateMigrationName, got: {other:?}"),
        }
    }

    /// Very long module names with unusual characters still fit in 63 bytes.
    #[test]
    fn test_table_name_length_limit() {
        let long_name =
            "this-is-a-very-long-module-name/with.weird.chars/and-more-and-more-and-more";
        let table = migration_table_name(long_name);
        assert!(table.len() <= 63);
        assert!(table.starts_with("modkit_migrations__"));
    }

    /// Pending migrations shrink as migrations get applied.
    #[tokio::test]
    async fn test_get_pending_migrations() {
        let db = setup_test_db().await;
        let module_name = "test_pending";
        let migrations = test_migrations(&["m001_first", "m002_second"]);

        // Nothing has run yet, so both migrations are pending.
        let pending = get_pending_migrations(&db, module_name, &migrations)
            .await
            .expect("Should succeed");
        assert_eq!(pending.len(), 2);

        run_migrations_for_module(&db, module_name, test_migrations(&["m001_first"]))
            .await
            .expect("Should succeed");

        let pending = get_pending_migrations(&db, module_name, &migrations)
            .await
            .expect("Should succeed");
        assert_eq!(pending, vec!["m002_second"]);
    }

    /// The testing entry point applies migrations like the regular one.
    #[tokio::test]
    async fn test_run_migrations_for_testing() {
        let db = setup_test_db().await;

        let outcome = run_migrations_for_testing(&db, test_migrations(&["m001_test"]))
            .await
            .expect("Test migrations should succeed");

        assert_eq!(outcome.applied, 1);
    }
}
}