1#![allow(missing_docs)]
2
3use crate::{
10 DsfbObserver, GrammarState, ObserverConfig, ReasonCode, ResidualSample, ResidualSource,
11};
12
13pub trait FaultScenario {
15 fn name(&self) -> &str;
16 fn description(&self) -> &str;
17 fn next_sample(&mut self, step: u64) -> Option<ResidualSample>;
18 fn expected_reason_code(&self) -> ReasonCode;
19 fn total_steps(&self) -> u64;
20 fn reset(&mut self);
21 fn injection_start(&self) -> u64;
22}
23
/// Advances a xorshift64 (shifts 13, 7, 17) generator in place and maps the new
/// state to a value whose magnitude does not exceed `amplitude.abs()`.
///
/// The sequence is fully determined by the initial `*state`, which is what makes
/// scenario replays reproducible after `reset`.
///
/// xorshift64 has a fixed point at zero: a zero state never changes, and the
/// mapping below would turn it into a constant `-amplitude` offset on every call
/// rather than noise. A zero state is therefore replaced by a fixed non-zero
/// constant before stepping. Non-zero states are unaffected.
fn xorshift_noise(state: &mut u64, amplitude: f64) -> f64 {
    // Replacement for a zero seed: the 64-bit golden-ratio constant.
    const ZERO_STATE_REPLACEMENT: u64 = 0x9E37_79B9_7F4A_7C15;

    if *state == 0 {
        *state = ZERO_STATE_REPLACEMENT;
    }
    *state ^= *state << 13;
    *state ^= *state >> 7;
    *state ^= *state << 17;
    // Map the u64 range onto [-1.0, 1.0] before scaling by the amplitude.
    let normalized = (*state as f64) / (u64::MAX as f64) * 2.0 - 1.0;
    normalized * amplitude
}
32
33pub struct ClockDriftScenario {
35 pub baseline_rtt_ms: f64,
36 pub drift_rate: f64,
37 pub injection_start_step: u64,
38 pub duration: u64,
39 pub noise_amp: f64,
40 noise_state: u64,
41 seed: u64,
42}
43
44impl ClockDriftScenario {
45 pub fn new(baseline: f64, drift_rate: f64, start: u64, duration: u64, noise: f64) -> Self {
46 Self {
47 baseline_rtt_ms: baseline,
48 drift_rate,
49 injection_start_step: start,
50 duration,
51 noise_amp: noise * baseline,
52 noise_state: 42,
53 seed: 42,
54 }
55 }
56 pub fn default_scenario() -> Self {
57 Self::new(5.0, 0.05, 50, 200, 0.02)
58 }
59}
60
61impl FaultScenario for ClockDriftScenario {
62 fn name(&self) -> &str {
63 "Clock Drift Injection"
64 }
65 fn description(&self) -> &str {
66 "Monotonic clock divergence producing increasing apparent heartbeat RTT"
67 }
68 fn injection_start(&self) -> u64 {
69 self.injection_start_step
70 }
71 fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
72 if step >= self.duration {
73 return None;
74 }
75 let drift = if step >= self.injection_start_step {
76 self.drift_rate * (step - self.injection_start_step) as f64
77 } else {
78 0.0
79 };
80 let noise = xorshift_noise(&mut self.noise_state, self.noise_amp);
81 Some(ResidualSample {
82 value: self.baseline_rtt_ms + drift + noise,
83 baseline: self.baseline_rtt_ms,
84 timestamp_ns: step * 1_000_000_000,
85 source: ResidualSource::HeartbeatRtt,
86 })
87 }
88 fn expected_reason_code(&self) -> ReasonCode {
89 ReasonCode::ClockDriftDivergence
90 }
91 fn total_steps(&self) -> u64 {
92 self.duration
93 }
94 fn reset(&mut self) {
95 self.noise_state = self.seed;
96 }
97}
98
99pub struct PartialPartitionScenario {
101 pub baseline: f64,
102 pub start: u64,
103 pub duration: u64,
104 pub rate: f64,
105 pub burst: f64,
106 pub burst_dur: u64,
107 pub noise_state: u64,
108 pub seed: u64,
109}
110
111impl PartialPartitionScenario {
112 pub fn default_scenario() -> Self {
113 Self {
114 baseline: 5.0,
115 start: 40,
116 duration: 200,
117 rate: 0.08,
118 burst: 3.0,
119 burst_dur: 10,
120 noise_state: 137,
121 seed: 137,
122 }
123 }
124}
125
126impl FaultScenario for PartialPartitionScenario {
127 fn name(&self) -> &str {
128 "Partial Network Partition"
129 }
130 fn description(&self) -> &str {
131 "Selective packet loss producing burst-then-drift latency signature"
132 }
133 fn injection_start(&self) -> u64 {
134 self.start
135 }
136 fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
137 if step >= self.duration {
138 return None;
139 }
140 let pert = if step >= self.start {
141 let e = (step - self.start) as f64;
142 let b = if (step - self.start) < self.burst_dur {
143 self.burst * (1.0 - e / self.burst_dur as f64)
144 } else {
145 0.0
146 };
147 b + self.rate * e
148 } else {
149 0.0
150 };
151 let noise = xorshift_noise(&mut self.noise_state, self.baseline * 0.03);
152 Some(ResidualSample {
153 value: self.baseline + pert + noise,
154 baseline: self.baseline,
155 timestamp_ns: step * 1_000_000_000,
156 source: ResidualSource::Latency,
157 })
158 }
159 fn expected_reason_code(&self) -> ReasonCode {
160 ReasonCode::PartialPartitionSignature
161 }
162 fn total_steps(&self) -> u64 {
163 self.duration
164 }
165 fn reset(&mut self) {
166 self.noise_state = self.seed;
167 }
168}
169
170pub struct ChannelBackpressureScenario {
172 pub baseline: f64,
173 pub start: u64,
174 pub duration: u64,
175 pub rate: f64,
176 pub noise_state: u64,
177 pub seed: u64,
178}
179impl ChannelBackpressureScenario {
180 pub fn default_scenario() -> Self {
181 Self {
182 baseline: 100.0,
183 start: 30,
184 duration: 200,
185 rate: 5.0,
186 noise_state: 271,
187 seed: 271,
188 }
189 }
190}
191impl FaultScenario for ChannelBackpressureScenario {
192 fn name(&self) -> &str {
193 "Channel Backpressure Onset"
194 }
195 fn description(&self) -> &str {
196 "Bounded mpsc channel depth growing toward capacity"
197 }
198 fn injection_start(&self) -> u64 {
199 self.start
200 }
201 fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
202 if step >= self.duration {
203 return None;
204 }
205 let growth = if step >= self.start {
206 let e = (step - self.start) as f64;
207 self.rate * e + 0.05 * e * e
208 } else {
209 0.0
210 };
211 let noise = xorshift_noise(&mut self.noise_state, 5.0);
212 Some(ResidualSample {
213 value: (self.baseline + growth + noise).min(1000.0),
214 baseline: self.baseline,
215 timestamp_ns: step * 1_000_000_000,
216 source: ResidualSource::QueueDepth,
217 })
218 }
219 fn expected_reason_code(&self) -> ReasonCode {
220 ReasonCode::ChannelBackpressureOnset
221 }
222 fn total_steps(&self) -> u64 {
223 self.duration
224 }
225 fn reset(&mut self) {
226 self.noise_state = self.seed;
227 }
228}
229
230pub struct AsyncStarvationScenario {
232 pub baseline: f64,
233 pub start: u64,
234 pub duration: u64,
235 pub rate: f64,
236 pub noise_state: u64,
237 pub seed: u64,
238}
239impl AsyncStarvationScenario {
240 pub fn default_scenario() -> Self {
241 Self {
242 baseline: 50.0,
243 start: 60,
244 duration: 200,
245 rate: 2.0,
246 noise_state: 313,
247 seed: 313,
248 }
249 }
250}
251impl FaultScenario for AsyncStarvationScenario {
252 fn name(&self) -> &str {
253 "Async Runtime Starvation"
254 }
255 fn description(&self) -> &str {
256 "Tokio task poll duration increasing from blocking in async context"
257 }
258 fn injection_start(&self) -> u64 {
259 self.start
260 }
261 fn next_sample(&mut self, step: u64) -> Option<ResidualSample> {
262 if step >= self.duration {
263 return None;
264 }
265 let starv = if step >= self.start {
266 self.rate * (step - self.start) as f64
267 } else {
268 0.0
269 };
270 let noise = xorshift_noise(&mut self.noise_state, 3.0);
271 Some(ResidualSample {
272 value: self.baseline + starv + noise,
273 baseline: self.baseline,
274 timestamp_ns: step * 1_000_000_000,
275 source: ResidualSource::PollDuration,
276 })
277 }
278 fn expected_reason_code(&self) -> ReasonCode {
279 ReasonCode::AsyncRuntimeStarvation
280 }
281 fn total_steps(&self) -> u64 {
282 self.duration
283 }
284 fn reset(&mut self) {
285 self.noise_state = self.seed;
286 }
287}
288
289pub fn run_scenario(scenario: &mut dyn FaultScenario, config: &ObserverConfig) -> ScenarioResult {
291 scenario.reset();
292 let first = scenario.next_sample(0);
293 scenario.reset();
294 let src = first.map(|s| s.source).unwrap_or(ResidualSource::Latency);
295 let mut observer = DsfbObserver::new(src, config);
296 let injection_start = scenario.injection_start();
297 let mut stats = ScenarioRunStats::default();
298 let mut samples = Vec::with_capacity(scenario.total_steps() as usize);
299
300 for step in 0..scenario.total_steps() {
301 if let Some(sample) = scenario.next_sample(step) {
302 let observation = observer.observe(&sample);
303 samples.push(sample_record(step, &sample, &observation));
304 update_scenario_run_stats(
305 &mut stats,
306 step,
307 injection_start,
308 observation.grammar_state,
309 observation.heuristic_match.reason_code,
310 );
311 }
312 }
313 build_scenario_result(scenario, injection_start, stats, samples)
314}
315
316#[derive(Default)]
317struct ScenarioRunStats {
318 first_boundary: Option<u64>,
319 first_violation: Option<u64>,
320 first_anomaly: Option<u64>,
321 detected_reason_code: Option<ReasonCode>,
322 boundary_count: u32,
323 violation_count: u32,
324 false_alarms: u32,
325}
326
327fn sample_record(
328 step: u64,
329 sample: &ResidualSample,
330 observation: &crate::ObservationResult,
331) -> SampleRecord {
332 SampleRecord {
333 step,
334 value: sample.value,
335 baseline: sample.baseline,
336 residual: observation.sign.residual,
337 drift: observation.sign.drift,
338 slew: observation.sign.slew,
339 grammar_state: observation.grammar_state,
340 }
341}
342
343fn update_scenario_run_stats(
344 stats: &mut ScenarioRunStats,
345 step: u64,
346 injection_start: u64,
347 grammar_state: GrammarState,
348 reason_code: ReasonCode,
349) {
350 if matches!(
351 grammar_state,
352 GrammarState::Boundary | GrammarState::Violation
353 ) && stats.first_anomaly.is_none()
354 {
355 stats.first_anomaly = Some(step);
356 stats.detected_reason_code = Some(reason_code);
357 if step < injection_start {
358 stats.false_alarms += 1;
359 }
360 }
361
362 match grammar_state {
363 GrammarState::Boundary => {
364 stats.boundary_count += 1;
365 if stats.first_boundary.is_none() {
366 stats.first_boundary = Some(step);
367 }
368 }
369 GrammarState::Violation => {
370 stats.violation_count += 1;
371 if stats.first_violation.is_none() {
372 stats.first_violation = Some(step);
373 }
374 }
375 GrammarState::Admissible => {}
376 }
377}
378
379fn build_scenario_result(
380 scenario: &dyn FaultScenario,
381 injection_start: u64,
382 stats: ScenarioRunStats,
383 samples: Vec<SampleRecord>,
384) -> ScenarioResult {
385 ScenarioResult {
386 scenario_name: scenario.name().into(),
387 total_steps: scenario.total_steps(),
388 injection_start,
389 first_anomaly_step: stats.first_anomaly,
390 first_boundary_step: stats.first_boundary,
391 first_violation_step: stats.first_violation,
392 detected_reason_code: stats.detected_reason_code,
393 false_alarms_before_injection: stats.false_alarms,
394 total_boundary_steps: stats.boundary_count,
395 total_violation_steps: stats.violation_count,
396 expected_reason_code: scenario.expected_reason_code(),
397 samples,
398 }
399}
400
/// One observed step of a scenario run: the raw sample plus the observer's
/// sign components and grammar state for it.
#[derive(Debug, Clone)]
pub struct SampleRecord {
    /// Scenario step the sample was produced for.
    pub step: u64,
    /// Raw sample value.
    pub value: f64,
    /// Baseline the scenario reported alongside the value.
    pub baseline: f64,
    /// Residual component of the observer's sign for this sample.
    pub residual: f64,
    /// Drift component of the observer's sign for this sample.
    pub drift: f64,
    /// Slew component of the observer's sign for this sample.
    pub slew: f64,
    /// Grammar state the observer assigned to this step.
    pub grammar_state: GrammarState,
}
411
412#[derive(Debug, Clone)]
413pub struct ScenarioResult {
414 pub scenario_name: String,
415 pub total_steps: u64,
416 pub injection_start: u64,
417 pub first_anomaly_step: Option<u64>,
418 pub first_boundary_step: Option<u64>,
419 pub first_violation_step: Option<u64>,
420 pub detected_reason_code: Option<ReasonCode>,
421 pub false_alarms_before_injection: u32,
422 pub total_boundary_steps: u32,
423 pub total_violation_steps: u32,
424 pub expected_reason_code: ReasonCode,
425 pub samples: Vec<SampleRecord>,
426}
427
428impl ScenarioResult {
429 pub fn detection_lead_time(&self) -> Option<u64> {
430 self.first_anomaly_step
431 .map(|s| self.total_steps.saturating_sub(s))
432 }
433
434 pub fn detected(&self) -> bool {
435 self.first_anomaly_step.is_some()
436 }
437
438 pub fn detection_delay_from_injection(&self) -> Option<u64> {
439 self.first_anomaly_step
440 .filter(|step| *step >= self.injection_start)
441 .map(|step| step - self.injection_start)
442 }
443}
444
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AdmissibilityEnvelope, WorkloadPhase};

    /// Observer configuration shared by the clock-drift tests.
    fn clock_drift_config() -> ObserverConfig {
        ObserverConfig {
            persistence_window: 20,
            hysteresis_count: 3,
            default_envelope: AdmissibilityEnvelope::symmetric(
                2.0,
                0.1,
                0.05,
                WorkloadPhase::SteadyState,
            ),
            ..ObserverConfig::fast_response()
        }
    }

    /// Resets `scenario` and collects `(timestamp_ns, value bits)` for every
    /// sample it emits over its full length.
    fn sample_trace(scenario: &mut dyn FaultScenario) -> Vec<(u64, u64)> {
        scenario.reset();
        (0..scenario.total_steps())
            .filter_map(|step| scenario.next_sample(step))
            .map(|s| (s.timestamp_ns, s.value.to_bits()))
            .collect()
    }

    #[test]
    fn test_clock_drift_detected() {
        let mut s = ClockDriftScenario::default_scenario();
        let r = run_scenario(&mut s, &clock_drift_config());
        assert!(r.detected(), "Clock drift must be detected");
        assert!(
            r.detection_lead_time().unwrap() > 10,
            "Must detect with lead time > 10 steps"
        );
    }

    #[test]
    fn test_partial_partition_detected() {
        let mut s = PartialPartitionScenario::default_scenario();
        let config = ObserverConfig {
            persistence_window: 15,
            hysteresis_count: 3,
            default_envelope: AdmissibilityEnvelope::symmetric(
                3.0,
                0.15,
                0.08,
                WorkloadPhase::SteadyState,
            ),
            ..ObserverConfig::fast_response()
        };
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_backpressure_detected() {
        let mut s = ChannelBackpressureScenario::default_scenario();
        let config = ObserverConfig {
            persistence_window: 15,
            hysteresis_count: 3,
            default_envelope: AdmissibilityEnvelope::symmetric(
                100.0,
                10.0,
                5.0,
                WorkloadPhase::SteadyState,
            ),
            ..ObserverConfig::fast_response()
        };
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_async_starvation_detected() {
        let mut s = AsyncStarvationScenario::default_scenario();
        let config = ObserverConfig {
            persistence_window: 15,
            hysteresis_count: 3,
            default_envelope: AdmissibilityEnvelope::symmetric(
                30.0,
                3.0,
                1.5,
                WorkloadPhase::SteadyState,
            ),
            ..ObserverConfig::fast_response()
        };
        assert!(run_scenario(&mut s, &config).detected());
    }

    #[test]
    fn test_deterministic_replay() {
        let config = clock_drift_config();
        let mut s1 = ClockDriftScenario::default_scenario();
        let r1 = run_scenario(&mut s1, &config);
        let mut s2 = ClockDriftScenario::default_scenario();
        let r2 = run_scenario(&mut s2, &config);
        assert_eq!(
            r1.first_anomaly_step, r2.first_anomaly_step,
            "Must be deterministically reproducible"
        );
        // Compare the whole result, every sample record included. Debug output
        // is used because ScenarioResult does not derive PartialEq.
        assert_eq!(
            format!("{:?}", r1),
            format!("{:?}", r2),
            "Full replay trace must be identical"
        );
        // Re-running an already-consumed scenario must reproduce the same trace,
        // because run_scenario resets it first.
        let r3 = run_scenario(&mut s1, &config);
        assert_eq!(
            format!("{:?}", r1),
            format!("{:?}", r3),
            "Re-running the same scenario instance must reproduce the trace"
        );
    }

    #[test]
    fn test_reset_restores_sample_stream() {
        let mut scenarios: Vec<Box<dyn FaultScenario>> = vec![
            Box::new(ClockDriftScenario::default_scenario()),
            Box::new(PartialPartitionScenario::default_scenario()),
            Box::new(ChannelBackpressureScenario::default_scenario()),
            Box::new(AsyncStarvationScenario::default_scenario()),
        ];
        for boxed in scenarios.iter_mut() {
            let scenario = boxed.as_mut();
            let first = sample_trace(scenario);
            let second = sample_trace(scenario);
            assert_eq!(first, second, "{}: reset must replay samples", scenario.name());
            let end = scenario.total_steps();
            assert_eq!(first.len() as u64, end, "{}: one sample per step", scenario.name());
            assert!(
                scenario.next_sample(end).is_none(),
                "{}: no samples past the end",
                scenario.name()
            );
        }
    }

    #[test]
    fn test_noise_within_amplitude() {
        let mut state = 42;
        for _ in 0..1000 {
            let n = xorshift_noise(&mut state, 0.25);
            assert!(n.abs() <= 0.25);
        }
    }
}
545}