pg-blast-radius 0.3.0

Workload-aware blast radius forecaster for PostgreSQL migrations
Documentation
use std::collections::HashMap;

use pg_blast_radius::analysis;
use pg_blast_radius::catalog::CatalogInfo;
use pg_blast_radius::locks::DmlKind;
use pg_blast_radius::rules::{PgVersion, RuleContext, analyse};
use pg_blast_radius::types::*;
use pg_blast_radius::workload::{QueryFamily, TransactionBaseline, WorkloadProfile};

fn mock_catalog(tables: &[(&str, i64, i64)]) -> CatalogInfo {
    let mut map = HashMap::new();
    for &(name, bytes, rows) in tables {
        map.insert(
            name.to_string(),
            TableSize {
                total_bytes: bytes,
                row_estimate: rows,
                human_size: human_size(bytes),
            },
        );
    }
    CatalogInfo {
        tables: map,
        workload: None,
    }
}

fn mock_workload(families: Vec<QueryFamily>) -> WorkloadProfile {
    WorkloadProfile {
        query_families: families,
        transaction_baseline: TransactionBaseline {
            active_sessions: 10,
            idle_in_transaction: 2,
            median_age_ms: 50.0,
            p95_age_ms: 200.0,
            max_age_ms: 5000.0,
        },
        collected_at: "2026-03-29T10:00:00Z".into(),
        stats_reset: Some("2026-03-01T00:00:00Z".into()),
        stats_window_seconds: Some(28.0 * 86400.0),
        unparseable_queries: 0,
    }
}

fn analyse_fixture(sql: &str, catalog: Option<&CatalogInfo>) -> AnalysisResult {
    let ctx = RuleContext {
        pg_version: PgVersion { major: 16 },
        catalog,
        transaction_baseline: catalog
            .and_then(|c| c.workload.as_ref())
            .map(|w| &w.transaction_baseline),
    };
    let findings = analyse(sql, &ctx).unwrap();
    let workload = catalog.and_then(|c| c.workload.as_ref());
    analysis::build_result("test.sql", findings, workload)
}

fn fixture(name: &str) -> String {
    std::fs::read_to_string(format!("testdata/fixtures/{name}")).unwrap()
}

#[test]
fn add_column_simple_is_low_risk() {
    let result = analyse_fixture(&fixture("add_column_simple.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings.len(), 1);
    assert_eq!(result.findings[0].rule_id, "add-column");
    assert_eq!(result.findings[0].lock_mode, LockMode::AccessExclusive);
    assert!(matches!(result.findings[0].rewrite, RewriteRisk::None));
}

#[test]
fn add_column_with_default_is_low_on_pg16() {
    let result = analyse_fixture(&fixture("add_column_with_default.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings[0].rule_id, "add-column-default");
}

#[test]
fn add_column_volatile_default_is_extreme() {
    let result = analyse_fixture(&fixture("add_column_volatile_default.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Extreme);
    assert_eq!(result.findings[0].rule_id, "add-column-volatile-default");
    assert!(matches!(result.findings[0].rewrite, RewriteRisk::Required));
}

#[test]
fn add_column_not_null_default_is_low_on_pg16() {
    let result = analyse_fixture(&fixture("add_column_not_null_default.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
}

#[test]
fn drop_column_is_medium() {
    let result = analyse_fixture(&fixture("drop_column.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Medium);
    assert_eq!(result.findings[0].rule_id, "drop-column");
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn alter_type_safe_is_low() {
    let result = analyse_fixture(&fixture("alter_type_safe.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings[0].rule_id, "change-column-type");
    assert!(matches!(result.findings[0].rewrite, RewriteRisk::None));
}

#[test]
fn alter_type_rewrite_is_high_without_catalog() {
    let result = analyse_fixture(&fixture("alter_type_rewrite.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert!(matches!(result.findings[0].rewrite, RewriteRisk::Required));
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn alter_type_rewrite_is_extreme_on_huge_table() {
    let catalog = mock_catalog(&[("orders", 50_000_000_000, 800_000_000)]);
    let result = analyse_fixture(&fixture("alter_type_rewrite.sql"), Some(&catalog));
    assert_eq!(result.overall_risk, RiskLevel::Extreme);
    assert!(result.findings[0].duration_forecast.is_some());
}

#[test]
fn set_not_null_is_high() {
    let result = analyse_fixture(&fixture("set_not_null.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert_eq!(result.findings[0].rule_id, "set-not-null");
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn set_not_null_safe_pattern_produces_multiple_findings() {
    let result = analyse_fixture(&fixture("set_not_null_safe_pattern.sql"), None);
    assert!(result.findings.len() >= 3);
}

#[test]
fn add_check_constraint_is_high() {
    let result = analyse_fixture(&fixture("add_check_constraint.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn add_check_not_valid_is_low() {
    let result = analyse_fixture(&fixture("add_check_not_valid.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings[0].rule_id, "add-check-not-valid");
}

#[test]
fn validate_constraint_is_low() {
    let result = analyse_fixture(&fixture("validate_constraint.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings[0].lock_mode, LockMode::ShareUpdateExclusive);
}

#[test]
fn create_index_is_high() {
    let result = analyse_fixture(&fixture("create_index.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert_eq!(result.findings[0].lock_mode, LockMode::Share);
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn create_index_concurrently_is_low() {
    let result = analyse_fixture(&fixture("create_index_concurrently.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
    assert_eq!(result.findings[0].lock_mode, LockMode::ShareUpdateExclusive);
}

#[test]
fn drop_index_is_high() {
    let result = analyse_fixture(&fixture("drop_index.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert_eq!(result.findings[0].lock_mode, LockMode::AccessExclusive);
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn drop_index_concurrently_is_low() {
    let result = analyse_fixture(&fixture("drop_index_concurrently.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::Low);
}

#[test]
fn add_foreign_key_is_high() {
    let result = analyse_fixture(&fixture("add_foreign_key.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert!(result.findings[0].recipe.is_some());
}

#[test]
fn add_foreign_key_not_valid_is_medium() {
    let result = analyse_fixture(&fixture("add_foreign_key_not_valid.sql"), None);
    let fk_finding = result
        .findings
        .iter()
        .find(|f| f.rule_id.starts_with("add-foreign-key"))
        .unwrap();
    assert_eq!(fk_finding.risk_level, RiskLevel::Medium);
}

#[test]
fn multi_statement_aggregates_blast_radius() {
    let result = analyse_fixture(&fixture("multi_statement_dangerous.sql"), None);
    assert_eq!(result.overall_risk, RiskLevel::High);
    assert!(result.findings.len() >= 3);

    let users_blast = result
        .blast_radius
        .per_table
        .iter()
        .find(|t| t.table_name == "users")
        .unwrap();
    assert_eq!(users_blast.statement_count, 3);
    assert_eq!(users_blast.strongest_lock, LockMode::AccessExclusive);
    assert!(users_blast.recommendation.is_some());
}

#[test]
fn create_index_on_tiny_table_is_downgraded() {
    let catalog = mock_catalog(&[("sessions", 16_384, 50)]);
    let sql = "CREATE INDEX idx_sessions_user ON sessions (user_id);";
    let result = analyse_fixture(sql, Some(&catalog));
    assert_eq!(result.overall_risk, RiskLevel::Medium);
}

#[test]
fn create_index_on_huge_table_is_extreme() {
    let catalog = mock_catalog(&[("orders", 50_000_000_000, 800_000_000)]);
    let sql = "CREATE INDEX idx_orders_cust ON orders (customer_id);";
    let result = analyse_fixture(sql, Some(&catalog));
    assert_eq!(result.overall_risk, RiskLevel::Extreme);
}

#[test]
fn json_output_is_valid() {
    let result = analyse_fixture(&fixture("create_index.sql"), None);
    let json = serde_json::to_string(&[&result]).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
    assert!(parsed.is_array());
}

#[test]
fn pg10_add_column_default_is_extreme() {
    let ctx = RuleContext {
        pg_version: PgVersion { major: 10 },
        catalog: None,
        transaction_baseline: None,
    };
    let sql = "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';";
    let findings = analyse(sql, &ctx).unwrap();
    let result = analysis::build_result("test.sql", findings, None);
    assert_eq!(result.overall_risk, RiskLevel::Extreme);
}

#[test]
fn confidence_is_static_without_catalog() {
    let result = analyse_fixture(&fixture("create_index.sql"), None);
    assert_eq!(result.overall_confidence, ConfidenceGrade::Static);
    assert_eq!(result.findings[0].confidence.grade, ConfidenceGrade::Static);
}

#[test]
fn confidence_is_estimated_with_catalog() {
    let catalog = mock_catalog(&[("orders", 50_000_000_000, 800_000_000)]);
    let sql = "CREATE INDEX idx_orders_cust ON orders (customer_id);";
    let result = analyse_fixture(sql, Some(&catalog));
    assert_eq!(result.findings[0].confidence.grade, ConfidenceGrade::Estimated);
    assert!(!result.findings[0].confidence.from_catalog.is_empty());
}

#[test]
fn duration_forecast_has_p50_p90_worst() {
    let catalog = mock_catalog(&[("orders", 10_737_418_240, 100_000_000)]);
    let sql = "CREATE INDEX idx_orders_cust ON orders (customer_id);";
    let result = analyse_fixture(sql, Some(&catalog));
    let forecast = result.findings[0].duration_forecast.as_ref().unwrap();
    assert!(forecast.p50_seconds > 0.0);
    assert!(forecast.p90_seconds >= forecast.p50_seconds);
    assert!(forecast.worst_seconds >= forecast.p90_seconds);
}

#[test]
fn workload_aware_blocked_query_forecast() {
    let mut catalog = mock_catalog(&[("orders", 10_737_418_240, 100_000_000)]);
    catalog.workload = Some(mock_workload(vec![
        QueryFamily {
            queryid: 1,
            normalised_sql: "SELECT * FROM orders WHERE customer_id = $1".into(),
            label: "SELECT ... FROM orders WHERE customer_id = $1".into(),
            tables: vec!["orders".into()],
            dml_kind: DmlKind::Select,
            lock_mode: LockMode::AccessShare,
            calls_per_sec: 135.0,
            mean_exec_ms: 5.0,
            p95_exec_ms: Some(15.0),
        },
        QueryFamily {
            queryid: 2,
            normalised_sql: "INSERT INTO orders (customer_id, total) VALUES ($1, $2)".into(),
            label: "INSERT INTO orders (...)".into(),
            tables: vec!["orders".into()],
            dml_kind: DmlKind::Insert,
            lock_mode: LockMode::RowExclusive,
            calls_per_sec: 80.0,
            mean_exec_ms: 3.0,
            p95_exec_ms: Some(10.0),
        },
    ]));

    let sql = "CREATE INDEX idx_orders_status ON orders (status);";
    let result = analyse_fixture(sql, Some(&catalog));

    let orders_blast = result
        .blast_radius
        .per_table
        .iter()
        .find(|t| t.table_name == "orders")
        .unwrap();

    assert!(!orders_blast.blocked_queries.is_empty());
    assert!(orders_blast.total_blocked_qps > 0.0);
    assert_eq!(orders_blast.confidence.grade, ConfidenceGrade::Measured);
}

#[test]
fn share_lock_only_blocks_writes_in_forecast() {
    let mut catalog = mock_catalog(&[("orders", 10_737_418_240, 100_000_000)]);
    catalog.workload = Some(mock_workload(vec![
        QueryFamily {
            queryid: 1,
            normalised_sql: "SELECT * FROM orders WHERE id = $1".into(),
            label: "SELECT ... FROM orders".into(),
            tables: vec!["orders".into()],
            dml_kind: DmlKind::Select,
            lock_mode: LockMode::AccessShare,
            calls_per_sec: 100.0,
            mean_exec_ms: 2.0,
            p95_exec_ms: None,
        },
        QueryFamily {
            queryid: 2,
            normalised_sql: "INSERT INTO orders (...) VALUES (...)".into(),
            label: "INSERT INTO orders".into(),
            tables: vec!["orders".into()],
            dml_kind: DmlKind::Insert,
            lock_mode: LockMode::RowExclusive,
            calls_per_sec: 50.0,
            mean_exec_ms: 3.0,
            p95_exec_ms: None,
        },
    ]));

    let sql = "CREATE INDEX idx_orders_status ON orders (status);";
    let result = analyse_fixture(sql, Some(&catalog));

    let orders_blast = result
        .blast_radius
        .per_table
        .iter()
        .find(|t| t.table_name == "orders")
        .unwrap();

    assert_eq!(orders_blast.strongest_lock, LockMode::Share);
    assert_eq!(orders_blast.blocked_queries.len(), 1);
    assert!(orders_blast.blocked_queries[0].query_label.contains("INSERT"));
}

#[test]
fn insta_multi_statement() {
    let result = analyse_fixture(&fixture("multi_statement_dangerous.sql"), None);
    insta::assert_json_snapshot!(result);
}

#[test]
fn insta_create_index_large_table() {
    let catalog = mock_catalog(&[("orders", 50_000_000_000, 800_000_000)]);
    let sql = "CREATE INDEX idx_orders_customer ON orders (customer_id);";
    let result = analyse_fixture(sql, Some(&catalog));
    insta::assert_json_snapshot!(result);
}

#[test]
fn insta_set_not_null() {
    let result = analyse_fixture(&fixture("set_not_null.sql"), None);
    insta::assert_json_snapshot!(result);
}