Skip to main content

net/adapter/net/contested/
partition.rs

//! Partition detection and healing.
//!
//! Detects when a mass failure is actually a network partition (asymmetric
//! visibility), tracks partition state, and detects healing when nodes
//! from the other side reappear.
6
7use std::time::Instant;
8
9use super::correlation::{CorrelationVerdict, FailureCause};
10use crate::adapter::net::state::horizon::ObservedHorizon;
11use crate::adapter::net::subnet::SubnetId;
12
/// Stage a tracked partition has reached in its lifecycle.
///
/// Records are created as `Suspected`, may be promoted to `Confirmed`,
/// switch to `Healing` when a lost node reappears, and finish as `Healed`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PartitionPhase {
    /// A partition is suspected; nothing has confirmed it yet.
    Suspected,
    /// The partition is confirmed: the other side is alive but unreachable.
    Confirmed,
    /// Some nodes from the other side are visible again.
    Healing {
        /// IDs of other-side nodes that have reappeared so far.
        reappeared: Vec<u64>,
    },
    /// The partition is over; reconciliation is still required.
    Healed,
}
28
29/// Record of a detected partition.
30#[derive(Debug, Clone)]
31pub struct PartitionRecord {
32    /// Unique partition ID (timestamp-based).
33    id: u64,
34    /// When the partition was detected.
35    detected_at: Instant,
36    /// Nodes we can still reach.
37    our_side: Vec<u64>,
38    /// Nodes we lost.
39    other_side: Vec<u64>,
40    /// Subnet where the failure was concentrated (if identified).
41    partition_subnet: Option<SubnetId>,
42    /// Current phase.
43    phase: PartitionPhase,
44    /// Snapshot of our ObservedHorizon at partition time (reconciliation baseline).
45    our_horizon_at_split: ObservedHorizon,
46}
47
48impl PartitionRecord {
49    /// Get the partition ID.
50    #[inline]
51    pub fn id(&self) -> u64 {
52        self.id
53    }
54
55    /// Get our side nodes.
56    pub fn our_side(&self) -> &[u64] {
57        &self.our_side
58    }
59
60    /// Get the other side nodes.
61    pub fn other_side(&self) -> &[u64] {
62        &self.other_side
63    }
64
65    /// Get the partition subnet.
66    pub fn partition_subnet(&self) -> Option<SubnetId> {
67        self.partition_subnet
68    }
69
70    /// Get the current phase.
71    pub fn phase(&self) -> &PartitionPhase {
72        &self.phase
73    }
74
75    /// Get the horizon snapshot at split time.
76    pub fn horizon_at_split(&self) -> &ObservedHorizon {
77        &self.our_horizon_at_split
78    }
79
80    /// How long the partition has been active.
81    pub fn duration(&self) -> std::time::Duration {
82        self.detected_at.elapsed()
83    }
84
85    /// Fraction of the other side that has reappeared.
86    pub fn healing_progress(&self) -> f32 {
87        if self.other_side.is_empty() {
88            return 1.0;
89        }
90        match &self.phase {
91            PartitionPhase::Healing { reappeared } => {
92                reappeared.len() as f32 / self.other_side.len() as f32
93            }
94            PartitionPhase::Healed => 1.0,
95            _ => 0.0,
96        }
97    }
98}
99
100/// Partition detector.
101///
102/// Tracks active partitions and detects healing when nodes reappear.
103pub struct PartitionDetector {
104    /// Active partition records.
105    active_partitions: Vec<PartitionRecord>,
106    /// Healing threshold — fraction of other_side that must reappear
107    /// for the partition to be considered healed.
108    healing_threshold: f32,
109    /// Counter for generating partition IDs.
110    next_id: u64,
111}
112
113impl PartitionDetector {
114    /// Create a new partition detector.
115    pub fn new() -> Self {
116        Self {
117            active_partitions: Vec::new(),
118            healing_threshold: 0.50,
119            next_id: 1,
120        }
121    }
122
123    /// Set the healing threshold (fraction of other_side that must reappear).
124    pub fn with_healing_threshold(mut self, threshold: f32) -> Self {
125        self.healing_threshold = threshold;
126        self
127    }
128
129    /// Attempt to detect a partition from a correlation verdict.
130    ///
131    /// Only creates a partition record for `MassFailure` with `SubnetFailure` cause.
132    /// Returns the partition ID if created.
133    pub fn detect(
134        &mut self,
135        verdict: &CorrelationVerdict,
136        healthy_nodes: &[u64],
137        current_horizon: &ObservedHorizon,
138    ) -> Option<u64> {
139        let (failed_nodes, cause) = match verdict {
140            CorrelationVerdict::MassFailure {
141                failed_nodes,
142                suspected_cause,
143                ..
144            } => (failed_nodes, suspected_cause),
145            _ => return None,
146        };
147
148        let partition_subnet = match cause {
149            FailureCause::SubnetFailure { subnet, .. } => Some(*subnet),
150            _ => return None, // broad outage, not a partition
151        };
152
153        // Pre-fix `self.next_id += 1` panicked in debug
154        // and wrapped to 0 in release at u64::MAX, reusing partition
155        // ID 0 — `confirm` / `find_mut` would then operate on the
156        // wrong record. Astronomical in practice (a node would have
157        // to create ~1.8e19 partition records, which is absurd
158        // even for very long uptimes), but cheap to guard against
159        // a wrap that would silently corrupt partition tracking.
160        let id = self.next_id;
161        self.next_id = self.next_id.checked_add(1).unwrap_or_else(|| {
162            // Saturate by leaving next_id at u64::MAX. The next
163            // detection call will see `id = u64::MAX` again,
164            // re-issue it, and stay at u64::MAX. Two records with
165            // the same id is a recoverable failure mode (the
166            // operator notices duplicate partition tracking) where
167            // wraparound to 0 is silent.
168            tracing::error!(
169                "partition next_id reached u64::MAX; saturating to avoid \
170                 wrap-to-0 collisions with active records"
171            );
172            u64::MAX
173        });
174
175        let record = PartitionRecord {
176            id,
177            detected_at: Instant::now(),
178            our_side: healthy_nodes.to_vec(),
179            other_side: failed_nodes.clone(),
180            partition_subnet,
181            phase: PartitionPhase::Suspected,
182            our_horizon_at_split: current_horizon.clone(),
183        };
184
185        self.active_partitions.push(record);
186        Some(id)
187    }
188
189    /// Confirm a partition (e.g., received gossip that other side is alive).
190    pub fn confirm(&mut self, partition_id: u64) -> bool {
191        if let Some(record) = self.find_mut(partition_id) {
192            if record.phase == PartitionPhase::Suspected {
193                record.phase = PartitionPhase::Confirmed;
194                return true;
195            }
196        }
197        false
198    }
199
200    /// Record that a node has recovered (reappeared after failure).
201    ///
202    /// If the node was in any partition's `other_side`, transitions
203    /// the partition toward healing.
204    ///
205    /// # Overlapping partitions
206    ///
207    /// A single node id can appear in `other_side` of
208    /// multiple active partition records (e.g., a noisy detector
209    /// classified one physical outage into two records). This
210    /// function intentionally walks **all** matching records and
211    /// updates each independently — each record is the source of
212    /// truth for its own healing state.
213    ///
214    /// Downstream consumers that fire side-effecting healing
215    /// actions per partition (replica rebalance, alert dispatch)
216    /// must be idempotent over `(partition_id, recovered_node)`
217    /// pairs, otherwise overlapping records will double-count
218    /// one physical recovery. The detector layer is the place to
219    /// prevent overlaps; this layer is just bookkeeping.
220    pub fn on_node_recovery(&mut self, node_id: u64) {
221        for record in &mut self.active_partitions {
222            if !record.other_side.contains(&node_id) {
223                continue;
224            }
225
226            match &mut record.phase {
227                PartitionPhase::Suspected | PartitionPhase::Confirmed => {
228                    record.phase = PartitionPhase::Healing {
229                        reappeared: vec![node_id],
230                    };
231                }
232                PartitionPhase::Healing { reappeared } => {
233                    if !reappeared.contains(&node_id) {
234                        reappeared.push(node_id);
235                    }
236                }
237                PartitionPhase::Healed => {}
238            }
239
240            // Check if healed (after any phase transition).
241            // Guard against an empty `other_side`: the ratio
242            // computation would be `0 / 0 = NaN`, and `NaN >=
243            // threshold` is always false, so the partition could
244            // never auto-heal. The current control flow makes
245            // empty `other_side` unreachable inside this branch
246            // (the `contains(&node_id)` filter above eliminates
247            // empties before any phase enters `Healing`), but
248            // future refactors that mutate `other_side` after a
249            // `Healing` transition would silently expose the
250            // bug. Treat an empty `other_side` as already-healed
251            // — there is no remaining side to wait on.
252            if let PartitionPhase::Healing { reappeared } = &record.phase {
253                if record.other_side.is_empty() {
254                    record.phase = PartitionPhase::Healed;
255                } else {
256                    let ratio = reappeared.len() as f32 / record.other_side.len() as f32;
257                    if ratio >= self.healing_threshold {
258                        record.phase = PartitionPhase::Healed;
259                    }
260                }
261            }
262        }
263    }
264
265    /// Take all partitions that have healed (drains them from active list).
266    pub fn take_healed(&mut self) -> Vec<PartitionRecord> {
267        let mut healed = Vec::new();
268        self.active_partitions.retain(|r| {
269            if r.phase == PartitionPhase::Healed {
270                healed.push(r.clone());
271                false
272            } else {
273                true
274            }
275        });
276        healed
277    }
278
279    /// Number of active partitions.
280    pub fn active_count(&self) -> usize {
281        self.active_partitions.len()
282    }
283
284    /// Get an active partition by ID.
285    pub fn get(&self, partition_id: u64) -> Option<&PartitionRecord> {
286        self.active_partitions.iter().find(|r| r.id == partition_id)
287    }
288
289    fn find_mut(&mut self, partition_id: u64) -> Option<&mut PartitionRecord> {
290        self.active_partitions
291            .iter_mut()
292            .find(|r| r.id == partition_id)
293    }
294}
295
296impl Default for PartitionDetector {
297    fn default() -> Self {
298        Self::new()
299    }
300}
301
302impl std::fmt::Debug for PartitionDetector {
303    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304        f.debug_struct("PartitionDetector")
305            .field("active_partitions", &self.active_partitions.len())
306            .field("healing_threshold", &self.healing_threshold)
307            .finish()
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Subnet used by every partition fixture in this module.
    fn test_subnet() -> SubnetId {
        SubnetId::new(&[2])
    }

    /// `MassFailure` verdict blamed on a failure of `subnet`.
    fn make_verdict_subnet(failed: Vec<u64>, subnet: SubnetId) -> CorrelationVerdict {
        let suspected_cause = FailureCause::SubnetFailure {
            subnet,
            affected_ratio: 1.0,
        };
        CorrelationVerdict::MassFailure {
            failed_nodes: failed,
            failure_ratio: 0.5,
            suspected_cause,
        }
    }

    /// `MassFailure` verdict blamed on a broad outage.
    fn make_verdict_broad(failed: Vec<u64>) -> CorrelationVerdict {
        CorrelationVerdict::MassFailure {
            failed_nodes: failed,
            failure_ratio: 0.5,
            suspected_cause: FailureCause::BroadOutage,
        }
    }

    /// Feed a subnet-failure verdict to `det` against an empty horizon and
    /// return the ID of the partition it records.
    fn detect_subnet_partition(
        det: &mut PartitionDetector,
        failed: Vec<u64>,
        healthy: &[u64],
    ) -> u64 {
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_subnet(failed, test_subnet());
        det.detect(&verdict, healthy, &horizon).unwrap()
    }

    #[test]
    fn test_detect_partition() {
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_subnet(vec![1, 2, 3], test_subnet());
        let mut det = PartitionDetector::new();

        let id = det.detect(&verdict, &[4, 5, 6], &horizon);
        assert!(id.is_some());
        assert_eq!(det.active_count(), 1);

        let record = det.get(id.unwrap()).unwrap();
        assert_eq!(record.other_side(), &[1, 2, 3]);
        assert_eq!(record.our_side(), &[4, 5, 6]);
        assert_eq!(record.phase(), &PartitionPhase::Suspected);
    }

    #[test]
    fn test_no_partition_for_broad_outage() {
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_broad(vec![1, 2, 3]);
        let mut det = PartitionDetector::new();

        let id = det.detect(&verdict, &[4, 5, 6], &horizon);
        assert!(id.is_none());
        assert_eq!(det.active_count(), 0);
    }

    #[test]
    fn test_no_partition_for_independent() {
        let horizon = ObservedHorizon::new();
        let verdict = CorrelationVerdict::Independent {
            failed_nodes: vec![1],
        };
        let mut det = PartitionDetector::new();

        let id = det.detect(&verdict, &[2, 3], &horizon);
        assert!(id.is_none());
    }

    #[test]
    fn test_confirm() {
        let mut det = PartitionDetector::new();
        let id = detect_subnet_partition(&mut det, vec![1, 2], &[3, 4]);

        assert!(det.confirm(id));
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);
    }

    #[test]
    fn test_healing() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.50);
        let id = detect_subnet_partition(&mut det, vec![1, 2, 3, 4], &[5, 6]);

        // One of four nodes is back: healing has started, threshold not met.
        det.on_node_recovery(1);
        assert!(matches!(
            det.get(id).unwrap().phase(),
            PartitionPhase::Healing { .. }
        ));

        // Two of four nodes are back: 50% meets the threshold.
        det.on_node_recovery(2);
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Healed);
    }

    #[test]
    fn test_take_healed() {
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_subnet(vec![1, 2], test_subnet());
        let mut det = PartitionDetector::new().with_healing_threshold(0.50);

        det.detect(&verdict, &[3, 4], &horizon);

        // One of two nodes is back: 50% meets the threshold, so it heals.
        det.on_node_recovery(1);

        let healed = det.take_healed();
        assert_eq!(healed.len(), 1);
        assert_eq!(det.active_count(), 0);
    }

    #[test]
    fn test_healing_progress() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.75);
        let id = detect_subnet_partition(&mut det, vec![1, 2, 3, 4], &[5]);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.0);

        det.on_node_recovery(1);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.25);

        det.on_node_recovery(2);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.50);
    }

    #[test]
    fn test_duplicate_recovery_ignored() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.75);
        let id = detect_subnet_partition(&mut det, vec![1, 2, 3, 4], &[5]);

        det.on_node_recovery(1);
        det.on_node_recovery(1); // same node reported again
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.25); // still 1/4
    }

    /// Pins the `PartitionRecord` getters that the other tests do not
    /// touch: `id`, `partition_subnet`, `horizon_at_split` and `duration`
    /// (`our_side` / `other_side` / `phase` are covered elsewhere). A getter
    /// wired to the wrong field after a refactor, say `partition_subnet()`
    /// returning something derived from `id`, would otherwise go unnoticed.
    #[test]
    fn partition_record_getters_return_the_right_fields() {
        let horizon = ObservedHorizon::new();
        let subnet = test_subnet();
        let verdict = make_verdict_subnet(vec![1, 2, 3], subnet);
        let mut det = PartitionDetector::new();

        let id = det.detect(&verdict, &[4, 5, 6], &horizon).unwrap();
        let record = det.get(id).unwrap();

        assert_eq!(record.id(), id);
        assert_eq!(record.partition_subnet(), Some(subnet));
        // The snapshot is the empty horizon passed to `detect`. It is
        // compared through `entity_count()`, which is 0 for an empty
        // horizon, rather than by equality.
        assert_eq!(record.horizon_at_split().entity_count(), 0);
        // `duration()` measures time since detection. Only a sanity ceiling
        // is checked: 60s is generous enough for a loaded CI machine yet
        // still catches a regression returning `Duration::MAX` or an
        // unrelated wall-clock value.
        let d = record.duration();
        assert!(
            d < std::time::Duration::from_secs(60),
            "duration sanity ceiling exceeded: {d:?}",
        );
    }

    /// State-machine guard: `confirm()` succeeds only from `Suspected`.
    /// Confirming the same partition twice must return `false` the second
    /// time and leave the phase untouched, and confirming an unknown ID must
    /// return `false` as well. A regression that always returned `true`
    /// would let callers "re-confirm" partitions that have moved on.
    #[test]
    fn confirm_returns_false_on_already_confirmed_partition() {
        let mut det = PartitionDetector::new();
        let id = detect_subnet_partition(&mut det, vec![1, 2], &[3, 4]);

        assert!(det.confirm(id), "first confirm must succeed");
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);

        // Already past `Suspected`: rejected, phase unchanged.
        assert!(!det.confirm(id), "second confirm must reject");
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);

        // Unknown partition ID: rejected too.
        assert!(!det.confirm(u64::MAX));
    }

    #[test]
    fn partition_detector_default_matches_new() {
        let from_default: PartitionDetector = Default::default();
        let from_new = PartitionDetector::new();
        assert_eq!(from_default.active_count(), from_new.active_count());
    }

    #[test]
    fn partition_detector_debug_includes_counts() {
        let det = PartitionDetector::new().with_healing_threshold(0.5);
        let rendered = format!("{:?}", det);
        assert!(rendered.contains("PartitionDetector"));
        assert!(rendered.contains("active_partitions: 0"));
        assert!(rendered.contains("healing_threshold"));
    }
}