// scirs2_core/distributed/cluster/manager.rs

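//! Main cluster manager implementation.
//!
//! This module provides [`ClusterManager`], which owns the cluster state, node registry,
//! health monitor, resource allocator, configuration and event log, and drives the periodic
//! background loops (node discovery, health monitoring, resource management and cluster
//! coordination) that operate on them.
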
6use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, RwLock};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use super::allocator::ResourceAllocator;
13use super::coordination::ClusterCoordination;
14use super::events::ClusterEventLog;
15use super::health::HealthMonitor;
16use super::registry::NodeRegistry;
17use super::state::ClusterState;
18use super::types::{
19    ClusterConfiguration, ClusterHealth, ClusterHealthStatus, ClusterStatistics, ComputeCapacity,
20    DistributedTask, ExecutionPlan, ExecutionStatus, NodeInfo, NodeStatus, ResourceUtilization,
21    TaskId,
22};
23
24/// Global cluster manager instance
25static GLOBAL_CLUSTER_MANAGER: std::sync::OnceLock<Arc<ClusterManager>> =
26    std::sync::OnceLock::new();
27
28/// Comprehensive cluster management system
29#[derive(Debug)]
30pub struct ClusterManager {
31    cluster_state: Arc<RwLock<ClusterState>>,
32    node_registry: Arc<RwLock<NodeRegistry>>,
33    healthmonitor: Arc<Mutex<HealthMonitor>>,
34    resource_allocator: Arc<RwLock<ResourceAllocator>>,
35    configuration: Arc<RwLock<ClusterConfiguration>>,
36    eventlog: Arc<Mutex<ClusterEventLog>>,
37}
38
39#[allow(dead_code)]
40impl ClusterManager {
41    /// Create new cluster manager
42    pub fn new(config: ClusterConfiguration) -> CoreResult<Self> {
43        Ok(Self {
44            cluster_state: Arc::new(RwLock::new(ClusterState::new())),
45            node_registry: Arc::new(RwLock::new(NodeRegistry::new())),
46            healthmonitor: Arc::new(Mutex::new(HealthMonitor::new()?)),
47            resource_allocator: Arc::new(RwLock::new(ResourceAllocator::new())),
48            configuration: Arc::new(RwLock::new(config)),
49            eventlog: Arc::new(Mutex::new(ClusterEventLog::new())),
50        })
51    }
52
53    /// Get global cluster manager instance
54    pub fn global() -> CoreResult<Arc<Self>> {
55        Ok(GLOBAL_CLUSTER_MANAGER
56            .get_or_init(|| {
57                Arc::new(Self::new(ClusterConfiguration::default()).expect("Operation failed"))
58            })
59            .clone())
60    }
61
62    /// Start cluster management services
63    pub fn start(&self) -> CoreResult<()> {
64        // Start node discovery
65        self.start_node_discovery()?;
66
67        // Start health monitoring
68        self.start_health_monitoring()?;
69
70        // Start resource management
71        self.start_resource_management()?;
72
73        // Start cluster coordination
74        self.start_cluster_coordination()?;
75
76        Ok(())
77    }
78
79    /// Start the node discovery background thread
80    fn start_node_discovery(&self) -> CoreResult<()> {
81        let registry = self.node_registry.clone();
82        let config = self.configuration.clone();
83        let eventlog = self.eventlog.clone();
84
85        thread::spawn(move || loop {
86            if let Err(e) = ClusterCoordination::node_discovery_loop(&registry, &config, &eventlog)
87            {
88                eprintln!("Node discovery error: {e:?}");
89            }
90            thread::sleep(Duration::from_secs(30));
91        });
92
93        Ok(())
94    }
95
96    /// Start the health monitoring background thread
97    fn start_health_monitoring(&self) -> CoreResult<()> {
98        let healthmonitor = self.healthmonitor.clone();
99        let registry = self.node_registry.clone();
100        let eventlog = self.eventlog.clone();
101
102        thread::spawn(move || loop {
103            if let Err(e) =
104                ClusterCoordination::health_monitoring_loop(&healthmonitor, &registry, &eventlog)
105            {
106                eprintln!("Health monitoring error: {e:?}");
107            }
108            thread::sleep(Duration::from_secs(10));
109        });
110
111        Ok(())
112    }
113
114    /// Start the resource management background thread
115    fn start_resource_management(&self) -> CoreResult<()> {
116        let allocator = self.resource_allocator.clone();
117        let registry = self.node_registry.clone();
118
119        thread::spawn(move || loop {
120            if let Err(e) = ClusterCoordination::resource_management_loop(&allocator, &registry) {
121                eprintln!("Resource management error: {e:?}");
122            }
123            thread::sleep(Duration::from_secs(15));
124        });
125
126        Ok(())
127    }
128
129    /// Start the cluster coordination background thread
130    fn start_cluster_coordination(&self) -> CoreResult<()> {
131        let cluster_state = self.cluster_state.clone();
132        let registry = self.node_registry.clone();
133        let eventlog = self.eventlog.clone();
134
135        thread::spawn(move || loop {
136            if let Err(e) =
137                ClusterCoordination::cluster_coordination_loop(&cluster_state, &registry, &eventlog)
138            {
139                eprintln!("Cluster coordination error: {e:?}");
140            }
141            thread::sleep(Duration::from_secs(5));
142        });
143
144        Ok(())
145    }
146
147    /// Register a new node in the cluster
148    pub fn register_node(&self, nodeinfo: NodeInfo) -> CoreResult<()> {
149        let mut registry = self.node_registry.write().map_err(|_| {
150            CoreError::InvalidState(
151                ErrorContext::new("Failed to acquire registry lock")
152                    .with_location(ErrorLocation::new(file!(), line!())),
153            )
154        })?;
155
156        registry.register_node(nodeinfo)?;
157        Ok(())
158    }
159
160    /// Get cluster health status
161    pub fn get_health(&self) -> CoreResult<ClusterHealth> {
162        let registry = self.node_registry.read().map_err(|_| {
163            CoreError::InvalidState(
164                ErrorContext::new("Failed to acquire registry lock")
165                    .with_location(ErrorLocation::new(file!(), line!())),
166            )
167        })?;
168
169        let all_nodes = registry.get_all_nodes();
170        let healthy_nodes = all_nodes
171            .iter()
172            .filter(|n| n.status == NodeStatus::Healthy)
173            .count();
174        let total_nodes = all_nodes.len();
175
176        let health_percentage = if total_nodes == 0 {
177            100.0
178        } else {
179            (healthy_nodes as f64 / total_nodes as f64) * 100.0
180        };
181
182        let status = if health_percentage >= 80.0 {
183            ClusterHealthStatus::Healthy
184        } else if health_percentage >= 50.0 {
185            ClusterHealthStatus::Degraded
186        } else {
187            ClusterHealthStatus::Unhealthy
188        };
189
190        Ok(ClusterHealth {
191            status,
192            healthy_nodes,
193            total_nodes,
194            health_percentage,
195            last_updated: Instant::now(),
196        })
197    }
198
199    /// Get list of active nodes
200    pub fn get_active_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
201        let registry = self.node_registry.read().map_err(|_| {
202            CoreError::InvalidState(
203                ErrorContext::new("Failed to acquire registry lock")
204                    .with_location(ErrorLocation::new(file!(), line!())),
205            )
206        })?;
207
208        Ok(registry.get_healthy_nodes())
209    }
210
211    /// Get available nodes (returns nodeid -> nodeinfo mapping)
212    pub fn get_available_nodes(&self) -> CoreResult<HashMap<String, NodeInfo>> {
213        let registry = self.node_registry.read().map_err(|_| {
214            CoreError::InvalidState(
215                ErrorContext::new("Failed to acquire registry lock")
216                    .with_location(ErrorLocation::new(file!(), line!())),
217            )
218        })?;
219
220        let nodes = registry.get_healthy_nodes();
221        let mut node_map = HashMap::new();
222        for node in nodes {
223            node_map.insert(node.id.clone(), node);
224        }
225        Ok(node_map)
226    }
227
228    /// Get total cluster compute capacity
229    pub fn get_total_capacity(&self) -> CoreResult<ComputeCapacity> {
230        let registry = self.node_registry.read().map_err(|_| {
231            CoreError::InvalidState(
232                ErrorContext::new("Failed to acquire registry lock")
233                    .with_location(ErrorLocation::new(file!(), line!())),
234            )
235        })?;
236
237        let nodes = registry.get_healthy_nodes();
238        let mut total_capacity = ComputeCapacity::default();
239
240        for node in nodes {
241            total_capacity.cpu_cores += node.capabilities.cpu_cores;
242            total_capacity.memory_gb += node.capabilities.memory_gb;
243            total_capacity.gpu_count += node.capabilities.gpu_count;
244            total_capacity.disk_space_gb += node.capabilities.disk_space_gb;
245        }
246
247        Ok(total_capacity)
248    }
249
250    /// Submit a distributed task to the cluster
251    pub fn submit_task(&self, task: DistributedTask) -> CoreResult<TaskId> {
252        let allocator = self.resource_allocator.read().map_err(|_| {
253            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
254        })?;
255
256        let allocation = allocator.allocate_resources(&task.resource_requirements)?;
257
258        // Create task execution plan
259        let taskid = TaskId::generate();
260        let _execution_plan = ExecutionPlan {
261            taskid: taskid.clone(),
262            task,
263            node_allocation: allocation,
264            created_at: Instant::now(),
265            status: ExecutionStatus::Pending,
266        };
267
268        // Submit to scheduler (placeholder)
269        // In a real implementation, this would go to the distributed scheduler
270        Ok(taskid)
271    }
272
273    /// Get cluster statistics
274    pub fn get_cluster_statistics(&self) -> CoreResult<ClusterStatistics> {
275        let registry = self.node_registry.read().map_err(|_| {
276            CoreError::InvalidState(
277                ErrorContext::new("Failed to acquire registry lock")
278                    .with_location(ErrorLocation::new(file!(), line!())),
279            )
280        })?;
281
282        let allocator = self.resource_allocator.read().map_err(|_| {
283            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
284        })?;
285
286        let nodes = registry.get_all_nodes();
287        let total_capacity = self.get_total_capacity()?;
288        let available_capacity = (*allocator).available_capacity();
289
290        Ok(ClusterStatistics {
291            total_nodes: nodes.len(),
292            healthy_nodes: nodes
293                .iter()
294                .filter(|n| n.status == NodeStatus::Healthy)
295                .count(),
296            total_capacity: total_capacity.clone(),
297            available_capacity: available_capacity.clone(),
298            resource_utilization: ResourceUtilization {
299                cpu_utilization: 1.0
300                    - (available_capacity.cpu_cores as f64 / total_capacity.cpu_cores as f64),
301                memory_utilization: 1.0
302                    - (available_capacity.memory_gb as f64 / total_capacity.memory_gb as f64),
303                gpu_utilization: if total_capacity.gpu_count > 0 {
304                    1.0 - (available_capacity.gpu_count as f64 / total_capacity.gpu_count as f64)
305                } else {
306                    0.0
307                },
308            },
309        })
310    }
311
312    /// Stop the cluster manager and all background threads
313    pub fn stop(&self) -> CoreResult<()> {
314        // In a real implementation, this would signal the background threads to stop
315        // For now, it's a placeholder
316        Ok(())
317    }
318
319    /// Get cluster configuration
320    pub fn get_configuration(&self) -> CoreResult<ClusterConfiguration> {
321        let config = self.configuration.read().map_err(|_| {
322            CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
323        })?;
324        Ok(config.clone())
325    }
326
327    /// Update cluster configuration
328    pub fn update_configuration(&self, new_config: ClusterConfiguration) -> CoreResult<()> {
329        let mut config = self.configuration.write().map_err(|_| {
330            CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
331        })?;
332        *config = new_config;
333        Ok(())
334    }
335
336    /// Get cluster state
337    pub fn get_cluster_state(&self) -> CoreResult<String> {
338        let state = self.cluster_state.read().map_err(|_| {
339            CoreError::InvalidState(ErrorContext::new("Failed to acquire cluster state lock"))
340        })?;
341
342        if let Some(leader) = state.get_leader() {
343            Ok(format!(
344                "Leader: {leader}, Last updated: {:?}",
345                state.last_updated()
346            ))
347        } else {
348            Ok("No leader elected".to_string())
349        }
350    }
351
352    /// Force leader election
353    pub fn force_leader_election(&self) -> CoreResult<Option<String>> {
354        let registry = self.node_registry.read().map_err(|_| {
355            CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
356        })?;
357
358        let healthy_nodes = registry.get_healthy_nodes();
359        ClusterCoordination::elect_leader(&healthy_nodes)
360    }
361
362    /// Remove a node from the cluster
363    pub fn remove_node(&self, node_id: &str) -> CoreResult<()> {
364        ClusterCoordination::handle_node_removal(node_id, &self.node_registry, &self.eventlog)
365    }
366
367    /// Gracefully shutdown a node
368    pub fn shutdown_node(&self, node_id: &str) -> CoreResult<()> {
369        ClusterCoordination::handle_node_shutdown(node_id, &self.node_registry, &self.eventlog)
370    }
371}
372
373/// Initialize cluster manager with default configuration
374#[allow(dead_code)]
375pub fn initialize_cluster_manager() -> CoreResult<()> {
376    let cluster_manager = ClusterManager::global()?;
377    cluster_manager.start()?;
378    Ok(())
379}