use crate::{
AdaptiveTuner, CacheError, PerformanceMonitor, PredictivePreheater, UnifiedCache,
UnifiedCacheConfig, UnifiedCacheStats,
};
use dashmap::DashMap;
use tracing::warn;
use async_trait::async_trait;
use std::{
collections::HashMap,
hash::Hash,
sync::{Arc, RwLock},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::RwLock as AsyncRwLock;
#[derive(Debug, Clone)]
pub struct MultiLevelCacheEntry<V> {
pub value: V,
pub created_at: SystemTime,
pub last_accessed: SystemTime,
pub access_count: u64,
pub level: u8,
pub size_bytes: u64,
pub ttl: u64,
pub prediction_score: f64,
}
impl<V> MultiLevelCacheEntry<V> {
pub fn new(value: V, ttl: u64, level: u8, size_bytes: u64) -> Self {
let now = SystemTime::now();
Self {
value,
created_at: now,
last_accessed: now,
access_count: 1,
level,
size_bytes,
ttl,
prediction_score: 0.0,
}
}
pub fn is_valid(&self) -> bool {
if self.ttl == 0 {
return true; }
let elapsed = self
.created_at
.elapsed()
.unwrap_or(Duration::from_secs(u64::MAX))
.as_secs();
elapsed < self.ttl
}
pub fn mark_accessed(&mut self) {
self.last_accessed = SystemTime::now();
self.access_count += 1;
}
pub fn calculate_priority(&self) -> f64 {
let age_factor = self
.last_accessed
.elapsed()
.unwrap_or(Duration::from_secs(0))
.as_secs() as f64;
let frequency_factor = self.access_count as f64;
let size_factor = 1.0 / (self.size_bytes as f64 + 1.0);
(frequency_factor * size_factor) / (age_factor + 1.0)
}
}
#[derive(Debug)]
pub struct IntelligentCacheManager<K, V>
where
K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
V: Clone + Send + Sync + std::fmt::Debug + 'static,
{
config: UnifiedCacheConfig,
l1_cache: Arc<DashMap<K, MultiLevelCacheEntry<V>>>,
l2_cache: Arc<AsyncRwLock<HashMap<K, MultiLevelCacheEntry<V>>>>,
preheater: Arc<PredictivePreheater<K>>,
tuner: Arc<AdaptiveTuner>,
monitor: Arc<PerformanceMonitor>,
stats: Arc<RwLock<UnifiedCacheStats>>,
access_patterns: Arc<RwLock<HashMap<K, Vec<SystemTime>>>>,
}
impl<K, V> IntelligentCacheManager<K, V>
where
K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
V: Clone + Send + Sync + std::fmt::Debug + 'static,
{
pub fn new(config: UnifiedCacheConfig) -> Self {
let preheater = Arc::new(PredictivePreheater::new(config.preheating_config.clone()));
let tuner = Arc::new(AdaptiveTuner::new(config.tuning_config.clone()));
let monitor = Arc::new(PerformanceMonitor::new(config.monitoring_config.clone()));
Self {
config,
l1_cache: Arc::new(DashMap::new()),
l2_cache: Arc::new(AsyncRwLock::new(HashMap::new())),
preheater,
tuner,
monitor,
stats: Arc::new(RwLock::new(UnifiedCacheStats::default())),
access_patterns: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn config(&self) -> &UnifiedCacheConfig {
&self.config
}
pub fn preheater(&self) -> Arc<PredictivePreheater<K>> {
Arc::clone(&self.preheater)
}
pub fn tuner(&self) -> Arc<AdaptiveTuner> {
Arc::clone(&self.tuner)
}
pub fn monitor(&self) -> Arc<PerformanceMonitor> {
Arc::clone(&self.monitor)
}
async fn record_access_pattern(&self, key: &K) {
if !self.config.preheating_config.enable_pattern_learning {
return;
}
let mut patterns = self.access_patterns.write().unwrap();
let now = SystemTime::now();
patterns.entry(key.clone()).or_default().push(now);
let cutoff =
now - Duration::from_secs(self.config.preheating_config.pattern_window_seconds);
if let Some(times) = patterns.get_mut(key) {
times.retain(|&time| time > cutoff);
}
}
async fn promote_to_l1(
&self,
key: K,
mut entry: MultiLevelCacheEntry<V>,
) -> Result<(), CacheError> {
if self.l1_cache.len() >= self.config.l1_config.max_entries {
self.evict_l1_entries().await?;
}
entry.level = 1;
entry.mark_accessed();
self.l1_cache.insert(key.clone(), entry);
let mut l2_cache = self.l2_cache.write().await;
l2_cache.remove(&key);
{
let mut stats = self.stats.write().unwrap();
stats.overall_stats.promotions += 1;
}
Ok(())
}
async fn demote_to_l2(
&self,
key: K,
mut entry: MultiLevelCacheEntry<V>,
) -> Result<(), CacheError> {
{
let l2_cache = self.l2_cache.read().await;
if l2_cache.len() >= self.config.l2_config.max_entries {
drop(l2_cache);
self.evict_l2_entries().await?;
}
}
entry.level = 2;
{
let mut l2_cache = self.l2_cache.write().await;
l2_cache.insert(key.clone(), entry);
}
self.l1_cache.remove(&key);
{
let mut stats = self.stats.write().unwrap();
stats.overall_stats.demotions += 1;
}
Ok(())
}
async fn evict_l1_entries(&self) -> Result<(), CacheError> {
let eviction_count = (self.l1_cache.len() as f64 * 0.1).max(1.0) as usize;
let mut entries: Vec<(K, f64)> = self
.l1_cache
.iter()
.map(|entry| {
let priority = entry.value().calculate_priority();
(entry.key().clone(), priority)
})
.collect();
entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
for (key, _) in entries.into_iter().take(eviction_count) {
if let Some((_, entry)) = self.l1_cache.remove(&key) {
if entry.access_count > 1 {
self.demote_to_l2(key, entry).await?;
}
}
}
Ok(())
}
async fn evict_l2_entries(&self) -> Result<(), CacheError> {
let mut l2_cache = self.l2_cache.write().await;
let eviction_count = (l2_cache.len() as f64 * 0.1).max(1.0) as usize;
let mut entries: Vec<(K, f64)> = l2_cache
.iter()
.map(|(key, entry)| {
let priority = entry.calculate_priority();
(key.clone(), priority)
})
.collect();
entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
for (key, _) in entries.into_iter().take(eviction_count) {
l2_cache.remove(&key);
}
Ok(())
}
async fn update_statistics(&self) {
let l1_entries = self.l1_cache.len();
let l1_usage_bytes = self
.l1_cache
.iter()
.map(|entry| entry.value().size_bytes)
.sum();
let (l2_entries, l2_usage_bytes) = {
let l2_cache = self.l2_cache.read().await;
let entries = l2_cache.len();
let usage_bytes = l2_cache.values().map(|entry| entry.size_bytes).sum();
(entries, usage_bytes)
};
{
let mut stats = self.stats.write().unwrap();
stats.l1_stats.entries = l1_entries;
stats.l1_stats.usage_bytes = l1_usage_bytes;
stats.l2_stats.entries = l2_entries;
stats.l2_stats.usage_bytes = l2_usage_bytes;
stats.update_overall_stats();
}
}
}
#[async_trait]
impl<K, V> UnifiedCache<K, V> for IntelligentCacheManager<K, V>
where
K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
V: Clone + Send + Sync + std::fmt::Debug + 'static,
{
async fn get(&self, key: &K) -> Option<V> {
let start_time = Instant::now();
self.record_access_pattern(key).await;
if let Some(mut entry) = self.l1_cache.get_mut(key) {
if entry.is_valid() {
entry.mark_accessed();
{
let mut stats = self.stats.write().unwrap();
stats.l1_stats.hits += 1;
}
self.monitor.record_get_latency(start_time.elapsed()).await;
return Some(entry.value.clone());
} else {
drop(entry);
self.l1_cache.remove(key);
}
}
{
let mut l2_cache = self.l2_cache.write().await;
if let Some(entry) = l2_cache.get_mut(key) {
if entry.is_valid() {
entry.mark_accessed();
let value = entry.value.clone();
if entry.access_count >= self.config.l1_config.promotion_threshold {
let promoted_entry = entry.clone();
l2_cache.remove(key);
drop(l2_cache);
if let Err(e) = self.promote_to_l1(key.clone(), promoted_entry).await {
warn!("Failed to promote to L1: {:?}", e);
}
}
{
let mut stats = self.stats.write().unwrap();
stats.l2_stats.hits += 1;
}
self.monitor.record_get_latency(start_time.elapsed()).await;
return Some(value);
} else {
l2_cache.remove(key);
}
}
}
{
let mut stats = self.stats.write().unwrap();
stats.l1_stats.misses += 1;
stats.l2_stats.misses += 1;
}
if self.config.preheating_config.enable_predictive_preheating {
self.preheater.predict_and_preheat(key).await;
}
self.monitor.record_get_latency(start_time.elapsed()).await;
None
}
async fn put(&self, key: K, value: V) -> Result<(), CacheError> {
let start_time = Instant::now();
let size_bytes = std::mem::size_of::<V>() as u64;
let entry =
MultiLevelCacheEntry::new(value, self.config.l1_config.default_ttl, 1, size_bytes);
if self.l1_cache.len() >= self.config.l1_config.max_entries {
self.evict_l1_entries().await?;
}
self.l1_cache.insert(key.clone(), entry);
self.update_statistics().await;
self.monitor.record_put_latency(start_time.elapsed()).await;
if self.config.tuning_config.enable_adaptive_tuning {
self.tuner.analyze_and_tune().await;
}
Ok(())
}
async fn remove(&self, key: &K) -> bool {
let l1_removed = self.l1_cache.remove(key).is_some();
let l2_removed = {
let mut l2_cache = self.l2_cache.write().await;
l2_cache.remove(key).is_some()
};
if l1_removed || l2_removed {
self.update_statistics().await;
}
l1_removed || l2_removed
}
async fn contains_key(&self, key: &K) -> bool {
if let Some(entry) = self.l1_cache.get(key) {
if entry.is_valid() {
return true;
}
}
let l2_cache = self.l2_cache.read().await;
if let Some(entry) = l2_cache.get(key) {
return entry.is_valid();
}
false
}
async fn get_stats(&self) -> UnifiedCacheStats {
self.update_statistics().await;
self.stats.read().unwrap().clone()
}
async fn clear(&self) -> Result<(), CacheError> {
self.l1_cache.clear();
{
let mut l2_cache = self.l2_cache.write().await;
l2_cache.clear();
}
{
let mut stats = self.stats.write().unwrap();
*stats = UnifiedCacheStats::default();
}
Ok(())
}
async fn size(&self) -> usize {
let l1_size = self.l1_cache.len();
let l2_size = {
let l2_cache = self.l2_cache.read().await;
l2_cache.len()
};
l1_size + l2_size
}
async fn is_empty(&self) -> bool {
self.size().await == 0
}
async fn capacity(&self) -> usize {
self.config.l1_config.max_entries + self.config.l2_config.max_entries
}
fn cache_type(&self) -> &'static str {
"IntelligentCacheManager"
}
}