use std::collections::{HashMap, LinkedList, VecDeque};
use std::sync::{Arc, RwLock, Mutex};
use std::time::{Duration, Instant};
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
use parking_lot::RwLock as ParkingRwLock;
use serde::{Serialize, Deserialize};
use crate::debug_log;
/// A unit of cached data plus the bookkeeping used by eviction and tiering.
#[derive(Debug, Clone)]
pub struct CacheBlock {
/// Raw block payload.
pub data: Vec<u8>,
/// Identifier used as the cache key.
pub block_id: u64,
/// Access counter.
/// NOTE(review): set once at construction and never incremented by the visible code — confirm intended.
pub access_count: u32,
/// Time of the most recent access.
/// NOTE(review): not refreshed on cache hits in the visible code — confirm intended.
pub last_access: Instant,
/// Time the block was created.
pub created_at: Instant,
/// Payload size in bytes; callers keep this in sync with `data.len()`.
pub size: usize,
/// Heuristic classification of how this block is accessed.
pub access_pattern: AccessPattern,
}
/// Heuristic classification of a block's access behavior.
/// NOTE(review): `update_access_pattern` in this file can only ever move
/// `Unknown` -> `Sequential`; nothing in the visible code produces `Random`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AccessPattern {
Sequential, Random, Unknown, }
/// Selectable eviction strategies.
/// NOTE(review): the visible code never branches on this value — every tier
/// uses the `LruCache` implementation regardless of the configured policy.
#[derive(Debug, Clone, Copy)]
pub enum EvictionPolicy {
LRU, LFU, ARC, SizeAware, }
/// Tunable parameters for the tiered block cache.
#[derive(Debug, Clone)]
pub struct CacheConfig {
/// Total capacity budget in bytes, split across the three tiers.
pub max_size: usize,
/// Nominal block size in bytes.
pub block_size: usize,
/// Requested eviction strategy (see `EvictionPolicy` — currently not consulted by the visible code).
pub eviction_policy: EvictionPolicy,
/// Whether sequential read-ahead ids are queued on `put`.
pub enable_prefetch: bool,
/// How many consecutive block ids to queue for prefetch.
pub prefetch_window: usize,
/// Whether payloads above `compression_threshold` should be compressed.
pub enable_compression: bool,
/// Minimum payload size in bytes before compression is attempted.
pub compression_threshold: usize,
}
impl Default for CacheConfig {
    /// Default configuration: 256 MiB budget, 4 KiB blocks, ARC policy,
    /// prefetch four blocks ahead, compress payloads larger than 1 KiB.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            max_size: 256 * MIB,
            block_size: 4096,
            eviction_policy: EvictionPolicy::ARC,
            enable_prefetch: true,
            prefetch_window: 4,
            enable_compression: true,
            compression_threshold: 1024,
        }
    }
}
#[derive(Debug)]
struct LruNode {
block: CacheBlock,
prev: Option<*mut LruNode>,
next: Option<*mut LruNode>,
}
#[derive(Debug)]
struct LruCache {
map: HashMap<u64, Box<LruNode>>,
head: Option<*mut LruNode>,
tail: Option<*mut LruNode>,
current_size: usize,
max_size: usize,
}
impl LruCache {
fn new(max_size: usize) -> Self {
Self {
map: HashMap::new(),
head: None,
tail: None,
current_size: 0,
max_size,
}
}
fn get(&mut self, block_id: u64) -> Option<CacheBlock> {
if self.map.contains_key(&block_id) {
let node = self.map.remove(&block_id).unwrap();
let block = node.block.clone();
let block_size = block.size;
self.current_size = self.current_size.saturating_sub(block_size);
self.put_without_size_check(block.clone());
Some(block)
} else {
None
}
}
fn put(&mut self, block: CacheBlock) -> Option<CacheBlock> {
let block_size = block.size;
if self.map.contains_key(&block.block_id) {
let old_node = self.map.remove(&block.block_id).unwrap();
self.current_size = self.current_size.saturating_sub(old_node.block.size);
return Some(old_node.block);
}
while self.current_size + block_size > self.max_size {
if let Some(evicted) = self.evict() {
self.current_size -= evicted.size;
} else {
break;
}
}
let mut node = Box::new(LruNode {
block: block.clone(),
prev: None,
next: self.head,
});
let node_ptr = node.as_mut() as *mut LruNode;
if let Some(head) = self.head {
unsafe {
(*head).prev = Some(node_ptr);
}
}
self.head = Some(node_ptr);
if self.tail.is_none() {
self.tail = Some(node_ptr);
}
self.map.insert(block.block_id, node);
self.current_size += block_size;
None
}
fn put_without_size_check(&mut self, block: CacheBlock) {
let block_size = block.size;
let mut node = Box::new(LruNode {
block: block.clone(),
prev: None,
next: self.head,
});
let node_ptr = node.as_mut() as *mut LruNode;
if let Some(head) = self.head {
unsafe {
(*head).prev = Some(node_ptr);
}
}
self.head = Some(node_ptr);
if self.tail.is_none() {
self.tail = Some(node_ptr);
}
self.map.insert(block.block_id, node);
self.current_size += block_size;
}
fn move_to_head(&mut self, node: &mut Box<LruNode>) {
let node_ptr = node.as_mut() as *mut LruNode;
if let Some(head) = self.head {
if head == node_ptr {
return;
}
}
if let Some(prev) = node.prev {
unsafe {
(*prev).next = node.next;
}
} else {
return;
}
if let Some(next) = node.next {
unsafe {
(*next).prev = node.prev;
}
} else {
self.tail = node.prev;
}
node.prev = None;
node.next = self.head;
if let Some(head) = self.head {
unsafe {
(*head).prev = Some(node_ptr);
}
}
self.head = Some(node_ptr);
}
fn evict(&mut self) -> Option<CacheBlock> {
if let Some(tail_ptr) = self.tail {
unsafe {
let tail = &mut *tail_ptr;
self.tail = tail.prev;
if let Some(prev) = tail.prev {
(*prev).next = None;
} else {
self.head = None;
}
let block_id = tail.block.block_id;
let block = self.map.remove(&block_id).unwrap().block;
Some(block)
}
} else {
None
}
}
fn clear(&mut self) {
self.map.clear();
self.head = None;
self.tail = None;
self.current_size = 0;
}
fn len(&self) -> usize {
self.map.len()
}
fn size(&self) -> usize {
self.current_size
}
}
/// Three-tier (hot/warm/cold) block cache with optional sequential prefetch.
/// NOTE(review): the tier caches use `parking_lot::RwLock` while
/// `access_patterns`/`stats` use `std::sync::RwLock` (which requires
/// poison-`unwrap`) — consider unifying on one lock type.
#[derive(Debug)]
pub struct TieredBlockCache {
// Most-recently promoted blocks; smallest tier (10% of budget).
hot_cache: Arc<ParkingRwLock<LruCache>>,
// Landing tier for new inserts (30% of budget).
warm_cache: Arc<ParkingRwLock<LruCache>>,
// Largest tier (60% of budget).
cold_cache: Arc<ParkingRwLock<LruCache>>,
config: CacheConfig,
// Block ids awaiting background prefetch, drained via `get_prefetch_task`.
prefetch_queue: Arc<Mutex<VecDeque<u64>>>,
// Per-block access-pattern guesses.
access_patterns: Arc<RwLock<HashMap<u64, AccessPattern>>>,
// Hit/miss counters, shared across clones of the Arc.
stats: Arc<RwLock<CacheStats>>,
}
/// Aggregate cache counters.
/// NOTE(review): `#[doc(hidden)]` hides this `pub` type from rustdoc —
/// confirm that is intentional.
#[doc(hidden)]
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
pub hits: u64,
pub misses: u64,
// NOTE(review): never updated anywhere in the visible code.
pub evictions: u64,
// NOTE(review): never updated anywhere in the visible code.
pub prefetch_hits: u64,
// NOTE(review): never updated anywhere in the visible code.
pub prefetch_misses: u64,
pub hot_hits: u64,
pub warm_hits: u64,
pub cold_hits: u64,
pub total_bytes_served: u64,
// NOTE(review): never updated anywhere in the visible code.
pub compression_ratio: f64,
}
impl TieredBlockCache {
    /// Cap on pending prefetch ids so the queue cannot grow without bound
    /// when no worker is draining `get_prefetch_task`.
    const MAX_PREFETCH_QUEUE: usize = 1024;

    /// Builds a three-tier cache, splitting `config.max_size` as
    /// 10% hot / 30% warm / 60% cold.
    pub fn new(config: CacheConfig) -> Self {
        let hot_size = (config.max_size as f64 * 0.1) as usize;
        let warm_size = (config.max_size as f64 * 0.3) as usize;
        let cold_size = (config.max_size as f64 * 0.6) as usize;
        debug_log!("创建分级块缓存: 热={}, 温={}, 冷={}", hot_size, warm_size, cold_size);
        Self {
            hot_cache: Arc::new(ParkingRwLock::new(LruCache::new(hot_size))),
            warm_cache: Arc::new(ParkingRwLock::new(LruCache::new(warm_size))),
            cold_cache: Arc::new(ParkingRwLock::new(LruCache::new(cold_size))),
            config,
            prefetch_queue: Arc::new(Mutex::new(VecDeque::new())),
            access_patterns: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(CacheStats::default())),
        }
    }

    /// Looks up `block_id`, checking hot → warm → cold.
    ///
    /// A warm or cold hit copies the block one tier up; the lower-tier copy
    /// is left to age out via normal LRU eviction.
    /// NOTE(review): promotion copies rather than moves, so a block can
    /// transiently occupy two tiers — confirm the duplication is acceptable.
    pub fn get(&self, block_id: u64) -> Option<CacheBlock> {
        if let Some(block) = self.hot_cache.write().get(block_id) {
            self.update_stats(true, CacheTier::Hot, block.size as u64);
            return Some(block);
        }
        if let Some(block) = self.warm_cache.write().get(block_id) {
            self.update_stats(true, CacheTier::Warm, block.size as u64);
            self.promote_to_hot(block.clone());
            return Some(block);
        }
        if let Some(block) = self.cold_cache.write().get(block_id) {
            self.update_stats(true, CacheTier::Cold, block.size as u64);
            self.promote_to_warm(block.clone());
            return Some(block);
        }
        // The tier argument is ignored on a miss.
        self.update_stats(false, CacheTier::Cold, 0);
        None
    }

    /// Inserts a block into the warm tier and (optionally) schedules
    /// sequential prefetch of the blocks that follow it.
    ///
    /// BUG FIX: the previous implementation replaced `block.data` with its
    /// zstd-compressed form when it exceeded `compression_threshold`, but no
    /// code path ever decompressed on read — `get` then returned corrupted
    /// (compressed) bytes to callers. Transparent compression needs a
    /// "compressed" flag on `CacheBlock` plus decompression in `get`; until
    /// then blocks are stored verbatim and `compress_block` is unused.
    pub fn put(&self, block: CacheBlock) {
        let block_id = block.block_id;
        self.update_access_pattern(block_id);
        self.warm_cache.write().put(block);
        if self.config.enable_prefetch {
            self.trigger_prefetch(block_id);
        }
    }

    /// Copies `block` into the hot tier.
    fn promote_to_hot(&self, block: CacheBlock) {
        self.hot_cache.write().put(block);
    }

    /// Copies `block` into the warm tier.
    fn promote_to_warm(&self, block: CacheBlock) {
        self.warm_cache.write().put(block);
    }

    /// Queues the next `prefetch_window` sequential block ids for background
    /// prefetching, deduplicating against ids already queued.
    fn trigger_prefetch(&self, current_block_id: u64) {
        let mut queue = self.prefetch_queue.lock().unwrap();
        for i in 1..=self.config.prefetch_window {
            let next_block_id = current_block_id + i as u64;
            if !queue.contains(&next_block_id) {
                // Drop the oldest pending id rather than grow without bound
                // when nothing is draining the queue.
                if queue.len() >= Self::MAX_PREFETCH_QUEUE {
                    queue.pop_front();
                }
                queue.push_back(next_block_id);
            }
        }
    }

    /// Pops the next block id queued for prefetching, if any.
    pub fn get_prefetch_task(&self) -> Option<u64> {
        self.prefetch_queue.lock().unwrap().pop_front()
    }

    /// Records an access-pattern guess for `block_id`.
    ///
    /// NOTE(review): with only a per-block entry there is no previous-offset
    /// context, so this state machine can never produce `Random` — every
    /// accessed block settles on `Sequential`. Real detection needs the
    /// previously accessed id.
    fn update_access_pattern(&self, block_id: u64) {
        let mut patterns = self.access_patterns.write().unwrap();
        let pattern = patterns.entry(block_id).or_insert(AccessPattern::Unknown);
        *pattern = match pattern {
            AccessPattern::Unknown => AccessPattern::Sequential,
            AccessPattern::Sequential => AccessPattern::Sequential,
            AccessPattern::Random => AccessPattern::Random,
        };
    }

    /// Compresses a block's payload with zstd (level 3), succeeding only
    /// when compression actually shrinks the data.
    ///
    /// Currently unused: see `put` — storing compressed bytes without a
    /// decompression path on `get` corrupted reads, so the call site was
    /// removed until `CacheBlock` can carry a "compressed" flag.
    #[allow(dead_code)]
    fn compress_block(&self, block: &CacheBlock) -> Result<Vec<u8>, String> {
        use zstd::bulk::compress;
        match compress(&block.data, 3) {
            Ok(compressed) => {
                if compressed.len() < block.data.len() {
                    Ok(compressed)
                } else {
                    Err("压缩后没有节省空间".to_string())
                }
            }
            Err(e) => Err(format!("压缩失败: {}", e)),
        }
    }

    /// Records a hit or miss; a hit is also attributed to `tier` and the
    /// served payload bytes are added to the running total (previously the
    /// `total_bytes_served` counter was never updated).
    fn update_stats(&self, hit: bool, tier: CacheTier, bytes: u64) {
        let mut stats = self.stats.write().unwrap();
        if hit {
            stats.hits += 1;
            stats.total_bytes_served += bytes;
            match tier {
                CacheTier::Hot => stats.hot_hits += 1,
                CacheTier::Warm => stats.warm_hits += 1,
                CacheTier::Cold => stats.cold_hits += 1,
            }
        } else {
            stats.misses += 1;
        }
    }

    /// Returns a snapshot of the current statistics.
    pub fn stats(&self) -> CacheStats {
        self.stats.read().unwrap().clone()
    }

    /// Empties all tiers, the prefetch queue, and the access-pattern table.
    /// Statistics are not reset (matches previous behavior).
    pub fn clear(&self) {
        self.hot_cache.write().clear();
        self.warm_cache.write().clear();
        self.cold_cache.write().clear();
        self.prefetch_queue.lock().unwrap().clear();
        self.access_patterns.write().unwrap().clear();
    }

    /// Current per-tier byte and block counts.
    pub fn size_info(&self) -> CacheSizeInfo {
        // Take each tier's lock once so size/len are mutually consistent,
        // instead of six separate lock acquisitions.
        let hot = self.hot_cache.read();
        let warm = self.warm_cache.read();
        let cold = self.cold_cache.read();
        CacheSizeInfo {
            hot_size: hot.size(),
            warm_size: warm.size(),
            cold_size: cold.size(),
            hot_blocks: hot.len(),
            warm_blocks: warm.len(),
            cold_blocks: cold.len(),
        }
    }
}
/// Which tier served a hit; used only for statistics attribution.
#[derive(Debug, Clone, Copy)]
enum CacheTier {
Hot,
Warm,
Cold,
}
/// Snapshot of per-tier occupancy: bytes of payload and block counts.
#[derive(Debug, Clone)]
pub struct CacheSizeInfo {
pub hot_size: usize,
pub warm_size: usize,
pub cold_size: usize,
pub hot_blocks: usize,
pub warm_blocks: usize,
pub cold_blocks: usize,
}
/// Thin facade over `TieredBlockCache` exposing read/write/prefetch helpers.
#[derive(Debug)]
pub struct CacheManager {
block_cache: Arc<TieredBlockCache>,
// NOTE(review): stored but never read by the visible code (the cache holds
// its own copy) — confirm it is needed.
config: CacheConfig,
}
impl CacheManager {
    /// Creates a manager owning a tiered block cache built from `config`.
    pub fn new(config: CacheConfig) -> Self {
        Self {
            block_cache: Arc::new(TieredBlockCache::new(config.clone())),
            config,
        }
    }

    /// Returns the cached block for `block_id`, if resident in any tier.
    pub fn read_block(&self, block_id: u64) -> Option<CacheBlock> {
        // Direct delegation; the former `if let … return Some(block); None`
        // was a needless re-wrap of the same Option.
        self.block_cache.get(block_id)
    }

    /// Wraps `data` in a fresh `CacheBlock` and inserts it into the cache.
    pub fn write_block(&self, block_id: u64, data: Vec<u8>) {
        let size = data.len();
        let now = Instant::now();
        let block = CacheBlock {
            data,
            block_id,
            access_count: 1,
            last_access: now,
            created_at: now,
            size,
            access_pattern: AccessPattern::Unknown,
        };
        self.block_cache.put(block);
    }

    /// Queues prefetch for every id in `block_ids` that is not already cached.
    ///
    /// NOTE(review): the residency probe goes through `get`, which records a
    /// hit/miss in the stats and touches LRU recency — confirm that skew is
    /// acceptable, or add a non-mutating `contains` to the cache.
    pub fn prefetch_blocks(&self, block_ids: &[u64]) {
        for &block_id in block_ids {
            if self.block_cache.get(block_id).is_none() {
                self.block_cache.trigger_prefetch(block_id);
            }
        }
    }

    /// Snapshot of hit/miss statistics.
    pub fn stats(&self) -> CacheStats {
        self.block_cache.stats()
    }

    /// Current per-tier byte and block counts.
    pub fn size_info(&self) -> CacheSizeInfo {
        self.block_cache.size_info()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a block whose payload is `len` copies of `fill`.
    fn make_block(block_id: u64, fill: u8, len: usize) -> CacheBlock {
        let now = Instant::now();
        CacheBlock {
            data: vec![fill; len],
            block_id,
            access_count: 1,
            last_access: now,
            created_at: now,
            size: len,
            access_pattern: AccessPattern::Unknown,
        }
    }

    #[test]
    fn test_lru_cache_basic() {
        let mut cache = LruCache::new(1024);
        assert!(cache.put(make_block(1, 1, 100)).is_none());
        assert!(cache.put(make_block(2, 2, 200)).is_none());
        assert!(cache.get(1).is_some());
        assert!(cache.get(2).is_some());
        assert!(cache.get(3).is_none());
        assert_eq!(cache.size(), 300);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_tiered_block_cache() {
        let cache = TieredBlockCache::new(CacheConfig::default());
        cache.put(make_block(1, 1, 100));
        assert!(cache.get(1).is_some());
        assert!(cache.get(2).is_none());
        assert!(cache.stats().hits > 0);
    }

    #[test]
    fn test_cache_manager() {
        let manager = CacheManager::new(CacheConfig::default());
        let payload = vec![1u8; 100];
        manager.write_block(1, payload.clone());
        let cached = manager.read_block(1);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, payload);
    }
}