use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Acquires a read guard, recovering from lock poisoning instead of panicking.
///
/// A poisoned lock only means another thread panicked while holding the guard;
/// the protected data is still structurally intact, so we log and continue.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>, name: &str) -> RwLockReadGuard<'a, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => {
            tracing::warn!(lock = name, "RwLock poisoned, recovering read guard");
            poisoned.into_inner()
        }
    }
}

/// Acquires a write guard, recovering from lock poisoning instead of panicking.
fn recover_write_guard<'a, T>(lock: &'a RwLock<T>, name: &str) -> RwLockWriteGuard<'a, T> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => {
            tracing::warn!(lock = name, "RwLock poisoned, recovering write guard");
            poisoned.into_inner()
        }
    }
}

/// Identifier for a page managed by the cache.
pub type PageId = u64;

/// Default page size in bytes (4 KiB).
pub const DEFAULT_PAGE_SIZE: usize = 4096;

/// A cache entry tracked by the clock (second-chance) replacement policy.
#[derive(Debug)]
struct CacheEntry<V> {
    /// The cached value.
    value: V,
    /// Reference bit: set on access, cleared by the sweeping clock hand.
    visited: AtomicBool,
    /// Index of this entry's slot in the clock ring.
    index: usize,
    /// Whether the value has changed since it was last written back.
    dirty: AtomicBool,
    /// Number of active pins; a pinned entry is never evicted.
    pin_count: AtomicUsize,
}

impl<V> CacheEntry<V> {
    fn new(value: V, index: usize) -> Self {
        Self {
            value,
            // New entries start visited so they survive their first clock sweep.
            visited: AtomicBool::new(true),
            index,
            dirty: AtomicBool::new(false),
            pin_count: AtomicUsize::new(0),
        }
    }

    fn is_visited(&self) -> bool {
        self.visited.load(Ordering::Relaxed)
    }

    fn set_visited(&self, visited: bool) {
        self.visited.store(visited, Ordering::Relaxed);
    }

    fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Relaxed)
    }

    fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Relaxed);
    }

    fn clear_dirty(&self) {
        self.dirty.store(false, Ordering::Relaxed);
    }

    fn pin(&self) {
        self.pin_count.fetch_add(1, Ordering::SeqCst);
    }

    fn unpin(&self) {
        self.pin_count.fetch_sub(1, Ordering::SeqCst);
    }

    fn is_pinned(&self) -> bool {
        self.pin_count.load(Ordering::SeqCst) > 0
    }
}

/// A position in the clock ring: either free or holding the key stored there.
#[derive(Debug, Clone)]
enum Slot<K>
where
    K: Clone,
{
    Empty,
    Occupied(K),
}

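/// Configuration for a [`PageCache`].
///
/// A minimal construction sketch (marked `ignore` because the import path to
/// this module is assumed, not confirmed by this file):
///
/// ```ignore
/// let config = CacheConfig::with_capacity(256).with_page_size(8192);
/// assert_eq!(config.memory_size(), 256 * 8192);
/// ```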
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries the cache may hold.
    pub capacity: usize,
    /// Size of each page in bytes; used only for memory accounting.
    pub page_size: usize,
    /// Whether to record hit/miss/eviction counters.
    pub collect_stats: bool,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            capacity: 1024,
            page_size: DEFAULT_PAGE_SIZE,
            collect_stats: true,
        }
    }
}

impl CacheConfig {
    /// Creates a config with the given capacity and defaults otherwise.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity,
            ..Default::default()
        }
    }

    /// Sets the page size (builder style).
    pub fn with_page_size(mut self, page_size: usize) -> Self {
        self.page_size = page_size;
        self
    }

    /// Total memory the cache would occupy when full, in bytes.
    pub fn memory_size(&self) -> usize {
        self.capacity * self.page_size
    }
}

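/// A point-in-time snapshot of the cache counters.
///
/// A small usage sketch (`ignore`d; paths are assumed):
///
/// ```ignore
/// let stats = cache.stats();
/// println!("hit ratio: {:.2}", stats.hit_ratio());
/// ```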
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Lookups that found the key.
    pub hits: u64,
    /// Lookups that missed.
    pub misses: u64,
    /// Successful insertions of new keys.
    pub insertions: u64,
    /// Entries evicted by the clock sweep.
    pub evictions: u64,
    /// Current number of entries.
    pub entries: usize,
    /// Dirty pages written back through the writer.
    pub writebacks: u64,
    /// Clock sweep passes recorded by the eviction loop.
    pub sweeps: u64,
}

impl CacheStats {
    /// Fraction of lookups that hit, or 0.0 if there were no lookups.
    pub fn hit_ratio(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }

    /// Fraction of lookups that missed.
    pub fn miss_ratio(&self) -> f64 {
        1.0 - self.hit_ratio()
    }
}

/// Internal lock-free counters behind [`CacheStats`].
struct AtomicStats {
    hits: AtomicU64,
    misses: AtomicU64,
    insertions: AtomicU64,
    evictions: AtomicU64,
    writebacks: AtomicU64,
    sweeps: AtomicU64,
}

impl AtomicStats {
    fn new() -> Self {
        Self {
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            insertions: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
            writebacks: AtomicU64::new(0),
            sweeps: AtomicU64::new(0),
        }
    }

    fn to_stats(&self, entries: usize) -> CacheStats {
        CacheStats {
            hits: self.hits.load(Ordering::Relaxed),
            misses: self.misses.load(Ordering::Relaxed),
            insertions: self.insertions.load(Ordering::Relaxed),
            evictions: self.evictions.load(Ordering::Relaxed),
            entries,
            writebacks: self.writebacks.load(Ordering::Relaxed),
            sweeps: self.sweeps.load(Ordering::Relaxed),
        }
    }
}

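/// Destination for dirty pages written back on eviction, removal, or flush.
///
/// A sketch of a custom writer (hypothetical; shown only to illustrate the
/// trait contract):
///
/// ```ignore
/// struct LogWriter;
///
/// impl PageWriter<u64, Vec<u8>> for LogWriter {
///     fn write_page(&self, key: &u64, value: &Vec<u8>) -> std::io::Result<()> {
///         // A real writer would persist `value` at the location for `key`.
///         println!("writeback page {key}: {} bytes", value.len());
///         Ok(())
///     }
/// }
/// ```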
pub trait PageWriter<K, V>: Send + Sync {
    /// Persists a single page.
    fn write_page(&self, key: &K, value: &V) -> std::io::Result<()>;
}

/// A writer that discards pages; useful for purely in-memory caches.
pub struct NoOpWriter;

impl<K, V> PageWriter<K, V> for NoOpWriter {
    fn write_page(&self, _key: &K, _value: &V) -> std::io::Result<()> {
        Ok(())
    }
}

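/// A fixed-capacity page cache with clock (second-chance) eviction, optional
/// write-back of dirty pages, and per-strategy buffer rings for bulk access.
///
/// Basic usage (`ignore`d; module paths are assumed):
///
/// ```ignore
/// let cache: PageCache<u64, String> = PageCache::with_capacity(128);
/// cache.insert(1, "page one".to_string());
/// assert_eq!(cache.get(&1), Some("page one".to_string()));
/// cache.mark_dirty(&1);
/// let flushed = cache.flush().unwrap(); // writes dirty pages via the writer
/// ```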
pub struct PageCache<K, V, W = NoOpWriter>
where
    K: Clone + Eq + Hash,
    V: Clone,
    W: PageWriter<K, V>,
{
    config: CacheConfig,
    /// Key -> entry map; the authoritative store.
    entries: RwLock<HashMap<K, Arc<CacheEntry<V>>>>,
    /// The clock ring; each occupied slot mirrors one key in `entries`.
    slots: RwLock<Vec<Slot<K>>>,
    /// Current position of the clock hand.
    hand: AtomicUsize,
    /// Number of occupied slots.
    count: AtomicUsize,
    stats: AtomicStats,
    writer: W,
    /// Lazily created per-strategy rings that keep bulk traffic out of the
    /// main pool.
    rings:
        RwLock<HashMap<super::strategy::BufferAccessStrategy, Arc<super::ring::BufferRing<K, V>>>>,
}

impl<K, V> PageCache<K, V, NoOpWriter>
where
    K: Clone + Eq + Hash,
    V: Clone,
{
    /// Creates a cache with the given config and no write-back.
    pub fn new(config: CacheConfig) -> Self {
        Self::with_writer(config, NoOpWriter)
    }

    /// Creates a cache with the given capacity and default config otherwise.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::new(CacheConfig::with_capacity(capacity))
    }
}

impl<K, V, W> PageCache<K, V, W>
where
    K: Clone + Eq + Hash,
    V: Clone,
    W: PageWriter<K, V>,
{
    /// Creates a cache that writes dirty pages back through `writer`.
    pub fn with_writer(config: CacheConfig, writer: W) -> Self {
        let capacity = config.capacity;
        Self {
            config,
            entries: RwLock::new(HashMap::with_capacity(capacity)),
            slots: RwLock::new(vec![Slot::Empty; capacity]),
            hand: AtomicUsize::new(0),
            count: AtomicUsize::new(0),
            stats: AtomicStats::new(),
            writer,
            rings: RwLock::new(HashMap::new()),
        }
    }

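    /// Looks up `key`, first in the main pool and then, for ring strategies,
    /// in that strategy's buffer ring.
    ///
    /// Sketch (`ignore`d; `BufferAccessStrategy` comes from a sibling module):
    ///
    /// ```ignore
    /// let v = cache.get_with(&page_id, BufferAccessStrategy::SequentialScan);
    /// ```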
    pub fn get_with(&self, key: &K, strategy: super::strategy::BufferAccessStrategy) -> Option<V> {
        if let Some(v) = self.get(key) {
            return Some(v);
        }
        if strategy.is_ring() {
            if let Some(ring) = self.get_ring(strategy) {
                return ring.get(key);
            }
        }
        None
    }

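    /// Inserts through the given strategy: normal traffic goes to the main
    /// pool, ring strategies go to their dedicated ring. For ring inserts the
    /// return value is the entry evicted from the ring, if any.
    ///
    /// Sketch (`ignore`d):
    ///
    /// ```ignore
    /// let evicted = cache.insert_with(id, page, BufferAccessStrategy::BulkWrite);
    /// ```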
    pub fn insert_with(
        &self,
        key: K,
        value: V,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Option<(K, V)> {
        if !strategy.is_ring() {
            let prev = self.insert(key.clone(), value);
            return prev.map(|v| (key, v));
        }
        let ring = self.ensure_ring(strategy);
        ring.insert(key, value)
    }

    /// Returns the ring for `strategy`, creating it on first use.
    fn ensure_ring(
        &self,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Arc<super::ring::BufferRing<K, V>> {
        // Fast path: the ring already exists.
        {
            let rings = recover_read_guard(&self.rings, "rings");
            if let Some(r) = rings.get(&strategy) {
                return Arc::clone(r);
            }
        }
        // Slow path: re-check under the write lock so two racing creators
        // cannot both install a ring.
        let mut rings = recover_write_guard(&self.rings, "rings");
        if let Some(r) = rings.get(&strategy) {
            return Arc::clone(r);
        }
        let cap = strategy.ring_size().unwrap_or(16);
        let ring = Arc::new(super::ring::BufferRing::new(cap));
        rings.insert(strategy, Arc::clone(&ring));
        ring
    }

    fn get_ring(
        &self,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Option<Arc<super::ring::BufferRing<K, V>>> {
        let rings = recover_read_guard(&self.rings, "rings");
        rings.get(&strategy).cloned()
    }

    /// Empties every strategy ring without touching the main pool.
    pub fn clear_strategy_rings(&self) {
        let rings = recover_read_guard(&self.rings, "rings");
        for ring in rings.values() {
            ring.clear();
        }
    }

    /// Looks up `key` in the main pool, marking the entry as recently used.
    pub fn get(&self, key: &K) -> Option<V> {
        let entries = recover_read_guard(&self.entries, "entries");

        if let Some(entry) = entries.get(key) {
            entry.set_visited(true);

            if self.config.collect_stats {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
            }

            Some(entry.value.clone())
        } else {
            if self.config.collect_stats {
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
            }
            None
        }
    }

    /// Returns true if `key` is present in the main pool.
    pub fn contains(&self, key: &K) -> bool {
        recover_read_guard(&self.entries, "entries").contains_key(key)
    }

    /// Inserts a page, evicting if the cache is full. Returns the previous
    /// value when `key` was already present.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        // Fast path: check for an existing entry under the read lock first.
        {
            let entries = recover_read_guard(&self.entries, "entries");
            if let Some(entry) = entries.get(&key) {
                entry.set_visited(true);
                let old_value = entry.value.clone();
                drop(entries);
                return self.update_existing(key, value, old_value);
            }
        }

        // Evict before taking the write locks; any write-back of the victim
        // then happens outside of them.
        let index = if self.count.load(Ordering::Acquire) >= self.config.capacity {
            self.evict_one()
        } else {
            None
        };

        let mut entries = recover_write_guard(&self.entries, "entries");
        let mut slots = recover_write_guard(&self.slots, "slots");

        // A racing insert may have added the key between the locks; keep the
        // existing entry and drop the new value.
        if entries.contains_key(&key) {
            if let Some(entry) = entries.get(&key) {
                entry.set_visited(true);
            }
            return None;
        }

        // Use the freshly evicted slot, or find an empty one. If neither
        // exists (e.g. every entry is pinned), the insert silently fails.
        let slot_index = if let Some(idx) = index {
            idx
        } else {
            slots.iter().position(|s| matches!(s, Slot::Empty))?
        };

        let entry = Arc::new(CacheEntry::new(value, slot_index));
        slots[slot_index] = Slot::Occupied(key.clone());
        entries.insert(key, entry);

        self.count.fetch_add(1, Ordering::Release);

        if self.config.collect_stats {
            self.stats.insertions.fetch_add(1, Ordering::Relaxed);
        }

        None
    }

    /// Replaces the value of an existing key, keeping its slot. Note that the
    /// replacement entry starts with fresh dirty and pin state.
    fn update_existing(&self, key: K, new_value: V, old_value: V) -> Option<V> {
        let mut entries = recover_write_guard(&self.entries, "entries");

        if let Some(old_entry) = entries.get(&key) {
            let index = old_entry.index;
            let new_entry = Arc::new(CacheEntry::new(new_value, index));
            entries.insert(key, new_entry);
            Some(old_value)
        } else {
            None
        }
    }

    /// Removes `key`, writing the page back first if it is dirty.
    pub fn remove(&self, key: &K) -> Option<V> {
        let mut entries = recover_write_guard(&self.entries, "entries");

        if let Some(entry) = entries.remove(key) {
            let mut slots = recover_write_guard(&self.slots, "slots");
            slots[entry.index] = Slot::Empty;
            self.count.fetch_sub(1, Ordering::Release);

            // Write-back errors are ignored here; the page is gone either way.
            if entry.is_dirty() {
                let _ = self.writer.write_page(key, &entry.value);
                if self.config.collect_stats {
                    self.stats.writebacks.fetch_add(1, Ordering::Relaxed);
                }
            }

            Some(entry.value.clone())
        } else {
            None
        }
    }

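    /// Evicts one unpinned, not-recently-used entry using the clock
    /// (second-chance) algorithm and returns the freed slot index.
    ///
    /// The hand sweeps the slot ring: pinned entries are skipped, visited
    /// entries get their reference bit cleared (their "second chance"), and
    /// the first entry that is neither pinned nor visited is evicted. The
    /// locks are released before any dirty write-back so readers are not
    /// blocked by slow I/O. Gives up after two full sweeps (e.g. when every
    /// entry is pinned).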
    fn evict_one(&self) -> Option<usize> {
        let capacity = self.config.capacity;
        let max_sweeps = capacity * 2;

        for _ in 0..max_sweeps {
            let current_hand = self.hand.load(Ordering::Relaxed);

            // Choose a victim while holding the locks, but defer its
            // write-back until after this block releases them.
            let eviction: Option<(K, Arc<CacheEntry<V>>)> = {
                let mut entries = recover_write_guard(&self.entries, "entries");
                let mut slots = recover_write_guard(&self.slots, "slots");

                if let Slot::Occupied(ref key) = slots[current_hand] {
                    if let Some(entry) = entries.get(key) {
                        if entry.is_pinned() {
                            // Pinned: skip without touching the reference bit.
                            None
                        } else if entry.is_visited() {
                            // Second chance: clear the bit and move on.
                            entry.set_visited(false);
                            None
                        } else {
                            let key_clone = key.clone();
                            match entries.remove(&key_clone) {
                                None => {
                                    let next = (current_hand + 1) % capacity;
                                    self.hand.store(next, Ordering::Relaxed);
                                    continue;
                                }
                                Some(entry) => {
                                    slots[current_hand] = Slot::Empty;
                                    self.count.fetch_sub(1, Ordering::Release);
                                    let next = (current_hand + 1) % capacity;
                                    self.hand.store(next, Ordering::Relaxed);
                                    Some((key_clone, entry))
                                }
                            }
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            };

            // Locks are released here; a slow writer no longer blocks readers.
            if let Some((key_clone, entry)) = eviction {
                if entry.is_dirty() {
                    let _ = self.writer.write_page(&key_clone, &entry.value);
                    if self.config.collect_stats {
                        self.stats.writebacks.fetch_add(1, Ordering::Relaxed);
                    }
                }
                if self.config.collect_stats {
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                    self.stats.sweeps.fetch_add(1, Ordering::Relaxed);
                }
                return Some(current_hand);
            }

            let next = (current_hand + 1) % capacity;
            self.hand.store(next, Ordering::Relaxed);
        }

        if self.config.collect_stats {
            self.stats.sweeps.fetch_add(1, Ordering::Relaxed);
        }

        None
    }

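    /// Pins `key` so it cannot be evicted until a matching `unpin` call.
    ///
    /// Sketch (`ignore`d):
    ///
    /// ```ignore
    /// cache.pin(&page_id);   // survives eviction pressure
    /// // ... use the page ...
    /// cache.unpin(&page_id); // eligible for eviction again
    /// ```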
    pub fn pin(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.pin();
            true
        } else {
            false
        }
    }

    /// Releases one pin on `key`. Returns false if the key is not cached.
    pub fn unpin(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.unpin();
            true
        } else {
            false
        }
    }

    /// Marks `key` as modified so it is written back on eviction or flush.
    pub fn mark_dirty(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.mark_dirty();
            true
        } else {
            false
        }
    }

    /// Writes all dirty pages back and returns how many were flushed.
    ///
    /// Holds the entries read lock for the whole pass, so concurrent writers
    /// block until the flush finishes.
    pub fn flush(&self) -> std::io::Result<usize> {
        let entries = recover_read_guard(&self.entries, "entries");
        let mut flushed = 0;

        for (key, entry) in entries.iter() {
            if entry.is_dirty() {
                self.writer.write_page(key, &entry.value)?;
                entry.clear_dirty();
                flushed += 1;
            }
        }

        if self.config.collect_stats {
            self.stats
                .writebacks
                .fetch_add(flushed as u64, Ordering::Relaxed);
        }

        Ok(flushed)
    }

    /// Flushes dirty pages (best effort), then drops every entry and resets
    /// the clock hand.
    pub fn clear(&self) {
        let _ = self.flush();

        let mut entries = recover_write_guard(&self.entries, "entries");
        let mut slots = recover_write_guard(&self.slots, "slots");

        entries.clear();
        for slot in slots.iter_mut() {
            *slot = Slot::Empty;
        }

        self.count.store(0, Ordering::Relaxed);
        self.hand.store(0, Ordering::Relaxed);
    }

    /// Number of entries currently cached.
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Returns true if the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Maximum number of entries the cache may hold.
    pub fn capacity(&self) -> usize {
        self.config.capacity
    }

    /// Snapshot of the current statistics counters.
    pub fn stats(&self) -> CacheStats {
        self.stats.to_stats(self.len())
    }

    /// The configuration this cache was built with.
    pub fn config(&self) -> &CacheConfig {
        &self.config
    }

    /// All keys currently in the main pool, in arbitrary order.
    pub fn keys(&self) -> Vec<K> {
        recover_read_guard(&self.entries, "entries")
            .keys()
            .cloned()
            .collect()
    }

    /// Number of dirty pages awaiting write-back.
    pub fn dirty_count(&self) -> usize {
        recover_read_guard(&self.entries, "entries")
            .values()
            .filter(|e| e.is_dirty())
            .count()
    }
}

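/// A fixed-size byte buffer with bounds-checked, little-endian accessors.
///
/// Usage sketch (`ignore`d; paths are assumed):
///
/// ```ignore
/// let mut page = Page::with_size(4096);
/// page.write_u64(0, 42);
/// assert_eq!(page.read_u64(0), Some(42));
/// assert!(page.read(4090, 16).is_none()); // out-of-bounds reads return None
/// ```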
#[derive(Clone)]
pub struct Page {
    /// Raw page bytes.
    data: Vec<u8>,
    /// Page size in bytes; always equals `data.len()`.
    size: usize,
}

impl Page {
    /// Creates a zeroed page of [`DEFAULT_PAGE_SIZE`] bytes.
    pub fn new() -> Self {
        Self::with_size(DEFAULT_PAGE_SIZE)
    }

    /// Creates a zeroed page of `size` bytes.
    pub fn with_size(size: usize) -> Self {
        Self {
            data: vec![0u8; size],
            size,
        }
    }

    /// Wraps an existing buffer as a page.
    pub fn from_data(data: Vec<u8>) -> Self {
        let size = data.len();
        Self { data, size }
    }

    pub fn data(&self) -> &[u8] {
        &self.data
    }

    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.data
    }

    pub fn size(&self) -> usize {
        self.size
    }

    /// Returns `len` bytes starting at `offset`, or `None` if out of bounds.
    pub fn read(&self, offset: usize, len: usize) -> Option<&[u8]> {
        if offset + len <= self.size {
            Some(&self.data[offset..offset + len])
        } else {
            None
        }
    }

    /// Copies `data` into the page at `offset`; returns false if out of bounds.
    pub fn write(&mut self, offset: usize, data: &[u8]) -> bool {
        if offset + data.len() <= self.size {
            self.data[offset..offset + data.len()].copy_from_slice(data);
            true
        } else {
            false
        }
    }

    /// Reads a little-endian `u32` at `offset`.
    pub fn read_u32(&self, offset: usize) -> Option<u32> {
        self.read(offset, 4).map(|bytes| {
            let mut array = [0u8; 4];
            array.copy_from_slice(bytes);
            u32::from_le_bytes(array)
        })
    }

    /// Writes a little-endian `u32` at `offset` (ignored if out of bounds).
    pub fn write_u32(&mut self, offset: usize, value: u32) {
        self.write(offset, &value.to_le_bytes());
    }

    /// Reads a little-endian `u64` at `offset`.
    pub fn read_u64(&self, offset: usize) -> Option<u64> {
        self.read(offset, 8).map(|bytes| {
            let mut array = [0u8; 8];
            array.copy_from_slice(bytes);
            u64::from_le_bytes(array)
        })
    }

    /// Writes a little-endian `u64` at `offset` (ignored if out of bounds).
    pub fn write_u64(&mut self, offset: usize, value: u64) {
        self.write(offset, &value.to_le_bytes());
    }
}

impl Default for Page {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for Page {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Page")
            .field("size", &self.size)
            .field("data", &format!("[{} bytes]", self.data.len()))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_operations() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        assert_eq!(cache.get(&1), Some("one".to_string()));
        assert_eq!(cache.get(&2), Some("two".to_string()));
        assert_eq!(cache.get(&3), None);

        assert!(cache.contains(&1));
        assert!(!cache.contains(&3));

        assert_eq!(cache.remove(&1), Some("one".to_string()));
        assert_eq!(cache.get(&1), None);
    }

    #[test]
    fn test_eviction() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(3);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());
        cache.insert(3, "three".to_string());

        assert_eq!(cache.len(), 3);

        // Touch 1 and 3 so their reference bits are set.
        cache.get(&1);
        cache.get(&3);

        // Inserting a fourth page forces a clock eviction.
        cache.insert(4, "four".to_string());

        assert_eq!(cache.len(), 3);
        assert!(cache.contains(&4));
    }

    #[test]
    fn test_stats() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.get(&1); // hit
        cache.get(&2); // miss

        let stats = cache.stats();
        assert_eq!(stats.insertions, 1);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_ratio(), 0.5);
    }

    #[test]
    fn test_pin_unpin() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(2);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        // Pin key 1 so it cannot be evicted.
        assert!(cache.pin(&1));

        // Force an eviction in the full cache.
        cache.insert(3, "three".to_string());

        // The pinned entry must have survived.
        assert!(cache.contains(&1));

        cache.unpin(&1);
    }

    #[test]
    fn test_page() {
        let mut page = Page::with_size(64);

        page.write(0, b"hello");
        assert_eq!(page.read(0, 5), Some(b"hello".as_slice()));

        page.write_u32(8, 0x12345678);
        assert_eq!(page.read_u32(8), Some(0x12345678));

        page.write_u64(16, 0xDEADBEEF);
        assert_eq!(page.read_u64(16), Some(0xDEADBEEF));

        // Reads past the end of the page return None.
        assert_eq!(page.read(60, 10), None);
    }

    #[test]
    fn test_clear() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        cache.clear();

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_keys() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());
        cache.insert(3, "three".to_string());

        let keys = cache.keys();
        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&1));
        assert!(keys.contains(&2));
        assert!(keys.contains(&3));
    }

    #[test]
    fn test_update() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        assert_eq!(cache.get(&1), Some("one".to_string()));

        let old = cache.insert(1, "ONE".to_string());
        assert_eq!(old, Some("one".to_string()));
        assert_eq!(cache.get(&1), Some("ONE".to_string()));
    }

    #[test]
    fn test_dirty_pages() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        assert_eq!(cache.dirty_count(), 0);

        cache.mark_dirty(&1);
        assert_eq!(cache.dirty_count(), 1);

        cache.mark_dirty(&2);
        assert_eq!(cache.dirty_count(), 2);
    }

    #[test]
    fn test_config() {
        let config = CacheConfig::with_capacity(1024).with_page_size(8192);

        assert_eq!(config.capacity, 1024);
        assert_eq!(config.page_size, 8192);
        assert_eq!(config.memory_size(), 1024 * 8192);
    }

    use super::super::strategy::BufferAccessStrategy;

    #[test]
    fn normal_strategy_is_backwards_compatible() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(8);
        let prev = cache.insert_with(1, "a".to_string(), BufferAccessStrategy::Normal);
        assert!(prev.is_none());
        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::Normal),
            Some("a".to_string())
        );
        // Normal traffic lands in the main pool, so plain get() sees it too.
        assert_eq!(cache.get(&1), Some("a".to_string()));
    }

    #[test]
    fn sequential_scan_does_not_pollute_main_pool() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        for i in 0..50 {
            cache.insert(i, format!("hot-{i}"));
        }
        // A scan several times larger than the pool would normally evict everything.
        for k in 1000..1200u64 {
            let _ = cache.insert_with(k, format!("cold-{k}"), BufferAccessStrategy::SequentialScan);
        }
        for i in 0..50u64 {
            assert!(
                cache.contains(&i),
                "hot key {i} was evicted by sequential scan"
            );
        }
    }

    #[test]
    fn scan_pages_are_findable_via_strategy_get() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        cache.insert_with(
            42,
            "scanned".to_string(),
            BufferAccessStrategy::SequentialScan,
        );
        // The page went to the ring, not the main pool...
        assert_eq!(cache.get(&42), None);
        // ...but a strategy-aware lookup still finds it.
        assert_eq!(
            cache.get_with(&42, BufferAccessStrategy::SequentialScan),
            Some("scanned".to_string())
        );
    }

    #[test]
    fn bulk_read_and_bulk_write_are_independent_rings() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        cache.insert_with(1, "r".to_string(), BufferAccessStrategy::BulkRead);
        cache.insert_with(2, "w".to_string(), BufferAccessStrategy::BulkWrite);

        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::BulkRead),
            Some("r".to_string())
        );
        assert_eq!(
            cache.get_with(&2, BufferAccessStrategy::BulkWrite),
            Some("w".to_string())
        );

        // Keys do not leak across rings.
        assert!(cache
            .get_with(&1, BufferAccessStrategy::BulkWrite)
            .is_none());
        assert!(cache.get_with(&2, BufferAccessStrategy::BulkRead).is_none());
    }

    #[test]
    fn bulk_write_evicts_dirty_page_on_overflow() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        // Insert far more entries than the bulk-write ring can hold.
        let mut last_evicted = None;
        for i in 0..40u64 {
            let evicted = cache.insert_with(i, format!("v{i}"), BufferAccessStrategy::BulkWrite);
            if evicted.is_some() {
                last_evicted = evicted;
            }
        }
        assert!(last_evicted.is_some());
        // The earliest keys must have been pushed out of the ring.
        for i in 0..8u64 {
            assert!(
                cache
                    .get_with(&i, BufferAccessStrategy::BulkWrite)
                    .is_none(),
                "key {i} should have been evicted from bulk_write ring"
            );
        }
    }

    #[test]
    fn clear_strategy_rings_drops_all_ring_pages() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        cache.insert(99, "main".to_string());
        cache.insert_with(1, "ring".to_string(), BufferAccessStrategy::SequentialScan);
        cache.clear_strategy_rings();
        // The main pool is untouched; only ring pages are dropped.
        assert_eq!(cache.get(&99), Some("main".to_string()));
        assert!(cache
            .get_with(&1, BufferAccessStrategy::SequentialScan)
            .is_none());
    }

    #[test]
    fn ring_is_lazily_allocated() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        // No ring exists yet, so the lookup simply misses.
        assert!(cache
            .get_with(&1, BufferAccessStrategy::SequentialScan)
            .is_none());
        cache.insert_with(1, "a".to_string(), BufferAccessStrategy::SequentialScan);
        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::SequentialScan),
            Some("a".to_string())
        );
    }

    use std::time::{Duration, Instant};

    /// A writer that signals when a write-back starts and then sleeps, so
    /// tests can observe what the cache does while a write-back is in flight.
    struct SlowWriter {
        delay: Duration,
        writing: Arc<AtomicBool>,
    }

    impl PageWriter<u64, String> for SlowWriter {
        fn write_page(&self, _key: &u64, _value: &String) -> std::io::Result<()> {
            self.writing.store(true, Ordering::SeqCst);
            std::thread::sleep(self.delay);
            Ok(())
        }
    }

    #[test]
    fn evict_one_releases_locks_before_writeback() {
        const DELAY_MS: u64 = 300;
        let writing = Arc::new(AtomicBool::new(false));
        let cache = Arc::new(PageCache::with_writer(
            CacheConfig::with_capacity(1),
            SlowWriter {
                delay: Duration::from_millis(DELAY_MS),
                writing: Arc::clone(&writing),
            },
        ));

        cache.insert(0u64, "dirty".to_string());
        cache.mark_dirty(&0);

        let cache2 = Arc::clone(&cache);
        let writing2 = Arc::clone(&writing);

        // Inserting key 1 into the full cache evicts dirty key 0, which
        // triggers the slow write-back on this thread.
        let thread_a = std::thread::spawn(move || {
            cache2.insert(1u64, "new".to_string());
        });

        // Wait until the write-back has actually started.
        while !writing2.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        // While the slow write-back is in flight, reads must not block.
        let start = Instant::now();
        let _ = cache.get(&1u64);
        let elapsed = start.elapsed();

        thread_a.join().unwrap();

        // A generous bound: the read should be near-instant, far below the
        // write-back delay.
        assert!(
            elapsed < Duration::from_millis(DELAY_MS / 10),
            "get() blocked for {elapsed:?} - locks were probably still held during writeback"
        );
    }

    #[test]
    fn recover_write_guard_handles_poisoned_lock() {
        let lock: RwLock<u32> = RwLock::new(42);

        // Poison the lock by panicking while the write guard is held.
        let _ = std::panic::catch_unwind(|| {
            let _guard = lock.write().unwrap();
            panic!("intentional poison");
        });

        assert!(lock.write().is_err(), "lock must be poisoned after panic");

        // Recovery must still hand back the protected value.
        let guard = recover_write_guard(&lock, "test_lock");
        assert_eq!(*guard, 42);
    }

    #[test]
    fn recover_read_guard_handles_poisoned_lock() {
        let lock: RwLock<u32> = RwLock::new(99);

        let _ = std::panic::catch_unwind(|| {
            let _guard = lock.write().unwrap();
            panic!("intentional poison");
        });

        assert!(lock.read().is_err(), "lock must be poisoned after panic");

        let guard = recover_read_guard(&lock, "test_lock");
        assert_eq!(*guard, 99);
    }
}