scirs2_core/distributed/cluster/
mod.rs

1//! Cluster management for distributed computing
2//!
3//! This module provides comprehensive cluster management capabilities
4//! including node discovery, health monitoring, resource allocation,
5//! and fault-tolerant cluster coordination.
6
7// Module declarations
8pub mod allocator;
9pub mod coordination;
10pub mod discovery;
11pub mod events;
12pub mod health;
13pub mod manager;
14pub mod registry;
15pub mod state;
16pub mod types;
17
18// Re-export main types and functionality
19pub use manager::{initialize_cluster_manager, ClusterManager};
20
21pub use types::{
22    AllocationId,
23    AllocationStrategy,
24
25    BackoffStrategy,
26    // Configuration types
27    ClusterConfiguration,
28    // Events
29    ClusterEvent,
30
31    ClusterHealth,
32    ClusterHealthStatus,
33
34    // Statistics
35    ClusterStatistics,
36    // Cluster topology
37    ClusterTopology,
38    // Resource management
39    ComputeCapacity,
40    DataAccessType,
41    DataDependency,
42    // Task management
43    DistributedTask,
44    ExecutionPlan,
45    ExecutionStatus,
46
47    HealthCheck,
48    HealthCheckResult,
49    NetworkTopology,
50
51    NodeCapabilities,
52    NodeDiscoveryMethod,
53    // Health monitoring
54    NodeHealthStatus,
55    // Node types
56    NodeInfo,
57    NodeMetadata,
58    NodeStatus,
59    NodeType,
60    ResourceAllocation,
61    ResourceRequirements,
62    ResourceUtilization,
63    RetryPolicy,
64    SpecializedRequirement,
65
66    SpecializedUnit,
67
68    TaskId,
69    TaskParameters,
70    TaskPriority,
71    TaskType,
72    Zone,
73};
74
75pub use allocator::ResourceAllocator;
76pub use coordination::ClusterCoordination;
77pub use discovery::NodeDiscovery;
78pub use events::ClusterEventLog;
79pub use health::HealthMonitor;
80pub use registry::NodeRegistry;
81pub use state::ClusterState;
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time::Instant;

    /// Port used by every fixture address in this module.
    const TEST_PORT: u16 = 8080;

    /// Builds the socket address `ip:TEST_PORT` for a test fixture.
    fn test_addr(ip: Ipv4Addr) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(ip), TEST_PORT)
    }

    /// Builds a worker `NodeInfo` fixture with default capabilities and
    /// metadata, stamped with the current instant as `last_seen`.
    ///
    /// Tests that need non-default capabilities overwrite the field on the
    /// returned value instead of repeating the whole struct literal.
    fn worker_node(id: &str, ip: Ipv4Addr, status: NodeStatus) -> NodeInfo {
        NodeInfo {
            id: id.to_string(),
            address: test_addr(ip),
            node_type: NodeType::Worker,
            capabilities: NodeCapabilities::default(),
            status,
            last_seen: Instant::now(),
            metadata: NodeMetadata::default(),
        }
    }

    /// A manager can be constructed from the default configuration.
    #[test]
    fn test_cluster_manager_creation() {
        let config = ClusterConfiguration::default();
        // Only construction is exercised here; the underscore prefix marks the
        // binding as intentionally unused while keeping it alive until scope end.
        let _manager = ClusterManager::new(config).expect("Operation failed");
    }

    /// Registering a previously unseen healthy node reports it as new and makes
    /// it show up in the healthy-node listing.
    #[test]
    fn test_node_registry() {
        let mut registry = NodeRegistry::new();

        // The fixture is not needed after registration, so it is moved in
        // rather than cloned.
        let is_new = registry
            .register_node(worker_node(
                "test_node",
                Ipv4Addr::LOCALHOST,
                NodeStatus::Healthy,
            ))
            .expect("Operation failed");
        assert!(is_new);

        let healthy_nodes = registry.get_healthy_nodes();
        assert_eq!(healthy_nodes.len(), 1);
        assert_eq!(healthy_nodes[0].id, "test_node");
    }

    /// A request that fits within a single node's capacity is granted with
    /// exactly the requested CPU and memory amounts.
    #[test]
    fn test_resource_allocator() {
        let mut allocator = ResourceAllocator::new();

        // Set some available resources: one healthy worker with explicit capacity.
        let mut node = worker_node("test_node", Ipv4Addr::LOCALHOST, NodeStatus::Healthy);
        node.capabilities = NodeCapabilities {
            cpu_cores: 8,
            memory_gb: 16,
            gpu_count: 1,
            disk_space_gb: 100,
            networkbandwidth_gbps: 1.0,
            specialized_units: Vec::new(),
        };
        let nodes = vec![node];

        allocator
            .update_available_resources(&nodes)
            .expect("Operation failed");

        let requirements = ResourceRequirements {
            cpu_cores: 4,
            memory_gb: 8,
            gpu_count: 0,
            disk_space_gb: 50,
            specialized_requirements: Vec::new(),
        };

        let allocation = allocator
            .allocate_resources(&requirements)
            .expect("Operation failed");
        assert_eq!(allocation.allocated_resources.cpu_cores, 4);
        assert_eq!(allocation.allocated_resources.memory_gb, 8);
    }

    /// A health check on a node of unknown status yields a score within
    /// the inclusive range 0 to 100.
    #[test]
    fn test_health_monitor() {
        let mut monitor = HealthMonitor::new().expect("Operation failed");

        let node = worker_node("test_node", Ipv4Addr::LOCALHOST, NodeStatus::Unknown);

        let health_status = monitor.check_node_health(&node).expect("Operation failed");
        assert!((0.0..=100.0).contains(&health_status.health_score));
    }

    /// Two nodes with addresses in different networks end up in two zones.
    #[test]
    fn test_cluster_topology() {
        let mut topology = ClusterTopology::new();

        let nodes = vec![
            worker_node("node1", Ipv4Addr::new(192, 168, 1, 1), NodeStatus::Healthy),
            worker_node("node2", Ipv4Addr::new(10, 0, 0, 1), NodeStatus::Healthy),
        ];

        topology.update_model(&nodes);
        assert_eq!(topology.zones.len(), 2); // Two different zones based on IP
    }

    /// A fresh state has no leader and requests an election; setting a leader
    /// records it and clears the election request.
    #[test]
    fn test_cluster_state() {
        let mut state = ClusterState::new();

        assert!(state.needs_leader_election());
        assert!(state.get_leader().is_none());

        state.set_leader("node1".to_string());
        assert_eq!(state.get_leader(), Some(&"node1".to_string()));
        assert!(!state.needs_leader_election());
    }

    /// A logged event is counted and returned by the recent-events query.
    #[test]
    fn test_cluster_event_log() {
        let mut log = ClusterEventLog::new();

        let event = ClusterEvent::NodeDiscovered {
            nodeid: "test_node".to_string(),
            address: test_addr(Ipv4Addr::LOCALHOST),
            timestamp: Instant::now(),
        };

        log.log_event(event);
        assert_eq!(log.event_count(), 1);

        let recent_events = log.get_recent_events(10);
        assert_eq!(recent_events.len(), 1);
    }

    /// Among healthy candidates, the election picks `node_a` over `node_z`
    /// even though `node_z` is listed first.
    #[test]
    fn test_coordination_leader_election() {
        // Deliberately listed out of order so the result cannot depend on input position.
        let nodes = vec![
            worker_node("node_z", Ipv4Addr::new(127, 0, 0, 1), NodeStatus::Healthy),
            worker_node("node_a", Ipv4Addr::new(127, 0, 0, 2), NodeStatus::Healthy),
        ];

        let leader = ClusterCoordination::elect_leader(&nodes).expect("Operation failed");
        assert_eq!(leader, Some("node_a".to_string())); // Should elect lexicographically first
    }
}
263}