Skip to main content

dsfb_gray/
inject.rs

1#![allow(missing_docs)]
2
3//! # DSFB Fault Injection — Deterministic Chaos for Distributed Rust Systems
4//!
5//! Provides reproducible, seed-controlled fault injection scenarios that
6//! produce gray failures: states where a system isn't "down" but is
7//! performing incorrectly in ways that standard health checks miss.
8
9use crate::{
10    DsfbObserver, GrammarState, ObserverConfig, ReasonCode, ResidualSample, ResidualSource,
11};
12
13/// A deterministic fault scenario producing a time-series of residual samples.
14pub trait FaultScenario {
15    fn name(&self) -> &str;
16    fn description(&self) -> &str;
17    fn next_sample(&mut self, step: u64) -> Option<ResidualSample>;
18    fn expected_reason_code(&self) -> ReasonCode;
19    fn total_steps(&self) -> u64;
20    fn reset(&mut self);
21    fn injection_start(&self) -> u64;
22}
23
/// Deterministic xorshift64 noise generator.
///
/// Advances the generator held in `state` by one xorshift64 step (shifts 13, 7,
/// 17) and maps the new state onto `[-amplitude, amplitude]`. `state` is updated
/// in place, so successive calls from the same starting seed yield the same
/// sequence.
///
/// Zero is a fixed point of the xorshift64 step: a zero state would stay zero
/// forever and every call would return exactly `-amplitude`, i.e. a constant
/// bias instead of noise. A zero state is therefore replaced by a fixed non-zero
/// constant before stepping. Non-zero states behave exactly as before.
fn xorshift_noise(state: &mut u64, amplitude: f64) -> f64 {
    // 64-bit golden-ratio constant; any non-zero value escapes the fixed point.
    const ZERO_STATE_REPLACEMENT: u64 = 0x9E37_79B9_7F4A_7C15;

    if *state == 0 {
        *state = ZERO_STATE_REPLACEMENT;
    }
    let mut x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;

    let normalized = (x as f64) / (u64::MAX as f64) * 2.0 - 1.0;
    normalized * amplitude
}
32
/// Clock drift injection scenario.
///
/// Heartbeat RTT sits at `baseline_rtt_ms` until `injection_start_step`, then
/// grows linearly by `drift_rate` per elapsed step. Every sample carries seeded
/// noise of absolute amplitude `noise_amp`.
#[derive(Debug, Clone)]
pub struct ClockDriftScenario {
    /// Nominal heartbeat round-trip time in milliseconds.
    pub baseline_rtt_ms: f64,
    /// RTT added per step elapsed since `injection_start_step`.
    pub drift_rate: f64,
    /// First step at which drift is applied.
    pub injection_start_step: u64,
    /// Total number of steps the scenario produces.
    pub duration: u64,
    /// Absolute noise amplitude added to every sample.
    pub noise_amp: f64,
    /// Current xorshift noise state, advanced on every sample.
    noise_state: u64,
    /// Value `noise_state` is restored to on `reset()`.
    seed: u64,
}
43
44impl ClockDriftScenario {
45    pub fn new(baseline: f64, drift_rate: f64, start: u64, duration: u64, noise: f64) -> Self {
46        Self {
47            baseline_rtt_ms: baseline,
48            drift_rate,
49            injection_start_step: start,
50            duration,
51            noise_amp: noise * baseline,
52            noise_state: 42,
53            seed: 42,
54        }
55    }
56    pub fn default_scenario() -> Self {
57        Self::new(5.0, 0.05, 50, 200, 0.02)
58    }
59}
60
61impl FaultScenario for ClockDriftScenario {
62    fn name(&self) -> &str {
63        "Clock Drift Injection"
64    }
65    fn description(&self) -> &str {
66        "Monotonic clock divergence producing increasing apparent heartbeat RTT"
67    }
68    fn injection_start(&self) -> u64 {
69        self.injection_start_step
70    }
71    fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
72        if step >= self.duration {
73            return None;
74        }
75        let drift = if step >= self.injection_start_step {
76            self.drift_rate * (step - self.injection_start_step) as f64
77        } else {
78            0.0
79        };
80        let noise = xorshift_noise(&mut self.noise_state, self.noise_amp);
81        Some(ResidualSample {
82            value: self.baseline_rtt_ms + drift + noise,
83            baseline: self.baseline_rtt_ms,
84            timestamp_ns: step * 1_000_000_000,
85            source: ResidualSource::HeartbeatRtt,
86        })
87    }
88    fn expected_reason_code(&self) -> ReasonCode {
89        ReasonCode::ClockDriftDivergence
90    }
91    fn total_steps(&self) -> u64 {
92        self.duration
93    }
94    fn reset(&mut self) {
95        self.noise_state = self.seed;
96    }
97}
98
/// Partial network partition scenario.
///
/// Latency sits at `baseline` until `start`. From then on it carries a spike of
/// height `burst` that decays linearly to zero over `burst_dur` steps, plus a
/// linear drift of `rate` per elapsed step. Every sample also gets seeded noise
/// of amplitude `0.03 * baseline`.
#[derive(Debug, Clone)]
pub struct PartialPartitionScenario {
    /// Nominal latency level before the fault.
    pub baseline: f64,
    /// First step of the fault.
    pub start: u64,
    /// Total number of steps in the scenario.
    pub duration: u64,
    /// Latency added per step elapsed since `start`.
    pub rate: f64,
    /// Height of the spike at `start`.
    pub burst: f64,
    /// Number of steps over which the spike decays to zero.
    pub burst_dur: u64,
    /// Current xorshift noise state, advanced on every sample.
    pub noise_state: u64,
    /// Value `noise_state` is restored to on `reset()`.
    pub seed: u64,
}
110
111impl PartialPartitionScenario {
112    pub fn default_scenario() -> Self {
113        Self {
114            baseline: 5.0,
115            start: 40,
116            duration: 200,
117            rate: 0.08,
118            burst: 3.0,
119            burst_dur: 10,
120            noise_state: 137,
121            seed: 137,
122        }
123    }
124}
125
126impl FaultScenario for PartialPartitionScenario {
127    fn name(&self) -> &str {
128        "Partial Network Partition"
129    }
130    fn description(&self) -> &str {
131        "Selective packet loss producing burst-then-drift latency signature"
132    }
133    fn injection_start(&self) -> u64 {
134        self.start
135    }
136    fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
137        if step >= self.duration {
138            return None;
139        }
140        let pert = if step >= self.start {
141            let e = (step - self.start) as f64;
142            let b = if (step - self.start) < self.burst_dur {
143                self.burst * (1.0 - e / self.burst_dur as f64)
144            } else {
145                0.0
146            };
147            b + self.rate * e
148        } else {
149            0.0
150        };
151        let noise = xorshift_noise(&mut self.noise_state, self.baseline * 0.03);
152        Some(ResidualSample {
153            value: self.baseline + pert + noise,
154            baseline: self.baseline,
155            timestamp_ns: step * 1_000_000_000,
156            source: ResidualSource::Latency,
157        })
158    }
159    fn expected_reason_code(&self) -> ReasonCode {
160        ReasonCode::PartialPartitionSignature
161    }
162    fn total_steps(&self) -> u64 {
163        self.duration
164    }
165    fn reset(&mut self) {
166        self.noise_state = self.seed;
167    }
168}
169
/// Channel backpressure scenario.
///
/// Queue depth sits at `baseline` until `start`, then grows as
/// `rate * e + 0.05 * e²` (with `e` the steps elapsed since `start`). Seeded
/// noise of amplitude 5.0 is added, and the result is capped at 1000.0.
#[derive(Debug, Clone)]
pub struct ChannelBackpressureScenario {
    /// Nominal queue depth before the fault.
    pub baseline: f64,
    /// First step of the fault.
    pub start: u64,
    /// Total number of steps in the scenario.
    pub duration: u64,
    /// Linear growth coefficient of the queue depth after `start`.
    pub rate: f64,
    /// Current xorshift noise state, advanced on every sample.
    pub noise_state: u64,
    /// Value `noise_state` is restored to on `reset()`.
    pub seed: u64,
}
179impl ChannelBackpressureScenario {
180    pub fn default_scenario() -> Self {
181        Self {
182            baseline: 100.0,
183            start: 30,
184            duration: 200,
185            rate: 5.0,
186            noise_state: 271,
187            seed: 271,
188        }
189    }
190}
191impl FaultScenario for ChannelBackpressureScenario {
192    fn name(&self) -> &str {
193        "Channel Backpressure Onset"
194    }
195    fn description(&self) -> &str {
196        "Bounded mpsc channel depth growing toward capacity"
197    }
198    fn injection_start(&self) -> u64 {
199        self.start
200    }
201    fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
202        if step >= self.duration {
203            return None;
204        }
205        let growth = if step >= self.start {
206            let e = (step - self.start) as f64;
207            self.rate * e + 0.05 * e * e
208        } else {
209            0.0
210        };
211        let noise = xorshift_noise(&mut self.noise_state, 5.0);
212        Some(ResidualSample {
213            value: (self.baseline + growth + noise).min(1000.0),
214            baseline: self.baseline,
215            timestamp_ns: step * 1_000_000_000,
216            source: ResidualSource::QueueDepth,
217        })
218    }
219    fn expected_reason_code(&self) -> ReasonCode {
220        ReasonCode::ChannelBackpressureOnset
221    }
222    fn total_steps(&self) -> u64 {
223        self.duration
224    }
225    fn reset(&mut self) {
226        self.noise_state = self.seed;
227    }
228}
229
/// Async runtime starvation scenario.
///
/// Task poll duration sits at `baseline` until `start`, then grows linearly by
/// `rate` per elapsed step. Every sample also gets seeded noise of amplitude 3.0.
#[derive(Debug, Clone)]
pub struct AsyncStarvationScenario {
    /// Nominal poll duration before the fault.
    pub baseline: f64,
    /// First step of the fault.
    pub start: u64,
    /// Total number of steps in the scenario.
    pub duration: u64,
    /// Poll duration added per step elapsed since `start`.
    pub rate: f64,
    /// Current xorshift noise state, advanced on every sample.
    pub noise_state: u64,
    /// Value `noise_state` is restored to on `reset()`.
    pub seed: u64,
}
239impl AsyncStarvationScenario {
240    pub fn default_scenario() -> Self {
241        Self {
242            baseline: 50.0,
243            start: 60,
244            duration: 200,
245            rate: 2.0,
246            noise_state: 313,
247            seed: 313,
248        }
249    }
250}
251impl FaultScenario for AsyncStarvationScenario {
252    fn name(&self) -> &str {
253        "Async Runtime Starvation"
254    }
255    fn description(&self) -> &str {
256        "Tokio task poll duration increasing from blocking in async context"
257    }
258    fn injection_start(&self) -> u64 {
259        self.start
260    }
261    fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
262        if step >= self.duration {
263            return None;
264        }
265        let starv = if step >= self.start {
266            self.rate * (step - self.start) as f64
267        } else {
268            0.0
269        };
270        let noise = xorshift_noise(&mut self.noise_state, 3.0);
271        Some(ResidualSample {
272            value: self.baseline + starv + noise,
273            baseline: self.baseline,
274            timestamp_ns: step * 1_000_000_000,
275            source: ResidualSource::PollDuration,
276        })
277    }
278    fn expected_reason_code(&self) -> ReasonCode {
279        ReasonCode::AsyncRuntimeStarvation
280    }
281    fn total_steps(&self) -> u64 {
282        self.duration
283    }
284    fn reset(&mut self) {
285        self.noise_state = self.seed;
286    }
287}
288
289/// Run a scenario through a DSFB observer and collect results.
290pub fn run_scenario(scenario: &mut dyn FaultScenario, config: &ObserverConfig) -> ScenarioResult {
291    scenario.reset();
292    let first = scenario.next_sample(0);
293    scenario.reset();
294    let src = first.map(|s| s.source).unwrap_or(ResidualSource::Latency);
295    let mut observer = DsfbObserver::new(src, config);
296    let injection_start = scenario.injection_start();
297    let mut stats = ScenarioRunStats::default();
298    let mut samples = Vec::with_capacity(scenario.total_steps() as usize);
299
300    for step in 0..scenario.total_steps() {
301        if let Some(sample) = scenario.next_sample(step) {
302            let observation = observer.observe(&sample);
303            samples.push(sample_record(step, &sample, &observation));
304            update_scenario_run_stats(
305                &mut stats,
306                step,
307                injection_start,
308                observation.grammar_state,
309                observation.heuristic_match.reason_code,
310            );
311        }
312    }
313    build_scenario_result(scenario, injection_start, stats, samples)
314}
315
316#[derive(Default)]
317struct ScenarioRunStats {
318    first_boundary: Option<u64>,
319    first_violation: Option<u64>,
320    first_anomaly: Option<u64>,
321    detected_reason_code: Option<ReasonCode>,
322    boundary_count: u32,
323    violation_count: u32,
324    false_alarms: u32,
325}
326
327fn sample_record(
328    step: u64,
329    sample: &ResidualSample,
330    observation: &crate::ObservationResult,
331) -> SampleRecord {
332    SampleRecord {
333        step,
334        value: sample.value,
335        baseline: sample.baseline,
336        residual: observation.sign.residual,
337        drift: observation.sign.drift,
338        slew: observation.sign.slew,
339        grammar_state: observation.grammar_state,
340    }
341}
342
343fn update_scenario_run_stats(
344    stats: &mut ScenarioRunStats,
345    step: u64,
346    injection_start: u64,
347    grammar_state: GrammarState,
348    reason_code: ReasonCode,
349) {
350    if matches!(
351        grammar_state,
352        GrammarState::Boundary | GrammarState::Violation
353    ) && stats.first_anomaly.is_none()
354    {
355        stats.first_anomaly = Some(step);
356        stats.detected_reason_code = Some(reason_code);
357        if step < injection_start {
358            stats.false_alarms += 1;
359        }
360    }
361
362    match grammar_state {
363        GrammarState::Boundary => {
364            stats.boundary_count += 1;
365            if stats.first_boundary.is_none() {
366                stats.first_boundary = Some(step);
367            }
368        }
369        GrammarState::Violation => {
370            stats.violation_count += 1;
371            if stats.first_violation.is_none() {
372                stats.first_violation = Some(step);
373            }
374        }
375        GrammarState::Admissible => {}
376    }
377}
378
379fn build_scenario_result(
380    scenario: &dyn FaultScenario,
381    injection_start: u64,
382    stats: ScenarioRunStats,
383    samples: Vec<SampleRecord>,
384) -> ScenarioResult {
385    ScenarioResult {
386        scenario_name: scenario.name().into(),
387        total_steps: scenario.total_steps(),
388        injection_start,
389        first_anomaly_step: stats.first_anomaly,
390        first_boundary_step: stats.first_boundary,
391        first_violation_step: stats.first_violation,
392        detected_reason_code: stats.detected_reason_code,
393        false_alarms_before_injection: stats.false_alarms,
394        total_boundary_steps: stats.boundary_count,
395        total_violation_steps: stats.violation_count,
396        expected_reason_code: scenario.expected_reason_code(),
397        samples,
398    }
399}
400
401#[derive(Debug, Clone)]
402pub struct SampleRecord {
403    pub step: u64,
404    pub value: f64,
405    pub baseline: f64,
406    pub residual: f64,
407    pub drift: f64,
408    pub slew: f64,
409    pub grammar_state: GrammarState,
410}
411
412#[derive(Debug, Clone)]
413pub struct ScenarioResult {
414    pub scenario_name: String,
415    pub total_steps: u64,
416    pub injection_start: u64,
417    pub first_anomaly_step: Option<u64>,
418    pub first_boundary_step: Option<u64>,
419    pub first_violation_step: Option<u64>,
420    pub detected_reason_code: Option<ReasonCode>,
421    pub false_alarms_before_injection: u32,
422    pub total_boundary_steps: u32,
423    pub total_violation_steps: u32,
424    pub expected_reason_code: ReasonCode,
425    pub samples: Vec<SampleRecord>,
426}
427
428impl ScenarioResult {
429    pub fn detection_lead_time(&self) -> Option<u64> {
430        self.first_anomaly_step
431            .map(|s| self.total_steps.saturating_sub(s))
432    }
433
434    pub fn detected(&self) -> bool {
435        self.first_anomaly_step.is_some()
436    }
437
438    pub fn detection_delay_from_injection(&self) -> Option<u64> {
439        self.first_anomaly_step
440            .filter(|step| *step >= self.injection_start)
441            .map(|step| step - self.injection_start)
442    }
443}
444
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AdmissibilityEnvelope, WorkloadPhase};

    /// Builds a `fast_response` observer config with `hysteresis_count: 3`, the
    /// given persistence window, and a steady-state symmetric envelope. The three
    /// numeric envelope arguments are forwarded positionally to
    /// `AdmissibilityEnvelope::symmetric`.
    ///
    /// This is a macro rather than a fn so the literals adopt whatever numeric
    /// types the config and envelope declare, without restating them here.
    macro_rules! steady_config {
        ($window:expr, $a:expr, $b:expr, $c:expr) => {
            ObserverConfig {
                persistence_window: $window,
                hysteresis_count: 3,
                default_envelope: AdmissibilityEnvelope::symmetric(
                    $a,
                    $b,
                    $c,
                    WorkloadPhase::SteadyState,
                ),
                ..ObserverConfig::fast_response()
            }
        };
    }

    /// Asserts two runs produced the same steps with bit-identical values.
    fn assert_same_samples(a: &ScenarioResult, b: &ScenarioResult) {
        assert_eq!(a.samples.len(), b.samples.len());
        for (x, y) in a.samples.iter().zip(&b.samples) {
            assert_eq!(x.step, y.step);
            assert_eq!(x.value.to_bits(), y.value.to_bits());
            assert_eq!(x.baseline.to_bits(), y.baseline.to_bits());
            assert_eq!(x.residual.to_bits(), y.residual.to_bits());
            assert_eq!(x.drift.to_bits(), y.drift.to_bits());
            assert_eq!(x.slew.to_bits(), y.slew.to_bits());
        }
    }

    #[test]
    fn test_clock_drift_detected() {
        let mut s = ClockDriftScenario::default_scenario();
        let config = steady_config!(20, 2.0, 0.1, 0.05);
        let r = run_scenario(&mut s, &config);
        assert!(r.detected(), "Clock drift must be detected");
        assert!(
            r.detection_lead_time().unwrap() > 10,
            "Must detect with lead time > 10 steps"
        );
    }

    #[test]
    fn test_partial_partition_detected() {
        let mut s = PartialPartitionScenario::default_scenario();
        let config = steady_config!(15, 3.0, 0.15, 0.08);
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_backpressure_detected() {
        let mut s = ChannelBackpressureScenario::default_scenario();
        let config = steady_config!(15, 100.0, 10.0, 5.0);
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_async_starvation_detected() {
        let mut s = AsyncStarvationScenario::default_scenario();
        let config = steady_config!(15, 30.0, 3.0, 1.5);
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_deterministic_replay() {
        let config = steady_config!(20, 2.0, 0.1, 0.05);
        let mut s1 = ClockDriftScenario::default_scenario();
        let r1 = run_scenario(&mut s1, &config);
        let mut s2 = ClockDriftScenario::default_scenario();
        let r2 = run_scenario(&mut s2, &config);
        assert_eq!(
            r1.first_anomaly_step, r2.first_anomaly_step,
            "Must be deterministically reproducible"
        );
        assert_same_samples(&r1, &r2);
    }

    /// `run_scenario` resets the scenario first, so re-running the same
    /// instance must replay the identical stream.
    #[test]
    fn test_rerun_on_same_instance_replays() {
        let config = steady_config!(20, 2.0, 0.1, 0.05);
        let mut s = ClockDriftScenario::default_scenario();
        let r1 = run_scenario(&mut s, &config);
        let r2 = run_scenario(&mut s, &config);
        assert_eq!(r1.first_anomaly_step, r2.first_anomaly_step);
        assert_same_samples(&r1, &r2);
    }

    /// The built-in scenarios yield a sample for every step, so every step is recorded.
    #[test]
    fn test_every_step_is_recorded() {
        let config = steady_config!(15, 30.0, 3.0, 1.5);
        let mut s = AsyncStarvationScenario::default_scenario();
        let r = run_scenario(&mut s, &config);
        assert_eq!(r.samples.len() as u64, r.total_steps);
        assert!(r.samples.iter().enumerate().all(|(i, rec)| rec.step == i as u64));
    }
}