// oxigdal_cache_advanced/observability.rs

1//! Cache observability and monitoring
2//!
3//! Provides comprehensive monitoring capabilities:
4//! - Detailed metrics (latency percentiles, throughput)
5//! - Cache heat map visualization
6//! - Performance regression detection
7//! - Real-time alerting
8//! - Distributed tracing integration
9
10use crate::multi_tier::CacheKey;
11use std::collections::{HashMap, VecDeque};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15
/// Rolling window of latency samples with percentile queries.
///
/// Holds at most `max_samples` recent samples (in microseconds) in a FIFO
/// window; percentiles are computed on demand using the nearest-rank method.
#[derive(Debug, Clone)]
pub struct LatencyStats {
    /// Recent latency samples in arrival order (microseconds)
    samples: VecDeque<u64>,
    /// Upper bound on the number of retained samples
    max_samples: usize,
}

impl LatencyStats {
    /// Create an empty window that retains at most `max_samples` samples.
    pub fn new(max_samples: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(max_samples),
            max_samples,
        }
    }

    /// Add one latency sample, evicting the oldest when the window is full.
    pub fn record(&mut self, latency_us: u64) {
        if self.samples.len() >= self.max_samples {
            self.samples.pop_front();
        }
        self.samples.push_back(latency_us);
    }

    /// Nearest-rank percentile of the current window (`p` in 0..=100).
    ///
    /// Returns `None` when no samples have been recorded.
    pub fn percentile(&self, p: f64) -> Option<u64> {
        if self.samples.is_empty() {
            return None;
        }

        let mut ordered: Vec<u64> = self.samples.iter().copied().collect();
        ordered.sort_unstable();

        // Nearest-rank: ceil(n * p / 100) gives a 1-based rank.
        let rank = (ordered.len() as f64 * p / 100.0).ceil() as usize;
        ordered.get(rank.saturating_sub(1)).copied()
    }

    /// Median latency (p50).
    pub fn p50(&self) -> Option<u64> {
        self.percentile(50.0)
    }

    /// 95th percentile latency.
    pub fn p95(&self) -> Option<u64> {
        self.percentile(95.0)
    }

    /// 99th percentile latency.
    pub fn p99(&self) -> Option<u64> {
        self.percentile(99.0)
    }

    /// Smallest latency in the window, if any.
    pub fn min(&self) -> Option<u64> {
        self.samples.iter().min().copied()
    }

    /// Largest latency in the window, if any.
    pub fn max(&self) -> Option<u64> {
        self.samples.iter().max().copied()
    }

    /// Arithmetic mean of the window, or `None` when empty.
    pub fn avg(&self) -> Option<f64> {
        let n = self.samples.len();
        if n == 0 {
            return None;
        }
        let total: u64 = self.samples.iter().sum();
        Some(total as f64 / n as f64)
    }
}
90
/// Sliding-window throughput tracker.
///
/// Buckets request counts into fixed-duration windows and derives a
/// requests-per-second rate from the retained windows.
#[derive(Debug, Clone)]
pub struct ThroughputTracker {
    /// Request counts per window: (window start, count)
    windows: VecDeque<(Instant, u64)>,
    /// Length of each counting window
    window_duration: Duration,
    /// Maximum windows to keep
    max_windows: usize,
}

impl ThroughputTracker {
    /// Create a tracker bucketing requests into `window_duration` windows,
    /// retaining at most `max_windows` of them.
    pub fn new(window_duration: Duration, max_windows: usize) -> Self {
        Self {
            windows: VecDeque::with_capacity(max_windows),
            window_duration,
            max_windows,
        }
    }

    /// Record one request at the current instant.
    pub fn record(&mut self) {
        let now = Instant::now();

        // Drop windows that have aged past the retention horizon.
        while let Some((ts, _)) = self.windows.front() {
            if now.duration_since(*ts) > self.window_duration * self.max_windows as u32 {
                self.windows.pop_front();
            } else {
                break;
            }
        }

        // Bump the current window if it is still open...
        if let Some((ts, count)) = self.windows.back_mut() {
            if now.duration_since(*ts) < self.window_duration {
                *count += 1;
                return;
            }
        }

        // ...otherwise start a new window, evicting the oldest if full.
        if self.windows.len() >= self.max_windows {
            self.windows.pop_front();
        }
        self.windows.push_back((now, 1));
    }

    /// Calculate requests per second over the retained windows.
    ///
    /// The measured span runs from the start of the oldest window through
    /// the elapsed portion of the newest one (capped at one window length).
    /// Previously only the gap between window *starts* was counted, so a
    /// burst landing entirely in one window produced a zero-length span and
    /// the raw request count was returned as if it were a per-second rate.
    pub fn requests_per_second(&self) -> f64 {
        let (first, last) = match (self.windows.front(), self.windows.back()) {
            (Some(first), Some(last)) => (first, last),
            _ => return 0.0,
        };

        let total_requests: u64 = self.windows.iter().map(|&(_, count)| count).sum();

        // Include time spent inside the newest window, capped so an idle
        // tracker does not dilute the rate indefinitely.
        let tail = self.window_duration.min(last.0.elapsed());
        let span = last.0.duration_since(first.0) + tail;
        let secs = span.as_secs_f64();

        if secs > 0.0 {
            total_requests as f64 / secs
        } else {
            // Degenerate zero-length span; report the raw count.
            total_requests as f64
        }
    }

    /// Highest single-window request count (peak per `window_duration`).
    pub fn peak_throughput(&self) -> u64 {
        self.windows
            .iter()
            .map(|&(_, count)| count)
            .max()
            .unwrap_or(0)
    }
}
170
/// Per-key access statistics used to build a cache heat map.
#[derive(Debug, Clone)]
pub struct HeatMapEntry {
    /// Access count
    pub access_count: u64,
    /// Last access time
    pub last_access: Instant,
    /// Total bytes accessed
    pub bytes_accessed: u64,
}

impl HeatMapEntry {
    /// Create a fresh entry with zero accesses, stamped "now".
    pub fn new() -> Self {
        Self {
            access_count: 0,
            last_access: Instant::now(),
            bytes_accessed: 0,
        }
    }

    /// Register one access of `bytes` bytes and refresh the timestamp.
    pub fn record_access(&mut self, bytes: u64) {
        self.access_count += 1;
        self.bytes_accessed += bytes;
        self.last_access = Instant::now();
    }

    /// Heat score in [0.0, 1.0]: 60% access frequency (relative to
    /// `max_accesses`), 40% recency (linear decay over `max_age`).
    pub fn heat_score(&self, max_accesses: u64, max_age: Duration) -> f64 {
        let frequency = match max_accesses {
            0 => 0.0,
            max => (self.access_count as f64 / max as f64).min(1.0),
        };

        let age = self.last_access.elapsed();
        let recency = if age < max_age {
            1.0 - age.as_secs_f64() / max_age.as_secs_f64()
        } else {
            0.0
        };

        (0.6 * frequency + 0.4 * recency).min(1.0)
    }
}

impl Default for HeatMapEntry {
    fn default() -> Self {
        HeatMapEntry::new()
    }
}
223
224/// Cache heat map
225pub struct CacheHeatMap {
226    /// Heat map entries
227    entries: Arc<RwLock<HashMap<CacheKey, HeatMapEntry>>>,
228    /// Maximum age for recency calculation
229    max_age: Duration,
230}
231
232impl CacheHeatMap {
233    /// Create new heat map
234    pub fn new(max_age: Duration) -> Self {
235        Self {
236            entries: Arc::new(RwLock::new(HashMap::new())),
237            max_age,
238        }
239    }
240
241    /// Record access
242    pub async fn record_access(&self, key: CacheKey, bytes: u64) {
243        let mut entries = self.entries.write().await;
244        entries
245            .entry(key)
246            .or_insert_with(HeatMapEntry::new)
247            .record_access(bytes);
248    }
249
250    /// Get heat scores for all keys
251    pub async fn get_heat_scores(&self) -> HashMap<CacheKey, f64> {
252        let entries = self.entries.read().await;
253
254        let max_accesses = entries.values().map(|e| e.access_count).max().unwrap_or(1);
255
256        entries
257            .iter()
258            .map(|(k, e)| (k.clone(), e.heat_score(max_accesses, self.max_age)))
259            .collect()
260    }
261
262    /// Get hottest keys
263    pub async fn get_hot_keys(&self, limit: usize) -> Vec<(CacheKey, f64)> {
264        let scores = self.get_heat_scores().await;
265        let mut sorted: Vec<_> = scores.into_iter().collect();
266        sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
267        sorted.truncate(limit);
268        sorted
269    }
270
271    /// Clear old entries
272    pub async fn cleanup(&self, max_age: Duration) {
273        let mut entries = self.entries.write().await;
274        entries.retain(|_, e| e.last_access.elapsed() < max_age);
275    }
276}
277
278/// Performance regression detector
279pub struct RegressionDetector {
280    /// Historical baseline (metric -> value)
281    baseline: Arc<RwLock<HashMap<String, f64>>>,
282    /// Recent measurements
283    recent: Arc<RwLock<HashMap<String, VecDeque<f64>>>>,
284    /// Regression threshold (percentage)
285    threshold: f64,
286    /// Window size for recent measurements
287    window_size: usize,
288}
289
290impl RegressionDetector {
291    /// Create new regression detector
292    pub fn new(threshold: f64, window_size: usize) -> Self {
293        Self {
294            baseline: Arc::new(RwLock::new(HashMap::new())),
295            recent: Arc::new(RwLock::new(HashMap::new())),
296            threshold,
297            window_size,
298        }
299    }
300
301    /// Set baseline for metric
302    pub async fn set_baseline(&self, metric: String, value: f64) {
303        self.baseline.write().await.insert(metric, value);
304    }
305
306    /// Record measurement
307    pub async fn record(&self, metric: String, value: f64) {
308        let mut recent = self.recent.write().await;
309        let measurements = recent
310            .entry(metric)
311            .or_insert_with(|| VecDeque::with_capacity(self.window_size));
312
313        if measurements.len() >= self.window_size {
314            measurements.pop_front();
315        }
316        measurements.push_back(value);
317    }
318
319    /// Detect regression
320    pub async fn detect_regression(&self, metric: &str) -> Option<f64> {
321        let baseline = self.baseline.read().await;
322        let recent = self.recent.read().await;
323
324        let baseline_value = baseline.get(metric)?;
325        let measurements = recent.get(metric)?;
326
327        if measurements.is_empty() {
328            return None;
329        }
330
331        // Calculate recent average
332        let recent_avg: f64 = measurements.iter().sum::<f64>() / measurements.len() as f64;
333
334        // Calculate regression percentage
335        let regression = if *baseline_value > 0.0 {
336            ((recent_avg - baseline_value) / baseline_value) * 100.0
337        } else {
338            0.0
339        };
340
341        // Return regression if it exceeds threshold
342        if regression > self.threshold {
343            Some(regression)
344        } else {
345            None
346        }
347    }
348
349    /// Get all detected regressions
350    pub async fn get_regressions(&self) -> HashMap<String, f64> {
351        let baseline = self.baseline.read().await;
352        let mut regressions = HashMap::new();
353
354        for metric in baseline.keys() {
355            if let Some(regression) = self.detect_regression(metric).await {
356                regressions.insert(metric.clone(), regression);
357            }
358        }
359
360        regressions
361    }
362}
363
/// Alert rule: fires when `metric`, compared against `threshold` via
/// `operator`, has held true continuously for at least `duration`.
#[derive(Debug, Clone)]
pub struct AlertRule {
    /// Metric name (must match a key in the metrics map passed to evaluate)
    pub metric: String,
    /// Threshold value the metric is compared against
    pub threshold: f64,
    /// Comparison operator applied as `operator(value, threshold)`
    pub operator: ComparisonOp,
    /// Minimum duration the condition must hold before alerting
    pub duration: Duration,
}
376
/// Comparison operator used by alert rule thresholds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComparisonOp {
    /// Greater than
    GreaterThan,
    /// Less than
    LessThan,
    /// Equal to
    EqualTo,
}

impl ComparisonOp {
    /// Return true when `value` satisfies this comparison against
    /// `threshold`. Equality is tested within `f64::EPSILON` to tolerate
    /// floating-point rounding.
    pub fn evaluate(&self, value: f64, threshold: f64) -> bool {
        match self {
            Self::GreaterThan => value > threshold,
            Self::LessThan => value < threshold,
            Self::EqualTo => (value - threshold).abs() < f64::EPSILON,
        }
    }
}
398
399/// Real-time alerting system
400pub struct AlertManager {
401    /// Alert rules
402    rules: Arc<RwLock<Vec<AlertRule>>>,
403    /// Active alerts (metric -> start time)
404    active_alerts: Arc<RwLock<HashMap<String, Instant>>>,
405}
406
407impl AlertManager {
408    /// Create new alert manager
409    pub fn new() -> Self {
410        Self {
411            rules: Arc::new(RwLock::new(Vec::new())),
412            active_alerts: Arc::new(RwLock::new(HashMap::new())),
413        }
414    }
415
416    /// Add alert rule
417    pub async fn add_rule(&self, rule: AlertRule) {
418        self.rules.write().await.push(rule);
419    }
420
421    /// Evaluate metrics against rules
422    pub async fn evaluate(&self, metrics: &HashMap<String, f64>) -> Vec<String> {
423        let rules = self.rules.read().await;
424        let mut active = self.active_alerts.write().await;
425        let mut triggered = Vec::new();
426
427        for rule in rules.iter() {
428            if let Some(&value) = metrics.get(&rule.metric) {
429                if rule.operator.evaluate(value, rule.threshold) {
430                    // Check duration
431                    let start_time = active
432                        .entry(rule.metric.clone())
433                        .or_insert_with(Instant::now);
434
435                    if start_time.elapsed() >= rule.duration {
436                        triggered.push(format!(
437                            "Alert: {} = {} (threshold: {})",
438                            rule.metric, value, rule.threshold
439                        ));
440                    }
441                } else {
442                    // Condition no longer met, clear alert
443                    active.remove(&rule.metric);
444                }
445            }
446        }
447
448        triggered
449    }
450
451    /// Get active alerts
452    pub async fn get_active_alerts(&self) -> Vec<String> {
453        self.active_alerts.read().await.keys().cloned().collect()
454    }
455}
456
457impl Default for AlertManager {
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_latency_stats() {
        let mut latencies = LatencyStats::new(100);

        // Samples 10, 20, ..., 1000.
        for sample in (1..=100u64).map(|i| i * 10) {
            latencies.record(sample);
        }

        assert_eq!(latencies.min(), Some(10));
        assert_eq!(latencies.max(), Some(1000));
        assert!(latencies.p50().is_some());
        assert!(latencies.p95().is_some());
        assert!(latencies.p99().is_some());
    }

    #[test]
    fn test_throughput_tracker() {
        let mut tracker = ThroughputTracker::new(Duration::from_secs(1), 10);

        (0..100).for_each(|_| tracker.record());

        assert!(tracker.requests_per_second() > 0.0);
    }

    #[tokio::test]
    async fn test_heat_map() {
        let heat_map = CacheHeatMap::new(Duration::from_secs(60));

        for (key, bytes) in [("key1", 1024), ("key1", 1024), ("key2", 512)] {
            heat_map.record_access(key.to_string(), bytes).await;
        }

        assert!(!heat_map.get_hot_keys(2).await.is_empty());
    }

    #[tokio::test]
    async fn test_regression_detector() {
        let detector = RegressionDetector::new(10.0, 5);

        detector.set_baseline("latency".to_string(), 100.0).await;

        // Recent average of 120 is a 20% regression over the 100 baseline,
        // above the 10% threshold.
        for _ in 0..5 {
            detector.record("latency".to_string(), 120.0).await;
        }

        assert!(detector.detect_regression("latency").await.is_some());
    }

    #[tokio::test]
    async fn test_alert_manager() {
        let manager = AlertManager::new();

        manager
            .add_rule(AlertRule {
                metric: "error_rate".to_string(),
                threshold: 5.0,
                operator: ComparisonOp::GreaterThan,
                duration: Duration::from_secs(0),
            })
            .await;

        let mut metrics = HashMap::new();
        metrics.insert("error_rate".to_string(), 10.0);

        assert!(!manager.evaluate(&metrics).await.is_empty());
    }
}