//! Machine Learning Components for Performance Optimization
//!
//! This module provides ML-based performance prediction and optimization capabilities.
//! It includes predictive models for batch size optimization, resource allocation,
//! and throughput forecasting to achieve optimal streaming performance.

use crate::performance_optimizer::{ProcessingStats, TuningDecision};
use anyhow::Result;
use nalgebra::{DMatrix, DVector, Vector2};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::time::{Duration, SystemTime};
/// Performance metrics for ML training and prediction.
///
/// One sample couples the configuration in effect when it was taken
/// (batch size, workers, resource usage) with the outcomes observed under
/// it (throughput, latency, error rate), forming one (features, targets)
/// pair for the regression model in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Throughput in events per second
    pub throughput: f64,
    /// Average latency in milliseconds
    pub latency: f64,
    /// CPU utilization percentage
    pub cpu_utilization: f64,
    /// Memory usage in MB
    pub memory_usage: f64,
    /// Batch size used (kept as f64 for direct use in feature vectors)
    pub batch_size: f64,
    /// Number of parallel workers (kept as f64 for direct use in feature vectors)
    pub parallel_workers: f64,
    /// Error rate percentage
    pub error_rate: f64,
    /// Timestamp when metrics were collected
    pub timestamp: SystemTime,
}
36impl PerformanceMetrics {
37    /// Create metrics from processing stats
38    pub fn from_stats(stats: &ProcessingStats, config_params: ConfigParams) -> Self {
39        let total_events = stats.total_events.load(Ordering::Relaxed) as f64;
40        let _total_time = stats.total_processing_time_ms.load(Ordering::Relaxed) as f64;
41        let error_count = stats.error_count.load(Ordering::Relaxed) as f64;
42
43        let throughput = stats.throughput_eps.load(Ordering::Relaxed) as f64;
44        let latency = stats.avg_processing_time_ms.load(Ordering::Relaxed) as f64;
45        let error_rate = if total_events > 0.0 {
46            error_count / total_events * 100.0
47        } else {
48            0.0
49        };
50
51        Self {
52            throughput,
53            latency,
54            cpu_utilization: config_params.cpu_utilization,
55            memory_usage: config_params.memory_usage,
56            batch_size: config_params.batch_size,
57            parallel_workers: config_params.parallel_workers,
58            error_rate,
59            timestamp: SystemTime::now(),
60        }
61    }
62
63    /// Convert to feature vector for ML models
64    pub fn to_feature_vector(&self) -> DVector<f64> {
65        DVector::from_vec(vec![
66            self.batch_size,
67            self.parallel_workers,
68            self.cpu_utilization,
69            self.memory_usage,
70            self.error_rate,
71        ])
72    }
73
74    /// Convert to target vector (throughput, latency)
75    pub fn to_target_vector(&self) -> Vector2<f64> {
76        Vector2::new(self.throughput, self.latency)
77    }
78}
79
/// Configuration parameters for performance optimization.
///
/// All knobs are kept as `f64` so they can be fed into the regression
/// model's feature vector without conversion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigParams {
    /// Events per processing batch.
    pub batch_size: f64,
    /// Number of concurrent workers.
    pub parallel_workers: f64,
    /// CPU utilization percentage at sampling time.
    pub cpu_utilization: f64,
    /// Memory usage in MB at sampling time.
    pub memory_usage: f64,
}
/// Linear regression model for performance prediction.
///
/// A two-output ordinary-least-squares model mapping a feature vector to
/// predicted (throughput, latency), fitted via the normal equations in
/// `train`.
#[derive(Debug, Clone)]
pub struct LinearRegressionModel {
    /// Model weights: 2 rows (throughput, latency) × feature_count columns.
    weights: DMatrix<f64>,
    /// Per-output bias (intercept) terms.
    bias: Vector2<f64>,
    /// Cumulative count of samples passed to `train` across all calls.
    training_samples: usize,
    /// R² goodness-of-fit from the most recent training batch.
    r_squared: f64,
}
impl LinearRegressionModel {
    /// Create a new, untrained model for `feature_count` input features.
    /// All weights and biases start at zero, so `predict` returns zeros
    /// until `train` is called.
    pub fn new(feature_count: usize) -> Self {
        Self {
            weights: DMatrix::zeros(2, feature_count), // 2 outputs: throughput, latency
            bias: Vector2::zeros(),
            training_samples: 0,
            r_squared: 0.0,
        }
    }

    /// Train the model with new data.
    ///
    /// Solves the multi-output least-squares problem via the normal
    /// equations and overwrites the current weights/bias with the new fit.
    ///
    /// NOTE(review): each call refits from scratch on the provided batch,
    /// but `training_samples` accumulates across calls — so the counter can
    /// exceed the number of samples the current fit was computed from.
    /// Confirm that is the intended semantics for `sample_count()`.
    ///
    /// # Errors
    /// Fails when the inputs are empty or mismatched in length, or when
    /// `X^T * X` is singular (not invertible).
    pub fn train(&mut self, features: &[DVector<f64>], targets: &[Vector2<f64>]) -> Result<()> {
        if features.is_empty() || features.len() != targets.len() {
            return Err(anyhow::anyhow!("Invalid training data"));
        }

        let n_samples = features.len();
        let n_features = features[0].len();

        // Build feature matrix; last column is a constant 1 for the bias.
        let mut x = DMatrix::zeros(n_samples, n_features + 1); // +1 for bias
        for (i, feature) in features.iter().enumerate() {
            for j in 0..n_features {
                x[(i, j)] = feature[j];
            }
            x[(i, n_features)] = 1.0; // Bias term
        }

        // Build target matrix: one row per sample, columns = outputs.
        let mut y = DMatrix::zeros(n_samples, 2);
        for (i, target) in targets.iter().enumerate() {
            y[(i, 0)] = target[0]; // Throughput
            y[(i, 1)] = target[1]; // Latency
        }

        // Solve normal equations: (X^T * X)^-1 * X^T * Y
        if let Some(xtx_inv) = (x.transpose() * &x).try_inverse() {
            let coefficients = xtx_inv * x.transpose() * y;

            // Extract weights and bias. `coefficients` is
            // (n_features + 1) × 2, so it is transposed into `weights`
            // (2 × n_features), with the final row becoming the bias.
            for i in 0..2 {
                for j in 0..n_features {
                    self.weights[(i, j)] = coefficients[(j, i)];
                }
                self.bias[i] = coefficients[(n_features, i)];
            }

            self.training_samples += n_samples;
            self.calculate_r_squared(features, targets);
        } else {
            return Err(anyhow::anyhow!("Failed to solve normal equations"));
        }

        Ok(())
    }

    /// Predict (throughput, latency) for a feature vector.
    ///
    /// Returns a zero vector when the input length does not match the
    /// model's feature count — callers cannot distinguish that from a
    /// genuine all-zero prediction, so pass correctly-sized vectors.
    pub fn predict(&self, features: &DVector<f64>) -> Vector2<f64> {
        if features.len() != self.weights.ncols() {
            return Vector2::zeros();
        }

        &self.weights * features + self.bias
    }

    /// Calculate R² for model accuracy, pooled across both outputs
    /// (residual and total sums of squares for throughput and latency are
    /// summed together before forming the ratio).
    fn calculate_r_squared(&mut self, features: &[DVector<f64>], targets: &[Vector2<f64>]) {
        if targets.is_empty() {
            return;
        }

        let mut total_variance = 0.0;
        let mut residual_variance = 0.0;

        // Calculate mean of targets
        let mean_throughput: f64 = targets.iter().map(|t| t[0]).sum::<f64>() / targets.len() as f64;
        let mean_latency: f64 = targets.iter().map(|t| t[1]).sum::<f64>() / targets.len() as f64;

        for (features, target) in features.iter().zip(targets.iter()) {
            let prediction = self.predict(features);

            // Residual sum of squares
            residual_variance +=
                (target[0] - prediction[0]).powi(2) + (target[1] - prediction[1]).powi(2);

            // Total sum of squares
            total_variance +=
                (target[0] - mean_throughput).powi(2) + (target[1] - mean_latency).powi(2);
        }

        // All-constant targets give zero total variance; report 0.0 rather
        // than dividing by zero.
        self.r_squared = if total_variance > 0.0 {
            1.0 - (residual_variance / total_variance)
        } else {
            0.0
        };
    }

    /// Get model accuracy (R² of the most recent fit).
    pub fn accuracy(&self) -> f64 {
        self.r_squared
    }

    /// Get the cumulative training sample count (see note on `train`).
    pub fn sample_count(&self) -> usize {
        self.training_samples
    }
}
/// ML-based performance predictor.
///
/// Wraps a `LinearRegressionModel` with a bounded history of metric
/// samples and a time-based retraining policy.
pub struct PerformancePredictor {
    /// Primary prediction model
    model: LinearRegressionModel,
    /// Training data history (bounded; oldest samples evicted first)
    metrics_history: VecDeque<PerformanceMetrics>,
    /// Maximum history size before eviction kicks in
    max_history_size: usize,
    /// Minimum samples required before training or predicting
    min_training_samples: usize,
    /// Last training time (`None` until the first successful training run)
    last_training: Option<SystemTime>,
    /// Minimum interval between training runs
    training_interval: Duration,
}
227impl PerformancePredictor {
228    /// Create a new performance predictor
229    pub fn new() -> Self {
230        Self {
231            model: LinearRegressionModel::new(5), // 5 features
232            metrics_history: VecDeque::new(),
233            max_history_size: 1000,
234            min_training_samples: 10,
235            last_training: None,
236            training_interval: Duration::from_secs(30),
237        }
238    }
239
240    /// Add performance metrics for training
241    pub fn add_metrics(&mut self, metrics: PerformanceMetrics) {
242        self.metrics_history.push_back(metrics);
243
244        // Maintain history size
245        while self.metrics_history.len() > self.max_history_size {
246            self.metrics_history.pop_front();
247        }
248
249        // Train model if conditions are met
250        if self.should_retrain() {
251            let _ = self.retrain();
252        }
253    }
254
255    /// Check if model should be retrained
256    fn should_retrain(&self) -> bool {
257        self.metrics_history.len() >= self.min_training_samples
258            && (self.last_training.is_none()
259                || SystemTime::now()
260                    .duration_since(
261                        self.last_training
262                            .expect("last_training checked to be Some above"),
263                    )
264                    .unwrap_or(Duration::from_secs(0))
265                    >= self.training_interval)
266    }
267
268    /// Retrain the model with latest data
269    fn retrain(&mut self) -> Result<()> {
270        if self.metrics_history.len() < self.min_training_samples {
271            return Ok(());
272        }
273
274        let features: Vec<DVector<f64>> = self
275            .metrics_history
276            .iter()
277            .map(|m| m.to_feature_vector())
278            .collect();
279
280        let targets: Vec<Vector2<f64>> = self
281            .metrics_history
282            .iter()
283            .map(|m| m.to_target_vector())
284            .collect();
285
286        self.model.train(&features, &targets)?;
287        self.last_training = Some(SystemTime::now());
288
289        Ok(())
290    }
291
292    /// Predict optimal configuration
293    pub fn predict_optimal_config(&self, current_config: ConfigParams) -> Result<TuningDecision> {
294        if self.model.sample_count() < self.min_training_samples {
295            return Err(anyhow::anyhow!("Insufficient training data"));
296        }
297
298        let mut best_config = current_config.clone();
299        let mut best_score = f64::NEG_INFINITY;
300
301        // Test different configurations
302        for batch_multiplier in [0.8, 0.9, 1.0, 1.1, 1.2] {
303            for worker_delta in [-1.0, 0.0, 1.0] {
304                let test_config = ConfigParams {
305                    batch_size: (current_config.batch_size * batch_multiplier).clamp(10.0, 5000.0),
306                    parallel_workers: (current_config.parallel_workers + worker_delta)
307                        .clamp(1.0, 32.0),
308                    cpu_utilization: current_config.cpu_utilization,
309                    memory_usage: current_config.memory_usage,
310                };
311
312                let features = DVector::from_vec(vec![
313                    test_config.batch_size,
314                    test_config.parallel_workers,
315                    test_config.cpu_utilization,
316                    test_config.memory_usage,
317                    0.0, // Assume no errors for prediction
318                ]);
319
320                let prediction = self.model.predict(&features);
321                let throughput = prediction[0];
322                let latency = prediction[1];
323
324                // Score function: maximize throughput, minimize latency
325                let score = throughput - (latency * 0.1); // Weight latency penalty
326
327                if score > best_score {
328                    best_score = score;
329                    best_config = test_config;
330                }
331            }
332        }
333
334        // Generate tuning decision
335        let decision = if best_config.batch_size != current_config.batch_size {
336            TuningDecision {
337                parameter: "batch_size".to_string(),
338                old_value: current_config.batch_size,
339                new_value: best_config.batch_size,
340                reason: format!("ML prediction suggests optimal batch size for throughput improvement (model accuracy: {:.2})", self.model.accuracy()),
341                expected_improvement: (best_score / 1000.0).clamp(0.0, 1.0),
342                confidence: self.model.accuracy().max(0.5),
343            }
344        } else if best_config.parallel_workers != current_config.parallel_workers {
345            TuningDecision {
346                parameter: "parallel_workers".to_string(),
347                old_value: current_config.parallel_workers,
348                new_value: best_config.parallel_workers,
349                reason: format!("ML prediction suggests optimal worker count for latency reduction (model accuracy: {:.2})", self.model.accuracy()),
350                expected_improvement: (best_score / 1000.0).clamp(0.0, 1.0),
351                confidence: self.model.accuracy().max(0.5),
352            }
353        } else {
354            return Err(anyhow::anyhow!(
355                "No beneficial configuration changes predicted"
356            ));
357        };
358
359        Ok(decision)
360    }
361
362    /// Get prediction model statistics
363    pub fn model_stats(&self) -> ModelStats {
364        ModelStats {
365            accuracy: self.model.accuracy(),
366            training_samples: self.model.sample_count(),
367            history_size: self.metrics_history.len(),
368            last_training: self.last_training,
369        }
370    }
371}
372
impl Default for PerformancePredictor {
    /// Equivalent to `PerformancePredictor::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Model statistics for monitoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelStats {
    /// R² of the model's most recent fit.
    pub accuracy: f64,
    /// Cumulative number of samples the model has been trained on.
    pub training_samples: usize,
    /// Number of samples currently held in the predictor's history.
    pub history_size: usize,
    /// When the model was last retrained, if ever.
    pub last_training: Option<SystemTime>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `from_stats` must carry the config knobs through unchanged.
    #[test]
    fn test_performance_metrics_creation() {
        let processing_stats = ProcessingStats::default();
        let params = ConfigParams {
            batch_size: 100.0,
            parallel_workers: 4.0,
            cpu_utilization: 75.0,
            memory_usage: 512.0,
        };

        let sample = PerformanceMetrics::from_stats(&processing_stats, params);
        assert_eq!(sample.batch_size, 100.0);
        assert_eq!(sample.parallel_workers, 4.0);
    }

    /// Training on a well-conditioned data set must succeed and give
    /// positive predictions for a nearby query point.
    #[test]
    fn test_linear_regression_model() {
        let mut regression = LinearRegressionModel::new(2);

        // Diverse inputs keep X^T * X invertible.
        let inputs: Vec<DVector<f64>> = [
            [1.0, 1.0],
            [2.0, 4.0],
            [3.0, 1.0],
            [1.0, 3.0],
            [4.0, 2.0],
        ]
        .iter()
        .map(|pair| DVector::from_vec(pair.to_vec()))
        .collect();

        let outputs = vec![
            Vector2::new(10.0, 5.0),
            Vector2::new(25.0, 12.0),
            Vector2::new(15.0, 8.0),
            Vector2::new(18.0, 9.0),
            Vector2::new(22.0, 11.0),
        ];

        assert!(regression.train(&inputs, &outputs).is_ok());
        assert!(regression.sample_count() > 0);

        let estimate = regression.predict(&DVector::from_vec(vec![4.0, 5.0]));
        assert!(estimate[0] > 0.0);
        assert!(estimate[1] > 0.0);
    }

    /// Feeding metrics must populate the predictor's history.
    #[test]
    fn test_performance_predictor() {
        let mut predictor = PerformancePredictor::new();

        let params = ConfigParams {
            batch_size: 100.0,
            parallel_workers: 4.0,
            cpu_utilization: 75.0,
            memory_usage: 512.0,
        };

        // Enough samples to cross the minimum-training threshold.
        for _ in 0..15 {
            let processing_stats = ProcessingStats::default();
            predictor.add_metrics(PerformanceMetrics::from_stats(
                &processing_stats,
                params.clone(),
            ));
        }

        assert!(predictor.model_stats().history_size > 0);
    }

    /// Feature vectors must have length 5 and preserve field order.
    #[test]
    fn test_feature_vector_conversion() {
        let params = ConfigParams {
            batch_size: 100.0,
            parallel_workers: 4.0,
            cpu_utilization: 75.0,
            memory_usage: 512.0,
        };

        let sample = PerformanceMetrics::from_stats(&ProcessingStats::default(), params);
        let feature_vec = sample.to_feature_vector();

        assert_eq!(feature_vec.len(), 5);
        assert_eq!(feature_vec[0], 100.0); // batch_size comes first
        assert_eq!(feature_vec[1], 4.0); // then parallel_workers
    }
}