reddb_server/storage/cache/sieve.rs

//! SIEVE Page Cache
//!
//! Implementation of the SIEVE cache eviction algorithm for database pages.
//!
//! SIEVE (Simple, Efficient, and Versatile Eviction) is a modern cache
//! eviction algorithm that is simpler than LRU but often performs better.
//!
//! Key properties:
//! - O(1) insertion and lookup; amortized O(1) eviction
//! - No structural updates on cache hits (only an atomic visited bit is set)
//! - Uses a circular buffer with a single "hand" pointer
//! - The hand sweeps the buffer to find eviction candidates
//!
//! Reference: "SIEVE is Simpler than LRU: An Efficient Turn-Key Eviction Algorithm for Web Caches"
//! by Yazhuo Zhang et al. (NSDI 2024)
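//!
//! A minimal usage sketch (the `use` path is illustrative):
//!
//! ```ignore
//! use crate::storage::cache::sieve::{CacheConfig, PageCache};
//!
//! // A small cache mapping page ids to raw page bytes.
//! let cache: PageCache<u64, Vec<u8>> = PageCache::new(CacheConfig::with_capacity(256));
//! cache.insert(1, vec![0u8; 4096]);
//!
//! // A hit only sets the entry's visited bit; nothing is reordered.
//! assert!(cache.get(&1).is_some());
//!
//! // Hit/miss counters are collected when `collect_stats` is enabled (the default).
//! assert_eq!(cache.stats().hits, 1);
//! ```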

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Acquire a read guard, recovering the inner data if the lock was poisoned
/// by a panicking writer instead of propagating the panic.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>, name: &str) -> RwLockReadGuard<'a, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => {
            tracing::warn!(lock = name, "RwLock poisoned, recovering read guard");
            poisoned.into_inner()
        }
    }
}

/// Acquire a write guard, recovering the inner data if the lock was poisoned
/// by a panicking writer instead of propagating the panic.
fn recover_write_guard<'a, T>(lock: &'a RwLock<T>, name: &str) -> RwLockWriteGuard<'a, T> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => {
            tracing::warn!(lock = name, "RwLock poisoned, recovering write guard");
            poisoned.into_inner()
        }
    }
}

/// Page identifier type
pub type PageId = u64;

/// Default page size (4KB)
pub const DEFAULT_PAGE_SIZE: usize = 4096;

/// Cache entry containing page data and metadata
#[derive(Debug)]
struct CacheEntry<V> {
    /// The cached value
    value: V,
    /// Visited flag (set on access)
    visited: AtomicBool,
    /// Entry index in the circular buffer
    index: usize,
    /// Dirty flag (modified since loaded)
    dirty: AtomicBool,
    /// Pin count (prevent eviction)
    pin_count: AtomicUsize,
}

impl<V> CacheEntry<V> {
    fn new(value: V, index: usize) -> Self {
        Self {
            value,
            visited: AtomicBool::new(true), // New entries start as visited
            index,
            dirty: AtomicBool::new(false),
            pin_count: AtomicUsize::new(0),
        }
    }

    fn is_visited(&self) -> bool {
        self.visited.load(Ordering::Relaxed)
    }

    fn set_visited(&self, visited: bool) {
        self.visited.store(visited, Ordering::Relaxed);
    }

    fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Relaxed)
    }

    fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Relaxed);
    }

    fn clear_dirty(&self) {
        self.dirty.store(false, Ordering::Relaxed);
    }

    fn pin(&self) {
        self.pin_count.fetch_add(1, Ordering::SeqCst);
    }

    fn unpin(&self) {
        self.pin_count.fetch_sub(1, Ordering::SeqCst);
    }

    fn is_pinned(&self) -> bool {
        self.pin_count.load(Ordering::SeqCst) > 0
    }
}

/// Circular buffer slot
#[derive(Debug, Clone)]
enum Slot<K>
where
    K: Clone,
{
    /// Empty slot
    Empty,
    /// Occupied with key
    Occupied(K),
}

/// Cache configuration
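///
/// A builder-style sketch (values are illustrative):
///
/// ```ignore
/// // 2048 pages of 8 KiB each, i.e. 16 MiB of cached data.
/// let config = CacheConfig::with_capacity(2048).with_page_size(8192);
/// assert_eq!(config.memory_size(), 2048 * 8192);
/// ```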
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries
    pub capacity: usize,
    /// Page size in bytes
    pub page_size: usize,
    /// Enable statistics collection
    pub collect_stats: bool,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            capacity: 1024,
            page_size: DEFAULT_PAGE_SIZE,
            collect_stats: true,
        }
    }
}

impl CacheConfig {
    /// Create with specific capacity
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity,
            ..Default::default()
        }
    }

    /// Set page size
    pub fn with_page_size(mut self, page_size: usize) -> Self {
        self.page_size = page_size;
        self
    }

    /// Calculate total memory usage
    pub fn memory_size(&self) -> usize {
        self.capacity * self.page_size
    }
}

/// Cache statistics
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total cache hits
    pub hits: u64,
    /// Total cache misses
    pub misses: u64,
    /// Total insertions
    pub insertions: u64,
    /// Total evictions
    pub evictions: u64,
    /// Current entries
    pub entries: usize,
    /// Dirty pages written back
    pub writebacks: u64,
    /// Hand sweeps performed
    pub sweeps: u64,
}

impl CacheStats {
    /// Calculate hit ratio
    pub fn hit_ratio(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }

    /// Calculate miss ratio
    pub fn miss_ratio(&self) -> f64 {
        1.0 - self.hit_ratio()
    }
}

/// Atomic cache statistics
struct AtomicStats {
    hits: AtomicU64,
    misses: AtomicU64,
    insertions: AtomicU64,
    evictions: AtomicU64,
    writebacks: AtomicU64,
    sweeps: AtomicU64,
}

impl AtomicStats {
    fn new() -> Self {
        Self {
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            insertions: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
            writebacks: AtomicU64::new(0),
            sweeps: AtomicU64::new(0),
        }
    }

    fn to_stats(&self, entries: usize) -> CacheStats {
        CacheStats {
            hits: self.hits.load(Ordering::Relaxed),
            misses: self.misses.load(Ordering::Relaxed),
            insertions: self.insertions.load(Ordering::Relaxed),
            evictions: self.evictions.load(Ordering::Relaxed),
            entries,
            writebacks: self.writebacks.load(Ordering::Relaxed),
            sweeps: self.sweeps.load(Ordering::Relaxed),
        }
    }
}

/// Page cache callback for writeback
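///
/// An illustrative implementation that captures written pages in memory,
/// a stand-in for a real file-backed writer (`CapturingWriter` is hypothetical):
///
/// ```ignore
/// use std::sync::Mutex;
///
/// struct CapturingWriter {
///     written: Mutex<Vec<(PageId, Page)>>,
/// }
///
/// impl PageWriter<PageId, Page> for CapturingWriter {
///     fn write_page(&self, key: &PageId, value: &Page) -> std::io::Result<()> {
///         // A real writer would seek to `key * page_size` and write the bytes.
///         self.written.lock().unwrap().push((*key, value.clone()));
///         Ok(())
///     }
/// }
/// ```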
pub trait PageWriter<K, V>: Send + Sync {
    /// Write a dirty page back to storage
    fn write_page(&self, key: &K, value: &V) -> std::io::Result<()>;
}

/// No-op page writer (for read-only caches)
pub struct NoOpWriter;

impl<K, V> PageWriter<K, V> for NoOpWriter {
    fn write_page(&self, _key: &K, _value: &V) -> std::io::Result<()> {
        Ok(())
    }
}

/// SIEVE Page Cache
pub struct PageCache<K, V, W = NoOpWriter>
where
    K: Clone + Eq + Hash,
    V: Clone,
    W: PageWriter<K, V>,
{
    /// Configuration
    config: CacheConfig,
    /// Key to entry mapping
    entries: RwLock<HashMap<K, Arc<CacheEntry<V>>>>,
    /// Circular buffer of slots
    slots: RwLock<Vec<Slot<K>>>,
    /// Current hand position
    hand: AtomicUsize,
    /// Current entry count
    count: AtomicUsize,
    /// Statistics
    stats: AtomicStats,
    /// Page writer for dirty pages
    writer: W,
    /// Per-strategy buffer rings.
    ///
    /// Lazily allocated when a non-`Normal` strategy is first used. The
    /// rings are completely isolated from the main pool — a page in a
    /// ring does NOT appear in `entries`/`slots`, and vice versa. See
    /// `src/storage/cache/README.md` § Invariant 4.
    rings:
        RwLock<HashMap<super::strategy::BufferAccessStrategy, Arc<super::ring::BufferRing<K, V>>>>,
}

impl<K, V> PageCache<K, V, NoOpWriter>
where
    K: Clone + Eq + Hash,
    V: Clone,
{
    /// Create new cache with default writer
    pub fn new(config: CacheConfig) -> Self {
        Self::with_writer(config, NoOpWriter)
    }

    /// Create with specific capacity
    pub fn with_capacity(capacity: usize) -> Self {
        Self::new(CacheConfig::with_capacity(capacity))
    }
}

impl<K, V, W> PageCache<K, V, W>
where
    K: Clone + Eq + Hash,
    V: Clone,
    W: PageWriter<K, V>,
{
    /// Create new cache with custom writer
    pub fn with_writer(config: CacheConfig, writer: W) -> Self {
        let capacity = config.capacity;
        Self {
            config,
            entries: RwLock::new(HashMap::with_capacity(capacity)),
            slots: RwLock::new(vec![Slot::Empty; capacity]),
            hand: AtomicUsize::new(0),
            count: AtomicUsize::new(0),
            stats: AtomicStats::new(),
            writer,
            rings: RwLock::new(HashMap::new()),
        }
    }

    /// Strategy-aware get.
    ///
    /// `Normal` behaves exactly like [`PageCache::get`]. Non-`Normal`
    /// strategies look in the main pool first (a hit is a free win),
    /// then fall through to the strategy's dedicated ring buffer.
    /// Hits in the ring do NOT promote the page into the main pool —
    /// that is the whole point of the strategy: keep scans out of the
    /// hot working set.
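    ///
    /// A sketch of the intended call pattern (`page_id`/`page` and the
    /// module path are illustrative):
    ///
    /// ```ignore
    /// use crate::storage::cache::strategy::BufferAccessStrategy;
    ///
    /// // A page pulled in by a table scan lands in the SequentialScan ring,
    /// // so it is visible to get_with but not to plain get.
    /// cache.insert_with(page_id, page, BufferAccessStrategy::SequentialScan);
    /// assert!(cache.get(&page_id).is_none());
    /// assert!(cache
    ///     .get_with(&page_id, BufferAccessStrategy::SequentialScan)
    ///     .is_some());
    /// ```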
    pub fn get_with(&self, key: &K, strategy: super::strategy::BufferAccessStrategy) -> Option<V> {
        // Always check the main pool first — a present page should be
        // served from there at zero cost regardless of strategy.
        if let Some(v) = self.get(key) {
            return Some(v);
        }
        // Non-Normal strategies fall through to their ring.
        if strategy.is_ring() {
            if let Some(ring) = self.get_ring(strategy) {
                return ring.get(key);
            }
        }
        None
    }

    /// Strategy-aware insert.
    ///
    /// `Normal` behaves exactly like [`PageCache::insert`]. Non-`Normal`
    /// strategies route the write into the dedicated ring instead of
    /// the main pool. The ring's eviction return is propagated up so
    /// callers (the pager) can flush dirty pages through the
    /// double-write buffer.
    pub fn insert_with(
        &self,
        key: K,
        value: V,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Option<(K, V)> {
        if !strategy.is_ring() {
            // Normal path: existing insert, returning the prior value
            // wrapped in (key, value) tuple shape for caller uniformity.
            let prev = self.insert(key.clone(), value);
            return prev.map(|v| (key, v));
        }
        let ring = self.ensure_ring(strategy);
        ring.insert(key, value)
    }

    /// Look up the ring for `strategy`, creating it lazily if needed.
    fn ensure_ring(
        &self,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Arc<super::ring::BufferRing<K, V>> {
        // Fast path: ring already exists.
        {
            let rings = recover_read_guard(&self.rings, "rings");
            if let Some(r) = rings.get(&strategy) {
                return Arc::clone(r);
            }
        }
        // Slow path: create under write lock, double-check first.
        let mut rings = recover_write_guard(&self.rings, "rings");
        if let Some(r) = rings.get(&strategy) {
            return Arc::clone(r);
        }
        let cap = strategy.ring_size().unwrap_or(16);
        let ring = Arc::new(super::ring::BufferRing::new(cap));
        rings.insert(strategy, Arc::clone(&ring));
        ring
    }

    /// Read-only ring lookup (does not allocate).
    fn get_ring(
        &self,
        strategy: super::strategy::BufferAccessStrategy,
    ) -> Option<Arc<super::ring::BufferRing<K, V>>> {
        let rings = recover_read_guard(&self.rings, "rings");
        rings.get(&strategy).cloned()
    }

    /// Clear the contents of every strategy ring (the rings themselves stay
    /// allocated). Used by tests and by post-checkpoint cleanup.
    pub fn clear_strategy_rings(&self) {
        let rings = recover_read_guard(&self.rings, "rings");
        for ring in rings.values() {
            ring.clear();
        }
    }

    /// Get an entry from cache
    pub fn get(&self, key: &K) -> Option<V> {
        let entries = recover_read_guard(&self.entries, "entries");

        if let Some(entry) = entries.get(key) {
            // Set visited flag (no lock needed - atomic)
            entry.set_visited(true);

            if self.config.collect_stats {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
            }

            Some(entry.value.clone())
        } else {
            if self.config.collect_stats {
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
            }
            None
        }
    }

    /// Check if key exists in cache
    pub fn contains(&self, key: &K) -> bool {
        recover_read_guard(&self.entries, "entries").contains_key(key)
    }

    /// Insert an entry into cache
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        // Check if update first (no locks held while checking)
        {
            let entries = recover_read_guard(&self.entries, "entries");
            if let Some(entry) = entries.get(&key) {
                entry.set_visited(true);
                let old_value = entry.value.clone();
                drop(entries);
                return self.update_existing(key, value, old_value);
            }
        }

        // Need to insert new entry - evict if needed.
        //
        // `count` is read with Acquire to pair with the Release stores
        // in fetch_add/fetch_sub. We do not need SeqCst here — there's
        // no cross-atomic ordering requirement, and the subsequent
        // write lock on `entries` provides the actual synchronisation.
        let index = if self.count.load(Ordering::Acquire) >= self.config.capacity {
            self.evict_one()
        } else {
            None
        };

        // Now acquire locks in consistent order: entries first, then slots
        let mut entries = recover_write_guard(&self.entries, "entries");
        let mut slots = recover_write_guard(&self.slots, "slots");

        // Double-check the key wasn't inserted while we waited
        if entries.contains_key(&key) {
            if let Some(entry) = entries.get(&key) {
                entry.set_visited(true);
            }
            return None;
        }

        // Find slot index
        let slot_index = if let Some(idx) = index {
            idx
        } else {
            // Find empty slot
            slots.iter().position(|s| matches!(s, Slot::Empty))?
        };

        // Insert into slot and entry map
        let entry = Arc::new(CacheEntry::new(value, slot_index));
        slots[slot_index] = Slot::Occupied(key.clone());
        entries.insert(key, entry);

        // Release-store: pairs with the Acquire-load above. The
        // entries write lock has already published the slot, so
        // counters need only single-variable Release semantics.
        self.count.fetch_add(1, Ordering::Release);

        if self.config.collect_stats {
            self.stats.insertions.fetch_add(1, Ordering::Relaxed);
        }

        None
    }

    /// Update an existing entry (internal). The replacement entry starts
    /// clean and unpinned, with its visited bit set.
    fn update_existing(&self, key: K, new_value: V, old_value: V) -> Option<V> {
        let mut entries = recover_write_guard(&self.entries, "entries");

        if let Some(old_entry) = entries.get(&key) {
            let index = old_entry.index;
            let new_entry = Arc::new(CacheEntry::new(new_value, index));
            entries.insert(key, new_entry);
            Some(old_value)
        } else {
            None
        }
    }

    /// Remove an entry from cache
    pub fn remove(&self, key: &K) -> Option<V> {
        let mut entries = recover_write_guard(&self.entries, "entries");

        if let Some(entry) = entries.remove(key) {
            let mut slots = recover_write_guard(&self.slots, "slots");
            slots[entry.index] = Slot::Empty;
            self.count.fetch_sub(1, Ordering::Release);

            // Writeback if dirty
            if entry.is_dirty() {
                let _ = self.writer.write_page(key, &entry.value);
                if self.config.collect_stats {
                    self.stats.writebacks.fetch_add(1, Ordering::Relaxed);
                }
            }

            Some(entry.value.clone())
        } else {
            None
        }
    }

    /// Evict one entry using SIEVE algorithm
    ///
    /// **Atomic ordering note:** the `hand` pointer is read and
    /// written with `Relaxed` because it does not coordinate
    /// visibility of any page content — the hand is just a sweep
    /// position, and concurrent writers always re-acquire the
    /// `entries`/`slots` write locks before touching anything the
    /// hand selects. The `pin_count` check inside `is_pinned()`
    /// stays SeqCst (in `CacheEntry`) and that single SeqCst load
    /// is what coordinates pin/unpin visibility across threads.
    fn evict_one(&self) -> Option<usize> {
        let capacity = self.config.capacity;
        let max_sweeps = capacity * 2;

        for _ in 0..max_sweeps {
            let current_hand = self.hand.load(Ordering::Relaxed);

            // Extract eviction candidate under locks, then release before I/O.
            // Invariant: the entry is removed from both `entries` and `slots`
            // atomically under the write locks; writeback runs after locks drop.
            let eviction: Option<(K, Arc<CacheEntry<V>>)> = {
                let mut entries = recover_write_guard(&self.entries, "entries");
                let mut slots = recover_write_guard(&self.slots, "slots");

                if let Slot::Occupied(ref key) = slots[current_hand] {
                    if let Some(entry) = entries.get(key) {
                        if entry.is_pinned() {
                            None
                        } else if entry.is_visited() {
                            entry.set_visited(false);
                            None
                        } else {
                            let key_clone = key.clone();
                            match entries.remove(&key_clone) {
                                None => {
                                    // Defensive: key was present in slot but not entries map.
                                    let next = (current_hand + 1) % capacity;
                                    self.hand.store(next, Ordering::Relaxed);
                                    continue;
                                }
                                Some(entry) => {
                                    slots[current_hand] = Slot::Empty;
                                    self.count.fetch_sub(1, Ordering::Release);
                                    let next = (current_hand + 1) % capacity;
                                    self.hand.store(next, Ordering::Relaxed);
                                    Some((key_clone, entry))
                                }
                            }
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
                // locks dropped here — write_page runs without contention
            };

            if let Some((key_clone, entry)) = eviction {
                if entry.is_dirty() {
                    let _ = self.writer.write_page(&key_clone, &entry.value);
                    if self.config.collect_stats {
                        self.stats.writebacks.fetch_add(1, Ordering::Relaxed);
                    }
                }
                if self.config.collect_stats {
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                    self.stats.sweeps.fetch_add(1, Ordering::Relaxed);
                }
                return Some(current_hand);
            }

            // Advance hand and try next slot
            let next = (current_hand + 1) % capacity;
            self.hand.store(next, Ordering::Relaxed);
        }

        if self.config.collect_stats {
            self.stats.sweeps.fetch_add(1, Ordering::Relaxed);
        }

        None
    }

    /// Pin a page (prevent eviction)
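    ///
    /// Typical pattern (sketch): pin while the page is in active use, then
    /// unpin so the sweep can reclaim it again (`page_id` is illustrative).
    ///
    /// ```ignore
    /// if cache.pin(&page_id) {
    ///     // ... read or modify the page without fear of eviction ...
    ///     cache.unpin(&page_id);
    /// }
    /// ```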
    pub fn pin(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.pin();
            true
        } else {
            false
        }
    }

    /// Unpin a page
    pub fn unpin(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.unpin();
            true
        } else {
            false
        }
    }

    /// Mark a page as dirty
    pub fn mark_dirty(&self, key: &K) -> bool {
        let entries = recover_read_guard(&self.entries, "entries");
        if let Some(entry) = entries.get(key) {
            entry.mark_dirty();
            true
        } else {
            false
        }
    }

    /// Flush all dirty pages
    pub fn flush(&self) -> std::io::Result<usize> {
        let entries = recover_read_guard(&self.entries, "entries");
        let mut flushed = 0;

        for (key, entry) in entries.iter() {
            if entry.is_dirty() {
                self.writer.write_page(key, &entry.value)?;
                entry.clear_dirty();
                flushed += 1;
            }
        }

        if self.config.collect_stats {
            self.stats
                .writebacks
                .fetch_add(flushed as u64, Ordering::Relaxed);
        }

        Ok(flushed)
    }

    /// Clear all entries
    pub fn clear(&self) {
        // Flush dirty pages first
        let _ = self.flush();

        let mut entries = recover_write_guard(&self.entries, "entries");
        let mut slots = recover_write_guard(&self.slots, "slots");

        entries.clear();
        for slot in slots.iter_mut() {
            *slot = Slot::Empty;
        }

        // clear() is exclusive (write locks above); Relaxed is safe.
        self.count.store(0, Ordering::Relaxed);
        self.hand.store(0, Ordering::Relaxed);
    }

    /// Get current entry count
    pub fn len(&self) -> usize {
        // Acquire pairs with the Release stores in insert/remove/evict.
        self.count.load(Ordering::Acquire)
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get capacity
    pub fn capacity(&self) -> usize {
        self.config.capacity
    }

    /// Get statistics
    pub fn stats(&self) -> CacheStats {
        self.stats.to_stats(self.len())
    }

    /// Get configuration
    pub fn config(&self) -> &CacheConfig {
        &self.config
    }

    /// Get all cached keys
    pub fn keys(&self) -> Vec<K> {
        recover_read_guard(&self.entries, "entries")
            .keys()
            .cloned()
            .collect()
    }

    /// Get dirty page count
    pub fn dirty_count(&self) -> usize {
        recover_read_guard(&self.entries, "entries")
            .values()
            .filter(|e| e.is_dirty())
            .count()
    }
}

/// Page buffer (fixed-size byte array)
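///
/// A short read/write round trip (offsets are illustrative):
///
/// ```ignore
/// let mut page = Page::with_size(4096);
/// page.write_u64(0, 0xDEAD_BEEF);
/// assert_eq!(page.read_u64(0), Some(0xDEAD_BEEF));
/// // Out-of-bounds reads return None instead of panicking.
/// assert_eq!(page.read(4090, 16), None);
/// ```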
#[derive(Clone)]
pub struct Page {
    /// Page data
    data: Vec<u8>,
    /// Page size
    size: usize,
}

impl Page {
    /// Create new page with default size
    pub fn new() -> Self {
        Self::with_size(DEFAULT_PAGE_SIZE)
    }

    /// Create page with specific size
    pub fn with_size(size: usize) -> Self {
        Self {
            data: vec![0u8; size],
            size,
        }
    }

    /// Create page from data
    pub fn from_data(data: Vec<u8>) -> Self {
        let size = data.len();
        Self { data, size }
    }

    /// Get page data
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Get mutable page data
    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.data
    }

    /// Get page size
    pub fn size(&self) -> usize {
        self.size
    }

    /// Read bytes at offset
    pub fn read(&self, offset: usize, len: usize) -> Option<&[u8]> {
        if offset + len <= self.size {
            Some(&self.data[offset..offset + len])
        } else {
            None
        }
    }

    /// Write bytes at offset
    pub fn write(&mut self, offset: usize, data: &[u8]) -> bool {
        if offset + data.len() <= self.size {
            self.data[offset..offset + data.len()].copy_from_slice(data);
            true
        } else {
            false
        }
    }

    /// Read u32 at offset
    pub fn read_u32(&self, offset: usize) -> Option<u32> {
        self.read(offset, 4).map(|bytes| {
            let mut array = [0u8; 4];
            array.copy_from_slice(bytes);
            u32::from_le_bytes(array)
        })
    }

    /// Write u32 at offset
    pub fn write_u32(&mut self, offset: usize, value: u32) {
        self.write(offset, &value.to_le_bytes());
    }

    /// Read u64 at offset
    pub fn read_u64(&self, offset: usize) -> Option<u64> {
        self.read(offset, 8).map(|bytes| {
            let mut array = [0u8; 8];
            array.copy_from_slice(bytes);
            u64::from_le_bytes(array)
        })
    }

    /// Write u64 at offset
    pub fn write_u64(&mut self, offset: usize, value: u64) {
        self.write(offset, &value.to_le_bytes());
    }
}

impl Default for Page {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for Page {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Page")
            .field("size", &self.size)
            .field("data", &format!("[{} bytes]", self.data.len()))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_operations() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        // Insert
        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        // Get
        assert_eq!(cache.get(&1), Some("one".to_string()));
        assert_eq!(cache.get(&2), Some("two".to_string()));
        assert_eq!(cache.get(&3), None);

        // Contains
        assert!(cache.contains(&1));
        assert!(!cache.contains(&3));

        // Remove
        assert_eq!(cache.remove(&1), Some("one".to_string()));
        assert_eq!(cache.get(&1), None);
    }

    #[test]
    fn test_eviction() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(3);

        // Fill cache
        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());
        cache.insert(3, "three".to_string());

        assert_eq!(cache.len(), 3);

        // Re-access some entries so their visited bits stay set
        cache.get(&1);
        cache.get(&3);

        // Insert a new entry - the cache is full, so one entry must be evicted
        cache.insert(4, "four".to_string());

        assert_eq!(cache.len(), 3);
        assert!(cache.contains(&4));

        // Which entry gets evicted depends on the hand position and on the
        // fact that new entries start with their visited bit set, so we only
        // assert the size and the presence of the newly inserted key.
    }

    #[test]
    fn test_stats() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.get(&1); // Hit
        cache.get(&2); // Miss

        let stats = cache.stats();
        assert_eq!(stats.insertions, 1);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_ratio(), 0.5);
    }

    #[test]
    fn test_pin_unpin() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(2);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        // Pin entry 1
        assert!(cache.pin(&1));

        // Try to evict by inserting more
        cache.insert(3, "three".to_string());

        // Pinned entry should still be there
        assert!(cache.contains(&1));

        // Unpin
        cache.unpin(&1);
    }

    #[test]
    fn test_page() {
        let mut page = Page::with_size(64);

        // Write and read
        page.write(0, b"hello");
        assert_eq!(page.read(0, 5), Some(b"hello".as_slice()));

        // Write u32
        page.write_u32(8, 0x12345678);
        assert_eq!(page.read_u32(8), Some(0x12345678));

        // Write u64
        page.write_u64(16, 0xDEADBEEF);
        assert_eq!(page.read_u64(16), Some(0xDEADBEEF));

        // Bounds check
        assert_eq!(page.read(60, 10), None);
    }

    #[test]
    fn test_clear() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        cache.clear();

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_keys() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());
        cache.insert(3, "three".to_string());

        let keys = cache.keys();
        assert_eq!(keys.len(), 3);
        assert!(keys.contains(&1));
        assert!(keys.contains(&2));
        assert!(keys.contains(&3));
    }

    #[test]
    fn test_update() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        assert_eq!(cache.get(&1), Some("one".to_string()));

        // Update
        let old = cache.insert(1, "ONE".to_string());
        assert_eq!(old, Some("one".to_string()));
        assert_eq!(cache.get(&1), Some("ONE".to_string()));
    }

    #[test]
    fn test_dirty_pages() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(10);

        cache.insert(1, "one".to_string());
        cache.insert(2, "two".to_string());

        assert_eq!(cache.dirty_count(), 0);

        cache.mark_dirty(&1);
        assert_eq!(cache.dirty_count(), 1);

        cache.mark_dirty(&2);
        assert_eq!(cache.dirty_count(), 2);
    }

    #[test]
    fn test_config() {
        let config = CacheConfig::with_capacity(1024).with_page_size(8192);

        assert_eq!(config.capacity, 1024);
        assert_eq!(config.page_size, 8192);
        assert_eq!(config.memory_size(), 1024 * 8192);
    }

    // ---------------------------------------------------------------
    // Target 4: BufferAccessStrategy / ring tests
    // ---------------------------------------------------------------

    use super::super::strategy::BufferAccessStrategy;

    #[test]
    fn normal_strategy_is_backwards_compatible() {
        // get_with(Normal) and insert_with(Normal) must behave exactly
        // like get/insert — same hot pool, same eviction.
        let cache: PageCache<u64, String> = PageCache::with_capacity(8);
        let prev = cache.insert_with(1, "a".to_string(), BufferAccessStrategy::Normal);
        assert!(prev.is_none());
        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::Normal),
            Some("a".to_string())
        );
        // Plain get/insert see the same value.
        assert_eq!(cache.get(&1), Some("a".to_string()));
    }

    #[test]
    fn sequential_scan_does_not_pollute_main_pool() {
        // Warm the main pool, then do a scan via SequentialScan.
        // The hot keys must still be in the main pool afterwards.
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        for i in 0..50 {
            cache.insert(i, format!("hot-{i}"));
        }
        // Now scan 200 cold pages via SequentialScan strategy.
        for k in 1000..1200u64 {
            let _ = cache.insert_with(k, format!("cold-{k}"), BufferAccessStrategy::SequentialScan);
        }
        // Hot keys must still be present in the main pool.
        for i in 0..50u64 {
            assert!(
                cache.contains(&i),
                "hot key {i} was evicted by sequential scan"
            );
        }
    }

    #[test]
    fn scan_pages_are_findable_via_strategy_get() {
        // Pages inserted via SequentialScan are reachable through
        // get_with(SequentialScan) but NOT through plain get (they live
        // in the ring, not the main pool).
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        cache.insert_with(
            42,
            "scanned".to_string(),
            BufferAccessStrategy::SequentialScan,
        );
        // Plain get hits main pool only — must miss.
        assert_eq!(cache.get(&42), None);
        // get_with sees both pools.
        assert_eq!(
            cache.get_with(&42, BufferAccessStrategy::SequentialScan),
            Some("scanned".to_string())
        );
    }

    #[test]
    fn bulk_read_and_bulk_write_are_independent_rings() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        cache.insert_with(1, "r".to_string(), BufferAccessStrategy::BulkRead);
        cache.insert_with(2, "w".to_string(), BufferAccessStrategy::BulkWrite);

        // Each strategy sees its own page only.
        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::BulkRead),
            Some("r".to_string())
        );
        assert_eq!(
            cache.get_with(&2, BufferAccessStrategy::BulkWrite),
            Some("w".to_string())
        );

        // Cross-strategy lookups miss because rings are isolated.
        assert!(cache
            .get_with(&1, BufferAccessStrategy::BulkWrite)
            .is_none());
        assert!(cache.get_with(&2, BufferAccessStrategy::BulkRead).is_none());
    }

    #[test]
    fn bulk_write_evicts_dirty_page_on_overflow() {
        // Fill a BulkWrite ring (capacity 32) past its limit and verify
        // insert_with returns the evicted (key, value) pair so the
        // pager can flush it.
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        let mut last_evicted = None;
        for i in 0..40u64 {
            let evicted = cache.insert_with(i, format!("v{i}"), BufferAccessStrategy::BulkWrite);
            if evicted.is_some() {
                last_evicted = evicted;
            }
        }
        // Some eviction must have happened (40 inserts into a 32-slot ring).
        assert!(last_evicted.is_some());
        // The first 8 keys should be evicted.
        for i in 0..8u64 {
            assert!(
                cache
                    .get_with(&i, BufferAccessStrategy::BulkWrite)
                    .is_none(),
                "key {i} should have been evicted from bulk_write ring"
            );
        }
    }

    #[test]
    fn clear_strategy_rings_drops_all_ring_pages() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        // Seed both pools.
        cache.insert(99, "main".to_string());
        cache.insert_with(1, "ring".to_string(), BufferAccessStrategy::SequentialScan);
        // Clear only the rings.
        cache.clear_strategy_rings();
        // Main pool survives.
        assert_eq!(cache.get(&99), Some("main".to_string()));
        // Ring is empty.
        assert!(cache
            .get_with(&1, BufferAccessStrategy::SequentialScan)
            .is_none());
    }

    #[test]
    fn ring_is_lazily_allocated() {
        let cache: PageCache<u64, String> = PageCache::with_capacity(64);
        // Initially no rings exist.
        assert!(cache
            .get_with(&1, BufferAccessStrategy::SequentialScan)
            .is_none());
        // Inserting via a strategy creates the ring.
        cache.insert_with(1, "a".to_string(), BufferAccessStrategy::SequentialScan);
        assert_eq!(
            cache.get_with(&1, BufferAccessStrategy::SequentialScan),
            Some("a".to_string())
        );
    }

    // ---------------------------------------------------------------
    // evict_one lock-release + poison recovery tests (issue #221)
    // ---------------------------------------------------------------

    use std::time::{Duration, Instant};

    /// Slow writer that blocks for `delay` on each writeback.
    /// Used to verify that evict_one releases locks before calling write_page.
    struct SlowWriter {
        delay: Duration,
        /// Set to true the moment write_page is entered (before sleep).
        writing: Arc<AtomicBool>,
    }

    impl PageWriter<u64, String> for SlowWriter {
        fn write_page(&self, _key: &u64, _value: &String) -> std::io::Result<()> {
            self.writing.store(true, Ordering::SeqCst);
            std::thread::sleep(self.delay);
            Ok(())
        }
    }

    #[test]
    fn evict_one_releases_locks_before_writeback() {
        // Cache with capacity 1 so any new insert triggers eviction.
        const DELAY_MS: u64 = 300;
        let writing = Arc::new(AtomicBool::new(false));
        let cache = Arc::new(PageCache::with_writer(
            CacheConfig::with_capacity(1),
            SlowWriter {
                delay: Duration::from_millis(DELAY_MS),
                writing: Arc::clone(&writing),
            },
        ));

        // Fill the single slot with a dirty entry.
        cache.insert(0u64, "dirty".to_string());
        cache.mark_dirty(&0);

        let cache2 = Arc::clone(&cache);
        let writing2 = Arc::clone(&writing);

        // Thread A: triggers eviction of key 0 → write_page blocks DELAY_MS ms.
        let thread_a = std::thread::spawn(move || {
            cache2.insert(1u64, "new".to_string());
        });

        // Wait until write_page has been entered (locks must be released by then).
        while !writing2.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        // Thread B: should be able to read from the cache immediately because
        // locks are no longer held during writeback.
        let start = Instant::now();
        let _ = cache.get(&1u64);
        let elapsed = start.elapsed();

        thread_a.join().unwrap();

        // If locks were still held during write_page, elapsed would be ~DELAY_MS.
        // With the fix, it should be well under 10% of DELAY_MS.
        assert!(
            elapsed < Duration::from_millis(DELAY_MS / 10),
            "get() blocked for {elapsed:?} — locks were probably still held during writeback"
        );
    }

    #[test]
    fn recover_write_guard_handles_poisoned_lock() {
        let lock: RwLock<u32> = RwLock::new(42);

        // Poison the lock by panicking while holding the write guard.
        let _ = std::panic::catch_unwind(|| {
            let _guard = lock.write().unwrap();
            panic!("intentional poison");
        });

        assert!(lock.write().is_err(), "lock must be poisoned after panic");

        // recover_write_guard must return the inner value without panicking.
        let guard = recover_write_guard(&lock, "test_lock");
        assert_eq!(*guard, 42);
    }

    #[test]
    fn recover_read_guard_handles_poisoned_lock() {
        let lock: RwLock<u32> = RwLock::new(99);

        let _ = std::panic::catch_unwind(|| {
            let _guard = lock.write().unwrap();
            panic!("intentional poison");
        });

        assert!(lock.read().is_err(), "lock must be poisoned after panic");

        let guard = recover_read_guard(&lock, "test_lock");
        assert_eq!(*guard, 99);
    }
}