// dsfb_database/adapters/tpcds.rs

//! TPC-DS adapter.
//!
//! TPC-DS is a benchmark *control* environment, not a real workload — we
//! use it for the perturbation harness (see `crate::perturbation`) so that
//! every motif class can be evaluated against an injected, known-window
//! ground truth. The crate ships a fully-deterministic exemplar that
//! mimics the structure of a TPC-DS scale-1 trace under each perturbation
//! class. To run on real TPC-DS data, install `duckdb` with the `tpcds`
//! extension and call `scripts/build_tpcds.sh`, which writes a CSV in the
//! same format as the exemplar.
//!
//! Trace CSV columns:
//!   * `query_id` (one of `q1`..`q99`)
//!   * `t_seconds` (wall-clock since trace start)
//!   * `latency_ms`, `est_rows`, `actual_rows`
//!   * optional `wait_event`, `wait_seconds`, `cache_hit_ratio`

use super::DatasetAdapter;
use crate::residual::{
    cache_io, cardinality, contention, plan_regression, workload_phase, ResidualStream,
};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;

/// Cap on the number of trace rows accepted from a single CSV.
///
/// A scale-1 TPC-DS run over the 99-query workload yields roughly
/// 10k–50k rows depending on iteration count; 100M leaves headroom for
/// long scale-1000 runs while still bounding memory.
const MAX_TPCDS_ROWS: usize = 100_000_000;

/// Number of recent latency samples kept per query for the rolling
/// plan-regression baseline.
const TPCDS_BASELINE_WIN: usize = 16;

/// Width, in seconds, of one workload-phase histogram bucket.
const TPCDS_BUCKET_SECONDS: f64 = 30.0;

/// Reference cache-hit ratio against which cache-I/O residuals are
/// measured (`cache_io::push_hit_ratio`). Matches the §7 default for
/// the TPC-DS exemplar.
const TPCDS_CACHE_TARGET_RATIO: f64 = 0.95;

/// TPC-DS dataset adapter (unit struct); behavior lives in its
/// [`DatasetAdapter`] impl.
pub struct TpcDs;

/// One record of a TPC-DS trace CSV (see the module docs for the
/// expected column set). Optional columns default to `""` / `0.0`
/// when absent, which downstream code treats as "not present".
#[derive(Debug, serde::Deserialize)]
struct Row {
    // Query identifier, one of `q1`..`q99`.
    query_id: String,
    // Wall-clock seconds since trace start.
    t_seconds: f64,
    // Observed query latency in milliseconds.
    latency_ms: f64,
    // Optimizer-estimated row count.
    est_rows: f64,
    // Actual row count produced.
    actual_rows: f64,
    // Optional wait-event name; empty string means "no wait recorded".
    #[serde(default)]
    wait_event: String,
    // Optional seconds spent in `wait_event`; 0.0 means absent.
    #[serde(default)]
    wait_seconds: f64,
    // Optional cache hit ratio; 0.0 means absent (skipped downstream).
    #[serde(default)]
    cache_hit_ratio: f64,
}

60fn load_tpcds_rows(path: &Path) -> Result<Vec<Row>> {
61    let mut rdr = csv::Reader::from_path(path)
62        .with_context(|| format!("opening tpcds csv at {}", path.display()))?;
63    let mut rows: Vec<Row> = rdr
64        .deserialize()
65        .filter_map(Result::ok)
66        .take(MAX_TPCDS_ROWS)
67        .collect();
68    debug_assert!(rows.len() <= MAX_TPCDS_ROWS, "iterator bound enforced");
69    rows.sort_by(|a, b| {
70        a.t_seconds
71            .partial_cmp(&b.t_seconds)
72            .unwrap_or(std::cmp::Ordering::Equal)
73    });
74    Ok(rows)
75}
76
77fn emit_tpcds_residuals(stream: &mut ResidualStream, rows: &[Row]) {
78    let mut baselines: HashMap<String, VecDeque<f64>> = HashMap::new();
79    let mut histos: HashMap<String, u64> = HashMap::new();
80    let mut prev_histos: HashMap<String, u64> = HashMap::new();
81    let mut current_bucket: i64 = 0;
82
83    for r in rows.iter() {
84        cardinality::push(stream, r.t_seconds, &r.query_id, r.est_rows, r.actual_rows);
85        emit_tpcds_plan_regression(stream, &mut baselines, r);
86        if !r.wait_event.is_empty() && r.wait_seconds > 0.0 {
87            contention::push_wait(stream, r.t_seconds, &r.wait_event, r.wait_seconds);
88        }
89        if r.cache_hit_ratio > 0.0 {
90            cache_io::push_hit_ratio(
91                stream,
92                r.t_seconds,
93                "tpcds",
94                TPCDS_CACHE_TARGET_RATIO,
95                r.cache_hit_ratio,
96            );
97        }
98        let bucket = (r.t_seconds / TPCDS_BUCKET_SECONDS) as i64;
99        if bucket != current_bucket {
100            let d = workload_phase::js_divergence(&prev_histos, &histos);
101            debug_assert!((0.0..=1.0).contains(&d), "JSD is in [0,1]");
102            workload_phase::push_jsd(
103                stream,
104                current_bucket as f64 * TPCDS_BUCKET_SECONDS,
105                "tpcds",
106                d,
107            );
108            prev_histos = std::mem::take(&mut histos);
109            current_bucket = bucket;
110        }
111        *histos.entry(r.query_id.clone()).or_insert(0) += 1;
112    }
113}
114
115fn emit_tpcds_plan_regression(
116    stream: &mut ResidualStream,
117    baselines: &mut HashMap<String, VecDeque<f64>>,
118    r: &Row,
119) {
120    debug_assert!(r.latency_ms.is_finite(), "latency must be finite");
121    debug_assert!(r.t_seconds.is_finite(), "t_seconds must be finite");
122    let q = baselines.entry(r.query_id.clone()).or_default();
123    let baseline = if q.is_empty() {
124        r.latency_ms
125    } else {
126        q.iter().sum::<f64>() / q.len() as f64
127    };
128    plan_regression::push_latency(stream, r.t_seconds, &r.query_id, r.latency_ms, baseline);
129    q.push_back(r.latency_ms);
130    if q.len() > TPCDS_BASELINE_WIN {
131        q.pop_front();
132    }
133    debug_assert!(
134        q.len() <= TPCDS_BASELINE_WIN,
135        "rolling window bound enforced"
136    );
137}
138
139impl DatasetAdapter for TpcDs {
140    fn name(&self) -> &'static str {
141        "tpcds"
142    }
143
144    fn load(&self, path: &Path) -> Result<ResidualStream> {
145        let rows = load_tpcds_rows(path)?;
146        debug_assert!(rows.len() <= MAX_TPCDS_ROWS, "row-count bound enforced");
147        let mut stream = ResidualStream::new(format!(
148            "tpcds@{}",
149            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
150        ));
151        emit_tpcds_residuals(&mut stream, &rows);
152        stream.sort();
153        Ok(stream)
154    }
155
156    fn exemplar(&self, seed: u64) -> ResidualStream {
157        // The exemplar is built by the perturbation harness; see
158        // `crate::perturbation::tpcds_with_perturbations`. This function
159        // returns a clean baseline (no perturbations) for unit tests.
160        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
161        let mut stream = ResidualStream::new(format!("tpcds-exemplar-seed{seed}"));
162        for q in 1..=99 {
163            let qid = format!("q{}", q);
164            for it in 0..30 {
165                let t = q as f64 * 30.0 + it as f64;
166                let true_rows: f64 = 5000.0 * (1.0 + rng.gen_range(0.0..0.4));
167                let est_rows = true_rows * (1.0 + rng.gen_range(-0.08..0.08));
168                cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
169                let base = 50.0_f64;
170                plan_regression::push_latency(
171                    &mut stream,
172                    t,
173                    &qid,
174                    base + rng.gen_range(-2.0..2.0),
175                    base,
176                );
177                cache_io::push_hit_ratio(
178                    &mut stream,
179                    t,
180                    "tpcds",
181                    0.95,
182                    0.95 + rng.gen_range(-0.005..0.005),
183                );
184            }
185        }
186        stream.sort();
187        stream
188    }
189}