1use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use std::sync::{
29 Arc,
30 atomic::{AtomicU64, Ordering},
31};
32use tokio::sync::RwLock;
33
34use super::node_registry::{Node, NodeRegistry};
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ClusterMember {
39 pub node_id: u32,
41 pub api_address: String,
43 pub replication_address: String,
45 pub role: MemberRole,
47 pub last_wal_offset: u64,
49 pub last_heartbeat_ms: u64,
51 pub healthy: bool,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58pub enum MemberRole {
59 Leader,
60 Follower,
61 Candidate,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct VoteRequest {
67 pub term: u64,
69 pub candidate_id: u32,
71 pub last_wal_offset: u64,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct VoteResponse {
78 pub term: u64,
80 pub vote_granted: bool,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct ClusterStatus {
87 pub term: u64,
89 pub leader_id: Option<u32>,
91 pub self_id: u32,
93 pub self_role: MemberRole,
95 pub member_count: usize,
97 pub healthy_count: usize,
99 pub partition_count: u32,
101 pub members: Vec<ClusterMember>,
103}
104
105pub struct ClusterManager {
107 self_id: u32,
109 role: RwLock<MemberRole>,
111 term: AtomicU64,
113 voted_for: RwLock<Option<u32>>,
115 leader_id: RwLock<Option<u32>>,
117 members: DashMap<u32, ClusterMember>,
119 registry: Arc<NodeRegistry>,
121}
122
123impl ClusterManager {
124 pub fn new(self_id: u32, partition_count: u32) -> Self {
130 Self {
131 self_id,
132 role: RwLock::new(MemberRole::Follower),
133 term: AtomicU64::new(0),
134 voted_for: RwLock::new(None),
135 leader_id: RwLock::new(None),
136 members: DashMap::new(),
137 registry: Arc::new(NodeRegistry::new(partition_count)),
138 }
139 }
140
141 pub fn registry(&self) -> &Arc<NodeRegistry> {
143 &self.registry
144 }
145
146 pub fn self_id(&self) -> u32 {
148 self.self_id
149 }
150
151 pub fn current_term(&self) -> u64 {
153 self.term.load(Ordering::SeqCst)
154 }
155
156 pub async fn current_role(&self) -> MemberRole {
158 *self.role.read().await
159 }
160
161 pub async fn leader_id(&self) -> Option<u32> {
163 *self.leader_id.read().await
164 }
165
166 pub async fn add_member(&self, member: ClusterMember) {
172 let node_id = member.node_id;
173 let healthy = member.healthy;
174
175 self.registry.register_node(Node {
177 id: node_id,
178 address: member.api_address.clone(),
179 healthy,
180 assigned_partitions: vec![],
181 });
182
183 self.members.insert(node_id, member);
185
186 if node_id == self.self_id {
188 let mut role = self.role.write().await;
189 let member_ref = self.members.get(&node_id).unwrap();
190 *role = member_ref.role;
191 }
192 }
193
194 pub async fn remove_member(&self, node_id: u32) -> Option<ClusterMember> {
196 self.registry.unregister_node(node_id);
197 let removed = self.members.remove(&node_id).map(|(_, m)| m);
198
199 let leader = *self.leader_id.read().await;
201 if leader == Some(node_id) {
202 *self.leader_id.write().await = None;
203 }
204
205 removed
206 }
207
208 pub fn update_member_heartbeat(&self, node_id: u32, wal_offset: u64, healthy: bool) {
210 if let Some(mut member) = self.members.get_mut(&node_id) {
211 member.last_wal_offset = wal_offset;
212 member.healthy = healthy;
213 member.last_heartbeat_ms = std::time::SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .unwrap_or_default()
216 .as_millis() as u64;
217 }
218 self.registry.set_node_health(node_id, healthy);
219 }
220
221 pub fn get_member(&self, node_id: u32) -> Option<ClusterMember> {
223 self.members.get(&node_id).map(|m| m.clone())
224 }
225
226 pub fn all_members(&self) -> Vec<ClusterMember> {
228 self.members.iter().map(|m| m.value().clone()).collect()
229 }
230
231 pub fn healthy_members(&self) -> Vec<ClusterMember> {
233 self.members
234 .iter()
235 .filter(|m| m.value().healthy)
236 .map(|m| m.value().clone())
237 .collect()
238 }
239
240 pub fn member_count(&self) -> usize {
242 self.members.len()
243 }
244
245 pub async fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
256 let current_term = self.term.load(Ordering::SeqCst);
257
258 if request.term < current_term {
260 return VoteResponse {
261 term: current_term,
262 vote_granted: false,
263 };
264 }
265
266 if request.term > current_term {
268 self.term.store(request.term, Ordering::SeqCst);
269 *self.voted_for.write().await = None;
270 *self.role.write().await = MemberRole::Follower;
272 }
273
274 let mut voted_for = self.voted_for.write().await;
275 let current_term = self.term.load(Ordering::SeqCst);
276
277 let can_vote = match *voted_for {
279 None => true,
280 Some(id) => id == request.candidate_id,
281 };
282
283 if can_vote {
284 let self_offset = self
286 .members
287 .get(&self.self_id)
288 .map(|m| m.last_wal_offset)
289 .unwrap_or(0);
290
291 if request.last_wal_offset >= self_offset {
292 *voted_for = Some(request.candidate_id);
293 return VoteResponse {
294 term: current_term,
295 vote_granted: true,
296 };
297 }
298 }
299
300 VoteResponse {
301 term: current_term,
302 vote_granted: false,
303 }
304 }
305
306 pub async fn start_election(&self) -> u64 {
310 let new_term = self.term.fetch_add(1, Ordering::SeqCst) + 1;
311 *self.role.write().await = MemberRole::Candidate;
312 *self.voted_for.write().await = Some(self.self_id);
313 *self.leader_id.write().await = None;
314 new_term
315 }
316
317 pub async fn become_leader(&self, term: u64) {
321 let current_term = self.term.load(Ordering::SeqCst);
322 if term != current_term {
323 return; }
325 *self.role.write().await = MemberRole::Leader;
326 *self.leader_id.write().await = Some(self.self_id);
327
328 if let Some(mut member) = self.members.get_mut(&self.self_id) {
330 member.role = MemberRole::Leader;
331 }
332 }
333
334 pub async fn accept_leader(&self, leader_id: u32, term: u64) {
336 let current_term = self.term.load(Ordering::SeqCst);
337 if term < current_term {
338 return; }
340 if term > current_term {
341 self.term.store(term, Ordering::SeqCst);
342 *self.voted_for.write().await = None;
343 }
344 *self.role.write().await = MemberRole::Follower;
345 *self.leader_id.write().await = Some(leader_id);
346
347 for mut member in self.members.iter_mut() {
349 member.role = if member.node_id == leader_id {
350 MemberRole::Leader
351 } else {
352 MemberRole::Follower
353 };
354 }
355 }
356
357 pub fn select_leader_candidate(&self) -> Option<u32> {
361 let mut best: Option<(u32, u64)> = None; for member in self.members.iter() {
364 if !member.healthy {
365 continue;
366 }
367 match best {
368 None => best = Some((member.node_id, member.last_wal_offset)),
369 Some((_, best_offset)) => {
370 if member.last_wal_offset > best_offset
371 || (member.last_wal_offset == best_offset
372 && member.node_id < best.unwrap().0)
373 {
374 best = Some((member.node_id, member.last_wal_offset));
375 }
376 }
377 }
378 }
379
380 best.map(|(id, _)| id)
381 }
382
383 pub async fn status(&self) -> ClusterStatus {
389 ClusterStatus {
390 term: self.term.load(Ordering::SeqCst),
391 leader_id: *self.leader_id.read().await,
392 self_id: self.self_id,
393 self_role: *self.role.read().await,
394 member_count: self.members.len(),
395 healthy_count: self.registry.healthy_node_count(),
396 partition_count: self
397 .registry
398 .partition_distribution()
399 .values()
400 .flat_map(|v| v.iter())
401 .count() as u32,
402 members: self.all_members(),
403 }
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Partition count used for every manager built in these tests.
    const PARTITIONS: u32 = 32;

    /// Builds a healthy member with no heartbeat recorded yet.
    fn make_member(id: u32, role: MemberRole, offset: u64) -> ClusterMember {
        let api_address = format!("node-{}:3900", id);
        let replication_address = format!("node-{}:3910", id);

        ClusterMember {
            node_id: id,
            api_address,
            replication_address,
            role,
            last_wal_offset: offset,
            last_heartbeat_ms: 0,
            healthy: true,
        }
    }

    /// Builds a manager for `self_id` and adds `members`
    /// (`(node_id, role, wal_offset)`) in the given order.
    async fn cluster_with(self_id: u32, members: &[(u32, MemberRole, u64)]) -> ClusterManager {
        let manager = ClusterManager::new(self_id, PARTITIONS);
        for &(id, role, offset) in members {
            manager.add_member(make_member(id, role, offset)).await;
        }
        manager
    }

    #[tokio::test]
    async fn test_create_cluster_manager() {
        let manager = cluster_with(0, &[]).await;

        assert_eq!(manager.self_id(), 0);
        assert_eq!(manager.current_term(), 0);
        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, None);
        assert_eq!(manager.member_count(), 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_members() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Leader, 100),
                (1, MemberRole::Follower, 90),
                (2, MemberRole::Follower, 80),
            ],
        )
        .await;

        assert_eq!(manager.member_count(), 3);
        assert_eq!(manager.healthy_members().len(), 3);

        // Every added node must show up in the registry's distribution.
        let distribution = manager.registry().partition_distribution();
        assert_eq!(distribution.len(), 3);

        let removed = manager.remove_member(2).await;
        assert!(removed.is_some());
        assert_eq!(manager.member_count(), 2);
    }

    #[tokio::test]
    async fn test_heartbeat_update() {
        let manager = cluster_with(0, &[(1, MemberRole::Follower, 50)]).await;

        manager.update_member_heartbeat(1, 100, true);
        let after_first = manager.get_member(1).unwrap();
        assert_eq!(after_first.last_wal_offset, 100);
        assert!(after_first.last_heartbeat_ms > 0);

        // A heartbeat reporting bad health flips the member's flag.
        manager.update_member_heartbeat(1, 100, false);
        let after_second = manager.get_member(1).unwrap();
        assert!(!after_second.healthy);
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Follower, 100),
                (1, MemberRole::Follower, 200),
                (2, MemberRole::Follower, 150),
            ],
        )
        .await;

        // Node 1 has the highest WAL offset.
        let chosen = manager.select_leader_candidate();
        assert_eq!(chosen, Some(1));
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection_tiebreak() {
        let manager = cluster_with(
            0,
            &[
                (0, MemberRole::Follower, 100),
                (1, MemberRole::Follower, 100),
                (2, MemberRole::Follower, 100),
            ],
        )
        .await;

        // Equal offsets: the lowest node id wins.
        let chosen = manager.select_leader_candidate();
        assert_eq!(chosen, Some(0));
    }

    #[tokio::test]
    async fn test_leader_selection_skips_unhealthy() {
        let manager = cluster_with(
            0,
            &[(0, MemberRole::Follower, 200), (1, MemberRole::Follower, 100)],
        )
        .await;

        // Node 0 has the higher offset but is marked unhealthy.
        manager.update_member_heartbeat(0, 200, false);

        let chosen = manager.select_leader_candidate();
        assert_eq!(chosen, Some(1));
    }

    #[tokio::test]
    async fn test_vote_request_grants_vote() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        let ballot = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };

        let reply = manager.handle_vote_request(&ballot).await;
        assert!(reply.vote_granted);
        assert_eq!(reply.term, 1);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_stale_term() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        // Put the voter ahead of the candidate's term.
        manager.term.store(5, Ordering::SeqCst);

        let ballot = VoteRequest {
            term: 3,
            candidate_id: 1,
            last_wal_offset: 100,
        };

        let reply = manager.handle_vote_request(&ballot).await;
        assert!(!reply.vote_granted);
        assert_eq!(reply.term, 5);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_duplicate_vote() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 50)]).await;

        let first_ballot = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };
        let first_reply = manager.handle_vote_request(&first_ballot).await;
        assert!(first_reply.vote_granted);

        // Same term, different candidate: the vote is already spent.
        let second_ballot = VoteRequest {
            term: 1,
            candidate_id: 2,
            last_wal_offset: 100,
        };
        let second_reply = manager.handle_vote_request(&second_ballot).await;
        assert!(!second_reply.vote_granted);
    }

    #[tokio::test]
    async fn test_start_election() {
        let manager = cluster_with(0, &[]).await;

        let election_term = manager.start_election().await;
        assert_eq!(election_term, 1);
        assert_eq!(manager.current_term(), 1);
        assert_eq!(manager.current_role().await, MemberRole::Candidate);
    }

    #[tokio::test]
    async fn test_become_leader() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 100)]).await;

        let election_term = manager.start_election().await;
        manager.become_leader(election_term).await;

        assert_eq!(manager.current_role().await, MemberRole::Leader);
        assert_eq!(manager.leader_id().await, Some(0));
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let manager = cluster_with(
            1,
            &[(0, MemberRole::Follower, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        manager.accept_leader(0, 1).await;

        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, Some(0));
        assert_eq!(manager.current_term(), 1);

        // The accepted leader's membership entry is relabelled as leader.
        let leader_entry = manager.get_member(0).unwrap();
        assert_eq!(leader_entry.role, MemberRole::Leader);
    }

    #[tokio::test]
    async fn test_stale_become_leader_ignored() {
        let manager = cluster_with(0, &[(0, MemberRole::Follower, 100)]).await;

        // The election moves us to term 1 ...
        let _term = manager.start_election().await;
        // ... then node 1 is accepted as leader for term 2 ...
        manager.accept_leader(1, 2).await;
        // ... so winning the term-1 election must no longer take effect.
        manager.become_leader(1).await;

        assert_eq!(manager.current_role().await, MemberRole::Follower);
        assert_eq!(manager.leader_id().await, Some(1));
    }

    #[tokio::test]
    async fn test_cluster_status() {
        let manager = cluster_with(
            0,
            &[(0, MemberRole::Leader, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        let snapshot = manager.status().await;
        assert_eq!(snapshot.self_id, 0);
        assert_eq!(snapshot.member_count, 2);
        assert_eq!(snapshot.healthy_count, 2);
        assert_eq!(snapshot.members.len(), 2);
    }

    #[tokio::test]
    async fn test_remove_leader_clears_leader_id() {
        let manager = cluster_with(
            1,
            &[(0, MemberRole::Leader, 100), (1, MemberRole::Follower, 90)],
        )
        .await;

        *manager.leader_id.write().await = Some(0);

        manager.remove_member(0).await;
        assert_eq!(manager.leader_id().await, None);
    }
}