use crate::catalog::Catalog;
use crate::config::Config;
use crate::config::filter::ObjectFilter;
use crate::db::cleaner;
use crate::db::error_context::SqlErrorContext;
use crate::db::schema_processor::{SchemaProcessor, SchemaProcessorConfig};
use anyhow::{Context, Result};
use sqlx::PgPool;
use std::path::Path;
use tracing::{debug, info};
pub async fn apply_roles_file(pool: &PgPool, roles_file: &Path) -> Result<()> {
if !roles_file.exists() {
debug!("Roles file not found at {}, skipping", roles_file.display());
return Ok(());
}
info!("Applying roles from {}...", roles_file.display());
let content = std::fs::read_to_string(roles_file)
.with_context(|| format!("Failed to read roles file: {}", roles_file.display()))?;
debug!("Executing roles file: {}", truncate_for_log(&content));
sqlx::raw_sql(&content).execute(pool).await.map_err(|e| {
let ctx = SqlErrorContext::from_sqlx_error(&e, &content);
anyhow::anyhow!(
"{}",
ctx.format(&roles_file.display().to_string(), &content)
)
})?;
info!("Roles applied successfully");
Ok(())
}
fn truncate_for_log(s: &str) -> String {
if s.len() > 100 {
format!("{}...", &s[..100])
} else {
s.to_string()
}
}
pub async fn build_desired_state(
config: &Config,
root_dir: &Path,
shadow_pool: &PgPool,
) -> Result<Catalog> {
let schema_dir = root_dir.join(&config.directories.schema);
let roles_file = root_dir.join(&config.directories.roles);
info!("🧹 Cleaning database before applying schema...");
cleaner::clean_shadow_db(shadow_pool, &config.objects)
.await
.map_err(|e| anyhow::anyhow!("Failed to clean database: {}", e))?;
apply_roles_file(shadow_pool, &roles_file).await?;
let processor_config = SchemaProcessorConfig {
verbose: config.schema.verbose_file_processing,
clean_before_apply: false, objects: config.objects.clone(),
};
let processor = SchemaProcessor::new(shadow_pool.clone(), processor_config);
let processed_schema = processor
.process_schema_directory(&schema_dir)
.await
.with_context(|| {
format!(
"Failed to process schema files from directory: {}\n\n\
Common causes:\n\
• Schema directory doesn't exist or is empty\n\
• Circular dependencies between files (A requires B, B requires A)\n\
• Missing dependency files referenced in '-- require:' headers\n\
• Invalid file paths in dependency declarations\n\
• SQL syntax errors in schema files",
schema_dir.display()
)
})?;
let catalog = if config.schema.augment_dependencies_from_files {
processed_schema.with_file_dependencies_applied()
} else {
processed_schema.catalog
};
info!("🔍 Validating applied schema...");
validate_schema_applied(shadow_pool).await?;
info!("✅ Schema validation completed");
Ok(ObjectFilter::from_config(config).filter_catalog(catalog))
}
pub async fn apply_current_schema_to_shadow(
config: &Config,
root_dir: &Path,
shadow: &crate::config::ShadowDatabase,
) -> Result<Catalog> {
let shadow_pool = shadow.connect_fresh().await?;
let catalog = build_desired_state(config, root_dir, &shadow_pool).await?;
crate::db::branch::drop_branch(shadow_pool).await?;
Ok(catalog)
}
async fn validate_schema_applied(pool: &PgPool) -> Result<()> {
sqlx::query("SELECT 1")
.execute(pool)
.await
.map_err(|e| anyhow::anyhow!("Database connectivity test failed: {}", e))?;
sqlx::query("SELECT count(*) FROM pg_tables WHERE schemaname NOT IN ('information_schema', 'pg_catalog')")
.execute(pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to query system tables: {}", e))?;
Ok(())
}