1use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::Duration;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClusterConfig {
21 pub cluster_id: String,
22 pub min_nodes: usize,
23 pub max_nodes: usize,
24 pub heartbeat_interval: Duration,
25 pub failure_timeout: Duration,
26 pub replication_factor: usize,
27 pub quorum_size: usize,
28}
29
30impl Default for ClusterConfig {
31 fn default() -> Self {
32 Self {
33 cluster_id: "aegis-cluster".to_string(),
34 min_nodes: 1,
35 max_nodes: 100,
36 heartbeat_interval: Duration::from_secs(1),
37 failure_timeout: Duration::from_secs(5),
38 replication_factor: 3,
39 quorum_size: 2,
40 }
41 }
42}
43
44impl ClusterConfig {
45 pub fn new(cluster_id: impl Into<String>) -> Self {
46 Self {
47 cluster_id: cluster_id.into(),
48 ..Default::default()
49 }
50 }
51
52 pub fn with_replication_factor(mut self, factor: usize) -> Self {
53 self.replication_factor = factor;
54 self.quorum_size = (factor / 2) + 1;
55 self
56 }
57
58 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59 self.heartbeat_interval = interval;
60 self
61 }
62
63 pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64 self.failure_timeout = timeout;
65 self
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75#[derive(Default)]
76pub enum ClusterState {
77 #[default]
79 Initializing,
80 Forming,
82 Healthy,
84 Degraded,
86 NoQuorum,
88 ShuttingDown,
90}
91
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum MembershipChange {
100 AddNode(NodeInfo),
101 RemoveNode(NodeId),
102 UpdateNode(NodeInfo),
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct MembershipChangeRequest {
108 pub change_id: String,
109 pub change: MembershipChange,
110 pub requested_at: u64,
111}
112
113pub struct Cluster {
119 config: ClusterConfig,
120 state: RwLock<ClusterState>,
121 nodes: RwLock<HashMap<NodeId, NodeInfo>>,
122 leader_id: RwLock<Option<NodeId>>,
123 local_node_id: NodeId,
124 health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
125}
126
127impl Cluster {
128 pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
130 let local_id = local_node.id.clone();
131 let mut nodes = HashMap::new();
132 nodes.insert(local_id.clone(), local_node);
133
134 Self {
135 config,
136 state: RwLock::new(ClusterState::Initializing),
137 nodes: RwLock::new(nodes),
138 leader_id: RwLock::new(None),
139 local_node_id: local_id,
140 health_checks: RwLock::new(HashMap::new()),
141 }
142 }
143
144 pub fn id(&self) -> &str {
146 &self.config.cluster_id
147 }
148
149 pub fn config(&self) -> &ClusterConfig {
151 &self.config
152 }
153
154 pub fn state(&self) -> ClusterState {
156 *self.state.read().expect("cluster state lock poisoned")
157 }
158
159 pub fn set_state(&self, state: ClusterState) {
161 *self.state.write().expect("cluster state lock poisoned") = state;
162 }
163
164 pub fn local_node_id(&self) -> &NodeId {
166 &self.local_node_id
167 }
168
169 pub fn leader_id(&self) -> Option<NodeId> {
171 self.leader_id.read().expect("cluster leader_id lock poisoned").clone()
172 }
173
174 pub fn set_leader(&self, leader_id: Option<NodeId>) {
176 *self.leader_id.write().expect("cluster leader_id lock poisoned") = leader_id;
177 }
178
179 pub fn is_leader(&self) -> bool {
181 self.leader_id()
182 .map(|id| id == self.local_node_id)
183 .unwrap_or(false)
184 }
185
186 pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
192 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
193
194 if nodes.len() >= self.config.max_nodes {
195 return Err(ClusterError::MaxNodesReached);
196 }
197
198 if nodes.contains_key(&node.id) {
199 return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
200 }
201
202 nodes.insert(node.id.clone(), node);
203 drop(nodes);
204
205 self.update_cluster_state();
206 Ok(())
207 }
208
209 pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
211 if node_id == &self.local_node_id {
212 return Err(ClusterError::CannotRemoveLocalNode);
213 }
214
215 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
216 let node = nodes
217 .remove(node_id)
218 .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
219
220 drop(nodes);
221 self.update_cluster_state();
222 Ok(node)
223 }
224
225 pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
227 self.nodes.read().expect("cluster nodes lock poisoned").get(node_id).cloned()
228 }
229
230 pub fn nodes(&self) -> Vec<NodeInfo> {
232 self.nodes.read().expect("cluster nodes lock poisoned").values().cloned().collect()
233 }
234
235 pub fn node_ids(&self) -> Vec<NodeId> {
237 self.nodes.read().expect("cluster nodes lock poisoned").keys().cloned().collect()
238 }
239
240 pub fn node_count(&self) -> usize {
242 self.nodes.read().expect("cluster nodes lock poisoned").len()
243 }
244
245 pub fn peers(&self) -> Vec<NodeInfo> {
247 self.nodes
248 .read()
249 .expect("cluster nodes lock poisoned")
250 .values()
251 .filter(|n| n.id != self.local_node_id)
252 .cloned()
253 .collect()
254 }
255
256 pub fn peer_ids(&self) -> Vec<NodeId> {
258 self.nodes
259 .read()
260 .expect("cluster nodes lock poisoned")
261 .keys()
262 .filter(|id| *id != &self.local_node_id)
263 .cloned()
264 .collect()
265 }
266
267 pub fn update_health(&self, health: NodeHealth) {
273 let node_id = health.node_id.clone();
274 let mut checks = self.health_checks.write().expect("cluster health_checks lock poisoned");
275 checks.insert(node_id.clone(), health.clone());
276 drop(checks);
277
278 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
279 if let Some(node) = nodes.get_mut(&node_id) {
280 if health.healthy {
281 node.mark_healthy();
282 } else {
283 node.mark_suspect();
284 }
285 }
286 drop(nodes);
287
288 self.update_cluster_state();
289 }
290
291 pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
293 self.health_checks.read().expect("cluster health_checks lock poisoned").get(node_id).cloned()
294 }
295
296 pub fn heartbeat(&self, node_id: &NodeId) {
298 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
299 if let Some(node) = nodes.get_mut(node_id) {
300 node.heartbeat();
301 }
302 }
303
304 pub fn check_failures(&self) -> Vec<NodeId> {
306 let timeout_ms = self.config.failure_timeout.as_millis() as u64;
307 let mut failed = Vec::new();
308
309 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
310 for node in nodes.values_mut() {
311 if node.id == self.local_node_id {
312 continue;
313 }
314
315 if node.heartbeat_age() > timeout_ms {
316 if node.status == NodeStatus::Healthy {
317 node.mark_suspect();
318 } else if node.status == NodeStatus::Suspect {
319 node.mark_down();
320 failed.push(node.id.clone());
321 }
322 }
323 }
324
325 drop(nodes);
326
327 if !failed.is_empty() {
328 self.update_cluster_state();
329 }
330
331 failed
332 }
333
334 fn update_cluster_state(&self) {
340 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
341 let total = nodes.len();
342 let healthy = nodes
343 .values()
344 .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
345 .count();
346
347 drop(nodes);
348
349 let new_state = if total < self.config.min_nodes {
350 ClusterState::Forming
351 } else if healthy >= self.config.quorum_size {
352 if healthy == total {
353 ClusterState::Healthy
354 } else {
355 ClusterState::Degraded
356 }
357 } else {
358 ClusterState::NoQuorum
359 };
360
361 self.set_state(new_state);
362 }
363
364 pub fn has_quorum(&self) -> bool {
366 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
367 let healthy = nodes
368 .values()
369 .filter(|n| n.status == NodeStatus::Healthy)
370 .count();
371 healthy >= self.config.quorum_size
372 }
373
374 pub fn stats(&self) -> ClusterStats {
376 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
377 let total = nodes.len();
378 let healthy = nodes
379 .values()
380 .filter(|n| n.status == NodeStatus::Healthy)
381 .count();
382 let suspect = nodes
383 .values()
384 .filter(|n| n.status == NodeStatus::Suspect)
385 .count();
386 let down = nodes
387 .values()
388 .filter(|n| n.status == NodeStatus::Down)
389 .count();
390
391 ClusterStats {
392 cluster_id: self.config.cluster_id.clone(),
393 state: self.state(),
394 total_nodes: total,
395 healthy_nodes: healthy,
396 suspect_nodes: suspect,
397 down_nodes: down,
398 has_leader: self.leader_id().is_some(),
399 has_quorum: healthy >= self.config.quorum_size,
400 }
401 }
402
403 pub fn voting_members(&self) -> Vec<NodeId> {
409 self.nodes
410 .read()
411 .expect("cluster nodes lock poisoned")
412 .values()
413 .filter(|n| n.is_available())
414 .map(|n| n.id.clone())
415 .collect()
416 }
417
418 pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
420 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
421 if let Some(node) = nodes.get_mut(node_id) {
422 node.role = role;
423 }
424 }
425
426 pub fn leader(&self) -> Option<NodeInfo> {
428 let leader_id = self.leader_id()?;
429 self.get_node(&leader_id)
430 }
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ClusterStats {
440 pub cluster_id: String,
441 pub state: ClusterState,
442 pub total_nodes: usize,
443 pub healthy_nodes: usize,
444 pub suspect_nodes: usize,
445 pub down_nodes: usize,
446 pub has_leader: bool,
447 pub has_quorum: bool,
448}
449
450#[derive(Debug, Clone)]
456pub enum ClusterError {
457 NodeNotFound(NodeId),
458 NodeAlreadyExists(NodeId),
459 MaxNodesReached,
460 CannotRemoveLocalNode,
461 NoQuorum,
462 NotLeader,
463 ConfigurationError(String),
464}
465
466impl std::fmt::Display for ClusterError {
467 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468 match self {
469 Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
470 Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
471 Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
472 Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
473 Self::NoQuorum => write!(f, "Cluster has no quorum"),
474 Self::NotLeader => write!(f, "Not the leader"),
475 Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
476 }
477 }
478}
479
480impl std::error::Error for ClusterError {}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a node with the given id at 127.0.0.1:5000.
    fn make_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Fixture: a cluster whose only member is a fresh local node `local_id`.
    fn cluster_of_one(local_id: &str, config: ClusterConfig) -> Cluster {
        Cluster::new(make_node(local_id), config)
    }

    /// Builder methods set their own field and derive the quorum size.
    #[test]
    fn test_cluster_config() {
        let config = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(Duration::from_secs(2));

        assert_eq!(config.cluster_id, "test-cluster");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.quorum_size, 2);
    }

    /// A new cluster holds only the local node, is initializing, and has no leader.
    #[test]
    fn test_cluster_creation() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    /// Adding and then removing a peer changes the member count accordingly.
    #[test]
    fn test_add_remove_node() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());

        cluster.add_node(make_node("node2")).unwrap();
        assert_eq!(cluster.node_count(), 2);

        cluster.remove_node(&NodeId::new("node2")).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    /// The local node can never be removed.
    #[test]
    fn test_cannot_remove_local_node() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());

        let outcome = cluster.remove_node(&NodeId::new("node1"));
        assert!(matches!(outcome, Err(ClusterError::CannotRemoveLocalNode)));
    }

    /// Peer listings contain every member except the local node.
    #[test]
    fn test_peers() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());
        for id in ["node2", "node3"] {
            cluster.add_node(make_node(id)).unwrap();
        }

        assert_eq!(cluster.peers().len(), 2);

        let peer_ids = cluster.peer_ids();
        assert!(peer_ids.contains(&NodeId::new("node2")));
        assert!(peer_ids.contains(&NodeId::new("node3")));
        assert!(!peer_ids.contains(&NodeId::new("node1")));
    }

    /// Two healthy members satisfy the quorum of a replication factor of 3.
    #[test]
    fn test_cluster_stats() {
        let mut local = make_node("node1");
        local.mark_healthy();
        let cluster = Cluster::new(local, ClusterConfig::default().with_replication_factor(3));

        let mut peer = make_node("node2");
        peer.mark_healthy();
        cluster.add_node(peer).unwrap();

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.healthy_nodes, 2);
        assert!(stats.has_quorum);
    }

    /// A heartbeat from a suspect peer brings it back to healthy.
    #[test]
    fn test_heartbeat() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());

        let mut peer = make_node("node2");
        peer.status = NodeStatus::Suspect;
        cluster.add_node(peer).unwrap();

        let peer_id = NodeId::new("node2");
        cluster.heartbeat(&peer_id);

        let refreshed = cluster.get_node(&peer_id).unwrap();
        assert_eq!(refreshed.status, NodeStatus::Healthy);
    }

    /// `is_leader` follows whichever leader id was set last.
    #[test]
    fn test_leader_management() {
        let cluster = cluster_of_one("node1", ClusterConfig::default());
        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    /// Adding a node beyond `max_nodes` is rejected.
    #[test]
    fn test_max_nodes() {
        let config = ClusterConfig {
            max_nodes: 2,
            ..ClusterConfig::default()
        };
        let cluster = cluster_of_one("node1", config);

        cluster.add_node(make_node("node2")).unwrap();
        let overflow = cluster.add_node(make_node("node3"));
        assert!(matches!(overflow, Err(ClusterError::MaxNodesReached)));
    }
}
614}