net/adapter/net/contested/correlation.rs
1//! Correlated failure detection.
2//!
//! Complements `FailureDetector` with a time-windowed correlation layer.
4//! Classifies failures as independent or correlated (mass failure),
5//! and identifies whether failures are concentrated in a subnet
6//! (likely partition) or spread broadly (likely infrastructure outage).
7
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11use crate::adapter::net::subnet::SubnetId;
12
/// Tunable parameters for correlated failure detection.
#[derive(Debug, Clone)]
pub struct CorrelatedFailureConfig {
    /// How far back in time failures are kept for correlation.
    pub correlation_window: Duration,
    /// Fraction of tracked nodes (0.0 - 1.0) failing within the window
    /// at which the failures are classified as a mass failure.
    pub mass_failure_threshold: f32,
    /// Fraction of failures in the window that must share a common
    /// subnet ancestor for the mass failure to be classified as
    /// subnet-correlated (likely partition).
    pub subnet_correlation_threshold: f32,
    /// Cap on concurrent recovery actions while a mass failure is in
    /// effect.
    pub max_concurrent_migrations: usize,
}
27
28impl Default for CorrelatedFailureConfig {
29 fn default() -> Self {
30 Self {
31 correlation_window: Duration::from_secs(2),
32 mass_failure_threshold: 0.30,
33 subnet_correlation_threshold: 0.80,
34 max_concurrent_migrations: 3,
35 }
36 }
37}
38
39/// A recorded failure event within the correlation window.
40#[derive(Debug, Clone)]
41struct FailureEvent {
42 node_id: u64,
43 detected_at: Instant,
44 _subnet: Option<SubnetId>,
45}
46
47/// Verdict from correlated failure analysis.
48#[derive(Debug, Clone)]
49pub enum CorrelationVerdict {
50 /// Independent failures — handle normally via RecoveryManager.
51 Independent {
52 /// Nodes that failed.
53 failed_nodes: Vec<u64>,
54 },
55 /// Mass correlated failure — throttle recovery.
56 MassFailure {
57 /// Nodes that failed.
58 failed_nodes: Vec<u64>,
59 /// Fraction of tracked nodes that failed.
60 failure_ratio: f32,
61 /// Suspected root cause.
62 suspected_cause: FailureCause,
63 },
64}
65
66impl CorrelationVerdict {
67 /// Get the failed nodes regardless of verdict type.
68 pub fn failed_nodes(&self) -> &[u64] {
69 match self {
70 Self::Independent { failed_nodes } => failed_nodes,
71 Self::MassFailure { failed_nodes, .. } => failed_nodes,
72 }
73 }
74
75 /// Whether this is a mass failure.
76 pub fn is_mass_failure(&self) -> bool {
77 matches!(self, Self::MassFailure { .. })
78 }
79}
80
81/// Suspected cause of a mass failure.
82#[derive(Debug, Clone, PartialEq)]
83pub enum FailureCause {
84 /// Failures concentrated in a single subnet (likely partition).
85 SubnetFailure {
86 /// The subnet ancestor where failures are concentrated.
87 subnet: SubnetId,
88 /// Fraction of failures in this subnet.
89 affected_ratio: f32,
90 },
91 /// Failures spread across subnets (likely infrastructure outage).
92 BroadOutage,
93 /// Insufficient subnet data to determine cause.
94 Unknown,
95}
96
97/// Correlated failure detector.
98///
99/// Sits alongside `FailureDetector` as a correlation layer. Consumes
100/// failure events and classifies them as independent or correlated.
101pub struct CorrelatedFailureDetector {
102 config: CorrelatedFailureConfig,
103 /// Recent failures within the correlation window.
104 recent_failures: VecDeque<FailureEvent>,
105 /// Node -> subnet mapping for correlation analysis.
106 node_subnets: HashMap<u64, SubnetId>,
107 /// Whether we're currently in mass-failure mode.
108 in_mass_failure: bool,
109}
110
111impl CorrelatedFailureDetector {
112 /// Create a new detector with the given configuration.
113 pub fn new(config: CorrelatedFailureConfig) -> Self {
114 Self {
115 config,
116 recent_failures: VecDeque::new(),
117 node_subnets: HashMap::new(),
118 in_mass_failure: false,
119 }
120 }
121
122 /// Register a node's subnet for correlation analysis.
123 pub fn register_node(&mut self, node_id: u64, subnet: SubnetId) {
124 self.node_subnets.insert(node_id, subnet);
125 }
126
127 /// Record new failures and classify them.
128 ///
129 /// Call this after `FailureDetector::check_all()` with the newly
130 /// failed nodes and the total number of tracked nodes.
131 pub fn record_failures(
132 &mut self,
133 failed_nodes: &[u64],
134 total_tracked: usize,
135 ) -> CorrelationVerdict {
136 let now = Instant::now();
137
138 // Record new failures
139 for &node_id in failed_nodes {
140 self.recent_failures.push_back(FailureEvent {
141 node_id,
142 detected_at: now,
143 _subnet: self.node_subnets.get(&node_id).copied(),
144 });
145 }
146
147 // Prune events older than the correlation window.
148 //
149 // `now - duration` panics when `duration > now.elapsed()`
150 // (shortly after process start with a long correlation
151 // window). With the default 2s window this is fine, but
152 // configurable windows of hours/days can panic on the
153 // first second of process life. Saturate to the
154 // process-start `Instant` (`now - now.elapsed()`) so the
155 // cutoff is at most as old as the earliest possible
156 // observation — equivalent to "no events to prune yet."
157 let cutoff = now
158 .checked_sub(self.config.correlation_window)
159 .unwrap_or_else(|| now - now.elapsed());
160 while self
161 .recent_failures
162 .front()
163 .is_some_and(|e| e.detected_at < cutoff)
164 {
165 self.recent_failures.pop_front();
166 }
167
168 // Count unique failures in the window.
169 //
170 // Pre-fix this collected through a `HashSet<u64>`
171 // and converted back to `Vec`, which exposed the HashSet's
172 // randomized iteration order to downstream consumers.
173 // `window_failures` flows verbatim into
174 // `PartitionRecord::other_side` (partition.rs:160), so two
175 // nodes with identical inputs produced different
176 // `other_side` orderings, breaking cross-node serialization
177 // / reconcile-ordering / replay validation. Sort + dedup
178 // gives a canonical Vec deterministic across processes.
179 let mut window_failures: Vec<u64> =
180 self.recent_failures.iter().map(|e| e.node_id).collect();
181 window_failures.sort_unstable();
182 window_failures.dedup();
183
184 if total_tracked == 0 {
185 return CorrelationVerdict::Independent {
186 failed_nodes: failed_nodes.to_vec(),
187 };
188 }
189
190 let failure_ratio = window_failures.len() as f32 / total_tracked as f32;
191
192 if failure_ratio < self.config.mass_failure_threshold {
193 self.in_mass_failure = false;
194 return CorrelationVerdict::Independent {
195 failed_nodes: failed_nodes.to_vec(),
196 };
197 }
198
199 // Mass failure detected — analyze subnet correlation
200 self.in_mass_failure = true;
201 let cause = self.analyze_subnet_correlation(&window_failures);
202
203 CorrelationVerdict::MassFailure {
204 failed_nodes: window_failures,
205 failure_ratio,
206 suspected_cause: cause,
207 }
208 }
209
210 /// How many concurrent recovery actions are allowed.
211 ///
212 /// Throttled during mass failure to avoid overloading survivors.
213 pub fn recovery_budget(&self) -> usize {
214 if self.in_mass_failure {
215 self.config.max_concurrent_migrations
216 } else {
217 usize::MAX
218 }
219 }
220
221 /// Whether we're currently in mass-failure mode.
222 pub fn in_mass_failure(&self) -> bool {
223 self.in_mass_failure
224 }
225
226 /// Clear the failure window (e.g., when conditions normalize).
227 pub fn clear_window(&mut self) {
228 self.recent_failures.clear();
229 self.in_mass_failure = false;
230 }
231
232 /// Number of failures in the current window.
233 pub fn window_size(&self) -> usize {
234 self.recent_failures.len()
235 }
236
237 /// Analyze whether failures are concentrated in a subnet subtree.
238 fn analyze_subnet_correlation(&self, failed_nodes: &[u64]) -> FailureCause {
239 let mut subnet_counts: HashMap<SubnetId, usize> = HashMap::new();
240 let mut with_subnet = 0usize;
241
242 for &node_id in failed_nodes {
243 if let Some(&subnet) = self.node_subnets.get(&node_id) {
244 with_subnet += 1;
245 // Count at each hierarchy level. The break
246 // conditions (`parent == current`, `parent.is_global`)
247 // cover every well-formed `SubnetId::parent`
248 // implementation, but a defensive depth cap
249 // matches the 4-level hierarchy and forecloses
250 // an infinite loop if a future regression in
251 // `SubnetId::parent` ever returns a non-self,
252 // non-global subnet that cycles back to an
253 // ancestor (e.g., a typo in a 4→3→2→1→4 walk
254 // returning to the deepest level). The cap is
255 // generously above the 4-level hierarchy so
256 // legitimate walks always complete inside it.
257 let mut current = subnet;
258 for _ in 0..8 {
259 *subnet_counts.entry(current).or_insert(0) += 1;
260 let parent = current.parent();
261 if parent == current || parent.is_global() {
262 break;
263 }
264 current = parent;
265 }
266 }
267 }
268
269 if with_subnet == 0 {
270 return FailureCause::Unknown;
271 }
272
273 // Find the most specific subnet with the highest concentration
274 // Ceiling to avoid false subnet correlation from rounding down
275 let threshold =
276 (with_subnet as f32 * self.config.subnet_correlation_threshold).ceil() as usize;
277
278 // Iterate a sorted snapshot so ties resolve deterministically:
279 // higher `depth` wins; on equal depth, the lower `SubnetId`
280 // wins (the inner `u32` comparison is a stable total order
281 // without semantic hierarchy meaning — see `SubnetId`'s
282 // `Ord` rustdoc). Iterating a `HashMap` directly with `>=` on
283 // depth as the tiebreaker would let hash iteration order
284 // (randomized per process) pick the winner, and downstream
285 // `partition.rs::detect` would brand the partition record
286 // with a subnet that flips between runs given identical
287 // inputs.
288 let mut entries: Vec<(SubnetId, usize)> =
289 subnet_counts.iter().map(|(&s, &c)| (s, c)).collect();
290 entries.sort_by(|a, b| b.0.depth().cmp(&a.0.depth()).then_with(|| a.0.cmp(&b.0)));
291
292 // Sort (above) guarantees the entries are visited deepest-
293 // first within the threshold-meeting set, so the first hit
294 // is the deterministic winner. We `break` immediately on
295 // the first hit; no `best_depth` tracking needed.
296 let mut best_subnet = None;
297 for (subnet, count) in entries {
298 if count >= threshold {
299 best_subnet = Some(subnet);
300 break;
301 }
302 }
303
304 match best_subnet {
305 Some(subnet) => {
306 #[expect(
307 clippy::unwrap_used,
308 reason = "subnet was just yielded by iterating subnet_counts; the key is always present"
309 )]
310 let ratio = *subnet_counts.get(&subnet).unwrap() as f32 / with_subnet as f32;
311 FailureCause::SubnetFailure {
312 subnet,
313 affected_ratio: ratio,
314 }
315 }
316 None => FailureCause::BroadOutage,
317 }
318 }
319}
320
321impl std::fmt::Debug for CorrelatedFailureDetector {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 f.debug_struct("CorrelatedFailureDetector")
324 .field("window_size", &self.recent_failures.len())
325 .field("tracked_nodes", &self.node_subnets.len())
326 .field("in_mass_failure", &self.in_mass_failure)
327 .finish()
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Detector with the given mass-failure threshold and default
    /// values for every other setting.
    fn make_detector(threshold: f32) -> CorrelatedFailureDetector {
        let config = CorrelatedFailureConfig {
            mass_failure_threshold: threshold,
            ..Default::default()
        };
        CorrelatedFailureDetector::new(config)
    }

    #[test]
    fn test_independent_failures() {
        let mut det = make_detector(0.30);
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[1])));

        // A single failure among 10 nodes is 10%, below the 30% bar.
        let verdict = det.record_failures(&[0], 10);
        assert!(!verdict.is_mass_failure());
        assert!(!det.in_mass_failure());
        assert_eq!(det.recovery_budget(), usize::MAX);
    }

    #[test]
    fn test_mass_failure() {
        let mut det = make_detector(0.30);
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[1])));

        // Four failures among 10 nodes is 40%, above the 30% bar.
        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
        assert!(verdict.is_mass_failure());
        assert!(det.in_mass_failure());
        // Budget drops to the default max_concurrent_migrations.
        assert_eq!(det.recovery_budget(), 3);
    }

    #[test]
    fn test_subnet_correlated() {
        let mut det = make_detector(0.30);
        // Nodes 0-4 live in subnet [1, 1]; nodes 5-9 in subnet [1, 2].
        (0..5).for_each(|i| det.register_node(i, SubnetId::new(&[1, 1])));
        (5..10).for_each(|i| det.register_node(i, SubnetId::new(&[1, 2])));

        // Every node of subnet [1, 1] goes down.
        let verdict = det.record_failures(&[0, 1, 2, 3, 4], 10);
        assert!(verdict.is_mass_failure());

        let CorrelationVerdict::MassFailure {
            suspected_cause, ..
        } = &verdict
        else {
            return;
        };
        match suspected_cause {
            // Subnet [1, 1] must be named as the correlated subnet.
            FailureCause::SubnetFailure { subnet, .. } => {
                assert_eq!(*subnet, SubnetId::new(&[1, 1]));
            }
            other => panic!("expected SubnetFailure, got {:?}", other),
        }
    }

    #[test]
    fn test_broad_outage() {
        let mut det = make_detector(0.30);
        // Every node sits in its own subnet: node i -> subnet [i + 1].
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[(i + 1) as u8])));

        // The failures touch four distinct subnets.
        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
        assert!(verdict.is_mass_failure());

        let CorrelationVerdict::MassFailure {
            suspected_cause, ..
        } = &verdict
        else {
            return;
        };
        assert_eq!(*suspected_cause, FailureCause::BroadOutage);
    }

    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #91.
    ///
    /// `analyze_subnet_correlation` used to iterate `subnet_counts` (a
    /// `HashMap`) directly, with `>=` on depth as the tiebreaker. When
    /// two subnets tied on `best_depth`, the winner depended on hash
    /// iteration order, which `std::collections::HashMap` randomizes
    /// per process, so recovery scope flipped between runs on
    /// identical inputs.
    ///
    /// The fix is pinned as follows:
    ///   1. Two equally deep subnets both meet the correlation
    ///      threshold with equal failure counts (the tie that used to
    ///      be nondeterministic).
    ///   2. The analysis runs many times, with a freshly built
    ///      `CorrelatedFailureDetector` each iteration to maximize the
    ///      chance the hasher state differs.
    ///   3. Every run must pick the same subnet: the lower `SubnetId`
    ///      wins on ties.
    ///
    /// Before the fix this intermittently returned
    /// `SubnetId::new(&[1, 2])` instead of `SubnetId::new(&[1, 1])`.
    #[test]
    fn ties_resolve_deterministically_across_runs() {
        for _attempt in 0..32 {
            // Sibling subnets [1,1] and [1,2] at depth 2 hold 3 nodes
            // each. With a 0.30 subnet threshold a subnet qualifies at
            // count >= ceil(6 * 0.30) = 2, so both siblings (count 3)
            // qualify — the tie under test. The default threshold of
            // 0.80 would need count 5 and only the parent rollup
            // would qualify, hence the explicit override.
            let config = CorrelatedFailureConfig {
                mass_failure_threshold: 0.30,
                subnet_correlation_threshold: 0.30,
                ..Default::default()
            };
            let mut det = CorrelatedFailureDetector::new(config);
            (0..3).for_each(|i| det.register_node(i, SubnetId::new(&[1, 1])));
            (3..6).for_each(|i| det.register_node(i, SubnetId::new(&[1, 2])));
            (6..10).for_each(|i| det.register_node(i, SubnetId::new(&[2, (i as u8)])));

            // All six nodes under [1] fail, so with_subnet = 6:
            // [1,1] and [1,2] each count 3 at depth 2, [1] counts 6 at
            // depth 1. The old loop picked whichever depth-2 child the
            // HashMap happened to visit last.
            let verdict = det.record_failures(&[0, 1, 2, 3, 4, 5], 20);
            assert!(verdict.is_mass_failure());

            let CorrelationVerdict::MassFailure {
                suspected_cause, ..
            } = &verdict
            else {
                continue;
            };
            match suspected_cause {
                FailureCause::SubnetFailure { subnet, .. } => {
                    // On equal depth the lower id wins:
                    // `SubnetId::new(&[1, 1])` orders before
                    // `SubnetId::new(&[1, 2])` under the derived `Ord`
                    // on the inner u32.
                    assert_eq!(
                        *subnet,
                        SubnetId::new(&[1, 1]),
                        "tied subnets at the same depth must \
                         resolve to the lower SubnetId every \
                         run — pre-fix this flipped between \
                         [1,1] and [1,2] depending on hash \
                         iteration order"
                    );
                }
                other => panic!("expected SubnetFailure, got {:?}", other),
            }
        }
    }

    #[test]
    fn test_clear_window() {
        let mut det = make_detector(0.30);
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[1])));

        det.record_failures(&[0, 1, 2, 3], 10);
        assert!(det.in_mass_failure());

        // Clearing empties the window and leaves mass-failure mode.
        det.clear_window();
        assert!(!det.in_mass_failure());
        assert_eq!(det.window_size(), 0);
    }

    #[test]
    fn test_no_subnet_data() {
        // No subnets are registered at all.
        let mut det = make_detector(0.30);

        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
        assert!(verdict.is_mass_failure());

        let CorrelationVerdict::MassFailure {
            suspected_cause, ..
        } = &verdict
        else {
            return;
        };
        assert_eq!(*suspected_cause, FailureCause::Unknown);
    }

    /// Pin: the prune path must NOT panic when the correlation window
    /// is longer than the process has been running. The earlier
    /// `now - self.config.correlation_window` panicked whenever
    /// `correlation_window > now.elapsed()`, which is trivially
    /// reachable for an operator-tunable window of hours or days that
    /// is hit during the first second after startup.
    #[test]
    fn record_failures_does_not_panic_with_long_window_at_startup() {
        let config = CorrelatedFailureConfig {
            mass_failure_threshold: 0.30,
            // Far larger than any plausible uptime since the Instant
            // was created; this used to panic inside `now - duration`.
            correlation_window: std::time::Duration::from_secs(86_400 * 365),
            ..Default::default()
        };
        let mut det = CorrelatedFailureDetector::new(config);
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[1])));

        // No panic, and the verdict is still a valid one.
        let verdict = det.record_failures(&[0, 1, 2, 3], 10);
        assert!(verdict.is_mass_failure());
    }

    /// The verdict's `failed_nodes` (taken from `window_failures`
    /// after dedup) must be sorted rather than in arbitrary HashSet
    /// iteration order. Previously the same input could yield a
    /// different order per run or process, breaking serialization
    /// parity across nodes that observe the same partition.
    ///
    /// "Differs across processes" is hard to test inside one test
    /// run, but monotonic order is a strong proxy: sorted output is
    /// canonical, HashSet output is not.
    #[test]
    fn mass_failure_failed_nodes_are_sorted_canonically() {
        let mut det = make_detector(0.30);
        (0..10).for_each(|i| det.register_node(i, SubnetId::new(&[1])));

        // The input order is deliberately scrambled.
        let verdict = det.record_failures(&[7, 2, 9, 4, 0, 5, 8, 1], 10);
        assert!(verdict.is_mass_failure());

        let CorrelationVerdict::MassFailure { failed_nodes, .. } = &verdict else {
            return;
        };
        let expected = {
            let mut ordered = failed_nodes.clone();
            ordered.sort_unstable();
            ordered
        };
        assert_eq!(
            failed_nodes, &expected,
            "MassFailure.failed_nodes must be in canonical \
             (sorted) order; pre-fix it leaked HashSet iteration order \
             and varied per process"
        );
    }
}