dsfb_database/adapters/ceb.rs
1//! CEB adapter (Cardinality Estimation Benchmark, Negi et al.).
2//!
3//! Real subset CSV columns (after `scripts/fetch_ceb.sh` exports the
4//! pickle to CSV):
5//! * `query_id`, `subplan_id`
6//! * `true_rows` (ground truth from PostgreSQL `EXPLAIN ANALYZE`)
7//! * `est_rows` (PostgreSQL optimiser estimate)
8//! * optional: `query_class`, `template_id`
9//!
10//! What we extract:
11//! * `Cardinality` — `log10(true_rows / est_rows)` per `(query_id, subplan_id)`.
12//! This is the only public dataset in our stack with **ground-truth
13//! cardinalities**, which is why we treat its results as the
14//! cardinality-mismatch motif's primary empirical evidence.
15//!
16//! What we cannot extract:
17//! * Latency, plan changes over time, contention, cache I/O — CEB is a
18//! batch benchmark, not a temporal trace; we synthesise time as
19//! `t = subplan_index_within_query + query_index * 1.0` so the trace is
20//! well-defined.
21
22use super::DatasetAdapter;
23use crate::residual::{cardinality, ResidualStream};
24use anyhow::{Context, Result};
25use rand::{Rng, SeedableRng};
26use std::collections::HashMap;
27use std::path::Path;
28
/// Upper bound on the number of CEB rows loaded from a CSV. The
/// published CEB release is ~13k rows; this ~7000× headroom catches
/// runaway input without silently truncating realistic datasets.
const MAX_CEB_ROWS: usize = 100_000_000;

/// Adapter that converts the CEB cardinality benchmark CSV export into
/// a `ResidualStream`; see the module docs for what this dataset can
/// and cannot provide.
pub struct Ceb;
35
/// One record of the exported CEB subset CSV. Only the columns we
/// consume are listed; the optional `query_class`/`template_id`
/// columns mentioned in the module docs are not mapped here.
#[derive(Debug, serde::Deserialize)]
struct Row {
    /// Identifier of the SQL statement this subplan belongs to.
    query_id: String,
    /// Identifier of the subplan within its query.
    subplan_id: String,
    /// Ground-truth row count (from PostgreSQL `EXPLAIN ANALYZE`).
    true_rows: f64,
    /// PostgreSQL optimiser's estimated row count.
    est_rows: f64,
}
43
44impl DatasetAdapter for Ceb {
45 fn name(&self) -> &'static str {
46 "ceb"
47 }
48
49 fn load(&self, path: &Path) -> Result<ResidualStream> {
50 let mut rdr = csv::Reader::from_path(path)
51 .with_context(|| format!("opening ceb csv at {}", path.display()))?;
52 let mut rows: Vec<Row> = rdr
53 .deserialize()
54 .filter_map(Result::ok)
55 .take(MAX_CEB_ROWS)
56 .collect();
57 debug_assert!(rows.len() <= MAX_CEB_ROWS, "iterator bound enforced");
58 rows.sort_by(|a, b| a.query_id.cmp(&b.query_id));
59 let mut stream = ResidualStream::new(format!(
60 "ceb@{}",
61 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
62 ));
63 // Synthesise a time index: each query gets one second; subplans
64 // within a query are spaced 0.01 s apart.
65 let mut q_index: HashMap<String, usize> = HashMap::new();
66 let mut sp_index: HashMap<String, usize> = HashMap::new();
67 for r in &rows {
68 let next_q = q_index.len();
69 let qi = *q_index.entry(r.query_id.clone()).or_insert(next_q);
70 let next_sp = sp_index.len();
71 let sp = *sp_index
72 .entry(format!("{}#{}", r.query_id, r.subplan_id))
73 .or_insert(next_sp);
74 let t = qi as f64 + (sp % 100) as f64 * 0.01;
75 cardinality::push(
76 &mut stream,
77 t,
78 &format!("{}#{}", r.query_id, r.subplan_id),
79 r.est_rows,
80 r.true_rows,
81 );
82 }
83 stream.sort();
84 Ok(stream)
85 }
86
87 fn exemplar(&self, seed: u64) -> ResidualStream {
88 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
89 let mut stream = ResidualStream::new(format!("ceb-exemplar-seed{seed}"));
90 // 200 queries; subplans 1..=10. The first 100 queries have
91 // well-calibrated estimates; queries 100..200 develop a 30x
92 // under-estimate on subplan 7 (a join with stale stats).
93 //
94 // Channels in the exemplar are *per-subplan-template* (`sp{n}`),
95 // not per-(query, subplan), because the operator-meaningful
96 // grouping for cardinality drift is "the same logical join
97 // across queries" — and a per-(query, subplan) channel would
98 // contain a single sample, which the EMA cannot smooth.
99 // Real-data CEB load (`load(path)`) keeps the
100 // per-(query, subplan) channel because each subplan in CEB is a
101 // distinct statement; the exemplar collapses for demo clarity.
102 for q in 0..200 {
103 for sp in 1..=10 {
104 let t = q as f64 + (sp as f64) * 0.01;
105 let true_rows: f64 = 1000.0_f64 * (1.0 + rng.gen_range(0.0..2.0));
106 let est_rows: f64 = if q >= 100 && sp == 7 {
107 true_rows / 30.0
108 } else {
109 true_rows * (1.0 + rng.gen_range(-0.1..0.1))
110 };
111 cardinality::push(&mut stream, t, &format!("sp{sp}"), est_rows, true_rows);
112 }
113 }
114 stream.sort();
115 stream
116 }
117}