use crate::compression::Compressor;
use crate::transfer_log;
use crate::config::{CacheConfig, CacheConfigBuilder};
use crate::error::{CacheError, CacheResult};
use crate::l1_cache::{L1Cache, L1CacheStats};
#[cfg(feature = "melange-storage")]
use crate::l2_cache::{L2Cache, L2CacheStats};
use crate::ttl::TtlManager;
use crate::types::{CacheLayer, CacheOperation};
use crate::cache_log;
use bytes::Bytes;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};
/// Cache facade that fronts an L1 cache and, when the `melange-storage`
/// feature is compiled in, an optional L2 cache.
///
/// Every field is reference-counted, so cloning the facade yields another
/// handle onto the same underlying components.
#[derive(Debug)]
pub struct RatMemCache {
// Configuration the cache was built from (read for thresholds and the L2 write strategy).
config: Arc<CacheConfig>,
// First tier; consulted before L2 on reads.
l1_cache: Arc<L1Cache>,
// Second tier; `None` when `enable_l2_cache` is false in the L2 configuration.
#[cfg(feature = "melange-storage")]
l2_cache: Option<Arc<L2Cache>>,
// TTL bookkeeping; the same manager instance is handed to both tiers at construction.
ttl_manager: Arc<TtlManager>,
// Built from the L2 configuration when present, otherwise a disabled compressor.
compressor: Arc<Compressor>,
// Set to `true` by `new` and to `false` by `shutdown`.
is_running: Arc<RwLock<bool>>,
}
/// Fluent builder for [`RatMemCache`]; it collects configuration sections in
/// a `CacheConfigBuilder` and constructs the cache in `build`.
#[derive(Debug)]
pub struct RatMemCacheBuilder {
// Accumulates the individual configuration sections until `build` is called.
config_builder: CacheConfigBuilder,
}
/// Per-call options for `get_with_options` / `set_with_options`.
///
/// The default value has every option switched off (`None` / `false`), which
/// is exactly what the derived `Default` produces.
#[derive(Debug, Clone, Default)]
pub struct CacheOptions {
    /// Time-to-live in seconds forwarded to the cache tiers on writes;
    /// `None` forwards no explicit TTL.
    pub ttl_seconds: Option<u64>,
    /// On writes: bypass L1 and write to L2 whenever an L2 cache exists.
    /// On reads: do not copy an L2 hit back into L1.
    pub force_l2: bool,
    /// On reads: do not consult L1 and do not copy an L2 hit back into L1.
    /// On writes: do not write to L1.
    pub skip_l1: bool,
    /// Per-call compression override.
    // NOTE(review): this field is not read anywhere in this file — confirm
    // whether another module consumes it.
    pub enable_compression: Option<bool>,
}
impl RatMemCacheBuilder {
    /// Starts a builder backed by a fresh `CacheConfigBuilder`.
    pub fn new() -> Self {
        let config_builder = CacheConfigBuilder::new();
        Self { config_builder }
    }

    /// Sets the L1 configuration section.
    pub fn l1_config(self, config: crate::config::L1Config) -> Self {
        Self {
            config_builder: self.config_builder.with_l1_config(config),
        }
    }

    /// Sets the L2 configuration section (only with `melange-storage`).
    #[cfg(feature = "melange-storage")]
    pub fn l2_config(self, config: crate::config::L2Config) -> Self {
        Self {
            config_builder: self.config_builder.with_l2_config(config),
        }
    }

    /// Sets the TTL configuration section.
    pub fn ttl_config(self, config: crate::config::TtlConfig) -> Self {
        Self {
            config_builder: self.config_builder.with_ttl_config(config),
        }
    }

    /// Sets the performance configuration section.
    pub fn performance_config(self, config: crate::config::PerformanceConfig) -> Self {
        Self {
            config_builder: self.config_builder.with_performance_config(config),
        }
    }

    /// Sets the logging configuration section.
    pub fn logging_config(self, config: crate::config::LoggingConfig) -> Self {
        Self {
            config_builder: self.config_builder.with_logging_config(config),
        }
    }

    /// Finalises the configuration and constructs the cache.
    ///
    /// # Errors
    ///
    /// Returns the error from `CacheConfigBuilder::build` or from
    /// `RatMemCache::new`.
    pub async fn build(self) -> CacheResult<RatMemCache> {
        let finished_config = self.config_builder.build()?;
        RatMemCache::new(finished_config).await
    }
}
impl Default for RatMemCacheBuilder {
fn default() -> Self {
Self::new()
}
}
impl RatMemCache {
/// Builds a fully initialised cache from `config`.
///
/// Components are created in dependency order: compressor, TTL manager, L1
/// cache and — when the `melange-storage` feature is compiled in and
/// `enable_l2_cache` is set — the L2 cache. Both tiers receive a clone of the
/// compressor and share the same TTL manager instance.
///
/// # Errors
///
/// Propagates any error from `TtlManager::new`, `L1Cache::new` or
/// `L2Cache::new`. With `melange-storage` enabled, a missing `config.l2`
/// section is reported as a configuration error.
pub async fn new(config: CacheConfig) -> CacheResult<Self> {
    let start_time = Instant::now();
    rat_logger::debug!("[CACHE] RatMemCache::new 开始初始化");
    rat_logger::debug!("[CACHE] 配: {:?}", config);
    rat_logger::debug!("[CACHE] 开始初始化 RatMemCache...");

    rat_logger::debug!("[CACHE] 初始化压缩器");
    // Compression settings live in the L2 section; without it compression is off.
    let compressor = Arc::new(match config.l2.as_ref() {
        Some(l2_config) => Compressor::new_from_l2_config(l2_config),
        None => Compressor::new_disabled(),
    });

    rat_logger::debug!("[CACHE] 初始化 TTL 管理器");
    let ttl_manager = Arc::new(TtlManager::new(config.ttl.clone()).await?);

    rat_logger::debug!("[CACHE] 初始化 L1 缓存");
    let l1_cache = Arc::new(
        L1Cache::new(
            config.l1.clone(),
            compressor.as_ref().clone(),
            Arc::clone(&ttl_manager),
        )
        .await?,
    );
    rat_logger::debug!("[CACHE] L1 缓存初始化成功");

    #[cfg(feature = "melange-storage")]
    let l2_cache = {
        let l2_config = config.l2.as_ref().ok_or_else(|| {
            CacheError::config_error("启用了 melange-storage 特性但未配置 L2")
        })?;
        if l2_config.enable_l2_cache {
            rat_logger::debug!("[CACHE] 检查是否启用 L2 缓存: {}", l2_config.enable_l2_cache);
            rat_logger::debug!("[CACHE] L2 缓存配置: {:?}", l2_config);
            rat_logger::debug!("[CACHE] 开始初始化 L2 缓存");
            rat_logger::debug!("[CACHE] L2 缓存数据目录: {:?}", l2_config.data_dir);
            if let Some(dir) = &l2_config.data_dir {
                Self::probe_l2_data_dir(dir);
            } else {
                rat_logger::debug!("[CACHE] L2 缓存数据目录未设置");
            }
            rat_logger::debug!("[CACHE] 调用 L2Cache::new");
            let l2_cache_result = L2Cache::new(
                l2_config.clone(),
                compressor.as_ref().clone(),
                Arc::clone(&ttl_manager),
            )
            .await;
            // Log the outcome before `?` so a failure is visible in the debug trace.
            match &l2_cache_result {
                Ok(_) => rat_logger::debug!("[CACHE] L2Cache::new 调用成功"),
                Err(e) => rat_logger::debug!("[CACHE] L2Cache::new 调用失败: {}", e),
            }
            Some(Arc::new(l2_cache_result?))
        } else {
            rat_logger::debug!("[CACHE] L2 缓存已禁用,不创建任何实例");
            None
        }
    };

    rat_logger::debug!("[CACHE] 创建 RatMemCache 实例");
    let cache = Self {
        // `config` is owned and no longer borrowed here, so it is moved rather than cloned.
        config: Arc::new(config),
        l1_cache,
        #[cfg(feature = "melange-storage")]
        l2_cache,
        ttl_manager,
        compressor,
        is_running: Arc::new(RwLock::new(true)),
    };

    // Format precision is ignored for integers, so report fractional
    // milliseconds as a float to make `{:.2}` meaningful.
    let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    rat_logger::debug!("[CACHE] RatMemCache 初始化完成,耗时: {:.2}ms", elapsed_ms);
    rat_logger::debug!("[CACHE] 返回 RatMemCache 实例");
    Ok(cache)
}

/// Best-effort diagnostic probe of the L2 data directory.
///
/// Creates the directory when it is missing, then writes and removes a small
/// marker file. Every outcome is only logged at debug level; failures are
/// deliberately not propagated, so `L2Cache::new` remains the single source
/// of truth for whether the directory is usable.
///
/// Uses blocking `std::fs` calls; it runs once during construction.
#[cfg(feature = "melange-storage")]
fn probe_l2_data_dir(dir: &std::path::Path) {
    rat_logger::debug!("[CACHE] 手动验证 L2 缓存目录是否可写: {:?}", dir);
    rat_logger::debug!("[CACHE] 目录是否存在: {}", dir.exists());
    if !dir.exists() {
        rat_logger::debug!("[CACHE] 尝试创建目录: {:?}", dir);
        match std::fs::create_dir_all(dir) {
            Ok(_) => rat_logger::debug!("[CACHE] 目录创建成功"),
            Err(e) => rat_logger::debug!("[CACHE] 创建目录失败: {}", e),
        }
    }
    let test_file = dir.join(".cache_write_test");
    rat_logger::debug!("[CACHE] 尝试写入测试文件: {:?}", test_file);
    match std::fs::write(&test_file, b"test") {
        Ok(_) => {
            rat_logger::debug!("[CACHE] 测试文件写入成功");
            match std::fs::remove_file(&test_file) {
                Ok(_) => rat_logger::debug!("[CACHE] 测试文件删除成功"),
                Err(e) => rat_logger::debug!("[CACHE] 测试文件删除失败: {}", e),
            }
        }
        Err(e) => rat_logger::debug!("[CACHE] 测试文件写入失败: {}", e),
    }
}
/// Looks up `key` using default [`CacheOptions`].
///
/// Returns `Ok(None)` on a miss; see `get_with_options` for the lookup order.
pub async fn get(&self, key: &str) -> CacheResult<Option<Bytes>> {
    let defaults = CacheOptions::default();
    self.get_with_options(key, &defaults).await
}
/// Looks up `key`, honouring the per-call `options`.
///
/// Lookup order:
/// 1. If the TTL manager reports the key as expired, it is deleted from all
///    tiers and `Ok(None)` is returned.
/// 2. L1 is consulted unless `options.skip_l1` is set.
/// 3. With `melange-storage`, L2 is consulted. An L2 hit is copied back into
///    L1 (with the TTL reported by the TTL manager) unless `skip_l1` or
///    `force_l2` is set; a failure of that copy is logged and otherwise
///    ignored.
///
/// Note that `force_l2` does not bypass the L1 lookup in step 2.
///
/// # Errors
///
/// Propagates errors from the expiry-triggered delete and from the L1/L2
/// lookups.
pub async fn get_with_options(&self, key: &str, options: &CacheOptions) -> CacheResult<Option<Bytes>> {
    // Lazy expiration: purge the key on first access after it expired.
    if self.ttl_manager.is_expired(key).await {
        self.delete_internal(key).await?;
        return Ok(None);
    }

    if !options.skip_l1 {
        if let Some(value) = self.l1_cache.get(key).await? {
            transfer_log!(debug, "L1 缓存命中: {}", key);
            return Ok(Some(value));
        }
    }

    #[cfg(feature = "melange-storage")]
    if let Some(l2_cache) = &self.l2_cache {
        if let Some(value) = l2_cache.get(key).await? {
            transfer_log!(debug, "L2 缓存命中: {}", key);
            if !options.skip_l1 && !options.force_l2 {
                let ttl = self.ttl_manager.get_ttl(key).await;
                // Promotion is best-effort: the value was found, so a failed
                // L1 write must not fail the read.
                if let Err(e) = self.l1_cache.set(key.to_string(), value.clone(), ttl).await {
                    rat_logger::warn!("[CACHE] L1 缓存设置失败: {} - {}", key, e);
                }
            }
            return Ok(Some(value));
        }
    }

    rat_logger::debug!("[CACHE] 缓存未命中: {}", key);
    Ok(None)
}
/// Stores `value` under `key` using default [`CacheOptions`].
pub async fn set(&self, key: String, value: Bytes) -> CacheResult<()> {
    let defaults = CacheOptions::default();
    self.set_with_options(key, value, &defaults).await
}
pub async fn set_with_ttl(&self, key: String, value: Bytes, ttl_seconds: u64) -> CacheResult<()> {
let options = CacheOptions {
ttl_seconds: Some(ttl_seconds),
..Default::default()
};
self.set_with_options(key, value, &options).await
}
/// Stores `value` under `key`, honouring the per-call `options`.
///
/// Routing:
/// * Values longer than `performance.large_value_threshold` never enter L1.
///   With an L2 cache they are written straight to L2, and any older L1
///   entry for the same key is dropped first so reads cannot return the
///   outdated value. Without an L2 cache the write is discarded with a
///   warning and `Ok(())` is returned; existing entries are left untouched.
/// * Other values are written to L1 unless `skip_l1` or `force_l2` is set
///   (an L1 failure is logged, not returned). They are additionally written
///   to L2 when an L2 cache exists and either `force_l2` is set or the
///   configured L2 write strategy asks for it.
///
/// # Errors
///
/// Propagates errors from L2 writes.
pub async fn set_with_options(&self, key: String, value: Bytes, options: &CacheOptions) -> CacheResult<()> {
    let threshold = self.config.performance.large_value_threshold;
    let is_large_value = value.len() > threshold;

    if is_large_value {
        rat_logger::debug!("[CACHE] 检测到大值: {} ({} bytes)", key, value.len());
        #[cfg(feature = "melange-storage")]
        {
            let l2_cache = match &self.l2_cache {
                Some(l2_cache) => l2_cache,
                None => {
                    rat_logger::warn!("[CACHE] 大值被抛弃(无 L2 缓存): {} ({} bytes > {} bytes)",
                        key, value.len(), threshold);
                    return Ok(());
                }
            };
            rat_logger::debug!("[CACHE] 大值直接下沉到 L2: {}", key);
            // Reads consult L1 first, so an older small value cached there
            // would shadow the new L2 value. Drop it before writing L2; the
            // invalidation is best-effort like every other L1 write here.
            if let Err(e) = self.l1_cache.delete(&key).await {
                rat_logger::warn!("[CACHE] L1 旧值清除失败: {} - {}", key, e);
            }
            if let Some(ttl) = options.ttl_seconds {
                l2_cache.set_with_ttl(&key, value, ttl).await?;
            } else {
                l2_cache.set(key.clone(), value, None).await?;
            }
            rat_logger::debug!("[CACHE] 缓存设置完成: {} (大值: {}, L1: {}, L2: {})",
                key, true, false, true);
            return Ok(());
        }
        #[cfg(not(feature = "melange-storage"))]
        {
            rat_logger::warn!("[CACHE] 大值被抛弃(未启用 L2 功能): {} ({} bytes > {} bytes)",
                key, value.len(), threshold);
            return Ok(());
        }
    }

    let write_l1 = !options.skip_l1 && !options.force_l2;
    if write_l1 {
        // `Bytes::clone` is a reference-count bump, not a copy of the payload.
        if let Err(e) = self.l1_cache.set(key.clone(), value.clone(), options.ttl_seconds).await {
            rat_logger::warn!("[CACHE] L1 缓存设置失败: {} - {}", key, e);
        }
    }

    #[cfg(feature = "melange-storage")]
    let wrote_l2 = if let Some(l2_cache) = &self.l2_cache {
        let wanted = options.force_l2 || self.should_write_to_l2(&key, &value, options).await;
        if wanted {
            if let Some(ttl) = options.ttl_seconds {
                l2_cache.set_with_ttl(&key, value, ttl).await?;
            } else {
                l2_cache.set(key.clone(), value, None).await?;
            }
        }
        wanted
    } else {
        false
    };
    #[cfg(not(feature = "melange-storage"))]
    let wrote_l2 = false;

    // Report what actually happened: the L2 flag reflects the real write
    // decision rather than merely whether the value was large.
    rat_logger::debug!("[CACHE] 缓存设置完成: {} (大值: {}, L1: {}, L2: {})",
        key, is_large_value, write_l1, wrote_l2);
    Ok(())
}
pub async fn delete(&self, key: &str) -> CacheResult<bool> {
let start_time = Instant::now();
let deleted = self.delete_internal(key).await?;
Ok(deleted)
}
/// Clears L1 and, when present, L2.
///
/// # Errors
///
/// Propagates the first error from either tier; if L1 fails, L2 is not
/// touched.
// NOTE(review): the TTL manager is not reset here — confirm whether the tier
// `clear` implementations take care of that.
pub async fn clear(&self) -> CacheResult<()> {
    self.l1_cache.clear().await?;
    #[cfg(feature = "melange-storage")]
    if let Some(l2_cache) = &self.l2_cache {
        l2_cache.clear().await?;
    }
    rat_logger::debug!("[CACHE] 缓存已清空");
    Ok(())
}
/// Reports whether `key` is present in any tier.
///
/// An expired key is deleted from all tiers and reported as absent. L1 is
/// checked first; L2 is only asked when L1 does not hold the key.
///
/// # Errors
///
/// Propagates errors from the expiry-triggered delete and from the L2 lookup.
pub async fn contains_key(&self, key: &str) -> CacheResult<bool> {
    let expired = self.ttl_manager.is_expired(key).await;
    if expired {
        self.delete_internal(key).await?;
        return Ok(false);
    }

    if self.l1_cache.contains_key(key) {
        return Ok(true);
    }

    // Fall through to L2 when it is compiled in and configured.
    #[cfg(feature = "melange-storage")]
    if let Some(l2_cache) = &self.l2_cache {
        return l2_cache.contains_key(key).await;
    }

    Ok(false)
}
/// Returns the de-duplicated set of non-expired keys across all tiers.
///
/// The order of the returned vector is unspecified because the keys are
/// gathered in a `HashSet`.
///
/// # Errors
///
/// Propagates errors from listing the L2 keys.
pub async fn keys(&self) -> CacheResult<Vec<String>> {
    let mut live = std::collections::HashSet::<String>::new();

    for candidate in self.l1_cache.keys() {
        if self.ttl_manager.is_expired(&candidate).await {
            continue;
        }
        live.insert(candidate);
    }

    #[cfg(feature = "melange-storage")]
    if let Some(l2_cache) = &self.l2_cache {
        let l2_keys = l2_cache.keys().await?;
        for candidate in l2_keys {
            if self.ttl_manager.is_expired(&candidate).await {
                continue;
            }
            live.insert(candidate);
        }
    }

    let collected: Vec<String> = live.into_iter().collect();
    Ok(collected)
}
/// Number of distinct, non-expired keys across all tiers.
///
/// Implemented by materialising `keys()`, so it enumerates every tier.
pub async fn len(&self) -> CacheResult<usize> {
    Ok(self.keys().await?.len())
}
/// `true` when `len()` reports zero keys.
pub async fn is_empty(&self) -> CacheResult<bool> {
    Ok(self.len().await? == 0)
}
/// Returns the statistics reported by the L1 cache.
pub async fn get_l1_stats(&self) -> L1CacheStats {
    let l1 = &self.l1_cache;
    l1.get_stats().await
}
/// Returns the statistics reported by the L2 cache, or
/// `L2CacheStats::default()` when no L2 cache is configured.
#[cfg(feature = "melange-storage")]
pub async fn get_l2_stats(&self) -> L2CacheStats {
    match &self.l2_cache {
        Some(l2_cache) => l2_cache.get_stats().await,
        None => L2CacheStats::default(),
    }
}
/// Hit rate in percent, computed from the L2 statistics only
/// (`hits / (hits + misses) * 100`).
///
/// Returns `None` when L2 has recorded no hits or misses. L1 statistics are
/// not part of this figure.
#[cfg(feature = "melange-storage")]
pub async fn get_hit_rate(&self) -> Option<f64> {
    let stats = self.get_l2_stats().await;
    let total_requests = stats.hits + stats.misses;
    if total_requests == 0 {
        return None;
    }
    Some((stats.hits as f64 / total_requests as f64) * 100.0)
}
/// Hit rate placeholder for builds without the `melange-storage` feature;
/// always returns `None`.
#[cfg(not(feature = "melange-storage"))]
pub async fn get_hit_rate(&self) -> Option<f64> {
None
}
/// Asks the L2 cache to compact itself; a no-op returning `Ok(())` when no
/// L2 cache is configured.
///
/// # Errors
///
/// Propagates the error from the L2 `compact` call.
#[cfg(feature = "melange-storage")]
pub async fn compact(&self) -> CacheResult<()> {
    match &self.l2_cache {
        Some(l2_cache) => l2_cache.compact().await,
        None => Ok(()),
    }
}
/// Stub: performs no cleanup and always reports `0`.
///
/// Expired keys are instead purged when they are accessed through
/// `get_with_options` or `contains_key`.
// NOTE(review): presumably the return value is meant to be the number of entries removed — confirm before implementing.
pub async fn cleanup_expired(&self) -> CacheResult<u64> {
Ok(0)
}
/// Returns whatever TTL the TTL manager reports for `key`, or `None` when it
/// reports none.
pub async fn get_ttl(&self, key: &str) -> Option<u64> {
    let ttl_manager = &self.ttl_manager;
    ttl_manager.get_ttl(key).await
}
/// Registers `key` with the TTL manager using a TTL of `ttl_seconds`.
///
/// Always returns `Ok(())`.
// NOTE(review): the result of `add_key` is discarded; its type is not
// visible from this file — confirm whether a failure should be surfaced.
pub async fn set_ttl(&self, key: &str, ttl_seconds: u64) -> CacheResult<()> {
    let owned_key = key.to_string();
    let _ = self.ttl_manager.add_key(owned_key, Some(ttl_seconds)).await;
    Ok(())
}
/// Removes `key` from the TTL manager; the cached value itself is not
/// touched. Always returns `Ok(())`.
pub async fn remove_ttl(&self, key: &str) -> CacheResult<()> {
    let ttl_manager = &self.ttl_manager;
    ttl_manager.remove_key(key).await;
    Ok(())
}
/// Marks the cache as stopped and waits 100 ms before returning.
///
/// Only the shared `is_running` flag is changed; the cache tiers themselves
/// are not closed here.
// NOTE(review): nothing in this file reads `is_running`; presumably
// background tasks elsewhere poll it — confirm.
pub async fn shutdown(&self) -> CacheResult<()> {
    rat_logger::info!("[CACHE] 开始关闭 RatMemCache...");

    // The write guard is a temporary and is released at the end of this statement.
    *self.is_running.write().await = false;

    let grace_period = Duration::from_millis(100);
    tokio::time::sleep(grace_period).await;

    rat_logger::info!("[CACHE] RatMemCache 已关闭");
    Ok(())
}
/// Removes `key` from L1, then L2 (when configured), then the TTL manager.
///
/// Returns `Ok(true)` when at least one tier reported a removal.
///
/// # Errors
///
/// Returns early on the first tier error; later tiers and the TTL manager
/// are then left untouched.
async fn delete_internal(&self, key: &str) -> CacheResult<bool> {
    let removed_from_l1 = self.l1_cache.delete(key).await?;

    #[cfg(feature = "melange-storage")]
    let removed_from_l2 = match &self.l2_cache {
        Some(l2_cache) => l2_cache.delete(key).await?,
        None => false,
    };
    #[cfg(not(feature = "melange-storage"))]
    let removed_from_l2 = false;

    self.ttl_manager.remove_key(key).await;

    let deleted = removed_from_l1 || removed_from_l2;
    if deleted {
        rat_logger::debug!("[CACHE] 缓存删除: {}", key);
    }
    Ok(deleted)
}
/// Decides whether a (non-large) value should also be written to L2.
///
/// `force_l2` always wins. Otherwise the decision follows the configured
/// `l2_write_strategy`:
/// * `"always"` — write.
/// * `"never"` — do not write.
/// * `"size_based"` — write when the value length reaches `l2_write_threshold`.
/// * `"ttl_based"` — write when the requested TTL (0 when absent) reaches
///   `l2_write_ttl_threshold`.
/// * `"adaptive"` — write when L1 memory usage exceeds 80% of its configured
///   maximum, or the value length reaches `l2_write_threshold`.
///
/// Any other strategy string, or a missing L2 configuration, yields `false`.
#[cfg(feature = "melange-storage")]
async fn should_write_to_l2(&self, _key: &str, value: &Bytes, options: &CacheOptions) -> bool {
    if options.force_l2 {
        return true;
    }

    let l2_config = match &self.config.l2 {
        Some(l2_config) => l2_config,
        None => return false,
    };

    let strategy = l2_config.l2_write_strategy.as_str();
    if strategy == "always" {
        return true;
    }
    if strategy == "size_based" {
        return value.len() >= l2_config.l2_write_threshold;
    }
    if strategy == "ttl_based" {
        return options.ttl_seconds.unwrap_or(0) >= l2_config.l2_write_ttl_threshold;
    }
    if strategy == "adaptive" {
        let l1_stats = self.l1_cache.get_stats().await;
        let l1_usage_ratio = l1_stats.memory_usage as f64 / self.config.l1.max_memory as f64;
        return l1_usage_ratio > 0.8 || value.len() >= l2_config.l2_write_threshold;
    }

    // "never" and every unrecognised strategy.
    false
}
}
/// Cloning produces another handle onto the same components: every field is
/// an `Arc` (or an `Option<Arc>`), so only reference counts are bumped.
impl Clone for RatMemCache {
    fn clone(&self) -> Self {
        let config = Arc::clone(&self.config);
        let l1_cache = Arc::clone(&self.l1_cache);
        let ttl_manager = Arc::clone(&self.ttl_manager);
        let compressor = Arc::clone(&self.compressor);
        let is_running = Arc::clone(&self.is_running);
        Self {
            config,
            l1_cache,
            // Cloning `Option<Arc<_>>` clones the inner `Arc` when present.
            #[cfg(feature = "melange-storage")]
            l2_cache: self.l2_cache.clone(),
            ttl_manager,
            compressor,
            is_running,
        }
    }
}
/// Integration-style tests for the cache facade; they need a real L2 backend
/// and are therefore only compiled with the `melange-storage` feature.
#[cfg(all(test, feature = "melange-storage"))]
mod tests {
    use super::*;
    use crate::config::CacheConfigBuilder;
    use bytes::Bytes;
    use tempfile::TempDir;

    /// Builds a cache whose L2 data lives in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so it outlives the cache; dropping it removes
    /// the directory.
    // NOTE(review): "write_through" is not one of the strategies recognised by
    // `should_write_to_l2`, so in these tests small values reach L2 only via
    // `force_l2` — confirm this is intended.
    async fn create_test_cache() -> (RatMemCache, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let cache = RatMemCacheBuilder::new()
            .l1_config(crate::config::L1Config {
                max_memory: 1024 * 1024 * 1024,
                max_entries: 100_000,
                eviction_strategy: crate::EvictionStrategy::Lru,
            })
            .l2_config(crate::config::L2Config {
                enable_l2_cache: true,
                data_dir: Some(temp_dir.path().to_path_buf()),
                max_disk_size: 10 * 1024 * 1024,
                write_buffer_size: 1024 * 1024,
                max_write_buffer_number: 3,
                block_cache_size: 512 * 1024,
                enable_lz4: true,
                compression_threshold: 128,
                compression_max_threshold: 1024 * 1024,
                compression_level: 6,
                background_threads: 2,
                clear_on_startup: false,
                cache_size_mb: 256,
                max_file_size_mb: 512,
                smart_flush_enabled: true,
                smart_flush_base_interval_ms: 100,
                smart_flush_min_interval_ms: 20,
                smart_flush_max_interval_ms: 500,
                smart_flush_write_rate_threshold: 10000,
                smart_flush_accumulated_bytes_threshold: 4 * 1024 * 1024,
                cache_warmup_strategy: crate::config::CacheWarmupStrategy::Recent,
                zstd_compression_level: None,
                l2_write_strategy: "write_through".to_string(),
                l2_write_threshold: 1024,
                l2_write_ttl_threshold: 300,
            })
            .ttl_config(crate::config::TtlConfig {
                expire_seconds: Some(60),
                cleanup_interval: 60,
                max_cleanup_entries: 100,
                lazy_expiration: true,
                active_expiration: false,
            })
            .performance_config(crate::config::PerformanceConfig {
                worker_threads: 4,
                enable_concurrency: true,
                read_write_separation: true,
                batch_size: 100,
                enable_warmup: false,
                large_value_threshold: 10240,
            })
            .logging_config(crate::config::LoggingConfig {
                level: "debug".to_string(),
                enable_colors: false,
                show_timestamp: false,
                enable_performance_logs: true,
                enable_audit_logs: false,
                enable_cache_logs: true,
                enable_logging: true,
                enable_async: false,
                batch_size: 2048,
                batch_interval_ms: 25,
                buffer_size: 16384,
            })
            .build()
            .await
            .unwrap();
        (cache, temp_dir)
    }

    /// A freshly built cache holds no keys.
    #[tokio::test]
    async fn test_cache_creation() {
        let (cache, _temp_dir) = create_test_cache().await;
        let is_empty = cache.is_empty().await.unwrap();
        assert!(is_empty);
    }

    /// Round-trips a value through set / get / contains_key / delete.
    #[tokio::test]
    async fn test_basic_operations() {
        let (cache, _temp_dir) = create_test_cache().await;
        let key = "test_key".to_string();
        let value = Bytes::from("test_value");
        cache.set(key.clone(), value.clone()).await.unwrap();
        let retrieved = cache.get(&key).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap(), value);
        assert!(cache.contains_key(&key).await.unwrap());
        let deleted = cache.delete(&key).await.unwrap();
        assert!(deleted);
        assert!(!cache.contains_key(&key).await.unwrap());
    }

    /// A value written with a 2 s TTL is readable at first and gone after the
    /// TTL has elapsed.
    #[tokio::test]
    async fn test_ttl_operations() {
        let (cache, _temp_dir) = create_test_cache().await;
        let key = "ttl_key".to_string();
        let value = Bytes::from("ttl_value");
        cache.set_with_ttl(key.clone(), value.clone(), 2).await.unwrap();
        let retrieved = cache.get(&key).await.unwrap();
        assert!(retrieved.is_some());
        let ttl = cache.get_ttl(&key).await;
        assert!(ttl.is_some());
        // Wait slightly longer than the TTL so expiry is guaranteed.
        tokio::time::sleep(Duration::from_millis(2100)).await;
        let retrieved = cache.get(&key).await.unwrap();
        assert!(retrieved.is_none());
    }

    /// `force_l2` on write plus `skip_l1` on read exercises the L2-only path.
    #[tokio::test]
    async fn test_cache_options() {
        let (cache, _temp_dir) = create_test_cache().await;
        let key = "options_key".to_string();
        let value = Bytes::from("options_value");
        let options = CacheOptions {
            force_l2: true,
            ..Default::default()
        };
        cache.set_with_options(key.clone(), value.clone(), &options).await.unwrap();
        let get_options = CacheOptions {
            skip_l1: true,
            ..Default::default()
        };
        let retrieved = cache.get_with_options(&key, &get_options).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap(), value);
    }

    /// Populates the cache, reads part of it back, fetches stats and checks
    /// that `clear` empties everything.
    #[tokio::test]
    async fn test_clear_and_stats() {
        let (cache, _temp_dir) = create_test_cache().await;
        for i in 0..10 {
            let key = format!("key_{}", i);
            let value = Bytes::from(format!("value_{}", i));
            cache.set(key, value).await.unwrap();
        }
        let len_before = cache.len().await.unwrap();
        assert!(len_before > 0);
        for i in 0..5 {
            let key = format!("key_{}", i);
            let _ = cache.get(&key).await.unwrap();
        }
        // Smoke check only: fetching the L2 statistics must not panic. The
        // module is already gated on `melange-storage`, so no inner cfg is needed.
        let _l2_stats = cache.get_l2_stats().await;
        cache.clear().await.unwrap();
        let is_empty = cache.is_empty().await.unwrap();
        assert!(is_empty);
    }

    /// `shutdown` flips the shared running flag to `false`.
    #[tokio::test]
    async fn test_shutdown() {
        let (cache, _temp_dir) = create_test_cache().await;
        cache.set("test".to_string(), Bytes::from("value")).await.unwrap();
        cache.shutdown().await.unwrap();
        let running = cache.is_running.read().await;
        assert!(!*running);
    }
}