1use super::DatasetAdapter;
20use crate::residual::{plan_regression, workload_phase, ResidualStream};
21use anyhow::{Context, Result};
22use rand::{Rng, SeedableRng};
23use std::collections::{HashMap, VecDeque};
24use std::path::Path;
25
/// Adapter that loads SQLShare query-log CSVs into a `ResidualStream`
/// (see the `DatasetAdapter` impl in this module).
pub struct SqlShare;
27
28#[derive(Debug, serde::Deserialize)]
29struct Row {
30 user_id: String,
31 runtime_seconds: f64,
32 submitted_at: f64, query_text: String,
34}
35
/// Normalize a SQL query into a coarse "skeleton" used as a grouping key.
///
/// Lowercases the text, collapses whitespace runs to a single space,
/// replaces each quoted literal with a single `?`, and replaces each run
/// of digits with a single `?`, so queries differing only in literal
/// values map to the same skeleton. (The previous version emitted one `?`
/// per digit and two `?` per quoted literal, so literal *length* leaked
/// into the skeleton and split otherwise-identical query shapes.)
fn skeleton(q: &str) -> String {
    let mut out = String::with_capacity(q.len());
    // Some(opening_quote) while inside a quoted literal; tracking which
    // quote opened the literal means a `"` inside '...' does not end it.
    let mut in_str: Option<char> = None;
    let mut prev_ws = false;
    let mut prev_digit = false;
    for c in q.chars() {
        if let Some(open) = in_str {
            // Consume literal contents; only the matching quote closes it.
            if c == open {
                in_str = None;
            }
            continue;
        }
        if c == '\'' || c == '"' {
            in_str = Some(c);
            // Exactly one placeholder per literal, whatever its contents.
            out.push('?');
            prev_ws = false;
            prev_digit = false;
            continue;
        }
        if c.is_ascii_digit() {
            // Collapse a digit run to one placeholder so that e.g.
            // `= 7` and `= 1234` yield the same skeleton.
            if !prev_digit {
                out.push('?');
            }
            prev_digit = true;
            prev_ws = false;
            continue;
        }
        prev_digit = false;
        if c.is_whitespace() {
            if !prev_ws {
                out.push(' ');
                prev_ws = true;
            }
            continue;
        }
        prev_ws = false;
        for lc in c.to_lowercase() {
            out.push(lc);
        }
    }
    out.trim().to_string()
}
69
/// Hard upper bound on rows ingested from one SQLShare CSV (memory guard).
const MAX_SQLSHARE_ROWS: usize = 100_000_000;

/// Rolling-window length (sample count) for each per-(user, skeleton)
/// latency baseline.
const PLAN_BASELINE_WIN: usize = 32;

/// Width of one workload-phase histogram bucket, in seconds (one day).
const PHASE_BUCKET_SECONDS: f64 = 86_400.0;

/// Maximum number of skeleton bytes embedded in a plan-regression label.
const SKELETON_LABEL_MAX: usize = 64;
84
85fn load_sqlshare_rows(path: &Path) -> Result<Vec<Row>> {
86 let mut rdr = csv::Reader::from_path(path)
87 .with_context(|| format!("opening sqlshare csv at {}", path.display()))?;
88 let mut rows: Vec<Row> = rdr
89 .deserialize()
90 .filter_map(Result::ok)
91 .take(MAX_SQLSHARE_ROWS)
92 .collect();
93 debug_assert!(rows.len() <= MAX_SQLSHARE_ROWS, "iterator bound enforced");
94 rows.sort_by(|a, b| {
95 a.submitted_at
96 .partial_cmp(&b.submitted_at)
97 .unwrap_or(std::cmp::Ordering::Equal)
98 });
99 Ok(rows)
100}
101
102fn emit_sqlshare_residuals(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
103 debug_assert!(t0.is_finite(), "t0 must be finite");
104 let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
105 let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
106 let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
107 let mut current_bucket: i64 = 0;
108
109 for r in rows.iter() {
110 let t = r.submitted_at - t0;
111 let sk = skeleton(&r.query_text);
112 emit_plan_regression_sample(stream, &mut baselines, r, t, &sk);
113
114 let bucket = (t / PHASE_BUCKET_SECONDS) as i64;
115 if bucket != current_bucket {
116 flush_histogram_deltas(stream, &histos, &prev_histos, current_bucket);
117 prev_histos = std::mem::take(&mut histos);
118 current_bucket = bucket;
119 }
120 *histos
121 .entry(r.user_id.clone())
122 .or_default()
123 .entry(sk)
124 .or_insert(0) += 1;
125 }
126}
127
128fn emit_plan_regression_sample(
129 stream: &mut ResidualStream,
130 baselines: &mut HashMap<(String, String), VecDeque<f64>>,
131 r: &Row,
132 t: f64,
133 sk: &str,
134) {
135 debug_assert!(r.runtime_seconds.is_finite(), "runtime must be finite");
136 debug_assert!(t.is_finite(), "t must be finite");
137 let key = (r.user_id.clone(), sk.to_string());
138 let q = baselines.entry(key).or_default();
139 let baseline = if q.is_empty() {
140 r.runtime_seconds
141 } else {
142 q.iter().sum::<f64>() / q.len() as f64
143 };
144 plan_regression::push_latency(
145 stream,
146 t,
147 &format!("{}#{}", r.user_id, &sk[..sk.len().min(SKELETON_LABEL_MAX)]),
148 r.runtime_seconds,
149 baseline,
150 );
151 q.push_back(r.runtime_seconds);
152 if q.len() > PLAN_BASELINE_WIN {
153 q.pop_front();
154 }
155 debug_assert!(
156 q.len() <= PLAN_BASELINE_WIN,
157 "rolling-window bound enforced"
158 );
159}
160
161fn flush_histogram_deltas(
162 stream: &mut ResidualStream,
163 histos: &HashMap<String, HashMap<String, u64>>,
164 prev_histos: &HashMap<String, HashMap<String, u64>>,
165 current_bucket: i64,
166) {
167 debug_assert!(current_bucket >= 0, "bucket index is non-negative");
168 for (u, h) in histos.iter() {
169 if let Some(prev) = prev_histos.get(u) {
170 let d = workload_phase::js_divergence(prev, h);
171 debug_assert!((0.0..=1.0).contains(&d), "JSD is in [0,1]");
172 workload_phase::push_jsd(stream, current_bucket as f64 * PHASE_BUCKET_SECONDS, u, d);
173 }
174 }
175}
176
177impl DatasetAdapter for SqlShare {
178 fn name(&self) -> &'static str {
179 "sqlshare"
180 }
181
182 fn load(&self, path: &Path) -> Result<ResidualStream> {
183 let rows = load_sqlshare_rows(path)?;
184 debug_assert!(
185 !rows.is_empty(),
186 "non-empty input implied by csv parse success"
187 );
188 let t0 = rows.first().map(|r| r.submitted_at).unwrap_or(0.0);
189 debug_assert!(t0.is_finite(), "t0 must be finite");
190 let mut stream = ResidualStream::new(format!(
191 "sqlshare@{}",
192 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
193 ));
194 emit_sqlshare_residuals(&mut stream, &rows, t0);
195 stream.sort();
196 Ok(stream)
197 }
198
199 fn exemplar(&self, seed: u64) -> ResidualStream {
200 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
201 let mut stream = ResidualStream::new(format!("sqlshare-exemplar-seed{seed}"));
202 let users = ["alice", "bob", "carol", "dave", "eve"];
205 let qskeletons = [
206 "select count from t",
207 "join a b",
208 "group by x",
209 "where y",
210 "subselect",
211 ];
212 for (u, user) in users.iter().enumerate() {
213 for q in 0..200 {
214 let t = (q as f64) * 86400.0 / 200.0 * 5.0 + (u as f64) * 30.0;
215 let sk = qskeletons[q % qskeletons.len()];
216 let base = 0.4;
217 let mut runtime = base + rng.gen_range(-0.05..0.05);
218 if u == 2 && t > 3.0 * 86400.0 {
219 runtime = 5.0 + rng.gen_range(-0.5..0.5);
220 }
221 plan_regression::push_latency(
222 &mut stream,
223 t,
224 &format!("{user}#{sk}"),
225 runtime,
226 base,
227 );
228 }
229 }
230 for d in 0..6 {
232 let t = d as f64 * 86400.0;
233 let jsd = if d == 3 { 0.42 } else { 0.05 };
234 workload_phase::push_jsd(&mut stream, t, "carol", jsd);
235 }
236 stream.sort();
237 stream
238 }
239}