// aegis_replication/cluster.rs

//! Aegis Cluster Management
//!
//! Cluster coordination and membership management.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team

use std::collections::HashMap;
use std::sync::RwLock;
use std::time::Duration;

use serde::{Deserialize, Serialize};

use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};

14// =============================================================================
15// Cluster Configuration
16// =============================================================================
17
18/// Configuration for a cluster.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClusterConfig {
21    pub cluster_id: String,
22    pub min_nodes: usize,
23    pub max_nodes: usize,
24    pub heartbeat_interval: Duration,
25    pub failure_timeout: Duration,
26    pub replication_factor: usize,
27    pub quorum_size: usize,
28}
29
30impl Default for ClusterConfig {
31    fn default() -> Self {
32        Self {
33            cluster_id: "aegis-cluster".to_string(),
34            min_nodes: 1,
35            max_nodes: 100,
36            heartbeat_interval: Duration::from_secs(1),
37            failure_timeout: Duration::from_secs(5),
38            replication_factor: 3,
39            quorum_size: 2,
40        }
41    }
42}
43
44impl ClusterConfig {
45    pub fn new(cluster_id: impl Into<String>) -> Self {
46        Self {
47            cluster_id: cluster_id.into(),
48            ..Default::default()
49        }
50    }
51
52    pub fn with_replication_factor(mut self, factor: usize) -> Self {
53        self.replication_factor = factor;
54        self.quorum_size = (factor / 2) + 1;
55        self
56    }
57
58    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59        self.heartbeat_interval = interval;
60        self
61    }
62
63    pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64        self.failure_timeout = timeout;
65        self
66    }
67}
68
69// =============================================================================
70// Cluster State
71// =============================================================================
72
73/// State of the cluster.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75#[derive(Default)]
76pub enum ClusterState {
77    /// Cluster is initializing.
78    #[default]
79    Initializing,
80    /// Cluster is forming (waiting for quorum).
81    Forming,
82    /// Cluster is healthy and operational.
83    Healthy,
84    /// Cluster is degraded (some nodes down).
85    Degraded,
86    /// Cluster has lost quorum.
87    NoQuorum,
88    /// Cluster is shutting down.
89    ShuttingDown,
90}
91
92
93// =============================================================================
94// Membership Change
95// =============================================================================
96
97/// Type of membership change.
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum MembershipChange {
100    AddNode(NodeInfo),
101    RemoveNode(NodeId),
102    UpdateNode(NodeInfo),
103}
104
105/// A membership change request.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct MembershipChangeRequest {
108    pub change_id: String,
109    pub change: MembershipChange,
110    pub requested_at: u64,
111}
112
113// =============================================================================
114// Cluster
115// =============================================================================
116
117/// A distributed cluster of nodes.
118pub struct Cluster {
119    config: ClusterConfig,
120    state: RwLock<ClusterState>,
121    nodes: RwLock<HashMap<NodeId, NodeInfo>>,
122    leader_id: RwLock<Option<NodeId>>,
123    local_node_id: NodeId,
124    health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
125}
126
127impl Cluster {
128    /// Create a new cluster.
129    pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
130        let local_id = local_node.id.clone();
131        let mut nodes = HashMap::new();
132        nodes.insert(local_id.clone(), local_node);
133
134        Self {
135            config,
136            state: RwLock::new(ClusterState::Initializing),
137            nodes: RwLock::new(nodes),
138            leader_id: RwLock::new(None),
139            local_node_id: local_id,
140            health_checks: RwLock::new(HashMap::new()),
141        }
142    }
143
144    /// Get the cluster ID.
145    pub fn id(&self) -> &str {
146        &self.config.cluster_id
147    }
148
149    /// Get the cluster configuration.
150    pub fn config(&self) -> &ClusterConfig {
151        &self.config
152    }
153
154    /// Get the current cluster state.
155    pub fn state(&self) -> ClusterState {
156        *self.state.read().expect("cluster state lock poisoned")
157    }
158
159    /// Set the cluster state.
160    pub fn set_state(&self, state: ClusterState) {
161        *self.state.write().expect("cluster state lock poisoned") = state;
162    }
163
164    /// Get the local node ID.
165    pub fn local_node_id(&self) -> &NodeId {
166        &self.local_node_id
167    }
168
169    /// Get the current leader ID.
170    pub fn leader_id(&self) -> Option<NodeId> {
171        self.leader_id.read().expect("cluster leader_id lock poisoned").clone()
172    }
173
174    /// Set the leader ID.
175    pub fn set_leader(&self, leader_id: Option<NodeId>) {
176        *self.leader_id.write().expect("cluster leader_id lock poisoned") = leader_id;
177    }
178
179    /// Check if this node is the leader.
180    pub fn is_leader(&self) -> bool {
181        self.leader_id()
182            .map(|id| id == self.local_node_id)
183            .unwrap_or(false)
184    }
185
186    // =========================================================================
187    // Node Management
188    // =========================================================================
189
190    /// Add a node to the cluster.
191    pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
192        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
193
194        if nodes.len() >= self.config.max_nodes {
195            return Err(ClusterError::MaxNodesReached);
196        }
197
198        if nodes.contains_key(&node.id) {
199            return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
200        }
201
202        nodes.insert(node.id.clone(), node);
203        drop(nodes);
204
205        self.update_cluster_state();
206        Ok(())
207    }
208
209    /// Remove a node from the cluster.
210    pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
211        if node_id == &self.local_node_id {
212            return Err(ClusterError::CannotRemoveLocalNode);
213        }
214
215        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
216        let node = nodes
217            .remove(node_id)
218            .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
219
220        drop(nodes);
221        self.update_cluster_state();
222        Ok(node)
223    }
224
225    /// Get a node by ID.
226    pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
227        self.nodes.read().expect("cluster nodes lock poisoned").get(node_id).cloned()
228    }
229
230    /// Get all nodes in the cluster.
231    pub fn nodes(&self) -> Vec<NodeInfo> {
232        self.nodes.read().expect("cluster nodes lock poisoned").values().cloned().collect()
233    }
234
235    /// Get all node IDs.
236    pub fn node_ids(&self) -> Vec<NodeId> {
237        self.nodes.read().expect("cluster nodes lock poisoned").keys().cloned().collect()
238    }
239
240    /// Get the number of nodes.
241    pub fn node_count(&self) -> usize {
242        self.nodes.read().expect("cluster nodes lock poisoned").len()
243    }
244
245    /// Get peer nodes (excluding local).
246    pub fn peers(&self) -> Vec<NodeInfo> {
247        self.nodes
248            .read()
249            .expect("cluster nodes lock poisoned")
250            .values()
251            .filter(|n| n.id != self.local_node_id)
252            .cloned()
253            .collect()
254    }
255
256    /// Get peer IDs.
257    pub fn peer_ids(&self) -> Vec<NodeId> {
258        self.nodes
259            .read()
260            .expect("cluster nodes lock poisoned")
261            .keys()
262            .filter(|id| *id != &self.local_node_id)
263            .cloned()
264            .collect()
265    }
266
267    // =========================================================================
268    // Health Management
269    // =========================================================================
270
271    /// Update node health.
272    pub fn update_health(&self, health: NodeHealth) {
273        let node_id = health.node_id.clone();
274        let mut checks = self.health_checks.write().expect("cluster health_checks lock poisoned");
275        checks.insert(node_id.clone(), health.clone());
276        drop(checks);
277
278        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
279        if let Some(node) = nodes.get_mut(&node_id) {
280            if health.healthy {
281                node.mark_healthy();
282            } else {
283                node.mark_suspect();
284            }
285        }
286        drop(nodes);
287
288        self.update_cluster_state();
289    }
290
291    /// Get health for a node.
292    pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
293        self.health_checks.read().expect("cluster health_checks lock poisoned").get(node_id).cloned()
294    }
295
296    /// Process heartbeat from a node.
297    pub fn heartbeat(&self, node_id: &NodeId) {
298        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
299        if let Some(node) = nodes.get_mut(node_id) {
300            node.heartbeat();
301        }
302    }
303
304    /// Check for failed nodes based on timeout.
305    pub fn check_failures(&self) -> Vec<NodeId> {
306        let timeout_ms = self.config.failure_timeout.as_millis() as u64;
307        let mut failed = Vec::new();
308
309        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
310        for node in nodes.values_mut() {
311            if node.id == self.local_node_id {
312                continue;
313            }
314
315            if node.heartbeat_age() > timeout_ms {
316                if node.status == NodeStatus::Healthy {
317                    node.mark_suspect();
318                } else if node.status == NodeStatus::Suspect {
319                    node.mark_down();
320                    failed.push(node.id.clone());
321                }
322            }
323        }
324
325        drop(nodes);
326
327        if !failed.is_empty() {
328            self.update_cluster_state();
329        }
330
331        failed
332    }
333
334    // =========================================================================
335    // Cluster State Management
336    // =========================================================================
337
338    /// Update the cluster state based on node health.
339    fn update_cluster_state(&self) {
340        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
341        let total = nodes.len();
342        let healthy = nodes
343            .values()
344            .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
345            .count();
346
347        drop(nodes);
348
349        let new_state = if total < self.config.min_nodes {
350            ClusterState::Forming
351        } else if healthy >= self.config.quorum_size {
352            if healthy == total {
353                ClusterState::Healthy
354            } else {
355                ClusterState::Degraded
356            }
357        } else {
358            ClusterState::NoQuorum
359        };
360
361        self.set_state(new_state);
362    }
363
364    /// Check if cluster has quorum.
365    pub fn has_quorum(&self) -> bool {
366        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
367        let healthy = nodes
368            .values()
369            .filter(|n| n.status == NodeStatus::Healthy)
370            .count();
371        healthy >= self.config.quorum_size
372    }
373
374    /// Get cluster statistics.
375    pub fn stats(&self) -> ClusterStats {
376        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
377        let total = nodes.len();
378        let healthy = nodes
379            .values()
380            .filter(|n| n.status == NodeStatus::Healthy)
381            .count();
382        let suspect = nodes
383            .values()
384            .filter(|n| n.status == NodeStatus::Suspect)
385            .count();
386        let down = nodes
387            .values()
388            .filter(|n| n.status == NodeStatus::Down)
389            .count();
390
391        ClusterStats {
392            cluster_id: self.config.cluster_id.clone(),
393            state: self.state(),
394            total_nodes: total,
395            healthy_nodes: healthy,
396            suspect_nodes: suspect,
397            down_nodes: down,
398            has_leader: self.leader_id().is_some(),
399            has_quorum: healthy >= self.config.quorum_size,
400        }
401    }
402
403    // =========================================================================
404    // Leader Election Support
405    // =========================================================================
406
407    /// Get nodes that can vote.
408    pub fn voting_members(&self) -> Vec<NodeId> {
409        self.nodes
410            .read()
411            .expect("cluster nodes lock poisoned")
412            .values()
413            .filter(|n| n.is_available())
414            .map(|n| n.id.clone())
415            .collect()
416    }
417
418    /// Update node role.
419    pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
420        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
421        if let Some(node) = nodes.get_mut(node_id) {
422            node.role = role;
423        }
424    }
425
426    /// Get the current leader node.
427    pub fn leader(&self) -> Option<NodeInfo> {
428        let leader_id = self.leader_id()?;
429        self.get_node(&leader_id)
430    }
431}
432
433// =============================================================================
434// Cluster Statistics
435// =============================================================================
436
437/// Statistics about the cluster.
438#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ClusterStats {
440    pub cluster_id: String,
441    pub state: ClusterState,
442    pub total_nodes: usize,
443    pub healthy_nodes: usize,
444    pub suspect_nodes: usize,
445    pub down_nodes: usize,
446    pub has_leader: bool,
447    pub has_quorum: bool,
448}
449
450// =============================================================================
451// Cluster Error
452// =============================================================================
453
454/// Errors that can occur in cluster operations.
455#[derive(Debug, Clone)]
456pub enum ClusterError {
457    NodeNotFound(NodeId),
458    NodeAlreadyExists(NodeId),
459    MaxNodesReached,
460    CannotRemoveLocalNode,
461    NoQuorum,
462    NotLeader,
463    ConfigurationError(String),
464}
465
466impl std::fmt::Display for ClusterError {
467    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468        match self {
469            Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
470            Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
471            Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
472            Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
473            Self::NoQuorum => write!(f, "Cluster has no quorum"),
474            Self::NotLeader => write!(f, "Not the leader"),
475            Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
476        }
477    }
478}
479
480impl std::error::Error for ClusterError {}
481
// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a node with the given id listening on 127.0.0.1:5000.
    fn create_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Build a default-configured cluster whose only member is `id`.
    fn single_node_cluster(id: &str) -> Cluster {
        Cluster::new(create_node(id), ClusterConfig::default())
    }

    #[test]
    fn test_cluster_config() {
        let cfg = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(Duration::from_secs(2));

        assert_eq!(cfg.cluster_id, "test-cluster");
        assert_eq!(cfg.replication_factor, 3);
        assert_eq!(cfg.quorum_size, 2);
    }

    #[test]
    fn test_cluster_creation() {
        let cluster = single_node_cluster("node1");

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    #[test]
    fn test_add_remove_node() {
        let cluster = single_node_cluster("node1");

        cluster.add_node(create_node("node2")).unwrap();
        assert_eq!(cluster.node_count(), 2);

        cluster.remove_node(&NodeId::new("node2")).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    #[test]
    fn test_cannot_remove_local_node() {
        let cluster = single_node_cluster("node1");

        let outcome = cluster.remove_node(&NodeId::new("node1"));
        assert!(matches!(outcome, Err(ClusterError::CannotRemoveLocalNode)));
    }

    #[test]
    fn test_peers() {
        let cluster = single_node_cluster("node1");
        for id in ["node2", "node3"] {
            cluster.add_node(create_node(id)).unwrap();
        }

        assert_eq!(cluster.peers().len(), 2);

        let ids = cluster.peer_ids();
        for expected in ["node2", "node3"] {
            assert!(ids.contains(&NodeId::new(expected)));
        }
        assert!(!ids.contains(&NodeId::new("node1")));
    }

    #[test]
    fn test_cluster_stats() {
        let mut local = create_node("node1");
        local.mark_healthy();

        let cfg = ClusterConfig::default().with_replication_factor(3);
        let cluster = Cluster::new(local, cfg);

        let mut peer = create_node("node2");
        peer.mark_healthy();
        cluster.add_node(peer).unwrap();

        let snapshot = cluster.stats();
        assert_eq!(snapshot.total_nodes, 2);
        assert_eq!(snapshot.healthy_nodes, 2);
        assert!(snapshot.has_quorum);
    }

    #[test]
    fn test_heartbeat() {
        let cluster = single_node_cluster("node1");

        let mut suspect = create_node("node2");
        suspect.status = NodeStatus::Suspect;
        cluster.add_node(suspect).unwrap();

        let peer_id = NodeId::new("node2");
        cluster.heartbeat(&peer_id);

        let refreshed = cluster.get_node(&peer_id).unwrap();
        assert_eq!(refreshed.status, NodeStatus::Healthy);
    }

    #[test]
    fn test_leader_management() {
        let cluster = single_node_cluster("node1");
        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    #[test]
    fn test_max_nodes() {
        let cfg = ClusterConfig {
            max_nodes: 2,
            ..Default::default()
        };
        let cluster = Cluster::new(create_node("node1"), cfg);

        cluster.add_node(create_node("node2")).unwrap();
        let overflow = cluster.add_node(create_node("node3"));
        assert!(matches!(overflow, Err(ClusterError::MaxNodesReached)));
    }
}