1use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use std::sync::{
29 Arc,
30 atomic::{AtomicU64, Ordering},
31};
32use tokio::sync::RwLock;
33
34use super::node_registry::{Node, NodeRegistry};
35
/// A single node as seen by the cluster manager.
///
/// Instances are stored in the manager's member map keyed by `node_id` and are
/// returned by value (cloned) from the lookup helpers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMember {
    /// Unique identifier of the node; used as the key in the member map.
    pub node_id: u32,
    /// Address registered with the node registry when the member is added.
    pub api_address: String,
    /// Replication endpoint of the node.
    /// NOTE(review): not read anywhere in this file — presumably consumed by the
    /// replication layer; confirm against callers.
    pub replication_address: String,
    /// Role this member is believed to hold.
    pub role: MemberRole,
    /// Last WAL offset reported for this node; compared when voting and when
    /// picking a leader candidate.
    pub last_wal_offset: u64,
    /// Time of the last heartbeat update, in milliseconds since the UNIX epoch.
    pub last_heartbeat_ms: u64,
    /// Whether the node was reported healthy by its last heartbeat.
    pub healthy: bool,
}
54
/// Election role of a cluster member.
///
/// Serialized as a lowercase string (`"leader"`, `"follower"`, `"candidate"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MemberRole {
    /// The node currently acting as leader.
    Leader,
    /// A node following a leader; also the initial role of a new manager.
    Follower,
    /// A node that has started an election and voted for itself.
    Candidate,
}
63
/// Request sent by a candidate asking this node for its vote.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteRequest {
    /// Election term the candidate is campaigning in.
    pub term: u64,
    /// Node id of the candidate asking for the vote.
    pub candidate_id: u32,
    /// Candidate's last WAL offset; the vote is only granted when this is at
    /// least as large as the local node's offset.
    pub last_wal_offset: u64,
}
74
/// Reply to a [`VoteRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteResponse {
    /// Term of the responding node at the time of the reply.
    pub term: u64,
    /// `true` when the responder gave its vote to the requesting candidate.
    pub vote_granted: bool,
}
83
/// Point-in-time summary of the cluster as seen by the local node.
///
/// The individual fields are read one after another, so the snapshot is not
/// guaranteed to be mutually consistent under concurrent updates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
    /// Current election term.
    pub term: u64,
    /// Id of the known leader, if any.
    pub leader_id: Option<u32>,
    /// Id of the local node.
    pub self_id: u32,
    /// Role of the local node.
    pub self_role: MemberRole,
    /// Number of entries in the member map.
    pub member_count: usize,
    /// Number of healthy nodes as reported by the node registry.
    pub healthy_count: usize,
    /// Total number of partition entries across the registry's partition
    /// distribution.
    pub partition_count: u32,
    /// Clones of all known members.
    pub members: Vec<ClusterMember>,
}
104
/// Tracks cluster membership and the local node's election state.
pub struct ClusterManager {
    /// Id of the local node.
    self_id: u32,
    /// Role currently held by the local node.
    role: RwLock<MemberRole>,
    /// Current election term.
    term: AtomicU64,
    /// Candidate this node has voted for in the current term, if any.
    voted_for: RwLock<Option<u32>>,
    /// Id of the leader this node currently recognizes, if any.
    leader_id: RwLock<Option<u32>>,
    /// Known members keyed by node id.
    members: DashMap<u32, ClusterMember>,
    /// Node registry kept in sync with membership and health changes.
    registry: Arc<NodeRegistry>,
}
122
123impl ClusterManager {
124 pub fn new(self_id: u32, partition_count: u32) -> Self {
130 Self {
131 self_id,
132 role: RwLock::new(MemberRole::Follower),
133 term: AtomicU64::new(0),
134 voted_for: RwLock::new(None),
135 leader_id: RwLock::new(None),
136 members: DashMap::new(),
137 registry: Arc::new(NodeRegistry::new(partition_count)),
138 }
139 }
140
141 pub fn registry(&self) -> &Arc<NodeRegistry> {
143 &self.registry
144 }
145
146 pub fn self_id(&self) -> u32 {
148 self.self_id
149 }
150
151 pub fn current_term(&self) -> u64 {
153 self.term.load(Ordering::SeqCst)
154 }
155
156 pub async fn current_role(&self) -> MemberRole {
158 *self.role.read().await
159 }
160
161 pub async fn leader_id(&self) -> Option<u32> {
163 *self.leader_id.read().await
164 }
165
166 pub async fn add_member(&self, member: ClusterMember) {
172 let node_id = member.node_id;
173 let healthy = member.healthy;
174
175 self.registry.register_node(Node {
177 id: node_id,
178 address: member.api_address.clone(),
179 healthy,
180 assigned_partitions: vec![],
181 });
182
183 self.members.insert(node_id, member);
185
186 if node_id == self.self_id {
188 let mut role = self.role.write().await;
189 let member_ref = self.members.get(&node_id).unwrap();
190 *role = member_ref.role;
191 }
192 }
193
194 pub async fn remove_member(&self, node_id: u32) -> Option<ClusterMember> {
196 self.registry.unregister_node(node_id);
197 let removed = self.members.remove(&node_id).map(|(_, m)| m);
198
199 let leader = *self.leader_id.read().await;
201 if leader == Some(node_id) {
202 *self.leader_id.write().await = None;
203 }
204
205 removed
206 }
207
208 pub fn update_member_heartbeat(&self, node_id: u32, wal_offset: u64, healthy: bool) {
210 if let Some(mut member) = self.members.get_mut(&node_id) {
211 member.last_wal_offset = wal_offset;
212 member.healthy = healthy;
213 member.last_heartbeat_ms = std::time::SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .unwrap_or_default()
216 .as_millis() as u64;
217 }
218 self.registry.set_node_health(node_id, healthy);
219 }
220
221 pub fn get_member(&self, node_id: u32) -> Option<ClusterMember> {
223 self.members.get(&node_id).map(|m| m.clone())
224 }
225
226 pub fn all_members(&self) -> Vec<ClusterMember> {
228 self.members.iter().map(|m| m.value().clone()).collect()
229 }
230
231 pub fn healthy_members(&self) -> Vec<ClusterMember> {
233 self.members
234 .iter()
235 .filter(|m| m.value().healthy)
236 .map(|m| m.value().clone())
237 .collect()
238 }
239
240 pub fn member_count(&self) -> usize {
242 self.members.len()
243 }
244
245 pub async fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
256 let current_term = self.term.load(Ordering::SeqCst);
257
258 if request.term < current_term {
260 return VoteResponse {
261 term: current_term,
262 vote_granted: false,
263 };
264 }
265
266 if request.term > current_term {
268 self.term.store(request.term, Ordering::SeqCst);
269 *self.voted_for.write().await = None;
270 *self.role.write().await = MemberRole::Follower;
272 }
273
274 let mut voted_for = self.voted_for.write().await;
275 let current_term = self.term.load(Ordering::SeqCst);
276
277 let can_vote = match *voted_for {
279 None => true,
280 Some(id) => id == request.candidate_id,
281 };
282
283 if can_vote {
284 let self_offset = self
286 .members
287 .get(&self.self_id)
288 .map_or(0, |m| m.last_wal_offset);
289
290 if request.last_wal_offset >= self_offset {
291 *voted_for = Some(request.candidate_id);
292 return VoteResponse {
293 term: current_term,
294 vote_granted: true,
295 };
296 }
297 }
298
299 VoteResponse {
300 term: current_term,
301 vote_granted: false,
302 }
303 }
304
305 pub async fn start_election(&self) -> u64 {
309 let new_term = self.term.fetch_add(1, Ordering::SeqCst) + 1;
310 *self.role.write().await = MemberRole::Candidate;
311 *self.voted_for.write().await = Some(self.self_id);
312 *self.leader_id.write().await = None;
313 new_term
314 }
315
316 pub async fn become_leader(&self, term: u64) {
320 let current_term = self.term.load(Ordering::SeqCst);
321 if term != current_term {
322 return; }
324 *self.role.write().await = MemberRole::Leader;
325 *self.leader_id.write().await = Some(self.self_id);
326
327 if let Some(mut member) = self.members.get_mut(&self.self_id) {
329 member.role = MemberRole::Leader;
330 }
331 }
332
333 pub async fn accept_leader(&self, leader_id: u32, term: u64) {
335 let current_term = self.term.load(Ordering::SeqCst);
336 if term < current_term {
337 return; }
339 if term > current_term {
340 self.term.store(term, Ordering::SeqCst);
341 *self.voted_for.write().await = None;
342 }
343 *self.role.write().await = MemberRole::Follower;
344 *self.leader_id.write().await = Some(leader_id);
345
346 for mut member in self.members.iter_mut() {
348 member.role = if member.node_id == leader_id {
349 MemberRole::Leader
350 } else {
351 MemberRole::Follower
352 };
353 }
354 }
355
356 pub fn select_leader_candidate(&self) -> Option<u32> {
360 let mut best: Option<(u32, u64)> = None; for member in &self.members {
363 if !member.healthy {
364 continue;
365 }
366 match best {
367 None => best = Some((member.node_id, member.last_wal_offset)),
368 Some((_, best_offset)) => {
369 if member.last_wal_offset > best_offset
370 || (member.last_wal_offset == best_offset
371 && member.node_id < best.unwrap().0)
372 {
373 best = Some((member.node_id, member.last_wal_offset));
374 }
375 }
376 }
377 }
378
379 best.map(|(id, _)| id)
380 }
381
382 pub async fn status(&self) -> ClusterStatus {
388 ClusterStatus {
389 term: self.term.load(Ordering::SeqCst),
390 leader_id: *self.leader_id.read().await,
391 self_id: self.self_id,
392 self_role: *self.role.read().await,
393 member_count: self.members.len(),
394 healthy_count: self.registry.healthy_node_count(),
395 partition_count: self
396 .registry
397 .partition_distribution()
398 .values()
399 .flat_map(|v| v.iter())
400 .count() as u32,
401 members: self.all_members(),
402 }
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Partition count used for every test cluster.
    const PARTITIONS: u32 = 32;

    /// Builds a healthy member whose addresses are derived from `id`.
    fn make_member(id: u32, role: MemberRole, offset: u64) -> ClusterMember {
        ClusterMember {
            node_id: id,
            api_address: format!("node-{id}:3900"),
            replication_address: format!("node-{id}:3910"),
            role,
            last_wal_offset: offset,
            last_heartbeat_ms: 0,
            healthy: true,
        }
    }

    /// Creates a manager for `self_id` and adds the `(id, role, offset)` members
    /// in the given order.
    async fn cluster_with(self_id: u32, members: &[(u32, MemberRole, u64)]) -> ClusterManager {
        let manager = ClusterManager::new(self_id, PARTITIONS);
        for &(id, role, offset) in members {
            manager.add_member(make_member(id, role, offset)).await;
        }
        manager
    }

    /// Shorthand for constructing a vote request.
    fn vote_request(term: u64, candidate_id: u32, last_wal_offset: u64) -> VoteRequest {
        VoteRequest {
            term,
            candidate_id,
            last_wal_offset,
        }
    }

    #[tokio::test]
    async fn test_create_cluster_manager() {
        let manager = cluster_with(0, &[]).await;

        assert_eq!(manager.self_id(), 0);
        assert_eq!(manager.current_term(), 0);
        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, None);
        assert_eq!(manager.member_count(), 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_members() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Leader, 100),
                (1, MemberRole::Follower, 90),
                (2, MemberRole::Follower, 80),
            ],
        )
        .await;

        assert_eq!(manager.member_count(), 3);
        assert_eq!(manager.healthy_members().len(), 3);

        // Every registered node shows up in the partition distribution.
        let distribution = manager.registry().partition_distribution();
        assert_eq!(distribution.len(), 3);

        let removed = manager.remove_member(2).await;
        assert!(removed.is_some());
        assert_eq!(manager.member_count(), 2);
    }

    #[tokio::test]
    async fn test_heartbeat_update() {
        let manager = cluster_with(0, &[(1, MemberRole::Follower, 50)]).await;

        manager.update_member_heartbeat(1, 100, true);
        let after_first = manager.get_member(1).unwrap();
        assert_eq!(after_first.last_wal_offset, 100);
        assert!(after_first.last_heartbeat_ms > 0);

        manager.update_member_heartbeat(1, 100, false);
        let after_second = manager.get_member(1).unwrap();
        assert!(!after_second.healthy);
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Follower, 100),
                (1, MemberRole::Follower, 200),
                (2, MemberRole::Follower, 150),
            ],
        )
        .await;

        // Node 1 has the highest WAL offset.
        assert_eq!(manager.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection_tiebreak() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Follower, 100),
                (1, MemberRole::Follower, 100),
                (2, MemberRole::Follower, 100),
            ],
        )
        .await;

        // Equal offsets: the lowest node id wins.
        assert_eq!(manager.select_leader_candidate(), Some(0));
    }

    #[tokio::test]
    async fn test_leader_selection_skips_unhealthy() {
        let manager = cluster_with(
            0,
            &[(0, MemberRole::Follower, 200), (1, MemberRole::Follower, 100)],
        )
        .await;

        // Node 0 has the better offset but is marked unhealthy.
        manager.update_member_heartbeat(0, 200, false);

        assert_eq!(manager.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_vote_request_grants_vote() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        let reply = manager.handle_vote_request(&vote_request(1, 1, 100)).await;

        assert!(reply.vote_granted);
        assert_eq!(reply.term, 1);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_stale_term() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        // Move the local node ahead of the candidate's term.
        manager.term.store(5, Ordering::SeqCst);

        let reply = manager.handle_vote_request(&vote_request(3, 1, 100)).await;

        assert!(!reply.vote_granted);
        assert_eq!(reply.term, 5);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_duplicate_vote() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        // First candidate in term 1 gets the vote.
        let first = manager.handle_vote_request(&vote_request(1, 1, 100)).await;
        assert!(first.vote_granted);

        // A different candidate in the same term must be refused.
        let second = manager.handle_vote_request(&vote_request(1, 2, 100)).await;
        assert!(!second.vote_granted);
    }

    #[tokio::test]
    async fn test_start_election() {
        let manager = cluster_with(0, &[]).await;

        let elected_term = manager.start_election().await;

        assert_eq!(elected_term, 1);
        assert_eq!(manager.current_term(), 1);
        assert_eq!(manager.current_role().await, MemberRole::Candidate);
    }

    #[tokio::test]
    async fn test_become_leader() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 100)]).await;

        let elected_term = manager.start_election().await;
        manager.become_leader(elected_term).await;

        assert_eq!(manager.current_role().await, MemberRole::Leader);
        assert_eq!(manager.leader_id().await, Some(0));
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let manager = cluster_with(
            1,
            &[(0, MemberRole::Follower, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        manager.accept_leader(0, 1).await;

        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, Some(0));
        assert_eq!(manager.current_term(), 1);

        // The member entry of the new leader reflects its role.
        let leader_entry = manager.get_member(0).unwrap();
        assert_eq!(leader_entry.role, MemberRole::Leader);
    }

    #[tokio::test]
    async fn test_stale_become_leader_ignored() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 100)]).await;

        // Start an election in term 1, then learn about a leader in term 2.
        let _term = manager.start_election().await;
        manager.accept_leader(1, 2).await;
        // The term-1 win arrives late and must not take effect.
        manager.become_leader(1).await;

        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, Some(1));
    }

    #[tokio::test]
    async fn test_cluster_status() {
        let manager = cluster_with(
            0,
            &[(0, MemberRole::Leader, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        let snapshot = manager.status().await;

        assert_eq!(snapshot.self_id, 0);
        assert_eq!(snapshot.member_count, 2);
        assert_eq!(snapshot.healthy_count, 2);
        assert_eq!(snapshot.members.len(), 2);
    }

    #[tokio::test]
    async fn test_remove_leader_clears_leader_id() {
        let manager = cluster_with(
            1,
            &[(0, MemberRole::Leader, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        *manager.leader_id.write().await = Some(0);

        manager.remove_member(0).await;
        assert_eq!(manager.leader_id().await, None);
    }
}