1use sea_orm::{
23 ConnectionTrait, DatabaseBackend, DbErr, ExecResult, FromQueryResult, Statement,
24 TransactionTrait,
25};
26use sea_orm_migration::MigrationTrait;
27use std::collections::HashSet;
28use thiserror::Error;
29use tracing::{debug, info};
30use xxhash_rust::xxh3::xxh3_64;
31
/// Errors produced while running or inspecting per-module migrations.
///
/// Every variant names the module involved. Variants that wrap a database
/// failure keep it in the `source` field and repeat it in the display message.
#[derive(Debug, Error)]
pub enum MigrationError {
    /// The statement that creates the module's migration-history table failed.
    #[error("failed to create migration table for module '{module}': {source}")]
    CreateTable { module: String, source: DbErr },

    /// Reading the module's migration history (or probing for its table) failed.
    #[error("failed to query migration history for module '{module}': {source}")]
    QueryHistory { module: String, source: DbErr },

    /// Opening the transaction, running the migration's `up`, or committing failed.
    #[error("migration '{migration}' failed for module '{module}': {source}")]
    MigrationFailed {
        module: String,
        migration: String,
        source: DbErr,
    },

    /// The migration ran, but inserting its row into the history table failed.
    #[error("failed to record migration '{migration}' for module '{module}': {source}")]
    RecordFailed {
        module: String,
        migration: String,
        source: DbErr,
    },

    /// Two migrations supplied for the same module share one name.
    #[error("duplicate migration name '{name}' for module '{module}'")]
    DuplicateMigrationName { module: String, name: String },
}
63
/// Summary of one migration run for a single module.
///
/// `Default` yields the "nothing happened" value (all counters zero, no
/// names), and `PartialEq`/`Eq` allow results to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationResult {
    /// Number of migrations applied during this run.
    pub applied: usize,
    /// Number of migrations skipped because they were already recorded.
    pub skipped: usize,
    /// Names of the migrations applied during this run, in application order.
    pub applied_names: Vec<String>,
}
74
/// One row read back from a module's migration-history table.
///
/// Only the `version` column is selected by the queries in this file; it
/// holds the name of an applied migration.
#[derive(Debug, FromQueryResult)]
struct MigrationRecord {
    // Name of the applied migration, as stored in the `version` column.
    version: String,
}
80
/// Maps a module name onto the character set `[A-Za-z0-9_]`.
///
/// Each character outside that set (including any non-ASCII character) is
/// replaced by a single `_`. An empty input yields `"_"`, so the result is
/// never empty.
fn sanitize_module_name(name: &str) -> String {
    if name.is_empty() {
        return "_".to_owned();
    }

    name.chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
96
97fn migration_table_name(module_name: &str) -> String {
105 const PREFIX: &str = "modkit_migrations__";
106 const SEP: &str = "__";
107 const HASH_LEN: usize = 8;
108 const PG_IDENT_MAX: usize = 63;
109
110 let sanitized = sanitize_module_name(module_name);
111 let hash = xxh3_64(module_name.as_bytes());
112 let hash8 = format!("{hash:016x}")[..HASH_LEN].to_owned();
113
114 let reserved = PREFIX.len() + SEP.len() + HASH_LEN;
116 let max_prefix_len = PG_IDENT_MAX.saturating_sub(reserved);
117 let prefix_part = if max_prefix_len == 0 {
118 String::new()
119 } else if sanitized.len() > max_prefix_len {
120 sanitized[..max_prefix_len].to_owned()
121 } else {
122 sanitized
123 };
124
125 format!("{PREFIX}{prefix_part}{SEP}{hash8}")
126}
127
128async fn ensure_migration_table(
130 conn: &impl ConnectionTrait,
131 table_name: &str,
132 module_name: &str,
133) -> Result<(), MigrationError> {
134 let backend = conn.get_database_backend();
135
136 let sql = match backend {
137 DatabaseBackend::Postgres => format!(
138 r#"
139 CREATE TABLE IF NOT EXISTS "{table_name}" (
140 version VARCHAR(255) PRIMARY KEY,
141 applied_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
142 )
143 "#
144 ),
145 DatabaseBackend::MySql => format!(
146 r"
147 CREATE TABLE IF NOT EXISTS `{table_name}` (
148 version VARCHAR(255) PRIMARY KEY,
149 applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
150 )
151 "
152 ),
153 DatabaseBackend::Sqlite => format!(
154 r#"
155 CREATE TABLE IF NOT EXISTS "{table_name}" (
156 version TEXT PRIMARY KEY,
157 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
158 )
159 "#
160 ),
161 };
162
163 conn.execute(Statement::from_string(backend, sql))
164 .await
165 .map_err(|e| MigrationError::CreateTable {
166 module: module_name.to_owned(),
167 source: e,
168 })?;
169
170 Ok(())
171}
172
173async fn get_applied_migrations(
175 conn: &impl ConnectionTrait,
176 table_name: &str,
177 module_name: &str,
178) -> Result<HashSet<String>, MigrationError> {
179 let backend = conn.get_database_backend();
180
181 let sql = match backend {
182 DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
183 format!(r#"SELECT version FROM "{table_name}""#)
184 }
185 DatabaseBackend::MySql => format!(r"SELECT version FROM `{table_name}`"),
186 };
187
188 let records: Vec<MigrationRecord> =
189 MigrationRecord::find_by_statement(Statement::from_string(backend, sql))
190 .all(conn)
191 .await
192 .map_err(|e| MigrationError::QueryHistory {
193 module: module_name.to_owned(),
194 source: e,
195 })?;
196
197 Ok(records.into_iter().map(|r| r.version).collect())
198}
199
200async fn record_migration(
202 conn: &impl ConnectionTrait,
203 table_name: &str,
204 module_name: &str,
205 migration_name: &str,
206) -> Result<ExecResult, MigrationError> {
207 let backend = conn.get_database_backend();
208
209 let sql = match backend {
210 DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
211 format!(r#"INSERT INTO "{table_name}" (version) VALUES ($1)"#)
212 }
213 DatabaseBackend::MySql => format!(r"INSERT INTO `{table_name}` (version) VALUES (?)"),
214 };
215
216 conn.execute(Statement::from_sql_and_values(
217 backend,
218 &sql,
219 [migration_name.into()],
220 ))
221 .await
222 .map_err(|e| MigrationError::RecordFailed {
223 module: module_name.to_owned(),
224 migration: migration_name.to_owned(),
225 source: e,
226 })
227}
228
229pub async fn run_migrations_for_module(
260 db: &crate::Db,
261 module_name: &str,
262 migrations: Vec<Box<dyn MigrationTrait>>,
263) -> Result<MigrationResult, MigrationError> {
264 let conn = db.sea_internal();
265 run_module_migrations(&conn, module_name, migrations).await
266}
267
268async fn run_module_migrations<C>(
287 conn: &C,
288 module_name: &str,
289 migrations: Vec<Box<dyn MigrationTrait>>,
290) -> Result<MigrationResult, MigrationError>
291where
292 C: ConnectionTrait + TransactionTrait,
293{
294 if migrations.is_empty() {
295 debug!(module = module_name, "No migrations to run");
296 return Ok(MigrationResult {
297 applied: 0,
298 skipped: 0,
299 applied_names: vec![],
300 });
301 }
302
303 let mut seen = HashSet::new();
305 for m in &migrations {
306 let n = m.name().to_owned();
307 if !seen.insert(n.clone()) {
308 return Err(MigrationError::DuplicateMigrationName {
309 module: module_name.to_owned(),
310 name: n,
311 });
312 }
313 }
314
315 let table_name = migration_table_name(module_name);
317
318 ensure_migration_table(conn, &table_name, module_name).await?;
320
321 let applied = get_applied_migrations(conn, &table_name, module_name).await?;
323
324 let mut sorted_migrations: Vec<_> = migrations.into_iter().collect();
326 sorted_migrations.sort_by(|a, b| a.name().cmp(b.name()));
327
328 let mut result = MigrationResult {
329 applied: 0,
330 skipped: 0,
331 applied_names: vec![],
332 };
333
334 for migration in sorted_migrations {
335 let name = migration.name().to_owned();
336
337 if applied.contains(&name) {
338 debug!(
339 module = module_name,
340 migration = %name,
341 "Migration already applied, skipping"
342 );
343 result.skipped += 1;
344 continue;
345 }
346
347 info!(
348 module = module_name,
349 migration = %name,
350 "Applying migration"
351 );
352
353 let txn = conn
357 .begin()
358 .await
359 .map_err(|e| MigrationError::MigrationFailed {
360 module: module_name.to_owned(),
361 migration: name.clone(),
362 source: e,
363 })?;
364
365 let manager = sea_orm_migration::SchemaManager::new(&txn);
366 let res: Result<(), MigrationError> = (async {
367 migration
368 .up(&manager)
369 .await
370 .map_err(|e| MigrationError::MigrationFailed {
371 module: module_name.to_owned(),
372 migration: name.clone(),
373 source: e,
374 })?;
375
376 record_migration(&txn, &table_name, module_name, &name).await?;
377 Ok(())
378 })
379 .await;
380
381 match res {
382 Ok(()) => {
383 txn.commit()
384 .await
385 .map_err(|e| MigrationError::MigrationFailed {
386 module: module_name.to_owned(),
387 migration: name.clone(),
388 source: e,
389 })?;
390 }
391 Err(err) => {
392 _ = txn.rollback().await;
393 return Err(err);
394 }
395 }
396
397 info!(
398 module = module_name,
399 migration = %name,
400 "Migration applied successfully"
401 );
402
403 result.applied += 1;
404 result.applied_names.push(name);
405 }
406
407 info!(
408 module = module_name,
409 applied = result.applied,
410 skipped = result.skipped,
411 "Migration run complete"
412 );
413
414 Ok(result)
415}
416
417pub async fn run_migrations_for_testing(
451 db: &crate::Db,
452 migrations: Vec<Box<dyn MigrationTrait>>,
453) -> Result<MigrationResult, MigrationError> {
454 let conn = db.sea_internal();
455 run_module_migrations(&conn, "_test", migrations).await
456}
457
458pub async fn get_pending_migrations(
474 db: &crate::Db,
475 module_name: &str,
476 migrations: &[Box<dyn MigrationTrait>],
477) -> Result<Vec<String>, MigrationError> {
478 let conn = db.sea_internal();
479 get_pending_migrations_internal(&conn, module_name, migrations).await
480}
481
482async fn get_pending_migrations_internal(
484 conn: &impl ConnectionTrait,
485 module_name: &str,
486 migrations: &[Box<dyn MigrationTrait>],
487) -> Result<Vec<String>, MigrationError> {
488 if migrations.is_empty() {
489 return Ok(vec![]);
490 }
491
492 let table_name = migration_table_name(module_name);
493
494 let backend = conn.get_database_backend();
497 let table_exists = match backend {
498 DatabaseBackend::Postgres => {
499 let sql = format!(
500 "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name}')"
501 );
502 let row = conn
503 .query_one(Statement::from_string(backend, sql))
504 .await
505 .map_err(|e| MigrationError::QueryHistory {
506 module: module_name.to_owned(),
507 source: e,
508 })?;
509 row.and_then(|r| r.try_get_by_index::<bool>(0).ok())
510 .unwrap_or(false)
511 }
512 DatabaseBackend::MySql => {
513 let sql = format!(
514 "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}'"
515 );
516 let row = conn
517 .query_one(Statement::from_string(backend, sql))
518 .await
519 .map_err(|e| MigrationError::QueryHistory {
520 module: module_name.to_owned(),
521 source: e,
522 })?;
523 row.and_then(|r| r.try_get_by_index::<i64>(0).ok())
524 .is_some_and(|c| c > 0)
525 }
526 DatabaseBackend::Sqlite => {
527 let sql = format!(
528 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_name}'"
529 );
530 let row = conn
531 .query_one(Statement::from_string(backend, sql))
532 .await
533 .map_err(|e| MigrationError::QueryHistory {
534 module: module_name.to_owned(),
535 source: e,
536 })?;
537 row.and_then(|r| r.try_get_by_index::<i32>(0).ok())
538 .is_some_and(|c| c > 0)
539 }
540 };
541
542 if !table_exists {
543 return Ok(migrations.iter().map(|m| m.name().to_owned()).collect());
544 }
545
546 let applied = get_applied_migrations(conn, &table_name, module_name).await?;
547
548 Ok(migrations
549 .iter()
550 .filter(|m| !applied.contains(m.name()))
551 .map(|m| m.name().to_owned())
552 .collect())
553}
554
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use sea_orm_migration::prelude::*;
    use sea_orm_migration::sea_orm::DatabaseBackend;

    /// Characters outside `[A-Za-z0-9_]` become `_`; empty input becomes `"_"`.
    #[test]
    fn test_sanitize_module_name() {
        assert_eq!(sanitize_module_name("my_module"), "my_module");
        assert_eq!(sanitize_module_name("my-module"), "my_module");
        assert_eq!(sanitize_module_name("MyModule123"), "MyModule123");
        assert_eq!(sanitize_module_name("my.module"), "my_module");
        assert_eq!(sanitize_module_name("my/module"), "my_module");
        assert_eq!(sanitize_module_name(""), "_");
        // A multi-byte character is replaced by exactly one underscore.
        assert_eq!(sanitize_module_name("héllo"), "h_llo");
    }

    /// Table names are deterministic, prefixed, bounded and disambiguated.
    #[test]
    fn test_migration_table_name() {
        let users_info_table_1 = migration_table_name("users-info");
        let users_info_table_2 = migration_table_name("users-info");
        assert_eq!(users_info_table_1, users_info_table_2, "deterministic");
        assert!(users_info_table_1.starts_with("modkit_migrations__"));
        assert!(users_info_table_1.len() <= 63);

        let simple_settings_table = migration_table_name("simple-user-settings");
        assert!(simple_settings_table.contains("simple_user_settings"));
        assert!(simple_settings_table.len() <= 63);

        // Both names sanitize to `a_b`; the hash of the raw name keeps them apart.
        assert_ne!(migration_table_name("a-b"), migration_table_name("a.b"));
    }

    /// Over-long module names are truncated to fit the 63-byte limit.
    ///
    /// This test needs no database, so it lives outside the `sqlite`-gated
    /// module and runs regardless of enabled features.
    #[test]
    fn test_table_name_length_limit() {
        let long =
            "this-is-a-very-long-module-name/with.weird.chars/and-more-and-more-and-more";
        let t = migration_table_name(long);
        assert!(t.len() <= 63);
        assert!(t.starts_with("modkit_migrations__"));
    }

    /// Minimal migration whose `up` creates a table named `test_<name>`.
    // Only constructed by the `sqlite`-gated tests below, so it is unused
    // (and would warn) when that feature is disabled.
    #[allow(dead_code)]
    struct TestMigration {
        name: String,
    }

    impl MigrationName for TestMigration {
        fn name(&self) -> &str {
            &self.name
        }
    }

    #[async_trait::async_trait]
    impl MigrationTrait for TestMigration {
        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
            let backend = manager.get_database_backend();
            let table_name = format!("test_{}", self.name.replace('-', "_"));

            let sql = match backend {
                DatabaseBackend::Sqlite => {
                    format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id INTEGER PRIMARY KEY)")
                }
                DatabaseBackend::Postgres => {
                    format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id SERIAL PRIMARY KEY)")
                }
                DatabaseBackend::MySql => format!(
                    "CREATE TABLE IF NOT EXISTS `{table_name}` (id INT AUTO_INCREMENT PRIMARY KEY)"
                ),
            };

            manager
                .get_connection()
                .execute(Statement::from_string(backend, sql))
                .await?;
            Ok(())
        }

        async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
            Ok(())
        }
    }

    #[cfg(feature = "sqlite")]
    mod sqlite_tests {
        use super::*;
        use crate::{ConnectOpts, Db, connect_db};

        /// Opens a fresh in-memory SQLite database.
        async fn setup_test_db() -> Db {
            connect_db("sqlite::memory:", ConnectOpts::default())
                .await
                .expect("Failed to create test database")
        }

        /// Builds one boxed `TestMigration` per name, in the given order.
        fn test_migrations(names: &[&str]) -> Vec<Box<dyn MigrationTrait>> {
            names
                .iter()
                .map(|name| {
                    Box::new(TestMigration {
                        name: (*name).to_owned(),
                    }) as Box<dyn MigrationTrait>
                })
                .collect()
        }

        /// Counts the tables called `table` listed in `sqlite_master`.
        async fn sqlite_table_count(db: &Db, table: &str) -> i32 {
            let conn = db.sea_internal();
            let backend = conn.get_database_backend();
            let row = conn
                .query_one(Statement::from_string(
                    backend,
                    format!(
                        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table}'"
                    ),
                ))
                .await
                .expect("Query should succeed")
                .expect("Result should exist");

            row.try_get_by_index(0).expect("Should get count")
        }

        #[tokio::test]
        async fn test_run_module_migrations_empty() {
            let db = setup_test_db().await;

            let result = run_migrations_for_module(&db, "test_module", vec![])
                .await
                .expect("Migration should succeed");

            assert_eq!(result.applied, 0);
            assert_eq!(result.skipped, 0);
            assert!(result.applied_names.is_empty());
        }

        #[tokio::test]
        async fn test_run_module_migrations_single() {
            let db = setup_test_db().await;

            let result = run_migrations_for_module(
                &db,
                "test_module_single",
                test_migrations(&["m001_initial"]),
            )
            .await
            .expect("Migration should succeed");

            assert_eq!(result.applied, 1);
            assert_eq!(result.skipped, 0);
            assert_eq!(result.applied_names, vec!["m001_initial"]);
        }

        #[tokio::test]
        async fn test_run_module_migrations_idempotent() {
            let db = setup_test_db().await;
            let module_name = "test_module_idempotent";

            let result1 =
                run_migrations_for_module(&db, module_name, test_migrations(&["m001_initial"]))
                    .await
                    .expect("First migration run should succeed");

            assert_eq!(result1.applied, 1);

            // Same migration again: nothing is applied, one is skipped.
            let result2 =
                run_migrations_for_module(&db, module_name, test_migrations(&["m001_initial"]))
                    .await
                    .expect("Second migration run should succeed");

            assert_eq!(result2.applied, 0);
            assert_eq!(result2.skipped, 1);
        }

        #[tokio::test]
        async fn test_run_module_migrations_deterministic_ordering() {
            let db = setup_test_db().await;

            // Deliberately out of order; the runner sorts by name.
            let migrations = test_migrations(&["m003_third", "m001_first", "m002_second"]);

            let result = run_migrations_for_module(&db, "test_ordering", migrations)
                .await
                .expect("Migration should succeed");

            assert_eq!(
                result.applied_names,
                vec!["m001_first", "m002_second", "m003_third"]
            );
        }

        #[tokio::test]
        async fn test_per_module_table_separation() {
            let db = setup_test_db().await;

            let result_a =
                run_migrations_for_module(&db, "module_a", test_migrations(&["m001_initial"]))
                    .await
                    .expect("Module A migration should succeed");

            assert_eq!(result_a.applied, 1);

            // Same migration name under another module is tracked separately.
            let result_b =
                run_migrations_for_module(&db, "module_b", test_migrations(&["m001_initial"]))
                    .await
                    .expect("Module B migration should succeed");

            assert_eq!(result_b.applied, 1);

            let table_a = migration_table_name("module_a");
            let table_b = migration_table_name("module_b");

            assert_eq!(sqlite_table_count(&db, &table_a).await, 1);
            assert_eq!(sqlite_table_count(&db, &table_b).await, 1);
        }

        #[tokio::test]
        async fn test_duplicate_migration_name_rejected() {
            let db = setup_test_db().await;

            let migrations = test_migrations(&["m001_dup", "m001_dup"]);

            let err = run_migrations_for_module(&db, "dup_module", migrations)
                .await
                .unwrap_err();

            match err {
                MigrationError::DuplicateMigrationName { module, name } => {
                    assert_eq!(module, "dup_module");
                    assert_eq!(name, "m001_dup");
                }
                other => panic!("expected DuplicateMigrationName, got: {other:?}"),
            }
        }

        #[tokio::test]
        async fn test_get_pending_migrations() {
            let db = setup_test_db().await;
            let module_name = "test_pending";

            let migrations = test_migrations(&["m001_first", "m002_second"]);

            // Nothing has run yet, so both are pending.
            let pending = get_pending_migrations(&db, module_name, &migrations)
                .await
                .expect("Should succeed");

            assert_eq!(pending.len(), 2);

            run_migrations_for_module(&db, module_name, test_migrations(&["m001_first"]))
                .await
                .expect("Should succeed");

            let pending = get_pending_migrations(&db, module_name, &migrations)
                .await
                .expect("Should succeed");

            assert_eq!(pending, vec!["m002_second"]);
        }

        #[tokio::test]
        async fn test_run_migrations_for_testing() {
            let db = setup_test_db().await;

            let result = run_migrations_for_testing(&db, test_migrations(&["m001_test"]))
                .await
                .expect("Test migrations should succeed");

            assert_eq!(result.applied, 1);
        }
    }
}