Skip to main content

oxigdal_cluster/
coordinator.rs

1//! Cluster coordinator with leader election and membership management.
2//!
3//! This module implements cluster coordination including Raft-based consensus,
4//! leader election, membership management, configuration distribution, and
5//! health check aggregation.
6
7use crate::error::{ClusterError, Result};
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15use tokio::sync::Notify;
16use tracing::{debug, error, info, warn};
17use uuid::Uuid;
18
19/// Cluster coordinator.
/// Cluster coordinator handle.
///
/// Cheap to clone: `#[derive(Clone)]` duplicates only the inner `Arc`,
/// so every clone shares the same coordinator state.
#[derive(Clone)]
pub struct ClusterCoordinator {
    // Shared state; all cloned handles point at the same `CoordinatorInner`.
    inner: Arc<CoordinatorInner>,
}
24
/// Shared state behind the `ClusterCoordinator` handle.
struct CoordinatorInner {
    /// Node ID (this coordinator's ID); fixed at construction.
    node_id: NodeId,

    /// Cluster state (term, role, known leader).
    state: Arc<RwLock<ClusterState>>,

    /// Member registry keyed by node ID (concurrent map).
    members: DashMap<NodeId, ClusterMember>,

    /// Configuration store: opaque key/value blobs set via `set_config`.
    config_store: Arc<RwLock<HashMap<String, Vec<u8>>>>,

    /// Leader-only bookkeeping; meaningful while this node is leader.
    leader_state: Arc<RwLock<LeaderState>>,

    /// Static configuration (timeouts and intervals).
    config: CoordinatorConfig,

    /// Running flag; set by `start`, cleared by `stop`.
    running: AtomicBool,

    /// Health check notification; `stop()` calls `notify_waiters` on it.
    health_notify: Arc<Notify>,

    /// Statistics (monotonic atomic counters).
    stats: Arc<CoordinatorStats>,
}
53
/// Coordinator configuration.
///
/// All durations are wall-clock; defaults are given per field (see the
/// `Default` impl).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorConfig {
    /// Election timeout: a follower starts an election when no leader
    /// heartbeat has arrived within this window (default 5s).
    pub election_timeout: Duration,

    /// Heartbeat interval: cadence of the main coordinator loop (default 1s).
    pub heartbeat_interval: Duration,

    /// Health check interval: cadence of the member health sweep (default 10s).
    pub health_check_interval: Duration,

    /// Member timeout: a member unseen for this long is marked `Suspected`,
    /// and `Down` after twice this long (default 30s).
    pub member_timeout: Duration,

    /// Configuration sync interval (default 60s).
    /// NOTE(review): not read by any code in this module — confirm external use.
    pub config_sync_interval: Duration,

    /// Minimum cluster size (default 3).
    /// NOTE(review): not read by any code in this module — confirm external use.
    pub min_cluster_size: usize,
}
75
76impl Default for CoordinatorConfig {
77    fn default() -> Self {
78        Self {
79            election_timeout: Duration::from_secs(5),
80            heartbeat_interval: Duration::from_secs(1),
81            health_check_interval: Duration::from_secs(10),
82            member_timeout: Duration::from_secs(30),
83            config_sync_interval: Duration::from_secs(60),
84            min_cluster_size: 3,
85        }
86    }
87}
88
/// Node identifier.
///
/// Newtype over a `Uuid`; `Copy`, hashable, and usable as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);
92
93impl NodeId {
94    /// Create a new random node ID.
95    pub fn new() -> Self {
96        Self(Uuid::new_v4())
97    }
98}
99
100impl Default for NodeId {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl std::fmt::Display for NodeId {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        write!(f, "{}", self.0)
109    }
110}
111
/// Cluster state.
///
/// This node's current view of the cluster; readers clone a snapshot out
/// of the lock rather than holding it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterState {
    /// Current term (bumped by one at each election attempt).
    pub term: u64,

    /// Current leader, if one is known.
    pub leader: Option<NodeId>,

    /// This node's role.
    pub role: NodeRole,

    /// Last heartbeat observed from the leader; drives the election timeout.
    /// `serde(skip)`: `Instant` is not serializable, so this field is
    /// omitted on serialize and comes back as `None` on deserialize.
    #[serde(skip)]
    pub last_leader_heartbeat: Option<Instant>,

    /// Election in progress (set while this node is campaigning).
    pub election_in_progress: bool,
}
131
/// Node role in the cluster (Raft-style follower/candidate/leader).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Follower (default state; waits for leader heartbeats)
    Follower,

    /// Candidate (campaigning during an election)
    Candidate,

    /// Leader (won an election; sends heartbeats)
    Leader,
}
144
/// Leader state (only meaningful while this node is the leader).
#[derive(Debug, Clone, Default)]
pub struct LeaderState {
    /// When this node was elected (set in `become_leader`).
    pub elected_at: Option<Instant>,

    /// When the last heartbeat round was sent.
    pub last_heartbeat_sent: Option<Instant>,

    /// Per-follower tracking state; rebuilt on each election win.
    pub followers: HashMap<NodeId, FollowerState>,
}
157
/// Follower state (tracked by the leader for each member).
#[derive(Debug, Clone)]
pub struct FollowerState {
    /// Last heartbeat received from this follower.
    pub last_heartbeat: Instant,

    /// Highest term the follower has acknowledged.
    /// NOTE(review): currently initialized to the leader's own term at
    /// election time without a real ack — confirm once RPCs are implemented.
    pub acked_term: u64,

    /// Health status as judged by the leader.
    pub healthy: bool,
}
170
/// Cluster member information.
///
/// Not serializable as-is: `joined_at` / `last_seen` are `Instant`s.
#[derive(Debug, Clone)]
pub struct ClusterMember {
    /// Node ID (key into the member registry)
    pub node_id: NodeId,

    /// Network address (e.g. "host:port")
    pub address: String,

    /// Role this member holds
    pub role: NodeRole,

    /// Membership status (see `MemberStatus` lifecycle)
    pub status: MemberStatus,

    /// When the member joined
    pub joined_at: Instant,

    /// Last time the member was seen; compared against the member timeout
    /// by the health-check sweep
    pub last_seen: Instant,

    /// Software version string reported by the member
    pub version: String,

    /// Free-form metadata key/value pairs
    pub metadata: HashMap<String, String>,
}
198
/// Member status.
///
/// Health sweep lifecycle: `Active` → `Suspected` (unseen past the member
/// timeout) → `Down` (unseen past twice the timeout). `Left` marks a
/// graceful departure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemberStatus {
    /// Member is active
    Active,

    /// Member is suspected to be down
    Suspected,

    /// Member is confirmed down
    Down,

    /// Member left gracefully
    Left,
}
214
/// Coordinator statistics.
///
/// Monotonic counters; all updates and reads use `Ordering::Relaxed`
/// (best-effort observability, no synchronization guarantees).
#[derive(Debug, Default)]
struct CoordinatorStats {
    /// Elections started by this node
    elections: AtomicU64,

    /// Term increments observed
    term_changes: AtomicU64,

    /// Times this node assumed leadership
    leadership_changes: AtomicU64,

    /// Heartbeat rounds sent as leader
    heartbeats_sent: AtomicU64,

    /// Config values stored via `set_config`
    config_syncs: AtomicU64,

    /// Health-check sweeps performed
    health_checks: AtomicU64,
}
236
237impl ClusterCoordinator {
238    /// Create a new cluster coordinator.
239    pub fn new(config: CoordinatorConfig) -> Self {
240        let node_id = NodeId::new();
241
242        Self {
243            inner: Arc::new(CoordinatorInner {
244                node_id,
245                state: Arc::new(RwLock::new(ClusterState {
246                    term: 0,
247                    leader: None,
248                    role: NodeRole::Follower,
249                    last_leader_heartbeat: None,
250                    election_in_progress: false,
251                })),
252                members: DashMap::new(),
253                config_store: Arc::new(RwLock::new(HashMap::new())),
254                leader_state: Arc::new(RwLock::new(LeaderState::default())),
255                config,
256                running: AtomicBool::new(false),
257                health_notify: Arc::new(Notify::new()),
258                stats: Arc::new(CoordinatorStats::default()),
259            }),
260        }
261    }
262
263    /// Create with default configuration.
264    pub fn with_defaults() -> Self {
265        Self::new(CoordinatorConfig::default())
266    }
267
268    /// Get this node's ID.
269    pub fn node_id(&self) -> NodeId {
270        self.inner.node_id
271    }
272
273    /// Start the coordinator.
274    pub async fn start(&self) -> Result<()> {
275        if self.inner.running.swap(true, Ordering::SeqCst) {
276            return Err(ClusterError::InvalidState(
277                "Coordinator already running".to_string(),
278            ));
279        }
280
281        info!(
282            "Starting cluster coordinator (node: {})",
283            self.inner.node_id
284        );
285
286        // Spawn coordinator loops
287        let coord = self.clone();
288        tokio::spawn(async move {
289            coord.run_coordinator_loop().await;
290        });
291
292        let coord = self.clone();
293        tokio::spawn(async move {
294            coord.run_health_check_loop().await;
295        });
296
297        Ok(())
298    }
299
300    /// Stop the coordinator.
301    pub async fn stop(&self) -> Result<()> {
302        info!("Stopping cluster coordinator");
303        self.inner.running.store(false, Ordering::SeqCst);
304        self.inner.health_notify.notify_waiters();
305        Ok(())
306    }
307
308    /// Main coordinator loop.
309    async fn run_coordinator_loop(&self) {
310        let mut heartbeat_interval = tokio::time::interval(self.inner.config.heartbeat_interval);
311
312        while self.inner.running.load(Ordering::SeqCst) {
313            tokio::select! {
314                _ = heartbeat_interval.tick() => {
315                    let state = self.inner.state.read().clone();
316
317                    match state.role {
318                        NodeRole::Leader => {
319                            // Send heartbeats as leader
320                            if let Err(e) = self.send_leader_heartbeats().await {
321                                error!("Failed to send leader heartbeats: {}", e);
322                            }
323                        }
324                        NodeRole::Follower => {
325                            // Check for election timeout
326                            if self.should_start_election() {
327                                if let Err(e) = self.start_election().await {
328                                    error!("Failed to start election: {}", e);
329                                }
330                            }
331                        }
332                        NodeRole::Candidate => {
333                            // Election in progress, handled separately
334                        }
335                    }
336                }
337            }
338        }
339
340        info!("Coordinator loop stopped");
341    }
342
343    /// Health check loop.
344    async fn run_health_check_loop(&self) {
345        let mut interval = tokio::time::interval(self.inner.config.health_check_interval);
346
347        while self.inner.running.load(Ordering::SeqCst) {
348            interval.tick().await;
349
350            if let Err(e) = self.check_member_health().await {
351                error!("Health check failed: {}", e);
352            }
353
354            self.inner
355                .stats
356                .health_checks
357                .fetch_add(1, Ordering::Relaxed);
358        }
359    }
360
361    /// Check if election should be started.
362    fn should_start_election(&self) -> bool {
363        let state = self.inner.state.read();
364
365        if state.election_in_progress {
366            return false;
367        }
368
369        if let Some(last_heartbeat) = state.last_leader_heartbeat {
370            if last_heartbeat.elapsed() > self.inner.config.election_timeout {
371                return true;
372            }
373        } else {
374            // No leader heartbeat received, start election
375            return true;
376        }
377
378        false
379    }
380
381    /// Start leader election.
382    async fn start_election(&self) -> Result<()> {
383        info!("Starting leader election");
384
385        let term = {
386            let mut state = self.inner.state.write();
387            state.term += 1;
388            state.role = NodeRole::Candidate;
389            state.election_in_progress = true;
390            state.term
391        }; // Lock is dropped here
392
393        self.inner.stats.elections.fetch_add(1, Ordering::Relaxed);
394
395        self.inner
396            .stats
397            .term_changes
398            .fetch_add(1, Ordering::Relaxed);
399
400        // Request votes from other members
401        let votes = self.request_votes(term).await?;
402
403        // Check if we won the election
404        let total_members = self.inner.members.len() + 1; // +1 for self
405        let quorum = (total_members / 2) + 1;
406
407        if votes >= quorum {
408            self.become_leader(term)?;
409        } else {
410            // Lost election, become follower
411            let mut state = self.inner.state.write();
412            state.role = NodeRole::Follower;
413            state.election_in_progress = false;
414        }
415
416        Ok(())
417    }
418
419    /// Request votes from other members.
420    async fn request_votes(&self, _term: u64) -> Result<usize> {
421        // In a real implementation, this would send vote requests to other nodes
422        // For now, simulate by checking cluster size
423
424        let active_members = self
425            .inner
426            .members
427            .iter()
428            .filter(|entry| entry.value().status == MemberStatus::Active)
429            .count();
430
431        // Assume we get votes from active members (simplified)
432        Ok(active_members + 1) // +1 for self-vote
433    }
434
435    /// Become the cluster leader.
436    fn become_leader(&self, term: u64) -> Result<()> {
437        info!("Became cluster leader for term {}", term);
438
439        let mut state = self.inner.state.write();
440        state.role = NodeRole::Leader;
441        state.leader = Some(self.inner.node_id);
442        state.election_in_progress = false;
443        drop(state);
444
445        let mut leader_state = self.inner.leader_state.write();
446        leader_state.elected_at = Some(Instant::now());
447        leader_state.followers.clear();
448
449        // Initialize follower state for all members
450        for entry in self.inner.members.iter() {
451            leader_state.followers.insert(
452                *entry.key(),
453                FollowerState {
454                    last_heartbeat: Instant::now(),
455                    acked_term: term,
456                    healthy: true,
457                },
458            );
459        }
460
461        drop(leader_state);
462
463        self.inner
464            .stats
465            .leadership_changes
466            .fetch_add(1, Ordering::Relaxed);
467
468        Ok(())
469    }
470
471    /// Send heartbeats as leader.
472    async fn send_leader_heartbeats(&self) -> Result<()> {
473        let state = self.inner.state.read().clone();
474
475        if state.role != NodeRole::Leader {
476            return Ok(());
477        }
478
479        // In real implementation, send heartbeats to all followers
480        // For now, just update timestamp
481
482        let mut leader_state = self.inner.leader_state.write();
483        leader_state.last_heartbeat_sent = Some(Instant::now());
484
485        self.inner
486            .stats
487            .heartbeats_sent
488            .fetch_add(1, Ordering::Relaxed);
489
490        debug!("Sent leader heartbeats");
491
492        Ok(())
493    }
494
495    /// Check member health.
496    async fn check_member_health(&self) -> Result<()> {
497        let now = Instant::now();
498        let timeout = self.inner.config.member_timeout;
499
500        for mut entry in self.inner.members.iter_mut() {
501            let member = entry.value_mut();
502
503            let age = now.duration_since(member.last_seen);
504
505            if age > timeout {
506                if member.status == MemberStatus::Active {
507                    member.status = MemberStatus::Suspected;
508                    warn!("Member {} suspected down", member.node_id);
509                } else if member.status == MemberStatus::Suspected && age > timeout * 2 {
510                    member.status = MemberStatus::Down;
511                    warn!("Member {} confirmed down", member.node_id);
512                }
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Register a new member.
520    pub fn register_member(&self, member: ClusterMember) -> Result<()> {
521        info!("Registering member: {}", member.node_id);
522
523        self.inner.members.insert(member.node_id, member);
524
525        Ok(())
526    }
527
528    /// Unregister a member.
529    pub fn unregister_member(&self, node_id: NodeId) -> Result<()> {
530        info!("Unregistering member: {}", node_id);
531
532        if let Some((_, mut member)) = self.inner.members.remove(&node_id) {
533            member.status = MemberStatus::Left;
534        }
535
536        // Remove from leader's follower list
537        let mut leader_state = self.inner.leader_state.write();
538        leader_state.followers.remove(&node_id);
539
540        Ok(())
541    }
542
543    /// Get all members.
544    pub fn get_members(&self) -> Vec<ClusterMember> {
545        self.inner
546            .members
547            .iter()
548            .map(|e| e.value().clone())
549            .collect()
550    }
551
552    /// Get active members.
553    pub fn get_active_members(&self) -> Vec<ClusterMember> {
554        self.inner
555            .members
556            .iter()
557            .filter(|e| e.value().status == MemberStatus::Active)
558            .map(|e| e.value().clone())
559            .collect()
560    }
561
562    /// Get current leader.
563    pub fn get_leader(&self) -> Option<NodeId> {
564        self.inner.state.read().leader
565    }
566
567    /// Check if this node is the leader.
568    pub fn is_leader(&self) -> bool {
569        let state = self.inner.state.read();
570        state.role == NodeRole::Leader
571    }
572
573    /// Store configuration value.
574    pub fn set_config(&self, key: String, value: Vec<u8>) -> Result<()> {
575        let mut config = self.inner.config_store.write();
576        config.insert(key.clone(), value);
577
578        debug!("Stored config: {}", key);
579
580        self.inner
581            .stats
582            .config_syncs
583            .fetch_add(1, Ordering::Relaxed);
584
585        Ok(())
586    }
587
588    /// Get configuration value.
589    pub fn get_config(&self, key: &str) -> Option<Vec<u8>> {
590        self.inner.config_store.read().get(key).cloned()
591    }
592
593    /// Get cluster statistics.
594    pub fn get_statistics(&self) -> CoordinatorStatistics {
595        let state = self.inner.state.read();
596
597        CoordinatorStatistics {
598            node_id: self.inner.node_id,
599            role: state.role,
600            current_term: state.term,
601            current_leader: state.leader,
602            total_members: self.inner.members.len(),
603            active_members: self.get_active_members().len(),
604            elections: self.inner.stats.elections.load(Ordering::Relaxed),
605            term_changes: self.inner.stats.term_changes.load(Ordering::Relaxed),
606            leadership_changes: self.inner.stats.leadership_changes.load(Ordering::Relaxed),
607            heartbeats_sent: self.inner.stats.heartbeats_sent.load(Ordering::Relaxed),
608            config_syncs: self.inner.stats.config_syncs.load(Ordering::Relaxed),
609            health_checks: self.inner.stats.health_checks.load(Ordering::Relaxed),
610        }
611    }
612}
613
/// Coordinator statistics snapshot.
///
/// Produced by `get_statistics`; plain values, safe to serialize and
/// ship off-node (unlike the internal atomic counters).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorStatistics {
    /// This node's ID
    pub node_id: NodeId,

    /// Role at snapshot time
    pub role: NodeRole,

    /// Term at snapshot time
    pub current_term: u64,

    /// Leader known at snapshot time, if any
    pub current_leader: Option<NodeId>,

    /// Total registered members (all statuses)
    pub total_members: usize,

    /// Members with `Active` status
    pub active_members: usize,

    /// Elections started by this node
    pub elections: u64,

    /// Term increments observed
    pub term_changes: u64,

    /// Times this node assumed leadership
    pub leadership_changes: u64,

    /// Heartbeat rounds sent as leader
    pub heartbeats_sent: u64,

    /// Config values stored
    pub config_syncs: u64,

    /// Health-check sweeps performed
    pub health_checks: u64,
}
653
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A fresh coordinator gets a non-nil random node ID.
    #[test]
    fn test_coordinator_creation() {
        let coord = ClusterCoordinator::with_defaults();
        assert_ne!(coord.node_id().0, Uuid::nil());
    }

    /// Registered members are visible through `get_members`.
    #[test]
    fn test_member_registration() {
        let coord = ClusterCoordinator::with_defaults();

        let member = ClusterMember {
            node_id: NodeId::new(),
            address: "localhost:8080".to_string(),
            role: NodeRole::Follower,
            status: MemberStatus::Active,
            joined_at: Instant::now(),
            last_seen: Instant::now(),
            version: "1.0.0".to_string(),
            metadata: HashMap::new(),
        };

        // Assert the Result instead of discarding it with `.ok()` — a
        // registration failure must fail the test.
        coord
            .register_member(member.clone())
            .expect("member registration should succeed");

        let members = coord.get_members();
        assert_eq!(members.len(), 1);
        assert_eq!(members[0].node_id, member.node_id);
    }

    /// Stored config values round-trip through `set_config`/`get_config`.
    #[test]
    fn test_config_storage() {
        let coord = ClusterCoordinator::with_defaults();

        coord
            .set_config("test_key".to_string(), vec![1, 2, 3])
            .expect("set_config should succeed");

        assert_eq!(coord.get_config("test_key"), Some(vec![1, 2, 3]));
    }

    /// Start succeeds once, a second start while running is rejected,
    /// and stop succeeds.
    #[tokio::test]
    async fn test_coordinator_start_stop() {
        let coord = ClusterCoordinator::with_defaults();

        coord.start().await.expect("first start should succeed");

        // Starting an already-running coordinator must return an error.
        assert!(coord.start().await.is_err());

        coord.stop().await.expect("stop should succeed");
    }
}