use crate::config::L1Config;
use crate::compression::Compressor;
use crate::error::{CacheError, CacheResult};
use crate::ttl::TtlManager;
use crate::types::{CacheValue, EvictionStrategy, CacheLayer, CacheOperation};
use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct L1Cache {
config: Arc<L1Config>,
storage: Arc<DashMap<String, CacheValue>>,
compressor: Arc<Compressor>,
ttl_manager: Arc<TtlManager>,
lru_order: Arc<Mutex<VecDeque<String>>>,
lfu_counter: Arc<DashMap<String, AtomicU64>>,
fifo_order: Arc<Mutex<VecDeque<String>>>,
memory_usage: Arc<AtomicUsize>,
entry_count: Arc<AtomicUsize>,
eviction_stats: Arc<RwLock<EvictionStats>>,
}
#[derive(Debug, Clone, Default)]
pub struct EvictionStats {
lru_evictions: u64,
lfu_evictions: u64,
fifo_evictions: u64,
ttl_evictions: u64,
total_evictions: u64,
evicted_bytes: u64,
}
impl L1Cache {
pub async fn new(
config: L1Config,
compressor: Compressor,
ttl_manager: Arc<TtlManager>,
) -> CacheResult<Self> {
let config_for_log = config.clone();
let cache = Self {
config: Arc::new(config),
storage: Arc::new(DashMap::new()),
compressor: Arc::new(compressor),
ttl_manager,
lru_order: Arc::new(Mutex::new(VecDeque::new())),
lfu_counter: Arc::new(DashMap::new()),
fifo_order: Arc::new(Mutex::new(VecDeque::new())),
memory_usage: Arc::new(AtomicUsize::new(0)),
entry_count: Arc::new(AtomicUsize::new(0)),
eviction_stats: Arc::new(RwLock::new(EvictionStats::default())),
};
rat_logger::debug!("[L1] 缓存已初始化,最大内存: {} bytes,最大条目: {}",
config_for_log.max_memory, config_for_log.max_entries);
Ok(cache)
}
pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
let start_time = Instant::now();
if self.ttl_manager.is_expired(key).await {
self.remove_internal(key).await;
return Ok(None);
}
if let Some(cache_value) = self.storage.get(key) {
self.update_access_stats(key).await;
let data = Bytes::from(cache_value.data.clone());
rat_logger::debug!("[L1] 缓存命中: {}", key);
Ok(Some(data))
} else {
rat_logger::debug!("[L1] 缓存未命中: {}", key);
Ok(None)
}
}
pub async fn set(&self, key: String, value: Bytes, ttl_seconds: Option<u64>) -> CacheResult<()> {
let start_time = Instant::now();
let cache_value = CacheValue::new_uncompressed(value.to_vec());
let value_size = cache_value.size();
self.ensure_capacity(value_size).await?;
let is_update = self.storage.contains_key(&key);
if let Some(old_value) = self.storage.insert(key.clone(), cache_value) {
let old_size = old_value.size();
self.memory_usage.fetch_sub(old_size, Ordering::Relaxed);
} else {
self.entry_count.fetch_add(1, Ordering::Relaxed);
}
self.memory_usage.fetch_add(value_size, Ordering::Relaxed);
if !is_update {
self.update_insertion_stats(&key).await;
}
self.update_access_stats(&key).await;
if ttl_seconds.is_some() || self.ttl_manager.get_ttl(&key).await.is_none() {
self.ttl_manager.add_key(key.clone(), ttl_seconds).await?;
}
rat_logger::debug!("[L1] 缓存设置: {} (未压缩)", key);
Ok(())
}
pub async fn delete(&self, key: &str) -> CacheResult<bool> {
let start_time = Instant::now();
let removed = self.remove_internal(key).await;
if removed {
rat_logger::debug!("[L1] 缓存删除: {}", key);
}
Ok(removed)
}
pub async fn clear(&self) -> CacheResult<()> {
let _start_time = Instant::now();
let old_count = self.entry_count.load(Ordering::Relaxed);
self.storage.clear();
self.lru_order.lock().await.clear();
self.lfu_counter.clear();
self.fifo_order.lock().await.clear();
self.memory_usage.store(0, Ordering::Relaxed);
self.entry_count.store(0, Ordering::Relaxed);
rat_logger::debug!("[L1] 缓存已清空,删除了 {} 个条目", old_count);
Ok(())
}
pub async fn get_stats(&self) -> L1CacheStats {
let eviction_stats = self.eviction_stats.read().clone();
L1CacheStats {
entry_count: self.entry_count.load(Ordering::Relaxed),
memory_usage: self.memory_usage.load(Ordering::Relaxed),
max_memory: self.config.max_memory,
max_entries: self.config.max_entries,
memory_utilization: self.memory_usage.load(Ordering::Relaxed) as f64 / self.config.max_memory as f64,
entry_utilization: self.entry_count.load(Ordering::Relaxed) as f64 / self.config.max_entries as f64,
eviction_stats,
}
}
pub fn contains_key(&self, key: &str) -> bool {
self.storage.contains_key(key)
}
pub fn keys(&self) -> Vec<String> {
self.storage.iter().map(|entry| entry.key().clone()).collect()
}
pub fn len(&self) -> usize {
self.entry_count.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
async fn remove_internal(&self, key: &str) -> bool {
if let Some((_, old_value)) = self.storage.remove(key) {
let old_size = old_value.size();
self.memory_usage.fetch_sub(old_size, Ordering::Relaxed);
self.entry_count.fetch_sub(1, Ordering::Relaxed);
self.cleanup_access_stats(key).await;
self.ttl_manager.remove_key(key).await;
true
} else {
false
}
}
async fn ensure_capacity(&self, required_size: usize) -> CacheResult<()> {
let current_memory = self.memory_usage.load(Ordering::Relaxed);
let current_entries = self.entry_count.load(Ordering::Relaxed);
if current_memory + required_size > self.config.max_memory {
let needed_space = current_memory + required_size - self.config.max_memory;
if self.evict_by_memory(required_size).await.is_err() {
return Err(CacheError::out_of_memory(needed_space));
}
}
if current_entries >= self.config.max_entries {
if self.evict_by_count(1).await.is_err() {
return Err(CacheError::cache_full(current_entries, self.config.max_entries));
}
}
Ok(())
}
async fn evict_by_memory(&self, required_size: usize) -> CacheResult<()> {
let target_memory = self.config.max_memory - required_size;
let mut evicted_bytes = 0;
let mut evicted_count = 0;
while self.memory_usage.load(Ordering::Relaxed) > target_memory && !self.storage.is_empty() {
if let Some(key) = self.select_eviction_candidate().await {
if let Some((_, value)) = self.storage.remove(&key) {
let size = value.size();
evicted_bytes += size;
evicted_count += 1;
self.memory_usage.fetch_sub(size, Ordering::Relaxed);
self.entry_count.fetch_sub(1, Ordering::Relaxed);
self.cleanup_access_stats(&key).await;
self.ttl_manager.remove_key(&key).await;
rat_logger::debug!("[L1] 驱逐键: {} ({}字节)", key, size);
} else {
break;
}
} else {
break;
}
}
if evicted_count > 0 {
self.update_eviction_stats(evicted_count, evicted_bytes).await;
rat_logger::debug!("[L1] 内存驱逐完成: {} 个条目,{} 字节",
evicted_count, evicted_bytes);
}
Ok(())
}
async fn evict_by_count(&self, required_count: usize) -> CacheResult<()> {
let mut evicted_bytes = 0;
let mut evicted_count = 0;
for _ in 0..required_count {
if let Some(key) = self.select_eviction_candidate().await {
if let Some((_, value)) = self.storage.remove(&key) {
let size = value.size();
evicted_bytes += size;
evicted_count += 1;
self.memory_usage.fetch_sub(size, Ordering::Relaxed);
self.entry_count.fetch_sub(1, Ordering::Relaxed);
self.cleanup_access_stats(&key).await;
self.ttl_manager.remove_key(&key).await;
rat_logger::debug!("[L1] 驱逐键: {} ({}字节)", key, size);
} else {
break;
}
} else {
break;
}
}
if evicted_count > 0 {
self.update_eviction_stats(evicted_count, evicted_bytes).await;
rat_logger::debug!("[L1] 条目驱逐完成: {} 个条目,{} 字节",
evicted_count, evicted_bytes);
}
Ok(())
}
async fn select_eviction_candidate(&self) -> Option<String> {
match self.config.eviction_strategy {
EvictionStrategy::Lru => self.select_lru_candidate().await,
EvictionStrategy::Lfu => self.select_lfu_candidate().await,
EvictionStrategy::Fifo => self.select_fifo_candidate().await,
EvictionStrategy::LruLfu => self.select_lru_lfu_candidate().await,
EvictionStrategy::TtlBased => self.select_ttl_candidate().await,
}
}
async fn select_lru_candidate(&self) -> Option<String> {
let mut lru_order = self.lru_order.lock().await;
lru_order.pop_front()
}
async fn select_lfu_candidate(&self) -> Option<String> {
let mut min_count = u64::MAX;
let mut candidate = None;
for entry in self.lfu_counter.iter() {
let count = entry.value().load(Ordering::Relaxed);
if count < min_count {
min_count = count;
candidate = Some(entry.key().clone());
}
}
candidate
}
async fn select_fifo_candidate(&self) -> Option<String> {
let mut fifo_order = self.fifo_order.lock().await;
fifo_order.pop_front()
}
async fn select_lru_lfu_candidate(&self) -> Option<String> {
let mut hasher = DefaultHasher::new();
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default().as_nanos().hash(&mut hasher);
let random_value = (hasher.finish() % 100) as f64 / 100.0;
if random_value < 0.7 {
self.select_lru_candidate().await
} else {
self.select_lfu_candidate().await
}
}
async fn select_ttl_candidate(&self) -> Option<String> {
let expired_keys = self.ttl_manager.get_expired_keys(1).await;
if !expired_keys.is_empty() {
return Some(expired_keys[0].clone());
}
self.select_lru_candidate().await
}
async fn update_access_stats(&self, key: &str) {
let mut lru_order = self.lru_order.lock().await;
lru_order.retain(|k| k != key);
lru_order.push_back(key.to_string());
drop(lru_order);
self.lfu_counter.entry(key.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
async fn update_insertion_stats(&self, key: &str) {
let mut fifo_order = self.fifo_order.lock().await;
fifo_order.push_back(key.to_string());
}
async fn cleanup_access_stats(&self, key: &str) {
let mut lru_order = self.lru_order.lock().await;
lru_order.retain(|k| k != key);
drop(lru_order);
self.lfu_counter.remove(key);
let mut fifo_order = self.fifo_order.lock().await;
fifo_order.retain(|k| k != key);
}
async fn update_eviction_stats(&self, count: usize, bytes: usize) {
let mut stats = self.eviction_stats.write();
stats.total_evictions += count as u64;
stats.evicted_bytes += bytes as u64;
match self.config.eviction_strategy {
EvictionStrategy::Lru => stats.lru_evictions += count as u64,
EvictionStrategy::Lfu => stats.lfu_evictions += count as u64,
EvictionStrategy::Fifo => stats.fifo_evictions += count as u64,
EvictionStrategy::TtlBased => stats.ttl_evictions += count as u64,
EvictionStrategy::LruLfu => {
stats.lru_evictions += (count as f64 * 0.7) as u64;
stats.lfu_evictions += (count as f64 * 0.3) as u64;
}
}
}
}
#[derive(Debug, Clone, Default)]
pub struct L1CacheStats {
pub entry_count: usize,
pub memory_usage: usize,
pub max_memory: usize,
pub max_entries: usize,
pub memory_utilization: f64,
pub entry_utilization: f64,
pub eviction_stats: EvictionStats,
}
impl L1CacheStats {
pub fn format(&self) -> String {
format!(
"L1 缓存统计:\n\
条目数: {}/{}({:.1}%)\n\
内存使用: {}/{} bytes ({:.1}%)\n\
总驱逐: {} 次 ({} bytes)\n\
LRU驱逐: {}, LFU驱逐: {}, FIFO驱逐: {}, TTL驱逐: {}",
self.entry_count, self.max_entries, self.entry_utilization * 100.0,
self.memory_usage, self.max_memory, self.memory_utilization * 100.0,
self.eviction_stats.total_evictions, self.eviction_stats.evicted_bytes,
self.eviction_stats.lru_evictions, self.eviction_stats.lfu_evictions,
self.eviction_stats.fifo_evictions, self.eviction_stats.ttl_evictions
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{L1Config, LoggingConfig, TtlConfig};
use crate::compression::Compressor;
use crate::ttl::TtlManager;
use bytes::Bytes;
async fn create_test_cache() -> L1Cache {
let l1_config = L1Config {
max_memory: 1024 * 1024, max_entries: 1000,
eviction_strategy: EvictionStrategy::Lru,
};
let logging_config = LoggingConfig {
level: "debug".to_string(),
enable_colors: false,
show_timestamp: false,
enable_performance_logs: true,
enable_audit_logs: false,
enable_cache_logs: true,
enable_logging: true,
enable_async: false,
batch_size: 2048,
batch_interval_ms: 25,
buffer_size: 16384,
};
let ttl_config = TtlConfig {
expire_seconds: Some(60),
cleanup_interval: 60,
max_cleanup_entries: 100,
lazy_expiration: true,
active_expiration: true,
};
let compressor = Compressor::new_disabled();
let ttl_manager = Arc::new(TtlManager::new(ttl_config).await.unwrap());
L1Cache::new(l1_config, compressor, ttl_manager).await.unwrap()
}
#[tokio::test]
async fn test_cache_creation() {
let cache = create_test_cache().await;
assert_eq!(cache.len(), 0);
assert!(cache.is_empty());
}
#[tokio::test]
async fn test_set_and_get() {
let cache = create_test_cache().await;
let key = "test_key".to_string();
let value = Bytes::from("test_value");
cache.set(key.clone(), value.clone(), None).await.unwrap();
let retrieved = cache.get(&key).await.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap(), value);
}
#[tokio::test]
async fn test_delete() {
let cache = create_test_cache().await;
let key = "test_key".to_string();
let value = Bytes::from("test_value");
cache.set(key.clone(), value, None).await.unwrap();
assert!(cache.contains_key(&key));
let deleted = cache.delete(&key).await.unwrap();
assert!(deleted);
assert!(!cache.contains_key(&key));
}
#[tokio::test]
async fn test_clear() {
let cache = create_test_cache().await;
for i in 0..10 {
let key = format!("key_{}", i);
let value = Bytes::from(format!("value_{}", i));
cache.set(key, value, None).await.unwrap();
}
assert_eq!(cache.len(), 10);
cache.clear().await.unwrap();
assert_eq!(cache.len(), 0);
assert!(cache.is_empty());
}
#[tokio::test]
async fn test_eviction() {
let mut l1_config = L1Config {
max_memory: 1024, max_entries: 5, eviction_strategy: EvictionStrategy::Lru,
};
let logging_config = LoggingConfig {
level: "debug".to_string(),
enable_colors: false,
show_timestamp: false,
enable_performance_logs: true,
enable_audit_logs: false,
enable_cache_logs: true,
enable_logging: true,
enable_async: false,
batch_size: 2048,
batch_interval_ms: 25,
buffer_size: 16384,
};
let ttl_config = TtlConfig {
expire_seconds: None,
cleanup_interval: 60,
max_cleanup_entries: 100,
lazy_expiration: true,
active_expiration: false,
};
let compressor = Compressor::new_disabled();
let ttl_manager = Arc::new(TtlManager::new(ttl_config).await.unwrap());
let cache = L1Cache::new(l1_config, compressor, ttl_manager).await.unwrap();
for i in 0..10 {
let key = format!("key_{}", i);
let value = Bytes::from(vec![b'x'; 200]); cache.set(key, value, None).await.unwrap();
}
assert!(cache.len() <= 5);
let stats = cache.get_stats().await;
assert!(stats.eviction_stats.total_evictions > 0);
}
}