use crate::residual::{cache_io, contention, plan_regression, workload_phase, ResidualSample, ResidualStream};
use std::collections::HashMap;
/// One point-in-time capture of the statistics views the distiller
/// consumes. Field names mirror the Postgres views they apparently come
/// from (pg_stat_statements, pg_stat_activity, pg_stat_io, pg_stat_database).
#[derive(Debug, Clone, Default)]
pub struct Snapshot {
    /// Absolute capture time; converted to run-relative time on ingest.
    pub t: f64,
    /// Per-query cumulative counters (pg_stat_statements shape).
    pub pgss: Vec<PgssRow>,
    /// Current backend wait states (pg_stat_activity shape).
    pub activity: Vec<ActivityRow>,
    /// Per backend/object/context I/O counters (pg_stat_io shape).
    pub stat_io: Vec<StatIoRow>,
    /// Per-database block counters (pg_stat_database shape).
    pub stat_database: Vec<StatDatabaseRow>,
}
/// Cumulative execution counters for one query id
/// (pg_stat_statements row shape).
#[derive(Debug, Clone)]
pub struct PgssRow {
    /// Stable identifier of the normalized query; keys all per-query state.
    pub query_id: String,
    /// Cumulative call count (monotonic except on a stats reset).
    pub calls: u64,
    /// Cumulative total execution time, in milliseconds.
    pub total_exec_time_ms: f64,
}
/// One backend's current wait state (pg_stat_activity row shape).
#[derive(Debug, Clone)]
pub struct ActivityRow {
    /// Wait-event class; the literal string "None" marks a backend that
    /// is not waiting and is skipped by `ingest_activity`.
    pub wait_event_type: String,
    /// Specific wait event within the class.
    pub wait_event: String,
    /// Backend state; not read by the ingest code in this file.
    pub state: String,
}
/// Per backend-type/object/context I/O counters (pg_stat_io row shape).
/// Counters are presumably cumulative view values — TODO confirm against
/// the collector; `ingest_stat_io` uses them as-is without differencing.
#[derive(Debug, Clone)]
pub struct StatIoRow {
    /// Backend type; not read here — rows are aggregated across it.
    pub backend_type: String,
    /// Target object of the I/O.
    pub object: String,
    /// I/O context; combined with `object` to form the bucket key.
    pub context: String,
    /// Read operations.
    pub reads: u64,
    /// Buffer-cache hits.
    pub hits: u64,
    /// Time spent reading, in milliseconds; not read by the ingest code
    /// in this file.
    pub read_time_ms: f64,
}
/// Per-database block-cache counters (pg_stat_database row shape).
#[derive(Debug, Clone)]
pub struct StatDatabaseRow {
    /// Database name; keys per-database delta state and labels residuals.
    pub datname: String,
    /// Cumulative buffer-cache hits.
    pub blks_hit: u64,
    /// Cumulative blocks read from disk.
    pub blks_read: u64,
}
/// Number of per-interval means collected before the latency baseline is
/// frozen for a query id.
pub const BASELINE_WINDOW: usize = 3;

/// Streaming latency state for a single query id.
///
/// Converts cumulative `(calls, total_exec_time_ms)` counters into
/// per-interval mean latencies. The first `BASELINE_WINDOW` usable
/// intervals are averaged into a frozen baseline; afterwards every usable
/// interval yields a `(mean, baseline)` pair.
#[derive(Debug, Default, Clone)]
pub struct PgssQidState {
    // Cumulative counters from the previous snapshot, if one was seen.
    last_calls: Option<u64>,
    last_total_ms: Option<f64>,
    // Warm-up buffer of per-interval means (holds at most BASELINE_WINDOW).
    means_for_baseline: Vec<f64>,
    // Frozen average of the warm-up means; written exactly once.
    baseline: Option<f64>,
}

impl PgssQidState {
    /// Feed one cumulative snapshot.
    ///
    /// Returns `Some((interval_mean_ms, baseline_ms))` once warm-up is
    /// complete, and `None` while priming/warming up or when the interval
    /// carries no usable signal (no new calls, or a counter reset).
    pub fn push_snapshot(&mut self, calls: u64, total_exec_ms: f64) -> Option<(f64, f64)> {
        // Remember the previous counters, then overwrite state with the
        // current ones so a reset still re-primes cleanly.
        let previous = self.last_calls.zip(self.last_total_ms);
        self.last_calls = Some(calls);
        self.last_total_ms = Some(total_exec_ms);
        let (prev_calls, prev_total) = previous?;

        let dc = calls.saturating_sub(prev_calls);
        let dt = total_exec_ms - prev_total;
        // Idle interval (no new calls) or counter reset: nothing to report.
        if dc == 0 || dt < 0.0 {
            return None;
        }

        let mean = dt / dc as f64;
        debug_assert!(
            mean.is_finite() && mean >= 0.0,
            "dt>=0 && dc>0 => finite non-negative mean"
        );

        // Warm-up phase: collect means silently and freeze the baseline on
        // the final warm-up sample.
        if self.means_for_baseline.len() < BASELINE_WINDOW {
            self.means_for_baseline.push(mean);
            if self.means_for_baseline.len() == BASELINE_WINDOW {
                let total: f64 = self.means_for_baseline.iter().sum();
                self.baseline = Some(total / BASELINE_WINDOW as f64);
            }
            return None;
        }

        let baseline = self.baseline.expect(
            "baseline must be populated once means_for_baseline.len() == BASELINE_WINDOW",
        );
        Some((mean, baseline))
    }
}
/// Last observed backend count for one (wait_event_type, wait_event)
/// pair; `ingest_activity` diffs against it between snapshots.
#[derive(Debug, Default, Clone)]
pub struct ContentionWaitState {
    // `None` until the wait key has been seen once (treated as 0).
    last_count: Option<u64>,
}
/// Previously observed pg_stat_database counters for one database, used
/// by `ingest_stat_database` to turn cumulative blks_hit/blks_read into
/// per-interval deltas.
#[derive(Debug, Default, Clone)]
pub struct CacheIoDbState {
    // Both `None` until the database has been seen once.
    last_hit: Option<u64>,
    last_read: Option<u64>,
}
/// All incremental state the distiller carries between snapshots.
#[derive(Debug, Default)]
pub struct DistillerState {
    // First absolute timestamp ever seen; anchors run-relative time.
    t0: Option<f64>,
    // Per-query-id latency warm-up / baseline state.
    pgss_qids: HashMap<String, PgssQidState>,
    // Previous cumulative call counts, for workload-phase call deltas.
    prev_pgss_calls: HashMap<String, u64>,
    // Last backend count per (wait_event_type, wait_event) pair.
    activity_waits: HashMap<(String, String), ContentionWaitState>,
    // Previous per-database block counters for the cache-IO fallback.
    cache_db: HashMap<String, CacheIoDbState>,
    // Largest call-mix entropy observed so far; normalizes phase residuals.
    max_entropy_seen: f64,
}
impl DistillerState {
    /// Create an empty distiller with no ingested history.
    pub fn new() -> Self {
        Self::default()
    }

    /// Map an absolute snapshot time to run-relative time; the first
    /// timestamp ever seen becomes the anchor `t0`.
    fn t_rel(&mut self, t_abs: f64) -> f64 {
        if self.t0.is_none() {
            self.t0 = Some(t_abs);
        }
        t_abs - self.t0.unwrap()
    }

    /// Ingest one snapshot, updating all incremental state and returning
    /// every residual sample it produces. Each statistics source has its
    /// own sub-ingestor, all appending to the shared output vector.
    pub fn ingest(&mut self, snap: &Snapshot) -> Vec<ResidualSample> {
        let mut out = Vec::new();
        let t_rel = self.t_rel(snap.t);
        self.ingest_pgss(snap, t_rel, &mut out);
        self.ingest_activity(snap, t_rel, &mut out);
        self.ingest_stat_io(snap, t_rel, &mut out);
        self.ingest_stat_database(snap, t_rel, &mut out);
        self.ingest_workload_phase(snap, t_rel, &mut out);
        out
    }

    /// Per-query mean-latency residuals (plan-regression signal).
    ///
    /// Rows are visited in query-id order so emission order is
    /// deterministic regardless of the snapshot's row order. Nothing is
    /// emitted for a query id until its `PgssQidState` warm-up completes.
    fn ingest_pgss(&mut self, snap: &Snapshot, t_rel: f64, out: &mut Vec<ResidualSample>) {
        let mut qids_in_snapshot: Vec<&PgssRow> = snap.pgss.iter().collect();
        qids_in_snapshot.sort_by(|a, b| a.query_id.cmp(&b.query_id));
        for row in qids_in_snapshot {
            let st = self
                .pgss_qids
                .entry(row.query_id.clone())
                .or_default();
            if let Some((mean, baseline)) = st.push_snapshot(row.calls, row.total_exec_time_ms) {
                let mut stream = ResidualStream::new("");
                plan_regression::push_latency(
                    &mut stream,
                    t_rel,
                    &row.query_id,
                    mean,
                    baseline,
                );
                out.extend(stream.samples);
            }
        }
    }

    /// Wait-event residuals (contention signal).
    ///
    /// Counts backends per (wait_event_type, wait_event) pair in this
    /// snapshot and emits the increase over the previously stored count.
    /// NOTE(review): the per-snapshot count is a gauge, not a cumulative
    /// counter, so `saturating_sub` only fires when the count grows
    /// between consecutive snapshots; sustained-but-flat contention emits
    /// nothing after the first rise — confirm this is intended.
    fn ingest_activity(&mut self, snap: &Snapshot, t_rel: f64, out: &mut Vec<ResidualSample>) {
        let mut counts: HashMap<(String, String), u64> = HashMap::new();
        for row in snap.activity.iter() {
            // "None" marks a backend that is not waiting; skip it.
            if row.wait_event_type == "None" {
                continue;
            }
            let key = (row.wait_event_type.clone(), row.wait_event.clone());
            *counts.entry(key).or_default() += 1;
        }
        // Sorted keys keep emission order deterministic.
        let mut keys: Vec<(String, String)> = counts.keys().cloned().collect();
        keys.sort();
        for key in keys {
            let count = counts[&key];
            let st = self
                .activity_waits
                .entry(key.clone())
                .or_default();
            let prev = st.last_count.unwrap_or(0);
            st.last_count = Some(count);
            let delta = count.saturating_sub(prev);
            if delta == 0 {
                continue;
            }
            let mut stream = ResidualStream::new("");
            let wait_label = format!("{}::{}", key.0, key.1);
            contention::push_wait(&mut stream, t_rel, &wait_label, delta as f64);
            out.extend(stream.samples);
        }
    }

    /// Cache hit-ratio residuals from stat_io rows (cache/IO signal).
    ///
    /// Hits and reads are summed across backend types into
    /// (object, context) buckets; the observed hit ratio is pushed against
    /// an expected ratio of 1.0.
    /// NOTE(review): unlike `ingest_stat_database`, this uses the raw
    /// per-snapshot values rather than deltas — if `reads`/`hits` are
    /// cumulative counters, the ratio reflects lifetime totals, not the
    /// latest interval. Confirm the upstream collector's semantics.
    fn ingest_stat_io(&mut self, snap: &Snapshot, t_rel: f64, out: &mut Vec<ResidualSample>) {
        let mut buckets: HashMap<(String, String), (u64, u64)> = HashMap::new();
        for row in snap.stat_io.iter() {
            let key = (row.object.clone(), row.context.clone());
            let e = buckets.entry(key).or_default();
            e.0 += row.hits;
            e.1 += row.reads;
        }
        // Sorted keys keep emission order deterministic.
        let mut keys: Vec<(String, String)> = buckets.keys().cloned().collect();
        keys.sort();
        for key in keys {
            let (hits, reads) = buckets[&key];
            let total = hits + reads;
            if total == 0 {
                continue;
            }
            let observed = hits as f64 / total as f64;
            let mut stream = ResidualStream::new("");
            let bucket_label = format!("{}::{}", key.0, key.1);
            cache_io::push_hit_ratio(&mut stream, t_rel, &bucket_label, 1.0, observed);
            out.extend(stream.samples);
        }
    }

    /// Fallback cache hit-ratio residuals from per-database deltas.
    ///
    /// Emits only when this snapshot carried no stat_io rows, but always
    /// updates per-database counter state so deltas are correct whenever
    /// the fallback next activates. On first sighting of a database the
    /// current counters double as the previous values, so the delta is 0
    /// and nothing is emitted.
    fn ingest_stat_database(
        &mut self,
        snap: &Snapshot,
        t_rel: f64,
        out: &mut Vec<ResidualSample>,
    ) {
        let emit_fallback = snap.stat_io.is_empty();
        let mut rows: Vec<&StatDatabaseRow> = snap.stat_database.iter().collect();
        rows.sort_by(|a, b| a.datname.cmp(&b.datname));
        for row in rows {
            let st = self.cache_db.entry(row.datname.clone()).or_default();
            let prev_hit = st.last_hit.unwrap_or(row.blks_hit);
            let prev_read = st.last_read.unwrap_or(row.blks_read);
            st.last_hit = Some(row.blks_hit);
            st.last_read = Some(row.blks_read);
            if !emit_fallback {
                continue;
            }
            let dh = row.blks_hit.saturating_sub(prev_hit);
            let dr = row.blks_read.saturating_sub(prev_read);
            let total = dh + dr;
            if total == 0 {
                continue;
            }
            let observed = dh as f64 / total as f64;
            let mut stream = ResidualStream::new("");
            let label = format!("db::{}", row.datname);
            cache_io::push_hit_ratio(&mut stream, t_rel, &label, 1.0, observed);
            out.extend(stream.samples);
        }
    }

    /// Workload-mix residual (phase-change signal).
    ///
    /// Builds the distribution of per-query call deltas since the last
    /// snapshot, computes its Shannon entropy, normalises by the largest
    /// entropy seen so far, and emits `r = 1 - normalised` — a
    /// concentrated mix scores high, a diverse mix scores low.
    /// NOTE(review): the value is routed through `push_jsd`, but it is a
    /// normalised-entropy complement, not a Jensen-Shannon divergence —
    /// confirm the downstream detector expects this.
    fn ingest_workload_phase(
        &mut self,
        snap: &Snapshot,
        t_rel: f64,
        out: &mut Vec<ResidualSample>,
    ) {
        let mut rows: Vec<&PgssRow> = snap.pgss.iter().collect();
        rows.sort_by(|a, b| a.query_id.cmp(&b.query_id));
        let mut shares: Vec<f64> = Vec::new();
        let mut total: u64 = 0;
        for row in rows.iter() {
            let prev = self
                .prev_pgss_calls
                .get(&row.query_id)
                .copied()
                .unwrap_or(0);
            let delta = row.calls.saturating_sub(prev);
            self.prev_pgss_calls.insert(row.query_id.clone(), row.calls);
            if delta > 0 {
                shares.push(delta as f64);
                total += delta;
            }
        }
        if total == 0 || shares.is_empty() {
            return;
        }
        // Normalise call deltas into a probability distribution.
        for s in shares.iter_mut() {
            *s /= total as f64;
        }
        // Shannon entropy; the filter guards `ln` against zero shares.
        let entropy: f64 = shares
            .iter()
            .filter(|s| **s > 0.0)
            .map(|s| -s * s.ln())
            .sum();
        debug_assert!(entropy.is_finite() && entropy >= 0.0);
        self.max_entropy_seen = self.max_entropy_seen.max(entropy);
        // A single-query workload has zero entropy and never raises
        // max_entropy_seen above 0, so nothing is emitted until some
        // diversity has been observed.
        if self.max_entropy_seen <= 0.0 {
            return;
        }
        let normalised = entropy / self.max_entropy_seen;
        let r = 1.0 - normalised;
        let mut stream = ResidualStream::new("");
        workload_phase::push_jsd(&mut stream, t_rel, "pgss_digest_mix", r);
        out.extend(stream.samples);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a pg_stat_statements-style row for tests.
    fn pgss(qid: &str, calls: u64, total: f64) -> PgssRow {
        PgssRow {
            query_id: qid.to_string(),
            calls,
            total_exec_time_ms: total,
        }
    }

    /// The first snapshot only primes counters and the next
    /// BASELINE_WINDOW intervals fill the warm-up buffer, so the first
    /// emission happens on snapshot BASELINE_WINDOW + 2.
    #[test]
    fn plan_regression_warmup_honors_baseline_window() {
        let mut st = PgssQidState::default();
        assert_eq!(st.push_snapshot(10, 100.0), None);
        assert_eq!(st.push_snapshot(20, 200.0), None);
        assert_eq!(st.push_snapshot(30, 300.0), None);
        assert_eq!(st.push_snapshot(40, 400.0), None);
        let out = st.push_snapshot(50, 500.0).unwrap();
        assert!(out.0.is_finite());
        assert!(out.1.is_finite());
        // Every interval is 100 ms over 10 calls => 10 ms/call baseline.
        assert!((out.1 - 10.0).abs() < 1e-9, "baseline should be 10 ms/call");
    }

    /// The streaming implementation must agree with an offline
    /// recomputation of per-interval means and the frozen warm-up baseline.
    #[test]
    fn live_and_batch_math_agree_for_plan_regression() {
        let snaps = [(10, 100.0), (20, 220.0), (30, 360.0), (40, 520.0), (50, 700.0)];
        let mut live = PgssQidState::default();
        let mut live_out = Vec::new();
        for (c, t) in snaps.iter() {
            if let Some(pair) = live.push_snapshot(*c, *t) {
                live_out.push(pair);
            }
        }
        // Batch recomputation: pairwise deltas over the same snapshots.
        let mut means: Vec<f64> = Vec::new();
        for w in snaps.windows(2) {
            let dt = w[1].1 - w[0].1;
            let dc = w[1].0 - w[0].0;
            if dc == 0 {
                continue;
            }
            means.push(dt / dc as f64);
        }
        assert!(means.len() > BASELINE_WINDOW);
        // Baseline is the average of the first BASELINE_WINDOW means.
        let baseline: f64 = means.iter().take(BASELINE_WINDOW).sum::<f64>()
            / BASELINE_WINDOW as f64;
        // Post-warm-up means each pair with that frozen baseline.
        let batch_out: Vec<(f64, f64)> = means
            .iter()
            .enumerate()
            .filter(|(i, _)| *i >= BASELINE_WINDOW)
            .map(|(_, m)| (*m, baseline))
            .collect();
        assert_eq!(live_out, batch_out);
    }

    /// End-to-end: the distiller stays silent through the priming
    /// snapshot plus the warm-up window (indices 0..=3), then emits a
    /// PlanRegression sample on the first post-warm-up snapshot.
    #[test]
    fn distiller_emits_plan_regression_after_warmup() {
        let mut d = DistillerState::new();
        for (i, (c, t)) in [(10, 100.0), (20, 220.0), (30, 360.0), (40, 520.0), (50, 700.0)]
            .iter()
            .enumerate()
        {
            let snap = Snapshot {
                t: i as f64,
                pgss: vec![pgss("q1", *c, *t)],
                ..Default::default()
            };
            let emitted = d.ingest(&snap);
            if i < 4 {
                assert!(
                    !emitted.iter().any(|s| matches!(
                        s.class,
                        crate::residual::ResidualClass::PlanRegression
                    )),
                    "plan_regression should not emit before warm-up (i={})",
                    i
                );
            } else {
                assert!(emitted.iter().any(|s| matches!(
                    s.class,
                    crate::residual::ResidualClass::PlanRegression
                )));
            }
        }
    }
}