sea_orm_migration/
migrator.rs

1mod queries;
2use queries::*;
3
4use std::collections::HashSet;
5use std::fmt::Display;
6use std::future::Future;
7use std::pin::Pin;
8use std::time::SystemTime;
9use tracing::info;
10
11use sea_orm::sea_query::{
12    Alias, Expr, ExprTrait, ForeignKey, IntoIden, Order, Query, Table, extension::postgres::Type,
13};
14use sea_orm::{
15    ActiveValue, ConnectionTrait, DbBackend, DbErr, DynIden, EntityTrait, FromQueryResult,
16    Iterable, QueryFilter, Schema, Statement, TransactionTrait,
17};
18#[allow(unused_imports)]
19use sea_schema::probe::SchemaProbe;
20
21use super::{IntoSchemaManagerConnection, MigrationTrait, SchemaManager, seaql_migrations};
22
23#[derive(Copy, Clone, Debug, PartialEq, Eq)]
24/// Status of migration
25pub enum MigrationStatus {
26    /// Not yet applied
27    Pending,
28    /// Applied
29    Applied,
30}
31
32impl Display for MigrationStatus {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        let status = match self {
35            MigrationStatus::Pending => "Pending",
36            MigrationStatus::Applied => "Applied",
37        };
38        write!(f, "{status}")
39    }
40}
41
42pub struct Migration {
43    migration: Box<dyn MigrationTrait>,
44    status: MigrationStatus,
45}
46
47impl Migration {
48    /// Get migration name from MigrationName trait implementation
49    pub fn name(&self) -> &str {
50        self.migration.name()
51    }
52
53    /// Get migration status
54    pub fn status(&self) -> MigrationStatus {
55        self.status
56    }
57}
58
59/// Performing migrations on a database
60#[async_trait::async_trait]
61pub trait MigratorTrait: Send {
62    /// Vector of migrations in time sequence
63    fn migrations() -> Vec<Box<dyn MigrationTrait>>;
64
65    /// Name of the migration table, it is `seaql_migrations` by default
66    fn migration_table_name() -> DynIden {
67        seaql_migrations::Entity.into_iden()
68    }
69
70    /// Get list of migrations wrapped in `Migration` struct
71    fn get_migration_files() -> Vec<Migration> {
72        Self::migrations()
73            .into_iter()
74            .map(|migration| Migration {
75                migration,
76                status: MigrationStatus::Pending,
77            })
78            .collect()
79    }
80
81    /// Get list of applied migrations from database
82    async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
83    where
84        C: ConnectionTrait,
85    {
86        Self::install(db).await?;
87        get_migration_models(db, Self::migration_table_name()).await
88    }
89
90    /// Get list of migrations with status
91    async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
92    where
93        C: ConnectionTrait,
94    {
95        Self::install(db).await?;
96        get_migration_with_status(
97            Self::get_migration_files(),
98            Self::get_migration_models(db).await?,
99        )
100    }
101
102    /// Get list of pending migrations
103    async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
104    where
105        C: ConnectionTrait,
106    {
107        Self::install(db).await?;
108        Ok(Self::get_migration_with_status(db)
109            .await?
110            .into_iter()
111            .filter(|file| file.status == MigrationStatus::Pending)
112            .collect())
113    }
114
115    /// Get list of applied migrations
116    async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
117    where
118        C: ConnectionTrait,
119    {
120        Self::install(db).await?;
121        Ok(Self::get_migration_with_status(db)
122            .await?
123            .into_iter()
124            .filter(|file| file.status == MigrationStatus::Applied)
125            .collect())
126    }
127
128    /// Create migration table `seaql_migrations` in the database
129    async fn install<C>(db: &C) -> Result<(), DbErr>
130    where
131        C: ConnectionTrait,
132    {
133        install(db, Self::migration_table_name()).await
134    }
135
136    /// Check the status of all migrations
137    async fn status<C>(db: &C) -> Result<(), DbErr>
138    where
139        C: ConnectionTrait,
140    {
141        Self::install(db).await?;
142
143        info!("Checking migration status");
144
145        for Migration { migration, status } in Self::get_migration_with_status(db).await? {
146            info!("Migration '{}'... {}", migration.name(), status);
147        }
148
149        Ok(())
150    }
151
152    /// Drop all tables from the database, then reapply all migrations
153    async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
154    where
155        C: IntoSchemaManagerConnection<'c>,
156    {
157        exec_with_connection::<'_, _, _>(db, move |manager| {
158            Box::pin(async move { exec_fresh::<Self>(manager).await })
159        })
160        .await
161    }
162
163    /// Rollback all applied migrations, then reapply all migrations
164    async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
165    where
166        C: IntoSchemaManagerConnection<'c>,
167    {
168        exec_with_connection::<'_, _, _>(db, move |manager| {
169            Box::pin(async move {
170                exec_down::<Self>(manager, None).await?;
171                exec_up::<Self>(manager, None).await
172            })
173        })
174        .await
175    }
176
177    /// Rollback all applied migrations
178    async fn reset<'c, C>(db: C) -> Result<(), DbErr>
179    where
180        C: IntoSchemaManagerConnection<'c>,
181    {
182        exec_with_connection::<'_, _, _>(db, move |manager| {
183            Box::pin(async move {
184                // Rollback all applied migrations first
185                exec_down::<Self>(manager, None).await?;
186
187                // Then drop the migration table itself
188                uninstall(manager, Self::migration_table_name()).await?;
189
190                Ok(())
191            })
192        })
193        .await
194    }
195
196    /// Uninstall migration tracking table only (non-destructive)
197    /// This will drop the `seaql_migrations` table but won't rollback other schema changes.
198    async fn uninstall<'c, C>(db: C) -> Result<(), DbErr>
199    where
200        C: IntoSchemaManagerConnection<'c>,
201    {
202        exec_with_connection::<'_, _, _>(db, move |manager| {
203            Box::pin(uninstall(manager, Self::migration_table_name()))
204        })
205        .await
206    }
207
208    /// Apply pending migrations
209    async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
210    where
211        C: IntoSchemaManagerConnection<'c>,
212    {
213        exec_with_connection::<'_, _, _>(db, move |manager| {
214            Box::pin(async move { exec_up::<Self>(manager, steps).await })
215        })
216        .await
217    }
218
219    /// Rollback applied migrations
220    async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
221    where
222        C: IntoSchemaManagerConnection<'c>,
223    {
224        exec_with_connection::<'_, _, _>(db, move |manager| {
225            Box::pin(async move { exec_down::<Self>(manager, steps).await })
226        })
227        .await
228    }
229}
230
231async fn get_migration_models<C>(
232    db: &C,
233    migration_table_name: DynIden,
234) -> Result<Vec<seaql_migrations::Model>, DbErr>
235where
236    C: ConnectionTrait,
237{
238    let stmt = Query::select()
239        .table_name(migration_table_name)
240        .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
241        .order_by(seaql_migrations::Column::Version, Order::Asc)
242        .to_owned();
243    let builder = db.get_database_backend();
244    seaql_migrations::Model::find_by_statement(builder.build(&stmt))
245        .all(db)
246        .await
247}
248
249fn get_migration_with_status(
250    migration_files: Vec<Migration>,
251    migration_models: Vec<seaql_migrations::Model>,
252) -> Result<Vec<Migration>, DbErr> {
253    let mut migration_files = migration_files;
254
255    let migration_in_db: HashSet<String> = migration_models
256        .into_iter()
257        .map(|model| model.version)
258        .collect();
259    let migration_in_fs: HashSet<String> = migration_files
260        .iter()
261        .map(|file| file.migration.name().to_string())
262        .collect();
263
264    let pending_migrations = &migration_in_fs - &migration_in_db;
265    for migration_file in migration_files.iter_mut() {
266        if !pending_migrations.contains(migration_file.migration.name()) {
267            migration_file.status = MigrationStatus::Applied;
268        }
269    }
270
271    let missing_migrations_in_fs = &migration_in_db - &migration_in_fs;
272    let errors: Vec<String> = missing_migrations_in_fs
273            .iter()
274            .map(|missing_migration| {
275                format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
276            }).collect();
277
278    if !errors.is_empty() {
279        Err(DbErr::Custom(errors.join("\n")))
280    } else {
281        Ok(migration_files)
282    }
283}
284
285async fn exec_with_connection<'c, C, F>(db: C, f: F) -> Result<(), DbErr>
286where
287    C: IntoSchemaManagerConnection<'c>,
288    F: for<'b> Fn(
289        &'b SchemaManager<'_>,
290    ) -> Pin<Box<dyn Future<Output = Result<(), DbErr>> + Send + 'b>>,
291{
292    let db = db.into_database_executor();
293
294    match db.get_database_backend() {
295        DbBackend::Postgres => {
296            let transaction = db.begin().await?;
297            let manager = SchemaManager::new(&transaction);
298            f(&manager).await?;
299            transaction.commit().await
300        }
301        DbBackend::MySql | DbBackend::Sqlite => {
302            let manager = SchemaManager::new(db);
303            f(&manager).await
304        }
305        db => Err(DbErr::BackendNotSupported {
306            db: db.as_str(),
307            ctx: "exec_with_connection",
308        }),
309    }
310}
311
312async fn install<C>(db: &C, migration_table_name: DynIden) -> Result<(), DbErr>
313where
314    C: ConnectionTrait,
315{
316    let builder = db.get_database_backend();
317    let schema = Schema::new(builder);
318    let mut stmt = schema
319        .create_table_from_entity(seaql_migrations::Entity)
320        .table_name(migration_table_name);
321    stmt.if_not_exists();
322    db.execute(&stmt).await?;
323    Ok(())
324}
325
326async fn uninstall(
327    manager: &SchemaManager<'_>,
328    migration_table_name: DynIden,
329) -> Result<(), DbErr> {
330    let mut stmt = Table::drop();
331    stmt.table(migration_table_name).if_exists().cascade();
332    manager.drop_table(stmt).await?;
333    Ok(())
334}
335
336async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
337where
338    M: MigratorTrait + ?Sized,
339{
340    let db = manager.get_connection();
341
342    M::install(db).await?;
343
344    drop_everything(db).await?;
345
346    exec_up::<M>(manager, None).await
347}
348
349async fn drop_everything<C: ConnectionTrait>(db: &C) -> Result<(), DbErr> {
350    let db_backend = db.get_database_backend();
351
352    // Temporarily disable the foreign key check
353    if db_backend == DbBackend::Sqlite {
354        info!("Disabling foreign key check");
355        db.execute_raw(Statement::from_string(
356            db_backend,
357            "PRAGMA foreign_keys = OFF".to_owned(),
358        ))
359        .await?;
360        info!("Foreign key check disabled");
361    }
362
363    // Drop all foreign keys
364    if db_backend == DbBackend::MySql {
365        info!("Dropping all foreign keys");
366        let stmt = query_mysql_foreign_keys(db);
367        let rows = db.query_all(&stmt).await?;
368        for row in rows.into_iter() {
369            let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
370            let table_name: String = row.try_get("", "TABLE_NAME")?;
371            info!(
372                "Dropping foreign key '{}' from table '{}'",
373                constraint_name, table_name
374            );
375            let mut stmt = ForeignKey::drop();
376            stmt.table(Alias::new(table_name.as_str()))
377                .name(constraint_name.as_str());
378            db.execute(&stmt).await?;
379            info!("Foreign key '{}' has been dropped", constraint_name);
380        }
381        info!("All foreign keys dropped");
382    }
383
384    // Drop all tables
385    let stmt = query_tables(db)?;
386    let rows = db.query_all(&stmt).await?;
387    for row in rows.into_iter() {
388        let table_name: String = row.try_get("", "table_name")?;
389        info!("Dropping table '{}'", table_name);
390        let mut stmt = Table::drop();
391        stmt.table(Alias::new(table_name.as_str()))
392            .if_exists()
393            .cascade();
394        db.execute(&stmt).await?;
395        info!("Table '{}' has been dropped", table_name);
396    }
397
398    // Drop all types
399    if db_backend == DbBackend::Postgres {
400        info!("Dropping all types");
401        let stmt = query_pg_types(db);
402        let rows = db.query_all(&stmt).await?;
403        for row in rows {
404            let type_name: String = row.try_get("", "typname")?;
405            info!("Dropping type '{}'", type_name);
406            let mut stmt = Type::drop();
407            stmt.name(Alias::new(&type_name));
408            db.execute(&stmt).await?;
409            info!("Type '{}' has been dropped", type_name);
410        }
411    }
412
413    // Restore the foreign key check
414    if db_backend == DbBackend::Sqlite {
415        info!("Restoring foreign key check");
416        db.execute_raw(Statement::from_string(
417            db_backend,
418            "PRAGMA foreign_keys = ON".to_owned(),
419        ))
420        .await?;
421        info!("Foreign key check restored");
422    }
423
424    Ok(())
425}
426
427async fn exec_up<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
428where
429    M: MigratorTrait + ?Sized,
430{
431    let db = manager.get_connection();
432
433    M::install(db).await?;
434
435    exec_up_with(
436        manager,
437        steps,
438        M::get_pending_migrations(db).await?,
439        M::migration_table_name(),
440    )
441    .await
442}
443
444async fn exec_up_with(
445    manager: &SchemaManager<'_>,
446    mut steps: Option<u32>,
447    pending_migrations: Vec<Migration>,
448    migration_table_name: DynIden,
449) -> Result<(), DbErr> {
450    let db = manager.get_connection();
451
452    if let Some(steps) = steps {
453        info!("Applying {} pending migrations", steps);
454    } else {
455        info!("Applying all pending migrations");
456    }
457    if pending_migrations.is_empty() {
458        info!("No pending migrations");
459    }
460
461    for Migration { migration, .. } in pending_migrations {
462        if let Some(steps) = steps.as_mut() {
463            if steps == &0 {
464                break;
465            }
466            *steps -= 1;
467        }
468        info!("Applying migration '{}'", migration.name());
469        migration.up(manager).await?;
470        info!("Migration '{}' has been applied", migration.name());
471        let now = SystemTime::now()
472            .duration_since(SystemTime::UNIX_EPOCH)
473            .expect("SystemTime before UNIX EPOCH!");
474        seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
475            version: ActiveValue::Set(migration.name().to_owned()),
476            applied_at: ActiveValue::Set(now.as_secs() as i64),
477        })
478        .table_name(migration_table_name.clone())
479        .exec(db)
480        .await?;
481    }
482
483    Ok(())
484}
485
486async fn exec_down<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
487where
488    M: MigratorTrait + ?Sized,
489{
490    let db = manager.get_connection();
491
492    M::install(db).await?;
493
494    exec_down_with(
495        manager,
496        steps,
497        M::get_applied_migrations(db).await?,
498        M::migration_table_name(),
499    )
500    .await
501}
502
503async fn exec_down_with(
504    manager: &SchemaManager<'_>,
505    mut steps: Option<u32>,
506    applied_migrations: Vec<Migration>,
507    migration_table_name: DynIden,
508) -> Result<(), DbErr> {
509    let db = manager.get_connection();
510
511    if let Some(steps) = steps {
512        info!("Rolling back {} applied migrations", steps);
513    } else {
514        info!("Rolling back all applied migrations");
515    }
516    if applied_migrations.is_empty() {
517        info!("No applied migrations");
518    }
519
520    for Migration { migration, .. } in applied_migrations.into_iter().rev() {
521        if let Some(steps) = steps.as_mut() {
522            if steps == &0 {
523                break;
524            }
525            *steps -= 1;
526        }
527        info!("Rolling back migration '{}'", migration.name());
528        migration.down(manager).await?;
529        info!("Migration '{}' has been rollbacked", migration.name());
530        seaql_migrations::Entity::delete_many()
531            .filter(Expr::col(seaql_migrations::Column::Version).eq(migration.name()))
532            .table_name(migration_table_name.clone())
533            .exec(db)
534            .await?;
535    }
536
537    Ok(())
538}