1use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::Duration;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClusterConfig {
21 pub cluster_id: String,
22 pub min_nodes: usize,
23 pub max_nodes: usize,
24 pub heartbeat_interval: Duration,
25 pub failure_timeout: Duration,
26 pub replication_factor: usize,
27 pub quorum_size: usize,
28}
29
30impl Default for ClusterConfig {
31 fn default() -> Self {
32 Self {
33 cluster_id: "aegis-cluster".to_string(),
34 min_nodes: 1,
35 max_nodes: 100,
36 heartbeat_interval: Duration::from_secs(1),
37 failure_timeout: Duration::from_secs(5),
38 replication_factor: 3,
39 quorum_size: 2,
40 }
41 }
42}
43
44impl ClusterConfig {
45 pub fn new(cluster_id: impl Into<String>) -> Self {
46 Self {
47 cluster_id: cluster_id.into(),
48 ..Default::default()
49 }
50 }
51
52 pub fn with_replication_factor(mut self, factor: usize) -> Self {
53 self.replication_factor = factor;
54 self.quorum_size = (factor / 2) + 1;
55 self
56 }
57
58 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59 self.heartbeat_interval = interval;
60 self
61 }
62
63 pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64 self.failure_timeout = timeout;
65 self
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
75pub enum ClusterState {
76 #[default]
78 Initializing,
79 Forming,
81 Healthy,
83 Degraded,
85 NoQuorum,
87 ShuttingDown,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
97pub enum MembershipChange {
98 AddNode(NodeInfo),
99 RemoveNode(NodeId),
100 UpdateNode(NodeInfo),
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct MembershipChangeRequest {
106 pub change_id: String,
107 pub change: MembershipChange,
108 pub requested_at: u64,
109}
110
111pub struct Cluster {
117 config: ClusterConfig,
118 state: RwLock<ClusterState>,
119 nodes: RwLock<HashMap<NodeId, NodeInfo>>,
120 leader_id: RwLock<Option<NodeId>>,
121 local_node_id: NodeId,
122 health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
123}
124
125impl Cluster {
126 pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
128 let local_id = local_node.id.clone();
129 let mut nodes = HashMap::new();
130 nodes.insert(local_id.clone(), local_node);
131
132 Self {
133 config,
134 state: RwLock::new(ClusterState::Initializing),
135 nodes: RwLock::new(nodes),
136 leader_id: RwLock::new(None),
137 local_node_id: local_id,
138 health_checks: RwLock::new(HashMap::new()),
139 }
140 }
141
142 pub fn id(&self) -> &str {
144 &self.config.cluster_id
145 }
146
147 pub fn config(&self) -> &ClusterConfig {
149 &self.config
150 }
151
152 pub fn state(&self) -> ClusterState {
154 *self.state.read().expect("cluster state lock poisoned")
155 }
156
157 pub fn set_state(&self, state: ClusterState) {
159 *self.state.write().expect("cluster state lock poisoned") = state;
160 }
161
162 pub fn local_node_id(&self) -> &NodeId {
164 &self.local_node_id
165 }
166
167 pub fn leader_id(&self) -> Option<NodeId> {
169 self.leader_id
170 .read()
171 .expect("cluster leader_id lock poisoned")
172 .clone()
173 }
174
175 pub fn set_leader(&self, leader_id: Option<NodeId>) {
177 *self
178 .leader_id
179 .write()
180 .expect("cluster leader_id lock poisoned") = leader_id;
181 }
182
183 pub fn is_leader(&self) -> bool {
185 self.leader_id()
186 .map(|id| id == self.local_node_id)
187 .unwrap_or(false)
188 }
189
190 pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
196 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
197
198 if nodes.len() >= self.config.max_nodes {
199 return Err(ClusterError::MaxNodesReached);
200 }
201
202 if nodes.contains_key(&node.id) {
203 return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
204 }
205
206 nodes.insert(node.id.clone(), node);
207 drop(nodes);
208
209 self.update_cluster_state();
210 Ok(())
211 }
212
213 pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
215 if node_id == &self.local_node_id {
216 return Err(ClusterError::CannotRemoveLocalNode);
217 }
218
219 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
220 let node = nodes
221 .remove(node_id)
222 .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
223
224 drop(nodes);
225 self.update_cluster_state();
226 Ok(node)
227 }
228
229 pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
231 self.nodes
232 .read()
233 .expect("cluster nodes lock poisoned")
234 .get(node_id)
235 .cloned()
236 }
237
238 pub fn nodes(&self) -> Vec<NodeInfo> {
240 self.nodes
241 .read()
242 .expect("cluster nodes lock poisoned")
243 .values()
244 .cloned()
245 .collect()
246 }
247
248 pub fn node_ids(&self) -> Vec<NodeId> {
250 self.nodes
251 .read()
252 .expect("cluster nodes lock poisoned")
253 .keys()
254 .cloned()
255 .collect()
256 }
257
258 pub fn node_count(&self) -> usize {
260 self.nodes
261 .read()
262 .expect("cluster nodes lock poisoned")
263 .len()
264 }
265
266 pub fn peers(&self) -> Vec<NodeInfo> {
268 self.nodes
269 .read()
270 .expect("cluster nodes lock poisoned")
271 .values()
272 .filter(|n| n.id != self.local_node_id)
273 .cloned()
274 .collect()
275 }
276
277 pub fn peer_ids(&self) -> Vec<NodeId> {
279 self.nodes
280 .read()
281 .expect("cluster nodes lock poisoned")
282 .keys()
283 .filter(|id| *id != &self.local_node_id)
284 .cloned()
285 .collect()
286 }
287
288 pub fn update_health(&self, health: NodeHealth) {
294 let node_id = health.node_id.clone();
295 let mut checks = self
296 .health_checks
297 .write()
298 .expect("cluster health_checks lock poisoned");
299 checks.insert(node_id.clone(), health.clone());
300 drop(checks);
301
302 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
303 if let Some(node) = nodes.get_mut(&node_id) {
304 if health.healthy {
305 node.mark_healthy();
306 } else {
307 node.mark_suspect();
308 }
309 }
310 drop(nodes);
311
312 self.update_cluster_state();
313 }
314
315 pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
317 self.health_checks
318 .read()
319 .expect("cluster health_checks lock poisoned")
320 .get(node_id)
321 .cloned()
322 }
323
324 pub fn heartbeat(&self, node_id: &NodeId) {
326 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
327 if let Some(node) = nodes.get_mut(node_id) {
328 node.heartbeat();
329 }
330 }
331
332 pub fn check_failures(&self) -> Vec<NodeId> {
334 let timeout_ms = self.config.failure_timeout.as_millis() as u64;
335 let mut failed = Vec::new();
336
337 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
338 for node in nodes.values_mut() {
339 if node.id == self.local_node_id {
340 continue;
341 }
342
343 if node.heartbeat_age() > timeout_ms {
344 if node.status == NodeStatus::Healthy {
345 node.mark_suspect();
346 } else if node.status == NodeStatus::Suspect {
347 node.mark_down();
348 failed.push(node.id.clone());
349 }
350 }
351 }
352
353 drop(nodes);
354
355 if !failed.is_empty() {
356 self.update_cluster_state();
357 }
358
359 failed
360 }
361
362 fn update_cluster_state(&self) {
368 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
369 let total = nodes.len();
370 let healthy = nodes
371 .values()
372 .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
373 .count();
374
375 drop(nodes);
376
377 let new_state = if total < self.config.min_nodes {
378 ClusterState::Forming
379 } else if healthy >= self.config.quorum_size {
380 if healthy == total {
381 ClusterState::Healthy
382 } else {
383 ClusterState::Degraded
384 }
385 } else {
386 ClusterState::NoQuorum
387 };
388
389 self.set_state(new_state);
390 }
391
392 pub fn has_quorum(&self) -> bool {
394 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
395 let healthy = nodes
396 .values()
397 .filter(|n| n.status == NodeStatus::Healthy)
398 .count();
399 healthy >= self.config.quorum_size
400 }
401
402 pub fn stats(&self) -> ClusterStats {
404 let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
405 let total = nodes.len();
406 let healthy = nodes
407 .values()
408 .filter(|n| n.status == NodeStatus::Healthy)
409 .count();
410 let suspect = nodes
411 .values()
412 .filter(|n| n.status == NodeStatus::Suspect)
413 .count();
414 let down = nodes
415 .values()
416 .filter(|n| n.status == NodeStatus::Down)
417 .count();
418
419 ClusterStats {
420 cluster_id: self.config.cluster_id.clone(),
421 state: self.state(),
422 total_nodes: total,
423 healthy_nodes: healthy,
424 suspect_nodes: suspect,
425 down_nodes: down,
426 has_leader: self.leader_id().is_some(),
427 has_quorum: healthy >= self.config.quorum_size,
428 }
429 }
430
431 pub fn voting_members(&self) -> Vec<NodeId> {
437 self.nodes
438 .read()
439 .expect("cluster nodes lock poisoned")
440 .values()
441 .filter(|n| n.is_available())
442 .map(|n| n.id.clone())
443 .collect()
444 }
445
446 pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
448 let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
449 if let Some(node) = nodes.get_mut(node_id) {
450 node.role = role;
451 }
452 }
453
454 pub fn leader(&self) -> Option<NodeInfo> {
456 let leader_id = self.leader_id()?;
457 self.get_node(&leader_id)
458 }
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize)]
467pub struct ClusterStats {
468 pub cluster_id: String,
469 pub state: ClusterState,
470 pub total_nodes: usize,
471 pub healthy_nodes: usize,
472 pub suspect_nodes: usize,
473 pub down_nodes: usize,
474 pub has_leader: bool,
475 pub has_quorum: bool,
476}
477
478#[derive(Debug, Clone)]
484pub enum ClusterError {
485 NodeNotFound(NodeId),
486 NodeAlreadyExists(NodeId),
487 MaxNodesReached,
488 CannotRemoveLocalNode,
489 NoQuorum,
490 NotLeader,
491 ConfigurationError(String),
492}
493
494impl std::fmt::Display for ClusterError {
495 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496 match self {
497 Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
498 Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
499 Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
500 Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
501 Self::NoQuorum => write!(f, "Cluster has no quorum"),
502 Self::NotLeader => write!(f, "Not the leader"),
503 Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
504 }
505 }
506}
507
508impl std::error::Error for ClusterError {}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with the given id on a fixed loopback address.
    fn create_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Builds a default-configured cluster whose local node has the given id.
    fn default_cluster(local_id: &str) -> Cluster {
        Cluster::new(create_node(local_id), ClusterConfig::default())
    }

    /// Builder methods set their fields and derive the quorum size.
    #[test]
    fn test_cluster_config() {
        let cfg = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(Duration::from_secs(2));

        assert_eq!(cfg.cluster_id, "test-cluster");
        assert_eq!(cfg.replication_factor, 3);
        assert_eq!(cfg.quorum_size, 2);
    }

    /// A new cluster holds only the local node, is initializing, and has no leader.
    #[test]
    fn test_cluster_creation() {
        let cluster = default_cluster("node1");

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    /// Adding and then removing a peer changes the member count accordingly.
    #[test]
    fn test_add_remove_node() {
        let cluster = default_cluster("node1");

        cluster.add_node(create_node("node2")).unwrap();
        assert_eq!(cluster.node_count(), 2);

        let peer_id = NodeId::new("node2");
        cluster.remove_node(&peer_id).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    /// The local node can never be removed.
    #[test]
    fn test_cannot_remove_local_node() {
        let cluster = default_cluster("node1");

        let outcome = cluster.remove_node(&NodeId::new("node1"));
        assert!(matches!(outcome, Err(ClusterError::CannotRemoveLocalNode)));
    }

    /// Peer listings contain every member except the local node.
    #[test]
    fn test_peers() {
        let cluster = default_cluster("node1");
        for id in ["node2", "node3"] {
            cluster.add_node(create_node(id)).unwrap();
        }

        assert_eq!(cluster.peers().len(), 2);

        let ids = cluster.peer_ids();
        assert!(ids.contains(&NodeId::new("node2")));
        assert!(ids.contains(&NodeId::new("node3")));
        assert!(!ids.contains(&NodeId::new("node1")));
    }

    /// Stats count healthy members and report quorum.
    #[test]
    fn test_cluster_stats() {
        let mut local = create_node("node1");
        local.mark_healthy();
        let cluster = Cluster::new(
            local,
            ClusterConfig::default().with_replication_factor(3),
        );

        let mut peer = create_node("node2");
        peer.mark_healthy();
        cluster.add_node(peer).unwrap();

        let snapshot = cluster.stats();
        assert_eq!(snapshot.total_nodes, 2);
        assert_eq!(snapshot.healthy_nodes, 2);
        assert!(snapshot.has_quorum);
    }

    /// A heartbeat brings a suspect node back to healthy.
    #[test]
    fn test_heartbeat() {
        let cluster = default_cluster("node1");

        let mut peer = create_node("node2");
        peer.status = NodeStatus::Suspect;
        cluster.add_node(peer).unwrap();

        let peer_id = NodeId::new("node2");
        cluster.heartbeat(&peer_id);

        let refreshed = cluster.get_node(&peer_id).unwrap();
        assert_eq!(refreshed.status, NodeStatus::Healthy);
    }

    /// `is_leader` follows whichever id is recorded as the leader.
    #[test]
    fn test_leader_management() {
        let cluster = default_cluster("node1");
        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    /// Adding beyond `max_nodes` is rejected.
    #[test]
    fn test_max_nodes() {
        let limited = ClusterConfig {
            max_nodes: 2,
            ..ClusterConfig::default()
        };
        let cluster = Cluster::new(create_node("node1"), limited);

        cluster.add_node(create_node("node2")).unwrap();

        let outcome = cluster.add_node(create_node("node3"));
        assert!(matches!(outcome, Err(ClusterError::MaxNodesReached)));
    }
}