1use anyhow::Result;
4use std::sync::atomic::Ordering;
5use std::time::{Duration, Instant, SystemTime};
6use tracing::{debug, info, warn};
7
8use super::config::CacheConfiguration;
9use super::eviction::{
10 AdaptiveEvictionPolicy, EvictionPolicy, LFUEvictionPolicy, LRUEvictionPolicy,
11};
12use super::metrics::CachePerformanceMetrics;
13use super::ml_models::MLModels;
14use super::optimizer::CacheOptimizer;
15use super::pattern_analyzer::AccessPatternAnalyzer;
16use super::prefetcher::PredictivePrefetcher;
17use super::storage::{CacheStorage, CompressedStorage, MemoryStorage, PersistentStorage};
18use super::tier::CacheTier;
19use super::types::AccessTracker;
20use super::types::{
21 AccessEvent, CacheItem, CacheKey, CachePerformanceData, CacheStatistics, CacheValue,
22 EstimatedImpact, ExportFormat, OptimizationEvent, OptimizationResult, TierConfiguration,
23 TierStatistics,
24};
25
26#[derive(Debug)]
28pub struct AdaptiveIntelligentCache {
29 tiers: Vec<CacheTier>,
31 pattern_analyzer: AccessPatternAnalyzer,
33 prefetcher: PredictivePrefetcher,
35 optimizer: CacheOptimizer,
37 metrics: CachePerformanceMetrics,
39 config: CacheConfiguration,
41 ml_models: MLModels,
43}
44
45impl AdaptiveIntelligentCache {
46 pub fn new(config: CacheConfiguration) -> Result<Self> {
48 info!(
49 "Initializing Adaptive Intelligent Cache with {} tiers",
50 config.num_tiers
51 );
52
53 let mut tiers = Vec::new();
54 let tier_sizes = Self::calculate_tier_sizes(&config);
55
56 for (tier_id, size) in tier_sizes.into_iter().enumerate() {
57 let tier_config = TierConfiguration {
58 max_size_bytes: size,
59 default_ttl: Duration::from_secs(config.default_ttl_seconds),
60 compression_enabled: tier_id > 0, persistence_enabled: tier_id == config.num_tiers as usize - 1, replication_factor: if tier_id == 0 { 1 } else { 2 }, };
64
65 let storage = Self::create_storage_for_tier(tier_id as u32, &tier_config)?;
66 let eviction_policy = Self::create_eviction_policy_for_tier(tier_id as u32);
67
68 let tier = CacheTier {
69 tier_id: tier_id as u32,
70 storage,
71 eviction_policy,
72 access_tracker: AccessTracker::new(),
73 config: tier_config,
74 stats: TierStatistics::default(),
75 };
76
77 tiers.push(tier);
78 }
79
80 Ok(Self {
81 tiers,
82 pattern_analyzer: AccessPatternAnalyzer::new(),
83 prefetcher: PredictivePrefetcher::new(),
84 optimizer: CacheOptimizer::new(),
85 metrics: CachePerformanceMetrics::default(),
86 config,
87 ml_models: MLModels::new()?,
88 })
89 }
90
91 pub fn store(&mut self, key: CacheKey, value: CacheValue) -> Result<()> {
93 let start_time = Instant::now();
94
95 let optimal_tier = self
97 .ml_models
98 .tier_placement_model
99 .predict_optimal_tier(&key, &value);
100
101 let tier = &mut self.tiers[optimal_tier as usize];
103 tier.storage
104 .store(key.clone(), value.clone(), Some(tier.config.default_ttl))?;
105
106 tier.access_tracker.on_store(&key);
108 self.update_store_metrics(optimal_tier, start_time.elapsed());
109
110 self.check_and_evict(optimal_tier)?;
112
113 self.ml_models
115 .update_with_store_event(&key, &value, optimal_tier);
116
117 debug!(
118 "Stored cache item in tier {} with key hash {:?}",
119 optimal_tier,
120 self.hash_key(&key)
121 );
122 Ok(())
123 }
124
125 pub fn retrieve(&mut self, key: &CacheKey) -> Option<CacheValue> {
127 let start_time = Instant::now();
128
129 for (tier_index, tier) in self.tiers.iter_mut().enumerate() {
131 if let Some(mut value) = tier.storage.retrieve(key) {
132 value.last_accessed = SystemTime::now();
134 value.access_count += 1;
135
136 tier.access_tracker.on_access(key, Instant::now());
137 self.update_hit_metrics(tier_index as u32, start_time.elapsed());
138
139 if tier_index > 0 && self.should_promote(key, &value, tier_index) {
141 if let Err(e) = self.promote_item(key.clone(), value.clone(), tier_index) {
142 warn!("Failed to promote cache item: {}", e);
143 }
144 }
145
146 self.pattern_analyzer.record_access(AccessEvent {
148 timestamp: SystemTime::now(),
149 key: key.clone(),
150 hit: true,
151 latency_ns: start_time.elapsed().as_nanos() as u64,
152 user_context: None, });
154
155 if self.config.enable_prefetching {
157 self.prefetcher.trigger_prefetch_analysis(key, &value);
158 }
159
160 return Some(value);
161 }
162 }
163
164 self.update_miss_metrics(start_time.elapsed());
166 self.pattern_analyzer.record_access(AccessEvent {
167 timestamp: SystemTime::now(),
168 key: key.clone(),
169 hit: false,
170 latency_ns: start_time.elapsed().as_nanos() as u64,
171 user_context: None,
172 });
173
174 None
175 }
176
177 pub fn remove(&mut self, key: &CacheKey) -> bool {
179 let mut removed = false;
180 for tier in &mut self.tiers {
181 if tier.storage.remove(key) {
182 tier.access_tracker.on_remove(key);
183 removed = true;
184 }
185 }
186 removed
187 }
188
189 pub fn get_statistics(&self) -> CacheStatistics {
191 let total_hits = self.metrics.hit_count.load(Ordering::Relaxed);
192 let total_misses = self.metrics.miss_count.load(Ordering::Relaxed);
193 let total_requests = total_hits + total_misses;
194
195 let hit_rate = if total_requests > 0 {
196 total_hits as f64 / total_requests as f64
197 } else {
198 0.0
199 };
200
201 CacheStatistics {
202 hit_rate,
203 miss_rate: 1.0 - hit_rate,
204 total_requests,
205 avg_hit_latency_ns: self.metrics.avg_hit_latency_ns.load(Ordering::Relaxed),
206 avg_miss_latency_ns: self.metrics.avg_miss_latency_ns.load(Ordering::Relaxed),
207 cache_efficiency: self.metrics.cache_efficiency_score,
208 memory_utilization: self.calculate_memory_utilization(),
209 tier_statistics: self.collect_tier_statistics(),
210 prefetch_statistics: self.prefetcher.get_statistics(),
211 optimization_statistics: self.optimizer.get_statistics(),
212 }
213 }
214
215 pub fn optimize(&mut self) -> Result<OptimizationResult> {
217 if !self.config.enable_adaptive_optimization {
218 return Ok(OptimizationResult {
219 improvement_score: 0.0,
220 changes_applied: vec![],
221 estimated_impact: EstimatedImpact::default(),
222 });
223 }
224
225 info!("Running cache optimization cycle");
226 let before_metrics = self.metrics.clone();
227
228 let mut total_improvement = 0.0;
229 let mut all_changes = Vec::new();
230
231 let mut algorithms = std::mem::take(&mut self.optimizer.algorithms);
234 for algorithm in &mut algorithms {
235 match algorithm.optimize_cache(&self.tiers, &self.metrics, &self.config) {
236 Ok(result) => {
237 total_improvement += result.improvement_score;
238 all_changes.extend(result.changes_applied);
239 info!(
240 "Optimization algorithm '{}' achieved {:.2}% improvement",
241 algorithm.name(),
242 result.improvement_score * 100.0
243 );
244 }
245 Err(e) => {
246 warn!(
247 "Optimization algorithm '{}' failed: {}",
248 algorithm.name(),
249 e
250 );
251 }
252 }
253 }
254 self.optimizer.algorithms = algorithms;
256
257 self.optimizer.record_optimization_event(OptimizationEvent {
259 timestamp: SystemTime::now(),
260 algorithm: "combined".to_string(),
261 changes: all_changes.clone(),
262 before_metrics,
263 after_metrics: None, });
265
266 Ok(OptimizationResult {
267 improvement_score: total_improvement,
268 changes_applied: all_changes,
269 estimated_impact: self.estimate_optimization_impact(total_improvement),
270 })
271 }
272
273 pub fn export_performance_data(&self, format: ExportFormat) -> Result<String> {
275 match format {
276 ExportFormat::Json => {
277 let data = CachePerformanceData {
278 metrics: self.metrics.clone(),
279 statistics: self.get_statistics(),
280 configuration: self.config.clone(),
281 access_patterns: self.pattern_analyzer.export_patterns(),
282 optimization_history: self.optimizer.export_history(),
283 };
284 Ok(serde_json::to_string_pretty(&data)?)
285 }
286 ExportFormat::Prometheus => self.export_prometheus_metrics(),
287 ExportFormat::Csv => self.export_csv_metrics(),
288 }
289 }
290
291 fn calculate_tier_sizes(config: &CacheConfiguration) -> Vec<u64> {
294 let total_size = config.max_total_size_bytes;
295 config
296 .tier_size_ratios
297 .iter()
298 .map(|ratio| (total_size as f64 * ratio) as u64)
299 .collect()
300 }
301
302 fn create_storage_for_tier(
303 tier_id: u32,
304 config: &TierConfiguration,
305 ) -> Result<Box<dyn CacheStorage>> {
306 match tier_id {
307 0 => Ok(Box::new(MemoryStorage::new(config.max_size_bytes))),
308 1 => Ok(Box::new(CompressedStorage::new(config.max_size_bytes))),
309 _ => Ok(Box::new(PersistentStorage::new(config.max_size_bytes)?)),
310 }
311 }
312
313 fn create_eviction_policy_for_tier(tier_id: u32) -> Box<dyn EvictionPolicy> {
314 match tier_id {
315 0 => Box::new(LRUEvictionPolicy::new()),
316 1 => Box::new(LFUEvictionPolicy::new()),
317 _ => Box::new(AdaptiveEvictionPolicy::new()),
318 }
319 }
320
321 fn should_promote(&self, _key: &CacheKey, value: &CacheValue, current_tier: usize) -> bool {
322 let access_frequency = value.access_count as f64;
324 let recency_score = self.calculate_recency_score(value.last_accessed);
325 let size_penalty = value.metadata.size_bytes as f64 / 1024.0; let promotion_score = access_frequency * recency_score / size_penalty;
328 promotion_score > 2.0 && current_tier > 0
329 }
330
331 fn promote_item(&mut self, key: CacheKey, value: CacheValue, from_tier: usize) -> Result<()> {
332 if from_tier == 0 {
333 return Ok(()); }
335
336 let target_tier = from_tier - 1;
337
338 self.tiers[from_tier].storage.remove(&key);
340
341 let default_ttl = self.tiers[target_tier].config.default_ttl;
343 self.tiers[target_tier]
344 .storage
345 .store(key, value, Some(default_ttl))?;
346
347 debug!(
348 "Promoted cache item from tier {} to tier {}",
349 from_tier, target_tier
350 );
351 Ok(())
352 }
353
354 fn calculate_recency_score(&self, last_accessed: SystemTime) -> f64 {
355 let now = SystemTime::now();
356 let duration = now.duration_since(last_accessed).unwrap_or(Duration::ZERO);
357 let hours = duration.as_secs_f64() / 3600.0;
358
359 (-hours / 24.0).exp()
361 }
362
363 fn check_and_evict(&mut self, tier_id: u32) -> Result<()> {
364 let size_info = {
365 let tier = &self.tiers[tier_id as usize];
366 tier.storage.size_info()
367 };
368
369 if size_info.used_bytes > self.tiers[tier_id as usize].config.max_size_bytes {
370 let target_size =
371 (self.tiers[tier_id as usize].config.max_size_bytes as f64 * 0.8) as u64; let items = self.collect_tier_items(tier_id);
373
374 let keys_to_evict = {
375 let tier = &mut self.tiers[tier_id as usize];
376 tier.eviction_policy
377 .evict(size_info.used_bytes, target_size, &items)
378 };
379
380 let tier = &mut self.tiers[tier_id as usize];
381 for key in keys_to_evict {
382 tier.storage.remove(&key);
383 tier.stats.eviction_count += 1;
384 }
385 }
386
387 Ok(())
388 }
389
390 fn collect_tier_items(&self, _tier_id: u32) -> Vec<CacheItem> {
391 Vec::new()
394 }
395
396 fn hash_key(&self, key: &CacheKey) -> u64 {
397 use std::collections::hash_map::DefaultHasher;
398 use std::hash::{Hash, Hasher};
399 let mut hasher = DefaultHasher::new();
400 key.hash(&mut hasher);
401 hasher.finish()
402 }
403
404 fn update_store_metrics(&mut self, tier_id: u32, _latency: Duration) {
405 if let Some(_tier_metrics) = self.metrics.tier_metrics.get_mut(&tier_id) {
407 }
409 }
410
411 fn update_hit_metrics(&mut self, _tier_id: u32, latency: Duration) {
412 self.metrics.hit_count.fetch_add(1, Ordering::Relaxed);
413 self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
414
415 let latency_ns = latency.as_nanos() as u64;
417 self.metrics
418 .avg_hit_latency_ns
419 .store(latency_ns, Ordering::Relaxed);
420 }
421
422 fn update_miss_metrics(&mut self, latency: Duration) {
423 self.metrics.miss_count.fetch_add(1, Ordering::Relaxed);
424 self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
425
426 let latency_ns = latency.as_nanos() as u64;
427 self.metrics
428 .avg_miss_latency_ns
429 .store(latency_ns, Ordering::Relaxed);
430 }
431
432 fn calculate_memory_utilization(&self) -> f64 {
433 let total_used: u64 = self
434 .tiers
435 .iter()
436 .map(|tier| tier.storage.size_info().used_bytes)
437 .sum();
438 let total_capacity: u64 = self
439 .tiers
440 .iter()
441 .map(|tier| tier.storage.size_info().total_capacity_bytes)
442 .sum();
443
444 if total_capacity > 0 {
445 total_used as f64 / total_capacity as f64
446 } else {
447 0.0
448 }
449 }
450
451 fn collect_tier_statistics(&self) -> Vec<TierStatistics> {
452 self.tiers.iter().map(|tier| tier.stats.clone()).collect()
453 }
454
455 fn estimate_optimization_impact(&self, improvement_score: f64) -> EstimatedImpact {
456 EstimatedImpact {
457 hit_rate_improvement: improvement_score * 0.1,
458 latency_reduction: improvement_score * 0.05,
459 memory_efficiency_gain: improvement_score * 0.08,
460 cost_reduction: improvement_score * 0.03,
461 }
462 }
463
464 fn export_prometheus_metrics(&self) -> Result<String> {
465 let mut metrics = String::new();
466
467 let hit_count = self.metrics.hit_count.load(Ordering::Relaxed);
468 let miss_count = self.metrics.miss_count.load(Ordering::Relaxed);
469 let total = hit_count + miss_count;
470
471 metrics.push_str(&format!("oxirs_cache_hits_total {hit_count}\n"));
472 metrics.push_str(&format!("oxirs_cache_misses_total {miss_count}\n"));
473 metrics.push_str(&format!("oxirs_cache_requests_total {total}\n"));
474
475 if total > 0 {
476 let hit_rate = hit_count as f64 / total as f64;
477 metrics.push_str(&format!("oxirs_cache_hit_rate {hit_rate:.4}\n"));
478 }
479
480 metrics.push_str(&format!(
481 "oxirs_cache_memory_utilization {:.4}\n",
482 self.calculate_memory_utilization()
483 ));
484 metrics.push_str(&format!(
485 "oxirs_cache_efficiency_score {:.4}\n",
486 self.metrics.cache_efficiency_score
487 ));
488
489 Ok(metrics)
490 }
491
492 fn export_csv_metrics(&self) -> Result<String> {
493 let mut csv = String::new();
494 csv.push_str("metric,value,timestamp\n");
495
496 let now = SystemTime::now()
497 .duration_since(SystemTime::UNIX_EPOCH)?
498 .as_secs();
499 let hit_count = self.metrics.hit_count.load(Ordering::Relaxed);
500 let miss_count = self.metrics.miss_count.load(Ordering::Relaxed);
501
502 csv.push_str(&format!("hit_count,{hit_count},{now}\n"));
503 csv.push_str(&format!("miss_count,{miss_count},{now}\n"));
504 csv.push_str(&format!(
505 "memory_utilization,{:.4},{}\n",
506 self.calculate_memory_utilization(),
507 now
508 ));
509
510 Ok(csv)
511 }
512}