1use std::collections::HashSet;
2use std::fmt::Display;
3use std::future::Future;
4use std::pin::Pin;
5use std::time::SystemTime;
6use tracing::info;
7
8use sea_orm::sea_query::{
9 self, extension::postgres::Type, Alias, Expr, ExprTrait, ForeignKey, IntoIden, JoinType, Order,
10 Query, SelectStatement, SimpleExpr, Table,
11};
12use sea_orm::{
13 ActiveModelTrait, ActiveValue, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden,
14 DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement,
15 TransactionTrait,
16};
17#[allow(unused_imports)]
18use sea_schema::probe::SchemaProbe;
19
20use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager};
21
22#[derive(Copy, Clone, Debug, PartialEq, Eq)]
23pub enum MigrationStatus {
25 Pending,
27 Applied,
29}
30
31impl Display for MigrationStatus {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 let status = match self {
34 MigrationStatus::Pending => "Pending",
35 MigrationStatus::Applied => "Applied",
36 };
37 write!(f, "{status}")
38 }
39}
40
41pub struct Migration {
42 migration: Box<dyn MigrationTrait>,
43 status: MigrationStatus,
44}
45
46impl Migration {
47 pub fn name(&self) -> &str {
49 self.migration.name()
50 }
51
52 pub fn status(&self) -> MigrationStatus {
54 self.status
55 }
56}
57
58#[async_trait::async_trait]
60pub trait MigratorTrait: Send {
61 fn migrations() -> Vec<Box<dyn MigrationTrait>>;
63
64 fn migration_table_name() -> DynIden {
66 seaql_migrations::Entity.into_iden()
67 }
68
69 fn get_migration_files() -> Vec<Migration> {
71 Self::migrations()
72 .into_iter()
73 .map(|migration| Migration {
74 migration,
75 status: MigrationStatus::Pending,
76 })
77 .collect()
78 }
79
80 async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
82 where
83 C: ConnectionTrait,
84 {
85 Self::install(db).await?;
86 let stmt = Query::select()
87 .table_name(Self::migration_table_name())
88 .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
89 .order_by(seaql_migrations::Column::Version, Order::Asc)
90 .to_owned();
91 let builder = db.get_database_backend();
92 seaql_migrations::Model::find_by_statement(builder.build(&stmt))
93 .all(db)
94 .await
95 }
96
97 async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
99 where
100 C: ConnectionTrait,
101 {
102 Self::install(db).await?;
103 let mut migration_files = Self::get_migration_files();
104 let migration_models = Self::get_migration_models(db).await?;
105
106 let migration_in_db: HashSet<String> = migration_models
107 .into_iter()
108 .map(|model| model.version)
109 .collect();
110 let migration_in_fs: HashSet<String> = migration_files
111 .iter()
112 .map(|file| file.migration.name().to_string())
113 .collect();
114
115 let pending_migrations = &migration_in_fs - &migration_in_db;
116 for migration_file in migration_files.iter_mut() {
117 if !pending_migrations.contains(migration_file.migration.name()) {
118 migration_file.status = MigrationStatus::Applied;
119 }
120 }
121
122 let missing_migrations_in_fs = &migration_in_db - &migration_in_fs;
123 let errors: Vec<String> = missing_migrations_in_fs
124 .iter()
125 .map(|missing_migration| {
126 format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
127 }).collect();
128
129 if !errors.is_empty() {
130 Err(DbErr::Custom(errors.join("\n")))
131 } else {
132 Ok(migration_files)
133 }
134 }
135
136 async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
138 where
139 C: ConnectionTrait,
140 {
141 Self::install(db).await?;
142 Ok(Self::get_migration_with_status(db)
143 .await?
144 .into_iter()
145 .filter(|file| file.status == MigrationStatus::Pending)
146 .collect())
147 }
148
149 async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
151 where
152 C: ConnectionTrait,
153 {
154 Self::install(db).await?;
155 Ok(Self::get_migration_with_status(db)
156 .await?
157 .into_iter()
158 .filter(|file| file.status == MigrationStatus::Applied)
159 .collect())
160 }
161
162 async fn install<C>(db: &C) -> Result<(), DbErr>
164 where
165 C: ConnectionTrait,
166 {
167 let builder = db.get_database_backend();
168 let table_name = Self::migration_table_name();
169 let schema = Schema::new(builder);
170 let mut stmt = schema
171 .create_table_from_entity(seaql_migrations::Entity)
172 .table_name(table_name);
173 stmt.if_not_exists();
174 db.execute(builder.build(&stmt)).await.map(|_| ())
175 }
176
177 async fn status<C>(db: &C) -> Result<(), DbErr>
179 where
180 C: ConnectionTrait,
181 {
182 Self::install(db).await?;
183
184 info!("Checking migration status");
185
186 for Migration { migration, status } in Self::get_migration_with_status(db).await? {
187 info!("Migration '{}'... {}", migration.name(), status);
188 }
189
190 Ok(())
191 }
192
193 async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
195 where
196 C: IntoSchemaManagerConnection<'c>,
197 {
198 exec_with_connection::<'_, _, _>(db, move |manager| {
199 Box::pin(async move { exec_fresh::<Self>(manager).await })
200 })
201 .await
202 }
203
204 async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
206 where
207 C: IntoSchemaManagerConnection<'c>,
208 {
209 exec_with_connection::<'_, _, _>(db, move |manager| {
210 Box::pin(async move {
211 exec_down::<Self>(manager, None).await?;
212 exec_up::<Self>(manager, None).await
213 })
214 })
215 .await
216 }
217
218 async fn reset<'c, C>(db: C) -> Result<(), DbErr>
220 where
221 C: IntoSchemaManagerConnection<'c>,
222 {
223 exec_with_connection::<'_, _, _>(db, move |manager| {
224 Box::pin(async move { exec_down::<Self>(manager, None).await })
225 })
226 .await
227 }
228
229 async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
231 where
232 C: IntoSchemaManagerConnection<'c>,
233 {
234 exec_with_connection::<'_, _, _>(db, move |manager| {
235 Box::pin(async move { exec_up::<Self>(manager, steps).await })
236 })
237 .await
238 }
239
240 async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
242 where
243 C: IntoSchemaManagerConnection<'c>,
244 {
245 exec_with_connection::<'_, _, _>(db, move |manager| {
246 Box::pin(async move { exec_down::<Self>(manager, steps).await })
247 })
248 .await
249 }
250}
251
252async fn exec_with_connection<'c, C, F>(db: C, f: F) -> Result<(), DbErr>
253where
254 C: IntoSchemaManagerConnection<'c>,
255 F: for<'b> Fn(
256 &'b SchemaManager<'_>,
257 ) -> Pin<Box<dyn Future<Output = Result<(), DbErr>> + Send + 'b>>,
258{
259 let db = db.into_schema_manager_connection();
260
261 match db.get_database_backend() {
262 DbBackend::Postgres => {
263 let transaction = db.begin().await?;
264 let manager = SchemaManager::new(&transaction);
265 f(&manager).await?;
266 transaction.commit().await
267 }
268 DbBackend::MySql | DbBackend::Sqlite => {
269 let manager = SchemaManager::new(db);
270 f(&manager).await
271 }
272 }
273}
274
275async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
276where
277 M: MigratorTrait + ?Sized,
278{
279 let db = manager.get_connection();
280
281 M::install(db).await?;
282 let db_backend = db.get_database_backend();
283
284 if db_backend == DbBackend::Sqlite {
286 info!("Disabling foreign key check");
287 db.execute(Statement::from_string(
288 db_backend,
289 "PRAGMA foreign_keys = OFF".to_owned(),
290 ))
291 .await?;
292 info!("Foreign key check disabled");
293 }
294
295 if db_backend == DbBackend::MySql {
297 info!("Dropping all foreign keys");
298 let stmt = query_mysql_foreign_keys(db);
299 let rows = db.query_all(db_backend.build(&stmt)).await?;
300 for row in rows.into_iter() {
301 let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
302 let table_name: String = row.try_get("", "TABLE_NAME")?;
303 info!(
304 "Dropping foreign key '{}' from table '{}'",
305 constraint_name, table_name
306 );
307 let mut stmt = ForeignKey::drop();
308 stmt.table(Alias::new(table_name.as_str()))
309 .name(constraint_name.as_str());
310 db.execute(db_backend.build(&stmt)).await?;
311 info!("Foreign key '{}' has been dropped", constraint_name);
312 }
313 info!("All foreign keys dropped");
314 }
315
316 let stmt = query_tables(db).await;
318 let rows = db.query_all(db_backend.build(&stmt)).await?;
319 for row in rows.into_iter() {
320 let table_name: String = row.try_get("", "table_name")?;
321 info!("Dropping table '{}'", table_name);
322 let mut stmt = Table::drop();
323 stmt.table(Alias::new(table_name.as_str()))
324 .if_exists()
325 .cascade();
326 db.execute(db_backend.build(&stmt)).await?;
327 info!("Table '{}' has been dropped", table_name);
328 }
329
330 if db_backend == DbBackend::Postgres {
332 info!("Dropping all types");
333 let stmt = query_pg_types(db);
334 let rows = db.query_all(db_backend.build(&stmt)).await?;
335 for row in rows {
336 let type_name: String = row.try_get("", "typname")?;
337 info!("Dropping type '{}'", type_name);
338 let mut stmt = Type::drop();
339 stmt.name(Alias::new(&type_name));
340 db.execute(db_backend.build(&stmt)).await?;
341 info!("Type '{}' has been dropped", type_name);
342 }
343 }
344
345 if db_backend == DbBackend::Sqlite {
347 info!("Restoring foreign key check");
348 db.execute(Statement::from_string(
349 db_backend,
350 "PRAGMA foreign_keys = ON".to_owned(),
351 ))
352 .await?;
353 info!("Foreign key check restored");
354 }
355
356 exec_up::<M>(manager, None).await
358}
359
360async fn exec_up<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
361where
362 M: MigratorTrait + ?Sized,
363{
364 let db = manager.get_connection();
365
366 M::install(db).await?;
367
368 if let Some(steps) = steps {
369 info!("Applying {} pending migrations", steps);
370 } else {
371 info!("Applying all pending migrations");
372 }
373
374 let migrations = M::get_pending_migrations(db).await?.into_iter();
375 if migrations.len() == 0 {
376 info!("No pending migrations");
377 }
378 for Migration { migration, .. } in migrations {
379 if let Some(steps) = steps.as_mut() {
380 if steps == &0 {
381 break;
382 }
383 *steps -= 1;
384 }
385 info!("Applying migration '{}'", migration.name());
386 migration.up(manager).await?;
387 info!("Migration '{}' has been applied", migration.name());
388 let now = SystemTime::now()
389 .duration_since(SystemTime::UNIX_EPOCH)
390 .expect("SystemTime before UNIX EPOCH!");
391 seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
392 version: ActiveValue::Set(migration.name().to_owned()),
393 applied_at: ActiveValue::Set(now.as_secs() as i64),
394 })
395 .table_name(M::migration_table_name())
396 .exec(db)
397 .await?;
398 }
399
400 Ok(())
401}
402
403async fn exec_down<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
404where
405 M: MigratorTrait + ?Sized,
406{
407 let db = manager.get_connection();
408
409 M::install(db).await?;
410
411 if let Some(steps) = steps {
412 info!("Rolling back {} applied migrations", steps);
413 } else {
414 info!("Rolling back all applied migrations");
415 }
416
417 let migrations = M::get_applied_migrations(db).await?.into_iter().rev();
418 if migrations.len() == 0 {
419 info!("No applied migrations");
420 }
421 for Migration { migration, .. } in migrations {
422 if let Some(steps) = steps.as_mut() {
423 if steps == &0 {
424 break;
425 }
426 *steps -= 1;
427 }
428 info!("Rolling back migration '{}'", migration.name());
429 migration.down(manager).await?;
430 info!("Migration '{}' has been rollbacked", migration.name());
431 seaql_migrations::Entity::delete_many()
432 .filter(Expr::col(seaql_migrations::Column::Version).eq(migration.name()))
433 .table_name(M::migration_table_name())
434 .exec(db)
435 .await?;
436 }
437
438 Ok(())
439}
440
441async fn query_tables<C>(db: &C) -> SelectStatement
442where
443 C: ConnectionTrait,
444{
445 match db.get_database_backend() {
446 #[cfg(feature = "sqlx-mysql")]
447 DbBackend::MySql => sea_schema::mysql::MySql.query_tables(),
448 #[cfg(feature = "sqlx-postgres")]
449 DbBackend::Postgres => sea_schema::postgres::Postgres.query_tables(),
450 #[cfg(feature = "sqlx-sqlite")]
451 DbBackend::Sqlite => sea_schema::sqlite::Sqlite.query_tables(),
452 #[allow(unreachable_patterns)]
453 other => panic!("{other:?} feature is off"),
454 }
455}
456
457fn get_current_schema<C>(db: &C) -> SimpleExpr
458where
459 C: ConnectionTrait,
460{
461 match db.get_database_backend() {
462 #[cfg(feature = "sqlx-mysql")]
463 DbBackend::MySql => sea_schema::mysql::MySql::get_current_schema(),
464 #[cfg(feature = "sqlx-postgres")]
465 DbBackend::Postgres => sea_schema::postgres::Postgres::get_current_schema(),
466 #[cfg(feature = "sqlx-sqlite")]
467 DbBackend::Sqlite => sea_schema::sqlite::Sqlite::get_current_schema(),
468 #[allow(unreachable_patterns)]
469 other => panic!("{other:?} feature is off"),
470 }
471}
472
473#[derive(DeriveIden)]
474enum InformationSchema {
475 #[sea_orm(iden = "information_schema")]
476 Schema,
477 #[sea_orm(iden = "TABLE_NAME")]
478 TableName,
479 #[sea_orm(iden = "CONSTRAINT_NAME")]
480 ConstraintName,
481 TableConstraints,
482 TableSchema,
483 ConstraintType,
484}
485
486fn query_mysql_foreign_keys<C>(db: &C) -> SelectStatement
487where
488 C: ConnectionTrait,
489{
490 let mut stmt = Query::select();
491 stmt.columns([
492 InformationSchema::TableName,
493 InformationSchema::ConstraintName,
494 ])
495 .from((
496 InformationSchema::Schema,
497 InformationSchema::TableConstraints,
498 ))
499 .cond_where(
500 Condition::all()
501 .add(get_current_schema(db).equals((
502 InformationSchema::TableConstraints,
503 InformationSchema::TableSchema,
504 )))
505 .add(
506 Expr::col((
507 InformationSchema::TableConstraints,
508 InformationSchema::ConstraintType,
509 ))
510 .eq("FOREIGN KEY"),
511 ),
512 );
513 stmt
514}
515
516#[derive(DeriveIden)]
517enum PgType {
518 Table,
519 Typname,
520 Typnamespace,
521 Typelem,
522}
523
524#[derive(DeriveIden)]
525enum PgNamespace {
526 Table,
527 Oid,
528 Nspname,
529}
530
531fn query_pg_types<C>(db: &C) -> SelectStatement
532where
533 C: ConnectionTrait,
534{
535 let mut stmt = Query::select();
536 stmt.column(PgType::Typname)
537 .from(PgType::Table)
538 .join(
539 JoinType::LeftJoin,
540 PgNamespace::Table,
541 Expr::col((PgNamespace::Table, PgNamespace::Oid))
542 .equals((PgType::Table, PgType::Typnamespace)),
543 )
544 .cond_where(
545 Condition::all()
546 .add(get_current_schema(db).equals((PgNamespace::Table, PgNamespace::Nspname)))
547 .add(Expr::col((PgType::Table, PgType::Typelem)).eq(0)),
548 );
549 stmt
550}
551
552trait QueryTable {
553 type Statement;
554
555 fn table_name(self, table_name: DynIden) -> Self::Statement;
556}
557
558impl QueryTable for SelectStatement {
559 type Statement = SelectStatement;
560
561 fn table_name(mut self, table_name: DynIden) -> SelectStatement {
562 self.from(table_name);
563 self
564 }
565}
566
567impl QueryTable for sea_query::TableCreateStatement {
568 type Statement = sea_query::TableCreateStatement;
569
570 fn table_name(mut self, table_name: DynIden) -> sea_query::TableCreateStatement {
571 self.table(table_name);
572 self
573 }
574}
575
576impl<A> QueryTable for sea_orm::Insert<A>
577where
578 A: ActiveModelTrait,
579{
580 type Statement = sea_orm::Insert<A>;
581
582 fn table_name(mut self, table_name: DynIden) -> sea_orm::Insert<A> {
583 sea_orm::QueryTrait::query(&mut self).into_table(table_name);
584 self
585 }
586}
587
588impl<E> QueryTable for sea_orm::DeleteMany<E>
589where
590 E: EntityTrait,
591{
592 type Statement = sea_orm::DeleteMany<E>;
593
594 fn table_name(mut self, table_name: DynIden) -> sea_orm::DeleteMany<E> {
595 sea_orm::QueryTrait::query(&mut self).from_table(table_name);
596 self
597 }
598}