use crate::residual::{
cache_io, cardinality, contention, plan_regression, workload_phase, ResidualStream,
};
use rand::{Rng, SeedableRng};
use serde::{Deserialize, Serialize};
/// Kind of disturbance recorded in a [`PerturbationWindow`].
///
/// The per-variant notes describe how the generator in this file
/// (`tpcds_with_perturbations_scaled`) uses each class; other producers may
/// attach different channels or magnitudes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PerturbationClass {
/// Raised observed latency for one query; the generator applies it to `q42`.
LatencyInjection,
/// Row estimates that fall far below the true row count; the generator
/// applies it to `q17`.
StatisticsStaleness,
/// Lock wait time and chain depth that grow over the window; the generator
/// reports them on the `row_lock` channel.
LockHold,
/// Observed cache hit ratio pushed below the expected `0.95`; the generator
/// reports it on the `tpcds` channel.
CacheEviction,
/// Raised workload divergence samples (pushed via `workload_phase::push_jsd`);
/// the generator reports them on the `tpcds` channel.
WorkloadShift,
}
/// Ground-truth label for one perturbation injected into a generated stream.
///
/// The generator in this file returns one window per [`PerturbationClass`]
/// alongside the stream it perturbed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerturbationWindow {
/// Which kind of disturbance this window describes.
pub class: PerturbationClass,
/// Stream time at which the perturbation begins. The generator emits its
/// first perturbed sample at exactly this time.
/// NOTE(review): times look like seconds (the generator steps by a
/// `bucket_seconds` value) — confirm against `ResidualStream`.
pub t_start: f64,
/// Stream time at which the perturbation ends. The generator treats this
/// bound as exclusive: no perturbed sample is emitted at `t_end` itself.
pub t_end: f64,
/// Name of the affected channel as passed to the residual push helpers:
/// a query id (`"q42"`, `"q17"`), a lock name (`"row_lock"`), or a
/// workload name (`"tpcds"`) in the generator below.
pub channel: String,
/// Scale-dependent strength of the perturbation. The formula differs per
/// class (e.g. a latency ratio for `LatencyInjection`, a hit-ratio drop
/// for `CacheEviction`), so values are not comparable across classes.
pub magnitude: f64,
/// Seed that was passed to the generator that produced this window.
pub seed: u64,
}
/// Generates the perturbed stream and its ground-truth windows at the
/// default intensity.
///
/// Shorthand for [`tpcds_with_perturbations_scaled`] with `scale = 1.0`;
/// `seed` is forwarded unchanged.
pub fn tpcds_with_perturbations(seed: u64) -> (ResidualStream, Vec<PerturbationWindow>) {
    // Scale at which the generator labels the stream without a `-scale` suffix.
    const UNIT_SCALE: f64 = 1.0;
    tpcds_with_perturbations_scaled(seed, UNIT_SCALE)
}
pub fn tpcds_with_perturbations_scaled(
seed: u64,
scale: f64,
) -> (ResidualStream, Vec<PerturbationWindow>) {
let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
let mut stream = ResidualStream::new(if (scale - 1.0).abs() < 1e-12 {
format!("tpcds-perturbed-seed{seed}")
} else {
format!("tpcds-perturbed-seed{seed}-scale{:.3}", scale)
});
let mut windows = Vec::new();
let bucket_seconds = 30.0;
for t_int in 0..1800 {
let t = t_int as f64;
let q = (t_int % 99) + 1;
let qid = format!("q{}", q);
let true_rows: f64 = 5000.0 * (1.0 + rng.gen_range(0.0..0.4));
let est_rows = true_rows * (1.0 + rng.gen_range(-0.08..0.08));
cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
plan_regression::push_latency(&mut stream, t, &qid, 50.0 + rng.gen_range(-2.0..2.0), 50.0);
cache_io::push_hit_ratio(
&mut stream,
t,
"tpcds",
0.95,
0.95 + rng.gen_range(-0.005..0.005),
);
}
let win = PerturbationWindow {
class: PerturbationClass::LatencyInjection,
t_start: 200.0,
t_end: 280.0,
channel: "q42".into(),
magnitude: 1.0 + 5.0 * scale,
seed,
};
for t_int in 200..280 {
let t = t_int as f64;
plan_regression::push_latency(
&mut stream,
t,
"q42",
50.0 + 250.0 * scale + rng.gen_range(-10.0..10.0),
50.0,
);
}
windows.push(win);
let win = PerturbationWindow {
class: PerturbationClass::StatisticsStaleness,
t_start: 600.0,
t_end: 720.0,
channel: "q17".into(),
magnitude: 1.0 + 29.0 * scale,
seed,
};
for t_int in 600..720 {
let t = t_int as f64;
let true_rows: f64 = 30000.0;
let est_rows = true_rows / (1.0 + 29.0 * scale);
cardinality::push(&mut stream, t, "q17", est_rows, true_rows);
}
windows.push(win);
let win = PerturbationWindow {
class: PerturbationClass::LockHold,
t_start: 900.0,
t_end: 1020.0,
channel: "row_lock".into(),
magnitude: 1.5 * scale,
seed,
};
for t_int in 900..1020 {
let t = t_int as f64;
let progress = (t - 900.0) / 120.0;
let wait_s = 0.05 + 1.5 * scale * progress;
contention::push_wait(&mut stream, t, "row_lock", wait_s);
let depth = (1.0 + 4.0 * scale * progress) as usize;
contention::push_chain_depth(&mut stream, t, "row_lock", depth);
}
windows.push(win);
let win = PerturbationWindow {
class: PerturbationClass::CacheEviction,
t_start: 1200.0,
t_end: 1320.0,
channel: "tpcds".into(),
magnitude: 0.45 * scale,
seed,
};
for t_int in 1200..1320 {
let t = t_int as f64;
cache_io::push_hit_ratio(
&mut stream,
t,
"tpcds",
0.95,
0.95 - 0.45 * scale + rng.gen_range(-0.02..0.02),
);
}
windows.push(win);
let win = PerturbationWindow {
class: PerturbationClass::WorkloadShift,
t_start: 1500.0,
t_end: 1680.0,
channel: "tpcds".into(),
magnitude: 0.4 * scale,
seed,
};
let mut t = 1500.0;
while t < 1680.0 {
let d = 0.05 + 0.4 * scale + rng.gen_range(-0.05..0.05);
workload_phase::push_jsd(&mut stream, t, "tpcds", d);
t += bucket_seconds;
}
windows.push(win);
stream.sort();
(stream, windows)
}