1use crate::{
7 AdaptiveTuner, CacheError, PerformanceMonitor, PredictivePreheater, UnifiedCache,
8 UnifiedCacheConfig, UnifiedCacheStats,
9};
10use dashmap::DashMap;
11use tracing::warn;
12
13use async_trait::async_trait;
14use std::{
15 collections::HashMap,
16 hash::Hash,
17 sync::{Arc, RwLock},
18 time::{Duration, Instant, SystemTime},
19};
20use tokio::sync::RwLock as AsyncRwLock;
21
/// A cached value together with the bookkeeping used for expiry and
/// eviction decisions.
#[derive(Debug, Clone)]
pub struct MultiLevelCacheEntry<V> {
    /// The cached payload.
    pub value: V,
    /// Wall-clock time at which the entry was created; the TTL is measured
    /// from this instant.
    pub created_at: SystemTime,
    /// Wall-clock time of the most recent recorded access.
    pub last_accessed: SystemTime,
    /// Number of recorded accesses; starts at 1 when the entry is created.
    pub access_count: u64,
    /// Cache level the entry is assigned to.
    pub level: u8,
    /// Size attributed to the entry, in bytes.
    pub size_bytes: u64,
    /// Time-to-live in whole seconds; `0` means the entry never expires.
    pub ttl: u64,
    /// Prediction score; initialised to `0.0` by [`MultiLevelCacheEntry::new`].
    pub prediction_score: f64,
}

impl<V> MultiLevelCacheEntry<V> {
    /// Creates an entry stamped with the current time.
    ///
    /// The new entry has `access_count == 1` and `prediction_score == 0.0`;
    /// `created_at` and `last_accessed` are both set to "now".
    pub fn new(value: V, ttl: u64, level: u8, size_bytes: u64) -> Self {
        let now = SystemTime::now();
        Self {
            value,
            created_at: now,
            last_accessed: now,
            access_count: 1,
            level,
            size_bytes,
            ttl,
            prediction_score: 0.0,
        }
    }

    /// Returns `true` while the entry has not outlived its TTL.
    ///
    /// A TTL of `0` disables expiry. If the system clock reports that
    /// `created_at` lies in the future, the entry is treated as expired.
    pub fn is_valid(&self) -> bool {
        if self.ttl == 0 {
            return true;
        }

        match self.created_at.elapsed() {
            Ok(age) => age.as_secs() < self.ttl,
            // The clock moved backwards past the creation time: the age is
            // unknown, so err on the side of expiring the entry.
            Err(_) => false,
        }
    }

    /// Records an access: refreshes `last_accessed` and bumps `access_count`.
    pub fn mark_accessed(&mut self) {
        self.last_accessed = SystemTime::now();
        // Saturate instead of overflowing: a pinned counter is harmless,
        // whereas `+= 1` would panic in debug builds and wrap in release.
        self.access_count = self.access_count.saturating_add(1);
    }

    /// Computes the eviction priority; lower values are evicted first.
    ///
    /// The score grows with the access count and shrinks with the entry
    /// size and with the number of whole seconds since the last access:
    /// `access_count / (size_bytes + 1) / (idle_seconds + 1)`.
    pub fn calculate_priority(&self) -> f64 {
        // A last-access time in the future (clock adjustment) counts as
        // "just accessed".
        let age_factor = self
            .last_accessed
            .elapsed()
            .unwrap_or(Duration::from_secs(0))
            .as_secs() as f64;
        let frequency_factor = self.access_count as f64;
        let size_factor = 1.0 / (self.size_bytes as f64 + 1.0);

        (frequency_factor * size_factor) / (age_factor + 1.0)
    }
}
93
94#[derive(Debug)]
99pub struct IntelligentCacheManager<K, V>
100where
101 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
102 V: Clone + Send + Sync + std::fmt::Debug + 'static,
103{
104 config: UnifiedCacheConfig,
106 l1_cache: Arc<DashMap<K, MultiLevelCacheEntry<V>>>,
108 l2_cache: Arc<AsyncRwLock<HashMap<K, MultiLevelCacheEntry<V>>>>,
110 preheater: Arc<PredictivePreheater<K>>,
112 tuner: Arc<AdaptiveTuner>,
114 monitor: Arc<PerformanceMonitor>,
116 stats: Arc<RwLock<UnifiedCacheStats>>,
118 access_patterns: Arc<RwLock<HashMap<K, Vec<SystemTime>>>>,
120}
121
122impl<K, V> IntelligentCacheManager<K, V>
123where
124 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
125 V: Clone + Send + Sync + std::fmt::Debug + 'static,
126{
127 pub fn new(config: UnifiedCacheConfig) -> Self {
129 let preheater = Arc::new(PredictivePreheater::new(config.preheating_config.clone()));
130 let tuner = Arc::new(AdaptiveTuner::new(config.tuning_config.clone()));
131 let monitor = Arc::new(PerformanceMonitor::new(config.monitoring_config.clone()));
132
133 Self {
134 config,
135 l1_cache: Arc::new(DashMap::new()),
136 l2_cache: Arc::new(AsyncRwLock::new(HashMap::new())),
137 preheater,
138 tuner,
139 monitor,
140 stats: Arc::new(RwLock::new(UnifiedCacheStats::default())),
141 access_patterns: Arc::new(RwLock::new(HashMap::new())),
142 }
143 }
144
145 pub fn config(&self) -> &UnifiedCacheConfig {
147 &self.config
148 }
149
150 pub fn preheater(&self) -> Arc<PredictivePreheater<K>> {
152 Arc::clone(&self.preheater)
153 }
154
155 pub fn tuner(&self) -> Arc<AdaptiveTuner> {
157 Arc::clone(&self.tuner)
158 }
159
160 pub fn monitor(&self) -> Arc<PerformanceMonitor> {
162 Arc::clone(&self.monitor)
163 }
164
165 async fn record_access_pattern(&self, key: &K) {
167 if !self.config.preheating_config.enable_pattern_learning {
168 return;
169 }
170
171 let mut patterns = self.access_patterns.write().unwrap();
172 let now = SystemTime::now();
173
174 patterns.entry(key.clone()).or_default().push(now);
175
176 let cutoff =
178 now - Duration::from_secs(self.config.preheating_config.pattern_window_seconds);
179 if let Some(times) = patterns.get_mut(key) {
180 times.retain(|&time| time > cutoff);
181 }
182 }
183
184 async fn promote_to_l1(
186 &self,
187 key: K,
188 mut entry: MultiLevelCacheEntry<V>,
189 ) -> Result<(), CacheError> {
190 if self.l1_cache.len() >= self.config.l1_config.max_entries {
192 self.evict_l1_entries().await?;
193 }
194
195 entry.level = 1;
197 entry.mark_accessed();
198
199 self.l1_cache.insert(key.clone(), entry);
201
202 let mut l2_cache = self.l2_cache.write().await;
204 l2_cache.remove(&key);
205
206 {
208 let mut stats = self.stats.write().unwrap();
209 stats.overall_stats.promotions += 1;
210 }
211
212 Ok(())
213 }
214
215 async fn demote_to_l2(
217 &self,
218 key: K,
219 mut entry: MultiLevelCacheEntry<V>,
220 ) -> Result<(), CacheError> {
221 {
223 let l2_cache = self.l2_cache.read().await;
224 if l2_cache.len() >= self.config.l2_config.max_entries {
225 drop(l2_cache);
226 self.evict_l2_entries().await?;
227 }
228 }
229
230 entry.level = 2;
232
233 {
235 let mut l2_cache = self.l2_cache.write().await;
236 l2_cache.insert(key.clone(), entry);
237 }
238
239 self.l1_cache.remove(&key);
241
242 {
244 let mut stats = self.stats.write().unwrap();
245 stats.overall_stats.demotions += 1;
246 }
247
248 Ok(())
249 }
250
251 async fn evict_l1_entries(&self) -> Result<(), CacheError> {
253 let eviction_count = (self.l1_cache.len() as f64 * 0.1).max(1.0) as usize;
254
255 let mut entries: Vec<(K, f64)> = self
257 .l1_cache
258 .iter()
259 .map(|entry| {
260 let priority = entry.value().calculate_priority();
261 (entry.key().clone(), priority)
262 })
263 .collect();
264
265 entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
267
268 for (key, _) in entries.into_iter().take(eviction_count) {
270 if let Some((_, entry)) = self.l1_cache.remove(&key) {
271 if entry.access_count > 1 {
273 self.demote_to_l2(key, entry).await?;
274 }
275 }
276 }
277
278 Ok(())
279 }
280
281 async fn evict_l2_entries(&self) -> Result<(), CacheError> {
283 let mut l2_cache = self.l2_cache.write().await;
284 let eviction_count = (l2_cache.len() as f64 * 0.1).max(1.0) as usize;
285
286 let mut entries: Vec<(K, f64)> = l2_cache
288 .iter()
289 .map(|(key, entry)| {
290 let priority = entry.calculate_priority();
291 (key.clone(), priority)
292 })
293 .collect();
294
295 entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
297
298 for (key, _) in entries.into_iter().take(eviction_count) {
300 l2_cache.remove(&key);
301 }
302
303 Ok(())
304 }
305
306 async fn update_statistics(&self) {
308 let l1_entries = self.l1_cache.len();
310 let l1_usage_bytes = self
311 .l1_cache
312 .iter()
313 .map(|entry| entry.value().size_bytes)
314 .sum();
315
316 let (l2_entries, l2_usage_bytes) = {
318 let l2_cache = self.l2_cache.read().await;
319 let entries = l2_cache.len();
320 let usage_bytes = l2_cache.values().map(|entry| entry.size_bytes).sum();
321 (entries, usage_bytes)
322 };
323
324 {
326 let mut stats = self.stats.write().unwrap();
327
328 stats.l1_stats.entries = l1_entries;
330 stats.l1_stats.usage_bytes = l1_usage_bytes;
331
332 stats.l2_stats.entries = l2_entries;
334 stats.l2_stats.usage_bytes = l2_usage_bytes;
335
336 stats.update_overall_stats();
338 }
339 }
340}
341
342#[async_trait]
343impl<K, V> UnifiedCache<K, V> for IntelligentCacheManager<K, V>
344where
345 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
346 V: Clone + Send + Sync + std::fmt::Debug + 'static,
347{
348 async fn get(&self, key: &K) -> Option<V> {
350 let start_time = Instant::now();
351
352 self.record_access_pattern(key).await;
354
355 if let Some(mut entry) = self.l1_cache.get_mut(key) {
357 if entry.is_valid() {
358 entry.mark_accessed();
359
360 {
362 let mut stats = self.stats.write().unwrap();
363 stats.l1_stats.hits += 1;
364 }
365
366 self.monitor.record_get_latency(start_time.elapsed()).await;
368
369 return Some(entry.value.clone());
370 } else {
371 drop(entry);
373 self.l1_cache.remove(key);
374 }
375 }
376
377 {
379 let mut l2_cache = self.l2_cache.write().await;
380 if let Some(entry) = l2_cache.get_mut(key) {
381 if entry.is_valid() {
382 entry.mark_accessed();
383 let value = entry.value.clone();
384
385 if entry.access_count >= self.config.l1_config.promotion_threshold {
387 let promoted_entry = entry.clone();
388 l2_cache.remove(key);
389 drop(l2_cache);
390
391 if let Err(e) = self.promote_to_l1(key.clone(), promoted_entry).await {
392 warn!("Failed to promote to L1: {:?}", e);
393 }
394 }
395
396 {
398 let mut stats = self.stats.write().unwrap();
399 stats.l2_stats.hits += 1;
400 }
401
402 self.monitor.record_get_latency(start_time.elapsed()).await;
404
405 return Some(value);
406 } else {
407 l2_cache.remove(key);
409 }
410 }
411 }
412
413 {
415 let mut stats = self.stats.write().unwrap();
416 stats.l1_stats.misses += 1;
417 stats.l2_stats.misses += 1;
418 }
419
420 if self.config.preheating_config.enable_predictive_preheating {
422 self.preheater.predict_and_preheat(key).await;
423 }
424
425 self.monitor.record_get_latency(start_time.elapsed()).await;
427
428 None
429 }
430
431 async fn put(&self, key: K, value: V) -> Result<(), CacheError> {
433 let start_time = Instant::now();
434
435 let size_bytes = std::mem::size_of::<V>() as u64;
437
438 let entry =
440 MultiLevelCacheEntry::new(value, self.config.l1_config.default_ttl, 1, size_bytes);
441
442 if self.l1_cache.len() >= self.config.l1_config.max_entries {
444 self.evict_l1_entries().await?;
445 }
446
447 self.l1_cache.insert(key.clone(), entry);
449
450 self.update_statistics().await;
452
453 self.monitor.record_put_latency(start_time.elapsed()).await;
455
456 if self.config.tuning_config.enable_adaptive_tuning {
458 self.tuner.analyze_and_tune().await;
459 }
460
461 Ok(())
462 }
463
464 async fn remove(&self, key: &K) -> bool {
466 let l1_removed = self.l1_cache.remove(key).is_some();
467
468 let l2_removed = {
469 let mut l2_cache = self.l2_cache.write().await;
470 l2_cache.remove(key).is_some()
471 };
472
473 if l1_removed || l2_removed {
475 self.update_statistics().await;
476 }
477
478 l1_removed || l2_removed
479 }
480
481 async fn contains_key(&self, key: &K) -> bool {
483 if let Some(entry) = self.l1_cache.get(key) {
485 if entry.is_valid() {
486 return true;
487 }
488 }
489
490 let l2_cache = self.l2_cache.read().await;
492 if let Some(entry) = l2_cache.get(key) {
493 return entry.is_valid();
494 }
495
496 false
497 }
498
499 async fn get_stats(&self) -> UnifiedCacheStats {
501 self.update_statistics().await;
502 self.stats.read().unwrap().clone()
503 }
504
505 async fn clear(&self) -> Result<(), CacheError> {
507 self.l1_cache.clear();
508
509 {
510 let mut l2_cache = self.l2_cache.write().await;
511 l2_cache.clear();
512 }
513
514 {
516 let mut stats = self.stats.write().unwrap();
517 *stats = UnifiedCacheStats::default();
518 }
519
520 Ok(())
521 }
522
523 async fn size(&self) -> usize {
525 let l1_size = self.l1_cache.len();
526 let l2_size = {
527 let l2_cache = self.l2_cache.read().await;
528 l2_cache.len()
529 };
530 l1_size + l2_size
531 }
532
533 async fn is_empty(&self) -> bool {
535 self.size().await == 0
536 }
537
538 async fn capacity(&self) -> usize {
540 self.config.l1_config.max_entries + self.config.l2_config.max_entries
541 }
542
543 fn cache_type(&self) -> &'static str {
545 "IntelligentCacheManager"
546 }
547}