use crate::{AzothError, ProjectionStore, Result};
use rusqlite::Connection;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// A single versioned schema change that can be applied to a SQLite connection.
///
/// Implementors must be `Send + Sync` so they can be stored inside a shared
/// [`MigrationManager`].
pub trait Migration: Send + Sync {
    /// Version number of this migration; the manager sorts and filters on it.
    fn version(&self) -> u32;

    /// Human-readable name, used in log lines, error messages and the history table.
    fn name(&self) -> &str;

    /// Applies the migration using `conn`.
    fn up(&self, conn: &Connection) -> Result<()>;

    /// Reverts the migration using `conn`.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with
    /// [`AzothError::InvalidState`], signalling that rollback is unsupported.
    fn down(&self, _conn: &Connection) -> Result<()> {
        let message = format!("Migration '{}' does not support rollback", self.name());
        Err(AzothError::InvalidState(message))
    }

    /// Optional post-apply check; the default implementation accepts everything.
    fn verify(&self, _conn: &Connection) -> Result<()> {
        Ok(())
    }
}
/// Registry of [`Migration`] objects that can be applied to, rolled back from,
/// listed against, or generated for a SQLite projection store.
pub struct MigrationManager {
// Migrations in registration order; callers sort by `version()` when needed.
migrations: Vec<Box<dyn Migration>>,
}
impl MigrationManager {
pub fn new() -> Self {
Self {
migrations: Vec::new(),
}
}
pub fn add(&mut self, migration: Box<dyn Migration>) {
self.migrations.push(migration);
}
pub fn add_all(&mut self, migrations: Vec<Box<dyn Migration>>) {
for m in migrations {
self.add(m);
}
}
pub fn load_from_directory<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let path = path.as_ref();
if !path.exists() {
return Err(AzothError::InvalidState(format!(
"Migration directory does not exist: {}",
path.display()
)));
}
let mut entries: Vec<_> = fs::read_dir(path)
.map_err(|e| AzothError::Projection(format!("Failed to read directory: {}", e)))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.map(|ext| ext == "sql")
.unwrap_or(false)
})
.collect();
entries.sort_by_key(|e| e.path());
for entry in entries {
let path = entry.path();
let file_name = path
.file_stem()
.and_then(|s| s.to_str())
.ok_or_else(|| AzothError::InvalidState("Invalid migration filename".into()))?;
if file_name.ends_with(".down") {
continue;
}
let migration = FileMigration::from_file(&path)?;
self.add(Box::new(migration));
}
Ok(())
}
pub fn run(&self, projection: &Arc<crate::SqliteProjectionStore>) -> Result<()> {
self.init_migration_history(projection)?;
let current_version = projection.schema_version()?;
let mut sorted: Vec<_> = self.migrations.iter().collect();
sorted.sort_by_key(|m| m.version());
let pending: Vec<_> = sorted
.into_iter()
.filter(|m| m.version() > current_version)
.collect();
if pending.is_empty() {
tracing::info!("No pending migrations");
return Ok(());
}
tracing::info!("Running {} pending migrations", pending.len());
for migration in pending {
self.run_single(projection, &**migration)?;
}
Ok(())
}
fn init_migration_history(&self, projection: &Arc<crate::SqliteProjectionStore>) -> Result<()> {
let conn = projection.conn().lock();
conn.execute(
"CREATE TABLE IF NOT EXISTS migration_history (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
)",
[],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
}
fn run_single(
&self,
projection: &Arc<crate::SqliteProjectionStore>,
migration: &dyn Migration,
) -> Result<()> {
tracing::info!(
"Applying migration v{}: {}",
migration.version(),
migration.name()
);
let conn = projection.conn().lock();
conn.execute_batch("BEGIN IMMEDIATE TRANSACTION")
.map_err(|e| AzothError::Projection(e.to_string()))?;
let apply_result: Result<()> = (|| {
migration.up(&conn)?;
conn.execute(
"INSERT OR REPLACE INTO migration_history (version, name, applied_at)
VALUES (?1, ?2, datetime('now'))",
rusqlite::params![migration.version(), migration.name()],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
conn.execute(
"UPDATE projection_meta
SET schema_version = ?1, updated_at = datetime('now')
WHERE id = 0",
[migration.version() as i64],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
migration.verify(&conn)?;
Ok(())
})();
if let Err(e) = apply_result {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
conn.execute_batch("COMMIT")
.map_err(|e| AzothError::Projection(e.to_string()))?;
tracing::info!("Migration v{} complete", migration.version());
Ok(())
}
pub fn rollback_last(&self, projection: &Arc<crate::SqliteProjectionStore>) -> Result<()> {
let current_version = projection.schema_version()?;
if current_version == 0 {
return Err(AzothError::InvalidState(
"Cannot rollback: no migrations have been applied".into(),
));
}
let migration = self
.migrations
.iter()
.find(|m| m.version() == current_version)
.ok_or_else(|| {
AzothError::InvalidState(format!(
"No migration found for version {}",
current_version
))
})?;
tracing::warn!(
"Rolling back migration v{}: {}",
migration.version(),
migration.name()
);
let conn = projection.conn().lock();
conn.execute_batch("BEGIN IMMEDIATE TRANSACTION")
.map_err(|e| AzothError::Projection(e.to_string()))?;
let rollback_result: Result<()> = (|| {
migration.down(&conn)?;
conn.execute(
"DELETE FROM migration_history WHERE version = ?1",
[current_version as i64],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
conn.execute(
"UPDATE projection_meta
SET schema_version = ?1, updated_at = datetime('now')
WHERE id = 0",
[(current_version - 1) as i64],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
})();
if let Err(e) = rollback_result {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
conn.execute_batch("COMMIT")
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
}
pub fn list(&self) -> Vec<MigrationInfo> {
let mut sorted: Vec<_> = self.migrations.iter().collect();
sorted.sort_by_key(|m| m.version());
sorted
.into_iter()
.map(|m| MigrationInfo {
version: m.version(),
name: m.name().to_string(),
})
.collect()
}
pub fn pending(
&self,
projection: &Arc<crate::SqliteProjectionStore>,
) -> Result<Vec<MigrationInfo>> {
let current_version = projection.schema_version()?;
let mut sorted: Vec<_> = self.migrations.iter().collect();
sorted.sort_by_key(|m| m.version());
Ok(sorted
.into_iter()
.filter(|m| m.version() > current_version)
.map(|m| MigrationInfo {
version: m.version(),
name: m.name().to_string(),
})
.collect())
}
pub fn history(
&self,
projection: &Arc<crate::SqliteProjectionStore>,
) -> Result<Vec<MigrationHistoryEntry>> {
self.init_migration_history(projection)?;
let conn = projection.conn().lock();
let mut stmt = conn
.prepare("SELECT version, name, applied_at FROM migration_history ORDER BY version")
.map_err(|e| AzothError::Projection(e.to_string()))?;
let entries = stmt
.query_map([], |row| {
Ok(MigrationHistoryEntry {
version: row.get(0)?,
name: row.get(1)?,
applied_at: row.get(2)?,
})
})
.map_err(|e| AzothError::Projection(e.to_string()))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(entries)
}
pub fn generate<P: AsRef<Path>>(&self, migrations_dir: P, name: &str) -> Result<PathBuf> {
let migrations_dir = migrations_dir.as_ref();
fs::create_dir_all(migrations_dir)
.map_err(|e| AzothError::Projection(format!("Failed to create directory: {}", e)))?;
let next_version = self
.migrations
.iter()
.map(|m| m.version())
.max()
.unwrap_or(0)
+ 1;
let filename = format!("{:04}_{}.sql", next_version, name);
let filepath = migrations_dir.join(&filename);
let template = format!(
"-- Migration: {}\n-- Version: {}\n-- Created: {}\n\n-- Add your migration SQL here\n",
name,
next_version,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
);
fs::write(&filepath, template)
.map_err(|e| AzothError::Projection(format!("Failed to write file: {}", e)))?;
let down_filename = format!("{:04}_{}.down.sql", next_version, name);
let down_filepath = migrations_dir.join(&down_filename);
let down_template = format!(
"-- Rollback: {}\n-- Version: {}\n\n-- Add your rollback SQL here\n",
name, next_version
);
fs::write(&down_filepath, down_template)
.map_err(|e| AzothError::Projection(format!("Failed to write file: {}", e)))?;
tracing::info!(
"Created migration files:\n - {}\n - {}",
filepath.display(),
down_filepath.display()
);
Ok(filepath)
}
}
impl Default for MigrationManager {
fn default() -> Self {
Self::new()
}
}
/// Version and name of a registered migration, as reported by
/// `MigrationManager::list` and `MigrationManager::pending`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare listings directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationInfo {
    /// Migration version number.
    pub version: u32,
    /// Migration name.
    pub name: String,
}
/// One row of the `migration_history` table, as returned by
/// `MigrationManager::history`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare history listings
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationHistoryEntry {
    /// Version of the applied migration.
    pub version: u32,
    /// Name recorded when the migration was applied.
    pub name: String,
    /// Value of the `applied_at` column, kept as the text stored in SQLite.
    pub applied_at: String,
}
/// A migration whose SQL is loaded from files on disk.
///
/// Derives `Debug` and `Clone` so a loaded migration can be logged, inspected
/// in tests, and duplicated without re-reading the files.
#[derive(Debug, Clone)]
pub struct FileMigration {
    /// Version parsed from the file name prefix.
    version: u32,
    /// Name parsed from the file name after the first underscore.
    name: String,
    /// Contents of the up-migration file.
    up_sql: String,
    /// Contents of the matching `.down.sql` file, if one exists.
    down_sql: Option<String>,
}
impl FileMigration {
    /// Builds a migration from an up-migration file named
    /// `{version}_{name}.sql`.
    ///
    /// The file stem is split at the first underscore: the left part must
    /// parse as a `u32` version, the right part becomes the name. If a sibling
    /// file `{stem}.down.sql` exists, its contents are loaded as the rollback
    /// script.
    ///
    /// # Errors
    ///
    /// * [`AzothError::InvalidState`] when the stem is missing, not UTF-8,
    ///   has no underscore, or its version prefix is not a valid `u32`.
    /// * [`AzothError::Projection`] when either file cannot be read.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let up_path = path.as_ref();

        let stem = match up_path.file_stem().and_then(|s| s.to_str()) {
            Some(stem) => stem,
            None => return Err(AzothError::InvalidState("Invalid migration filename".into())),
        };

        // At most two pieces: everything after the first '_' is the name.
        let mut pieces = stem.splitn(2, '_');
        let (version_text, name_text) = match (pieces.next(), pieces.next()) {
            (Some(version_text), Some(name_text)) => (version_text, name_text),
            _ => {
                return Err(AzothError::InvalidState(format!(
                    "Invalid migration filename format: {}. Expected: {{version}}_{{name}}.sql",
                    stem
                )));
            }
        };

        let version = version_text.parse::<u32>().map_err(|_| {
            AzothError::InvalidState(format!(
                "Invalid version number in filename: {}",
                version_text
            ))
        })?;

        let up_sql = fs::read_to_string(up_path)
            .map_err(|e| AzothError::Projection(format!("Failed to read migration file: {}", e)))?;

        // The rollback script is optional and lives next to the up script.
        let down_path = up_path.with_file_name(format!("{}.down.sql", stem));
        let down_sql = down_path
            .exists()
            .then(|| fs::read_to_string(&down_path))
            .transpose()
            .map_err(|e| {
                AzothError::Projection(format!("Failed to read down migration file: {}", e))
            })?;

        Ok(Self {
            version,
            name: name_text.to_string(),
            up_sql,
            down_sql,
        })
    }
}
impl Migration for FileMigration {
    /// Version parsed from the file name.
    fn version(&self) -> u32 {
        self.version
    }

    /// Name parsed from the file name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Executes the up script as a batch on `conn`.
    fn up(&self, conn: &Connection) -> Result<()> {
        conn.execute_batch(&self.up_sql)
            .map_err(|e| AzothError::Projection(format!("Migration failed: {}", e)))
    }

    /// Executes the down script as a batch on `conn`, or fails with
    /// [`AzothError::InvalidState`] when no down script was loaded.
    fn down(&self, conn: &Connection) -> Result<()> {
        match self.down_sql.as_deref() {
            Some(script) => conn
                .execute_batch(script)
                .map_err(|e| AzothError::Projection(format!("Rollback failed: {}", e))),
            None => Err(AzothError::InvalidState(format!(
                "Migration '{}' does not have a down migration file",
                self.name
            ))),
        }
    }
}
/// Declares a unit struct and implements `Migration` for it.
///
/// Takes the struct identifier, a `version:` expression, a `name:` expression,
/// an `up:` closure-like `|conn| { ... }` block and, optionally, a `down:`
/// block of the same shape. When `down:` is omitted the trait's default
/// `down` is used.
///
/// The expansion names `rusqlite::Connection` directly, so `rusqlite` must be
/// resolvable at the call site.
#[macro_export]
macro_rules! migration {
    (
        $ty:ident,
        version: $ver:expr,
        name: $label:expr,
        up: |$up_arg:ident| $up_block:block
        $(, down: |$down_arg:ident| $down_block:block)?
    ) => {
        struct $ty;

        impl $crate::Migration for $ty {
            fn version(&self) -> u32 {
                $ver
            }

            fn name(&self) -> &str {
                $label
            }

            fn up(&self, $up_arg: &rusqlite::Connection) -> $crate::Result<()> {
                $up_block
            }

            $(
                fn down(&self, $down_arg: &rusqlite::Connection) -> $crate::Result<()> {
                    $down_block
                }
            )?
        }
    };
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal migration with a fixed version and name and a no-op `up`.
    struct TestMigration;

    impl Migration for TestMigration {
        fn version(&self) -> u32 {
            1
        }

        fn name(&self) -> &str {
            "test_migration"
        }

        fn up(&self, _conn: &Connection) -> Result<()> {
            Ok(())
        }
    }

    /// A registered migration shows up in `list()` with its version and name.
    #[test]
    fn test_migration_manager() {
        let mut manager = MigrationManager::new();
        manager.add(Box::new(TestMigration));

        let listed = manager.list();
        assert_eq!(listed.len(), 1);

        let only = &listed[0];
        assert_eq!(only.version, 1);
        assert_eq!(only.name, "test_migration");
    }
}