1use std::collections::{HashMap, VecDeque};
22use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
23use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
24
25use super::page::Page;
26
27fn cache_read<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
28 lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
29}
30
31fn cache_write<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
32 lock.write()
33 .unwrap_or_else(|poisoned| poisoned.into_inner())
34}
35
36fn cache_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
37 lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
38}
39
40pub const DEFAULT_CACHE_CAPACITY: usize = 100_000;
43
44pub const MIN_CACHE_CAPACITY: usize = 2;
46
47struct CacheEntry {
57 page: Page,
59 pin_count: usize,
61 dirty: bool,
63}
64
65impl CacheEntry {
66 fn new(page: Page) -> Self {
67 Self {
68 page,
69 pin_count: 0,
70 dirty: false,
71 }
72 }
73}
74
75const CACHE_LINE: usize = 64;
82
83const TAG_EMPTY: u32 = u32::MAX;
88
89#[repr(align(64))]
94struct VisitedLine([AtomicBool; CACHE_LINE]);
95
96const TAGS_PER_LINE: usize = CACHE_LINE / 4;
98
99#[repr(align(64))]
103struct TagLine([AtomicU32; TAGS_PER_LINE]);
104
105struct ShardMeta {
123 visited: Box<[VisitedLine]>,
124 tag: Box<[TagLine]>,
125 capacity: usize,
126}
127
128impl ShardMeta {
129 fn new(capacity: usize) -> Self {
130 let visited = (0..capacity.div_ceil(CACHE_LINE))
131 .map(|_| VisitedLine(std::array::from_fn(|_| AtomicBool::new(false))))
132 .collect::<Vec<_>>()
133 .into_boxed_slice();
134 let tag = (0..capacity.div_ceil(TAGS_PER_LINE))
135 .map(|_| TagLine(std::array::from_fn(|_| AtomicU32::new(TAG_EMPTY))))
136 .collect::<Vec<_>>()
137 .into_boxed_slice();
138 Self {
139 visited,
140 tag,
141 capacity,
142 }
143 }
144
145 #[inline]
150 fn mark_visited(&self, slot: usize) {
151 if let Some(b) = self.visited_bit(slot) {
152 b.store(true, Ordering::Relaxed);
153 }
154 }
155
156 #[inline]
157 fn is_visited(&self, slot: usize) -> bool {
158 self.visited_bit(slot)
159 .map(|b| b.load(Ordering::Relaxed))
160 .unwrap_or(false)
161 }
162
163 #[inline]
167 fn clear_visited(&self, slot: usize) {
168 if let Some(b) = self.visited_bit(slot) {
169 b.store(false, Ordering::Relaxed);
170 }
171 }
172
173 #[inline]
175 fn tag(&self, slot: usize) -> u32 {
176 self.tag_slot(slot)
177 .map(|t| t.load(Ordering::Relaxed))
178 .unwrap_or(TAG_EMPTY)
179 }
180
181 #[inline]
185 fn occupy(&self, slot: usize, page_id: u32) {
186 if let Some(t) = self.tag_slot(slot) {
187 t.store(page_id, Ordering::Relaxed);
188 }
189 self.clear_visited(slot);
190 }
191
192 #[inline]
194 fn vacate(&self, slot: usize) {
195 if let Some(t) = self.tag_slot(slot) {
196 t.store(TAG_EMPTY, Ordering::Relaxed);
197 }
198 self.clear_visited(slot);
199 }
200
201 fn reset(&self) {
203 for line in self.visited.iter() {
204 for b in line.0.iter() {
205 b.store(false, Ordering::Relaxed);
206 }
207 }
208 for line in self.tag.iter() {
209 for t in line.0.iter() {
210 t.store(TAG_EMPTY, Ordering::Relaxed);
211 }
212 }
213 }
214
215 #[inline]
216 fn visited_bit(&self, slot: usize) -> Option<&AtomicBool> {
217 if slot >= self.capacity {
218 return None;
219 }
220 self.visited
221 .get(slot / CACHE_LINE)
222 .map(|line| &line.0[slot % CACHE_LINE])
223 }
224
225 #[inline]
226 fn tag_slot(&self, slot: usize) -> Option<&AtomicU32> {
227 if slot >= self.capacity {
228 return None;
229 }
230 self.tag
231 .get(slot / TAGS_PER_LINE)
232 .map(|line| &line.0[slot % TAGS_PER_LINE])
233 }
234}
235
236#[derive(Debug, Default, Clone)]
238pub struct CacheStats {
239 pub hits: u64,
241 pub misses: u64,
243 pub evictions: u64,
245 pub writebacks: u64,
247}
248
249impl CacheStats {
250 pub fn hit_rate(&self) -> f64 {
252 let total = self.hits + self.misses;
253 if total == 0 {
254 0.0
255 } else {
256 self.hits as f64 / total as f64
257 }
258 }
259}
260
261pub struct PageCacheShard {
265 capacity: usize,
267 index: RwLock<HashMap<u32, usize>>,
269 fifo: Mutex<VecDeque<u32>>,
271 entries: RwLock<Vec<Option<CacheEntry>>>,
273 meta: ShardMeta,
277 free_slots: Mutex<Vec<usize>>,
279 hand: Mutex<usize>,
281 stats: Mutex<CacheStats>,
283}
284
285impl PageCacheShard {
286 pub fn new(capacity: usize) -> Self {
288 let capacity = capacity.max(MIN_CACHE_CAPACITY);
289
290 Self {
291 capacity,
292 index: RwLock::new(HashMap::with_capacity(capacity)),
293 fifo: Mutex::new(VecDeque::with_capacity(capacity)),
294 entries: RwLock::new(Vec::with_capacity(capacity)),
295 meta: ShardMeta::new(capacity),
296 free_slots: Mutex::new(Vec::new()),
297 hand: Mutex::new(0),
298 stats: Mutex::new(CacheStats::default()),
299 }
300 }
301
302 pub fn with_default_capacity() -> Self {
304 Self::new(DEFAULT_CACHE_CAPACITY)
305 }
306
307 pub fn len(&self) -> usize {
309 cache_read(&self.index).len()
310 }
311
312 pub fn is_empty(&self) -> bool {
314 self.len() == 0
315 }
316
317 pub fn capacity(&self) -> usize {
319 self.capacity
320 }
321
322 pub fn stats(&self) -> CacheStats {
324 cache_lock(&self.stats).clone()
325 }
326
327 pub fn reset_stats(&self) {
329 *cache_lock(&self.stats) = CacheStats::default();
330 }
331
332 pub fn get(&self, page_id: u32) -> Option<Page> {
337 let index = cache_read(&self.index);
339 let slot = match index.get(&page_id) {
340 Some(&s) => s,
341 None => {
342 drop(index);
343 cache_lock(&self.stats).misses += 1;
345 return None;
346 }
347 };
348 drop(index);
349
350 let entries = cache_read(&self.entries);
361 if let Some(entry) = entries.get(slot).and_then(|e| e.as_ref()) {
362 let page = entry.page.clone();
363 drop(entries);
364
365 if self.meta.tag(slot) == page_id {
372 self.meta.mark_visited(slot);
373 }
374
375 cache_lock(&self.stats).hits += 1;
376
377 Some(page)
378 } else {
379 cache_lock(&self.stats).misses += 1;
380 None
381 }
382 }
383
384 pub fn insert(&self, page_id: u32, page: Page) -> Option<Page> {
389 {
391 let index = cache_read(&self.index);
392 if let Some(&slot) = index.get(&page_id) {
393 drop(index);
394
395 let mut entries = cache_write(&self.entries);
400 if let Some(Some(entry)) = entries.get_mut(slot) {
401 entry.page = page;
402 }
403 drop(entries);
404 self.meta.mark_visited(slot);
405 return None;
406 }
407 }
408
409 let mut evicted = None;
411
412 let current_len = self.len();
414 if current_len >= self.capacity {
415 evicted = self.evict();
417 }
418
419 let slot = {
421 let mut free_slots = cache_lock(&self.free_slots);
422 if let Some(slot) = free_slots.pop() {
423 slot
424 } else {
425 drop(free_slots);
426 let mut entries = cache_write(&self.entries);
428 let slot = entries.len();
429 entries.push(None);
430 slot
431 }
432 };
433
434 {
436 let mut entries = cache_write(&self.entries);
437
438 while entries.len() <= slot {
440 entries.push(None);
441 }
442
443 entries[slot] = Some(CacheEntry::new(page));
444 }
445
446 {
448 let mut index = cache_write(&self.index);
449 index.insert(page_id, slot);
450 }
451
452 self.meta.occupy(slot, page_id);
456
457 {
459 let mut fifo = cache_lock(&self.fifo);
460 fifo.push_back(page_id);
461 }
462
463 evicted
464 }
465
466 fn evict(&self) -> Option<Page> {
470 let mut fifo = cache_lock(&self.fifo);
471 let mut hand = cache_lock(&self.hand);
472
473 if fifo.is_empty() {
474 return None;
475 }
476
477 let fifo_len = fifo.len();
478 let mut attempts = 0;
479
480 loop {
482 if attempts >= fifo_len * 2 {
483 return None;
485 }
486
487 if *hand >= fifo_len {
489 *hand = 0;
490 }
491
492 let page_id = fifo[*hand];
493 attempts += 1;
494
495 let slot = {
497 let index = cache_read(&self.index);
498 match index.get(&page_id) {
499 Some(&s) => s,
500 None => {
501 *hand += 1;
503 continue;
504 }
505 }
506 };
507
508 let (should_evict, dirty) = {
512 let entries = cache_read(&self.entries);
513 match entries.get(slot).and_then(|e| e.as_ref()) {
514 Some(entry) => {
515 if entry.pin_count > 0 {
516 (false, false)
518 } else if self.meta.is_visited(slot) {
519 (false, false)
521 } else {
522 (true, entry.dirty)
524 }
525 }
526 None => {
527 *hand += 1;
528 continue;
529 }
530 }
531 };
532
533 if !should_evict {
534 self.meta.clear_visited(slot);
537 *hand += 1;
538 continue;
539 }
540
541 let evicted_page = {
546 let mut entries = cache_write(&self.entries);
547 debug_assert!(
548 entries
549 .get(slot)
550 .and_then(|e| e.as_ref())
551 .map(|e| e.pin_count == 0)
552 .unwrap_or(true),
553 "ADR 0033 violation: eviction selected a pinned page (slot {slot})"
554 );
555 let entry = entries[slot].take();
556 entry.map(|e| e.page)
557 };
558
559 self.meta.vacate(slot);
561
562 {
564 let mut index = cache_write(&self.index);
565 index.remove(&page_id);
566 }
567
568 fifo.remove(*hand);
570
571 {
573 let mut free_slots = cache_lock(&self.free_slots);
574 free_slots.push(slot);
575 }
576
577 {
579 let mut stats = cache_lock(&self.stats);
580 stats.evictions += 1;
581 if dirty {
582 stats.writebacks += 1;
583 }
584 }
585
586 if dirty {
588 return evicted_page;
589 } else {
590 return None;
591 }
592 }
593 }
594
595 pub fn mark_dirty(&self, page_id: u32) {
597 let index = cache_read(&self.index);
598 if let Some(&slot) = index.get(&page_id) {
599 drop(index);
600
601 let mut entries = cache_write(&self.entries);
602 if let Some(Some(entry)) = entries.get_mut(slot) {
603 entry.dirty = true;
604 }
605 }
606 }
607
608 pub fn mark_clean(&self, page_id: u32) {
610 let index = cache_read(&self.index);
611 if let Some(&slot) = index.get(&page_id) {
612 drop(index);
613
614 let mut entries = cache_write(&self.entries);
615 if let Some(Some(entry)) = entries.get_mut(slot) {
616 entry.dirty = false;
617 }
618 }
619 }
620
621 pub fn pin(&self, page_id: u32) -> bool {
623 let index = cache_read(&self.index);
624 if let Some(&slot) = index.get(&page_id) {
625 drop(index);
626
627 let mut entries = cache_write(&self.entries);
628 if let Some(Some(entry)) = entries.get_mut(slot) {
629 entry.pin_count += 1;
630 return true;
631 }
632 }
633 false
634 }
635
636 pub fn unpin(&self, page_id: u32) -> bool {
638 let index = cache_read(&self.index);
639 if let Some(&slot) = index.get(&page_id) {
640 drop(index);
641
642 let mut entries = cache_write(&self.entries);
643 if let Some(Some(entry)) = entries.get_mut(slot) {
644 if entry.pin_count > 0 {
645 entry.pin_count -= 1;
646 return true;
647 }
648 }
649 }
650 false
651 }
652
653 pub fn remove(&self, page_id: u32) -> Option<Page> {
655 let slot = {
657 let mut index = cache_write(&self.index);
658 index.remove(&page_id)?
659 };
660
661 let entry = {
663 let mut entries = cache_write(&self.entries);
664 entries.get_mut(slot).and_then(|e| e.take())
665 };
666
667 self.meta.vacate(slot);
669
670 {
672 let mut fifo = cache_lock(&self.fifo);
673 fifo.retain(|&id| id != page_id);
674 }
675
676 {
678 let mut free_slots = cache_lock(&self.free_slots);
679 free_slots.push(slot);
680 }
681
682 entry.map(|e| e.page)
683 }
684
685 pub fn flush_dirty(&self) -> Vec<(u32, Page)> {
689 let mut dirty_pages = Vec::new();
690
691 let index = cache_read(&self.index);
692 let entries = cache_read(&self.entries);
693
694 for (&page_id, &slot) in index.iter() {
695 if let Some(Some(entry)) = entries.get(slot) {
696 if entry.dirty {
697 dirty_pages.push((page_id, entry.page.clone()));
698 }
699 }
700 }
701
702 drop(entries);
703 drop(index);
704
705 for (page_id, _) in &dirty_pages {
707 self.mark_clean(*page_id);
708 }
709
710 let count = dirty_pages.len();
711 cache_lock(&self.stats).writebacks += count as u64;
712
713 dirty_pages
714 }
715
716 pub fn flush_some_dirty(&self, max: usize) -> Vec<(u32, Page)> {
722 if max == 0 {
723 return Vec::new();
724 }
725 let mut dirty_pages = Vec::with_capacity(max);
726
727 let index = cache_read(&self.index);
728 let entries = cache_read(&self.entries);
729
730 for (&page_id, &slot) in index.iter() {
731 if dirty_pages.len() >= max {
732 break;
733 }
734 if let Some(Some(entry)) = entries.get(slot) {
735 if entry.dirty {
736 dirty_pages.push((page_id, entry.page.clone()));
737 }
738 }
739 }
740
741 drop(entries);
742 drop(index);
743
744 for (page_id, _) in &dirty_pages {
745 self.mark_clean(*page_id);
746 }
747
748 let count = dirty_pages.len();
749 cache_lock(&self.stats).writebacks += count as u64;
750
751 dirty_pages
752 }
753
754 pub fn dirty_count(&self) -> usize {
757 let index = cache_read(&self.index);
758 let entries = cache_read(&self.entries);
759 let mut count = 0;
760 for (_, &slot) in index.iter() {
761 if let Some(Some(entry)) = entries.get(slot) {
762 if entry.dirty {
763 count += 1;
764 }
765 }
766 }
767 count
768 }
769
770 pub fn clear(&self) {
772 let mut index = cache_write(&self.index);
773 let mut entries = cache_write(&self.entries);
774 let mut fifo = cache_lock(&self.fifo);
775 let mut free_slots = cache_lock(&self.free_slots);
776
777 index.clear();
778 entries.clear();
779 fifo.clear();
780 free_slots.clear();
781 self.meta.reset();
782 *cache_lock(&self.hand) = 0;
783 }
784
785 pub fn contains(&self, page_id: u32) -> bool {
787 cache_read(&self.index).contains_key(&page_id)
788 }
789
790 pub fn page_ids(&self) -> Vec<u32> {
792 cache_read(&self.index).keys().copied().collect()
793 }
794}
795
796impl Default for PageCacheShard {
797 fn default() -> Self {
798 Self::with_default_capacity()
799 }
800}
801
802const NUM_SHARDS: usize = 8;
808
809pub struct PageCache {
819 shards: Box<[PageCacheShard]>,
820 capacity: usize,
821}
822
823impl PageCache {
824 pub fn new(capacity: usize) -> Self {
825 let per_shard = capacity.div_ceil(NUM_SHARDS).max(MIN_CACHE_CAPACITY);
829 let total = per_shard * NUM_SHARDS;
830 let shards: Vec<PageCacheShard> = (0..NUM_SHARDS)
831 .map(|_| PageCacheShard::new(per_shard))
832 .collect();
833 Self {
834 shards: shards.into_boxed_slice(),
835 capacity: total,
836 }
837 }
838
839 pub fn with_default_capacity() -> Self {
840 Self::new(DEFAULT_CACHE_CAPACITY)
841 }
842
843 #[inline]
844 fn shard_for(&self, page_id: u32) -> &PageCacheShard {
845 &self.shards[(page_id as usize) & (NUM_SHARDS - 1)]
846 }
847
848 pub fn len(&self) -> usize {
849 self.shards.iter().map(|s| s.len()).sum()
850 }
851
852 pub fn is_empty(&self) -> bool {
853 self.shards.iter().all(|s| s.is_empty())
854 }
855
856 pub fn capacity(&self) -> usize {
857 self.capacity
858 }
859
860 pub fn stats(&self) -> CacheStats {
861 let mut agg = CacheStats::default();
862 for s in self.shards.iter() {
863 let cs = s.stats();
864 agg.hits += cs.hits;
865 agg.misses += cs.misses;
866 agg.evictions += cs.evictions;
867 agg.writebacks += cs.writebacks;
868 }
869 agg
870 }
871
872 pub fn reset_stats(&self) {
873 for s in self.shards.iter() {
874 s.reset_stats();
875 }
876 }
877
878 pub fn get(&self, page_id: u32) -> Option<Page> {
879 self.shard_for(page_id).get(page_id)
880 }
881
882 pub fn insert(&self, page_id: u32, page: Page) -> Option<Page> {
883 self.shard_for(page_id).insert(page_id, page)
884 }
885
886 pub fn mark_dirty(&self, page_id: u32) {
887 self.shard_for(page_id).mark_dirty(page_id)
888 }
889
890 pub fn mark_clean(&self, page_id: u32) {
891 self.shard_for(page_id).mark_clean(page_id)
892 }
893
894 pub fn pin(&self, page_id: u32) -> bool {
895 self.shard_for(page_id).pin(page_id)
896 }
897
898 pub fn unpin(&self, page_id: u32) -> bool {
899 self.shard_for(page_id).unpin(page_id)
900 }
901
902 pub fn remove(&self, page_id: u32) -> Option<Page> {
903 self.shard_for(page_id).remove(page_id)
904 }
905
906 pub fn contains(&self, page_id: u32) -> bool {
907 self.shard_for(page_id).contains(page_id)
908 }
909
910 pub fn flush_dirty(&self) -> Vec<(u32, Page)> {
911 let mut out = Vec::new();
912 for s in self.shards.iter() {
913 out.extend(s.flush_dirty());
914 }
915 out
916 }
917
918 pub fn flush_some_dirty(&self, max: usize) -> Vec<(u32, Page)> {
919 if max == 0 {
920 return Vec::new();
921 }
922 let mut out = Vec::with_capacity(max);
923 for s in self.shards.iter() {
924 if out.len() >= max {
925 break;
926 }
927 let budget = max - out.len();
928 out.extend(s.flush_some_dirty(budget));
929 }
930 out
931 }
932
933 pub fn dirty_count(&self) -> usize {
934 self.shards.iter().map(|s| s.dirty_count()).sum()
935 }
936
937 pub fn clear(&self) {
938 for s in self.shards.iter() {
939 s.clear();
940 }
941 }
942
943 pub fn page_ids(&self) -> Vec<u32> {
944 let mut out = Vec::with_capacity(self.len());
945 for s in self.shards.iter() {
946 out.extend(s.page_ids());
947 }
948 out
949 }
950}
951
952impl Default for PageCache {
953 fn default() -> Self {
954 Self::with_default_capacity()
955 }
956}
957
958#[cfg(test)]
959mod tests {
960 use super::*;
961 use crate::storage::engine::page::PageType;
962
963 fn make_page(id: u32) -> Page {
964 Page::new(PageType::BTreeLeaf, id)
965 }
966
967 #[test]
968 fn test_cache_basic() {
969 let cache = PageCacheShard::new(100);
970
971 assert!(cache.is_empty());
972 assert_eq!(cache.capacity(), 100);
973
974 cache.insert(1, make_page(1));
976 assert_eq!(cache.len(), 1);
977 assert!(cache.contains(1));
978
979 let page = cache.get(1);
981 assert!(page.is_some());
982
983 let page = cache.get(999);
985 assert!(page.is_none());
986 }
987
988 #[test]
989 fn test_cache_eviction() {
990 let cache = PageCacheShard::new(4);
991
992 for i in 0..4 {
994 cache.insert(i, make_page(i));
995 }
996
997 assert_eq!(cache.len(), 4);
998
999 cache.insert(100, make_page(100));
1001
1002 assert_eq!(cache.len(), 4);
1004 assert!(cache.contains(100));
1005 }
1006
1007 #[test]
1008 fn test_cache_sieve_visited() {
1009 let cache = PageCacheShard::new(4);
1010
1011 for i in 0..4 {
1013 cache.insert(i, make_page(i));
1014 }
1015
1016 cache.get(0);
1018
1019 cache.insert(100, make_page(100));
1021
1022 assert!(cache.contains(0));
1024 assert!(cache.contains(100));
1025 }
1026
1027 #[test]
1028 fn test_cache_dirty() {
1029 let cache = PageCacheShard::new(100);
1030
1031 cache.insert(1, make_page(1));
1032 cache.mark_dirty(1);
1033
1034 let dirty = cache.flush_dirty();
1035 assert_eq!(dirty.len(), 1);
1036 assert_eq!(dirty[0].0, 1);
1037
1038 let dirty = cache.flush_dirty();
1040 assert_eq!(dirty.len(), 0);
1041 }
1042
1043 #[test]
1044 fn test_cache_pin() {
1045 let cache = PageCacheShard::new(2);
1046
1047 cache.insert(1, make_page(1));
1048 cache.insert(2, make_page(2));
1049
1050 assert!(cache.pin(1));
1052
1053 cache.insert(3, make_page(3));
1055
1056 assert!(cache.contains(1));
1058
1059 assert!(cache.unpin(1));
1061 }
1062
1063 #[test]
1064 fn test_cache_remove() {
1065 let cache = PageCacheShard::new(100);
1066
1067 cache.insert(1, make_page(1));
1068 assert!(cache.contains(1));
1069
1070 let removed = cache.remove(1);
1071 assert!(removed.is_some());
1072 assert!(!cache.contains(1));
1073 }
1074
1075 #[test]
1076 fn test_cache_stats() {
1077 let cache = PageCacheShard::new(100);
1078
1079 cache.insert(1, make_page(1));
1080
1081 cache.get(1);
1083 cache.get(1);
1084
1085 cache.get(999);
1087
1088 let stats = cache.stats();
1089 assert_eq!(stats.hits, 2);
1090 assert_eq!(stats.misses, 1);
1091 assert!((stats.hit_rate() - 0.666).abs() < 0.01);
1092 }
1093
1094 #[test]
1095 fn test_cache_clear() {
1096 let cache = PageCacheShard::new(100);
1097
1098 for i in 0..50 {
1099 cache.insert(i, make_page(i));
1100 }
1101
1102 assert_eq!(cache.len(), 50);
1103
1104 cache.clear();
1105 assert!(cache.is_empty());
1106 }
1107
1108 #[test]
1109 fn test_cache_update_existing() {
1110 let cache = PageCacheShard::new(100);
1111
1112 let mut page1 = make_page(1);
1113 page1.as_bytes_mut()[100] = 0xAA;
1114 cache.insert(1, page1);
1115
1116 let mut page1_updated = make_page(1);
1117 page1_updated.as_bytes_mut()[100] = 0xBB;
1118 cache.insert(1, page1_updated);
1119
1120 assert_eq!(cache.len(), 1);
1122
1123 let retrieved = cache.get(1).unwrap();
1125 assert_eq!(retrieved.as_bytes()[100], 0xBB);
1126 }
1127
1128 #[test]
1129 fn test_cache_recovers_after_index_lock_poisoning() {
1130 let cache = std::sync::Arc::new(PageCacheShard::new(8));
1131 let poison_target = std::sync::Arc::clone(&cache);
1132 let _ = std::thread::spawn(move || {
1133 let _guard = poison_target
1134 .index
1135 .write()
1136 .expect("index lock should be acquired");
1137 panic!("poison index lock");
1138 })
1139 .join();
1140
1141 cache.insert(1, make_page(1));
1142 assert!(cache.contains(1));
1143 assert!(cache.get(1).is_some());
1144 }
1145
1146 #[test]
1147 fn test_cache_recovers_after_stats_lock_poisoning() {
1148 let cache = std::sync::Arc::new(PageCacheShard::new(8));
1149 let poison_target = std::sync::Arc::clone(&cache);
1150 let _ = std::thread::spawn(move || {
1151 let _guard = poison_target
1152 .stats
1153 .lock()
1154 .expect("stats lock should be acquired");
1155 panic!("poison stats lock");
1156 })
1157 .join();
1158
1159 assert!(cache.get(999).is_none());
1160 assert_eq!(cache.stats().misses, 1);
1161 cache.reset_stats();
1162 assert_eq!(cache.stats().misses, 0);
1163 }
1164
1165 #[test]
1172 fn test_metadata_layout_is_cache_line_packed() {
1173 assert_eq!(std::mem::align_of::<VisitedLine>(), CACHE_LINE);
1174 assert_eq!(std::mem::size_of::<VisitedLine>(), CACHE_LINE);
1175 assert_eq!(std::mem::align_of::<TagLine>(), CACHE_LINE);
1176 assert_eq!(std::mem::size_of::<TagLine>(), CACHE_LINE);
1177
1178 let meta = ShardMeta::new(200);
1180 assert_eq!(meta.visited.len(), 200usize.div_ceil(64));
1182 assert_eq!(meta.tag.len(), 200usize.div_ceil(16));
1183
1184 meta.occupy(5, 42);
1186 assert_eq!(meta.tag(5), 42);
1187 assert!(!meta.is_visited(5));
1188 meta.mark_visited(5);
1189 assert!(meta.is_visited(5));
1190 meta.vacate(5);
1191 assert_eq!(meta.tag(5), TAG_EMPTY);
1192 assert!(!meta.is_visited(5));
1193 }
1194
1195 #[test]
1200 fn test_hit_sets_visited_and_preserves_sieve_policy() {
1201 let cache = PageCacheShard::new(4);
1202 for i in 0..4 {
1203 cache.insert(i, make_page(i));
1204 }
1205
1206 assert!(cache.get(0).is_some());
1208 assert!(cache.get(1).is_some());
1209
1210 cache.insert(10, make_page(10));
1212 cache.insert(11, make_page(11));
1213
1214 assert!(cache.contains(0), "hit page 0 must survive");
1215 assert!(cache.contains(1), "hit page 1 must survive");
1216 assert!(cache.contains(10));
1217 assert!(cache.contains(11));
1218 assert!(!cache.contains(2));
1219 assert!(!cache.contains(3));
1220 }
1221
1222 #[test]
1228 fn test_concurrent_hits_never_evict_pinned_or_lose_dirty() {
1229 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1230 use std::sync::Arc;
1231
1232 const CAP: u32 = 64;
1233 const PINNED: u32 = 8; const DIRTY: u32 = 8; const HITTERS: usize = 6;
1236 const INSERTERS: usize = 4;
1237 const ROUNDS: usize = 4_000;
1238
1239 let cache = Arc::new(PageCacheShard::new(CAP as usize));
1240 for i in 0..CAP {
1241 cache.insert(i, make_page(i));
1242 }
1243 for p in 0..PINNED {
1244 assert!(cache.pin(p), "pin {p}");
1245 }
1246 for p in PINNED..PINNED + DIRTY {
1247 cache.mark_dirty(p);
1248 }
1249
1250 let dirty_written_back = Arc::new(AtomicUsize::new(0));
1253 let stop = Arc::new(AtomicBool::new(false));
1254
1255 let mut hitters = Vec::with_capacity(HITTERS);
1257 for h in 0..HITTERS {
1258 let cache = Arc::clone(&cache);
1259 let stop = Arc::clone(&stop);
1260 hitters.push(std::thread::spawn(move || {
1261 let mut id = (h as u32) % CAP;
1262 while !stop.load(Ordering::Relaxed) {
1263 let _ = cache.get(id);
1264 id = (id + 1) % CAP;
1265 }
1266 }));
1267 }
1268
1269 let mut inserters = Vec::with_capacity(INSERTERS);
1271 for w in 0..INSERTERS {
1272 let cache = Arc::clone(&cache);
1273 let dirty_written_back = Arc::clone(&dirty_written_back);
1274 inserters.push(std::thread::spawn(move || {
1275 let base = 1_000 + (w as u32) * 1_000_000;
1276 for i in 0..ROUNDS {
1277 let id = base + i as u32;
1278 if cache.insert(id, make_page(id)).is_some() {
1279 dirty_written_back.fetch_add(1, Ordering::Relaxed);
1281 }
1282 for p in 0..PINNED {
1285 assert!(
1286 cache.contains(p),
1287 "pinned page {p} was evicted under concurrency"
1288 );
1289 }
1290 }
1291 }));
1292 }
1293
1294 for h in inserters {
1296 h.join()
1297 .expect("inserter thread panicked (invariant broke)");
1298 }
1299 stop.store(true, Ordering::Relaxed);
1300 for h in hitters {
1301 h.join().expect("hitter thread panicked (invariant broke)");
1302 }
1303
1304 for p in 0..PINNED {
1306 assert!(cache.contains(p), "pinned page {p} missing after storm");
1307 }
1308
1309 let dirty_resident = (PINNED..PINNED + DIRTY)
1313 .filter(|&p| cache.contains(p))
1314 .count();
1315 assert!(
1316 dirty_resident + dirty_written_back.load(Ordering::Relaxed) >= DIRTY as usize,
1317 "dirty pages lost: resident={dirty_resident} written_back={}",
1318 dirty_written_back.load(Ordering::Relaxed)
1319 );
1320
1321 }
1329
1330 mod legacy_baseline {
1338 use super::Page;
1339 use std::collections::HashMap;
1340 use std::sync::RwLock;
1341
1342 pub struct LegacyPageCache {
1343 entries: RwLock<HashMap<u32, Page>>,
1344 }
1345
1346 impl LegacyPageCache {
1347 pub fn new(_capacity: usize) -> Self {
1348 Self {
1349 entries: RwLock::new(HashMap::new()),
1350 }
1351 }
1352
1353 pub fn insert(&self, page_id: u32, page: Page) {
1354 let mut entries = self.entries.write().unwrap();
1355 entries.insert(page_id, page);
1356 }
1357
1358 pub fn get(&self, page_id: u32) -> Option<Page> {
1359 let entries = self.entries.read().unwrap();
1360 entries.get(&page_id).cloned()
1361 }
1362 }
1363 }
1364
1365 fn run_workload<F>(workers: usize, ops_per_worker: usize, run: F) -> std::time::Duration
1370 where
1371 F: Fn(u32, &Page) + Send + Sync + 'static + Clone,
1372 {
1373 use std::sync::Arc;
1374 use std::time::Instant;
1375
1376 let run = Arc::new(run);
1377 let start = Instant::now();
1378 let mut handles = Vec::with_capacity(workers);
1379 for w in 0..workers {
1380 let run = Arc::clone(&run);
1381 handles.push(std::thread::spawn(move || {
1382 let base = (w as u32) * 1_000_000;
1383 let page = make_page(0);
1384 for i in 0..ops_per_worker {
1385 let id = base + (i as u32);
1386 run(id, &page);
1387 }
1388 }));
1389 }
1390 for h in handles {
1391 h.join().unwrap();
1392 }
1393 start.elapsed()
1394 }
1395
1396 #[test]
1397 fn test_sharded_cache_scales_concurrently() {
1398 use std::sync::Arc;
1409
1410 const WORKERS: usize = 10;
1411 const OPS: usize = 5_000;
1412 const CAPACITY: usize = 200_000;
1413
1414 let sharded = Arc::new(PageCache::new(CAPACITY));
1416 let s1 = Arc::clone(&sharded);
1417 let sharded_serial = run_workload(1, OPS * WORKERS, move |id, page| {
1418 s1.insert(id, page.clone());
1419 let _ = s1.get(id);
1420 });
1421
1422 let sharded = Arc::new(PageCache::new(CAPACITY));
1424 let s2 = Arc::clone(&sharded);
1425 let sharded_parallel = run_workload(WORKERS, OPS, move |id, page| {
1426 s2.insert(id, page.clone());
1427 let _ = s2.get(id);
1428 });
1429
1430 let legacy = Arc::new(legacy_baseline::LegacyPageCache::new(CAPACITY));
1432 let l2 = Arc::clone(&legacy);
1433 let legacy_parallel = run_workload(WORKERS, OPS, move |id, page| {
1434 l2.insert(id, page.clone());
1435 let _ = l2.get(id);
1436 });
1437
1438 eprintln!(
1439 "page_cache concurrency: sharded 1w={:?} sharded {}w={:?} legacy {}w={:?}",
1440 sharded_serial, WORKERS, sharded_parallel, WORKERS, legacy_parallel
1441 );
1442
1443 assert!(
1446 sharded_parallel.as_nanos() < sharded_serial.as_nanos() * 7,
1447 "sharded cache did not scale: 1w={:?} {}w={:?}",
1448 sharded_serial,
1449 WORKERS,
1450 sharded_parallel
1451 );
1452
1453 assert!(
1459 sharded_parallel.as_nanos() * 12 < legacy_parallel.as_nanos() * 10,
1460 "sharded cache did not beat legacy baseline: sharded {}w={:?} legacy {}w={:?}",
1461 WORKERS,
1462 sharded_parallel,
1463 WORKERS,
1464 legacy_parallel
1465 );
1466 }
1467}