use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Sliding-window workload forecaster: retains the most recent samples and
/// derives moving-average predictions from them.
#[derive(Clone)]
pub struct WorkloadPredictor {
    window_size: usize,
    history: Arc<Mutex<VecDeque<WorkloadSample>>>,
}

/// A single point-in-time measurement of system load.
#[derive(Clone, Debug)]
pub struct WorkloadSample {
    pub timestamp: std::time::Instant,
    pub queue_length: usize,
    pub active_tasks: usize,
    pub throughput: f64,
}

/// Forecast produced by `WorkloadPredictor::predict`.
#[derive(Debug, Clone)]
pub struct WorkloadPrediction {
    pub predicted_queue_length: f64,
    pub predicted_throughput: f64,
    pub confidence: f64,
    pub trend: Trend,
}

/// Direction of the recent queue-length trend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trend {
    Increasing,
    Stable,
    Decreasing,
}

impl WorkloadPredictor {
    pub fn new(window_size: usize) -> Self {
        Self {
            window_size,
            history: Arc::new(Mutex::new(VecDeque::with_capacity(window_size))),
        }
    }

    /// Records a sample, evicting the oldest one once the window is full.
    pub fn record_sample(&self, sample: WorkloadSample) {
        let mut history = self.history.lock().unwrap();
        if history.len() >= self.window_size {
            history.pop_front();
        }
        history.push_back(sample);
    }

    /// Returns a moving-average forecast, or `None` until at least three
    /// samples have been recorded.
    pub fn predict(&self) -> Option<WorkloadPrediction> {
        let history = self.history.lock().unwrap();

        if history.len() < 3 {
            return None;
        }

        let queue_avg: f64 = history.iter()
            .map(|s| s.queue_length as f64)
            .sum::<f64>() / history.len() as f64;

        let throughput_avg: f64 = history.iter()
            .map(|s| s.throughput)
            .sum::<f64>() / history.len() as f64;

        let trend = self.detect_trend(&history);

        let confidence = self.calculate_confidence(&history);

        Some(WorkloadPrediction {
            predicted_queue_length: queue_avg,
            predicted_throughput: throughput_avg,
            confidence,
            trend,
        })
    }

    /// Compares the average queue length of the newer half of the window
    /// against the older half; a relative change beyond 15% counts as a trend.
    fn detect_trend(&self, history: &VecDeque<WorkloadSample>) -> Trend {
        if history.len() < 2 {
            return Trend::Stable;
        }

        let recent_avg = history.iter()
            .rev()
            .take(history.len() / 2)
            .map(|s| s.queue_length as f64)
            .sum::<f64>() / (history.len() / 2) as f64;

        let older_avg = history.iter()
            .take(history.len() / 2)
            .map(|s| s.queue_length as f64)
            .sum::<f64>() / (history.len() / 2) as f64;

        let threshold = 0.15;
        let change_ratio = (recent_avg - older_avg) / older_avg.max(1.0);

        if change_ratio > threshold {
            Trend::Increasing
        } else if change_ratio < -threshold {
            Trend::Decreasing
        } else {
            Trend::Stable
        }
    }

    /// Maps the coefficient of variation of queue length onto [0, 1]:
    /// a steadier history yields higher confidence.
    fn calculate_confidence(&self, history: &VecDeque<WorkloadSample>) -> f64 {
        if history.len() < 2 {
            return 0.0;
        }

        let queue_avg: f64 = history.iter()
            .map(|s| s.queue_length as f64)
            .sum::<f64>() / history.len() as f64;

        let variance = history.iter()
            .map(|s| {
                let diff = s.queue_length as f64 - queue_avg;
                diff * diff
            })
            .sum::<f64>() / history.len() as f64;

        let std_dev = variance.sqrt();
        let coefficient_of_variation = std_dev / queue_avg.max(1.0);

        (1.0 - coefficient_of_variation.min(1.0)).max(0.0)
    }
}
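
// A minimal sanity check, not part of the original source: the window size
// and queue lengths below are illustrative assumptions chosen to cross the
// predictor's 15% trend threshold.
#[cfg(test)]
mod predictor_tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn rising_queue_is_detected_as_increasing() {
        let predictor = WorkloadPredictor::new(8);
        // Older half averages 10, newer half averages 20: a 100% increase,
        // well past the 15% trend threshold.
        for &queue_length in &[10, 10, 10, 20, 20, 20] {
            predictor.record_sample(WorkloadSample {
                timestamp: Instant::now(),
                queue_length,
                active_tasks: queue_length,
                throughput: 50.0,
            });
        }
        let prediction = predictor.predict().expect("enough samples recorded");
        assert_eq!(prediction.trend, Trend::Increasing);
    }
}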

/// Z-score based anomaly detector over a rolling baseline of recent values.
#[derive(Clone)]
pub struct AnomalyDetector {
    sensitivity: f64,
    baseline: Arc<Mutex<Vec<f64>>>,
}

/// Outcome of a single `AnomalyDetector::detect` call.
#[derive(Debug, Clone)]
pub struct AnomalyReport {
    pub is_anomaly: bool,
    pub severity: f64,
    pub metric_name: String,
    pub current_value: f64,
    pub expected_range: (f64, f64),
}

impl AnomalyDetector {
    pub fn new(sensitivity: f64) -> Self {
        Self {
            sensitivity,
            baseline: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Adds a value to the rolling baseline, keeping at most 100 entries.
    pub fn update_baseline(&self, value: f64) {
        let mut baseline = self.baseline.lock().unwrap();
        if baseline.len() >= 100 {
            baseline.remove(0);
        }
        baseline.push(value);
    }

    /// Flags `current_value` as anomalous when its z-score against the
    /// baseline exceeds a sensitivity-scaled threshold. Reports are neutral
    /// until the baseline holds at least 10 values.
    pub fn detect(&self, metric_name: String, current_value: f64) -> AnomalyReport {
        let baseline = self.baseline.lock().unwrap();

        if baseline.len() < 10 {
            return AnomalyReport {
                is_anomaly: false,
                severity: 0.0,
                metric_name,
                current_value,
                expected_range: (0.0, 0.0),
            };
        }

        let mean: f64 = baseline.iter().sum::<f64>() / baseline.len() as f64;
        let variance: f64 = baseline.iter()
            .map(|v| (v - mean).powi(2))
            .sum::<f64>() / baseline.len() as f64;
        let std_dev = variance.sqrt();

        let z_score = (current_value - mean) / std_dev.max(0.001);
        // Higher sensitivity lowers the threshold: it ranges from 2.0
        // (sensitivity 1.0) to 4.0 (sensitivity 0.0) standard deviations.
        let threshold = 2.0 + (1.0 - self.sensitivity) * 2.0;
        let is_anomaly = z_score.abs() > threshold;
        let severity = (z_score.abs() / (threshold * 2.0)).min(1.0);

        let expected_range = (
            mean - threshold * std_dev,
            mean + threshold * std_dev,
        );

        AnomalyReport {
            is_anomaly,
            severity,
            metric_name,
            current_value,
            expected_range,
        }
    }
}
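
// A minimal sanity check, not part of the original source: the baseline
// values alternating between 9.0 and 11.0 are illustrative assumptions that
// give a mean of 10.0 and a standard deviation of 1.0.
#[cfg(test)]
mod anomaly_tests {
    use super::*;

    #[test]
    fn outlier_is_flagged_once_baseline_is_established() {
        // Sensitivity 0.5 yields a threshold of 3.0 standard deviations.
        let detector = AnomalyDetector::new(0.5);
        for i in 0..20 {
            detector.update_baseline(if i % 2 == 0 { 9.0 } else { 11.0 });
        }
        // Expected range is (7.0, 13.0), so 10.5 is normal and 50.0 is not.
        assert!(!detector.detect("queue_depth".to_string(), 10.5).is_anomaly);
        assert!(detector.detect("queue_depth".to_string(), 50.0).is_anomaly);
    }
}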

/// Reward-driven tuner that nudges the worker thread count based on recent
/// throughput and latency measurements.
#[derive(Clone)]
pub struct PerformanceOptimizer {
    #[allow(dead_code)]
    learning_rate: f64,
    optimal_params: Arc<Mutex<OptimalParameters>>,
}

#[derive(Clone, Debug)]
struct OptimalParameters {
    #[allow(dead_code)]
    thread_count: usize,
    #[allow(dead_code)]
    queue_target: usize,
    reward_history: Vec<f64>,
}

/// Tuning advice produced by `PerformanceOptimizer::suggest_optimization`.
#[derive(Debug, Clone)]
pub struct OptimizationSuggestion {
    pub suggested_threads: usize,
    pub suggested_queue_target: usize,
    pub expected_improvement: f64,
    pub confidence: f64,
}

impl PerformanceOptimizer {
    pub fn new(learning_rate: f64) -> Self {
        Self {
            learning_rate,
            optimal_params: Arc::new(Mutex::new(OptimalParameters {
                thread_count: 4,
                queue_target: 100,
                reward_history: Vec::new(),
            })),
        }
    }

    /// Folds a throughput/latency observation into the reward history,
    /// keeping at most the 50 most recent rewards.
    pub fn record_performance(&self, throughput: f64, latency: Duration) {
        let reward = throughput / latency.as_secs_f64().max(0.001);
        let mut params = self.optimal_params.lock().unwrap();
        params.reward_history.push(reward);
        if params.reward_history.len() > 50 {
            params.reward_history.remove(0);
        }
    }

    /// Compares the five most recent rewards against the five before them
    /// and suggests adding or removing a worker thread accordingly. Returns
    /// a no-op suggestion until at least five rewards have been recorded.
    pub fn suggest_optimization(&self, current_threads: usize, current_queue: usize) -> OptimizationSuggestion {
        let params = self.optimal_params.lock().unwrap();

        if params.reward_history.len() < 5 {
            return OptimizationSuggestion {
                suggested_threads: current_threads,
                suggested_queue_target: current_queue,
                expected_improvement: 0.0,
                confidence: 0.0,
            };
        }

        let recent_reward: f64 = params.reward_history.iter()
            .rev()
            .take(5)
            .sum::<f64>() / 5.0;

        let older_reward: f64 = if params.reward_history.len() >= 10 {
            params.reward_history.iter()
                .rev()
                .skip(5)
                .take(5)
                .sum::<f64>() / 5.0
        } else {
            recent_reward
        };

        let improvement = recent_reward - older_reward;
        let confidence = (params.reward_history.len() as f64 / 50.0).min(1.0);

        // Shrink the pool when rewards are falling, grow it (up to 16) when
        // they are clearly rising, and hold steady otherwise.
        let suggested_threads = if improvement < 0.0 {
            current_threads.saturating_sub(1).max(2)
        } else if improvement > 0.1 {
            (current_threads + 1).min(16)
        } else {
            current_threads
        };

        OptimizationSuggestion {
            suggested_threads,
            suggested_queue_target: current_queue,
            expected_improvement: improvement,
            confidence,
        }
    }
}
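
// A minimal sanity check, not part of the original source: the throughput
// and latency figures are illustrative assumptions.
#[cfg(test)]
mod optimizer_tests {
    use super::*;

    #[test]
    fn flat_rewards_leave_the_thread_count_unchanged() {
        let optimizer = PerformanceOptimizer::new(0.1);
        for _ in 0..5 {
            optimizer.record_performance(100.0, Duration::from_millis(10));
        }
        // With fewer than 10 rewards, the older window mirrors the recent
        // one, so the measured improvement is exactly zero.
        let suggestion = optimizer.suggest_optimization(4, 100);
        assert_eq!(suggestion.suggested_threads, 4);
        assert_eq!(suggestion.expected_improvement, 0.0);
    }
}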

impl std::fmt::Display for WorkloadPrediction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Prediction[queue={:.1}, tps={:.1}, trend={:?}, conf={:.1}%]",
            self.predicted_queue_length,
            self.predicted_throughput,
            self.trend,
            self.confidence * 100.0
        )
    }
}

impl std::fmt::Display for AnomalyReport {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_anomaly {
            write!(
                f,
                "🚨 ANOMALY: {} = {:.2} (expected {:.2}-{:.2}, severity={:.1}%)",
                self.metric_name,
                self.current_value,
                self.expected_range.0,
                self.expected_range.1,
                self.severity * 100.0
            )
        } else {
            write!(f, "✅ Normal: {} = {:.2}", self.metric_name, self.current_value)
        }
    }
}
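
// A minimal formatting check, not part of the original source: the field
// values below are illustrative assumptions; the expected string follows
// directly from the Display format above.
#[cfg(test)]
mod display_tests {
    use super::*;

    #[test]
    fn prediction_formats_as_expected() {
        let prediction = WorkloadPrediction {
            predicted_queue_length: 12.0,
            predicted_throughput: 34.5,
            confidence: 0.8,
            trend: Trend::Increasing,
        };
        assert_eq!(
            prediction.to_string(),
            "Prediction[queue=12.0, tps=34.5, trend=Increasing, conf=80.0%]"
        );
    }
}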