1use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::Duration;
13
/// Static configuration for a [`Cluster`].
///
/// Build one with [`ClusterConfig::new`] or `Default::default()` and adjust it with the
/// `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    /// Identifier returned by `Cluster::id`.
    pub cluster_id: String,
    /// While fewer nodes than this are registered, the cluster state is `Forming`.
    pub min_nodes: usize,
    /// Maximum number of registered nodes (the local node included); `Cluster::add_node`
    /// refuses to grow past it.
    pub max_nodes: usize,
    /// Heartbeat period. NOTE(review): not read by `Cluster` in this file; presumably
    /// consumed by whatever sends heartbeats — confirm.
    pub heartbeat_interval: Duration,
    /// Heartbeat age beyond which `Cluster::check_failures` escalates a peer
    /// (Healthy -> Suspect -> Down).
    pub failure_timeout: Duration,
    /// Desired replica count. NOTE(review): only used here to derive `quorum_size`.
    pub replication_factor: usize,
    /// Minimum number of healthy nodes required for quorum.
    pub quorum_size: usize,
}
29
30impl Default for ClusterConfig {
31 fn default() -> Self {
32 Self {
33 cluster_id: "aegis-cluster".to_string(),
34 min_nodes: 1,
35 max_nodes: 100,
36 heartbeat_interval: Duration::from_secs(1),
37 failure_timeout: Duration::from_secs(5),
38 replication_factor: 3,
39 quorum_size: 2,
40 }
41 }
42}
43
44impl ClusterConfig {
45 pub fn new(cluster_id: impl Into<String>) -> Self {
46 Self {
47 cluster_id: cluster_id.into(),
48 ..Default::default()
49 }
50 }
51
52 pub fn with_replication_factor(mut self, factor: usize) -> Self {
53 self.replication_factor = factor;
54 self.quorum_size = (factor / 2) + 1;
55 self
56 }
57
58 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59 self.heartbeat_interval = interval;
60 self
61 }
62
63 pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64 self.failure_timeout = timeout;
65 self
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75pub enum ClusterState {
76 Initializing,
78 Forming,
80 Healthy,
82 Degraded,
84 NoQuorum,
86 ShuttingDown,
88}
89
90impl Default for ClusterState {
91 fn default() -> Self {
92 Self::Initializing
93 }
94}
95
/// A single proposed change to cluster membership.
///
/// NOTE(review): nothing in this file constructs or applies these values; presumably they
/// are replicated through a consensus layer before being applied — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MembershipChange {
    /// Add the described node (cf. `Cluster::add_node`).
    AddNode(NodeInfo),
    /// Remove the node with this id (cf. `Cluster::remove_node`).
    RemoveNode(NodeId),
    /// Replace the stored description of an existing node (presumably; not applied here).
    UpdateNode(NodeInfo),
}
107
/// A [`MembershipChange`] wrapped with request metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MembershipChangeRequest {
    /// Caller-chosen identifier for this request.
    pub change_id: String,
    /// The requested membership change.
    pub change: MembershipChange,
    /// Time the request was made. NOTE(review): units and epoch are not visible here
    /// (presumably milliseconds since the Unix epoch) — confirm.
    pub requested_at: u64,
}
115
116pub struct Cluster {
122 config: ClusterConfig,
123 state: RwLock<ClusterState>,
124 nodes: RwLock<HashMap<NodeId, NodeInfo>>,
125 leader_id: RwLock<Option<NodeId>>,
126 local_node_id: NodeId,
127 health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
128}
129
130impl Cluster {
131 pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
133 let local_id = local_node.id.clone();
134 let mut nodes = HashMap::new();
135 nodes.insert(local_id.clone(), local_node);
136
137 Self {
138 config,
139 state: RwLock::new(ClusterState::Initializing),
140 nodes: RwLock::new(nodes),
141 leader_id: RwLock::new(None),
142 local_node_id: local_id,
143 health_checks: RwLock::new(HashMap::new()),
144 }
145 }
146
147 pub fn id(&self) -> &str {
149 &self.config.cluster_id
150 }
151
152 pub fn config(&self) -> &ClusterConfig {
154 &self.config
155 }
156
157 pub fn state(&self) -> ClusterState {
159 *self.state.read().unwrap()
160 }
161
162 pub fn set_state(&self, state: ClusterState) {
164 *self.state.write().unwrap() = state;
165 }
166
167 pub fn local_node_id(&self) -> &NodeId {
169 &self.local_node_id
170 }
171
172 pub fn leader_id(&self) -> Option<NodeId> {
174 self.leader_id.read().unwrap().clone()
175 }
176
177 pub fn set_leader(&self, leader_id: Option<NodeId>) {
179 *self.leader_id.write().unwrap() = leader_id;
180 }
181
182 pub fn is_leader(&self) -> bool {
184 self.leader_id()
185 .map(|id| id == self.local_node_id)
186 .unwrap_or(false)
187 }
188
189 pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
195 let mut nodes = self.nodes.write().unwrap();
196
197 if nodes.len() >= self.config.max_nodes {
198 return Err(ClusterError::MaxNodesReached);
199 }
200
201 if nodes.contains_key(&node.id) {
202 return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
203 }
204
205 nodes.insert(node.id.clone(), node);
206 drop(nodes);
207
208 self.update_cluster_state();
209 Ok(())
210 }
211
212 pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
214 if node_id == &self.local_node_id {
215 return Err(ClusterError::CannotRemoveLocalNode);
216 }
217
218 let mut nodes = self.nodes.write().unwrap();
219 let node = nodes
220 .remove(node_id)
221 .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
222
223 drop(nodes);
224 self.update_cluster_state();
225 Ok(node)
226 }
227
228 pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
230 self.nodes.read().unwrap().get(node_id).cloned()
231 }
232
233 pub fn nodes(&self) -> Vec<NodeInfo> {
235 self.nodes.read().unwrap().values().cloned().collect()
236 }
237
238 pub fn node_ids(&self) -> Vec<NodeId> {
240 self.nodes.read().unwrap().keys().cloned().collect()
241 }
242
243 pub fn node_count(&self) -> usize {
245 self.nodes.read().unwrap().len()
246 }
247
248 pub fn peers(&self) -> Vec<NodeInfo> {
250 self.nodes
251 .read()
252 .unwrap()
253 .values()
254 .filter(|n| n.id != self.local_node_id)
255 .cloned()
256 .collect()
257 }
258
259 pub fn peer_ids(&self) -> Vec<NodeId> {
261 self.nodes
262 .read()
263 .unwrap()
264 .keys()
265 .filter(|id| *id != &self.local_node_id)
266 .cloned()
267 .collect()
268 }
269
270 pub fn update_health(&self, health: NodeHealth) {
276 let node_id = health.node_id.clone();
277 let mut checks = self.health_checks.write().unwrap();
278 checks.insert(node_id.clone(), health.clone());
279 drop(checks);
280
281 let mut nodes = self.nodes.write().unwrap();
282 if let Some(node) = nodes.get_mut(&node_id) {
283 if health.healthy {
284 node.mark_healthy();
285 } else {
286 node.mark_suspect();
287 }
288 }
289 drop(nodes);
290
291 self.update_cluster_state();
292 }
293
294 pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
296 self.health_checks.read().unwrap().get(node_id).cloned()
297 }
298
299 pub fn heartbeat(&self, node_id: &NodeId) {
301 let mut nodes = self.nodes.write().unwrap();
302 if let Some(node) = nodes.get_mut(node_id) {
303 node.heartbeat();
304 }
305 }
306
307 pub fn check_failures(&self) -> Vec<NodeId> {
309 let timeout_ms = self.config.failure_timeout.as_millis() as u64;
310 let mut failed = Vec::new();
311
312 let mut nodes = self.nodes.write().unwrap();
313 for node in nodes.values_mut() {
314 if node.id == self.local_node_id {
315 continue;
316 }
317
318 if node.heartbeat_age() > timeout_ms {
319 if node.status == NodeStatus::Healthy {
320 node.mark_suspect();
321 } else if node.status == NodeStatus::Suspect {
322 node.mark_down();
323 failed.push(node.id.clone());
324 }
325 }
326 }
327
328 drop(nodes);
329
330 if !failed.is_empty() {
331 self.update_cluster_state();
332 }
333
334 failed
335 }
336
337 fn update_cluster_state(&self) {
343 let nodes = self.nodes.read().unwrap();
344 let total = nodes.len();
345 let healthy = nodes
346 .values()
347 .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
348 .count();
349
350 drop(nodes);
351
352 let new_state = if total < self.config.min_nodes {
353 ClusterState::Forming
354 } else if healthy >= self.config.quorum_size {
355 if healthy == total {
356 ClusterState::Healthy
357 } else {
358 ClusterState::Degraded
359 }
360 } else {
361 ClusterState::NoQuorum
362 };
363
364 self.set_state(new_state);
365 }
366
367 pub fn has_quorum(&self) -> bool {
369 let nodes = self.nodes.read().unwrap();
370 let healthy = nodes
371 .values()
372 .filter(|n| n.status == NodeStatus::Healthy)
373 .count();
374 healthy >= self.config.quorum_size
375 }
376
377 pub fn stats(&self) -> ClusterStats {
379 let nodes = self.nodes.read().unwrap();
380 let total = nodes.len();
381 let healthy = nodes
382 .values()
383 .filter(|n| n.status == NodeStatus::Healthy)
384 .count();
385 let suspect = nodes
386 .values()
387 .filter(|n| n.status == NodeStatus::Suspect)
388 .count();
389 let down = nodes
390 .values()
391 .filter(|n| n.status == NodeStatus::Down)
392 .count();
393
394 ClusterStats {
395 cluster_id: self.config.cluster_id.clone(),
396 state: self.state(),
397 total_nodes: total,
398 healthy_nodes: healthy,
399 suspect_nodes: suspect,
400 down_nodes: down,
401 has_leader: self.leader_id().is_some(),
402 has_quorum: healthy >= self.config.quorum_size,
403 }
404 }
405
406 pub fn voting_members(&self) -> Vec<NodeId> {
412 self.nodes
413 .read()
414 .unwrap()
415 .values()
416 .filter(|n| n.is_available())
417 .map(|n| n.id.clone())
418 .collect()
419 }
420
421 pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
423 let mut nodes = self.nodes.write().unwrap();
424 if let Some(node) = nodes.get_mut(node_id) {
425 node.role = role;
426 }
427 }
428
429 pub fn leader(&self) -> Option<NodeInfo> {
431 let leader_id = self.leader_id()?;
432 self.get_node(&leader_id)
433 }
434}
435
/// Point-in-time summary produced by `Cluster::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStats {
    /// Configured cluster id.
    pub cluster_id: String,
    /// Cluster state at the time of the snapshot.
    pub state: ClusterState,
    /// Number of registered nodes, including the local node.
    pub total_nodes: usize,
    /// Nodes whose status is `Healthy` (nodes in `Starting` are not counted).
    pub healthy_nodes: usize,
    /// Nodes whose status is `Suspect`.
    pub suspect_nodes: usize,
    /// Nodes whose status is `Down`.
    pub down_nodes: usize,
    /// Whether a leader id was recorded.
    pub has_leader: bool,
    /// Whether `healthy_nodes` reached the configured `quorum_size`.
    pub has_quorum: bool,
}
452
453#[derive(Debug, Clone)]
459pub enum ClusterError {
460 NodeNotFound(NodeId),
461 NodeAlreadyExists(NodeId),
462 MaxNodesReached,
463 CannotRemoveLocalNode,
464 NoQuorum,
465 NotLeader,
466 ConfigurationError(String),
467}
468
469impl std::fmt::Display for ClusterError {
470 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
471 match self {
472 Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
473 Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
474 Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
475 Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
476 Self::NoQuorum => write!(f, "Cluster has no quorum"),
477 Self::NotLeader => write!(f, "Not the leader"),
478 Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
479 }
480 }
481}
482
483impl std::error::Error for ClusterError {}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a loopback node on port 5000 with the given id.
    fn create_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Builds a cluster with default config whose only (local) node is `node1`.
    fn single_node_cluster() -> Cluster {
        Cluster::new(create_node("node1"), ClusterConfig::default())
    }

    #[test]
    fn test_cluster_config() {
        let cfg = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(Duration::from_secs(2));

        assert_eq!(cfg.cluster_id, "test-cluster");
        assert_eq!(cfg.replication_factor, 3);
        assert_eq!(cfg.quorum_size, 2);
    }

    #[test]
    fn test_cluster_creation() {
        let cluster = single_node_cluster();

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    #[test]
    fn test_add_remove_node() {
        let cluster = single_node_cluster();

        cluster.add_node(create_node("node2")).unwrap();
        assert_eq!(cluster.node_count(), 2);

        cluster.remove_node(&NodeId::new("node2")).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    #[test]
    fn test_cannot_remove_local_node() {
        let cluster = single_node_cluster();

        let outcome = cluster.remove_node(&NodeId::new("node1"));
        assert!(matches!(outcome, Err(ClusterError::CannotRemoveLocalNode)));
    }

    #[test]
    fn test_peers() {
        let cluster = single_node_cluster();
        for &id in &["node2", "node3"] {
            cluster.add_node(create_node(id)).unwrap();
        }

        assert_eq!(cluster.peers().len(), 2);

        let peer_ids = cluster.peer_ids();
        for &id in &["node2", "node3"] {
            assert!(peer_ids.contains(&NodeId::new(id)));
        }
        assert!(!peer_ids.contains(&NodeId::new("node1")));
    }

    #[test]
    fn test_cluster_stats() {
        let healthy_node = |id: &str| {
            let mut node = create_node(id);
            node.mark_healthy();
            node
        };

        let config = ClusterConfig::default().with_replication_factor(3);
        let cluster = Cluster::new(healthy_node("node1"), config);
        cluster.add_node(healthy_node("node2")).unwrap();

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.healthy_nodes, 2);
        assert!(stats.has_quorum);
    }

    #[test]
    fn test_heartbeat() {
        let cluster = single_node_cluster();

        let mut suspect = create_node("node2");
        suspect.status = NodeStatus::Suspect;
        cluster.add_node(suspect).unwrap();

        let target = NodeId::new("node2");
        cluster.heartbeat(&target);

        let refreshed = cluster.get_node(&target).unwrap();
        assert_eq!(refreshed.status, NodeStatus::Healthy);
    }

    #[test]
    fn test_leader_management() {
        let cluster = single_node_cluster();
        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    #[test]
    fn test_max_nodes() {
        let mut config = ClusterConfig::default();
        config.max_nodes = 2;
        let cluster = Cluster::new(create_node("node1"), config);

        cluster.add_node(create_node("node2")).unwrap();

        let overflow = cluster.add_node(create_node("node3"));
        assert!(matches!(overflow, Err(ClusterError::MaxNodesReached)));
    }
}