1use super::config::*;
8use super::optimizer::{Adaptation, AdaptationPriority, AdaptationType};
9
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
15#[derive(Debug, Clone, Serialize)]
17pub struct ResourceUsage {
18 pub memory_usage_mb: usize,
20 pub cpu_usage_percent: f64,
22 pub gpu_usage_percent: Option<f64>,
24 pub network_io_mbps: f64,
26 pub disk_io_mbps: f64,
28 pub active_threads: usize,
30 #[serde(skip)]
32 pub timestamp: Instant,
33}
34
35#[derive(Debug, Clone)]
37pub struct ResourceBudget {
38 pub memory_budget: MemoryBudget,
40 pub cpu_budget: CpuBudget,
42 pub network_budget: NetworkBudget,
44 pub time_budget: TimeBudget,
46 pub enforcement_strategy: BudgetEnforcementStrategy,
48 pub flexibility: f64,
50}
51
52#[derive(Debug, Clone)]
54pub struct MemoryBudget {
55 pub max_allocation_mb: usize,
57 pub soft_limit_mb: usize,
59 pub cleanup_threshold: f64,
61 pub enable_compression: bool,
63 pub priority_levels: Vec<MemoryPriority>,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
69pub enum MemoryPriority {
70 Critical,
72 High,
74 Normal,
76 Low,
78 Temporary,
80}
81
82#[derive(Debug, Clone)]
84pub struct CpuBudget {
85 pub max_utilization: f64,
87 pub target_utilization: f64,
89 pub max_threads: usize,
91 pub thread_priority: ThreadPriorityConfig,
93 pub cpu_affinity: Option<Vec<usize>>,
95}
96
97#[derive(Debug, Clone)]
99pub struct ThreadPriorityConfig {
100 pub high_priority_threads: usize,
102 pub normal_priority_threads: usize,
104 pub background_threads: usize,
106 pub dynamic_priority: bool,
108}
109
110#[derive(Debug, Clone)]
112pub struct NetworkBudget {
113 pub max_bandwidth_mbps: f64,
115 pub priority_allocation: HashMap<String, f64>,
117 pub enable_traffic_shaping: bool,
119 pub qos_settings: QoSSettings,
121}
122
123#[derive(Debug, Clone)]
125pub struct QoSSettings {
126 pub max_latency_ms: u64,
128 pub jitter_tolerance_ms: u64,
130 pub packet_loss_tolerance: f64,
132 pub traffic_classes: Vec<TrafficClass>,
134}
135
136#[derive(Debug, Clone)]
138pub struct TrafficClass {
139 pub name: String,
141 pub priority: u8,
143 pub bandwidth_guarantee: f64,
145 pub max_bandwidth: f64,
147}
148
149#[derive(Debug, Clone)]
151pub struct TimeBudget {
152 pub max_batch_processing_time: Duration,
154 pub target_batch_processing_time: Duration,
156 pub operation_timeout: Duration,
158 pub deadline_enforcement: DeadlineEnforcement,
160}
161
162#[derive(Debug, Clone)]
164pub enum DeadlineEnforcement {
165 Strict,
167 Soft,
169 BestEffort,
171 Adaptive,
173}
174
175#[derive(Debug, Clone)]
177pub enum BudgetEnforcementStrategy {
178 Strict,
180 Throttling,
182 LoadShedding,
184 GracefulDegradation,
186 Adaptive,
188}
189
190pub struct ResourceManager {
192 config: ResourceConfig,
194 current_usage: Arc<Mutex<ResourceUsage>>,
196 usage_history: Arc<Mutex<VecDeque<ResourceUsage>>>,
198 budget: ResourceBudget,
200 allocations: Arc<Mutex<HashMap<String, ResourceAllocation>>>,
202 monitoring_handle: Option<std::thread::JoinHandle<()>>,
204 predictor: ResourcePredictor,
206 optimizer: ResourceOptimizer,
208 alert_system: ResourceAlertSystem,
210}
211
212#[derive(Debug, Clone)]
214pub struct ResourceAllocation {
215 pub component_name: String,
217 pub allocated_memory_mb: usize,
219 pub allocated_cpu_percent: f64,
221 pub allocated_bandwidth_mbps: f64,
223 pub priority: ResourcePriority,
225 pub allocation_time: Instant,
227 pub last_access: Instant,
229 pub usage_stats: ComponentUsageStats,
231}
232
233#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
235pub enum ResourcePriority {
236 Critical = 0,
238 High = 1,
240 Normal = 2,
242 Low = 3,
244 Temporary = 4,
246}
247
248#[derive(Debug, Clone)]
250pub struct ComponentUsageStats {
251 pub peak_memory_mb: usize,
253 pub avg_memory_mb: usize,
255 pub peak_cpu_percent: f64,
257 pub avg_cpu_percent: f64,
259 pub total_processing_time: Duration,
261 pub operation_count: u64,
263 pub efficiency_score: f64,
265}
266
267pub struct ResourcePredictor {
269 usage_patterns: VecDeque<ResourceUsage>,
271 prediction_horizon: usize,
273 prediction_accuracy: HashMap<String, f64>,
275 seasonal_patterns: HashMap<String, Vec<f64>>,
277 trend_analysis: ResourceTrendAnalysis,
279}
280
281#[derive(Debug, Clone)]
283pub struct ResourceTrendAnalysis {
284 pub memory_trend: TrendDirection,
286 pub cpu_trend: TrendDirection,
288 pub network_trend: TrendDirection,
290 pub trend_confidence: f64,
292 pub trend_stability: f64,
294}
295
296#[derive(Debug, Clone, PartialEq, Eq)]
298pub enum TrendDirection {
299 Increasing,
301 Decreasing,
303 Stable,
305 Oscillating,
307 Unknown,
309}
310
311pub struct ResourceOptimizer {
313 strategy: ResourceOptimizationStrategy,
315 optimization_history: VecDeque<OptimizationEvent>,
317 performance_impact: HashMap<String, f64>,
319 constraints: OptimizationConstraints,
321}
322
323#[derive(Debug, Clone)]
325pub enum ResourceOptimizationStrategy {
326 Conservative,
328 Aggressive,
330 Balanced,
332 PowerEfficient,
334 LatencyOptimized,
336 ThroughputOptimized,
338}
339
340#[derive(Debug, Clone)]
342pub struct OptimizationEvent {
343 pub timestamp: Instant,
345 pub optimization_type: String,
347 pub affected_resources: Vec<String>,
349 pub resource_deltas: HashMap<String, f64>,
351 pub performance_impact: f64,
353 pub success: bool,
355}
356
357#[derive(Debug, Clone)]
359pub struct OptimizationConstraints {
360 pub min_guarantees: HashMap<String, f64>,
362 pub max_limits: HashMap<String, f64>,
364 pub change_rate_limits: HashMap<String, f64>,
366 pub stability_requirements: StabilityRequirements,
368}
369
370#[derive(Debug, Clone)]
372pub struct StabilityRequirements {
373 pub min_stable_period: Duration,
375 pub max_change_frequency: f64,
377 pub prevent_oscillation: bool,
379 pub hysteresis_factor: f64,
381}
382
383pub struct ResourceAlertSystem {
385 thresholds: ResourceThresholds,
387 active_alerts: VecDeque<ResourceAlert>,
389 alert_history: VecDeque<ResourceAlert>,
391 alert_handlers: Vec<Box<dyn AlertHandler>>,
393}
394
395#[derive(Debug, Clone)]
397pub struct ResourceThresholds {
398 pub memory_thresholds: ThresholdSet,
400 pub cpu_thresholds: ThresholdSet,
402 pub network_thresholds: ThresholdSet,
404 pub response_time_thresholds: ThresholdSet,
406}
407
408#[derive(Debug, Clone)]
410pub struct ThresholdSet {
411 pub warning: f64,
413 pub critical: f64,
415 pub emergency: f64,
417 pub recovery: f64,
419}
420
421#[derive(Debug, Clone)]
423pub struct ResourceAlert {
424 pub id: String,
426 pub timestamp: Instant,
428 pub severity: AlertSeverity,
430 pub resource_type: String,
432 pub current_value: f64,
434 pub threshold_value: f64,
436 pub message: String,
438 pub suggested_actions: Vec<String>,
440 pub auto_resolution_attempts: u32,
442}
443
444#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
446pub enum AlertSeverity {
447 Info,
449 Warning,
451 Error,
453 Critical,
455 Emergency,
457}
458
459pub trait AlertHandler: Send + Sync {
461 fn handle_alert(&self, alert: &ResourceAlert) -> Result<(), String>;
463
464 fn priority(&self) -> u32;
466
467 fn can_handle(&self, alert: &ResourceAlert) -> bool;
469}
470
471impl ResourceManager {
472 pub fn new(config: &StreamingConfig) -> Result<Self, String> {
474 let resource_config = config.resource_config.clone();
475
476 let budget = ResourceBudget {
477 memory_budget: MemoryBudget {
478 max_allocation_mb: resource_config.max_memory_mb,
479 soft_limit_mb: (resource_config.max_memory_mb as f64 * 0.8) as usize,
480 cleanup_threshold: resource_config.cleanup_threshold,
481 enable_compression: true,
482 priority_levels: vec![
483 MemoryPriority::Critical,
484 MemoryPriority::High,
485 MemoryPriority::Normal,
486 MemoryPriority::Low,
487 ],
488 },
489 cpu_budget: CpuBudget {
490 max_utilization: resource_config.max_cpu_percent,
491 target_utilization: resource_config.max_cpu_percent * 0.8,
492 max_threads: num_cpus::get(),
493 thread_priority: ThreadPriorityConfig {
494 high_priority_threads: 2,
495 normal_priority_threads: num_cpus::get() - 2,
496 background_threads: 1,
497 dynamic_priority: true,
498 },
499 cpu_affinity: None,
500 },
501 network_budget: NetworkBudget {
502 max_bandwidth_mbps: 100.0, priority_allocation: HashMap::new(),
504 enable_traffic_shaping: false,
505 qos_settings: QoSSettings {
506 max_latency_ms: 100,
507 jitter_tolerance_ms: 10,
508 packet_loss_tolerance: 0.1,
509 traffic_classes: Vec::new(),
510 },
511 },
512 time_budget: TimeBudget {
513 max_batch_processing_time: Duration::from_secs(30),
514 target_batch_processing_time: Duration::from_secs(10),
515 operation_timeout: Duration::from_secs(60),
516 deadline_enforcement: DeadlineEnforcement::Soft,
517 },
518 enforcement_strategy: match resource_config.allocation_strategy {
519 ResourceAllocationStrategy::Static => BudgetEnforcementStrategy::Strict,
520 ResourceAllocationStrategy::Dynamic => BudgetEnforcementStrategy::Throttling,
521 ResourceAllocationStrategy::Adaptive => BudgetEnforcementStrategy::Adaptive,
522 _ => BudgetEnforcementStrategy::GracefulDegradation,
523 },
524 flexibility: 0.2,
525 };
526
527 let predictor = ResourcePredictor::new();
528 let optimizer = ResourceOptimizer::new(ResourceOptimizationStrategy::Balanced);
529 let alert_system = ResourceAlertSystem::new();
530
531 Ok(Self {
532 config: resource_config,
533 current_usage: Arc::new(Mutex::new(ResourceUsage::default())),
534 usage_history: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
535 budget,
536 allocations: Arc::new(Mutex::new(HashMap::new())),
537 monitoring_handle: None,
538 predictor,
539 optimizer,
540 alert_system,
541 })
542 }
543
544 pub fn start_monitoring(&mut self) -> Result<(), String> {
546 if self.monitoring_handle.is_some() {
547 return Ok(()); }
549
550 let current_usage = Arc::clone(&self.current_usage);
551 let usage_history = Arc::clone(&self.usage_history);
552 let monitoring_frequency = self.config.monitoring_frequency;
553
554 let handle = std::thread::spawn(move || {
555 loop {
556 if let Ok(usage) = Self::collect_resource_usage() {
557 {
559 let mut current = current_usage.lock().unwrap();
560 *current = usage.clone();
561 }
562
563 {
565 let mut history = usage_history.lock().unwrap();
566 if history.len() >= 1000 {
567 history.pop_front();
568 }
569 history.push_back(usage);
570 }
571 }
572
573 std::thread::sleep(monitoring_frequency);
574 }
575 });
576
577 self.monitoring_handle = Some(handle);
578 Ok(())
579 }
580
581 fn collect_resource_usage() -> Result<ResourceUsage, String> {
583 let mut usage = ResourceUsage {
585 timestamp: Instant::now(),
586 ..Default::default()
587 };
588
589 let info = sysinfo::System::new_all().used_memory();
591 usage.memory_usage_mb = (info / 1024 / 1024) as usize;
592
593 usage.cpu_usage_percent = 50.0; usage.network_io_mbps = 1.0; usage.disk_io_mbps = 5.0; usage.active_threads = std::thread::available_parallelism()
604 .map(|n| n.get())
605 .unwrap_or(1);
606
607 Ok(usage)
608 }
609
610 pub fn allocate_resources(
612 &mut self,
613 component_name: &str,
614 memory_mb: usize,
615 cpu_percent: f64,
616 priority: ResourcePriority,
617 ) -> Result<(), String> {
618 self.check_budget_constraints(memory_mb, cpu_percent)?;
620
621 let allocation = ResourceAllocation {
622 component_name: component_name.to_string(),
623 allocated_memory_mb: memory_mb,
624 allocated_cpu_percent: cpu_percent,
625 allocated_bandwidth_mbps: 0.0, priority,
627 allocation_time: Instant::now(),
628 last_access: Instant::now(),
629 usage_stats: ComponentUsageStats {
630 peak_memory_mb: 0,
631 avg_memory_mb: 0,
632 peak_cpu_percent: 0.0,
633 avg_cpu_percent: 0.0,
634 total_processing_time: Duration::ZERO,
635 operation_count: 0,
636 efficiency_score: 1.0,
637 },
638 };
639
640 let mut allocations = self.allocations.lock().unwrap();
641 allocations.insert(component_name.to_string(), allocation);
642
643 Ok(())
644 }
645
646 fn check_budget_constraints(&self, memory_mb: usize, cpu_percent: f64) -> Result<(), String> {
648 let allocations = self.allocations.lock().unwrap();
649
650 let total_memory: usize = allocations
652 .values()
653 .map(|a| a.allocated_memory_mb)
654 .sum::<usize>()
655 + memory_mb;
656
657 let total_cpu: f64 = allocations
658 .values()
659 .map(|a| a.allocated_cpu_percent)
660 .sum::<f64>()
661 + cpu_percent;
662
663 if total_memory > self.budget.memory_budget.max_allocation_mb {
665 return Err(format!(
666 "Memory allocation would exceed budget: {} MB > {} MB",
667 total_memory, self.budget.memory_budget.max_allocation_mb
668 ));
669 }
670
671 if total_cpu > self.budget.cpu_budget.max_utilization {
672 return Err(format!(
673 "CPU allocation would exceed budget: {:.2}% > {:.2}%",
674 total_cpu, self.budget.cpu_budget.max_utilization
675 ));
676 }
677
678 Ok(())
679 }
680
681 pub fn update_utilization(&mut self) -> Result<(), String> {
683 let current_usage = self.current_usage.lock().unwrap().clone();
684
685 let alerts = self.alert_system.check_thresholds(¤t_usage)?;
687 for alert in alerts {
688 self.alert_system.handle_alert(alert)?;
689 }
690
691 self.predictor.update(¤t_usage)?;
693
694 if self.config.enable_dynamic_allocation {
696 self.optimizer
697 .check_optimization_opportunities(¤t_usage, &self.allocations)?;
698 }
699
700 Ok(())
701 }
702
703 pub fn has_sufficient_resources_for_processing(&self) -> Result<bool, String> {
705 let current_usage = self.current_usage.lock().unwrap();
706
707 let memory_available = current_usage.memory_usage_mb
709 < (self.budget.memory_budget.soft_limit_mb as f64 * 0.9) as usize;
710
711 let cpu_available =
713 current_usage.cpu_usage_percent < self.budget.cpu_budget.target_utilization * 0.9;
714
715 Ok(memory_available && cpu_available)
716 }
717
718 pub fn compute_allocation_adaptation(&mut self) -> Result<Option<Adaptation<f32>>, String> {
720 let current_usage = self.current_usage.lock().unwrap();
721
722 if current_usage.memory_usage_mb > self.budget.memory_budget.soft_limit_mb {
724 let adaptation = Adaptation {
726 adaptation_type: AdaptationType::ResourceAllocation,
727 magnitude: -0.2, target_component: "memory_manager".to_string(),
729 parameters: std::collections::HashMap::new(),
730 priority: AdaptationPriority::High,
731 timestamp: Instant::now(),
732 };
733
734 return Ok(Some(adaptation));
735 }
736
737 if current_usage.cpu_usage_percent > self.budget.cpu_budget.target_utilization {
738 let adaptation = Adaptation {
740 adaptation_type: AdaptationType::ResourceAllocation,
741 magnitude: -0.15, target_component: "cpu_manager".to_string(),
743 parameters: std::collections::HashMap::new(),
744 priority: AdaptationPriority::High,
745 timestamp: Instant::now(),
746 };
747
748 return Ok(Some(adaptation));
749 }
750
751 Ok(None)
752 }
753
754 pub fn apply_allocation_adaptation(
756 &mut self,
757 adaptation: &Adaptation<f32>,
758 ) -> Result<(), String> {
759 if adaptation.adaptation_type == AdaptationType::ResourceAllocation {
760 match adaptation.target_component.as_str() {
761 "memory_manager" => {
762 let factor = 1.0 + adaptation.magnitude;
764 let mut allocations = self.allocations.lock().unwrap();
765
766 for allocation in allocations.values_mut() {
767 if allocation.priority >= ResourcePriority::Normal {
768 allocation.allocated_memory_mb =
769 ((allocation.allocated_memory_mb as f32) * factor) as usize;
770 }
771 }
772 }
773 "cpu_manager" => {
774 let factor = 1.0 + adaptation.magnitude;
776 let mut allocations = self.allocations.lock().unwrap();
777
778 for allocation in allocations.values_mut() {
779 if allocation.priority >= ResourcePriority::Normal {
780 allocation.allocated_cpu_percent *= factor as f64;
781 }
782 }
783 }
784 _ => {
785 }
787 }
788 }
789
790 Ok(())
791 }
792
793 pub fn current_usage(&self) -> Result<ResourceUsage, String> {
795 Ok(self.current_usage.lock().unwrap().clone())
796 }
797
798 pub fn get_usage_history(&self, count: usize) -> Vec<ResourceUsage> {
800 let history = self.usage_history.lock().unwrap();
801 history.iter().rev().take(count).cloned().collect()
802 }
803
804 pub fn get_diagnostics(&self) -> ResourceDiagnostics {
806 let current_usage = self.current_usage.lock().unwrap();
807 let allocations = self.allocations.lock().unwrap();
808
809 ResourceDiagnostics {
810 current_usage: current_usage.clone(),
811 total_allocations: allocations.len(),
812 memory_utilization: (current_usage.memory_usage_mb as f64
813 / self.budget.memory_budget.max_allocation_mb as f64)
814 * 100.0,
815 cpu_utilization: current_usage.cpu_usage_percent,
816 active_alerts: self.alert_system.active_alerts.len(),
817 budget_violations: 0, }
819 }
820}
821
822impl ResourcePredictor {
823 fn new() -> Self {
824 Self {
825 usage_patterns: VecDeque::with_capacity(1000),
826 prediction_horizon: 10,
827 prediction_accuracy: HashMap::new(),
828 seasonal_patterns: HashMap::new(),
829 trend_analysis: ResourceTrendAnalysis {
830 memory_trend: TrendDirection::Unknown,
831 cpu_trend: TrendDirection::Unknown,
832 network_trend: TrendDirection::Unknown,
833 trend_confidence: 0.0,
834 trend_stability: 0.0,
835 },
836 }
837 }
838
839 fn update(&mut self, usage: &ResourceUsage) -> Result<(), String> {
840 if self.usage_patterns.len() >= 1000 {
841 self.usage_patterns.pop_front();
842 }
843 self.usage_patterns.push_back(usage.clone());
844
845 if self.usage_patterns.len() >= 10 {
847 self.update_trend_analysis()?;
848 }
849
850 Ok(())
851 }
852
853 fn update_trend_analysis(&mut self) -> Result<(), String> {
854 let recent_patterns: Vec<_> = self.usage_patterns.iter().rev().take(10).collect();
855
856 let memory_values: Vec<f64> = recent_patterns
858 .iter()
859 .map(|u| u.memory_usage_mb as f64)
860 .collect();
861 self.trend_analysis.memory_trend = self.analyze_trend(&memory_values);
862
863 let cpu_values: Vec<f64> = recent_patterns
865 .iter()
866 .map(|u| u.cpu_usage_percent)
867 .collect();
868 self.trend_analysis.cpu_trend = self.analyze_trend(&cpu_values);
869
870 self.trend_analysis.trend_confidence =
872 self.calculate_trend_confidence(&memory_values, &cpu_values);
873
874 Ok(())
875 }
876
877 fn analyze_trend(&self, values: &[f64]) -> TrendDirection {
878 if values.len() < 3 {
879 return TrendDirection::Unknown;
880 }
881
882 let first_half: f64 =
883 values.iter().take(values.len() / 2).sum::<f64>() / (values.len() / 2) as f64;
884 let second_half: f64 = values.iter().skip(values.len() / 2).sum::<f64>()
885 / (values.len() - values.len() / 2) as f64;
886
887 let change_threshold = 0.05; let relative_change = (second_half - first_half) / first_half.max(1.0);
889
890 if relative_change > change_threshold {
891 TrendDirection::Increasing
892 } else if relative_change < -change_threshold {
893 TrendDirection::Decreasing
894 } else {
895 TrendDirection::Stable
896 }
897 }
898
899 fn calculate_trend_confidence(&self, memory_values: &[f64], cpu_values: &[f64]) -> f64 {
900 let memory_variance = self.calculate_variance(memory_values);
902 let cpu_variance = self.calculate_variance(cpu_values);
903
904 let memory_confidence = 1.0 / (1.0 + memory_variance / 100.0);
906 let cpu_confidence = 1.0 / (1.0 + cpu_variance / 100.0);
907
908 (memory_confidence + cpu_confidence) / 2.0
909 }
910
911 fn calculate_variance(&self, values: &[f64]) -> f64 {
912 if values.len() < 2 {
913 return 0.0;
914 }
915
916 let mean = values.iter().sum::<f64>() / values.len() as f64;
917 let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64;
918
919 variance
920 }
921}
922
923impl ResourceOptimizer {
924 fn new(strategy: ResourceOptimizationStrategy) -> Self {
925 Self {
926 strategy,
927 optimization_history: VecDeque::with_capacity(100),
928 performance_impact: HashMap::new(),
929 constraints: OptimizationConstraints {
930 min_guarantees: HashMap::new(),
931 max_limits: HashMap::new(),
932 change_rate_limits: HashMap::new(),
933 stability_requirements: StabilityRequirements {
934 min_stable_period: Duration::from_secs(60),
935 max_change_frequency: 0.1, prevent_oscillation: true,
937 hysteresis_factor: 0.1,
938 },
939 },
940 }
941 }
942
943 fn check_optimization_opportunities(
944 &mut self,
945 current_usage: &ResourceUsage,
946 allocations: &Arc<Mutex<HashMap<String, ResourceAllocation>>>,
947 ) -> Result<(), String> {
948 match self.strategy {
950 ResourceOptimizationStrategy::Balanced => {
951 self.check_balanced_optimization(current_usage, allocations)?;
952 }
953 ResourceOptimizationStrategy::PowerEfficient => {
954 self.check_power_optimization(current_usage, allocations)?;
955 }
956 ResourceOptimizationStrategy::LatencyOptimized => {
957 self.check_latency_optimization(current_usage, allocations)?;
958 }
959 _ => {
960 }
962 }
963
964 Ok(())
965 }
966
967 fn check_balanced_optimization(
968 &mut self,
969 current_usage: &ResourceUsage,
970 _allocations: &Arc<Mutex<HashMap<String, ResourceAllocation>>>,
971 ) -> Result<(), String> {
972 let memory_utilization = current_usage.memory_usage_mb as f64 / 1024.0; let cpu_utilization = current_usage.cpu_usage_percent;
975
976 if (memory_utilization > 80.0 && cpu_utilization < 40.0)
978 || (cpu_utilization > 80.0 && memory_utilization < 40.0)
979 {
980 let optimization_event = OptimizationEvent {
981 timestamp: Instant::now(),
982 optimization_type: "resource_rebalancing".to_string(),
983 affected_resources: vec!["memory".to_string(), "cpu".to_string()],
984 resource_deltas: HashMap::new(),
985 performance_impact: 0.05, success: false, };
988
989 if self.optimization_history.len() >= 100 {
990 self.optimization_history.pop_front();
991 }
992 self.optimization_history.push_back(optimization_event);
993 }
994
995 Ok(())
996 }
997
998 fn check_power_optimization(
999 &mut self,
1000 _current_usage: &ResourceUsage,
1001 _allocations: &Arc<Mutex<HashMap<String, ResourceAllocation>>>,
1002 ) -> Result<(), String> {
1003 Ok(())
1005 }
1006
1007 fn check_latency_optimization(
1008 &mut self,
1009 _current_usage: &ResourceUsage,
1010 _allocations: &Arc<Mutex<HashMap<String, ResourceAllocation>>>,
1011 ) -> Result<(), String> {
1012 Ok(())
1014 }
1015}
1016
1017impl ResourceAlertSystem {
1018 fn new() -> Self {
1019 Self {
1020 thresholds: ResourceThresholds {
1021 memory_thresholds: ThresholdSet {
1022 warning: 70.0,
1023 critical: 85.0,
1024 emergency: 95.0,
1025 recovery: 65.0,
1026 },
1027 cpu_thresholds: ThresholdSet {
1028 warning: 75.0,
1029 critical: 90.0,
1030 emergency: 98.0,
1031 recovery: 70.0,
1032 },
1033 network_thresholds: ThresholdSet {
1034 warning: 80.0,
1035 critical: 95.0,
1036 emergency: 99.0,
1037 recovery: 75.0,
1038 },
1039 response_time_thresholds: ThresholdSet {
1040 warning: 1000.0, critical: 5000.0, emergency: 10000.0, recovery: 500.0, },
1045 },
1046 active_alerts: VecDeque::new(),
1047 alert_history: VecDeque::with_capacity(1000),
1048 alert_handlers: Vec::new(),
1049 }
1050 }
1051
1052 fn check_thresholds(&mut self, usage: &ResourceUsage) -> Result<Vec<ResourceAlert>, String> {
1053 let mut alerts = Vec::new();
1054
1055 let memory_percent = (usage.memory_usage_mb as f64 / 1024.0) * 100.0; if let Some(alert) =
1058 self.check_threshold("memory", memory_percent, &self.thresholds.memory_thresholds)?
1059 {
1060 alerts.push(alert);
1061 }
1062
1063 if let Some(alert) = self.check_threshold(
1065 "cpu",
1066 usage.cpu_usage_percent,
1067 &self.thresholds.cpu_thresholds,
1068 )? {
1069 alerts.push(alert);
1070 }
1071
1072 Ok(alerts)
1073 }
1074
1075 fn check_threshold(
1076 &self,
1077 resource_type: &str,
1078 current_value: f64,
1079 thresholds: &ThresholdSet,
1080 ) -> Result<Option<ResourceAlert>, String> {
1081 let severity = if current_value >= thresholds.emergency {
1082 AlertSeverity::Emergency
1083 } else if current_value >= thresholds.critical {
1084 AlertSeverity::Critical
1085 } else if current_value >= thresholds.warning {
1086 AlertSeverity::Warning
1087 } else {
1088 return Ok(None);
1089 };
1090
1091 let threshold_value = match severity {
1092 AlertSeverity::Emergency => thresholds.emergency,
1093 AlertSeverity::Critical => thresholds.critical,
1094 AlertSeverity::Warning => thresholds.warning,
1095 _ => thresholds.warning,
1096 };
1097
1098 let suggested_actions = self.generate_suggested_actions(resource_type, &severity);
1099
1100 let alert = ResourceAlert {
1101 id: format!("{}_{}", resource_type, Instant::now().elapsed().as_nanos()),
1102 timestamp: Instant::now(),
1103 severity,
1104 resource_type: resource_type.to_string(),
1105 current_value,
1106 threshold_value,
1107 message: format!(
1108 "{} usage is {:.2}% (threshold: {:.2}%)",
1109 resource_type, current_value, threshold_value
1110 ),
1111 suggested_actions,
1112 auto_resolution_attempts: 0,
1113 };
1114
1115 Ok(Some(alert))
1116 }
1117
1118 fn generate_suggested_actions(
1119 &self,
1120 resource_type: &str,
1121 severity: &AlertSeverity,
1122 ) -> Vec<String> {
1123 match (resource_type, severity) {
1124 ("memory", AlertSeverity::Critical | AlertSeverity::Emergency) => vec![
1125 "Reduce buffer sizes".to_string(),
1126 "Clear caches".to_string(),
1127 "Reduce batch sizes".to_string(),
1128 ],
1129 ("memory", AlertSeverity::Warning) => vec![
1130 "Monitor memory usage trends".to_string(),
1131 "Consider reducing buffer sizes".to_string(),
1132 ],
1133 ("cpu", AlertSeverity::Critical | AlertSeverity::Emergency) => vec![
1134 "Reduce processing frequency".to_string(),
1135 "Lower thread count".to_string(),
1136 "Defer non-critical operations".to_string(),
1137 ],
1138 ("cpu", AlertSeverity::Warning) => vec![
1139 "Monitor CPU usage patterns".to_string(),
1140 "Consider load balancing".to_string(),
1141 ],
1142 _ => vec!["Monitor resource usage".to_string()],
1143 }
1144 }
1145
1146 fn handle_alert(&mut self, alert: ResourceAlert) -> Result<(), String> {
1147 self.active_alerts.push_back(alert.clone());
1149
1150 if self.alert_history.len() >= 1000 {
1152 self.alert_history.pop_front();
1153 }
1154 self.alert_history.push_back(alert.clone());
1155
1156 for handler in &self.alert_handlers {
1158 if handler.can_handle(&alert) {
1159 handler.handle_alert(&alert)?;
1160 }
1161 }
1162
1163 Ok(())
1164 }
1165}
1166
1167#[derive(Debug, Clone)]
1169pub struct ResourceDiagnostics {
1170 pub current_usage: ResourceUsage,
1171 pub total_allocations: usize,
1172 pub memory_utilization: f64,
1173 pub cpu_utilization: f64,
1174 pub active_alerts: usize,
1175 pub budget_violations: usize,
1176}
1177
1178impl Default for ResourceUsage {
1179 fn default() -> Self {
1180 Self {
1181 memory_usage_mb: 0,
1182 cpu_usage_percent: 0.0,
1183 gpu_usage_percent: None,
1184 network_io_mbps: 0.0,
1185 disk_io_mbps: 0.0,
1186 active_threads: 0,
1187 timestamp: Instant::now(),
1188 }
1189 }
1190}