scirs2_core/distributed/cluster/
mod.rs

1//! Cluster management for distributed computing
2//!
3//! This module provides comprehensive cluster management capabilities
4//! including node discovery, health monitoring, resource allocation,
5//! and fault-tolerant cluster coordination.
6
7// Module declarations
8pub mod allocator;
9pub mod coordination;
10pub mod discovery;
11pub mod events;
12pub mod health;
13pub mod manager;
14pub mod registry;
15pub mod state;
16pub mod types;
17
18// Re-export main types and functionality
19pub use manager::{initialize_cluster_manager, ClusterManager};
20
21pub use types::{
22    AllocationId,
23    AllocationStrategy,
24
25    BackoffStrategy,
26    // Configuration types
27    ClusterConfiguration,
28    // Events
29    ClusterEvent,
30
31    ClusterHealth,
32    ClusterHealthStatus,
33
34    // Statistics
35    ClusterStatistics,
36    // Cluster topology
37    ClusterTopology,
38    // Resource management
39    ComputeCapacity,
40    DataAccessType,
41    DataDependency,
42    // Task management
43    DistributedTask,
44    ExecutionPlan,
45    ExecutionStatus,
46
47    HealthCheck,
48    HealthCheckResult,
49    NetworkTopology,
50
51    NodeCapabilities,
52    NodeDiscoveryMethod,
53    // Health monitoring
54    NodeHealthStatus,
55    // Node types
56    NodeInfo,
57    NodeMetadata,
58    NodeStatus,
59    NodeType,
60    ResourceAllocation,
61    ResourceRequirements,
62    ResourceUtilization,
63    RetryPolicy,
64    SpecializedRequirement,
65
66    SpecializedUnit,
67
68    TaskId,
69    TaskParameters,
70    TaskPriority,
71    TaskType,
72    Zone,
73};
74
75pub use allocator::ResourceAllocator;
76pub use coordination::ClusterCoordination;
77pub use discovery::NodeDiscovery;
78pub use events::ClusterEventLog;
79pub use health::HealthMonitor;
80pub use registry::NodeRegistry;
81pub use state::ClusterState;
82
#[cfg(test)]
mod tests {
    //! Smoke tests for the cluster components re-exported by this module.

    use super::*;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time::Instant;

    /// Port used by every synthetic node address in these tests.
    const TEST_PORT: u16 = 8080;

    /// IPv4 loopback octets shared by the single-node tests.
    const LOOPBACK: [u8; 4] = [127, 0, 0, 1];

    /// Builds a socket address on [`TEST_PORT`] from raw IPv4 octets.
    fn test_address(octets: [u8; 4]) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(octets)), TEST_PORT)
    }

    /// Builds a worker node with default capabilities and metadata.
    ///
    /// Only the id, address octets and status vary between tests; a test that
    /// needs different capabilities overrides the field on the returned value.
    fn worker_node(id: &str, octets: [u8; 4], status: NodeStatus) -> NodeInfo {
        NodeInfo {
            id: id.to_string(),
            address: test_address(octets),
            node_type: NodeType::Worker,
            capabilities: NodeCapabilities::default(),
            status,
            last_seen: Instant::now(),
            metadata: NodeMetadata::default(),
        }
    }

    /// A manager can be constructed from the default configuration.
    #[test]
    fn test_cluster_manager_creation() {
        let config = ClusterConfiguration::default();
        // The underscore-prefixed binding keeps the manager alive until the end
        // of the test without triggering an unused-variable warning.
        let _manager = ClusterManager::new(config)
            .expect("default configuration should produce a cluster manager");
    }

    /// Registering a healthy node reports it as new and lists it as healthy.
    #[test]
    fn test_node_registry() {
        let mut registry = NodeRegistry::new();

        let node = worker_node("test_node", LOOPBACK, NodeStatus::Healthy);

        // The node is not needed afterwards, so it is moved rather than cloned.
        let is_new = registry.register_node(node).unwrap();
        assert!(is_new);

        let healthy_nodes = registry.get_healthy_nodes();
        assert_eq!(healthy_nodes.len(), 1);
        assert_eq!(healthy_nodes[0].id, "test_node");
    }

    /// A request that fits on the single available node is granted in full.
    #[test]
    fn test_resource_allocator() {
        let mut allocator = ResourceAllocator::new();

        // Set some available resources
        let mut node = worker_node("test_node", LOOPBACK, NodeStatus::Healthy);
        node.capabilities = NodeCapabilities {
            cpu_cores: 8,
            memory_gb: 16,
            gpu_count: 1,
            disk_space_gb: 100,
            networkbandwidth_gbps: 1.0,
            specialized_units: Vec::new(),
        };
        let nodes = vec![node];

        allocator.update_available_resources(&nodes).unwrap();

        let requirements = ResourceRequirements {
            cpu_cores: 4,
            memory_gb: 8,
            gpu_count: 0,
            disk_space_gb: 50,
            specialized_requirements: Vec::new(),
        };

        let allocation = allocator.allocate_resources(&requirements).unwrap();
        assert_eq!(allocation.allocated_resources.cpu_cores, 4);
        assert_eq!(allocation.allocated_resources.memory_gb, 8);
    }

    /// A health check yields a score within the closed range 0..=100.
    #[test]
    fn test_health_monitor() {
        let mut monitor = HealthMonitor::new().unwrap();

        let node = worker_node("test_node", LOOPBACK, NodeStatus::Unknown);

        let health_status = monitor.check_node_health(&node).unwrap();
        assert!((0.0..=100.0).contains(&health_status.health_score));
    }

    /// Two nodes on different networks end up in two topology zones.
    #[test]
    fn test_cluster_topology() {
        let mut topology = ClusterTopology::new();

        let nodes = vec![
            worker_node("node1", [192, 168, 1, 1], NodeStatus::Healthy),
            worker_node("node2", [10, 0, 0, 1], NodeStatus::Healthy),
        ];

        topology.update_model(&nodes);
        assert_eq!(topology.zones.len(), 2); // Two different zones based on IP
    }

    /// A fresh state has no leader; setting one clears the election flag.
    #[test]
    fn test_cluster_state() {
        let mut state = ClusterState::new();

        assert!(state.needs_leader_election());
        assert!(state.get_leader().is_none());

        state.set_leader("node1".to_string());
        assert_eq!(state.get_leader(), Some(&"node1".to_string()));
        assert!(!state.needs_leader_election());
    }

    /// A logged event is counted and returned among the recent events.
    #[test]
    fn test_cluster_event_log() {
        let mut log = ClusterEventLog::new();

        let event = ClusterEvent::NodeDiscovered {
            nodeid: "test_node".to_string(),
            address: test_address(LOOPBACK),
            timestamp: Instant::now(),
        };

        log.log_event(event);
        assert_eq!(log.event_count(), 1);

        let recent_events = log.get_recent_events(10);
        assert_eq!(recent_events.len(), 1);
    }

    /// Leader election picks `node_a` even though `node_z` is listed first.
    #[test]
    fn test_coordination_leader_election() {
        let nodes = vec![
            worker_node("node_z", [127, 0, 0, 1], NodeStatus::Healthy),
            worker_node("node_a", [127, 0, 0, 2], NodeStatus::Healthy),
        ];

        let leader = ClusterCoordination::elect_leader(&nodes).unwrap();
        assert_eq!(leader, Some("node_a".to_string())); // Should elect lexicographically first
    }
}