dsfb_database/perturbation/
mod.rs1use crate::residual::{
19 cache_io, cardinality, contention, plan_regression, workload_phase, ResidualStream,
20};
21use rand::{Rng, SeedableRng};
22use serde::{Deserialize, Serialize};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum PerturbationClass {
26 LatencyInjection,
27 StatisticsStaleness,
28 LockHold,
29 CacheEviction,
30 WorkloadShift,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PerturbationWindow {
35 pub class: PerturbationClass,
36 pub t_start: f64,
37 pub t_end: f64,
38 pub channel: String,
39 pub magnitude: f64,
40 pub seed: u64,
41}
42
43pub fn tpcds_with_perturbations(seed: u64) -> (ResidualStream, Vec<PerturbationWindow>) {
47 tpcds_with_perturbations_scaled(seed, 1.0)
48}
49
50pub fn tpcds_with_perturbations_scaled(
59 seed: u64,
60 scale: f64,
61) -> (ResidualStream, Vec<PerturbationWindow>) {
62 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
63 let mut stream = ResidualStream::new(if (scale - 1.0).abs() < 1e-12 {
64 format!("tpcds-perturbed-seed{seed}")
65 } else {
66 format!("tpcds-perturbed-seed{seed}-scale{:.3}", scale)
67 });
68 let mut windows = Vec::new();
69 let bucket_seconds = 30.0;
70
71 for t_int in 0..1800 {
76 let t = t_int as f64;
77 let q = (t_int % 99) + 1;
78 let qid = format!("q{}", q);
79 let true_rows: f64 = 5000.0 * (1.0 + rng.gen_range(0.0..0.4));
80 let est_rows = true_rows * (1.0 + rng.gen_range(-0.08..0.08));
81 cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
82 plan_regression::push_latency(&mut stream, t, &qid, 50.0 + rng.gen_range(-2.0..2.0), 50.0);
83 cache_io::push_hit_ratio(
84 &mut stream,
85 t,
86 "tpcds",
87 0.95,
88 0.95 + rng.gen_range(-0.005..0.005),
89 );
90 }
91
92 let win = PerturbationWindow {
94 class: PerturbationClass::LatencyInjection,
95 t_start: 200.0,
96 t_end: 280.0,
97 channel: "q42".into(),
98 magnitude: 1.0 + 5.0 * scale,
99 seed,
100 };
101 for t_int in 200..280 {
102 let t = t_int as f64;
103 plan_regression::push_latency(
105 &mut stream,
106 t,
107 "q42",
108 50.0 + 250.0 * scale + rng.gen_range(-10.0..10.0),
109 50.0,
110 );
111 }
112 windows.push(win);
113
114 let win = PerturbationWindow {
116 class: PerturbationClass::StatisticsStaleness,
117 t_start: 600.0,
118 t_end: 720.0,
119 channel: "q17".into(),
120 magnitude: 1.0 + 29.0 * scale,
121 seed,
122 };
123 for t_int in 600..720 {
124 let t = t_int as f64;
125 let true_rows: f64 = 30000.0;
126 let est_rows = true_rows / (1.0 + 29.0 * scale);
127 cardinality::push(&mut stream, t, "q17", est_rows, true_rows);
128 }
129 windows.push(win);
130
131 let win = PerturbationWindow {
133 class: PerturbationClass::LockHold,
134 t_start: 900.0,
135 t_end: 1020.0,
136 channel: "row_lock".into(),
137 magnitude: 1.5 * scale,
138 seed,
139 };
140 for t_int in 900..1020 {
141 let t = t_int as f64;
142 let progress = (t - 900.0) / 120.0;
143 let wait_s = 0.05 + 1.5 * scale * progress;
144 contention::push_wait(&mut stream, t, "row_lock", wait_s);
145 let depth = (1.0 + 4.0 * scale * progress) as usize;
146 contention::push_chain_depth(&mut stream, t, "row_lock", depth);
147 }
148 windows.push(win);
149
150 let win = PerturbationWindow {
152 class: PerturbationClass::CacheEviction,
153 t_start: 1200.0,
154 t_end: 1320.0,
155 channel: "tpcds".into(),
156 magnitude: 0.45 * scale,
157 seed,
158 };
159 for t_int in 1200..1320 {
160 let t = t_int as f64;
161 cache_io::push_hit_ratio(
162 &mut stream,
163 t,
164 "tpcds",
165 0.95,
166 0.95 - 0.45 * scale + rng.gen_range(-0.02..0.02),
167 );
168 }
169 windows.push(win);
170
171 let win = PerturbationWindow {
173 class: PerturbationClass::WorkloadShift,
174 t_start: 1500.0,
175 t_end: 1680.0,
176 channel: "tpcds".into(),
177 magnitude: 0.4 * scale,
178 seed,
179 };
180 let mut t = 1500.0;
181 while t < 1680.0 {
182 let d = 0.05 + 0.4 * scale + rng.gen_range(-0.05..0.05);
183 workload_phase::push_jsd(&mut stream, t, "tpcds", d);
184 t += bucket_seconds;
185 }
186 windows.push(win);
187
188 stream.sort();
189 (stream, windows)
190}