Skip to main content

aegis_replication/
cluster.rs

1//! Aegis Cluster Management
2//!
3//! Cluster coordination and membership management.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole, NodeStatus};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::Duration;
13
14// =============================================================================
15// Cluster Configuration
16// =============================================================================
17
/// Configuration for a cluster.
///
/// Start from [`ClusterConfig::new`] or [`Default::default`] and refine the
/// value with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    /// Cluster identifier; exposed through `Cluster::id` and copied into `ClusterStats`.
    pub cluster_id: String,
    /// Membership size below which `Cluster` reports `ClusterState::Forming`.
    pub min_nodes: usize,
    /// Membership cap: `Cluster::add_node` returns `ClusterError::MaxNodesReached`
    /// once the node count reaches this value.
    pub max_nodes: usize,
    /// Heartbeat period. Not read anywhere in this file — presumably consumed by
    /// the component that sends heartbeats; confirm against callers.
    pub heartbeat_interval: Duration,
    /// Heartbeat age beyond which `Cluster::check_failures` escalates a peer
    /// (Healthy -> Suspect, Suspect -> Down).
    pub failure_timeout: Duration,
    /// Replication factor. In this file it is only stored and used by
    /// `with_replication_factor` to derive `quorum_size`.
    pub replication_factor: usize,
    /// Number of healthy nodes required for the cluster to be considered to
    /// have quorum (see `Cluster::has_quorum`).
    pub quorum_size: usize,
}
29
30impl Default for ClusterConfig {
31    fn default() -> Self {
32        Self {
33            cluster_id: "aegis-cluster".to_string(),
34            min_nodes: 1,
35            max_nodes: 100,
36            heartbeat_interval: Duration::from_secs(1),
37            failure_timeout: Duration::from_secs(5),
38            replication_factor: 3,
39            quorum_size: 2,
40        }
41    }
42}
43
44impl ClusterConfig {
45    pub fn new(cluster_id: impl Into<String>) -> Self {
46        Self {
47            cluster_id: cluster_id.into(),
48            ..Default::default()
49        }
50    }
51
52    pub fn with_replication_factor(mut self, factor: usize) -> Self {
53        self.replication_factor = factor;
54        self.quorum_size = (factor / 2) + 1;
55        self
56    }
57
58    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
59        self.heartbeat_interval = interval;
60        self
61    }
62
63    pub fn with_failure_timeout(mut self, timeout: Duration) -> Self {
64        self.failure_timeout = timeout;
65        self
66    }
67}
68
69// =============================================================================
70// Cluster State
71// =============================================================================
72
/// State of the cluster.
///
/// `Cluster` recomputes this value from node counts and node statuses whenever
/// membership or health changes; it can also be set directly through
/// `Cluster::set_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ClusterState {
    /// Cluster is initializing. This is the state a freshly constructed
    /// `Cluster` starts in.
    #[default]
    Initializing,
    /// Cluster is forming (waiting for quorum). Reported while the node count
    /// is below `ClusterConfig::min_nodes`.
    Forming,
    /// Cluster is healthy and operational: quorum is met and every node is
    /// counted as healthy.
    Healthy,
    /// Cluster is degraded (some nodes down): quorum is met but not every
    /// node is counted as healthy.
    Degraded,
    /// Cluster has lost quorum: fewer healthy nodes than
    /// `ClusterConfig::quorum_size`.
    NoQuorum,
    /// Cluster is shutting down. Never assigned automatically in this file;
    /// only reachable through `Cluster::set_state`.
    ShuttingDown,
}
90
91// =============================================================================
92// Membership Change
93// =============================================================================
94
/// Type of membership change.
///
/// Not consumed anywhere in this file — presumably applied by a membership
/// protocol elsewhere in the crate; confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MembershipChange {
    /// Add the described node to the cluster.
    AddNode(NodeInfo),
    /// Remove the node with the given id.
    RemoveNode(NodeId),
    /// Replace the stored information for an existing node.
    UpdateNode(NodeInfo),
}
102
/// A membership change request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MembershipChangeRequest {
    /// Identifier of this request.
    pub change_id: String,
    /// The change being requested.
    pub change: MembershipChange,
    /// When the change was requested.
    /// NOTE(review): unit and epoch are not visible in this file (likely a
    /// Unix timestamp) — confirm against the code that fills it in.
    pub requested_at: u64,
}
110
111// =============================================================================
112// Cluster
113// =============================================================================
114
/// A distributed cluster of nodes.
///
/// Holds this process's view of cluster membership, node health, the current
/// leader and the derived [`ClusterState`]. Mutable parts sit behind their own
/// `RwLock`, so all methods take `&self`.
pub struct Cluster {
    /// Static configuration supplied at construction.
    config: ClusterConfig,
    /// Current cluster state; recomputed by `update_cluster_state`.
    state: RwLock<ClusterState>,
    /// Known members keyed by node id; always contains the local node.
    nodes: RwLock<HashMap<NodeId, NodeInfo>>,
    /// Id of the current leader, if one has been set via `set_leader`.
    leader_id: RwLock<Option<NodeId>>,
    /// Id of the node this `Cluster` instance runs on.
    local_node_id: NodeId,
    /// Most recent health report per node id, written by `update_health`.
    health_checks: RwLock<HashMap<NodeId, NodeHealth>>,
}
124
125impl Cluster {
126    /// Create a new cluster.
127    pub fn new(local_node: NodeInfo, config: ClusterConfig) -> Self {
128        let local_id = local_node.id.clone();
129        let mut nodes = HashMap::new();
130        nodes.insert(local_id.clone(), local_node);
131
132        Self {
133            config,
134            state: RwLock::new(ClusterState::Initializing),
135            nodes: RwLock::new(nodes),
136            leader_id: RwLock::new(None),
137            local_node_id: local_id,
138            health_checks: RwLock::new(HashMap::new()),
139        }
140    }
141
142    /// Get the cluster ID.
143    pub fn id(&self) -> &str {
144        &self.config.cluster_id
145    }
146
147    /// Get the cluster configuration.
148    pub fn config(&self) -> &ClusterConfig {
149        &self.config
150    }
151
152    /// Get the current cluster state.
153    pub fn state(&self) -> ClusterState {
154        *self.state.read().expect("cluster state lock poisoned")
155    }
156
157    /// Set the cluster state.
158    pub fn set_state(&self, state: ClusterState) {
159        *self.state.write().expect("cluster state lock poisoned") = state;
160    }
161
162    /// Get the local node ID.
163    pub fn local_node_id(&self) -> &NodeId {
164        &self.local_node_id
165    }
166
167    /// Get the current leader ID.
168    pub fn leader_id(&self) -> Option<NodeId> {
169        self.leader_id
170            .read()
171            .expect("cluster leader_id lock poisoned")
172            .clone()
173    }
174
175    /// Set the leader ID.
176    pub fn set_leader(&self, leader_id: Option<NodeId>) {
177        *self
178            .leader_id
179            .write()
180            .expect("cluster leader_id lock poisoned") = leader_id;
181    }
182
183    /// Check if this node is the leader.
184    pub fn is_leader(&self) -> bool {
185        self.leader_id()
186            .map(|id| id == self.local_node_id)
187            .unwrap_or(false)
188    }
189
190    // =========================================================================
191    // Node Management
192    // =========================================================================
193
194    /// Add a node to the cluster.
195    pub fn add_node(&self, node: NodeInfo) -> Result<(), ClusterError> {
196        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
197
198        if nodes.len() >= self.config.max_nodes {
199            return Err(ClusterError::MaxNodesReached);
200        }
201
202        if nodes.contains_key(&node.id) {
203            return Err(ClusterError::NodeAlreadyExists(node.id.clone()));
204        }
205
206        nodes.insert(node.id.clone(), node);
207        drop(nodes);
208
209        self.update_cluster_state();
210        Ok(())
211    }
212
213    /// Remove a node from the cluster.
214    pub fn remove_node(&self, node_id: &NodeId) -> Result<NodeInfo, ClusterError> {
215        if node_id == &self.local_node_id {
216            return Err(ClusterError::CannotRemoveLocalNode);
217        }
218
219        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
220        let node = nodes
221            .remove(node_id)
222            .ok_or_else(|| ClusterError::NodeNotFound(node_id.clone()))?;
223
224        drop(nodes);
225        self.update_cluster_state();
226        Ok(node)
227    }
228
229    /// Get a node by ID.
230    pub fn get_node(&self, node_id: &NodeId) -> Option<NodeInfo> {
231        self.nodes
232            .read()
233            .expect("cluster nodes lock poisoned")
234            .get(node_id)
235            .cloned()
236    }
237
238    /// Get all nodes in the cluster.
239    pub fn nodes(&self) -> Vec<NodeInfo> {
240        self.nodes
241            .read()
242            .expect("cluster nodes lock poisoned")
243            .values()
244            .cloned()
245            .collect()
246    }
247
248    /// Get all node IDs.
249    pub fn node_ids(&self) -> Vec<NodeId> {
250        self.nodes
251            .read()
252            .expect("cluster nodes lock poisoned")
253            .keys()
254            .cloned()
255            .collect()
256    }
257
258    /// Get the number of nodes.
259    pub fn node_count(&self) -> usize {
260        self.nodes
261            .read()
262            .expect("cluster nodes lock poisoned")
263            .len()
264    }
265
266    /// Get peer nodes (excluding local).
267    pub fn peers(&self) -> Vec<NodeInfo> {
268        self.nodes
269            .read()
270            .expect("cluster nodes lock poisoned")
271            .values()
272            .filter(|n| n.id != self.local_node_id)
273            .cloned()
274            .collect()
275    }
276
277    /// Get peer IDs.
278    pub fn peer_ids(&self) -> Vec<NodeId> {
279        self.nodes
280            .read()
281            .expect("cluster nodes lock poisoned")
282            .keys()
283            .filter(|id| *id != &self.local_node_id)
284            .cloned()
285            .collect()
286    }
287
288    // =========================================================================
289    // Health Management
290    // =========================================================================
291
292    /// Update node health.
293    pub fn update_health(&self, health: NodeHealth) {
294        let node_id = health.node_id.clone();
295        let mut checks = self
296            .health_checks
297            .write()
298            .expect("cluster health_checks lock poisoned");
299        checks.insert(node_id.clone(), health.clone());
300        drop(checks);
301
302        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
303        if let Some(node) = nodes.get_mut(&node_id) {
304            if health.healthy {
305                node.mark_healthy();
306            } else {
307                node.mark_suspect();
308            }
309        }
310        drop(nodes);
311
312        self.update_cluster_state();
313    }
314
315    /// Get health for a node.
316    pub fn get_health(&self, node_id: &NodeId) -> Option<NodeHealth> {
317        self.health_checks
318            .read()
319            .expect("cluster health_checks lock poisoned")
320            .get(node_id)
321            .cloned()
322    }
323
324    /// Process heartbeat from a node.
325    pub fn heartbeat(&self, node_id: &NodeId) {
326        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
327        if let Some(node) = nodes.get_mut(node_id) {
328            node.heartbeat();
329        }
330    }
331
332    /// Check for failed nodes based on timeout.
333    pub fn check_failures(&self) -> Vec<NodeId> {
334        let timeout_ms = self.config.failure_timeout.as_millis() as u64;
335        let mut failed = Vec::new();
336
337        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
338        for node in nodes.values_mut() {
339            if node.id == self.local_node_id {
340                continue;
341            }
342
343            if node.heartbeat_age() > timeout_ms {
344                if node.status == NodeStatus::Healthy {
345                    node.mark_suspect();
346                } else if node.status == NodeStatus::Suspect {
347                    node.mark_down();
348                    failed.push(node.id.clone());
349                }
350            }
351        }
352
353        drop(nodes);
354
355        if !failed.is_empty() {
356            self.update_cluster_state();
357        }
358
359        failed
360    }
361
362    // =========================================================================
363    // Cluster State Management
364    // =========================================================================
365
366    /// Update the cluster state based on node health.
367    fn update_cluster_state(&self) {
368        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
369        let total = nodes.len();
370        let healthy = nodes
371            .values()
372            .filter(|n| n.status == NodeStatus::Healthy || n.status == NodeStatus::Starting)
373            .count();
374
375        drop(nodes);
376
377        let new_state = if total < self.config.min_nodes {
378            ClusterState::Forming
379        } else if healthy >= self.config.quorum_size {
380            if healthy == total {
381                ClusterState::Healthy
382            } else {
383                ClusterState::Degraded
384            }
385        } else {
386            ClusterState::NoQuorum
387        };
388
389        self.set_state(new_state);
390    }
391
392    /// Check if cluster has quorum.
393    pub fn has_quorum(&self) -> bool {
394        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
395        let healthy = nodes
396            .values()
397            .filter(|n| n.status == NodeStatus::Healthy)
398            .count();
399        healthy >= self.config.quorum_size
400    }
401
402    /// Get cluster statistics.
403    pub fn stats(&self) -> ClusterStats {
404        let nodes = self.nodes.read().expect("cluster nodes lock poisoned");
405        let total = nodes.len();
406        let healthy = nodes
407            .values()
408            .filter(|n| n.status == NodeStatus::Healthy)
409            .count();
410        let suspect = nodes
411            .values()
412            .filter(|n| n.status == NodeStatus::Suspect)
413            .count();
414        let down = nodes
415            .values()
416            .filter(|n| n.status == NodeStatus::Down)
417            .count();
418
419        ClusterStats {
420            cluster_id: self.config.cluster_id.clone(),
421            state: self.state(),
422            total_nodes: total,
423            healthy_nodes: healthy,
424            suspect_nodes: suspect,
425            down_nodes: down,
426            has_leader: self.leader_id().is_some(),
427            has_quorum: healthy >= self.config.quorum_size,
428        }
429    }
430
431    // =========================================================================
432    // Leader Election Support
433    // =========================================================================
434
435    /// Get nodes that can vote.
436    pub fn voting_members(&self) -> Vec<NodeId> {
437        self.nodes
438            .read()
439            .expect("cluster nodes lock poisoned")
440            .values()
441            .filter(|n| n.is_available())
442            .map(|n| n.id.clone())
443            .collect()
444    }
445
446    /// Update node role.
447    pub fn set_node_role(&self, node_id: &NodeId, role: NodeRole) {
448        let mut nodes = self.nodes.write().expect("cluster nodes lock poisoned");
449        if let Some(node) = nodes.get_mut(node_id) {
450            node.role = role;
451        }
452    }
453
454    /// Get the current leader node.
455    pub fn leader(&self) -> Option<NodeInfo> {
456        let leader_id = self.leader_id()?;
457        self.get_node(&leader_id)
458    }
459}
460
461// =============================================================================
462// Cluster Statistics
463// =============================================================================
464
/// Statistics about the cluster.
///
/// A point-in-time snapshot produced by `Cluster::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStats {
    /// Identifier of the cluster, copied from its configuration.
    pub cluster_id: String,
    /// Cluster state at the time of the snapshot.
    pub state: ClusterState,
    /// Number of members, local node included.
    pub total_nodes: usize,
    /// Members in status `Healthy`.
    pub healthy_nodes: usize,
    /// Members in status `Suspect`.
    pub suspect_nodes: usize,
    /// Members in status `Down`.
    pub down_nodes: usize,
    /// Whether a leader id was set at the time of the snapshot.
    pub has_leader: bool,
    /// Whether `healthy_nodes` reached the configured `quorum_size`.
    pub has_quorum: bool,
}
477
478// =============================================================================
479// Cluster Error
480// =============================================================================
481
/// Errors that can occur in cluster operations.
#[derive(Debug, Clone)]
pub enum ClusterError {
    /// No member with the given id exists (returned by `Cluster::remove_node`).
    NodeNotFound(NodeId),
    /// A member with the given id is already present (returned by `Cluster::add_node`).
    NodeAlreadyExists(NodeId),
    /// The membership table already holds `max_nodes` entries (returned by `Cluster::add_node`).
    MaxNodesReached,
    /// The local node cannot be removed from its own cluster view.
    CannotRemoveLocalNode,
    /// The cluster has no quorum. Not produced in this file.
    NoQuorum,
    /// The operation requires the leader. Not produced in this file.
    NotLeader,
    /// The configuration is invalid; the payload is the message. Not produced in this file.
    ConfigurationError(String),
}
493
494impl std::fmt::Display for ClusterError {
495    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496        match self {
497            Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
498            Self::NodeAlreadyExists(id) => write!(f, "Node already exists: {}", id),
499            Self::MaxNodesReached => write!(f, "Maximum number of nodes reached"),
500            Self::CannotRemoveLocalNode => write!(f, "Cannot remove local node"),
501            Self::NoQuorum => write!(f, "Cluster has no quorum"),
502            Self::NotLeader => write!(f, "Not the leader"),
503            Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
504        }
505    }
506}
507
// Marker impl: `ClusterError` wraps no underlying error, so the default
// `source()` (None) is kept.
impl std::error::Error for ClusterError {}
509
510// =============================================================================
511// Tests
512// =============================================================================
513
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with the given id on a fixed loopback address.
    fn create_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1", 5000)
    }

    /// Builder methods set their fields and derive the quorum from the
    /// replication factor.
    #[test]
    fn test_cluster_config() {
        let config = ClusterConfig::new("test-cluster")
            .with_replication_factor(3)
            .with_heartbeat_interval(Duration::from_secs(2));

        assert_eq!(config.cluster_id, "test-cluster");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.quorum_size, 2);
    }

    /// A new cluster contains only the local node, is initializing and has no leader.
    #[test]
    fn test_cluster_creation() {
        let node = create_node("node1");
        let cluster = Cluster::new(node, ClusterConfig::default());

        assert_eq!(cluster.node_count(), 1);
        assert_eq!(cluster.state(), ClusterState::Initializing);
        assert!(cluster.leader_id().is_none());
    }

    /// Adding and then removing a peer is reflected in the node count.
    #[test]
    fn test_add_remove_node() {
        let node1 = create_node("node1");
        let cluster = Cluster::new(node1, ClusterConfig::default());

        let node2 = create_node("node2");
        cluster.add_node(node2).unwrap();
        assert_eq!(cluster.node_count(), 2);

        cluster.remove_node(&NodeId::new("node2")).unwrap();
        assert_eq!(cluster.node_count(), 1);
    }

    /// Removing the local node is rejected.
    #[test]
    fn test_cannot_remove_local_node() {
        let node = create_node("node1");
        let cluster = Cluster::new(node, ClusterConfig::default());

        let result = cluster.remove_node(&NodeId::new("node1"));
        assert!(matches!(result, Err(ClusterError::CannotRemoveLocalNode)));
    }

    /// Peer queries return every member except the local node.
    #[test]
    fn test_peers() {
        let node1 = create_node("node1");
        let cluster = Cluster::new(node1, ClusterConfig::default());

        cluster.add_node(create_node("node2")).unwrap();
        cluster.add_node(create_node("node3")).unwrap();

        let peers = cluster.peers();
        assert_eq!(peers.len(), 2);

        let peer_ids = cluster.peer_ids();
        assert!(peer_ids.contains(&NodeId::new("node2")));
        assert!(peer_ids.contains(&NodeId::new("node3")));
        assert!(!peer_ids.contains(&NodeId::new("node1")));
    }

    /// Two healthy nodes satisfy the quorum of 2 derived from replication factor 3.
    #[test]
    fn test_cluster_stats() {
        let mut node1 = create_node("node1");
        node1.mark_healthy();

        let config = ClusterConfig::default().with_replication_factor(3);
        let cluster = Cluster::new(node1, config);

        let mut node2 = create_node("node2");
        node2.mark_healthy();
        cluster.add_node(node2).unwrap();

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.healthy_nodes, 2);
        assert!(stats.has_quorum);
    }

    /// A heartbeat brings a suspect node back to healthy.
    #[test]
    fn test_heartbeat() {
        let node1 = create_node("node1");
        let cluster = Cluster::new(node1, ClusterConfig::default());

        let mut node2 = create_node("node2");
        node2.status = NodeStatus::Suspect;
        cluster.add_node(node2).unwrap();

        cluster.heartbeat(&NodeId::new("node2"));

        let node = cluster.get_node(&NodeId::new("node2")).unwrap();
        assert_eq!(node.status, NodeStatus::Healthy);
    }

    /// `is_leader` is true only while the leader id equals the local node id.
    #[test]
    fn test_leader_management() {
        let node1 = create_node("node1");
        let cluster = Cluster::new(node1, ClusterConfig::default());

        assert!(!cluster.is_leader());

        cluster.set_leader(Some(NodeId::new("node1")));
        assert!(cluster.is_leader());

        // The leader id is not validated against membership: "node2" was never added.
        cluster.set_leader(Some(NodeId::new("node2")));
        assert!(!cluster.is_leader());
    }

    /// Adding a node beyond `max_nodes` is rejected.
    #[test]
    fn test_max_nodes() {
        let node1 = create_node("node1");
        let config = ClusterConfig {
            max_nodes: 2,
            ..Default::default()
        };
        let cluster = Cluster::new(node1, config);

        cluster.add_node(create_node("node2")).unwrap();
        let result = cluster.add_node(create_node("node3"));
        assert!(matches!(result, Err(ClusterError::MaxNodesReached)));
    }
}