oxirs_vec/adaptive_intelligent_caching/cache.rs

//! Main adaptive intelligent cache implementation

use anyhow::Result;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant, SystemTime};
use tracing::{debug, info, warn};

use super::config::CacheConfiguration;
use super::eviction::{
    AdaptiveEvictionPolicy, EvictionPolicy, LFUEvictionPolicy, LRUEvictionPolicy,
};
use super::metrics::CachePerformanceMetrics;
use super::ml_models::MLModels;
use super::optimizer::CacheOptimizer;
use super::pattern_analyzer::AccessPatternAnalyzer;
use super::prefetcher::PredictivePrefetcher;
use super::storage::{CacheStorage, CompressedStorage, MemoryStorage, PersistentStorage};
use super::tier::CacheTier;
use super::types::{
    AccessEvent, AccessTracker, CacheItem, CacheKey, CachePerformanceData, CacheStatistics,
    CacheValue, EstimatedImpact, ExportFormat, OptimizationEvent, OptimizationResult,
    TierConfiguration, TierStatistics,
};

/// Adaptive intelligent caching system with ML-driven optimization
#[derive(Debug)]
pub struct AdaptiveIntelligentCache {
    /// Multi-tier cache storage
    tiers: Vec<CacheTier>,
    /// Cache access pattern analyzer
    pattern_analyzer: AccessPatternAnalyzer,
    /// Predictive prefetching engine
    prefetcher: PredictivePrefetcher,
    /// Cache optimization engine
    optimizer: CacheOptimizer,
    /// Performance metrics collector
    metrics: CachePerformanceMetrics,
    /// Configuration parameters
    config: CacheConfiguration,
    /// Machine learning models for cache decisions
    ml_models: MLModels,
}

impl AdaptiveIntelligentCache {
    /// Create a new adaptive intelligent cache with the given configuration
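    ///
    /// # Example
    ///
    /// A minimal construction sketch; it assumes `CacheConfiguration`
    /// implements `Default`, which may not match the crate's actual config
    /// API, so the block is marked `ignore`:
    ///
    /// ```ignore
    /// let config = CacheConfiguration::default();
    /// let mut cache = AdaptiveIntelligentCache::new(config)?;
    /// ```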
    pub fn new(config: CacheConfiguration) -> Result<Self> {
        info!(
            "Initializing Adaptive Intelligent Cache with {} tiers",
            config.num_tiers
        );

        let mut tiers = Vec::new();
        let tier_sizes = Self::calculate_tier_sizes(&config);

        for (tier_id, size) in tier_sizes.into_iter().enumerate() {
            let tier_config = TierConfiguration {
                max_size_bytes: size,
                default_ttl: Duration::from_secs(config.default_ttl_seconds),
                compression_enabled: tier_id > 0, // Enable compression for higher tiers
                persistence_enabled: tier_id == config.num_tiers as usize - 1, // Only last tier persisted
                replication_factor: if tier_id == 0 { 1 } else { 2 }, // Replicate slower tiers
            };

            let storage = Self::create_storage_for_tier(tier_id as u32, &tier_config)?;
            let eviction_policy = Self::create_eviction_policy_for_tier(tier_id as u32);

            let tier = CacheTier {
                tier_id: tier_id as u32,
                storage,
                eviction_policy,
                access_tracker: AccessTracker::new(),
                config: tier_config,
                stats: TierStatistics::default(),
            };

            tiers.push(tier);
        }

        Ok(Self {
            tiers,
            pattern_analyzer: AccessPatternAnalyzer::new(),
            prefetcher: PredictivePrefetcher::new(),
            optimizer: CacheOptimizer::new(),
            metrics: CachePerformanceMetrics::default(),
            config,
            ml_models: MLModels::new()?,
        })
    }

    /// Store a value in the cache with intelligent tier placement
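    ///
    /// # Example
    ///
    /// A usage sketch (marked `ignore`); it assumes `key` and `value` were
    /// built with the crate's `CacheKey`/`CacheValue` constructors, which
    /// are defined outside this file:
    ///
    /// ```ignore
    /// cache.store(key, value)?; // the tier-placement model picks the tier
    /// ```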
    pub fn store(&mut self, key: CacheKey, value: CacheValue) -> Result<()> {
        let start_time = Instant::now();

        // Determine optimal tier placement using ML model
        let optimal_tier = self
            .ml_models
            .tier_placement_model
            .predict_optimal_tier(&key, &value);

        // Store in the determined tier
        let tier = &mut self.tiers[optimal_tier as usize];
        tier.storage
            .store(key.clone(), value.clone(), Some(tier.config.default_ttl))?;

        // Update access tracking and metrics
        tier.access_tracker.on_store(&key);
        self.update_store_metrics(optimal_tier, start_time.elapsed());

        // Trigger eviction if necessary
        self.check_and_evict(optimal_tier)?;

        // Update ML models with new data
        self.ml_models
            .update_with_store_event(&key, &value, optimal_tier);

        debug!(
            "Stored cache item in tier {} with key hash {:?}",
            optimal_tier,
            self.hash_key(&key)
        );
        Ok(())
    }

    /// Retrieve a value from the cache with intelligent promotion
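    ///
    /// # Example
    ///
    /// A lookup sketch (marked `ignore`). Note that a hit can have side
    /// effects: the item may be promoted to a faster tier and prefetch
    /// analysis may run:
    ///
    /// ```ignore
    /// if let Some(value) = cache.retrieve(&key) {
    ///     println!("hit; accessed {} times so far", value.access_count);
    /// }
    /// ```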
    pub fn retrieve(&mut self, key: &CacheKey) -> Option<CacheValue> {
        let start_time = Instant::now();

        // Search through tiers starting from the fastest. Indexing instead of
        // `iter_mut` keeps each borrow of `self.tiers` short-lived, so the
        // `&mut self` helper methods below can be called inside the loop.
        for tier_index in 0..self.tiers.len() {
            let found = self.tiers[tier_index].storage.retrieve(key);
            if let Some(mut value) = found {
                // Update access information on the returned copy
                value.last_accessed = SystemTime::now();
                value.access_count += 1;

                self.tiers[tier_index]
                    .access_tracker
                    .on_access(key, Instant::now());
                self.update_hit_metrics(tier_index as u32, start_time.elapsed());

                // Consider promoting to a faster tier based on access pattern
                if tier_index > 0 && self.should_promote(key, &value, tier_index) {
                    if let Err(e) = self.promote_item(key.clone(), value.clone(), tier_index) {
                        warn!("Failed to promote cache item: {}", e);
                    }
                }

                // Record access event for pattern analysis
                self.pattern_analyzer.record_access(AccessEvent {
                    timestamp: SystemTime::now(),
                    key: key.clone(),
                    hit: true,
                    latency_ns: start_time.elapsed().as_nanos() as u64,
                    user_context: None, // Could be extracted from key metadata
                });

                // Trigger predictive prefetching
                if self.config.enable_prefetching {
                    self.prefetcher.trigger_prefetch_analysis(key, &value);
                }

                return Some(value);
            }
        }

        // Cache miss - update metrics and patterns
        self.update_miss_metrics(start_time.elapsed());
        self.pattern_analyzer.record_access(AccessEvent {
            timestamp: SystemTime::now(),
            key: key.clone(),
            hit: false,
            latency_ns: start_time.elapsed().as_nanos() as u64,
            user_context: None,
        });

        None
    }

    /// Remove an item from all cache tiers
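    ///
    /// Returns `true` if the key was present in at least one tier:
    ///
    /// ```ignore
    /// let was_cached = cache.remove(&key);
    /// ```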
    pub fn remove(&mut self, key: &CacheKey) -> bool {
        let mut removed = false;
        for tier in &mut self.tiers {
            if tier.storage.remove(key) {
                tier.access_tracker.on_remove(key);
                removed = true;
            }
        }
        removed
    }

    /// Get comprehensive cache statistics
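    ///
    /// # Example
    ///
    /// A reporting sketch (marked `ignore`):
    ///
    /// ```ignore
    /// let stats = cache.get_statistics();
    /// println!(
    ///     "hit rate {:.2}%, memory utilization {:.2}%",
    ///     stats.hit_rate * 100.0,
    ///     stats.memory_utilization * 100.0
    /// );
    /// ```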
    pub fn get_statistics(&self) -> CacheStatistics {
        let total_hits = self.metrics.hit_count.load(Ordering::Relaxed);
        let total_misses = self.metrics.miss_count.load(Ordering::Relaxed);
        let total_requests = total_hits + total_misses;

        let hit_rate = if total_requests > 0 {
            total_hits as f64 / total_requests as f64
        } else {
            0.0
        };

        CacheStatistics {
            hit_rate,
            miss_rate: 1.0 - hit_rate,
            total_requests,
            avg_hit_latency_ns: self.metrics.avg_hit_latency_ns.load(Ordering::Relaxed),
            avg_miss_latency_ns: self.metrics.avg_miss_latency_ns.load(Ordering::Relaxed),
            cache_efficiency: self.metrics.cache_efficiency_score,
            memory_utilization: self.calculate_memory_utilization(),
            tier_statistics: self.collect_tier_statistics(),
            prefetch_statistics: self.prefetcher.get_statistics(),
            optimization_statistics: self.optimizer.get_statistics(),
        }
    }

    /// Run cache optimization cycle
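    ///
    /// # Example
    ///
    /// An optimization-cycle sketch (marked `ignore`); when adaptive
    /// optimization is disabled in the configuration, this returns a
    /// zero-impact result:
    ///
    /// ```ignore
    /// let result = cache.optimize()?;
    /// println!("combined improvement: {:.2}%", result.improvement_score * 100.0);
    /// ```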
    pub fn optimize(&mut self) -> Result<OptimizationResult> {
        if !self.config.enable_adaptive_optimization {
            return Ok(OptimizationResult {
                improvement_score: 0.0,
                changes_applied: vec![],
                estimated_impact: EstimatedImpact::default(),
            });
        }

        info!("Running cache optimization cycle");
        let before_metrics = self.metrics.clone();

        let mut total_improvement = 0.0;
        let mut all_changes = Vec::new();

        // Run each optimization algorithm. The algorithms are temporarily
        // moved out of `self.optimizer` so they can inspect `self.tiers`,
        // `self.metrics`, and `self.config` without conflicting borrows.
        let mut algorithms = std::mem::take(&mut self.optimizer.algorithms);
        for algorithm in &mut algorithms {
            match algorithm.optimize_cache(&self.tiers, &self.metrics, &self.config) {
                Ok(result) => {
                    total_improvement += result.improvement_score;
                    all_changes.extend(result.changes_applied);
                    info!(
                        "Optimization algorithm '{}' achieved {:.2}% improvement",
                        algorithm.name(),
                        result.improvement_score * 100.0
                    );
                }
                Err(e) => {
                    warn!("Optimization algorithm '{}' failed: {}", algorithm.name(), e);
                }
            }
        }
        // Move the algorithms back
        self.optimizer.algorithms = algorithms;

        // Update optimization history
        self.optimizer.record_optimization_event(OptimizationEvent {
            timestamp: SystemTime::now(),
            algorithm: "combined".to_string(),
            changes: all_changes.clone(),
            before_metrics,
            after_metrics: None, // Will be updated later
        });

        Ok(OptimizationResult {
            improvement_score: total_improvement,
            changes_applied: all_changes,
            estimated_impact: self.estimate_optimization_impact(total_improvement),
        })
    }

    /// Export cache performance data for external analysis
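    ///
    /// # Example
    ///
    /// An export sketch (marked `ignore`); `ExportFormat` is the enum
    /// imported from `super::types` above:
    ///
    /// ```ignore
    /// let prometheus_text = cache.export_performance_data(ExportFormat::Prometheus)?;
    /// // e.g. "oxirs_cache_hits_total 42\noxirs_cache_misses_total 7\n..."
    /// ```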
    pub fn export_performance_data(&self, format: ExportFormat) -> Result<String> {
        match format {
            ExportFormat::Json => {
                let data = CachePerformanceData {
                    metrics: self.metrics.clone(),
                    statistics: self.get_statistics(),
                    configuration: self.config.clone(),
                    access_patterns: self.pattern_analyzer.export_patterns(),
                    optimization_history: self.optimizer.export_history(),
                };
                Ok(serde_json::to_string_pretty(&data)?)
            }
            ExportFormat::Prometheus => self.export_prometheus_metrics(),
            ExportFormat::Csv => self.export_csv_metrics(),
        }
    }

    // Private helper methods

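    /// Split `max_total_size_bytes` across tiers according to
    /// `tier_size_ratios`. For example, 1 GiB with ratios `[0.1, 0.3, 0.6]`
    /// yields tiers of roughly 102 MiB, 307 MiB, and 614 MiB.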
    fn calculate_tier_sizes(config: &CacheConfiguration) -> Vec<u64> {
        let total_size = config.max_total_size_bytes;
        config
            .tier_size_ratios
            .iter()
            .map(|ratio| (total_size as f64 * ratio) as u64)
            .collect()
    }

    fn create_storage_for_tier(
        tier_id: u32,
        config: &TierConfiguration,
    ) -> Result<Box<dyn CacheStorage>> {
        match tier_id {
            0 => Ok(Box::new(MemoryStorage::new(config.max_size_bytes))),
            1 => Ok(Box::new(CompressedStorage::new(config.max_size_bytes))),
            _ => Ok(Box::new(PersistentStorage::new(config.max_size_bytes)?)),
        }
    }

    fn create_eviction_policy_for_tier(tier_id: u32) -> Box<dyn EvictionPolicy> {
        match tier_id {
            0 => Box::new(LRUEvictionPolicy::new()),
            1 => Box::new(LFUEvictionPolicy::new()),
            _ => Box::new(AdaptiveEvictionPolicy::new()),
        }
    }
    fn should_promote(&self, _key: &CacheKey, value: &CacheValue, current_tier: usize) -> bool {
        if current_tier == 0 {
            return false; // Already in the fastest tier
        }

        let access_frequency = value.access_count as f64;
        let recency_score = self.calculate_recency_score(value.last_accessed);
        // Clamp to at least 1 KB so zero-sized items cannot divide by zero
        let size_penalty = (value.metadata.size_bytes as f64 / 1024.0).max(1.0);

        let promotion_score = access_frequency * recency_score / size_penalty;
        promotion_score > 2.0
    }

    fn promote_item(&mut self, key: CacheKey, value: CacheValue, from_tier: usize) -> Result<()> {
        if from_tier == 0 {
            return Ok(()); // Already in fastest tier
        }

        let target_tier = from_tier - 1;

        // Remove from current tier
        self.tiers[from_tier].storage.remove(&key);

        // Store in target tier
        let default_ttl = self.tiers[target_tier].config.default_ttl;
        self.tiers[target_tier]
            .storage
            .store(key, value, Some(default_ttl))?;

        debug!(
            "Promoted cache item from tier {} to tier {}",
            from_tier, target_tier
        );
        Ok(())
    }

    fn calculate_recency_score(&self, last_accessed: SystemTime) -> f64 {
        let now = SystemTime::now();
        let duration = now.duration_since(last_accessed).unwrap_or(Duration::ZERO);
        let hours = duration.as_secs_f64() / 3600.0;

        // Exponential decay with a 24-hour time constant: 1.0 for a
        // just-accessed item, falling to ~0.37 (1/e) after a day
        (-hours / 24.0).exp()
    }

    fn check_and_evict(&mut self, tier_id: u32) -> Result<()> {
        let size_info = {
            let tier = &self.tiers[tier_id as usize];
            tier.storage.size_info()
        };

        if size_info.used_bytes > self.tiers[tier_id as usize].config.max_size_bytes {
            let target_size =
                (self.tiers[tier_id as usize].config.max_size_bytes as f64 * 0.8) as u64; // Target 80% utilization
            let items = self.collect_tier_items(tier_id);

            let keys_to_evict = {
                let tier = &mut self.tiers[tier_id as usize];
                tier.eviction_policy
                    .evict(size_info.used_bytes, target_size, &items)
            };

            let tier = &mut self.tiers[tier_id as usize];
            for key in keys_to_evict {
                tier.storage.remove(&key);
                tier.stats.eviction_count += 1;
            }
        }

        Ok(())
    }

    fn collect_tier_items(&self, _tier_id: u32) -> Vec<CacheItem> {
        // Placeholder: a full implementation would enumerate the tier's
        // items so the eviction policy can rank them
        Vec::new()
    }

    fn hash_key(&self, key: &CacheKey) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish()
    }

    fn update_store_metrics(&mut self, _tier_id: u32, _latency: Duration) {
        // Placeholder: tier-specific store metrics are not recorded yet
    }

    fn update_hit_metrics(&mut self, _tier_id: u32, latency: Duration) {
        self.metrics.hit_count.fetch_add(1, Ordering::Relaxed);
        self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);

        // Exponential moving average of hit latency (1/8 weight on the
        // newest sample), so the stored value is a running average rather
        // than just the most recent measurement
        let latency_ns = latency.as_nanos() as u64;
        let prev = self.metrics.avg_hit_latency_ns.load(Ordering::Relaxed);
        let avg = if prev == 0 {
            latency_ns
        } else {
            prev - prev / 8 + latency_ns / 8
        };
        self.metrics.avg_hit_latency_ns.store(avg, Ordering::Relaxed);
    }

    fn update_miss_metrics(&mut self, latency: Duration) {
        self.metrics.miss_count.fetch_add(1, Ordering::Relaxed);
        self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);

        // Same running average as for hits
        let latency_ns = latency.as_nanos() as u64;
        let prev = self.metrics.avg_miss_latency_ns.load(Ordering::Relaxed);
        let avg = if prev == 0 {
            latency_ns
        } else {
            prev - prev / 8 + latency_ns / 8
        };
        self.metrics.avg_miss_latency_ns.store(avg, Ordering::Relaxed);
    }

    fn calculate_memory_utilization(&self) -> f64 {
        // Single pass over the tiers so `size_info()` is queried once each
        let (total_used, total_capacity) =
            self.tiers.iter().fold((0u64, 0u64), |(used, cap), tier| {
                let info = tier.storage.size_info();
                (used + info.used_bytes, cap + info.total_capacity_bytes)
            });

        if total_capacity > 0 {
            total_used as f64 / total_capacity as f64
        } else {
            0.0
        }
    }

    fn collect_tier_statistics(&self) -> Vec<TierStatistics> {
        self.tiers.iter().map(|tier| tier.stats.clone()).collect()
    }

    fn estimate_optimization_impact(&self, improvement_score: f64) -> EstimatedImpact {
        // Rough linear scaling of the combined improvement score into
        // per-dimension impact estimates
        EstimatedImpact {
            hit_rate_improvement: improvement_score * 0.1,
            latency_reduction: improvement_score * 0.05,
            memory_efficiency_gain: improvement_score * 0.08,
            cost_reduction: improvement_score * 0.03,
        }
    }

    fn export_prometheus_metrics(&self) -> Result<String> {
        let mut metrics = String::new();

        let hit_count = self.metrics.hit_count.load(Ordering::Relaxed);
        let miss_count = self.metrics.miss_count.load(Ordering::Relaxed);
        let total = hit_count + miss_count;

        metrics.push_str(&format!("oxirs_cache_hits_total {hit_count}\n"));
        metrics.push_str(&format!("oxirs_cache_misses_total {miss_count}\n"));
        metrics.push_str(&format!("oxirs_cache_requests_total {total}\n"));

        if total > 0 {
            let hit_rate = hit_count as f64 / total as f64;
            metrics.push_str(&format!("oxirs_cache_hit_rate {hit_rate:.4}\n"));
        }

        metrics.push_str(&format!(
            "oxirs_cache_memory_utilization {:.4}\n",
            self.calculate_memory_utilization()
        ));
        metrics.push_str(&format!(
            "oxirs_cache_efficiency_score {:.4}\n",
            self.metrics.cache_efficiency_score
        ));

        Ok(metrics)
    }

    fn export_csv_metrics(&self) -> Result<String> {
        let mut csv = String::new();
        csv.push_str("metric,value,timestamp\n");

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();
        let hit_count = self.metrics.hit_count.load(Ordering::Relaxed);
        let miss_count = self.metrics.miss_count.load(Ordering::Relaxed);

        csv.push_str(&format!("hit_count,{hit_count},{now}\n"));
        csv.push_str(&format!("miss_count,{miss_count},{now}\n"));
        csv.push_str(&format!(
            "memory_utilization,{:.4},{}\n",
            self.calculate_memory_utilization(),
            now
        ));

        Ok(csv)
    }
}