// dsfb_database/adapters/job.rs
//! JOB adapter (Join Order Benchmark, Leis et al., VLDB 2015).
//!
//! Real subset CSV columns (after running JOB through DuckDB / PostgreSQL
//! with `EXPLAIN ANALYZE` and exporting via `scripts/fetch_ceb.sh`-style
//! tooling — JOB itself is just the 113 SQL files; you generate the trace):
//!   * `query_id` (e.g. `1a`, `33c`)
//!   * `iteration` (replay number)
//!   * `est_rows`, `actual_rows` per top-level result
//!   * `latency_ms`
//!   * `plan_hash` (SHA-1 of the EXPLAIN tree, for plan-change detection)
//!
//! What we extract:
//!   * `Cardinality` — `log10(actual / est)` per query (top-level).
//!   * `PlanRegression` — latency residual per query class + plan-hash
//!     transition events.
//!
//! What we cannot extract:
//!   * `Contention`, `CacheIo`, `WorkloadPhase` (single-tenant replay; no
//!     phase changes in a 113-query benchmark).

use super::DatasetAdapter;
use crate::residual::{cardinality, plan_regression, ResidualStream};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;

/// Upper bound on the number of JOB rows read from a single CSV.
///
/// The original Join Order Benchmark is only 113 queries, so even with many
/// replay iterations and plan-variant rows this cap is generous — it exists
/// to catch runaway inputs, not to truncate realistic traces.
const MAX_JOB_ROWS: usize = 100_000_000;

/// Adapter for Join Order Benchmark replay traces.
///
/// Stateless marker type; all behaviour lives in its `DatasetAdapter` impl.
pub struct Job;

36#[derive(Debug, serde::Deserialize)]
37struct Row {
38    query_id: String,
39    iteration: u64,
40    est_rows: f64,
41    actual_rows: f64,
42    latency_ms: f64,
43    #[serde(default)]
44    plan_hash: String,
45}
46
47impl DatasetAdapter for Job {
48    fn name(&self) -> &'static str {
49        "job"
50    }
51
52    fn load(&self, path: &Path) -> Result<ResidualStream> {
53        let mut rdr = csv::Reader::from_path(path)
54            .with_context(|| format!("opening job csv at {}", path.display()))?;
55        let mut rows: Vec<Row> = rdr
56            .deserialize()
57            .filter_map(Result::ok)
58            .take(MAX_JOB_ROWS)
59            .collect();
60        debug_assert!(rows.len() <= MAX_JOB_ROWS, "iterator bound enforced");
61        rows.sort_by(|a, b| {
62            (a.iteration, a.query_id.clone()).cmp(&(b.iteration, b.query_id.clone()))
63        });
64        let mut stream = ResidualStream::new(format!(
65            "job@{}",
66            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
67        ));
68        let mut last_hash: HashMap<String, String> = HashMap::new();
69        let mut baselines: HashMap<String, VecDeque<f64>> = HashMap::new();
70        const WIN: usize = 8;
71        let mut t: f64 = 0.0;
72        for r in &rows {
73            cardinality::push(&mut stream, t, &r.query_id, r.est_rows, r.actual_rows);
74            let q = baselines.entry(r.query_id.clone()).or_default();
75            let baseline = if q.is_empty() {
76                r.latency_ms
77            } else {
78                q.iter().sum::<f64>() / q.len() as f64
79            };
80            plan_regression::push_latency(&mut stream, t, &r.query_id, r.latency_ms, baseline);
81            q.push_back(r.latency_ms);
82            if q.len() > WIN {
83                q.pop_front();
84            }
85            if !r.plan_hash.is_empty() {
86                let prev = last_hash.get(&r.query_id).cloned().unwrap_or_default();
87                if !prev.is_empty() && prev != r.plan_hash {
88                    plan_regression::push_plan_change(&mut stream, t, &r.query_id);
89                }
90                last_hash.insert(r.query_id.clone(), r.plan_hash.clone());
91            }
92            t += 1.0;
93        }
94        stream.sort();
95        Ok(stream)
96    }
97
98    fn exemplar(&self, seed: u64) -> ResidualStream {
99        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
100        let mut stream = ResidualStream::new(format!("job-exemplar-seed{seed}"));
101        // 33 queries × 5 iterations; query 17 develops a plan regression at
102        // iteration 3 (latency jumps 8x and est/actual ratio jumps to 25x).
103        let mut t = 0.0;
104        for it in 0..5 {
105            for q in 1..=33 {
106                let qid = format!("q{:02}", q);
107                let true_rows: f64 = 1000.0_f64 * (1.0 + rng.gen_range(0.0..0.5));
108                let est_rows = if q == 17 && it >= 3 {
109                    true_rows / 25.0
110                } else {
111                    true_rows * (1.0 + rng.gen_range(-0.05..0.05))
112                };
113                cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
114                let baseline = 100.0;
115                let latency = if q == 17 && it >= 3 {
116                    800.0 + rng.gen_range(-30.0..30.0)
117                } else {
118                    100.0 + rng.gen_range(-5.0..5.0)
119                };
120                plan_regression::push_latency(&mut stream, t, &qid, latency, baseline);
121                if q == 17 && it == 3 {
122                    plan_regression::push_plan_change(&mut stream, t, &qid);
123                }
124                t += 1.0;
125            }
126        }
127        stream.sort();
128        stream
129    }
130}