Skip to main content

net/adapter/net/contested/
correlation.rs

1//! Correlated failure detection.
2//!
3//! Wraps `FailureDetector` with a time-windowed correlation layer.
4//! Classifies failures as independent or correlated (mass failure),
5//! and identifies whether failures are concentrated in a subnet
6//! (likely partition) or spread broadly (likely infrastructure outage).
7
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11use crate::adapter::net::subnet::SubnetId;
12
/// Tunables for the correlated failure detector.
#[derive(Clone, Debug)]
pub struct CorrelatedFailureConfig {
    /// How far back failures are considered when correlating them.
    /// Events older than this are dropped from the window.
    pub correlation_window: Duration,
    /// Share of tracked nodes (0.0 - 1.0) that must have failed inside
    /// the window for the failures to be classified as a mass failure.
    pub mass_failure_threshold: f32,
    /// Share of the failures in the window (0.0 - 1.0) that must sit
    /// under one common subnet ancestor for the mass failure to be
    /// classified as subnet-correlated (likely partition).
    pub subnet_correlation_threshold: f32,
    /// Upper bound on concurrent recovery actions while a mass failure
    /// is in progress.
    pub max_concurrent_migrations: usize,
}
27
28impl Default for CorrelatedFailureConfig {
29    fn default() -> Self {
30        Self {
31            correlation_window: Duration::from_secs(2),
32            mass_failure_threshold: 0.30,
33            subnet_correlation_threshold: 0.80,
34            max_concurrent_migrations: 3,
35        }
36    }
37}
38
39/// A recorded failure event within the correlation window.
40#[derive(Debug, Clone)]
41struct FailureEvent {
42    node_id: u64,
43    detected_at: Instant,
44    _subnet: Option<SubnetId>,
45}
46
47/// Verdict from correlated failure analysis.
48#[derive(Debug, Clone)]
49pub enum CorrelationVerdict {
50    /// Independent failures — handle normally via RecoveryManager.
51    Independent {
52        /// Nodes that failed.
53        failed_nodes: Vec<u64>,
54    },
55    /// Mass correlated failure — throttle recovery.
56    MassFailure {
57        /// Nodes that failed.
58        failed_nodes: Vec<u64>,
59        /// Fraction of tracked nodes that failed.
60        failure_ratio: f32,
61        /// Suspected root cause.
62        suspected_cause: FailureCause,
63    },
64}
65
66impl CorrelationVerdict {
67    /// Get the failed nodes regardless of verdict type.
68    pub fn failed_nodes(&self) -> &[u64] {
69        match self {
70            Self::Independent { failed_nodes } => failed_nodes,
71            Self::MassFailure { failed_nodes, .. } => failed_nodes,
72        }
73    }
74
75    /// Whether this is a mass failure.
76    pub fn is_mass_failure(&self) -> bool {
77        matches!(self, Self::MassFailure { .. })
78    }
79}
80
81/// Suspected cause of a mass failure.
82#[derive(Debug, Clone, PartialEq)]
83pub enum FailureCause {
84    /// Failures concentrated in a single subnet (likely partition).
85    SubnetFailure {
86        /// The subnet ancestor where failures are concentrated.
87        subnet: SubnetId,
88        /// Fraction of failures in this subnet.
89        affected_ratio: f32,
90    },
91    /// Failures spread across subnets (likely infrastructure outage).
92    BroadOutage,
93    /// Insufficient subnet data to determine cause.
94    Unknown,
95}
96
97/// Correlated failure detector.
98///
99/// Sits alongside `FailureDetector` as a correlation layer. Consumes
100/// failure events and classifies them as independent or correlated.
101pub struct CorrelatedFailureDetector {
102    config: CorrelatedFailureConfig,
103    /// Recent failures within the correlation window.
104    recent_failures: VecDeque<FailureEvent>,
105    /// Node -> subnet mapping for correlation analysis.
106    node_subnets: HashMap<u64, SubnetId>,
107    /// Whether we're currently in mass-failure mode.
108    in_mass_failure: bool,
109}
110
111impl CorrelatedFailureDetector {
112    /// Create a new detector with the given configuration.
113    pub fn new(config: CorrelatedFailureConfig) -> Self {
114        Self {
115            config,
116            recent_failures: VecDeque::new(),
117            node_subnets: HashMap::new(),
118            in_mass_failure: false,
119        }
120    }
121
122    /// Register a node's subnet for correlation analysis.
123    pub fn register_node(&mut self, node_id: u64, subnet: SubnetId) {
124        self.node_subnets.insert(node_id, subnet);
125    }
126
127    /// Record new failures and classify them.
128    ///
129    /// Call this after `FailureDetector::check_all()` with the newly
130    /// failed nodes and the total number of tracked nodes.
131    pub fn record_failures(
132        &mut self,
133        failed_nodes: &[u64],
134        total_tracked: usize,
135    ) -> CorrelationVerdict {
136        let now = Instant::now();
137
138        // Record new failures
139        for &node_id in failed_nodes {
140            self.recent_failures.push_back(FailureEvent {
141                node_id,
142                detected_at: now,
143                _subnet: self.node_subnets.get(&node_id).copied(),
144            });
145        }
146
147        // Prune events older than the correlation window.
148        //
149        // `now - duration` panics when `duration > now.elapsed()`
150        // (shortly after process start with a long correlation
151        // window). With the default 2s window this is fine, but
152        // configurable windows of hours/days can panic on the
153        // first second of process life. Saturate to the
154        // process-start `Instant` (`now - now.elapsed()`) so the
155        // cutoff is at most as old as the earliest possible
156        // observation — equivalent to "no events to prune yet."
157        let cutoff = now
158            .checked_sub(self.config.correlation_window)
159            .unwrap_or_else(|| now - now.elapsed());
160        while self
161            .recent_failures
162            .front()
163            .is_some_and(|e| e.detected_at < cutoff)
164        {
165            self.recent_failures.pop_front();
166        }
167
168        // Count unique failures in the window.
169        //
170        // Pre-fix this collected through a `HashSet<u64>`
171        // and converted back to `Vec`, which exposed the HashSet's
172        // randomized iteration order to downstream consumers.
173        // `window_failures` flows verbatim into
174        // `PartitionRecord::other_side` (partition.rs:160), so two
175        // nodes with identical inputs produced different
176        // `other_side` orderings, breaking cross-node serialization
177        // / reconcile-ordering / replay validation. Sort + dedup
178        // gives a canonical Vec deterministic across processes.
179        let mut window_failures: Vec<u64> =
180            self.recent_failures.iter().map(|e| e.node_id).collect();
181        window_failures.sort_unstable();
182        window_failures.dedup();
183
184        if total_tracked == 0 {
185            return CorrelationVerdict::Independent {
186                failed_nodes: failed_nodes.to_vec(),
187            };
188        }
189
190        let failure_ratio = window_failures.len() as f32 / total_tracked as f32;
191
192        if failure_ratio < self.config.mass_failure_threshold {
193            self.in_mass_failure = false;
194            return CorrelationVerdict::Independent {
195                failed_nodes: failed_nodes.to_vec(),
196            };
197        }
198
199        // Mass failure detected — analyze subnet correlation
200        self.in_mass_failure = true;
201        let cause = self.analyze_subnet_correlation(&window_failures);
202
203        CorrelationVerdict::MassFailure {
204            failed_nodes: window_failures,
205            failure_ratio,
206            suspected_cause: cause,
207        }
208    }
209
210    /// How many concurrent recovery actions are allowed.
211    ///
212    /// Throttled during mass failure to avoid overloading survivors.
213    pub fn recovery_budget(&self) -> usize {
214        if self.in_mass_failure {
215            self.config.max_concurrent_migrations
216        } else {
217            usize::MAX
218        }
219    }
220
221    /// Whether we're currently in mass-failure mode.
222    pub fn in_mass_failure(&self) -> bool {
223        self.in_mass_failure
224    }
225
226    /// Clear the failure window (e.g., when conditions normalize).
227    pub fn clear_window(&mut self) {
228        self.recent_failures.clear();
229        self.in_mass_failure = false;
230    }
231
232    /// Number of failures in the current window.
233    pub fn window_size(&self) -> usize {
234        self.recent_failures.len()
235    }
236
237    /// Analyze whether failures are concentrated in a subnet subtree.
238    fn analyze_subnet_correlation(&self, failed_nodes: &[u64]) -> FailureCause {
239        let mut subnet_counts: HashMap<SubnetId, usize> = HashMap::new();
240        let mut with_subnet = 0usize;
241
242        for &node_id in failed_nodes {
243            if let Some(&subnet) = self.node_subnets.get(&node_id) {
244                with_subnet += 1;
245                // Count at each hierarchy level. The break
246                // conditions (`parent == current`, `parent.is_global`)
247                // cover every well-formed `SubnetId::parent`
248                // implementation, but a defensive depth cap
249                // matches the 4-level hierarchy and forecloses
250                // an infinite loop if a future regression in
251                // `SubnetId::parent` ever returns a non-self,
252                // non-global subnet that cycles back to an
253                // ancestor (e.g., a typo in a 4→3→2→1→4 walk
254                // returning to the deepest level). The cap is
255                // generously above the 4-level hierarchy so
256                // legitimate walks always complete inside it.
257                let mut current = subnet;
258                for _ in 0..8 {
259                    *subnet_counts.entry(current).or_insert(0) += 1;
260                    let parent = current.parent();
261                    if parent == current || parent.is_global() {
262                        break;
263                    }
264                    current = parent;
265                }
266            }
267        }
268
269        if with_subnet == 0 {
270            return FailureCause::Unknown;
271        }
272
273        // Find the most specific subnet with the highest concentration
274        // Ceiling to avoid false subnet correlation from rounding down
275        let threshold =
276            (with_subnet as f32 * self.config.subnet_correlation_threshold).ceil() as usize;
277
278        // Iterate a sorted snapshot so ties resolve deterministically:
279        // higher `depth` wins; on equal depth, the lower `SubnetId`
280        // wins (the inner `u32` comparison is a stable total order
281        // without semantic hierarchy meaning — see `SubnetId`'s
282        // `Ord` rustdoc). Iterating a `HashMap` directly with `>=` on
283        // depth as the tiebreaker would let hash iteration order
284        // (randomized per process) pick the winner, and downstream
285        // `partition.rs::detect` would brand the partition record
286        // with a subnet that flips between runs given identical
287        // inputs.
288        let mut entries: Vec<(SubnetId, usize)> =
289            subnet_counts.iter().map(|(&s, &c)| (s, c)).collect();
290        entries.sort_by(|a, b| b.0.depth().cmp(&a.0.depth()).then_with(|| a.0.cmp(&b.0)));
291
292        // Sort (above) guarantees the entries are visited deepest-
293        // first within the threshold-meeting set, so the first hit
294        // is the deterministic winner. We `break` immediately on
295        // the first hit; no `best_depth` tracking needed.
296        let mut best_subnet = None;
297        for (subnet, count) in entries {
298            if count >= threshold {
299                best_subnet = Some(subnet);
300                break;
301            }
302        }
303
304        match best_subnet {
305            Some(subnet) => {
306                #[expect(
307                    clippy::unwrap_used,
308                    reason = "subnet was just yielded by iterating subnet_counts; the key is always present"
309                )]
310                let ratio = *subnet_counts.get(&subnet).unwrap() as f32 / with_subnet as f32;
311                FailureCause::SubnetFailure {
312                    subnet,
313                    affected_ratio: ratio,
314                }
315            }
316            None => FailureCause::BroadOutage,
317        }
318    }
319}
320
321impl std::fmt::Debug for CorrelatedFailureDetector {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        f.debug_struct("CorrelatedFailureDetector")
324            .field("window_size", &self.recent_failures.len())
325            .field("tracked_nodes", &self.node_subnets.len())
326            .field("in_mass_failure", &self.in_mass_failure)
327            .finish()
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334
335    fn make_detector(threshold: f32) -> CorrelatedFailureDetector {
336        CorrelatedFailureDetector::new(CorrelatedFailureConfig {
337            mass_failure_threshold: threshold,
338            ..Default::default()
339        })
340    }
341
342    #[test]
343    fn test_independent_failures() {
344        let mut det = make_detector(0.30);
345        for i in 0..10 {
346            det.register_node(i, SubnetId::new(&[1]));
347        }
348
349        // 1 out of 10 fails = 10% < 30%
350        let verdict = det.record_failures(&[0], 10);
351        assert!(!verdict.is_mass_failure());
352        assert!(!det.in_mass_failure());
353        assert_eq!(det.recovery_budget(), usize::MAX);
354    }
355
356    #[test]
357    fn test_mass_failure() {
358        let mut det = make_detector(0.30);
359        for i in 0..10 {
360            det.register_node(i, SubnetId::new(&[1]));
361        }
362
363        // 4 out of 10 fails = 40% > 30%
364        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
365        assert!(verdict.is_mass_failure());
366        assert!(det.in_mass_failure());
367        assert_eq!(det.recovery_budget(), 3); // default max_concurrent_migrations
368    }
369
370    #[test]
371    fn test_subnet_correlated() {
372        let mut det = make_detector(0.30);
373        // 5 nodes in subnet [1, 1], 5 in subnet [1, 2]
374        for i in 0..5 {
375            det.register_node(i, SubnetId::new(&[1, 1]));
376        }
377        for i in 5..10 {
378            det.register_node(i, SubnetId::new(&[1, 2]));
379        }
380
381        // All 5 nodes in subnet [1, 1] fail
382        let verdict = det.record_failures(&[0, 1, 2, 3, 4], 10);
383        assert!(verdict.is_mass_failure());
384
385        if let CorrelationVerdict::MassFailure {
386            suspected_cause, ..
387        } = &verdict
388        {
389            match suspected_cause {
390                FailureCause::SubnetFailure { subnet, .. } => {
391                    // Should identify subnet [1, 1] as the correlated subnet
392                    assert_eq!(*subnet, SubnetId::new(&[1, 1]));
393                }
394                other => panic!("expected SubnetFailure, got {:?}", other),
395            }
396        }
397    }
398
399    #[test]
400    fn test_broad_outage() {
401        let mut det = make_detector(0.30);
402        // Nodes spread across 4 different subnets
403        det.register_node(0, SubnetId::new(&[1]));
404        det.register_node(1, SubnetId::new(&[2]));
405        det.register_node(2, SubnetId::new(&[3]));
406        det.register_node(3, SubnetId::new(&[4]));
407        for i in 4..10 {
408            det.register_node(i, SubnetId::new(&[(i + 1) as u8]));
409        }
410
411        // Failures spread across all subnets
412        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
413        assert!(verdict.is_mass_failure());
414
415        if let CorrelationVerdict::MassFailure {
416            suspected_cause, ..
417        } = &verdict
418        {
419            assert_eq!(*suspected_cause, FailureCause::BroadOutage);
420        }
421    }
422
423    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #91: previously
424    /// `analyze_subnet_correlation` iterated `subnet_counts` (a
425    /// `HashMap`) directly with `>=` on depth as the tiebreaker.
426    /// On tied `best_depth`, the chosen subnet depended on hash
427    /// iteration order, which `std::collections::HashMap` randomizes
428    /// per process — recovery scope flipped between runs given
429    /// identical inputs.
430    ///
431    /// We pin the deterministic-tiebreak fix by:
432    ///   1. Building a scenario with two equally-deep subnets
433    ///      that both meet the correlation threshold and have
434    ///      equal failure counts (the pre-fix nondeterminism
435    ///      window).
436    ///   2. Running the analysis many times back-to-back. The
437    ///      same `CorrelatedFailureDetector` is rebuilt each
438    ///      iteration to maximize the chance the underlying
439    ///      hasher state shifts.
440    ///   3. Asserting every run picks the same subnet — the
441    ///      lower `SubnetId` wins on ties (per the new sort).
442    ///
443    /// Pre-fix this would intermittently return `SubnetId::new(&[1, 2])`
444    /// instead of `SubnetId::new(&[1, 1])`.
445    #[test]
446    fn ties_resolve_deterministically_across_runs() {
447        for _attempt in 0..32 {
448            // Two sibling subnets at depth 2, each with 3 nodes.
449            // Threshold of 0.30 means a subnet needs count ≥
450            // ceil(6 * 0.30) = 2 to qualify. Both [1,1] and [1,2]
451            // hit count=3 — the tied case the pre-fix code
452            // resolved nondeterministically. (The default
453            // `subnet_correlation_threshold` of 0.80 would put
454            // the threshold at 5 and select only the parent
455            // rollup, so we override it explicitly.)
456            let mut det = CorrelatedFailureDetector::new(CorrelatedFailureConfig {
457                mass_failure_threshold: 0.30,
458                subnet_correlation_threshold: 0.30,
459                ..Default::default()
460            });
461            for i in 0..3 {
462                det.register_node(i, SubnetId::new(&[1, 1]));
463            }
464            for i in 3..6 {
465                det.register_node(i, SubnetId::new(&[1, 2]));
466            }
467            for i in 6..10 {
468                det.register_node(i, SubnetId::new(&[2, (i as u8)]));
469            }
470
471            // Fail all 6 nodes in [1,1] + [1,2]. with_subnet=6.
472            // Both [1,1] and [1,2] hit count=3 ≥ 2 at depth 2;
473            // [1] hits count=6 at depth 1. Pre-fix the loop
474            // would pick whichever depth-2 child the HashMap
475            // visited last in iteration order.
476            let verdict = det.record_failures(&[0, 1, 2, 3, 4, 5], 20);
477            assert!(verdict.is_mass_failure());
478            if let CorrelationVerdict::MassFailure {
479                suspected_cause, ..
480            } = &verdict
481            {
482                match suspected_cause {
483                    FailureCause::SubnetFailure { subnet, .. } => {
484                        // Deterministic tiebreak: lower id wins
485                        // on equal depth. `SubnetId::new(&[1, 1])`
486                        // < `SubnetId::new(&[1, 2])` under the
487                        // derived `Ord` on the inner u32.
488                        assert_eq!(
489                            *subnet,
490                            SubnetId::new(&[1, 1]),
491                            "tied subnets at the same depth must \
492                             resolve to the lower SubnetId every \
493                             run — pre-fix this flipped between \
494                             [1,1] and [1,2] depending on hash \
495                             iteration order"
496                        );
497                    }
498                    other => panic!("expected SubnetFailure, got {:?}", other),
499                }
500            }
501        }
502    }
503
504    #[test]
505    fn test_clear_window() {
506        let mut det = make_detector(0.30);
507        for i in 0..10 {
508            det.register_node(i, SubnetId::new(&[1]));
509        }
510
511        det.record_failures(&[0, 1, 2, 3], 10);
512        assert!(det.in_mass_failure());
513
514        det.clear_window();
515        assert!(!det.in_mass_failure());
516        assert_eq!(det.window_size(), 0);
517    }
518
519    #[test]
520    fn test_no_subnet_data() {
521        let mut det = make_detector(0.30);
522        // Don't register any subnets
523
524        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
525        assert!(verdict.is_mass_failure());
526
527        if let CorrelationVerdict::MassFailure {
528            suspected_cause, ..
529        } = &verdict
530        {
531            assert_eq!(*suspected_cause, FailureCause::Unknown);
532        }
533    }
534
535    /// Pin: a correlation window longer than the time the process
536    /// has been running must NOT panic in the prune path. Pre-fix
537    /// `now - self.config.correlation_window` panicked when
538    /// `correlation_window > now.elapsed()` — trivially reachable
539    /// for any operator-tunable window of hours/days that's
540    /// hit during the first second of process startup.
541    #[test]
542    fn record_failures_does_not_panic_with_long_window_at_startup() {
543        let mut det = CorrelatedFailureDetector::new(CorrelatedFailureConfig {
544            mass_failure_threshold: 0.30,
545            // A correlation window much larger than any plausible
546            // process-uptime-since-Instant-creation. Pre-fix this
547            // panicked inside `now - duration`.
548            correlation_window: std::time::Duration::from_secs(86_400 * 365),
549            ..Default::default()
550        });
551        for i in 0..10 {
552            det.register_node(i, SubnetId::new(&[1]));
553        }
554        // Should not panic; should still produce a valid verdict.
555        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
556        assert!(verdict.is_mass_failure());
557    }
558
559    /// `failed_nodes` in the verdict (sourced from
560    /// `window_failures` after dedup) must be sorted, not in
561    /// arbitrary HashSet iteration order. Pre-fix the same input
562    /// could produce different orderings on each run / process,
563    /// breaking serialization parity across nodes that observe
564    /// the same partition.
565    ///
566    /// We can't easily test "different across processes" in a
567    /// single test run, but we CAN check the ordering is
568    /// monotonic, which is a strong proxy: a sorted output is
569    /// canonical, while a HashSet output is not.
570    #[test]
571    fn mass_failure_failed_nodes_are_sorted_canonically() {
572        let mut det = make_detector(0.30);
573        for i in 0..10 {
574            det.register_node(i, SubnetId::new(&[1]));
575        }
576
577        // Record failures in a deliberately-unsorted order.
578        let verdict = det.record_failures(&[7, 2, 9, 4, 0, 5, 8, 1], 10);
579        assert!(verdict.is_mass_failure());
580        if let CorrelationVerdict::MassFailure { failed_nodes, .. } = &verdict {
581            let mut sorted = failed_nodes.clone();
582            sorted.sort_unstable();
583            assert_eq!(
584                failed_nodes, &sorted,
585                "MassFailure.failed_nodes must be in canonical \
586                 (sorted) order; pre-fix it leaked HashSet iteration order \
587                 and varied per process"
588            );
589        }
590    }
591}