pg_blast_radius/
analysis.rs1use std::collections::HashMap;
2
3use crate::forecast;
4use crate::types::*;
5use crate::workload::WorkloadProfile;
6
7pub fn build_result(
8 file: &str,
9 findings: Vec<Finding>,
10 workload: Option<&WorkloadProfile>,
11) -> AnalysisResult {
12 let mut per_table: HashMap<String, TableBlastRadius> = HashMap::new();
13
14 for f in &findings {
15 let Some(ref table) = f.affected_table else {
16 continue;
17 };
18 let entry = per_table
19 .entry(table.clone())
20 .or_insert_with(|| TableBlastRadius {
21 table_name: table.clone(),
22 strongest_lock: LockMode::AccessShare,
23 blocks_reads: false,
24 blocks_writes: false,
25 statement_count: 0,
26 table_size: None,
27 duration_forecast: None,
28 blocked_queries: vec![],
29 total_blocked_qps: 0.0,
30 confidence: ConfidenceLedger::static_only(vec![]),
31 recommendation: None,
32 });
33
34 if f.lock_mode > entry.strongest_lock {
35 entry.strongest_lock = f.lock_mode;
36 }
37 entry.blocks_reads = entry.strongest_lock.blocks_reads();
38 entry.blocks_writes = entry.strongest_lock.blocks_writes();
39 entry.statement_count += 1;
40
41 if let Some(ref est) = f.duration_forecast {
42 entry.duration_forecast = Some(match entry.duration_forecast.take() {
43 Some(existing) => DurationForecast {
44 p50_seconds: existing.p50_seconds + est.p50_seconds,
45 p90_seconds: existing.p90_seconds + est.p90_seconds,
46 worst_seconds: existing.worst_seconds + est.worst_seconds,
47 assumptions: existing.assumptions,
48 },
49 None => est.clone(),
50 });
51 }
52 }
53
54 for entry in per_table.values_mut() {
55 if entry.statement_count > 1 && entry.strongest_lock.blocks_writes() {
56 entry.recommendation = Some(format!(
57 "{} statements touch \"{}\". Consider splitting into separate migrations.",
58 entry.statement_count, entry.table_name
59 ));
60 }
61
62 if let Some(workload_profile) = workload {
63 let families = workload_profile.families_for_table(&entry.table_name);
64 if !families.is_empty() {
65 if let Some(ref dur) = entry.duration_forecast {
66 entry.blocked_queries = forecast::forecast_blocked_queries(
67 entry.strongest_lock,
68 dur,
69 &families,
70 );
71 entry.total_blocked_qps = entry
72 .blocked_queries
73 .iter()
74 .map(|bq| bq.calls_per_sec)
75 .sum();
76 }
77
78 let table_qps = workload_profile.table_qps(&entry.table_name);
79 let family_count = families.len();
80
81 let mut doc_facts: Vec<String> = vec![
82 format!("lock mode is {} for this operation", entry.strongest_lock),
83 ];
84 let catalog_facts: Vec<String> = entry
85 .table_size
86 .as_ref()
87 .map(|s| vec![format!("table size is {} (~{} rows)", s.human_size, s.row_estimate)])
88 .unwrap_or_default();
89 let stats_facts = vec![
90 format!("{family_count} query families, {:.0} calls/min combined", table_qps * 60.0),
91 ];
92
93 if entry.duration_forecast.is_some() {
94 doc_facts.push("lock hold modeled from table size and IO throughput assumptions".into());
95 }
96
97 entry.confidence = ConfidenceLedger::with_workload(doc_facts, catalog_facts, stats_facts);
98 } else if entry.duration_forecast.is_some() {
99 entry.confidence = ConfidenceLedger::with_catalog(
100 vec![format!("lock mode is {} for this operation", entry.strongest_lock)],
101 entry
102 .table_size
103 .as_ref()
104 .map(|s| vec![format!("table size is {}", s.human_size)])
105 .unwrap_or_default(),
106 );
107 }
108 } else if entry.duration_forecast.is_some() {
109 entry.confidence = ConfidenceLedger::with_catalog(
110 vec![format!("lock mode is {} for this operation", entry.strongest_lock)],
111 entry
112 .table_size
113 .as_ref()
114 .map(|s| vec![format!("table size is {}", s.human_size)])
115 .unwrap_or_default(),
116 );
117 }
118 }
119
120 let mut tables: Vec<TableBlastRadius> = per_table.into_values().collect();
121 tables.sort_by(|a, b| b.strongest_lock.cmp(&a.strongest_lock));
122
123 let overall_risk = findings
124 .iter()
125 .map(|f| f.risk_level)
126 .max()
127 .unwrap_or(RiskLevel::Low);
128
129 let overall_confidence = findings
130 .iter()
131 .map(|f| f.confidence.grade)
132 .min()
133 .unwrap_or(ConfidenceGrade::Static);
134
135 let workload_meta = workload.map(|w| WorkloadMeta {
136 stats_reset: w.stats_reset.clone(),
137 collected_at: w.collected_at.clone(),
138 stats_window_seconds: w.stats_window_seconds,
139 unparseable_queries: w.unparseable_queries,
140 });
141
142 AnalysisResult {
143 file: file.into(),
144 findings,
145 blast_radius: BlastRadius { per_table: tables },
146 overall_risk,
147 overall_confidence,
148 workload_meta,
149 }
150}