use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
use parking_lot::RwLock;
/// Page size in bytes (64 KiB) that `ClockProCache::new` passes to `with_page_size`.
pub const DEFAULT_PAGE_SIZE: usize = 64 * 1024;
/// A page count of 16 384. Not referenced by the visible code in this file;
/// presumably the intended default capacity argument — TODO confirm with callers.
pub const DEFAULT_CACHE_PAGES: usize = 16_384;
/// Smallest fraction of the cache capacity the hot target may shrink to
/// (applied in `ClockProCache::adapt_decrease_hot`).
pub const MIN_HOT_RATIO: f64 = 0.05;
/// Largest fraction of the cache capacity the hot target may grow to
/// (applied in `ClockProCache::adapt_increase_hot`).
pub const MAX_HOT_RATIO: f64 = 0.95;
/// Key of a cached page: the owning file plus the page's number inside it.
///
/// The type is `Copy` and hashable, so it can be used directly as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageId {
    /// Identifier of the file the page belongs to.
    pub file_id: u64,
    /// Number of the page within that file.
    pub page_no: u64,
}

impl PageId {
    /// Builds a page key from a file identifier and a page number.
    pub fn new(file_id: u64, page_no: u64) -> Self {
        PageId { file_id, page_no }
    }
}
/// Residency class the cache records for a page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageState {
/// The page is tracked in the hot ring; assigned by `ClockProCache::promote_to_hot`.
Hot,
/// The page is tracked in the cold ring; every `CachedPage::new` starts in this state.
Cold,
/// Test-period marker. `ClockProCache::get` treats a page in this state as a miss;
/// nothing visible in this file ever assigns it to a page.
Test,
}
/// A single cached page: its bytes plus the flags and counters the cache uses
/// to decide promotion and eviction.
pub struct CachedPage {
/// Key of this page. Nothing in this file reads it back, hence the lint allowance.
#[allow(dead_code)]
id: PageId,
/// Page contents.
data: Vec<u8>,
/// Reference bit: set by `touch`, cleared when the cache demotes the page or
/// sweeps past it while looking for an eviction victim.
referenced: AtomicBool,
/// Current residency class; starts as `PageState::Cold`.
state: RwLock<PageState>,
/// Set by `data_mut`, cleared by `mark_clean`.
dirty: AtomicBool,
/// Number of accesses; starts at 1 and is incremented by `touch`.
access_count: AtomicU64,
/// Number of outstanding pins; the cache does not evict a page while this is non-zero.
pin_count: AtomicUsize,
}
impl CachedPage {
    /// Wraps `data` as a freshly loaded page.
    ///
    /// A new page starts cold, clean and unpinned, with its reference bit set
    /// and an access count of 1 (the load itself counts as the first access).
    pub fn new(id: PageId, data: Vec<u8>) -> Self {
        Self {
            id,
            data,
            referenced: AtomicBool::new(true),
            state: RwLock::new(PageState::Cold),
            dirty: AtomicBool::new(false),
            access_count: AtomicU64::new(1),
            pin_count: AtomicUsize::new(0),
        }
    }

    /// Read-only view of the page bytes.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Mutable view of the page bytes; marks the page dirty.
    ///
    /// Requires exclusive access to the page, so a page shared through an
    /// `Arc` must be uniquely owned before this can be called.
    pub fn data_mut(&mut self) -> &mut [u8] {
        self.dirty.store(true, Ordering::Release);
        &mut self.data
    }

    /// Returns `true` if the page has been handed out mutably since it was
    /// last marked clean.
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Acquire)
    }

    /// Clears the dirty flag.
    pub fn mark_clean(&self) {
        self.dirty.store(false, Ordering::Release);
    }

    /// Adds one pin; a pinned page is skipped by eviction.
    pub fn pin(&self) {
        self.pin_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Releases one pin.
    ///
    /// Calling this on an unpinned page is a no-op. A plain `fetch_sub` would
    /// wrap the counter from 0 to `usize::MAX` and leave the page pinned
    /// forever, so the decrement is done with a checked subtraction.
    pub fn unpin(&self) {
        // `Err` means the counter was already 0; there is nothing to release.
        let _ = self
            .pin_count
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |pins| {
                pins.checked_sub(1)
            });
    }

    /// Returns `true` while at least one pin is outstanding.
    pub fn is_pinned(&self) -> bool {
        self.pin_count.load(Ordering::Acquire) > 0
    }

    /// Records an access: sets the reference bit and bumps the access counter.
    pub fn touch(&self) {
        self.referenced.store(true, Ordering::Release);
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Current residency class of the page.
    pub fn state(&self) -> PageState {
        *self.state.read()
    }

    /// Overwrites the residency class of the page.
    pub fn set_state(&self, state: PageState) {
        *self.state.write() = state;
    }
}
/// Bounded ring of keys with a clock hand.
///
/// Keys are appended until `capacity` is reached; after that every insert
/// overwrites the slot under the hand and advances the hand by one.
struct ClockRing<K: Clone + Eq + Hash> {
    /// Keys in slot order.
    entries: Vec<K>,
    /// Index of the slot under the hand. Kept in `0..entries.len()` by the
    /// methods below (0 when the ring is empty).
    hand: AtomicUsize,
    /// Number of slots that are filled by appending before inserts start
    /// replacing existing entries.
    capacity: usize,
}

impl<K: Clone + Eq + Hash> ClockRing<K> {
    /// Creates an empty ring with room for `capacity` keys.
    fn new(capacity: usize) -> Self {
        Self {
            entries: Vec::with_capacity(capacity),
            hand: AtomicUsize::new(0),
            capacity,
        }
    }

    /// Slot index currently under the hand. Must only be called on a
    /// non-empty ring.
    fn hand_index(&self) -> usize {
        // The modulo keeps the index valid even if `entries` shrank since the
        // hand was last stored.
        self.hand.load(Ordering::Relaxed) % self.entries.len()
    }

    /// Adds `key` to the ring.
    ///
    /// Returns `None` while there is free room. Once the ring is full the key
    /// under the hand is replaced and returned, and the hand advances. A ring
    /// with no slots at all cannot hold anything and hands `key` straight
    /// back instead of dividing by zero.
    fn insert(&mut self, key: K) -> Option<K> {
        if self.entries.len() < self.capacity {
            self.entries.push(key);
            return None;
        }
        if self.entries.is_empty() {
            return Some(key);
        }
        let pos = self.hand_index();
        let evicted = std::mem::replace(&mut self.entries[pos], key);
        self.advance_hand();
        Some(evicted)
    }

    /// Removes the first occurrence of `key`; returns whether it was present.
    ///
    /// The hand keeps pointing at the same element it pointed at before the
    /// removal. If that element is the one removed, the hand moves on to its
    /// successor (wrapping to the first slot when the last slot was removed).
    fn remove(&mut self, key: &K) -> bool {
        let pos = match self.entries.iter().position(|k| k == key) {
            Some(pos) => pos,
            None => return false,
        };
        // Resolve the hand to a real slot index before the vector shifts;
        // comparing against an unwrapped counter would misplace the hand.
        let hand = self.hand_index();
        self.entries.remove(pos);
        let shifted = if pos < hand { hand - 1 } else { hand };
        let normalized = if self.entries.is_empty() {
            0
        } else {
            shifted % self.entries.len()
        };
        self.hand.store(normalized, Ordering::Relaxed);
        true
    }

    /// Moves the hand to the next slot, wrapping at the end of the ring.
    /// Does nothing on an empty ring.
    fn advance_hand(&self) {
        let len = self.entries.len();
        if len == 0 {
            return;
        }
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .hand
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |hand| {
                Some((hand % len + 1) % len)
            });
    }

    /// Clone of the key under the hand, or `None` for an empty ring.
    fn current(&self) -> Option<K> {
        if self.entries.is_empty() {
            return None;
        }
        Some(self.entries[self.hand_index()].clone())
    }

    /// Number of keys currently stored.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no keys are stored.
    // Only exists to pair with `len`; nothing in this file calls it.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Changes the capacity used by later inserts. Keys already stored are
    /// kept even if there are more of them than `new_capacity`.
    fn set_capacity(&mut self, new_capacity: usize) {
        self.capacity = new_capacity;
    }
}
/// Page cache that tracks resident pages in a hot ring and a cold ring and
/// remembers recently evicted page ids in a test ring.
pub struct ClockProCache {
/// Resident pages keyed by page id.
pages: DashMap<PageId, Arc<CachedPage>>,
/// Ids of pages promoted by `promote_to_hot`.
hot_ring: RwLock<ClockRing<PageId>>,
/// Ids of newly inserted or demoted pages; eviction victims are taken from here.
cold_ring: RwLock<ClockRing<PageId>>,
/// Ids of pages that were evicted (no data is kept for them).
test_ring: RwLock<ClockRing<PageId>>,
/// Maximum number of resident pages the cache aims for.
capacity: usize,
/// Current target size of the hot ring, adjusted by the `adapt_*` methods.
hot_target: AtomicUsize,
/// Hit, miss and eviction counters.
stats: CacheStats,
/// Bytes assumed per page; only used by `memory_usage`.
page_size: usize,
}
/// Counters describing cache activity. All fields are independent atomics
/// and are updated with relaxed ordering.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Lookups that returned a page.
    pub hits: AtomicU64,
    /// Lookups that returned nothing.
    pub misses: AtomicU64,
    /// Hits on pages that were in the hot state.
    pub hot_hits: AtomicU64,
    /// Hits on pages that were in the cold state.
    pub cold_hits: AtomicU64,
    /// Lookups that matched a test-period entry.
    pub test_hits: AtomicU64,
    /// Pages removed from the cache by eviction.
    pub evictions: AtomicU64,
    /// Evicted pages that were dirty at the time of eviction.
    pub dirty_evictions: AtomicU64,
}

impl CacheStats {
    /// Fraction of lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookup has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        let hit_count = self.hits.load(Ordering::Relaxed);
        let miss_count = self.misses.load(Ordering::Relaxed);
        match hit_count + miss_count {
            0 => 0.0,
            lookups => hit_count as f64 / lookups as f64,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        let counters = [
            &self.hits,
            &self.misses,
            &self.hot_hits,
            &self.cold_hits,
            &self.test_hits,
            &self.evictions,
            &self.dirty_evictions,
        ];
        for counter in counters.iter() {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
// Locking rules used throughout this impl:
// * ring locks are taken in the order hot -> cold, and the test-ring lock is
//   never held together with another ring lock;
// * no `DashMap` entry guard is kept alive across a call that takes a ring
//   lock or mutates `pages` (the locks are not re-entrant).
impl ClockProCache {
    /// Creates a cache for up to `capacity` pages of `DEFAULT_PAGE_SIZE` bytes.
    pub fn new(capacity: usize) -> Self {
        Self::with_page_size(capacity, DEFAULT_PAGE_SIZE)
    }

    /// Creates a cache for up to `capacity` pages, reporting `page_size`
    /// bytes per page in [`memory_usage`](Self::memory_usage).
    ///
    /// The capacity is split evenly between the hot and cold rings (the cold
    /// ring gets the extra slot when `capacity` is odd).
    pub fn with_page_size(capacity: usize, page_size: usize) -> Self {
        let (hot_slots, cold_slots) = Self::initial_split(capacity);
        Self {
            pages: DashMap::new(),
            hot_ring: RwLock::new(ClockRing::new(hot_slots)),
            cold_ring: RwLock::new(ClockRing::new(cold_slots)),
            test_ring: RwLock::new(ClockRing::new(capacity)),
            capacity,
            hot_target: AtomicUsize::new(hot_slots),
            stats: CacheStats::default(),
            page_size,
        }
    }

    /// Initial `(hot, cold)` ring sizes for a cache of `capacity` pages.
    fn initial_split(capacity: usize) -> (usize, usize) {
        let hot = capacity / 2;
        (hot, capacity - hot)
    }

    /// Looks up a page and records the access.
    ///
    /// A hit on a cold page promotes it to hot. A lookup for a page that is
    /// not resident but was recently evicted (still in the test ring) raises
    /// the hot target before being counted as a miss.
    pub fn get(&self, page_id: &PageId) -> Option<Arc<CachedPage>> {
        // Clone the `Arc` out immediately so the map's entry guard is gone
        // before any ring lock is taken or the map is touched again.
        let resident = self
            .pages
            .get(page_id)
            .map(|entry| Arc::clone(entry.value()));
        let page = match resident {
            Some(page) => page,
            None => {
                if self.is_in_test(page_id) {
                    self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
                    self.adapt_increase_hot();
                }
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        page.touch();
        match page.state() {
            PageState::Hot => {
                self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
            }
            PageState::Cold => {
                self.promote_to_hot(page_id);
                self.stats.cold_hits.fetch_add(1, Ordering::Relaxed);
            }
            PageState::Test => {
                // The caller gets nothing back, so this is a miss like the
                // non-resident test hit above and is counted the same way.
                self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
                self.adapt_increase_hot();
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        }
        self.stats.hits.fetch_add(1, Ordering::Relaxed);
        Some(page)
    }

    /// Stores `data` under `page_id` as a cold page and returns the new page.
    ///
    /// If the cache is full, one unreferenced, unpinned cold page is evicted
    /// first. Inserting an id that is already resident replaces the old page.
    /// A cache with capacity 0 stores nothing and just returns the page.
    pub fn insert(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
        let page = Arc::new(CachedPage::new(page_id, data));
        if self.capacity == 0 {
            return page;
        }

        // Replacing a resident page: drop its old ring slot so the id does
        // not end up tracked twice.
        if self.pages.contains_key(&page_id) {
            self.hot_ring.write().remove(&page_id);
            self.cold_ring.write().remove(&page_id);
        }

        self.make_room();

        // The ring lock is released at the end of this statement, before the
        // displaced page (if any) is evicted.
        let displaced = self.cold_ring.write().insert(page_id);
        if let Some(victim) = displaced {
            self.evict(&victim);
        }

        page.set_state(PageState::Cold);
        self.pages.insert(page_id, Arc::clone(&page));
        page
    }

    /// Like [`insert`](Self::insert), but pins the page before returning it.
    /// The caller is responsible for the matching `unpin`.
    pub fn insert_pinned(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
        let page = self.insert(page_id, data);
        page.pin();
        page
    }

    /// Drops a page that has already lost its cold-ring slot.
    ///
    /// An unpinned page is removed from the map, counted in the statistics
    /// and remembered in the test ring. A pinned page stays resident and is
    /// put back into the cold ring so a later sweep can reconsider it.
    ///
    /// Must be called without any ring lock on cold/test held.
    // NOTE(review): a dirty page is only counted here; no write-back hook is
    // visible in this file.
    fn evict(&self, page_id: &PageId) {
        // `remove_if` performs the pin check and the removal as one step, so
        // the page never disappears from the map while it is pinned.
        let removed = self
            .pages
            .remove_if(page_id, |_, page| !page.is_pinned());
        match removed {
            Some((_, page)) => {
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                if page.is_dirty() {
                    self.stats.dirty_evictions.fetch_add(1, Ordering::Relaxed);
                }
                self.test_ring.write().insert(*page_id);
            }
            None => {
                // Either the id is not resident (nothing to do) or the page
                // is pinned. A pinned page is appended directly so that it
                // does not displace yet another page; the cold ring may
                // temporarily hold more entries than its capacity.
                if self.pages.contains_key(page_id) {
                    let mut cold = self.cold_ring.write();
                    if !cold.entries.contains(page_id) {
                        cold.entries.push(*page_id);
                    }
                }
            }
        }
    }

    /// Moves a page from the cold ring to the hot ring and marks it hot.
    ///
    /// Does nothing if the hot ring has no slots or the page is not in the
    /// cold ring (for example because another thread promoted it already).
    /// A page pushed out of the hot ring is demoted to cold.
    fn promote_to_hot(&self, page_id: &PageId) {
        let demoted = {
            let mut hot = self.hot_ring.write();
            if hot.capacity == 0 {
                return;
            }
            let was_cold = self.cold_ring.write().remove(page_id);
            if !was_cold {
                return;
            }
            hot.insert(*page_id)
        };

        if let Some(page) = self.pages.get(page_id) {
            page.set_state(PageState::Hot);
        }
        if let Some(old_hot) = demoted {
            self.demote_to_cold(&old_hot);
        }
    }

    /// Puts a page into the cold ring, marks it cold and clears its
    /// reference bit. A page displaced from the cold ring is evicted rather
    /// than silently dropped from tracking.
    fn demote_to_cold(&self, page_id: &PageId) {
        let displaced = self.cold_ring.write().insert(*page_id);
        if let Some(page) = self.pages.get(page_id) {
            page.set_state(PageState::Cold);
            page.referenced.store(false, Ordering::Release);
        }
        if let Some(victim) = displaced {
            self.evict(&victim);
        }
    }

    /// Returns `true` if `page_id` is remembered in the test ring.
    fn is_in_test(&self, page_id: &PageId) -> bool {
        self.test_ring.read().entries.contains(page_id)
    }

    /// Evicts at most one cold page when the cache is at or over capacity.
    ///
    /// The hand sweeps the cold ring: a page that is pinned or has its
    /// reference bit set gets the bit cleared and is skipped; the first
    /// unpinned, unreferenced page becomes the victim. The sweep is limited
    /// to two laps (one to clear bits, one to pick a victim) so it terminates
    /// even when every cold page is pinned.
    fn make_room(&self) {
        if self.pages.len() < self.capacity {
            return;
        }

        let victim = {
            let mut cold = self.cold_ring.write();
            let mut budget = cold.len().saturating_mul(2);
            let mut chosen = None;
            while budget > 0 && self.pages.len() >= self.capacity {
                budget -= 1;
                let candidate = match cold.current() {
                    Some(candidate) => candidate,
                    None => break,
                };
                // The entry guard lives only inside this closure, so it is
                // released before the ring is modified or the page evicted.
                let evictable = self.pages.get(&candidate).map(|page| {
                    let idle =
                        !page.is_pinned() && !page.referenced.load(Ordering::Acquire);
                    if !idle {
                        page.referenced.store(false, Ordering::Release);
                    }
                    idle
                });
                if evictable == Some(true) {
                    cold.remove(&candidate);
                    chosen = Some(candidate);
                    break;
                }
                cold.advance_hand();
            }
            chosen
        };

        // The cold-ring lock is released here; `evict` may need it again.
        if let Some(victim) = victim {
            self.evict(&victim);
        }
    }

    /// Stores a new hot target and resizes both rings to match it.
    fn apply_hot_target(&self, new_hot: usize) {
        self.hot_target.store(new_hot, Ordering::Relaxed);
        let mut hot = self.hot_ring.write();
        let mut cold = self.cold_ring.write();
        hot.set_capacity(new_hot);
        cold.set_capacity(self.capacity - new_hot);
    }

    /// Grows the hot target by one page, up to `MAX_HOT_RATIO` of capacity.
    fn adapt_increase_hot(&self) {
        let current = self.hot_target.load(Ordering::Relaxed);
        let max_hot = (self.capacity as f64 * MAX_HOT_RATIO) as usize;
        if current < max_hot {
            self.apply_hot_target((current + 1).min(max_hot));
        }
    }

    /// Shrinks the hot target by one page, down to `MIN_HOT_RATIO` of capacity.
    // Not called anywhere in this file yet.
    #[allow(dead_code)]
    fn adapt_decrease_hot(&self) {
        let current = self.hot_target.load(Ordering::Relaxed);
        let min_hot = (self.capacity as f64 * MIN_HOT_RATIO) as usize;
        if current > min_hot {
            self.apply_hot_target(current.saturating_sub(1).max(min_hot));
        }
    }

    /// Hit, miss and eviction counters.
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }

    /// Number of resident pages.
    pub fn len(&self) -> usize {
        self.pages.len()
    }

    /// Returns `true` when no page is resident.
    pub fn is_empty(&self) -> bool {
        self.pages.is_empty()
    }

    /// Maximum number of resident pages the cache aims for.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Current hot target as a fraction of capacity; `0.0` for a cache with
    /// capacity 0 (instead of the NaN a bare division would produce).
    pub fn hot_ratio(&self) -> f64 {
        if self.capacity == 0 {
            return 0.0;
        }
        self.hot_target.load(Ordering::Relaxed) as f64 / self.capacity as f64
    }

    /// Drops every page and returns the cache to its freshly constructed
    /// state: empty rings with the initial hot/cold split, the initial hot
    /// target, and zeroed statistics.
    pub fn clear(&self) {
        let (hot_slots, cold_slots) = Self::initial_split(self.capacity);
        self.pages.clear();
        *self.hot_ring.write() = ClockRing::new(hot_slots);
        *self.cold_ring.write() = ClockRing::new(cold_slots);
        *self.test_ring.write() = ClockRing::new(self.capacity);
        self.hot_target.store(hot_slots, Ordering::Relaxed);
        self.stats.reset();
    }

    /// Estimated memory held by resident pages: page count times the
    /// configured page size (the actual buffer lengths are not inspected).
    pub fn memory_usage(&self) -> usize {
        self.pages.len() * self.page_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An inserted page can be read back with the same bytes.
    #[test]
    fn test_page_cache_basic() {
        let cache = ClockProCache::new(100);
        let page_id = PageId::new(1, 0);
        let data = vec![0u8; 1024];
        cache.insert(page_id, data.clone());
        assert_eq!(cache.len(), 1);
        let page = cache.get(&page_id).unwrap();
        assert_eq!(page.data(), data.as_slice());
    }

    /// Looking up an unknown page is a miss and is counted as one.
    #[test]
    fn test_page_cache_miss() {
        let cache = ClockProCache::new(100);
        let page_id = PageId::new(1, 0);
        assert!(cache.get(&page_id).is_none());
        assert_eq!(cache.stats().misses.load(Ordering::Relaxed), 1);
        assert_eq!(cache.stats().hits.load(Ordering::Relaxed), 0);
        assert_eq!(cache.stats().hit_rate(), 0.0);
    }

    /// The first hit on a freshly inserted (cold) page promotes it to hot.
    #[test]
    fn test_page_cache_promotion() {
        let cache = ClockProCache::new(100);
        let page_id = PageId::new(1, 0);
        let inserted = cache.insert(page_id, vec![0u8; 1024]);
        assert_eq!(inserted.state(), PageState::Cold);
        let page = cache.get(&page_id).unwrap();
        assert_eq!(page.state(), PageState::Hot);
    }

    /// Inserting more pages than fit evicts some and stays within capacity.
    #[test]
    fn test_page_cache_eviction() {
        let cache = ClockProCache::new(10);
        for i in 0..15 {
            let page_id = PageId::new(1, i);
            cache.insert(page_id, vec![0u8; 64]);
        }
        assert!(cache.len() <= 10);
        assert!(cache.stats().evictions.load(Ordering::Relaxed) > 0);
    }

    /// `insert_pinned` returns a pinned page and `unpin` releases the pin.
    #[test]
    fn test_page_cache_pinned() {
        let cache = ClockProCache::new(10);
        let page_id = PageId::new(1, 0);
        let page = cache.insert_pinned(page_id, vec![0u8; 64]);
        assert!(page.is_pinned());
        page.unpin();
        assert!(!page.is_pinned());
    }

    /// A page that was never handed out mutably is clean.
    #[test]
    fn test_page_cache_dirty() {
        let cache = ClockProCache::new(10);
        let page_id = PageId::new(1, 0);
        cache.insert(page_id, vec![0u8; 64]);
        // `expect` instead of `if let`: a miss here must fail the test
        // rather than silently skip the assertion.
        let page = cache
            .get(&page_id)
            .expect("page inserted above must be resident");
        assert!(!page.is_dirty());
    }

    /// Two lookups of a resident page count as one cold hit and one hot hit.
    #[test]
    fn test_cache_stats() {
        let cache = ClockProCache::new(100);
        let page_id = PageId::new(1, 0);
        cache.insert(page_id, vec![0u8; 1024]);
        assert!(cache.get(&page_id).is_some());
        assert!(cache.get(&page_id).is_some());
        let stats = cache.stats();
        assert_eq!(stats.hits.load(Ordering::Relaxed), 2);
        assert_eq!(stats.cold_hits.load(Ordering::Relaxed), 1);
        assert_eq!(stats.hot_hits.load(Ordering::Relaxed), 1);
        assert_eq!(stats.misses.load(Ordering::Relaxed), 0);
        assert_eq!(stats.hit_rate(), 1.0);
    }

    /// Raising the hot target increases the hot ratio without exceeding the cap.
    #[test]
    fn test_adaptive_hot_ratio() {
        let cache = ClockProCache::new(100);
        let initial_ratio = cache.hot_ratio();
        for _ in 0..10 {
            cache.adapt_increase_hot();
        }
        assert!(cache.hot_ratio() > initial_ratio);
        assert!(cache.hot_ratio() <= MAX_HOT_RATIO);
    }
}