net/adapter/net/contested/
partition.rs1use std::time::Instant;
8
9use super::correlation::{CorrelationVerdict, FailureCause};
10use crate::adapter::net::state::horizon::ObservedHorizon;
11use crate::adapter::net::subnet::SubnetId;
12
/// Lifecycle stage of a tracked partition.
///
/// Within this file a record starts out as [`PartitionPhase::Suspected`], may
/// be promoted to [`PartitionPhase::Confirmed`], moves to
/// [`PartitionPhase::Healing`] when the first other-side node is reported as
/// recovered, and ends in [`PartitionPhase::Healed`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartitionPhase {
    /// Phase assigned to a freshly created record.
    Suspected,
    /// The suspicion has been explicitly confirmed.
    Confirmed,
    /// Some nodes from the other side have been seen again.
    Healing {
        /// IDs of the other-side nodes that have reappeared so far.
        reappeared: Vec<u64>,
    },
    /// Enough nodes have reappeared for the partition to count as over.
    Healed,
}
28
29#[derive(Debug, Clone)]
31pub struct PartitionRecord {
32 id: u64,
34 detected_at: Instant,
36 our_side: Vec<u64>,
38 other_side: Vec<u64>,
40 partition_subnet: Option<SubnetId>,
42 phase: PartitionPhase,
44 our_horizon_at_split: ObservedHorizon,
46}
47
48impl PartitionRecord {
49 #[inline]
51 pub fn id(&self) -> u64 {
52 self.id
53 }
54
55 pub fn our_side(&self) -> &[u64] {
57 &self.our_side
58 }
59
60 pub fn other_side(&self) -> &[u64] {
62 &self.other_side
63 }
64
65 pub fn partition_subnet(&self) -> Option<SubnetId> {
67 self.partition_subnet
68 }
69
70 pub fn phase(&self) -> &PartitionPhase {
72 &self.phase
73 }
74
75 pub fn horizon_at_split(&self) -> &ObservedHorizon {
77 &self.our_horizon_at_split
78 }
79
80 pub fn duration(&self) -> std::time::Duration {
82 self.detected_at.elapsed()
83 }
84
85 pub fn healing_progress(&self) -> f32 {
87 if self.other_side.is_empty() {
88 return 1.0;
89 }
90 match &self.phase {
91 PartitionPhase::Healing { reappeared } => {
92 reappeared.len() as f32 / self.other_side.len() as f32
93 }
94 PartitionPhase::Healed => 1.0,
95 _ => 0.0,
96 }
97 }
98}
99
100pub struct PartitionDetector {
104 active_partitions: Vec<PartitionRecord>,
106 healing_threshold: f32,
109 next_id: u64,
111}
112
113impl PartitionDetector {
114 pub fn new() -> Self {
116 Self {
117 active_partitions: Vec::new(),
118 healing_threshold: 0.50,
119 next_id: 1,
120 }
121 }
122
123 pub fn with_healing_threshold(mut self, threshold: f32) -> Self {
125 self.healing_threshold = threshold;
126 self
127 }
128
129 pub fn detect(
134 &mut self,
135 verdict: &CorrelationVerdict,
136 healthy_nodes: &[u64],
137 current_horizon: &ObservedHorizon,
138 ) -> Option<u64> {
139 let (failed_nodes, cause) = match verdict {
140 CorrelationVerdict::MassFailure {
141 failed_nodes,
142 suspected_cause,
143 ..
144 } => (failed_nodes, suspected_cause),
145 _ => return None,
146 };
147
148 let partition_subnet = match cause {
149 FailureCause::SubnetFailure { subnet, .. } => Some(*subnet),
150 _ => return None, };
152
153 let id = self.next_id;
161 self.next_id = self.next_id.checked_add(1).unwrap_or_else(|| {
162 tracing::error!(
169 "partition next_id reached u64::MAX; saturating to avoid \
170 wrap-to-0 collisions with active records"
171 );
172 u64::MAX
173 });
174
175 let record = PartitionRecord {
176 id,
177 detected_at: Instant::now(),
178 our_side: healthy_nodes.to_vec(),
179 other_side: failed_nodes.clone(),
180 partition_subnet,
181 phase: PartitionPhase::Suspected,
182 our_horizon_at_split: current_horizon.clone(),
183 };
184
185 self.active_partitions.push(record);
186 Some(id)
187 }
188
189 pub fn confirm(&mut self, partition_id: u64) -> bool {
191 if let Some(record) = self.find_mut(partition_id) {
192 if record.phase == PartitionPhase::Suspected {
193 record.phase = PartitionPhase::Confirmed;
194 return true;
195 }
196 }
197 false
198 }
199
200 pub fn on_node_recovery(&mut self, node_id: u64) {
221 for record in &mut self.active_partitions {
222 if !record.other_side.contains(&node_id) {
223 continue;
224 }
225
226 match &mut record.phase {
227 PartitionPhase::Suspected | PartitionPhase::Confirmed => {
228 record.phase = PartitionPhase::Healing {
229 reappeared: vec![node_id],
230 };
231 }
232 PartitionPhase::Healing { reappeared } => {
233 if !reappeared.contains(&node_id) {
234 reappeared.push(node_id);
235 }
236 }
237 PartitionPhase::Healed => {}
238 }
239
240 if let PartitionPhase::Healing { reappeared } = &record.phase {
253 if record.other_side.is_empty() {
254 record.phase = PartitionPhase::Healed;
255 } else {
256 let ratio = reappeared.len() as f32 / record.other_side.len() as f32;
257 if ratio >= self.healing_threshold {
258 record.phase = PartitionPhase::Healed;
259 }
260 }
261 }
262 }
263 }
264
265 pub fn take_healed(&mut self) -> Vec<PartitionRecord> {
267 let mut healed = Vec::new();
268 self.active_partitions.retain(|r| {
269 if r.phase == PartitionPhase::Healed {
270 healed.push(r.clone());
271 false
272 } else {
273 true
274 }
275 });
276 healed
277 }
278
279 pub fn active_count(&self) -> usize {
281 self.active_partitions.len()
282 }
283
284 pub fn get(&self, partition_id: u64) -> Option<&PartitionRecord> {
286 self.active_partitions.iter().find(|r| r.id == partition_id)
287 }
288
289 fn find_mut(&mut self, partition_id: u64) -> Option<&mut PartitionRecord> {
290 self.active_partitions
291 .iter_mut()
292 .find(|r| r.id == partition_id)
293 }
294}
295
296impl Default for PartitionDetector {
297 fn default() -> Self {
298 Self::new()
299 }
300}
301
302impl std::fmt::Debug for PartitionDetector {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 f.debug_struct("PartitionDetector")
305 .field("active_partitions", &self.active_partitions.len())
306 .field("healing_threshold", &self.healing_threshold)
307 .finish()
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a mass-failure verdict that blames `subnet` for `failed`.
    fn make_verdict_subnet(failed: Vec<u64>, subnet: SubnetId) -> CorrelationVerdict {
        let suspected_cause = FailureCause::SubnetFailure {
            subnet,
            affected_ratio: 1.0,
        };
        CorrelationVerdict::MassFailure {
            failed_nodes: failed,
            failure_ratio: 0.5,
            suspected_cause,
        }
    }

    /// Builds a mass-failure verdict whose cause is a broad outage.
    fn make_verdict_broad(failed: Vec<u64>) -> CorrelationVerdict {
        CorrelationVerdict::MassFailure {
            failed_nodes: failed,
            failure_ratio: 0.5,
            suspected_cause: FailureCause::BroadOutage,
        }
    }

    /// Feeds `det` a subnet-failure verdict for `failed` with `healthy` as our
    /// side and a fresh horizon, returning whatever `detect` returns.
    fn detect_subnet(det: &mut PartitionDetector, failed: Vec<u64>, healthy: &[u64]) -> Option<u64> {
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_subnet(failed, SubnetId::new(&[2]));
        det.detect(&verdict, healthy, &horizon)
    }

    #[test]
    fn test_detect_partition() {
        let mut det = PartitionDetector::new();

        let id = detect_subnet(&mut det, vec![1, 2, 3], &[4, 5, 6]);
        assert!(id.is_some());
        assert_eq!(det.active_count(), 1);

        let record = det.get(id.unwrap()).unwrap();
        assert_eq!(record.other_side(), &[1, 2, 3]);
        assert_eq!(record.our_side(), &[4, 5, 6]);
        assert_eq!(record.phase(), &PartitionPhase::Suspected);
    }

    #[test]
    fn test_no_partition_for_broad_outage() {
        let mut det = PartitionDetector::new();
        let horizon = ObservedHorizon::new();
        let verdict = make_verdict_broad(vec![1, 2, 3]);

        let id = det.detect(&verdict, &[4, 5, 6], &horizon);
        assert!(id.is_none());
        assert_eq!(det.active_count(), 0);
    }

    #[test]
    fn test_no_partition_for_independent() {
        let mut det = PartitionDetector::new();
        let horizon = ObservedHorizon::new();
        let verdict = CorrelationVerdict::Independent {
            failed_nodes: vec![1],
        };

        let id = det.detect(&verdict, &[2, 3], &horizon);
        assert!(id.is_none());
    }

    #[test]
    fn test_confirm() {
        let mut det = PartitionDetector::new();

        let id = detect_subnet(&mut det, vec![1, 2], &[3, 4]).unwrap();
        assert!(det.confirm(id));
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);
    }

    #[test]
    fn test_healing() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.50);

        let id = detect_subnet(&mut det, vec![1, 2, 3, 4], &[5, 6]).unwrap();

        // One of four nodes back: below the threshold, so still healing.
        det.on_node_recovery(1);
        assert!(matches!(
            det.get(id).unwrap().phase(),
            PartitionPhase::Healing { .. }
        ));

        // Two of four nodes back: the 0.50 threshold is reached.
        det.on_node_recovery(2);
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Healed);
    }

    #[test]
    fn test_take_healed() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.50);

        detect_subnet(&mut det, vec![1, 2], &[3, 4]);

        // One of two nodes back is enough at a 0.50 threshold.
        det.on_node_recovery(1);
        let healed = det.take_healed();
        assert_eq!(healed.len(), 1);
        assert_eq!(det.active_count(), 0);
    }

    #[test]
    fn test_healing_progress() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.75);

        let id = detect_subnet(&mut det, vec![1, 2, 3, 4], &[5]).unwrap();
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.0);

        det.on_node_recovery(1);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.25);

        det.on_node_recovery(2);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.50);
    }

    #[test]
    fn test_duplicate_recovery_ignored() {
        let mut det = PartitionDetector::new().with_healing_threshold(0.75);

        let id = detect_subnet(&mut det, vec![1, 2, 3, 4], &[5]).unwrap();

        // The second report of the same node must not be counted again.
        det.on_node_recovery(1);
        det.on_node_recovery(1);
        assert_eq!(det.get(id).unwrap().healing_progress(), 0.25);
    }

    #[test]
    fn partition_record_getters_return_the_right_fields() {
        let mut det = PartitionDetector::new();
        let horizon = ObservedHorizon::new();
        let subnet = SubnetId::new(&[2]);
        let verdict = make_verdict_subnet(vec![1, 2, 3], subnet);

        let id = det.detect(&verdict, &[4, 5, 6], &horizon).unwrap();
        let record = det.get(id).unwrap();

        assert_eq!(record.id(), id);
        assert_eq!(record.partition_subnet(), Some(subnet));
        assert_eq!(record.horizon_at_split().entity_count(), 0);

        // Only a loose upper bound: the record was created moments ago.
        let d = record.duration();
        assert!(
            d < std::time::Duration::from_secs(60),
            "duration sanity ceiling exceeded: {d:?}",
        );
    }

    #[test]
    fn confirm_returns_false_on_already_confirmed_partition() {
        let mut det = PartitionDetector::new();
        let id = detect_subnet(&mut det, vec![1, 2], &[3, 4]).unwrap();

        assert!(det.confirm(id), "first confirm must succeed");
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);

        assert!(!det.confirm(id), "second confirm must reject");
        assert_eq!(det.get(id).unwrap().phase(), &PartitionPhase::Confirmed);

        // An ID that was never handed out is rejected as well.
        assert!(!det.confirm(u64::MAX));
    }

    #[test]
    fn partition_detector_default_matches_new() {
        let from_default = PartitionDetector::default();
        let from_new = PartitionDetector::new();
        assert_eq!(from_default.active_count(), from_new.active_count());
    }

    #[test]
    fn partition_detector_debug_includes_counts() {
        let det = PartitionDetector::new().with_healing_threshold(0.5);
        let rendered = format!("{det:?}");
        assert!(rendered.contains("PartitionDetector"));
        assert!(rendered.contains("active_partitions: 0"));
        assert!(rendered.contains("healing_threshold"));
    }
}