// dsfb_database/adapters/tpcds.rs
use super::DatasetAdapter;
use crate::residual::{
    cache_io, cardinality, contention, plan_regression, workload_phase, ResidualStream,
};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;

27const MAX_TPCDS_ROWS: usize = 100_000_000;
31
32const TPCDS_BASELINE_WIN: usize = 16;
34
35const TPCDS_BUCKET_SECONDS: f64 = 30.0;
37
38const TPCDS_CACHE_TARGET_RATIO: f64 = 0.95;
42
43pub struct TpcDs;
44
45#[derive(Debug, serde::Deserialize)]
46struct Row {
47 query_id: String,
48 t_seconds: f64,
49 latency_ms: f64,
50 est_rows: f64,
51 actual_rows: f64,
52 #[serde(default)]
53 wait_event: String,
54 #[serde(default)]
55 wait_seconds: f64,
56 #[serde(default)]
57 cache_hit_ratio: f64,
58}
59
60fn load_tpcds_rows(path: &Path) -> Result<Vec<Row>> {
61 let mut rdr = csv::Reader::from_path(path)
62 .with_context(|| format!("opening tpcds csv at {}", path.display()))?;
63 let mut rows: Vec<Row> = rdr
64 .deserialize()
65 .filter_map(Result::ok)
66 .take(MAX_TPCDS_ROWS)
67 .collect();
68 debug_assert!(rows.len() <= MAX_TPCDS_ROWS, "iterator bound enforced");
69 rows.sort_by(|a, b| {
70 a.t_seconds
71 .partial_cmp(&b.t_seconds)
72 .unwrap_or(std::cmp::Ordering::Equal)
73 });
74 Ok(rows)
75}
76
77fn emit_tpcds_residuals(stream: &mut ResidualStream, rows: &[Row]) {
78 let mut baselines: HashMap<String, VecDeque<f64>> = HashMap::new();
79 let mut histos: HashMap<String, u64> = HashMap::new();
80 let mut prev_histos: HashMap<String, u64> = HashMap::new();
81 let mut current_bucket: i64 = 0;
82
83 for r in rows.iter() {
84 cardinality::push(stream, r.t_seconds, &r.query_id, r.est_rows, r.actual_rows);
85 emit_tpcds_plan_regression(stream, &mut baselines, r);
86 if !r.wait_event.is_empty() && r.wait_seconds > 0.0 {
87 contention::push_wait(stream, r.t_seconds, &r.wait_event, r.wait_seconds);
88 }
89 if r.cache_hit_ratio > 0.0 {
90 cache_io::push_hit_ratio(
91 stream,
92 r.t_seconds,
93 "tpcds",
94 TPCDS_CACHE_TARGET_RATIO,
95 r.cache_hit_ratio,
96 );
97 }
98 let bucket = (r.t_seconds / TPCDS_BUCKET_SECONDS) as i64;
99 if bucket != current_bucket {
100 let d = workload_phase::js_divergence(&prev_histos, &histos);
101 debug_assert!((0.0..=1.0).contains(&d), "JSD is in [0,1]");
102 workload_phase::push_jsd(
103 stream,
104 current_bucket as f64 * TPCDS_BUCKET_SECONDS,
105 "tpcds",
106 d,
107 );
108 prev_histos = std::mem::take(&mut histos);
109 current_bucket = bucket;
110 }
111 *histos.entry(r.query_id.clone()).or_insert(0) += 1;
112 }
113}
114
115fn emit_tpcds_plan_regression(
116 stream: &mut ResidualStream,
117 baselines: &mut HashMap<String, VecDeque<f64>>,
118 r: &Row,
119) {
120 debug_assert!(r.latency_ms.is_finite(), "latency must be finite");
121 debug_assert!(r.t_seconds.is_finite(), "t_seconds must be finite");
122 let q = baselines.entry(r.query_id.clone()).or_default();
123 let baseline = if q.is_empty() {
124 r.latency_ms
125 } else {
126 q.iter().sum::<f64>() / q.len() as f64
127 };
128 plan_regression::push_latency(stream, r.t_seconds, &r.query_id, r.latency_ms, baseline);
129 q.push_back(r.latency_ms);
130 if q.len() > TPCDS_BASELINE_WIN {
131 q.pop_front();
132 }
133 debug_assert!(
134 q.len() <= TPCDS_BASELINE_WIN,
135 "rolling window bound enforced"
136 );
137}
138
139impl DatasetAdapter for TpcDs {
140 fn name(&self) -> &'static str {
141 "tpcds"
142 }
143
144 fn load(&self, path: &Path) -> Result<ResidualStream> {
145 let rows = load_tpcds_rows(path)?;
146 debug_assert!(rows.len() <= MAX_TPCDS_ROWS, "row-count bound enforced");
147 let mut stream = ResidualStream::new(format!(
148 "tpcds@{}",
149 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
150 ));
151 emit_tpcds_residuals(&mut stream, &rows);
152 stream.sort();
153 Ok(stream)
154 }
155
156 fn exemplar(&self, seed: u64) -> ResidualStream {
157 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
161 let mut stream = ResidualStream::new(format!("tpcds-exemplar-seed{seed}"));
162 for q in 1..=99 {
163 let qid = format!("q{}", q);
164 for it in 0..30 {
165 let t = q as f64 * 30.0 + it as f64;
166 let true_rows: f64 = 5000.0 * (1.0 + rng.gen_range(0.0..0.4));
167 let est_rows = true_rows * (1.0 + rng.gen_range(-0.08..0.08));
168 cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
169 let base = 50.0_f64;
170 plan_regression::push_latency(
171 &mut stream,
172 t,
173 &qid,
174 base + rng.gen_range(-2.0..2.0),
175 base,
176 );
177 cache_io::push_hit_ratio(
178 &mut stream,
179 t,
180 "tpcds",
181 0.95,
182 0.95 + rng.gen_range(-0.005..0.005),
183 );
184 }
185 }
186 stream.sort();
187 stream
188 }
189}