1use crate::multi_tier::CacheKey;
11use std::collections::{HashMap, VecDeque};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15
/// Bounded sliding window of latency samples with summary statistics.
///
/// Keeps at most `max_samples` of the most recently recorded values; once the
/// window is full, each new sample evicts the oldest one. The unit is whatever
/// the caller records (the `record` parameter is named `latency_us`), and every
/// statistic is reported in that same unit.
#[derive(Debug, Clone)]
pub struct LatencyStats {
    /// Retained samples in arrival order, oldest at the front.
    samples: VecDeque<u64>,
    /// Maximum number of samples kept in the window.
    max_samples: usize,
}

impl LatencyStats {
    /// Creates an empty window that retains at most `max_samples` samples.
    ///
    /// Storage for `max_samples` entries is reserved up front.
    pub fn new(max_samples: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(max_samples),
            max_samples,
        }
    }

    /// Records one latency sample, evicting the oldest sample when the window is full.
    ///
    /// A window created with `max_samples == 0` retains nothing, so the sample
    /// is dropped.
    pub fn record(&mut self, latency_us: u64) {
        // Without this guard a zero-sized window would still hold one sample,
        // because the eviction below pops at most one element before pushing.
        if self.max_samples == 0 {
            return;
        }

        if self.samples.len() >= self.max_samples {
            self.samples.pop_front();
        }
        self.samples.push_back(latency_us);
    }

    /// Returns the `p`-th percentile of the window using the nearest-rank method.
    ///
    /// `p` is expressed in percent (`50.0` is the median). Returns `None` when
    /// the window is empty. A `p` of zero or below (or NaN) selects the smallest
    /// sample; a `p` above 100 asks for a rank past the end of the window and
    /// yields `None`.
    pub fn percentile(&self, p: f64) -> Option<u64> {
        let len = self.samples.len();
        if len == 0 {
            return None;
        }

        // Nearest rank is 1-based; the float-to-integer cast saturates, so
        // negative and NaN ranks collapse to 0 and then to index 0.
        let rank = (len as f64 * p / 100.0).ceil() as usize;
        let index = rank.saturating_sub(1);
        if index >= len {
            return None;
        }

        // Selecting the single order statistic avoids fully sorting the copy.
        let mut scratch: Vec<u64> = self.samples.iter().copied().collect();
        let (_, value, _) = scratch.select_nth_unstable(index);
        Some(*value)
    }

    /// Median of the window, or `None` when it is empty.
    pub fn p50(&self) -> Option<u64> {
        self.percentile(50.0)
    }

    /// 95th percentile of the window, or `None` when it is empty.
    pub fn p95(&self) -> Option<u64> {
        self.percentile(95.0)
    }

    /// 99th percentile of the window, or `None` when it is empty.
    pub fn p99(&self) -> Option<u64> {
        self.percentile(99.0)
    }

    /// Smallest sample in the window, or `None` when it is empty.
    pub fn min(&self) -> Option<u64> {
        self.samples.iter().min().copied()
    }

    /// Largest sample in the window, or `None` when it is empty.
    pub fn max(&self) -> Option<u64> {
        self.samples.iter().max().copied()
    }

    /// Arithmetic mean of the window, or `None` when it is empty.
    pub fn avg(&self) -> Option<f64> {
        if self.samples.is_empty() {
            return None;
        }

        // Accumulate in u128: a u64 accumulator overflows (panicking in debug
        // builds, wrapping in release) once the samples add up past u64::MAX.
        let sum: u128 = self.samples.iter().map(|&sample| u128::from(sample)).sum();
        Some(sum as f64 / self.samples.len() as f64)
    }
}
90
/// Counts requests in consecutive fixed-length time windows.
///
/// Each window is stored as `(start, count)`. A window starts at the instant of
/// the first request that falls outside the previous window, so windows are
/// anchored to request arrivals rather than to wall-clock boundaries.
#[derive(Debug, Clone)]
pub struct ThroughputTracker {
    /// Recorded windows, oldest at the front, as `(window start, request count)`.
    windows: VecDeque<(Instant, u64)>,
    /// Length of a single window.
    window_duration: Duration,
    /// Maximum number of windows retained.
    max_windows: usize,
}

impl ThroughputTracker {
    /// Creates an empty tracker holding up to `max_windows` windows of
    /// `window_duration` each.
    pub fn new(window_duration: Duration, max_windows: usize) -> Self {
        Self {
            windows: VecDeque::with_capacity(max_windows),
            window_duration,
            max_windows,
        }
    }

    /// Total time span after which a window is considered stale.
    ///
    /// Saturates instead of panicking when `window_duration * max_windows`
    /// does not fit in a `Duration`, and does not truncate `max_windows`.
    fn retention(&self) -> Duration {
        let window_count = u32::try_from(self.max_windows).unwrap_or(u32::MAX);
        self.window_duration.saturating_mul(window_count)
    }

    /// Records one request at the current instant.
    ///
    /// Stale windows are dropped first. The request is then added to the newest
    /// window if that window is still open; otherwise a new window is started,
    /// evicting the oldest one when `max_windows` are already held.
    pub fn record(&mut self) {
        let now = Instant::now();
        let retention = self.retention();

        // Drop windows that started longer ago than the whole retention span.
        while let Some(&(started, _)) = self.windows.front() {
            if now.duration_since(started) > retention {
                self.windows.pop_front();
            } else {
                break;
            }
        }

        if let Some((started, count)) = self.windows.back_mut() {
            if now.duration_since(*started) < self.window_duration {
                *count += 1;
                return;
            }
        }

        if self.windows.len() >= self.max_windows {
            self.windows.pop_front();
        }
        self.windows.push_back((now, 1));
    }

    /// Average request rate, in requests per second, over the recorded windows.
    ///
    /// The covered time runs from the start of the oldest window to the end of
    /// the newest one, so the newest window contributes its full
    /// `window_duration`. Returns `0.0` when nothing has been recorded. If the
    /// covered time is zero (a zero `window_duration`), the raw request count is
    /// returned instead of dividing by zero.
    pub fn requests_per_second(&self) -> f64 {
        let (first_start, last_start) = match (self.windows.front(), self.windows.back()) {
            (Some(first), Some(last)) => (first.0, last.0),
            _ => return 0.0,
        };

        let total_requests: u64 = self.windows.iter().map(|&(_, count)| count).sum();

        // The distance between window starts leaves out the last window itself;
        // without adding it the rate is overestimated, and a single window
        // would always report its raw count regardless of the window length.
        let covered = last_start
            .duration_since(first_start)
            .saturating_add(self.window_duration)
            .as_secs_f64();

        if covered > 0.0 {
            total_requests as f64 / covered
        } else {
            total_requests as f64
        }
    }

    /// Largest request count observed in any single retained window.
    ///
    /// This is a per-window count, not a per-second rate. Returns `0` when no
    /// window has been recorded.
    pub fn peak_throughput(&self) -> u64 {
        self.windows
            .iter()
            .map(|&(_, count)| count)
            .max()
            .unwrap_or(0)
    }
}
170
/// Access statistics for a single cache key.
#[derive(Debug, Clone)]
pub struct HeatMapEntry {
    /// Number of recorded accesses.
    pub access_count: u64,
    /// Instant of the most recent recorded access (creation time until then).
    pub last_access: Instant,
    /// Sum of the byte counts passed to `record_access`.
    pub bytes_accessed: u64,
}

impl HeatMapEntry {
    /// Creates an entry with zeroed counters and `last_access` set to now.
    pub fn new() -> Self {
        Self {
            access_count: 0,
            last_access: Instant::now(),
            bytes_accessed: 0,
        }
    }

    /// Records one access of `bytes` bytes and refreshes `last_access`.
    ///
    /// Both counters saturate at `u64::MAX` rather than overflowing; a wrapped
    /// counter would make a heavily used key suddenly look unused.
    pub fn record_access(&mut self, bytes: u64) {
        self.access_count = self.access_count.saturating_add(1);
        self.last_access = Instant::now();
        self.bytes_accessed = self.bytes_accessed.saturating_add(bytes);
    }

    /// Computes a heat score in `[0.0, 1.0]` from access frequency and recency.
    ///
    /// * `max_accesses` - access count that maps to a full frequency score;
    ///   `0` yields a frequency score of `0.0`.
    /// * `max_age` - age at which the recency score reaches `0.0`.
    ///
    /// Frequency contributes 60% of the score and recency 40%.
    pub fn heat_score(&self, max_accesses: u64, max_age: Duration) -> f64 {
        let frequency_score = if max_accesses > 0 {
            (self.access_count as f64 / max_accesses as f64).min(1.0)
        } else {
            0.0
        };

        // Recency decays linearly from 1.0 (just accessed) to 0.0 at `max_age`.
        // The `age < max_age` guard also rules out dividing by a zero `max_age`.
        let age = self.last_access.elapsed();
        let recency_score = if age < max_age {
            1.0 - (age.as_secs_f64() / max_age.as_secs_f64())
        } else {
            0.0
        };

        (frequency_score * 0.6 + recency_score * 0.4).min(1.0)
    }
}

impl Default for HeatMapEntry {
    /// Equivalent to [`HeatMapEntry::new`].
    fn default() -> Self {
        Self::new()
    }
}
223
224pub struct CacheHeatMap {
226 entries: Arc<RwLock<HashMap<CacheKey, HeatMapEntry>>>,
228 max_age: Duration,
230}
231
232impl CacheHeatMap {
233 pub fn new(max_age: Duration) -> Self {
235 Self {
236 entries: Arc::new(RwLock::new(HashMap::new())),
237 max_age,
238 }
239 }
240
241 pub async fn record_access(&self, key: CacheKey, bytes: u64) {
243 let mut entries = self.entries.write().await;
244 entries
245 .entry(key)
246 .or_insert_with(HeatMapEntry::new)
247 .record_access(bytes);
248 }
249
250 pub async fn get_heat_scores(&self) -> HashMap<CacheKey, f64> {
252 let entries = self.entries.read().await;
253
254 let max_accesses = entries.values().map(|e| e.access_count).max().unwrap_or(1);
255
256 entries
257 .iter()
258 .map(|(k, e)| (k.clone(), e.heat_score(max_accesses, self.max_age)))
259 .collect()
260 }
261
262 pub async fn get_hot_keys(&self, limit: usize) -> Vec<(CacheKey, f64)> {
264 let scores = self.get_heat_scores().await;
265 let mut sorted: Vec<_> = scores.into_iter().collect();
266 sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
267 sorted.truncate(limit);
268 sorted
269 }
270
271 pub async fn cleanup(&self, max_age: Duration) {
273 let mut entries = self.entries.write().await;
274 entries.retain(|_, e| e.last_access.elapsed() < max_age);
275 }
276}
277
278pub struct RegressionDetector {
280 baseline: Arc<RwLock<HashMap<String, f64>>>,
282 recent: Arc<RwLock<HashMap<String, VecDeque<f64>>>>,
284 threshold: f64,
286 window_size: usize,
288}
289
290impl RegressionDetector {
291 pub fn new(threshold: f64, window_size: usize) -> Self {
293 Self {
294 baseline: Arc::new(RwLock::new(HashMap::new())),
295 recent: Arc::new(RwLock::new(HashMap::new())),
296 threshold,
297 window_size,
298 }
299 }
300
301 pub async fn set_baseline(&self, metric: String, value: f64) {
303 self.baseline.write().await.insert(metric, value);
304 }
305
306 pub async fn record(&self, metric: String, value: f64) {
308 let mut recent = self.recent.write().await;
309 let measurements = recent
310 .entry(metric)
311 .or_insert_with(|| VecDeque::with_capacity(self.window_size));
312
313 if measurements.len() >= self.window_size {
314 measurements.pop_front();
315 }
316 measurements.push_back(value);
317 }
318
319 pub async fn detect_regression(&self, metric: &str) -> Option<f64> {
321 let baseline = self.baseline.read().await;
322 let recent = self.recent.read().await;
323
324 let baseline_value = baseline.get(metric)?;
325 let measurements = recent.get(metric)?;
326
327 if measurements.is_empty() {
328 return None;
329 }
330
331 let recent_avg: f64 = measurements.iter().sum::<f64>() / measurements.len() as f64;
333
334 let regression = if *baseline_value > 0.0 {
336 ((recent_avg - baseline_value) / baseline_value) * 100.0
337 } else {
338 0.0
339 };
340
341 if regression > self.threshold {
343 Some(regression)
344 } else {
345 None
346 }
347 }
348
349 pub async fn get_regressions(&self) -> HashMap<String, f64> {
351 let baseline = self.baseline.read().await;
352 let mut regressions = HashMap::new();
353
354 for metric in baseline.keys() {
355 if let Some(regression) = self.detect_regression(metric).await {
356 regressions.insert(metric.clone(), regression);
357 }
358 }
359
360 regressions
361 }
362}
363
/// A threshold condition on a named metric.
#[derive(Debug, Clone)]
pub struct AlertRule {
    /// Name of the metric the rule applies to.
    pub metric: String,
    /// Value the metric is compared against.
    pub threshold: f64,
    /// Comparison applied as `metric value <operator> threshold`.
    pub operator: ComparisonOp,
    /// How long the condition must hold before the alert is reported.
    pub duration: Duration,
}

/// Comparison applied between a metric value and a rule threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComparisonOp {
    /// Matches when the value is strictly greater than the threshold.
    GreaterThan,
    /// Matches when the value is strictly less than the threshold.
    LessThan,
    /// Matches when the value equals the threshold within floating-point rounding.
    EqualTo,
}

impl ComparisonOp {
    /// Returns `true` when `value` satisfies this comparison against `threshold`.
    ///
    /// `EqualTo` accepts exact equality (including equal infinities) and
    /// differences within `f64::EPSILON` scaled by the larger magnitude of the
    /// two operands (never less than `f64::EPSILON` itself). Any comparison
    /// involving NaN is `false`.
    pub fn evaluate(&self, value: f64, threshold: f64) -> bool {
        match self {
            ComparisonOp::GreaterThan => value > threshold,
            ComparisonOp::LessThan => value < threshold,
            ComparisonOp::EqualTo => {
                if value == threshold {
                    return true;
                }

                // A fixed EPSILON tolerance is only meaningful near 1.0: the
                // gap between adjacent f64 values grows with magnitude, so the
                // tolerance has to grow with the operands too.
                let difference = (value - threshold).abs();
                let scale = value.abs().max(threshold.abs()).max(1.0);

                // A non-finite difference (infinite operand, overflow or NaN)
                // is never "close".
                difference.is_finite() && difference <= f64::EPSILON * scale
            }
        }
    }
}
398
399pub struct AlertManager {
401 rules: Arc<RwLock<Vec<AlertRule>>>,
403 active_alerts: Arc<RwLock<HashMap<String, Instant>>>,
405}
406
407impl AlertManager {
408 pub fn new() -> Self {
410 Self {
411 rules: Arc::new(RwLock::new(Vec::new())),
412 active_alerts: Arc::new(RwLock::new(HashMap::new())),
413 }
414 }
415
416 pub async fn add_rule(&self, rule: AlertRule) {
418 self.rules.write().await.push(rule);
419 }
420
421 pub async fn evaluate(&self, metrics: &HashMap<String, f64>) -> Vec<String> {
423 let rules = self.rules.read().await;
424 let mut active = self.active_alerts.write().await;
425 let mut triggered = Vec::new();
426
427 for rule in rules.iter() {
428 if let Some(&value) = metrics.get(&rule.metric) {
429 if rule.operator.evaluate(value, rule.threshold) {
430 let start_time = active
432 .entry(rule.metric.clone())
433 .or_insert_with(Instant::now);
434
435 if start_time.elapsed() >= rule.duration {
436 triggered.push(format!(
437 "Alert: {} = {} (threshold: {})",
438 rule.metric, value, rule.threshold
439 ));
440 }
441 } else {
442 active.remove(&rule.metric);
444 }
445 }
446 }
447
448 triggered
449 }
450
451 pub async fn get_active_alerts(&self) -> Vec<String> {
453 self.active_alerts.read().await.keys().cloned().collect()
454 }
455}
456
457impl Default for AlertManager {
458 fn default() -> Self {
459 Self::new()
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Percentiles follow the nearest-rank rule over the recorded window.
    #[test]
    fn test_latency_stats() {
        let mut stats = LatencyStats::new(100);

        for i in 1..=100 {
            stats.record(i * 10);
        }

        assert_eq!(stats.min(), Some(10));
        assert_eq!(stats.max(), Some(1000));
        assert_eq!(stats.p50(), Some(500));
        assert_eq!(stats.p95(), Some(950));
        assert_eq!(stats.p99(), Some(990));
        assert_eq!(stats.avg(), Some(505.0));
    }

    /// A full window drops its oldest samples and an empty one reports nothing.
    #[test]
    fn test_latency_stats_window() {
        let empty = LatencyStats::new(3);
        assert_eq!(empty.p50(), None);
        assert_eq!(empty.avg(), None);

        let mut stats = LatencyStats::new(3);
        for value in 1..=5 {
            stats.record(value);
        }

        assert_eq!(stats.min(), Some(3));
        assert_eq!(stats.max(), Some(5));
    }

    #[test]
    fn test_throughput_tracker() {
        let mut tracker = ThroughputTracker::new(Duration::from_secs(1), 10);
        assert_eq!(tracker.requests_per_second(), 0.0);
        assert_eq!(tracker.peak_throughput(), 0);

        for _ in 0..100 {
            tracker.record();
        }

        let rps = tracker.requests_per_second();
        assert!(rps > 0.0);

        // Bounds rather than an exact count: the loop could in principle
        // straddle a window boundary on a heavily loaded machine.
        let peak = tracker.peak_throughput();
        assert!(peak >= 1 && peak <= 100);
    }

    #[test]
    fn test_comparison_op() {
        assert!(ComparisonOp::GreaterThan.evaluate(10.0, 5.0));
        assert!(!ComparisonOp::GreaterThan.evaluate(5.0, 5.0));
        assert!(ComparisonOp::LessThan.evaluate(1.0, 5.0));
        assert!(!ComparisonOp::LessThan.evaluate(5.0, 5.0));
        assert!(ComparisonOp::EqualTo.evaluate(5.0, 5.0));
        assert!(!ComparisonOp::EqualTo.evaluate(5.0, 6.0));
    }

    #[tokio::test]
    async fn test_heat_map() {
        let heat_map = CacheHeatMap::new(Duration::from_secs(60));

        heat_map.record_access("key1".to_string(), 1024).await;
        heat_map.record_access("key1".to_string(), 1024).await;
        heat_map.record_access("key2".to_string(), 512).await;

        let hot_keys = heat_map.get_hot_keys(2).await;
        assert_eq!(hot_keys.len(), 2);
        // key1 was accessed twice as often, so it must rank first.
        assert_eq!(hot_keys[0].0, "key1");
        assert!(hot_keys[0].1 > hot_keys[1].1);

        let hottest = heat_map.get_hot_keys(1).await;
        assert_eq!(hottest.len(), 1);
        assert_eq!(hottest[0].0, "key1");
    }

    #[tokio::test]
    async fn test_regression_detector() {
        let detector = RegressionDetector::new(10.0, 5);

        detector.set_baseline("latency".to_string(), 100.0).await;
        detector.set_baseline("throughput".to_string(), 100.0).await;

        for _ in 0..5 {
            detector.record("latency".to_string(), 120.0).await;
            // 5% above baseline stays under the 10% threshold.
            detector.record("throughput".to_string(), 105.0).await;
        }

        let regression = detector
            .detect_regression("latency")
            .await
            .expect("latency is 20% above baseline");
        assert!((regression - 20.0).abs() < 1e-9);

        assert!(detector.detect_regression("throughput").await.is_none());
        assert!(detector.detect_regression("unknown").await.is_none());

        let regressions = detector.get_regressions().await;
        assert_eq!(regressions.len(), 1);
        assert!(regressions.contains_key("latency"));
    }

    #[tokio::test]
    async fn test_alert_manager() {
        let manager = AlertManager::new();

        let rule = AlertRule {
            metric: "error_rate".to_string(),
            threshold: 5.0,
            operator: ComparisonOp::GreaterThan,
            duration: Duration::from_secs(0),
        };

        manager.add_rule(rule).await;

        let mut metrics = HashMap::new();
        metrics.insert("error_rate".to_string(), 10.0);

        let alerts = manager.evaluate(&metrics).await;
        assert_eq!(
            alerts,
            vec!["Alert: error_rate = 10 (threshold: 5)".to_string()]
        );
        assert_eq!(
            manager.get_active_alerts().await,
            vec!["error_rate".to_string()]
        );

        // Once the metric recovers the alert clears.
        metrics.insert("error_rate".to_string(), 1.0);
        assert!(manager.evaluate(&metrics).await.is_empty());
        assert!(manager.get_active_alerts().await.is_empty());
    }
}