aegis_replication/
cluster.rs

1//! Aegis Cluster Management
2//!
3//! Cluster coordination and membership management.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::Duration;
13
14// =============================================================================
15// Cluster Configuration
16// =============================================================================
17
/// Configuration for a cluster.
///
/// Build one with [`ClusterConfig::new`] or [`Default`], then adjust it with
/// the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    /// Identifier of the cluster; returned by `Cluster::id` and copied into
    /// `ClusterStats`.
    pub cluster_id: String,
    /// Membership size below which the cluster is reported as
    /// `ClusterState::Forming`.
    pub min_nodes: usize,
    /// Upper bound on membership; `Cluster::add_node` fails with
    /// `ClusterError::MaxNodesReached` once this many nodes are present.
    pub max_nodes: usize,
    /// Heartbeat interval. Not read anywhere in this file — presumably used
    /// by whatever component sends heartbeats; confirm at the call sites.
    pub heartbeat_interval: Duration,
    /// Heartbeat age (compared in milliseconds) beyond which
    /// `Cluster::check_failures` demotes a peer.
    pub failure_timeout: Duration,
    /// Replication factor. In this file it is only stored and used by
    /// `with_replication_factor` to derive `quorum_size`.
    pub replication_factor: usize,
    /// Number of healthy nodes required for the cluster to have quorum.
    pub quorum_size: usize,
}
29
30impl Default for ClusterConfig {
31    fn default() -> Self {
32        Self {
33            cluster_id: "aegis-cluster".to_string(),
34            min_nodes: 1,
35            max_nodes: 100,
36            heartbeat_interval: Duration::from_secs(1),
37            failure_timeout: Duration::from_secs(5),
38            replication_factor: 3,
39            quorum_size: 2,
40        }
41    }
42}
43
44impl ClusterConfig {
45    pub fn new(cluster_id: impl Into<String>) -> Self {
46        Self {
47            cluster_id: cluster_id.into(),
48            ..Default::default()
49        }
50    }
51
52    pub fn with_replication_factor(mut self, factor: usize) -> Self {
53        self.replication_factor = factor;
54        self.quorum_size = (factor / 2) + 1;
55        self
56    }
57
58    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59        self.heartbeat_interval = interval;
60        self
61    }
62
63    pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64        self.failure_timeout = timeout;
65        self
66    }
67}
68
69// =============================================================================
70// Cluster State
71// =============================================================================
72
/// State of the cluster.
///
/// `Cluster::update_cluster_state` only ever produces `Forming`, `Healthy`,
/// `Degraded` or `NoQuorum`; `Initializing` is the state a new `Cluster`
/// starts in, and `ShuttingDown` is only reachable through
/// `Cluster::set_state` in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClusterState {
    /// Cluster is initializing.
    Initializing,
    /// Cluster is forming: it has fewer members than `ClusterConfig::min_nodes`.
    Forming,
    /// Cluster is healthy and operational: quorum is met and every member
    /// counts as healthy.
    Healthy,
    /// Cluster is degraded: quorum is met but some members do not count as
    /// healthy.
    Degraded,
    /// Cluster has lost quorum.
    NoQuorum,
    /// Cluster is shutting down.
    ShuttingDown,
}
89
90impl Default for ClusterState {
91    fn default() -> Self {
92        Self::Initializing
93    }
94}
95
96// =============================================================================
97// Membership Change
98// =============================================================================
99
/// Type of membership change.
///
/// Nothing in this file consumes this enum; it is a serializable description
/// of a change, presumably applied elsewhere — confirm against the callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MembershipChange {
    /// Add the given node to the membership.
    AddNode(NodeInfo),
    /// Remove the node with the given id from the membership.
    RemoveNode(NodeId),
    /// Replace the stored information for an existing node.
    UpdateNode(NodeInfo),
}
107
/// A membership change request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MembershipChangeRequest {
    /// Identifier of this request.
    pub change_id: String,
    /// The change being requested.
    pub change: MembershipChange,
    /// When the request was made.
    /// NOTE(review): unit and epoch are not established by this file
    /// (likely a Unix timestamp) — confirm with the producer.
    pub requested_at: u64,
}
115
116// =============================================================================
117// Cluster
118// =============================================================================
119
/// A distributed cluster of nodes.
///
/// Mutable parts are each wrapped in their own `RwLock`, so all methods take
/// `&self`.
pub struct Cluster {
    /// Static configuration supplied at construction.
    config: ClusterConfig,
    /// Current cluster state; recomputed by `update_cluster_state` or set
    /// directly through `set_state`.
    state: RwLock<ClusterState>,
    /// Known members keyed by node id; always contains the local node.
    nodes: RwLock<HashMap<NodeId, NodeInfo>>,
    /// Id of the current leader, if one has been set.
    leader_id: RwLock<Option<NodeId>>,
    /// Id of the node this `Cluster` instance runs on.
    local_node_id: NodeId,
    /// Most recent health report received per node id.
    health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
}
129
130impl Cluster {
131    /// Create a new cluster.
132    pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
133        let local_id = local_node.id.clone();
134        let mut nodes = HashMap::new();
135        nodes.insert(local_id.clone(), local_node);
136
137        Self {
138            config,
139            state: RwLock::new(ClusterState::Initializing),
140            nodes: RwLock::new(nodes),
141            leader_id: RwLock::new(None),
142            local_node_id: local_id,
143            health_checks: RwLock::new(HashMap::new()),
144        }
145    }
146
147    /// Get the cluster ID.
148    pub fn id(&self) -> &str {
149        &self.config.cluster_id
150    }
151
152    /// Get the cluster configuration.
153    pub fn config(&self) -> &ClusterConfig {
154        &self.config
155    }
156
157    /// Get the current cluster state.
158    pub fn state(&self) -> ClusterState {
159        *self.state.read().unwrap()
160    }
161
162    /// Set the cluster state.
163    pub fn set_state(&self, state: ClusterState) {
164        *self.state.write().unwrap() = state;
165    }
166
167    /// Get the local node ID.
168    pub fn local_node_id(&self) -> &NodeId {
169        &self.local_node_id
170    }
171
172    /// Get the current leader ID.
173    pub fn leader_id(&self) -> Option<NodeId> {
174        self.leader_id.read().unwrap().clone()
175    }
176
177    /// Set the leader ID.
178    pub fn set_leader(&self, leader_id: Option<NodeId>) {
179        *self.leader_id.write().unwrap() = leader_id;
180    }
181
182    /// Check if this node is the leader.
183    pub fn is_leader(&self) -> bool {
184        self.leader_id()
185            .map(|id| id == self.local_node_id)
186            .unwrap_or(false)
187    }
188
189    // =========================================================================
190    // Node Management
191    // =========================================================================
192
193    /// Add a node to the cluster.
194    pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
195        let mut nodes = self.nodes.write().unwrap();
196
197        if nodes.len() >= self.config.max_nodes {
198            return Err(ClusterError::MaxNodesReached);
199        }
200
201        if nodes.contains_key(&node.id) {
202            return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
203        }
204
205        nodes.insert(node.id.clone(), node);
206        drop(nodes);
207
208        self.update_cluster_state();
209        Ok(())
210    }
211
212    /// Remove a node from the cluster.
213    pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
214        if node_id == &self.local_node_id {
215            return Err(ClusterError::CannotRemoveLocalNode);
216        }
217
218        let mut nodes = self.nodes.write().unwrap();
219        let node = nodes
220            .remove(node_id)
221            .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
222
223        drop(nodes);
224        self.update_cluster_state();
225        Ok(node)
226    }
227
228    /// Get a node by ID.
229    pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
230        self.nodes.read().unwrap().get(node_id).cloned()
231    }
232
233    /// Get all nodes in the cluster.
234    pub fn nodes(&self) -> Vec<NodeInfo> {
235        self.nodes.read().unwrap().values().cloned().collect()
236    }
237
238    /// Get all node IDs.
239    pub fn node_ids(&self) -> Vec<NodeId> {
240        self.nodes.read().unwrap().keys().cloned().collect()
241    }
242
243    /// Get the number of nodes.
244    pub fn node_count(&self) -> usize {
245        self.nodes.read().unwrap().len()
246    }
247
248    /// Get peer nodes (excluding local).
249    pub fn peers(&self) -> Vec<NodeInfo> {
250        self.nodes
251            .read()
252            .unwrap()
253            .values()
254            .filter(|n| n.id != self.local_node_id)
255            .cloned()
256            .collect()
257    }
258
259    /// Get peer IDs.
260    pub fn peer_ids(&self) -> Vec<NodeId> {
261        self.nodes
262            .read()
263            .unwrap()
264            .keys()
265            .filter(|id| *id != &self.local_node_id)
266            .cloned()
267            .collect()
268    }
269
270    // =========================================================================
271    // Health Management
272    // =========================================================================
273
274    /// Update node health.
275    pub fn update_health(&self, health: NodeHealth) {
276        let node_id = health.node_id.clone();
277        let mut checks = self.health_checks.write().unwrap();
278        checks.insert(node_id.clone(), health.clone());
279        drop(checks);
280
281        let mut nodes = self.nodes.write().unwrap();
282        if let Some(node) = nodes.get_mut(&node_id) {
283            if health.healthy {
284                node.mark_healthy();
285            } else {
286                node.mark_suspect();
287            }
288        }
289        drop(nodes);
290
291        self.update_cluster_state();
292    }
293
294    /// Get health for a node.
295    pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
296        self.health_checks.read().unwrap().get(node_id).cloned()
297    }
298
299    /// Process heartbeat from a node.
300    pub fn heartbeat(&self, node_id: &NodeId) {
301        let mut nodes = self.nodes.write().unwrap();
302        if let Some(node) = nodes.get_mut(node_id) {
303            node.heartbeat();
304        }
305    }
306
307    /// Check for failed nodes based on timeout.
308    pub fn check_failures(&self) -> Vec<NodeId> {
309        let timeout_ms = self.config.failure_timeout.as_millis() as u64;
310        let mut failed = Vec::new();
311
312        let mut nodes = self.nodes.write().unwrap();
313        for node in nodes.values_mut() {
314            if node.id == self.local_node_id {
315                continue;
316            }
317
318            if node.heartbeat_age() > timeout_ms {
319                if node.status == NodeStatus::Healthy {
320                    node.mark_suspect();
321                } else if node.status == NodeStatus::Suspect {
322                    node.mark_down();
323                    failed.push(node.id.clone());
324                }
325            }
326        }
327
328        drop(nodes);
329
330        if !failed.is_empty() {
331            self.update_cluster_state();
332        }
333
334        failed
335    }
336
337    // =========================================================================
338    // Cluster State Management
339    // =========================================================================
340
341    /// Update the cluster state based on node health.
342    fn update_cluster_state(&self) {
343        let nodes = self.nodes.read().unwrap();
344        let total = nodes.len();
345        let healthy = nodes
346            .values()
347            .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
348            .count();
349
350        drop(nodes);
351
352        let new_state = if total < self.config.min_nodes {
353            ClusterState::Forming
354        } else if healthy >= self.config.quorum_size {
355            if healthy == total {
356                ClusterState::Healthy
357            } else {
358                ClusterState::Degraded
359            }
360        } else {
361            ClusterState::NoQuorum
362        };
363
364        self.set_state(new_state);
365    }
366
367    /// Check if cluster has quorum.
368    pub fn has_quorum(&self) -> bool {
369        let nodes = self.nodes.read().unwrap();
370        let healthy = nodes
371            .values()
372            .filter(|n| n.status == NodeStatus::Healthy)
373            .count();
374        healthy >= self.config.quorum_size
375    }
376
377    /// Get cluster statistics.
378    pub fn stats(&self) -> ClusterStats {
379        let nodes = self.nodes.read().unwrap();
380        let total = nodes.len();
381        let healthy = nodes
382            .values()
383            .filter(|n| n.status == NodeStatus::Healthy)
384            .count();
385        let suspect = nodes
386            .values()
387            .filter(|n| n.status == NodeStatus::Suspect)
388            .count();
389        let down = nodes
390            .values()
391            .filter(|n| n.status == NodeStatus::Down)
392            .count();
393
394        ClusterStats {
395            cluster_id: self.config.cluster_id.clone(),
396            state: self.state(),
397            total_nodes: total,
398            healthy_nodes: healthy,
399            suspect_nodes: suspect,
400            down_nodes: down,
401            has_leader: self.leader_id().is_some(),
402            has_quorum: healthy >= self.config.quorum_size,
403        }
404    }
405
406    // =========================================================================
407    // Leader Election Support
408    // =========================================================================
409
410    /// Get nodes that can vote.
411    pub fn voting_members(&self) -> Vec<NodeId> {
412        self.nodes
413            .read()
414            .unwrap()
415            .values()
416            .filter(|n| n.is_available())
417            .map(|n| n.id.clone())
418            .collect()
419    }
420
421    /// Update node role.
422    pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
423        let mut nodes = self.nodes.write().unwrap();
424        if let Some(node) = nodes.get_mut(node_id) {
425            node.role = role;
426        }
427    }
428
429    /// Get the current leader node.
430    pub fn leader(&self) -> Option<NodeInfo> {
431        let leader_id = self.leader_id()?;
432        self.get_node(&leader_id)
433    }
434}
435
436// =============================================================================
437// Cluster Statistics
438// =============================================================================
439
/// Statistics about the cluster, as produced by `Cluster::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStats {
    /// Identifier copied from the cluster configuration.
    pub cluster_id: String,
    /// Cluster state at the time the statistics were taken.
    pub state: ClusterState,
    /// Number of members, including the local node.
    pub total_nodes: usize,
    /// Number of members whose status is `Healthy`.
    pub healthy_nodes: usize,
    /// Number of members whose status is `Suspect`.
    pub suspect_nodes: usize,
    /// Number of members whose status is `Down`.
    pub down_nodes: usize,
    /// Whether a leader id is currently set.
    pub has_leader: bool,
    /// Whether `healthy_nodes` reaches the configured quorum size.
    pub has_quorum: bool,
}
452
453// =============================================================================
454// Cluster Error
455// =============================================================================
456
/// Errors that can occur in cluster operations.
///
/// `NoQuorum`, `NotLeader` and `ConfigurationError` are not constructed in
/// this file; presumably other modules return them — confirm at call sites.
#[derive(Debug, Clone)]
pub enum ClusterError {
    /// No member with the given id exists.
    NodeNotFound(NodeId),
    /// A member with the given id is already present.
    NodeAlreadyExists(NodeId),
    /// The cluster already holds the configured maximum number of nodes.
    MaxNodesReached,
    /// The local node cannot be removed from its own cluster.
    CannotRemoveLocalNode,
    /// The cluster has no quorum.
    NoQuorum,
    /// The operation requires the leader, and this node is not the leader.
    NotLeader,
    /// The configuration is invalid; the payload is the message shown after
    /// "Configuration error: ".
    ConfigurationError(String),
}
468
469impl std::fmt::Display for ClusterError {
470    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
471        match self {
472            Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
473            Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
474            Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
475            Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
476            Self::NoQuorum => write!(f, "Cluster has no quorum"),
477            Self::NotLeader => write!(f, "Not the leader"),
478            Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
479        }
480    }
481}
482
// Marker impl: `ClusterError` has no underlying source error, so the default
// `std::error::Error` methods are sufficient.
impl std::error::Error for ClusterError {}
484
485// =============================================================================
486// Tests
487// =============================================================================
488
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `NodeInfo` for `id`, passing `"127.0.0.1"` and `5000` as the
    /// remaining constructor arguments.
    fn create_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Builder methods store their values and derive a majority quorum.
    #[test]
    fn test_cluster_config() {
        let heartbeat = Duration::from_secs(2);
        let config = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(heartbeat);

        assert_eq!(config.cluster_id, "test-cluster");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.quorum_size, 2);
    }

    /// A fresh cluster holds only the local node, is initializing and has no
    /// leader.
    #[test]
    fn test_cluster_creation() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    /// Adding and then removing a peer changes the node count accordingly.
    #[test]
    fn test_add_remove_node() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        cluster.add_node(create_node("node2")).unwrap();
        assert_eq!(cluster.node_count(), 2);

        let removed_id = NodeId::new("node2");
        cluster.remove_node(&removed_id).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    /// The local node can never be removed.
    #[test]
    fn test_cannot_remove_local_node() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        let local_id = NodeId::new("node1");
        let outcome = cluster.remove_node(&local_id);
        assert!(matches!(outcome, Err(ClusterError::CannotRemoveLocalNode)));
    }

    /// Peers are all members except the local node.
    #[test]
    fn test_peers() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        cluster.add_node(create_node("node2")).unwrap();
        cluster.add_node(create_node("node3")).unwrap();

        assert_eq!(cluster.peers().len(), 2);

        let peer_ids = cluster.peer_ids();
        assert!(peer_ids.contains(&NodeId::new("node2")));
        assert!(peer_ids.contains(&NodeId::new("node3")));
        assert!(!peer_ids.contains(&NodeId::new("node1")));
    }

    /// Two healthy nodes satisfy the quorum derived from replication factor 3.
    #[test]
    fn test_cluster_stats() {
        let mut local = create_node("node1");
        local.mark_healthy();

        let config = ClusterConfig::default().with_replication_factor(3);
        let cluster = Cluster::new(local, config);

        let mut peer = create_node("node2");
        peer.mark_healthy();
        cluster.add_node(peer).unwrap();

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.healthy_nodes, 2);
        assert!(stats.has_quorum);
    }

    /// A heartbeat brings a suspect node back to healthy.
    #[test]
    fn test_heartbeat() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        let mut suspect_peer = create_node("node2");
        suspect_peer.status = NodeStatus::Suspect;
        cluster.add_node(suspect_peer).unwrap();

        let peer_id = NodeId::new("node2");
        cluster.heartbeat(&peer_id);

        let refreshed = cluster.get_node(&peer_id).unwrap();
        assert_eq!(refreshed.status, NodeStatus::Healthy);
    }

    /// `is_leader` follows whichever id was last passed to `set_leader`.
    #[test]
    fn test_leader_management() {
        let cluster = Cluster::new(create_node("node1"), ClusterConfig::default());

        // No leader has been set yet.
        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    /// Adding beyond `max_nodes` is rejected.
    #[test]
    fn test_max_nodes() {
        let config = ClusterConfig {
            max_nodes: 2,
            ..Default::default()
        };
        let cluster = Cluster::new(create_node("node1"), config);

        cluster.add_node(create_node("node2")).unwrap();

        let outcome = cluster.add_node(create_node("node3"));
        assert!(matches!(outcome, Err(ClusterError::MaxNodesReached)));
    }
}