use super::DatasetAdapter;
use crate::residual::{cache_io, plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use chrono::DateTime;
use rand::Rng;
use rand::SeedableRng;
use std::collections::{HashMap, VecDeque};
use std::path::Path;
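/// Hard cap on rows ingested from a single Snowset CSV subset.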
const MAX_SNOWSET_ROWS: usize = 100_000_000;
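/// Rolling-window length (in samples) for per-key latency and per-warehouse
/// cache-ratio baselines.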
const SNOWSET_BASELINE_WIN: usize = 64;
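/// Width of the workload-phase histogram buckets, in seconds.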
const SNOWSET_BUCKET_SECONDS: f64 = 300.0;
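/// Adapter for CSV subsets of the Snowset query trace.
///
/// A minimal usage sketch (assumes a `snowset_subset.csv` on disk with the
/// camelCase headers expected by [`RawRow`]):
///
/// ```ignore
/// let stream = Snowset.load(Path::new("snowset_subset.csv"))?;
/// ```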
pub struct Snowset;
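/// One raw CSV record; `serde(rename)` maps the dataset's camelCase headers
/// onto snake_case fields, and the byte counters default to zero when absent.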
#[derive(Debug, serde::Deserialize)]
struct RawRow {
#[serde(rename = "queryId")]
query_id: String,
#[serde(rename = "warehouseId")]
warehouse_id: String,
#[serde(rename = "createdTime")]
created_time: String,
#[serde(rename = "execTime")]
exec_time_us: f64,
#[serde(default, rename = "persistentReadBytesCache")]
bytes_cache: f64,
#[serde(default, rename = "persistentReadBytesS3")]
bytes_storage: f64,
}
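/// A validated record with `createdTime` parsed to epoch microseconds.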
struct Row {
query_id: String,
warehouse_id: String,
created_time_us: f64,
execution_time_us: f64,
bytes_cache: f64,
bytes_storage: f64,
}
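/// Parses a Snowset `createdTime` value to epoch microseconds, trying the
/// space-separated `%Y-%m-%d %H:%M:%S%.f%:z` layout first and falling back
/// to RFC 3339. Returns `None` when neither layout matches.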
fn parse_created_time(s: &str) -> Option<f64> {
let parsed = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z")
.or_else(|_| DateTime::parse_from_rfc3339(s))
.ok()?;
Some(parsed.timestamp_micros() as f64)
}
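/// Loads up to `MAX_SNOWSET_ROWS` rows from a Snowset CSV subset, skipping
/// malformed records, unparseable timestamps, and non-finite numeric fields,
/// then sorts the survivors by creation time.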
fn load_snowset_rows(path: &Path) -> Result<Vec<Row>> {
let mut rdr = csv::Reader::from_path(path)
.with_context(|| format!("opening snowset subset at {}", path.display()))?;
let mut rows: Vec<Row> = Vec::new();
for r in rdr.deserialize().take(MAX_SNOWSET_ROWS) {
debug_assert!(rows.len() < MAX_SNOWSET_ROWS, "row-count bound enforced");
let raw: RawRow = match r {
Ok(r) => r,
Err(_) => continue,
};
let Some(created_time_us) = parse_created_time(&raw.created_time) else {
continue;
};
        // Reject non-finite numeric fields up front: a NaN exec time or byte
        // count would otherwise poison the rolling baselines and ratios.
        if !raw.exec_time_us.is_finite()
            || !raw.bytes_cache.is_finite()
            || !raw.bytes_storage.is_finite()
        {
            continue;
        }
rows.push(Row {
query_id: raw.query_id,
warehouse_id: raw.warehouse_id,
created_time_us,
execution_time_us: raw.exec_time_us,
            // Clamp negatives to zero so the derived cache ratio stays in [0, 1].
            bytes_cache: raw.bytes_cache.max(0.0),
            bytes_storage: raw.bytes_storage.max(0.0),
});
}
    // Timestamps are finite by construction, so `total_cmp` gives a total
    // order without a partial_cmp fallback.
    rows.sort_by(|a, b| a.created_time_us.total_cmp(&b.created_time_us));
Ok(rows)
}
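/// Walks the time-sorted rows once, emitting a plan-regression latency sample
/// and a cache-I/O sample per row, with timestamps rebased to seconds since `t0`.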
fn emit_snowset_plan_and_cache(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
debug_assert!(t0.is_finite(), "t0 must be finite");
let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
let mut cache_baseline: HashMap<String, VecDeque<f64>> = HashMap::new();
for r in rows.iter() {
let t = (r.created_time_us - t0) / 1e6;
emit_snowset_plan_sample(stream, &mut baselines, r, t);
emit_snowset_cache_sample(stream, &mut cache_baseline, r, t);
}
}
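/// Emits one latency sample keyed by `warehouse/query`, using the rolling
/// mean of up to `SNOWSET_BASELINE_WIN` *prior* samples as the baseline.
/// This assumes `queryId` recurs across rows (e.g. a statement or template
/// id in the subset); with globally unique ids every baseline would
/// degenerate to the sample's own first value.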
fn emit_snowset_plan_sample(
stream: &mut ResidualStream,
baselines: &mut HashMap<(String, String), VecDeque<f64>>,
r: &Row,
t: f64,
) {
debug_assert!(t.is_finite(), "t must be finite");
debug_assert!(
r.execution_time_us.is_finite(),
"execution_time_us must be finite"
);
let key = (r.warehouse_id.clone(), r.query_id.clone());
let q = baselines.entry(key).or_default();
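    // Cold start: the first sample is its own baseline (zero residual);
    // afterwards the baseline is the rolling mean of prior samples only.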
let baseline = if q.is_empty() {
r.execution_time_us
} else {
q.iter().sum::<f64>() / q.len() as f64
};
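    // Convert both values from microseconds to milliseconds for the stream.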
plan_regression::push_latency(
stream,
t,
&format!("{}/{}", r.warehouse_id, r.query_id),
r.execution_time_us / 1e3,
baseline / 1e3,
);
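    // Append only after emitting, so the baseline never includes the
    // current sample.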
q.push_back(r.execution_time_us);
if q.len() > SNOWSET_BASELINE_WIN {
q.pop_front();
}
debug_assert!(
q.len() <= SNOWSET_BASELINE_WIN,
"rolling window bound enforced"
);
}
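/// Emits one cache-hit-ratio sample per warehouse: the observed ratio is
/// cache bytes over total persistent bytes read, and the expected ratio is
/// the rolling mean of up to `SNOWSET_BASELINE_WIN` prior observations.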
fn emit_snowset_cache_sample(
stream: &mut ResidualStream,
cache_baseline: &mut HashMap<String, VecDeque<f64>>,
r: &Row,
t: f64,
) {
debug_assert!(t.is_finite(), "t must be finite");
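    // A hit ratio is only defined when the query read persistent bytes.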
let total = r.bytes_cache + r.bytes_storage;
if total <= 0.0 {
return;
}
let cache_ratio = r.bytes_cache / total;
debug_assert!((0.0..=1.0).contains(&cache_ratio), "cache ratio in [0,1]");
let cb = cache_baseline.entry(r.warehouse_id.clone()).or_default();
let expected = if cb.is_empty() {
cache_ratio
} else {
cb.iter().sum::<f64>() / cb.len() as f64
};
cache_io::push_hit_ratio(stream, t, &r.warehouse_id, expected, cache_ratio);
cb.push_back(cache_ratio);
if cb.len() > SNOWSET_BASELINE_WIN {
cb.pop_front();
}
debug_assert!(
cb.len() <= SNOWSET_BASELINE_WIN,
"rolling window bound enforced"
);
}
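/// Buckets rows into `SNOWSET_BUCKET_SECONDS` windows and, at each bucket
/// transition, emits the divergence between consecutive per-warehouse query
/// histograms. Empty buckets are never flushed, so a stale histogram may be
/// compared against the next non-empty bucket.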
fn emit_snowset_workload_phase(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
debug_assert!(t0.is_finite(), "t0 must be finite");
let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
let mut current_bucket = 0_i64;
for r in rows.iter() {
let t = (r.created_time_us - t0) / 1e6;
let bucket = (t / SNOWSET_BUCKET_SECONDS) as i64;
if bucket != current_bucket {
flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
prev_histos = std::mem::take(&mut histos);
current_bucket = bucket;
}
*histos
.entry(r.warehouse_id.clone())
.or_default()
.entry(r.query_id.clone())
.or_insert(0) += 1;
    }
    // The loop flushes only on bucket transitions, so the final, still-open
    // bucket must be flushed explicitly or its deltas are silently dropped.
    flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
}
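/// Emits one JSD sample per warehouse present in both the current and the
/// previous bucket, timestamped at the start of the flushed bucket; a
/// warehouse missing from either side emits nothing.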
fn flush_snowset_phase_deltas(
stream: &mut ResidualStream,
histos: &HashMap<String, HashMap<String, u64>>,
prev_histos: &HashMap<String, HashMap<String, u64>>,
current_bucket: i64,
) {
debug_assert!(current_bucket >= 0, "bucket index non-negative");
for (wh, h) in histos.iter() {
if let Some(prev) = prev_histos.get(wh) {
let d = workload_phase::js_divergence(prev, h);
debug_assert!((0.0..=1.0).contains(&d), "JSD in [0,1]");
workload_phase::push_jsd(
stream,
current_bucket as f64 * SNOWSET_BUCKET_SECONDS,
wh,
d,
);
}
}
}
impl DatasetAdapter for Snowset {
fn name(&self) -> &'static str {
"snowset"
}
fn load(&self, path: &Path) -> Result<ResidualStream> {
let rows = load_snowset_rows(path)?;
debug_assert!(rows.len() <= MAX_SNOWSET_ROWS, "row-count bound enforced");
let mut stream = ResidualStream::new(format!(
"snowset@{}",
path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
));
let t0 = rows.first().map(|r| r.created_time_us).unwrap_or(0.0);
debug_assert!(t0.is_finite(), "t0 must be finite");
emit_snowset_plan_and_cache(&mut stream, &rows, t0);
emit_snowset_workload_phase(&mut stream, &rows, t0);
stream.sort();
Ok(stream)
}
fn exemplar(&self, seed: u64) -> ResidualStream {
let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
let mut stream = ResidualStream::new(format!("snowset-exemplar-seed{seed}"));
let warehouses = ["wh_a", "wh_b", "wh_c"];
let queries = ["q1", "q2", "q3", "q4", "q5"];
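        // Phase 1 (t = 0..3000): steady state. Latencies jitter tightly
        // around the baseline and the cache hit ratio stays near 0.92.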
for i in 0..3000 {
let t = i as f64;
let w = warehouses[(i / 200) % warehouses.len()];
let q = queries[(i / 13) % queries.len()];
let base = 50.0;
let jitter: f64 = rng.gen_range(-3.0..3.0);
plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.92 + rng.gen_range(-0.01..0.01));
}
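        // Phase 2 (t = 3000..6000): injected regression on wh_b only: a
        // heavier query mix, elevated right-skewed latencies, and a cache
        // hit ratio collapsing from ~0.92 to ~0.55.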
for i in 3000..6000 {
let t = i as f64;
let w = "wh_b";
let q = if rng.gen_bool(0.7) { "q_heavy" } else { "q5" };
let base = 80.0;
let jitter: f64 = rng.gen_range(-5.0..15.0);
plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.55 + rng.gen_range(-0.05..0.05));
}
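        // Workload-phase channel: JSD spikes for a stretch of buckets inside
        // the regression window and stays near the noise floor elsewhere.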
for k in 0..30 {
let t = 3000.0 + 50.0 * k as f64;
let d = if (10..20).contains(&k) {
0.4 + rng.gen_range(-0.05..0.05)
} else {
0.05 + rng.gen_range(0.0..0.03)
};
workload_phase::push_jsd(&mut stream, t, "wh_b", d);
}
stream.sort();
stream
}
}
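#[cfg(test)]
mod tests {
    use super::*;

    // A minimal smoke-test sketch. It only exercises APIs visible in this
    // file (`parse_created_time`, `DatasetAdapter::name`, and `exemplar`);
    // anything stronger (e.g. comparing streams for equality) would assume
    // `ResidualStream` capabilities this module does not rely on.
    #[test]
    fn parses_both_supported_timestamp_layouts() {
        // Space-separated layout with fractional seconds and a colon offset.
        assert!(parse_created_time("2018-02-22 12:00:00.123+00:00").is_some());
        // RFC 3339 fallback.
        assert!(parse_created_time("2018-02-22T12:00:00Z").is_some());
        // Unparseable input is skipped upstream rather than treated as fatal.
        assert!(parse_created_time("not a timestamp").is_none());
    }

    #[test]
    fn exemplar_builds_without_panicking() {
        let adapter = Snowset;
        assert_eq!(adapter.name(), "snowset");
        // Fixed seed: the PCG-backed exemplar is deterministic per seed.
        let _stream = adapter.exemplar(7);
    }
}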