use super::DatasetAdapter;
use crate::residual::{cardinality, ResidualStream};
use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};
use std::collections::HashMap;
use std::path::Path;
/// Hard cap on how many CSV rows `Ceb::load` keeps in memory; rows beyond this
/// count are not read into the stream.
const MAX_CEB_ROWS: usize = 100_000_000;
/// Stateless `DatasetAdapter` that reports its name as `"ceb"` and builds a
/// `ResidualStream` from a CSV of per-subplan estimated vs. true row counts.
pub struct Ceb;
/// One CSV record, deserialized via serde.
///
/// The field names are what serde matches against the CSV columns, so they
/// are expected to line up with the file's header row.
#[derive(Debug, serde::Deserialize)]
struct Row {
// Identifier of the query this record belongs to; rows are grouped by it.
query_id: String,
// Identifier of the subplan; combined with `query_id` to form the sample label.
subplan_id: String,
// Row count forwarded as the last argument of `cardinality::push`
// (presumably the observed cardinality — confirm against `cardinality`).
true_rows: f64,
// Row count forwarded just before `true_rows` to `cardinality::push`
// (presumably the optimizer's estimate — confirm against `cardinality`).
est_rows: f64,
}
impl DatasetAdapter for Ceb {
fn name(&self) -> &'static str {
"ceb"
}
fn load(&self, path: &Path) -> Result<ResidualStream> {
let mut rdr = csv::Reader::from_path(path)
.with_context(|| format!("opening ceb csv at {}", path.display()))?;
let mut rows: Vec<Row> = rdr
.deserialize()
.filter_map(Result::ok)
.take(MAX_CEB_ROWS)
.collect();
debug_assert!(rows.len() <= MAX_CEB_ROWS, "iterator bound enforced");
rows.sort_by(|a, b| a.query_id.cmp(&b.query_id));
let mut stream = ResidualStream::new(format!(
"ceb@{}",
path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
));
let mut q_index: HashMap<String, usize> = HashMap::new();
let mut sp_index: HashMap<String, usize> = HashMap::new();
for r in &rows {
let next_q = q_index.len();
let qi = *q_index.entry(r.query_id.clone()).or_insert(next_q);
let next_sp = sp_index.len();
let sp = *sp_index
.entry(format!("{}#{}", r.query_id, r.subplan_id))
.or_insert(next_sp);
let t = qi as f64 + (sp % 100) as f64 * 0.01;
cardinality::push(
&mut stream,
t,
&format!("{}#{}", r.query_id, r.subplan_id),
r.est_rows,
r.true_rows,
);
}
stream.sort();
Ok(stream)
}
fn exemplar(&self, seed: u64) -> ResidualStream {
let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
let mut stream = ResidualStream::new(format!("ceb-exemplar-seed{seed}"));
for q in 0..200 {
for sp in 1..=10 {
let t = q as f64 + (sp as f64) * 0.01;
let true_rows: f64 = 1000.0_f64 * (1.0 + rng.gen_range(0.0..2.0));
let est_rows: f64 = if q >= 100 && sp == 7 {
true_rows / 30.0
} else {
true_rows * (1.0 + rng.gen_range(-0.1..0.1))
};
cardinality::push(&mut stream, t, &format!("sp{sp}"), est_rows, true_rows);
}
}
stream.sort();
stream
}
}