avila_async/
ai.rs

1//! AI/ML Integration for predictive runtime optimization
2//!
3//! Provides machine learning capabilities for:
4//! - Workload prediction
5//! - Anomaly detection
6//! - Performance optimization
7//! - Resource forecasting
8
9use std::collections::VecDeque;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13/// Simple moving average predictor
14#[derive(Clone)]
15pub struct WorkloadPredictor {
16    window_size: usize,
17    history: Arc<Mutex<VecDeque<WorkloadSample>>>,
18}
19
20#[derive(Clone, Debug)]
21pub struct WorkloadSample {
22    pub timestamp: std::time::Instant,
23    pub queue_length: usize,
24    pub active_tasks: usize,
25    pub throughput: f64,
26}
27
28#[derive(Debug, Clone)]
29pub struct WorkloadPrediction {
30    pub predicted_queue_length: f64,
31    pub predicted_throughput: f64,
32    pub confidence: f64,
33    pub trend: Trend,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum Trend {
38    Increasing,
39    Stable,
40    Decreasing,
41}
42
43impl WorkloadPredictor {
44    pub fn new(window_size: usize) -> Self {
45        Self {
46            window_size,
47            history: Arc::new(Mutex::new(VecDeque::with_capacity(window_size))),
48        }
49    }
50
51    pub fn record_sample(&self, sample: WorkloadSample) {
52        let mut history = self.history.lock().unwrap();
53        if history.len() >= self.window_size {
54            history.pop_front();
55        }
56        history.push_back(sample);
57    }
58
59    pub fn predict(&self) -> Option<WorkloadPrediction> {
60        let history = self.history.lock().unwrap();
61
62        if history.len() < 3 {
63            return None;
64        }
65
66        // Simple moving average
67        let queue_avg: f64 = history.iter()
68            .map(|s| s.queue_length as f64)
69            .sum::<f64>() / history.len() as f64;
70
71        let throughput_avg: f64 = history.iter()
72            .map(|s| s.throughput)
73            .sum::<f64>() / history.len() as f64;
74
75        // Detect trend (simple linear regression slope)
76        let trend = self.detect_trend(&history);
77
78        // Confidence based on data stability
79        let confidence = self.calculate_confidence(&history);
80
81        Some(WorkloadPrediction {
82            predicted_queue_length: queue_avg,
83            predicted_throughput: throughput_avg,
84            confidence,
85            trend,
86        })
87    }
88
89    fn detect_trend(&self, history: &VecDeque<WorkloadSample>) -> Trend {
90        if history.len() < 2 {
91            return Trend::Stable;
92        }
93
94        let recent_avg = history.iter()
95            .rev()
96            .take(history.len() / 2)
97            .map(|s| s.queue_length as f64)
98            .sum::<f64>() / (history.len() / 2) as f64;
99
100        let older_avg = history.iter()
101            .take(history.len() / 2)
102            .map(|s| s.queue_length as f64)
103            .sum::<f64>() / (history.len() / 2) as f64;
104
105        let threshold = 0.15; // 15% change
106        let change_ratio = (recent_avg - older_avg) / older_avg.max(1.0);
107
108        if change_ratio > threshold {
109            Trend::Increasing
110        } else if change_ratio < -threshold {
111            Trend::Decreasing
112        } else {
113            Trend::Stable
114        }
115    }
116
117    fn calculate_confidence(&self, history: &VecDeque<WorkloadSample>) -> f64 {
118        if history.len() < 2 {
119            return 0.0;
120        }
121
122        // Calculate variance
123        let queue_avg: f64 = history.iter()
124            .map(|s| s.queue_length as f64)
125            .sum::<f64>() / history.len() as f64;
126
127        let variance = history.iter()
128            .map(|s| {
129                let diff = s.queue_length as f64 - queue_avg;
130                diff * diff
131            })
132            .sum::<f64>() / history.len() as f64;
133
134        let std_dev = variance.sqrt();
135        let coefficient_of_variation = std_dev / queue_avg.max(1.0);
136
137        // Lower CV = higher confidence
138        (1.0 - coefficient_of_variation.min(1.0)).max(0.0)
139    }
140}
141
142/// Anomaly detector using statistical methods
143#[derive(Clone)]
144pub struct AnomalyDetector {
145    sensitivity: f64,
146    baseline: Arc<Mutex<Vec<f64>>>,
147}
148
149#[derive(Debug, Clone)]
150pub struct AnomalyReport {
151    pub is_anomaly: bool,
152    pub severity: f64, // 0.0 to 1.0
153    pub metric_name: String,
154    pub current_value: f64,
155    pub expected_range: (f64, f64),
156}
157
158impl AnomalyDetector {
159    pub fn new(sensitivity: f64) -> Self {
160        Self {
161            sensitivity,
162            baseline: Arc::new(Mutex::new(Vec::new())),
163        }
164    }
165
166    pub fn update_baseline(&self, value: f64) {
167        let mut baseline = self.baseline.lock().unwrap();
168        if baseline.len() >= 100 {
169            baseline.remove(0);
170        }
171        baseline.push(value);
172    }
173
174    pub fn detect(&self, metric_name: String, current_value: f64) -> AnomalyReport {
175        let baseline = self.baseline.lock().unwrap();
176
177        if baseline.len() < 10 {
178            return AnomalyReport {
179                is_anomaly: false,
180                severity: 0.0,
181                metric_name,
182                current_value,
183                expected_range: (0.0, 0.0),
184            };
185        }
186
187        let mean: f64 = baseline.iter().sum::<f64>() / baseline.len() as f64;
188        let variance: f64 = baseline.iter()
189            .map(|v| (v - mean).powi(2))
190            .sum::<f64>() / baseline.len() as f64;
191        let std_dev = variance.sqrt();
192
193        let z_score = (current_value - mean) / std_dev.max(0.001);
194        let threshold = 2.0 + (1.0 - self.sensitivity) * 2.0; // 2-4 sigma
195
196        let is_anomaly = z_score.abs() > threshold;
197        let severity = (z_score.abs() / (threshold * 2.0)).min(1.0);
198
199        let expected_range = (
200            mean - threshold * std_dev,
201            mean + threshold * std_dev,
202        );
203
204        AnomalyReport {
205            is_anomaly,
206            severity,
207            metric_name,
208            current_value,
209            expected_range,
210        }
211    }
212}
213
214/// Performance optimizer using RL-like approach
215#[derive(Clone)]
216pub struct PerformanceOptimizer {
217    #[allow(dead_code)]
218    learning_rate: f64,
219    optimal_params: Arc<Mutex<OptimalParameters>>,
220}
221
222#[derive(Clone, Debug)]
223struct OptimalParameters {
224    #[allow(dead_code)]
225    thread_count: usize,
226    #[allow(dead_code)]
227    queue_target: usize,
228    reward_history: Vec<f64>,
229}
230
231#[derive(Debug, Clone)]
232pub struct OptimizationSuggestion {
233    pub suggested_threads: usize,
234    pub suggested_queue_target: usize,
235    pub expected_improvement: f64,
236    pub confidence: f64,
237}
238
239impl PerformanceOptimizer {
240    pub fn new(learning_rate: f64) -> Self {
241        Self {
242            learning_rate,
243            optimal_params: Arc::new(Mutex::new(OptimalParameters {
244                thread_count: 4,
245                queue_target: 100,
246                reward_history: Vec::new(),
247            })),
248        }
249    }
250
251    pub fn record_performance(&self, throughput: f64, latency: Duration) {
252        let reward = throughput / latency.as_secs_f64().max(0.001);
253        let mut params = self.optimal_params.lock().unwrap();
254        params.reward_history.push(reward);
255        if params.reward_history.len() > 50 {
256            params.reward_history.remove(0);
257        }
258    }
259
260    pub fn suggest_optimization(&self, current_threads: usize, current_queue: usize) -> OptimizationSuggestion {
261        let params = self.optimal_params.lock().unwrap();
262
263        if params.reward_history.len() < 5 {
264            return OptimizationSuggestion {
265                suggested_threads: current_threads,
266                suggested_queue_target: current_queue,
267                expected_improvement: 0.0,
268                confidence: 0.0,
269            };
270        }
271
272        // Simple gradient-based optimization
273        let recent_reward: f64 = params.reward_history.iter()
274            .rev()
275            .take(5)
276            .sum::<f64>() / 5.0;
277
278        let older_reward: f64 = if params.reward_history.len() >= 10 {
279            params.reward_history.iter()
280                .rev()
281                .skip(5)
282                .take(5)
283                .sum::<f64>() / 5.0
284        } else {
285            recent_reward
286        };
287
288        let improvement = recent_reward - older_reward;
289        let confidence = (params.reward_history.len() as f64 / 50.0).min(1.0);
290
291        let suggested_threads = if improvement < 0.0 {
292            current_threads.saturating_sub(1).max(2)
293        } else if improvement > 0.1 {
294            (current_threads + 1).min(16)
295        } else {
296            current_threads
297        };
298
299        OptimizationSuggestion {
300            suggested_threads,
301            suggested_queue_target: current_queue,
302            expected_improvement: improvement,
303            confidence,
304        }
305    }
306}
307
308impl std::fmt::Display for WorkloadPrediction {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        write!(
311            f,
312            "Prediction[queue={:.1}, tps={:.1}, trend={:?}, conf={:.1}%]",
313            self.predicted_queue_length,
314            self.predicted_throughput,
315            self.trend,
316            self.confidence * 100.0
317        )
318    }
319}
320
321impl std::fmt::Display for AnomalyReport {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        if self.is_anomaly {
324            write!(
325                f,
326                "🚨 ANOMALY: {} = {:.2} (expected {:.2}-{:.2}, severity={:.1}%)",
327                self.metric_name,
328                self.current_value,
329                self.expected_range.0,
330                self.expected_range.1,
331                self.severity * 100.0
332            )
333        } else {
334            write!(f, "✅ Normal: {} = {:.2}", self.metric_name, self.current_value)
335        }
336    }
337}