1use crate::cross_module_performance_types::{
6 AllocationEvent, AllocationStrategy, AllocationType, AnomalyAlgorithm, AnomalyEvent,
7 AnomalyType, ModuleMetrics, OptimizationRecommendation, OptimizationType, PerformanceBaseline,
8 PerformanceImpact, PerformanceModel, PerformanceSnapshot, Priority, ResourceAllocation,
9 SeverityLevel, TrainingSample,
10};
11use anyhow::Result;
12use chrono::Utc;
13use std::collections::{HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, AtomicUsize};
15use std::sync::{Arc, RwLock};
16use std::time::Duration;
17use tracing::debug;
18
19#[derive(Debug, Clone)]
23pub struct ResourceTracker {
24 cpu_history: Arc<RwLock<VecDeque<f64>>>,
25 memory_history: Arc<RwLock<VecDeque<u64>>>,
26 last_update: Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
27}
28
29impl ResourceTracker {
30 pub fn new() -> Self {
31 Self {
32 cpu_history: Arc::new(RwLock::new(VecDeque::new())),
33 memory_history: Arc::new(RwLock::new(VecDeque::new())),
34 last_update: Arc::new(RwLock::new(Utc::now())),
35 }
36 }
37}
38
39impl Default for ResourceTracker {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45#[derive(Debug, Clone)]
49pub struct PredictionModel {
50 pub parameters: HashMap<String, f64>,
51 pub last_trained: chrono::DateTime<chrono::Utc>,
52}
53
54impl PredictionModel {
55 pub fn new() -> Self {
56 Self {
57 parameters: HashMap::new(),
58 last_trained: Utc::now(),
59 }
60 }
61}
62
63impl Default for PredictionModel {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[derive(Debug, Clone)]
73pub struct ModulePerformanceMonitor {
74 pub module_name: String,
75 pub metrics: Arc<RwLock<ModuleMetrics>>,
76 pub resource_tracker: ResourceTracker,
77 pub history: Arc<RwLock<VecDeque<PerformanceSnapshot>>>,
78 pub(crate) prediction_model: PredictionModel,
79}
80
81impl ModulePerformanceMonitor {
82 pub fn new(module_name: String) -> Self {
83 Self {
84 module_name,
85 metrics: Arc::new(RwLock::new(ModuleMetrics {
86 cpu_usage: 0.0,
87 memory_usage: 0,
88 gpu_memory_usage: None,
89 network_io_bps: 0,
90 disk_io_bps: 0,
91 request_rate: 0.0,
92 avg_response_time: Duration::from_millis(0),
93 error_rate: 0.0,
94 cache_hit_rate: 0.0,
95 active_connections: 0,
96 queue_depth: 0,
97 })),
98 resource_tracker: ResourceTracker::new(),
99 history: Arc::new(RwLock::new(VecDeque::new())),
100 prediction_model: PredictionModel::new(),
101 }
102 }
103
104 pub async fn update_metrics(&self, new_metrics: ModuleMetrics) -> Result<()> {
105 {
106 let mut metrics = self.metrics.write().expect("lock poisoned");
107 *metrics = new_metrics.clone();
108 }
109 let snapshot = PerformanceSnapshot {
110 metrics: new_metrics,
111 timestamp: Utc::now(),
112 };
113 {
114 let mut history = self.history.write().expect("lock poisoned");
115 history.push_back(snapshot);
116 if history.len() > 1000 {
117 history.pop_front();
118 }
119 }
120 Ok(())
121 }
122
123 pub async fn get_current_metrics(&self) -> Result<ModuleMetrics> {
124 let metrics = self.metrics.read().expect("lock poisoned");
125 Ok(metrics.clone())
126 }
127}
128
129#[derive(Debug)]
133pub struct ResourceAllocator {
134 pub available_cores: AtomicUsize,
135 pub available_memory: AtomicU64,
136 pub available_gpu_memory: AtomicU64,
137 pub allocation_history: Arc<RwLock<VecDeque<AllocationEvent>>>,
138 pub current_allocations: Arc<RwLock<HashMap<String, ResourceAllocation>>>,
139 pub optimization_strategies: Vec<AllocationStrategy>,
140}
141
142impl ResourceAllocator {
143 pub fn new() -> Self {
144 Self {
145 available_cores: AtomicUsize::new(8),
146 available_memory: AtomicU64::new(16_000_000_000),
147 available_gpu_memory: AtomicU64::new(8_000_000_000),
148 allocation_history: Arc::new(RwLock::new(VecDeque::new())),
149 current_allocations: Arc::new(RwLock::new(HashMap::new())),
150 optimization_strategies: Vec::new(),
151 }
152 }
153
154 pub async fn reallocate_resources(
155 &self,
156 module_name: &str,
157 recommendation: &OptimizationRecommendation,
158 ) -> Result<()> {
159 debug!("Reallocating resources for module: {}", module_name);
160 let current_allocation = self.get_current_allocation(module_name).await?;
161 let new_allocation = self
162 .calculate_new_allocation(¤t_allocation, recommendation)
163 .await?;
164 self.apply_allocation(module_name, new_allocation).await?;
165 Ok(())
166 }
167
168 async fn get_current_allocation(&self, module_name: &str) -> Result<ResourceAllocation> {
169 let allocations = self.current_allocations.read().expect("lock poisoned");
170 if let Some(allocation) = allocations.get(module_name) {
171 Ok(allocation.clone())
172 } else {
173 Ok(ResourceAllocation {
174 cpu_cores: 2,
175 memory_bytes: 2_000_000_000,
176 gpu_memory_bytes: Some(1_000_000_000),
177 priority: 50,
178 allocated_at: Utc::now(),
179 expected_duration: None,
180 })
181 }
182 }
183
184 async fn calculate_new_allocation(
185 &self,
186 current: &ResourceAllocation,
187 recommendation: &OptimizationRecommendation,
188 ) -> Result<ResourceAllocation> {
189 let mut new_allocation = current.clone();
190 match recommendation.optimization_type {
191 OptimizationType::ResourceReallocation => match recommendation.priority {
192 Priority::Critical => {
193 new_allocation.cpu_cores = (current.cpu_cores * 2).min(8);
194 new_allocation.memory_bytes = (current.memory_bytes * 2).min(8_000_000_000);
195 }
196 Priority::High => {
197 new_allocation.cpu_cores = (current.cpu_cores + 2).min(6);
198 new_allocation.memory_bytes =
199 (current.memory_bytes + 1_000_000_000).min(6_000_000_000);
200 }
201 _ => {
202 new_allocation.cpu_cores = (current.cpu_cores + 1).min(4);
203 new_allocation.memory_bytes =
204 (current.memory_bytes + 500_000_000).min(4_000_000_000);
205 }
206 },
207 _ => {
208 new_allocation.priority = (current.priority + 10).min(100);
209 }
210 }
211 new_allocation.allocated_at = Utc::now();
212 Ok(new_allocation)
213 }
214
215 async fn apply_allocation(
216 &self,
217 module_name: &str,
218 allocation: ResourceAllocation,
219 ) -> Result<()> {
220 {
221 let mut allocations = self.current_allocations.write().expect("lock poisoned");
222 allocations.insert(module_name.to_string(), allocation.clone());
223 }
224 let event = AllocationEvent {
225 module_name: module_name.to_string(),
226 event_type: AllocationType::Rebalance,
227 allocation,
228 performance_impact: None,
229 timestamp: Utc::now(),
230 };
231 {
232 let mut history = self.allocation_history.write().expect("lock poisoned");
233 history.push_back(event);
234 if history.len() > 1000 {
235 history.pop_front();
236 }
237 }
238 Ok(())
239 }
240}
241
242impl Default for ResourceAllocator {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
248#[derive(Debug)]
252pub struct LearningEngine {
253 learning_rate: f64,
254 training_samples: Arc<RwLock<VecDeque<TrainingSample>>>,
255 update_frequency: Duration,
256 baselines: Arc<RwLock<HashMap<String, PerformanceBaseline>>>,
257}
258
259impl LearningEngine {
260 pub fn new() -> Self {
261 Self {
262 learning_rate: 0.01,
263 training_samples: Arc::new(RwLock::new(VecDeque::new())),
264 update_frequency: Duration::from_secs(3600),
265 baselines: Arc::new(RwLock::new(HashMap::new())),
266 }
267 }
268
269 pub async fn update_model(
270 &self,
271 recommendation: &OptimizationRecommendation,
272 actual_impact: &PerformanceImpact,
273 ) -> Result<()> {
274 let sample = TrainingSample {
275 features: vec![
276 recommendation.estimated_impact.overall_score,
277 recommendation.priority.clone() as u8 as f64,
278 ],
279 target: actual_impact.overall_score,
280 context: HashMap::new(),
281 weight: 1.0,
282 timestamp: Utc::now(),
283 };
284 {
285 let mut samples = self.training_samples.write().expect("lock poisoned");
286 samples.push_back(sample);
287 if samples.len() > 10000 {
288 samples.pop_front();
289 }
290 }
291 Ok(())
292 }
293}
294
295impl Default for LearningEngine {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
301#[derive(Debug)]
305pub struct AnomalyDetector {
306 algorithms: Vec<AnomalyAlgorithm>,
307 thresholds: HashMap<String, f64>,
308 anomaly_history: Arc<RwLock<VecDeque<AnomalyEvent>>>,
309 false_positive_rate: f64,
310}
311
312impl AnomalyDetector {
313 pub fn new() -> Self {
314 Self {
315 algorithms: vec![
316 AnomalyAlgorithm::StatisticalOutlier { z_threshold: 3.0 },
317 AnomalyAlgorithm::IsolationForest { contamination: 0.1 },
318 ],
319 thresholds: HashMap::new(),
320 anomaly_history: Arc::new(RwLock::new(VecDeque::new())),
321 false_positive_rate: 0.05,
322 }
323 }
324
325 pub async fn detect(
326 &self,
327 performance_data: &HashMap<String, ModuleMetrics>,
328 ) -> Result<Vec<AnomalyEvent>> {
329 let mut anomalies = Vec::new();
330 for (module_name, metrics) in performance_data {
331 if metrics.cpu_usage > 90.0
332 || metrics.error_rate > 5.0
333 || metrics.avg_response_time > Duration::from_millis(1000)
334 {
335 let anomaly = AnomalyEvent {
336 module_name: module_name.clone(),
337 anomaly_type: AnomalyType::PerformanceDegradation,
338 severity: if metrics.cpu_usage > 95.0 || metrics.error_rate > 10.0 {
339 SeverityLevel::Critical
340 } else {
341 SeverityLevel::High
342 },
343 score: calculate_anomaly_score(metrics),
344 affected_metrics: vec![
345 "cpu_usage".to_string(),
346 "error_rate".to_string(),
347 "response_time".to_string(),
348 ],
349 recommended_actions: vec![
350 "Increase resource allocation".to_string(),
351 "Investigate error sources".to_string(),
352 "Optimize critical paths".to_string(),
353 ],
354 detected_at: Utc::now(),
355 resolved_at: None,
356 };
357 anomalies.push(anomaly);
358 }
359 if metrics.memory_usage > 12_000_000_000 {
360 let anomaly = AnomalyEvent {
361 module_name: module_name.clone(),
362 anomaly_type: AnomalyType::MemoryLeak,
363 severity: SeverityLevel::High,
364 score: (metrics.memory_usage as f64 / 16_000_000_000.0) * 100.0,
365 affected_metrics: vec!["memory_usage".to_string()],
366 recommended_actions: vec![
367 "Investigate memory usage patterns".to_string(),
368 "Enable memory profiling".to_string(),
369 "Implement memory cleanup".to_string(),
370 ],
371 detected_at: Utc::now(),
372 resolved_at: None,
373 };
374 anomalies.push(anomaly);
375 }
376 }
377 {
378 let mut history = self.anomaly_history.write().expect("lock poisoned");
379 for anomaly in &anomalies {
380 history.push_back(anomaly.clone());
381 }
382 while history.len() > 1000 {
383 history.pop_front();
384 }
385 }
386 Ok(anomalies)
387 }
388}
389
390impl Default for AnomalyDetector {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
396#[derive(Debug)]
400pub struct PredictivePerformanceEngine {
401 pub models: Arc<RwLock<HashMap<String, PerformanceModel>>>,
402 pub prediction_cache:
403 Arc<RwLock<HashMap<String, crate::cross_module_performance_types::CachedPrediction>>>,
404 pub learning_engine: LearningEngine,
405 pub anomaly_detector: AnomalyDetector,
406}
407
408impl PredictivePerformanceEngine {
409 pub fn new() -> Self {
410 Self {
411 models: Arc::new(RwLock::new(HashMap::new())),
412 prediction_cache: Arc::new(RwLock::new(HashMap::new())),
413 learning_engine: LearningEngine::new(),
414 anomaly_detector: AnomalyDetector::new(),
415 }
416 }
417
418 pub async fn detect_anomalies(
419 &self,
420 performance_data: &HashMap<String, ModuleMetrics>,
421 ) -> Result<Vec<AnomalyEvent>> {
422 self.anomaly_detector.detect(performance_data).await
423 }
424
425 pub async fn update_models(
426 &self,
427 recommendation: &OptimizationRecommendation,
428 actual_impact: &PerformanceImpact,
429 ) -> Result<()> {
430 self.learning_engine
431 .update_model(recommendation, actual_impact)
432 .await
433 }
434}
435
436impl Default for PredictivePerformanceEngine {
437 fn default() -> Self {
438 Self::new()
439 }
440}
441
442pub fn calculate_anomaly_score(metrics: &ModuleMetrics) -> f64 {
446 let cpu_score = if metrics.cpu_usage > 80.0 {
447 metrics.cpu_usage
448 } else {
449 0.0
450 };
451 let error_score = metrics.error_rate * 10.0;
452 let latency_score = if metrics.avg_response_time > Duration::from_millis(500) {
453 metrics.avg_response_time.as_millis() as f64 / 10.0
454 } else {
455 0.0
456 };
457 (cpu_score + error_score + latency_score) / 3.0
458}
459
/// Returns the change from `old_value` to `new_value` as a percentage of `old_value`.
///
/// A zero `old_value` (including negative zero) yields `0.0` rather than a division
/// by zero. The divisor is `old_value` itself, not its absolute value, so the sign
/// of the result flips when `old_value` is negative.
pub fn calculate_percentage_change(old_value: f64, new_value: f64) -> f64 {
    if old_value != 0.0 {
        let delta = new_value - old_value;
        (delta / old_value) * 100.0
    } else {
        0.0
    }
}