use crate::catalog::Catalog;
use crate::config::{Config, ObjectFilter};
use crate::db::connection::connect_with_retry;
use crate::migration::{discover_migrations, find_latest_baseline};
use crate::migration_tracking::format_tracking_table_name;
use crate::validation::{ValidationConfig, validate_catalogs};
use crate::validation_output::{BaselineInfo, ValidationOutputOptions, format_validation_output};
use anyhow::{Context, Result, anyhow};
use sqlx::PgPool;
use std::path::Path;
use crate::db::connection::connect_to_database;
pub async fn cmd_migrate_status(config: &Config) -> Result<()> {
println!("Checking migration status");
let dev_pool = connect_to_database(&config.databases.dev, "development database").await?;
let tracking_table_name = format_tracking_table_name(&config.migration.tracking_table)?;
sqlx::query(&format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
checksum TEXT NOT NULL
)
"#,
tracking_table_name
))
.execute(&dev_pool)
.await?;
let applied_migrations: Vec<(i64, String, String)> = sqlx::query_as(&format!(
"SELECT version, description, applied_at::TEXT FROM {} ORDER BY version",
tracking_table_name
))
.fetch_all(&dev_pool)
.await?;
if applied_migrations.is_empty() {
println!("No migrations have been applied");
} else {
println!("Applied migrations:");
for (version, description, applied_at) in applied_migrations {
println!(" {} - {} (applied: {})", version, description, applied_at);
}
}
dev_pool.close().await;
Ok(())
}
pub async fn cmd_migrate_validate(
config: &Config,
root_dir: &Path,
validation_options: &ValidationOutputOptions,
) -> Result<()> {
if !validation_options.quiet {
eprintln!("🔍 Validating migration consistency...");
}
let migrations_dir = root_dir.join(&config.directories.migrations);
let baselines_dir = root_dir.join(&config.directories.baselines);
std::fs::create_dir_all(&migrations_dir)?;
std::fs::create_dir_all(&baselines_dir)?;
let shadow_url = config.databases.shadow.get_connection_string().await?;
let shadow_pool = connect_with_retry(&shadow_url).await?;
if !validation_options.quiet {
eprintln!("📊 Reconstructing expected state from baseline + migration files...");
}
let roles_file = root_dir.join(&config.directories.roles);
let roles_path = if roles_file.exists() {
Some(roles_file.as_path())
} else {
None
};
let expected_catalog = reconstruct_expected_state_from_schema_files(
&shadow_pool,
&baselines_dir,
&migrations_dir,
roles_path,
&config.objects,
)
.await?;
if !validation_options.quiet {
eprintln!("🔍 Loading desired state from current schema files...");
}
let desired_catalog =
crate::schema_ops::apply_current_schema_to_shadow(config, root_dir).await?;
let filter = ObjectFilter::new(&config.objects, &config.migration.tracking_table);
let filtered_expected = filter.filter_catalog(expected_catalog);
let filtered_desired = filter.filter_catalog(desired_catalog);
if !validation_options.quiet {
eprintln!(
"🔍 Comparing expected state (baseline + migrations) vs desired state (schema files)..."
);
}
let validation_config = ValidationConfig {
show_differences: validation_options.format == "human", apply_object_filter: false, verbose: false,
};
let result = validate_catalogs(
&filtered_expected,
&filtered_desired,
config,
&validation_config,
)?;
let all_migrations = discover_migrations(&migrations_dir)?;
let migration_versions: Vec<u64> = all_migrations.iter().map(|m| m.version).collect();
let baseline_info = if let Some(latest_baseline) = find_latest_baseline(&baselines_dir)? {
Some(BaselineInfo {
version: latest_baseline.version,
object_count: 0, description: format!("baseline_V{}", latest_baseline.version),
})
} else {
None
};
let output = format_validation_output(
&result,
validation_options,
&migration_versions,
&[], baseline_info.as_ref(),
)?;
println!("{}", output);
if result.passed {
if !validation_options.quiet {
eprintln!("✅ Migration consistency validation passed");
}
Ok(())
} else {
Err(anyhow!(
"Migration validation failed: Schema files don't match expected state from baseline + migrations (found {} differences)",
result.differences.len()
))
}
}
async fn reconstruct_expected_state_from_schema_files(
shadow_pool: &PgPool,
baselines_dir: &Path,
migrations_dir: &Path,
roles_file: Option<&Path>,
objects: &crate::config::types::Objects,
) -> Result<Catalog> {
use crate::commands::migrate::section_executor::{ExecutionMode, SectionExecutor};
use crate::config::types::TrackingTable;
use crate::db::cleaner;
use crate::db::schema_executor::SchemaExecutor;
use crate::migration::{parse_migration_sections, validate_sections};
use crate::progress::SectionReporter;
cleaner::clean_shadow_db(shadow_pool, objects).await?;
if let Some(roles_path) = roles_file {
crate::schema_ops::apply_roles_file(shadow_pool, roles_path).await?;
}
if let Some(baseline) = find_latest_baseline(baselines_dir)? {
let baseline_sql = std::fs::read_to_string(&baseline.path)?;
SchemaExecutor::execute_sql_with_enhanced_errors(
shadow_pool,
&baseline.path,
&baseline_sql,
)
.await
.context("Failed to apply baseline during schema file reconstruction")?;
}
let all_migrations = discover_migrations(migrations_dir)?;
let mut sorted_migrations = all_migrations;
sorted_migrations.sort_by_key(|m| m.version);
let baseline_version = find_latest_baseline(baselines_dir)?
.map(|b| b.version)
.unwrap_or(0);
for migration_file in &sorted_migrations {
if migration_file.version <= baseline_version {
eprintln!(
"Warning: Migration {} predates baseline {} and will be skipped. \
Run 'pgmt migrate update {}' to renumber it.",
migration_file.version, baseline_version, migration_file.version
);
}
}
for migration_file in sorted_migrations {
if migration_file.version > baseline_version {
let migration_sql = std::fs::read_to_string(&migration_file.path).context(format!(
"Failed to read migration file {}",
migration_file.version
))?;
let sections = parse_migration_sections(&migration_file.path, &migration_sql)?;
validate_sections(§ions)?;
let tracking_table = TrackingTable::default();
let reporter = SectionReporter::new(sections.len(), false);
let mut executor = SectionExecutor::new(
shadow_pool.clone(),
tracking_table,
reporter,
ExecutionMode::Validation,
);
for section in §ions {
executor
.execute_section(migration_file.version, section)
.await
.context(format!(
"Failed to apply migration file {} during reconstruction",
migration_file.version
))?;
}
}
}
Catalog::load(shadow_pool).await
}