Skip to main content

rez_next_cache/
intelligent_manager.rs

//! Intelligent Cache Manager
//!
//! This module provides the `IntelligentCacheManager`, which coordinates
//! multi-level caching, predictive preheating, and adaptive tuning.
5
6use crate::{
7    AdaptiveTuner, CacheError, PerformanceMonitor, PredictivePreheater, UnifiedCache,
8    UnifiedCacheConfig, UnifiedCacheStats,
9};
10use dashmap::DashMap;
11use tracing::warn;
12
13use async_trait::async_trait;
14use std::{
15    collections::HashMap,
16    hash::Hash,
17    sync::{Arc, RwLock},
18    time::{Duration, Instant, SystemTime},
19};
20use tokio::sync::RwLock as AsyncRwLock;
21
/// A cached value together with the bookkeeping used for expiry,
/// eviction scoring and tier placement.
#[derive(Debug, Clone)]
pub struct MultiLevelCacheEntry<V> {
    /// The cached value
    pub value: V,
    /// Wall-clock time at which the entry was created
    pub created_at: SystemTime,
    /// Wall-clock time of the most recent recorded access
    pub last_accessed: SystemTime,
    /// Number of recorded accesses (starts at 1, saturates at `u64::MAX`)
    pub access_count: u64,
    /// Cache level (1 for L1, 2 for L2)
    pub level: u8,
    /// Size in bytes (estimated)
    pub size_bytes: u64,
    /// Time to live in seconds; `0` means the entry never expires
    pub ttl: u64,
    /// Prediction score for preheating
    pub prediction_score: f64,
}

impl<V> MultiLevelCacheEntry<V> {
    /// Create a new cache entry.
    ///
    /// `created_at` and `last_accessed` are both set to the current time,
    /// `access_count` starts at 1 and `prediction_score` at 0.0.
    pub fn new(value: V, ttl: u64, level: u8, size_bytes: u64) -> Self {
        let now = SystemTime::now();
        Self {
            value,
            created_at: now,
            last_accessed: now,
            access_count: 1,
            level,
            size_bytes,
            ttl,
            prediction_score: 0.0,
        }
    }

    /// Check if the entry is still valid.
    ///
    /// An entry with `ttl == 0` never expires. Otherwise it is valid while
    /// fewer than `ttl` whole seconds have elapsed since `created_at`.
    pub fn is_valid(&self) -> bool {
        if self.ttl == 0 {
            return true; // No expiration
        }

        match self.created_at.elapsed() {
            Ok(age) => age.as_secs() < self.ttl,
            // `created_at` lies in the future (the system clock moved
            // backwards). The age is unknown, so treat the entry as expired.
            Err(_) => false,
        }
    }

    /// Mark the entry as accessed: refresh `last_accessed` and bump the
    /// access counter.
    pub fn mark_accessed(&mut self) {
        self.last_accessed = SystemTime::now();
        // Saturate instead of overflowing: a plain `+= 1` panics in debug
        // builds and wraps to 0 in release builds once the counter is full.
        self.access_count = self.access_count.saturating_add(1);
    }

    /// Calculate entry priority for eviction.
    ///
    /// The score is `access_count / ((size_bytes + 1) * (idle_seconds + 1))`:
    /// frequently used, small, recently touched entries score highest.
    /// Higher score = higher priority to keep.
    pub fn calculate_priority(&self) -> f64 {
        // If `last_accessed` is in the future the idle time counts as zero.
        let idle_seconds = self
            .last_accessed
            .elapsed()
            .map(|idle| idle.as_secs())
            .unwrap_or(0) as f64;
        let frequency_factor = self.access_count as f64;
        let size_factor = 1.0 / (self.size_bytes as f64 + 1.0);

        (frequency_factor * size_factor) / (idle_seconds + 1.0)
    }
}
93
94/// Intelligent Cache Manager
95///
96/// Coordinates multi-level caching with predictive preheating and adaptive tuning.
97/// Provides unified interface for all cache operations while optimizing performance.
98#[derive(Debug)]
99pub struct IntelligentCacheManager<K, V>
100where
101    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
102    V: Clone + Send + Sync + std::fmt::Debug + 'static,
103{
104    /// Configuration
105    config: UnifiedCacheConfig,
106    /// L1 cache (memory) - high-speed concurrent access
107    l1_cache: Arc<DashMap<K, MultiLevelCacheEntry<V>>>,
108    /// L2 cache (disk/persistent) - larger capacity
109    l2_cache: Arc<AsyncRwLock<HashMap<K, MultiLevelCacheEntry<V>>>>,
110    /// Predictive preheater
111    preheater: Arc<PredictivePreheater<K>>,
112    /// Adaptive tuner
113    tuner: Arc<AdaptiveTuner>,
114    /// Performance monitor
115    monitor: Arc<PerformanceMonitor>,
116    /// Cache statistics
117    stats: Arc<RwLock<UnifiedCacheStats>>,
118    /// Access pattern tracking
119    access_patterns: Arc<RwLock<HashMap<K, Vec<SystemTime>>>>,
120}
121
122impl<K, V> IntelligentCacheManager<K, V>
123where
124    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
125    V: Clone + Send + Sync + std::fmt::Debug + 'static,
126{
127    /// Create a new intelligent cache manager
128    pub fn new(config: UnifiedCacheConfig) -> Self {
129        let preheater = Arc::new(PredictivePreheater::new(config.preheating_config.clone()));
130        let tuner = Arc::new(AdaptiveTuner::new(config.tuning_config.clone()));
131        let monitor = Arc::new(PerformanceMonitor::new(config.monitoring_config.clone()));
132
133        Self {
134            config,
135            l1_cache: Arc::new(DashMap::new()),
136            l2_cache: Arc::new(AsyncRwLock::new(HashMap::new())),
137            preheater,
138            tuner,
139            monitor,
140            stats: Arc::new(RwLock::new(UnifiedCacheStats::default())),
141            access_patterns: Arc::new(RwLock::new(HashMap::new())),
142        }
143    }
144
145    /// Get cache configuration
146    pub fn config(&self) -> &UnifiedCacheConfig {
147        &self.config
148    }
149
150    /// Get predictive preheater
151    pub fn preheater(&self) -> Arc<PredictivePreheater<K>> {
152        Arc::clone(&self.preheater)
153    }
154
155    /// Get adaptive tuner
156    pub fn tuner(&self) -> Arc<AdaptiveTuner> {
157        Arc::clone(&self.tuner)
158    }
159
160    /// Get performance monitor
161    pub fn monitor(&self) -> Arc<PerformanceMonitor> {
162        Arc::clone(&self.monitor)
163    }
164
165    /// Record access pattern for predictive preheating
166    async fn record_access_pattern(&self, key: &K) {
167        if !self.config.preheating_config.enable_pattern_learning {
168            return;
169        }
170
171        let mut patterns = self.access_patterns.write().unwrap();
172        let now = SystemTime::now();
173
174        patterns.entry(key.clone()).or_default().push(now);
175
176        // Keep only recent accesses for pattern analysis
177        let cutoff =
178            now - Duration::from_secs(self.config.preheating_config.pattern_window_seconds);
179        if let Some(times) = patterns.get_mut(key) {
180            times.retain(|&time| time > cutoff);
181        }
182    }
183
184    /// Promote data from L2 to L1 cache
185    async fn promote_to_l1(
186        &self,
187        key: K,
188        mut entry: MultiLevelCacheEntry<V>,
189    ) -> Result<(), CacheError> {
190        // Check L1 capacity
191        if self.l1_cache.len() >= self.config.l1_config.max_entries {
192            self.evict_l1_entries().await?;
193        }
194
195        // Update entry metadata for L1
196        entry.level = 1;
197        entry.mark_accessed();
198
199        // Insert into L1
200        self.l1_cache.insert(key.clone(), entry);
201
202        // Remove from L2
203        let mut l2_cache = self.l2_cache.write().await;
204        l2_cache.remove(&key);
205
206        // Update statistics
207        {
208            let mut stats = self.stats.write().unwrap();
209            stats.overall_stats.promotions += 1;
210        }
211
212        Ok(())
213    }
214
215    /// Demote data from L1 to L2 cache
216    async fn demote_to_l2(
217        &self,
218        key: K,
219        mut entry: MultiLevelCacheEntry<V>,
220    ) -> Result<(), CacheError> {
221        // Check L2 capacity
222        {
223            let l2_cache = self.l2_cache.read().await;
224            if l2_cache.len() >= self.config.l2_config.max_entries {
225                drop(l2_cache);
226                self.evict_l2_entries().await?;
227            }
228        }
229
230        // Update entry metadata for L2
231        entry.level = 2;
232
233        // Insert into L2
234        {
235            let mut l2_cache = self.l2_cache.write().await;
236            l2_cache.insert(key.clone(), entry);
237        }
238
239        // Remove from L1
240        self.l1_cache.remove(&key);
241
242        // Update statistics
243        {
244            let mut stats = self.stats.write().unwrap();
245            stats.overall_stats.demotions += 1;
246        }
247
248        Ok(())
249    }
250
251    /// Evict entries from L1 cache
252    async fn evict_l1_entries(&self) -> Result<(), CacheError> {
253        let eviction_count = (self.l1_cache.len() as f64 * 0.1).max(1.0) as usize;
254
255        // Collect entries with their priorities
256        let mut entries: Vec<(K, f64)> = self
257            .l1_cache
258            .iter()
259            .map(|entry| {
260                let priority = entry.value().calculate_priority();
261                (entry.key().clone(), priority)
262            })
263            .collect();
264
265        // Sort by priority (lowest first for eviction)
266        entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
267
268        // Evict lowest priority entries
269        for (key, _) in entries.into_iter().take(eviction_count) {
270            if let Some((_, entry)) = self.l1_cache.remove(&key) {
271                // Try to demote to L2 if valuable enough
272                if entry.access_count > 1 {
273                    self.demote_to_l2(key, entry).await?;
274                }
275            }
276        }
277
278        Ok(())
279    }
280
281    /// Evict entries from L2 cache
282    async fn evict_l2_entries(&self) -> Result<(), CacheError> {
283        let mut l2_cache = self.l2_cache.write().await;
284        let eviction_count = (l2_cache.len() as f64 * 0.1).max(1.0) as usize;
285
286        // Collect entries with their priorities
287        let mut entries: Vec<(K, f64)> = l2_cache
288            .iter()
289            .map(|(key, entry)| {
290                let priority = entry.calculate_priority();
291                (key.clone(), priority)
292            })
293            .collect();
294
295        // Sort by priority (lowest first for eviction)
296        entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
297
298        // Remove lowest priority entries
299        for (key, _) in entries.into_iter().take(eviction_count) {
300            l2_cache.remove(&key);
301        }
302
303        Ok(())
304    }
305
306    /// Update cache statistics
307    async fn update_statistics(&self) {
308        // Collect L1 statistics
309        let l1_entries = self.l1_cache.len();
310        let l1_usage_bytes = self
311            .l1_cache
312            .iter()
313            .map(|entry| entry.value().size_bytes)
314            .sum();
315
316        // Collect L2 statistics
317        let (l2_entries, l2_usage_bytes) = {
318            let l2_cache = self.l2_cache.read().await;
319            let entries = l2_cache.len();
320            let usage_bytes = l2_cache.values().map(|entry| entry.size_bytes).sum();
321            (entries, usage_bytes)
322        };
323
324        // Update statistics in a separate scope
325        {
326            let mut stats = self.stats.write().unwrap();
327
328            // Update L1 statistics
329            stats.l1_stats.entries = l1_entries;
330            stats.l1_stats.usage_bytes = l1_usage_bytes;
331
332            // Update L2 statistics
333            stats.l2_stats.entries = l2_entries;
334            stats.l2_stats.usage_bytes = l2_usage_bytes;
335
336            // Update overall statistics
337            stats.update_overall_stats();
338        }
339    }
340}
341
342#[async_trait]
343impl<K, V> UnifiedCache<K, V> for IntelligentCacheManager<K, V>
344where
345    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
346    V: Clone + Send + Sync + std::fmt::Debug + 'static,
347{
348    /// Get a value from the cache
349    async fn get(&self, key: &K) -> Option<V> {
350        let start_time = Instant::now();
351
352        // Record access pattern
353        self.record_access_pattern(key).await;
354
355        // Try L1 cache first
356        if let Some(mut entry) = self.l1_cache.get_mut(key) {
357            if entry.is_valid() {
358                entry.mark_accessed();
359
360                // Update statistics
361                {
362                    let mut stats = self.stats.write().unwrap();
363                    stats.l1_stats.hits += 1;
364                }
365
366                // Record performance metrics
367                self.monitor.record_get_latency(start_time.elapsed()).await;
368
369                return Some(entry.value.clone());
370            } else {
371                // Remove expired entry
372                drop(entry);
373                self.l1_cache.remove(key);
374            }
375        }
376
377        // Try L2 cache
378        {
379            let mut l2_cache = self.l2_cache.write().await;
380            if let Some(entry) = l2_cache.get_mut(key) {
381                if entry.is_valid() {
382                    entry.mark_accessed();
383                    let value = entry.value.clone();
384
385                    // Promote to L1 if frequently accessed
386                    if entry.access_count >= self.config.l1_config.promotion_threshold {
387                        let promoted_entry = entry.clone();
388                        l2_cache.remove(key);
389                        drop(l2_cache);
390
391                        if let Err(e) = self.promote_to_l1(key.clone(), promoted_entry).await {
392                            warn!("Failed to promote to L1: {:?}", e);
393                        }
394                    }
395
396                    // Update statistics
397                    {
398                        let mut stats = self.stats.write().unwrap();
399                        stats.l2_stats.hits += 1;
400                    }
401
402                    // Record performance metrics
403                    self.monitor.record_get_latency(start_time.elapsed()).await;
404
405                    return Some(value);
406                } else {
407                    // Remove expired entry
408                    l2_cache.remove(key);
409                }
410            }
411        }
412
413        // Cache miss
414        {
415            let mut stats = self.stats.write().unwrap();
416            stats.l1_stats.misses += 1;
417            stats.l2_stats.misses += 1;
418        }
419
420        // Trigger predictive preheating
421        if self.config.preheating_config.enable_predictive_preheating {
422            self.preheater.predict_and_preheat(key).await;
423        }
424
425        // Record performance metrics
426        self.monitor.record_get_latency(start_time.elapsed()).await;
427
428        None
429    }
430
431    /// Put a value into the cache
432    async fn put(&self, key: K, value: V) -> Result<(), CacheError> {
433        let start_time = Instant::now();
434
435        // Estimate size (simplified)
436        let size_bytes = std::mem::size_of::<V>() as u64;
437
438        // Create cache entry for L1
439        let entry =
440            MultiLevelCacheEntry::new(value, self.config.l1_config.default_ttl, 1, size_bytes);
441
442        // Check L1 capacity and evict if necessary
443        if self.l1_cache.len() >= self.config.l1_config.max_entries {
444            self.evict_l1_entries().await?;
445        }
446
447        // Insert into L1 cache
448        self.l1_cache.insert(key.clone(), entry);
449
450        // Update statistics
451        self.update_statistics().await;
452
453        // Record performance metrics
454        self.monitor.record_put_latency(start_time.elapsed()).await;
455
456        // Trigger adaptive tuning
457        if self.config.tuning_config.enable_adaptive_tuning {
458            self.tuner.analyze_and_tune().await;
459        }
460
461        Ok(())
462    }
463
464    /// Remove a value from the cache
465    async fn remove(&self, key: &K) -> bool {
466        let l1_removed = self.l1_cache.remove(key).is_some();
467
468        let l2_removed = {
469            let mut l2_cache = self.l2_cache.write().await;
470            l2_cache.remove(key).is_some()
471        };
472
473        // Update statistics if removed
474        if l1_removed || l2_removed {
475            self.update_statistics().await;
476        }
477
478        l1_removed || l2_removed
479    }
480
481    /// Check if a key exists in the cache
482    async fn contains_key(&self, key: &K) -> bool {
483        // Check L1 first
484        if let Some(entry) = self.l1_cache.get(key) {
485            if entry.is_valid() {
486                return true;
487            }
488        }
489
490        // Check L2
491        let l2_cache = self.l2_cache.read().await;
492        if let Some(entry) = l2_cache.get(key) {
493            return entry.is_valid();
494        }
495
496        false
497    }
498
499    /// Get cache statistics
500    async fn get_stats(&self) -> UnifiedCacheStats {
501        self.update_statistics().await;
502        self.stats.read().unwrap().clone()
503    }
504
505    /// Clear all cache entries
506    async fn clear(&self) -> Result<(), CacheError> {
507        self.l1_cache.clear();
508
509        {
510            let mut l2_cache = self.l2_cache.write().await;
511            l2_cache.clear();
512        }
513
514        // Reset statistics
515        {
516            let mut stats = self.stats.write().unwrap();
517            *stats = UnifiedCacheStats::default();
518        }
519
520        Ok(())
521    }
522
523    /// Get cache size (total entries across all levels)
524    async fn size(&self) -> usize {
525        let l1_size = self.l1_cache.len();
526        let l2_size = {
527            let l2_cache = self.l2_cache.read().await;
528            l2_cache.len()
529        };
530        l1_size + l2_size
531    }
532
533    /// Check if cache is empty
534    async fn is_empty(&self) -> bool {
535        self.size().await == 0
536    }
537
538    /// Get cache capacity (total across all levels)
539    async fn capacity(&self) -> usize {
540        self.config.l1_config.max_entries + self.config.l2_config.max_entries
541    }
542
543    /// Get cache type identifier
544    fn cache_type(&self) -> &'static str {
545        "IntelligentCacheManager"
546    }
547}