Skip to main content

rez_next_cache/
performance_monitor.rs

1//! Performance Monitor
2//!
3//! This module provides comprehensive performance monitoring and benchmarking
4//! for the intelligent cache system.
5
6use crate::{MonitoringConfig, PerformanceMetrics};
7use serde::{Deserialize, Serialize};
8use std::{
9    collections::{HashMap, VecDeque},
10    sync::{
11        atomic::{AtomicU64, Ordering},
12        Arc, RwLock,
13    },
14    time::{Duration, Instant, SystemTime},
15};
16use tokio::time::Interval;
17
18/// Real-time performance counters
19#[derive(Debug)]
20pub struct PerformanceCounters {
21    /// Total get operations
22    pub get_operations: AtomicU64,
23    /// Total put operations
24    pub put_operations: AtomicU64,
25    /// Total remove operations
26    pub remove_operations: AtomicU64,
27    /// Total eviction operations
28    pub eviction_operations: AtomicU64,
29    /// Total get latency (microseconds)
30    pub total_get_latency_us: AtomicU64,
31    /// Total put latency (microseconds)
32    pub total_put_latency_us: AtomicU64,
33    /// Total remove latency (microseconds)
34    pub total_remove_latency_us: AtomicU64,
35    /// Total eviction latency (microseconds)
36    pub total_eviction_latency_us: AtomicU64,
37    /// Cache hit count
38    pub hit_count: AtomicU64,
39    /// Cache miss count
40    pub miss_count: AtomicU64,
41    /// Total bytes allocated (cumulative)
42    pub total_bytes_allocated: AtomicU64,
43    /// Peak memory usage
44    pub peak_memory_usage: AtomicU64,
45    /// Current memory usage
46    pub current_memory_usage: AtomicU64,
47    /// Total bytes read from disk
48    pub disk_bytes_read: AtomicU64,
49    /// Total bytes written to disk
50    pub disk_bytes_written: AtomicU64,
51}
52
53impl Default for PerformanceCounters {
54    fn default() -> Self {
55        Self {
56            get_operations: AtomicU64::new(0),
57            put_operations: AtomicU64::new(0),
58            remove_operations: AtomicU64::new(0),
59            eviction_operations: AtomicU64::new(0),
60            total_get_latency_us: AtomicU64::new(0),
61            total_put_latency_us: AtomicU64::new(0),
62            total_remove_latency_us: AtomicU64::new(0),
63            total_eviction_latency_us: AtomicU64::new(0),
64            hit_count: AtomicU64::new(0),
65            miss_count: AtomicU64::new(0),
66            total_bytes_allocated: AtomicU64::new(0),
67            peak_memory_usage: AtomicU64::new(0),
68            current_memory_usage: AtomicU64::new(0),
69            disk_bytes_read: AtomicU64::new(0),
70            disk_bytes_written: AtomicU64::new(0),
71        }
72    }
73}
74
75/// Performance event for detailed logging
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct PerformanceEvent {
78    /// Event timestamp
79    pub timestamp: SystemTime,
80    /// Event type
81    pub event_type: PerformanceEventType,
82    /// Operation latency (microseconds)
83    pub latency_us: u64,
84    /// Memory usage at time of event
85    pub memory_usage: u64,
86    /// Additional metadata
87    pub metadata: HashMap<String, String>,
88}
89
90/// Types of performance events
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub enum PerformanceEventType {
93    /// Cache get operation
94    CacheGet,
95    /// Cache put operation
96    CachePut,
97    /// Cache remove operation
98    CacheRemove,
99    /// Cache eviction
100    CacheEviction,
101    /// Cache promotion (L2 to L1)
102    CachePromotion,
103    /// Cache demotion (L1 to L2)
104    CacheDemotion,
105    /// Predictive preheating
106    PredictivePreheating,
107    /// Adaptive tuning
108    AdaptiveTuning,
109}
110
111/// Benchmark result
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct BenchmarkResult {
114    /// Benchmark name
115    pub name: String,
116    /// Operations per second
117    pub ops_per_second: f64,
118    /// Average latency (microseconds)
119    pub avg_latency_us: f64,
120    /// 95th percentile latency (microseconds)
121    pub p95_latency_us: f64,
122    /// 99th percentile latency (microseconds)
123    pub p99_latency_us: f64,
124    /// Memory usage (bytes)
125    pub memory_usage: u64,
126    /// Hit rate achieved
127    pub hit_rate: f64,
128    /// Test duration
129    pub duration: Duration,
130    /// Timestamp
131    pub timestamp: SystemTime,
132}
133
134/// Performance Monitor
135///
136/// Provides comprehensive monitoring, metrics collection, and benchmarking
137/// for the intelligent cache system.
138#[derive(Debug)]
139pub struct PerformanceMonitor {
140    /// Configuration
141    config: MonitoringConfig,
142    /// Real-time performance counters
143    counters: Arc<PerformanceCounters>,
144    /// Performance event log
145    event_log: Arc<RwLock<VecDeque<PerformanceEvent>>>,
146    /// Latency histogram buckets (microseconds)
147    latency_histogram: Arc<RwLock<HashMap<u64, u64>>>,
148    /// Benchmark results history
149    benchmark_history: Arc<RwLock<Vec<BenchmarkResult>>>,
150    /// Monitoring start time
151    start_time: Instant,
152    /// Background monitoring interval
153    _monitoring_interval: Option<Interval>,
154}
155
156impl PerformanceMonitor {
157    /// Create a new performance monitor
158    pub fn new(config: MonitoringConfig) -> Self {
159        Self {
160            config,
161            counters: Arc::new(PerformanceCounters::default()),
162            event_log: Arc::new(RwLock::new(VecDeque::new())),
163            latency_histogram: Arc::new(RwLock::new(HashMap::new())),
164            benchmark_history: Arc::new(RwLock::new(Vec::new())),
165            start_time: Instant::now(),
166            _monitoring_interval: None,
167        }
168    }
169
170    /// Record a get operation latency
171    pub async fn record_get_latency(&self, latency: Duration) {
172        let latency_us = latency.as_micros() as u64;
173
174        self.counters.get_operations.fetch_add(1, Ordering::Relaxed);
175        self.counters
176            .total_get_latency_us
177            .fetch_add(latency_us, Ordering::Relaxed);
178
179        self.update_latency_histogram(latency_us).await;
180
181        if self.config.enable_event_logging {
182            self.log_event(PerformanceEvent {
183                timestamp: SystemTime::now(),
184                event_type: PerformanceEventType::CacheGet,
185                latency_us,
186                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
187                metadata: HashMap::new(),
188            })
189            .await;
190        }
191    }
192
193    /// Record a put operation latency
194    pub async fn record_put_latency(&self, latency: Duration) {
195        let latency_us = latency.as_micros() as u64;
196
197        self.counters.put_operations.fetch_add(1, Ordering::Relaxed);
198        self.counters
199            .total_put_latency_us
200            .fetch_add(latency_us, Ordering::Relaxed);
201
202        self.update_latency_histogram(latency_us).await;
203
204        if self.config.enable_event_logging {
205            self.log_event(PerformanceEvent {
206                timestamp: SystemTime::now(),
207                event_type: PerformanceEventType::CachePut,
208                latency_us,
209                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
210                metadata: HashMap::new(),
211            })
212            .await;
213        }
214    }
215
216    /// Record a remove operation latency
217    pub async fn record_remove_latency(&self, latency: Duration) {
218        let latency_us = latency.as_micros() as u64;
219
220        self.counters
221            .remove_operations
222            .fetch_add(1, Ordering::Relaxed);
223        self.counters
224            .total_remove_latency_us
225            .fetch_add(latency_us, Ordering::Relaxed);
226
227        self.update_latency_histogram(latency_us).await;
228
229        if self.config.enable_event_logging {
230            self.log_event(PerformanceEvent {
231                timestamp: SystemTime::now(),
232                event_type: PerformanceEventType::CacheRemove,
233                latency_us,
234                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
235                metadata: HashMap::new(),
236            })
237            .await;
238        }
239    }
240
241    /// Update memory usage
242    pub async fn update_memory_usage(&self, current_usage: u64) {
243        self.counters
244            .current_memory_usage
245            .store(current_usage, Ordering::Relaxed);
246
247        // Update peak if necessary
248        let current_peak = self.counters.peak_memory_usage.load(Ordering::Relaxed);
249        if current_usage > current_peak {
250            self.counters
251                .peak_memory_usage
252                .store(current_usage, Ordering::Relaxed);
253        }
254    }
255
256    /// Record disk I/O
257    pub async fn record_disk_io(&self, bytes_read: u64, bytes_written: u64) {
258        self.counters
259            .disk_bytes_read
260            .fetch_add(bytes_read, Ordering::Relaxed);
261        self.counters
262            .disk_bytes_written
263            .fetch_add(bytes_written, Ordering::Relaxed);
264    }
265
266    /// Record an eviction operation latency
267    pub async fn record_eviction_latency(&self, latency: Duration) {
268        let latency_us = latency.as_micros() as u64;
269        self.counters
270            .eviction_operations
271            .fetch_add(1, Ordering::Relaxed);
272        self.counters
273            .total_eviction_latency_us
274            .fetch_add(latency_us, Ordering::Relaxed);
275
276        self.update_latency_histogram(latency_us).await;
277
278        if self.config.enable_event_logging {
279            self.log_event(PerformanceEvent {
280                timestamp: SystemTime::now(),
281                event_type: PerformanceEventType::CacheEviction,
282                latency_us,
283                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
284                metadata: HashMap::new(),
285            })
286            .await;
287        }
288    }
289
290    /// Record a cache hit
291    pub fn record_cache_hit(&self) {
292        self.counters.hit_count.fetch_add(1, Ordering::Relaxed);
293    }
294
295    /// Record a cache miss
296    pub fn record_cache_miss(&self) {
297        self.counters.miss_count.fetch_add(1, Ordering::Relaxed);
298    }
299
300    /// Record memory allocation
301    pub fn record_allocation(&self, bytes: u64) {
302        self.counters
303            .total_bytes_allocated
304            .fetch_add(bytes, Ordering::Relaxed);
305    }
306
307    /// Get current cache hit rate (0.0–1.0)
308    pub fn hit_rate(&self) -> f64 {
309        let hits = self.counters.hit_count.load(Ordering::Relaxed);
310        let misses = self.counters.miss_count.load(Ordering::Relaxed);
311        let total = hits + misses;
312        if total > 0 {
313            hits as f64 / total as f64
314        } else {
315            0.0
316        }
317    }
318
319    /// Log a performance event
320    async fn log_event(&self, event: PerformanceEvent) {
321        if !self.config.enable_event_logging {
322            return;
323        }
324
325        let mut log = self.event_log.write().unwrap();
326        log.push_back(event);
327
328        // Keep log size manageable
329        while log.len() > self.config.max_events_in_memory {
330            log.pop_front();
331        }
332    }
333
334    /// Update latency histogram
335    async fn update_latency_histogram(&self, latency_us: u64) {
336        // Round to nearest bucket (powers of 2)
337        let bucket = if latency_us == 0 {
338            0
339        } else {
340            1u64 << (64 - latency_us.leading_zeros())
341        };
342
343        let mut histogram = self.latency_histogram.write().unwrap();
344        *histogram.entry(bucket).or_insert(0) += 1;
345    }
346
347    /// Get current performance metrics
348    pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
349        let get_ops = self.counters.get_operations.load(Ordering::Relaxed);
350        let put_ops = self.counters.put_operations.load(Ordering::Relaxed);
351        let remove_ops = self.counters.remove_operations.load(Ordering::Relaxed);
352        let eviction_ops = self.counters.eviction_operations.load(Ordering::Relaxed);
353
354        let total_get_latency = self.counters.total_get_latency_us.load(Ordering::Relaxed);
355        let total_put_latency = self.counters.total_put_latency_us.load(Ordering::Relaxed);
356        let total_eviction_latency = self
357            .counters
358            .total_eviction_latency_us
359            .load(Ordering::Relaxed);
360
361        let elapsed_secs = self.start_time.elapsed().as_secs_f64();
362        let total_ops = get_ops + put_ops + remove_ops;
363
364        // Memory allocation rate: bytes allocated per second
365        let total_allocated = self.counters.total_bytes_allocated.load(Ordering::Relaxed);
366        let memory_allocation_rate = if elapsed_secs > 0.0 {
367            total_allocated as f64 / elapsed_secs
368        } else {
369            0.0
370        };
371
372        PerformanceMetrics {
373            avg_get_latency_us: if get_ops > 0 {
374                total_get_latency as f64 / get_ops as f64
375            } else {
376                0.0
377            },
378            avg_put_latency_us: if put_ops > 0 {
379                total_put_latency as f64 / put_ops as f64
380            } else {
381                0.0
382            },
383            avg_eviction_latency_us: if eviction_ops > 0 {
384                total_eviction_latency as f64 / eviction_ops as f64
385            } else {
386                0.0
387            },
388            ops_per_second: if elapsed_secs > 0.0 {
389                total_ops as f64 / elapsed_secs
390            } else {
391                0.0
392            },
393            memory_allocation_rate,
394            disk_io_rate: if elapsed_secs > 0.0 {
395                (self.counters.disk_bytes_read.load(Ordering::Relaxed)
396                    + self.counters.disk_bytes_written.load(Ordering::Relaxed))
397                    as f64
398                    / elapsed_secs
399            } else {
400                0.0
401            },
402            // CPU usage: not reliably available without an OS-specific crate.
403            // We approximate using elapsed time vs. total operation time.
404            cpu_usage_percent: {
405                let total_op_latency_us = total_get_latency
406                    + total_put_latency
407                    + total_eviction_latency
408                    + self
409                        .counters
410                        .total_remove_latency_us
411                        .load(Ordering::Relaxed);
412                let elapsed_us = self.start_time.elapsed().as_micros() as f64;
413                if elapsed_us > 0.0 {
414                    (total_op_latency_us as f64 / elapsed_us * 100.0).min(100.0)
415                } else {
416                    0.0
417                }
418            },
419            peak_memory_usage: self.counters.peak_memory_usage.load(Ordering::Relaxed),
420        }
421    }
422
423    /// Run a benchmark
424    pub async fn run_benchmark<F, Fut>(&self, name: &str, benchmark_fn: F) -> BenchmarkResult
425    where
426        F: Fn() -> Fut,
427        Fut: std::future::Future<Output = ()>,
428    {
429        let start_time = Instant::now();
430        let start_ops = self.counters.get_operations.load(Ordering::Relaxed)
431            + self.counters.put_operations.load(Ordering::Relaxed);
432
433        // Reset latency histogram for this benchmark
434        {
435            let mut histogram = self.latency_histogram.write().unwrap();
436            histogram.clear();
437        }
438
439        // Run the benchmark
440        benchmark_fn().await;
441
442        let duration = start_time.elapsed();
443        let end_ops = self.counters.get_operations.load(Ordering::Relaxed)
444            + self.counters.put_operations.load(Ordering::Relaxed);
445
446        let ops_performed = end_ops - start_ops;
447        let ops_per_second = if duration.as_secs_f64() > 0.0 {
448            ops_performed as f64 / duration.as_secs_f64()
449        } else {
450            0.0
451        };
452
453        // Calculate latency percentiles
454        let (avg_latency, p95_latency, p99_latency) = self.calculate_latency_percentiles().await;
455
456        let result = BenchmarkResult {
457            name: name.to_string(),
458            ops_per_second,
459            avg_latency_us: avg_latency,
460            p95_latency_us: p95_latency,
461            p99_latency_us: p99_latency,
462            memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
463            hit_rate: self.hit_rate(),
464            duration,
465            timestamp: SystemTime::now(),
466        };
467
468        // Store benchmark result
469        {
470            let mut history = self.benchmark_history.write().unwrap();
471            history.push(result.clone());
472        }
473
474        result
475    }
476
477    /// Calculate latency percentiles from histogram
478    async fn calculate_latency_percentiles(&self) -> (f64, f64, f64) {
479        let histogram = self.latency_histogram.read().unwrap();
480
481        if histogram.is_empty() {
482            return (0.0, 0.0, 0.0);
483        }
484
485        // Convert histogram to sorted vector
486        let mut latencies: Vec<(u64, u64)> = histogram
487            .iter()
488            .map(|(&latency, &count)| (latency, count))
489            .collect();
490        latencies.sort_by_key(|&(latency, _)| latency);
491
492        let total_samples: u64 = latencies.iter().map(|(_, count)| count).sum();
493        if total_samples == 0 {
494            return (0.0, 0.0, 0.0);
495        }
496
497        // Calculate weighted average
498        let weighted_sum: u64 = latencies
499            .iter()
500            .map(|(latency, count)| latency * count)
501            .sum();
502        let avg_latency = weighted_sum as f64 / total_samples as f64;
503
504        // Calculate percentiles
505        let p95_target = (total_samples as f64 * 0.95) as u64;
506        let p99_target = (total_samples as f64 * 0.99) as u64;
507
508        let mut cumulative = 0u64;
509        let mut p95_latency = 0.0;
510        let mut p99_latency = 0.0;
511
512        for &(latency, count) in &latencies {
513            cumulative += count;
514
515            if p95_latency == 0.0 && cumulative >= p95_target {
516                p95_latency = latency as f64;
517            }
518
519            if p99_latency == 0.0 && cumulative >= p99_target {
520                p99_latency = latency as f64;
521                break;
522            }
523        }
524
525        (avg_latency, p95_latency, p99_latency)
526    }
527
528    /// Get recent performance events
529    pub async fn get_recent_events(&self, limit: usize) -> Vec<PerformanceEvent> {
530        let log = self.event_log.read().unwrap();
531        log.iter().rev().take(limit).cloned().collect()
532    }
533
534    /// Get benchmark history
535    pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResult> {
536        self.benchmark_history.read().unwrap().clone()
537    }
538
539    /// Reset all counters and statistics
540    pub async fn reset(&self) {
541        // Reset counters
542        self.counters.get_operations.store(0, Ordering::Relaxed);
543        self.counters.put_operations.store(0, Ordering::Relaxed);
544        self.counters.remove_operations.store(0, Ordering::Relaxed);
545        self.counters
546            .eviction_operations
547            .store(0, Ordering::Relaxed);
548        self.counters
549            .total_get_latency_us
550            .store(0, Ordering::Relaxed);
551        self.counters
552            .total_put_latency_us
553            .store(0, Ordering::Relaxed);
554        self.counters
555            .total_remove_latency_us
556            .store(0, Ordering::Relaxed);
557        self.counters
558            .total_eviction_latency_us
559            .store(0, Ordering::Relaxed);
560        self.counters.hit_count.store(0, Ordering::Relaxed);
561        self.counters.miss_count.store(0, Ordering::Relaxed);
562        self.counters
563            .total_bytes_allocated
564            .store(0, Ordering::Relaxed);
565        self.counters.peak_memory_usage.store(0, Ordering::Relaxed);
566        self.counters
567            .current_memory_usage
568            .store(0, Ordering::Relaxed);
569        self.counters.disk_bytes_read.store(0, Ordering::Relaxed);
570        self.counters.disk_bytes_written.store(0, Ordering::Relaxed);
571
572        // Clear logs and histograms
573        {
574            let mut log = self.event_log.write().unwrap();
575            log.clear();
576        }
577
578        {
579            let mut histogram = self.latency_histogram.write().unwrap();
580            histogram.clear();
581        }
582    }
583}