rez-next-cache 0.3.0

Intelligent caching system for rez-next with multi-level cache and predictive preheating
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
//! Performance Monitor
//!
//! This module provides comprehensive performance monitoring and benchmarking
//! for the intelligent cache system.

use crate::{MonitoringConfig, PerformanceMetrics};
use serde::{Deserialize, Serialize};
use std::{
    collections::{HashMap, VecDeque},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, RwLock,
    },
    time::{Duration, Instant, SystemTime},
};
use tokio::time::Interval;

/// Real-time performance counters
///
/// Every counter is an `AtomicU64` that starts at zero (via the derived
/// `Default`), so a single instance can be shared (e.g. behind an `Arc`) and
/// updated through `&self` without a lock.
#[derive(Debug, Default)]
pub struct PerformanceCounters {
    /// Total get operations
    pub get_operations: AtomicU64,
    /// Total put operations
    pub put_operations: AtomicU64,
    /// Total remove operations
    pub remove_operations: AtomicU64,
    /// Total eviction operations
    pub eviction_operations: AtomicU64,
    /// Total get latency (microseconds)
    pub total_get_latency_us: AtomicU64,
    /// Total put latency (microseconds)
    pub total_put_latency_us: AtomicU64,
    /// Total remove latency (microseconds)
    pub total_remove_latency_us: AtomicU64,
    /// Total eviction latency (microseconds)
    pub total_eviction_latency_us: AtomicU64,
    /// Cache hit count
    pub hit_count: AtomicU64,
    /// Cache miss count
    pub miss_count: AtomicU64,
    /// Total bytes allocated (cumulative)
    pub total_bytes_allocated: AtomicU64,
    /// Highest memory-usage value observed so far (same unit as `current_memory_usage`)
    pub peak_memory_usage: AtomicU64,
    /// Most recently reported memory usage
    pub current_memory_usage: AtomicU64,
    /// Total bytes read from disk
    pub disk_bytes_read: AtomicU64,
    /// Total bytes written to disk
    pub disk_bytes_written: AtomicU64,
}

/// Performance event for detailed logging
///
/// A single record of one monitored operation. Serializable with serde so the
/// event log can be exported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceEvent {
    /// Wall-clock time at which the event was recorded
    pub timestamp: SystemTime,
    /// Kind of operation the event describes
    pub event_type: PerformanceEventType,
    /// Operation latency (microseconds)
    pub latency_us: u64,
    /// Memory usage at time of event (the monitor's current-usage counter value)
    pub memory_usage: u64,
    /// Additional free-form key/value metadata (may be empty)
    pub metadata: HashMap<String, String>,
}

/// Types of performance events
///
/// Fieldless, so it is cheap to copy and can be compared (`==`) or hashed,
/// e.g. to filter an event log by kind or to key per-kind statistics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PerformanceEventType {
    /// Cache get operation
    CacheGet,
    /// Cache put operation
    CachePut,
    /// Cache remove operation
    CacheRemove,
    /// Cache eviction
    CacheEviction,
    /// Cache promotion (L2 to L1)
    CachePromotion,
    /// Cache demotion (L1 to L2)
    CacheDemotion,
    /// Predictive preheating
    PredictivePreheating,
    /// Adaptive tuning
    AdaptiveTuning,
}

/// Benchmark result
///
/// Summary of one benchmark run; serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkResult {
    /// Benchmark name, as passed by the caller
    pub name: String,
    /// Operations per second measured over the run
    pub ops_per_second: f64,
    /// Average latency (microseconds)
    pub avg_latency_us: f64,
    /// 95th percentile latency (microseconds), reported as a latency-histogram bucket bound
    pub p95_latency_us: f64,
    /// 99th percentile latency (microseconds), reported as a latency-histogram bucket bound
    pub p99_latency_us: f64,
    /// Current memory-usage counter value when the run finished
    pub memory_usage: u64,
    /// Cache hit rate (0.0–1.0) reported for the run
    pub hit_rate: f64,
    /// Wall-clock duration of the benchmark
    pub duration: Duration,
    /// Wall-clock time at which the result was produced
    pub timestamp: SystemTime,
}

/// Performance Monitor
///
/// Provides comprehensive monitoring, metrics collection, and benchmarking
/// for the intelligent cache system.
#[derive(Debug)]
pub struct PerformanceMonitor {
    /// Configuration
    config: MonitoringConfig,
    /// Real-time performance counters
    counters: Arc<PerformanceCounters>,
    /// Performance event log
    event_log: Arc<RwLock<VecDeque<PerformanceEvent>>>,
    /// Latency histogram buckets (microseconds)
    latency_histogram: Arc<RwLock<HashMap<u64, u64>>>,
    /// Benchmark results history
    benchmark_history: Arc<RwLock<Vec<BenchmarkResult>>>,
    /// Monitoring start time
    start_time: Instant,
    /// Background monitoring interval
    _monitoring_interval: Option<Interval>,
}

impl PerformanceMonitor {
    /// Create a new performance monitor
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            config,
            counters: Arc::new(PerformanceCounters::default()),
            event_log: Arc::new(RwLock::new(VecDeque::new())),
            latency_histogram: Arc::new(RwLock::new(HashMap::new())),
            benchmark_history: Arc::new(RwLock::new(Vec::new())),
            start_time: Instant::now(),
            _monitoring_interval: None,
        }
    }

    /// Record a get operation latency
    pub async fn record_get_latency(&self, latency: Duration) {
        let latency_us = latency.as_micros() as u64;

        self.counters.get_operations.fetch_add(1, Ordering::Relaxed);
        self.counters
            .total_get_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);

        self.update_latency_histogram(latency_us).await;

        if self.config.enable_event_logging {
            self.log_event(PerformanceEvent {
                timestamp: SystemTime::now(),
                event_type: PerformanceEventType::CacheGet,
                latency_us,
                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
                metadata: HashMap::new(),
            })
            .await;
        }
    }

    /// Record a put operation latency
    pub async fn record_put_latency(&self, latency: Duration) {
        let latency_us = latency.as_micros() as u64;

        self.counters.put_operations.fetch_add(1, Ordering::Relaxed);
        self.counters
            .total_put_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);

        self.update_latency_histogram(latency_us).await;

        if self.config.enable_event_logging {
            self.log_event(PerformanceEvent {
                timestamp: SystemTime::now(),
                event_type: PerformanceEventType::CachePut,
                latency_us,
                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
                metadata: HashMap::new(),
            })
            .await;
        }
    }

    /// Record a remove operation latency
    pub async fn record_remove_latency(&self, latency: Duration) {
        let latency_us = latency.as_micros() as u64;

        self.counters
            .remove_operations
            .fetch_add(1, Ordering::Relaxed);
        self.counters
            .total_remove_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);

        self.update_latency_histogram(latency_us).await;

        if self.config.enable_event_logging {
            self.log_event(PerformanceEvent {
                timestamp: SystemTime::now(),
                event_type: PerformanceEventType::CacheRemove,
                latency_us,
                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
                metadata: HashMap::new(),
            })
            .await;
        }
    }

    /// Update memory usage
    pub async fn update_memory_usage(&self, current_usage: u64) {
        self.counters
            .current_memory_usage
            .store(current_usage, Ordering::Relaxed);

        // Update peak if necessary
        let current_peak = self.counters.peak_memory_usage.load(Ordering::Relaxed);
        if current_usage > current_peak {
            self.counters
                .peak_memory_usage
                .store(current_usage, Ordering::Relaxed);
        }
    }

    /// Record disk I/O
    pub async fn record_disk_io(&self, bytes_read: u64, bytes_written: u64) {
        self.counters
            .disk_bytes_read
            .fetch_add(bytes_read, Ordering::Relaxed);
        self.counters
            .disk_bytes_written
            .fetch_add(bytes_written, Ordering::Relaxed);
    }

    /// Record an eviction operation latency
    pub async fn record_eviction_latency(&self, latency: Duration) {
        let latency_us = latency.as_micros() as u64;
        self.counters
            .eviction_operations
            .fetch_add(1, Ordering::Relaxed);
        self.counters
            .total_eviction_latency_us
            .fetch_add(latency_us, Ordering::Relaxed);

        self.update_latency_histogram(latency_us).await;

        if self.config.enable_event_logging {
            self.log_event(PerformanceEvent {
                timestamp: SystemTime::now(),
                event_type: PerformanceEventType::CacheEviction,
                latency_us,
                memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
                metadata: HashMap::new(),
            })
            .await;
        }
    }

    /// Record a cache hit
    pub fn record_cache_hit(&self) {
        self.counters.hit_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a cache miss
    pub fn record_cache_miss(&self) {
        self.counters.miss_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record memory allocation
    pub fn record_allocation(&self, bytes: u64) {
        self.counters
            .total_bytes_allocated
            .fetch_add(bytes, Ordering::Relaxed);
    }

    /// Get current cache hit rate (0.0–1.0)
    pub fn hit_rate(&self) -> f64 {
        let hits = self.counters.hit_count.load(Ordering::Relaxed);
        let misses = self.counters.miss_count.load(Ordering::Relaxed);
        let total = hits + misses;
        if total > 0 {
            hits as f64 / total as f64
        } else {
            0.0
        }
    }

    /// Log a performance event
    async fn log_event(&self, event: PerformanceEvent) {
        if !self.config.enable_event_logging {
            return;
        }

        let mut log = self.event_log.write().unwrap();
        log.push_back(event);

        // Keep log size manageable
        while log.len() > self.config.max_events_in_memory {
            log.pop_front();
        }
    }

    /// Update latency histogram
    async fn update_latency_histogram(&self, latency_us: u64) {
        // Round to nearest bucket (powers of 2)
        let bucket = if latency_us == 0 {
            0
        } else {
            1u64 << (64 - latency_us.leading_zeros())
        };

        let mut histogram = self.latency_histogram.write().unwrap();
        *histogram.entry(bucket).or_insert(0) += 1;
    }

    /// Get current performance metrics
    pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
        let get_ops = self.counters.get_operations.load(Ordering::Relaxed);
        let put_ops = self.counters.put_operations.load(Ordering::Relaxed);
        let remove_ops = self.counters.remove_operations.load(Ordering::Relaxed);
        let eviction_ops = self.counters.eviction_operations.load(Ordering::Relaxed);

        let total_get_latency = self.counters.total_get_latency_us.load(Ordering::Relaxed);
        let total_put_latency = self.counters.total_put_latency_us.load(Ordering::Relaxed);
        let total_eviction_latency = self
            .counters
            .total_eviction_latency_us
            .load(Ordering::Relaxed);

        let elapsed_secs = self.start_time.elapsed().as_secs_f64();
        let total_ops = get_ops + put_ops + remove_ops;

        // Memory allocation rate: bytes allocated per second
        let total_allocated = self.counters.total_bytes_allocated.load(Ordering::Relaxed);
        let memory_allocation_rate = if elapsed_secs > 0.0 {
            total_allocated as f64 / elapsed_secs
        } else {
            0.0
        };

        PerformanceMetrics {
            avg_get_latency_us: if get_ops > 0 {
                total_get_latency as f64 / get_ops as f64
            } else {
                0.0
            },
            avg_put_latency_us: if put_ops > 0 {
                total_put_latency as f64 / put_ops as f64
            } else {
                0.0
            },
            avg_eviction_latency_us: if eviction_ops > 0 {
                total_eviction_latency as f64 / eviction_ops as f64
            } else {
                0.0
            },
            ops_per_second: if elapsed_secs > 0.0 {
                total_ops as f64 / elapsed_secs
            } else {
                0.0
            },
            memory_allocation_rate,
            disk_io_rate: if elapsed_secs > 0.0 {
                (self.counters.disk_bytes_read.load(Ordering::Relaxed)
                    + self.counters.disk_bytes_written.load(Ordering::Relaxed))
                    as f64
                    / elapsed_secs
            } else {
                0.0
            },
            // CPU usage: not reliably available without an OS-specific crate.
            // We approximate using elapsed time vs. total operation time.
            cpu_usage_percent: {
                let total_op_latency_us = total_get_latency
                    + total_put_latency
                    + total_eviction_latency
                    + self
                        .counters
                        .total_remove_latency_us
                        .load(Ordering::Relaxed);
                let elapsed_us = self.start_time.elapsed().as_micros() as f64;
                if elapsed_us > 0.0 {
                    (total_op_latency_us as f64 / elapsed_us * 100.0).min(100.0)
                } else {
                    0.0
                }
            },
            peak_memory_usage: self.counters.peak_memory_usage.load(Ordering::Relaxed),
        }
    }

    /// Run a benchmark
    pub async fn run_benchmark<F, Fut>(&self, name: &str, benchmark_fn: F) -> BenchmarkResult
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = ()>,
    {
        let start_time = Instant::now();
        let start_ops = self.counters.get_operations.load(Ordering::Relaxed)
            + self.counters.put_operations.load(Ordering::Relaxed);

        // Reset latency histogram for this benchmark
        {
            let mut histogram = self.latency_histogram.write().unwrap();
            histogram.clear();
        }

        // Run the benchmark
        benchmark_fn().await;

        let duration = start_time.elapsed();
        let end_ops = self.counters.get_operations.load(Ordering::Relaxed)
            + self.counters.put_operations.load(Ordering::Relaxed);

        let ops_performed = end_ops - start_ops;
        let ops_per_second = if duration.as_secs_f64() > 0.0 {
            ops_performed as f64 / duration.as_secs_f64()
        } else {
            0.0
        };

        // Calculate latency percentiles
        let (avg_latency, p95_latency, p99_latency) = self.calculate_latency_percentiles().await;

        let result = BenchmarkResult {
            name: name.to_string(),
            ops_per_second,
            avg_latency_us: avg_latency,
            p95_latency_us: p95_latency,
            p99_latency_us: p99_latency,
            memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
            hit_rate: self.hit_rate(),
            duration,
            timestamp: SystemTime::now(),
        };

        // Store benchmark result
        {
            let mut history = self.benchmark_history.write().unwrap();
            history.push(result.clone());
        }

        result
    }

    /// Calculate latency percentiles from histogram
    async fn calculate_latency_percentiles(&self) -> (f64, f64, f64) {
        let histogram = self.latency_histogram.read().unwrap();

        if histogram.is_empty() {
            return (0.0, 0.0, 0.0);
        }

        // Convert histogram to sorted vector
        let mut latencies: Vec<(u64, u64)> = histogram
            .iter()
            .map(|(&latency, &count)| (latency, count))
            .collect();
        latencies.sort_by_key(|&(latency, _)| latency);

        let total_samples: u64 = latencies.iter().map(|(_, count)| count).sum();
        if total_samples == 0 {
            return (0.0, 0.0, 0.0);
        }

        // Calculate weighted average
        let weighted_sum: u64 = latencies
            .iter()
            .map(|(latency, count)| latency * count)
            .sum();
        let avg_latency = weighted_sum as f64 / total_samples as f64;

        // Calculate percentiles
        let p95_target = (total_samples as f64 * 0.95) as u64;
        let p99_target = (total_samples as f64 * 0.99) as u64;

        let mut cumulative = 0u64;
        let mut p95_latency = 0.0;
        let mut p99_latency = 0.0;

        for &(latency, count) in &latencies {
            cumulative += count;

            if p95_latency == 0.0 && cumulative >= p95_target {
                p95_latency = latency as f64;
            }

            if p99_latency == 0.0 && cumulative >= p99_target {
                p99_latency = latency as f64;
                break;
            }
        }

        (avg_latency, p95_latency, p99_latency)
    }

    /// Get recent performance events
    pub async fn get_recent_events(&self, limit: usize) -> Vec<PerformanceEvent> {
        let log = self.event_log.read().unwrap();
        log.iter().rev().take(limit).cloned().collect()
    }

    /// Get benchmark history
    pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResult> {
        self.benchmark_history.read().unwrap().clone()
    }

    /// Reset all counters and statistics
    pub async fn reset(&self) {
        // Reset counters
        self.counters.get_operations.store(0, Ordering::Relaxed);
        self.counters.put_operations.store(0, Ordering::Relaxed);
        self.counters.remove_operations.store(0, Ordering::Relaxed);
        self.counters
            .eviction_operations
            .store(0, Ordering::Relaxed);
        self.counters
            .total_get_latency_us
            .store(0, Ordering::Relaxed);
        self.counters
            .total_put_latency_us
            .store(0, Ordering::Relaxed);
        self.counters
            .total_remove_latency_us
            .store(0, Ordering::Relaxed);
        self.counters
            .total_eviction_latency_us
            .store(0, Ordering::Relaxed);
        self.counters.hit_count.store(0, Ordering::Relaxed);
        self.counters.miss_count.store(0, Ordering::Relaxed);
        self.counters
            .total_bytes_allocated
            .store(0, Ordering::Relaxed);
        self.counters.peak_memory_usage.store(0, Ordering::Relaxed);
        self.counters
            .current_memory_usage
            .store(0, Ordering::Relaxed);
        self.counters.disk_bytes_read.store(0, Ordering::Relaxed);
        self.counters.disk_bytes_written.store(0, Ordering::Relaxed);

        // Clear logs and histograms
        {
            let mut log = self.event_log.write().unwrap();
            log.clear();
        }

        {
            let mut histogram = self.latency_histogram.write().unwrap();
            histogram.clear();
        }
    }
}