scirs2_core/distributed/cluster/
manager.rs

1//! Main cluster manager implementation
2//!
3//! This module provides the core ClusterManager implementation that
4//! orchestrates all cluster management functionality.
5
6use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, RwLock};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use super::allocator::ResourceAllocator;
13use super::coordination::ClusterCoordination;
14use super::events::ClusterEventLog;
15use super::health::HealthMonitor;
16use super::registry::NodeRegistry;
17use super::state::ClusterState;
18use super::types::{
19    ClusterConfiguration, ClusterHealth, ClusterHealthStatus, ClusterStatistics, ComputeCapacity,
20    DistributedTask, ExecutionPlan, ExecutionStatus, NodeInfo, NodeStatus, ResourceUtilization,
21    TaskId,
22};
23
/// Global cluster manager instance
///
/// Process-wide cell holding the shared [`ClusterManager`]. It starts empty and is
/// filled on the first call to `ClusterManager::global`; later calls receive a
/// clone of the same `Arc`.
static GLOBAL_CLUSTER_MANAGER: std::sync::OnceLock<Arc<ClusterManager>> =
    std::sync::OnceLock::new();
27
/// Comprehensive cluster management system
///
/// Bundles the shared components of the cluster. Each component is wrapped in an
/// `Arc` plus a lock so that the background threads spawned by `start` can hold
/// their own handles to it.
#[derive(Debug)]
pub struct ClusterManager {
    /// Cluster-wide state; read by `get_cluster_state` to report the current leader.
    cluster_state: Arc<RwLock<ClusterState>>,
    /// Registry of known nodes; the source for node listings, health and capacity.
    node_registry: Arc<RwLock<NodeRegistry>>,
    /// Health monitor handed to the health-monitoring background loop.
    healthmonitor: Arc<Mutex<HealthMonitor>>,
    /// Resource allocator used by `submit_task` and the statistics report.
    resource_allocator: Arc<RwLock<ResourceAllocator>>,
    /// Active configuration; read and replaced through the configuration accessors.
    configuration: Arc<RwLock<ClusterConfiguration>>,
    /// Event log passed to the coordination routines.
    eventlog: Arc<Mutex<ClusterEventLog>>,
}
38
39#[allow(dead_code)]
40impl ClusterManager {
41    /// Create new cluster manager
42    pub fn new(config: ClusterConfiguration) -> CoreResult<Self> {
43        Ok(Self {
44            cluster_state: Arc::new(RwLock::new(ClusterState::new())),
45            node_registry: Arc::new(RwLock::new(NodeRegistry::new())),
46            healthmonitor: Arc::new(Mutex::new(HealthMonitor::new()?)),
47            resource_allocator: Arc::new(RwLock::new(ResourceAllocator::new())),
48            configuration: Arc::new(RwLock::new(config)),
49            eventlog: Arc::new(Mutex::new(ClusterEventLog::new())),
50        })
51    }
52
53    /// Get global cluster manager instance
54    pub fn global() -> CoreResult<Arc<Self>> {
55        Ok(GLOBAL_CLUSTER_MANAGER
56            .get_or_init(|| Arc::new(Self::new(ClusterConfiguration::default()).unwrap()))
57            .clone())
58    }
59
60    /// Start cluster management services
61    pub fn start(&self) -> CoreResult<()> {
62        // Start node discovery
63        self.start_node_discovery()?;
64
65        // Start health monitoring
66        self.start_health_monitoring()?;
67
68        // Start resource management
69        self.start_resource_management()?;
70
71        // Start cluster coordination
72        self.start_cluster_coordination()?;
73
74        Ok(())
75    }
76
77    /// Start the node discovery background thread
78    fn start_node_discovery(&self) -> CoreResult<()> {
79        let registry = self.node_registry.clone();
80        let config = self.configuration.clone();
81        let eventlog = self.eventlog.clone();
82
83        thread::spawn(move || loop {
84            if let Err(e) = ClusterCoordination::node_discovery_loop(&registry, &config, &eventlog)
85            {
86                eprintln!("Node discovery error: {e:?}");
87            }
88            thread::sleep(Duration::from_secs(30));
89        });
90
91        Ok(())
92    }
93
94    /// Start the health monitoring background thread
95    fn start_health_monitoring(&self) -> CoreResult<()> {
96        let healthmonitor = self.healthmonitor.clone();
97        let registry = self.node_registry.clone();
98        let eventlog = self.eventlog.clone();
99
100        thread::spawn(move || loop {
101            if let Err(e) =
102                ClusterCoordination::health_monitoring_loop(&healthmonitor, &registry, &eventlog)
103            {
104                eprintln!("Health monitoring error: {e:?}");
105            }
106            thread::sleep(Duration::from_secs(10));
107        });
108
109        Ok(())
110    }
111
112    /// Start the resource management background thread
113    fn start_resource_management(&self) -> CoreResult<()> {
114        let allocator = self.resource_allocator.clone();
115        let registry = self.node_registry.clone();
116
117        thread::spawn(move || loop {
118            if let Err(e) = ClusterCoordination::resource_management_loop(&allocator, &registry) {
119                eprintln!("Resource management error: {e:?}");
120            }
121            thread::sleep(Duration::from_secs(15));
122        });
123
124        Ok(())
125    }
126
127    /// Start the cluster coordination background thread
128    fn start_cluster_coordination(&self) -> CoreResult<()> {
129        let cluster_state = self.cluster_state.clone();
130        let registry = self.node_registry.clone();
131        let eventlog = self.eventlog.clone();
132
133        thread::spawn(move || loop {
134            if let Err(e) =
135                ClusterCoordination::cluster_coordination_loop(&cluster_state, &registry, &eventlog)
136            {
137                eprintln!("Cluster coordination error: {e:?}");
138            }
139            thread::sleep(Duration::from_secs(5));
140        });
141
142        Ok(())
143    }
144
145    /// Register a new node in the cluster
146    pub fn register_node(&self, nodeinfo: NodeInfo) -> CoreResult<()> {
147        let mut registry = self.node_registry.write().map_err(|_| {
148            CoreError::InvalidState(
149                ErrorContext::new("Failed to acquire registry lock")
150                    .with_location(ErrorLocation::new(file!(), line!())),
151            )
152        })?;
153
154        registry.register_node(nodeinfo)?;
155        Ok(())
156    }
157
158    /// Get cluster health status
159    pub fn get_health(&self) -> CoreResult<ClusterHealth> {
160        let registry = self.node_registry.read().map_err(|_| {
161            CoreError::InvalidState(
162                ErrorContext::new("Failed to acquire registry lock")
163                    .with_location(ErrorLocation::new(file!(), line!())),
164            )
165        })?;
166
167        let all_nodes = registry.get_all_nodes();
168        let healthy_nodes = all_nodes
169            .iter()
170            .filter(|n| n.status == NodeStatus::Healthy)
171            .count();
172        let total_nodes = all_nodes.len();
173
174        let health_percentage = if total_nodes == 0 {
175            100.0
176        } else {
177            (healthy_nodes as f64 / total_nodes as f64) * 100.0
178        };
179
180        let status = if health_percentage >= 80.0 {
181            ClusterHealthStatus::Healthy
182        } else if health_percentage >= 50.0 {
183            ClusterHealthStatus::Degraded
184        } else {
185            ClusterHealthStatus::Unhealthy
186        };
187
188        Ok(ClusterHealth {
189            status,
190            healthy_nodes,
191            total_nodes,
192            health_percentage,
193            last_updated: Instant::now(),
194        })
195    }
196
197    /// Get list of active nodes
198    pub fn get_active_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
199        let registry = self.node_registry.read().map_err(|_| {
200            CoreError::InvalidState(
201                ErrorContext::new("Failed to acquire registry lock")
202                    .with_location(ErrorLocation::new(file!(), line!())),
203            )
204        })?;
205
206        Ok(registry.get_healthy_nodes())
207    }
208
209    /// Get available nodes (returns nodeid -> nodeinfo mapping)
210    pub fn get_available_nodes(&self) -> CoreResult<HashMap<String, NodeInfo>> {
211        let registry = self.node_registry.read().map_err(|_| {
212            CoreError::InvalidState(
213                ErrorContext::new("Failed to acquire registry lock")
214                    .with_location(ErrorLocation::new(file!(), line!())),
215            )
216        })?;
217
218        let nodes = registry.get_healthy_nodes();
219        let mut node_map = HashMap::new();
220        for node in nodes {
221            node_map.insert(node.id.clone(), node);
222        }
223        Ok(node_map)
224    }
225
226    /// Get total cluster compute capacity
227    pub fn get_total_capacity(&self) -> CoreResult<ComputeCapacity> {
228        let registry = self.node_registry.read().map_err(|_| {
229            CoreError::InvalidState(
230                ErrorContext::new("Failed to acquire registry lock")
231                    .with_location(ErrorLocation::new(file!(), line!())),
232            )
233        })?;
234
235        let nodes = registry.get_healthy_nodes();
236        let mut total_capacity = ComputeCapacity::default();
237
238        for node in nodes {
239            total_capacity.cpu_cores += node.capabilities.cpu_cores;
240            total_capacity.memory_gb += node.capabilities.memory_gb;
241            total_capacity.gpu_count += node.capabilities.gpu_count;
242            total_capacity.disk_space_gb += node.capabilities.disk_space_gb;
243        }
244
245        Ok(total_capacity)
246    }
247
248    /// Submit a distributed task to the cluster
249    pub fn submit_task(&self, task: DistributedTask) -> CoreResult<TaskId> {
250        let allocator = self.resource_allocator.read().map_err(|_| {
251            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
252        })?;
253
254        let allocation = allocator.allocate_resources(&task.resource_requirements)?;
255
256        // Create task execution plan
257        let taskid = TaskId::generate();
258        let _execution_plan = ExecutionPlan {
259            taskid: taskid.clone(),
260            task,
261            node_allocation: allocation,
262            created_at: Instant::now(),
263            status: ExecutionStatus::Pending,
264        };
265
266        // Submit to scheduler (placeholder)
267        // In a real implementation, this would go to the distributed scheduler
268        Ok(taskid)
269    }
270
271    /// Get cluster statistics
272    pub fn get_cluster_statistics(&self) -> CoreResult<ClusterStatistics> {
273        let registry = self.node_registry.read().map_err(|_| {
274            CoreError::InvalidState(
275                ErrorContext::new("Failed to acquire registry lock")
276                    .with_location(ErrorLocation::new(file!(), line!())),
277            )
278        })?;
279
280        let allocator = self.resource_allocator.read().map_err(|_| {
281            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
282        })?;
283
284        let nodes = registry.get_all_nodes();
285        let total_capacity = self.get_total_capacity()?;
286        let available_capacity = (*allocator).available_capacity();
287
288        Ok(ClusterStatistics {
289            total_nodes: nodes.len(),
290            healthy_nodes: nodes
291                .iter()
292                .filter(|n| n.status == NodeStatus::Healthy)
293                .count(),
294            total_capacity: total_capacity.clone(),
295            available_capacity: available_capacity.clone(),
296            resource_utilization: ResourceUtilization {
297                cpu_utilization: 1.0
298                    - (available_capacity.cpu_cores as f64 / total_capacity.cpu_cores as f64),
299                memory_utilization: 1.0
300                    - (available_capacity.memory_gb as f64 / total_capacity.memory_gb as f64),
301                gpu_utilization: if total_capacity.gpu_count > 0 {
302                    1.0 - (available_capacity.gpu_count as f64 / total_capacity.gpu_count as f64)
303                } else {
304                    0.0
305                },
306            },
307        })
308    }
309
    /// Stop the cluster manager.
    ///
    /// Currently a placeholder: it always returns `Ok(())` and does not signal or
    /// join the background threads spawned by `start`, so those threads keep
    /// running after this call.
    pub fn stop(&self) -> CoreResult<()> {
        // In a real implementation, this would signal the background threads to stop
        // For now, it's a placeholder
        Ok(())
    }
316
317    /// Get cluster configuration
318    pub fn get_configuration(&self) -> CoreResult<ClusterConfiguration> {
319        let config = self.configuration.read().map_err(|_| {
320            CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
321        })?;
322        Ok(config.clone())
323    }
324
325    /// Update cluster configuration
326    pub fn update_configuration(&self, new_config: ClusterConfiguration) -> CoreResult<()> {
327        let mut config = self.configuration.write().map_err(|_| {
328            CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
329        })?;
330        *config = new_config;
331        Ok(())
332    }
333
334    /// Get cluster state
335    pub fn get_cluster_state(&self) -> CoreResult<String> {
336        let state = self.cluster_state.read().map_err(|_| {
337            CoreError::InvalidState(ErrorContext::new("Failed to acquire cluster state lock"))
338        })?;
339
340        if let Some(leader) = state.get_leader() {
341            Ok(format!(
342                "Leader: {leader}, Last updated: {:?}",
343                state.last_updated()
344            ))
345        } else {
346            Ok("No leader elected".to_string())
347        }
348    }
349
350    /// Force leader election
351    pub fn force_leader_election(&self) -> CoreResult<Option<String>> {
352        let registry = self.node_registry.read().map_err(|_| {
353            CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
354        })?;
355
356        let healthy_nodes = registry.get_healthy_nodes();
357        ClusterCoordination::elect_leader(&healthy_nodes)
358    }
359
360    /// Remove a node from the cluster
361    pub fn remove_node(&self, node_id: &str) -> CoreResult<()> {
362        ClusterCoordination::handle_node_removal(node_id, &self.node_registry, &self.eventlog)
363    }
364
365    /// Gracefully shutdown a node
366    pub fn shutdown_node(&self, node_id: &str) -> CoreResult<()> {
367        ClusterCoordination::handle_node_shutdown(node_id, &self.node_registry, &self.eventlog)
368    }
369}
370
371/// Initialize cluster manager with default configuration
372#[allow(dead_code)]
373pub fn initialize_cluster_manager() -> CoreResult<()> {
374    let cluster_manager = ClusterManager::global()?;
375    cluster_manager.start()?;
376    Ok(())
377}