use tokio_postgres::Client;
use tracing::{debug, info, warn};
use crate::config::MigrationConfig;
use crate::error::Result;
use crate::tls::connect_with_sslmode;
/// Refreshes planner statistics on the target database by running `ANALYZE`.
///
/// A connection is opened with [`connect_with_sslmode`]. When `schemas` is
/// empty a single database-wide `ANALYZE` statement is issued; otherwise each
/// listed schema is handed to `analyze_schema`, which works table by table.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or if the
/// database-wide statement fails. The per-schema path does not return errors
/// from individual schemas: `analyze_schema` yields no result, so this
/// function still returns `Ok(())` after the loop.
pub async fn run_target_analyze(
    target_conn: &str,
    schemas: &[String],
    verbose: bool,
) -> Result<()> {
    info!("running ANALYZE on target database");
    let client = connect_with_sslmode(target_conn).await?;

    // No schema filter: one statement covers the whole database.
    if schemas.is_empty() {
        client.batch_execute(&build_analyze_sql(verbose)).await?;
        info!("ANALYZE complete (all schemas)");
        return Ok(());
    }

    // Schema filter present: walk each schema's relations individually.
    for schema in schemas.iter() {
        analyze_schema(&client, schema, verbose).await;
    }
    info!(count = schemas.len(), "ANALYZE complete (filtered schemas)");
    Ok(())
}
/// Runs `VACUUM ANALYZE` against the source database.
///
/// A connection is opened with [`connect_with_sslmode`]. When `schemas` is
/// empty a single database-wide statement is issued; otherwise each listed
/// schema is handed to `vacuum_schema`, which works table by table.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or if the
/// database-wide statement fails. The per-schema path does not return errors
/// from individual schemas: `vacuum_schema` yields no result, so this
/// function still returns `Ok(())` after the loop.
pub async fn run_source_vacuum(source_conn: &str, schemas: &[String], verbose: bool) -> Result<()> {
    info!("running VACUUM ANALYZE on source database");
    let client = connect_with_sslmode(source_conn).await?;

    // No schema filter: one statement covers the whole database.
    if schemas.is_empty() {
        client
            .batch_execute(&build_vacuum_analyze_sql(verbose))
            .await?;
        info!("VACUUM ANALYZE complete (all schemas)");
        return Ok(());
    }

    // Schema filter present: walk each schema's relations individually.
    for schema in schemas.iter() {
        vacuum_schema(&client, schema, verbose).await;
    }
    info!(
        count = schemas.len(),
        "VACUUM ANALYZE complete (filtered schemas)"
    );
    Ok(())
}
async fn run_per_table(
client: &Client,
schema: &str,
verbose: bool,
build_sql: fn(&str, &str, bool) -> String,
op_name: &str,
list_sql: &str,
) {
let tables = match list_relations(client, schema, list_sql).await {
Ok(t) => t,
Err(e) => {
warn!(schema = %schema, error = %e, "failed to list tables for {op_name}");
return;
}
};
for table in &tables {
let sql = build_sql(schema, table, verbose);
if let Err(e) = client.batch_execute(&sql).await {
warn!(schema = %schema, table = %table, error = %e, "{op_name} failed (continuing)");
} else {
debug!(schema = %schema, table = %table, "{op_name} done");
}
}
}
/// Runs `ANALYZE` on each relation of `schema` selected by
/// [`LIST_ANALYZABLE_SQL`], delegating the iteration and logging to
/// `run_per_table`.
async fn analyze_schema(client: &Client, schema: &str, verbose: bool) {
    let statement_builder: fn(&str, &str, bool) -> String = build_analyze_table_sql;
    let operation = "ANALYZE";
    run_per_table(
        client,
        schema,
        verbose,
        statement_builder,
        operation,
        LIST_ANALYZABLE_SQL,
    )
    .await;
}
/// Runs `VACUUM ANALYZE` on each relation of `schema` selected by
/// [`LIST_VACUUMABLE_SQL`], delegating the iteration and logging to
/// `run_per_table`.
async fn vacuum_schema(client: &Client, schema: &str, verbose: bool) {
    let statement_builder: fn(&str, &str, bool) -> String = build_vacuum_analyze_table_sql;
    let operation = "VACUUM ANALYZE";
    run_per_table(
        client,
        schema,
        verbose,
        statement_builder,
        operation,
        LIST_VACUUMABLE_SQL,
    )
    .await;
}
/// Executes `sql` with `schema` bound as its single parameter and returns the
/// first column of every row as a `String`.
///
/// # Errors
///
/// Propagates any error reported by the query.
async fn list_relations(client: &Client, schema: &str, sql: &str) -> Result<Vec<String>> {
    let rows = client.query(sql, &[&schema]).await?;
    let mut names = Vec::with_capacity(rows.len());
    for row in &rows {
        names.push(row.get::<_, String>(0));
    }
    Ok(names)
}
/// Catalog query listing the relation names in one schema that the
/// per-table `ANALYZE` pass visits.
///
/// `$1` is the schema name. Relations with `relkind` `'r'`, `'p'`
/// (partitioned) or `'m'` (materialized view) are selected, and any relation
/// flagged `relispartition` is left out.
pub const LIST_ANALYZABLE_SQL: &str = concat!(
    "SELECT c.relname::text ",
    "FROM pg_class c ",
    "JOIN pg_namespace n ON n.oid = c.relnamespace ",
    "WHERE c.relkind IN ('r', 'p', 'm') ",
    "AND n.nspname = $1 ",
    "AND NOT c.relispartition"
);
/// Catalog query listing the relation names in one schema that the
/// per-table `VACUUM ANALYZE` pass visits.
///
/// `$1` is the schema name. Only relations with `relkind` `'r'` or `'p'`
/// (partitioned) are selected — materialized views (`'m'`) are not — and any
/// relation flagged `relispartition` is left out.
pub const LIST_VACUUMABLE_SQL: &str = concat!(
    "SELECT c.relname::text ",
    "FROM pg_class c ",
    "JOIN pg_namespace n ON n.oid = c.relnamespace ",
    "WHERE c.relkind IN ('r', 'p') ",
    "AND n.nspname = $1 ",
    "AND NOT c.relispartition"
);
/// Builds the database-wide `ANALYZE` statement.
///
/// Returns `ANALYZE;`, or `ANALYZE VERBOSE;` when `verbose` is set.
pub fn build_analyze_sql(verbose: bool) -> String {
    let mut sql = String::from("ANALYZE");
    if verbose {
        sql.push_str(" VERBOSE");
    }
    sql.push(';');
    sql
}
/// Builds an `ANALYZE` statement for a single relation.
///
/// Both `schema` and `table` are passed through [`quote_ident_simple`], so
/// the target is written as `"schema"."table"`. `verbose` inserts the
/// `VERBOSE` keyword after `ANALYZE`.
pub fn build_analyze_table_sql(schema: &str, table: &str, verbose: bool) -> String {
    let qualified = format!(
        "{}.{}",
        quote_ident_simple(schema),
        quote_ident_simple(table)
    );
    if verbose {
        format!("ANALYZE VERBOSE {qualified};")
    } else {
        format!("ANALYZE {qualified};")
    }
}
/// Builds the database-wide `VACUUM ANALYZE` statement.
///
/// Returns `VACUUM ANALYZE;`, or the parenthesized option form
/// `VACUUM (VERBOSE, ANALYZE);` when `verbose` is set.
pub fn build_vacuum_analyze_sql(verbose: bool) -> String {
    let options = if verbose {
        "(VERBOSE, ANALYZE)"
    } else {
        "ANALYZE"
    };
    ["VACUUM ", options, ";"].concat()
}
/// Builds a `VACUUM ANALYZE` statement for a single relation.
///
/// Both `schema` and `table` are passed through [`quote_ident_simple`], so
/// the target is written as `"schema"."table"`. With `verbose` set the
/// parenthesized option form `(VERBOSE, ANALYZE)` is used.
pub fn build_vacuum_analyze_table_sql(schema: &str, table: &str, verbose: bool) -> String {
    let qualified = format!(
        "{}.{}",
        quote_ident_simple(schema),
        quote_ident_simple(table)
    );
    if verbose {
        format!("VACUUM (VERBOSE, ANALYZE) {qualified};")
    } else {
        format!("VACUUM ANALYZE {qualified};")
    }
}
/// Wraps `ident` in double quotes, doubling every embedded double quote.
///
/// An empty input yields `""` (two quote characters).
pub fn quote_ident_simple(ident: &str) -> String {
    // Two extra bytes for the surrounding quotes; embedded quotes may grow it.
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('"');
    for ch in ident.chars() {
        if ch == '"' {
            // Escape by emitting the quote twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Runs [`run_source_vacuum`] with the settings in `config`, unless
/// `config.skip_source_vacuum` is set, in which case nothing is done and
/// `Ok(())` is returned.
///
/// # Errors
///
/// Propagates whatever [`run_source_vacuum`] returns.
pub async fn maybe_vacuum_source(config: &MigrationConfig) -> Result<()> {
    if config.skip_source_vacuum {
        Ok(())
    } else {
        run_source_vacuum(
            &config.source.connection_string,
            &config.schemas,
            config.verbose,
        )
        .await
    }
}
/// Runs [`run_target_analyze`] with the settings in `config`, unless
/// `config.skip_analyze` is set, in which case nothing is done and `Ok(())`
/// is returned.
///
/// # Errors
///
/// Propagates whatever [`run_target_analyze`] returns.
pub async fn maybe_analyze_target(config: &MigrationConfig) -> Result<()> {
    if config.skip_analyze {
        Ok(())
    } else {
        run_target_analyze(
            &config.target.connection_string,
            &config.schemas,
            config.verbose,
        )
        .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- database-wide ANALYZE -------------------------------------------

    #[test]
    fn build_analyze_sql_not_verbose() {
        let sql = build_analyze_sql(false);
        assert_eq!(sql, "ANALYZE;");
    }

    #[test]
    fn build_analyze_sql_verbose() {
        let sql = build_analyze_sql(true);
        assert_eq!(sql, "ANALYZE VERBOSE;");
    }

    // ---- per-table ANALYZE -----------------------------------------------

    #[test]
    fn build_analyze_table_sql_basic() {
        assert_eq!(
            build_analyze_table_sql("public", "users", false),
            r#"ANALYZE "public"."users";"#
        );
    }

    #[test]
    fn build_analyze_table_sql_verbose() {
        assert_eq!(
            build_analyze_table_sql("public", "users", true),
            r#"ANALYZE VERBOSE "public"."users";"#
        );
    }

    // Embedded double quotes must come out doubled in both identifiers.
    #[test]
    fn build_analyze_table_sql_special_chars() {
        assert_eq!(
            build_analyze_table_sql(r#"my"schema"#, r#"my"table"#, false),
            r#"ANALYZE "my""schema"."my""table";"#
        );
    }

    // ---- database-wide VACUUM ANALYZE ------------------------------------

    #[test]
    fn build_vacuum_analyze_sql_not_verbose() {
        assert_eq!(build_vacuum_analyze_sql(false), "VACUUM ANALYZE;");
    }

    #[test]
    fn build_vacuum_analyze_sql_verbose() {
        assert_eq!(build_vacuum_analyze_sql(true), "VACUUM (VERBOSE, ANALYZE);");
    }

    // ---- per-table VACUUM ANALYZE ----------------------------------------

    #[test]
    fn build_vacuum_analyze_table_sql_basic() {
        assert_eq!(
            build_vacuum_analyze_table_sql("public", "users", false),
            r#"VACUUM ANALYZE "public"."users";"#
        );
    }

    #[test]
    fn build_vacuum_analyze_table_sql_verbose() {
        assert_eq!(
            build_vacuum_analyze_table_sql("public", "users", true),
            r#"VACUUM (VERBOSE, ANALYZE) "public"."users";"#
        );
    }

    // Embedded double quotes must come out doubled in both identifiers.
    #[test]
    fn build_vacuum_analyze_table_sql_special_chars() {
        assert_eq!(
            build_vacuum_analyze_table_sql(r#"my"schema"#, r#"my"table"#, false),
            r#"VACUUM ANALYZE "my""schema"."my""table";"#
        );
    }

    // ---- identifier quoting ----------------------------------------------

    #[test]
    fn quote_ident_simple_basic() {
        let quoted = quote_ident_simple("public");
        assert_eq!(quoted, r#""public""#);
    }

    #[test]
    fn quote_ident_simple_with_double_quote() {
        let quoted = quote_ident_simple(r#"ab"cd"#);
        assert_eq!(quoted, r#""ab""cd""#);
    }

    // An empty identifier still gets its surrounding pair of quotes.
    #[test]
    fn quote_ident_simple_empty() {
        let quoted = quote_ident_simple("");
        assert_eq!(quoted, "\"\"");
    }

    // ---- catalog listing queries -----------------------------------------

    #[test]
    fn list_analyzable_sql_includes_partitioned_and_materialized() {
        let required = [
            "IN ('r', 'p', 'm')",
            "$1",
            "pg_namespace",
            "NOT c.relispartition",
        ];
        for fragment in required.iter() {
            assert!(LIST_ANALYZABLE_SQL.contains(fragment));
        }
    }

    #[test]
    fn list_vacuumable_sql_excludes_materialized_views() {
        let required = ["IN ('r', 'p')", "$1", "NOT c.relispartition"];
        for fragment in required.iter() {
            assert!(LIST_VACUUMABLE_SQL.contains(fragment));
        }
        // The materialized-view relkind must not appear anywhere in the query.
        assert!(!LIST_VACUUMABLE_SQL.contains("'m'"));
    }

    // ---- configuration flags ---------------------------------------------

    // Only checks that the flag can be set on the config value itself.
    #[test]
    fn maybe_vacuum_source_respects_skip_flag() {
        let skip_vacuum = MigrationConfig {
            skip_source_vacuum: true,
            ..MigrationConfig::default()
        };
        assert!(skip_vacuum.skip_source_vacuum);
    }

    // Only checks that the flag can be set on the config value itself.
    #[test]
    fn maybe_analyze_target_respects_skip_flag() {
        let skip_analyze = MigrationConfig {
            skip_analyze: true,
            ..MigrationConfig::default()
        };
        assert!(skip_analyze.skip_analyze);
    }

    // By default neither maintenance step is skipped.
    #[test]
    fn default_config_runs_both() {
        let defaults = MigrationConfig::default();
        assert!(!defaults.skip_analyze);
        assert!(!defaults.skip_source_vacuum);
    }

    // ---- skip paths of the async entry points ----------------------------

    // With the skip flag set the function returns before any connection is
    // attempted, so the default connection settings are never used.
    #[tokio::test]
    async fn maybe_vacuum_source_skips_when_flag_set() {
        let skip_vacuum = MigrationConfig {
            skip_source_vacuum: true,
            ..MigrationConfig::default()
        };
        assert!(maybe_vacuum_source(&skip_vacuum).await.is_ok());
    }

    // With the skip flag set the function returns before any connection is
    // attempted, so the default connection settings are never used.
    #[tokio::test]
    async fn maybe_analyze_target_skips_when_flag_set() {
        let skip_analyze = MigrationConfig {
            skip_analyze: true,
            ..MigrationConfig::default()
        };
        assert!(maybe_analyze_target(&skip_analyze).await.is_ok());
    }
}