dsfb_database/adapters/
postgres.rs1use crate::residual::{plan_regression, workload_phase, ResidualStream};
56use anyhow::{Context, Result};
57use std::collections::HashMap;
58use std::path::Path;
59
/// Number of leading per-query mean samples that are averaged into the
/// plan-regression baseline. A query must yield more samples than this before
/// any latency residual is emitted for it.
const BASELINE_WINDOW: usize = 3;

/// Upper bound on the number of distinct query ids placed in the sorted
/// processing view.
const MAX_QIDS: usize = 1_000_000;

/// Upper bound on the number of CSV records pulled from one input file.
const MAX_PGSS_ROWS: usize = 100_000_000;
78
79#[derive(Debug, serde::Deserialize)]
80struct Row {
81 snapshot_t: f64,
82 query_id: String,
83 calls: u64,
84 total_exec_time_ms: f64,
85}
86
87pub fn load_pg_stat_statements(path: &Path) -> Result<ResidualStream> {
92 let rows = read_and_filter_rows(path)?;
93 debug_assert!(rows.len() >= 2, "post-condition: caller only sees ≥2 rows");
94
95 let by_qid = group_and_sort_by_qid(rows);
96 debug_assert!(
97 !by_qid.is_empty(),
98 "non-empty input must produce ≥1 qid group"
99 );
100
101 let snapshot_times = collect_unique_snapshot_times(&by_qid);
102 debug_assert!(
103 !snapshot_times.is_empty(),
104 "≥2 rows must contribute ≥1 timestamp"
105 );
106 let t0 = *snapshot_times.first().unwrap_or(&0.0);
107
108 let mut stream = ResidualStream::new(format!(
109 "postgres-pgss@{}",
110 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
111 ));
112
113 let mut qids_sorted: Vec<&String> = by_qid.keys().take(MAX_QIDS).collect();
117 qids_sorted.sort();
118 debug_assert!(qids_sorted.len() <= MAX_QIDS, "iterator bound enforced");
119 debug_assert_eq!(
120 qids_sorted.len(),
121 by_qid.len(),
122 "sorted view must cover all qids"
123 );
124
125 emit_plan_regression_residuals(&mut stream, &by_qid, &qids_sorted, t0);
126 emit_workload_phase_residuals(&mut stream, &by_qid, &qids_sorted, &snapshot_times, t0);
127
128 stream.sort();
129 Ok(stream)
130}
131
132fn read_and_filter_rows(path: &Path) -> Result<Vec<Row>> {
135 let mut rdr = csv::Reader::from_path(path)
136 .with_context(|| format!("opening pg_stat_statements csv at {}", path.display()))?;
137 let mut rows: Vec<Row> = Vec::new();
138 for r in rdr.deserialize().take(MAX_PGSS_ROWS) {
139 debug_assert!(rows.len() < MAX_PGSS_ROWS, "row-count bound enforced");
140 let r: Row = r.context("parsing pg_stat_statements row")?;
141 if !r.snapshot_t.is_finite() || !r.total_exec_time_ms.is_finite() {
142 continue;
143 }
144 rows.push(r);
145 }
146 if rows.len() < 2 {
147 anyhow::bail!(
148 "pg_stat_statements csv at {} has fewer than 2 rows; need ≥2 snapshots to compute deltas",
149 path.display()
150 );
151 }
152 Ok(rows)
153}
154
155fn group_and_sort_by_qid(rows: Vec<Row>) -> HashMap<String, Vec<Row>> {
157 let mut by_qid: HashMap<String, Vec<Row>> = HashMap::new();
158 for r in rows.into_iter() {
159 by_qid.entry(r.query_id.clone()).or_default().push(r);
160 }
161 for v in by_qid.values_mut() {
162 debug_assert!(
163 !v.is_empty(),
164 "inserted groups are non-empty by construction"
165 );
166 v.sort_by(|a, b| {
167 a.snapshot_t
168 .partial_cmp(&b.snapshot_t)
169 .unwrap_or(std::cmp::Ordering::Equal)
170 });
171 }
172 by_qid
173}
174
175fn collect_unique_snapshot_times(by_qid: &HashMap<String, Vec<Row>>) -> Vec<f64> {
178 let mut snapshot_times: Vec<f64> = by_qid
179 .values()
180 .flat_map(|v| v.iter().map(|r| r.snapshot_t))
181 .collect();
182 snapshot_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
183 snapshot_times.dedup_by(|a, b| (*a - *b).abs() < 1e-9);
184 snapshot_times
185}
186
187fn emit_plan_regression_residuals(
193 stream: &mut ResidualStream,
194 by_qid: &HashMap<String, Vec<Row>>,
195 qids_sorted: &[&String],
196 t0: f64,
197) {
198 for qid in qids_sorted.iter() {
199 let qid: &String = qid;
200 let snaps = &by_qid[qid];
201 if snaps.len() < 2 {
202 continue;
203 }
204 let means = per_query_mean_exec_time(snaps, t0);
205 if means.len() <= BASELINE_WINDOW {
206 continue;
207 }
208 debug_assert!(
209 means.len() > BASELINE_WINDOW,
210 "post-filter invariant guaranteed by the early-return above"
211 );
212 let baseline: f64 = means
213 .iter()
214 .take(BASELINE_WINDOW)
215 .map(|(_, m)| *m)
216 .sum::<f64>()
217 / BASELINE_WINDOW as f64;
218 debug_assert!(
219 baseline.is_finite(),
220 "baseline from filtered finite samples"
221 );
222 for (i, (t_rel, mean)) in means.iter().enumerate() {
223 if i < BASELINE_WINDOW {
224 continue;
225 }
226 plan_regression::push_latency(stream, *t_rel, qid, *mean, baseline);
227 }
228 }
229}
230
231fn per_query_mean_exec_time(snaps: &[Row], t0: f64) -> Vec<(f64, f64)> {
234 let mut means: Vec<(f64, f64)> = Vec::with_capacity(snaps.len().saturating_sub(1));
235 for w in snaps.windows(2) {
236 let dt = w[1].total_exec_time_ms - w[0].total_exec_time_ms;
237 let dc = w[1].calls.saturating_sub(w[0].calls);
238 if dc == 0 || dt < 0.0 {
239 continue;
240 }
241 let mean = dt / dc as f64;
242 debug_assert!(
243 mean.is_finite() && mean >= 0.0,
244 "dt≥0 ∧ dc>0 ⇒ finite non-negative mean"
245 );
246 let t_rel = w[1].snapshot_t - t0;
247 means.push((t_rel, mean));
248 }
249 means
250}
251
252fn emit_workload_phase_residuals(
258 stream: &mut ResidualStream,
259 by_qid: &HashMap<String, Vec<Row>>,
260 qids_sorted: &[&String],
261 snapshot_times: &[f64],
262 t0: f64,
263) {
264 let mut prev_calls: HashMap<String, u64> = HashMap::new();
265 let mut max_entropy_seen: f64 = 0.0;
266 let mut snapshot_shares: Vec<(f64, f64)> = Vec::new();
267 for &t in snapshot_times.iter() {
268 let Some(entropy) = snapshot_entropy(by_qid, qids_sorted, t, &mut prev_calls) else {
269 continue;
270 };
271 debug_assert!(
272 entropy.is_finite() && entropy >= 0.0,
273 "entropy finite non-negative"
274 );
275 max_entropy_seen = max_entropy_seen.max(entropy);
276 snapshot_shares.push((t, entropy));
277 }
278 if max_entropy_seen <= 0.0 {
279 return;
280 }
281 debug_assert!(
282 max_entropy_seen.is_finite(),
283 "non-zero max entropy must be finite"
284 );
285 for (t_abs, entropy) in snapshot_shares.into_iter() {
286 let normalised = entropy / max_entropy_seen;
287 let r = 1.0 - normalised;
288 workload_phase::push_jsd(stream, t_abs - t0, "pgss_digest_mix", r);
289 }
290}
291
292fn snapshot_entropy(
296 by_qid: &HashMap<String, Vec<Row>>,
297 qids_sorted: &[&String],
298 t: f64,
299 prev_calls: &mut HashMap<String, u64>,
300) -> Option<f64> {
301 let mut shares: Vec<f64> = Vec::new();
302 let mut total: u64 = 0;
303 for qid in qids_sorted.iter() {
304 let snaps = &by_qid[*qid];
305 if let Some(r) = snaps.iter().find(|r| (r.snapshot_t - t).abs() < 1e-9) {
306 let prev = prev_calls.get(*qid).copied().unwrap_or(0);
307 let delta = r.calls.saturating_sub(prev);
308 prev_calls.insert((*qid).clone(), r.calls);
309 if delta > 0 {
310 shares.push(delta as f64);
311 total += delta;
312 }
313 }
314 }
315 if total == 0 || shares.is_empty() {
316 return None;
317 }
318 for s in shares.iter_mut() {
319 *s /= total as f64;
320 }
321 let entropy: f64 = shares
322 .iter()
323 .filter(|s| **s > 0.0)
324 .map(|s| -s * s.ln())
325 .sum();
326 Some(entropy)
327}