// allsource_core/infrastructure/cluster/consensus.rs
//! Simplified Raft-inspired Term-Based Consensus
//!
//! Provides term-based leader election and cluster membership management
//! for multi-node AllSource Core deployments.
//!
//! # Design
//!
//! This is a simplified Raft implementation focused on:
//! - **Term-based leader election**: Monotonically increasing terms prevent split-brain
//! - **Deterministic leader selection**: Highest term wins; on tie, highest WAL offset wins
//! - **Manual failover support**: Integrate with existing `/internal/promote` + `/internal/repoint`
//!
//! NOT implemented (handled by existing infrastructure):
//! - Log replication (WAL shipping handles this)
//! - Snapshot transfer (Parquet catch-up handles this)
//!
//! # Architecture
//!
//! ```text
//! ClusterManager
//!   ├── NodeRegistry (partition assignment, health tracking)
//!   ├── term: AtomicU64 (current consensus term)
//!   ├── voted_for: RwLock<Option<u32>> (who we voted for this term)
//!   └── members: DashMap<u32, ClusterMember> (full member metadata)
//! ```
26use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use std::sync::{
29    Arc,
30    atomic::{AtomicU64, Ordering},
31};
32use tokio::sync::RwLock;
33
34use super::node_registry::{Node, NodeRegistry};
35
/// Full cluster member metadata (extends `Node` with consensus state)
///
/// Serialized/deserialized with serde for heartbeat and status exchange.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMember {
    /// Unique node ID
    pub node_id: u32,
    /// Network address for API traffic (host:port)
    pub api_address: String,
    /// Network address for replication traffic (host:port)
    pub replication_address: String,
    /// Current role (leader / follower / candidate)
    pub role: MemberRole,
    /// Last known WAL offset (used for leader selection and vote log-completeness checks)
    pub last_wal_offset: u64,
    /// Last heartbeat timestamp (milliseconds since the Unix epoch; 0 = never seen)
    pub last_heartbeat_ms: u64,
    /// Whether the node is currently considered healthy
    pub healthy: bool,
}
54
/// Role of a cluster member
///
/// Serialized in lowercase (`"leader"`, `"follower"`, `"candidate"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MemberRole {
    Leader,
    Follower,
    Candidate,
}
63
/// Vote request for leader election (simplified Raft RequestVote)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteRequest {
    /// Term number for this election
    pub term: u64,
    /// Candidate requesting the vote
    pub candidate_id: u32,
    /// Candidate's last WAL offset (for the log completeness check)
    pub last_wal_offset: u64,
}
74
/// Vote response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteResponse {
    /// Responder's current term (may be higher than the requested term,
    /// signalling the candidate to step down)
    pub term: u64,
    /// Whether the vote was granted
    pub vote_granted: bool,
}
83
/// Cluster status summary (snapshot returned by `ClusterManager::status`)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
    /// Current consensus term
    pub term: u64,
    /// Current leader node ID (if known)
    pub leader_id: Option<u32>,
    /// This node's ID
    pub self_id: u32,
    /// This node's role
    pub self_role: MemberRole,
    /// Total members
    pub member_count: usize,
    /// Healthy members (as tracked by the node registry)
    pub healthy_count: usize,
    /// Partition count (total partition assignments in the registry)
    pub partition_count: u32,
    /// All members (cloned snapshots)
    pub members: Vec<ClusterMember>,
}
104
/// Cluster Manager — coordinates consensus, membership, and partition assignment
///
/// Concurrency model: the term is an atomic, role/vote/leader state live
/// behind tokio `RwLock`s, and the member map is a lock-free `DashMap`.
pub struct ClusterManager {
    /// This node's ID
    self_id: u32,
    /// This node's role
    role: RwLock<MemberRole>,
    /// Current consensus term (monotonically increasing)
    term: AtomicU64,
    /// Who we voted for in the current term (None = not yet voted)
    voted_for: RwLock<Option<u32>>,
    /// Current leader ID (None = unknown / between elections)
    leader_id: RwLock<Option<u32>>,
    /// Cluster members (full metadata, keyed by node ID)
    members: DashMap<u32, ClusterMember>,
    /// Node registry for partition assignment
    registry: Arc<NodeRegistry>,
}
122
123impl ClusterManager {
124    /// Create a new cluster manager
125    ///
126    /// # Arguments
127    /// - `self_id`: This node's unique ID
128    /// - `partition_count`: Total partitions for the cluster
129    pub fn new(self_id: u32, partition_count: u32) -> Self {
130        Self {
131            self_id,
132            role: RwLock::new(MemberRole::Follower),
133            term: AtomicU64::new(0),
134            voted_for: RwLock::new(None),
135            leader_id: RwLock::new(None),
136            members: DashMap::new(),
137            registry: Arc::new(NodeRegistry::new(partition_count)),
138        }
139    }
140
141    /// Get the underlying node registry (for partition routing)
142    pub fn registry(&self) -> &Arc<NodeRegistry> {
143        &self.registry
144    }
145
146    /// Get this node's ID
147    pub fn self_id(&self) -> u32 {
148        self.self_id
149    }
150
151    /// Get the current term
152    pub fn current_term(&self) -> u64 {
153        self.term.load(Ordering::SeqCst)
154    }
155
156    /// Get this node's current role
157    pub async fn current_role(&self) -> MemberRole {
158        *self.role.read().await
159    }
160
161    /// Get the current leader ID
162    pub async fn leader_id(&self) -> Option<u32> {
163        *self.leader_id.read().await
164    }
165
166    // -------------------------------------------------------------------------
167    // Membership management
168    // -------------------------------------------------------------------------
169
170    /// Register a new member in the cluster
171    pub async fn add_member(&self, member: ClusterMember) {
172        let node_id = member.node_id;
173        let healthy = member.healthy;
174
175        // Register in partition registry
176        self.registry.register_node(Node {
177            id: node_id,
178            address: member.api_address.clone(),
179            healthy,
180            assigned_partitions: vec![],
181        });
182
183        // Store full member metadata
184        self.members.insert(node_id, member);
185
186        // If adding self as leader, update role
187        if node_id == self.self_id {
188            let mut role = self.role.write().await;
189            let member_ref = self.members.get(&node_id).unwrap();
190            *role = member_ref.role;
191        }
192    }
193
194    /// Remove a member from the cluster
195    pub async fn remove_member(&self, node_id: u32) -> Option<ClusterMember> {
196        self.registry.unregister_node(node_id);
197        let removed = self.members.remove(&node_id).map(|(_, m)| m);
198
199        // If we removed the leader, clear leader_id
200        let leader = *self.leader_id.read().await;
201        if leader == Some(node_id) {
202            *self.leader_id.write().await = None;
203        }
204
205        removed
206    }
207
208    /// Update a member's health status and WAL offset
209    pub fn update_member_heartbeat(&self, node_id: u32, wal_offset: u64, healthy: bool) {
210        if let Some(mut member) = self.members.get_mut(&node_id) {
211            member.last_wal_offset = wal_offset;
212            member.healthy = healthy;
213            member.last_heartbeat_ms = std::time::SystemTime::now()
214                .duration_since(std::time::UNIX_EPOCH)
215                .unwrap_or_default()
216                .as_millis() as u64;
217        }
218        self.registry.set_node_health(node_id, healthy);
219    }
220
221    /// Get a member by ID
222    pub fn get_member(&self, node_id: u32) -> Option<ClusterMember> {
223        self.members.get(&node_id).map(|m| m.clone())
224    }
225
226    /// Get all members
227    pub fn all_members(&self) -> Vec<ClusterMember> {
228        self.members.iter().map(|m| m.value().clone()).collect()
229    }
230
231    /// Get healthy members
232    pub fn healthy_members(&self) -> Vec<ClusterMember> {
233        self.members
234            .iter()
235            .filter(|m| m.value().healthy)
236            .map(|m| m.value().clone())
237            .collect()
238    }
239
240    /// Get the number of members
241    pub fn member_count(&self) -> usize {
242        self.members.len()
243    }
244
245    // -------------------------------------------------------------------------
246    // Term-based consensus
247    // -------------------------------------------------------------------------
248
249    /// Handle a vote request (simplified Raft RequestVote RPC)
250    ///
251    /// Grants vote if:
252    /// 1. Request term >= our term
253    /// 2. We haven't voted for someone else in this term
254    /// 3. Candidate's WAL offset >= our knowledge of the log
255    pub async fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
256        let current_term = self.term.load(Ordering::SeqCst);
257
258        // If candidate's term is stale, reject
259        if request.term < current_term {
260            return VoteResponse {
261                term: current_term,
262                vote_granted: false,
263            };
264        }
265
266        // If candidate's term is higher, step down and update term
267        if request.term > current_term {
268            self.term.store(request.term, Ordering::SeqCst);
269            *self.voted_for.write().await = None;
270            // Step down to follower if we were leader/candidate
271            *self.role.write().await = MemberRole::Follower;
272        }
273
274        let mut voted_for = self.voted_for.write().await;
275        let current_term = self.term.load(Ordering::SeqCst);
276
277        // Check if we can vote for this candidate
278        let can_vote = match *voted_for {
279            None => true,
280            Some(id) => id == request.candidate_id,
281        };
282
283        if can_vote {
284            // Check log completeness: candidate must be at least as up-to-date
285            let self_offset = self
286                .members
287                .get(&self.self_id)
288                .map_or(0, |m| m.last_wal_offset);
289
290            if request.last_wal_offset >= self_offset {
291                *voted_for = Some(request.candidate_id);
292                return VoteResponse {
293                    term: current_term,
294                    vote_granted: true,
295                };
296            }
297        }
298
299        VoteResponse {
300            term: current_term,
301            vote_granted: false,
302        }
303    }
304
305    /// Start an election: increment term and vote for self
306    ///
307    /// Returns the new term. Caller should then send VoteRequests to other nodes.
308    pub async fn start_election(&self) -> u64 {
309        let new_term = self.term.fetch_add(1, Ordering::SeqCst) + 1;
310        *self.role.write().await = MemberRole::Candidate;
311        *self.voted_for.write().await = Some(self.self_id);
312        *self.leader_id.write().await = None;
313        new_term
314    }
315
316    /// Declare self as leader after winning an election
317    ///
318    /// Should only be called after receiving a majority of votes.
319    pub async fn become_leader(&self, term: u64) {
320        let current_term = self.term.load(Ordering::SeqCst);
321        if term != current_term {
322            return; // Stale election — a new term started
323        }
324        *self.role.write().await = MemberRole::Leader;
325        *self.leader_id.write().await = Some(self.self_id);
326
327        // Update self in members list
328        if let Some(mut member) = self.members.get_mut(&self.self_id) {
329            member.role = MemberRole::Leader;
330        }
331    }
332
333    /// Accept a leader (after receiving an AppendEntries or heartbeat from a valid leader)
334    pub async fn accept_leader(&self, leader_id: u32, term: u64) {
335        let current_term = self.term.load(Ordering::SeqCst);
336        if term < current_term {
337            return; // Stale leader
338        }
339        if term > current_term {
340            self.term.store(term, Ordering::SeqCst);
341            *self.voted_for.write().await = None;
342        }
343        *self.role.write().await = MemberRole::Follower;
344        *self.leader_id.write().await = Some(leader_id);
345
346        // Update member roles
347        for mut member in self.members.iter_mut() {
348            member.role = if member.node_id == leader_id {
349                MemberRole::Leader
350            } else {
351                MemberRole::Follower
352            };
353        }
354    }
355
356    /// Select the best candidate for leader from healthy members
357    ///
358    /// Deterministic: highest WAL offset wins; on tie, lowest node ID wins.
359    pub fn select_leader_candidate(&self) -> Option<u32> {
360        let mut best: Option<(u32, u64)> = None; // (node_id, wal_offset)
361
362        for member in &self.members {
363            if !member.healthy {
364                continue;
365            }
366            match best {
367                None => best = Some((member.node_id, member.last_wal_offset)),
368                Some((_, best_offset)) => {
369                    if member.last_wal_offset > best_offset
370                        || (member.last_wal_offset == best_offset
371                            && member.node_id < best.unwrap().0)
372                    {
373                        best = Some((member.node_id, member.last_wal_offset));
374                    }
375                }
376            }
377        }
378
379        best.map(|(id, _)| id)
380    }
381
382    // -------------------------------------------------------------------------
383    // Status
384    // -------------------------------------------------------------------------
385
386    /// Get full cluster status
387    pub async fn status(&self) -> ClusterStatus {
388        ClusterStatus {
389            term: self.term.load(Ordering::SeqCst),
390            leader_id: *self.leader_id.read().await,
391            self_id: self.self_id,
392            self_role: *self.role.read().await,
393            member_count: self.members.len(),
394            healthy_count: self.registry.healthy_node_count(),
395            partition_count: self
396                .registry
397                .partition_distribution()
398                .values()
399                .flat_map(|v| v.iter())
400                .count() as u32,
401            members: self.all_members(),
402        }
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    // Helper: build a healthy test member with a fixed address scheme and the
    // given role and WAL offset (`last_heartbeat_ms` starts at 0 = never seen).
    fn make_member(id: u32, role: MemberRole, offset: u64) -> ClusterMember {
        ClusterMember {
            node_id: id,
            api_address: format!("node-{id}:3900"),
            replication_address: format!("node-{id}:3910"),
            role,
            last_wal_offset: offset,
            last_heartbeat_ms: 0,
            healthy: true,
        }
    }

    #[tokio::test]
    async fn test_create_cluster_manager() {
        // A fresh manager starts at term 0 as a follower with no leader.
        let cm = ClusterManager::new(0, 32);
        assert_eq!(cm.self_id(), 0);
        assert_eq!(cm.current_term(), 0);
        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, None);
        assert_eq!(cm.member_count(), 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_members() {
        let cm = ClusterManager::new(0, 32);

        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90))
            .await;
        cm.add_member(make_member(2, MemberRole::Follower, 80))
            .await;

        assert_eq!(cm.member_count(), 3);
        assert_eq!(cm.healthy_members().len(), 3);

        // Verify partition distribution covers all three registered nodes
        let dist = cm.registry().partition_distribution();
        assert_eq!(dist.len(), 3);

        // Remove a member
        let removed = cm.remove_member(2).await;
        assert!(removed.is_some());
        assert_eq!(cm.member_count(), 2);
    }

    #[tokio::test]
    async fn test_heartbeat_update() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(1, MemberRole::Follower, 50))
            .await;

        // Heartbeat bumps the WAL offset and stamps a wall-clock time
        cm.update_member_heartbeat(1, 100, true);
        let member = cm.get_member(1).unwrap();
        assert_eq!(member.last_wal_offset, 100);
        assert!(member.last_heartbeat_ms > 0);

        // Mark unhealthy
        cm.update_member_heartbeat(1, 100, false);
        let member = cm.get_member(1).unwrap();
        assert!(!member.healthy);
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection() {
        let cm = ClusterManager::new(0, 32);

        cm.add_member(make_member(0, MemberRole::Follower, 100))
            .await;
        cm.add_member(make_member(1, MemberRole::Follower, 200))
            .await;
        cm.add_member(make_member(2, MemberRole::Follower, 150))
            .await;

        // Node 1 has highest offset → should be selected
        let candidate = cm.select_leader_candidate();
        assert_eq!(candidate, Some(1));
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection_tiebreak() {
        let cm = ClusterManager::new(0, 32);

        cm.add_member(make_member(0, MemberRole::Follower, 100))
            .await;
        cm.add_member(make_member(1, MemberRole::Follower, 100))
            .await;
        cm.add_member(make_member(2, MemberRole::Follower, 100))
            .await;

        // Same offset → lowest node ID wins
        let candidate = cm.select_leader_candidate();
        assert_eq!(candidate, Some(0));
    }

    #[tokio::test]
    async fn test_leader_selection_skips_unhealthy() {
        let cm = ClusterManager::new(0, 32);

        cm.add_member(make_member(0, MemberRole::Follower, 200))
            .await;
        cm.add_member(make_member(1, MemberRole::Follower, 100))
            .await;

        // Mark node 0 (highest offset) as unhealthy
        cm.update_member_heartbeat(0, 200, false);

        // Unhealthy nodes are ineligible even with the best offset
        let candidate = cm.select_leader_candidate();
        assert_eq!(candidate, Some(1));
    }

    #[tokio::test]
    async fn test_vote_request_grants_vote() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50))
            .await;

        // Candidate is ahead of our offset (100 >= 50) in a newer term
        let request = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };

        let response = cm.handle_vote_request(&request).await;
        assert!(response.vote_granted);
        assert_eq!(response.term, 1);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_stale_term() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50))
            .await;

        // Advance term to 5 (direct field access — same module tree)
        cm.term.store(5, Ordering::SeqCst);

        let request = VoteRequest {
            term: 3, // Stale
            candidate_id: 1,
            last_wal_offset: 100,
        };

        // Rejection reports our newer term back to the candidate
        let response = cm.handle_vote_request(&request).await;
        assert!(!response.vote_granted);
        assert_eq!(response.term, 5);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_duplicate_vote() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50))
            .await;

        // Vote for candidate 1
        let request1 = VoteRequest {
            term: 1,
            candidate_id: 1,
            last_wal_offset: 100,
        };
        let response1 = cm.handle_vote_request(&request1).await;
        assert!(response1.vote_granted);

        // Candidate 2 asks for vote in same term → reject
        let request2 = VoteRequest {
            term: 1,
            candidate_id: 2,
            last_wal_offset: 100,
        };
        let response2 = cm.handle_vote_request(&request2).await;
        assert!(!response2.vote_granted);
    }

    #[tokio::test]
    async fn test_start_election() {
        let cm = ClusterManager::new(0, 32);

        // Election bumps the term and moves us to candidate
        let new_term = cm.start_election().await;
        assert_eq!(new_term, 1);
        assert_eq!(cm.current_term(), 1);
        assert_eq!(cm.current_role().await, MemberRole::Candidate);
    }

    #[tokio::test]
    async fn test_become_leader() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100))
            .await;

        let term = cm.start_election().await;
        cm.become_leader(term).await;

        assert_eq!(cm.current_role().await, MemberRole::Leader);
        assert_eq!(cm.leader_id().await, Some(0));
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let cm = ClusterManager::new(1, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100))
            .await;
        cm.add_member(make_member(1, MemberRole::Follower, 90))
            .await;

        // Node 1 accepts node 0 as leader for term 1
        cm.accept_leader(0, 1).await;

        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, Some(0));
        assert_eq!(cm.current_term(), 1);

        // Verify member roles updated
        let leader = cm.get_member(0).unwrap();
        assert_eq!(leader.role, MemberRole::Leader);
    }

    #[tokio::test]
    async fn test_stale_become_leader_ignored() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100))
            .await;

        let _term = cm.start_election().await; // term=1

        // Meanwhile, someone else increments term
        cm.accept_leader(1, 2).await; // term=2

        // Trying to become leader for term 1 should be ignored
        cm.become_leader(1).await;
        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, Some(1));
    }

    #[tokio::test]
    async fn test_cluster_status() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90))
            .await;

        let status = cm.status().await;
        assert_eq!(status.self_id, 0);
        assert_eq!(status.member_count, 2);
        assert_eq!(status.healthy_count, 2);
        assert_eq!(status.members.len(), 2);
    }

    #[tokio::test]
    async fn test_remove_leader_clears_leader_id() {
        let cm = ClusterManager::new(1, 32);
        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90))
            .await;

        // Seed the known leader directly (field access within the module)
        *cm.leader_id.write().await = Some(0);

        // Removing the leader must also clear leader_id
        cm.remove_member(0).await;
        assert_eq!(cm.leader_id().await, None);
    }
}