mod queries;
mod exec;
use exec::*;
mod with_self;
pub use with_self::*;
use std::fmt::Display;
use tracing::info;
use super::{IntoSchemaManagerConnection, MigrationTrait, SchemaManager, seaql_migrations};
use sea_orm::sea_query::IntoIden;
use sea_orm::{ConnectionTrait, DbErr, DynIden};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum MigrationStatus {
Pending,
Applied,
}
impl Display for MigrationStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = match self {
MigrationStatus::Pending => "Pending",
MigrationStatus::Applied => "Applied",
};
write!(f, "{status}")
}
}
pub struct Migration {
migration: Box<dyn MigrationTrait>,
status: MigrationStatus,
}
impl Migration {
pub fn name(&self) -> &str {
self.migration.name()
}
pub fn status(&self) -> MigrationStatus {
self.status
}
}
#[async_trait::async_trait]
pub trait MigratorTrait: Send {
fn migrations() -> Vec<Box<dyn MigrationTrait>>;
fn migration_table_name() -> DynIden {
seaql_migrations::Entity.into_iden()
}
fn get_migration_files() -> Vec<Migration> {
Self::migrations()
.into_iter()
.map(|migration| Migration {
migration,
status: MigrationStatus::Pending,
})
.collect()
}
async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
get_migration_models(db, Self::migration_table_name()).await
}
async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
get_migration_with_status(
Self::get_migration_files(),
Self::get_migration_models(db).await?,
)
}
async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
Ok(Self::get_migration_with_status(db)
.await?
.into_iter()
.filter(|file| file.status == MigrationStatus::Pending)
.collect())
}
async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
Ok(Self::get_migration_with_status(db)
.await?
.into_iter()
.filter(|file| file.status == MigrationStatus::Applied)
.collect())
}
async fn install<C>(db: &C) -> Result<(), DbErr>
where
C: ConnectionTrait,
{
install(db, Self::migration_table_name()).await
}
async fn status<C>(db: &C) -> Result<(), DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
info!("Checking migration status");
for Migration { migration, status } in Self::get_migration_with_status(db).await? {
info!("Migration '{}'... {}", migration.name(), status);
}
Ok(())
}
async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
exec_fresh::<Self>(&manager).await
}
async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
exec_down::<Self>(&manager, None).await?;
exec_up::<Self>(&manager, None).await
}
async fn reset<'c, C>(db: C) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
exec_down::<Self>(&manager, None).await?;
uninstall(&manager, Self::migration_table_name()).await
}
async fn uninstall<'c, C>(db: C) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
uninstall(&manager, Self::migration_table_name()).await
}
async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
exec_up::<Self>(&manager, steps).await
}
async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let db = db.into_database_executor();
let manager = SchemaManager::new(db);
exec_down::<Self>(&manager, steps).await
}
}
async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
let db = manager.get_connection();
M::install(db).await?;
drop_everything(db).await?;
exec_up::<M>(manager, None).await
}
async fn exec_up<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
let db = manager.get_connection();
M::install(db).await?;
exec_up_with(
manager,
steps,
M::get_pending_migrations(db).await?,
M::migration_table_name(),
)
.await
}
async fn exec_down<M>(manager: &SchemaManager<'_>, steps: Option<u32>) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
let db = manager.get_connection();
M::install(db).await?;
exec_down_with(
manager,
steps,
M::get_applied_migrations(db).await?,
M::migration_table_name(),
)
.await
}