use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime};

#[cfg(feature = "logging")]
use log;

use serde::{Deserialize, Serialize};

static GLOBAL_CLUSTER_MANAGER: std::sync::OnceLock<Arc<ClusterManager>> =
    std::sync::OnceLock::new();

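/// Central coordinator for distributed cluster operations.
///
/// Each concern sits behind its own lock: cluster-wide state, node
/// membership, health checking, resource accounting, configuration, and a
/// bounded event log. The background loops started by
/// [`ClusterManager::start`] clone the `Arc` handles they need and run on
/// dedicated threads.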
#[derive(Debug)]
pub struct ClusterManager {
    cluster_state: Arc<RwLock<ClusterState>>,
    node_registry: Arc<RwLock<NodeRegistry>>,
    healthmonitor: Arc<Mutex<HealthMonitor>>,
    resource_allocator: Arc<RwLock<ResourceAllocator>>,
    configuration: Arc<RwLock<ClusterConfiguration>>,
    eventlog: Arc<Mutex<ClusterEventLog>>,
}

33
34#[allow(dead_code)]
35impl ClusterManager {
36 pub fn new(config: ClusterConfiguration) -> CoreResult<Self> {
38 Ok(Self {
39 cluster_state: Arc::new(RwLock::new(ClusterState::new())),
40 node_registry: Arc::new(RwLock::new(NodeRegistry::new())),
41 healthmonitor: Arc::new(Mutex::new(HealthMonitor::new()?)),
42 resource_allocator: Arc::new(RwLock::new(ResourceAllocator::new())),
43 configuration: Arc::new(RwLock::new(config)),
44 eventlog: Arc::new(Mutex::new(ClusterEventLog::new())),
45 })
46 }
47
48 pub fn global() -> CoreResult<Arc<Self>> {
50 Ok(GLOBAL_CLUSTER_MANAGER
51 .get_or_init(|| Arc::new(Self::new(ClusterConfiguration::default()).unwrap()))
52 .clone())
53 }
54
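    /// Starts the background maintenance loops: node discovery (every 30s),
    /// health monitoring (10s), resource management (15s), and cluster
    /// coordination (5s). Each loop runs on its own thread and reports errors
    /// without stopping.
    ///
    /// A minimal usage sketch (the module path is assumed, hence `ignore`):
    ///
    /// ```ignore
    /// let manager = ClusterManager::new(ClusterConfiguration::default())?;
    /// manager.start()?;
    /// println!("health: {:?}", manager.get_health()?.status);
    /// ```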
    pub fn start(&self) -> CoreResult<()> {
        self.start_node_discovery()?;
        self.start_healthmonitoring()?;
        self.start_resource_management()?;
        self.start_cluster_coordination()?;

        Ok(())
    }

    fn start_node_discovery(&self) -> CoreResult<()> {
        let registry = self.node_registry.clone();
        let config = self.configuration.clone();
        let eventlog = self.eventlog.clone();

        thread::spawn(move || loop {
            if let Err(e) = Self::node_discovery_loop(&registry, &config, &eventlog) {
                eprintln!("Node discovery error: {e:?}");
            }
            thread::sleep(Duration::from_secs(30));
        });

        Ok(())
    }

    fn start_healthmonitoring(&self) -> CoreResult<()> {
        let healthmonitor = self.healthmonitor.clone();
        let registry = self.node_registry.clone();
        let eventlog = self.eventlog.clone();

        thread::spawn(move || loop {
            if let Err(e) = Self::healthmonitoring_loop(&healthmonitor, &registry, &eventlog) {
                eprintln!("Health monitoring error: {e:?}");
            }
            thread::sleep(Duration::from_secs(10));
        });

        Ok(())
    }

    fn start_resource_management(&self) -> CoreResult<()> {
        let allocator = self.resource_allocator.clone();
        let registry = self.node_registry.clone();

        thread::spawn(move || loop {
            if let Err(e) = Self::resource_management_loop(&allocator, &registry) {
                eprintln!("Resource management error: {e:?}");
            }
            thread::sleep(Duration::from_secs(15));
        });

        Ok(())
    }

    fn start_cluster_coordination(&self) -> CoreResult<()> {
        let cluster_state = self.cluster_state.clone();
        let registry = self.node_registry.clone();
        let eventlog = self.eventlog.clone();

        thread::spawn(move || loop {
            if let Err(e) =
                Self::cluster_coordination_loop(&cluster_state, &registry, &eventlog)
            {
                eprintln!("Cluster coordination error: {e:?}");
            }
            thread::sleep(Duration::from_secs(5));
        });

        Ok(())
    }

    fn node_discovery_loop(
        registry: &Arc<RwLock<NodeRegistry>>,
        config: &Arc<RwLock<ClusterConfiguration>>,
        eventlog: &Arc<Mutex<ClusterEventLog>>,
    ) -> CoreResult<()> {
        let config_read = config.read().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire config lock"))
        })?;

        if !config_read.auto_discovery_enabled {
            return Ok(());
        }

        for discovery_method in &config_read.discovery_methods {
            let discovered_nodes = match discovery_method {
                NodeDiscoveryMethod::Static(addresses) => {
                    let mut nodes = Vec::new();
                    for address in addresses {
                        if Self::is_node_reachable(*address)? {
                            nodes.push(NodeInfo {
                                id: format!("node_{address}"),
                                address: *address,
                                node_type: NodeType::Worker,
                                capabilities: NodeCapabilities::default(),
                                status: NodeStatus::Unknown,
                                last_seen: Instant::now(),
                                metadata: NodeMetadata::default(),
                            });
                        }
                    }
                    nodes
                }
                NodeDiscoveryMethod::Multicast { group, port } => {
                    Self::multicast_discovery(group, *port)?
                }
                NodeDiscoveryMethod::DnsService { service_name } => {
                    Self::dns_discovery(service_name)?
                }
                NodeDiscoveryMethod::Consul { endpoint } => Self::consul_discovery(endpoint)?,
            };

            let mut registry_write = registry.write().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
            })?;

            for nodeinfo in discovered_nodes {
                if registry_write.register_node(nodeinfo.clone())? {
                    let mut log = eventlog.lock().map_err(|_| {
                        CoreError::InvalidState(ErrorContext::new(
                            "Failed to acquire event log lock",
                        ))
                    })?;
                    log.log_event(ClusterEvent::NodeDiscovered {
                        nodeid: nodeinfo.id.clone(),
                        address: nodeinfo.address,
                        timestamp: Instant::now(),
                    });
                }
            }
        }

        Ok(())
    }

    fn discover_nodes(&self, method: &NodeDiscoveryMethod) -> CoreResult<Vec<NodeInfo>> {
        match method {
            NodeDiscoveryMethod::Static(addresses) => {
                let mut nodes = Vec::new();
                for address in addresses {
                    if Self::is_node_reachable(*address)? {
                        nodes.push(NodeInfo {
                            id: format!("node_{address}"),
                            address: *address,
                            node_type: NodeType::Worker,
                            capabilities: NodeCapabilities::default(),
                            status: NodeStatus::Unknown,
                            last_seen: Instant::now(),
                            metadata: NodeMetadata::default(),
                        });
                    }
                }
                Ok(nodes)
            }
            NodeDiscoveryMethod::Multicast { group, port } => {
                self.discover_via_multicast(group, *port)
            }
            NodeDiscoveryMethod::DnsService { service_name } => {
                self.discover_via_dns_service(service_name)
            }
            NodeDiscoveryMethod::Consul { endpoint } => self.discover_via_consul(endpoint),
        }
    }

    fn discover_via_multicast(&self, group: &IpAddr, port: u16) -> CoreResult<Vec<NodeInfo>> {
        Self::multicast_discovery(group, port)
    }

    fn discover_via_dns_service(&self, service_name: &str) -> CoreResult<Vec<NodeInfo>> {
        Self::dns_discovery(service_name)
    }

    fn discover_via_consul(&self, endpoint: &str) -> CoreResult<Vec<NodeInfo>> {
        Self::consul_discovery(endpoint)
    }

    fn is_node_reachable(_address: SocketAddr) -> CoreResult<bool> {
        // Placeholder reachability check; a real implementation would attempt
        // a TCP connect with a short timeout.
        Ok(true)
    }

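    /// UDP multicast discovery.
    ///
    /// Broadcasts `SCIRS2_NODE_DISCOVERY` to `group:port` and collects replies
    /// for up to three seconds. Peers are expected to answer with
    /// `SCIRS2_NODE_RESPONSE:<node_id>:<node_type>`, where `<node_type>` is
    /// one of `master`, `worker`, `storage`, or `compute`; anything else falls
    /// back to `Worker`. A responder is not part of this module; a minimal
    /// counterpart sketch (illustrative only) might look like:
    ///
    /// ```ignore
    /// let socket = UdpSocket::bind("0.0.0.0:7645")?; // port is an assumption
    /// let mut buf = [0u8; 64];
    /// loop {
    ///     let (n, from) = socket.recv_from(&mut buf)?;
    ///     if &buf[..n] == b"SCIRS2_NODE_DISCOVERY" {
    ///         socket.send_to(b"SCIRS2_NODE_RESPONSE:node_a:worker", from)?;
    ///     }
    /// }
    /// ```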
    fn multicast_discovery(group: &IpAddr, port: u16) -> CoreResult<Vec<NodeInfo>> {
        use std::net::UdpSocket;

        let mut discovered_nodes = Vec::new();

        // Bind to an unspecified local address; binding directly to the
        // multicast group address is not portable across platforms.
        let bind_addr = match group {
            IpAddr::V4(_) => SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0),
            IpAddr::V6(_) => SocketAddr::new(IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED), 0),
        };
        let socket = UdpSocket::bind(bind_addr).map_err(|e| {
            CoreError::IoError(crate::error::ErrorContext::new(format!(
                "Failed to bind multicast socket: {e}"
            )))
        })?;

        socket
            .set_read_timeout(Some(Duration::from_secs(5)))
            .map_err(|e| {
                CoreError::IoError(crate::error::ErrorContext::new(format!(
                    "Failed to set socket timeout: {e}"
                )))
            })?;

        let discovery_message = b"SCIRS2_NODE_DISCOVERY";
        let broadcast_addr = SocketAddr::new(*group, port);

        match socket.send_to(discovery_message, broadcast_addr) {
            Ok(_) => {
                let mut buffer = [0u8; 1024];
                let start_time = Instant::now();

                while start_time.elapsed() < Duration::from_secs(3) {
                    match socket.recv_from(&mut buffer) {
                        Ok((size, addr)) => {
                            let response = String::from_utf8_lossy(&buffer[..size]);
                            if response.starts_with("SCIRS2_NODE_RESPONSE") {
                                // Expected format: SCIRS2_NODE_RESPONSE:<id>:<type>
                                let parts: Vec<&str> = response.split(':').collect();
                                if parts.len() >= 3 {
                                    let nodeid = parts[1].to_string();
                                    let node_type = match parts[2] {
                                        "master" => NodeType::Master,
                                        "worker" => NodeType::Worker,
                                        "storage" => NodeType::Storage,
                                        "compute" => NodeType::Compute,
                                        _ => NodeType::Worker,
                                    };

                                    discovered_nodes.push(NodeInfo {
                                        id: nodeid,
                                        address: addr,
                                        node_type,
                                        capabilities: NodeCapabilities::default(),
                                        status: NodeStatus::Unknown,
                                        last_seen: Instant::now(),
                                        metadata: NodeMetadata::default(),
                                    });
                                }
                            }
                        }
                        // Read timeout: stop collecting responses.
                        Err(_) => break,
                    }
                }
            }
            Err(e) => {
                return Err(CoreError::IoError(crate::error::ErrorContext::new(
                    format!("Failed to send discovery broadcast: {e}"),
                )));
            }
        }

        Ok(discovered_nodes)
    }

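    /// DNS-based service discovery.
    ///
    /// On Linux this shells out to `avahi-browse -t -r -p` and parses its
    /// ';'-separated records, falling back to `nslookup -type=SRV` when avahi
    /// is unavailable. On Windows it browses with `dns-sd -B`. On other
    /// platforms it returns an empty list.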
    fn dns_discovery(_servicename: &str) -> CoreResult<Vec<NodeInfo>> {
        #[allow(unused_mut)]
        let mut discovered_nodes = Vec::new();

        #[cfg(target_os = "linux")]
        {
            use std::process::Command;
            use std::str;

            match Command::new("avahi-browse")
                .arg("-t")
                .arg("-r")
                .arg("-p")
                .arg(_servicename)
                .output()
            {
                Ok(output) => {
                    let output_str = str::from_utf8(&output.stdout).map_err(|e| {
                        CoreError::ValidationError(ErrorContext::new(format!(
                            "Failed to parse avahi output: {e}"
                        )))
                    })?;

                    for line in output_str.lines() {
                        // Resolved avahi records are ';'-separated and start with '='.
                        let parts: Vec<&str> = line.split(';').collect();
                        if parts.len() >= 9 && parts[0] == "=" {
                            let hostname = parts[6];
                            let address_str = parts[7];
                            let port_str = parts[8];

                            if let Ok(port) = port_str.parse::<u16>() {
                                if let Ok(ip) = address_str.parse::<IpAddr>() {
                                    let socket_addr = SocketAddr::new(ip, port);
                                    let nodeid = format!("dns_{hostname}_{port}");

                                    discovered_nodes.push(NodeInfo {
                                        id: nodeid,
                                        address: socket_addr,
                                        node_type: NodeType::Worker,
                                        capabilities: NodeCapabilities::default(),
                                        status: NodeStatus::Unknown,
                                        last_seen: Instant::now(),
                                        metadata: NodeMetadata {
                                            hostname: hostname.to_string(),
                                            operating_system: "unknown".to_string(),
                                            kernel_version: "unknown".to_string(),
                                            container_runtime: None,
                                            labels: HashMap::new(),
                                        },
                                    });
                                }
                            }
                        }
                    }
                }
                Err(_) => {
                    // avahi-browse unavailable; fall back to SRV lookup via nslookup.
                    match Command::new("nslookup")
                        .arg("-type=SRV")
                        .arg(_servicename)
                        .output()
                    {
                        Ok(output) => {
                            let output_str = str::from_utf8(&output.stdout).map_err(|e| {
                                CoreError::ValidationError(ErrorContext::new(format!(
                                    "Failed to parse nslookup output: {e}"
                                )))
                            })?;

                            for line in output_str.lines() {
                                if line.contains("service =") {
                                    let parts: Vec<&str> = line.split_whitespace().collect();
                                    if parts.len() >= 4 {
                                        if let Ok(port) = parts[2].parse::<u16>() {
                                            let hostname = parts[3].trim_end_matches('.');
                                            let nodeid = format!("srv_{hostname}_{port}");

                                            if let Ok(mut addrs) =
                                                std::net::ToSocketAddrs::to_socket_addrs(
                                                    &format!("{hostname}:{port}"),
                                                )
                                            {
                                                if let Some(addr) = addrs.next() {
                                                    discovered_nodes.push(NodeInfo {
                                                        id: nodeid,
                                                        address: addr,
                                                        node_type: NodeType::Worker,
                                                        capabilities:
                                                            NodeCapabilities::default(),
                                                        status: NodeStatus::Unknown,
                                                        last_seen: Instant::now(),
                                                        metadata: NodeMetadata {
                                                            hostname: hostname.to_string(),
                                                            operating_system: "unknown"
                                                                .to_string(),
                                                            kernel_version: "unknown"
                                                                .to_string(),
                                                            container_runtime: None,
                                                            labels: HashMap::new(),
                                                        },
                                                    });
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(_) => {
                            // Neither avahi-browse nor nslookup is available.
                        }
                    }
                }
            }
        }

        #[cfg(target_os = "windows")]
        {
            use std::process::Command;
            use std::str;

            match Command::new("dns-sd").arg("-B").arg(_servicename).output() {
                Ok(output) => {
                    let output_str = str::from_utf8(&output.stdout).map_err(|e| {
                        CoreError::ValidationError(ErrorContext::new(format!(
                            "Failed to parse dns-sd output: {e}"
                        )))
                    })?;

                    for line in output_str.lines() {
                        if line.contains(_servicename) {
                            let parts: Vec<&str> = line.split_whitespace().collect();
                            if parts.len() >= 2 {
                                let service_instance = parts[1];
                                let nodeid = format!("dnssd_{service_instance}");

                                // dns-sd browse output carries no address or port;
                                // use a local placeholder until the service is resolved.
                                let socket_addr = SocketAddr::new(
                                    IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
                                    8080,
                                );

                                discovered_nodes.push(NodeInfo {
                                    id: nodeid,
                                    address: socket_addr,
                                    node_type: NodeType::Worker,
                                    capabilities: NodeCapabilities::default(),
                                    status: NodeStatus::Unknown,
                                    last_seen: Instant::now(),
                                    metadata: NodeMetadata::default(),
                                });
                            }
                        }
                    }
                }
                Err(_) => {
                    // dns-sd unavailable on this host.
                }
            }
        }

        Ok(discovered_nodes)
    }

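    /// Consul catalog discovery.
    ///
    /// Queries `<endpoint>/v1/catalog/services` through `curl`, then
    /// `/v1/catalog/service/<name>` for each service, pulling the `Address`
    /// and `ServicePort` fields out with plain string matching rather than a
    /// JSON parser. Requires `curl` on the PATH.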
    fn consul_discovery(endpoint: &str) -> CoreResult<Vec<NodeInfo>> {
        use std::process::Command;
        use std::str;

        let mut discovered_nodes = Vec::new();

        let consul_url = if endpoint.starts_with("http") {
            format!("{endpoint}/v1/catalog/services")
        } else {
            format!("http://{endpoint}/v1/catalog/services")
        };

        match Command::new("curl")
            .arg("-s")
            .arg("-f")
            .arg("--connect-timeout")
            .arg("5")
            .arg(&consul_url)
            .output()
        {
            Ok(output) => {
                if output.status.success() {
                    let json_str = str::from_utf8(&output.stdout).map_err(|e| {
                        CoreError::ValidationError(ErrorContext::new(format!(
                            "Failed to parse Consul response: {e}"
                        )))
                    })?;

                    // Naive JSON handling: the service catalog is a flat object,
                    // so strip the braces and split on commas instead of pulling
                    // in a JSON parser.
                    if json_str.trim().starts_with('{') {
                        let cleaned = json_str.replace(['{', '}'], "");
                        for service_entry in cleaned.split(',') {
                            let service_parts: Vec<&str> = service_entry.split(':').collect();
                            if service_parts.len() >= 2 {
                                let service_name = service_parts[0].trim().trim_matches('"');

                                let service_url = if endpoint.starts_with("http") {
                                    format!("{endpoint}/v1/catalog/service/{service_name}")
                                } else {
                                    format!("http://{endpoint}/v1/catalog/service/{service_name}")
                                };

                                match Command::new("curl")
                                    .arg("-s")
                                    .arg("-f")
                                    .arg("--connect-timeout")
                                    .arg("3")
                                    .arg(&service_url)
                                    .output()
                                {
                                    Ok(service_output) => {
                                        if service_output.status.success() {
                                            let service_json =
                                                str::from_utf8(&service_output.stdout)
                                                    .unwrap_or("");

                                            if service_json.contains("\"Address\"")
                                                && service_json.contains("\"ServicePort\"")
                                            {
                                                let mut address_str = "";
                                                let mut port_str = "";

                                                for line in service_json.lines() {
                                                    if line.contains("\"Address\"") {
                                                        if let Some(addr_part) =
                                                            line.split(':').nth(1)
                                                        {
                                                            address_str = addr_part
                                                                .trim()
                                                                .trim_matches('"')
                                                                .trim_matches(',');
                                                        }
                                                    }
                                                    if line.contains("\"ServicePort\"") {
                                                        if let Some(port_part) =
                                                            line.split(':').nth(1)
                                                        {
                                                            port_str = port_part
                                                                .trim()
                                                                .trim_matches(',');
                                                        }
                                                    }
                                                }

                                                if !address_str.is_empty()
                                                    && !port_str.is_empty()
                                                {
                                                    if let (Ok(ip), Ok(port)) = (
                                                        address_str.parse::<IpAddr>(),
                                                        port_str.parse::<u16>(),
                                                    ) {
                                                        let socket_addr =
                                                            SocketAddr::new(ip, port);
                                                        let nodeid = format!(
                                                            "consul_{service_name}_{address_str}"
                                                        );

                                                        discovered_nodes.push(NodeInfo {
                                                            id: nodeid,
                                                            address: socket_addr,
                                                            node_type: NodeType::Worker,
                                                            capabilities:
                                                                NodeCapabilities::default(),
                                                            status: NodeStatus::Unknown,
                                                            last_seen: Instant::now(),
                                                            metadata: NodeMetadata {
                                                                hostname: address_str
                                                                    .to_string(),
                                                                operating_system: "unknown"
                                                                    .to_string(),
                                                                kernel_version: "unknown"
                                                                    .to_string(),
                                                                container_runtime: Some(
                                                                    "consul".to_string(),
                                                                ),
                                                                labels: {
                                                                    let mut labels =
                                                                        HashMap::new();
                                                                    labels.insert(
                                                                        "service".to_string(),
                                                                        service_name
                                                                            .to_string(),
                                                                    );
                                                                    labels.insert(
                                                                        "discovery"
                                                                            .to_string(),
                                                                        "consul".to_string(),
                                                                    );
                                                                    labels
                                                                },
                                                            },
                                                        });
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    // Per-service query failed; try the next service.
                                    Err(_) => continue,
                                }
                            }
                        }
                    }
                } else {
                    return Err(CoreError::IoError(ErrorContext::new(format!(
                        "Failed to connect to Consul at {endpoint}"
                    ))));
                }
            }
            Err(_) => {
                return Err(CoreError::InvalidState(ErrorContext::new(
                    "curl command not available for Consul discovery",
                )));
            }
        }

        Ok(discovered_nodes)
    }

    fn healthmonitoring_loop(
        healthmonitor: &Arc<Mutex<HealthMonitor>>,
        registry: &Arc<RwLock<NodeRegistry>>,
        eventlog: &Arc<Mutex<ClusterEventLog>>,
    ) -> CoreResult<()> {
        let nodes = {
            let registry_read = registry.read().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
            })?;
            registry_read.get_all_nodes()
        };

        let mut monitor = healthmonitor.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire health monitor lock"))
        })?;

        for nodeinfo in nodes {
            let health_status = monitor.check_node_health(&nodeinfo)?;

            let mut registry_write = registry.write().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
            })?;

            let previous_status = registry_write.get_node_status(&nodeinfo.id);
            registry_write.update_node_status(&nodeinfo.id, health_status.status)?;

            if let Some(prev_status) = previous_status {
                if prev_status != health_status.status {
                    let mut log = eventlog.lock().map_err(|_| {
                        CoreError::InvalidState(ErrorContext::new(
                            "Failed to acquire event log lock",
                        ))
                    })?;
                    log.log_event(ClusterEvent::NodeStatusChanged {
                        nodeid: nodeinfo.id.clone(),
                        old_status: prev_status,
                        new_status: health_status.status,
                        timestamp: Instant::now(),
                    });
                }
            }
        }

        Ok(())
    }

    fn resource_management_loop(
        allocator: &Arc<RwLock<ResourceAllocator>>,
        registry: &Arc<RwLock<NodeRegistry>>,
    ) -> CoreResult<()> {
        let nodes = {
            let registry_read = registry.read().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
            })?;
            registry_read.get_healthy_nodes()
        };

        let mut allocator_write = allocator.write().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
        })?;

        allocator_write.update_available_resources(&nodes)?;
        allocator_write.optimize_resource_allocation()?;

        Ok(())
    }

    fn cluster_coordination_loop(
        cluster_state: &Arc<RwLock<ClusterState>>,
        registry: &Arc<RwLock<NodeRegistry>>,
        eventlog: &Arc<Mutex<ClusterEventLog>>,
    ) -> CoreResult<()> {
        let healthy_nodes = {
            let registry_read = registry.read().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new("Failed to acquire registry lock"))
            })?;
            registry_read.get_healthy_nodes()
        };

        let mut state_write = cluster_state.write().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire cluster state lock"))
        })?;

        state_write.update_topology(&healthy_nodes)?;

        if state_write.needs_leader_election() {
            // Deterministic election: the healthy node with the lowest id wins
            // (mirrors `elect_leader` below).
            let new_leader = healthy_nodes
                .iter()
                .min_by(|a, b| a.id.cmp(&b.id))
                .map(|node| node.id.clone());
            if let Some(leader) = new_leader {
                state_write.set_leader(leader.clone());

                let mut log = eventlog.lock().map_err(|_| {
                    CoreError::InvalidState(ErrorContext::new(
                        "Failed to acquire event log lock",
                    ))
                })?;
                log.log_event(ClusterEvent::LeaderElected {
                    nodeid: leader,
                    timestamp: Instant::now(),
                });
            }
        }

        Ok(())
    }

    fn elect_leader(&self, nodes: &[NodeInfo]) -> CoreResult<Option<String>> {
        if nodes.is_empty() {
            return Ok(None);
        }

        // Deterministic election: lowest node id among healthy nodes.
        let leader = nodes
            .iter()
            .filter(|node| node.status == NodeStatus::Healthy)
            .min_by(|a, b| a.id.cmp(&b.id));

        Ok(leader.map(|node| node.id.clone()))
    }

    pub fn register_node(&self, nodeinfo: NodeInfo) -> CoreResult<()> {
        let mut registry = self.node_registry.write().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        registry.register_node(nodeinfo)?;
        Ok(())
    }

    pub fn get_health(&self) -> CoreResult<ClusterHealth> {
        let registry = self.node_registry.read().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        let all_nodes = registry.get_all_nodes();
        let healthy_nodes = all_nodes
            .iter()
            .filter(|n| n.status == NodeStatus::Healthy)
            .count();
        let total_nodes = all_nodes.len();

        // An empty cluster is reported as fully healthy rather than dividing by zero.
        let health_percentage = if total_nodes == 0 {
            100.0
        } else {
            (healthy_nodes as f64 / total_nodes as f64) * 100.0
        };

        let status = if health_percentage >= 80.0 {
            ClusterHealthStatus::Healthy
        } else if health_percentage >= 50.0 {
            ClusterHealthStatus::Degraded
        } else {
            ClusterHealthStatus::Unhealthy
        };

        Ok(ClusterHealth {
            status,
            healthy_nodes,
            total_nodes,
            health_percentage,
            last_updated: Instant::now(),
        })
    }

    pub fn get_active_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
        let registry = self.node_registry.read().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        Ok(registry.get_healthy_nodes())
    }

    pub fn get_availablenodes(&self) -> CoreResult<HashMap<String, NodeInfo>> {
        let registry = self.node_registry.read().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        let nodes = registry.get_healthy_nodes();
        let mut node_map = HashMap::new();
        for node in nodes {
            node_map.insert(node.id.clone(), node);
        }
        Ok(node_map)
    }

    pub fn get_total_capacity(&self) -> CoreResult<ComputeCapacity> {
        let registry = self.node_registry.read().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        let nodes = registry.get_healthy_nodes();
        let mut total_capacity = ComputeCapacity::default();

        for node in nodes {
            total_capacity.cpu_cores += node.capabilities.cpu_cores;
            total_capacity.memory_gb += node.capabilities.memory_gb;
            total_capacity.gpu_count += node.capabilities.gpu_count;
            total_capacity.disk_space_gb += node.capabilities.disk_space_gb;
        }

        Ok(total_capacity)
    }

    pub fn submit_task(&self, task: DistributedTask) -> CoreResult<TaskId> {
        let allocator = self.resource_allocator.read().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
        })?;

        let allocation = allocator.allocate_resources(&task.resource_requirements)?;

        let taskid = TaskId::generate();
        // The execution plan is built but not yet dispatched to a scheduler.
        let _execution_plan = ExecutionPlan {
            taskid: taskid.clone(),
            task,
            node_allocation: allocation,
            created_at: Instant::now(),
            status: ExecutionStatus::Pending,
        };

        Ok(taskid)
    }

    pub fn get_cluster_statistics(&self) -> CoreResult<ClusterStatistics> {
        // Compute total capacity first so the registry read lock below is not
        // re-entered while held.
        let total_capacity = self.get_total_capacity()?;

        let registry = self.node_registry.read().map_err(|_| {
            CoreError::InvalidState(
                ErrorContext::new("Failed to acquire registry lock")
                    .with_location(crate::error::ErrorLocation::new(file!(), line!())),
            )
        })?;

        let allocator = self.resource_allocator.read().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new("Failed to acquire allocator lock"))
        })?;

        let nodes = registry.get_all_nodes();
        let available_capacity = allocator.available_capacity();

        // Guard each utilization ratio against an empty capacity dimension.
        Ok(ClusterStatistics {
            total_nodes: nodes.len(),
            healthy_nodes: nodes
                .iter()
                .filter(|n| n.status == NodeStatus::Healthy)
                .count(),
            total_capacity: total_capacity.clone(),
            available_capacity: available_capacity.clone(),
            resource_utilization: ResourceUtilization {
                cpu_utilization: if total_capacity.cpu_cores > 0 {
                    1.0 - (available_capacity.cpu_cores as f64 / total_capacity.cpu_cores as f64)
                } else {
                    0.0
                },
                memory_utilization: if total_capacity.memory_gb > 0 {
                    1.0 - (available_capacity.memory_gb as f64 / total_capacity.memory_gb as f64)
                } else {
                    0.0
                },
                gpu_utilization: if total_capacity.gpu_count > 0 {
                    1.0 - (available_capacity.gpu_count as f64 / total_capacity.gpu_count as f64)
                } else {
                    0.0
                },
            },
        })
    }
}

#[derive(Debug)]
pub struct ClusterState {
    leader_node: Option<String>,
    topology: ClusterTopology,
    last_updated: Instant,
}

impl Default for ClusterState {
    fn default() -> Self {
        Self::new()
    }
}

impl ClusterState {
    pub fn new() -> Self {
        Self {
            leader_node: None,
            topology: ClusterTopology::new(),
            last_updated: Instant::now(),
        }
    }

    pub fn update_topology(&mut self, nodes: &[NodeInfo]) -> CoreResult<()> {
        self.topology.update(nodes);
        self.last_updated = Instant::now();
        Ok(())
    }

    pub fn needs_leader_election(&self) -> bool {
        self.leader_node.is_none() || self.last_updated.elapsed() > Duration::from_secs(300)
    }

    pub fn set_leader(&mut self, nodeid: String) {
        self.leader_node = Some(nodeid);
        self.last_updated = Instant::now();
    }
}

#[derive(Debug)]
pub struct NodeRegistry {
    nodes: HashMap<String, NodeInfo>,
    node_status: HashMap<String, NodeStatus>,
}

impl Default for NodeRegistry {
    fn default() -> Self {
        Self::new()
    }
}

impl NodeRegistry {
    pub fn new() -> Self {
        Self {
            nodes: HashMap::new(),
            node_status: HashMap::new(),
        }
    }

    /// Registers (or refreshes) a node; returns `true` if the node was new.
    pub fn register_node(&mut self, nodeinfo: NodeInfo) -> CoreResult<bool> {
        let is_new = !self.nodes.contains_key(&nodeinfo.id);
        self.nodes.insert(nodeinfo.id.clone(), nodeinfo.clone());
        self.node_status
            .insert(nodeinfo.id.clone(), nodeinfo.status);
        Ok(is_new)
    }

    pub fn get_all_nodes(&self) -> Vec<NodeInfo> {
        self.nodes.values().cloned().collect()
    }

    pub fn get_healthy_nodes(&self) -> Vec<NodeInfo> {
        self.nodes
            .values()
            .filter(|node| self.node_status.get(&node.id) == Some(&NodeStatus::Healthy))
            .cloned()
            .collect()
    }

    pub fn get_node_status(&self, nodeid: &str) -> Option<NodeStatus> {
        self.node_status.get(nodeid).copied()
    }

    pub fn update_node_status(&mut self, nodeid: &str, status: NodeStatus) -> CoreResult<()> {
        if let Some(node) = self.nodes.get_mut(nodeid) {
            node.status = status;
            self.node_status.insert(nodeid.to_string(), status);
        }
        Ok(())
    }
}

#[derive(Debug)]
pub struct HealthMonitor {
    health_checks: Vec<HealthCheck>,
    #[allow(dead_code)]
    check_interval: Duration,
}

impl HealthMonitor {
    pub fn new() -> CoreResult<Self> {
        Ok(Self {
            health_checks: Self::default_health_checks(),
            check_interval: Duration::from_secs(30),
        })
    }

    fn default_health_checks() -> Vec<HealthCheck> {
        vec![
            HealthCheck::Ping,
            HealthCheck::CpuLoad,
            HealthCheck::MemoryUsage,
            HealthCheck::DiskSpace,
            HealthCheck::NetworkConnectivity,
        ]
    }

    pub fn check_node_health(&mut self, node: &NodeInfo) -> CoreResult<NodeHealthStatus> {
        let mut health_score = 100.0f64;
        let mut failing_checks = Vec::new();

        for check in &self.health_checks {
            match self.execute_health_check(check, node) {
                Ok(result) => {
                    if !result.is_healthy {
                        health_score -= result.impact_score;
                        failing_checks.push(check.clone());
                    }
                }
                Err(_) => {
                    // A check that cannot run at all counts as a fixed penalty.
                    health_score -= 20.0f64;
                    failing_checks.push(check.clone());
                }
            }
        }

        let status = if health_score >= 80.0 {
            NodeStatus::Healthy
        } else if health_score >= 50.0 {
            NodeStatus::Degraded
        } else {
            NodeStatus::Unhealthy
        };

        Ok(NodeHealthStatus {
            status,
            health_score,
            failing_checks,
            last_checked: Instant::now(),
        })
    }

    fn execute_health_check(
        &self,
        check: &HealthCheck,
        node: &NodeInfo,
    ) -> CoreResult<HealthCheckResult> {
        // Placeholder checks: each reports healthy along with the impact score
        // it would subtract on failure.
        let _ = node;
        match check {
            HealthCheck::Ping => Ok(HealthCheckResult {
                is_healthy: true,
                impact_score: 10.0f64,
                details: "Ping successful".to_string(),
            }),
            HealthCheck::CpuLoad => Ok(HealthCheckResult {
                is_healthy: true,
                impact_score: 15.0f64,
                details: "CPU load normal".to_string(),
            }),
            HealthCheck::MemoryUsage => Ok(HealthCheckResult {
                is_healthy: true,
                impact_score: 20.0f64,
                details: "Memory usage normal".to_string(),
            }),
            HealthCheck::DiskSpace => Ok(HealthCheckResult {
                is_healthy: true,
                impact_score: 10.0f64,
                details: "Disk space adequate".to_string(),
            }),
            HealthCheck::NetworkConnectivity => Ok(HealthCheckResult {
                is_healthy: true,
                impact_score: 15.0f64,
                details: "Network connectivity good".to_string(),
            }),
        }
    }
}

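/// Tracks cluster-wide capacity and per-task allocations.
///
/// `update_available_resources` recomputes free capacity as healthy-node
/// totals minus outstanding allocations; `optimize_resource_allocation` then
/// applies the configured [`AllocationStrategy`]. The best-fit and
/// load-balanced optimizers currently delegate to `attempt_*` stubs that
/// report no change, so they are safe no-ops until those are implemented.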
#[derive(Debug)]
pub struct ResourceAllocator {
    allocations: HashMap<TaskId, ResourceAllocation>,
    available_resources: ComputeCapacity,
    allocation_strategy: AllocationStrategy,
}

impl Default for ResourceAllocator {
    fn default() -> Self {
        Self::new()
    }
}

#[allow(dead_code)]
impl ResourceAllocator {
    pub fn new() -> Self {
        Self {
            allocations: HashMap::new(),
            available_resources: ComputeCapacity::default(),
            allocation_strategy: AllocationStrategy::FirstFit,
        }
    }

    pub fn update_available_resources(&mut self, nodes: &[NodeInfo]) -> CoreResult<()> {
        self.available_resources = ComputeCapacity::default();

        for node in nodes {
            if node.status == NodeStatus::Healthy {
                self.available_resources.cpu_cores += node.capabilities.cpu_cores;
                self.available_resources.memory_gb += node.capabilities.memory_gb;
                self.available_resources.gpu_count += node.capabilities.gpu_count;
                self.available_resources.disk_space_gb += node.capabilities.disk_space_gb;
            }
        }

        // Subtract resources already committed to outstanding allocations,
        // saturating at zero.
        for allocation in self.allocations.values() {
            self.available_resources.cpu_cores = self
                .available_resources
                .cpu_cores
                .saturating_sub(allocation.allocated_resources.cpu_cores);
            self.available_resources.memory_gb = self
                .available_resources
                .memory_gb
                .saturating_sub(allocation.allocated_resources.memory_gb);
            self.available_resources.gpu_count = self
                .available_resources
                .gpu_count
                .saturating_sub(allocation.allocated_resources.gpu_count);
            self.available_resources.disk_space_gb = self
                .available_resources
                .disk_space_gb
                .saturating_sub(allocation.allocated_resources.disk_space_gb);
        }

        Ok(())
    }

    pub fn allocate_resources(
        &self,
        requirements: &ResourceRequirements,
    ) -> CoreResult<ResourceAllocation> {
        if !self.can_satisfy_requirements(requirements) {
            return Err(CoreError::ResourceError(ErrorContext::new(
                "Insufficient resources available",
            )));
        }

        Ok(ResourceAllocation {
            allocation_id: AllocationId::generate(),
            allocated_resources: ComputeCapacity {
                cpu_cores: requirements.cpu_cores,
                memory_gb: requirements.memory_gb,
                gpu_count: requirements.gpu_count,
                disk_space_gb: requirements.disk_space_gb,
            },
            // Concrete node placement happens later in scheduling.
            assigned_nodes: Vec::new(),
            created_at: Instant::now(),
            expires_at: None,
        })
    }

    fn can_satisfy_requirements(&self, requirements: &ResourceRequirements) -> bool {
        self.available_resources.cpu_cores >= requirements.cpu_cores
            && self.available_resources.memory_gb >= requirements.memory_gb
            && self.available_resources.gpu_count >= requirements.gpu_count
            && self.available_resources.disk_space_gb >= requirements.disk_space_gb
    }

    pub fn optimize_resource_allocation(&mut self) -> CoreResult<()> {
        match self.allocation_strategy {
            AllocationStrategy::FirstFit => {
                // First-fit requires no periodic rebalancing.
            }
            AllocationStrategy::BestFit => {
                self.optimize_best_fit()?;
            }
            AllocationStrategy::LoadBalanced => {
                self.optimize_load_balanced()?;
            }
        }
        Ok(())
    }

    fn optimize_best_fit(&mut self) -> CoreResult<()> {
        let mut allocations: Vec<(TaskId, ResourceAllocation)> = self
            .allocations
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        // Sort by a combined resource weight, heaviest first.
        allocations.sort_by(|a, b| {
            let weight_a = a.1.allocated_resources.cpu_cores
                + a.1.allocated_resources.memory_gb
                + a.1.allocated_resources.gpu_count * 4
                + a.1.allocated_resources.disk_space_gb / 10;
            let weight_b = b.1.allocated_resources.cpu_cores
                + b.1.allocated_resources.memory_gb
                + b.1.allocated_resources.gpu_count * 4
                + b.1.allocated_resources.disk_space_gb / 10;
            weight_b.cmp(&weight_a)
        });

        let mut optimizations_made = 0;
        let fragmentation_score_before = self.calculate_fragmentation_score();

        // Bucket allocations by size so each class can use its own strategy.
        let (large_allocations, medium_allocations, small_allocations): (
            Vec<_>,
            Vec<_>,
            Vec<_>,
        ) = {
            let mut large = Vec::new();
            let mut medium = Vec::new();
            let mut small = Vec::new();

            for (taskid, allocation) in allocations {
                let total_resources = allocation.allocated_resources.cpu_cores
                    + allocation.allocated_resources.memory_gb
                    + allocation.allocated_resources.gpu_count * 4;

                if total_resources >= 32 {
                    large.push((taskid, allocation));
                } else if total_resources >= 8 {
                    medium.push((taskid, allocation));
                } else {
                    small.push((taskid, allocation));
                }
            }

            (large, medium, small)
        };

        // Large allocations spread over several nodes are consolidation candidates.
        for (taskid, allocation) in large_allocations {
            if allocation.assigned_nodes.len() > 1 {
                if self.attempt_consolidation(&taskid, &allocation)? {
                    optimizations_made += 1;
                }
            }
        }

        for (taskid, allocation) in medium_allocations {
            if self.attempt_best_fit_pairing(&taskid, &allocation)? {
                optimizations_made += 1;
            }
        }

        for (taskid, allocation) in small_allocations {
            if self.attempt_small_allocation_packing(&taskid, &allocation)? {
                optimizations_made += 1;
            }
        }

        let fragmentation_score_after = self.calculate_fragmentation_score();
        let _improvement = fragmentation_score_before - fragmentation_score_after;

        if optimizations_made > 0 {
            #[cfg(feature = "logging")]
            log::info!(
                "Best-fit optimization completed: {optimizations_made} optimizations, fragmentation improved by {_improvement:.2}"
            );
        }

        Ok(())
    }

    fn optimize_load_balanced(&mut self) -> CoreResult<()> {
        let mut nodeloads = HashMap::new();
        let mut total_load = 0.0f64;

        // Accumulate a load weight per node across all current allocations.
        for allocation in self.allocations.values() {
            for nodeid in &allocation.assigned_nodes {
                let load_weight =
                    self.calculate_allocation_load_weight(&allocation.allocated_resources);
                *nodeloads.entry(nodeid.clone()).or_insert(0.0) += load_weight;
                total_load += load_weight;
            }
        }

        let num_active_nodes = nodeloads.len().max(1);
        let target_load_per_node = total_load / num_active_nodes as f64;
        // Nodes more than 15% away from the target are considered imbalanced.
        let load_variance_threshold = target_load_per_node * 0.15f64;

        let mut overloaded_nodes = Vec::new();
        let mut underloaded_nodes = Vec::new();

        for (nodeid, &current_load) in &nodeloads {
            let load_diff = current_load - target_load_per_node;
            if load_diff > load_variance_threshold {
                overloaded_nodes.push((nodeid.clone(), current_load, load_diff));
            } else if load_diff < -load_variance_threshold {
                underloaded_nodes.push((nodeid.clone(), current_load, -load_diff));
            }
        }

        // Handle the most imbalanced nodes first.
        overloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());
        underloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());

        let mut rebalancing_actions = 0;
        let initial_variance = self.calculate_load_variance(&nodeloads);

        for (overloaded_node, _current_load, _overloaded_amount) in overloaded_nodes {
            let moveable_allocations = self.find_moveable_allocations(&overloaded_node);

            for (taskid, allocation) in moveable_allocations {
                if let Some((target_node, _)) = self.find_best_target_node(
                    &allocation.allocated_resources,
                    &underloaded_nodes
                        .iter()
                        .map(|(nodeid, load, _)| (nodeid.clone(), *load))
                        .collect::<Vec<_>>(),
                )? {
                    if self.attempt_allocation_migration(&taskid, &target_node)? {
                        rebalancing_actions += 1;

                        let allocation_weight = self
                            .calculate_allocation_load_weight(&allocation.allocated_resources);
                        if let Some(old_load) = nodeloads.get_mut(&overloaded_node) {
                            *old_load -= allocation_weight;
                        }
                        if let Some(new_load) = nodeloads.get_mut(&target_node) {
                            *new_load += allocation_weight;
                        }

                        // Stop once this node is back within the tolerance band.
                        if nodeloads.get(&overloaded_node).copied().unwrap_or(0.0)
                            <= target_load_per_node + load_variance_threshold
                        {
                            break;
                        }
                    }
                }
            }
        }

        // Heavy single-node allocations may benefit from being spread out.
        let single_node_allocations: Vec<(TaskId, ResourceAllocation)> = self
            .allocations
            .iter()
            .filter(|(_, allocation)| allocation.assigned_nodes.len() == 1)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        for (taskid, allocation) in single_node_allocations {
            let load_weight =
                self.calculate_allocation_load_weight(&allocation.allocated_resources);
            if load_weight > target_load_per_node * 0.6 {
                if self.attempt_allocation_spreading(&taskid, &allocation)? {
                    rebalancing_actions += 1;
                }
            }
        }

        let final_variance = self.calculate_load_variance(&nodeloads);
        let _variance_improvement = initial_variance - final_variance;

        if rebalancing_actions > 0 {
            #[cfg(feature = "logging")]
            log::info!(
                "Load-balanced optimization completed: {rebalancing_actions} rebalancing actions, \
                 load variance improved by {_variance_improvement:.2}"
            );
        }

        Ok(())
    }

    pub fn get_available_capacity(&self) -> ComputeCapacity {
        self.available_resources.clone()
    }

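    /// Fragmentation heuristic in `[0, 100]`:
    /// `(split_ratio * 0.6 + (1 - avg_efficiency) * 0.4) * 100`.
    ///
    /// Worked example: with 2 of 4 allocations split across nodes
    /// (`split_ratio = 0.5`) and an average resource efficiency of 0.8, the
    /// score is `(0.5 * 0.6 + 0.2 * 0.4) * 100 = 38.0`.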
    fn calculate_fragmentation_score(&self) -> f64 {
        let total_allocations = self.allocations.len() as f64;
        if total_allocations == 0.0 {
            return 0.0f64;
        }

        // Allocations split across more than one node contribute to fragmentation.
        let split_allocations = self
            .allocations
            .values()
            .filter(|alloc| alloc.assigned_nodes.len() > 1)
            .count() as f64;

        let mut total_efficiency = 0.0f64;
        for allocation in self.allocations.values() {
            let resource_efficiency =
                self.calculate_resource_efficiency(&allocation.allocated_resources);
            total_efficiency += resource_efficiency;
        }
        let avg_efficiency = total_efficiency / total_allocations;

        let split_ratio = split_allocations / total_allocations;
        (split_ratio * 0.6 + (1.0 - avg_efficiency) * 0.4f64) * 100.0
    }

    fn calculate_resource_efficiency(&self, resources: &ComputeCapacity) -> f64 {
        let cpu_ratio = resources.cpu_cores as f64;
        // Treat one GPU as roughly eight CPU cores of compute.
        let gpu_ratio = resources.gpu_count as f64 * 8.0f64;
        let total_compute = cpu_ratio + gpu_ratio;
        // A balanced allocation carries about 4 GB of memory per compute unit.
        let balanced_memory = total_compute * 4.0f64;

        let memory_efficiency = if resources.memory_gb as f64 > 0.0 {
            balanced_memory.min(resources.memory_gb as f64)
                / balanced_memory.max(resources.memory_gb as f64)
        } else {
            1.0
        };

        // Very small allocations are penalized for scheduling overhead.
        let scale_efficiency = if total_compute < 2.0 {
            total_compute / 2.0
        } else {
            1.0
        };

        let combined_efficiency = memory_efficiency * 0.7 + scale_efficiency * 0.3f64;
        combined_efficiency.min(1.0)
    }

    fn try_consolidate_large_allocation(
        &self,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Consolidation is not yet implemented.
        Ok(false)
    }

    fn try_optimize_medium_allocations(
        &self,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Pairing optimization is not yet implemented.
        Ok(false)
    }

    fn try_pack_small_allocations(&self, _allocation: &ResourceAllocation) -> CoreResult<bool> {
        // Packing is not yet implemented.
        Ok(false)
    }

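    /// Scalar load weight:
    /// `cpu_cores + 0.25 * memory_gb + 8 * gpu_count + 0.01 * disk_space_gb`.
    /// For example, a 4-core, 16 GB, 1-GPU, 100 GB allocation weighs
    /// `4 + 4 + 8 + 1 = 17`.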
    fn calculate_allocation_load_weight(&self, resources: &ComputeCapacity) -> f64 {
        let cpu_weight = resources.cpu_cores as f64;
        let memory_weight = resources.memory_gb as f64 * 0.25f64;
        let gpu_weight = resources.gpu_count as f64 * 8.0f64;
        let disk_weight = resources.disk_space_gb as f64 * 0.01f64;
        cpu_weight + memory_weight + gpu_weight + disk_weight
    }

    fn calculate_load_variance(&self, nodeloads: &HashMap<String, f64>) -> f64 {
        if nodeloads.len() <= 1 {
            return 0.0f64;
        }

        let total_load: f64 = nodeloads.values().sum();
        let mean_load = total_load / nodeloads.len() as f64;

        let variance = nodeloads
            .values()
            .map(|&load| (load - mean_load).powi(2))
            .sum::<f64>()
            / nodeloads.len() as f64;

        // Returned as a standard deviation so the value is in load units.
        variance.sqrt()
    }

    fn find_moveable_allocations(&self, nodeid: &str) -> Vec<(TaskId, ResourceAllocation)> {
        self.allocations
            .iter()
            .filter(|(_, allocation)| allocation.assigned_nodes.contains(&nodeid.to_string()))
            .map(|(taskid, allocation)| (taskid.clone(), allocation.clone()))
            .collect()
    }

    fn find_best_underloaded_node(
        &self,
        nodes: &[(String, f64, f64)],
        _required_capacity: f64,
    ) -> Option<(String, f64)> {
        // The list is pre-sorted by spare capacity, so the first entry is the best fit.
        nodes
            .first()
            .map(|(nodeid, load, _capacity)| (nodeid.clone(), *load))
    }

    fn try_migrate_allocation(&self, _taskid: &TaskId, _targetnode: &str) -> CoreResult<bool> {
        // Live migration is not yet implemented.
        Ok(false)
    }

    fn try_spread_allocation(&self, _allocation: &ResourceAllocation) -> CoreResult<bool> {
        // Spreading is not yet implemented.
        Ok(false)
    }

    pub fn available_capacity(&self) -> &ComputeCapacity {
        &self.available_resources
    }

    pub fn attempt_consolidation(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Consolidation is not yet implemented.
        Ok(false)
    }

    pub fn attempt_best_fit_pairing(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Pairing is not yet implemented.
        Ok(false)
    }

    pub fn attempt_small_allocation_packing(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Packing is not yet implemented.
        Ok(false)
    }

    pub fn find_best_target_node(
        &mut self,
        _resources: &ComputeCapacity,
        _underloaded_nodes: &[(String, f64)],
    ) -> CoreResult<Option<(String, f64)>> {
        // Target selection is not yet implemented.
        Ok(None)
    }

    pub fn attempt_allocation_migration(
        &mut self,
        _taskid: &TaskId,
        _to_node: &str,
    ) -> CoreResult<bool> {
        // Migration is not yet implemented.
        Ok(false)
    }

    pub fn attempt_allocation_spreading(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Spreading is not yet implemented.
        Ok(false)
    }
}

#[derive(Debug)]
pub struct ClusterEventLog {
    events: VecDeque<ClusterEvent>,
    max_events: usize,
}

impl Default for ClusterEventLog {
    fn default() -> Self {
        Self::new()
    }
}

impl ClusterEventLog {
    pub fn new() -> Self {
        Self {
            events: VecDeque::with_capacity(10000),
            max_events: 10000,
        }
    }

    pub fn log_event(&mut self, event: ClusterEvent) {
        self.events.push_back(event);

        // Bounded ring buffer: evict the oldest events beyond the cap.
        while self.events.len() > self.max_events {
            self.events.pop_front();
        }
    }

    pub fn get_recent_events(&self, count: usize) -> Vec<ClusterEvent> {
        self.events.iter().rev().take(count).cloned().collect()
    }
}

#[derive(Debug, Clone)]
pub struct ClusterConfiguration {
    pub auto_discovery_enabled: bool,
    pub discovery_methods: Vec<NodeDiscoveryMethod>,
    pub health_check_interval: Duration,
    pub leadership_timeout: Duration,
    pub resource_allocation_strategy: AllocationStrategy,
    pub max_nodes: Option<usize>,
}

impl Default for ClusterConfiguration {
    fn default() -> Self {
        Self {
            auto_discovery_enabled: true,
            discovery_methods: vec![NodeDiscoveryMethod::Static(vec![])],
            health_check_interval: Duration::from_secs(30),
            leadership_timeout: Duration::from_secs(300),
            resource_allocation_strategy: AllocationStrategy::FirstFit,
            max_nodes: None,
        }
    }
}

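/// How the discovery loop finds peers; methods can be combined in
/// [`ClusterConfiguration::discovery_methods`].
///
/// A configuration sketch (the address, group, and port are illustrative
/// values, not defaults):
///
/// ```ignore
/// let methods = vec![
///     NodeDiscoveryMethod::Static(vec!["10.0.0.1:8080".parse().unwrap()]),
///     NodeDiscoveryMethod::Multicast {
///         group: "239.255.70.77".parse().unwrap(),
///         port: 7645,
///     },
/// ];
/// ```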
#[derive(Debug, Clone)]
pub enum NodeDiscoveryMethod {
    Static(Vec<SocketAddr>),
    Multicast { group: IpAddr, port: u16 },
    DnsService { service_name: String },
    Consul { endpoint: String },
}

#[derive(Debug, Clone)]
pub struct NodeInfo {
    pub id: String,
    pub address: SocketAddr,
    pub node_type: NodeType,
    pub capabilities: NodeCapabilities,
    pub status: NodeStatus,
    pub last_seen: Instant,
    pub metadata: NodeMetadata,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeType {
    Master,
    Worker,
    Storage,
    Compute,
    ComputeOptimized,
    MemoryOptimized,
    StorageOptimized,
    General,
}

#[derive(Debug, Clone)]
pub struct NodeCapabilities {
    pub cpu_cores: usize,
    pub memory_gb: usize,
    pub gpu_count: usize,
    pub disk_space_gb: usize,
    pub networkbandwidth_gbps: f64,
    pub specialized_units: Vec<SpecializedUnit>,
}

impl Default for NodeCapabilities {
    fn default() -> Self {
        Self {
            cpu_cores: 4,
            memory_gb: 8,
            gpu_count: 0,
            disk_space_gb: 100,
            networkbandwidth_gbps: 1.0f64,
            specialized_units: Vec::new(),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SpecializedUnit {
    TensorCore,
    QuantumProcessor,
    VectorUnit,
    CryptoAccelerator,
    NeuralProcessingUnit,
    Fpga,
    Asic,
    CustomAsic(u32),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Unknown,
    Healthy,
    Degraded,
    Unhealthy,
    Offline,
    Draining,
}

#[derive(Debug, Clone)]
pub struct NodeMetadata {
    pub hostname: String,
    pub operating_system: String,
    pub kernel_version: String,
    pub container_runtime: Option<String>,
    pub labels: HashMap<String, String>,
}

impl Default for NodeMetadata {
    fn default() -> Self {
        Self {
            hostname: "unknown".to_string(),
            operating_system: "unknown".to_string(),
            kernel_version: "unknown".to_string(),
            container_runtime: None,
            labels: HashMap::new(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ClusterTopology {
    pub zones: BTreeMap<String, Zone>,
    pub network_topology: NetworkTopology,
}

impl Default for ClusterTopology {
    fn default() -> Self {
        Self::new()
    }
}

impl ClusterTopology {
    pub fn new() -> Self {
        Self {
            zones: BTreeMap::new(),
            network_topology: NetworkTopology::Flat,
        }
    }

    pub fn update(&mut self, nodes: &[NodeInfo]) {
        self.zones.clear();

        for node in nodes {
            let zone_name = self.determine_zone(&node.address);
            let zone = self.zones.entry(zone_name).or_default();
            zone.add_node(node.clone());
        }
    }

    fn determine_zone(&self, address: &SocketAddr) -> String {
        // Naive zoning: group nodes by the first octet of their IP address.
        format!(
            "zone_{}",
            address.ip().to_string().split('.').next().unwrap_or("0")
        )
    }

    pub fn update_model(&mut self, nodes: &[NodeInfo]) {
        // Currently an alias for `update`; kept for API compatibility.
        self.update(nodes);
    }
}

#[derive(Debug, Clone)]
pub struct Zone {
    pub nodes: Vec<NodeInfo>,
    pub capacity: ComputeCapacity,
}

impl Default for Zone {
    fn default() -> Self {
        Self::new()
    }
}

impl Zone {
    pub fn new() -> Self {
        Self {
            nodes: Vec::new(),
            capacity: ComputeCapacity::default(),
        }
    }

    pub fn add_node(&mut self, node: NodeInfo) {
        self.capacity.cpu_cores += node.capabilities.cpu_cores;
        self.capacity.memory_gb += node.capabilities.memory_gb;
        self.capacity.gpu_count += node.capabilities.gpu_count;
        self.capacity.disk_space_gb += node.capabilities.disk_space_gb;

        self.nodes.push(node);
    }
}

#[derive(Debug, Clone)]
pub enum NetworkTopology {
    Flat,
    Hierarchical,
    Mesh,
    Ring,
}

#[derive(Debug, Clone)]
pub struct NodeHealthStatus {
    pub status: NodeStatus,
    pub health_score: f64,
    pub failing_checks: Vec<HealthCheck>,
    pub last_checked: Instant,
}

#[derive(Debug, Clone)]
pub enum HealthCheck {
    Ping,
    CpuLoad,
    MemoryUsage,
    DiskSpace,
    NetworkConnectivity,
}

#[derive(Debug)]
pub struct HealthCheckResult {
    pub is_healthy: bool,
    pub impact_score: f64,
    pub details: String,
}

#[derive(Debug, Clone)]
pub struct ClusterHealth {
    pub status: ClusterHealthStatus,
    pub healthy_nodes: usize,
    pub total_nodes: usize,
    pub health_percentage: f64,
    pub last_updated: Instant,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

#[derive(Debug, Clone, Default)]
pub struct ComputeCapacity {
    pub cpu_cores: usize,
    pub memory_gb: usize,
    pub gpu_count: usize,
    pub disk_space_gb: usize,
}

#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    pub cpu_cores: usize,
    pub memory_gb: usize,
    pub gpu_count: usize,
    pub disk_space_gb: usize,
    pub specialized_requirements: Vec<SpecializedRequirement>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SpecializedRequirement {
    pub unit_type: SpecializedUnit,
    pub count: usize,
}

#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    pub allocation_id: AllocationId,
    pub allocated_resources: ComputeCapacity,
    pub assigned_nodes: Vec<String>,
    pub created_at: Instant,
    pub expires_at: Option<Instant>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AllocationId(String);

impl AllocationId {
    pub fn generate() -> Self {
        // Millisecond-timestamp ids; the unwrap fails only if the system
        // clock is set before the Unix epoch.
        Self(format!(
            "alloc_{}",
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis()
        ))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllocationStrategy {
    FirstFit,
    BestFit,
    LoadBalanced,
}

#[derive(Debug, Clone)]
pub struct DistributedTask {
    pub taskid: TaskId,
    pub task_type: TaskType,
    pub resource_requirements: ResourceRequirements,
    pub data_dependencies: Vec<DataDependency>,
    pub execution_parameters: TaskParameters,
    pub priority: TaskPriority,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TaskId(String);

impl TaskId {
    pub fn generate() -> Self {
        Self(format!(
            "task_{}",
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis()
        ))
    }
}

#[derive(Debug, Clone)]
pub enum TaskType {
    Computation,
    DataProcessing,
    MachineLearning,
    Simulation,
    Analysis,
}

#[derive(Debug, Clone)]
pub struct DataDependency {
    pub data_id: String,
    pub access_type: DataAccessType,
    pub size_hint: Option<usize>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataAccessType {
    Read,
    Write,
    ReadWrite,
}

#[derive(Debug, Clone)]
pub struct TaskParameters {
    pub environment_variables: HashMap<String, String>,
    pub command_arguments: Vec<String>,
    pub timeout: Option<Duration>,
    pub retrypolicy: RetryPolicy,
}

#[derive(Debug, Clone)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub backoff_strategy: BackoffStrategy,
}

#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    Fixed(Duration),
    Linear(Duration),
    Exponential { base: Duration, multiplier: f64 },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    Low,
    Normal,
    High,
    Critical,
}

#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    pub taskid: TaskId,
    pub task: DistributedTask,
    pub node_allocation: ResourceAllocation,
    pub created_at: Instant,
    pub status: ExecutionStatus,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStatus {
    Pending,
    Scheduled,
    Running,
    Completed,
    Failed,
    Cancelled,
}

#[derive(Debug, Clone)]
pub enum ClusterEvent {
    NodeDiscovered {
        nodeid: String,
        address: SocketAddr,
        timestamp: Instant,
    },
    NodeStatusChanged {
        nodeid: String,
        old_status: NodeStatus,
        new_status: NodeStatus,
        timestamp: Instant,
    },
    LeaderElected {
        nodeid: String,
        timestamp: Instant,
    },
    TaskScheduled {
        taskid: TaskId,
        nodeid: String,
        timestamp: Instant,
    },
    TaskCompleted {
        taskid: TaskId,
        nodeid: String,
        execution_time: Duration,
        timestamp: Instant,
    },
    ResourceAllocation {
        allocation_id: AllocationId,
        resources: ComputeCapacity,
        timestamp: Instant,
    },
}

#[derive(Debug, Clone)]
pub struct ClusterStatistics {
    pub total_nodes: usize,
    pub healthy_nodes: usize,
    pub total_capacity: ComputeCapacity,
    pub available_capacity: ComputeCapacity,
    pub resource_utilization: ResourceUtilization,
}

#[derive(Debug, Clone)]
pub struct ResourceUtilization {
    pub cpu_utilization: f64,
    pub memory_utilization: f64,
    pub gpu_utilization: f64,
}

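/// Convenience entry point: creates (or reuses) the global manager and starts
/// its background loops.
///
/// A minimal caller sketch (module path assumed, hence `ignore`):
///
/// ```ignore
/// initialize_cluster_manager()?;
/// let stats = ClusterManager::global()?.get_cluster_statistics()?;
/// println!("{}/{} nodes healthy", stats.healthy_nodes, stats.total_nodes);
/// ```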
#[allow(dead_code)]
pub fn initialize_cluster_manager() -> CoreResult<()> {
    let cluster_manager = ClusterManager::global()?;
    cluster_manager.start()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_cluster_manager_creation() {
        let config = ClusterConfiguration::default();
        let _manager = ClusterManager::new(config).unwrap();
    }

    #[test]
    fn test_node_registry() {
        let mut registry = NodeRegistry::new();

        let node = NodeInfo {
            id: "test_node".to_string(),
            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
            node_type: NodeType::Worker,
            capabilities: NodeCapabilities::default(),
            status: NodeStatus::Healthy,
            last_seen: Instant::now(),
            metadata: NodeMetadata::default(),
        };

        let is_new = registry.register_node(node.clone()).unwrap();
        assert!(is_new);

        let healthy_nodes = registry.get_healthy_nodes();
        assert_eq!(healthy_nodes.len(), 1);
        assert_eq!(healthy_nodes[0].id, "test_node");
    }

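    // Sketch of an eviction test for the bounded event log; it relies on
    // `max_events` being visible to this child module.
    #[test]
    fn test_event_log_eviction() {
        let mut log = ClusterEventLog::new();
        log.max_events = 2;
        for i in 0..3 {
            log.log_event(ClusterEvent::LeaderElected {
                nodeid: format!("node{i}"),
                timestamp: Instant::now(),
            });
        }
        // Only the two most recent events survive.
        assert_eq!(log.get_recent_events(10).len(), 2);
    }
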
    #[test]
    fn test_resource_allocator() {
        let mut allocator = ResourceAllocator::new();

        // Seed capacity directly; fields are visible within this module.
        allocator.available_resources = ComputeCapacity {
            cpu_cores: 8,
            memory_gb: 16,
            gpu_count: 1,
            disk_space_gb: 100,
        };

        let requirements = ResourceRequirements {
            cpu_cores: 4,
            memory_gb: 8,
            gpu_count: 0,
            disk_space_gb: 50,
            specialized_requirements: Vec::new(),
        };

        let allocation = allocator.allocate_resources(&requirements).unwrap();
        assert_eq!(allocation.allocated_resources.cpu_cores, 4);
        assert_eq!(allocation.allocated_resources.memory_gb, 8);
    }

    #[test]
    fn test_healthmonitor() {
        let mut monitor = HealthMonitor::new().unwrap();

        let node = NodeInfo {
            id: "test_node".to_string(),
            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
            node_type: NodeType::Worker,
            capabilities: NodeCapabilities::default(),
            status: NodeStatus::Unknown,
            last_seen: Instant::now(),
            metadata: NodeMetadata::default(),
        };

        let health_status = monitor.check_node_health(&node).unwrap();
        assert!(health_status.health_score >= 0.0 && health_status.health_score <= 100.0f64);
    }

    #[test]
    fn test_cluster_topology() {
        let mut topology = ClusterTopology::new();

        let nodes = vec![
            NodeInfo {
                id: "node1".to_string(),
                address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
                node_type: NodeType::Worker,
                capabilities: NodeCapabilities::default(),
                status: NodeStatus::Healthy,
                last_seen: Instant::now(),
                metadata: NodeMetadata::default(),
            },
            NodeInfo {
                id: "node2".to_string(),
                address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8080),
                node_type: NodeType::Worker,
                capabilities: NodeCapabilities::default(),
                status: NodeStatus::Healthy,
                last_seen: Instant::now(),
                metadata: NodeMetadata::default(),
            },
        ];

        topology.update_model(&nodes);
        // Nodes on 192.x and 10.x fall into two distinct first-octet zones.
        assert_eq!(topology.zones.len(), 2);
    }
}