#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "mysql")]
pub mod mysql;
use crate::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::Database;
use std::collections::HashMap;
use tracing::info;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Migration {
pub id: String,
pub description: String,
pub version: u32,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct MigrationRecord {
pub migration_id: String,
pub executed_at: DateTime<Utc>,
pub execution_time_ms: u64,
}
#[async_trait::async_trait]
pub trait MigrationRunner<DB: Database> {
async fn run_migration(&self, migration: &Migration, sql: &str) -> Result<()>;
async fn migration_table_exists(&self) -> Result<bool>;
async fn create_migration_table(&self) -> Result<()>;
async fn get_executed_migrations(&self) -> Result<Vec<MigrationRecord>>;
async fn record_migration(&self, migration: &Migration, execution_time_ms: u64) -> Result<()>;
}
pub struct MigrationManager<DB: Database> {
runner: Box<dyn MigrationRunner<DB> + Send + Sync>,
migrations: HashMap<String, (Migration, String, String)>, }
impl<DB: Database> MigrationManager<DB> {
pub fn new(runner: Box<dyn MigrationRunner<DB> + Send + Sync>) -> Self {
let mut manager = Self {
runner,
migrations: HashMap::new(),
};
manager.register_builtin_migrations();
manager
}
pub fn register_migration(
&mut self,
migration: Migration,
postgres_sql: String,
mysql_sql: String,
) {
self.migrations
.insert(migration.id.clone(), (migration, postgres_sql, mysql_sql));
}
pub async fn run_migrations(&self) -> Result<()> {
info!("Starting migration process...");
if !self.runner.migration_table_exists().await? {
info!("Creating migration tracking table...");
self.runner.create_migration_table().await?;
}
let executed = self.runner.get_executed_migrations().await?;
let executed_ids: std::collections::HashSet<String> =
executed.iter().map(|r| r.migration_id.clone()).collect();
let mut pending_migrations: Vec<_> = self
.migrations
.values()
.filter(|(migration, _, _)| !executed_ids.contains(&migration.id))
.collect();
pending_migrations.sort_by_key(|(migration, _, _)| migration.version);
if pending_migrations.is_empty() {
info!("No pending migrations to run");
return Ok(());
}
info!("Found {} pending migrations", pending_migrations.len());
for (migration, postgres_sql, mysql_sql) in pending_migrations {
info!(
"Running migration: {} - {}",
migration.id, migration.description
);
let start_time = std::time::Instant::now();
let sql = if std::any::type_name::<DB>().contains("Postgres") {
postgres_sql
} else {
mysql_sql
};
self.runner.run_migration(migration, sql).await?;
let execution_time_ms = start_time.elapsed().as_millis() as u64;
self.runner
.record_migration(migration, execution_time_ms)
.await?;
info!(
"Completed migration {} in {}ms",
migration.id, execution_time_ms
);
}
info!("All migrations completed successfully");
Ok(())
}
pub async fn get_migration_status(&self) -> Result<Vec<(Migration, bool)>> {
let executed = self.runner.get_executed_migrations().await?;
let executed_ids: std::collections::HashSet<String> =
executed.iter().map(|r| r.migration_id.clone()).collect();
let mut status: Vec<_> = self
.migrations
.values()
.map(|(migration, _, _)| (migration.clone(), executed_ids.contains(&migration.id)))
.collect();
status.sort_by_key(|(migration, _)| migration.version);
Ok(status)
}
fn register_builtin_migrations(&mut self) {
self.register_migration(
Migration {
id: "001_initial_schema".to_string(),
description: "Create initial hammerwork_jobs table".to_string(),
version: 1,
created_at: chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("001_initial_schema.postgres.sql").to_string(),
include_str!("001_initial_schema.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "002_add_priority".to_string(),
description: "Add priority field and indexes for job prioritization".to_string(),
version: 2,
created_at: chrono::DateTime::parse_from_rfc3339("2025-02-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("002_add_priority.postgres.sql").to_string(),
include_str!("002_add_priority.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "003_add_timeouts".to_string(),
description: "Add timeout_seconds and timed_out_at fields".to_string(),
version: 3,
created_at: chrono::DateTime::parse_from_rfc3339("2025-03-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("003_add_timeouts.postgres.sql").to_string(),
include_str!("003_add_timeouts.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "004_add_cron".to_string(),
description: "Add cron scheduling fields and indexes".to_string(),
version: 4,
created_at: chrono::DateTime::parse_from_rfc3339("2025-04-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("004_add_cron.postgres.sql").to_string(),
include_str!("004_add_cron.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "005_add_batches".to_string(),
description: "Add batch processing table and job batch_id field".to_string(),
version: 5,
created_at: chrono::DateTime::parse_from_rfc3339("2025-05-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("005_add_batches.postgres.sql").to_string(),
include_str!("005_add_batches.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "006_add_result_storage".to_string(),
description: "Add result storage fields for job execution results".to_string(),
version: 6,
created_at: chrono::DateTime::parse_from_rfc3339("2025-06-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("006_add_result_storage.postgres.sql").to_string(),
include_str!("006_add_result_storage.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "007_add_dependencies".to_string(),
description: "Add job dependencies and workflow support".to_string(),
version: 7,
created_at: chrono::DateTime::parse_from_rfc3339("2025-07-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("007_add_dependencies.postgres.sql").to_string(),
include_str!("007_add_dependencies.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "008_add_result_config".to_string(),
description: "Add result configuration storage fields".to_string(),
version: 8,
created_at: chrono::DateTime::parse_from_rfc3339("2025-08-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("008_add_result_config.postgres.sql").to_string(),
include_str!("008_add_result_config.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "009_add_tracing".to_string(),
description: "Add distributed tracing and correlation fields".to_string(),
version: 9,
created_at: chrono::DateTime::parse_from_rfc3339("2025-09-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("009_add_tracing.postgres.sql").to_string(),
include_str!("009_add_tracing.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "010_add_archival".to_string(),
description: "Add job archival support and archive table".to_string(),
version: 10,
created_at: chrono::DateTime::parse_from_rfc3339("2025-10-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("010_add_archival.postgres.sql").to_string(),
include_str!("010_add_archival.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "011_add_encryption".to_string(),
description: "Add job payload encryption and key management".to_string(),
version: 11,
created_at: chrono::DateTime::parse_from_rfc3339("2025-11-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("011_add_encryption.postgres.sql").to_string(),
include_str!("011_add_encryption.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "012_optimize_dependencies".to_string(),
description: "Optimize job dependencies using native PostgreSQL UUID arrays"
.to_string(),
version: 12,
created_at: chrono::DateTime::parse_from_rfc3339("2025-12-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("012_optimize_dependencies.postgres.sql").to_string(),
include_str!("012_optimize_dependencies.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "013_add_key_audit".to_string(),
description: "Add encryption key audit trail and operation logging".to_string(),
version: 13,
created_at: chrono::DateTime::parse_from_rfc3339("2025-12-15T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("013_add_key_audit.postgres.sql").to_string(),
include_str!("013_add_key_audit.mysql.sql").to_string(),
);
self.register_migration(
Migration {
id: "014_add_queue_pause".to_string(),
description: "Add queue pause and resume functionality for operational control"
.to_string(),
version: 14,
created_at: chrono::DateTime::parse_from_rfc3339("2025-12-30T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
},
include_str!("014_add_queue_pause.postgres.sql").to_string(),
include_str!("014_add_queue_pause.mysql.sql").to_string(),
);
}
}