1use std::hash::Hash;
70use std::sync::Arc;
71use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
/// Default page size in bytes (64 KiB); `ClockProCache::new` passes this to
/// `ClockProCache::with_page_size`.
pub const DEFAULT_PAGE_SIZE: usize = 64 * 1024;

/// Default number of cache pages.
// NOTE(review): not referenced by any code visible in this part of the file — confirm callers.
pub const DEFAULT_CACHE_PAGES: usize = 16_384;

/// Lower bound on the hot-ring target, as a fraction of total capacity
/// (applied in `ClockProCache::adapt_decrease_hot`).
pub const MIN_HOT_RATIO: f64 = 0.05;

/// Upper bound on the hot-ring target, as a fraction of total capacity
/// (applied in `ClockProCache::adapt_increase_hot`).
pub const MAX_HOT_RATIO: f64 = 0.95;
87
/// Key that identifies a cached page by a `(file_id, page_no)` pair.
///
/// The type is `Copy` and hashable so it can be used directly as a map key
/// and stored by value in the clock rings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageId {
    /// File component of the key.
    pub file_id: u64,
    /// Page-number component of the key.
    pub page_no: u64,
}

impl PageId {
    /// Builds a `PageId` from its two components.
    pub fn new(file_id: u64, page_no: u64) -> Self {
        PageId { file_id, page_no }
    }
}
106
/// Classification of a page tracked by the cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageState {
    /// Page has been promoted; `ClockProCache::promote_to_hot` assigns this state.
    Hot,
    /// Initial state of a newly created page, and the state assigned on demotion.
    Cold,
    /// NOTE(review): never assigned through `set_state` in the code visible here;
    /// evicted page ids are tracked in a separate test ring instead — confirm
    /// whether any resident page can actually carry this state.
    Test,
}
121
122pub struct CachedPage {
124 #[allow(dead_code)]
126 id: PageId,
127 data: Vec<u8>,
129 referenced: AtomicBool,
131 state: RwLock<PageState>,
133 dirty: AtomicBool,
135 access_count: AtomicU64,
137 pin_count: AtomicUsize,
139}
140
141impl CachedPage {
142 pub fn new(id: PageId, data: Vec<u8>) -> Self {
143 Self {
144 id,
145 data,
146 referenced: AtomicBool::new(true),
147 state: RwLock::new(PageState::Cold),
148 dirty: AtomicBool::new(false),
149 access_count: AtomicU64::new(1),
150 pin_count: AtomicUsize::new(0),
151 }
152 }
153
154 pub fn data(&self) -> &[u8] {
156 &self.data
157 }
158
159 pub fn data_mut(&mut self) -> &mut [u8] {
161 self.dirty.store(true, Ordering::Release);
162 &mut self.data
163 }
164
165 pub fn is_dirty(&self) -> bool {
167 self.dirty.load(Ordering::Acquire)
168 }
169
170 pub fn mark_clean(&self) {
172 self.dirty.store(false, Ordering::Release);
173 }
174
175 pub fn pin(&self) {
177 self.pin_count.fetch_add(1, Ordering::AcqRel);
178 }
179
180 pub fn unpin(&self) {
182 self.pin_count.fetch_sub(1, Ordering::AcqRel);
183 }
184
185 pub fn is_pinned(&self) -> bool {
187 self.pin_count.load(Ordering::Acquire) > 0
188 }
189
190 pub fn touch(&self) {
192 self.referenced.store(true, Ordering::Release);
193 self.access_count.fetch_add(1, Ordering::Relaxed);
194 }
195
196 pub fn state(&self) -> PageState {
198 *self.state.read()
199 }
200
201 pub fn set_state(&self, state: PageState) {
203 *self.state.write() = state;
204 }
205}
206
/// Circular list of keys with a clock hand.
///
/// The hand is always kept in `[0, entries.len())` (or `0` when the ring is
/// empty), so it can be compared directly with indices into `entries`.
struct ClockRing<K: Clone + Eq + Hash> {
    /// Keys currently in the ring, in insertion/replacement order.
    entries: Vec<K>,
    /// Index of the entry the clock hand points at.
    hand: AtomicUsize,
    /// Number of entries the ring holds before inserts start replacing.
    capacity: usize,
}

impl<K: Clone + Eq + Hash> ClockRing<K> {
    /// Creates an empty ring that can hold `capacity` keys.
    fn new(capacity: usize) -> Self {
        Self {
            entries: Vec::with_capacity(capacity),
            hand: AtomicUsize::new(0),
            capacity,
        }
    }

    /// Index the hand currently points at, or `None` for an empty ring.
    fn hand_index(&self) -> Option<usize> {
        let len = self.entries.len();
        if len == 0 {
            None
        } else {
            // The modulo is defensive: the hand is normalised on every update.
            Some(self.hand.load(Ordering::Relaxed) % len)
        }
    }

    /// Inserts `key`, returning the key that had to leave the ring, if any.
    ///
    /// While the ring is below capacity the key is appended and `None` is
    /// returned. Otherwise the entry under the hand is replaced, the hand
    /// advances, and the replaced key is returned. A zero-capacity ring cannot
    /// hold anything, so the key itself is handed straight back.
    fn insert(&mut self, key: K) -> Option<K> {
        if self.entries.len() < self.capacity {
            self.entries.push(key);
            return None;
        }

        match self.hand_index() {
            // Empty ring with zero capacity: nothing to replace (and no valid
            // slot to index), so report the key as not retained.
            None => Some(key),
            Some(pos) => {
                let evicted = std::mem::replace(&mut self.entries[pos], key);
                self.advance_hand();
                Some(evicted)
            }
        }
    }

    /// Removes the first occurrence of `key`; returns whether it was present.
    ///
    /// The hand keeps pointing at the same entry it pointed at before the
    /// removal. If the removed entry was the one under the hand, the hand
    /// moves to its successor (wrapping to the start if necessary).
    fn remove(&mut self, key: &K) -> bool {
        let pos = match self.entries.iter().position(|k| k == key) {
            Some(pos) => pos,
            None => return false,
        };

        // `position` succeeded, so the ring is non-empty and the hand is valid.
        let hand = self.hand_index().unwrap_or(0);
        self.entries.remove(pos);

        let len = self.entries.len();
        let new_hand = if len == 0 {
            0
        } else if pos < hand {
            // Everything after `pos` shifted left by one.
            hand - 1
        } else {
            // Hand was at or before the removed slot; wrap if it fell off the end.
            hand % len
        };
        self.hand.store(new_hand, Ordering::Relaxed);
        true
    }

    /// Moves the hand to the next entry, wrapping at the end of the ring.
    fn advance_hand(&self) {
        let len = self.entries.len();
        if len != 0 {
            // Single atomic read-modify-write; the closure never returns `None`,
            // so the `Result` carries no information.
            let _ = self
                .hand
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |h| {
                    Some((h % len + 1) % len)
                });
        }
    }

    /// Returns a clone of the key under the hand, or `None` if the ring is empty.
    fn current(&self) -> Option<K> {
        self.hand_index().map(|pos| self.entries[pos].clone())
    }

    /// Number of keys currently in the ring.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` if the ring holds no keys.
    // Unused in this file; kept alongside `len` for API completeness.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Changes the capacity used by future inserts. Existing entries are kept
    /// even if the ring is now over capacity.
    fn set_capacity(&mut self, new_capacity: usize) {
        self.capacity = new_capacity;
    }
}
290
/// Page cache that tracks pages in hot, cold and test clock rings.
pub struct ClockProCache {
    /// Resident pages, keyed by page id.
    pages: DashMap<PageId, Arc<CachedPage>>,
    /// Ids of pages that have been promoted to hot.
    hot_ring: RwLock<ClockRing<PageId>>,
    /// Ids of cold pages; newly inserted pages enter here.
    cold_ring: RwLock<ClockRing<PageId>>,
    /// Ids of pages that were evicted; a lookup that finds an id here
    /// triggers `adapt_increase_hot`.
    test_ring: RwLock<ClockRing<PageId>>,
    /// Resident-page count at which `make_room` starts looking for a victim.
    capacity: usize,
    /// Current target size of the hot ring, adjusted by the `adapt_*` methods.
    hot_target: AtomicUsize,
    /// Hit/miss/eviction counters.
    stats: CacheStats,
    /// Per-page size in bytes; used by `memory_usage` to estimate the footprint.
    page_size: usize,
}
331
/// Counters describing cache activity.
///
/// Every field is an independent atomic, so the counters can be bumped
/// through a shared reference.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Lookups that returned a page.
    pub hits: AtomicU64,
    /// Lookups that returned nothing.
    pub misses: AtomicU64,
    /// Hits on pages that were hot.
    pub hot_hits: AtomicU64,
    /// Hits on pages that were cold.
    pub cold_hits: AtomicU64,
    /// Lookups that matched the test ring or a test-state page.
    pub test_hits: AtomicU64,
    /// Pages evicted from the cache.
    pub evictions: AtomicU64,
    /// Evicted pages that were dirty at eviction time.
    pub dirty_evictions: AtomicU64,
}

impl CacheStats {
    /// Fraction of lookups that were hits, or `0.0` when nothing has been
    /// recorded yet.
    pub fn hit_rate(&self) -> f64 {
        let hit_count = self.hits.load(Ordering::Relaxed);
        let miss_count = self.misses.load(Ordering::Relaxed);
        match hit_count + miss_count {
            0 => 0.0,
            lookups => hit_count as f64 / lookups as f64,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        let counters = [
            &self.hits,
            &self.misses,
            &self.hot_hits,
            &self.cold_hits,
            &self.test_hits,
            &self.evictions,
            &self.dirty_evictions,
        ];
        for counter in counters.iter() {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
372
373impl ClockProCache {
374 pub fn new(capacity: usize) -> Self {
376 Self::with_page_size(capacity, DEFAULT_PAGE_SIZE)
377 }
378
379 pub fn with_page_size(capacity: usize, page_size: usize) -> Self {
381 let initial_hot = capacity / 2;
382 let cold_capacity = capacity - initial_hot;
383
384 Self {
385 pages: DashMap::new(),
386 hot_ring: RwLock::new(ClockRing::new(initial_hot)),
387 cold_ring: RwLock::new(ClockRing::new(cold_capacity)),
388 test_ring: RwLock::new(ClockRing::new(capacity)), capacity,
390 hot_target: AtomicUsize::new(initial_hot),
391 stats: CacheStats::default(),
392 page_size,
393 }
394 }
395
396 pub fn get(&self, page_id: &PageId) -> Option<Arc<CachedPage>> {
398 if let Some(page) = self.pages.get(page_id) {
399 let page = page.clone();
400 page.touch();
401
402 match page.state() {
403 PageState::Hot => {
404 self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
405 }
406 PageState::Cold => {
407 self.promote_to_hot(page_id);
409 self.stats.cold_hits.fetch_add(1, Ordering::Relaxed);
410 }
411 PageState::Test => {
412 self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
414 self.adapt_increase_hot();
415 return None; }
417 }
418
419 self.stats.hits.fetch_add(1, Ordering::Relaxed);
420 return Some(page);
421 }
422
423 if self.is_in_test(page_id) {
425 self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
426 self.adapt_increase_hot();
427 }
428
429 self.stats.misses.fetch_add(1, Ordering::Relaxed);
430 None
431 }
432
433 pub fn insert(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
435 let page = Arc::new(CachedPage::new(page_id, data));
436
437 self.make_room();
439
440 {
442 let mut cold = self.cold_ring.write();
443 if let Some(evicted) = cold.insert(page_id) {
444 self.evict(&evicted);
445 }
446 }
447
448 page.set_state(PageState::Cold);
449 self.pages.insert(page_id, page.clone());
450
451 page
452 }
453
454 pub fn insert_pinned(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
456 let page = self.insert(page_id, data);
457 page.pin();
458 page
459 }
460
461 fn evict(&self, page_id: &PageId) {
463 if let Some((_, page)) = self.pages.remove(page_id) {
464 if page.is_pinned() {
465 self.pages.insert(*page_id, page);
467 return;
468 }
469
470 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
471 if page.is_dirty() {
472 self.stats.dirty_evictions.fetch_add(1, Ordering::Relaxed);
473 }
475
476 {
478 let mut test = self.test_ring.write();
479 test.insert(*page_id);
480 }
481 }
482 }
483
484 fn promote_to_hot(&self, page_id: &PageId) {
486 {
488 let mut cold = self.cold_ring.write();
489 cold.remove(page_id);
490 }
491
492 {
494 let mut hot = self.hot_ring.write();
495 if let Some(demoted) = hot.insert(*page_id) {
496 self.demote_to_cold(&demoted);
498 }
499 }
500
501 if let Some(page) = self.pages.get(page_id) {
502 page.set_state(PageState::Hot);
503 }
504 }
505
506 fn demote_to_cold(&self, page_id: &PageId) {
508 {
509 let mut cold = self.cold_ring.write();
510 cold.insert(*page_id);
511 }
512
513 if let Some(page) = self.pages.get(page_id) {
514 page.set_state(PageState::Cold);
515 page.referenced.store(false, Ordering::Release);
516 }
517 }
518
519 fn is_in_test(&self, page_id: &PageId) -> bool {
521 let test = self.test_ring.read();
522 test.entries.contains(page_id)
523 }
524
525 fn make_room(&self) {
527 let total = self.pages.len();
528 if total < self.capacity {
529 return;
530 }
531
532 let mut cold = self.cold_ring.write();
534 while cold.len() > 0 && self.pages.len() >= self.capacity {
535 if let Some(victim) = cold.current() {
536 if let Some(page) = self.pages.get(&victim) {
537 if !page.is_pinned() && !page.referenced.load(Ordering::Acquire) {
538 cold.remove(&victim);
539 drop(cold);
540 self.evict(&victim);
541 return;
542 }
543 page.referenced.store(false, Ordering::Release);
544 }
545 }
546 cold.advance_hand();
547 }
548 }
549
550 fn adapt_increase_hot(&self) {
552 let current = self.hot_target.load(Ordering::Relaxed);
553 let max_hot = (self.capacity as f64 * MAX_HOT_RATIO) as usize;
554
555 if current < max_hot {
556 let new_hot = (current + 1).min(max_hot);
557 self.hot_target.store(new_hot, Ordering::Relaxed);
558
559 let mut hot = self.hot_ring.write();
560 hot.set_capacity(new_hot);
561
562 let mut cold = self.cold_ring.write();
563 cold.set_capacity(self.capacity - new_hot);
564 }
565 }
566
567 #[allow(dead_code)]
569 fn adapt_decrease_hot(&self) {
570 let current = self.hot_target.load(Ordering::Relaxed);
571 let min_hot = (self.capacity as f64 * MIN_HOT_RATIO) as usize;
572
573 if current > min_hot {
574 let new_hot = current.saturating_sub(1).max(min_hot);
575 self.hot_target.store(new_hot, Ordering::Relaxed);
576
577 let mut hot = self.hot_ring.write();
578 hot.set_capacity(new_hot);
579
580 let mut cold = self.cold_ring.write();
581 cold.set_capacity(self.capacity - new_hot);
582 }
583 }
584
585 pub fn stats(&self) -> &CacheStats {
587 &self.stats
588 }
589
590 pub fn len(&self) -> usize {
592 self.pages.len()
593 }
594
595 pub fn is_empty(&self) -> bool {
597 self.pages.is_empty()
598 }
599
600 pub fn capacity(&self) -> usize {
602 self.capacity
603 }
604
605 pub fn hot_ratio(&self) -> f64 {
607 self.hot_target.load(Ordering::Relaxed) as f64 / self.capacity as f64
608 }
609
610 pub fn clear(&self) {
612 self.pages.clear();
613 *self.hot_ring.write() = ClockRing::new(self.capacity / 2);
614 *self.cold_ring.write() = ClockRing::new(self.capacity / 2);
615 *self.test_ring.write() = ClockRing::new(self.capacity);
616 self.stats.reset();
617 }
618
619 pub fn memory_usage(&self) -> usize {
621 self.pages.len() * self.page_size
622 }
623}
624
625#[cfg(test)]
626mod tests {
627 use super::*;
628
629 #[test]
630 fn test_page_cache_basic() {
631 let cache = ClockProCache::new(100);
632
633 let page_id = PageId::new(1, 0);
634 let data = vec![0u8; 1024];
635
636 cache.insert(page_id, data.clone());
637
638 let page = cache.get(&page_id).unwrap();
639 assert_eq!(page.data(), data.as_slice());
640 }
641
642 #[test]
643 fn test_page_cache_miss() {
644 let cache = ClockProCache::new(100);
645
646 let page_id = PageId::new(1, 0);
647 assert!(cache.get(&page_id).is_none());
648
649 assert_eq!(cache.stats().misses.load(Ordering::Relaxed), 1);
650 }
651
652 #[test]
653 fn test_page_cache_promotion() {
654 let cache = ClockProCache::new(100);
655
656 let page_id = PageId::new(1, 0);
657 cache.insert(page_id, vec![0u8; 1024]);
658
659 let page = cache.get(&page_id).unwrap();
661 assert_eq!(page.state(), PageState::Hot); }
663
664 #[test]
665 fn test_page_cache_eviction() {
666 let cache = ClockProCache::new(10);
667
668 for i in 0..15 {
670 let page_id = PageId::new(1, i);
671 cache.insert(page_id, vec![0u8; 64]);
672 }
673
674 assert!(cache.len() <= 10);
676 assert!(cache.stats().evictions.load(Ordering::Relaxed) > 0);
677 }
678
679 #[test]
680 fn test_page_cache_pinned() {
681 let cache = ClockProCache::new(10);
682
683 let page_id = PageId::new(1, 0);
684 let page = cache.insert_pinned(page_id, vec![0u8; 64]);
685
686 assert!(page.is_pinned());
687
688 page.unpin();
689 assert!(!page.is_pinned());
690 }
691
692 #[test]
693 fn test_page_cache_dirty() {
694 let cache = ClockProCache::new(10);
695
696 let page_id = PageId::new(1, 0);
697 cache.insert(page_id, vec![0u8; 64]);
698
699 if let Some(page) = cache.get(&page_id) {
700 assert!(!page.is_dirty());
701 }
702 }
703
704 #[test]
705 fn test_cache_stats() {
706 let cache = ClockProCache::new(100);
707
708 let page_id = PageId::new(1, 0);
709 cache.insert(page_id, vec![0u8; 1024]);
710
711 cache.get(&page_id);
713 cache.get(&page_id);
714
715 assert_eq!(cache.stats().hits.load(Ordering::Relaxed), 2);
716 assert!(cache.stats().hit_rate() > 0.0);
717 }
718
719 #[test]
720 fn test_adaptive_hot_ratio() {
721 let cache = ClockProCache::new(100);
722
723 let initial_ratio = cache.hot_ratio();
724
725 for _ in 0..10 {
727 cache.adapt_increase_hot();
728 }
729
730 assert!(cache.hot_ratio() > initial_ratio);
731 }
732}