use crate::residual::{plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::path::Path;
const BASELINE_WINDOW: usize = 3;
const MAX_QIDS: usize = 1_000_000;
const MAX_PGSS_ROWS: usize = 100_000_000;
/// One row of a pg_stat_statements CSV export: a cumulative counter sample
/// for a single query at one snapshot time.
///
/// Field names are the CSV column contract consumed by `serde`/`csv`
/// deserialization — do not rename without updating the exporter.
#[derive(Debug, serde::Deserialize)]
struct Row {
    // Snapshot timestamp; units/epoch defined by the exporter — TODO confirm.
    snapshot_t: f64,
    // Query identifier (pg_stat_statements queryid, serialized as text).
    query_id: String,
    // Cumulative call count at this snapshot (monotone except on stats reset).
    calls: u64,
    // Cumulative total execution time at this snapshot, in milliseconds.
    total_exec_time_ms: f64,
}
/// Loads a pg_stat_statements CSV export and converts it into a
/// `ResidualStream` with plan-regression and workload-phase residuals.
///
/// All residual timestamps are relative to the earliest snapshot time.
///
/// # Errors
/// Fails when the CSV cannot be opened or parsed, or when fewer than two
/// usable rows exist (two snapshots are required to compute deltas).
pub fn load_pg_stat_statements(path: &Path) -> Result<ResidualStream> {
    let rows = read_and_filter_rows(path)?;
    debug_assert!(rows.len() >= 2, "post-condition: caller only sees ≥2 rows");
    let by_qid = group_and_sort_by_qid(rows);
    debug_assert!(
        !by_qid.is_empty(),
        "non-empty input must produce ≥1 qid group"
    );
    let snapshot_times = collect_unique_snapshot_times(&by_qid);
    debug_assert!(
        !snapshot_times.is_empty(),
        "≥2 rows must contribute ≥1 timestamp"
    );
    // Earliest snapshot anchors all relative timestamps.
    let t0 = *snapshot_times.first().unwrap_or(&0.0);
    let mut stream = ResidualStream::new(format!(
        "postgres-pgss@{}",
        path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
    ));
    // Deterministic iteration order; cap distinct query ids to bound work on
    // pathological inputs.
    let mut qids_sorted: Vec<&String> = by_qid.keys().take(MAX_QIDS).collect();
    qids_sorted.sort();
    // BUGFIX: the previous assertion claimed the sorted view covers *all*
    // qids, which contradicts the `take(MAX_QIDS)` cap above and would fire
    // (debug builds) on inputs with more than MAX_QIDS distinct query ids.
    debug_assert_eq!(
        qids_sorted.len(),
        by_qid.len().min(MAX_QIDS),
        "sorted view covers all qids up to the MAX_QIDS cap"
    );
    emit_plan_regression_residuals(&mut stream, &by_qid, &qids_sorted, t0);
    emit_workload_phase_residuals(&mut stream, &by_qid, &qids_sorted, &snapshot_times, t0);
    stream.sort();
    Ok(stream)
}
/// Reads the CSV at `path`, keeping only rows whose timestamp and execution
/// time are finite, up to MAX_PGSS_ROWS records.
///
/// # Errors
/// Fails when the file cannot be opened, a row fails to parse, or fewer than
/// two usable rows remain (deltas need at least two snapshots).
fn read_and_filter_rows(path: &Path) -> Result<Vec<Row>> {
    let mut reader = csv::Reader::from_path(path)
        .with_context(|| format!("opening pg_stat_statements csv at {}", path.display()))?;
    let mut kept: Vec<Row> = Vec::new();
    for record in reader.deserialize().take(MAX_PGSS_ROWS) {
        debug_assert!(kept.len() < MAX_PGSS_ROWS, "row-count bound enforced");
        let row: Row = record.context("parsing pg_stat_statements row")?;
        // Non-finite values would poison every downstream delta/mean.
        if row.snapshot_t.is_finite() && row.total_exec_time_ms.is_finite() {
            kept.push(row);
        }
    }
    if kept.len() < 2 {
        anyhow::bail!(
            "pg_stat_statements csv at {} has fewer than 2 rows; need ≥2 snapshots to compute deltas",
            path.display()
        );
    }
    Ok(kept)
}
/// Groups rows by `query_id` and sorts each group chronologically.
fn group_and_sort_by_qid(rows: Vec<Row>) -> HashMap<String, Vec<Row>> {
    let mut by_qid: HashMap<String, Vec<Row>> = HashMap::new();
    for r in rows.into_iter() {
        // The key must be cloned: `r` (which owns the id) moves into the group.
        by_qid.entry(r.query_id.clone()).or_default().push(r);
    }
    for v in by_qid.values_mut() {
        debug_assert!(
            !v.is_empty(),
            "inserted groups are non-empty by construction"
        );
        // `total_cmp` is a true total order on f64 (IEEE 754 totalOrder). The
        // previous `partial_cmp(..).unwrap_or(Equal)` fallback is an
        // inconsistent comparator when NaN is present, which `sort_by` does
        // not tolerate. Inputs are pre-filtered to finite timestamps, where
        // both orderings agree, so results are unchanged for valid data while
        // staying robust if that filter ever changes.
        v.sort_by(|a, b| a.snapshot_t.total_cmp(&b.snapshot_t));
    }
    by_qid
}
/// Collects every snapshot timestamp across all groups, sorted ascending,
/// with near-duplicates (within 1e-9) collapsed to the first occurrence.
fn collect_unique_snapshot_times(by_qid: &HashMap<String, Vec<Row>>) -> Vec<f64> {
    let mut snapshot_times: Vec<f64> = by_qid
        .values()
        .flat_map(|v| v.iter().map(|r| r.snapshot_t))
        .collect();
    // `total_cmp` is a consistent total order on f64; the previous
    // `partial_cmp(..).unwrap_or(Equal)` fallback is not when NaN appears.
    // Timestamps are pre-filtered finite, so the order is identical for valid
    // input. Unstable sort is fine: equal f64 values are indistinguishable.
    snapshot_times.sort_unstable_by(f64::total_cmp);
    // Collapse timestamps closer than 1e-9 (float jitter from the exporter).
    snapshot_times.dedup_by(|a, b| (*a - *b).abs() < 1e-9);
    snapshot_times
}
/// Emits plan-regression residuals: for each query with more than
/// BASELINE_WINDOW usable deltas, every post-baseline mean exec time is
/// pushed against the mean of the first BASELINE_WINDOW samples.
fn emit_plan_regression_residuals(
    stream: &mut ResidualStream,
    by_qid: &HashMap<String, Vec<Row>>,
    qids_sorted: &[&String],
    t0: f64,
) {
    // Idiom cleanup: pattern-bind `&qid` instead of the redundant
    // `let qid: &String = qid;` re-binding the original carried.
    for &qid in qids_sorted.iter() {
        let snaps = &by_qid[qid];
        // At least two snapshots are needed to form a single delta.
        if snaps.len() < 2 {
            continue;
        }
        let means = per_query_mean_exec_time(snaps, t0);
        // Require strictly more samples than the baseline window so at least
        // one post-baseline point exists.
        if means.len() <= BASELINE_WINDOW {
            continue;
        }
        let baseline: f64 = means
            .iter()
            .take(BASELINE_WINDOW)
            .map(|(_, m)| *m)
            .sum::<f64>()
            / BASELINE_WINDOW as f64;
        debug_assert!(
            baseline.is_finite(),
            "baseline from filtered finite samples"
        );
        // `skip` replaces the original enumerate-and-continue index check.
        for (t_rel, mean) in means.iter().skip(BASELINE_WINDOW) {
            plan_regression::push_latency(stream, *t_rel, qid, *mean, baseline);
        }
    }
}
/// Mean execution time per call for each adjacent snapshot pair of one query.
///
/// `snaps` must be sorted by time. Each pair yields `(t_rel, mean_ms)` where
/// `t_rel` is the later snapshot's time relative to `t0`. Pairs with no new
/// calls or a negative time delta (e.g. a counter reset) are skipped.
fn per_query_mean_exec_time(snaps: &[Row], t0: f64) -> Vec<(f64, f64)> {
    snaps
        .windows(2)
        .filter_map(|pair| {
            let (prev, cur) = (&pair[0], &pair[1]);
            let dt = cur.total_exec_time_ms - prev.total_exec_time_ms;
            let dc = cur.calls.saturating_sub(prev.calls);
            // Skip idle intervals (no new calls) and resets (negative dt).
            if dc == 0 || dt < 0.0 {
                return None;
            }
            let mean = dt / dc as f64;
            debug_assert!(
                mean.is_finite() && mean >= 0.0,
                "dt≥0 ∧ dc>0 ⇒ finite non-negative mean"
            );
            Some((cur.snapshot_t - t0, mean))
        })
        .collect()
}
/// Emits workload-phase residuals from per-snapshot call-share entropy.
///
/// Each snapshot's entropy is normalised by the maximum entropy seen across
/// the run and emitted as `1 - normalised`, so a concentrated workload mix
/// produces a high residual. Nothing is emitted when no snapshot has
/// positive entropy.
fn emit_workload_phase_residuals(
    stream: &mut ResidualStream,
    by_qid: &HashMap<String, Vec<Row>>,
    qids_sorted: &[&String],
    snapshot_times: &[f64],
    t0: f64,
) {
    // `prev_calls` carries cumulative counters between snapshots so entropy is
    // computed over call *deltas*; snapshots must be visited chronologically.
    let mut prev_calls: HashMap<String, u64> = HashMap::new();
    let mut entropies: Vec<(f64, f64)> = Vec::new();
    let mut max_entropy: f64 = 0.0;
    for &t in snapshot_times {
        if let Some(entropy) = snapshot_entropy(by_qid, qids_sorted, t, &mut prev_calls) {
            debug_assert!(
                entropy.is_finite() && entropy >= 0.0,
                "entropy finite non-negative"
            );
            max_entropy = max_entropy.max(entropy);
            entropies.push((t, entropy));
        }
    }
    // All-zero (or no) entropies carry no phase signal.
    if max_entropy <= 0.0 {
        return;
    }
    debug_assert!(
        max_entropy.is_finite(),
        "non-zero max entropy must be finite"
    );
    for (t_abs, entropy) in entropies {
        let normalised = entropy / max_entropy;
        workload_phase::push_jsd(stream, t_abs - t0, "pgss_digest_mix", 1.0 - normalised);
    }
}
/// Shannon entropy of per-query call-delta shares at snapshot time `t`.
///
/// For each query id (in sorted order) with a row at `t` (within 1e-9),
/// computes the increase in `calls` since that query's previous snapshot.
/// Updates `prev_calls` as a side effect — even when the delta is zero —
/// so subsequent snapshots diff against the latest counters. Returns `None`
/// when no query gained any calls at this snapshot.
fn snapshot_entropy(
    by_qid: &HashMap<String, Vec<Row>>,
    qids_sorted: &[&String],
    t: f64,
    prev_calls: &mut HashMap<String, u64>,
) -> Option<f64> {
    let mut deltas: Vec<f64> = Vec::new();
    let mut total: u64 = 0;
    for &qid in qids_sorted.iter() {
        // Timestamp match uses a float tolerance to absorb exporter jitter.
        let Some(row) = by_qid[qid].iter().find(|r| (r.snapshot_t - t).abs() < 1e-9) else {
            continue;
        };
        let previous = prev_calls.get(qid).copied().unwrap_or(0);
        let delta = row.calls.saturating_sub(previous);
        // Always record the newest cumulative counter, even on a zero delta.
        prev_calls.insert(qid.clone(), row.calls);
        if delta > 0 {
            deltas.push(delta as f64);
            total += delta;
        }
    }
    if total == 0 || deltas.is_empty() {
        return None;
    }
    // Normalise to shares and sum -p·ln(p). Zero shares cannot occur here
    // (only positive deltas were collected); the filter mirrors the formula's
    // domain restriction.
    let total_f = total as f64;
    let entropy: f64 = deltas
        .iter()
        .map(|d| d / total_f)
        .filter(|p| *p > 0.0)
        .map(|p| -p * p.ln())
        .sum();
    Some(entropy)
}