// allsource_core/infrastructure/cluster/consensus.rs

//! Simplified Raft-inspired Term-Based Consensus
//!
//! Provides term-based leader election and cluster membership management
//! for multi-node AllSource Core deployments.
//!
//! # Design
//!
//! This is a simplified Raft implementation focused on:
//! - **Term-based leader election**: Monotonically increasing terms prevent split-brain
//! - **Deterministic leader selection**: Highest term wins; on tie, highest WAL offset wins
//! - **Manual failover support**: Integrate with existing `/internal/promote` + `/internal/repoint`
//!
//! NOT implemented (handled by existing infrastructure):
//! - Log replication (WAL shipping handles this)
//! - Snapshot transfer (Parquet catch-up handles this)
//!
//! # Architecture
//!
//! ```text
//! ClusterManager
//!   ├── NodeRegistry (partition assignment, health tracking)
//!   ├── term: AtomicU64 (current consensus term)
//!   ├── voted_for: RwLock<Option<u32>> (who we voted for this term)
//!   └── members: DashMap<u32, ClusterMember> (full member metadata)
//! ```
26use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use std::sync::{
29    Arc,
30    atomic::{AtomicU64, Ordering},
31};
32use tokio::sync::RwLock;
33
34use super::node_registry::{Node, NodeRegistry};
35
36/// Full cluster member metadata (extends Node with consensus state)
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct ClusterMember {
39    /// Unique node ID
40    pub node_id: u32,
41    /// Network address for API traffic (host:port)
42    pub api_address: String,
43    /// Network address for replication traffic (host:port)
44    pub replication_address: String,
45    /// Current role
46    pub role: MemberRole,
47    /// Last known WAL offset (used for leader selection)
48    pub last_wal_offset: u64,
49    /// Last heartbeat timestamp (millis since epoch)
50    pub last_heartbeat_ms: u64,
51    /// Whether the node is healthy
52    pub healthy: bool,
53}
54
55/// Role of a cluster member
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58pub enum MemberRole {
59    Leader,
60    Follower,
61    Candidate,
62}
63
64/// Vote request for leader election
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct VoteRequest {
67    /// Term number for this election
68    pub term: u64,
69    /// Candidate requesting the vote
70    pub candidate_id: u32,
71    /// Candidate's last WAL offset (for log completeness check)
72    pub last_wal_offset: u64,
73}
74
75/// Vote response
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct VoteResponse {
78    /// Current term (may be higher than requested)
79    pub term: u64,
80    /// Whether the vote was granted
81    pub vote_granted: bool,
82}
83
84/// Cluster status summary
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct ClusterStatus {
87    /// Current consensus term
88    pub term: u64,
89    /// Current leader node ID (if known)
90    pub leader_id: Option<u32>,
91    /// This node's ID
92    pub self_id: u32,
93    /// This node's role
94    pub self_role: MemberRole,
95    /// Total members
96    pub member_count: usize,
97    /// Healthy members
98    pub healthy_count: usize,
99    /// Partition count
100    pub partition_count: u32,
101    /// All members
102    pub members: Vec<ClusterMember>,
103}
104
105/// Cluster Manager — coordinates consensus, membership, and partition assignment
106pub struct ClusterManager {
107    /// This node's ID
108    self_id: u32,
109    /// This node's role
110    role: RwLock<MemberRole>,
111    /// Current consensus term (monotonically increasing)
112    term: AtomicU64,
113    /// Who we voted for in the current term
114    voted_for: RwLock<Option<u32>>,
115    /// Current leader ID
116    leader_id: RwLock<Option<u32>>,
117    /// Cluster members (full metadata)
118    members: DashMap<u32, ClusterMember>,
119    /// Node registry for partition assignment
120    registry: Arc<NodeRegistry>,
121}
122
123impl ClusterManager {
124    /// Create a new cluster manager
125    ///
126    /// # Arguments
127    /// - `self_id`: This node's unique ID
128    /// - `partition_count`: Total partitions for the cluster
129    pub fn new(self_id: u32, partition_count: u32) -> Self {
130        Self {
131            self_id,
132            role: RwLock::new(MemberRole::Follower),
133            term: AtomicU64::new(0),
134            voted_for: RwLock::new(None),
135            leader_id: RwLock::new(None),
136            members: DashMap::new(),
137            registry: Arc::new(NodeRegistry::new(partition_count)),
138        }
139    }
140
141    /// Get the underlying node registry (for partition routing)
142    pub fn registry(&self) -> &Arc<NodeRegistry> {
143        &self.registry
144    }
145
146    /// Get this node's ID
147    pub fn self_id(&self) -> u32 {
148        self.self_id
149    }
150
151    /// Get the current term
152    pub fn current_term(&self) -> u64 {
153        self.term.load(Ordering::SeqCst)
154    }
155
156    /// Get this node's current role
157    pub async fn current_role(&self) -> MemberRole {
158        *self.role.read().await
159    }
160
161    /// Get the current leader ID
162    pub async fn leader_id(&self) -> Option<u32> {
163        *self.leader_id.read().await
164    }
165
166    // -------------------------------------------------------------------------
167    // Membership management
168    // -------------------------------------------------------------------------
169
170    /// Register a new member in the cluster
171    pub async fn add_member(&self, member: ClusterMember) {
172        let node_id = member.node_id;
173        let healthy = member.healthy;
174
175        // Register in partition registry
176        self.registry.register_node(Node {
177            id: node_id,
178            address: member.api_address.clone(),
179            healthy,
180            assigned_partitions: vec![],
181        });
182
183        // Store full member metadata
184        self.members.insert(node_id, member);
185
186        // If adding self as leader, update role
187        if node_id == self.self_id {
188            let mut role = self.role.write().await;
189            let member_ref = self.members.get(&node_id).unwrap();
190            *role = member_ref.role;
191        }
192    }
193
194    /// Remove a member from the cluster
195    pub async fn remove_member(&self, node_id: u32) -> Option<ClusterMember> {
196        self.registry.unregister_node(node_id);
197        let removed = self.members.remove(&node_id).map(|(_, m)| m);
198
199        // If we removed the leader, clear leader_id
200        let leader = *self.leader_id.read().await;
201        if leader == Some(node_id) {
202            *self.leader_id.write().await = None;
203        }
204
205        removed
206    }
207
208    /// Update a member's health status and WAL offset
209    pub fn update_member_heartbeat(&self, node_id: u32, wal_offset: u64, healthy: bool) {
210        if let Some(mut member) = self.members.get_mut(&node_id) {
211            member.last_wal_offset = wal_offset;
212            member.healthy = healthy;
213            member.last_heartbeat_ms = std::time::SystemTime::now()
214                .duration_since(std::time::UNIX_EPOCH)
215                .unwrap_or_default()
216                .as_millis() as u64;
217        }
218        self.registry.set_node_health(node_id, healthy);
219    }
220
221    /// Get a member by ID
222    pub fn get_member(&self, node_id: u32) -> Option<ClusterMember> {
223        self.members.get(&node_id).map(|m| m.clone())
224    }
225
226    /// Get all members
227    pub fn all_members(&self) -> Vec<ClusterMember> {
228        self.members.iter().map(|m| m.value().clone()).collect()
229    }
230
231    /// Get healthy members
232    pub fn healthy_members(&self) -> Vec<ClusterMember> {
233        self.members
234            .iter()
235            .filter(|m| m.value().healthy)
236            .map(|m| m.value().clone())
237            .collect()
238    }
239
240    /// Get the number of members
241    pub fn member_count(&self) -> usize {
242        self.members.len()
243    }
244
245    // -------------------------------------------------------------------------
246    // Term-based consensus
247    // -------------------------------------------------------------------------
248
249    /// Handle a vote request (simplified Raft RequestVote RPC)
250    ///
251    /// Grants vote if:
252    /// 1. Request term >= our term
253    /// 2. We haven't voted for someone else in this term
254    /// 3. Candidate's WAL offset >= our knowledge of the log
255    pub async fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
256        let current_term = self.term.load(Ordering::SeqCst);
257
258        // If candidate's term is stale, reject
259        if request.term < current_term {
260            return VoteResponse {
261                term: current_term,
262                vote_granted: false,
263            };
264        }
265
266        // If candidate's term is higher, step down and update term
267        if request.term > current_term {
268            self.term.store(request.term, Ordering::SeqCst);
269            *self.voted_for.write().await = None;
270            // Step down to follower if we were leader/candidate
271            *self.role.write().await = MemberRole::Follower;
272        }
273
274        let mut voted_for = self.voted_for.write().await;
275        let current_term = self.term.load(Ordering::SeqCst);
276
277        // Check if we can vote for this candidate
278        let can_vote = match *voted_for {
279            None => true,
280            Some(id) => id == request.candidate_id,
281        };
282
283        if can_vote {
284            // Check log completeness: candidate must be at least as up-to-date
285            let self_offset = self
286                .members
287                .get(&self.self_id)
288                .map(|m| m.last_wal_offset)
289                .unwrap_or(0);
290
291            if request.last_wal_offset >= self_offset {
292                *voted_for = Some(request.candidate_id);
293                return VoteResponse {
294                    term: current_term,
295                    vote_granted: true,
296                };
297            }
298        }
299
300        VoteResponse {
301            term: current_term,
302            vote_granted: false,
303        }
304    }
305
306    /// Start an election: increment term and vote for self
307    ///
308    /// Returns the new term. Caller should then send VoteRequests to other nodes.
309    pub async fn start_election(&self) -> u64 {
310        let new_term = self.term.fetch_add(1, Ordering::SeqCst) + 1;
311        *self.role.write().await = MemberRole::Candidate;
312        *self.voted_for.write().await = Some(self.self_id);
313        *self.leader_id.write().await = None;
314        new_term
315    }
316
317    /// Declare self as leader after winning an election
318    ///
319    /// Should only be called after receiving a majority of votes.
320    pub async fn become_leader(&self, term: u64) {
321        let current_term = self.term.load(Ordering::SeqCst);
322        if term != current_term {
323            return; // Stale election — a new term started
324        }
325        *self.role.write().await = MemberRole::Leader;
326        *self.leader_id.write().await = Some(self.self_id);
327
328        // Update self in members list
329        if let Some(mut member) = self.members.get_mut(&self.self_id) {
330            member.role = MemberRole::Leader;
331        }
332    }
333
334    /// Accept a leader (after receiving an AppendEntries or heartbeat from a valid leader)
335    pub async fn accept_leader(&self, leader_id: u32, term: u64) {
336        let current_term = self.term.load(Ordering::SeqCst);
337        if term < current_term {
338            return; // Stale leader
339        }
340        if term > current_term {
341            self.term.store(term, Ordering::SeqCst);
342            *self.voted_for.write().await = None;
343        }
344        *self.role.write().await = MemberRole::Follower;
345        *self.leader_id.write().await = Some(leader_id);
346
347        // Update member roles
348        for mut member in self.members.iter_mut() {
349            member.role = if member.node_id == leader_id {
350                MemberRole::Leader
351            } else {
352                MemberRole::Follower
353            };
354        }
355    }
356
357    /// Select the best candidate for leader from healthy members
358    ///
359    /// Deterministic: highest WAL offset wins; on tie, lowest node ID wins.
360    pub fn select_leader_candidate(&self) -> Option<u32> {
361        let mut best: Option<(u32, u64)> = None; // (node_id, wal_offset)
362
363        for member in self.members.iter() {
364            if !member.healthy {
365                continue;
366            }
367            match best {
368                None => best = Some((member.node_id, member.last_wal_offset)),
369                Some((_, best_offset)) => {
370                    if member.last_wal_offset > best_offset
371                        || (member.last_wal_offset == best_offset
372                            && member.node_id < best.unwrap().0)
373                    {
374                        best = Some((member.node_id, member.last_wal_offset));
375                    }
376                }
377            }
378        }
379
380        best.map(|(id, _)| id)
381    }
382
383    // -------------------------------------------------------------------------
384    // Status
385    // -------------------------------------------------------------------------
386
387    /// Get full cluster status
388    pub async fn status(&self) -> ClusterStatus {
389        ClusterStatus {
390            term: self.term.load(Ordering::SeqCst),
391            leader_id: *self.leader_id.read().await,
392            self_id: self.self_id,
393            self_role: *self.role.read().await,
394            member_count: self.members.len(),
395            healthy_count: self.registry.healthy_node_count(),
396            partition_count: self
397                .registry
398                .partition_distribution()
399                .values()
400                .flat_map(|v| v.iter())
401                .count() as u32,
402            members: self.all_members(),
403        }
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a healthy member with predictable addresses for test fixtures.
    fn make_member(id: u32, role: MemberRole, offset: u64) -> ClusterMember {
        ClusterMember {
            node_id: id,
            api_address: format!("node-{}:3900", id),
            replication_address: format!("node-{}:3910", id),
            role,
            last_wal_offset: offset,
            last_heartbeat_ms: 0,
            healthy: true,
        }
    }

    #[tokio::test]
    async fn test_create_cluster_manager() {
        let mgr = ClusterManager::new(0, 32);
        assert_eq!(mgr.self_id(), 0);
        assert_eq!(mgr.current_term(), 0);
        assert_eq!(mgr.current_role().await, MemberRole::Follower);
        assert_eq!(mgr.leader_id().await, None);
        assert_eq!(mgr.member_count(), 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_members() {
        let mgr = ClusterManager::new(0, 32);

        cm_add(&mgr, 0, MemberRole::Leader, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 90).await;
        cm_add(&mgr, 2, MemberRole::Follower, 80).await;

        assert_eq!(mgr.member_count(), 3);
        assert_eq!(mgr.healthy_members().len(), 3);

        // Every registered node should appear in the partition distribution.
        let dist = mgr.registry().partition_distribution();
        assert_eq!(dist.len(), 3);

        // Removing a member shrinks the table and returns its metadata.
        let removed = mgr.remove_member(2).await;
        assert!(removed.is_some());
        assert_eq!(mgr.member_count(), 2);
    }

    /// Tiny helper so each test reads as one line per member.
    async fn cm_add(mgr: &ClusterManager, id: u32, role: MemberRole, offset: u64) {
        mgr.add_member(make_member(id, role, offset)).await;
    }

    #[tokio::test]
    async fn test_heartbeat_update() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 1, MemberRole::Follower, 50).await;

        mgr.update_member_heartbeat(1, 100, true);
        let member = mgr.get_member(1).unwrap();
        assert_eq!(member.last_wal_offset, 100);
        assert!(member.last_heartbeat_ms > 0);

        // Flipping health to false must be reflected on re-read.
        mgr.update_member_heartbeat(1, 100, false);
        let member = mgr.get_member(1).unwrap();
        assert!(!member.healthy);
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection() {
        let mgr = ClusterManager::new(0, 32);

        cm_add(&mgr, 0, MemberRole::Follower, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 200).await;
        cm_add(&mgr, 2, MemberRole::Follower, 150).await;

        // Node 1 holds the highest WAL offset, so it must be chosen.
        assert_eq!(mgr.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection_tiebreak() {
        let mgr = ClusterManager::new(0, 32);

        cm_add(&mgr, 0, MemberRole::Follower, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 100).await;
        cm_add(&mgr, 2, MemberRole::Follower, 100).await;

        // Equal offsets: the lowest node ID breaks the tie.
        assert_eq!(mgr.select_leader_candidate(), Some(0));
    }

    #[tokio::test]
    async fn test_leader_selection_skips_unhealthy() {
        let mgr = ClusterManager::new(0, 32);

        cm_add(&mgr, 0, MemberRole::Follower, 200).await;
        cm_add(&mgr, 1, MemberRole::Follower, 100).await;

        // Node 0 leads on offset but is unhealthy, so it is skipped.
        mgr.update_member_heartbeat(0, 200, false);

        assert_eq!(mgr.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_vote_request_grants_vote() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 50).await;

        let req = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };

        let resp = mgr.handle_vote_request(&req).await;
        assert!(resp.vote_granted);
        assert_eq!(resp.term, 1);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_stale_term() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 50).await;

        // Jump our term ahead of the candidate's.
        mgr.term.store(5, Ordering::SeqCst);

        let req = VoteRequest {
            term: 3, // behind our term of 5
            candidate_id: 1,
            last_wal_offset: 100,
        };

        let resp = mgr.handle_vote_request(&req).await;
        assert!(!resp.vote_granted);
        assert_eq!(resp.term, 5);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_duplicate_vote() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 50).await;

        // First candidate gets our vote for term 1.
        let first = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };
        assert!(mgr.handle_vote_request(&first).await.vote_granted);

        // A second candidate in the SAME term must be refused.
        let second = VoteRequest {
            term: 1,
            candidate_id: 2,
            last_wal_offset: 100,
        };
        assert!(!mgr.handle_vote_request(&second).await.vote_granted);
    }

    #[tokio::test]
    async fn test_start_election() {
        let mgr = ClusterManager::new(0, 32);

        let new_term = mgr.start_election().await;
        assert_eq!(new_term, 1);
        assert_eq!(mgr.current_term(), 1);
        assert_eq!(mgr.current_role().await, MemberRole::Candidate);
    }

    #[tokio::test]
    async fn test_become_leader() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 100).await;

        let term = mgr.start_election().await;
        mgr.become_leader(term).await;

        assert_eq!(mgr.current_role().await, MemberRole::Leader);
        assert_eq!(mgr.leader_id().await, Some(0));
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let mgr = ClusterManager::new(1, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 90).await;

        mgr.accept_leader(0, 1).await;

        assert_eq!(mgr.current_role().await, MemberRole::Follower);
        assert_eq!(mgr.leader_id().await, Some(0));
        assert_eq!(mgr.current_term(), 1);

        // The accepted leader's member record should now carry Leader.
        let leader = mgr.get_member(0).unwrap();
        assert_eq!(leader.role, MemberRole::Leader);
    }

    #[tokio::test]
    async fn test_stale_become_leader_ignored() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Follower, 100).await;

        let _term = mgr.start_election().await; // moves us to term 1

        // Another node establishes leadership at a higher term.
        mgr.accept_leader(1, 2).await; // now at term 2

        // Claiming leadership for the stale term 1 must be a no-op.
        mgr.become_leader(1).await;
        assert_eq!(mgr.current_role().await, MemberRole::Follower);
        assert_eq!(mgr.leader_id().await, Some(1));
    }

    #[tokio::test]
    async fn test_cluster_status() {
        let mgr = ClusterManager::new(0, 32);
        cm_add(&mgr, 0, MemberRole::Leader, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 90).await;

        let status = mgr.status().await;
        assert_eq!(status.self_id, 0);
        assert_eq!(status.member_count, 2);
        assert_eq!(status.healthy_count, 2);
        assert_eq!(status.members.len(), 2);
    }

    #[tokio::test]
    async fn test_remove_leader_clears_leader_id() {
        let mgr = ClusterManager::new(1, 32);
        cm_add(&mgr, 0, MemberRole::Leader, 100).await;
        cm_add(&mgr, 1, MemberRole::Follower, 90).await;

        *mgr.leader_id.write().await = Some(0);

        mgr.remove_member(0).await;
        assert_eq!(mgr.leader_id().await, None);
    }
}