1mod queries;
2use queries::*;
3
4use std::collections::HashSet;
5use std::fmt::Display;
6use std::future::Future;
7use std::pin::Pin;
8use std::time::SystemTime;
9use tracing::info;
10
11use sea_orm::sea_query::{
12 Alias, Expr, ExprTrait, ForeignKey, IntoIden, Order, Query, Table, extension::postgres::Type,
13};
14use sea_orm::{
15 ActiveValue, ConnectionTrait, DbBackend, DbErr, DynIden, EntityTrait, FromQueryResult,
16 Iterable, QueryFilter, Schema, Statement, TransactionTrait,
17};
18#[allow(unused_imports)]
19use sea_schema::probe::SchemaProbe;
20
21use super::{IntoSchemaManagerConnection, MigrationTrait, SchemaManager, seaql_migrations};
22
23#[derive(Copy, Clone, Debug, PartialEq, Eq)]
24pub enum MigrationStatus {
26 Pending,
28 Applied,
30}
31
32impl Display for MigrationStatus {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 let status = match self {
35 MigrationStatus::Pending => "Pending",
36 MigrationStatus::Applied => "Applied",
37 };
38 write!(f, "{status}")
39 }
40}
41
42pub struct Migration {
43 migration: Box<dyn MigrationTrait>,
44 status: MigrationStatus,
45}
46
47impl Migration {
48 pub fn name(&self) -> &str {
50 self.migration.name()
51 }
52
53 pub fn status(&self) -> MigrationStatus {
55 self.status
56 }
57}
58
59#[async_trait::async_trait]
61pub trait MigratorTrait: Send {
62 fn migrations() -> Vec<Box<dyn MigrationTrait>>;
64
65 fn migration_table_name() -> DynIden {
67 seaql_migrations::Entity.into_iden()
68 }
69
70 fn get_migration_files() -> Vec<Migration> {
72 Self::migrations()
73 .into_iter()
74 .map(|migration| Migration {
75 migration,
76 status: MigrationStatus::Pending,
77 })
78 .collect()
79 }
80
81 async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
83 where
84 C: ConnectionTrait,
85 {
86 Self::install(db).await?;
87 get_migration_models(db, Self::migration_table_name()).await
88 }
89
90 async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
92 where
93 C: ConnectionTrait,
94 {
95 Self::install(db).await?;
96 get_migration_with_status(
97 Self::get_migration_files(),
98 Self::get_migration_models(db).await?,
99 )
100 }
101
102 async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
104 where
105 C: ConnectionTrait,
106 {
107 Self::install(db).await?;
108 Ok(Self::get_migration_with_status(db)
109 .await?
110 .into_iter()
111 .filter(|file| file.status == MigrationStatus::Pending)
112 .collect())
113 }
114
115 async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
117 where
118 C: ConnectionTrait,
119 {
120 Self::install(db).await?;
121 Ok(Self::get_migration_with_status(db)
122 .await?
123 .into_iter()
124 .filter(|file| file.status == MigrationStatus::Applied)
125 .collect())
126 }
127
128 async fn install<C>(db: &C) -> Result<(), DbErr>
130 where
131 C: ConnectionTrait,
132 {
133 install(db, Self::migration_table_name()).await
134 }
135
136 async fn status<C>(db: &C) -> Result<(), DbErr>
138 where
139 C: ConnectionTrait,
140 {
141 Self::install(db).await?;
142
143 info!("Checking migration status");
144
145 for Migration { migration, status } in Self::get_migration_with_status(db).await? {
146 info!("Migration '{}'... {}", migration.name(), status);
147 }
148
149 Ok(())
150 }
151
152 async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
154 where
155 C: IntoSchemaManagerConnection<'c>,
156 {
157 exec_with_connection::<'_, _, _>(db, move |manager| {
158 Box::pin(async move { exec_fresh::<Self>(manager).await })
159 })
160 .await
161 }
162
163 async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
165 where
166 C: IntoSchemaManagerConnection<'c>,
167 {
168 exec_with_connection::<'_, _, _>(db, move |manager| {
169 Box::pin(async move {
170 exec_down::<Self>(manager, None).await?;
171 exec_up::<Self>(manager, None).await
172 })
173 })
174 .await
175 }
176
177 async fn reset<'c, C>(db: C) -> Result<(), DbErr>
179 where
180 C: IntoSchemaManagerConnection<'c>,
181 {
182 exec_with_connection::<'_, _, _>(db, move |manager| {
183 Box::pin(async move {
184 exec_down::<Self>(manager, None).await?;
186
187 uninstall(manager, Self::migration_table_name()).await?;
189
190 Ok(())
191 })
192 })
193 .await
194 }
195
196 async fn uninstall<'c, C>(db: C) -> Result<(), DbErr>
199 where
200 C: IntoSchemaManagerConnection<'c>,
201 {
202 exec_with_connection::<'_, _, _>(db, move |manager| {
203 Box::pin(uninstall(manager, Self::migration_table_name()))
204 })
205 .await
206 }
207
208 async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
210 where
211 C: IntoSchemaManagerConnection<'c>,
212 {
213 exec_with_connection::<'_, _, _>(db, move |manager| {
214 Box::pin(async move { exec_up::<Self>(manager, steps).await })
215 })
216 .await
217 }
218
219 async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
221 where
222 C: IntoSchemaManagerConnection<'c>,
223 {
224 exec_with_connection::<'_, _, _>(db, move |manager| {
225 Box::pin(async move { exec_down::<Self>(manager, steps).await })
226 })
227 .await
228 }
229}
230
231async fn get_migration_models<C>(
232 db: &C,
233 migration_table_name: DynIden,
234) -> Result<Vec<seaql_migrations::Model>, DbErr>
235where
236 C: ConnectionTrait,
237{
238 let stmt = Query::select()
239 .table_name(migration_table_name)
240 .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
241 .order_by(seaql_migrations::Column::Version, Order::Asc)
242 .to_owned();
243 let builder = db.get_database_backend();
244 seaql_migrations::Model::find_by_statement(builder.build(&stmt))
245 .all(db)
246 .await
247}
248
249fn get_migration_with_status(
250 migration_files: Vec<Migration>,
251 migration_models: Vec<seaql_migrations::Model>,
252) -> Result<Vec<Migration>, DbErr> {
253 let mut migration_files = migration_files;
254
255 let migration_in_db: HashSet<String> = migration_models
256 .into_iter()
257 .map(|model| model.version)
258 .collect();
259 let migration_in_fs: HashSet<String> = migration_files
260 .iter()
261 .map(|file| file.migration.name().to_string())
262 .collect();
263
264 let pending_migrations = &migration_in_fs - &migration_in_db;
265 for migration_file in migration_files.iter_mut() {
266 if !pending_migrations.contains(migration_file.migration.name()) {
267 migration_file.status = MigrationStatus::Applied;
268 }
269 }
270
271 let missing_migrations_in_fs = &migration_in_db - &migration_in_fs;
272 let errors: Vec<String> = missing_migrations_in_fs
273 .iter()
274 .map(|missing_migration| {
275 format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
276 }).collect();
277
278 if !errors.is_empty() {
279 Err(DbErr::Custom(errors.join("\n")))
280 } else {
281 Ok(migration_files)
282 }
283}
284
285async fn exec_with_connection<'c, C, F>(db: C, f: F) -> Result<(), DbErr>
286where
287 C: IntoSchemaManagerConnection<'c>,
288 F: for<'b> Fn(
289 &'b SchemaManager<'_>,
290 ) -> Pin<Box<dyn Future<Output = Result<(), DbErr>> + Send + 'b>>,
291{
292 let db = db.into_database_executor();
293
294 match db.get_database_backend() {
295 DbBackend::Postgres => {
296 let transaction = db.begin().await?;
297 let manager = SchemaManager::new(&transaction);
298 f(&manager).await?;
299 transaction.commit().await
300 }
301 DbBackend::MySql | DbBackend::Sqlite => {
302 let manager = SchemaManager::new(db);
303 f(&manager).await
304 }
305 db => Err(DbErr::BackendNotSupported {
306 db: db.as_str(),
307 ctx: "exec_with_connection",
308 }),
309 }
310}
311
312async fn install<C>(db: &C, migration_table_name: DynIden) -> Result<(), DbErr>
313where
314 C: ConnectionTrait,
315{
316 let builder = db.get_database_backend();
317 let schema = Schema::new(builder);
318 let mut stmt = schema
319 .create_table_from_entity(seaql_migrations::Entity)
320 .table_name(migration_table_name);
321 stmt.if_not_exists();
322 db.execute(&stmt).await?;
323 Ok(())
324}
325
326async fn uninstall(
327 manager: &SchemaManager<'_>,
328 migration_table_name: DynIden,
329) -> Result<(), DbErr> {
330 let mut stmt = Table::drop();
331 stmt.table(migration_table_name).if_exists().cascade();
332 manager.drop_table(stmt).await?;
333 Ok(())
334}
335
336async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
337where
338 M: MigratorTrait + ?Sized,
339{
340 let db = manager.get_connection();
341
342 M::install(db).await?;
343
344 drop_everything(db).await?;
345
346 exec_up::<M>(manager, None).await
347}
348
349async fn drop_everything<C: ConnectionTrait>(db: &C) -> Result<(), DbErr> {
350 let db_backend = db.get_database_backend();
351
352 if db_backend == DbBackend::Sqlite {
354 info!("Disabling foreign key check");
355 db.execute_raw(Statement::from_string(
356 db_backend,
357 "PRAGMA foreign_keys = OFF".to_owned(),
358 ))
359 .await?;
360 info!("Foreign key check disabled");
361 }
362
363 if db_backend == DbBackend::MySql {
365 info!("Dropping all foreign keys");
366 let stmt = query_mysql_foreign_keys(db);
367 let rows = db.query_all(&stmt).await?;
368 for row in rows.into_iter() {
369 let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
370 let table_name: String = row.try_get("", "TABLE_NAME")?;
371 info!(
372 "Dropping foreign key '{}' from table '{}'",
373 constraint_name, table_name
374 );
375 let mut stmt = ForeignKey::drop();
376 stmt.table(Alias::new(table_name.as_str()))
377 .name(constraint_name.as_str());
378 db.execute(&stmt).await?;
379 info!("Foreign key '{}' has been dropped", constraint_name);
380 }
381 info!("All foreign keys dropped");
382 }
383
384 let stmt = query_tables(db)?;
386 let rows = db.query_all(&stmt).await?;
387 for row in rows.into_iter() {
388 let table_name: String = row.try_get("", "table_name")?;
389 info!("Dropping table '{}'", table_name);
390 let mut stmt = Table::drop();
391 stmt.table(Alias::new(table_name.as_str()))
392 .if_exists()
393 .cascade();
394 db.execute(&stmt).await?;
395 info!("Table '{}' has been dropped", table_name);
396 }
397
398 if db_backend == DbBackend::Postgres {
400 info!("Dropping all types");
401 let stmt = query_pg_types(db);
402 let rows = db.query_all(&stmt).await?;
403 for row in rows {
404 let type_name: String = row.try_get("", "typname")?;
405 info!("Dropping type '{}'", type_name);
406 let mut stmt = Type::drop();
407 stmt.name(Alias::new(&type_name));
408 db.execute(&stmt).await?;
409 info!("Type '{}' has been dropped", type_name);
410 }
411 }
412
413 if db_backend == DbBackend::Sqlite {
415 info!("Restoring foreign key check");
416 db.execute_raw(Statement::from_string(
417 db_backend,
418 "PRAGMA foreign_keys = ON".to_owned(),
419 ))
420 .await?;
421 info!("Foreign key check restored");
422 }
423
424 Ok(())
425}
426
427async fn exec_up<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
428where
429 M: MigratorTrait + ?Sized,
430{
431 let db = manager.get_connection();
432
433 M::install(db).await?;
434
435 exec_up_with(
436 manager,
437 steps,
438 M::get_pending_migrations(db).await?,
439 M::migration_table_name(),
440 )
441 .await
442}
443
444async fn exec_up_with(
445 manager: &SchemaManager<'_>,
446 mut steps: Option<u32>,
447 pending_migrations: Vec<Migration>,
448 migration_table_name: DynIden,
449) -> Result<(), DbErr> {
450 let db = manager.get_connection();
451
452 if let Some(steps) = steps {
453 info!("Applying {} pending migrations", steps);
454 } else {
455 info!("Applying all pending migrations");
456 }
457 if pending_migrations.is_empty() {
458 info!("No pending migrations");
459 }
460
461 for Migration { migration, .. } in pending_migrations {
462 if let Some(steps) = steps.as_mut() {
463 if steps == &0 {
464 break;
465 }
466 *steps -= 1;
467 }
468 info!("Applying migration '{}'", migration.name());
469 migration.up(manager).await?;
470 info!("Migration '{}' has been applied", migration.name());
471 let now = SystemTime::now()
472 .duration_since(SystemTime::UNIX_EPOCH)
473 .expect("SystemTime before UNIX EPOCH!");
474 seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
475 version: ActiveValue::Set(migration.name().to_owned()),
476 applied_at: ActiveValue::Set(now.as_secs() as i64),
477 })
478 .table_name(migration_table_name.clone())
479 .exec(db)
480 .await?;
481 }
482
483 Ok(())
484}
485
486async fn exec_down<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
487where
488 M: MigratorTrait + ?Sized,
489{
490 let db = manager.get_connection();
491
492 M::install(db).await?;
493
494 exec_down_with(
495 manager,
496 steps,
497 M::get_applied_migrations(db).await?,
498 M::migration_table_name(),
499 )
500 .await
501}
502
503async fn exec_down_with(
504 manager: &SchemaManager<'_>,
505 mut steps: Option<u32>,
506 applied_migrations: Vec<Migration>,
507 migration_table_name: DynIden,
508) -> Result<(), DbErr> {
509 let db = manager.get_connection();
510
511 if let Some(steps) = steps {
512 info!("Rolling back {} applied migrations", steps);
513 } else {
514 info!("Rolling back all applied migrations");
515 }
516 if applied_migrations.is_empty() {
517 info!("No applied migrations");
518 }
519
520 for Migration { migration, .. } in applied_migrations.into_iter().rev() {
521 if let Some(steps) = steps.as_mut() {
522 if steps == &0 {
523 break;
524 }
525 *steps -= 1;
526 }
527 info!("Rolling back migration '{}'", migration.name());
528 migration.down(manager).await?;
529 info!("Migration '{}' has been rollbacked", migration.name());
530 seaql_migrations::Entity::delete_many()
531 .filter(Expr::col(seaql_migrations::Column::Version).eq(migration.name()))
532 .table_name(migration_table_name.clone())
533 .exec(db)
534 .await?;
535 }
536
537 Ok(())
538}