// dsfb_database/adapters/job.rs
use std::collections::{HashMap, VecDeque};
use std::path::Path;

use anyhow::{Context, Result};
use rand::{Rng, SeedableRng};

use crate::residual::{cardinality, plan_regression, ResidualStream};
use super::DatasetAdapter;
/// Hard upper bound on rows ingested from a single job csv; `load` stops
/// reading past this point so a pathological file cannot exhaust memory.
const MAX_JOB_ROWS: usize = 100_000_000;
/// Dataset adapter for the "job" workload (see [`DatasetAdapter::name`]):
/// ingests per-query, per-iteration csv rows and replays them as a
/// `ResidualStream` of cardinality and latency observations.
pub struct Job;
/// One csv record: a single execution of one query at one iteration.
/// Field order is preserved as-is — csv deserialization may rely on it
/// when a file has no header row.
#[derive(Debug, serde::Deserialize)]
struct Row {
    query_id: String,
    iteration: u64,
    /// Optimizer's row-count estimate, compared against `actual_rows`
    /// to produce cardinality residuals.
    est_rows: f64,
    actual_rows: f64,
    latency_ms: f64,
    /// Optional plan fingerprint; defaults to `""` when the column is
    /// absent, which `load` treats as "no plan information".
    #[serde(default)]
    plan_hash: String,
}
47impl DatasetAdapter for Job {
48 fn name(&self) -> &'static str {
49 "job"
50 }
51
52 fn load(&self, path: &Path) -> Result<ResidualStream> {
53 let mut rdr = csv::Reader::from_path(path)
54 .with_context(|| format!("opening job csv at {}", path.display()))?;
55 let mut rows: Vec<Row> = rdr
56 .deserialize()
57 .filter_map(Result::ok)
58 .take(MAX_JOB_ROWS)
59 .collect();
60 debug_assert!(rows.len() <= MAX_JOB_ROWS, "iterator bound enforced");
61 rows.sort_by(|a, b| {
62 (a.iteration, a.query_id.clone()).cmp(&(b.iteration, b.query_id.clone()))
63 });
64 let mut stream = ResidualStream::new(format!(
65 "job@{}",
66 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
67 ));
68 let mut last_hash: HashMap<String, String> = HashMap::new();
69 let mut baselines: HashMap<String, VecDeque<f64>> = HashMap::new();
70 const WIN: usize = 8;
71 let mut t: f64 = 0.0;
72 for r in &rows {
73 cardinality::push(&mut stream, t, &r.query_id, r.est_rows, r.actual_rows);
74 let q = baselines.entry(r.query_id.clone()).or_default();
75 let baseline = if q.is_empty() {
76 r.latency_ms
77 } else {
78 q.iter().sum::<f64>() / q.len() as f64
79 };
80 plan_regression::push_latency(&mut stream, t, &r.query_id, r.latency_ms, baseline);
81 q.push_back(r.latency_ms);
82 if q.len() > WIN {
83 q.pop_front();
84 }
85 if !r.plan_hash.is_empty() {
86 let prev = last_hash.get(&r.query_id).cloned().unwrap_or_default();
87 if !prev.is_empty() && prev != r.plan_hash {
88 plan_regression::push_plan_change(&mut stream, t, &r.query_id);
89 }
90 last_hash.insert(r.query_id.clone(), r.plan_hash.clone());
91 }
92 t += 1.0;
93 }
94 stream.sort();
95 Ok(stream)
96 }
97
98 fn exemplar(&self, seed: u64) -> ResidualStream {
99 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
100 let mut stream = ResidualStream::new(format!("job-exemplar-seed{seed}"));
101 let mut t = 0.0;
104 for it in 0..5 {
105 for q in 1..=33 {
106 let qid = format!("q{:02}", q);
107 let true_rows: f64 = 1000.0_f64 * (1.0 + rng.gen_range(0.0..0.5));
108 let est_rows = if q == 17 && it >= 3 {
109 true_rows / 25.0
110 } else {
111 true_rows * (1.0 + rng.gen_range(-0.05..0.05))
112 };
113 cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
114 let baseline = 100.0;
115 let latency = if q == 17 && it >= 3 {
116 800.0 + rng.gen_range(-30.0..30.0)
117 } else {
118 100.0 + rng.gen_range(-5.0..5.0)
119 };
120 plan_regression::push_latency(&mut stream, t, &qid, latency, baseline);
121 if q == 17 && it == 3 {
122 plan_regression::push_plan_change(&mut stream, t, &qid);
123 }
124 t += 1.0;
125 }
126 }
127 stream.sort();
128 stream
129 }
130}