sea_orm_migration/
migrator.rs

1use std::collections::HashSet;
2use std::fmt::Display;
3use std::future::Future;
4use std::pin::Pin;
5use std::time::SystemTime;
6use tracing::info;
7
8use sea_orm::sea_query::{
9    self, extension::postgres::Type, Alias, Expr, ExprTrait, ForeignKey, IntoIden, JoinType, Order,
10    Query, SelectStatement, SimpleExpr, Table,
11};
12use sea_orm::{
13    ActiveModelTrait, ActiveValue, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden,
14    DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement,
15    TransactionTrait,
16};
17#[allow(unused_imports)]
18use sea_schema::probe::SchemaProbe;
19
20use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager};
21
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Status of a migration, i.e. whether it has already been recorded as
/// applied in the migration table.
pub enum MigrationStatus {
    /// Not yet applied
    Pending,
    /// Applied
    Applied,
}
30
31impl Display for MigrationStatus {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        let status = match self {
34            MigrationStatus::Pending => "Pending",
35            MigrationStatus::Applied => "Applied",
36        };
37        write!(f, "{status}")
38    }
39}
40
/// A migration definition paired with its current [`MigrationStatus`].
pub struct Migration {
    /// The boxed migration implementation
    migration: Box<dyn MigrationTrait>,
    /// Whether this migration is pending or has been applied
    status: MigrationStatus,
}
45
46impl Migration {
47    /// Get migration name from MigrationName trait implementation
48    pub fn name(&self) -> &str {
49        self.migration.name()
50    }
51
52    /// Get migration status
53    pub fn status(&self) -> MigrationStatus {
54        self.status
55    }
56}
57
58/// Performing migrations on a database
59#[async_trait::async_trait]
60pub trait MigratorTrait: Send {
61    /// Vector of migrations in time sequence
62    fn migrations() -> Vec<Box<dyn MigrationTrait>>;
63
64    /// Name of the migration table, it is `seaql_migrations` by default
65    fn migration_table_name() -> DynIden {
66        seaql_migrations::Entity.into_iden()
67    }
68
69    /// Get list of migrations wrapped in `Migration` struct
70    fn get_migration_files() -> Vec<Migration> {
71        Self::migrations()
72            .into_iter()
73            .map(|migration| Migration {
74                migration,
75                status: MigrationStatus::Pending,
76            })
77            .collect()
78    }
79
80    /// Get list of applied migrations from database
81    async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
82    where
83        C: ConnectionTrait,
84    {
85        Self::install(db).await?;
86        let stmt = Query::select()
87            .table_name(Self::migration_table_name())
88            .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
89            .order_by(seaql_migrations::Column::Version, Order::Asc)
90            .to_owned();
91        let builder = db.get_database_backend();
92        seaql_migrations::Model::find_by_statement(builder.build(&stmt))
93            .all(db)
94            .await
95    }
96
97    /// Get list of migrations with status
98    async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
99    where
100        C: ConnectionTrait,
101    {
102        Self::install(db).await?;
103        let mut migration_files = Self::get_migration_files();
104        let migration_models = Self::get_migration_models(db).await?;
105
106        let migration_in_db: HashSet<String> = migration_models
107            .into_iter()
108            .map(|model| model.version)
109            .collect();
110        let migration_in_fs: HashSet<String> = migration_files
111            .iter()
112            .map(|file| file.migration.name().to_string())
113            .collect();
114
115        let pending_migrations = &migration_in_fs - &migration_in_db;
116        for migration_file in migration_files.iter_mut() {
117            if !pending_migrations.contains(migration_file.migration.name()) {
118                migration_file.status = MigrationStatus::Applied;
119            }
120        }
121
122        let missing_migrations_in_fs = &migration_in_db - &migration_in_fs;
123        let errors: Vec<String> = missing_migrations_in_fs
124            .iter()
125            .map(|missing_migration| {
126                format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
127            }).collect();
128
129        if !errors.is_empty() {
130            Err(DbErr::Custom(errors.join("\n")))
131        } else {
132            Ok(migration_files)
133        }
134    }
135
136    /// Get list of pending migrations
137    async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
138    where
139        C: ConnectionTrait,
140    {
141        Self::install(db).await?;
142        Ok(Self::get_migration_with_status(db)
143            .await?
144            .into_iter()
145            .filter(|file| file.status == MigrationStatus::Pending)
146            .collect())
147    }
148
149    /// Get list of applied migrations
150    async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
151    where
152        C: ConnectionTrait,
153    {
154        Self::install(db).await?;
155        Ok(Self::get_migration_with_status(db)
156            .await?
157            .into_iter()
158            .filter(|file| file.status == MigrationStatus::Applied)
159            .collect())
160    }
161
162    /// Create migration table `seaql_migrations` in the database
163    async fn install<C>(db: &C) -> Result<(), DbErr>
164    where
165        C: ConnectionTrait,
166    {
167        let builder = db.get_database_backend();
168        let table_name = Self::migration_table_name();
169        let schema = Schema::new(builder);
170        let mut stmt = schema
171            .create_table_from_entity(seaql_migrations::Entity)
172            .table_name(table_name);
173        stmt.if_not_exists();
174        db.execute(builder.build(&stmt)).await.map(|_| ())
175    }
176
177    /// Check the status of all migrations
178    async fn status<C>(db: &C) -> Result<(), DbErr>
179    where
180        C: ConnectionTrait,
181    {
182        Self::install(db).await?;
183
184        info!("Checking migration status");
185
186        for Migration { migration, status } in Self::get_migration_with_status(db).await? {
187            info!("Migration '{}'... {}", migration.name(), status);
188        }
189
190        Ok(())
191    }
192
193    /// Drop all tables from the database, then reapply all migrations
194    async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
195    where
196        C: IntoSchemaManagerConnection<'c>,
197    {
198        exec_with_connection::<'_, _, _>(db, move |manager| {
199            Box::pin(async move { exec_fresh::<Self>(manager).await })
200        })
201        .await
202    }
203
204    /// Rollback all applied migrations, then reapply all migrations
205    async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
206    where
207        C: IntoSchemaManagerConnection<'c>,
208    {
209        exec_with_connection::<'_, _, _>(db, move |manager| {
210            Box::pin(async move {
211                exec_down::<Self>(manager, None).await?;
212                exec_up::<Self>(manager, None).await
213            })
214        })
215        .await
216    }
217
218    /// Rollback all applied migrations
219    async fn reset<'c, C>(db: C) -> Result<(), DbErr>
220    where
221        C: IntoSchemaManagerConnection<'c>,
222    {
223        exec_with_connection::<'_, _, _>(db, move |manager| {
224            Box::pin(async move { exec_down::<Self>(manager, None).await })
225        })
226        .await
227    }
228
229    /// Apply pending migrations
230    async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
231    where
232        C: IntoSchemaManagerConnection<'c>,
233    {
234        exec_with_connection::<'_, _, _>(db, move |manager| {
235            Box::pin(async move { exec_up::<Self>(manager, steps).await })
236        })
237        .await
238    }
239
240    /// Rollback applied migrations
241    async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
242    where
243        C: IntoSchemaManagerConnection<'c>,
244    {
245        exec_with_connection::<'_, _, _>(db, move |manager| {
246            Box::pin(async move { exec_down::<Self>(manager, steps).await })
247        })
248        .await
249    }
250}
251
252async fn exec_with_connection<'c, C, F>(db: C, f: F) -> Result<(), DbErr>
253where
254    C: IntoSchemaManagerConnection<'c>,
255    F: for<'b> Fn(
256        &'b SchemaManager<'_>,
257    ) -> Pin<Box<dyn Future<Output = Result<(), DbErr>> + Send + 'b>>,
258{
259    let db = db.into_schema_manager_connection();
260
261    match db.get_database_backend() {
262        DbBackend::Postgres => {
263            let transaction = db.begin().await?;
264            let manager = SchemaManager::new(&transaction);
265            f(&manager).await?;
266            transaction.commit().await
267        }
268        DbBackend::MySql | DbBackend::Sqlite => {
269            let manager = SchemaManager::new(db);
270            f(&manager).await
271        }
272    }
273}
274
275async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
276where
277    M: MigratorTrait + ?Sized,
278{
279    let db = manager.get_connection();
280
281    M::install(db).await?;
282    let db_backend = db.get_database_backend();
283
284    // Temporarily disable the foreign key check
285    if db_backend == DbBackend::Sqlite {
286        info!("Disabling foreign key check");
287        db.execute(Statement::from_string(
288            db_backend,
289            "PRAGMA foreign_keys = OFF".to_owned(),
290        ))
291        .await?;
292        info!("Foreign key check disabled");
293    }
294
295    // Drop all foreign keys
296    if db_backend == DbBackend::MySql {
297        info!("Dropping all foreign keys");
298        let stmt = query_mysql_foreign_keys(db);
299        let rows = db.query_all(db_backend.build(&stmt)).await?;
300        for row in rows.into_iter() {
301            let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
302            let table_name: String = row.try_get("", "TABLE_NAME")?;
303            info!(
304                "Dropping foreign key '{}' from table '{}'",
305                constraint_name, table_name
306            );
307            let mut stmt = ForeignKey::drop();
308            stmt.table(Alias::new(table_name.as_str()))
309                .name(constraint_name.as_str());
310            db.execute(db_backend.build(&stmt)).await?;
311            info!("Foreign key '{}' has been dropped", constraint_name);
312        }
313        info!("All foreign keys dropped");
314    }
315
316    // Drop all tables
317    let stmt = query_tables(db).await;
318    let rows = db.query_all(db_backend.build(&stmt)).await?;
319    for row in rows.into_iter() {
320        let table_name: String = row.try_get("", "table_name")?;
321        info!("Dropping table '{}'", table_name);
322        let mut stmt = Table::drop();
323        stmt.table(Alias::new(table_name.as_str()))
324            .if_exists()
325            .cascade();
326        db.execute(db_backend.build(&stmt)).await?;
327        info!("Table '{}' has been dropped", table_name);
328    }
329
330    // Drop all types
331    if db_backend == DbBackend::Postgres {
332        info!("Dropping all types");
333        let stmt = query_pg_types(db);
334        let rows = db.query_all(db_backend.build(&stmt)).await?;
335        for row in rows {
336            let type_name: String = row.try_get("", "typname")?;
337            info!("Dropping type '{}'", type_name);
338            let mut stmt = Type::drop();
339            stmt.name(Alias::new(&type_name));
340            db.execute(db_backend.build(&stmt)).await?;
341            info!("Type '{}' has been dropped", type_name);
342        }
343    }
344
345    // Restore the foreign key check
346    if db_backend == DbBackend::Sqlite {
347        info!("Restoring foreign key check");
348        db.execute(Statement::from_string(
349            db_backend,
350            "PRAGMA foreign_keys = ON".to_owned(),
351        ))
352        .await?;
353        info!("Foreign key check restored");
354    }
355
356    // Reapply all migrations
357    exec_up::<M>(manager, None).await
358}
359
360async fn exec_up<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
361where
362    M: MigratorTrait + ?Sized,
363{
364    let db = manager.get_connection();
365
366    M::install(db).await?;
367
368    if let Some(steps) = steps {
369        info!("Applying {} pending migrations", steps);
370    } else {
371        info!("Applying all pending migrations");
372    }
373
374    let migrations = M::get_pending_migrations(db).await?.into_iter();
375    if migrations.len() == 0 {
376        info!("No pending migrations");
377    }
378    for Migration { migration, .. } in migrations {
379        if let Some(steps) = steps.as_mut() {
380            if steps == &0 {
381                break;
382            }
383            *steps -= 1;
384        }
385        info!("Applying migration '{}'", migration.name());
386        migration.up(manager).await?;
387        info!("Migration '{}' has been applied", migration.name());
388        let now = SystemTime::now()
389            .duration_since(SystemTime::UNIX_EPOCH)
390            .expect("SystemTime before UNIX EPOCH!");
391        seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
392            version: ActiveValue::Set(migration.name().to_owned()),
393            applied_at: ActiveValue::Set(now.as_secs() as i64),
394        })
395        .table_name(M::migration_table_name())
396        .exec(db)
397        .await?;
398    }
399
400    Ok(())
401}
402
403async fn exec_down<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
404where
405    M: MigratorTrait + ?Sized,
406{
407    let db = manager.get_connection();
408
409    M::install(db).await?;
410
411    if let Some(steps) = steps {
412        info!("Rolling back {} applied migrations", steps);
413    } else {
414        info!("Rolling back all applied migrations");
415    }
416
417    let migrations = M::get_applied_migrations(db).await?.into_iter().rev();
418    if migrations.len() == 0 {
419        info!("No applied migrations");
420    }
421    for Migration { migration, .. } in migrations {
422        if let Some(steps) = steps.as_mut() {
423            if steps == &0 {
424                break;
425            }
426            *steps -= 1;
427        }
428        info!("Rolling back migration '{}'", migration.name());
429        migration.down(manager).await?;
430        info!("Migration '{}' has been rollbacked", migration.name());
431        seaql_migrations::Entity::delete_many()
432            .filter(Expr::col(seaql_migrations::Column::Version).eq(migration.name()))
433            .table_name(M::migration_table_name())
434            .exec(db)
435            .await?;
436    }
437
438    Ok(())
439}
440
441async fn query_tables<C>(db: &C) -> SelectStatement
442where
443    C: ConnectionTrait,
444{
445    match db.get_database_backend() {
446        #[cfg(feature = "sqlx-mysql")]
447        DbBackend::MySql => sea_schema::mysql::MySql.query_tables(),
448        #[cfg(feature = "sqlx-postgres")]
449        DbBackend::Postgres => sea_schema::postgres::Postgres.query_tables(),
450        #[cfg(feature = "sqlx-sqlite")]
451        DbBackend::Sqlite => sea_schema::sqlite::Sqlite.query_tables(),
452        #[allow(unreachable_patterns)]
453        other => panic!("{other:?} feature is off"),
454    }
455}
456
457fn get_current_schema<C>(db: &C) -> SimpleExpr
458where
459    C: ConnectionTrait,
460{
461    match db.get_database_backend() {
462        #[cfg(feature = "sqlx-mysql")]
463        DbBackend::MySql => sea_schema::mysql::MySql::get_current_schema(),
464        #[cfg(feature = "sqlx-postgres")]
465        DbBackend::Postgres => sea_schema::postgres::Postgres::get_current_schema(),
466        #[cfg(feature = "sqlx-sqlite")]
467        DbBackend::Sqlite => sea_schema::sqlite::Sqlite::get_current_schema(),
468        #[allow(unreachable_patterns)]
469        other => panic!("{other:?} feature is off"),
470    }
471}
472
/// Identifiers used to query `information_schema` for foreign key constraints
/// (see `query_mysql_foreign_keys`).
///
/// Variants without an explicit `iden` get whatever name `DeriveIden` derives
/// for them (presumably the snake_case variant name; confirm against the
/// sea-orm `DeriveIden` docs).
#[derive(DeriveIden)]
enum InformationSchema {
    /// The `information_schema` schema itself
    #[sea_orm(iden = "information_schema")]
    Schema,
    /// Column `TABLE_NAME`
    #[sea_orm(iden = "TABLE_NAME")]
    TableName,
    /// Column `CONSTRAINT_NAME`
    #[sea_orm(iden = "CONSTRAINT_NAME")]
    ConstraintName,
    /// The table queried for constraints
    TableConstraints,
    /// Column compared against the current schema
    TableSchema,
    /// Column compared against `"FOREIGN KEY"`
    ConstraintType,
}
485
486fn query_mysql_foreign_keys<C>(db: &C) -> SelectStatement
487where
488    C: ConnectionTrait,
489{
490    let mut stmt = Query::select();
491    stmt.columns([
492        InformationSchema::TableName,
493        InformationSchema::ConstraintName,
494    ])
495    .from((
496        InformationSchema::Schema,
497        InformationSchema::TableConstraints,
498    ))
499    .cond_where(
500        Condition::all()
501            .add(get_current_schema(db).equals((
502                InformationSchema::TableConstraints,
503                InformationSchema::TableSchema,
504            )))
505            .add(
506                Expr::col((
507                    InformationSchema::TableConstraints,
508                    InformationSchema::ConstraintType,
509                ))
510                .eq("FOREIGN KEY"),
511            ),
512    );
513    stmt
514}
515
/// Identifiers for the table and columns read by `query_pg_types`.
///
/// Names are produced by `DeriveIden`; presumably `Table` resolves to
/// `pg_type` and the other variants to their snake_case names (confirm
/// against the sea-orm `DeriveIden` docs).
#[derive(DeriveIden)]
enum PgType {
    /// The table selected from
    Table,
    /// Selected column holding the type name
    Typname,
    /// Column joined against `PgNamespace::Oid`
    Typnamespace,
    /// Column filtered to equal `0`
    Typelem,
}
523
/// Identifiers for the table and columns joined in by `query_pg_types`.
///
/// Names are produced by `DeriveIden`; presumably `Table` resolves to
/// `pg_namespace` (confirm against the sea-orm `DeriveIden` docs).
#[derive(DeriveIden)]
enum PgNamespace {
    /// The joined table
    Table,
    /// Column joined against `PgType::Typnamespace`
    Oid,
    /// Column compared against the current schema
    Nspname,
}
530
531fn query_pg_types<C>(db: &C) -> SelectStatement
532where
533    C: ConnectionTrait,
534{
535    let mut stmt = Query::select();
536    stmt.column(PgType::Typname)
537        .from(PgType::Table)
538        .join(
539            JoinType::LeftJoin,
540            PgNamespace::Table,
541            Expr::col((PgNamespace::Table, PgNamespace::Oid))
542                .equals((PgType::Table, PgType::Typnamespace)),
543        )
544        .cond_where(
545            Condition::all()
546                .add(get_current_schema(db).equals((PgNamespace::Table, PgNamespace::Nspname)))
547                .add(Expr::col((PgType::Table, PgType::Typelem)).eq(0)),
548        );
549    stmt
550}
551
/// Lets a statement builder be pointed at a caller-chosen table, so that the
/// statements touching the migration table can use
/// `MigratorTrait::migration_table_name`.
trait QueryTable {
    /// The statement type returned by [`QueryTable::table_name`]
    type Statement;

    /// Consume the statement and return it targeting `table_name`
    fn table_name(self, table_name: DynIden) -> Self::Statement;
}
557
558impl QueryTable for SelectStatement {
559    type Statement = SelectStatement;
560
561    fn table_name(mut self, table_name: DynIden) -> SelectStatement {
562        self.from(table_name);
563        self
564    }
565}
566
567impl QueryTable for sea_query::TableCreateStatement {
568    type Statement = sea_query::TableCreateStatement;
569
570    fn table_name(mut self, table_name: DynIden) -> sea_query::TableCreateStatement {
571        self.table(table_name);
572        self
573    }
574}
575
576impl<A> QueryTable for sea_orm::Insert<A>
577where
578    A: ActiveModelTrait,
579{
580    type Statement = sea_orm::Insert<A>;
581
582    fn table_name(mut self, table_name: DynIden) -> sea_orm::Insert<A> {
583        sea_orm::QueryTrait::query(&mut self).into_table(table_name);
584        self
585    }
586}
587
588impl<E> QueryTable for sea_orm::DeleteMany<E>
589where
590    E: EntityTrait,
591{
592    type Statement = sea_orm::DeleteMany<E>;
593
594    fn table_name(mut self, table_name: DynIden) -> sea_orm::DeleteMany<E> {
595        sea_orm::QueryTrait::query(&mut self).from_table(table_name);
596        self
597    }
598}