use crate::error::{ClusterError, Result};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

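/// Coordinates cluster membership, leader election, and shared configuration
/// for a single node. Cloning is cheap: all clones share the same inner state.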
#[derive(Clone)]
pub struct ClusterCoordinator {
    inner: Arc<CoordinatorInner>,
}

struct CoordinatorInner {
    /// This node's unique identifier.
    node_id: NodeId,

    /// Current cluster state (term, leader, role).
    state: Arc<RwLock<ClusterState>>,

    /// Known cluster members, keyed by node ID.
    members: DashMap<NodeId, ClusterMember>,

    /// Key/value configuration store.
    config_store: Arc<RwLock<HashMap<String, Vec<u8>>>>,

    /// State maintained while this node is the leader.
    leader_state: Arc<RwLock<LeaderState>>,

    /// Coordinator configuration.
    config: CoordinatorConfig,

    /// Whether the background loops are running.
    running: AtomicBool,

    /// Wakes health-check waiters on shutdown.
    health_notify: Arc<Notify>,

    /// Operation counters.
    stats: Arc<CoordinatorStats>,
}

/// Timing and sizing parameters for the coordinator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorConfig {
    /// How long a follower waits without a leader heartbeat before
    /// starting an election.
    pub election_timeout: Duration,

    /// Interval at which the leader sends heartbeats.
    pub heartbeat_interval: Duration,

    /// Interval between member health checks.
    pub health_check_interval: Duration,

    /// How long a member may go unseen before it is suspected down.
    pub member_timeout: Duration,

    /// Interval between configuration synchronizations.
    pub config_sync_interval: Duration,

    /// Minimum number of members for a functional cluster.
    pub min_cluster_size: usize,
}

impl Default for CoordinatorConfig {
    fn default() -> Self {
        Self {
            election_timeout: Duration::from_secs(5),
            heartbeat_interval: Duration::from_secs(1),
            health_check_interval: Duration::from_secs(10),
            member_timeout: Duration::from_secs(30),
            config_sync_interval: Duration::from_secs(60),
            min_cluster_size: 3,
        }
    }
}

/// Unique identifier for a cluster node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub Uuid);

impl NodeId {
    /// Generates a new random node ID.
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}

impl Default for NodeId {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for NodeId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Shared view of the cluster's consensus state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterState {
    /// Current election term.
    pub term: u64,

    /// Current leader, if one is known.
    pub leader: Option<NodeId>,

    /// This node's role in the cluster.
    pub role: NodeRole,

    /// When the last leader heartbeat was observed.
    #[serde(skip)]
    pub last_leader_heartbeat: Option<Instant>,

    /// Whether an election is currently underway.
    pub election_in_progress: bool,
}

/// Role a node plays in leader election.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Follows the current leader.
    Follower,

    /// Campaigning for leadership.
    Candidate,

    /// The elected cluster leader.
    Leader,
}

/// State maintained only while this node is the leader.
#[derive(Debug, Clone, Default)]
pub struct LeaderState {
    /// When this node was elected.
    pub elected_at: Option<Instant>,

    /// When the last heartbeat batch was sent.
    pub last_heartbeat_sent: Option<Instant>,

    /// Per-follower tracking state.
    pub followers: HashMap<NodeId, FollowerState>,
}

/// Leader-side view of a single follower.
#[derive(Debug, Clone)]
pub struct FollowerState {
    /// When the follower last acknowledged a heartbeat.
    pub last_heartbeat: Instant,

    /// Highest term the follower has acknowledged.
    pub acked_term: u64,

    /// Whether the follower is considered healthy.
    pub healthy: bool,
}

/// A member of the cluster as seen by this node.
#[derive(Debug, Clone)]
pub struct ClusterMember {
    /// The member's unique ID.
    pub node_id: NodeId,

    /// Network address of the member.
    pub address: String,

    /// The member's current role.
    pub role: NodeRole,

    /// The member's health status.
    pub status: MemberStatus,

    /// When the member joined the cluster.
    pub joined_at: Instant,

    /// When the member was last seen.
    pub last_seen: Instant,

    /// Software version reported by the member.
    pub version: String,

    /// Arbitrary member metadata.
    pub metadata: HashMap<String, String>,
}

/// Health status of a cluster member.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemberStatus {
    /// Responding normally.
    Active,

    /// Unseen for longer than the member timeout.
    Suspected,

    /// Unseen for longer than twice the member timeout.
    Down,

    /// Left the cluster voluntarily.
    Left,
}

/// Internal operation counters.
#[derive(Debug, Default)]
struct CoordinatorStats {
    elections: AtomicU64,
    term_changes: AtomicU64,
    leadership_changes: AtomicU64,
    heartbeats_sent: AtomicU64,
    config_syncs: AtomicU64,
    health_checks: AtomicU64,
}

impl ClusterCoordinator {
    /// Creates a coordinator with the given configuration and a fresh node ID.
    pub fn new(config: CoordinatorConfig) -> Self {
        let node_id = NodeId::new();

        Self {
            inner: Arc::new(CoordinatorInner {
                node_id,
                state: Arc::new(RwLock::new(ClusterState {
                    term: 0,
                    leader: None,
                    role: NodeRole::Follower,
                    last_leader_heartbeat: None,
                    election_in_progress: false,
                })),
                members: DashMap::new(),
                config_store: Arc::new(RwLock::new(HashMap::new())),
                leader_state: Arc::new(RwLock::new(LeaderState::default())),
                config,
                running: AtomicBool::new(false),
                health_notify: Arc::new(Notify::new()),
                stats: Arc::new(CoordinatorStats::default()),
            }),
        }
    }

    /// Creates a coordinator with default configuration.
    pub fn with_defaults() -> Self {
        Self::new(CoordinatorConfig::default())
    }

    /// Returns this node's ID.
    pub fn node_id(&self) -> NodeId {
        self.inner.node_id
    }

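    /// Starts the coordinator and health-check background loops.
    ///
    /// A minimal usage sketch (module path omitted; adjust to this crate's
    /// layout):
    ///
    /// ```ignore
    /// let coord = ClusterCoordinator::with_defaults();
    /// coord.start().await?;
    /// // ... serve traffic ...
    /// coord.stop().await?;
    /// ```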
    pub async fn start(&self) -> Result<()> {
        if self.inner.running.swap(true, Ordering::SeqCst) {
            return Err(ClusterError::InvalidState(
                "Coordinator already running".to_string(),
            ));
        }

        info!(
            "Starting cluster coordinator (node: {})",
            self.inner.node_id
        );

        let coord = self.clone();
        tokio::spawn(async move {
            coord.run_coordinator_loop().await;
        });

        let coord = self.clone();
        tokio::spawn(async move {
            coord.run_health_check_loop().await;
        });

        Ok(())
    }

    pub async fn stop(&self) -> Result<()> {
        info!("Stopping cluster coordinator");
        self.inner.running.store(false, Ordering::SeqCst);
        self.inner.health_notify.notify_waiters();
        Ok(())
    }

    async fn run_coordinator_loop(&self) {
        let mut heartbeat_interval = tokio::time::interval(self.inner.config.heartbeat_interval);

        while self.inner.running.load(Ordering::SeqCst) {
            tokio::select! {
                _ = heartbeat_interval.tick() => {
                    let state = self.inner.state.read().clone();

                    match state.role {
                        NodeRole::Leader => {
                            if let Err(e) = self.send_leader_heartbeats().await {
                                error!("Failed to send leader heartbeats: {}", e);
                            }
                        }
                        NodeRole::Follower => {
                            if self.should_start_election() {
                                if let Err(e) = self.start_election().await {
                                    error!("Failed to start election: {}", e);
                                }
                            }
                        }
                        NodeRole::Candidate => {
                            // An election is already in progress; wait for it
                            // to resolve before acting again.
                        }
                    }
                }
            }
        }

        info!("Coordinator loop stopped");
    }

    async fn run_health_check_loop(&self) {
        let mut interval = tokio::time::interval(self.inner.config.health_check_interval);

        while self.inner.running.load(Ordering::SeqCst) {
            interval.tick().await;

            if let Err(e) = self.check_member_health().await {
                error!("Health check failed: {}", e);
            }

            self.inner
                .stats
                .health_checks
                .fetch_add(1, Ordering::Relaxed);
        }
    }

    fn should_start_election(&self) -> bool {
        let state = self.inner.state.read();

        if state.election_in_progress {
            return false;
        }

        if let Some(last_heartbeat) = state.last_leader_heartbeat {
            if last_heartbeat.elapsed() > self.inner.config.election_timeout {
                return true;
            }
        } else {
            // No leader heartbeat has ever been observed; elect immediately.
            return true;
        }

        false
    }

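    /// Runs one election round: increments the term, becomes a candidate,
    /// requests votes, and takes leadership on a strict majority; otherwise
    /// it falls back to follower.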
    async fn start_election(&self) -> Result<()> {
        info!("Starting leader election");

        let term = {
            let mut state = self.inner.state.write();
            state.term += 1;
            state.role = NodeRole::Candidate;
            state.election_in_progress = true;
            state.term
        };

        self.inner.stats.elections.fetch_add(1, Ordering::Relaxed);
        self.inner
            .stats
            .term_changes
            .fetch_add(1, Ordering::Relaxed);

        let votes = self.request_votes(term).await?;

        // Quorum is a strict majority of all members, counting this node.
        let total_members = self.inner.members.len() + 1;
        let quorum = (total_members / 2) + 1;

        if votes >= quorum {
            self.become_leader(term)?;
        } else {
            let mut state = self.inner.state.write();
            state.role = NodeRole::Follower;
            state.election_in_progress = false;
        }

        Ok(())
    }

    async fn request_votes(&self, _term: u64) -> Result<usize> {
        // Placeholder: no vote RPC is sent yet. Every active member is
        // counted as a granted vote, plus this node's own vote.
        let active_members = self
            .inner
            .members
            .iter()
            .filter(|entry| entry.value().status == MemberStatus::Active)
            .count();

        Ok(active_members + 1)
    }

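    /// Transitions this node to leader for `term` and seeds follower
    /// tracking state for every known member.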
    fn become_leader(&self, term: u64) -> Result<()> {
        info!("Became cluster leader for term {}", term);

        let mut state = self.inner.state.write();
        state.role = NodeRole::Leader;
        state.leader = Some(self.inner.node_id);
        state.election_in_progress = false;
        drop(state);

        let mut leader_state = self.inner.leader_state.write();
        leader_state.elected_at = Some(Instant::now());
        leader_state.followers.clear();

        for entry in self.inner.members.iter() {
            leader_state.followers.insert(
                *entry.key(),
                FollowerState {
                    last_heartbeat: Instant::now(),
                    acked_term: term,
                    healthy: true,
                },
            );
        }

        drop(leader_state);

        self.inner
            .stats
            .leadership_changes
            .fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    async fn send_leader_heartbeats(&self) -> Result<()> {
        let state = self.inner.state.read().clone();

        if state.role != NodeRole::Leader {
            return Ok(());
        }

        // Placeholder: records the heartbeat locally; network delivery to
        // followers is not implemented here.
        let mut leader_state = self.inner.leader_state.write();
        leader_state.last_heartbeat_sent = Some(Instant::now());

        self.inner
            .stats
            .heartbeats_sent
            .fetch_add(1, Ordering::Relaxed);

        debug!("Sent leader heartbeats");

        Ok(())
    }

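    /// Two-stage failure detection: a member unseen for longer than
    /// `member_timeout` is marked Suspected, and one unseen for longer than
    /// twice the timeout is marked Down.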
    async fn check_member_health(&self) -> Result<()> {
        let now = Instant::now();
        let timeout = self.inner.config.member_timeout;

        for mut entry in self.inner.members.iter_mut() {
            let member = entry.value_mut();

            let age = now.duration_since(member.last_seen);

            if age > timeout {
                if member.status == MemberStatus::Active {
                    member.status = MemberStatus::Suspected;
                    warn!("Member {} suspected down", member.node_id);
                } else if member.status == MemberStatus::Suspected && age > timeout * 2 {
                    member.status = MemberStatus::Down;
                    warn!("Member {} confirmed down", member.node_id);
                }
            }
        }

        Ok(())
    }

    pub fn register_member(&self, member: ClusterMember) -> Result<()> {
        info!("Registering member: {}", member.node_id);

        self.inner.members.insert(member.node_id, member);

        Ok(())
    }

    pub fn unregister_member(&self, node_id: NodeId) -> Result<()> {
        info!("Unregistering member: {}", node_id);

        self.inner.members.remove(&node_id);

        let mut leader_state = self.inner.leader_state.write();
        leader_state.followers.remove(&node_id);

        Ok(())
    }

    pub fn get_members(&self) -> Vec<ClusterMember> {
        self.inner
            .members
            .iter()
            .map(|e| e.value().clone())
            .collect()
    }

    pub fn get_active_members(&self) -> Vec<ClusterMember> {
        self.inner
            .members
            .iter()
            .filter(|e| e.value().status == MemberStatus::Active)
            .map(|e| e.value().clone())
            .collect()
    }

    pub fn get_leader(&self) -> Option<NodeId> {
        self.inner.state.read().leader
    }

    pub fn is_leader(&self) -> bool {
        let state = self.inner.state.read();
        state.role == NodeRole::Leader
    }

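    /// Stores a configuration entry in the local store and counts it as a
    /// config sync.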
    pub fn set_config(&self, key: String, value: Vec<u8>) -> Result<()> {
        let mut config = self.inner.config_store.write();
        config.insert(key.clone(), value);

        debug!("Stored config: {}", key);

        self.inner
            .stats
            .config_syncs
            .fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    pub fn get_config(&self, key: &str) -> Option<Vec<u8>> {
        self.inner.config_store.read().get(key).cloned()
    }

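    /// Returns a point-in-time snapshot of cluster state and counters.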
    pub fn get_statistics(&self) -> CoordinatorStatistics {
        let state = self.inner.state.read();

        CoordinatorStatistics {
            node_id: self.inner.node_id,
            role: state.role,
            current_term: state.term,
            current_leader: state.leader,
            total_members: self.inner.members.len(),
            active_members: self.get_active_members().len(),
            elections: self.inner.stats.elections.load(Ordering::Relaxed),
            term_changes: self.inner.stats.term_changes.load(Ordering::Relaxed),
            leadership_changes: self.inner.stats.leadership_changes.load(Ordering::Relaxed),
            heartbeats_sent: self.inner.stats.heartbeats_sent.load(Ordering::Relaxed),
            config_syncs: self.inner.stats.config_syncs.load(Ordering::Relaxed),
            health_checks: self.inner.stats.health_checks.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time snapshot of coordinator state and counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorStatistics {
    pub node_id: NodeId,
    pub role: NodeRole,
    pub current_term: u64,
    pub current_leader: Option<NodeId>,
    pub total_members: usize,
    pub active_members: usize,
    pub elections: u64,
    pub term_changes: u64,
    pub leadership_changes: u64,
    pub heartbeats_sent: u64,
    pub config_syncs: u64,
    pub health_checks: u64,
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_coordinator_creation() {
        let coord = ClusterCoordinator::with_defaults();
        let node_id = coord.node_id();
        assert_ne!(node_id.0, Uuid::nil());
    }

    #[test]
    fn test_member_registration() {
        let coord = ClusterCoordinator::with_defaults();

        let member = ClusterMember {
            node_id: NodeId::new(),
            address: "localhost:8080".to_string(),
            role: NodeRole::Follower,
            status: MemberStatus::Active,
            joined_at: Instant::now(),
            last_seen: Instant::now(),
            version: "1.0.0".to_string(),
            metadata: HashMap::new(),
        };

        coord
            .register_member(member.clone())
            .expect("registration should succeed");

        let members = coord.get_members();
        assert_eq!(members.len(), 1);
        assert_eq!(members[0].node_id, member.node_id);
    }

    #[test]
    fn test_config_storage() {
        let coord = ClusterCoordinator::with_defaults();

        coord
            .set_config("test_key".to_string(), vec![1, 2, 3])
            .expect("set_config should succeed");

        let value = coord.get_config("test_key");
        assert_eq!(value, Some(vec![1, 2, 3]));
    }

    #[tokio::test]
    async fn test_coordinator_start_stop() {
        let coord = ClusterCoordinator::with_defaults();

        let start_result = coord.start().await;
        assert!(start_result.is_ok());

        let stop_result = coord.stop().await;
        assert!(stop_result.is_ok());
    }
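
    // Additional sketches covering local-only paths; they exercise no
    // networking and assume only the default CoordinatorConfig timings.

    #[test]
    fn test_initial_role_is_follower() {
        let coord = ClusterCoordinator::with_defaults();
        assert!(!coord.is_leader());
        assert_eq!(coord.get_leader(), None);

        let stats = coord.get_statistics();
        assert_eq!(stats.role, NodeRole::Follower);
        assert_eq!(stats.current_term, 0);
        assert_eq!(stats.total_members, 0);
    }

    #[test]
    fn test_member_unregistration() {
        let coord = ClusterCoordinator::with_defaults();

        let member = ClusterMember {
            node_id: NodeId::new(),
            address: "localhost:8081".to_string(),
            role: NodeRole::Follower,
            status: MemberStatus::Active,
            joined_at: Instant::now(),
            last_seen: Instant::now(),
            version: "1.0.0".to_string(),
            metadata: HashMap::new(),
        };
        let node_id = member.node_id;

        coord.register_member(member).expect("register should succeed");
        coord
            .unregister_member(node_id)
            .expect("unregister should succeed");

        assert!(coord.get_members().is_empty());
    }

    #[tokio::test]
    async fn test_member_health_transition_to_suspected() {
        let coord = ClusterCoordinator::with_defaults();

        // Simulate a member last seen 45s ago: past the 30s default
        // member_timeout but short of the 60s needed to be marked Down.
        let Some(stale) = Instant::now().checked_sub(Duration::from_secs(45)) else {
            return; // Monotonic clock too close to its origin; skip.
        };

        let member = ClusterMember {
            node_id: NodeId::new(),
            address: "localhost:8082".to_string(),
            role: NodeRole::Follower,
            status: MemberStatus::Active,
            joined_at: stale,
            last_seen: stale,
            version: "1.0.0".to_string(),
            metadata: HashMap::new(),
        };

        coord.register_member(member).expect("register should succeed");
        coord
            .check_member_health()
            .await
            .expect("health check should succeed");

        assert_eq!(coord.get_members()[0].status, MemberStatus::Suspected);
    }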
}