Skip to main content

pg_blast_radius/
analysis.rs

1use std::collections::HashMap;
2
3use crate::forecast;
4use crate::types::*;
5use crate::workload::WorkloadProfile;
6
7pub fn build_result(
8    file: &str,
9    findings: Vec<Finding>,
10    workload: Option<&WorkloadProfile>,
11) -> AnalysisResult {
12    let mut per_table: HashMap<String, TableBlastRadius> = HashMap::new();
13
14    for f in &findings {
15        let Some(ref table) = f.affected_table else {
16            continue;
17        };
18        let entry = per_table
19            .entry(table.clone())
20            .or_insert_with(|| TableBlastRadius {
21                table_name: table.clone(),
22                strongest_lock: LockMode::AccessShare,
23                blocks_reads: false,
24                blocks_writes: false,
25                statement_count: 0,
26                table_size: None,
27                duration_forecast: None,
28                blocked_queries: vec![],
29                total_blocked_qps: 0.0,
30                confidence: ConfidenceLedger::static_only(vec![]),
31                recommendation: None,
32            });
33
34        if f.lock_mode > entry.strongest_lock {
35            entry.strongest_lock = f.lock_mode;
36        }
37        entry.blocks_reads = entry.strongest_lock.blocks_reads();
38        entry.blocks_writes = entry.strongest_lock.blocks_writes();
39        entry.statement_count += 1;
40
41        if let Some(ref est) = f.duration_forecast {
42            entry.duration_forecast = Some(match entry.duration_forecast.take() {
43                Some(existing) => DurationForecast {
44                    p50_seconds: existing.p50_seconds + est.p50_seconds,
45                    p90_seconds: existing.p90_seconds + est.p90_seconds,
46                    worst_seconds: existing.worst_seconds + est.worst_seconds,
47                    assumptions: existing.assumptions,
48                },
49                None => est.clone(),
50            });
51        }
52    }
53
54    for entry in per_table.values_mut() {
55        if entry.statement_count > 1 && entry.strongest_lock.blocks_writes() {
56            entry.recommendation = Some(format!(
57                "{} statements touch \"{}\". Consider splitting into separate migrations.",
58                entry.statement_count, entry.table_name
59            ));
60        }
61
62        if let Some(workload_profile) = workload {
63            let families = workload_profile.families_for_table(&entry.table_name);
64            if !families.is_empty() {
65                if let Some(ref dur) = entry.duration_forecast {
66                    entry.blocked_queries = forecast::forecast_blocked_queries(
67                        entry.strongest_lock,
68                        dur,
69                        &families,
70                    );
71                    entry.total_blocked_qps = entry
72                        .blocked_queries
73                        .iter()
74                        .map(|bq| bq.calls_per_sec)
75                        .sum();
76                }
77
78                let table_qps = workload_profile.table_qps(&entry.table_name);
79                let family_count = families.len();
80
81                let mut doc_facts: Vec<String> = vec![
82                    format!("lock mode is {} for this operation", entry.strongest_lock),
83                ];
84                let catalog_facts: Vec<String> = entry
85                    .table_size
86                    .as_ref()
87                    .map(|s| vec![format!("table size is {} (~{} rows)", s.human_size, s.row_estimate)])
88                    .unwrap_or_default();
89                let stats_facts = vec![
90                    format!("{family_count} query families, {:.0} calls/min combined", table_qps * 60.0),
91                ];
92
93                if entry.duration_forecast.is_some() {
94                    doc_facts.push("lock hold modeled from table size and IO throughput assumptions".into());
95                }
96
97                entry.confidence = ConfidenceLedger::with_workload(doc_facts, catalog_facts, stats_facts);
98            } else if entry.duration_forecast.is_some() {
99                entry.confidence = ConfidenceLedger::with_catalog(
100                    vec![format!("lock mode is {} for this operation", entry.strongest_lock)],
101                    entry
102                        .table_size
103                        .as_ref()
104                        .map(|s| vec![format!("table size is {}", s.human_size)])
105                        .unwrap_or_default(),
106                );
107            }
108        } else if entry.duration_forecast.is_some() {
109            entry.confidence = ConfidenceLedger::with_catalog(
110                vec![format!("lock mode is {} for this operation", entry.strongest_lock)],
111                entry
112                    .table_size
113                    .as_ref()
114                    .map(|s| vec![format!("table size is {}", s.human_size)])
115                    .unwrap_or_default(),
116            );
117        }
118    }
119
120    let mut tables: Vec<TableBlastRadius> = per_table.into_values().collect();
121    tables.sort_by(|a, b| b.strongest_lock.cmp(&a.strongest_lock));
122
123    let overall_risk = findings
124        .iter()
125        .map(|f| f.risk_level)
126        .max()
127        .unwrap_or(RiskLevel::Low);
128
129    let overall_confidence = findings
130        .iter()
131        .map(|f| f.confidence.grade)
132        .min()
133        .unwrap_or(ConfidenceGrade::Static);
134
135    let workload_meta = workload.map(|w| WorkloadMeta {
136        stats_reset: w.stats_reset.clone(),
137        collected_at: w.collected_at.clone(),
138        stats_window_seconds: w.stats_window_seconds,
139        unparseable_queries: w.unparseable_queries,
140    });
141
142    AnalysisResult {
143        file: file.into(),
144        findings,
145        blast_radius: BlastRadius { per_table: tables },
146        overall_risk,
147        overall_confidence,
148        workload_meta,
149    }
150}