Skip to main content

dsfb_database/perturbation/
mod.rs

1//! Perturbation harness.
2//!
3//! Real SQL workloads lack ground-truth labels for "what went wrong when."
4//! The honest replacement (per Strategy A in the panel discussion) is
5//! controlled perturbation injection: take a clean trace, deterministically
6//! inject a known fault inside a known time window, run DSFB-Database, and
7//! check whether the emitted episodes overlap that window.
8//!
9//! Each perturbation:
10//!   * has a name + class
//!   * is restricted to a `[t_start, t_end)` window (the *ground-truth window*;
//!     injected samples satisfy `t_start <= t < t_end`)
12//!   * is deterministic given a seed
13//!   * is documented in `spec/perturbations.yaml`
14//!
15//! The five perturbations cover the five motif classes one-to-one so the
16//! evaluation cleanly maps motif → injection → window → F1.
17
18use crate::residual::{
19    cache_io, cardinality, contention, plan_regression, workload_phase, ResidualStream,
20};
21use rand::{Rng, SeedableRng};
22use serde::{Deserialize, Serialize};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum PerturbationClass {
26    LatencyInjection,
27    StatisticsStaleness,
28    LockHold,
29    CacheEviction,
30    WorkloadShift,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PerturbationWindow {
35    pub class: PerturbationClass,
36    pub t_start: f64,
37    pub t_end: f64,
38    pub channel: String,
39    pub magnitude: f64,
40    pub seed: u64,
41}
42
43/// Build a TPC-DS-shaped trace with all five perturbations injected at
44/// disjoint, documented windows. The returned (stream, ground-truth windows)
45/// pair is the empirical evidence for §8 of the paper.
46pub fn tpcds_with_perturbations(seed: u64) -> (ResidualStream, Vec<PerturbationWindow>) {
47    tpcds_with_perturbations_scaled(seed, 1.0)
48}
49
50/// Same harness, but with each perturbation's *magnitude* multiplied by
51/// `scale`. `scale = 1.0` reproduces the canonical pinned-fingerprint
52/// stream exactly (same RNG draw sequence, same byte output). Lower
53/// scales produce subthreshold perturbations — the residual is still
54/// present but barely above noise — and the stress sweep
55/// (`stress-sweep` subcommand) reports per-motif F1 across a range
56/// of scales so we can see *where each motif breaks down*, not just
57/// that it works at the published baseline.
58pub fn tpcds_with_perturbations_scaled(
59    seed: u64,
60    scale: f64,
61) -> (ResidualStream, Vec<PerturbationWindow>) {
62    let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
63    let mut stream = ResidualStream::new(if (scale - 1.0).abs() < 1e-12 {
64        format!("tpcds-perturbed-seed{seed}")
65    } else {
66        format!("tpcds-perturbed-seed{seed}-scale{:.3}", scale)
67    });
68    let mut windows = Vec::new();
69    let bucket_seconds = 30.0;
70
71    // Stable backbone — 30 minutes (1800 s) of clean traffic across q1..q99.
72    // The RNG draw sequence here is identical to the original
73    // `tpcds_with_perturbations` so that scale=1.0 reproduces the
74    // canonical fingerprint byte-for-byte.
75    for t_int in 0..1800 {
76        let t = t_int as f64;
77        let q = (t_int % 99) + 1;
78        let qid = format!("q{}", q);
79        let true_rows: f64 = 5000.0 * (1.0 + rng.gen_range(0.0..0.4));
80        let est_rows = true_rows * (1.0 + rng.gen_range(-0.08..0.08));
81        cardinality::push(&mut stream, t, &qid, est_rows, true_rows);
82        plan_regression::push_latency(&mut stream, t, &qid, 50.0 + rng.gen_range(-2.0..2.0), 50.0);
83        cache_io::push_hit_ratio(
84            &mut stream,
85            t,
86            "tpcds",
87            0.95,
88            0.95 + rng.gen_range(-0.005..0.005),
89        );
90    }
91
92    // 1) Latency injection — at scale=1.0 q42 latency runs at 6× baseline.
93    let win = PerturbationWindow {
94        class: PerturbationClass::LatencyInjection,
95        t_start: 200.0,
96        t_end: 280.0,
97        channel: "q42".into(),
98        magnitude: 1.0 + 5.0 * scale,
99        seed,
100    };
101    for t_int in 200..280 {
102        let t = t_int as f64;
103        // 50 ms baseline + 250 ms*scale extra latency + ±10 ms jitter
104        plan_regression::push_latency(
105            &mut stream,
106            t,
107            "q42",
108            50.0 + 250.0 * scale + rng.gen_range(-10.0..10.0),
109            50.0,
110        );
111    }
112    windows.push(win);
113
114    // 2) Statistics staleness — at scale=1.0 q17 cardinality est/actual = 30×.
115    let win = PerturbationWindow {
116        class: PerturbationClass::StatisticsStaleness,
117        t_start: 600.0,
118        t_end: 720.0,
119        channel: "q17".into(),
120        magnitude: 1.0 + 29.0 * scale,
121        seed,
122    };
123    for t_int in 600..720 {
124        let t = t_int as f64;
125        let true_rows: f64 = 30000.0;
126        let est_rows = true_rows / (1.0 + 29.0 * scale);
127        cardinality::push(&mut stream, t, "q17", est_rows, true_rows);
128    }
129    windows.push(win);
130
131    // 3) Lock hold — wait + chain-depth ramp. At scale=1.0, max wait 1.55 s.
132    let win = PerturbationWindow {
133        class: PerturbationClass::LockHold,
134        t_start: 900.0,
135        t_end: 1020.0,
136        channel: "row_lock".into(),
137        magnitude: 1.5 * scale,
138        seed,
139    };
140    for t_int in 900..1020 {
141        let t = t_int as f64;
142        let progress = (t - 900.0) / 120.0;
143        let wait_s = 0.05 + 1.5 * scale * progress;
144        contention::push_wait(&mut stream, t, "row_lock", wait_s);
145        let depth = (1.0 + 4.0 * scale * progress) as usize;
146        contention::push_chain_depth(&mut stream, t, "row_lock", depth);
147    }
148    windows.push(win);
149
150    // 4) Cache eviction — at scale=1.0 hit_ratio drops 0.95 → 0.50.
151    let win = PerturbationWindow {
152        class: PerturbationClass::CacheEviction,
153        t_start: 1200.0,
154        t_end: 1320.0,
155        channel: "tpcds".into(),
156        magnitude: 0.45 * scale,
157        seed,
158    };
159    for t_int in 1200..1320 {
160        let t = t_int as f64;
161        cache_io::push_hit_ratio(
162            &mut stream,
163            t,
164            "tpcds",
165            0.95,
166            0.95 - 0.45 * scale + rng.gen_range(-0.02..0.02),
167        );
168    }
169    windows.push(win);
170
171    // 5) Workload shift — at scale=1.0 JSD elevates to ~0.45.
172    let win = PerturbationWindow {
173        class: PerturbationClass::WorkloadShift,
174        t_start: 1500.0,
175        t_end: 1680.0,
176        channel: "tpcds".into(),
177        magnitude: 0.4 * scale,
178        seed,
179    };
180    let mut t = 1500.0;
181    while t < 1680.0 {
182        let d = 0.05 + 0.4 * scale + rng.gen_range(-0.05..0.05);
183        workload_phase::push_jsd(&mut stream, t, "tpcds", d);
184        t += bucket_seconds;
185    }
186    windows.push(win);
187
188    stream.sort();
189    (stream, windows)
190}