Skip to main content

rustrails_record/
migration.rs

1use chrono::Utc;
2use rustrails_support::{database, runtime};
3use sea_orm::{
4    ActiveModelTrait, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter,
5};
6use sea_orm_migration::{prelude::*, seaql_migrations};
7
8const MIGRATION_TABLE: &str = "seaql_migrations";
9
10/// Migration abstraction mirroring `sea-orm-migration` with an explicit version.
11pub trait Migration: Send + Sync {
12    /// Returns the migration name.
13    fn name(&self) -> &str;
14
15    /// Returns the migration version.
16    fn version(&self) -> &str;
17
18    /// Applies the migration.
19    async fn up(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;
20
21    /// Rolls the migration back.
22    async fn down(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;
23}
24
25/// Runs a collection of SeaORM migrations.
26pub struct Migrator {
27    migrations: Vec<Box<dyn MigrationTrait>>,
28}
29
30impl Migrator {
31    /// Creates an empty migrator.
32    pub fn new() -> Self {
33        Self {
34            migrations: Vec::new(),
35        }
36    }
37
38    /// Adds a migration to the execution list.
39    pub fn add(&mut self, migration: impl MigrationTrait + 'static) {
40        self.migrations.push(Box::new(migration));
41    }
42
43    /// Applies all pending migrations.
44    pub async fn up(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
45        let manager = SchemaManager::new(db);
46        ensure_migration_table(&manager).await?;
47        let mut applied = applied_versions(db).await?;
48
49        for migration in &self.migrations {
50            if applied.contains(migration.name()) {
51                continue;
52            }
53
54            run_up_migration(migration.as_ref(), &manager).await?;
55            record_applied_migration(migration.name(), db).await?;
56            applied.insert(migration.name().to_owned());
57        }
58
59        Ok(())
60    }
61
62    /// Synchronous wrapper for [`Self::up`].
63    pub fn up_sync(&self) -> Result<(), sea_orm::DbErr> {
64        database::with_db(|db| runtime::block_on(self.up(db)))
65    }
66
67    /// Rolls back applied migrations in reverse order.
68    pub async fn down(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
69        let manager = SchemaManager::new(db);
70        ensure_migration_table(&manager).await?;
71        let mut applied = applied_versions(db).await?;
72
73        for migration in self.migrations.iter().rev() {
74            if !applied.contains(migration.name()) {
75                continue;
76            }
77
78            run_down_migration(migration.as_ref(), &manager).await?;
79            remove_applied_migration(migration.name(), db).await?;
80            applied.remove(migration.name());
81        }
82
83        Ok(())
84    }
85
86    /// Synchronous wrapper for [`Self::down`].
87    pub fn down_sync(&self) -> Result<(), sea_orm::DbErr> {
88        database::with_db(|db| runtime::block_on(self.down(db)))
89    }
90
91    /// Returns the applied status for every registered migration.
92    pub async fn status(&self, db: &DatabaseConnection) -> Vec<MigrationStatus> {
93        let manager = SchemaManager::new(db);
94        let applied = match ensure_migration_table(&manager).await {
95            Ok(()) => applied_versions(db).await.unwrap_or_default(),
96            Err(_) => std::collections::HashSet::new(),
97        };
98
99        self.migrations
100            .iter()
101            .map(|migration| MigrationStatus {
102                name: migration.name().to_owned(),
103                applied: applied.contains(migration.name()),
104            })
105            .collect()
106    }
107
108    /// Synchronous wrapper for [`Self::status`].
109    pub fn status_sync(&self) -> Vec<MigrationStatus> {
110        database::with_db(|db| runtime::block_on(self.status(db)))
111    }
112}
113
114impl Default for Migrator {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120/// A migration's applied status.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct MigrationStatus {
123    /// The migration name.
124    pub name: String,
125    /// Whether the migration has been applied.
126    pub applied: bool,
127}
128
129async fn ensure_migration_table(manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr> {
130    manager
131        .create_table(
132            Table::create()
133                .table(Alias::new(MIGRATION_TABLE))
134                .if_not_exists()
135                .col(
136                    ColumnDef::new(Alias::new("version"))
137                        .string()
138                        .not_null()
139                        .primary_key(),
140                )
141                .col(
142                    ColumnDef::new(Alias::new("applied_at"))
143                        .big_integer()
144                        .not_null(),
145                )
146                .to_owned(),
147        )
148        .await
149}
150
151async fn applied_versions(
152    db: &DatabaseConnection,
153) -> Result<std::collections::HashSet<String>, sea_orm::DbErr> {
154    Ok(seaql_migrations::Entity::find()
155        .all(db)
156        .await?
157        .into_iter()
158        .map(|model| model.version)
159        .collect())
160}
161
162async fn record_applied_migration(
163    name: &str,
164    db: &DatabaseConnection,
165) -> Result<(), sea_orm::DbErr> {
166    seaql_migrations::ActiveModel {
167        version: Set(name.to_owned()),
168        applied_at: Set(Utc::now().timestamp()),
169    }
170    .insert(db)
171    .await?;
172    Ok(())
173}
174
175async fn remove_applied_migration(
176    name: &str,
177    db: &DatabaseConnection,
178) -> Result<(), sea_orm::DbErr> {
179    seaql_migrations::Entity::delete_many()
180        .filter(seaql_migrations::Column::Version.eq(name))
181        .exec(db)
182        .await?;
183    Ok(())
184}
185
186async fn run_up_migration(
187    migration: &dyn MigrationTrait,
188    manager: &SchemaManager<'_>,
189) -> Result<(), sea_orm::DbErr> {
190    if should_wrap_in_transaction(migration, manager.get_database_backend()) {
191        let tx_manager = manager.begin().await?;
192        migration.up(&tx_manager).await?;
193        tx_manager.commit().await?;
194        Ok(())
195    } else {
196        migration.up(manager).await
197    }
198}
199
200async fn run_down_migration(
201    migration: &dyn MigrationTrait,
202    manager: &SchemaManager<'_>,
203) -> Result<(), sea_orm::DbErr> {
204    if should_wrap_in_transaction(migration, manager.get_database_backend()) {
205        let tx_manager = manager.begin().await?;
206        migration.down(&tx_manager).await?;
207        tx_manager.commit().await?;
208        Ok(())
209    } else {
210        migration.down(manager).await
211    }
212}
213
214fn should_wrap_in_transaction(migration: &dyn MigrationTrait, backend: sea_orm::DbBackend) -> bool {
215    migration
216        .use_transaction()
217        .unwrap_or(matches!(backend, sea_orm::DbBackend::Postgres))
218}
219
220#[cfg(test)]
221mod tests {
222    use rustrails_support::{database, runtime};
223    use sea_orm::{ConnectionTrait, Database, DbBackend, EntityTrait, Statement};
224    use sea_orm_migration::{prelude::*, seaql_migrations};
225
226    use super::{
227        MigrationStatus, Migrator, applied_versions, ensure_migration_table,
228        record_applied_migration, remove_applied_migration, should_wrap_in_transaction,
229    };
230
231    fn run_sync_migration_test(test: impl FnOnce() + Send + 'static) {
232        std::thread::spawn(move || {
233            let _rt = runtime::init_runtime();
234            database::establish("sqlite::memory:")
235                .expect("sqlite in-memory connection should succeed");
236            test();
237        })
238        .join()
239        .unwrap();
240    }
241
242    struct CreateWidgets;
243
244    impl MigrationName for CreateWidgets {
245        fn name(&self) -> &str {
246            "create_widgets"
247        }
248    }
249
250    #[async_trait::async_trait]
251    impl MigrationTrait for CreateWidgets {
252        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
253            manager
254                .create_table(
255                    Table::create()
256                        .table(Alias::new("widgets"))
257                        .if_not_exists()
258                        .col(
259                            ColumnDef::new(Alias::new("id"))
260                                .integer()
261                                .not_null()
262                                .auto_increment()
263                                .primary_key(),
264                        )
265                        .col(ColumnDef::new(Alias::new("name")).string().not_null())
266                        .to_owned(),
267                )
268                .await
269        }
270
271        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
272            manager
273                .drop_table(
274                    Table::drop()
275                        .table(Alias::new("widgets"))
276                        .if_exists()
277                        .to_owned(),
278                )
279                .await
280        }
281    }
282
283    struct CreateGadgets;
284
285    impl MigrationName for CreateGadgets {
286        fn name(&self) -> &str {
287            "create_gadgets"
288        }
289    }
290
291    #[async_trait::async_trait]
292    impl MigrationTrait for CreateGadgets {
293        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
294            manager
295                .create_table(
296                    Table::create()
297                        .table(Alias::new("gadgets"))
298                        .if_not_exists()
299                        .col(
300                            ColumnDef::new(Alias::new("id"))
301                                .integer()
302                                .not_null()
303                                .auto_increment()
304                                .primary_key(),
305                        )
306                        .col(ColumnDef::new(Alias::new("title")).string().not_null())
307                        .to_owned(),
308                )
309                .await
310        }
311
312        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
313            manager
314                .drop_table(
315                    Table::drop()
316                        .table(Alias::new("gadgets"))
317                        .if_exists()
318                        .to_owned(),
319                )
320                .await
321        }
322    }
323
324    struct AddWidgetDescription;
325
326    impl MigrationName for AddWidgetDescription {
327        fn name(&self) -> &str {
328            "add_widget_description"
329        }
330    }
331
332    #[async_trait::async_trait]
333    impl MigrationTrait for AddWidgetDescription {
334        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
335            manager
336                .get_connection()
337                .execute_unprepared("ALTER TABLE widgets ADD COLUMN description TEXT")
338                .await?;
339            Ok(())
340        }
341
342        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
343            manager
344                .get_connection()
345                .execute_unprepared("ALTER TABLE widgets DROP COLUMN description")
346                .await?;
347            Ok(())
348        }
349    }
350
351    struct RemoveWidgetName;
352
353    impl MigrationName for RemoveWidgetName {
354        fn name(&self) -> &str {
355            "remove_widget_name"
356        }
357    }
358
359    #[async_trait::async_trait]
360    impl MigrationTrait for RemoveWidgetName {
361        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
362            manager
363                .get_connection()
364                .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
365                .await?;
366            Ok(())
367        }
368
369        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
370            manager
371                .get_connection()
372                .execute_unprepared(
373                    "ALTER TABLE widgets ADD COLUMN name VARCHAR NOT NULL DEFAULT ''",
374                )
375                .await?;
376            Ok(())
377        }
378    }
379
380    struct AddWidgetNameIndex;
381
382    impl MigrationName for AddWidgetNameIndex {
383        fn name(&self) -> &str {
384            "add_widget_name_index"
385        }
386    }
387
388    #[async_trait::async_trait]
389    impl MigrationTrait for AddWidgetNameIndex {
390        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
391            manager
392                .create_index(
393                    Index::create()
394                        .name("idx_widgets_name")
395                        .table(Alias::new("widgets"))
396                        .col(Alias::new("name"))
397                        .to_owned(),
398                )
399                .await
400        }
401
402        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
403            manager
404                .drop_index(
405                    Index::drop()
406                        .name("idx_widgets_name")
407                        .table(Alias::new("widgets"))
408                        .to_owned(),
409                )
410                .await
411        }
412    }
413
414    struct RemoveWidgetNameIndex;
415
416    impl MigrationName for RemoveWidgetNameIndex {
417        fn name(&self) -> &str {
418            "remove_widget_name_index"
419        }
420    }
421
422    #[async_trait::async_trait]
423    impl MigrationTrait for RemoveWidgetNameIndex {
424        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
425            manager
426                .drop_index(
427                    Index::drop()
428                        .name("idx_widgets_name")
429                        .table(Alias::new("widgets"))
430                        .to_owned(),
431                )
432                .await
433        }
434
435        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
436            manager
437                .create_index(
438                    Index::create()
439                        .name("idx_widgets_name")
440                        .table(Alias::new("widgets"))
441                        .col(Alias::new("name"))
442                        .to_owned(),
443                )
444                .await
445        }
446    }
447
448    struct RenameWidgetNameToTitle;
449
450    impl MigrationName for RenameWidgetNameToTitle {
451        fn name(&self) -> &str {
452            "rename_widget_name_to_title"
453        }
454    }
455
456    #[async_trait::async_trait]
457    impl MigrationTrait for RenameWidgetNameToTitle {
458        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
459            manager
460                .get_connection()
461                .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name TO title")
462                .await?;
463            Ok(())
464        }
465
466        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
467            manager
468                .get_connection()
469                .execute_unprepared("ALTER TABLE widgets RENAME COLUMN title TO name")
470                .await?;
471            Ok(())
472        }
473    }
474
475    struct ChangeWidgetNameToText;
476
477    impl MigrationName for ChangeWidgetNameToText {
478        fn name(&self) -> &str {
479            "change_widget_name_to_text"
480        }
481    }
482
483    #[async_trait::async_trait]
484    impl MigrationTrait for ChangeWidgetNameToText {
485        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
486            manager
487                .get_connection()
488                .execute_unprepared(
489                    "ALTER TABLE widgets ADD COLUMN name_text TEXT NOT NULL DEFAULT ''",
490                )
491                .await?;
492            manager
493                .get_connection()
494                .execute_unprepared("UPDATE widgets SET name_text = name")
495                .await?;
496            manager
497                .get_connection()
498                .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
499                .await?;
500            manager
501                .get_connection()
502                .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_text TO name")
503                .await?;
504            Ok(())
505        }
506
507        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
508            manager
509                .get_connection()
510                .execute_unprepared(
511                    "ALTER TABLE widgets ADD COLUMN name_string VARCHAR NOT NULL DEFAULT ''",
512                )
513                .await?;
514            manager
515                .get_connection()
516                .execute_unprepared("UPDATE widgets SET name_string = name")
517                .await?;
518            manager
519                .get_connection()
520                .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
521                .await?;
522            manager
523                .get_connection()
524                .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_string TO name")
525                .await?;
526            Ok(())
527        }
528    }
529
530    struct FirstRollbackStep;
531
532    impl MigrationName for FirstRollbackStep {
533        fn name(&self) -> &str {
534            "first_rollback_step"
535        }
536    }
537
538    #[async_trait::async_trait]
539    impl MigrationTrait for FirstRollbackStep {
540        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
541            manager
542                .create_table(
543                    Table::create()
544                        .table(Alias::new("first_step"))
545                        .if_not_exists()
546                        .col(
547                            ColumnDef::new(Alias::new("id"))
548                                .integer()
549                                .not_null()
550                                .auto_increment()
551                                .primary_key(),
552                        )
553                        .to_owned(),
554                )
555                .await
556        }
557
558        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
559            manager
560                .get_connection()
561                .execute_unprepared("INSERT INTO rollback_log (step) VALUES ('first')")
562                .await?;
563            manager
564                .drop_table(
565                    Table::drop()
566                        .table(Alias::new("first_step"))
567                        .if_exists()
568                        .to_owned(),
569                )
570                .await
571        }
572    }
573
574    struct SecondRollbackStep;
575
576    impl MigrationName for SecondRollbackStep {
577        fn name(&self) -> &str {
578            "second_rollback_step"
579        }
580    }
581
582    #[async_trait::async_trait]
583    impl MigrationTrait for SecondRollbackStep {
584        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
585            manager
586                .create_table(
587                    Table::create()
588                        .table(Alias::new("second_step"))
589                        .if_not_exists()
590                        .col(
591                            ColumnDef::new(Alias::new("id"))
592                                .integer()
593                                .not_null()
594                                .auto_increment()
595                                .primary_key(),
596                        )
597                        .to_owned(),
598                )
599                .await
600        }
601
602        async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
603            manager
604                .get_connection()
605                .execute_unprepared("INSERT INTO rollback_log (step) VALUES ('second')")
606                .await?;
607            manager
608                .drop_table(
609                    Table::drop()
610                        .table(Alias::new("second_step"))
611                        .if_exists()
612                        .to_owned(),
613                )
614                .await
615        }
616    }
617
618    struct InsertAuditThenFailWrapped;
619
620    impl MigrationName for InsertAuditThenFailWrapped {
621        fn name(&self) -> &str {
622            "insert_audit_then_fail_wrapped"
623        }
624    }
625
626    #[async_trait::async_trait]
627    impl MigrationTrait for InsertAuditThenFailWrapped {
628        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
629            manager
630                .get_connection()
631                .execute_unprepared("INSERT INTO audit_entries (message) VALUES ('wrapped')")
632                .await?;
633            Err(DbErr::Custom("wrapped migration failure".to_owned()))
634        }
635
636        async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
637            Ok(())
638        }
639
640        fn use_transaction(&self) -> Option<bool> {
641            Some(true)
642        }
643    }
644
645    struct InsertAuditThenFailUnwrapped;
646
647    impl MigrationName for InsertAuditThenFailUnwrapped {
648        fn name(&self) -> &str {
649            "insert_audit_then_fail_unwrapped"
650        }
651    }
652
653    #[async_trait::async_trait]
654    impl MigrationTrait for InsertAuditThenFailUnwrapped {
655        async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
656            manager
657                .get_connection()
658                .execute_unprepared("INSERT INTO audit_entries (message) VALUES ('unwrapped')")
659                .await?;
660            Err(DbErr::Custom("unwrapped migration failure".to_owned()))
661        }
662
663        async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
664            Ok(())
665        }
666
667        fn use_transaction(&self) -> Option<bool> {
668            Some(false)
669        }
670    }
671
672    struct TransactionPreferenceMigration {
673        preference: Option<bool>,
674    }
675
676    impl MigrationName for TransactionPreferenceMigration {
677        fn name(&self) -> &str {
678            "transaction_preference_migration"
679        }
680    }
681
682    #[async_trait::async_trait]
683    impl MigrationTrait for TransactionPreferenceMigration {
684        async fn up(&self, _: &SchemaManager) -> Result<(), DbErr> {
685            Ok(())
686        }
687
688        async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
689            Ok(())
690        }
691
692        fn use_transaction(&self) -> Option<bool> {
693            self.preference
694        }
695    }
696
697    async fn setup_db() -> sea_orm::DatabaseConnection {
698        Database::connect("sqlite::memory:")
699            .await
700            .expect("in-memory sqlite connection should succeed")
701    }
702
703    async fn table_exists(
704        db: &sea_orm::DatabaseConnection,
705        table_name: &str,
706    ) -> Result<bool, sea_orm::DbErr> {
707        Ok(db
708            .query_one_raw(Statement::from_sql_and_values(
709                db.get_database_backend(),
710                "SELECT name FROM sqlite_master WHERE type = ? AND name = ?",
711                ["table".into(), table_name.into()],
712            ))
713            .await?
714            .is_some())
715    }
716
717    async fn column_exists(
718        db: &sea_orm::DatabaseConnection,
719        table_name: &str,
720        column_name: &str,
721    ) -> bool {
722        db.query_one_raw(Statement::from_string(
723            db.get_database_backend(),
724            format!(
725                "SELECT name FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
726            ),
727        ))
728        .await
729        .expect("column existence query should work")
730        .is_some()
731    }
732
733    async fn column_type(
734        db: &sea_orm::DatabaseConnection,
735        table_name: &str,
736        column_name: &str,
737    ) -> Option<String> {
738        db.query_one_raw(Statement::from_string(
739            db.get_database_backend(),
740            format!(
741                "SELECT type FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
742            ),
743        ))
744        .await
745        .expect("column type query should work")
746        .map(|row| row.try_get("", "type").expect("type should be readable"))
747    }
748
749    async fn index_exists(
750        db: &sea_orm::DatabaseConnection,
751        table_name: &str,
752        index_name: &str,
753    ) -> bool {
754        db.query_one_raw(Statement::from_string(
755            db.get_database_backend(),
756            format!(
757                "SELECT name FROM pragma_index_list('{table_name}') WHERE name = '{index_name}'"
758            ),
759        ))
760        .await
761        .expect("index existence query should work")
762        .is_some()
763    }
764
765    async fn table_row_count(db: &sea_orm::DatabaseConnection, table_name: &str) -> i64 {
766        db.query_one_raw(Statement::from_string(
767            db.get_database_backend(),
768            format!("SELECT COUNT(*) AS count FROM {table_name}"),
769        ))
770        .await
771        .expect("row count query should work")
772        .expect("count row should exist")
773        .try_get("", "count")
774        .expect("count should be readable")
775    }
776
777    async fn rollback_steps(db: &sea_orm::DatabaseConnection) -> Vec<String> {
778        db.query_all_raw(Statement::from_string(
779            db.get_database_backend(),
780            "SELECT step FROM rollback_log ORDER BY id".to_owned(),
781        ))
782        .await
783        .expect("rollback log query should work")
784        .into_iter()
785        .map(|row| row.try_get("", "step").expect("step should be readable"))
786        .collect()
787    }
788
789    #[tokio::test]
790    async fn new_migrator_starts_empty() {
791        let migrator = Migrator::new();
792        let db = setup_db().await;
793
794        let statuses = migrator.status(&db).await;
795
796        assert!(statuses.is_empty());
797    }
798
799    #[tokio::test]
800    async fn up_applies_registered_migrations() {
801        let db = setup_db().await;
802        let mut migrator = Migrator::new();
803        migrator.add(CreateWidgets);
804
805        migrator.up(&db).await.expect("migration should apply");
806
807        assert!(
808            table_exists(&db, "widgets")
809                .await
810                .expect("table existence query should work")
811        );
812    }
813
814    #[test]
815    fn up_sync_applies_registered_migrations() {
816        run_sync_migration_test(|| {
817            let mut migrator = Migrator::new();
818            migrator.add(CreateWidgets);
819
820            migrator.up_sync().expect("migration should apply");
821
822            let widgets_exist = runtime::block_on(async {
823                let db = database::db();
824                table_exists(&db, "widgets")
825                    .await
826                    .expect("table existence query should work")
827            });
828            assert!(widgets_exist);
829        });
830    }
831
832    #[tokio::test]
833    async fn down_rolls_back_applied_migrations() {
834        let db = setup_db().await;
835        let mut migrator = Migrator::new();
836        migrator.add(CreateWidgets);
837        migrator.up(&db).await.expect("migration should apply");
838
839        migrator.down(&db).await.expect("migration should rollback");
840
841        assert!(
842            !table_exists(&db, "widgets")
843                .await
844                .expect("table existence query should work")
845        );
846    }
847
848    #[test]
849    fn down_sync_rolls_back_applied_migrations() {
850        run_sync_migration_test(|| {
851            let mut migrator = Migrator::new();
852            migrator.add(CreateWidgets);
853            migrator.up_sync().expect("migration should apply");
854
855            migrator.down_sync().expect("migration should rollback");
856
857            let widgets_exist = runtime::block_on(async {
858                let db = database::db();
859                table_exists(&db, "widgets")
860                    .await
861                    .expect("table existence query should work")
862            });
863            assert!(!widgets_exist);
864        });
865    }
866
867    #[tokio::test]
868    async fn status_marks_pending_and_applied_migrations() {
869        let db = setup_db().await;
870        let mut migrator = Migrator::new();
871        migrator.add(CreateWidgets);
872
873        let before = migrator.status(&db).await;
874        assert_eq!(
875            before,
876            vec![MigrationStatus {
877                name: CreateWidgets.name().to_owned(),
878                applied: false,
879            }]
880        );
881
882        migrator.up(&db).await.expect("migration should apply");
883
884        let after = migrator.status(&db).await;
885        assert_eq!(
886            after,
887            vec![MigrationStatus {
888                name: CreateWidgets.name().to_owned(),
889                applied: true,
890            }]
891        );
892    }
893
894    #[test]
895    fn status_sync_marks_pending_and_applied_migrations() {
896        run_sync_migration_test(|| {
897            let mut migrator = Migrator::new();
898            migrator.add(CreateWidgets);
899
900            assert_eq!(
901                migrator.status_sync(),
902                vec![MigrationStatus {
903                    name: CreateWidgets.name().to_owned(),
904                    applied: false,
905                }]
906            );
907
908            migrator.up_sync().expect("migration should apply");
909
910            assert_eq!(
911                migrator.status_sync(),
912                vec![MigrationStatus {
913                    name: CreateWidgets.name().to_owned(),
914                    applied: true,
915                }]
916            );
917        });
918    }
919
920    #[tokio::test]
921    async fn repeated_up_skips_already_applied_migrations() {
922        let db = setup_db().await;
923        let mut migrator = Migrator::new();
924        migrator.add(CreateWidgets);
925
926        migrator.up(&db).await.expect("first up should succeed");
927        migrator.up(&db).await.expect("second up should be a no-op");
928
929        let applied = seaql_migrations::Entity::find()
930            .all(&db)
931            .await
932            .expect("migration rows should load");
933        assert_eq!(applied.len(), 1);
934    }
935
936    #[tokio::test]
937    async fn default_migrator_matches_new() {
938        let db = setup_db().await;
939        let migrator = Migrator::default();
940
941        assert_eq!(
942            migrator.status(&db).await,
943            Migrator::new().status(&db).await
944        );
945    }
946
947    #[tokio::test]
948    async fn ensure_migration_table_creates_tracking_table() {
949        let db = setup_db().await;
950        let manager = SchemaManager::new(&db);
951
952        ensure_migration_table(&manager)
953            .await
954            .expect("tracking table should be created");
955
956        assert!(
957            table_exists(&db, "seaql_migrations")
958                .await
959                .expect("table check should work")
960        );
961    }
962
963    #[tokio::test]
964    async fn ensure_migration_table_is_idempotent() {
965        let db = setup_db().await;
966        let manager = SchemaManager::new(&db);
967
968        ensure_migration_table(&manager)
969            .await
970            .expect("first create should succeed");
971        ensure_migration_table(&manager)
972            .await
973            .expect("second create should succeed");
974
975        assert!(
976            table_exists(&db, "seaql_migrations")
977                .await
978                .expect("table check should work")
979        );
980    }
981
982    #[tokio::test]
983    async fn record_applied_migration_inserts_row_with_timestamp() {
984        let db = setup_db().await;
985        let manager = SchemaManager::new(&db);
986        ensure_migration_table(&manager)
987            .await
988            .expect("tracking table should exist");
989
990        record_applied_migration("create_widgets", &db)
991            .await
992            .expect("tracking row should insert");
993
994        let row = seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
995            .one(&db)
996            .await
997            .expect("tracking row should load")
998            .expect("tracking row should exist");
999
1000        assert_eq!(row.version, "create_widgets");
1001        assert!(row.applied_at > 0);
1002    }
1003
1004    #[tokio::test]
1005    async fn remove_applied_migration_deletes_tracking_row() {
1006        let db = setup_db().await;
1007        let manager = SchemaManager::new(&db);
1008        ensure_migration_table(&manager)
1009            .await
1010            .expect("tracking table should exist");
1011        record_applied_migration("create_widgets", &db)
1012            .await
1013            .expect("tracking row should insert");
1014
1015        remove_applied_migration("create_widgets", &db)
1016            .await
1017            .expect("tracking row should delete");
1018
1019        assert!(
1020            seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
1021                .one(&db)
1022                .await
1023                .expect("tracking row query should succeed")
1024                .is_none()
1025        );
1026    }
1027
1028    #[tokio::test]
1029    async fn applied_versions_returns_recorded_names() {
1030        let db = setup_db().await;
1031        let manager = SchemaManager::new(&db);
1032        ensure_migration_table(&manager)
1033            .await
1034            .expect("tracking table should exist");
1035        record_applied_migration("create_widgets", &db)
1036            .await
1037            .expect("first row should insert");
1038        record_applied_migration("create_gadgets", &db)
1039            .await
1040            .expect("second row should insert");
1041
1042        let versions = applied_versions(&db)
1043            .await
1044            .expect("applied versions should load");
1045
1046        assert_eq!(
1047            versions,
1048            std::collections::HashSet::from([
1049                "create_widgets".to_owned(),
1050                "create_gadgets".to_owned(),
1051            ])
1052        );
1053    }
1054
1055    #[tokio::test]
1056    async fn status_marks_mixed_pending_and_applied_migrations() {
1057        let db = setup_db().await;
1058        let manager = SchemaManager::new(&db);
1059        ensure_migration_table(&manager)
1060            .await
1061            .expect("tracking table should exist");
1062        record_applied_migration(CreateWidgets.name(), &db)
1063            .await
1064            .expect("tracking row should insert");
1065
1066        let mut migrator = Migrator::new();
1067        migrator.add(CreateWidgets);
1068        migrator.add(CreateGadgets);
1069
1070        assert_eq!(
1071            migrator.status(&db).await,
1072            vec![
1073                MigrationStatus {
1074                    name: CreateWidgets.name().to_owned(),
1075                    applied: true,
1076                },
1077                MigrationStatus {
1078                    name: CreateGadgets.name().to_owned(),
1079                    applied: false,
1080                },
1081            ]
1082        );
1083    }
1084
1085    #[tokio::test]
1086    async fn create_table_migration_creates_expected_columns() {
1087        let db = setup_db().await;
1088        let mut migrator = Migrator::new();
1089        migrator.add(CreateWidgets);
1090
1091        migrator.up(&db).await.expect("migration should apply");
1092
1093        assert!(column_exists(&db, "widgets", "id").await);
1094        assert!(column_exists(&db, "widgets", "name").await);
1095    }
1096
1097    #[tokio::test]
1098    async fn up_applies_multiple_registered_migrations() {
1099        let db = setup_db().await;
1100        let mut migrator = Migrator::new();
1101        migrator.add(CreateWidgets);
1102        migrator.add(CreateGadgets);
1103
1104        migrator.up(&db).await.expect("migrations should apply");
1105
1106        assert!(
1107            table_exists(&db, "widgets")
1108                .await
1109                .expect("widgets table check should work")
1110        );
1111        assert!(
1112            table_exists(&db, "gadgets")
1113                .await
1114                .expect("gadgets table check should work")
1115        );
1116    }
1117
1118    #[tokio::test]
1119    async fn up_skips_previously_applied_migration_rows() {
1120        let db = setup_db().await;
1121        let manager = SchemaManager::new(&db);
1122        ensure_migration_table(&manager)
1123            .await
1124            .expect("tracking table should exist");
1125        record_applied_migration(CreateWidgets.name(), &db)
1126            .await
1127            .expect("tracking row should insert");
1128
1129        let mut migrator = Migrator::new();
1130        migrator.add(CreateWidgets);
1131
1132        migrator
1133            .up(&db)
1134            .await
1135            .expect("migration should skip cleanly");
1136
1137        assert!(
1138            !table_exists(&db, "widgets")
1139                .await
1140                .expect("widgets table check should work")
1141        );
1142    }
1143
1144    #[tokio::test]
1145    async fn add_column_migration_adds_column() {
1146        let db = setup_db().await;
1147        let mut migrator = Migrator::new();
1148        migrator.add(CreateWidgets);
1149        migrator.add(AddWidgetDescription);
1150
1151        migrator.up(&db).await.expect("migrations should apply");
1152
1153        assert!(column_exists(&db, "widgets", "description").await);
1154    }
1155
1156    #[tokio::test]
1157    async fn remove_column_migration_removes_column() {
1158        let db = setup_db().await;
1159        let mut migrator = Migrator::new();
1160        migrator.add(CreateWidgets);
1161        migrator.add(RemoveWidgetName);
1162
1163        migrator.up(&db).await.expect("migrations should apply");
1164
1165        assert!(!column_exists(&db, "widgets", "name").await);
1166    }
1167
1168    #[tokio::test]
1169    async fn add_index_migration_creates_index() {
1170        let db = setup_db().await;
1171        let mut migrator = Migrator::new();
1172        migrator.add(CreateWidgets);
1173        migrator.add(AddWidgetNameIndex);
1174
1175        migrator.up(&db).await.expect("migrations should apply");
1176
1177        assert!(index_exists(&db, "widgets", "idx_widgets_name").await);
1178    }
1179
1180    #[tokio::test]
1181    async fn remove_index_migration_drops_index() {
1182        let db = setup_db().await;
1183        let mut migrator = Migrator::new();
1184        migrator.add(CreateWidgets);
1185        migrator.add(AddWidgetNameIndex);
1186        migrator.add(RemoveWidgetNameIndex);
1187
1188        migrator.up(&db).await.expect("migrations should apply");
1189
1190        assert!(!index_exists(&db, "widgets", "idx_widgets_name").await);
1191    }
1192
1193    #[tokio::test]
1194    async fn rename_column_migration_renames_column() {
1195        let db = setup_db().await;
1196        let mut migrator = Migrator::new();
1197        migrator.add(CreateWidgets);
1198        migrator.add(RenameWidgetNameToTitle);
1199
1200        migrator.up(&db).await.expect("migrations should apply");
1201
1202        assert!(!column_exists(&db, "widgets", "name").await);
1203        assert!(column_exists(&db, "widgets", "title").await);
1204    }
1205
1206    #[tokio::test]
1207    async fn change_column_type_migration_updates_declared_type() {
1208        let db = setup_db().await;
1209        let mut migrator = Migrator::new();
1210        migrator.add(CreateWidgets);
1211        migrator.add(ChangeWidgetNameToText);
1212
1213        migrator.up(&db).await.expect("migrations should apply");
1214
1215        let declared_type = column_type(&db, "widgets", "name")
1216            .await
1217            .expect("name column should exist")
1218            .to_uppercase();
1219        assert!(declared_type.contains("TEXT"));
1220    }
1221
1222    #[tokio::test]
1223    async fn down_runs_migrations_in_reverse_order() {
1224        let db = setup_db().await;
1225        db.execute_unprepared(
1226            "CREATE TABLE rollback_log (id INTEGER PRIMARY KEY AUTOINCREMENT, step TEXT NOT NULL)",
1227        )
1228        .await
1229        .expect("rollback log table should be created");
1230
1231        let mut migrator = Migrator::new();
1232        migrator.add(FirstRollbackStep);
1233        migrator.add(SecondRollbackStep);
1234        migrator.up(&db).await.expect("migrations should apply");
1235
1236        migrator
1237            .down(&db)
1238            .await
1239            .expect("migrations should rollback");
1240
1241        assert_eq!(rollback_steps(&db).await, vec!["second", "first"]);
1242    }
1243
1244    #[tokio::test]
1245    async fn down_removes_tracking_rows_after_rollback() {
1246        let db = setup_db().await;
1247        let mut migrator = Migrator::new();
1248        migrator.add(CreateWidgets);
1249        migrator.add(CreateGadgets);
1250        migrator.up(&db).await.expect("migrations should apply");
1251
1252        migrator
1253            .down(&db)
1254            .await
1255            .expect("migrations should rollback");
1256
1257        let applied = seaql_migrations::Entity::find()
1258            .all(&db)
1259            .await
1260            .expect("tracking rows should load");
1261        assert!(applied.is_empty());
1262    }
1263
1264    #[tokio::test]
1265    async fn wrapped_migration_rolls_back_side_effects_on_error() {
1266        let db = setup_db().await;
1267        db.execute_unprepared(
1268            "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1269        )
1270        .await
1271        .expect("audit table should be created");
1272
1273        let mut migrator = Migrator::new();
1274        migrator.add(InsertAuditThenFailWrapped);
1275
1276        let error = migrator.up(&db).await.expect_err("migration should fail");
1277
1278        assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
1279        assert_eq!(table_row_count(&db, "audit_entries").await, 0);
1280    }
1281
1282    #[tokio::test]
1283    async fn unwrapped_migration_leaves_side_effects_on_error() {
1284        let db = setup_db().await;
1285        db.execute_unprepared(
1286            "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1287        )
1288        .await
1289        .expect("audit table should be created");
1290
1291        let mut migrator = Migrator::new();
1292        migrator.add(InsertAuditThenFailUnwrapped);
1293
1294        let error = migrator.up(&db).await.expect_err("migration should fail");
1295
1296        assert!(
1297            matches!(error, DbErr::Custom(message) if message == "unwrapped migration failure")
1298        );
1299        assert_eq!(table_row_count(&db, "audit_entries").await, 1);
1300    }
1301
1302    #[tokio::test]
1303    async fn reversible_rename_migration_restores_original_schema_on_down() {
1304        let db = setup_db().await;
1305        let manager = SchemaManager::new(&db);
1306        CreateWidgets
1307            .up(&manager)
1308            .await
1309            .expect("base table should be created");
1310
1311        let mut migrator = Migrator::new();
1312        migrator.add(RenameWidgetNameToTitle);
1313        migrator
1314            .up(&db)
1315            .await
1316            .expect("rename migration should apply");
1317        assert!(column_exists(&db, "widgets", "title").await);
1318
1319        migrator
1320            .down(&db)
1321            .await
1322            .expect("rename migration should rollback");
1323
1324        assert!(column_exists(&db, "widgets", "name").await);
1325        assert!(!column_exists(&db, "widgets", "title").await);
1326    }
1327
1328    #[test]
1329    fn should_wrap_in_transaction_honors_backend_defaults_and_overrides() {
1330        let postgres_default = TransactionPreferenceMigration { preference: None };
1331        let always_wrap = TransactionPreferenceMigration {
1332            preference: Some(true),
1333        };
1334        let never_wrap = TransactionPreferenceMigration {
1335            preference: Some(false),
1336        };
1337
1338        assert!(should_wrap_in_transaction(
1339            &postgres_default,
1340            DbBackend::Postgres
1341        ));
1342        assert!(!should_wrap_in_transaction(
1343            &postgres_default,
1344            DbBackend::Sqlite
1345        ));
1346        assert!(should_wrap_in_transaction(&always_wrap, DbBackend::Sqlite));
1347        assert!(!should_wrap_in_transaction(
1348            &never_wrap,
1349            DbBackend::Postgres
1350        ));
1351    }
1352
1353    #[tokio::test]
1354    async fn status_creates_tracking_table_without_recording_rows() {
1355        let db = setup_db().await;
1356        let migrator = Migrator::new();
1357
1358        let statuses = migrator.status(&db).await;
1359
1360        assert!(statuses.is_empty());
1361        assert!(
1362            table_exists(&db, "seaql_migrations")
1363                .await
1364                .expect("tracking table check should work")
1365        );
1366        assert!(
1367            applied_versions(&db)
1368                .await
1369                .expect("applied versions should load")
1370                .is_empty()
1371        );
1372    }
1373
1374    #[tokio::test]
1375    async fn down_skips_pending_migrations_and_rolls_back_only_applied_entries() {
1376        let db = setup_db().await;
1377        let mut up_migrator = Migrator::new();
1378        up_migrator.add(CreateWidgets);
1379        up_migrator
1380            .up(&db)
1381            .await
1382            .expect("base migration should apply");
1383
1384        let mut down_migrator = Migrator::new();
1385        down_migrator.add(CreateWidgets);
1386        down_migrator.add(CreateGadgets);
1387        down_migrator
1388            .down(&db)
1389            .await
1390            .expect("rollback should skip pending migrations");
1391
1392        assert!(
1393            !table_exists(&db, "widgets")
1394                .await
1395                .expect("widgets table check should work")
1396        );
1397        assert!(
1398            !table_exists(&db, "gadgets")
1399                .await
1400                .expect("gadgets table check should work")
1401        );
1402        assert!(
1403            applied_versions(&db)
1404                .await
1405                .expect("applied versions should load")
1406                .is_empty()
1407        );
1408    }
1409
1410    #[tokio::test]
1411    async fn up_stops_after_first_failure_and_skips_later_migrations() {
1412        let db = setup_db().await;
1413        db.execute_unprepared(
1414            "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1415        )
1416        .await
1417        .expect("audit table should be created");
1418
1419        let mut migrator = Migrator::new();
1420        migrator.add(CreateWidgets);
1421        migrator.add(InsertAuditThenFailWrapped);
1422        migrator.add(CreateGadgets);
1423
1424        let error = migrator
1425            .up(&db)
1426            .await
1427            .expect_err("migration batch should fail");
1428
1429        assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
1430        assert!(
1431            table_exists(&db, "widgets")
1432                .await
1433                .expect("widgets table check should work")
1434        );
1435        assert!(
1436            !table_exists(&db, "gadgets")
1437                .await
1438                .expect("gadgets table check should work")
1439        );
1440        assert_eq!(table_row_count(&db, "audit_entries").await, 0);
1441        assert_eq!(
1442            applied_versions(&db)
1443                .await
1444                .expect("applied versions should load"),
1445            std::collections::HashSet::from([CreateWidgets.name().to_owned()])
1446        );
1447    }
1448}