// dsfb_database/adapters/snowset.rs

//! Snowset adapter (Vuppalapati et al., NSDI 2020).
//!
//! Real subset: CSV distributed at
//! [github.com/resource-disaggregation/snowset](https://github.com/resource-disaggregation/snowset)
//! (mirror: `http://www.cs.cornell.edu/~midhul/snowset/snowset-main.csv.gz`),
//! schema verified (2026-04) for the columns this adapter touches:
//!   * `queryId`, `warehouseId`
//!   * `createdTime` — ISO-8601 UTC string with microsecond precision,
//!     e.g. `2018-03-02 14:44:02.768000+00:00`
//!   * `execTime` — query execution time in microseconds
//!   * `persistentReadBytesCache` — bytes served from the persistent
//!     cache (the analogue of the earlier-documented
//!     `bytesScannedFromCache`)
//!   * `persistentReadBytesS3` — bytes read from S3 (the analogue of
//!     `bytesScannedFromStorage`)
//!
//! What we extract:
//!   * `PlanRegression` — `execTime − rolling_baseline(execTime)`
//!     per `(warehouseId, queryId)` pair (proxy for query class —
//!     Snowset anonymises SQL text per fact #16).
//!   * `WorkloadPhase` — JS divergence over the per-warehouse query-class
//!     histogram in 5-minute buckets.
//!   * `CacheIo` — drift of `persistentReadBytesCache /
//!     (persistentReadBytesCache + persistentReadBytesS3)`
//!     (cache-hit-ratio residual; the miss rate is its complement).
//!
//! What we cannot extract (the paper says so explicitly):
//!   * `Cardinality` — Snowset does not publish `est_rows`/`actual_rows`.
//!   * `Contention` — no lock-wait stream.
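//!
//! Illustrative usage (a sketch; the crate/module path is assumed from
//! this file's location under `dsfb_database/adapters/`):
//!
//! ```no_run
//! use dsfb_database::adapters::{snowset::Snowset, DatasetAdapter};
//! use std::path::Path;
//!
//! let stream = Snowset.load(Path::new("snowset-main.csv"))?;
//! # Ok::<(), anyhow::Error>(())
//! ```
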
use super::DatasetAdapter;
use crate::residual::{cache_io, plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use chrono::DateTime;
use rand::Rng;
use rand::SeedableRng;
use std::collections::{HashMap, VecDeque};
use std::path::Path;

/// Upper bound on Snowset rows read from a CSV. The published
/// `snowset-main.csv.gz` unpacks to ~70M rows; 100M is the next
/// order-of-magnitude cap that still accepts the real release plus
/// any resampling.
const MAX_SNOWSET_ROWS: usize = 100_000_000;

/// Rolling-baseline window for per-`(warehouse, query)` plan-regression
/// and per-warehouse cache-hit-ratio residuals.
const SNOWSET_BASELINE_WIN: usize = 64;

/// Workload-phase histogram bucket width in seconds (5 minutes).
const SNOWSET_BUCKET_SECONDS: f64 = 300.0;

pub struct Snowset;

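/// Raw CSV row; field names mirror the published Snowset headers via
/// `serde(rename)`. `serde(default)` on the byte counters lets subsets
/// without those columns still load (the counters fall back to zero).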
#[derive(Debug, serde::Deserialize)]
struct RawRow {
    #[serde(rename = "queryId")]
    query_id: String,
    #[serde(rename = "warehouseId")]
    warehouse_id: String,
    #[serde(rename = "createdTime")]
    created_time: String,
    #[serde(rename = "execTime")]
    exec_time_us: f64,
    #[serde(default, rename = "persistentReadBytesCache")]
    bytes_cache: f64,
    #[serde(default, rename = "persistentReadBytesS3")]
    bytes_storage: f64,
}

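/// Validated row: `createdTime` parsed to microseconds since the Unix
/// epoch, non-finite `execTime` values already filtered out.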
struct Row {
    query_id: String,
    warehouse_id: String,
    created_time_us: f64,
    execution_time_us: f64,
    bytes_cache: f64,
    bytes_storage: f64,
}

fn parse_created_time(s: &str) -> Option<f64> {
    // Snowset ships UTC timestamps with microsecond precision, e.g.
    // "2018-03-02 14:44:02.768000+00:00". The first pattern matches the
    // numeric-offset form ("+00:00"); the RFC 3339 fallback covers the
    // `Z` form.
    let parsed = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z")
        .or_else(|_| DateTime::parse_from_rfc3339(s))
        .ok()?;
    Some(parsed.timestamp_micros() as f64)
}

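/// Reads the CSV subset, skipping rows that fail to deserialize, carry an
/// unparseable `createdTime`, or have a non-finite `execTime`. Survivors
/// are sorted by creation time because every downstream pass (rolling
/// baselines, 5-minute bucketing) assumes time-ordered input.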
fn load_snowset_rows(path: &Path) -> Result<Vec<Row>> {
    let mut rdr = csv::Reader::from_path(path)
        .with_context(|| format!("opening snowset subset at {}", path.display()))?;
    let mut rows: Vec<Row> = Vec::new();
    for r in rdr.deserialize().take(MAX_SNOWSET_ROWS) {
        debug_assert!(rows.len() < MAX_SNOWSET_ROWS, "row-count bound enforced");
        let raw: RawRow = match r {
            Ok(r) => r,
            Err(_) => continue,
        };
        let Some(created_time_us) = parse_created_time(&raw.created_time) else {
            continue;
        };
        if !raw.exec_time_us.is_finite() {
            continue;
        }
        rows.push(Row {
            query_id: raw.query_id,
            warehouse_id: raw.warehouse_id,
            created_time_us,
            execution_time_us: raw.exec_time_us,
            bytes_cache: raw.bytes_cache,
            bytes_storage: raw.bytes_storage,
        });
    }
    // All surviving timestamps are finite, so `total_cmp` gives the same
    // order as a NaN-tolerant comparison, without the fallback branch.
    rows.sort_by(|a, b| a.created_time_us.total_cmp(&b.created_time_us));
    Ok(rows)
}


fn emit_snowset_plan_and_cache(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
    debug_assert!(t0.is_finite(), "t0 must be finite");
    let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
    let mut cache_baseline: HashMap<String, VecDeque<f64>> = HashMap::new();

    for r in rows.iter() {
        let t = (r.created_time_us - t0) / 1e6;
        emit_snowset_plan_sample(stream, &mut baselines, r, t);
        emit_snowset_cache_sample(stream, &mut cache_baseline, r, t);
    }
}

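/// Emits one plan-regression sample: the observed latency against the
/// rolling mean of the last `SNOWSET_BASELINE_WIN` executions of the same
/// `(warehouse, query)` key. A key's first execution is compared against
/// itself, yielding a zero residual by construction. Both values are
/// converted from microseconds to milliseconds before pushing.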
fn emit_snowset_plan_sample(
    stream: &mut ResidualStream,
    baselines: &mut HashMap<(String, String), VecDeque<f64>>,
    r: &Row,
    t: f64,
) {
    debug_assert!(t.is_finite(), "t must be finite");
    debug_assert!(
        r.execution_time_us.is_finite(),
        "execution_time_us must be finite"
    );
    let key = (r.warehouse_id.clone(), r.query_id.clone());
    let q = baselines.entry(key).or_default();
    let baseline = if q.is_empty() {
        r.execution_time_us
    } else {
        q.iter().sum::<f64>() / q.len() as f64
    };
    plan_regression::push_latency(
        stream,
        t,
        &format!("{}/{}", r.warehouse_id, r.query_id),
        r.execution_time_us / 1e3,
        baseline / 1e3,
    );
    q.push_back(r.execution_time_us);
    if q.len() > SNOWSET_BASELINE_WIN {
        q.pop_front();
    }
    debug_assert!(
        q.len() <= SNOWSET_BASELINE_WIN,
        "rolling window bound enforced"
    );
}

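/// Emits one cache-I/O sample: the observed per-query cache-hit ratio
/// against the rolling per-warehouse mean. Queries that moved no bytes
/// are skipped so the ratio stays well-defined.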
fn emit_snowset_cache_sample(
    stream: &mut ResidualStream,
    cache_baseline: &mut HashMap<String, VecDeque<f64>>,
    r: &Row,
    t: f64,
) {
    debug_assert!(t.is_finite(), "t must be finite");
    let total = r.bytes_cache + r.bytes_storage;
    if total <= 0.0 {
        return;
    }
    let cache_ratio = r.bytes_cache / total;
    debug_assert!((0.0..=1.0).contains(&cache_ratio), "cache ratio in [0,1]");
    let cb = cache_baseline.entry(r.warehouse_id.clone()).or_default();
    let expected = if cb.is_empty() {
        cache_ratio
    } else {
        cb.iter().sum::<f64>() / cb.len() as f64
    };
    cache_io::push_hit_ratio(stream, t, &r.warehouse_id, expected, cache_ratio);
    cb.push_back(cache_ratio);
    if cb.len() > SNOWSET_BASELINE_WIN {
        cb.pop_front();
    }
    debug_assert!(
        cb.len() <= SNOWSET_BASELINE_WIN,
        "rolling window bound enforced"
    );
}

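/// Buckets queries into 5-minute per-warehouse query-class histograms and,
/// at every bucket boundary, scores the completed bucket against its
/// predecessor via Jensen–Shannon divergence.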
fn emit_snowset_workload_phase(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
    debug_assert!(t0.is_finite(), "t0 must be finite");
    let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut current_bucket = 0_i64;
    for r in rows.iter() {
        let t = (r.created_time_us - t0) / 1e6;
        let bucket = (t / SNOWSET_BUCKET_SECONDS) as i64;
        if bucket != current_bucket {
            flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
            prev_histos = std::mem::take(&mut histos);
            current_bucket = bucket;
        }
        *histos
            .entry(r.warehouse_id.clone())
            .or_default()
            .entry(r.query_id.clone())
            .or_insert(0) += 1;
    }
    // Flush the final (possibly partial) bucket, which the loop above
    // otherwise drops on exit.
    flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
}

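/// Emits one JSD residual per warehouse that appears in both the completed
/// and the previous bucket. Reminder of the standard definition:
/// `JSD(P, Q) = ½·KL(P‖M) + ½·KL(Q‖M)` with `M = ½·(P + Q)`; it is bounded
/// by 1 for base-2 logarithms (and by ln 2 < 1 for natural logs), so the
/// `[0, 1]` assert below holds either way.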
fn flush_snowset_phase_deltas(
    stream: &mut ResidualStream,
    histos: &HashMap<String, HashMap<String, u64>>,
    prev_histos: &HashMap<String, HashMap<String, u64>>,
    current_bucket: i64,
) {
    debug_assert!(current_bucket >= 0, "bucket index non-negative");
    for (wh, h) in histos.iter() {
        if let Some(prev) = prev_histos.get(wh) {
            let d = workload_phase::js_divergence(prev, h);
            debug_assert!((0.0..=1.0).contains(&d), "JSD in [0,1]");
            workload_phase::push_jsd(
                stream,
                current_bucket as f64 * SNOWSET_BUCKET_SECONDS,
                wh,
                d,
            );
        }
    }
}

impl DatasetAdapter for Snowset {
    fn name(&self) -> &'static str {
        "snowset"
    }

    fn load(&self, path: &Path) -> Result<ResidualStream> {
        let rows = load_snowset_rows(path)?;
        debug_assert!(rows.len() <= MAX_SNOWSET_ROWS, "row-count bound enforced");
        let mut stream = ResidualStream::new(format!(
            "snowset@{}",
            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
        ));
        let t0 = rows.first().map(|r| r.created_time_us).unwrap_or(0.0);
        debug_assert!(t0.is_finite(), "t0 must be finite");

        emit_snowset_plan_and_cache(&mut stream, &rows, t0);
        emit_snowset_workload_phase(&mut stream, &rows, t0);

        stream.sort();
        Ok(stream)
    }

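    /// Deterministic synthetic stream: `Pcg64` is seeded from `seed`, so
    /// equal seeds reproduce the exemplar bit-for-bit.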
    fn exemplar(&self, seed: u64) -> ResidualStream {
        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
        let mut stream = ResidualStream::new(format!("snowset-exemplar-seed{seed}"));
        let warehouses = ["wh_a", "wh_b", "wh_c"];
        let queries = ["q1", "q2", "q3", "q4", "q5"];
        // Stable phase: 3000 s of low-residual traffic.
        for i in 0..3000 {
            let t = i as f64;
            let w = warehouses[(i / 200) % warehouses.len()];
            let q = queries[(i / 13) % queries.len()];
            let base = 50.0;
            let jitter: f64 = rng.gen_range(-3.0..3.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.92 + rng.gen_range(-0.01..0.01));
        }
        // Phase shift: warehouse `wh_b` adopts a heavier query mix; phase JSD
        // crosses threshold around t=3300.
        for i in 3000..6000 {
            let t = i as f64;
            let w = "wh_b";
            let q = if rng.gen_bool(0.7) { "q_heavy" } else { "q5" };
            let base = 80.0;
            let jitter: f64 = rng.gen_range(-5.0..15.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.55 + rng.gen_range(-0.05..0.05));
        }
        // Synthetic JSD residual rising at the phase boundary.
        for k in 0..30 {
            let t = 3000.0 + 50.0 * k as f64;
            let d = if (10..20).contains(&k) {
                0.4 + rng.gen_range(-0.05..0.05)
            } else {
                0.05 + rng.gen_range(0.0..0.03)
            };
            workload_phase::push_jsd(&mut stream, t, "wh_b", d);
        }
        stream.sort();
        stream
    }
}
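
// Minimal sanity checks (a sketch): the first instant is the example
// timestamp documented in the module header; the second is the same
// instant in RFC 3339 form, exercising the fallback branch of
// `parse_created_time`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_both_documented_timestamp_forms() {
        let a = parse_created_time("2018-03-02 14:44:02.768000+00:00");
        let b = parse_created_time("2018-03-02T14:44:02.768Z");
        assert!(a.is_some());
        assert_eq!(a, b, "both renderings name the same instant");
    }

    #[test]
    fn rejects_unparseable_timestamps() {
        assert!(parse_created_time("not-a-timestamp").is_none());
        assert!(parse_created_time("").is_none());
    }
}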