use anyhow::{Context, Result};
use sqlx::PgPool;
use std::path::{Path, PathBuf};
use tracing::info;
use crate::catalog::Catalog;
use crate::commands::migrate::section_executor::{ExecutionMode, SectionExecutor};
use crate::config::Config;
use crate::config::filter::ObjectFilter;
use crate::db::cleaner;
use crate::db::schema_executor::BaselineExecutor;
use crate::migration::{
ParsedMigration, discover_migrations, find_baseline_for_version, find_latest_baseline,
generate_baseline_filename, parse_migration_sections, validate_sections,
};
use crate::progress::SectionReporter;
use crate::validation::validate_baseline_consistency;
/// Switches that control how the baseline helpers in this module behave.
#[derive(Debug, Clone)]
pub struct BaselineConfig {
    /// When `false`, `validate_baseline_against_catalog` returns `Ok(())`
    /// immediately without loading or comparing anything.
    pub validate_consistency: bool,
    /// Gates most of the progress messages emitted by the baseline helpers.
    pub verbose: bool,
}
impl Default for BaselineConfig {
fn default() -> Self {
Self {
validate_consistency: true,
verbose: true,
}
}
}
/// Outcome of a baseline operation that produced a file on disk.
#[derive(Debug)]
pub struct BaselineOperationResult {
    /// Location of the baseline file the operation wrote.
    pub path: PathBuf,
}
/// Loads a [`Catalog`] from the shadow database via [`Catalog::load_managed`].
///
/// The object filter handed to the loader is derived from `config` with
/// [`ObjectFilter::from_config`]. Any error from the load is returned as-is.
async fn load_managed_catalog(shadow_pool: &PgPool, config: &Config) -> Result<Catalog> {
    Catalog::load_managed(shadow_pool, &ObjectFilter::from_config(config)).await
}
/// Rebuilds the shadow database from a baseline file and returns its catalog.
///
/// Steps, in order:
/// 1. clean the shadow database (`cleaner::clean_shadow_db`),
/// 2. apply the roles file (`schema_ops::apply_roles_file`),
/// 3. read `baseline_path` from disk,
/// 4. execute the SQL through a [`BaselineExecutor`],
/// 5. load the resulting catalog with `load_managed_catalog`.
///
/// # Errors
/// Returns the first error hit by any step. Failures to read or apply the
/// baseline carry the baseline path as context.
pub async fn load_baseline_into_shadow(
    shadow_pool: &PgPool,
    baseline_path: &Path,
    roles_file: &Path,
    config: &Config,
) -> Result<Catalog> {
    // The shadow database is reset before the baseline file is even read.
    cleaner::clean_shadow_db(shadow_pool, &config.objects).await?;
    crate::schema_ops::apply_roles_file(shadow_pool, roles_file).await?;

    let sql_text = std::fs::read_to_string(baseline_path)
        .with_context(|| format!("Failed to read baseline file: {}", baseline_path.display()))?;

    // NOTE(review): the meaning of the two `false` flags is not visible in
    // this file; confirm against `BaselineExecutor::new`.
    let runner = BaselineExecutor::new(shadow_pool.clone(), false, false);

    // Label passed to the executor: the file name, or a placeholder when the
    // path has no UTF-8 file name component.
    let source_label = match baseline_path.file_name().and_then(|name| name.to_str()) {
        Some(label) => label,
        None => "unknown_baseline.sql",
    };

    runner
        .execute_baseline(&sql_text, source_label)
        .await
        .with_context(|| format!("Failed to apply baseline SQL: {}", baseline_path.display()))?;

    load_managed_catalog(shadow_pool, config).await
}
/// Applies each migration, in slice order, to the shadow database.
///
/// For every migration the file is read from disk, split into sections with
/// `parse_migration_sections`, checked with `validate_sections`, and then each
/// section is run through a [`SectionExecutor`] created in
/// [`ExecutionMode::Validation`] against `config.migration.tracking_table`.
///
/// When `verbose` is set, one line per migration is printed to stdout before
/// it is applied.
///
/// # Errors
/// Stops at the first failure: an unreadable migration file, a parse or
/// validation error, or a failing section. Read and section-execution errors
/// carry the migration path (and version, for sections) as context.
async fn replay_migrations(
    shadow_pool: &PgPool,
    migrations: &[ParsedMigration],
    config: &Config,
    verbose: bool,
) -> Result<()> {
    for migration in migrations {
        if verbose {
            println!(
                " Applying {} - {}",
                migration.version, migration.description
            );
        }

        let migration_sql = std::fs::read_to_string(&migration.path).with_context(|| {
            format!(
                "Failed to read migration file: {}",
                migration.path.display()
            )
        })?;

        let sections = parse_migration_sections(&migration.path, &migration_sql)?;
        // Borrow the parsed sections; the original text had `&sections`
        // corrupted into `§ions`, which does not compile.
        validate_sections(&sections)?;

        let reporter = SectionReporter::new(sections.len(), false);
        let mut executor = SectionExecutor::new(
            shadow_pool.clone(),
            config.migration.tracking_table.clone(),
            reporter,
            ExecutionMode::Validation,
        );

        for section in &sections {
            executor
                .execute_section(migration.version, section)
                .await
                .with_context(|| {
                    format!(
                        "Failed to apply migration {}: {}",
                        migration.version,
                        migration.path.display()
                    )
                })?;
        }
    }
    Ok(())
}
/// Prints a warning to stderr for every migration whose version is less than
/// or equal to `baseline_version`.
///
/// Each warning names the migration and suggests `pgmt migrate update` to
/// renumber it. Nothing is returned and nothing is modified.
fn warn_pre_baseline_migrations(migrations: &[ParsedMigration], baseline_version: u64) {
    for migration in migrations {
        // Only migrations at or below the baseline version are reported.
        if migration.version > baseline_version {
            continue;
        }
        eprintln!(
            "Warning: Migration {} predates baseline {} and will be skipped. \
             Run 'pgmt migrate update {}' to renumber it.",
            migration.version, baseline_version, migration.version
        );
    }
}
/// Writes `baseline_sql` to the baseline file for `version` inside
/// `baselines_dir`, creating the directory first if needed.
///
/// The file name comes from `generate_baseline_filename(version)`. An
/// existing file at that path is overwritten by `std::fs::write`. When
/// `config.verbose` is set, the destination path is printed before writing.
///
/// # Errors
/// Fails if the directory cannot be created or the file cannot be written;
/// both errors carry the offending path as context.
pub async fn ensure_baseline_for_migration(
    baselines_dir: &Path,
    version: u64,
    baseline_sql: &str,
    config: &BaselineConfig,
) -> Result<BaselineOperationResult> {
    let target = baselines_dir.join(generate_baseline_filename(version));

    std::fs::create_dir_all(baselines_dir).with_context(|| {
        format!(
            "Failed to create baselines directory: {}",
            baselines_dir.display()
        )
    })?;

    if config.verbose {
        println!("💾 Writing baseline: {}", target.display());
    }

    std::fs::write(&target, baseline_sql)
        .with_context(|| format!("Failed to write baseline file: {}", target.display()))?;

    Ok(BaselineOperationResult { path: target })
}
/// Checks that the baseline file reproduces `expected_catalog`.
///
/// If `baseline_config.validate_consistency` is `false` this is a no-op.
/// Otherwise the baseline is loaded into the shadow database with
/// `load_baseline_into_shadow` (which resets that database first) and the
/// resulting catalog is compared using `validate_baseline_consistency`.
/// Progress lines are printed only when `baseline_config.verbose` is set.
///
/// # Errors
/// Returns any error from loading the baseline or from the consistency check.
pub async fn validate_baseline_against_catalog(
    shadow_pool: &PgPool,
    baseline_path: &Path,
    expected_catalog: &Catalog,
    baseline_config: &BaselineConfig,
    roles_file: &Path,
    config: &Config,
) -> Result<()> {
    let verbose = baseline_config.verbose;

    if baseline_config.validate_consistency {
        if verbose {
            println!("Validating baseline matches intended schema...");
        }

        let actual_catalog =
            load_baseline_into_shadow(shadow_pool, baseline_path, roles_file, config).await?;
        validate_baseline_consistency(&actual_catalog, expected_catalog, config)?;

        if verbose {
            println!("✓ Baseline validation passed");
        }
    }

    Ok(())
}
/// Reconstructs the current schema state in the shadow database and returns
/// its catalog.
///
/// * If `find_latest_baseline` yields a baseline, it is loaded into the
///   shadow database, a warning is printed for migrations at or below the
///   baseline version, and only migrations with a strictly greater version
///   are replayed.
/// * Otherwise the shadow database is cleaned, the roles file is applied, and
///   every discovered migration is replayed.
///
/// `baseline_config.verbose` gates the log lines and the "after baseline"
/// message; the two messages in the no-baseline branch are always printed.
///
/// # Errors
/// Returns the first error from discovery, baseline loading, database
/// preparation, migration replay, or the final catalog load.
pub async fn get_migration_starting_state(
    shadow_pool: &PgPool,
    baselines_dir: &Path,
    migrations_dir: &Path,
    roles_file: &Path,
    baseline_config: &BaselineConfig,
    config: &Config,
) -> Result<Catalog> {
    let verbose = baseline_config.verbose;
    // Starts as every discovered migration; narrowed below when a baseline
    // already covers part of the history.
    let mut pending = discover_migrations(migrations_dir)?;

    match find_latest_baseline(baselines_dir)? {
        Some(baseline) => {
            if verbose {
                info!("Loading baseline: {}", baseline.path.display());
            }
            load_baseline_into_shadow(shadow_pool, &baseline.path, roles_file, config).await?;

            // Warn using the full list before dropping the covered entries.
            warn_pre_baseline_migrations(&pending, baseline.version);
            pending.retain(|m| m.version > baseline.version);

            if verbose && !pending.is_empty() {
                println!("Applying {} migration(s) after baseline", pending.len());
            }
        }
        None => {
            if verbose {
                info!("No existing baseline found, reconstructing from existing migrations");
            }
            cleaner::clean_shadow_db(shadow_pool, &config.objects).await?;
            crate::schema_ops::apply_roles_file(shadow_pool, roles_file).await?;

            if pending.is_empty() {
                println!("No existing migrations found, starting from empty schema");
            } else {
                println!(
                    "Applying {} existing migration(s) to reconstruct state",
                    pending.len()
                );
            }
        }
    }

    replay_migrations(shadow_pool, &pending, config, verbose).await?;
    load_managed_catalog(shadow_pool, config).await
}
/// Reconstructs the schema state that precedes `target_version` in the shadow
/// database and returns its catalog.
///
/// * If `find_baseline_for_version` yields a baseline for `target_version`,
///   it is loaded and only migrations with
///   `baseline.version < version < target_version` are replayed.
/// * Otherwise the shadow database is cleaned, the roles file is applied, and
///   every migration with a version below `target_version` is replayed.
///
/// `baseline_config.verbose` gates the log lines and the "between baseline
/// and target" message; the two messages in the no-baseline branch are
/// always printed.
///
/// # Errors
/// Returns the first error from discovery, baseline loading, database
/// preparation, migration replay, or the final catalog load.
pub async fn get_migration_update_starting_state(
    shadow_pool: &PgPool,
    baselines_dir: &Path,
    migrations_dir: &Path,
    target_version: u64,
    roles_file: &Path,
    baseline_config: &BaselineConfig,
    config: &Config,
) -> Result<Catalog> {
    let verbose = baseline_config.verbose;
    // Starts as every discovered migration; narrowed in place below.
    let mut candidates = discover_migrations(migrations_dir)?;

    match find_baseline_for_version(baselines_dir, target_version)? {
        Some(baseline) => {
            if verbose {
                info!("Loading previous baseline: {}", baseline.path.display());
            }
            load_baseline_into_shadow(shadow_pool, &baseline.path, roles_file, config).await?;

            // Keep only what lies strictly between the baseline and the target.
            candidates.retain(|m| m.version > baseline.version && m.version < target_version);

            if verbose && !candidates.is_empty() {
                println!(
                    "Applying {} migration(s) between baseline and target",
                    candidates.len()
                );
            }
        }
        None => {
            if verbose {
                info!(
                    "No previous baseline found, reconstructing from migrations before {}",
                    target_version
                );
            }
            cleaner::clean_shadow_db(shadow_pool, &config.objects).await?;
            crate::schema_ops::apply_roles_file(shadow_pool, roles_file).await?;

            candidates.retain(|m| m.version < target_version);

            if candidates.is_empty() {
                println!(
                    "No existing migrations found before {}, starting from empty schema",
                    target_version
                );
            } else {
                println!(
                    "Applying {} existing migration(s) before {}",
                    candidates.len(),
                    target_version
                );
            }
        }
    }

    replay_migrations(shadow_pool, &candidates, config, verbose).await?;
    load_managed_catalog(shadow_pool, config).await
}
/// Decides whether a baseline should be managed for a migration.
///
/// Returns `true` when `create_baselines_by_default` is set, or when a file
/// already exists at `baseline_path`. The filesystem is consulted only if the
/// flag is `false`. `_config` is accepted but not used.
pub fn should_manage_baseline_for_migration(
    _config: &Config,
    baseline_path: &Path,
    create_baselines_by_default: bool,
) -> bool {
    if create_baselines_by_default {
        return true;
    }
    baseline_path.exists()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// The default configuration enables both validation and verbose output.
    #[test]
    fn test_baseline_config_default() {
        let config = BaselineConfig::default();
        assert!(config.validate_consistency);
        assert!(config.verbose);
    }

    /// A baseline is managed when the default flag is set or the file exists.
    #[test]
    fn test_should_manage_baseline_for_migration() {
        // Per-process directory so concurrent runs sharing one temp dir do
        // not see each other's files.
        let temp_dir = env::temp_dir().join(format!(
            "pgmt_test_baseline_management_{}",
            std::process::id()
        ));
        // Drop leftovers from an earlier aborted run; the directory usually
        // does not exist, so the error is deliberately ignored.
        let _ = std::fs::remove_dir_all(&temp_dir);

        let baseline_path = temp_dir.join("baseline_V123.sql");

        // Flag set: managed even though the file is absent.
        assert!(should_manage_baseline_for_migration(
            &Config::default(),
            &baseline_path,
            true
        ));
        // Flag clear and file absent: not managed.
        assert!(!should_manage_baseline_for_migration(
            &Config::default(),
            &baseline_path,
            false
        ));

        std::fs::create_dir_all(&temp_dir).unwrap();
        std::fs::write(&baseline_path, "test").unwrap();
        let managed_with_existing_file =
            should_manage_baseline_for_migration(&Config::default(), &baseline_path, false);

        // Clean up before asserting so a failure cannot leave the file behind.
        let _ = std::fs::remove_dir_all(&temp_dir);

        // Flag clear but file present: managed.
        assert!(managed_with_existing_file);
    }
}