1use crate::performance_optimizer::{ProcessingStats, TuningDecision};
8use anyhow::Result;
9use nalgebra::{DMatrix, DVector, Vector2};
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12use std::sync::atomic::Ordering;
13use std::time::{Duration, SystemTime};
14
/// A single snapshot of pipeline performance paired with the configuration
/// that produced it; serves as one training sample for the regression model.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Events processed per second (loaded from `ProcessingStats::throughput_eps`).
    pub throughput: f64,
    /// Average per-event processing time in milliseconds.
    pub latency: f64,
    /// CPU utilization as supplied by the caller's `ConfigParams`
    /// (presumably a percentage — confirm with the producer of `ConfigParams`).
    pub cpu_utilization: f64,
    /// Memory usage as supplied by the caller's `ConfigParams`.
    pub memory_usage: f64,
    /// Batch size in effect when the sample was taken (f64 for the model).
    pub batch_size: f64,
    /// Worker count in effect when the sample was taken (f64 for the model).
    pub parallel_workers: f64,
    /// Errors as a percentage of total events (0.0 when no events were seen).
    pub error_rate: f64,
    /// Wall-clock time at which this sample was constructed.
    pub timestamp: SystemTime,
}
35
36impl PerformanceMetrics {
37 pub fn from_stats(stats: &ProcessingStats, config_params: ConfigParams) -> Self {
39 let total_events = stats.total_events.load(Ordering::Relaxed) as f64;
40 let _total_time = stats.total_processing_time_ms.load(Ordering::Relaxed) as f64;
41 let error_count = stats.error_count.load(Ordering::Relaxed) as f64;
42
43 let throughput = stats.throughput_eps.load(Ordering::Relaxed) as f64;
44 let latency = stats.avg_processing_time_ms.load(Ordering::Relaxed) as f64;
45 let error_rate = if total_events > 0.0 {
46 error_count / total_events * 100.0
47 } else {
48 0.0
49 };
50
51 Self {
52 throughput,
53 latency,
54 cpu_utilization: config_params.cpu_utilization,
55 memory_usage: config_params.memory_usage,
56 batch_size: config_params.batch_size,
57 parallel_workers: config_params.parallel_workers,
58 error_rate,
59 timestamp: SystemTime::now(),
60 }
61 }
62
63 pub fn to_feature_vector(&self) -> DVector<f64> {
65 DVector::from_vec(vec![
66 self.batch_size,
67 self.parallel_workers,
68 self.cpu_utilization,
69 self.memory_usage,
70 self.error_rate,
71 ])
72 }
73
74 pub fn to_target_vector(&self) -> Vector2<f64> {
76 Vector2::new(self.throughput, self.latency)
77 }
78}
79
/// Tunable pipeline parameters, carried as `f64` so they can be fed directly
/// into the regression model's feature vector.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigParams {
    /// Events per processing batch.
    pub batch_size: f64,
    /// Number of parallel workers.
    pub parallel_workers: f64,
    /// Observed CPU utilization (units set by the caller — presumably percent).
    pub cpu_utilization: f64,
    /// Observed memory usage (units set by the caller).
    pub memory_usage: f64,
}
88
/// Multi-output linear model mapping a feature vector to a
/// (throughput, latency) prediction, fit via ordinary least squares.
#[derive(Debug, Clone)]
pub struct LinearRegressionModel {
    // 2 x feature_count coefficient matrix; row 0 predicts throughput,
    // row 1 predicts latency (see how `train` fills it from the target columns).
    weights: DMatrix<f64>,
    // Intercept term for each of the two outputs.
    bias: Vector2<f64>,
    // Cumulative number of samples seen across all `train` calls.
    training_samples: usize,
    // Coefficient of determination from the most recent training run.
    r_squared: f64,
}
101
102impl LinearRegressionModel {
103 pub fn new(feature_count: usize) -> Self {
105 Self {
106 weights: DMatrix::zeros(2, feature_count), bias: Vector2::zeros(),
108 training_samples: 0,
109 r_squared: 0.0,
110 }
111 }
112
113 pub fn train(&mut self, features: &[DVector<f64>], targets: &[Vector2<f64>]) -> Result<()> {
115 if features.is_empty() || features.len() != targets.len() {
116 return Err(anyhow::anyhow!("Invalid training data"));
117 }
118
119 let n_samples = features.len();
120 let n_features = features[0].len();
121
122 let mut x = DMatrix::zeros(n_samples, n_features + 1); for (i, feature) in features.iter().enumerate() {
125 for j in 0..n_features {
126 x[(i, j)] = feature[j];
127 }
128 x[(i, n_features)] = 1.0; }
130
131 let mut y = DMatrix::zeros(n_samples, 2);
133 for (i, target) in targets.iter().enumerate() {
134 y[(i, 0)] = target[0]; y[(i, 1)] = target[1]; }
137
138 if let Some(xtx_inv) = (x.transpose() * &x).try_inverse() {
140 let coefficients = xtx_inv * x.transpose() * y;
141
142 for i in 0..2 {
144 for j in 0..n_features {
145 self.weights[(i, j)] = coefficients[(j, i)];
146 }
147 self.bias[i] = coefficients[(n_features, i)];
148 }
149
150 self.training_samples += n_samples;
151 self.calculate_r_squared(features, targets);
152 } else {
153 return Err(anyhow::anyhow!("Failed to solve normal equations"));
154 }
155
156 Ok(())
157 }
158
159 pub fn predict(&self, features: &DVector<f64>) -> Vector2<f64> {
161 if features.len() != self.weights.ncols() {
162 return Vector2::zeros();
163 }
164
165 &self.weights * features + self.bias
166 }
167
168 fn calculate_r_squared(&mut self, features: &[DVector<f64>], targets: &[Vector2<f64>]) {
170 if targets.is_empty() {
171 return;
172 }
173
174 let mut total_variance = 0.0;
175 let mut residual_variance = 0.0;
176
177 let mean_throughput: f64 = targets.iter().map(|t| t[0]).sum::<f64>() / targets.len() as f64;
179 let mean_latency: f64 = targets.iter().map(|t| t[1]).sum::<f64>() / targets.len() as f64;
180
181 for (features, target) in features.iter().zip(targets.iter()) {
182 let prediction = self.predict(features);
183
184 residual_variance +=
186 (target[0] - prediction[0]).powi(2) + (target[1] - prediction[1]).powi(2);
187
188 total_variance +=
190 (target[0] - mean_throughput).powi(2) + (target[1] - mean_latency).powi(2);
191 }
192
193 self.r_squared = if total_variance > 0.0 {
194 1.0 - (residual_variance / total_variance)
195 } else {
196 0.0
197 };
198 }
199
200 pub fn accuracy(&self) -> f64 {
202 self.r_squared
203 }
204
205 pub fn sample_count(&self) -> usize {
207 self.training_samples
208 }
209}
210
/// Online predictor: accumulates `PerformanceMetrics` samples, periodically
/// refits the linear model, and proposes configuration changes.
pub struct PerformancePredictor {
    // Underlying least-squares model (constructed with 5 input features).
    model: LinearRegressionModel,
    // Sliding window of recent samples used as the training set.
    metrics_history: VecDeque<PerformanceMetrics>,
    // Cap on `metrics_history`; oldest samples are evicted beyond this.
    max_history_size: usize,
    // Minimum samples required before training or predicting.
    min_training_samples: usize,
    // When the model was last retrained (`None` until the first training).
    last_training: Option<SystemTime>,
    // Minimum wall-clock gap between retraining runs.
    training_interval: Duration,
}
226
227impl PerformancePredictor {
228 pub fn new() -> Self {
230 Self {
231 model: LinearRegressionModel::new(5), metrics_history: VecDeque::new(),
233 max_history_size: 1000,
234 min_training_samples: 10,
235 last_training: None,
236 training_interval: Duration::from_secs(30),
237 }
238 }
239
240 pub fn add_metrics(&mut self, metrics: PerformanceMetrics) {
242 self.metrics_history.push_back(metrics);
243
244 while self.metrics_history.len() > self.max_history_size {
246 self.metrics_history.pop_front();
247 }
248
249 if self.should_retrain() {
251 let _ = self.retrain();
252 }
253 }
254
255 fn should_retrain(&self) -> bool {
257 self.metrics_history.len() >= self.min_training_samples
258 && (self.last_training.is_none()
259 || SystemTime::now()
260 .duration_since(
261 self.last_training
262 .expect("last_training checked to be Some above"),
263 )
264 .unwrap_or(Duration::from_secs(0))
265 >= self.training_interval)
266 }
267
268 fn retrain(&mut self) -> Result<()> {
270 if self.metrics_history.len() < self.min_training_samples {
271 return Ok(());
272 }
273
274 let features: Vec<DVector<f64>> = self
275 .metrics_history
276 .iter()
277 .map(|m| m.to_feature_vector())
278 .collect();
279
280 let targets: Vec<Vector2<f64>> = self
281 .metrics_history
282 .iter()
283 .map(|m| m.to_target_vector())
284 .collect();
285
286 self.model.train(&features, &targets)?;
287 self.last_training = Some(SystemTime::now());
288
289 Ok(())
290 }
291
292 pub fn predict_optimal_config(&self, current_config: ConfigParams) -> Result<TuningDecision> {
294 if self.model.sample_count() < self.min_training_samples {
295 return Err(anyhow::anyhow!("Insufficient training data"));
296 }
297
298 let mut best_config = current_config.clone();
299 let mut best_score = f64::NEG_INFINITY;
300
301 for batch_multiplier in [0.8, 0.9, 1.0, 1.1, 1.2] {
303 for worker_delta in [-1.0, 0.0, 1.0] {
304 let test_config = ConfigParams {
305 batch_size: (current_config.batch_size * batch_multiplier).clamp(10.0, 5000.0),
306 parallel_workers: (current_config.parallel_workers + worker_delta)
307 .clamp(1.0, 32.0),
308 cpu_utilization: current_config.cpu_utilization,
309 memory_usage: current_config.memory_usage,
310 };
311
312 let features = DVector::from_vec(vec![
313 test_config.batch_size,
314 test_config.parallel_workers,
315 test_config.cpu_utilization,
316 test_config.memory_usage,
317 0.0, ]);
319
320 let prediction = self.model.predict(&features);
321 let throughput = prediction[0];
322 let latency = prediction[1];
323
324 let score = throughput - (latency * 0.1); if score > best_score {
328 best_score = score;
329 best_config = test_config;
330 }
331 }
332 }
333
334 let decision = if best_config.batch_size != current_config.batch_size {
336 TuningDecision {
337 parameter: "batch_size".to_string(),
338 old_value: current_config.batch_size,
339 new_value: best_config.batch_size,
340 reason: format!("ML prediction suggests optimal batch size for throughput improvement (model accuracy: {:.2})", self.model.accuracy()),
341 expected_improvement: (best_score / 1000.0).clamp(0.0, 1.0),
342 confidence: self.model.accuracy().max(0.5),
343 }
344 } else if best_config.parallel_workers != current_config.parallel_workers {
345 TuningDecision {
346 parameter: "parallel_workers".to_string(),
347 old_value: current_config.parallel_workers,
348 new_value: best_config.parallel_workers,
349 reason: format!("ML prediction suggests optimal worker count for latency reduction (model accuracy: {:.2})", self.model.accuracy()),
350 expected_improvement: (best_score / 1000.0).clamp(0.0, 1.0),
351 confidence: self.model.accuracy().max(0.5),
352 }
353 } else {
354 return Err(anyhow::anyhow!(
355 "No beneficial configuration changes predicted"
356 ));
357 };
358
359 Ok(decision)
360 }
361
362 pub fn model_stats(&self) -> ModelStats {
364 ModelStats {
365 accuracy: self.model.accuracy(),
366 training_samples: self.model.sample_count(),
367 history_size: self.metrics_history.len(),
368 last_training: self.last_training,
369 }
370 }
371}
372
373impl Default for PerformancePredictor {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
/// Read-only, serializable snapshot of the predictor's model state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelStats {
    /// R² of the most recent training run.
    pub accuracy: f64,
    /// Cumulative samples the model has been trained on.
    pub training_samples: usize,
    /// Number of samples currently retained in the history window.
    pub history_size: usize,
    /// Timestamp of the last retraining, if any has occurred.
    pub last_training: Option<SystemTime>,
}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: the configuration used by several tests below.
    fn sample_config() -> ConfigParams {
        ConfigParams {
            batch_size: 100.0,
            parallel_workers: 4.0,
            cpu_utilization: 75.0,
            memory_usage: 512.0,
        }
    }

    #[test]
    fn test_performance_metrics_creation() {
        let stats = ProcessingStats::default();
        let metrics = PerformanceMetrics::from_stats(&stats, sample_config());

        assert_eq!(metrics.batch_size, 100.0);
        assert_eq!(metrics.parallel_workers, 4.0);
    }

    #[test]
    fn test_linear_regression_model() {
        let mut model = LinearRegressionModel::new(2);

        let features: Vec<DVector<f64>> = [
            [1.0, 1.0],
            [2.0, 4.0],
            [3.0, 1.0],
            [1.0, 3.0],
            [4.0, 2.0],
        ]
        .iter()
        .map(|pair| DVector::from_vec(pair.to_vec()))
        .collect();

        let targets = vec![
            Vector2::new(10.0, 5.0),
            Vector2::new(25.0, 12.0),
            Vector2::new(15.0, 8.0),
            Vector2::new(18.0, 9.0),
            Vector2::new(22.0, 11.0),
        ];

        assert!(model.train(&features, &targets).is_ok());
        assert!(model.sample_count() > 0);

        let prediction = model.predict(&DVector::from_vec(vec![4.0, 5.0]));
        assert!(prediction[0] > 0.0);
        assert!(prediction[1] > 0.0);
    }

    #[test]
    fn test_performance_predictor() {
        let mut predictor = PerformancePredictor::new();
        let config = sample_config();

        // Feed enough identical samples to cross the training threshold.
        for _ in 0..15 {
            let metrics =
                PerformanceMetrics::from_stats(&ProcessingStats::default(), config.clone());
            predictor.add_metrics(metrics);
        }

        assert!(predictor.model_stats().history_size > 0);
    }

    #[test]
    fn test_feature_vector_conversion() {
        let metrics =
            PerformanceMetrics::from_stats(&ProcessingStats::default(), sample_config());
        let features = metrics.to_feature_vector();

        assert_eq!(features.len(), 5);
        assert_eq!(features[0], 100.0);
        assert_eq!(features[1], 4.0);
    }
}
475}