use super::DatasetAdapter;
use crate::residual::{plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;
pub struct SqlShare;
#[derive(Debug, serde::Deserialize)]
struct Row {
user_id: String,
runtime_seconds: f64,
submitted_at: f64, query_text: String,
}
fn skeleton(q: &str) -> String {
let mut out = String::with_capacity(q.len());
let mut in_str = false;
let mut prev_ws = false;
for c in q.chars() {
if c == '\'' || c == '"' {
in_str = !in_str;
out.push('?');
continue;
}
if in_str {
continue;
}
if c.is_ascii_digit() {
out.push('?');
prev_ws = false;
continue;
}
if c.is_whitespace() {
if !prev_ws {
out.push(' ');
prev_ws = true;
}
continue;
}
prev_ws = false;
for lc in c.to_lowercase() {
out.push(lc);
}
}
out.trim().to_string()
}
const MAX_SQLSHARE_ROWS: usize = 100_000_000;
const PLAN_BASELINE_WIN: usize = 32;
const PHASE_BUCKET_SECONDS: f64 = 86_400.0;
const SKELETON_LABEL_MAX: usize = 64;
fn load_sqlshare_rows(path: &Path) -> Result<Vec<Row>> {
let mut rdr = csv::Reader::from_path(path)
.with_context(|| format!("opening sqlshare csv at {}", path.display()))?;
let mut rows: Vec<Row> = rdr
.deserialize()
.filter_map(Result::ok)
.take(MAX_SQLSHARE_ROWS)
.collect();
debug_assert!(rows.len() <= MAX_SQLSHARE_ROWS, "iterator bound enforced");
rows.sort_by(|a, b| {
a.submitted_at
.partial_cmp(&b.submitted_at)
.unwrap_or(std::cmp::Ordering::Equal)
});
Ok(rows)
}
fn emit_sqlshare_residuals(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
debug_assert!(t0.is_finite(), "t0 must be finite");
let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
let mut current_bucket: i64 = 0;
for r in rows.iter() {
let t = r.submitted_at - t0;
let sk = skeleton(&r.query_text);
emit_plan_regression_sample(stream, &mut baselines, r, t, &sk);
let bucket = (t / PHASE_BUCKET_SECONDS) as i64;
if bucket != current_bucket {
flush_histogram_deltas(stream, &histos, &prev_histos, current_bucket);
prev_histos = std::mem::take(&mut histos);
current_bucket = bucket;
}
*histos
.entry(r.user_id.clone())
.or_default()
.entry(sk)
.or_insert(0) += 1;
}
}
fn emit_plan_regression_sample(
stream: &mut ResidualStream,
baselines: &mut HashMap<(String, String), VecDeque<f64>>,
r: &Row,
t: f64,
sk: &str,
) {
debug_assert!(r.runtime_seconds.is_finite(), "runtime must be finite");
debug_assert!(t.is_finite(), "t must be finite");
let key = (r.user_id.clone(), sk.to_string());
let q = baselines.entry(key).or_default();
let baseline = if q.is_empty() {
r.runtime_seconds
} else {
q.iter().sum::<f64>() / q.len() as f64
};
plan_regression::push_latency(
stream,
t,
&format!("{}#{}", r.user_id, &sk[..sk.len().min(SKELETON_LABEL_MAX)]),
r.runtime_seconds,
baseline,
);
q.push_back(r.runtime_seconds);
if q.len() > PLAN_BASELINE_WIN {
q.pop_front();
}
debug_assert!(
q.len() <= PLAN_BASELINE_WIN,
"rolling-window bound enforced"
);
}
fn flush_histogram_deltas(
stream: &mut ResidualStream,
histos: &HashMap<String, HashMap<String, u64>>,
prev_histos: &HashMap<String, HashMap<String, u64>>,
current_bucket: i64,
) {
debug_assert!(current_bucket >= 0, "bucket index is non-negative");
for (u, h) in histos.iter() {
if let Some(prev) = prev_histos.get(u) {
let d = workload_phase::js_divergence(prev, h);
debug_assert!((0.0..=1.0).contains(&d), "JSD is in [0,1]");
workload_phase::push_jsd(stream, current_bucket as f64 * PHASE_BUCKET_SECONDS, u, d);
}
}
}
impl DatasetAdapter for SqlShare {
fn name(&self) -> &'static str {
"sqlshare"
}
fn load(&self, path: &Path) -> Result<ResidualStream> {
let rows = load_sqlshare_rows(path)?;
debug_assert!(
!rows.is_empty(),
"non-empty input implied by csv parse success"
);
let t0 = rows.first().map(|r| r.submitted_at).unwrap_or(0.0);
debug_assert!(t0.is_finite(), "t0 must be finite");
let mut stream = ResidualStream::new(format!(
"sqlshare@{}",
path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
));
emit_sqlshare_residuals(&mut stream, &rows, t0);
stream.sort();
Ok(stream)
}
fn exemplar(&self, seed: u64) -> ResidualStream {
let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
let mut stream = ResidualStream::new(format!("sqlshare-exemplar-seed{seed}"));
let users = ["alice", "bob", "carol", "dave", "eve"];
let qskeletons = [
"select count from t",
"join a b",
"group by x",
"where y",
"subselect",
];
for (u, user) in users.iter().enumerate() {
for q in 0..200 {
let t = (q as f64) * 86400.0 / 200.0 * 5.0 + (u as f64) * 30.0;
let sk = qskeletons[q % qskeletons.len()];
let base = 0.4;
let mut runtime = base + rng.gen_range(-0.05..0.05);
if u == 2 && t > 3.0 * 86400.0 {
runtime = 5.0 + rng.gen_range(-0.5..0.5);
}
plan_regression::push_latency(
&mut stream,
t,
&format!("{user}#{sk}"),
runtime,
base,
);
}
}
for d in 0..6 {
let t = d as f64 * 86400.0;
let jsd = if d == 3 { 0.42 } else { 0.05 };
workload_phase::push_jsd(&mut stream, t, "carol", jsd);
}
stream.sort();
stream
}
}