1use sea_orm::{
23 ConnectionTrait, DatabaseBackend, DbErr, ExecResult, FromQueryResult, Statement,
24 TransactionTrait,
25};
26use sea_orm_migration::MigrationTrait;
27use std::collections::HashSet;
28use thiserror::Error;
29use tracing::{debug, info};
30use xxhash_rust::xxh3::xxh3_64;
31
32#[derive(Debug, Error)]
34pub enum MigrationError {
35 #[error("failed to create migration table for module '{module}': {source}")]
37 CreateTable { module: String, source: DbErr },
38
39 #[error("failed to query migration history for module '{module}': {source}")]
41 QueryHistory { module: String, source: DbErr },
42
43 #[error("migration '{migration}' failed for module '{module}': {source}")]
45 MigrationFailed {
46 module: String,
47 migration: String,
48 source: DbErr,
49 },
50
51 #[error("failed to record migration '{migration}' for module '{module}': {source}")]
53 RecordFailed {
54 module: String,
55 migration: String,
56 source: DbErr,
57 },
58
59 #[error("duplicate migration name '{name}' for module '{module}'")]
61 DuplicateMigrationName { module: String, name: String },
62}
63
64#[derive(Debug, Clone)]
66pub struct MigrationResult {
67 pub applied: usize,
69 pub skipped: usize,
71 pub applied_names: Vec<String>,
73}
74
75#[derive(Debug, FromQueryResult)]
77struct MigrationRecord {
78 version: String,
79}
80
81fn sanitize_module_name(name: &str) -> String {
87 let mut out = String::with_capacity(name.len());
88 for c in name.chars() {
89 match c {
90 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' => out.push(c),
91 _ => out.push('_'),
92 }
93 }
94 if out.is_empty() { "_".to_owned() } else { out }
95}
96
97fn migration_table_name(module_name: &str) -> String {
105 const PREFIX: &str = "modkit_migrations__";
106 const SEP: &str = "__";
107 const HASH_LEN: usize = 8;
108 const PG_IDENT_MAX: usize = 63;
109
110 let sanitized = sanitize_module_name(module_name);
111 let hash = xxh3_64(module_name.as_bytes());
112 let hash8 = format!("{hash:016x}")[..HASH_LEN].to_owned();
113
114 let reserved = PREFIX.len() + SEP.len() + HASH_LEN;
116 let max_prefix_len = PG_IDENT_MAX.saturating_sub(reserved);
117 let prefix_part = if max_prefix_len == 0 {
118 String::new()
119 } else if sanitized.len() > max_prefix_len {
120 sanitized[..max_prefix_len].to_owned()
121 } else {
122 sanitized
123 };
124
125 format!("{PREFIX}{prefix_part}{SEP}{hash8}")
126}
127
128async fn ensure_migration_table(
130 conn: &impl ConnectionTrait,
131 table_name: &str,
132 module_name: &str,
133) -> Result<(), MigrationError> {
134 let backend = conn.get_database_backend();
135
136 let sql = match backend {
137 DatabaseBackend::Postgres => format!(
138 r#"
139 CREATE TABLE IF NOT EXISTS "{table_name}" (
140 version VARCHAR(255) PRIMARY KEY,
141 applied_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
142 )
143 "#
144 ),
145 DatabaseBackend::MySql => format!(
146 r"
147 CREATE TABLE IF NOT EXISTS `{table_name}` (
148 version VARCHAR(255) PRIMARY KEY,
149 applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
150 )
151 "
152 ),
153 DatabaseBackend::Sqlite => format!(
154 r#"
155 CREATE TABLE IF NOT EXISTS "{table_name}" (
156 version TEXT PRIMARY KEY,
157 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
158 )
159 "#
160 ),
161 };
162
163 conn.execute(Statement::from_string(backend, sql))
164 .await
165 .map_err(|e| MigrationError::CreateTable {
166 module: module_name.to_owned(),
167 source: e,
168 })?;
169
170 Ok(())
171}
172
173async fn get_applied_migrations(
175 conn: &impl ConnectionTrait,
176 table_name: &str,
177 module_name: &str,
178) -> Result<HashSet<String>, MigrationError> {
179 let backend = conn.get_database_backend();
180
181 let sql = match backend {
182 DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
183 format!(r#"SELECT version FROM "{table_name}""#)
184 }
185 DatabaseBackend::MySql => format!(r"SELECT version FROM `{table_name}`"),
186 };
187
188 let records: Vec<MigrationRecord> =
189 MigrationRecord::find_by_statement(Statement::from_string(backend, sql))
190 .all(conn)
191 .await
192 .map_err(|e| MigrationError::QueryHistory {
193 module: module_name.to_owned(),
194 source: e,
195 })?;
196
197 Ok(records.into_iter().map(|r| r.version).collect())
198}
199
200async fn record_migration(
202 conn: &impl ConnectionTrait,
203 table_name: &str,
204 module_name: &str,
205 migration_name: &str,
206) -> Result<ExecResult, MigrationError> {
207 let backend = conn.get_database_backend();
208
209 let sql = match backend {
210 DatabaseBackend::Postgres | DatabaseBackend::Sqlite => {
211 format!(r#"INSERT INTO "{table_name}" (version) VALUES ($1)"#)
212 }
213 DatabaseBackend::MySql => format!(r"INSERT INTO `{table_name}` (version) VALUES (?)"),
214 };
215
216 conn.execute(Statement::from_sql_and_values(
217 backend,
218 &sql,
219 [migration_name.into()],
220 ))
221 .await
222 .map_err(|e| MigrationError::RecordFailed {
223 module: module_name.to_owned(),
224 migration: migration_name.to_owned(),
225 source: e,
226 })
227}
228
229pub async fn run_migrations_for_module(
260 db: &crate::Db,
261 module_name: &str,
262 migrations: Vec<Box<dyn MigrationTrait>>,
263) -> Result<MigrationResult, MigrationError> {
264 let conn = db.sea_internal();
265 run_module_migrations(&conn, module_name, migrations).await
266}
267
268async fn run_module_migrations<C>(
287 conn: &C,
288 module_name: &str,
289 migrations: Vec<Box<dyn MigrationTrait>>,
290) -> Result<MigrationResult, MigrationError>
291where
292 C: ConnectionTrait + TransactionTrait,
293{
294 if migrations.is_empty() {
295 debug!(module = module_name, "No migrations to run");
296 return Ok(MigrationResult {
297 applied: 0,
298 skipped: 0,
299 applied_names: vec![],
300 });
301 }
302
303 let mut seen = HashSet::new();
305 for m in &migrations {
306 let n = m.name().to_owned();
307 if !seen.insert(n.clone()) {
308 return Err(MigrationError::DuplicateMigrationName {
309 module: module_name.to_owned(),
310 name: n,
311 });
312 }
313 }
314
315 let table_name = migration_table_name(module_name);
317
318 ensure_migration_table(conn, &table_name, module_name).await?;
320
321 let applied = get_applied_migrations(conn, &table_name, module_name).await?;
323
324 let mut sorted_migrations: Vec<_> = migrations.into_iter().collect();
326 sorted_migrations.sort_by(|a, b| a.name().cmp(b.name()));
327
328 let mut result = MigrationResult {
329 applied: 0,
330 skipped: 0,
331 applied_names: vec![],
332 };
333
334 for migration in sorted_migrations {
335 let name = migration.name().to_owned();
336
337 if applied.contains(&name) {
338 debug!(
339 module = module_name,
340 migration = %name,
341 "Migration already applied, skipping"
342 );
343 result.skipped += 1;
344 continue;
345 }
346
347 info!(
348 module = module_name,
349 migration = %name,
350 "Applying migration"
351 );
352
353 let txn = conn
357 .begin()
358 .await
359 .map_err(|e| MigrationError::MigrationFailed {
360 module: module_name.to_owned(),
361 migration: name.clone(),
362 source: e,
363 })?;
364
365 let manager = sea_orm_migration::SchemaManager::new(&txn);
366 let res: Result<(), MigrationError> = (async {
367 migration
368 .up(&manager)
369 .await
370 .map_err(|e| MigrationError::MigrationFailed {
371 module: module_name.to_owned(),
372 migration: name.clone(),
373 source: e,
374 })?;
375
376 record_migration(&txn, &table_name, module_name, &name).await?;
377 Ok(())
378 })
379 .await;
380
381 match res {
382 Ok(()) => {
383 txn.commit()
384 .await
385 .map_err(|e| MigrationError::MigrationFailed {
386 module: module_name.to_owned(),
387 migration: name.clone(),
388 source: e,
389 })?;
390 }
391 Err(err) => {
392 let _ = txn.rollback().await;
393 return Err(err);
394 }
395 }
396
397 info!(
398 module = module_name,
399 migration = %name,
400 "Migration applied successfully"
401 );
402
403 result.applied += 1;
404 result.applied_names.push(name);
405 }
406
407 info!(
408 module = module_name,
409 applied = result.applied,
410 skipped = result.skipped,
411 "Migration run complete"
412 );
413
414 Ok(result)
415}
416
417pub async fn run_migrations_for_testing(
451 db: &crate::Db,
452 migrations: Vec<Box<dyn MigrationTrait>>,
453) -> Result<MigrationResult, MigrationError> {
454 let conn = db.sea_internal();
455 run_module_migrations(&conn, "_test", migrations).await
456}
457
458pub async fn get_pending_migrations(
474 db: &crate::Db,
475 module_name: &str,
476 migrations: &[Box<dyn MigrationTrait>],
477) -> Result<Vec<String>, MigrationError> {
478 let conn = db.sea_internal();
479 get_pending_migrations_internal(&conn, module_name, migrations).await
480}
481
482async fn get_pending_migrations_internal(
484 conn: &impl ConnectionTrait,
485 module_name: &str,
486 migrations: &[Box<dyn MigrationTrait>],
487) -> Result<Vec<String>, MigrationError> {
488 if migrations.is_empty() {
489 return Ok(vec![]);
490 }
491
492 let table_name = migration_table_name(module_name);
493
494 let backend = conn.get_database_backend();
497 let table_exists = match backend {
498 DatabaseBackend::Postgres => {
499 let sql = format!(
500 "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name}')"
501 );
502 let row = conn
503 .query_one(Statement::from_string(backend, sql))
504 .await
505 .map_err(|e| MigrationError::QueryHistory {
506 module: module_name.to_owned(),
507 source: e,
508 })?;
509 row.and_then(|r| r.try_get_by_index::<bool>(0).ok())
510 .unwrap_or(false)
511 }
512 DatabaseBackend::MySql => {
513 let sql = format!(
514 "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}'"
515 );
516 let row = conn
517 .query_one(Statement::from_string(backend, sql))
518 .await
519 .map_err(|e| MigrationError::QueryHistory {
520 module: module_name.to_owned(),
521 source: e,
522 })?;
523 row.and_then(|r| r.try_get_by_index::<i64>(0).ok())
524 .is_some_and(|c| c > 0)
525 }
526 DatabaseBackend::Sqlite => {
527 let sql = format!(
528 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_name}'"
529 );
530 let row = conn
531 .query_one(Statement::from_string(backend, sql))
532 .await
533 .map_err(|e| MigrationError::QueryHistory {
534 module: module_name.to_owned(),
535 source: e,
536 })?;
537 row.and_then(|r| r.try_get_by_index::<i32>(0).ok())
538 .is_some_and(|c| c > 0)
539 }
540 };
541
542 if !table_exists {
543 return Ok(migrations.iter().map(|m| m.name().to_owned()).collect());
544 }
545
546 let applied = get_applied_migrations(conn, &table_name, module_name).await?;
547
548 Ok(migrations
549 .iter()
550 .filter(|m| !applied.contains(m.name()))
551 .map(|m| m.name().to_owned())
552 .collect())
553}
554
555#[cfg(test)]
556#[cfg_attr(coverage_nightly, coverage(off))]
557mod tests {
558 use super::*;
559 use sea_orm_migration::prelude::*;
560 use sea_orm_migration::sea_orm::DatabaseBackend;
561
562 #[test]
563 fn test_sanitize_module_name() {
564 assert_eq!(sanitize_module_name("my_module"), "my_module");
565 assert_eq!(sanitize_module_name("my-module"), "my_module");
566 assert_eq!(sanitize_module_name("MyModule123"), "MyModule123");
567 assert_eq!(sanitize_module_name("my.module"), "my_module");
568 assert_eq!(sanitize_module_name("my/module"), "my_module");
569 assert_eq!(sanitize_module_name(""), "_");
570 }
571
572 #[test]
573 fn test_migration_table_name() {
574 let a = migration_table_name("users_info");
575 let b = migration_table_name("users_info");
576 assert_eq!(a, b, "deterministic");
577 assert!(a.starts_with("modkit_migrations__"));
578 assert!(a.len() <= 63);
579
580 let c = migration_table_name("simple-user-settings");
581 assert!(c.contains("simple_user_settings"));
582 assert!(c.len() <= 63);
583 }
584
585 #[allow(dead_code)]
587 struct TestMigration {
588 name: String,
589 }
590
591 impl MigrationName for TestMigration {
592 fn name(&self) -> &str {
593 &self.name
594 }
595 }
596
597 #[async_trait::async_trait]
598 impl MigrationTrait for TestMigration {
599 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
600 let backend = manager.get_database_backend();
602 let table_name = format!("test_{}", self.name.replace('-', "_"));
603
604 let sql = match backend {
605 DatabaseBackend::Sqlite => {
606 format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id INTEGER PRIMARY KEY)")
607 }
608 DatabaseBackend::Postgres => {
609 format!("CREATE TABLE IF NOT EXISTS \"{table_name}\" (id SERIAL PRIMARY KEY)")
610 }
611 DatabaseBackend::MySql => format!(
612 "CREATE TABLE IF NOT EXISTS `{table_name}` (id INT AUTO_INCREMENT PRIMARY KEY)"
613 ),
614 };
615
616 manager
617 .get_connection()
618 .execute(Statement::from_string(backend, sql))
619 .await?;
620 Ok(())
621 }
622
623 async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
624 Ok(())
625 }
626 }
627
628 #[cfg(feature = "sqlite")]
629 mod sqlite_tests {
630 use super::*;
631 use crate::{ConnectOpts, Db, connect_db};
632
633 async fn setup_test_db() -> Db {
634 connect_db("sqlite::memory:", ConnectOpts::default())
635 .await
636 .expect("Failed to create test database")
637 }
638
639 #[tokio::test]
640 async fn test_run_module_migrations_empty() {
641 let db = setup_test_db().await;
642
643 let result = run_migrations_for_module(&db, "test_module", vec![])
644 .await
645 .expect("Migration should succeed");
646
647 assert_eq!(result.applied, 0);
648 assert_eq!(result.skipped, 0);
649 assert!(result.applied_names.is_empty());
650 }
651
652 #[tokio::test]
653 async fn test_run_module_migrations_single() {
654 let db = setup_test_db().await;
655
656 let migrations: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
657 name: "m001_initial".to_owned(),
658 })];
659
660 let result = run_migrations_for_module(&db, "test_module_single", migrations)
661 .await
662 .expect("Migration should succeed");
663
664 assert_eq!(result.applied, 1);
665 assert_eq!(result.skipped, 0);
666 assert_eq!(result.applied_names, vec!["m001_initial"]);
667 }
668
669 #[tokio::test]
670 async fn test_run_module_migrations_idempotent() {
671 let db = setup_test_db().await;
672
673 let module_name = "test_module_idempotent";
674
675 let migrations: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
677 name: "m001_initial".to_owned(),
678 })];
679
680 let result1 = run_migrations_for_module(&db, module_name, migrations)
681 .await
682 .expect("First migration run should succeed");
683
684 assert_eq!(result1.applied, 1);
685
686 let migrations: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
688 name: "m001_initial".to_owned(),
689 })];
690
691 let result2 = run_migrations_for_module(&db, module_name, migrations)
692 .await
693 .expect("Second migration run should succeed");
694
695 assert_eq!(result2.applied, 0);
696 assert_eq!(result2.skipped, 1);
697 }
698
699 #[tokio::test]
700 async fn test_run_module_migrations_deterministic_ordering() {
701 let db = setup_test_db().await;
702
703 let migrations: Vec<Box<dyn MigrationTrait>> = vec![
705 Box::new(TestMigration {
706 name: "m003_third".to_owned(),
707 }),
708 Box::new(TestMigration {
709 name: "m001_first".to_owned(),
710 }),
711 Box::new(TestMigration {
712 name: "m002_second".to_owned(),
713 }),
714 ];
715
716 let result = run_migrations_for_module(&db, "test_ordering", migrations)
717 .await
718 .expect("Migration should succeed");
719
720 assert_eq!(
722 result.applied_names,
723 vec!["m001_first", "m002_second", "m003_third"]
724 );
725 }
726
727 #[tokio::test]
728 async fn test_per_module_table_separation() {
729 let db = setup_test_db().await;
730
731 let migrations_a: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
733 name: "m001_initial".to_owned(),
734 })];
735
736 let result_a = run_migrations_for_module(&db, "module_a", migrations_a)
737 .await
738 .expect("Module A migration should succeed");
739
740 assert_eq!(result_a.applied, 1);
741
742 let migrations_b: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
744 name: "m001_initial".to_owned(),
745 })];
746
747 let result_b = run_migrations_for_module(&db, "module_b", migrations_b)
748 .await
749 .expect("Module B migration should succeed");
750
751 assert_eq!(result_b.applied, 1);
753
754 let table_a = migration_table_name("module_a");
756 let table_b = migration_table_name("module_b");
757 let conn = db.sea_internal();
758 let backend = conn.get_database_backend();
759 let check_a = conn
760 .query_one(Statement::from_string(
761 backend,
762 format!(
763 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_a}'"
764 ),
765 ))
766 .await
767 .expect("Query should succeed")
768 .expect("Result should exist");
769
770 let count_a: i32 = check_a.try_get_by_index(0).expect("Should get count");
771 assert_eq!(count_a, 1);
772
773 let check_b = conn
774 .query_one(Statement::from_string(
775 backend,
776 format!(
777 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_b}'"
778 ),
779 ))
780 .await
781 .expect("Query should succeed")
782 .expect("Result should exist");
783
784 let count_b: i32 = check_b.try_get_by_index(0).expect("Should get count");
785 assert_eq!(count_b, 1);
786 }
787
788 #[tokio::test]
789 async fn test_duplicate_migration_name_rejected() {
790 let db = setup_test_db().await;
791
792 let migrations: Vec<Box<dyn MigrationTrait>> = vec![
793 Box::new(TestMigration {
794 name: "m001_dup".to_owned(),
795 }),
796 Box::new(TestMigration {
797 name: "m001_dup".to_owned(),
798 }),
799 ];
800
801 let err = run_migrations_for_module(&db, "dup_module", migrations)
802 .await
803 .unwrap_err();
804
805 match err {
806 MigrationError::DuplicateMigrationName { module, name } => {
807 assert_eq!(module, "dup_module");
808 assert_eq!(name, "m001_dup");
809 }
810 other => panic!("expected DuplicateMigrationName, got: {other:?}"),
811 }
812 }
813
814 #[test]
815 fn test_table_name_length_limit() {
816 let long =
818 "this-is-a-very-long-module-name/with.weird.chars/and-more-and-more-and-more";
819 let t = migration_table_name(long);
820 assert!(t.len() <= 63);
821 assert!(t.starts_with("modkit_migrations__"));
822 }
823
824 #[tokio::test]
825 async fn test_get_pending_migrations() {
826 let db = setup_test_db().await;
827
828 let module_name = "test_pending";
829
830 let migrations: Vec<Box<dyn MigrationTrait>> = vec![
832 Box::new(TestMigration {
833 name: "m001_first".to_owned(),
834 }),
835 Box::new(TestMigration {
836 name: "m002_second".to_owned(),
837 }),
838 ];
839
840 let pending = get_pending_migrations(&db, module_name, &migrations)
841 .await
842 .expect("Should succeed");
843
844 assert_eq!(pending.len(), 2);
845
846 let first: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
848 name: "m001_first".to_owned(),
849 })];
850
851 run_migrations_for_module(&db, module_name, first)
852 .await
853 .expect("Should succeed");
854
855 let pending = get_pending_migrations(&db, module_name, &migrations)
857 .await
858 .expect("Should succeed");
859
860 assert_eq!(pending, vec!["m002_second"]);
861 }
862
863 #[tokio::test]
864 async fn test_run_migrations_for_testing() {
865 let db = setup_test_db().await;
866
867 let migrations: Vec<Box<dyn MigrationTrait>> = vec![Box::new(TestMigration {
868 name: "m001_test".to_owned(),
869 })];
870
871 let result = run_migrations_for_testing(&db, migrations)
872 .await
873 .expect("Test migrations should succeed");
874
875 assert_eq!(result.applied, 1);
876 }
877 }
878}