1use super::DatasetAdapter;
32use crate::residual::{cache_io, plan_regression, workload_phase, ResidualStream};
33use anyhow::{Context, Result};
34use chrono::DateTime;
35use rand::Rng;
36use rand::SeedableRng;
37use std::collections::{HashMap, VecDeque};
38use std::path::Path;
39
/// Hard cap on rows ingested from a Snowset CSV subset; bounds memory use
/// against unexpectedly large input files.
const MAX_SNOWSET_ROWS: usize = 100_000_000;

/// Length of the rolling windows used to form per-key baselines for both
/// plan latency and cache-hit ratio.
const SNOWSET_BASELINE_WIN: usize = 64;

/// Width of the workload-phase histogram buckets, in seconds (5 minutes).
const SNOWSET_BUCKET_SECONDS: f64 = 300.0;
/// Dataset adapter that converts Snowset query-log subsets into a `ResidualStream`.
pub struct Snowset;
54
/// One CSV record as it appears in the Snowset subset, prior to validation.
/// Field names are mapped from the file's camelCase headers via serde renames.
#[derive(Debug, serde::Deserialize)]
struct RawRow {
    #[serde(rename = "queryId")]
    query_id: String,
    #[serde(rename = "warehouseId")]
    warehouse_id: String,
    // Raw timestamp string; parsed later by `parse_created_time`.
    #[serde(rename = "createdTime")]
    created_time: String,
    // Query execution time in microseconds (unvalidated; may be non-finite).
    #[serde(rename = "execTime")]
    exec_time_us: f64,
    // Bytes served from the persistent cache; defaults to 0 when absent.
    #[serde(default, rename = "persistentReadBytesCache")]
    bytes_cache: f64,
    // Bytes read from backing storage (S3); defaults to 0 when absent.
    #[serde(default, rename = "persistentReadBytesS3")]
    bytes_storage: f64,
}
70
/// A validated Snowset row: the created-time string has been parsed into
/// microseconds since the Unix epoch and the execution time is known finite
/// (both enforced by `load_snowset_rows`).
struct Row {
    query_id: String,
    warehouse_id: String,
    // Creation timestamp, microseconds since the Unix epoch.
    created_time_us: f64,
    // Execution time in microseconds.
    execution_time_us: f64,
    bytes_cache: f64,
    bytes_storage: f64,
}
79
80fn parse_created_time(s: &str) -> Option<f64> {
81 let parsed = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z")
85 .or_else(|_| DateTime::parse_from_rfc3339(s))
86 .ok()?;
87 Some(parsed.timestamp_micros() as f64)
88}
89
90fn load_snowset_rows(path: &Path) -> Result<Vec<Row>> {
91 let mut rdr = csv::Reader::from_path(path)
92 .with_context(|| format!("opening snowset subset at {}", path.display()))?;
93 let mut rows: Vec<Row> = Vec::new();
94 for r in rdr.deserialize().take(MAX_SNOWSET_ROWS) {
95 debug_assert!(rows.len() < MAX_SNOWSET_ROWS, "row-count bound enforced");
96 let raw: RawRow = match r {
97 Ok(r) => r,
98 Err(_) => continue,
99 };
100 let Some(created_time_us) = parse_created_time(&raw.created_time) else {
101 continue;
102 };
103 if !raw.exec_time_us.is_finite() {
104 continue;
105 }
106 rows.push(Row {
107 query_id: raw.query_id,
108 warehouse_id: raw.warehouse_id,
109 created_time_us,
110 execution_time_us: raw.exec_time_us,
111 bytes_cache: raw.bytes_cache,
112 bytes_storage: raw.bytes_storage,
113 });
114 }
115 rows.sort_by(|a, b| {
116 a.created_time_us
117 .partial_cmp(&b.created_time_us)
118 .unwrap_or(std::cmp::Ordering::Equal)
119 });
120 Ok(rows)
121}
122
123fn emit_snowset_plan_and_cache(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
124 debug_assert!(t0.is_finite(), "t0 must be finite");
125 let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
126 let mut cache_baseline: HashMap<String, VecDeque<f64>> = HashMap::new();
127
128 for r in rows.iter() {
129 let t = (r.created_time_us - t0) / 1e6;
130 emit_snowset_plan_sample(stream, &mut baselines, r, t);
131 emit_snowset_cache_sample(stream, &mut cache_baseline, r, t);
132 }
133}
134
135fn emit_snowset_plan_sample(
136 stream: &mut ResidualStream,
137 baselines: &mut HashMap<(String, String), VecDeque<f64>>,
138 r: &Row,
139 t: f64,
140) {
141 debug_assert!(t.is_finite(), "t must be finite");
142 debug_assert!(
143 r.execution_time_us.is_finite(),
144 "execution_time_us must be finite"
145 );
146 let key = (r.warehouse_id.clone(), r.query_id.clone());
147 let q = baselines.entry(key).or_default();
148 let baseline = if q.is_empty() {
149 r.execution_time_us
150 } else {
151 q.iter().sum::<f64>() / q.len() as f64
152 };
153 plan_regression::push_latency(
154 stream,
155 t,
156 &format!("{}/{}", r.warehouse_id, r.query_id),
157 r.execution_time_us / 1e3,
158 baseline / 1e3,
159 );
160 q.push_back(r.execution_time_us);
161 if q.len() > SNOWSET_BASELINE_WIN {
162 q.pop_front();
163 }
164 debug_assert!(
165 q.len() <= SNOWSET_BASELINE_WIN,
166 "rolling window bound enforced"
167 );
168}
169
170fn emit_snowset_cache_sample(
171 stream: &mut ResidualStream,
172 cache_baseline: &mut HashMap<String, VecDeque<f64>>,
173 r: &Row,
174 t: f64,
175) {
176 debug_assert!(t.is_finite(), "t must be finite");
177 let total = r.bytes_cache + r.bytes_storage;
178 if total <= 0.0 {
179 return;
180 }
181 let cache_ratio = r.bytes_cache / total;
182 debug_assert!((0.0..=1.0).contains(&cache_ratio), "cache ratio in [0,1]");
183 let cb = cache_baseline.entry(r.warehouse_id.clone()).or_default();
184 let expected = if cb.is_empty() {
185 cache_ratio
186 } else {
187 cb.iter().sum::<f64>() / cb.len() as f64
188 };
189 cache_io::push_hit_ratio(stream, t, &r.warehouse_id, expected, cache_ratio);
190 cb.push_back(cache_ratio);
191 if cb.len() > SNOWSET_BASELINE_WIN {
192 cb.pop_front();
193 }
194 debug_assert!(
195 cb.len() <= SNOWSET_BASELINE_WIN,
196 "rolling window bound enforced"
197 );
198}
199
200fn emit_snowset_workload_phase(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
201 debug_assert!(t0.is_finite(), "t0 must be finite");
202 let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
203 let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
204 let mut current_bucket = 0_i64;
205 for r in rows.iter() {
206 let t = (r.created_time_us - t0) / 1e6;
207 let bucket = (t / SNOWSET_BUCKET_SECONDS) as i64;
208 if bucket != current_bucket {
209 flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
210 prev_histos = std::mem::take(&mut histos);
211 current_bucket = bucket;
212 }
213 *histos
214 .entry(r.warehouse_id.clone())
215 .or_default()
216 .entry(r.query_id.clone())
217 .or_insert(0) += 1;
218 }
219}
220
221fn flush_snowset_phase_deltas(
222 stream: &mut ResidualStream,
223 histos: &HashMap<String, HashMap<String, u64>>,
224 prev_histos: &HashMap<String, HashMap<String, u64>>,
225 current_bucket: i64,
226) {
227 debug_assert!(current_bucket >= 0, "bucket index non-negative");
228 for (wh, h) in histos.iter() {
229 if let Some(prev) = prev_histos.get(wh) {
230 let d = workload_phase::js_divergence(prev, h);
231 debug_assert!((0.0..=1.0).contains(&d), "JSD in [0,1]");
232 workload_phase::push_jsd(
233 stream,
234 current_bucket as f64 * SNOWSET_BUCKET_SECONDS,
235 wh,
236 d,
237 );
238 }
239 }
240}
241
impl DatasetAdapter for Snowset {
    fn name(&self) -> &'static str {
        "snowset"
    }

    /// Build a residual stream from a Snowset CSV subset on disk.
    ///
    /// Timestamps are rebased so the earliest row maps to t = 0 seconds;
    /// plan-latency, cache-hit, and workload-phase residuals are then emitted
    /// and the combined stream is sorted into global time order.
    fn load(&self, path: &Path) -> Result<ResidualStream> {
        let rows = load_snowset_rows(path)?;
        debug_assert!(rows.len() <= MAX_SNOWSET_ROWS, "row-count bound enforced");
        let mut stream = ResidualStream::new(format!(
            "snowset@{}",
            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
        ));
        // Rows are sorted ascending, so the first row carries the epoch for
        // rebasing; 0.0 is a harmless default when no valid rows were loaded
        // (the emit passes below then produce nothing).
        let t0 = rows.first().map(|r| r.created_time_us).unwrap_or(0.0);
        debug_assert!(t0.is_finite(), "t0 must be finite");

        emit_snowset_plan_and_cache(&mut stream, &rows, t0);
        emit_snowset_workload_phase(&mut stream, &rows, t0);

        // Samples were emitted channel-by-channel; restore time order.
        stream.sort();
        Ok(stream)
    }

    /// Generate a deterministic synthetic stream for the given `seed`.
    ///
    /// Layout: a quiet phase (t in [0, 3000)) with stable latency and a high
    /// cache-hit ratio, followed by an anomalous phase (t in [3000, 6000))
    /// on warehouse `wh_b` with elevated latency, a degraded hit ratio, and a
    /// burst of workload-phase divergence.
    ///
    /// NOTE(review): output determinism depends on the exact order of RNG
    /// draws below (`gen_bool` before each `gen_range`) — do not reorder.
    fn exemplar(&self, seed: u64) -> ResidualStream {
        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
        let mut stream = ResidualStream::new(format!("snowset-exemplar-seed{seed}"));
        let warehouses = ["wh_a", "wh_b", "wh_c"];
        let queries = ["q1", "q2", "q3", "q4", "q5"];
        // Quiet phase: warehouses rotate every 200 ticks, queries every 13;
        // latency jitters around a 50 ms base and hit ratio around 0.92.
        for i in 0..3000 {
            let t = i as f64;
            let w = warehouses[(i / 200) % warehouses.len()];
            let q = queries[(i / 13) % queries.len()];
            let base = 50.0;
            let jitter: f64 = rng.gen_range(-3.0..3.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.92 + rng.gen_range(-0.01..0.01));
        }
        // Anomalous phase on wh_b: a heavy query dominates 70% of the time,
        // latency skews upward, and the observed hit ratio drops to ~0.55
        // while the expectation stays at 0.92.
        for i in 3000..6000 {
            let t = i as f64;
            let w = "wh_b";
            let q = if rng.gen_bool(0.7) { "q_heavy" } else { "q5" };
            let base = 80.0;
            let jitter: f64 = rng.gen_range(-5.0..15.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.55 + rng.gen_range(-0.05..0.05));
        }
        // Workload-phase channel: 30 JSD samples at 50 s spacing starting at
        // t = 3000, with a high-divergence burst for k in [10, 20).
        for k in 0..30 {
            let t = 3000.0 + 50.0 * k as f64;
            let d = if (10..20).contains(&k) {
                0.4 + rng.gen_range(-0.05..0.05)
            } else {
                0.05 + rng.gen_range(0.0..0.03)
            };
            workload_phase::push_jsd(&mut stream, t, "wh_b", d);
        }
        stream.sort();
        stream
    }
}
303}