// scirs2_core/distributed/cluster/manager.rs
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

use super::allocator::ResourceAllocator;
use super::coordination::ClusterCoordination;
use super::events::ClusterEventLog;
use super::health::HealthMonitor;
use super::registry::NodeRegistry;
use super::state::ClusterState;
use super::types::{
    ClusterConfiguration, ClusterHealth, ClusterHealthStatus, ClusterStatistics, ComputeCapacity,
    DistributedTask, ExecutionPlan, ExecutionStatus, NodeInfo, NodeStatus, ResourceUtilization,
    TaskId,
};
23
24static GLOBAL_CLUSTER_MANAGER: std::sync::OnceLock<Arc<ClusterManager>> =
26 std::sync::OnceLock::new();
27
28#[derive(Debug)]
30pub struct ClusterManager {
31 cluster_state: Arc<RwLock<ClusterState>>,
32 node_registry: Arc<RwLock<NodeRegistry>>,
33 healthmonitor: Arc<Mutex<HealthMonitor>>,
34 resource_allocator: Arc<RwLock<ResourceAllocator>>,
35 configuration: Arc<RwLock<ClusterConfiguration>>,
36 eventlog: Arc<Mutex<ClusterEventLog>>,
37}
38
39#[allow(dead_code)]
40impl ClusterManager {
41 pub fn new(config: ClusterConfiguration) -> CoreResult<Self> {
43 Ok(Self {
44 cluster_state: Arc::new(RwLock::new(ClusterState::new())),
45 node_registry: Arc::new(RwLock::new(NodeRegistry::new())),
46 healthmonitor: Arc::new(Mutex::new(HealthMonitor::new()?)),
47 resource_allocator: Arc::new(RwLock::new(ResourceAllocator::new())),
48 configuration: Arc::new(RwLock::new(config)),
49 eventlog: Arc::new(Mutex::new(ClusterEventLog::new())),
50 })
51 }
52
53 pub fn global() -> CoreResult<Arc<Self>> {
55 Ok(GLOBAL_CLUSTER_MANAGER
56 .get_or_init(|| {
57 Arc::new(Self::new(ClusterConfiguration::default()).expect("Operation failed"))
58 })
59 .clone())
60 }
61
62 pub fn start(&self) -> CoreResult<()> {
64 self.start_node_discovery()?;
66
67 self.start_health_monitoring()?;
69
70 self.start_resource_management()?;
72
73 self.start_cluster_coordination()?;
75
76 Ok(())
77 }
78
79 fn start_node_discovery(&self) -> CoreResult<()> {
81 let registry = self.node_registry.clone();
82 let config = self.configuration.clone();
83 let eventlog = self.eventlog.clone();
84
85 thread::spawn(move || loop {
86 if let Err(e) = ClusterCoordination::node_discovery_loop(®istry, &config, &eventlog)
87 {
88 eprintln!("Node discovery error: {e:?}");
89 }
90 thread::sleep(Duration::from_secs(30));
91 });
92
93 Ok(())
94 }
95
96 fn start_health_monitoring(&self) -> CoreResult<()> {
98 let healthmonitor = self.healthmonitor.clone();
99 let registry = self.node_registry.clone();
100 let eventlog = self.eventlog.clone();
101
102 thread::spawn(move || loop {
103 if let Err(e) =
104 ClusterCoordination::health_monitoring_loop(&healthmonitor, ®istry, &eventlog)
105 {
106 eprintln!("Health monitoring error: {e:?}");
107 }
108 thread::sleep(Duration::from_secs(10));
109 });
110
111 Ok(())
112 }
113
114 fn start_resource_management(&self) -> CoreResult<()> {
116 let allocator = self.resource_allocator.clone();
117 let registry = self.node_registry.clone();
118
119 thread::spawn(move || loop {
120 if let Err(e) = ClusterCoordination::resource_management_loop(&allocator, ®istry) {
121 eprintln!("Resource management error: {e:?}");
122 }
123 thread::sleep(Duration::from_secs(15));
124 });
125
126 Ok(())
127 }
128
129 fn start_cluster_coordination(&self) -> CoreResult<()> {
131 let cluster_state = self.cluster_state.clone();
132 let registry = self.node_registry.clone();
133 let eventlog = self.eventlog.clone();
134
135 thread::spawn(move || loop {
136 if let Err(e) =
137 ClusterCoordination::cluster_coordination_loop(&cluster_state, ®istry, &eventlog)
138 {
139 eprintln!("Cluster coordination error: {e:?}");
140 }
141 thread::sleep(Duration::from_secs(5));
142 });
143
144 Ok(())
145 }
146
147 pub fn register_node(&self, nodeinfo: NodeInfo) -> CoreResult<()> {
149 let mut registry = self.node_registry.write().map_err(|_| {
150 CoreError::InvalidState(
151 ErrorContext::new("Failed to acquire registry lock")
152 .with_location(ErrorLocation::new(file!(), line!())),
153 )
154 })?;
155
156 registry.register_node(nodeinfo)?;
157 Ok(())
158 }
159
160 pub fn get_health(&self) -> CoreResult<ClusterHealth> {
162 let registry = self.node_registry.read().map_err(|_| {
163 CoreError::InvalidState(
164 ErrorContext::new("Failed to acquire registry lock")
165 .with_location(ErrorLocation::new(file!(), line!())),
166 )
167 })?;
168
169 let all_nodes = registry.get_all_nodes();
170 let healthy_nodes = all_nodes
171 .iter()
172 .filter(|n| n.status == NodeStatus::Healthy)
173 .count();
174 let total_nodes = all_nodes.len();
175
176 let health_percentage = if total_nodes == 0 {
177 100.0
178 } else {
179 (healthy_nodes as f64 / total_nodes as f64) * 100.0
180 };
181
182 let status = if health_percentage >= 80.0 {
183 ClusterHealthStatus::Healthy
184 } else if health_percentage >= 50.0 {
185 ClusterHealthStatus::Degraded
186 } else {
187 ClusterHealthStatus::Unhealthy
188 };
189
190 Ok(ClusterHealth {
191 status,
192 healthy_nodes,
193 total_nodes,
194 health_percentage,
195 last_updated: Instant::now(),
196 })
197 }
198
199 pub fn get_active_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
201 let registry = self.node_registry.read().map_err(|_| {
202 CoreError::InvalidState(
203 ErrorContext::new("Failed to acquire registry lock")
204 .with_location(ErrorLocation::new(file!(), line!())),
205 )
206 })?;
207
208 Ok(registry.get_healthy_nodes())
209 }
210
211 pub fn get_available_nodes(&self) -> CoreResult<HashMap<String, NodeInfo>> {
213 let registry = self.node_registry.read().map_err(|_| {
214 CoreError::InvalidState(
215 ErrorContext::new("Failed to acquire registry lock")
216 .with_location(ErrorLocation::new(file!(), line!())),
217 )
218 })?;
219
220 let nodes = registry.get_healthy_nodes();
221 let mut node_map = HashMap::new();
222 for node in nodes {
223 node_map.insert(node.id.clone(), node);
224 }
225 Ok(node_map)
226 }
227
228 pub fn get_total_capacity(&self) -> CoreResult<ComputeCapacity> {
230 let registry = self.node_registry.read().map_err(|_| {
231 CoreError::InvalidState(
232 ErrorContext::new("Failed to acquire registry lock")
233 .with_location(ErrorLocation::new(file!(), line!())),
234 )
235 })?;
236
237 let nodes = registry.get_healthy_nodes();
238 let mut total_capacity = ComputeCapacity::default();
239
240 for node in nodes {
241 total_capacity.cpu_cores += node.capabilities.cpu_cores;
242 total_capacity.memory_gb += node.capabilities.memory_gb;
243 total_capacity.gpu_count += node.capabilities.gpu_count;
244 total_capacity.disk_space_gb += node.capabilities.disk_space_gb;
245 }
246
247 Ok(total_capacity)
248 }
249
250 pub fn submit_task(&self, task: DistributedTask) -> CoreResult<TaskId> {
252 let allocator = self.resource_allocator.read().map_err(|_| {
253 CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
254 })?;
255
256 let allocation = allocator.allocate_resources(&task.resource_requirements)?;
257
258 let taskid = TaskId::generate();
260 let _execution_plan = ExecutionPlan {
261 taskid: taskid.clone(),
262 task,
263 node_allocation: allocation,
264 created_at: Instant::now(),
265 status: ExecutionStatus::Pending,
266 };
267
268 Ok(taskid)
271 }
272
273 pub fn get_cluster_statistics(&self) -> CoreResult<ClusterStatistics> {
275 let registry = self.node_registry.read().map_err(|_| {
276 CoreError::InvalidState(
277 ErrorContext::new("Failed to acquire registry lock")
278 .with_location(ErrorLocation::new(file!(), line!())),
279 )
280 })?;
281
282 let allocator = self.resource_allocator.read().map_err(|_| {
283 CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
284 })?;
285
286 let nodes = registry.get_all_nodes();
287 let total_capacity = self.get_total_capacity()?;
288 let available_capacity = (*allocator).available_capacity();
289
290 Ok(ClusterStatistics {
291 total_nodes: nodes.len(),
292 healthy_nodes: nodes
293 .iter()
294 .filter(|n| n.status == NodeStatus::Healthy)
295 .count(),
296 total_capacity: total_capacity.clone(),
297 available_capacity: available_capacity.clone(),
298 resource_utilization: ResourceUtilization {
299 cpu_utilization: 1.0
300 - (available_capacity.cpu_cores as f64 / total_capacity.cpu_cores as f64),
301 memory_utilization: 1.0
302 - (available_capacity.memory_gb as f64 / total_capacity.memory_gb as f64),
303 gpu_utilization: if total_capacity.gpu_count > 0 {
304 1.0 - (available_capacity.gpu_count as f64 / total_capacity.gpu_count as f64)
305 } else {
306 0.0
307 },
308 },
309 })
310 }
311
312 pub fn stop(&self) -> CoreResult<()> {
314 Ok(())
317 }
318
319 pub fn get_configuration(&self) -> CoreResult<ClusterConfiguration> {
321 let config = self.configuration.read().map_err(|_| {
322 CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
323 })?;
324 Ok(config.clone())
325 }
326
327 pub fn update_configuration(&self, new_config: ClusterConfiguration) -> CoreResult<()> {
329 let mut config = self.configuration.write().map_err(|_| {
330 CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
331 })?;
332 *config = new_config;
333 Ok(())
334 }
335
336 pub fn get_cluster_state(&self) -> CoreResult<String> {
338 let state = self.cluster_state.read().map_err(|_| {
339 CoreError::InvalidState(ErrorContext::new("Failed to acquire cluster state lock"))
340 })?;
341
342 if let Some(leader) = state.get_leader() {
343 Ok(format!(
344 "Leader: {leader}, Last updated: {:?}",
345 state.last_updated()
346 ))
347 } else {
348 Ok("No leader elected".to_string())
349 }
350 }
351
352 pub fn force_leader_election(&self) -> CoreResult<Option<String>> {
354 let registry = self.node_registry.read().map_err(|_| {
355 CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
356 })?;
357
358 let healthy_nodes = registry.get_healthy_nodes();
359 ClusterCoordination::elect_leader(&healthy_nodes)
360 }
361
362 pub fn remove_node(&self, node_id: &str) -> CoreResult<()> {
364 ClusterCoordination::handle_node_removal(node_id, &self.node_registry, &self.eventlog)
365 }
366
367 pub fn shutdown_node(&self, node_id: &str) -> CoreResult<()> {
369 ClusterCoordination::handle_node_shutdown(node_id, &self.node_registry, &self.eventlog)
370 }
371}
372
373#[allow(dead_code)]
375pub fn initialize_cluster_manager() -> CoreResult<()> {
376 let cluster_manager = ClusterManager::global()?;
377 cluster_manager.start()?;
378 Ok(())
379}