Skip to main content

dsfb_database/adapters/
postgres.rs

1//! PostgreSQL `pg_stat_statements` adapter — real engine bridge.
2//!
3//! This is the only adapter in the crate that targets a *production engine*
4//! rather than a public benchmark dataset. It is intentionally minimal: it
5//! reads a CSV exported from `pg_stat_statements` snapshots and emits the
6//! two residual classes that view alone supports — **plan-regression** and
7//! **workload-phase**. It does **not** emit cardinality, contention, or
8//! cache-I/O residuals; those require additional views (`EXPLAIN ANALYZE`,
9//! `pg_stat_activity`, `pg_stat_io`) and are deferred to per-view adapters
10//! we have not yet shipped. The honest deployability matrix in §11 of the
11//! paper records these gaps.
12//!
13//! ## Expected CSV schema (PostgreSQL 14+)
14//!
15//! Export with:
16//!
17//! ```sql
18//! \copy (
19//!   SELECT
20//!     extract(epoch from now())::float8 AS snapshot_t,
21//!     md5(queryid::text)                AS query_id,
22//!     calls                             AS calls,
23//!     total_exec_time                   AS total_exec_time_ms
24//!   FROM pg_stat_statements
25//! ) TO '~/pgss_snapshot.csv' WITH CSV HEADER
26//! ```
27//!
//! at a regular interval (e.g. every 60 seconds), appending each snapshot
//! to the same file (keep the CSV header line only once — a repeated header
//! line is read as a data row and fails to parse). The adapter expects
//! exactly these columns; older PostgreSQL releases (≤ 12) named the source
//! column `total_time` rather than `total_exec_time` — on those releases
//! select `total_time AS total_exec_time_ms` in the export query.
//! `query_id` is hashed with `md5()` so the export contains no query text.
33//!
34//! ## What the adapter computes
35//!
36//! For each `query_id`, snapshots are sorted by `snapshot_t` and consecutive
37//! pairs produce one *mean-time-per-call* sample:
38//!
39//! ```text
40//! Δexec  = total_exec_time_ms[t] − total_exec_time_ms[t-1]
41//! Δcalls = calls[t] − calls[t-1]
//! mean   = Δexec / Δcalls      (interval skipped when Δcalls = 0 or Δexec < 0)
43//! ```
44//!
45//! A per-`query_id` baseline is the mean of the first `BASELINE_WINDOW`
46//! intervals; once the baseline is set, subsequent intervals push a
47//! plan-regression residual via [`crate::residual::plan_regression::push_latency`].
48//!
//! Workload-phase residuals are pushed for each snapshot timestamp at which
//! at least one `query_id` gained calls: the Shannon entropy of the
//! per-snapshot call-share distribution across `query_id`s is divided by the
//! largest entropy observed over all snapshots in the file, and
//! `1 − entropy / max_entropy` (a value in `[0, 1]`) is emitted.
//! A drop in entropy (the workload concentrates on fewer query classes) is
//! the workload-phase signal documented in fact #16 of the concordance.
54
55use crate::residual::{plan_regression, workload_phase, ResidualStream};
56use anyhow::{Context, Result};
57use std::collections::HashMap;
58use std::path::Path;
59
/// Number of intervals at the start of each query's history used to
/// establish its latency baseline. Picked to match the per-motif
/// `min_dwell_seconds=5.0` of `plan_regression_onset` on a 60-second
/// snapshot cadence: 3 intervals × 60 s = 180 s, enough to absorb the
/// initial warm-up without masking a real regression that begins later.
const BASELINE_WINDOW: usize = 3;

/// Upper bound on the number of distinct `query_id`s the adapter will
/// process. `pg_stat_statements.max` defaults to 5000, so this bound
/// (1 000 000 / 5000) is a 200× safety headroom that still keeps a
/// corrupted snapshot from driving unbounded work.
const MAX_QIDS: usize = 1_000_000;

/// Upper bound on the number of CSV rows the adapter reads. At a
/// 60-second snapshot cadence and 5000 distinct `query_id`s per snapshot
/// this is 100 000 000 / 5000 = 20 000 snapshots, i.e. roughly two weeks
/// (≈ 13.9 days) of continuous collection — well beyond the ~hour-long
/// analysis windows the crate is evaluated on.
const MAX_PGSS_ROWS: usize = 100_000_000;
78
79#[derive(Debug, serde::Deserialize)]
80struct Row {
81    snapshot_t: f64,
82    query_id: String,
83    calls: u64,
84    total_exec_time_ms: f64,
85}
86
87/// Load a `pg_stat_statements` snapshot CSV and produce a residual stream
88/// containing plan-regression + workload-phase samples. Errors if the file
89/// is missing, the schema does not match, or fewer than two snapshots are
90/// present (the adapter cannot compute a delta from a single snapshot).
91pub fn load_pg_stat_statements(path: &Path) -> Result<ResidualStream> {
92    let rows = read_and_filter_rows(path)?;
93    debug_assert!(rows.len() >= 2, "post-condition: caller only sees ≥2 rows");
94
95    let by_qid = group_and_sort_by_qid(rows);
96    debug_assert!(
97        !by_qid.is_empty(),
98        "non-empty input must produce ≥1 qid group"
99    );
100
101    let snapshot_times = collect_unique_snapshot_times(&by_qid);
102    debug_assert!(
103        !snapshot_times.is_empty(),
104        "≥2 rows must contribute ≥1 timestamp"
105    );
106    let t0 = *snapshot_times.first().unwrap_or(&0.0);
107
108    let mut stream = ResidualStream::new(format!(
109        "postgres-pgss@{}",
110        path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
111    ));
112
113    // `.take(MAX_QIDS)` is an explicit finite-source bound: `qids_sorted` is
114    // bounded above by `by_qid.len()`, which is ≤ number of distinct
115    // `query_id`s in the CSV, which `MAX_QIDS` additionally caps.
116    let mut qids_sorted: Vec<&String> = by_qid.keys().take(MAX_QIDS).collect();
117    qids_sorted.sort();
118    debug_assert!(qids_sorted.len() <= MAX_QIDS, "iterator bound enforced");
119    debug_assert_eq!(
120        qids_sorted.len(),
121        by_qid.len(),
122        "sorted view must cover all qids"
123    );
124
125    emit_plan_regression_residuals(&mut stream, &by_qid, &qids_sorted, t0);
126    emit_workload_phase_residuals(&mut stream, &by_qid, &qids_sorted, &snapshot_times, t0);
127
128    stream.sort();
129    Ok(stream)
130}
131
132/// Read every row of the CSV, dropping rows with non-finite floats, and
133/// enforce the ≥2-row precondition so downstream analysis can assume it.
134fn read_and_filter_rows(path: &Path) -> Result<Vec<Row>> {
135    let mut rdr = csv::Reader::from_path(path)
136        .with_context(|| format!("opening pg_stat_statements csv at {}", path.display()))?;
137    let mut rows: Vec<Row> = Vec::new();
138    for r in rdr.deserialize().take(MAX_PGSS_ROWS) {
139        debug_assert!(rows.len() < MAX_PGSS_ROWS, "row-count bound enforced");
140        let r: Row = r.context("parsing pg_stat_statements row")?;
141        if !r.snapshot_t.is_finite() || !r.total_exec_time_ms.is_finite() {
142            continue;
143        }
144        rows.push(r);
145    }
146    if rows.len() < 2 {
147        anyhow::bail!(
148            "pg_stat_statements csv at {} has fewer than 2 rows; need ≥2 snapshots to compute deltas",
149            path.display()
150        );
151    }
152    Ok(rows)
153}
154
155/// Group rows by `query_id` and sort each group by `snapshot_t`.
156fn group_and_sort_by_qid(rows: Vec<Row>) -> HashMap<String, Vec<Row>> {
157    let mut by_qid: HashMap<String, Vec<Row>> = HashMap::new();
158    for r in rows.into_iter() {
159        by_qid.entry(r.query_id.clone()).or_default().push(r);
160    }
161    for v in by_qid.values_mut() {
162        debug_assert!(
163            !v.is_empty(),
164            "inserted groups are non-empty by construction"
165        );
166        v.sort_by(|a, b| {
167            a.snapshot_t
168                .partial_cmp(&b.snapshot_t)
169                .unwrap_or(std::cmp::Ordering::Equal)
170        });
171    }
172    by_qid
173}
174
175/// Collect the unique (to 1e-9 tolerance) sorted snapshot timestamps
176/// present across every qid group.
177fn collect_unique_snapshot_times(by_qid: &HashMap<String, Vec<Row>>) -> Vec<f64> {
178    let mut snapshot_times: Vec<f64> = by_qid
179        .values()
180        .flat_map(|v| v.iter().map(|r| r.snapshot_t))
181        .collect();
182    snapshot_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
183    snapshot_times.dedup_by(|a, b| (*a - *b).abs() < 1e-9);
184    snapshot_times
185}
186
187/// Plan-regression residuals: per-query mean-exec-time-per-call deltas
188/// versus a per-query baseline established from the first
189/// `BASELINE_WINDOW` intervals. Iterates qids in sorted order so the
190/// resulting stream is bytewise identical across runs (HashMap
191/// iteration order is not).
192fn emit_plan_regression_residuals(
193    stream: &mut ResidualStream,
194    by_qid: &HashMap<String, Vec<Row>>,
195    qids_sorted: &[&String],
196    t0: f64,
197) {
198    for qid in qids_sorted.iter() {
199        let qid: &String = qid;
200        let snaps = &by_qid[qid];
201        if snaps.len() < 2 {
202            continue;
203        }
204        let means = per_query_mean_exec_time(snaps, t0);
205        if means.len() <= BASELINE_WINDOW {
206            continue;
207        }
208        debug_assert!(
209            means.len() > BASELINE_WINDOW,
210            "post-filter invariant guaranteed by the early-return above"
211        );
212        let baseline: f64 = means
213            .iter()
214            .take(BASELINE_WINDOW)
215            .map(|(_, m)| *m)
216            .sum::<f64>()
217            / BASELINE_WINDOW as f64;
218        debug_assert!(
219            baseline.is_finite(),
220            "baseline from filtered finite samples"
221        );
222        for (i, (t_rel, mean)) in means.iter().enumerate() {
223            if i < BASELINE_WINDOW {
224                continue;
225            }
226            plan_regression::push_latency(stream, *t_rel, qid, *mean, baseline);
227        }
228    }
229}
230
231/// Compute (`t_rel`, `mean_exec_time_per_call`) for every adjacent
232/// snapshot pair. Drops pairs with zero new calls or negative delta.
233fn per_query_mean_exec_time(snaps: &[Row], t0: f64) -> Vec<(f64, f64)> {
234    let mut means: Vec<(f64, f64)> = Vec::with_capacity(snaps.len().saturating_sub(1));
235    for w in snaps.windows(2) {
236        let dt = w[1].total_exec_time_ms - w[0].total_exec_time_ms;
237        let dc = w[1].calls.saturating_sub(w[0].calls);
238        if dc == 0 || dt < 0.0 {
239            continue;
240        }
241        let mean = dt / dc as f64;
242        debug_assert!(
243            mean.is_finite() && mean >= 0.0,
244            "dt≥0 ∧ dc>0 ⇒ finite non-negative mean"
245        );
246        let t_rel = w[1].snapshot_t - t0;
247        means.push((t_rel, mean));
248    }
249    means
250}
251
252/// Workload-phase residuals: per-snapshot share-distribution entropy
253/// across query_ids. Pushes `1 − entropy/entropy_max` so a
254/// *concentration* (entropy drops) maps to a positive residual the
255/// workload-phase motif treats as drift in the same direction as
256/// the TPC-DS JSD spikes.
257fn emit_workload_phase_residuals(
258    stream: &mut ResidualStream,
259    by_qid: &HashMap<String, Vec<Row>>,
260    qids_sorted: &[&String],
261    snapshot_times: &[f64],
262    t0: f64,
263) {
264    let mut prev_calls: HashMap<String, u64> = HashMap::new();
265    let mut max_entropy_seen: f64 = 0.0;
266    let mut snapshot_shares: Vec<(f64, f64)> = Vec::new();
267    for &t in snapshot_times.iter() {
268        let Some(entropy) = snapshot_entropy(by_qid, qids_sorted, t, &mut prev_calls) else {
269            continue;
270        };
271        debug_assert!(
272            entropy.is_finite() && entropy >= 0.0,
273            "entropy finite non-negative"
274        );
275        max_entropy_seen = max_entropy_seen.max(entropy);
276        snapshot_shares.push((t, entropy));
277    }
278    if max_entropy_seen <= 0.0 {
279        return;
280    }
281    debug_assert!(
282        max_entropy_seen.is_finite(),
283        "non-zero max entropy must be finite"
284    );
285    for (t_abs, entropy) in snapshot_shares.into_iter() {
286        let normalised = entropy / max_entropy_seen;
287        let r = 1.0 - normalised;
288        workload_phase::push_jsd(stream, t_abs - t0, "pgss_digest_mix", r);
289    }
290}
291
292/// Entropy of the per-qid call-share distribution at timestamp `t`.
293/// Returns `None` if no qid advanced a call at this snapshot (empty
294/// distribution).
295fn snapshot_entropy(
296    by_qid: &HashMap<String, Vec<Row>>,
297    qids_sorted: &[&String],
298    t: f64,
299    prev_calls: &mut HashMap<String, u64>,
300) -> Option<f64> {
301    let mut shares: Vec<f64> = Vec::new();
302    let mut total: u64 = 0;
303    for qid in qids_sorted.iter() {
304        let snaps = &by_qid[*qid];
305        if let Some(r) = snaps.iter().find(|r| (r.snapshot_t - t).abs() < 1e-9) {
306            let prev = prev_calls.get(*qid).copied().unwrap_or(0);
307            let delta = r.calls.saturating_sub(prev);
308            prev_calls.insert((*qid).clone(), r.calls);
309            if delta > 0 {
310                shares.push(delta as f64);
311                total += delta;
312            }
313        }
314    }
315    if total == 0 || shares.is_empty() {
316        return None;
317    }
318    for s in shares.iter_mut() {
319        *s /= total as f64;
320    }
321    let entropy: f64 = shares
322        .iter()
323        .filter(|s| **s > 0.0)
324        .map(|s| -s * s.ln())
325        .sum();
326    Some(entropy)
327}