1use chrono::Utc;
2use rustrails_support::{database, runtime};
3use sea_orm::{
4 ActiveModelTrait, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter,
5};
6use sea_orm_migration::{prelude::*, seaql_migrations};
7
/// Name of the tracking table created by `ensure_migration_table`.
const MIGRATION_TABLE: &str = "seaql_migrations";
9
/// A named, versioned schema change that can be applied and reverted.
///
/// NOTE(review): `Migrator` in this file stores `MigrationTrait` objects rather
/// than implementors of this trait — confirm where this trait is consumed.
pub trait Migration: Send + Sync {
    /// Returns the migration's name.
    fn name(&self) -> &str;

    /// Returns the migration's version string.
    fn version(&self) -> &str;

    /// Applies the migration through `manager`.
    async fn up(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;

    /// Reverts the migration through `manager`.
    async fn down(&self, manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr>;
}
24
/// Runs a list of registered migrations against a database connection.
pub struct Migrator {
    // Migrations in registration order; `up` walks this forwards, `down` backwards.
    migrations: Vec<Box<dyn MigrationTrait>>,
}
29
30impl Migrator {
31 pub fn new() -> Self {
33 Self {
34 migrations: Vec::new(),
35 }
36 }
37
38 pub fn add(&mut self, migration: impl MigrationTrait + 'static) {
40 self.migrations.push(Box::new(migration));
41 }
42
43 pub async fn up(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
45 let manager = SchemaManager::new(db);
46 ensure_migration_table(&manager).await?;
47 let mut applied = applied_versions(db).await?;
48
49 for migration in &self.migrations {
50 if applied.contains(migration.name()) {
51 continue;
52 }
53
54 run_up_migration(migration.as_ref(), &manager).await?;
55 record_applied_migration(migration.name(), db).await?;
56 applied.insert(migration.name().to_owned());
57 }
58
59 Ok(())
60 }
61
62 pub fn up_sync(&self) -> Result<(), sea_orm::DbErr> {
64 database::with_db(|db| runtime::block_on(self.up(db)))
65 }
66
67 pub async fn down(&self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
69 let manager = SchemaManager::new(db);
70 ensure_migration_table(&manager).await?;
71 let mut applied = applied_versions(db).await?;
72
73 for migration in self.migrations.iter().rev() {
74 if !applied.contains(migration.name()) {
75 continue;
76 }
77
78 run_down_migration(migration.as_ref(), &manager).await?;
79 remove_applied_migration(migration.name(), db).await?;
80 applied.remove(migration.name());
81 }
82
83 Ok(())
84 }
85
86 pub fn down_sync(&self) -> Result<(), sea_orm::DbErr> {
88 database::with_db(|db| runtime::block_on(self.down(db)))
89 }
90
91 pub async fn status(&self, db: &DatabaseConnection) -> Vec<MigrationStatus> {
93 let manager = SchemaManager::new(db);
94 let applied = match ensure_migration_table(&manager).await {
95 Ok(()) => applied_versions(db).await.unwrap_or_default(),
96 Err(_) => std::collections::HashSet::new(),
97 };
98
99 self.migrations
100 .iter()
101 .map(|migration| MigrationStatus {
102 name: migration.name().to_owned(),
103 applied: applied.contains(migration.name()),
104 })
105 .collect()
106 }
107
108 pub fn status_sync(&self) -> Vec<MigrationStatus> {
110 database::with_db(|db| runtime::block_on(self.status(db)))
111 }
112}
113
114impl Default for Migrator {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
/// One entry of the report produced by `Migrator::status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStatus {
    /// Name of the registered migration.
    pub name: String,
    /// Whether the tracking table contained this migration's name.
    pub applied: bool,
}
128
129async fn ensure_migration_table(manager: &SchemaManager<'_>) -> Result<(), sea_orm::DbErr> {
130 manager
131 .create_table(
132 Table::create()
133 .table(Alias::new(MIGRATION_TABLE))
134 .if_not_exists()
135 .col(
136 ColumnDef::new(Alias::new("version"))
137 .string()
138 .not_null()
139 .primary_key(),
140 )
141 .col(
142 ColumnDef::new(Alias::new("applied_at"))
143 .big_integer()
144 .not_null(),
145 )
146 .to_owned(),
147 )
148 .await
149}
150
151async fn applied_versions(
152 db: &DatabaseConnection,
153) -> Result<std::collections::HashSet<String>, sea_orm::DbErr> {
154 Ok(seaql_migrations::Entity::find()
155 .all(db)
156 .await?
157 .into_iter()
158 .map(|model| model.version)
159 .collect())
160}
161
162async fn record_applied_migration(
163 name: &str,
164 db: &DatabaseConnection,
165) -> Result<(), sea_orm::DbErr> {
166 seaql_migrations::ActiveModel {
167 version: Set(name.to_owned()),
168 applied_at: Set(Utc::now().timestamp()),
169 }
170 .insert(db)
171 .await?;
172 Ok(())
173}
174
175async fn remove_applied_migration(
176 name: &str,
177 db: &DatabaseConnection,
178) -> Result<(), sea_orm::DbErr> {
179 seaql_migrations::Entity::delete_many()
180 .filter(seaql_migrations::Column::Version.eq(name))
181 .exec(db)
182 .await?;
183 Ok(())
184}
185
186async fn run_up_migration(
187 migration: &dyn MigrationTrait,
188 manager: &SchemaManager<'_>,
189) -> Result<(), sea_orm::DbErr> {
190 if should_wrap_in_transaction(migration, manager.get_database_backend()) {
191 let tx_manager = manager.begin().await?;
192 migration.up(&tx_manager).await?;
193 tx_manager.commit().await?;
194 Ok(())
195 } else {
196 migration.up(manager).await
197 }
198}
199
200async fn run_down_migration(
201 migration: &dyn MigrationTrait,
202 manager: &SchemaManager<'_>,
203) -> Result<(), sea_orm::DbErr> {
204 if should_wrap_in_transaction(migration, manager.get_database_backend()) {
205 let tx_manager = manager.begin().await?;
206 migration.down(&tx_manager).await?;
207 tx_manager.commit().await?;
208 Ok(())
209 } else {
210 migration.down(manager).await
211 }
212}
213
214fn should_wrap_in_transaction(migration: &dyn MigrationTrait, backend: sea_orm::DbBackend) -> bool {
215 migration
216 .use_transaction()
217 .unwrap_or(matches!(backend, sea_orm::DbBackend::Postgres))
218}
219
220#[cfg(test)]
221mod tests {
222 use rustrails_support::{database, runtime};
223 use sea_orm::{ConnectionTrait, Database, DbBackend, EntityTrait, Statement};
224 use sea_orm_migration::{prelude::*, seaql_migrations};
225
226 use super::{
227 MigrationStatus, Migrator, applied_versions, ensure_migration_table,
228 record_applied_migration, remove_applied_migration, should_wrap_in_transaction,
229 };
230
231 fn run_sync_migration_test(test: impl FnOnce() + Send + 'static) {
232 std::thread::spawn(move || {
233 let _rt = runtime::init_runtime();
234 database::establish("sqlite::memory:")
235 .expect("sqlite in-memory connection should succeed");
236 test();
237 })
238 .join()
239 .unwrap();
240 }
241
242 struct CreateWidgets;
243
244 impl MigrationName for CreateWidgets {
245 fn name(&self) -> &str {
246 "create_widgets"
247 }
248 }
249
250 #[async_trait::async_trait]
251 impl MigrationTrait for CreateWidgets {
252 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
253 manager
254 .create_table(
255 Table::create()
256 .table(Alias::new("widgets"))
257 .if_not_exists()
258 .col(
259 ColumnDef::new(Alias::new("id"))
260 .integer()
261 .not_null()
262 .auto_increment()
263 .primary_key(),
264 )
265 .col(ColumnDef::new(Alias::new("name")).string().not_null())
266 .to_owned(),
267 )
268 .await
269 }
270
271 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
272 manager
273 .drop_table(
274 Table::drop()
275 .table(Alias::new("widgets"))
276 .if_exists()
277 .to_owned(),
278 )
279 .await
280 }
281 }
282
283 struct CreateGadgets;
284
285 impl MigrationName for CreateGadgets {
286 fn name(&self) -> &str {
287 "create_gadgets"
288 }
289 }
290
291 #[async_trait::async_trait]
292 impl MigrationTrait for CreateGadgets {
293 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
294 manager
295 .create_table(
296 Table::create()
297 .table(Alias::new("gadgets"))
298 .if_not_exists()
299 .col(
300 ColumnDef::new(Alias::new("id"))
301 .integer()
302 .not_null()
303 .auto_increment()
304 .primary_key(),
305 )
306 .col(ColumnDef::new(Alias::new("title")).string().not_null())
307 .to_owned(),
308 )
309 .await
310 }
311
312 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
313 manager
314 .drop_table(
315 Table::drop()
316 .table(Alias::new("gadgets"))
317 .if_exists()
318 .to_owned(),
319 )
320 .await
321 }
322 }
323
324 struct AddWidgetDescription;
325
326 impl MigrationName for AddWidgetDescription {
327 fn name(&self) -> &str {
328 "add_widget_description"
329 }
330 }
331
332 #[async_trait::async_trait]
333 impl MigrationTrait for AddWidgetDescription {
334 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
335 manager
336 .get_connection()
337 .execute_unprepared("ALTER TABLE widgets ADD COLUMN description TEXT")
338 .await?;
339 Ok(())
340 }
341
342 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
343 manager
344 .get_connection()
345 .execute_unprepared("ALTER TABLE widgets DROP COLUMN description")
346 .await?;
347 Ok(())
348 }
349 }
350
351 struct RemoveWidgetName;
352
353 impl MigrationName for RemoveWidgetName {
354 fn name(&self) -> &str {
355 "remove_widget_name"
356 }
357 }
358
359 #[async_trait::async_trait]
360 impl MigrationTrait for RemoveWidgetName {
361 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
362 manager
363 .get_connection()
364 .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
365 .await?;
366 Ok(())
367 }
368
369 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
370 manager
371 .get_connection()
372 .execute_unprepared(
373 "ALTER TABLE widgets ADD COLUMN name VARCHAR NOT NULL DEFAULT ''",
374 )
375 .await?;
376 Ok(())
377 }
378 }
379
380 struct AddWidgetNameIndex;
381
382 impl MigrationName for AddWidgetNameIndex {
383 fn name(&self) -> &str {
384 "add_widget_name_index"
385 }
386 }
387
388 #[async_trait::async_trait]
389 impl MigrationTrait for AddWidgetNameIndex {
390 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
391 manager
392 .create_index(
393 Index::create()
394 .name("idx_widgets_name")
395 .table(Alias::new("widgets"))
396 .col(Alias::new("name"))
397 .to_owned(),
398 )
399 .await
400 }
401
402 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
403 manager
404 .drop_index(
405 Index::drop()
406 .name("idx_widgets_name")
407 .table(Alias::new("widgets"))
408 .to_owned(),
409 )
410 .await
411 }
412 }
413
414 struct RemoveWidgetNameIndex;
415
416 impl MigrationName for RemoveWidgetNameIndex {
417 fn name(&self) -> &str {
418 "remove_widget_name_index"
419 }
420 }
421
422 #[async_trait::async_trait]
423 impl MigrationTrait for RemoveWidgetNameIndex {
424 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
425 manager
426 .drop_index(
427 Index::drop()
428 .name("idx_widgets_name")
429 .table(Alias::new("widgets"))
430 .to_owned(),
431 )
432 .await
433 }
434
435 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
436 manager
437 .create_index(
438 Index::create()
439 .name("idx_widgets_name")
440 .table(Alias::new("widgets"))
441 .col(Alias::new("name"))
442 .to_owned(),
443 )
444 .await
445 }
446 }
447
448 struct RenameWidgetNameToTitle;
449
450 impl MigrationName for RenameWidgetNameToTitle {
451 fn name(&self) -> &str {
452 "rename_widget_name_to_title"
453 }
454 }
455
456 #[async_trait::async_trait]
457 impl MigrationTrait for RenameWidgetNameToTitle {
458 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
459 manager
460 .get_connection()
461 .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name TO title")
462 .await?;
463 Ok(())
464 }
465
466 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
467 manager
468 .get_connection()
469 .execute_unprepared("ALTER TABLE widgets RENAME COLUMN title TO name")
470 .await?;
471 Ok(())
472 }
473 }
474
475 struct ChangeWidgetNameToText;
476
477 impl MigrationName for ChangeWidgetNameToText {
478 fn name(&self) -> &str {
479 "change_widget_name_to_text"
480 }
481 }
482
483 #[async_trait::async_trait]
484 impl MigrationTrait for ChangeWidgetNameToText {
485 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
486 manager
487 .get_connection()
488 .execute_unprepared(
489 "ALTER TABLE widgets ADD COLUMN name_text TEXT NOT NULL DEFAULT ''",
490 )
491 .await?;
492 manager
493 .get_connection()
494 .execute_unprepared("UPDATE widgets SET name_text = name")
495 .await?;
496 manager
497 .get_connection()
498 .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
499 .await?;
500 manager
501 .get_connection()
502 .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_text TO name")
503 .await?;
504 Ok(())
505 }
506
507 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
508 manager
509 .get_connection()
510 .execute_unprepared(
511 "ALTER TABLE widgets ADD COLUMN name_string VARCHAR NOT NULL DEFAULT ''",
512 )
513 .await?;
514 manager
515 .get_connection()
516 .execute_unprepared("UPDATE widgets SET name_string = name")
517 .await?;
518 manager
519 .get_connection()
520 .execute_unprepared("ALTER TABLE widgets DROP COLUMN name")
521 .await?;
522 manager
523 .get_connection()
524 .execute_unprepared("ALTER TABLE widgets RENAME COLUMN name_string TO name")
525 .await?;
526 Ok(())
527 }
528 }
529
530 struct FirstRollbackStep;
531
532 impl MigrationName for FirstRollbackStep {
533 fn name(&self) -> &str {
534 "first_rollback_step"
535 }
536 }
537
538 #[async_trait::async_trait]
539 impl MigrationTrait for FirstRollbackStep {
540 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
541 manager
542 .create_table(
543 Table::create()
544 .table(Alias::new("first_step"))
545 .if_not_exists()
546 .col(
547 ColumnDef::new(Alias::new("id"))
548 .integer()
549 .not_null()
550 .auto_increment()
551 .primary_key(),
552 )
553 .to_owned(),
554 )
555 .await
556 }
557
558 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
559 manager
560 .get_connection()
561 .execute_unprepared("INSERT INTO rollback_log (step) VALUES ('first')")
562 .await?;
563 manager
564 .drop_table(
565 Table::drop()
566 .table(Alias::new("first_step"))
567 .if_exists()
568 .to_owned(),
569 )
570 .await
571 }
572 }
573
574 struct SecondRollbackStep;
575
576 impl MigrationName for SecondRollbackStep {
577 fn name(&self) -> &str {
578 "second_rollback_step"
579 }
580 }
581
582 #[async_trait::async_trait]
583 impl MigrationTrait for SecondRollbackStep {
584 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
585 manager
586 .create_table(
587 Table::create()
588 .table(Alias::new("second_step"))
589 .if_not_exists()
590 .col(
591 ColumnDef::new(Alias::new("id"))
592 .integer()
593 .not_null()
594 .auto_increment()
595 .primary_key(),
596 )
597 .to_owned(),
598 )
599 .await
600 }
601
602 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
603 manager
604 .get_connection()
605 .execute_unprepared("INSERT INTO rollback_log (step) VALUES ('second')")
606 .await?;
607 manager
608 .drop_table(
609 Table::drop()
610 .table(Alias::new("second_step"))
611 .if_exists()
612 .to_owned(),
613 )
614 .await
615 }
616 }
617
618 struct InsertAuditThenFailWrapped;
619
620 impl MigrationName for InsertAuditThenFailWrapped {
621 fn name(&self) -> &str {
622 "insert_audit_then_fail_wrapped"
623 }
624 }
625
626 #[async_trait::async_trait]
627 impl MigrationTrait for InsertAuditThenFailWrapped {
628 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
629 manager
630 .get_connection()
631 .execute_unprepared("INSERT INTO audit_entries (message) VALUES ('wrapped')")
632 .await?;
633 Err(DbErr::Custom("wrapped migration failure".to_owned()))
634 }
635
636 async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
637 Ok(())
638 }
639
640 fn use_transaction(&self) -> Option<bool> {
641 Some(true)
642 }
643 }
644
645 struct InsertAuditThenFailUnwrapped;
646
647 impl MigrationName for InsertAuditThenFailUnwrapped {
648 fn name(&self) -> &str {
649 "insert_audit_then_fail_unwrapped"
650 }
651 }
652
653 #[async_trait::async_trait]
654 impl MigrationTrait for InsertAuditThenFailUnwrapped {
655 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
656 manager
657 .get_connection()
658 .execute_unprepared("INSERT INTO audit_entries (message) VALUES ('unwrapped')")
659 .await?;
660 Err(DbErr::Custom("unwrapped migration failure".to_owned()))
661 }
662
663 async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
664 Ok(())
665 }
666
667 fn use_transaction(&self) -> Option<bool> {
668 Some(false)
669 }
670 }
671
672 struct TransactionPreferenceMigration {
673 preference: Option<bool>,
674 }
675
676 impl MigrationName for TransactionPreferenceMigration {
677 fn name(&self) -> &str {
678 "transaction_preference_migration"
679 }
680 }
681
682 #[async_trait::async_trait]
683 impl MigrationTrait for TransactionPreferenceMigration {
684 async fn up(&self, _: &SchemaManager) -> Result<(), DbErr> {
685 Ok(())
686 }
687
688 async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> {
689 Ok(())
690 }
691
692 fn use_transaction(&self) -> Option<bool> {
693 self.preference
694 }
695 }
696
697 async fn setup_db() -> sea_orm::DatabaseConnection {
698 Database::connect("sqlite::memory:")
699 .await
700 .expect("in-memory sqlite connection should succeed")
701 }
702
703 async fn table_exists(
704 db: &sea_orm::DatabaseConnection,
705 table_name: &str,
706 ) -> Result<bool, sea_orm::DbErr> {
707 Ok(db
708 .query_one_raw(Statement::from_sql_and_values(
709 db.get_database_backend(),
710 "SELECT name FROM sqlite_master WHERE type = ? AND name = ?",
711 ["table".into(), table_name.into()],
712 ))
713 .await?
714 .is_some())
715 }
716
717 async fn column_exists(
718 db: &sea_orm::DatabaseConnection,
719 table_name: &str,
720 column_name: &str,
721 ) -> bool {
722 db.query_one_raw(Statement::from_string(
723 db.get_database_backend(),
724 format!(
725 "SELECT name FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
726 ),
727 ))
728 .await
729 .expect("column existence query should work")
730 .is_some()
731 }
732
733 async fn column_type(
734 db: &sea_orm::DatabaseConnection,
735 table_name: &str,
736 column_name: &str,
737 ) -> Option<String> {
738 db.query_one_raw(Statement::from_string(
739 db.get_database_backend(),
740 format!(
741 "SELECT type FROM pragma_table_info('{table_name}') WHERE name = '{column_name}'"
742 ),
743 ))
744 .await
745 .expect("column type query should work")
746 .map(|row| row.try_get("", "type").expect("type should be readable"))
747 }
748
749 async fn index_exists(
750 db: &sea_orm::DatabaseConnection,
751 table_name: &str,
752 index_name: &str,
753 ) -> bool {
754 db.query_one_raw(Statement::from_string(
755 db.get_database_backend(),
756 format!(
757 "SELECT name FROM pragma_index_list('{table_name}') WHERE name = '{index_name}'"
758 ),
759 ))
760 .await
761 .expect("index existence query should work")
762 .is_some()
763 }
764
765 async fn table_row_count(db: &sea_orm::DatabaseConnection, table_name: &str) -> i64 {
766 db.query_one_raw(Statement::from_string(
767 db.get_database_backend(),
768 format!("SELECT COUNT(*) AS count FROM {table_name}"),
769 ))
770 .await
771 .expect("row count query should work")
772 .expect("count row should exist")
773 .try_get("", "count")
774 .expect("count should be readable")
775 }
776
777 async fn rollback_steps(db: &sea_orm::DatabaseConnection) -> Vec<String> {
778 db.query_all_raw(Statement::from_string(
779 db.get_database_backend(),
780 "SELECT step FROM rollback_log ORDER BY id".to_owned(),
781 ))
782 .await
783 .expect("rollback log query should work")
784 .into_iter()
785 .map(|row| row.try_get("", "step").expect("step should be readable"))
786 .collect()
787 }
788
789 #[tokio::test]
790 async fn new_migrator_starts_empty() {
791 let migrator = Migrator::new();
792 let db = setup_db().await;
793
794 let statuses = migrator.status(&db).await;
795
796 assert!(statuses.is_empty());
797 }
798
799 #[tokio::test]
800 async fn up_applies_registered_migrations() {
801 let db = setup_db().await;
802 let mut migrator = Migrator::new();
803 migrator.add(CreateWidgets);
804
805 migrator.up(&db).await.expect("migration should apply");
806
807 assert!(
808 table_exists(&db, "widgets")
809 .await
810 .expect("table existence query should work")
811 );
812 }
813
814 #[test]
815 fn up_sync_applies_registered_migrations() {
816 run_sync_migration_test(|| {
817 let mut migrator = Migrator::new();
818 migrator.add(CreateWidgets);
819
820 migrator.up_sync().expect("migration should apply");
821
822 let widgets_exist = runtime::block_on(async {
823 let db = database::db();
824 table_exists(&db, "widgets")
825 .await
826 .expect("table existence query should work")
827 });
828 assert!(widgets_exist);
829 });
830 }
831
832 #[tokio::test]
833 async fn down_rolls_back_applied_migrations() {
834 let db = setup_db().await;
835 let mut migrator = Migrator::new();
836 migrator.add(CreateWidgets);
837 migrator.up(&db).await.expect("migration should apply");
838
839 migrator.down(&db).await.expect("migration should rollback");
840
841 assert!(
842 !table_exists(&db, "widgets")
843 .await
844 .expect("table existence query should work")
845 );
846 }
847
848 #[test]
849 fn down_sync_rolls_back_applied_migrations() {
850 run_sync_migration_test(|| {
851 let mut migrator = Migrator::new();
852 migrator.add(CreateWidgets);
853 migrator.up_sync().expect("migration should apply");
854
855 migrator.down_sync().expect("migration should rollback");
856
857 let widgets_exist = runtime::block_on(async {
858 let db = database::db();
859 table_exists(&db, "widgets")
860 .await
861 .expect("table existence query should work")
862 });
863 assert!(!widgets_exist);
864 });
865 }
866
867 #[tokio::test]
868 async fn status_marks_pending_and_applied_migrations() {
869 let db = setup_db().await;
870 let mut migrator = Migrator::new();
871 migrator.add(CreateWidgets);
872
873 let before = migrator.status(&db).await;
874 assert_eq!(
875 before,
876 vec![MigrationStatus {
877 name: CreateWidgets.name().to_owned(),
878 applied: false,
879 }]
880 );
881
882 migrator.up(&db).await.expect("migration should apply");
883
884 let after = migrator.status(&db).await;
885 assert_eq!(
886 after,
887 vec![MigrationStatus {
888 name: CreateWidgets.name().to_owned(),
889 applied: true,
890 }]
891 );
892 }
893
894 #[test]
895 fn status_sync_marks_pending_and_applied_migrations() {
896 run_sync_migration_test(|| {
897 let mut migrator = Migrator::new();
898 migrator.add(CreateWidgets);
899
900 assert_eq!(
901 migrator.status_sync(),
902 vec![MigrationStatus {
903 name: CreateWidgets.name().to_owned(),
904 applied: false,
905 }]
906 );
907
908 migrator.up_sync().expect("migration should apply");
909
910 assert_eq!(
911 migrator.status_sync(),
912 vec![MigrationStatus {
913 name: CreateWidgets.name().to_owned(),
914 applied: true,
915 }]
916 );
917 });
918 }
919
920 #[tokio::test]
921 async fn repeated_up_skips_already_applied_migrations() {
922 let db = setup_db().await;
923 let mut migrator = Migrator::new();
924 migrator.add(CreateWidgets);
925
926 migrator.up(&db).await.expect("first up should succeed");
927 migrator.up(&db).await.expect("second up should be a no-op");
928
929 let applied = seaql_migrations::Entity::find()
930 .all(&db)
931 .await
932 .expect("migration rows should load");
933 assert_eq!(applied.len(), 1);
934 }
935
936 #[tokio::test]
937 async fn default_migrator_matches_new() {
938 let db = setup_db().await;
939 let migrator = Migrator::default();
940
941 assert_eq!(
942 migrator.status(&db).await,
943 Migrator::new().status(&db).await
944 );
945 }
946
947 #[tokio::test]
948 async fn ensure_migration_table_creates_tracking_table() {
949 let db = setup_db().await;
950 let manager = SchemaManager::new(&db);
951
952 ensure_migration_table(&manager)
953 .await
954 .expect("tracking table should be created");
955
956 assert!(
957 table_exists(&db, "seaql_migrations")
958 .await
959 .expect("table check should work")
960 );
961 }
962
963 #[tokio::test]
964 async fn ensure_migration_table_is_idempotent() {
965 let db = setup_db().await;
966 let manager = SchemaManager::new(&db);
967
968 ensure_migration_table(&manager)
969 .await
970 .expect("first create should succeed");
971 ensure_migration_table(&manager)
972 .await
973 .expect("second create should succeed");
974
975 assert!(
976 table_exists(&db, "seaql_migrations")
977 .await
978 .expect("table check should work")
979 );
980 }
981
982 #[tokio::test]
983 async fn record_applied_migration_inserts_row_with_timestamp() {
984 let db = setup_db().await;
985 let manager = SchemaManager::new(&db);
986 ensure_migration_table(&manager)
987 .await
988 .expect("tracking table should exist");
989
990 record_applied_migration("create_widgets", &db)
991 .await
992 .expect("tracking row should insert");
993
994 let row = seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
995 .one(&db)
996 .await
997 .expect("tracking row should load")
998 .expect("tracking row should exist");
999
1000 assert_eq!(row.version, "create_widgets");
1001 assert!(row.applied_at > 0);
1002 }
1003
1004 #[tokio::test]
1005 async fn remove_applied_migration_deletes_tracking_row() {
1006 let db = setup_db().await;
1007 let manager = SchemaManager::new(&db);
1008 ensure_migration_table(&manager)
1009 .await
1010 .expect("tracking table should exist");
1011 record_applied_migration("create_widgets", &db)
1012 .await
1013 .expect("tracking row should insert");
1014
1015 remove_applied_migration("create_widgets", &db)
1016 .await
1017 .expect("tracking row should delete");
1018
1019 assert!(
1020 seaql_migrations::Entity::find_by_id("create_widgets".to_owned())
1021 .one(&db)
1022 .await
1023 .expect("tracking row query should succeed")
1024 .is_none()
1025 );
1026 }
1027
1028 #[tokio::test]
1029 async fn applied_versions_returns_recorded_names() {
1030 let db = setup_db().await;
1031 let manager = SchemaManager::new(&db);
1032 ensure_migration_table(&manager)
1033 .await
1034 .expect("tracking table should exist");
1035 record_applied_migration("create_widgets", &db)
1036 .await
1037 .expect("first row should insert");
1038 record_applied_migration("create_gadgets", &db)
1039 .await
1040 .expect("second row should insert");
1041
1042 let versions = applied_versions(&db)
1043 .await
1044 .expect("applied versions should load");
1045
1046 assert_eq!(
1047 versions,
1048 std::collections::HashSet::from([
1049 "create_widgets".to_owned(),
1050 "create_gadgets".to_owned(),
1051 ])
1052 );
1053 }
1054
1055 #[tokio::test]
1056 async fn status_marks_mixed_pending_and_applied_migrations() {
1057 let db = setup_db().await;
1058 let manager = SchemaManager::new(&db);
1059 ensure_migration_table(&manager)
1060 .await
1061 .expect("tracking table should exist");
1062 record_applied_migration(CreateWidgets.name(), &db)
1063 .await
1064 .expect("tracking row should insert");
1065
1066 let mut migrator = Migrator::new();
1067 migrator.add(CreateWidgets);
1068 migrator.add(CreateGadgets);
1069
1070 assert_eq!(
1071 migrator.status(&db).await,
1072 vec![
1073 MigrationStatus {
1074 name: CreateWidgets.name().to_owned(),
1075 applied: true,
1076 },
1077 MigrationStatus {
1078 name: CreateGadgets.name().to_owned(),
1079 applied: false,
1080 },
1081 ]
1082 );
1083 }
1084
1085 #[tokio::test]
1086 async fn create_table_migration_creates_expected_columns() {
1087 let db = setup_db().await;
1088 let mut migrator = Migrator::new();
1089 migrator.add(CreateWidgets);
1090
1091 migrator.up(&db).await.expect("migration should apply");
1092
1093 assert!(column_exists(&db, "widgets", "id").await);
1094 assert!(column_exists(&db, "widgets", "name").await);
1095 }
1096
1097 #[tokio::test]
1098 async fn up_applies_multiple_registered_migrations() {
1099 let db = setup_db().await;
1100 let mut migrator = Migrator::new();
1101 migrator.add(CreateWidgets);
1102 migrator.add(CreateGadgets);
1103
1104 migrator.up(&db).await.expect("migrations should apply");
1105
1106 assert!(
1107 table_exists(&db, "widgets")
1108 .await
1109 .expect("widgets table check should work")
1110 );
1111 assert!(
1112 table_exists(&db, "gadgets")
1113 .await
1114 .expect("gadgets table check should work")
1115 );
1116 }
1117
1118 #[tokio::test]
1119 async fn up_skips_previously_applied_migration_rows() {
1120 let db = setup_db().await;
1121 let manager = SchemaManager::new(&db);
1122 ensure_migration_table(&manager)
1123 .await
1124 .expect("tracking table should exist");
1125 record_applied_migration(CreateWidgets.name(), &db)
1126 .await
1127 .expect("tracking row should insert");
1128
1129 let mut migrator = Migrator::new();
1130 migrator.add(CreateWidgets);
1131
1132 migrator
1133 .up(&db)
1134 .await
1135 .expect("migration should skip cleanly");
1136
1137 assert!(
1138 !table_exists(&db, "widgets")
1139 .await
1140 .expect("widgets table check should work")
1141 );
1142 }
1143
1144 #[tokio::test]
1145 async fn add_column_migration_adds_column() {
1146 let db = setup_db().await;
1147 let mut migrator = Migrator::new();
1148 migrator.add(CreateWidgets);
1149 migrator.add(AddWidgetDescription);
1150
1151 migrator.up(&db).await.expect("migrations should apply");
1152
1153 assert!(column_exists(&db, "widgets", "description").await);
1154 }
1155
1156 #[tokio::test]
1157 async fn remove_column_migration_removes_column() {
1158 let db = setup_db().await;
1159 let mut migrator = Migrator::new();
1160 migrator.add(CreateWidgets);
1161 migrator.add(RemoveWidgetName);
1162
1163 migrator.up(&db).await.expect("migrations should apply");
1164
1165 assert!(!column_exists(&db, "widgets", "name").await);
1166 }
1167
1168 #[tokio::test]
1169 async fn add_index_migration_creates_index() {
1170 let db = setup_db().await;
1171 let mut migrator = Migrator::new();
1172 migrator.add(CreateWidgets);
1173 migrator.add(AddWidgetNameIndex);
1174
1175 migrator.up(&db).await.expect("migrations should apply");
1176
1177 assert!(index_exists(&db, "widgets", "idx_widgets_name").await);
1178 }
1179
1180 #[tokio::test]
1181 async fn remove_index_migration_drops_index() {
1182 let db = setup_db().await;
1183 let mut migrator = Migrator::new();
1184 migrator.add(CreateWidgets);
1185 migrator.add(AddWidgetNameIndex);
1186 migrator.add(RemoveWidgetNameIndex);
1187
1188 migrator.up(&db).await.expect("migrations should apply");
1189
1190 assert!(!index_exists(&db, "widgets", "idx_widgets_name").await);
1191 }
1192
1193 #[tokio::test]
1194 async fn rename_column_migration_renames_column() {
1195 let db = setup_db().await;
1196 let mut migrator = Migrator::new();
1197 migrator.add(CreateWidgets);
1198 migrator.add(RenameWidgetNameToTitle);
1199
1200 migrator.up(&db).await.expect("migrations should apply");
1201
1202 assert!(!column_exists(&db, "widgets", "name").await);
1203 assert!(column_exists(&db, "widgets", "title").await);
1204 }
1205
1206 #[tokio::test]
1207 async fn change_column_type_migration_updates_declared_type() {
1208 let db = setup_db().await;
1209 let mut migrator = Migrator::new();
1210 migrator.add(CreateWidgets);
1211 migrator.add(ChangeWidgetNameToText);
1212
1213 migrator.up(&db).await.expect("migrations should apply");
1214
1215 let declared_type = column_type(&db, "widgets", "name")
1216 .await
1217 .expect("name column should exist")
1218 .to_uppercase();
1219 assert!(declared_type.contains("TEXT"));
1220 }
1221
1222 #[tokio::test]
1223 async fn down_runs_migrations_in_reverse_order() {
1224 let db = setup_db().await;
1225 db.execute_unprepared(
1226 "CREATE TABLE rollback_log (id INTEGER PRIMARY KEY AUTOINCREMENT, step TEXT NOT NULL)",
1227 )
1228 .await
1229 .expect("rollback log table should be created");
1230
1231 let mut migrator = Migrator::new();
1232 migrator.add(FirstRollbackStep);
1233 migrator.add(SecondRollbackStep);
1234 migrator.up(&db).await.expect("migrations should apply");
1235
1236 migrator
1237 .down(&db)
1238 .await
1239 .expect("migrations should rollback");
1240
1241 assert_eq!(rollback_steps(&db).await, vec!["second", "first"]);
1242 }
1243
1244 #[tokio::test]
1245 async fn down_removes_tracking_rows_after_rollback() {
1246 let db = setup_db().await;
1247 let mut migrator = Migrator::new();
1248 migrator.add(CreateWidgets);
1249 migrator.add(CreateGadgets);
1250 migrator.up(&db).await.expect("migrations should apply");
1251
1252 migrator
1253 .down(&db)
1254 .await
1255 .expect("migrations should rollback");
1256
1257 let applied = seaql_migrations::Entity::find()
1258 .all(&db)
1259 .await
1260 .expect("tracking rows should load");
1261 assert!(applied.is_empty());
1262 }
1263
1264 #[tokio::test]
1265 async fn wrapped_migration_rolls_back_side_effects_on_error() {
1266 let db = setup_db().await;
1267 db.execute_unprepared(
1268 "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1269 )
1270 .await
1271 .expect("audit table should be created");
1272
1273 let mut migrator = Migrator::new();
1274 migrator.add(InsertAuditThenFailWrapped);
1275
1276 let error = migrator.up(&db).await.expect_err("migration should fail");
1277
1278 assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
1279 assert_eq!(table_row_count(&db, "audit_entries").await, 0);
1280 }
1281
1282 #[tokio::test]
1283 async fn unwrapped_migration_leaves_side_effects_on_error() {
1284 let db = setup_db().await;
1285 db.execute_unprepared(
1286 "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1287 )
1288 .await
1289 .expect("audit table should be created");
1290
1291 let mut migrator = Migrator::new();
1292 migrator.add(InsertAuditThenFailUnwrapped);
1293
1294 let error = migrator.up(&db).await.expect_err("migration should fail");
1295
1296 assert!(
1297 matches!(error, DbErr::Custom(message) if message == "unwrapped migration failure")
1298 );
1299 assert_eq!(table_row_count(&db, "audit_entries").await, 1);
1300 }
1301
1302 #[tokio::test]
1303 async fn reversible_rename_migration_restores_original_schema_on_down() {
1304 let db = setup_db().await;
1305 let manager = SchemaManager::new(&db);
1306 CreateWidgets
1307 .up(&manager)
1308 .await
1309 .expect("base table should be created");
1310
1311 let mut migrator = Migrator::new();
1312 migrator.add(RenameWidgetNameToTitle);
1313 migrator
1314 .up(&db)
1315 .await
1316 .expect("rename migration should apply");
1317 assert!(column_exists(&db, "widgets", "title").await);
1318
1319 migrator
1320 .down(&db)
1321 .await
1322 .expect("rename migration should rollback");
1323
1324 assert!(column_exists(&db, "widgets", "name").await);
1325 assert!(!column_exists(&db, "widgets", "title").await);
1326 }
1327
/// Checks `should_wrap_in_transaction` for four combinations: with no
/// preference the answer is `true` on Postgres and `false` on SQLite, and an
/// explicit preference (`Some(true)` on SQLite, `Some(false)` on Postgres)
/// decides the answer instead.
#[test]
fn should_wrap_in_transaction_honors_backend_defaults_and_overrides() {
    let backend_default = TransactionPreferenceMigration { preference: None };
    let force_wrap = TransactionPreferenceMigration {
        preference: Some(true),
    };
    let forbid_wrap = TransactionPreferenceMigration {
        preference: Some(false),
    };

    // (migration, backend, expected answer)
    let expectations = [
        (&backend_default, DbBackend::Postgres, true),
        (&backend_default, DbBackend::Sqlite, false),
        (&force_wrap, DbBackend::Sqlite, true),
        (&forbid_wrap, DbBackend::Postgres, false),
    ];

    for (migration, backend, should_wrap) in expectations {
        assert_eq!(should_wrap_in_transaction(migration, backend), should_wrap);
    }
}
1352
1353 #[tokio::test]
1354 async fn status_creates_tracking_table_without_recording_rows() {
1355 let db = setup_db().await;
1356 let migrator = Migrator::new();
1357
1358 let statuses = migrator.status(&db).await;
1359
1360 assert!(statuses.is_empty());
1361 assert!(
1362 table_exists(&db, "seaql_migrations")
1363 .await
1364 .expect("tracking table check should work")
1365 );
1366 assert!(
1367 applied_versions(&db)
1368 .await
1369 .expect("applied versions should load")
1370 .is_empty()
1371 );
1372 }
1373
1374 #[tokio::test]
1375 async fn down_skips_pending_migrations_and_rolls_back_only_applied_entries() {
1376 let db = setup_db().await;
1377 let mut up_migrator = Migrator::new();
1378 up_migrator.add(CreateWidgets);
1379 up_migrator
1380 .up(&db)
1381 .await
1382 .expect("base migration should apply");
1383
1384 let mut down_migrator = Migrator::new();
1385 down_migrator.add(CreateWidgets);
1386 down_migrator.add(CreateGadgets);
1387 down_migrator
1388 .down(&db)
1389 .await
1390 .expect("rollback should skip pending migrations");
1391
1392 assert!(
1393 !table_exists(&db, "widgets")
1394 .await
1395 .expect("widgets table check should work")
1396 );
1397 assert!(
1398 !table_exists(&db, "gadgets")
1399 .await
1400 .expect("gadgets table check should work")
1401 );
1402 assert!(
1403 applied_versions(&db)
1404 .await
1405 .expect("applied versions should load")
1406 .is_empty()
1407 );
1408 }
1409
1410 #[tokio::test]
1411 async fn up_stops_after_first_failure_and_skips_later_migrations() {
1412 let db = setup_db().await;
1413 db.execute_unprepared(
1414 "CREATE TABLE audit_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)",
1415 )
1416 .await
1417 .expect("audit table should be created");
1418
1419 let mut migrator = Migrator::new();
1420 migrator.add(CreateWidgets);
1421 migrator.add(InsertAuditThenFailWrapped);
1422 migrator.add(CreateGadgets);
1423
1424 let error = migrator
1425 .up(&db)
1426 .await
1427 .expect_err("migration batch should fail");
1428
1429 assert!(matches!(error, DbErr::Custom(message) if message == "wrapped migration failure"));
1430 assert!(
1431 table_exists(&db, "widgets")
1432 .await
1433 .expect("widgets table check should work")
1434 );
1435 assert!(
1436 !table_exists(&db, "gadgets")
1437 .await
1438 .expect("gadgets table check should work")
1439 );
1440 assert_eq!(table_row_count(&db, "audit_entries").await, 0);
1441 assert_eq!(
1442 applied_versions(&db)
1443 .await
1444 .expect("applied versions should load"),
1445 std::collections::HashSet::from([CreateWidgets.name().to_owned()])
1446 );
1447 }
1448}