1use std::hash::Hash;
67use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
68use std::sync::Arc;
69
70use dashmap::DashMap;
71use parking_lot::RwLock;
72
/// Default size of one cached page in bytes (64 KiB).
pub const DEFAULT_PAGE_SIZE: usize = 1 << 16;

/// Default cache capacity, expressed in pages.
pub const DEFAULT_CACHE_PAGES: usize = 16_384;

/// Smallest fraction of the cache capacity that the hot target may shrink to.
pub const MIN_HOT_RATIO: f64 = 0.05;

/// Largest fraction of the cache capacity that the hot target may grow to.
pub const MAX_HOT_RATIO: f64 = 0.95;
84
/// Identifies a page by the file it belongs to and its index within that file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageId {
    /// Identifier of the file that owns the page.
    pub file_id: u64,
    /// Page number within the file.
    pub page_no: u64,
}

impl PageId {
    /// Builds a page identifier from a file id and a page number.
    pub fn new(file_id: u64, page_no: u64) -> Self {
        PageId { file_id, page_no }
    }
}
103
/// Replacement-policy state of a cached page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageState {
    /// The page has been promoted after being accessed while cold.
    Hot,
    /// The state every newly created page starts in, and the state a page
    /// returns to when it is demoted.
    Cold,
    /// Test-period marker; a lookup that finds a page in this state is
    /// answered as a miss.
    Test,
}
118
119pub struct CachedPage {
121 #[allow(dead_code)]
123 id: PageId,
124 data: Vec<u8>,
126 referenced: AtomicBool,
128 state: RwLock<PageState>,
130 dirty: AtomicBool,
132 access_count: AtomicU64,
134 pin_count: AtomicUsize,
136}
137
138impl CachedPage {
139 pub fn new(id: PageId, data: Vec<u8>) -> Self {
140 Self {
141 id,
142 data,
143 referenced: AtomicBool::new(true),
144 state: RwLock::new(PageState::Cold),
145 dirty: AtomicBool::new(false),
146 access_count: AtomicU64::new(1),
147 pin_count: AtomicUsize::new(0),
148 }
149 }
150
151 pub fn data(&self) -> &[u8] {
153 &self.data
154 }
155
156 pub fn data_mut(&mut self) -> &mut [u8] {
158 self.dirty.store(true, Ordering::Release);
159 &mut self.data
160 }
161
162 pub fn is_dirty(&self) -> bool {
164 self.dirty.load(Ordering::Acquire)
165 }
166
167 pub fn mark_clean(&self) {
169 self.dirty.store(false, Ordering::Release);
170 }
171
172 pub fn pin(&self) {
174 self.pin_count.fetch_add(1, Ordering::AcqRel);
175 }
176
177 pub fn unpin(&self) {
179 self.pin_count.fetch_sub(1, Ordering::AcqRel);
180 }
181
182 pub fn is_pinned(&self) -> bool {
184 self.pin_count.load(Ordering::Acquire) > 0
185 }
186
187 pub fn touch(&self) {
189 self.referenced.store(true, Ordering::Release);
190 self.access_count.fetch_add(1, Ordering::Relaxed);
191 }
192
193 pub fn state(&self) -> PageState {
195 *self.state.read()
196 }
197
198 pub fn set_state(&self, state: PageState) {
200 *self.state.write() = state;
201 }
202}
203
/// Bounded ring of keys swept by a clock hand.
///
/// While the ring has spare capacity new keys are appended; once it is full,
/// an insert overwrites the slot under the hand and advances the hand.
struct ClockRing<K: Clone + Eq + Hash> {
    /// Keys currently held, in slot order.
    entries: Vec<K>,
    /// Slot index the hand points at. Kept below `entries.len()` whenever the
    /// ring is non-empty so that removals can shift it reliably.
    hand: AtomicUsize,
    /// Number of keys the ring accepts before it starts overwriting.
    capacity: usize,
}

impl<K: Clone + Eq + Hash> ClockRing<K> {
    /// Creates an empty ring that holds up to `capacity` keys.
    fn new(capacity: usize) -> Self {
        Self {
            entries: Vec::with_capacity(capacity),
            hand: AtomicUsize::new(0),
            capacity,
        }
    }

    /// Slot index under the hand, or `None` when the ring is empty.
    fn hand_position(&self) -> Option<usize> {
        match self.entries.len() {
            0 => None,
            len => Some(self.hand.load(Ordering::Relaxed) % len),
        }
    }

    /// Adds `key` to the ring.
    ///
    /// Returns `None` when the key was appended. When the ring is full the key
    /// under the hand is overwritten and returned, and the hand advances. A
    /// ring that is both full and empty (capacity zero) cannot store anything,
    /// so the key is handed straight back instead of dividing by a zero
    /// length.
    fn insert(&mut self, key: K) -> Option<K> {
        if self.entries.len() < self.capacity {
            self.entries.push(key);
            return None;
        }

        let pos = match self.hand_position() {
            Some(pos) => pos,
            None => return Some(key),
        };
        let evicted = std::mem::replace(&mut self.entries[pos], key);
        self.advance_hand();
        Some(evicted)
    }

    /// Removes the first occurrence of `key`; returns whether it was present.
    ///
    /// The hand keeps pointing at the same key it pointed at before the
    /// removal. If the removed key was the one under the hand, the hand ends
    /// up on the key that followed it, wrapping to slot 0 at the end.
    fn remove(&mut self, key: &K) -> bool {
        let pos = match self.entries.iter().position(|k| k == key) {
            Some(pos) => pos,
            None => return false,
        };

        let old_len = self.entries.len();
        self.entries.remove(pos);

        let hand = self.hand.get_mut();
        // Normalise before comparing: a raw counter that has wrapped around
        // the ring would otherwise always look "past" the removed slot.
        let mut slot = *hand % old_len;
        if pos < slot {
            slot -= 1;
        }
        *hand = if slot < self.entries.len() { slot } else { 0 };
        true
    }

    /// Moves the hand to the next slot, wrapping at the end of the ring.
    fn advance_hand(&self) {
        let len = self.entries.len();
        if len > 0 {
            let _ = self
                .hand
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |hand| {
                    Some((hand % len + 1) % len)
                });
        }
    }

    /// Returns a clone of the key under the hand, or `None` if the ring is empty.
    fn current(&self) -> Option<K> {
        self.hand_position().map(|pos| self.entries[pos].clone())
    }

    /// Number of keys currently in the ring.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when the ring holds no keys.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Changes the capacity used by later inserts.
    ///
    /// Existing keys are kept even if there are more of them than
    /// `new_capacity`.
    fn set_capacity(&mut self, new_capacity: usize) {
        self.capacity = new_capacity;
    }
}
287
288pub struct ClockProCache {
311 pages: DashMap<PageId, Arc<CachedPage>>,
313 hot_ring: RwLock<ClockRing<PageId>>,
315 cold_ring: RwLock<ClockRing<PageId>>,
317 test_ring: RwLock<ClockRing<PageId>>,
319 capacity: usize,
321 hot_target: AtomicUsize,
323 stats: CacheStats,
325 page_size: usize,
327}
328
/// Lock-free counters describing cache activity.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Lookups that returned a resident page.
    pub hits: AtomicU64,
    /// Lookups that returned nothing.
    pub misses: AtomicU64,
    /// Lookups that found a page in the hot state.
    pub hot_hits: AtomicU64,
    /// Lookups that found a page in the cold state.
    pub cold_hits: AtomicU64,
    /// Lookups for a page in the test state or remembered by the test ring.
    pub test_hits: AtomicU64,
    /// Pages removed from the cache.
    pub evictions: AtomicU64,
    /// Evicted pages that were dirty at the time of eviction.
    pub dirty_evictions: AtomicU64,
}

impl CacheStats {
    /// Fraction of lookups that were hits, or `0.0` before any lookup.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let lookups = hits + self.misses.load(Ordering::Relaxed);
        match lookups {
            0 => 0.0,
            n => hits as f64 / n as f64,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        let counters = [
            &self.hits,
            &self.misses,
            &self.hot_hits,
            &self.cold_hits,
            &self.test_hits,
            &self.evictions,
            &self.dirty_evictions,
        ];
        for counter in counters.iter() {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
369
370impl ClockProCache {
371 pub fn new(capacity: usize) -> Self {
373 Self::with_page_size(capacity, DEFAULT_PAGE_SIZE)
374 }
375
376 pub fn with_page_size(capacity: usize, page_size: usize) -> Self {
378 let initial_hot = capacity / 2;
379 let cold_capacity = capacity - initial_hot;
380
381 Self {
382 pages: DashMap::new(),
383 hot_ring: RwLock::new(ClockRing::new(initial_hot)),
384 cold_ring: RwLock::new(ClockRing::new(cold_capacity)),
385 test_ring: RwLock::new(ClockRing::new(capacity)), capacity,
387 hot_target: AtomicUsize::new(initial_hot),
388 stats: CacheStats::default(),
389 page_size,
390 }
391 }
392
393 pub fn get(&self, page_id: &PageId) -> Option<Arc<CachedPage>> {
395 if let Some(page) = self.pages.get(page_id) {
396 let page = page.clone();
397 page.touch();
398
399 match page.state() {
400 PageState::Hot => {
401 self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
402 }
403 PageState::Cold => {
404 self.promote_to_hot(page_id);
406 self.stats.cold_hits.fetch_add(1, Ordering::Relaxed);
407 }
408 PageState::Test => {
409 self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
411 self.adapt_increase_hot();
412 return None; }
414 }
415
416 self.stats.hits.fetch_add(1, Ordering::Relaxed);
417 return Some(page);
418 }
419
420 if self.is_in_test(page_id) {
422 self.stats.test_hits.fetch_add(1, Ordering::Relaxed);
423 self.adapt_increase_hot();
424 }
425
426 self.stats.misses.fetch_add(1, Ordering::Relaxed);
427 None
428 }
429
430 pub fn insert(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
432 let page = Arc::new(CachedPage::new(page_id, data));
433
434 self.make_room();
436
437 {
439 let mut cold = self.cold_ring.write();
440 if let Some(evicted) = cold.insert(page_id) {
441 self.evict(&evicted);
442 }
443 }
444
445 page.set_state(PageState::Cold);
446 self.pages.insert(page_id, page.clone());
447
448 page
449 }
450
451 pub fn insert_pinned(&self, page_id: PageId, data: Vec<u8>) -> Arc<CachedPage> {
453 let page = self.insert(page_id, data);
454 page.pin();
455 page
456 }
457
458 fn evict(&self, page_id: &PageId) {
460 if let Some((_, page)) = self.pages.remove(page_id) {
461 if page.is_pinned() {
462 self.pages.insert(*page_id, page);
464 return;
465 }
466
467 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
468 if page.is_dirty() {
469 self.stats.dirty_evictions.fetch_add(1, Ordering::Relaxed);
470 }
472
473 {
475 let mut test = self.test_ring.write();
476 test.insert(*page_id);
477 }
478 }
479 }
480
481 fn promote_to_hot(&self, page_id: &PageId) {
483 {
485 let mut cold = self.cold_ring.write();
486 cold.remove(page_id);
487 }
488
489 {
491 let mut hot = self.hot_ring.write();
492 if let Some(demoted) = hot.insert(*page_id) {
493 self.demote_to_cold(&demoted);
495 }
496 }
497
498 if let Some(page) = self.pages.get(page_id) {
499 page.set_state(PageState::Hot);
500 }
501 }
502
503 fn demote_to_cold(&self, page_id: &PageId) {
505 {
506 let mut cold = self.cold_ring.write();
507 cold.insert(*page_id);
508 }
509
510 if let Some(page) = self.pages.get(page_id) {
511 page.set_state(PageState::Cold);
512 page.referenced.store(false, Ordering::Release);
513 }
514 }
515
516 fn is_in_test(&self, page_id: &PageId) -> bool {
518 let test = self.test_ring.read();
519 test.entries.contains(page_id)
520 }
521
522 fn make_room(&self) {
524 let total = self.pages.len();
525 if total < self.capacity {
526 return;
527 }
528
529 let mut cold = self.cold_ring.write();
531 while cold.len() > 0 && self.pages.len() >= self.capacity {
532 if let Some(victim) = cold.current() {
533 if let Some(page) = self.pages.get(&victim) {
534 if !page.is_pinned() && !page.referenced.load(Ordering::Acquire) {
535 cold.remove(&victim);
536 drop(cold);
537 self.evict(&victim);
538 return;
539 }
540 page.referenced.store(false, Ordering::Release);
541 }
542 }
543 cold.advance_hand();
544 }
545 }
546
547 fn adapt_increase_hot(&self) {
549 let current = self.hot_target.load(Ordering::Relaxed);
550 let max_hot = (self.capacity as f64 * MAX_HOT_RATIO) as usize;
551
552 if current < max_hot {
553 let new_hot = (current + 1).min(max_hot);
554 self.hot_target.store(new_hot, Ordering::Relaxed);
555
556 let mut hot = self.hot_ring.write();
557 hot.set_capacity(new_hot);
558
559 let mut cold = self.cold_ring.write();
560 cold.set_capacity(self.capacity - new_hot);
561 }
562 }
563
564 #[allow(dead_code)]
566 fn adapt_decrease_hot(&self) {
567 let current = self.hot_target.load(Ordering::Relaxed);
568 let min_hot = (self.capacity as f64 * MIN_HOT_RATIO) as usize;
569
570 if current > min_hot {
571 let new_hot = current.saturating_sub(1).max(min_hot);
572 self.hot_target.store(new_hot, Ordering::Relaxed);
573
574 let mut hot = self.hot_ring.write();
575 hot.set_capacity(new_hot);
576
577 let mut cold = self.cold_ring.write();
578 cold.set_capacity(self.capacity - new_hot);
579 }
580 }
581
582 pub fn stats(&self) -> &CacheStats {
584 &self.stats
585 }
586
587 pub fn len(&self) -> usize {
589 self.pages.len()
590 }
591
592 pub fn is_empty(&self) -> bool {
594 self.pages.is_empty()
595 }
596
597 pub fn capacity(&self) -> usize {
599 self.capacity
600 }
601
602 pub fn hot_ratio(&self) -> f64 {
604 self.hot_target.load(Ordering::Relaxed) as f64 / self.capacity as f64
605 }
606
607 pub fn clear(&self) {
609 self.pages.clear();
610 *self.hot_ring.write() = ClockRing::new(self.capacity / 2);
611 *self.cold_ring.write() = ClockRing::new(self.capacity / 2);
612 *self.test_ring.write() = ClockRing::new(self.capacity);
613 self.stats.reset();
614 }
615
616 pub fn memory_usage(&self) -> usize {
618 self.pages.len() * self.page_size
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a zero-filled page payload of `len` bytes.
    fn blank(len: usize) -> Vec<u8> {
        vec![0u8; len]
    }

    /// A page that was inserted can be read back with identical contents.
    #[test]
    fn test_page_cache_basic() {
        let cache = ClockProCache::new(100);
        let id = PageId::new(1, 0);
        let payload = blank(1024);

        cache.insert(id, payload.clone());

        let fetched = cache.get(&id).unwrap();
        assert_eq!(fetched.data(), &payload[..]);
    }

    /// Looking up an unknown page returns nothing and counts one miss.
    #[test]
    fn test_page_cache_miss() {
        let cache = ClockProCache::new(100);

        assert!(cache.get(&PageId::new(1, 0)).is_none());

        let misses = cache.stats().misses.load(Ordering::Relaxed);
        assert_eq!(misses, 1);
    }

    /// The first hit on a freshly inserted page promotes it to hot.
    #[test]
    fn test_page_cache_promotion() {
        let cache = ClockProCache::new(100);
        let id = PageId::new(1, 0);
        cache.insert(id, blank(1024));

        let fetched = cache.get(&id).unwrap();
        assert_eq!(fetched.state(), PageState::Hot);
    }

    /// Inserting more pages than fit keeps the cache bounded and evicts.
    #[test]
    fn test_page_cache_eviction() {
        let cache = ClockProCache::new(10);

        (0..15).for_each(|page_no| {
            cache.insert(PageId::new(1, page_no), blank(64));
        });

        assert!(cache.len() <= 10);
        let evictions = cache.stats().evictions.load(Ordering::Relaxed);
        assert!(evictions > 0);
    }

    /// `insert_pinned` returns a pinned page that can be unpinned again.
    #[test]
    fn test_page_cache_pinned() {
        let cache = ClockProCache::new(10);

        let page = cache.insert_pinned(PageId::new(1, 0), blank(64));
        assert!(page.is_pinned());

        page.unpin();
        assert!(!page.is_pinned());
    }

    /// A page that was only inserted and read is not dirty.
    #[test]
    fn test_page_cache_dirty() {
        let cache = ClockProCache::new(10);
        let id = PageId::new(1, 0);
        cache.insert(id, blank(64));

        match cache.get(&id) {
            Some(page) => assert!(!page.is_dirty()),
            None => {}
        }
    }

    /// Two successful lookups are recorded as two hits.
    #[test]
    fn test_cache_stats() {
        let cache = ClockProCache::new(100);
        let id = PageId::new(1, 0);
        cache.insert(id, blank(1024));

        for _ in 0..2 {
            cache.get(&id);
        }

        let stats = cache.stats();
        assert_eq!(stats.hits.load(Ordering::Relaxed), 2);
        assert!(stats.hit_rate() > 0.0);
    }

    /// Repeatedly growing the hot target raises the reported hot ratio.
    #[test]
    fn test_adaptive_hot_ratio() {
        let cache = ClockProCache::new(100);
        let before = cache.hot_ratio();

        (0..10).for_each(|_| cache.adapt_increase_hot());

        assert!(cache.hot_ratio() > before);
    }
}