scirs2_core/distributed/cluster/
manager.rs1use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, RwLock};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use super::allocator::ResourceAllocator;
13use super::coordination::ClusterCoordination;
14use super::events::ClusterEventLog;
15use super::health::HealthMonitor;
16use super::registry::NodeRegistry;
17use super::state::ClusterState;
18use super::types::{
19 ClusterConfiguration, ClusterHealth, ClusterHealthStatus, ClusterStatistics, ComputeCapacity,
20 DistributedTask, ExecutionPlan, ExecutionStatus, NodeInfo, NodeStatus, ResourceUtilization,
21 TaskId,
22};
23
24static GLOBAL_CLUSTER_MANAGER: std::sync::OnceLock<Arc<ClusterManager>> =
26 std::sync::OnceLock::new();
27
28#[derive(Debug)]
30pub struct ClusterManager {
31 cluster_state: Arc<RwLock<ClusterState>>,
32 node_registry: Arc<RwLock<NodeRegistry>>,
33 healthmonitor: Arc<Mutex<HealthMonitor>>,
34 resource_allocator: Arc<RwLock<ResourceAllocator>>,
35 configuration: Arc<RwLock<ClusterConfiguration>>,
36 eventlog: Arc<Mutex<ClusterEventLog>>,
37}
38
39#[allow(dead_code)]
40impl ClusterManager {
41 pub fn new(config: ClusterConfiguration) -> CoreResult<Self> {
43 Ok(Self {
44 cluster_state: Arc::new(RwLock::new(ClusterState::new())),
45 node_registry: Arc::new(RwLock::new(NodeRegistry::new())),
46 healthmonitor: Arc::new(Mutex::new(HealthMonitor::new()?)),
47 resource_allocator: Arc::new(RwLock::new(ResourceAllocator::new())),
48 configuration: Arc::new(RwLock::new(config)),
49 eventlog: Arc::new(Mutex::new(ClusterEventLog::new())),
50 })
51 }
52
53 pub fn global() -> CoreResult<Arc<Self>> {
55 Ok(GLOBAL_CLUSTER_MANAGER
56 .get_or_init(|| Arc::new(Self::new(ClusterConfiguration::default()).unwrap()))
57 .clone())
58 }
59
60 pub fn start(&self) -> CoreResult<()> {
62 self.start_node_discovery()?;
64
65 self.start_health_monitoring()?;
67
68 self.start_resource_management()?;
70
71 self.start_cluster_coordination()?;
73
74 Ok(())
75 }
76
77 fn start_node_discovery(&self) -> CoreResult<()> {
79 let registry = self.node_registry.clone();
80 let config = self.configuration.clone();
81 let eventlog = self.eventlog.clone();
82
83 thread::spawn(move || loop {
84 if let Err(e) = ClusterCoordination::node_discovery_loop(®istry, &config, &eventlog)
85 {
86 eprintln!("Node discovery error: {e:?}");
87 }
88 thread::sleep(Duration::from_secs(30));
89 });
90
91 Ok(())
92 }
93
94 fn start_health_monitoring(&self) -> CoreResult<()> {
96 let healthmonitor = self.healthmonitor.clone();
97 let registry = self.node_registry.clone();
98 let eventlog = self.eventlog.clone();
99
100 thread::spawn(move || loop {
101 if let Err(e) =
102 ClusterCoordination::health_monitoring_loop(&healthmonitor, ®istry, &eventlog)
103 {
104 eprintln!("Health monitoring error: {e:?}");
105 }
106 thread::sleep(Duration::from_secs(10));
107 });
108
109 Ok(())
110 }
111
112 fn start_resource_management(&self) -> CoreResult<()> {
114 let allocator = self.resource_allocator.clone();
115 let registry = self.node_registry.clone();
116
117 thread::spawn(move || loop {
118 if let Err(e) = ClusterCoordination::resource_management_loop(&allocator, ®istry) {
119 eprintln!("Resource management error: {e:?}");
120 }
121 thread::sleep(Duration::from_secs(15));
122 });
123
124 Ok(())
125 }
126
127 fn start_cluster_coordination(&self) -> CoreResult<()> {
129 let cluster_state = self.cluster_state.clone();
130 let registry = self.node_registry.clone();
131 let eventlog = self.eventlog.clone();
132
133 thread::spawn(move || loop {
134 if let Err(e) =
135 ClusterCoordination::cluster_coordination_loop(&cluster_state, ®istry, &eventlog)
136 {
137 eprintln!("Cluster coordination error: {e:?}");
138 }
139 thread::sleep(Duration::from_secs(5));
140 });
141
142 Ok(())
143 }
144
145 pub fn register_node(&self, nodeinfo: NodeInfo) -> CoreResult<()> {
147 let mut registry = self.node_registry.write().map_err(|_| {
148 CoreError::InvalidState(
149 ErrorContext::new("Failed to acquire registry lock")
150 .with_location(ErrorLocation::new(file!(), line!())),
151 )
152 })?;
153
154 registry.register_node(nodeinfo)?;
155 Ok(())
156 }
157
158 pub fn get_health(&self) -> CoreResult<ClusterHealth> {
160 let registry = self.node_registry.read().map_err(|_| {
161 CoreError::InvalidState(
162 ErrorContext::new("Failed to acquire registry lock")
163 .with_location(ErrorLocation::new(file!(), line!())),
164 )
165 })?;
166
167 let all_nodes = registry.get_all_nodes();
168 let healthy_nodes = all_nodes
169 .iter()
170 .filter(|n| n.status == NodeStatus::Healthy)
171 .count();
172 let total_nodes = all_nodes.len();
173
174 let health_percentage = if total_nodes == 0 {
175 100.0
176 } else {
177 (healthy_nodes as f64 / total_nodes as f64) * 100.0
178 };
179
180 let status = if health_percentage >= 80.0 {
181 ClusterHealthStatus::Healthy
182 } else if health_percentage >= 50.0 {
183 ClusterHealthStatus::Degraded
184 } else {
185 ClusterHealthStatus::Unhealthy
186 };
187
188 Ok(ClusterHealth {
189 status,
190 healthy_nodes,
191 total_nodes,
192 health_percentage,
193 last_updated: Instant::now(),
194 })
195 }
196
197 pub fn get_active_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
199 let registry = self.node_registry.read().map_err(|_| {
200 CoreError::InvalidState(
201 ErrorContext::new("Failed to acquire registry lock")
202 .with_location(ErrorLocation::new(file!(), line!())),
203 )
204 })?;
205
206 Ok(registry.get_healthy_nodes())
207 }
208
209 pub fn get_available_nodes(&self) -> CoreResult<HashMap<String, NodeInfo>> {
211 let registry = self.node_registry.read().map_err(|_| {
212 CoreError::InvalidState(
213 ErrorContext::new("Failed to acquire registry lock")
214 .with_location(ErrorLocation::new(file!(), line!())),
215 )
216 })?;
217
218 let nodes = registry.get_healthy_nodes();
219 let mut node_map = HashMap::new();
220 for node in nodes {
221 node_map.insert(node.id.clone(), node);
222 }
223 Ok(node_map)
224 }
225
226 pub fn get_total_capacity(&self) -> CoreResult<ComputeCapacity> {
228 let registry = self.node_registry.read().map_err(|_| {
229 CoreError::InvalidState(
230 ErrorContext::new("Failed to acquire registry lock")
231 .with_location(ErrorLocation::new(file!(), line!())),
232 )
233 })?;
234
235 let nodes = registry.get_healthy_nodes();
236 let mut total_capacity = ComputeCapacity::default();
237
238 for node in nodes {
239 total_capacity.cpu_cores += node.capabilities.cpu_cores;
240 total_capacity.memory_gb += node.capabilities.memory_gb;
241 total_capacity.gpu_count += node.capabilities.gpu_count;
242 total_capacity.disk_space_gb += node.capabilities.disk_space_gb;
243 }
244
245 Ok(total_capacity)
246 }
247
248 pub fn submit_task(&self, task: DistributedTask) -> CoreResult<TaskId> {
250 let allocator = self.resource_allocator.read().map_err(|_| {
251 CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
252 })?;
253
254 let allocation = allocator.allocate_resources(&task.resource_requirements)?;
255
256 let taskid = TaskId::generate();
258 let _execution_plan = ExecutionPlan {
259 taskid: taskid.clone(),
260 task,
261 node_allocation: allocation,
262 created_at: Instant::now(),
263 status: ExecutionStatus::Pending,
264 };
265
266 Ok(taskid)
269 }
270
271 pub fn get_cluster_statistics(&self) -> CoreResult<ClusterStatistics> {
273 let registry = self.node_registry.read().map_err(|_| {
274 CoreError::InvalidState(
275 ErrorContext::new("Failed to acquire registry lock")
276 .with_location(ErrorLocation::new(file!(), line!())),
277 )
278 })?;
279
280 let allocator = self.resource_allocator.read().map_err(|_| {
281 CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
282 })?;
283
284 let nodes = registry.get_all_nodes();
285 let total_capacity = self.get_total_capacity()?;
286 let available_capacity = (*allocator).available_capacity();
287
288 Ok(ClusterStatistics {
289 total_nodes: nodes.len(),
290 healthy_nodes: nodes
291 .iter()
292 .filter(|n| n.status == NodeStatus::Healthy)
293 .count(),
294 total_capacity: total_capacity.clone(),
295 available_capacity: available_capacity.clone(),
296 resource_utilization: ResourceUtilization {
297 cpu_utilization: 1.0
298 - (available_capacity.cpu_cores as f64 / total_capacity.cpu_cores as f64),
299 memory_utilization: 1.0
300 - (available_capacity.memory_gb as f64 / total_capacity.memory_gb as f64),
301 gpu_utilization: if total_capacity.gpu_count > 0 {
302 1.0 - (available_capacity.gpu_count as f64 / total_capacity.gpu_count as f64)
303 } else {
304 0.0
305 },
306 },
307 })
308 }
309
310 pub fn stop(&self) -> CoreResult<()> {
312 Ok(())
315 }
316
317 pub fn get_configuration(&self) -> CoreResult<ClusterConfiguration> {
319 let config = self.configuration.read().map_err(|_| {
320 CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
321 })?;
322 Ok(config.clone())
323 }
324
325 pub fn update_configuration(&self, new_config: ClusterConfiguration) -> CoreResult<()> {
327 let mut config = self.configuration.write().map_err(|_| {
328 CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
329 })?;
330 *config = new_config;
331 Ok(())
332 }
333
334 pub fn get_cluster_state(&self) -> CoreResult<String> {
336 let state = self.cluster_state.read().map_err(|_| {
337 CoreError::InvalidState(ErrorContext::new("Failed to acquire cluster state lock"))
338 })?;
339
340 if let Some(leader) = state.get_leader() {
341 Ok(format!(
342 "Leader: {leader}, Last updated: {:?}",
343 state.last_updated()
344 ))
345 } else {
346 Ok("No leader elected".to_string())
347 }
348 }
349
350 pub fn force_leader_election(&self) -> CoreResult<Option<String>> {
352 let registry = self.node_registry.read().map_err(|_| {
353 CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
354 })?;
355
356 let healthy_nodes = registry.get_healthy_nodes();
357 ClusterCoordination::elect_leader(&healthy_nodes)
358 }
359
360 pub fn remove_node(&self, node_id: &str) -> CoreResult<()> {
362 ClusterCoordination::handle_node_removal(node_id, &self.node_registry, &self.eventlog)
363 }
364
365 pub fn shutdown_node(&self, node_id: &str) -> CoreResult<()> {
367 ClusterCoordination::handle_node_shutdown(node_id, &self.node_registry, &self.eventlog)
368 }
369}
370
371#[allow(dead_code)]
373pub fn initialize_cluster_manager() -> CoreResult<()> {
374 let cluster_manager = ClusterManager::global()?;
375 cluster_manager.start()?;
376 Ok(())
377}