use crate::database::Database;
use crate::dialect::Dialect;
use crate::executor::Executor;
use super::checksum::checksum_of;
use super::registry::{MigrationSet, MigrationTrait, MigrationTransaction};
use super::schema::SchemaManager;
use super::store;
/// Table name a `Migrator` starts out with; `Migrator::table` replaces it.
/// The name is handed to the `store` functions on every call.
const DEFAULT_TABLE: &str = "_tork_migrations";
/// Policy applied when the checksum stored for an already-applied migration
/// differs from the checksum computed from the migration as it is now.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnMismatch {
/// Print the mismatch message to stderr and carry on.
Warn,
/// Fail with a configuration error carrying the mismatch message.
Error,
}
/// One row of the report produced by `Migrator::status`: a migration from the
/// set, whether it has a stored record, and whether its checksum still agrees.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStatus {
/// The migration's revision identifier, as returned by `revision()`.
pub revision: String,
/// The migration's name, as returned by `name()`.
pub name: String,
/// `true` when a stored record with this revision was found.
pub applied: bool,
/// `None` when the migration has no stored record; otherwise whether the
/// stored checksum equals the checksum computed now.
pub checksum_matches: Option<bool>,
}
/// Runs a `MigrationSet` against a borrowed `Database`, keeping track of what
/// has been applied in a bookkeeping table accessed through the `store` module.
pub struct Migrator<'d> {
/// Database the migrations run against; borrowed for the migrator's lifetime.
db: &'d Database,
/// The migrations this migrator knows about.
set: MigrationSet,
/// Name of the bookkeeping table passed to every `store` call.
table: String,
/// What to do when a stored checksum disagrees with the computed one.
on_mismatch: OnMismatch,
}
impl<'d> Migrator<'d> {
    /// Creates a migrator for `set` against `db`.
    ///
    /// The bookkeeping table defaults to `DEFAULT_TABLE` and checksum
    /// mismatches default to [`OnMismatch::Error`].
    pub fn new(db: &'d Database, set: MigrationSet) -> Self {
        Self {
            db,
            set,
            table: DEFAULT_TABLE.to_string(),
            on_mismatch: OnMismatch::Error,
        }
    }

    /// Builder-style setter: replaces the bookkeeping table name with `name`.
    pub fn table(mut self, name: &str) -> Self {
        self.table = name.to_string();
        self
    }

    /// Builder-style setter: chooses what happens when a stored checksum does
    /// not match the checksum computed for the same revision.
    pub fn on_checksum_mismatch(mut self, on_mismatch: OnMismatch) -> Self {
        self.on_mismatch = on_mismatch;
        self
    }

    /// Applies every migration in the set that has no stored record, in the
    /// order yielded by `MigrationSet::sorted`, and returns how many were
    /// applied.
    ///
    /// Migrations that already have a record are skipped after their stored
    /// checksum is compared with the freshly computed one; a difference is
    /// handled according to the configured [`OnMismatch`] policy.
    ///
    /// Each pending migration whose `transaction()` is
    /// `MigrationTransaction::Enabled` is wrapped in the dialect's
    /// begin/commit statements. On any failure, including a failed commit, a
    /// best-effort rollback is attempted before the error is returned.
    ///
    /// # Errors
    ///
    /// Returns the first error from the store, from computing a checksum,
    /// from a checksum mismatch under [`OnMismatch::Error`], or from applying
    /// or committing a migration. Migrations applied before the failure stay
    /// applied.
    pub async fn up(&self) -> crate::Result<usize> {
        // NOTE(review): `pinned()` presumably yields one fixed connection so
        // that BEGIN/COMMIT reach the same session; confirm against `Database`.
        let executor = self.db.pinned().await?;
        store::ensure_table(&executor, &self.table).await?;
        let records = store::applied_records(&executor, &self.table).await?;
        // Every migration applied by this call is recorded under one batch.
        let batch = store::next_batch(&executor, &self.table).await?;

        let mut count = 0;
        for migration in self.set.sorted() {
            let checksum = self.checksum_for(migration).await?;

            if let Some(record) = records
                .iter()
                .find(|record| record.revision == migration.revision())
            {
                if record.checksum != checksum {
                    self.report_mismatch(migration.revision(), &record.checksum, &checksum)?;
                }
                continue;
            }

            let transactional = migration.transaction() == MigrationTransaction::Enabled;
            self.begin(&executor, transactional).await?;
            let outcome = self.apply_up(&executor, migration, &checksum, batch).await;
            self.finish(&executor, transactional, outcome).await?;
            count += 1;
        }
        Ok(count)
    }

    /// Runs `migration.up` against `executor` and then stores a record for it
    /// with the given `checksum`, `batch`, and the measured run time in
    /// milliseconds.
    async fn apply_up<E: Executor + Sync>(
        &self,
        executor: &E,
        migration: &dyn MigrationTrait,
        checksum: &str,
        batch: i64,
    ) -> crate::Result<()> {
        let mut schema = SchemaManager::executing(executor);
        let start = std::time::Instant::now();
        migration.up(&mut schema).await?;
        // `as_millis` is a u128; saturate rather than silently truncate with `as`.
        let elapsed_ms = i64::try_from(start.elapsed().as_millis()).unwrap_or(i64::MAX);
        store::record(
            executor,
            &self.table,
            migration.revision(),
            None,
            migration.name(),
            checksum,
            batch,
            elapsed_ms,
        )
        .await
    }

    /// The SQL dialect of the underlying database.
    fn dialect(&self) -> &dyn Dialect {
        self.db.dialect().as_ref()
    }

    /// Executes the dialect's begin statement when `enabled`; otherwise does
    /// nothing.
    async fn begin<E: Executor + Sync>(&self, executor: &E, enabled: bool) -> crate::Result<()> {
        if enabled {
            executor
                .execute(self.dialect().begin_sql().to_string(), Vec::new())
                .await?;
        }
        Ok(())
    }

    /// Executes the dialect's commit statement when `enabled`; otherwise does
    /// nothing.
    async fn commit<E: Executor + Sync>(&self, executor: &E, enabled: bool) -> crate::Result<()> {
        if enabled {
            executor
                .execute(self.dialect().commit_sql().to_string(), Vec::new())
                .await?;
        }
        Ok(())
    }

    /// Executes the dialect's rollback statement when `enabled`.
    ///
    /// This is deliberately best-effort: a rollback failure is discarded so
    /// that the error which triggered the rollback is the one reported.
    async fn rollback<E: Executor + Sync>(&self, executor: &E, enabled: bool) {
        if enabled {
            let _ = executor
                .execute(self.dialect().rollback_sql().to_string(), Vec::new())
                .await;
        }
    }

    /// Closes the transaction opened for one migration step.
    ///
    /// Commits when `outcome` is `Ok`. If the step failed, or the commit
    /// itself fails, a best-effort rollback is attempted so the executor is
    /// not left inside an open transaction, and the failure is returned.
    /// Both commit and rollback do nothing when `transactional` is `false`.
    async fn finish<E: Executor + Sync>(
        &self,
        executor: &E,
        transactional: bool,
        outcome: crate::Result<()>,
    ) -> crate::Result<()> {
        let outcome = match outcome {
            Ok(()) => self.commit(executor, transactional).await,
            failed => failed,
        };
        if outcome.is_err() {
            self.rollback(executor, transactional).await;
        }
        outcome
    }

    /// Reports, for every migration in the set (in `sorted` order), whether a
    /// stored record exists and whether its checksum equals the one computed
    /// now.
    ///
    /// `store::ensure_table` is called first (presumably creating the table
    /// when it is missing; confirm in `store`).
    ///
    /// # Errors
    ///
    /// Returns any error from the store or from computing a checksum.
    pub async fn status(&self) -> crate::Result<Vec<MigrationStatus>> {
        store::ensure_table(self.db, &self.table).await?;
        let records = store::applied_records(self.db, &self.table).await?;

        let mut statuses = Vec::new();
        for migration in self.set.sorted() {
            let checksum = self.checksum_for(migration).await?;
            // `None` means no record exists, i.e. the migration is not applied.
            let checksum_matches = records
                .iter()
                .find(|record| record.revision == migration.revision())
                .map(|record| record.checksum == checksum);
            statuses.push(MigrationStatus {
                revision: migration.revision().to_string(),
                name: migration.name().to_string(),
                applied: checksum_matches.is_some(),
                checksum_matches,
            });
        }
        Ok(statuses)
    }

    /// Computes the checksum of `migration` by running its `up` against a
    /// collecting `SchemaManager` and hashing what was collected with
    /// `checksum_of`.
    async fn checksum_for(&self, migration: &dyn MigrationTrait) -> crate::Result<String> {
        let dialect = self.db.dialect().as_ref();
        let mut schema = SchemaManager::collect(dialect);
        migration.up(&mut schema).await?;
        Ok(checksum_of(&schema.into_collected()))
    }

    /// Handles a checksum mismatch for `revision` according to the configured
    /// policy: an error under [`OnMismatch::Error`], a line on stderr and
    /// `Ok(())` under [`OnMismatch::Warn`].
    fn report_mismatch(
        &self,
        revision: &str,
        stored: &str,
        computed: &str,
    ) -> crate::Result<()> {
        let message = format!(
            "migration checksum mismatch: `{revision}` was applied with checksum \
             {stored} but now renders to {computed}"
        );
        match self.on_mismatch {
            OnMismatch::Error => Err(crate::OrmError::configuration(message)),
            OnMismatch::Warn => {
                eprintln!("tork-orm: {message}");
                Ok(())
            }
        }
    }

    /// Reverts the revisions returned by `store::recent_revisions` for `steps`,
    /// in the order the store returns them, and returns how many were reverted.
    ///
    /// Each revision is looked up in the set, its `down` is run, and its
    /// record is deleted. Transaction handling mirrors [`Migrator::up`],
    /// including the best-effort rollback on a failed step or failed commit.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when a stored revision has no migration
    /// in the set, or the first error from the store, from `down`, or from
    /// committing. Revisions reverted before the failure stay reverted.
    pub async fn down(&self, steps: usize) -> crate::Result<usize> {
        let executor = self.db.pinned().await?;
        store::ensure_table(&executor, &self.table).await?;
        let revisions = store::recent_revisions(&executor, &self.table, steps).await?;

        let mut count = 0;
        for revision in revisions {
            let Some(migration) = self.set.find(&revision) else {
                return Err(crate::OrmError::configuration(format!(
                    "applied revision `{revision}` has no migration in the set"
                )));
            };

            let transactional = migration.transaction() == MigrationTransaction::Enabled;
            self.begin(&executor, transactional).await?;
            let outcome = self.apply_down(&executor, migration, &revision).await;
            self.finish(&executor, transactional, outcome).await?;
            count += 1;
        }
        Ok(count)
    }

    /// Runs `migration.down` against `executor` and then deletes the stored
    /// record for `revision`.
    async fn apply_down<E: Executor + Sync>(
        &self,
        executor: &E,
        migration: &dyn MigrationTrait,
        revision: &str,
    ) -> crate::Result<()> {
        let mut schema = SchemaManager::executing(executor);
        migration.down(&mut schema).await?;
        store::delete_record(executor, &self.table, revision).await
    }
}