use std::error::Error;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use async_trait::async_trait;
pub use db_up_codegen::{ migrations };
pub use db_up_sql_changelog::{Result as ChangelogResult, *};
/// Classifies what went wrong during a migration run.
///
/// Every variant carries an optional boxed cause. `CustomErrorMessage` additionally
/// carries the message text that the `Display` impl of `MigrationsError` prints in
/// place of a fixed headline.
#[derive(Debug)]
pub enum MigrationsErrorKind {
/// Displayed with the headline "Migration step failed.".
MigrationDatabaseStepFailed(Option<Box<dyn Error + Send + Sync>>),
/// Displayed with the headline "Migration failed.".
MigrationDatabaseFailed(Option<Box<dyn Error + Send + Sync>>),
/// Displayed with the headline "Migration setup failed.".
MigrationSetupFailed(Option<Box<dyn Error + Send + Sync>>),
/// Displayed with the headline "Migration versioning failed.".
MigrationVersioningFailed(Option<Box<dyn Error + Send + Sync>>),
/// Displayed with the contained message as the headline.
CustomErrorMessage(String, Option<Box<dyn Error + Send + Sync>>),
}
/// Error type returned by the migration APIs in this file.
///
/// Fields are private; values are built through the constructor functions on
/// `MigrationsError` and read through `kind()` and `last_successful_version()`.
#[derive(Debug)]
pub struct MigrationsError {
// What failed, together with the optional underlying cause.
kind: MigrationsErrorKind,
// Version supplied by whoever constructed the error; the setup and versioning
// constructors always store `None` here.
last_successful_version: Option<u32>,
}
impl MigrationsError {
    /// Builds an error of kind `MigrationDatabaseStepFailed`.
    ///
    /// `last_successful_version` and `cause` are stored as given.
    pub fn migration_database_step_failed(
        last_successful_version: Option<u32>,
        cause: Option<Box<dyn Error + Send + Sync>>,
    ) -> MigrationsError {
        let kind = MigrationsErrorKind::MigrationDatabaseStepFailed(cause);
        Self { kind, last_successful_version }
    }

    /// Builds an error of kind `MigrationDatabaseFailed`.
    ///
    /// `last_successful_version` and `cause` are stored as given.
    pub fn migration_database_failed(
        last_successful_version: Option<u32>,
        cause: Option<Box<dyn Error + Send + Sync>>,
    ) -> MigrationsError {
        let kind = MigrationsErrorKind::MigrationDatabaseFailed(cause);
        Self { kind, last_successful_version }
    }

    /// Builds an error of kind `MigrationSetupFailed`.
    ///
    /// No version is recorded: `last_successful_version()` returns `None`.
    pub fn migration_setup_failed(
        cause: Option<Box<dyn Error + Send + Sync>>,
    ) -> MigrationsError {
        let kind = MigrationsErrorKind::MigrationSetupFailed(cause);
        Self { kind, last_successful_version: None }
    }

    /// Builds an error of kind `MigrationVersioningFailed`.
    ///
    /// No version is recorded: `last_successful_version()` returns `None`.
    pub fn migration_versioning_failed(
        cause: Option<Box<dyn Error + Send + Sync>>,
    ) -> MigrationsError {
        let kind = MigrationsErrorKind::MigrationVersioningFailed(cause);
        Self { kind, last_successful_version: None }
    }

    /// Builds an error of kind `CustomErrorMessage` holding an owned copy of `message`.
    ///
    /// `last_successful_version` and `cause` are stored as given.
    pub fn custom_message(
        message: &str,
        last_successful_version: Option<u32>,
        cause: Option<Box<dyn Error + Send + Sync>>,
    ) -> MigrationsError {
        let kind = MigrationsErrorKind::CustomErrorMessage(String::from(message), cause);
        Self { kind, last_successful_version }
    }

    /// Returns the failure classification, including the optional cause it wraps.
    pub fn kind(&self) -> &MigrationsErrorKind {
        &self.kind
    }

    /// Returns the version stored when this error was constructed, if any.
    pub fn last_successful_version(&self) -> Option<u32> {
        self.last_successful_version
    }
}
/// Shorthand for a `std::result::Result` whose error type is `MigrationsError`.
pub type Result<T> = std::result::Result<T, MigrationsError>;
impl Display for MigrationsError {
    /// Writes a one-line headline for the error kind and, when a cause is present,
    /// a second line of the form `Caused by: <cause>`.
    ///
    /// # Errors
    ///
    /// Returns the formatter's error as soon as any write fails. This includes a
    /// failure while writing the headline, which must not be masked by a later
    /// successful write of the cause line.
    fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
        // Pick the headline text and borrow the optional cause in one exhaustive
        // match so that a newly added variant forces a decision here.
        let (headline, cause) = match &self.kind {
            MigrationsErrorKind::MigrationDatabaseStepFailed(cause) => {
                ("Migration step failed.", cause)
            }
            MigrationsErrorKind::MigrationDatabaseFailed(cause) => ("Migration failed.", cause),
            MigrationsErrorKind::MigrationSetupFailed(cause) => {
                ("Migration setup failed.", cause)
            }
            MigrationsErrorKind::MigrationVersioningFailed(cause) => {
                ("Migration versioning failed.", cause)
            }
            MigrationsErrorKind::CustomErrorMessage(message, cause) => (message.as_str(), cause),
        };

        fmt.write_str(headline)?;
        if let Some(cause) = cause {
            write!(fmt, "\nCaused by: {}", cause)?;
        }
        Ok(())
    }
}
// Marks `MigrationsError` as a standard error type. `source()` is not overridden, so
// the trait's default applies; the wrapped cause is exposed through `kind()` and is
// printed by the `Display` impl instead.
impl Error for MigrationsError {
}
/// Status recorded for a single migration version.
#[derive(Debug, Clone)]
pub enum MigrationStatus {
// NOTE(review): presumably the state between `begin_version` and `finish_version`
// — confirm against the `MigrationStateManager` implementations.
InProgress,
// NOTE(review): presumably set once `finish_version` has succeeded — confirm.
Deployed,
}
/// A migration version paired with its recorded status, as reported by a
/// `MigrationStateManager`.
#[derive(Debug, Clone)]
pub struct MigrationState {
/// Numeric version of the migration.
pub version: u32,
/// Status recorded for that version.
pub status: MigrationStatus,
}
/// Bookkeeping backend that tracks which migration versions exist and their status.
///
/// `MigrationRunner::migrate` uses `prepare`, `highest_version`, `begin_version` and
/// `finish_version`; the remaining methods are not called anywhere in this file.
#[async_trait]
pub trait MigrationStateManager {
/// Called once at the start of `MigrationRunner::migrate`, before any other method.
/// NOTE(review): presumably creates whatever storage the manager needs — confirm.
async fn prepare(&self) -> Result<()>;
/// NOTE(review): presumably the smallest recorded version, `None` when nothing is
/// recorded — confirm against implementations.
async fn lowest_version(&self) -> Result<Option<MigrationState>>;
/// State whose `version` the runner treats as the baseline: only changelogs with a
/// strictly greater version are executed. `None` makes the runner execute all of them.
async fn highest_version(&self) -> Result<Option<MigrationState>>;
/// NOTE(review): presumably every recorded version — ordering is not visible here.
async fn list_versions(&self) -> Result<Vec<MigrationState>>;
/// Called by the runner for `version` before the executor's transaction is opened.
async fn begin_version(&self, version: u32) -> Result<()>;
/// Called by the runner for `version` after the executor's transaction was committed.
async fn finish_version(&self, version: u32) -> Result<()>;
}
/// Backend that applies changelog files to the target database.
///
/// `MigrationRunner::migrate` drives it per changelog in this order:
/// `begin_transaction`, `execute_changelog_file`, then `commit_transaction` on success
/// or `rollback_transaction` on failure.
#[async_trait]
pub trait MigrationExecutor {
/// Called before each changelog file is executed.
async fn begin_transaction(&self) -> Result<()>;
/// Applies one changelog file; takes ownership of it.
async fn execute_changelog_file(&self, changelog_file: ChangelogFile) -> Result<()>;
/// Called after `execute_changelog_file` returned `Ok`.
async fn commit_transaction(&self) -> Result<()>;
/// Called after `execute_changelog_file` returned `Err`; the runner ignores the
/// outcome of the rollback and reports the execution error.
async fn rollback_transaction(&self) -> Result<()>;
}
/// Ties a changelog source, a state manager and an executor together; see
/// `MigrationRunner::migrate` for how they are used.
pub struct MigrationRunner<S, M, E> {
// Source of the changelog files to consider.
store: S,
// Version bookkeeping, held behind an `Arc`.
state_manager: Arc<M>,
// Applies changelog files, held behind an `Arc`.
executor: Arc<E>,
}
/// Source of the changelog files known to the application.
pub trait MigrationStore {
/// Returns the changelog files to consider. `MigrationRunner::migrate` parses each
/// file's `version()` as a `u32` and orders the files itself.
fn changelogs(&self) -> Vec<ChangelogFile>;
}
impl<S, M, E> MigrationRunner<S, M, E>
where S: MigrationStore,
M: MigrationStateManager,
E: MigrationExecutor {
pub fn new(store: S, state_manager: Arc<M>, executor: Arc<E>) -> Self {
return Self {
store, state_manager, executor
};
}
pub async fn migrate(&self) -> Result<Option<u32>> {
self.state_manager.prepare().await?;
let mut current_highest_version = self.state_manager.highest_version()
.await?
.map(|state| state.version);
let mut migrations: Vec<ChangelogFile> = self.store.changelogs().into_iter()
.filter(|migration| {
let version: u32 = migration.version()
.parse()
.expect("Version must be an integer");
return current_highest_version.map(|highest_version| version > highest_version)
.or(Some(true))
.unwrap();
})
.collect::<Vec<ChangelogFile>>();
println!("sorting migrations ...");
migrations.sort_by(|a, b| a.version().cmp(b.version()));
let migrations = migrations;
println!("running migrations ... {:?}", &migrations);
for changelog in migrations.into_iter() {
let version: u32 = changelog.version().parse().unwrap();
self.state_manager.begin_version(version).await?;
self.executor.begin_transaction().await?;
let result = self.executor
.execute_changelog_file(changelog)
.await;
match result {
Ok(_) => {
self.executor.commit_transaction().await?;
self.state_manager.finish_version(version).await?;
current_highest_version = Some(version);
},
Err(err) => {
let _result = self.executor.rollback_transaction().await
.or::<MigrationsError>(Ok(()))
.unwrap();
return Err(err);
}
}
}
return Ok(current_highest_version);
}
}