Skip to main content

smg_mesh/
partition.rs

//! Partition detection and handling
//!
//! Detects network partitions from per-node last-seen times and reports whether the
//! still-reachable part of the cluster meets the configured quorum threshold.
4
5use std::{
6    collections::{BTreeMap, HashSet},
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use parking_lot::RwLock;
12use tracing::warn;
13
14use super::service::gossip::{NodeState, NodeStatus};
15
/// Partition detection configuration
#[derive(Debug, Clone)]
pub struct PartitionConfig {
    /// How long an alive node may go without being seen (via
    /// `PartitionDetector::update_last_seen`) before it is considered unreachable.
    pub unreachable_timeout: Duration,
    /// Minimum cluster size to consider a partition.
    ///
    /// NOTE(review): `PartitionDetector` in this module never reads this field, so it
    /// currently has no effect on detection — confirm whether callers rely on it.
    pub min_cluster_size: usize,
    /// Minimum number of reachable alive nodes required for quorum
    /// (quorum holds when `reachable >= quorum_threshold`).
    pub quorum_threshold: usize,
}

impl Default for PartitionConfig {
    /// 30 s unreachable timeout, minimum cluster size of 3, quorum threshold of 2.
    fn default() -> Self {
        Self {
            unreachable_timeout: Duration::from_secs(30),
            min_cluster_size: 3,
            quorum_threshold: 2,
        }
    }
}
36
/// Result of the most recent partition check.
///
/// `Normal` and `PartitionedWithQuorum` permit serving; `PartitionedWithoutQuorum`
/// does not (see `PartitionDetector::should_serve`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PartitionState {
    /// Every alive node has been seen within the unreachable timeout.
    Normal,
    /// Some alive nodes are unreachable, but enough remain reachable for quorum.
    PartitionedWithQuorum,
    /// Some alive nodes are unreachable and too few remain reachable for quorum.
    PartitionedWithoutQuorum,
}
47
48/// Partition detector
49#[derive(Debug)]
50pub struct PartitionDetector {
51    config: PartitionConfig,
52    last_seen: Arc<RwLock<BTreeMap<String, Instant>>>,
53    current_state: Arc<RwLock<PartitionState>>,
54}
55
56impl PartitionDetector {
57    pub fn new(config: PartitionConfig) -> Self {
58        Self {
59            config,
60            last_seen: Arc::new(RwLock::new(BTreeMap::new())),
61            current_state: Arc::new(RwLock::new(PartitionState::Normal)),
62        }
63    }
64
65    /// Update last seen time for a node
66    pub fn update_last_seen(&self, node_name: &str) {
67        let mut last_seen = self.last_seen.write();
68        last_seen.insert(node_name.to_string(), Instant::now());
69    }
70
71    /// Detect partition based on current cluster state
72    pub fn detect_partition(&self, cluster_state: &BTreeMap<String, NodeState>) -> PartitionState {
73        let now = Instant::now();
74        let last_seen = self.last_seen.read();
75
76        // Count alive nodes and unreachable nodes
77        let mut alive_count = 0;
78        let mut unreachable_count = 0;
79        let mut reachable_nodes = HashSet::new();
80
81        for (name, node) in cluster_state {
82            if node.status == NodeStatus::Alive as i32 {
83                alive_count += 1;
84
85                // Check if we've seen this node recently
86                if let Some(last_seen_time) = last_seen.get(name) {
87                    if now.duration_since(*last_seen_time) < self.config.unreachable_timeout {
88                        reachable_nodes.insert(name.clone());
89                    } else {
90                        unreachable_count += 1;
91                        warn!(
92                            "Node {} unreachable for {:?}",
93                            name,
94                            now.duration_since(*last_seen_time)
95                        );
96                    }
97                } else {
98                    // New node, consider it reachable for now
99                    reachable_nodes.insert(name.clone());
100                }
101            }
102        }
103
104        let reachable_count = reachable_nodes.len();
105
106        // Determine partition state
107        let state = if unreachable_count == 0 {
108            PartitionState::Normal
109        } else if reachable_count >= self.config.quorum_threshold {
110            PartitionState::PartitionedWithQuorum
111        } else {
112            PartitionState::PartitionedWithoutQuorum
113        };
114
115        // Update current state
116        *self.current_state.write() = state.clone();
117
118        if state != PartitionState::Normal {
119            warn!(
120                "Partition detected: state={:?}, reachable={}, unreachable={}, total_alive={}",
121                state, reachable_count, unreachable_count, alive_count
122            );
123        }
124
125        state
126    }
127
128    /// Get current partition state
129    pub fn current_state(&self) -> PartitionState {
130        self.current_state.read().clone()
131    }
132
133    /// Check if we have quorum
134    pub fn has_quorum(&self, reachable_count: usize) -> bool {
135        reachable_count >= self.config.quorum_threshold
136    }
137
138    /// Get unreachable nodes
139    pub fn get_unreachable_nodes(
140        &self,
141        cluster_state: &BTreeMap<String, NodeState>,
142    ) -> Vec<String> {
143        let now = Instant::now();
144        let last_seen = self.last_seen.read();
145        let mut unreachable = Vec::new();
146
147        for (name, node) in cluster_state {
148            if node.status == NodeStatus::Alive as i32 {
149                if let Some(last_seen_time) = last_seen.get(name) {
150                    if now.duration_since(*last_seen_time) >= self.config.unreachable_timeout {
151                        unreachable.push(name.clone());
152                    }
153                }
154            }
155        }
156
157        unreachable
158    }
159
160    /// Check if we should continue serving (have quorum)
161    pub fn should_serve(&self) -> bool {
162        let state = self.current_state.read();
163        matches!(
164            *state,
165            PartitionState::Normal | PartitionState::PartitionedWithQuorum
166        )
167    }
168}
169
170impl Default for PartitionDetector {
171    fn default() -> Self {
172        Self::new(PartitionConfig::default())
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        time::{Duration, Instant},
    };

    use super::*;
    // Import NodeState and NodeStatus from gossip module
    use crate::service::gossip::{NodeState, NodeStatus};

    /// How far in the past a "stale" node is recorded. It is well beyond the test
    /// timeout, so staleness never depends on scheduler or sleep timing.
    const STALE_AGE: Duration = Duration::from_secs(5);

    fn create_test_config() -> PartitionConfig {
        PartitionConfig {
            // Generous timeout: "fresh" nodes are checked microseconds after being
            // marked seen, so even a slow CI host cannot make them stale.
            unreachable_timeout: Duration::from_secs(1),
            min_cluster_size: 3,
            quorum_threshold: 2,
        }
    }

    fn create_node_state(name: &str, address: &str, status: NodeStatus) -> NodeState {
        NodeState {
            name: name.to_string(),
            address: address.to_string(),
            status: status as i32,
            version: 1,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Builds a cluster view in which every named node is `Alive`, on consecutive
    /// ports starting at 8080.
    fn alive_cluster(names: &[&str]) -> BTreeMap<String, NodeState> {
        names
            .iter()
            .zip(8080u16..)
            .map(|(name, port)| {
                let address = format!("127.0.0.1:{}", port);
                (
                    name.to_string(),
                    create_node_state(name, &address, NodeStatus::Alive),
                )
            })
            .collect()
    }

    /// Records `name` as last seen `ago` in the past, so tests can create
    /// unreachable nodes deterministically instead of sleeping.
    fn mark_seen_ago(detector: &PartitionDetector, name: &str, ago: Duration) {
        let seen = Instant::now()
            .checked_sub(ago)
            .expect("monotonic clock must be older than the backdating offset");
        detector.last_seen.write().insert(name.to_string(), seen);
    }

    #[test]
    fn test_partition_config_default() {
        let config = PartitionConfig::default();
        assert_eq!(config.unreachable_timeout, Duration::from_secs(30));
        assert_eq!(config.min_cluster_size, 3);
        assert_eq!(config.quorum_threshold, 2);
    }

    #[test]
    fn test_partition_detector_initial_state() {
        let detector = PartitionDetector::new(create_test_config());

        assert_eq!(detector.current_state(), PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_update_last_seen() {
        let detector = PartitionDetector::new(create_test_config());

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");

        // Verify nodes are tracked (guard scoped so it is dropped before detection).
        {
            let last_seen = detector.last_seen.read();
            assert_eq!(last_seen.len(), 2);
            assert!(last_seen.contains_key("node1"));
            assert!(last_seen.contains_key("node2"));
        }

        // Tracked nodes absent from the cluster view do not trigger a partition.
        let state = detector.detect_partition(&BTreeMap::new());
        assert_eq!(state, PartitionState::Normal);
    }

    #[test]
    fn test_detect_partition_normal() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        detector.update_last_seen("node3");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_detect_partition_with_quorum() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        // node1 and node2 are fresh (quorum of 2); node3 has gone silent.
        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        mark_seen_ago(&detector, "node3", STALE_AGE);

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::PartitionedWithQuorum);
        assert!(detector.should_serve());
        assert_eq!(
            detector.get_unreachable_nodes(&cluster_state),
            vec!["node3".to_string()]
        );
    }

    #[test]
    fn test_detect_partition_without_quorum() {
        let mut config = create_test_config();
        config.quorum_threshold = 2;
        let detector = PartitionDetector::new(config);
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        // Only node1 is fresh; node2 and node3 have gone silent.
        detector.update_last_seen("node1");
        mark_seen_ago(&detector, "node2", STALE_AGE);
        mark_seen_ago(&detector, "node3", STALE_AGE);

        let state = detector.detect_partition(&cluster_state);
        // Only node1 is reachable (below quorum of 2)
        assert_eq!(state, PartitionState::PartitionedWithoutQuorum);
        assert!(!detector.should_serve());
    }

    #[test]
    fn test_has_quorum() {
        let detector = PartitionDetector::new(create_test_config());

        assert!(detector.has_quorum(2));
        assert!(detector.has_quorum(3));
        assert!(!detector.has_quorum(1));
        assert!(!detector.has_quorum(0));
    }

    #[test]
    fn test_get_unreachable_nodes() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        detector.update_last_seen("node3");

        // Initially no unreachable nodes
        assert!(detector.get_unreachable_nodes(&cluster_state).is_empty());

        // Every node goes silent
        mark_seen_ago(&detector, "node1", STALE_AGE);
        mark_seen_ago(&detector, "node2", STALE_AGE);
        mark_seen_ago(&detector, "node3", STALE_AGE);

        let unreachable = detector.get_unreachable_nodes(&cluster_state);
        assert_eq!(unreachable.len(), 3);
        assert!(unreachable.contains(&"node1".to_string()));
        assert!(unreachable.contains(&"node2".to_string()));
        assert!(unreachable.contains(&"node3".to_string()));
    }

    #[test]
    fn test_get_unreachable_nodes_with_recent_updates() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2"]);

        // node1 was last seen long ago; node2 was seen just now.
        mark_seen_ago(&detector, "node1", STALE_AGE);
        detector.update_last_seen("node2");

        let unreachable = detector.get_unreachable_nodes(&cluster_state);
        assert!(unreachable.contains(&"node1".to_string()));
        assert!(!unreachable.contains(&"node2".to_string()));
    }

    #[test]
    fn test_get_unreachable_nodes_skips_never_seen() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2"]);

        mark_seen_ago(&detector, "node1", STALE_AGE);
        // node2 has no record: like detect_partition, it is not reported unreachable.
        assert_eq!(
            detector.get_unreachable_nodes(&cluster_state),
            vec!["node1".to_string()]
        );
    }

    #[test]
    fn test_detect_partition_ignores_non_alive_nodes() {
        let detector = PartitionDetector::new(create_test_config());

        let mut cluster_state = BTreeMap::new();
        cluster_state.insert(
            "node1".to_string(),
            create_node_state("node1", "127.0.0.1:8080", NodeStatus::Alive),
        );
        cluster_state.insert(
            "node2".to_string(),
            create_node_state("node2", "127.0.0.1:8081", NodeStatus::Down),
        );
        cluster_state.insert(
            "node3".to_string(),
            create_node_state("node3", "127.0.0.1:8082", NodeStatus::Suspected),
        );

        detector.update_last_seen("node1");
        // Stale records for non-alive nodes must not count as unreachable.
        mark_seen_ago(&detector, "node2", STALE_AGE);
        mark_seen_ago(&detector, "node3", STALE_AGE);

        let state = detector.detect_partition(&cluster_state);
        // Only node1 is alive and it is reachable, so unreachable_count == 0 -> Normal.
        assert_eq!(state, PartitionState::Normal);
        assert!(detector.get_unreachable_nodes(&cluster_state).is_empty());
    }

    #[test]
    fn test_new_node_considered_reachable() {
        let detector = PartitionDetector::new(create_test_config());

        let mut cluster_state = BTreeMap::new();
        cluster_state.insert(
            "node1".to_string(),
            create_node_state("node1", "127.0.0.1:8080", NodeStatus::Alive),
        );
        cluster_state.insert(
            "new_node".to_string(),
            create_node_state("new_node", "127.0.0.1:8083", NodeStatus::Alive),
        );

        // Don't update last_seen for new_node, it should be considered reachable
        detector.update_last_seen("node1");

        let state = detector.detect_partition(&cluster_state);
        // Both nodes should be considered reachable (node1 explicitly, new_node by default)
        assert_eq!(state, PartitionState::Normal);
    }

    #[test]
    fn test_should_serve() {
        let detector = PartitionDetector::new(create_test_config());

        // Normal state should serve
        *detector.current_state.write() = PartitionState::Normal;
        assert!(detector.should_serve());

        // Partitioned with quorum should serve
        *detector.current_state.write() = PartitionState::PartitionedWithQuorum;
        assert!(detector.should_serve());

        // Partitioned without quorum should not serve
        *detector.current_state.write() = PartitionState::PartitionedWithoutQuorum;
        assert!(!detector.should_serve());
    }

    #[test]
    fn test_default_implementation() {
        let detector = PartitionDetector::default();
        assert_eq!(detector.current_state(), PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_partition_state_equality() {
        assert_eq!(PartitionState::Normal, PartitionState::Normal);
        assert_ne!(
            PartitionState::Normal,
            PartitionState::PartitionedWithQuorum
        );
        assert_ne!(
            PartitionState::PartitionedWithQuorum,
            PartitionState::PartitionedWithoutQuorum
        );
    }
}