use super::*;
use crate::error::Result;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Sharded LRU page cache.
///
/// Requests are routed to one of several independent `SingleLruPageCache`
/// shards via `get_shard_id(file_id, page_id, num_shards)` to reduce lock
/// contention; buffers are recycled through a shared `BufferPool`.
pub struct LruPageCache {
// Cache-wide configuration; `num_shards` drives request routing.
config: PageCacheConfig,
// One independent single-shard LRU cache per shard.
shards: Vec<Arc<SingleLruPageCache>>,
// Cache-wide counters; shard-local stats are merged in `stats()`.
global_stats: CacheStatistics,
// Shared pool of reusable `CacheBuffer`s handed out per read.
buffer_pool: Arc<BufferPool>,
}
impl LruPageCache {
    /// Builds a sharded cache from `config`.
    ///
    /// The configured capacity is split across `config.num_shards` shards.
    /// The division remainder is spread over the first shards so the summed
    /// shard capacity equals `config.capacity` (the previous code truncated
    /// it, silently losing up to `num_shards - 1` pages of capacity).
    ///
    /// # Errors
    /// Fails if the configuration is invalid or a shard cannot be created.
    pub fn new(config: PageCacheConfig) -> Result<Self> {
        config.validate()?;
        let num_shards = config.num_shards as usize;
        let base_capacity = config.capacity / num_shards;
        let extra = config.capacity % num_shards;
        let mut shards = Vec::with_capacity(num_shards);
        for shard_id in 0..num_shards {
            let mut shard_config = config.clone();
            shard_config.num_shards = 1; // each shard is a standalone cache
            // The first `extra` shards absorb one remainder page each.
            shard_config.capacity = base_capacity + usize::from(shard_id < extra);
            shards.push(Arc::new(SingleLruPageCache::new(shard_config)?));
        }
        // Buffer pool scales with shard count, floored at 64 buffers.
        let buffer_pool = Arc::new(BufferPool::new(std::cmp::max(64, num_shards * 16)));
        Ok(Self {
            config,
            shards,
            global_stats: CacheStatistics::new(),
            buffer_pool,
        })
    }

    /// Registers a file descriptor with the cache and returns its id.
    ///
    /// NOTE(review): registration goes through shard 0 only, while `read`
    /// routes by `(file_id, page)` hash to *any* shard. Confirm that shards
    /// share file-registration state, or that `SingleLruPageCache` accepts
    /// reads for files registered elsewhere.
    pub fn register_file(&self, fd: i32) -> Result<FileId> {
        self.shards[0].register_file(fd)
    }

    /// Reads `length` bytes at `offset` from `file_id` through the shard
    /// owning the first page of the range, recording byte and hit stats.
    ///
    /// # Errors
    /// Propagates the shard's read error.
    pub fn read(&self, file_id: FileId, offset: u64, length: usize) -> Result<CacheBuffer> {
        let start_page = (offset / PAGE_SIZE as u64) as PageId;
        let shard_id = get_shard_id(file_id, start_page, self.config.num_shards) as usize;
        let mut buffer = self.buffer_pool.get();
        // The shard fills `buffer`; the previous unused `data` binding is gone.
        self.shards[shard_id].read(file_id, offset, length, &mut buffer)?;
        self.global_stats.record_bytes_read(length as u64);
        self.global_stats.record_hit(buffer.hit_type());
        Ok(buffer)
    }

    /// Reads several `(file_id, offset, length)` requests, grouped by shard
    /// so each shard is visited once per batch. Results come back in the
    /// same order as `requests`.
    ///
    /// # Errors
    /// Stops at the first shard error; earlier successful reads are dropped.
    pub fn read_batch(&self, requests: Vec<(FileId, u64, usize)>) -> Result<Vec<CacheBuffer>> {
        // Bucket request indices by destination shard.
        let mut shard_requests: Vec<Vec<(usize, FileId, u64, usize)>> =
            vec![Vec::new(); self.shards.len()];
        for (idx, &(file_id, offset, length)) in requests.iter().enumerate() {
            let start_page = (offset / PAGE_SIZE as u64) as PageId;
            let shard_id = get_shard_id(file_id, start_page, self.config.num_shards) as usize;
            shard_requests[shard_id].push((idx, file_id, offset, length));
        }
        // Placeholder buffers keep `results` index-addressable while shards
        // are processed out of request order.
        let mut results = Vec::with_capacity(requests.len());
        results.resize_with(requests.len(), CacheBuffer::new);
        for (shard_id, shard_reqs) in shard_requests.iter().enumerate() {
            let shard = &self.shards[shard_id];
            for &(idx, file_id, offset, length) in shard_reqs {
                let mut buffer = self.buffer_pool.get();
                match shard.read(file_id, offset, length, &mut buffer) {
                    Ok(_) => {
                        self.global_stats.record_bytes_read(length as u64);
                        self.global_stats.record_hit(buffer.hit_type());
                        results[idx] = buffer;
                    }
                    Err(e) => {
                        self.global_stats.record_load_failure();
                        return Err(e);
                    }
                }
            }
        }
        Ok(results)
    }

    /// Reads one fixed-size blob record addressed by `record_id` relative to
    /// `base_offset`.
    pub fn read_blob_record(&self, file_id: FileId, record_id: u64, base_offset: u64) -> Result<CacheBuffer> {
        // Fixed 1 KiB record size — keep in sync with the blob writer.
        const RECORD_SIZE: u64 = 1024;
        let offset = base_offset + record_id * RECORD_SIZE;
        self.read(file_id, offset, RECORD_SIZE as usize)
    }

    /// Warms the cache for the byte range `[offset, offset + length)` by
    /// issuing best-effort page-granular reads; individual failures are
    /// ignored.
    pub fn prefetch(&self, file_id: FileId, offset: u64, length: usize) -> Result<()> {
        // Guard: `offset + 0 - 1` would wrap in release builds and produce
        // an enormous bogus page range.
        if length == 0 {
            return Ok(());
        }
        let start_page = (offset / PAGE_SIZE as u64) as PageId;
        let end_page = ((offset + length as u64 - 1) / PAGE_SIZE as u64) as PageId;
        for page_id in start_page..=end_page {
            let page_offset = (page_id as u64) * (PAGE_SIZE as u64);
            // Best effort: `read` routes to the right shard itself.
            let _ = self.read(file_id, page_offset, PAGE_SIZE);
        }
        Ok(())
    }

    /// Invalidates all cached pages belonging to `file_id`.
    ///
    /// TODO(review): per-shard invalidation is not implemented — the
    /// original looped over the shards with an empty body, so this is
    /// currently a no-op. Callers must not rely on pages being dropped.
    pub fn invalidate_file(&self, file_id: FileId) -> Result<()> {
        let _ = file_id; // unused until per-shard invalidation lands
        Ok(())
    }

    /// Returns cache-wide statistics: the global counters plus the sum of
    /// every shard's counters, with derived ratios recomputed.
    pub fn stats(&self) -> CacheStatsSnapshot {
        let mut aggregated = self.global_stats.snapshot();
        for shard in &self.shards {
            let shard_stats = shard.stats().snapshot();
            // Sum per-hit-type counters without assuming a fixed length
            // (the previous code hard-coded `0..7`).
            for (total, count) in aggregated
                .hit_counts
                .iter_mut()
                .zip(shard_stats.hit_counts.iter())
            {
                *total += *count;
            }
            aggregated.total_hits += shard_stats.total_hits;
            aggregated.total_misses += shard_stats.total_misses;
            aggregated.evictions += shard_stats.evictions;
            aggregated.hash_collisions += shard_stats.hash_collisions;
            aggregated.memory_used += shard_stats.memory_used;
            aggregated.lock_contentions += shard_stats.lock_contentions;
            aggregated.lock_acquisitions += shard_stats.lock_acquisitions;
        }
        // Derived ratios; guard every division against a zero denominator.
        let lookups = aggregated.total_hits + aggregated.total_misses;
        aggregated.hit_ratio = if lookups > 0 {
            aggregated.total_hits as f64 / lookups as f64
        } else {
            0.0
        };
        aggregated.miss_ratio = 1.0 - aggregated.hit_ratio;
        aggregated.memory_utilization = if aggregated.memory_allocated > 0 {
            aggregated.memory_used as f64 / aggregated.memory_allocated as f64
        } else {
            0.0
        };
        aggregated.lock_contention_ratio = if aggregated.lock_acquisitions > 0 {
            aggregated.lock_contentions as f64 / aggregated.lock_acquisitions as f64
        } else {
            0.0
        };
        aggregated
    }

    /// Per-shard statistics snapshots, in shard-index order.
    pub fn shard_stats(&self) -> Vec<CacheStatsSnapshot> {
        self.shards.iter().map(|shard| shard.stats().snapshot()).collect()
    }

    /// Statistics for the shared buffer pool.
    pub fn buffer_pool_stats(&self) -> BufferPoolStats {
        self.buffer_pool.stats()
    }

    /// The configuration this cache was built with.
    pub fn config(&self) -> &PageCacheConfig {
        &self.config
    }

    /// Number of shards backing this cache.
    pub fn shard_count(&self) -> usize {
        self.shards.len()
    }

    /// The shard that owns `(file_id, page_id)` under hash routing.
    pub fn get_shard(&self, file_id: FileId, page_id: PageId) -> &Arc<SingleLruPageCache> {
        let shard_id = get_shard_id(file_id, page_id, self.config.num_shards) as usize;
        &self.shards[shard_id]
    }

    /// Runs one maintenance cycle.
    ///
    /// TODO(review): per-shard maintenance is not implemented — the original
    /// iterated the shards with an empty body. Only the global cycle counter
    /// is bumped for now.
    pub fn maintenance(&self) -> Result<()> {
        self.global_stats.record_maintenance_cycle();
        Ok(())
    }

    /// Resizing a live cache is not supported and always fails.
    ///
    /// NOTE(review): `CacheError::HardwareUnsupported` looks like the wrong
    /// variant for an unsupported *operation* — confirm against the error
    /// enum before exposing this publicly.
    pub fn resize(&mut self, new_capacity: usize) -> Result<()> {
        let _ = new_capacity; // unsupported; silence the unused warning
        Err(CacheError::HardwareUnsupported.into())
    }

    /// Flushes the cache. Currently a no-op — presumably the cache is
    /// read-through with no dirty state; confirm before relying on this for
    /// durability.
    pub fn flush(&self) -> Result<()> {
        Ok(())
    }
}
/// How a `(file_id, page_id)` pair is mapped to a shard index.
pub enum ShardStrategy {
// Hash-based routing via `get_shard_id`.
Hash,
// Distributes consecutive page ids across shards (`page_id % num_shards`).
RoundRobin,
// NOTE(review): currently routes identically to `Hash` in `select_shard`;
// real load-aware selection is not implemented yet.
LoadBalanced,
// Caller-supplied routing function `(file_id, page_id, num_shards) -> shard`.
Custom(fn(FileId, PageId, u32) -> u32),
}
impl ShardStrategy {
    /// Maps a `(file_id, page_id)` pair onto one of `num_shards` shards
    /// according to this strategy.
    pub fn select_shard(&self, file_id: FileId, page_id: PageId, num_shards: u32) -> u32 {
        match self {
            // Caller-supplied routing function.
            ShardStrategy::Custom(func) => func(file_id, page_id, num_shards),
            // Spread consecutive pages across shards.
            ShardStrategy::RoundRobin => page_id % num_shards,
            // LoadBalanced currently falls back to the same hash routing
            // as Hash, so the two arms share one implementation.
            ShardStrategy::Hash | ShardStrategy::LoadBalanced => {
                get_shard_id(file_id, page_id, num_shards)
            }
        }
    }
}
/// Per-operation bookkeeping: which shards served a cache operation, what
/// hit types they reported, and how long the operation has been running.
pub struct CacheOpContext {
// Caller-assigned identifier for this operation.
pub op_id: u64,
// Set at construction; `duration()` measures elapsed time from here.
pub start_time: Instant,
// Shard indices recorded via `add_shard`, in call order.
pub shards_used: Vec<usize>,
// Total bytes the caller asked for (informational; not updated later).
pub bytes_requested: usize,
// Hit classification per `add_shard` call, parallel to `shards_used`.
pub hit_info: Vec<CacheHitType>,
}
impl CacheOpContext {
pub fn new(op_id: u64, bytes_requested: usize) -> Self {
Self {
op_id,
start_time: Instant::now(),
shards_used: Vec::new(),
bytes_requested,
hit_info: Vec::new(),
}
}
pub fn add_shard(&mut self, shard_id: usize, hit_type: CacheHitType) {
self.shards_used.push(shard_id);
self.hit_info.push(hit_type);
}
pub fn duration(&self) -> Duration {
self.start_time.elapsed()
}
pub fn overall_hit_type(&self) -> CacheHitType {
if self.hit_info.is_empty() {
CacheHitType::Hit
} else if self.hit_info.len() == 1 {
self.hit_info[0]
} else {
CacheHitType::Mix
}
}
}