rez_next_cache/
intelligent_manager.rs

1//! Intelligent Cache Manager
2//!
3//! This module provides the IntelligentCacheManager, which coordinates
4//! multi-level caching, predictive preheating, and adaptive tuning.
5
6use crate::{
7    AdaptiveTuner, CacheError, PerformanceMonitor, PredictivePreheater, UnifiedCache,
8    UnifiedCacheConfig, UnifiedCacheStats,
9};
10use dashmap::DashMap;
11
12use async_trait::async_trait;
13use std::{
14    collections::HashMap,
15    hash::Hash,
16    sync::{Arc, RwLock},
17    time::{Duration, Instant, SystemTime},
18};
19use tokio::sync::RwLock as AsyncRwLock;
20
/// A cached value together with the bookkeeping used for expiry and eviction.
///
/// All fields are public, so an entry can be built by hand as well as through
/// `MultiLevelCacheEntry::new`.
#[derive(Debug, Clone)]
pub struct MultiLevelCacheEntry<V> {
    /// The cached value
    pub value: V,
    /// Creation timestamp; `is_valid` measures the entry's age from this point
    pub created_at: SystemTime,
    /// Last access timestamp; refreshed by `mark_accessed`
    pub last_accessed: SystemTime,
    /// Access count; `new` starts it at 1 and `mark_accessed` increments it
    pub access_count: u64,
    /// Cache level (1 for L1, 2 for L2)
    pub level: u8,
    /// Size in bytes (estimated)
    pub size_bytes: u64,
    /// Time to live (in seconds); `is_valid` treats 0 as "never expires"
    pub ttl: u64,
    /// Prediction score for preheating; `new` initialises it to 0.0
    pub prediction_score: f64,
}
41
42impl<V> MultiLevelCacheEntry<V> {
43    /// Create a new cache entry
44    pub fn new(value: V, ttl: u64, level: u8, size_bytes: u64) -> Self {
45        let now = SystemTime::now();
46        Self {
47            value,
48            created_at: now,
49            last_accessed: now,
50            access_count: 1,
51            level,
52            size_bytes,
53            ttl,
54            prediction_score: 0.0,
55        }
56    }
57
58    /// Check if the entry is still valid
59    pub fn is_valid(&self) -> bool {
60        if self.ttl == 0 {
61            return true; // No expiration
62        }
63
64        let elapsed = self
65            .created_at
66            .elapsed()
67            .unwrap_or(Duration::from_secs(u64::MAX))
68            .as_secs();
69        elapsed < self.ttl
70    }
71
72    /// Mark the entry as accessed
73    pub fn mark_accessed(&mut self) {
74        self.last_accessed = SystemTime::now();
75        self.access_count += 1;
76    }
77
78    /// Calculate entry priority for eviction
79    pub fn calculate_priority(&self) -> f64 {
80        let age_factor = self
81            .last_accessed
82            .elapsed()
83            .unwrap_or(Duration::from_secs(0))
84            .as_secs() as f64;
85        let frequency_factor = self.access_count as f64;
86        let size_factor = 1.0 / (self.size_bytes as f64 + 1.0);
87
88        // Higher score = higher priority to keep
89        (frequency_factor * size_factor) / (age_factor + 1.0)
90    }
91}
92
/// Intelligent Cache Manager
///
/// Coordinates multi-level caching with predictive preheating and adaptive tuning.
/// Provides unified interface for all cache operations while optimizing performance.
///
/// Every piece of shared state is wrapped in an `Arc`. Cache statistics and
/// access patterns sit behind `std::sync::RwLock`, while the L2 map sits
/// behind a Tokio async `RwLock`.
#[derive(Debug)]
pub struct IntelligentCacheManager<K, V>
where
    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
    V: Clone + Send + Sync + std::fmt::Debug + 'static,
{
    /// Configuration
    config: UnifiedCacheConfig,
    /// L1 cache (memory) - high-speed concurrent access
    l1_cache: Arc<DashMap<K, MultiLevelCacheEntry<V>>>,
    /// L2 cache (disk/persistent) - larger capacity
    ///
    /// NOTE(review): in this file L2 is an in-memory `HashMap` behind an async
    /// lock; nothing visible here writes it to disk.
    l2_cache: Arc<AsyncRwLock<HashMap<K, MultiLevelCacheEntry<V>>>>,
    /// Predictive preheater
    preheater: Arc<PredictivePreheater<K>>,
    /// Adaptive tuner
    tuner: Arc<AdaptiveTuner>,
    /// Performance monitor
    monitor: Arc<PerformanceMonitor>,
    /// Cache statistics
    stats: Arc<RwLock<UnifiedCacheStats>>,
    /// Access pattern tracking: per-key timestamps of recent accesses
    access_patterns: Arc<RwLock<HashMap<K, Vec<SystemTime>>>>,
}
120
121impl<K, V> IntelligentCacheManager<K, V>
122where
123    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
124    V: Clone + Send + Sync + std::fmt::Debug + 'static,
125{
126    /// Create a new intelligent cache manager
127    pub fn new(config: UnifiedCacheConfig) -> Self {
128        let preheater = Arc::new(PredictivePreheater::new(config.preheating_config.clone()));
129        let tuner = Arc::new(AdaptiveTuner::new(config.tuning_config.clone()));
130        let monitor = Arc::new(PerformanceMonitor::new(config.monitoring_config.clone()));
131
132        Self {
133            config,
134            l1_cache: Arc::new(DashMap::new()),
135            l2_cache: Arc::new(AsyncRwLock::new(HashMap::new())),
136            preheater,
137            tuner,
138            monitor,
139            stats: Arc::new(RwLock::new(UnifiedCacheStats::default())),
140            access_patterns: Arc::new(RwLock::new(HashMap::new())),
141        }
142    }
143
144    /// Get cache configuration
145    pub fn config(&self) -> &UnifiedCacheConfig {
146        &self.config
147    }
148
149    /// Get predictive preheater
150    pub fn preheater(&self) -> Arc<PredictivePreheater<K>> {
151        Arc::clone(&self.preheater)
152    }
153
154    /// Get adaptive tuner
155    pub fn tuner(&self) -> Arc<AdaptiveTuner> {
156        Arc::clone(&self.tuner)
157    }
158
159    /// Get performance monitor
160    pub fn monitor(&self) -> Arc<PerformanceMonitor> {
161        Arc::clone(&self.monitor)
162    }
163
164    /// Record access pattern for predictive preheating
165    async fn record_access_pattern(&self, key: &K) {
166        if !self.config.preheating_config.enable_pattern_learning {
167            return;
168        }
169
170        let mut patterns = self.access_patterns.write().unwrap();
171        let now = SystemTime::now();
172
173        patterns
174            .entry(key.clone())
175            .or_insert_with(Vec::new)
176            .push(now);
177
178        // Keep only recent accesses for pattern analysis
179        let cutoff =
180            now - Duration::from_secs(self.config.preheating_config.pattern_window_seconds);
181        if let Some(times) = patterns.get_mut(key) {
182            times.retain(|&time| time > cutoff);
183        }
184    }
185
186    /// Promote data from L2 to L1 cache
187    async fn promote_to_l1(
188        &self,
189        key: K,
190        mut entry: MultiLevelCacheEntry<V>,
191    ) -> Result<(), CacheError> {
192        // Check L1 capacity
193        if self.l1_cache.len() >= self.config.l1_config.max_entries {
194            self.evict_l1_entries().await?;
195        }
196
197        // Update entry metadata for L1
198        entry.level = 1;
199        entry.mark_accessed();
200
201        // Insert into L1
202        self.l1_cache.insert(key.clone(), entry);
203
204        // Remove from L2
205        let mut l2_cache = self.l2_cache.write().await;
206        l2_cache.remove(&key);
207
208        // Update statistics
209        {
210            let mut stats = self.stats.write().unwrap();
211            stats.overall_stats.promotions += 1;
212        }
213
214        Ok(())
215    }
216
217    /// Demote data from L1 to L2 cache
218    async fn demote_to_l2(
219        &self,
220        key: K,
221        mut entry: MultiLevelCacheEntry<V>,
222    ) -> Result<(), CacheError> {
223        // Check L2 capacity
224        {
225            let l2_cache = self.l2_cache.read().await;
226            if l2_cache.len() >= self.config.l2_config.max_entries {
227                drop(l2_cache);
228                self.evict_l2_entries().await?;
229            }
230        }
231
232        // Update entry metadata for L2
233        entry.level = 2;
234
235        // Insert into L2
236        {
237            let mut l2_cache = self.l2_cache.write().await;
238            l2_cache.insert(key.clone(), entry);
239        }
240
241        // Remove from L1
242        self.l1_cache.remove(&key);
243
244        // Update statistics
245        {
246            let mut stats = self.stats.write().unwrap();
247            stats.overall_stats.demotions += 1;
248        }
249
250        Ok(())
251    }
252
253    /// Evict entries from L1 cache
254    async fn evict_l1_entries(&self) -> Result<(), CacheError> {
255        let eviction_count = (self.l1_cache.len() as f64 * 0.1).max(1.0) as usize;
256
257        // Collect entries with their priorities
258        let mut entries: Vec<(K, f64)> = self
259            .l1_cache
260            .iter()
261            .map(|entry| {
262                let priority = entry.value().calculate_priority();
263                (entry.key().clone(), priority)
264            })
265            .collect();
266
267        // Sort by priority (lowest first for eviction)
268        entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
269
270        // Evict lowest priority entries
271        for (key, _) in entries.into_iter().take(eviction_count) {
272            if let Some((_, entry)) = self.l1_cache.remove(&key) {
273                // Try to demote to L2 if valuable enough
274                if entry.access_count > 1 {
275                    self.demote_to_l2(key, entry).await?;
276                }
277            }
278        }
279
280        Ok(())
281    }
282
283    /// Evict entries from L2 cache
284    async fn evict_l2_entries(&self) -> Result<(), CacheError> {
285        let mut l2_cache = self.l2_cache.write().await;
286        let eviction_count = (l2_cache.len() as f64 * 0.1).max(1.0) as usize;
287
288        // Collect entries with their priorities
289        let mut entries: Vec<(K, f64)> = l2_cache
290            .iter()
291            .map(|(key, entry)| {
292                let priority = entry.calculate_priority();
293                (key.clone(), priority)
294            })
295            .collect();
296
297        // Sort by priority (lowest first for eviction)
298        entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
299
300        // Remove lowest priority entries
301        for (key, _) in entries.into_iter().take(eviction_count) {
302            l2_cache.remove(&key);
303        }
304
305        Ok(())
306    }
307
308    /// Update cache statistics
309    async fn update_statistics(&self) {
310        // Collect L1 statistics
311        let l1_entries = self.l1_cache.len();
312        let l1_usage_bytes = self
313            .l1_cache
314            .iter()
315            .map(|entry| entry.value().size_bytes)
316            .sum();
317
318        // Collect L2 statistics
319        let (l2_entries, l2_usage_bytes) = {
320            let l2_cache = self.l2_cache.read().await;
321            let entries = l2_cache.len();
322            let usage_bytes = l2_cache.values().map(|entry| entry.size_bytes).sum();
323            (entries, usage_bytes)
324        };
325
326        // Update statistics in a separate scope
327        {
328            let mut stats = self.stats.write().unwrap();
329
330            // Update L1 statistics
331            stats.l1_stats.entries = l1_entries;
332            stats.l1_stats.usage_bytes = l1_usage_bytes;
333
334            // Update L2 statistics
335            stats.l2_stats.entries = l2_entries;
336            stats.l2_stats.usage_bytes = l2_usage_bytes;
337
338            // Update overall statistics
339            stats.update_overall_stats();
340        }
341    }
342}
343
344#[async_trait]
345impl<K, V> UnifiedCache<K, V> for IntelligentCacheManager<K, V>
346where
347    K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
348    V: Clone + Send + Sync + std::fmt::Debug + 'static,
349{
350    /// Get a value from the cache
351    async fn get(&self, key: &K) -> Option<V> {
352        let start_time = Instant::now();
353
354        // Record access pattern
355        self.record_access_pattern(key).await;
356
357        // Try L1 cache first
358        if let Some(mut entry) = self.l1_cache.get_mut(key) {
359            if entry.is_valid() {
360                entry.mark_accessed();
361
362                // Update statistics
363                {
364                    let mut stats = self.stats.write().unwrap();
365                    stats.l1_stats.hits += 1;
366                }
367
368                // Record performance metrics
369                self.monitor.record_get_latency(start_time.elapsed()).await;
370
371                return Some(entry.value.clone());
372            } else {
373                // Remove expired entry
374                drop(entry);
375                self.l1_cache.remove(key);
376            }
377        }
378
379        // Try L2 cache
380        {
381            let mut l2_cache = self.l2_cache.write().await;
382            if let Some(entry) = l2_cache.get_mut(key) {
383                if entry.is_valid() {
384                    entry.mark_accessed();
385                    let value = entry.value.clone();
386
387                    // Promote to L1 if frequently accessed
388                    if entry.access_count >= self.config.l1_config.promotion_threshold {
389                        let promoted_entry = entry.clone();
390                        l2_cache.remove(key);
391                        drop(l2_cache);
392
393                        if let Err(e) = self.promote_to_l1(key.clone(), promoted_entry).await {
394                            eprintln!("Failed to promote to L1: {:?}", e);
395                        }
396                    }
397
398                    // Update statistics
399                    {
400                        let mut stats = self.stats.write().unwrap();
401                        stats.l2_stats.hits += 1;
402                    }
403
404                    // Record performance metrics
405                    self.monitor.record_get_latency(start_time.elapsed()).await;
406
407                    return Some(value);
408                } else {
409                    // Remove expired entry
410                    l2_cache.remove(key);
411                }
412            }
413        }
414
415        // Cache miss
416        {
417            let mut stats = self.stats.write().unwrap();
418            stats.l1_stats.misses += 1;
419            stats.l2_stats.misses += 1;
420        }
421
422        // Trigger predictive preheating
423        if self.config.preheating_config.enable_predictive_preheating {
424            self.preheater.predict_and_preheat(key).await;
425        }
426
427        // Record performance metrics
428        self.monitor.record_get_latency(start_time.elapsed()).await;
429
430        None
431    }
432
433    /// Put a value into the cache
434    async fn put(&self, key: K, value: V) -> Result<(), CacheError> {
435        let start_time = Instant::now();
436
437        // Estimate size (simplified)
438        let size_bytes = std::mem::size_of::<V>() as u64;
439
440        // Create cache entry for L1
441        let entry =
442            MultiLevelCacheEntry::new(value, self.config.l1_config.default_ttl, 1, size_bytes);
443
444        // Check L1 capacity and evict if necessary
445        if self.l1_cache.len() >= self.config.l1_config.max_entries {
446            self.evict_l1_entries().await?;
447        }
448
449        // Insert into L1 cache
450        self.l1_cache.insert(key.clone(), entry);
451
452        // Update statistics
453        self.update_statistics().await;
454
455        // Record performance metrics
456        self.monitor.record_put_latency(start_time.elapsed()).await;
457
458        // Trigger adaptive tuning
459        if self.config.tuning_config.enable_adaptive_tuning {
460            self.tuner.analyze_and_tune().await;
461        }
462
463        Ok(())
464    }
465
466    /// Remove a value from the cache
467    async fn remove(&self, key: &K) -> bool {
468        let l1_removed = self.l1_cache.remove(key).is_some();
469
470        let l2_removed = {
471            let mut l2_cache = self.l2_cache.write().await;
472            l2_cache.remove(key).is_some()
473        };
474
475        // Update statistics if removed
476        if l1_removed || l2_removed {
477            self.update_statistics().await;
478        }
479
480        l1_removed || l2_removed
481    }
482
483    /// Check if a key exists in the cache
484    async fn contains_key(&self, key: &K) -> bool {
485        // Check L1 first
486        if let Some(entry) = self.l1_cache.get(key) {
487            if entry.is_valid() {
488                return true;
489            }
490        }
491
492        // Check L2
493        let l2_cache = self.l2_cache.read().await;
494        if let Some(entry) = l2_cache.get(key) {
495            return entry.is_valid();
496        }
497
498        false
499    }
500
501    /// Get cache statistics
502    async fn get_stats(&self) -> UnifiedCacheStats {
503        self.update_statistics().await;
504        self.stats.read().unwrap().clone()
505    }
506
507    /// Clear all cache entries
508    async fn clear(&self) -> Result<(), CacheError> {
509        self.l1_cache.clear();
510
511        {
512            let mut l2_cache = self.l2_cache.write().await;
513            l2_cache.clear();
514        }
515
516        // Reset statistics
517        {
518            let mut stats = self.stats.write().unwrap();
519            *stats = UnifiedCacheStats::default();
520        }
521
522        Ok(())
523    }
524
525    /// Get cache size (total entries across all levels)
526    async fn size(&self) -> usize {
527        let l1_size = self.l1_cache.len();
528        let l2_size = {
529            let l2_cache = self.l2_cache.read().await;
530            l2_cache.len()
531        };
532        l1_size + l2_size
533    }
534
535    /// Check if cache is empty
536    async fn is_empty(&self) -> bool {
537        self.size().await == 0
538    }
539
540    /// Get cache capacity (total across all levels)
541    async fn capacity(&self) -> usize {
542        self.config.l1_config.max_entries + self.config.l2_config.max_entries
543    }
544
545    /// Get cache type identifier
546    fn cache_type(&self) -> &'static str {
547        "IntelligentCacheManager"
548    }
549}