use crate::commands::migrate::section_executor::{ExecutionMode, SectionExecutor};
use crate::config::Config;
use crate::migration::{discover_migrations, parse_migration_sections, validate_sections};
use crate::migration_tracking::{
ensure_section_tracking_table, format_tracking_table_name, initialize_sections,
version_from_db, version_to_db,
};
use crate::progress::SectionReporter;
use anyhow::{Context, Result};
use std::path::Path;
use std::time::Instant;
use tracing::debug;
pub async fn cmd_migrate_apply(
config: &Config,
root_dir: &Path,
target: &crate::config::TargetUrl,
) -> Result<()> {
println!("Applying migrations to target database");
let migrations_dir = root_dir.join(&config.directories.migrations);
if !migrations_dir.exists() {
println!("No migrations directory found - nothing to apply");
return Ok(());
}
let pool =
crate::db::connection::connect_to_database(target.as_str(), "target database").await?;
let tracking_table_name = format_tracking_table_name(&config.migration.tracking_table)?;
sqlx::query(&format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
checksum TEXT NOT NULL
)
"#,
tracking_table_name
))
.execute(&pool)
.await?;
ensure_section_tracking_table(&pool, &config.migration.tracking_table).await?;
let applied_migrations: std::collections::HashMap<u64, String> =
sqlx::query_as::<_, (i64, String)>(&format!(
"SELECT version, checksum FROM {}",
tracking_table_name
))
.fetch_all(&pool)
.await?
.into_iter()
.map(|(v, checksum)| (version_from_db(v), checksum))
.collect();
let migrations = discover_migrations(&migrations_dir)?;
for migration in migrations {
let migration_sql = std::fs::read_to_string(&migration.path).with_context(|| {
format!(
"Failed to read migration file: {}",
migration.path.display()
)
})?;
let checksum = format!("{:x}", md5::compute(&migration_sql));
if let Some(stored_checksum) = applied_migrations.get(&migration.version) {
if stored_checksum != &checksum {
anyhow::bail!(
"Migration {} has been modified after being applied!\n\
Expected checksum: {}\n\
Actual checksum: {}\n\n\
Migrations must be immutable once applied. If you need to make changes:\n\
• Create a new migration with the changes\n\
• Or roll back and recreate this migration (dangerous in production)",
migration.version,
stored_checksum,
checksum
);
}
debug!("Migration {} already applied, skipping", migration.version);
continue;
}
println!(
"\nApplying migration {} - {}",
migration.version, migration.description
);
let start = Instant::now();
let sections = parse_migration_sections(&migration.path, &migration_sql)
.with_context(|| format!("Failed to parse migration {}", migration.version))?;
validate_sections(§ions).with_context(|| {
format!(
"Invalid section configuration in migration {}",
migration.version
)
})?;
initialize_sections(
&pool,
&config.migration.tracking_table,
migration.version,
§ions,
)
.await?;
let reporter = SectionReporter::new(sections.len(), false); let mut executor = SectionExecutor::new(
pool.clone(),
config.migration.tracking_table.clone(),
reporter,
ExecutionMode::Production,
);
for section in §ions {
executor
.execute_section(migration.version, section)
.await
.with_context(|| {
format!(
"Migration {} failed at section '{}'",
migration.version, section.name
)
})?;
}
let duration = start.elapsed();
sqlx::query(&format!(
"INSERT INTO {} (version, description, checksum) VALUES ($1, $2, $3)",
tracking_table_name
))
.bind(version_to_db(migration.version)?)
.bind(&migration.description)
.bind(&checksum)
.execute(&pool)
.await?;
let reporter = SectionReporter::new(sections.len(), false);
reporter.migration_summary(duration, sections.len());
}
Ok(())
}