1use std::collections::HashMap;
7use std::sync::{Mutex, RwLock};
8use std::time::SystemTime;
9
10use sklears_core::error::{Result as SklResult, SklearsError};
11
12use super::tasks::{ExecutionTask, ResourceRequirements};
13
/// Snapshot of how much of each managed resource is currently reserved, in percent.
///
/// `Default` yields an all-zero snapshot.
#[derive(Debug, Clone, Default)]
pub struct ResourceUtilization {
    /// Allocated CPU cores as a percentage of the detected core count.
    pub cpu_percent: f64,
    /// Allocated memory as a percentage of total memory.
    pub memory_percent: f64,
    /// Reserved I/O bandwidth as a percentage of the detected I/O bandwidth.
    pub io_percent: f64,
    /// Reserved network bandwidth in percent.
    pub network_percent: f64,
    /// Number of tracked allocations relative to the queue capacity, in percent.
    pub queue_percent: f64,
}
40
41pub struct ResourceManager {
43 resources: RwLock<AvailableResources>,
45 allocations: Mutex<HashMap<String, ResourceAllocation>>,
47 policies: ResourcePolicies,
49 monitors: Vec<Box<dyn ResourceMonitor>>,
51}
52
53impl ResourceManager {
54 pub fn new() -> SklResult<Self> {
56 let resources = AvailableResources {
57 cpu_cores: num_cpus::get(),
58 total_memory: detect_total_memory(),
59 available_memory: detect_available_memory(),
60 io_bandwidth: detect_io_bandwidth(),
61 network_bandwidth: detect_network_bandwidth(),
62 available_disk_space: detect_available_disk_space(),
63 gpu_info: detect_gpu_info(),
64 };
65
66 Ok(Self {
67 resources: RwLock::new(resources),
68 allocations: Mutex::new(HashMap::new()),
69 policies: ResourcePolicies::default(),
70 monitors: Vec::new(),
71 })
72 }
73
74 pub fn check_availability(&self, requirements: &ResourceRequirements) -> SklResult<()> {
76 let resources = self.resources.read().map_err(|_| {
77 SklearsError::InvalidInput("Failed to acquire resource lock".to_string())
78 })?;
79
80 if requirements.cpu_cores > resources.cpu_cores as f64 {
82 return Err(SklearsError::InvalidInput(format!(
83 "Insufficient CPU cores: required {}, available {}",
84 requirements.cpu_cores, resources.cpu_cores
85 )));
86 }
87
88 if requirements.memory_bytes > resources.available_memory {
90 return Err(SklearsError::InvalidInput(format!(
91 "Insufficient memory: required {} bytes, available {} bytes",
92 requirements.memory_bytes, resources.available_memory
93 )));
94 }
95
96 if requirements.disk_bytes > 0 && requirements.disk_bytes > resources.available_disk_space {
98 return Err(SklearsError::InvalidInput(format!(
99 "Insufficient disk space: required {} bytes, available {} bytes",
100 requirements.disk_bytes, resources.available_disk_space
101 )));
102 }
103
104 if requirements.network_bandwidth > resources.network_bandwidth {
106 return Err(SklearsError::InvalidInput(
107 "Insufficient network bandwidth".to_string(),
108 ));
109 }
110
111 if requirements.gpu_memory_bytes > 0 {
113 let available_gpu_memory: u64 = resources
114 .gpu_info
115 .iter()
116 .map(|gpu| gpu.available_memory)
117 .sum();
118
119 if requirements.gpu_memory_bytes > available_gpu_memory {
120 return Err(SklearsError::InvalidInput(
121 "Insufficient GPU memory".to_string(),
122 ));
123 }
124 }
125
126 Ok(())
127 }
128
129 pub fn allocate_resources(&self, task: &ExecutionTask) -> SklResult<ResourceAllocation> {
131 self.check_availability(&task.requirements)?;
132
133 let allocation = match self.policies.cpu_policy {
134 CpuAllocationPolicy::Exclusive => self.allocate_exclusive_resources(task)?,
135 CpuAllocationPolicy::Shared => self.allocate_shared_resources(task)?,
136 CpuAllocationPolicy::NumaAware => self.allocate_numa_aware_resources(task)?,
137 CpuAllocationPolicy::PowerEfficient => self.allocate_power_efficient_resources(task)?,
138 };
139
140 let mut allocations = self.allocations.lock().map_err(|_| {
141 SklearsError::InvalidInput("Failed to acquire allocation lock".to_string())
142 })?;
143 allocations.insert(task.id.clone(), allocation.clone());
144
145 self.update_available_resources(&allocation, false)?;
147
148 Ok(allocation)
149 }
150
151 pub fn release_resources(&self, allocation: &ResourceAllocation) -> SklResult<()> {
153 let mut allocations = self.allocations.lock().map_err(|_| {
154 SklearsError::InvalidInput("Failed to acquire allocation lock".to_string())
155 })?;
156
157 if allocations.remove(&allocation.task_id).is_some() {
158 self.update_available_resources(allocation, true)?;
160 }
161
162 Ok(())
163 }
164
165 pub fn get_utilization(&self) -> SklResult<ResourceUtilization> {
167 let resources = self.resources.read().map_err(|_| {
168 SklearsError::InvalidInput("Failed to acquire resource lock".to_string())
169 })?;
170
171 let allocations = self.allocations.lock().map_err(|_| {
172 SklearsError::InvalidInput("Failed to acquire allocation lock".to_string())
173 })?;
174
175 let allocated_cores: usize = allocations
176 .values()
177 .map(|alloc| alloc.cpu_cores.len())
178 .sum();
179
180 let allocated_memory: u64 = allocations
181 .values()
182 .filter_map(|alloc| alloc.memory_range.map(|(_, size)| size))
183 .sum();
184
185 let allocated_io: u64 = allocations
186 .values()
187 .filter_map(|alloc| alloc.io_bandwidth)
188 .sum();
189
190 let cpu_percent = (allocated_cores as f64 / resources.cpu_cores as f64) * 100.0;
191 let memory_percent = (allocated_memory as f64 / resources.total_memory as f64) * 100.0;
192
193 let io_percent = if resources.io_bandwidth > 0 {
194 (allocated_io as f64 / resources.io_bandwidth as f64) * 100.0
195 } else {
196 0.0
197 };
198
199 let network_percent = 0.0; let max_concurrent_tasks = 1000;
207 let queue_percent = (allocations.len() as f64 / max_concurrent_tasks as f64) * 100.0;
208
209 Ok(ResourceUtilization {
210 cpu_percent,
211 memory_percent,
212 io_percent,
213 network_percent,
214 queue_percent,
215 })
216 }
217
218 pub fn add_monitor(&mut self, monitor: Box<dyn ResourceMonitor>) {
220 self.monitors.push(monitor);
221 }
222
223 pub fn collect_all_metrics(&self) -> Vec<ResourceMetrics> {
225 self.monitors
226 .iter()
227 .map(|monitor| monitor.collect_metrics())
228 .collect()
229 }
230
231 pub fn update_policies(&mut self, policies: ResourcePolicies) {
233 self.policies = policies;
234 }
235
236 pub fn get_policies(&self) -> ResourcePolicies {
238 self.policies.clone()
239 }
240
241 fn allocate_exclusive_resources(&self, task: &ExecutionTask) -> SklResult<ResourceAllocation> {
244 let resources = self.resources.read().map_err(|_| {
245 SklearsError::InvalidInput("Failed to acquire resource lock".to_string())
246 })?;
247
248 let required_cores = task.requirements.cpu_cores.ceil() as usize;
249 let cpu_cores: Vec<usize> = (0..required_cores.min(resources.cpu_cores)).collect();
250
251 Ok(ResourceAllocation {
252 task_id: task.id.clone(),
253 cpu_cores,
254 memory_range: Some((0, task.requirements.memory_bytes)),
255 io_bandwidth: Some(task.requirements.network_bandwidth),
256 gpu_id: if task.requirements.gpu_memory_bytes > 0 {
257 Some(0)
258 } else {
259 None
260 },
261 allocated_at: SystemTime::now(),
262 })
263 }
264
265 fn allocate_shared_resources(&self, task: &ExecutionTask) -> SklResult<ResourceAllocation> {
266 let resources = self.resources.read().map_err(|_| {
267 SklearsError::InvalidInput("Failed to acquire resource lock".to_string())
268 })?;
269
270 let required_cores = task.requirements.cpu_cores.ceil() as usize;
272 let cpu_cores: Vec<usize> = (0..required_cores.min(resources.cpu_cores)).collect();
273
274 Ok(ResourceAllocation {
275 task_id: task.id.clone(),
276 cpu_cores,
277 memory_range: Some((0, task.requirements.memory_bytes)),
278 io_bandwidth: Some(task.requirements.network_bandwidth),
279 gpu_id: if task.requirements.gpu_memory_bytes > 0 {
280 Some(0)
281 } else {
282 None
283 },
284 allocated_at: SystemTime::now(),
285 })
286 }
287
288 fn allocate_numa_aware_resources(&self, task: &ExecutionTask) -> SklResult<ResourceAllocation> {
289 self.allocate_shared_resources(task)
291 }
292
293 fn allocate_power_efficient_resources(
294 &self,
295 task: &ExecutionTask,
296 ) -> SklResult<ResourceAllocation> {
297 self.allocate_shared_resources(task)
299 }
300
301 fn update_available_resources(
302 &self,
303 allocation: &ResourceAllocation,
304 release: bool,
305 ) -> SklResult<()> {
306 let mut resources = self.resources.write().map_err(|_| {
307 SklearsError::InvalidInput("Failed to acquire resource lock".to_string())
308 })?;
309
310 if let Some((_, memory_size)) = allocation.memory_range {
311 if release {
312 resources.available_memory += memory_size;
313 } else {
314 resources.available_memory = resources.available_memory.saturating_sub(memory_size);
315 }
316 }
317
318 Ok(())
319 }
320}
321
322#[derive(Debug, Clone)]
324pub struct AvailableResources {
325 pub cpu_cores: usize,
327 pub total_memory: u64,
329 pub available_memory: u64,
331 pub io_bandwidth: u64,
333 pub network_bandwidth: u64,
335 pub available_disk_space: u64,
337 pub gpu_info: Vec<GpuInfo>,
339}
340
341#[derive(Debug, Clone)]
343pub struct GpuInfo {
344 pub id: usize,
346 pub name: String,
348 pub total_memory: u64,
350 pub available_memory: u64,
352 pub compute_capability: String,
354 pub vendor: GpuVendor,
356 pub utilization: f64,
358}
359
/// GPU manufacturer.
// `AMD` is part of the public API; renaming it to satisfy the lint would break callers.
#[allow(clippy::upper_case_acronyms)]
#[derive(PartialEq, Clone, Debug)]
pub enum GpuVendor {
    /// NVIDIA.
    Nvidia,
    /// AMD.
    AMD,
    /// Intel.
    Intel,
    /// Apple.
    Apple,
    /// Vendor could not be determined.
    Unknown,
}
374
/// Resources reserved for one task.
#[derive(Clone, Debug)]
pub struct ResourceAllocation {
    /// Id of the task owning this allocation.
    pub task_id: String,
    /// Ids of the CPU cores handed to the task.
    pub cpu_cores: Vec<usize>,
    /// Reserved memory as `(offset, size_in_bytes)`.
    pub memory_range: Option<(u64, u64)>,
    /// Reserved I/O bandwidth, if any.
    pub io_bandwidth: Option<u64>,
    /// Index of the assigned GPU, if any.
    pub gpu_id: Option<usize>,
    /// Wall-clock time at which the allocation was made.
    pub allocated_at: SystemTime,
}
391
392#[derive(Debug, Clone)]
394pub struct ResourcePolicies {
395 pub cpu_policy: CpuAllocationPolicy,
397 pub memory_policy: MemoryAllocationPolicy,
399 pub io_policy: IoAllocationPolicy,
401 pub priority_allocation: bool,
403 pub fair_share: bool,
405}
406
407impl Default for ResourcePolicies {
408 fn default() -> Self {
409 Self {
410 cpu_policy: CpuAllocationPolicy::Shared,
411 memory_policy: MemoryAllocationPolicy::FirstFit,
412 io_policy: IoAllocationPolicy::FairQueuing,
413 priority_allocation: true,
414 fair_share: true,
415 }
416 }
417}
418
/// How CPU cores are chosen for a new allocation.
#[derive(PartialEq, Clone, Debug)]
pub enum CpuAllocationPolicy {
    /// Intended to give a task dedicated cores.
    Exclusive,
    /// Cores may be shared between tasks.
    Shared,
    /// Intended to keep a task's cores on one NUMA node.
    NumaAware,
    /// Intended to favour low power consumption.
    PowerEfficient,
}
431
/// Memory placement strategy.
#[derive(PartialEq, Clone, Debug)]
pub enum MemoryAllocationPolicy {
    /// First free region that fits.
    FirstFit,
    /// Smallest free region that fits.
    BestFit,
    /// Largest free region.
    WorstFit,
    /// Buddy-system allocation.
    BuddySystem,
    /// Slab allocation.
    SlabAllocator,
}
446
/// I/O request scheduling strategy.
// `FIFO` is part of the public API; renaming it to satisfy the lint would break callers.
#[allow(clippy::upper_case_acronyms)]
#[derive(PartialEq, Clone, Debug)]
pub enum IoAllocationPolicy {
    /// First in, first out.
    FIFO,
    /// Fair queuing.
    FairQueuing,
    /// Weighted fair queuing.
    WeightedFairQueuing,
    /// Deadline-based scheduling.
    DeadlineScheduling,
}
459
460pub trait ResourceMonitor: Send + Sync {
462 fn name(&self) -> &str;
464
465 fn collect_metrics(&self) -> ResourceMetrics;
467
468 fn check_health(&self) -> ResourceHealth;
470
471 fn get_config(&self) -> MonitorConfig;
473
474 fn update_config(&mut self, config: MonitorConfig) -> SklResult<()>;
476}
477
478#[derive(Debug, Clone)]
480pub struct ResourceMetrics {
481 pub timestamp: SystemTime,
483 pub cpu: CpuMetrics,
485 pub memory: MemoryMetrics,
487 pub io: IoMetrics,
489 pub network: NetworkMetrics,
491 pub gpu: Vec<GpuMetrics>,
493}
494
495impl Default for ResourceMetrics {
496 fn default() -> Self {
497 Self {
498 timestamp: SystemTime::now(),
499 cpu: CpuMetrics::default(),
500 memory: MemoryMetrics::default(),
501 io: IoMetrics::default(),
502 network: NetworkMetrics::default(),
503 gpu: Vec::new(),
504 }
505 }
506}
507
/// CPU metrics for one sample.
///
/// `Default` yields zeros, no per-core data and no optional readings.
#[derive(Debug, Clone, Default)]
pub struct CpuMetrics {
    /// Overall utilization (compared against a percent threshold in health checks).
    pub utilization: f64,
    /// Per-core utilization values.
    pub per_core: Vec<f64>,
    /// Clock frequency (NOTE(review): unit not established here).
    pub frequency: f64,
    /// Temperature reading, if available.
    pub temperature: Option<f64>,
    /// Power draw, if available.
    pub power_consumption: Option<f64>,
}
534
/// Memory metrics for one sample.
#[derive(Default, Clone, Debug)]
pub struct MemoryMetrics {
    /// Memory in use; `used / (used + available)` drives the memory health check.
    pub used: u64,
    /// Memory available.
    pub available: u64,
    /// Page-cache memory.
    pub cached: u64,
    /// Buffer memory.
    pub buffers: u64,
    /// Swap in use.
    pub swap_used: u64,
}
549
/// Disk I/O metrics for one sample.
///
/// `Default` yields all zeros.
#[derive(Debug, Clone, Default)]
pub struct IoMetrics {
    /// Read operation rate.
    pub read_ops: f64,
    /// Write operation rate.
    pub write_ops: f64,
    /// Read throughput.
    pub read_bandwidth: f64,
    /// Write throughput.
    pub write_bandwidth: f64,
    /// Time spent waiting on I/O.
    pub io_wait: f64,
}
576
/// Network metrics for one sample.
///
/// `Default` yields zero counters and no latency reading.
#[derive(Debug, Clone, Default)]
pub struct NetworkMetrics {
    /// Bytes received.
    pub rx_bytes: f64,
    /// Bytes transmitted.
    pub tx_bytes: f64,
    /// Packets received.
    pub rx_packets: f64,
    /// Packets transmitted.
    pub tx_packets: f64,
    /// Measured latency, if available.
    pub latency: Option<f64>,
}
603
/// Metrics for one GPU in one sample.
#[derive(Clone, Debug)]
pub struct GpuMetrics {
    /// Index of the GPU.
    pub gpu_id: usize,
    /// Compute utilization.
    pub utilization: f64,
    /// Memory utilization.
    pub memory_utilization: f64,
    /// Temperature reading, if available.
    pub temperature: Option<f64>,
    /// Power draw, if available.
    pub power_consumption: Option<f64>,
}
618
/// Health verdict reported by a [`ResourceMonitor`].
#[derive(PartialEq, Clone, Debug)]
pub enum ResourceHealth {
    /// Everything is within thresholds.
    Healthy,
    /// A threshold was exceeded; `reason` explains which.
    Warning { reason: String },
    /// A severe threshold was exceeded; `reason` explains which.
    Critical { reason: String },
    /// Health could not be determined.
    Unavailable,
}
631
632#[derive(Debug, Clone)]
634pub struct MonitorConfig {
635 pub interval: std::time::Duration,
637 pub detailed: bool,
639 pub thresholds: AlertThresholds,
641}
642
643impl Default for MonitorConfig {
644 fn default() -> Self {
645 Self {
646 interval: std::time::Duration::from_secs(5),
647 detailed: false,
648 thresholds: AlertThresholds::default(),
649 }
650 }
651}
652
/// Alert thresholds, in percent.
#[derive(Clone, Debug)]
pub struct AlertThresholds {
    /// CPU utilization above which health is critical.
    pub cpu_threshold: f64,
    /// Memory usage percentage above which health is a warning.
    pub memory_threshold: f64,
    /// I/O threshold.
    pub io_threshold: f64,
    /// Network threshold.
    pub network_threshold: f64,
    /// GPU threshold.
    pub gpu_threshold: f64,
}

impl Default for AlertThresholds {
    /// CPU 90, memory 85, I/O 80, network 75, GPU 90.
    fn default() -> Self {
        AlertThresholds {
            gpu_threshold: 90.0,
            network_threshold: 75.0,
            io_threshold: 80.0,
            memory_threshold: 85.0,
            cpu_threshold: 90.0,
        }
    }
}
679
680pub struct SystemResourceMonitor {
682 config: MonitorConfig,
683}
684
685impl SystemResourceMonitor {
686 #[must_use]
688 pub fn new(config: MonitorConfig) -> Self {
689 Self { config }
690 }
691}
692
693impl ResourceMonitor for SystemResourceMonitor {
694 fn name(&self) -> &'static str {
695 "system_resource_monitor"
696 }
697
698 fn collect_metrics(&self) -> ResourceMetrics {
699 ResourceMetrics {
701 timestamp: SystemTime::now(),
702 cpu: collect_cpu_metrics(),
703 memory: collect_memory_metrics(),
704 io: collect_io_metrics(),
705 network: collect_network_metrics(),
706 gpu: collect_gpu_metrics(),
707 }
708 }
709
710 fn check_health(&self) -> ResourceHealth {
711 let metrics = self.collect_metrics();
712
713 if metrics.cpu.utilization > self.config.thresholds.cpu_threshold {
714 return ResourceHealth::Critical {
715 reason: format!("CPU utilization too high: {:.1}%", metrics.cpu.utilization),
716 };
717 }
718
719 if metrics.memory.used as f64 / (metrics.memory.used + metrics.memory.available) as f64
720 * 100.0
721 > self.config.thresholds.memory_threshold
722 {
723 return ResourceHealth::Warning {
724 reason: "Memory utilization high".to_string(),
725 };
726 }
727
728 ResourceHealth::Healthy
729 }
730
731 fn get_config(&self) -> MonitorConfig {
732 self.config.clone()
733 }
734
735 fn update_config(&mut self, config: MonitorConfig) -> SklResult<()> {
736 self.config = config;
737 Ok(())
738 }
739}
740
/// Total physical memory in bytes.
///
/// NOTE(review): placeholder that always reports 8 GiB; the host is not probed.
fn detect_total_memory() -> u64 {
    8_u64 << 30
}

/// Memory available for allocation, in bytes.
///
/// NOTE(review): placeholder that always reports 6 GiB.
fn detect_available_memory() -> u64 {
    6_u64 << 30
}

/// Aggregate I/O bandwidth.
///
/// NOTE(review): placeholder of 1000 MiB; presumably bytes per second — confirm.
fn detect_io_bandwidth() -> u64 {
    1000_u64 << 20
}

/// Network bandwidth.
///
/// NOTE(review): placeholder of 100 MiB; presumably bytes per second — confirm.
fn detect_network_bandwidth() -> u64 {
    100_u64 << 20
}
763
764fn detect_gpu_info() -> Vec<GpuInfo> {
765 Vec::new()
767}
768
/// Free disk space, in bytes, available to unprivileged users on the filesystem
/// that contains the current working directory.
///
/// Falls back to a fixed 100 GiB estimate when the platform query is unsupported
/// or fails.
fn detect_available_disk_space() -> u64 {
    use std::ffi::CStr;

    /// Estimate used when the filesystem cannot be queried.
    const FALLBACK_BYTES: u64 = 100 * 1024 * 1024 * 1024;

    CStr::from_bytes_with_nul(b".\0")
        .ok()
        .and_then(statvfs_available_bytes)
        .unwrap_or(FALLBACK_BYTES)
}

/// Returns `f_bavail * f_frsize` from `statvfs(3)` for `path`, or `None` if the call fails.
///
/// Restricted to 64-bit Linux. There, glibc and musl both lay out the leading
/// `struct statvfs` fields as five consecutive 64-bit integers. Other Unixes
/// (e.g. macOS) use different field widths.
#[cfg(all(target_os = "linux", target_pointer_width = "64"))]
fn statvfs_available_bytes(path: &std::ffi::CStr) -> Option<u64> {
    use std::mem;
    use std::os::raw::c_char;

    /// The leading fields of `struct statvfs`, followed by padding that is larger
    /// than the remainder of the real struct. libc writes the whole struct, so
    /// this buffer must never be smaller than it.
    #[repr(C)]
    struct StatVfs {
        f_bsize: u64,
        f_frsize: u64,
        _f_blocks: u64,
        _f_bfree: u64,
        f_bavail: u64,
        _rest: [u64; 32],
    }

    extern "C" {
        fn statvfs(path: *const c_char, buf: *mut StatVfs) -> i32;
    }

    // SAFETY: `StatVfs` consists only of integers, so the all-zero bit pattern is valid.
    let mut stat: StatVfs = unsafe { mem::zeroed() };

    // SAFETY: `path` is a valid NUL-terminated string for the duration of the call.
    // `stat` is a writable, 8-byte-aligned buffer larger than `struct statvfs` on
    // this target, so libc cannot write out of bounds.
    let rc = unsafe { statvfs(path.as_ptr(), &mut stat) };
    if rc != 0 {
        return None;
    }

    // Defensive: a zero fragment size would otherwise report 0 free bytes.
    let block_size = if stat.f_frsize != 0 {
        stat.f_frsize
    } else {
        stat.f_bsize
    };
    Some(stat.f_bavail.saturating_mul(block_size))
}

/// Fallback for platforms whose `struct statvfs` layout is not mirrored here.
///
/// NOTE(review): query these platforms through a proper binding before enabling them.
#[cfg(not(all(target_os = "linux", target_pointer_width = "64")))]
fn statvfs_available_bytes(_path: &std::ffi::CStr) -> Option<u64> {
    None
}
804
805fn collect_cpu_metrics() -> CpuMetrics {
806 CpuMetrics::default()
807}
808
809fn collect_memory_metrics() -> MemoryMetrics {
810 MemoryMetrics::default()
811}
812
813fn collect_io_metrics() -> IoMetrics {
814 IoMetrics::default()
815}
816
817fn collect_network_metrics() -> NetworkMetrics {
818 NetworkMetrics::default()
819}
820
821fn collect_gpu_metrics() -> Vec<GpuMetrics> {
822 Vec::new()
823}
824
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::tasks::*;

    /// A one-core, 1 MiB computation task with no disk, network or GPU needs.
    fn sample_task() -> ExecutionTask {
        let metadata = TaskMetadata {
            name: "Test Task".to_string(),
            description: "A test task".to_string(),
            priority: TaskPriority::Normal,
            estimated_duration: Some(std::time::Duration::from_secs(10)),
            deadline: None,
            dependencies: Vec::new(),
            tags: Vec::new(),
            created_at: SystemTime::now(),
        };
        let requirements = ResourceRequirements {
            cpu_cores: 1.0,
            memory_bytes: 1024 * 1024,
            disk_bytes: 0,
            network_bandwidth: 0,
            gpu_memory_bytes: 0,
            special_resources: Vec::new(),
        };

        ExecutionTask {
            id: "test_task_1".to_string(),
            task_type: TaskType::Computation,
            metadata,
            requirements,
            input_data: None,
            configuration: TaskConfiguration::default(),
        }
    }

    #[test]
    fn test_resource_manager_creation() {
        assert!(ResourceManager::new().is_ok());
    }

    #[test]
    fn test_resource_allocation() {
        let manager = ResourceManager::new().expect("manager should build");
        let task = sample_task();

        let outcome = manager.allocate_resources(&task);
        assert!(outcome.is_ok());

        let granted = outcome.expect("allocation should succeed");
        assert_eq!(granted.task_id, task.id);
        assert!(!granted.cpu_cores.is_empty());
    }

    #[test]
    fn test_resource_release() {
        let manager = ResourceManager::new().expect("manager should build");
        let task = sample_task();

        let granted = manager
            .allocate_resources(&task)
            .expect("allocation should succeed");
        assert!(manager.release_resources(&granted).is_ok());
    }

    #[test]
    fn test_system_resource_monitor() {
        let monitor = SystemResourceMonitor::new(MonitorConfig::default());

        assert_eq!(monitor.name(), "system_resource_monitor");

        let sample = monitor.collect_metrics();
        assert!(sample.timestamp <= SystemTime::now());

        assert!(matches!(monitor.check_health(), ResourceHealth::Healthy));
    }
}
901}