1use std::{
6 collections::{BTreeMap, HashSet},
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use parking_lot::RwLock;
12use tracing::warn;
13
14use super::service::gossip::{NodeState, NodeStatus};
15
/// Tunable thresholds used by [`PartitionDetector`].
#[derive(Clone, Debug)]
pub struct PartitionConfig {
    /// How long an alive node may go without being seen before the detector
    /// counts it as unreachable (a silence of exactly this length already counts).
    pub unreachable_timeout: Duration,
    /// Smallest cluster size the deployment expects.
    ///
    /// NOTE(review): nothing in this file reads this field — confirm where
    /// (or whether) it is enforced.
    pub min_cluster_size: usize,
    /// Minimum number of reachable nodes required for this side of a
    /// partition to be treated as having quorum.
    pub quorum_threshold: usize,
}
26
27impl Default for PartitionConfig {
28 fn default() -> Self {
29 Self {
30 unreachable_timeout: Duration::from_secs(30),
31 min_cluster_size: 3,
32 quorum_threshold: 2,
33 }
34 }
35}
36
/// Outcome of a partition check performed by [`PartitionDetector`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionState {
    /// No alive node has exceeded the unreachable timeout.
    Normal,
    /// At least one alive node is unreachable, but the number of reachable
    /// nodes still meets the configured quorum threshold.
    PartitionedWithQuorum,
    /// At least one alive node is unreachable and the reachable nodes fall
    /// short of the quorum threshold; the detector stops reporting that it
    /// should serve in this state.
    PartitionedWithoutQuorum,
}
47
48#[derive(Debug)]
50pub struct PartitionDetector {
51 config: PartitionConfig,
52 last_seen: Arc<RwLock<BTreeMap<String, Instant>>>,
53 current_state: Arc<RwLock<PartitionState>>,
54}
55
56impl PartitionDetector {
57 pub fn new(config: PartitionConfig) -> Self {
58 Self {
59 config,
60 last_seen: Arc::new(RwLock::new(BTreeMap::new())),
61 current_state: Arc::new(RwLock::new(PartitionState::Normal)),
62 }
63 }
64
65 pub fn update_last_seen(&self, node_name: &str) {
67 let mut last_seen = self.last_seen.write();
68 last_seen.insert(node_name.to_string(), Instant::now());
69 }
70
71 pub fn detect_partition(&self, cluster_state: &BTreeMap<String, NodeState>) -> PartitionState {
73 let now = Instant::now();
74 let last_seen = self.last_seen.read();
75
76 let mut alive_count = 0;
78 let mut unreachable_count = 0;
79 let mut reachable_nodes = HashSet::new();
80
81 for (name, node) in cluster_state {
82 if node.status == NodeStatus::Alive as i32 {
83 alive_count += 1;
84
85 if let Some(last_seen_time) = last_seen.get(name) {
87 if now.duration_since(*last_seen_time) < self.config.unreachable_timeout {
88 reachable_nodes.insert(name.clone());
89 } else {
90 unreachable_count += 1;
91 warn!(
92 "Node {} unreachable for {:?}",
93 name,
94 now.duration_since(*last_seen_time)
95 );
96 }
97 } else {
98 reachable_nodes.insert(name.clone());
100 }
101 }
102 }
103
104 let reachable_count = reachable_nodes.len();
105
106 let state = if unreachable_count == 0 {
108 PartitionState::Normal
109 } else if reachable_count >= self.config.quorum_threshold {
110 PartitionState::PartitionedWithQuorum
111 } else {
112 PartitionState::PartitionedWithoutQuorum
113 };
114
115 *self.current_state.write() = state.clone();
117
118 if state != PartitionState::Normal {
119 warn!(
120 "Partition detected: state={:?}, reachable={}, unreachable={}, total_alive={}",
121 state, reachable_count, unreachable_count, alive_count
122 );
123 }
124
125 state
126 }
127
128 pub fn current_state(&self) -> PartitionState {
130 self.current_state.read().clone()
131 }
132
133 pub fn has_quorum(&self, reachable_count: usize) -> bool {
135 reachable_count >= self.config.quorum_threshold
136 }
137
138 pub fn get_unreachable_nodes(
140 &self,
141 cluster_state: &BTreeMap<String, NodeState>,
142 ) -> Vec<String> {
143 let now = Instant::now();
144 let last_seen = self.last_seen.read();
145 let mut unreachable = Vec::new();
146
147 for (name, node) in cluster_state {
148 if node.status == NodeStatus::Alive as i32 {
149 if let Some(last_seen_time) = last_seen.get(name) {
150 if now.duration_since(*last_seen_time) >= self.config.unreachable_timeout {
151 unreachable.push(name.clone());
152 }
153 }
154 }
155 }
156
157 unreachable
158 }
159
160 pub fn should_serve(&self) -> bool {
162 let state = self.current_state.read();
163 matches!(
164 *state,
165 PartitionState::Normal | PartitionState::PartitionedWithQuorum
166 )
167 }
168}
169
170impl Default for PartitionDetector {
171 fn default() -> Self {
172 Self::new(PartitionConfig::default())
173 }
174}
175
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        time::{Duration, Instant},
    };

    use super::*;
    use crate::service::gossip::{NodeState, NodeStatus};

    /// Unreachable timeout used by every test detector. It is deliberately
    /// generous: entries are aged by backdating their timestamps rather than
    /// by sleeping, so a freshly seen node cannot cross the timeout even on a
    /// heavily loaded machine.
    const TEST_TIMEOUT: Duration = Duration::from_secs(5);

    fn create_test_config() -> PartitionConfig {
        PartitionConfig {
            unreachable_timeout: TEST_TIMEOUT,
            min_cluster_size: 3,
            quorum_threshold: 2,
        }
    }

    fn create_node_state(name: &str, address: &str, status: NodeStatus) -> NodeState {
        NodeState {
            name: name.to_string(),
            address: address.to_string(),
            status: status as i32,
            version: 1,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Builds a cluster map in which every listed node is `Alive`.
    fn alive_cluster(names: &[&str]) -> BTreeMap<String, NodeState> {
        names
            .iter()
            .enumerate()
            .map(|(index, &name)| {
                let address = format!("127.0.0.1:{}", 8080 + index);
                (
                    name.to_string(),
                    create_node_state(name, &address, NodeStatus::Alive),
                )
            })
            .collect()
    }

    /// Records `node_name` as last seen `age` ago by writing the timestamp
    /// directly, so tests never need to sleep to age an entry.
    fn mark_seen_ago(detector: &PartitionDetector, node_name: &str, age: Duration) {
        let seen_at = Instant::now()
            .checked_sub(age)
            .expect("monotonic clock should be able to represent a recent past instant");
        detector
            .last_seen
            .write()
            .insert(node_name.to_string(), seen_at);
    }

    /// Records `node_name` as last seen well beyond the unreachable timeout.
    fn mark_stale(detector: &PartitionDetector, node_name: &str) {
        mark_seen_ago(detector, node_name, TEST_TIMEOUT * 2);
    }

    #[test]
    fn test_partition_config_default() {
        let config = PartitionConfig::default();
        assert_eq!(config.unreachable_timeout, Duration::from_secs(30));
        assert_eq!(config.min_cluster_size, 3);
        assert_eq!(config.quorum_threshold, 2);
    }

    #[test]
    fn test_partition_detector_initial_state() {
        let detector = PartitionDetector::new(create_test_config());

        assert_eq!(detector.current_state(), PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_update_last_seen() {
        let detector = PartitionDetector::new(create_test_config());

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");

        {
            let tracked = detector.last_seen.read();
            assert_eq!(tracked.len(), 2);
            assert!(tracked.contains_key("node1"));
            assert!(tracked.contains_key("node2"));
        }

        // Recorded contacts for nodes absent from the cluster map are ignored.
        let cluster_state = BTreeMap::new();
        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::Normal);
    }

    #[test]
    fn test_detect_partition_normal() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        detector.update_last_seen("node3");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_detect_partition_with_quorum() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        // Two of three nodes are fresh, one has gone silent.
        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        mark_stale(&detector, "node3");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::PartitionedWithQuorum);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_detect_partition_without_quorum() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        // Only one node is fresh, which is below the quorum threshold of 2.
        detector.update_last_seen("node1");
        mark_stale(&detector, "node2");
        mark_stale(&detector, "node3");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::PartitionedWithoutQuorum);
        assert!(!detector.should_serve());
    }

    #[test]
    fn test_has_quorum() {
        let detector = PartitionDetector::new(create_test_config());

        assert!(detector.has_quorum(2));
        assert!(detector.has_quorum(3));
        assert!(!detector.has_quorum(1));
        assert!(!detector.has_quorum(0));
    }

    #[test]
    fn test_get_unreachable_nodes() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2", "node3"]);

        detector.update_last_seen("node1");
        detector.update_last_seen("node2");
        detector.update_last_seen("node3");

        let unreachable = detector.get_unreachable_nodes(&cluster_state);
        assert!(unreachable.is_empty());

        mark_stale(&detector, "node1");
        mark_stale(&detector, "node2");
        mark_stale(&detector, "node3");

        let unreachable = detector.get_unreachable_nodes(&cluster_state);
        assert_eq!(unreachable.len(), 3);
        assert!(unreachable.contains(&"node1".to_string()));
        assert!(unreachable.contains(&"node2".to_string()));
        assert!(unreachable.contains(&"node3".to_string()));
    }

    #[test]
    fn test_get_unreachable_nodes_with_recent_updates() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "node2"]);

        // node1 is past the timeout; node2 has aged but is still within it.
        mark_stale(&detector, "node1");
        mark_seen_ago(&detector, "node2", TEST_TIMEOUT / 2);

        let unreachable = detector.get_unreachable_nodes(&cluster_state);
        assert!(unreachable.contains(&"node1".to_string()));
        assert!(!unreachable.contains(&"node2".to_string()));
    }

    #[test]
    fn test_detect_partition_ignores_non_alive_nodes() {
        let detector = PartitionDetector::new(create_test_config());

        let mut cluster_state = BTreeMap::new();
        cluster_state.insert(
            "node1".to_string(),
            create_node_state("node1", "127.0.0.1:8080", NodeStatus::Alive),
        );
        cluster_state.insert(
            "node2".to_string(),
            create_node_state("node2", "127.0.0.1:8081", NodeStatus::Down),
        );
        cluster_state.insert(
            "node3".to_string(),
            create_node_state("node3", "127.0.0.1:8082", NodeStatus::Suspected),
        );

        detector.update_last_seen("node1");
        // Stale timestamps on the non-alive nodes prove they are skipped:
        // if they were considered, the result would not be `Normal`.
        mark_stale(&detector, "node2");
        mark_stale(&detector, "node3");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::Normal);
        assert!(detector.get_unreachable_nodes(&cluster_state).is_empty());
    }

    #[test]
    fn test_new_node_considered_reachable() {
        let detector = PartitionDetector::new(create_test_config());
        let cluster_state = alive_cluster(&["node1", "new_node"]);

        // `new_node` has never been seen and must not count as unreachable.
        detector.update_last_seen("node1");

        let state = detector.detect_partition(&cluster_state);
        assert_eq!(state, PartitionState::Normal);
    }

    #[test]
    fn test_should_serve() {
        let detector = PartitionDetector::new(create_test_config());

        *detector.current_state.write() = PartitionState::Normal;
        assert!(detector.should_serve());

        *detector.current_state.write() = PartitionState::PartitionedWithQuorum;
        assert!(detector.should_serve());

        *detector.current_state.write() = PartitionState::PartitionedWithoutQuorum;
        assert!(!detector.should_serve());
    }

    #[test]
    fn test_default_implementation() {
        let detector = PartitionDetector::default();
        assert_eq!(detector.current_state(), PartitionState::Normal);
        assert!(detector.should_serve());
    }

    #[test]
    fn test_partition_state_equality() {
        assert_eq!(PartitionState::Normal, PartitionState::Normal);
        assert_ne!(
            PartitionState::Normal,
            PartitionState::PartitionedWithQuorum
        );
        assert_ne!(
            PartitionState::PartitionedWithQuorum,
            PartitionState::PartitionedWithoutQuorum
        );
    }
}