use anyhow::Result;
use dsfb_database::grammar::{MotifClass, MotifEngine, MotifGrammar};
use dsfb_database::live::distiller::{DistillerState, PgssRow, Snapshot};
use dsfb_database::report::plots_live::plot_live_pulsed_scrape;
use dsfb_database::residual::ResidualStream;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
const SNAPSHOTS: usize = 600;
const CALLS_PER_SEC: u64 = 50;
const BASELINE_MS_PER_CALL: f64 = 10.0;
const PERTURBED_MS_PER_CALL: f64 = 30.0;
const PERTURB_START: usize = 250;
const PERTURB_END: usize = 400;
const THROTTLE_START: f64 = 200.0;
const THROTTLE_END: f64 = 260.0;
fn ms_per_call_at(t_s: usize) -> f64 {
if t_s >= PERTURB_START && t_s < PERTURB_END {
PERTURBED_MS_PER_CALL
} else {
BASELINE_MS_PER_CALL
}
}
fn throttle_factor_at(t_s: f64) -> f64 {
if t_s >= THROTTLE_START && t_s < THROTTLE_END {
let center = (THROTTLE_START + THROTTLE_END) / 2.0;
let half_span = (THROTTLE_END - THROTTLE_START) / 2.0;
let u = (t_s - center) / half_span;
1.0 + 2.0 * (1.0 - u * u).max(0.0)
} else {
1.0
}
}
fn main() -> Result<()> {
let root: PathBuf = std::env::current_dir()?;
let fixtures_dir = root.join("paper/fixtures/live_pg");
let figs_dir = root.join("paper/figs");
fs::create_dir_all(&fixtures_dir)?;
fs::create_dir_all(&figs_dir)?;
let pgss_csv = fixtures_dir.join("pg_stat_statements.csv");
let activity_csv = fixtures_dir.join("pg_stat_activity.csv");
let fig_path = figs_dir.join("live_pulsed_scrape.png");
let readme = fixtures_dir.join("README.md");
let mut snapshots_t: Vec<f64> = Vec::with_capacity(SNAPSHOTS);
let mut total_exec_ms: Vec<f64> = Vec::with_capacity(SNAPSHOTS);
let mut calls_cum: Vec<f64> = Vec::with_capacity(SNAPSHOTS);
let mut residual_t: Vec<f64> = Vec::with_capacity(SNAPSHOTS);
let mut residual_v: Vec<f64> = Vec::with_capacity(SNAPSHOTS);
let mut csv_file = fs::File::create(&pgss_csv)?;
writeln!(csv_file, "snapshot_t,query_id,calls,total_exec_time_ms")?;
let mut cum_calls: u64 = 0;
let mut cum_total_ms: f64 = 0.0;
let mut distiller = DistillerState::new();
let mut stream = ResidualStream::new("live_pg_fixture");
let mut prev_calls: Option<u64> = None;
let mut prev_total_ms: Option<f64> = None;
for t in 0..SNAPSHOTS {
let t_f = t as f64;
let ms_per_call = ms_per_call_at(t);
cum_calls += CALLS_PER_SEC;
cum_total_ms += CALLS_PER_SEC as f64 * ms_per_call;
snapshots_t.push(t_f);
total_exec_ms.push(cum_total_ms);
calls_cum.push(cum_calls as f64);
writeln!(
csv_file,
"{},{},{},{}",
t_f, "q1", cum_calls, cum_total_ms
)?;
if let (Some(pc), Some(pt)) = (prev_calls, prev_total_ms) {
let dc = cum_calls - pc;
let dt = cum_total_ms - pt;
if dc > 0 {
residual_t.push(t_f);
residual_v.push(dt / dc as f64);
}
}
prev_calls = Some(cum_calls);
prev_total_ms = Some(cum_total_ms);
let snap = Snapshot {
t: t_f,
pgss: vec![PgssRow {
query_id: "q1".to_string(),
calls: cum_calls,
total_exec_time_ms: cum_total_ms,
}],
activity: Vec::new(),
stat_io: Vec::new(),
stat_database: Vec::new(),
};
let samples = distiller.ingest(&snap);
for s in samples {
stream.push(s);
}
}
csv_file.flush()?;
drop(csv_file);
let mut activity_file = fs::File::create(&activity_csv)?;
writeln!(
activity_file,
"snapshot_t,pid,wait_event_type,wait_event,state"
)?;
activity_file.flush()?;
drop(activity_file);
stream.sort();
let engine = MotifEngine::new(MotifGrammar::default());
let episodes = engine.run(&stream);
let episode_window = episodes
.iter()
.find(|e| e.motif == MotifClass::PlanRegressionOnset)
.map(|e| (e.t_start, e.t_end));
let throttle_t: Vec<f64> = (0..=SNAPSHOTS).map(|i| i as f64).collect();
let throttle_factor: Vec<f64> = throttle_t
.iter()
.copied()
.map(throttle_factor_at)
.collect();
plot_live_pulsed_scrape(
&fig_path,
&snapshots_t,
&total_exec_ms,
&calls_cum,
&residual_t,
&residual_v,
episode_window,
&throttle_t,
&throttle_factor,
)?;
let mut r = fs::File::create(&readme)?;
writeln!(
r,
"# Live PG fixtures (deterministically synthesised)\n\n\
This directory contains a pinned, deterministic synthesis of the\n\
`pg_stat_statements` snapshot stream used by the paper's\n\
`live_pulsed_scrape` figure. It is **not** a capture from a real\n\
PostgreSQL engine — the generator is pure and seedless, so two\n\
invocations produce byte-identical files.\n\n\
## Shape\n\n\
- `pg_stat_statements.csv` — 600 snapshots @ 1 Hz, one `query_id` (`q1`):\n\
- t ∈ [0, 250): baseline 50 calls/s at 10 ms/call.\n\
- t ∈ [250, 400): planted plan regression — same call rate, 30 ms/call.\n\
- t ∈ [400, 600): recovery to 10 ms/call.\n\
- `pg_stat_activity.csv` — header only. The figure does not exercise the contention channel.\n\n\
## Regeneration\n\n\
```\n\
cargo run --release --features \"cli report live-postgres\" --bin live_pulsed_scrape_figure\n\
```\n\n\
The generator writes the two CSVs here and renders\n\
`paper/figs/live_pulsed_scrape.png`. Paper §Live cites this fixture\n\
and the figure caption discloses its synthesised origin explicitly.\n",
)?;
r.flush()?;
drop(r);
if let Some(w) = episode_window {
eprintln!(
"wrote {} and {}; plan_regression_onset window = [{:.2}, {:.2}]",
pgss_csv.display(),
fig_path.display(),
w.0,
w.1
);
} else {
eprintln!(
"wrote {} and {}; no plan_regression_onset episode emitted (check thresholds)",
pgss_csv.display(),
fig_path.display()
);
}
Ok(())
}