
dsfb_database/adapters/sqlshare.rs

//! SQLShare adapter (Jain et al., SIGMOD 2016).
//!
//! Real subset CSV columns (as released by UW eScience):
//!   * `query_id`, `user_id`, `runtime_seconds`, `submitted_at`, `query_text`
//!
//! What we extract:
//!   * `PlanRegression` — `runtime − rolling_baseline(runtime)` per
//!     `(user_id, normalised_query_skeleton)`. Normalisation = strip
//!     literals, collapse whitespace, lowercase. This stands in for a query
//!     digest because SQLShare predates digest IDs.
//!   * `WorkloadPhase` — JS divergence over the per-user query-skeleton
//!     histogram in 1-day buckets.
//!
//! What we cannot extract:
//!   * `Cardinality`, `Contention`, `CacheIo` — none of these are in the
//!     released metadata. The paper's table on "what each dataset supplies"
//!     marks these as N/A for SQLShare.

use super::DatasetAdapter;
use crate::residual::{plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;

pub struct SqlShare;

#[derive(Debug, serde::Deserialize)]
struct Row {
    user_id: String,
    runtime_seconds: f64,
    submitted_at: f64, // epoch seconds
    query_text: String,
}

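/// Normalise raw query text into a skeleton (exact rules in the inline comment
/// below). Illustrative example, not taken from the released data:
/// `SELECT name FROM t WHERE id = 42` normalises to
/// `select name from t where id = ??` (each digit and each quote character
/// becomes its own `?`).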
fn skeleton(q: &str) -> String {
    // crude normalisation: collapse digits + quoted strings, lowercase, dedupe whitespace
    let mut out = String::with_capacity(q.len());
    let mut in_str = false;
    let mut prev_ws = false;
    for c in q.chars() {
        if c == '\'' || c == '"' {
            in_str = !in_str;
            out.push('?');
            continue;
        }
        if in_str {
            continue;
        }
        if c.is_ascii_digit() {
            out.push('?');
            prev_ws = false;
            continue;
        }
        if c.is_whitespace() {
            if !prev_ws {
                out.push(' ');
                prev_ws = true;
            }
            continue;
        }
        prev_ws = false;
        for lc in c.to_lowercase() {
            out.push(lc);
        }
    }
    out.trim().to_string()
}

/// Upper bound on SQLShare rows loaded. The released SQLShare dataset
/// has ~24k queries; 100M is a ~4000× cap that still rejects pathological
/// or corrupted inputs without silently truncating realistic traces.
const MAX_SQLSHARE_ROWS: usize = 100_000_000;

/// Rolling-baseline window for per-skeleton plan-regression deltas.
const PLAN_BASELINE_WIN: usize = 32;

/// Workload-phase bucket width in seconds (one day).
const PHASE_BUCKET_SECONDS: f64 = 86_400.0;

/// Max characters retained from the normalised skeleton when building
/// the per-user channel label. Keeps labels reviewable and bounded.
const SKELETON_LABEL_MAX: usize = 64;

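/// Read the released SQLShare CSV at `path` and return its rows ordered by
/// `submitted_at`. Records that fail to deserialize are silently dropped
/// (`filter_map(Result::ok)`), and at most `MAX_SQLSHARE_ROWS` rows are kept.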
fn load_sqlshare_rows(path: &Path) -> Result<Vec<Row>> {
    let mut rdr = csv::Reader::from_path(path)
        .with_context(|| format!("opening sqlshare csv at {}", path.display()))?;
    let mut rows: Vec<Row> = rdr
        .deserialize()
        .filter_map(Result::ok)
        .take(MAX_SQLSHARE_ROWS)
        .collect();
    debug_assert!(rows.len() <= MAX_SQLSHARE_ROWS, "iterator bound enforced");
    rows.sort_by(|a, b| {
        a.submitted_at
            .partial_cmp(&b.submitted_at)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    Ok(rows)
}

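/// Walk the time-ordered rows once, emitting one `PlanRegression` sample per
/// query and, whenever a 1-day bucket closes, one `WorkloadPhase` JSD sample
/// for each user also seen in the previous bucket. The final, still-open
/// bucket is never flushed.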
fn emit_sqlshare_residuals(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
    debug_assert!(t0.is_finite(), "t0 must be finite");
    let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
    let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut current_bucket: i64 = 0;

    for r in rows.iter() {
        let t = r.submitted_at - t0;
        let sk = skeleton(&r.query_text);
        emit_plan_regression_sample(stream, &mut baselines, r, t, &sk);

        let bucket = (t / PHASE_BUCKET_SECONDS) as i64;
        if bucket != current_bucket {
            flush_histogram_deltas(stream, &histos, &prev_histos, current_bucket);
            prev_histos = std::mem::take(&mut histos);
            current_bucket = bucket;
        }
        *histos
            .entry(r.user_id.clone())
            .or_default()
            .entry(sk)
            .or_insert(0) += 1;
    }
}

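/// Emit a single `PlanRegression` sample: the observed runtime against the
/// rolling mean of the last `PLAN_BASELINE_WIN` runtimes for the same
/// `(user_id, skeleton)` pair. The first observation of a pair is compared
/// against itself, so its residual is zero by construction.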
fn emit_plan_regression_sample(
    stream: &mut ResidualStream,
    baselines: &mut HashMap<(String, String), VecDeque<f64>>,
    r: &Row,
    t: f64,
    sk: &str,
) {
    debug_assert!(r.runtime_seconds.is_finite(), "runtime must be finite");
    debug_assert!(t.is_finite(), "t must be finite");
    let key = (r.user_id.clone(), sk.to_string());
    let q = baselines.entry(key).or_default();
    let baseline = if q.is_empty() {
        r.runtime_seconds
    } else {
        q.iter().sum::<f64>() / q.len() as f64
    };
    // Truncate the label by characters, not bytes: `skeleton` lowercases
    // arbitrary Unicode, so a byte slice at SKELETON_LABEL_MAX could split a
    // multi-byte character and panic.
    let label_skeleton: String = sk.chars().take(SKELETON_LABEL_MAX).collect();
    plan_regression::push_latency(
        stream,
        t,
        &format!("{}#{}", r.user_id, label_skeleton),
        r.runtime_seconds,
        baseline,
    );
    q.push_back(r.runtime_seconds);
    if q.len() > PLAN_BASELINE_WIN {
        q.pop_front();
    }
    debug_assert!(
        q.len() <= PLAN_BASELINE_WIN,
        "rolling-window bound enforced"
    );
}

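/// For each user present in the bucket that just closed, compare their
/// query-skeleton histogram against the previous bucket's histogram and emit
/// the Jensen-Shannon divergence at the closed bucket's start timestamp.
/// Users with no previous-bucket histogram are skipped.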
fn flush_histogram_deltas(
    stream: &mut ResidualStream,
    histos: &HashMap<String, HashMap<String, u64>>,
    prev_histos: &HashMap<String, HashMap<String, u64>>,
    current_bucket: i64,
) {
    debug_assert!(current_bucket >= 0, "bucket index is non-negative");
    for (u, h) in histos.iter() {
        if let Some(prev) = prev_histos.get(u) {
            let d = workload_phase::js_divergence(prev, h);
            debug_assert!((0.0..=1.0).contains(&d), "JSD is in [0,1]");
            workload_phase::push_jsd(stream, current_bucket as f64 * PHASE_BUCKET_SECONDS, u, d);
        }
    }
}

impl DatasetAdapter for SqlShare {
    fn name(&self) -> &'static str {
        "sqlshare"
    }

    fn load(&self, path: &Path) -> Result<ResidualStream> {
        let rows = load_sqlshare_rows(path)?;
        // A CSV can parse successfully and still yield zero usable rows (e.g. a
        // header-only file, or every record rejected by `filter_map(Result::ok)`),
        // so fail loudly instead of asserting.
        anyhow::ensure!(
            !rows.is_empty(),
            "no usable rows in sqlshare csv at {}",
            path.display()
        );
        let t0 = rows[0].submitted_at;
        debug_assert!(t0.is_finite(), "t0 must be finite");
        let mut stream = ResidualStream::new(format!(
            "sqlshare@{}",
            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
        ));
        emit_sqlshare_residuals(&mut stream, &rows, t0);
        stream.sort();
        Ok(stream)
    }

    fn exemplar(&self, seed: u64) -> ResidualStream {
        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
        let mut stream = ResidualStream::new(format!("sqlshare-exemplar-seed{seed}"));
        // 5 users; each issues 200 queries spread over 5 days; the third user
        // ('carol', index 2) develops a long-running ad-hoc query from day 3 onwards.
        let users = ["alice", "bob", "carol", "dave", "eve"];
        let qskeletons = [
            "select count from t",
            "join a b",
            "group by x",
            "where y",
            "subselect",
        ];
        for (u, user) in users.iter().enumerate() {
            for q in 0..200 {
                let t = (q as f64) * 86400.0 / 200.0 * 5.0 + (u as f64) * 30.0;
                let sk = qskeletons[q % qskeletons.len()];
                let base = 0.4;
                let mut runtime = base + rng.gen_range(-0.05..0.05);
                if u == 2 && t > 3.0 * 86400.0 {
                    runtime = 5.0 + rng.gen_range(-0.5..0.5);
                }
                plan_regression::push_latency(
                    &mut stream,
                    t,
                    &format!("{user}#{sk}"),
                    runtime,
                    base,
                );
            }
        }
        // Workload-phase JSD residual jumping at day 3 for user 'carol'.
        for d in 0..6 {
            let t = d as f64 * 86400.0;
            let jsd = if d == 3 { 0.42 } else { 0.05 };
            workload_phase::push_jsd(&mut stream, t, "carol", jsd);
        }
        stream.sort();
        stream
    }
}
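
// Minimal sanity checks for the crude normalisation above. The expected strings
// are illustrative and follow directly from the per-character rules; they are
// not taken from the released dataset or the paper.
#[cfg(test)]
mod tests {
    use super::skeleton;

    #[test]
    fn skeleton_strips_literals_case_and_whitespace() {
        assert_eq!(
            skeleton("SELECT name FROM t  WHERE id = 42"),
            "select name from t where id = ??"
        );
        // Queries differing only in same-width literals share a skeleton.
        assert_eq!(skeleton("where y = 1"), skeleton("where y = 2"));
    }
}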