//! pg-blast-radius 0.3.0
//!
//! Workload-aware blast radius forecaster for PostgreSQL migrations.
use crate::locks;
use crate::types::*;
use crate::workload::{QueryFamily, TransactionBaseline};

/// Forecasts how long an operation that processes `total_bytes` will take.
///
/// `throughput_low_mbps` and `throughput_high_mbps` bound the expected throughput in
/// MiB/s (one MiB is taken as 1024 × 1024 bytes):
///
/// * the high bound gives the p50 estimate, floored at 0.1 s;
/// * the low bound gives the p90 estimate, never below p50.
///
/// The worst case depends on `baseline`:
/// * `Some(b)`: p90 plus the longest observed transaction age (`b.max_age_ms`),
///   reported as a lock acquisition delay sourced from workload data.
/// * `None`: a flat 3 × p90 heuristic.
///
/// The returned forecast lists the [`ForecastAssumption`]s behind the numbers, in
/// this order: table size, cache state, lock acquisition delay.
pub fn forecast_duration(
    total_bytes: i64,
    throughput_low_mbps: f64,
    throughput_high_mbps: f64,
    baseline: Option<&TransactionBaseline>,
) -> DurationForecast {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
    let size = total_bytes as f64;

    // Faster throughput -> optimistic estimate (p50); slower -> pessimistic (p90).
    let p50_seconds = (size / (throughput_high_mbps * BYTES_PER_MIB)).max(0.1);
    let p90_seconds = (size / (throughput_low_mbps * BYTES_PER_MIB)).max(p50_seconds);

    // Derive the worst case and the assumption that explains it together, so the
    // two can never disagree about which branch was taken.
    let (worst_seconds, lock_delay) = if let Some(b) = baseline {
        let wait_seconds = b.max_age_ms / 1000.0;
        let assumption = ForecastAssumption {
            factor: "lock acquisition delay".into(),
            assumed: format!(
                "worst case adds {:.1}s from longest observed transaction",
                wait_seconds
            ),
            source: AssumptionSource::Workload,
        };
        (p90_seconds + wait_seconds, assumption)
    } else {
        let assumption = ForecastAssumption {
            factor: "lock acquisition delay".into(),
            assumed: "no workload data, worst = 3x p90 heuristic".into(),
            source: AssumptionSource::Assumed,
        };
        (p90_seconds * 3.0, assumption)
    };

    let table_size = ForecastAssumption {
        factor: "table size".into(),
        assumed: human_size(total_bytes).to_string(),
        source: AssumptionSource::Catalog,
    };
    let cache_state = ForecastAssumption {
        factor: "cache state".into(),
        assumed: "warm shared_buffers at p50, cold at p90".into(),
        source: AssumptionSource::Assumed,
    };

    DurationForecast {
        p50_seconds,
        p90_seconds,
        worst_seconds,
        assumptions: vec![table_size, cache_state, lock_delay],
    }
}

/// Duration forecast for scanning `bytes`, assuming 100–200 MiB/s throughput.
///
/// See `forecast_duration` for how `baseline` shapes the worst case.
pub fn forecast_scan(bytes: i64, baseline: Option<&TransactionBaseline>) -> DurationForecast {
    const SLOW_MIB_PER_SEC: f64 = 100.0;
    const FAST_MIB_PER_SEC: f64 = 200.0;
    forecast_duration(bytes, SLOW_MIB_PER_SEC, FAST_MIB_PER_SEC, baseline)
}

/// Duration forecast for rewriting `bytes`, assuming 50–100 MiB/s throughput.
///
/// See `forecast_duration` for how `baseline` shapes the worst case.
pub fn forecast_rewrite(bytes: i64, baseline: Option<&TransactionBaseline>) -> DurationForecast {
    const SLOW_MIB_PER_SEC: f64 = 50.0;
    const FAST_MIB_PER_SEC: f64 = 100.0;
    forecast_duration(bytes, SLOW_MIB_PER_SEC, FAST_MIB_PER_SEC, baseline)
}

/// Duration forecast for building an index over `bytes`, assuming 30–80 MiB/s throughput.
///
/// See `forecast_duration` for how `baseline` shapes the worst case.
pub fn forecast_index_build(bytes: i64, baseline: Option<&TransactionBaseline>) -> DurationForecast {
    const SLOW_MIB_PER_SEC: f64 = 30.0;
    const FAST_MIB_PER_SEC: f64 = 80.0;
    forecast_duration(bytes, SLOW_MIB_PER_SEC, FAST_MIB_PER_SEC, baseline)
}

/// Forecasts which query families will queue behind a lock held by a migration.
///
/// A family counts as blocked when its `lock_mode` conflicts with `lock_held`
/// according to `locks::conflicts`. For each blocked family, the expected queue
/// depth is `calls_per_sec × duration`, rounded up. It is computed for both the p50
/// and the p90 durations in `duration`. The `f64 -> u64` cast saturates, so a
/// negative or NaN product yields 0.
///
/// The result is sorted by `calls_per_sec`, highest first; families with equal
/// rates keep their input order.
pub fn forecast_blocked_queries(
    lock_held: LockMode,
    duration: &DurationForecast,
    families: &[&QueryFamily],
) -> Vec<BlockedQueryForecast> {
    let mut blocked: Vec<BlockedQueryForecast> = families
        .iter()
        .filter(|qf| locks::conflicts(qf.lock_mode, lock_held))
        .map(|qf| BlockedQueryForecast {
            query_label: qf.label.clone(),
            normalised_sql: qf.normalised_sql.clone(),
            calls_per_sec: qf.calls_per_sec,
            queued_at_p50: (qf.calls_per_sec * duration.p50_seconds).ceil() as u64,
            queued_at_p90: (qf.calls_per_sec * duration.p90_seconds).ceil() as u64,
        })
        .collect();

    // `total_cmp` is a genuine total order, even if a rate is NaN.
    // `partial_cmp(..).unwrap_or(Equal)` treats NaN as equal to every value. That
    // breaks transitivity, and `sort_by` is allowed to panic on such a comparator.
    blocked.sort_by(|a, b| b.calls_per_sec.total_cmp(&a.calls_per_sec));
    blocked
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::locks::DmlKind;

    /// 10 GiB: table size used by the scan-duration tests.
    const TEN_GIB: i64 = 10_737_418_240;
    /// 1 GiB: table size used by the lock-conflict tests.
    const ONE_GIB: i64 = 1_073_741_824;

    /// Baseline whose longest transaction is 5 s, so a baseline-backed worst case
    /// should land 5 s above p90.
    fn test_baseline() -> TransactionBaseline {
        TransactionBaseline {
            active_sessions: 10,
            idle_in_transaction: 2,
            median_age_ms: 50.0,
            p95_age_ms: 200.0,
            max_age_ms: 5000.0,
        }
    }

    /// Builds a single-table query family of `kind` arriving at `qps` calls/s.
    fn test_family(table: &str, kind: DmlKind, qps: f64) -> QueryFamily {
        QueryFamily {
            queryid: 1,
            normalised_sql: format!("{kind} ... FROM {table}"),
            label: format!("{kind} on {table}"),
            tables: vec![table.into()],
            dml_kind: kind,
            lock_mode: kind.lock_mode(),
            calls_per_sec: qps,
            mean_exec_ms: 5.0,
            p95_exec_ms: Some(15.0),
        }
    }

    /// Runs `forecast_blocked_queries` with `lock` held for a 1 GiB scan, against a
    /// 100 qps SELECT and a 50 qps INSERT on `orders`.
    fn blocked_select_and_insert(lock: LockMode) -> Vec<BlockedQueryForecast> {
        let baseline = test_baseline();
        let duration = forecast_scan(ONE_GIB, Some(&baseline));
        let select = test_family("orders", DmlKind::Select, 100.0);
        let insert = test_family("orders", DmlKind::Insert, 50.0);
        forecast_blocked_queries(lock, &duration, &[&select, &insert])
    }

    #[test]
    fn scan_duration_with_baseline() {
        let baseline = test_baseline();
        let f = forecast_scan(TEN_GIB, Some(&baseline));

        assert!(f.p50_seconds > 0.0);
        assert!(f.p90_seconds >= f.p50_seconds);
        assert!(f.worst_seconds >= f.p90_seconds);
        // max_age_ms = 5000, so the worst case should sit ~5 s above p90.
        let lock_wait = f.worst_seconds - f.p90_seconds;
        assert!((lock_wait - 5.0).abs() < 0.1);
    }

    #[test]
    fn scan_duration_without_baseline_uses_heuristic() {
        let f = forecast_scan(TEN_GIB, None);
        assert!((f.worst_seconds - 3.0 * f.p90_seconds).abs() < 0.01);
    }

    #[test]
    fn access_exclusive_blocks_all_dml() {
        let blocked = blocked_select_and_insert(LockMode::AccessExclusive);

        assert_eq!(blocked.len(), 2);
        assert!(blocked[0].calls_per_sec >= blocked[1].calls_per_sec);
    }

    #[test]
    fn share_lock_blocks_writes_not_reads() {
        let blocked = blocked_select_and_insert(LockMode::Share);

        assert_eq!(blocked.len(), 1);
        assert!(blocked[0].query_label.contains("INSERT"));
    }

    #[test]
    fn share_update_exclusive_does_not_block_dml() {
        let blocked = blocked_select_and_insert(LockMode::ShareUpdateExclusive);

        assert!(blocked.is_empty());
    }

    #[test]
    fn queue_depth_calculation() {
        let duration = DurationForecast {
            p50_seconds: 2.0,
            p90_seconds: 8.0,
            worst_seconds: 12.0,
            assumptions: vec![],
        };
        let select = test_family("orders", DmlKind::Select, 135.0);

        let blocked = forecast_blocked_queries(LockMode::AccessExclusive, &duration, &[&select]);

        assert_eq!(blocked.len(), 1);
        // 135 qps for 2 s and for 8 s.
        assert_eq!(blocked[0].queued_at_p50, 270);
        assert_eq!(blocked[0].queued_at_p90, 1080);
    }
}