Skip to main content

dsfb_debug/
episode.rs

1//! DSFB-Debug: episode aggregation — Trace Event Collapse implementation.
2//!
3//! # Trace Event Collapse — paper §7's "primary developer-facing delta"
4//!
5//! This module is the structural-aggregation core. It collapses
6//! per-(window, signal) anomaly events into a small number of typed
7//! structural episodes. The collapse is the operator-visible delta
8//! of DSFB-Debug versus flat alerting: 11 raw cell-level alerts on
9//! F-11 collapse into 3 typed episodes (RSCR 3.67×); 52 raw alerts
10//! on the AIOps Challenge KPI fixture collapse into 1 episode (RSCR
11//! 52×).
12//!
13//! # Algorithm
14//!
15//! `aggregate_episodes` is a deterministic run-length aggregator over
16//! the per-(window, signal) `PolicyState` grid. An episode opens when
17//! any signal transitions to `Review` or `Escalate`; an episode
18//! closes when all signals return to `Silent` or `Watch` for
19//! `correlation_window` consecutive windows. The closed episode
20//! carries:
21//!
22//! - `episode_id` — sequential id from 0
23//! - `start_window` / `end_window` — inclusive range
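//! - `peak_grammar_state` — highest grammar state reached, derived from
//!   the policy grid (`Escalate` → `Violation`, `Review` → `Boundary`)
//! - `primary_reason_code` — most severe reason code (ranked by
//!   `reason_severity`) among the `Review`/`Escalate` cells
//! - `policy_state` — peak policy state (`Review` or `Escalate`)
//! - `contributing_signal_count` — largest number of signals that were
//!   simultaneously at `Review` or above in any single window of the
//!   episode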
29//! - `structural_signature` — `(dominant_drift_direction,
30//!   peak_slew_magnitude, duration_windows, signal_correlation)`
31//! - `matched_motif` — left as `Unknown`; populated by the bank's
32//!   `match_episode_with_consensus` after fusion.
33//!
34//! # `compute_metrics` — RSCR + fault-recall + clean-window FP rate
35//!
36//! Computes the paper §13 headline metrics from a closed episode list:
37//!
38//! | Metric | Formula |
39//! |--------|---------|
40//! | RSCR | raw_alerts / max(1, dsfb_episode_count) |
41//! | episode_precision | episodes_overlapping_labeled_fault / dsfb_episode_count |
42//! | fault_recall | labeled_faults_captured_by_at_least_one_episode / total_labeled_faults |
43//! | investigation_load_reduction_pct | (1 - dsfb / raw) × 100 |
44//! | clean_window_false_episode_rate | episodes_in_clean_window / clean_window_count |
45//!
46//! All formulas are repeated inline here so a reader doesn't need to
47//! cross-reference the paper.
48//!
49//! # Determinism (Theorem 9)
50//!
//! The aggregator is a pure function of the policy-state grid.
//! Iteration order is deterministic (row-major over windows then
//! signals). Tie-breakers (reason codes of equal severity resolve to
//! the one encountered first in iteration order) preserve
//! byte-identical output across replays.
55
56use crate::types::*;
57
58/// Aggregate per-window policy evaluations into episodes.
59///
60/// An episode opens when any signal transitions to Review or Escalate.
61/// An episode closes when all signals return to Silent/Watch for
62/// `correlation_window` consecutive windows.
63///
64/// # Arguments
65/// * `policy_states` - per-window, per-signal policy states (row-major: [window][signal])
66/// * `num_signals` - number of signals per window
67/// * `num_windows` - number of windows
68/// * `reason_codes` - per-window, per-signal reason codes
69/// * `correlation_window` - windows of silence required to close an episode
70/// * `episodes_out` - output buffer for episodes
71///
72/// # Returns
73/// Number of episodes written
74#[allow(clippy::too_many_arguments)]
75pub fn aggregate_episodes(
76    policy_states: &[PolicyState],
77    num_signals: usize,
78    num_windows: usize,
79    reason_codes: &[ReasonCode],
80    drift_directions: &[DriftDirection],
81    slew_magnitudes: &[f64],
82    correlation_window: u64,
83    episodes_out: &mut [DebugEpisode],
84) -> usize {
85    if num_signals == 0 || num_windows == 0 {
86        return 0;
87    }
88
89    let mut episode_count: usize = 0;
90    let mut in_episode = false;
91    let mut episode_start: u64 = 0;
92    let mut silent_streak: u64 = 0;
93    let mut peak_state = GrammarState::Admissible;
94    let mut primary_reason = ReasonCode::Admissible;
95    let mut peak_slew: f64 = 0.0;
96    let mut contributing_signals: u16 = 0;
97
98    let mut w: usize = 0;
99    while w < num_windows {
100        // Check if any signal in this window is Review or Escalate
101        let mut window_has_action = false;
102        let mut window_contributing: u16 = 0;
103        let mut s: usize = 0;
104        while s < num_signals {
105            let idx = w * num_signals + s;
106            if idx < policy_states.len() {
107                let ps = policy_states[idx];
108                if ps >= PolicyState::Review {
109                    window_has_action = true;
110                    window_contributing += 1;
111
112                    // Track peak grammar state
113                    let gs = match ps {
114                        PolicyState::Escalate => GrammarState::Violation,
115                        PolicyState::Review => GrammarState::Boundary,
116                        _ => GrammarState::Admissible,
117                    };
118                    if gs > peak_state {
119                        peak_state = gs;
120                    }
121
122                    // Track reason code (prefer more severe)
123                    if idx < reason_codes.len() {
124                        let rc = reason_codes[idx];
125                        if reason_severity(rc) > reason_severity(primary_reason) {
126                            primary_reason = rc;
127                        }
128                    }
129
130                    // Track peak slew
131                    if idx < slew_magnitudes.len() {
132                        let sm = slew_magnitudes[idx];
133                        if sm > peak_slew { peak_slew = sm; }
134                    }
135                }
136            }
137            s += 1;
138        }
139
140        if window_has_action {
141            if !in_episode {
142                // Open new episode
143                in_episode = true;
144                episode_start = w as u64;
145                peak_state = GrammarState::Admissible;
146                primary_reason = ReasonCode::Admissible;
147                peak_slew = 0.0;
148                contributing_signals = 0;
149            }
150            silent_streak = 0;
151            if window_contributing > contributing_signals {
152                contributing_signals = window_contributing;
153            }
154            // Re-check peak state for this window
155            let mut s2: usize = 0;
156            while s2 < num_signals {
157                let idx = w * num_signals + s2;
158                if idx < policy_states.len() && policy_states[idx] >= PolicyState::Review {
159                    let gs = if policy_states[idx] == PolicyState::Escalate {
160                        GrammarState::Violation
161                    } else {
162                        GrammarState::Boundary
163                    };
164                    if gs > peak_state { peak_state = gs; }
165                    if idx < reason_codes.len() {
166                        let rc = reason_codes[idx];
167                        if reason_severity(rc) > reason_severity(primary_reason) {
168                            primary_reason = rc;
169                        }
170                    }
171                    if idx < slew_magnitudes.len() && slew_magnitudes[idx] > peak_slew {
172                        peak_slew = slew_magnitudes[idx];
173                    }
174                }
175                s2 += 1;
176            }
177        } else if in_episode {
178            silent_streak += 1;
179            if silent_streak >= correlation_window {
180                // Close episode
181                if episode_count < episodes_out.len() {
182                    let dominant_drift = if w > 0 && (w - 1) * num_signals < drift_directions.len() {
183                        drift_directions[(w - 1) * num_signals] // first signal's drift as proxy
184                    } else {
185                        DriftDirection::None
186                    };
187
188                    episodes_out[episode_count] = DebugEpisode {
189                        episode_id: episode_count as u32,
190                        start_window: episode_start,
191                        end_window: w as u64 - silent_streak,
192                        peak_grammar_state: peak_state,
193                        primary_reason_code: primary_reason,
194                        matched_motif: SemanticDisposition::Unknown, // filled by caller
195                        policy_state: if peak_state == GrammarState::Violation {
196                            PolicyState::Escalate
197                        } else {
198                            PolicyState::Review
199                        },
200                        contributing_signal_count: contributing_signals,
201                        structural_signature: StructuralSignature {
202                            dominant_drift_direction: dominant_drift,
203                            peak_slew_magnitude: peak_slew,
204                            duration_windows: (w as u64 - silent_streak) - episode_start + 1,
205                            signal_correlation: contributing_signals as f64 / num_signals as f64,
206                        },
207            root_cause_signal_index: None,
208                    };
209                    episode_count += 1;
210                }
211                in_episode = false;
212                peak_state = GrammarState::Admissible;
213                primary_reason = ReasonCode::Admissible;
214                peak_slew = 0.0;
215                contributing_signals = 0;
216            }
217        }
218
219        w += 1;
220    }
221
222    // Close any open episode at end of data
223    if in_episode && episode_count < episodes_out.len() {
224        episodes_out[episode_count] = DebugEpisode {
225            episode_id: episode_count as u32,
226            start_window: episode_start,
227            end_window: num_windows as u64 - 1,
228            peak_grammar_state: peak_state,
229            primary_reason_code: primary_reason,
230            matched_motif: SemanticDisposition::Unknown,
231            policy_state: if peak_state == GrammarState::Violation {
232                PolicyState::Escalate
233            } else {
234                PolicyState::Review
235            },
236            contributing_signal_count: contributing_signals,
237            structural_signature: StructuralSignature {
238                dominant_drift_direction: DriftDirection::None,
239                peak_slew_magnitude: peak_slew,
240                duration_windows: num_windows as u64 - episode_start,
241                signal_correlation: contributing_signals as f64 / num_signals as f64,
242            },
243            root_cause_signal_index: None,
244        };
245        episode_count += 1;
246    }
247
248    episode_count
249}
250
251fn reason_severity(r: ReasonCode) -> u8 {
252    match r {
253        ReasonCode::Admissible => 0,
254        ReasonCode::BoundaryApproach => 1,
255        ReasonCode::SingleCrossing => 1,
256        ReasonCode::DriftWithRecovery => 2,
257        ReasonCode::RecurrentBoundaryGrazing => 3,
258        ReasonCode::SustainedOutwardDrift => 4,
259        ReasonCode::AbruptSlewViolation => 5,
260        ReasonCode::EnvelopeViolation => 6,
261    }
262}
263
264/// Compute benchmark metrics from episodes and fault labels.
265/// Paper §7.4
266#[allow(clippy::too_many_arguments)]
267pub fn compute_metrics(
268    episodes: &[DebugEpisode],
269    episode_count: usize,
270    fault_labels: &[bool],
271    raw_anomaly_count: u64,
272    precision_window: u64,
273    dataset_name: &'static str,
274    num_signals: u16,
275) -> BenchmarkMetrics {
276    let num_windows = fault_labels.len() as u64;
277    let dsfb_episode_count = episode_count as u64;
278
279    let rscr = if dsfb_episode_count > 0 {
280        raw_anomaly_count as f64 / dsfb_episode_count as f64
281    } else {
282        0.0
283    };
284
285    // Episode precision: fraction of episodes followed by a fault within W_pred
286    let mut precise_count: u64 = 0;
287    let mut i = 0;
288    while i < episode_count {
289        let ep = &episodes[i];
290        let check_end = ep.end_window + precision_window;
291        let check_end = if check_end >= num_windows { num_windows - 1 } else { check_end };
292        let mut found_fault = false;
293        let mut w = ep.start_window;
294        while w <= check_end {
295            if (w as usize) < fault_labels.len() && fault_labels[w as usize] {
296                found_fault = true;
297            }
298            w += 1;
299        }
300        if found_fault {
301            precise_count += 1;
302        }
303        i += 1;
304    }
305    let episode_precision = if dsfb_episode_count > 0 {
306        precise_count as f64 / dsfb_episode_count as f64
307    } else {
308        0.0
309    };
310
311    // Fault recall: fraction of labeled faults captured by at least one episode
312    let mut total_faults: u64 = 0;
313    let mut captured_faults: u64 = 0;
314    let mut w: usize = 0;
315    while w < fault_labels.len() {
316        if fault_labels[w] {
317            total_faults += 1;
318            // Check if any episode covers this window
319            let mut covered = false;
320            let mut j = 0;
321            while j < episode_count {
322                let ep = &episodes[j];
323                // Fault is captured if it falls within episode ± precision_window
324                if (w as u64) >= ep.start_window.saturating_sub(precision_window)
325                    && (w as u64) <= ep.end_window + precision_window
326                {
327                    covered = true;
328                }
329                j += 1;
330            }
331            if covered {
332                captured_faults += 1;
333            }
334        }
335        w += 1;
336    }
337    let fault_recall = if total_faults > 0 {
338        captured_faults as f64 / total_faults as f64
339    } else {
340        1.0 // no faults → vacuous recall
341    };
342
343    // Investigation load
344    let investigation_load_dsfb = dsfb_episode_count;
345    let investigation_load_reduction_pct = if raw_anomaly_count > 0 {
346        (1.0 - investigation_load_dsfb as f64 / raw_anomaly_count as f64) * 100.0
347    } else {
348        0.0
349    };
350
351    // Clean-window false episode rate: episodes in windows with no faults nearby
352    let false_episodes = dsfb_episode_count - precise_count;
353    let clean_windows = num_windows - total_faults;
354    let clean_window_false_episode_rate = if clean_windows > 0 {
355        false_episodes as f64 / clean_windows as f64
356    } else {
357        0.0
358    };
359
360    BenchmarkMetrics {
361        dataset_name,
362        total_windows: num_windows,
363        total_signals: num_signals,
364        raw_anomaly_count,
365        dsfb_episode_count,
366        rscr,
367        episode_precision,
368        fault_recall,
369        investigation_load_raw: raw_anomaly_count,
370        investigation_load_dsfb,
371        investigation_load_reduction_pct,
372        clean_window_false_episode_rate,
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder episode used to pre-fill output buffers.
    fn blank_episode() -> DebugEpisode {
        DebugEpisode {
            episode_id: 0,
            start_window: 0,
            end_window: 0,
            peak_grammar_state: GrammarState::Admissible,
            primary_reason_code: ReasonCode::Admissible,
            matched_motif: SemanticDisposition::Unknown,
            policy_state: PolicyState::Silent,
            contributing_signal_count: 0,
            structural_signature: StructuralSignature {
                dominant_drift_direction: DriftDirection::None,
                peak_slew_magnitude: 0.0,
                duration_windows: 0,
                signal_correlation: 0.0,
            },
            root_cause_signal_index: None,
        }
    }

    #[test]
    fn test_no_episodes_from_silent() {
        let policies = [PolicyState::Silent; 100];
        let reasons = [ReasonCode::Admissible; 100];
        let drifts = [DriftDirection::None; 100];
        let slews = [0.0_f64; 100];
        let mut episodes = [blank_episode(); 16];

        let count = aggregate_episodes(
            &policies, 1, 100, &reasons, &drifts, &slews, 5, &mut episodes,
        );
        assert_eq!(count, 0);
    }

    #[test]
    fn test_single_episode() {
        // 10 windows, 1 signal. Windows 3-5 are Escalate, rest Silent.
        let mut policies = [PolicyState::Silent; 10];
        policies[3] = PolicyState::Escalate;
        policies[4] = PolicyState::Escalate;
        policies[5] = PolicyState::Escalate;
        let reasons = [ReasonCode::AbruptSlewViolation; 10];
        let drifts = [DriftDirection::Positive; 10];
        let slews = [1.0_f64; 10];

        let mut episodes = [blank_episode(); 16];
        let count = aggregate_episodes(
            &policies, 1, 10, &reasons, &drifts, &slews, 3, &mut episodes,
        );
        assert_eq!(count, 1);

        // The episode closes at window 8 (three quiet windows) and spans 3..=5.
        let ep = &episodes[0];
        assert_eq!(ep.episode_id, 0);
        assert_eq!(ep.start_window, 3);
        assert_eq!(ep.end_window, 5);
        assert_eq!(ep.structural_signature.duration_windows, 3);
        assert_eq!(ep.contributing_signal_count, 1);
        assert!(ep.policy_state == PolicyState::Escalate);
        assert!(ep.peak_grammar_state == GrammarState::Violation);
        assert!(ep.structural_signature.peak_slew_magnitude == 1.0);
        assert!(ep.structural_signature.signal_correlation == 1.0);
    }

    #[test]
    fn test_open_episode_flushed_at_end() {
        // 10 windows, 1 signal. Only window 8 is Review, so the quiet streak
        // never reaches the correlation window before the data ends.
        let mut policies = [PolicyState::Silent; 10];
        policies[8] = PolicyState::Review;
        let reasons = [ReasonCode::BoundaryApproach; 10];
        let drifts = [DriftDirection::None; 10];
        let slews = [0.5_f64; 10];

        let mut episodes = [blank_episode(); 16];
        let count = aggregate_episodes(
            &policies, 1, 10, &reasons, &drifts, &slews, 3, &mut episodes,
        );
        assert_eq!(count, 1);
        assert_eq!(episodes[0].start_window, 8);
        assert!(episodes[0].policy_state == PolicyState::Review);
        assert!(episodes[0].peak_grammar_state == GrammarState::Boundary);
    }
}