1use crate::{
7 AdaptiveTuner, CacheError, PerformanceMonitor, PredictivePreheater, UnifiedCache,
8 UnifiedCacheConfig, UnifiedCacheStats,
9};
10use dashmap::DashMap;
11
12use async_trait::async_trait;
13use std::{
14 collections::HashMap,
15 hash::Hash,
16 sync::{Arc, RwLock},
17 time::{Duration, Instant, SystemTime},
18};
19use tokio::sync::RwLock as AsyncRwLock;
20
/// A cached value together with the bookkeeping the multi-level cache keeps for it.
///
/// The timestamps are wall-clock [`SystemTime`] values, so they are affected by
/// system clock adjustments.
#[derive(Debug, Clone)]
pub struct MultiLevelCacheEntry<V> {
    /// The cached payload.
    pub value: V,
    /// Wall-clock time at which the entry was created.
    pub created_at: SystemTime,
    /// Wall-clock time of the most recent recorded access.
    pub last_accessed: SystemTime,
    /// Number of recorded accesses.
    pub access_count: u64,
    /// Cache level the entry currently belongs to (the manager uses 1 for L1 and 2 for L2).
    pub level: u8,
    /// Size attributed to the entry, in bytes.
    pub size_bytes: u64,
    /// Time-to-live in whole seconds, measured from `created_at`; `0` means "never expires".
    pub ttl: u64,
    /// Score reserved for predictive logic; nothing in this file reads it.
    pub prediction_score: f64,
}
41
42impl<V> MultiLevelCacheEntry<V> {
43 pub fn new(value: V, ttl: u64, level: u8, size_bytes: u64) -> Self {
45 let now = SystemTime::now();
46 Self {
47 value,
48 created_at: now,
49 last_accessed: now,
50 access_count: 1,
51 level,
52 size_bytes,
53 ttl,
54 prediction_score: 0.0,
55 }
56 }
57
58 pub fn is_valid(&self) -> bool {
60 if self.ttl == 0 {
61 return true; }
63
64 let elapsed = self
65 .created_at
66 .elapsed()
67 .unwrap_or(Duration::from_secs(u64::MAX))
68 .as_secs();
69 elapsed < self.ttl
70 }
71
72 pub fn mark_accessed(&mut self) {
74 self.last_accessed = SystemTime::now();
75 self.access_count += 1;
76 }
77
78 pub fn calculate_priority(&self) -> f64 {
80 let age_factor = self
81 .last_accessed
82 .elapsed()
83 .unwrap_or(Duration::from_secs(0))
84 .as_secs() as f64;
85 let frequency_factor = self.access_count as f64;
86 let size_factor = 1.0 / (self.size_bytes as f64 + 1.0);
87
88 (frequency_factor * size_factor) / (age_factor + 1.0)
90 }
91}
92
93#[derive(Debug)]
98pub struct IntelligentCacheManager<K, V>
99where
100 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
101 V: Clone + Send + Sync + std::fmt::Debug + 'static,
102{
103 config: UnifiedCacheConfig,
105 l1_cache: Arc<DashMap<K, MultiLevelCacheEntry<V>>>,
107 l2_cache: Arc<AsyncRwLock<HashMap<K, MultiLevelCacheEntry<V>>>>,
109 preheater: Arc<PredictivePreheater<K>>,
111 tuner: Arc<AdaptiveTuner>,
113 monitor: Arc<PerformanceMonitor>,
115 stats: Arc<RwLock<UnifiedCacheStats>>,
117 access_patterns: Arc<RwLock<HashMap<K, Vec<SystemTime>>>>,
119}
120
121impl<K, V> IntelligentCacheManager<K, V>
122where
123 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
124 V: Clone + Send + Sync + std::fmt::Debug + 'static,
125{
126 pub fn new(config: UnifiedCacheConfig) -> Self {
128 let preheater = Arc::new(PredictivePreheater::new(config.preheating_config.clone()));
129 let tuner = Arc::new(AdaptiveTuner::new(config.tuning_config.clone()));
130 let monitor = Arc::new(PerformanceMonitor::new(config.monitoring_config.clone()));
131
132 Self {
133 config,
134 l1_cache: Arc::new(DashMap::new()),
135 l2_cache: Arc::new(AsyncRwLock::new(HashMap::new())),
136 preheater,
137 tuner,
138 monitor,
139 stats: Arc::new(RwLock::new(UnifiedCacheStats::default())),
140 access_patterns: Arc::new(RwLock::new(HashMap::new())),
141 }
142 }
143
144 pub fn config(&self) -> &UnifiedCacheConfig {
146 &self.config
147 }
148
149 pub fn preheater(&self) -> Arc<PredictivePreheater<K>> {
151 Arc::clone(&self.preheater)
152 }
153
154 pub fn tuner(&self) -> Arc<AdaptiveTuner> {
156 Arc::clone(&self.tuner)
157 }
158
159 pub fn monitor(&self) -> Arc<PerformanceMonitor> {
161 Arc::clone(&self.monitor)
162 }
163
164 async fn record_access_pattern(&self, key: &K) {
166 if !self.config.preheating_config.enable_pattern_learning {
167 return;
168 }
169
170 let mut patterns = self.access_patterns.write().unwrap();
171 let now = SystemTime::now();
172
173 patterns
174 .entry(key.clone())
175 .or_insert_with(Vec::new)
176 .push(now);
177
178 let cutoff =
180 now - Duration::from_secs(self.config.preheating_config.pattern_window_seconds);
181 if let Some(times) = patterns.get_mut(key) {
182 times.retain(|&time| time > cutoff);
183 }
184 }
185
186 async fn promote_to_l1(
188 &self,
189 key: K,
190 mut entry: MultiLevelCacheEntry<V>,
191 ) -> Result<(), CacheError> {
192 if self.l1_cache.len() >= self.config.l1_config.max_entries {
194 self.evict_l1_entries().await?;
195 }
196
197 entry.level = 1;
199 entry.mark_accessed();
200
201 self.l1_cache.insert(key.clone(), entry);
203
204 let mut l2_cache = self.l2_cache.write().await;
206 l2_cache.remove(&key);
207
208 {
210 let mut stats = self.stats.write().unwrap();
211 stats.overall_stats.promotions += 1;
212 }
213
214 Ok(())
215 }
216
217 async fn demote_to_l2(
219 &self,
220 key: K,
221 mut entry: MultiLevelCacheEntry<V>,
222 ) -> Result<(), CacheError> {
223 {
225 let l2_cache = self.l2_cache.read().await;
226 if l2_cache.len() >= self.config.l2_config.max_entries {
227 drop(l2_cache);
228 self.evict_l2_entries().await?;
229 }
230 }
231
232 entry.level = 2;
234
235 {
237 let mut l2_cache = self.l2_cache.write().await;
238 l2_cache.insert(key.clone(), entry);
239 }
240
241 self.l1_cache.remove(&key);
243
244 {
246 let mut stats = self.stats.write().unwrap();
247 stats.overall_stats.demotions += 1;
248 }
249
250 Ok(())
251 }
252
253 async fn evict_l1_entries(&self) -> Result<(), CacheError> {
255 let eviction_count = (self.l1_cache.len() as f64 * 0.1).max(1.0) as usize;
256
257 let mut entries: Vec<(K, f64)> = self
259 .l1_cache
260 .iter()
261 .map(|entry| {
262 let priority = entry.value().calculate_priority();
263 (entry.key().clone(), priority)
264 })
265 .collect();
266
267 entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
269
270 for (key, _) in entries.into_iter().take(eviction_count) {
272 if let Some((_, entry)) = self.l1_cache.remove(&key) {
273 if entry.access_count > 1 {
275 self.demote_to_l2(key, entry).await?;
276 }
277 }
278 }
279
280 Ok(())
281 }
282
283 async fn evict_l2_entries(&self) -> Result<(), CacheError> {
285 let mut l2_cache = self.l2_cache.write().await;
286 let eviction_count = (l2_cache.len() as f64 * 0.1).max(1.0) as usize;
287
288 let mut entries: Vec<(K, f64)> = l2_cache
290 .iter()
291 .map(|(key, entry)| {
292 let priority = entry.calculate_priority();
293 (key.clone(), priority)
294 })
295 .collect();
296
297 entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
299
300 for (key, _) in entries.into_iter().take(eviction_count) {
302 l2_cache.remove(&key);
303 }
304
305 Ok(())
306 }
307
308 async fn update_statistics(&self) {
310 let l1_entries = self.l1_cache.len();
312 let l1_usage_bytes = self
313 .l1_cache
314 .iter()
315 .map(|entry| entry.value().size_bytes)
316 .sum();
317
318 let (l2_entries, l2_usage_bytes) = {
320 let l2_cache = self.l2_cache.read().await;
321 let entries = l2_cache.len();
322 let usage_bytes = l2_cache.values().map(|entry| entry.size_bytes).sum();
323 (entries, usage_bytes)
324 };
325
326 {
328 let mut stats = self.stats.write().unwrap();
329
330 stats.l1_stats.entries = l1_entries;
332 stats.l1_stats.usage_bytes = l1_usage_bytes;
333
334 stats.l2_stats.entries = l2_entries;
336 stats.l2_stats.usage_bytes = l2_usage_bytes;
337
338 stats.update_overall_stats();
340 }
341 }
342}
343
344#[async_trait]
345impl<K, V> UnifiedCache<K, V> for IntelligentCacheManager<K, V>
346where
347 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
348 V: Clone + Send + Sync + std::fmt::Debug + 'static,
349{
350 async fn get(&self, key: &K) -> Option<V> {
352 let start_time = Instant::now();
353
354 self.record_access_pattern(key).await;
356
357 if let Some(mut entry) = self.l1_cache.get_mut(key) {
359 if entry.is_valid() {
360 entry.mark_accessed();
361
362 {
364 let mut stats = self.stats.write().unwrap();
365 stats.l1_stats.hits += 1;
366 }
367
368 self.monitor.record_get_latency(start_time.elapsed()).await;
370
371 return Some(entry.value.clone());
372 } else {
373 drop(entry);
375 self.l1_cache.remove(key);
376 }
377 }
378
379 {
381 let mut l2_cache = self.l2_cache.write().await;
382 if let Some(entry) = l2_cache.get_mut(key) {
383 if entry.is_valid() {
384 entry.mark_accessed();
385 let value = entry.value.clone();
386
387 if entry.access_count >= self.config.l1_config.promotion_threshold {
389 let promoted_entry = entry.clone();
390 l2_cache.remove(key);
391 drop(l2_cache);
392
393 if let Err(e) = self.promote_to_l1(key.clone(), promoted_entry).await {
394 eprintln!("Failed to promote to L1: {:?}", e);
395 }
396 }
397
398 {
400 let mut stats = self.stats.write().unwrap();
401 stats.l2_stats.hits += 1;
402 }
403
404 self.monitor.record_get_latency(start_time.elapsed()).await;
406
407 return Some(value);
408 } else {
409 l2_cache.remove(key);
411 }
412 }
413 }
414
415 {
417 let mut stats = self.stats.write().unwrap();
418 stats.l1_stats.misses += 1;
419 stats.l2_stats.misses += 1;
420 }
421
422 if self.config.preheating_config.enable_predictive_preheating {
424 self.preheater.predict_and_preheat(key).await;
425 }
426
427 self.monitor.record_get_latency(start_time.elapsed()).await;
429
430 None
431 }
432
433 async fn put(&self, key: K, value: V) -> Result<(), CacheError> {
435 let start_time = Instant::now();
436
437 let size_bytes = std::mem::size_of::<V>() as u64;
439
440 let entry =
442 MultiLevelCacheEntry::new(value, self.config.l1_config.default_ttl, 1, size_bytes);
443
444 if self.l1_cache.len() >= self.config.l1_config.max_entries {
446 self.evict_l1_entries().await?;
447 }
448
449 self.l1_cache.insert(key.clone(), entry);
451
452 self.update_statistics().await;
454
455 self.monitor.record_put_latency(start_time.elapsed()).await;
457
458 if self.config.tuning_config.enable_adaptive_tuning {
460 self.tuner.analyze_and_tune().await;
461 }
462
463 Ok(())
464 }
465
466 async fn remove(&self, key: &K) -> bool {
468 let l1_removed = self.l1_cache.remove(key).is_some();
469
470 let l2_removed = {
471 let mut l2_cache = self.l2_cache.write().await;
472 l2_cache.remove(key).is_some()
473 };
474
475 if l1_removed || l2_removed {
477 self.update_statistics().await;
478 }
479
480 l1_removed || l2_removed
481 }
482
483 async fn contains_key(&self, key: &K) -> bool {
485 if let Some(entry) = self.l1_cache.get(key) {
487 if entry.is_valid() {
488 return true;
489 }
490 }
491
492 let l2_cache = self.l2_cache.read().await;
494 if let Some(entry) = l2_cache.get(key) {
495 return entry.is_valid();
496 }
497
498 false
499 }
500
501 async fn get_stats(&self) -> UnifiedCacheStats {
503 self.update_statistics().await;
504 self.stats.read().unwrap().clone()
505 }
506
507 async fn clear(&self) -> Result<(), CacheError> {
509 self.l1_cache.clear();
510
511 {
512 let mut l2_cache = self.l2_cache.write().await;
513 l2_cache.clear();
514 }
515
516 {
518 let mut stats = self.stats.write().unwrap();
519 *stats = UnifiedCacheStats::default();
520 }
521
522 Ok(())
523 }
524
525 async fn size(&self) -> usize {
527 let l1_size = self.l1_cache.len();
528 let l2_size = {
529 let l2_cache = self.l2_cache.read().await;
530 l2_cache.len()
531 };
532 l1_size + l2_size
533 }
534
535 async fn is_empty(&self) -> bool {
537 self.size().await == 0
538 }
539
540 async fn capacity(&self) -> usize {
542 self.config.l1_config.max_entries + self.config.l2_config.max_entries
543 }
544
545 fn cache_type(&self) -> &'static str {
547 "IntelligentCacheManager"
548 }
549}