use crate::catalog::Catalog;
use crate::config::Config;
use crate::db::cleaner;
use crate::db::connection::connect_with_retry;
use crate::db::error_context::SqlErrorContext;
use crate::db::schema_processor::{SchemaProcessor, SchemaProcessorConfig};
use anyhow::{Context, Result};
use sqlx::PgPool;
use std::path::Path;
use tracing::{debug, info};
#[derive(Debug, Clone)]
pub struct SchemaOpsConfig {
pub clean_before_apply: bool,
pub verbose: bool,
pub validate_after_apply: bool,
}
impl Default for SchemaOpsConfig {
fn default() -> Self {
Self {
clean_before_apply: true,
verbose: true,
validate_after_apply: true,
}
}
}
pub async fn load_and_apply_schema(
pool: &PgPool,
schema_dir: &Path,
config: &SchemaOpsConfig,
objects: &crate::config::types::Objects,
) -> Result<()> {
if config.clean_before_apply {
info!("🧹 Cleaning database before applying schema...");
cleaner::clean_shadow_db(pool, objects)
.await
.map_err(|e| anyhow::anyhow!("Failed to clean database: {}", e))?;
}
let processor_config = SchemaProcessorConfig {
verbose: config.verbose,
clean_before_apply: false, objects: objects.clone(),
};
let processor = SchemaProcessor::new(pool.clone(), processor_config);
processor.process_schema_directory(schema_dir).await?;
if config.validate_after_apply {
info!("🔍 Validating applied schema...");
validate_schema_applied(pool).await?;
info!("✅ Schema validation completed");
}
Ok(())
}
async fn apply_schema_with_file_dependency_tracking(
shadow_pool: &PgPool,
schema_dir: &Path,
config: &SchemaOpsConfig,
verbose: bool,
objects: &crate::config::types::Objects,
) -> Result<Catalog> {
let processor_config = SchemaProcessorConfig {
verbose,
clean_before_apply: config.clean_before_apply,
objects: objects.clone(),
};
let processor = SchemaProcessor::new(shadow_pool.clone(), processor_config);
let processed_schema = processor
.process_schema_directory(schema_dir)
.await
.with_context(|| {
format!(
"Failed to process schema files from directory: {}\n\n\
Common causes:\n\
• Schema directory doesn't exist or is empty\n\
• Circular dependencies between files (A requires B, B requires A)\n\
• Missing dependency files referenced in '-- require:' headers\n\
• Invalid file paths in dependency declarations\n\
• SQL syntax errors in schema files",
schema_dir.display()
)
})?;
let catalog = processed_schema.with_file_dependencies_applied();
if config.validate_after_apply {
info!("🔍 Validating applied schema...");
validate_schema_applied(shadow_pool).await?;
info!("✅ Schema validation completed");
}
Ok(catalog)
}
pub async fn apply_roles_file(pool: &PgPool, roles_file: &Path) -> Result<()> {
if !roles_file.exists() {
debug!("Roles file not found at {}, skipping", roles_file.display());
return Ok(());
}
info!("Applying roles from {}...", roles_file.display());
let content = std::fs::read_to_string(roles_file)
.with_context(|| format!("Failed to read roles file: {}", roles_file.display()))?;
debug!("Executing roles file: {}", truncate_for_log(&content));
sqlx::raw_sql(&content).execute(pool).await.map_err(|e| {
let ctx = SqlErrorContext::from_sqlx_error(&e, &content);
anyhow::anyhow!(
"{}",
ctx.format(&roles_file.display().to_string(), &content)
)
})?;
info!("Roles applied successfully");
Ok(())
}
fn truncate_for_log(s: &str) -> String {
if s.len() > 100 {
format!("{}...", &s[..100])
} else {
s.to_string()
}
}
pub async fn apply_current_schema_to_shadow(config: &Config, root_dir: &Path) -> Result<Catalog> {
let shadow_url = config.databases.shadow.get_connection_string().await?;
let schema_dir = root_dir.join(&config.directories.schema);
let roles_file = root_dir.join(&config.directories.roles);
let ops_config = SchemaOpsConfig::default();
apply_schema_to_shadow_with_roles(
&shadow_url,
&schema_dir,
Some(&roles_file),
&ops_config,
config.schema.augment_dependencies_from_files,
config.schema.verbose_file_processing,
&config.objects,
)
.await
}
pub async fn apply_schema_to_shadow_with_roles(
shadow_url: &str,
schema_dir: &Path,
roles_file: Option<&Path>,
config: &SchemaOpsConfig,
enable_file_deps: bool,
verbose_file_processing: bool,
objects: &crate::config::types::Objects,
) -> Result<Catalog> {
let shadow_pool = connect_with_retry(shadow_url).await?;
if config.clean_before_apply {
info!("🧹 Cleaning database before applying schema...");
cleaner::clean_shadow_db(&shadow_pool, objects)
.await
.map_err(|e| anyhow::anyhow!("Failed to clean database: {}", e))?;
}
if let Some(roles_path) = roles_file {
apply_roles_file(&shadow_pool, roles_path).await?;
}
let schema_config = SchemaOpsConfig {
clean_before_apply: false,
..*config
};
let catalog = if enable_file_deps {
apply_schema_with_file_dependency_tracking(
&shadow_pool,
schema_dir,
&schema_config,
verbose_file_processing,
objects,
)
.await?
} else {
load_and_apply_schema(&shadow_pool, schema_dir, &schema_config, objects).await?;
Catalog::load(&shadow_pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to load catalog from shadow database: {}", e))?
};
shadow_pool.close().await;
Ok(catalog)
}
async fn validate_schema_applied(pool: &PgPool) -> Result<()> {
sqlx::query("SELECT 1")
.execute(pool)
.await
.map_err(|e| anyhow::anyhow!("Database connectivity test failed: {}", e))?;
sqlx::query("SELECT count(*) FROM pg_tables WHERE schemaname NOT IN ('information_schema', 'pg_catalog')")
.execute(pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to query system tables: {}", e))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_schema_ops_config_default() {
let config = SchemaOpsConfig::default();
assert!(config.clean_before_apply);
assert!(config.verbose);
assert!(config.validate_after_apply);
}
#[test]
fn test_schema_ops_config_custom() {
let config = SchemaOpsConfig {
clean_before_apply: false,
verbose: false,
validate_after_apply: false,
};
assert!(!config.clean_before_apply);
assert!(!config.verbose);
assert!(!config.validate_after_apply);
}
}