reddb_server/storage/engine/page_cache.rs

//! SIEVE Page Cache Implementation
//!
//! A simple, efficient page cache using the SIEVE eviction algorithm.
//! SIEVE outperforms LRU in many workloads while being simpler to implement.
//!
//! # SIEVE Algorithm (NSDI '24)
//!
//! SIEVE uses a single "visited" bit instead of LRU's complex list management:
//! 1. On cache hit: set visited = true
//! 2. On cache miss (insertion): add to FIFO queue
//! 3. On eviction: sweep from "hand" position
//!    - If visited=true: clear bit, skip
//!    - If visited=false: evict this entry
//!
//! # References
//!
//! - Turso `core/storage/page_cache.rs:24-80` - PageCacheShardEntry with ref_bit
//! - Turso `core/storage/page_cache.rs:129-150` - advance_clock_hand()
//! - "SIEVE is Simpler than LRU" (NSDI '24)
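//!
//! # Example
//!
//! A minimal usage sketch (doctest ignored; `Page::new` is the page
//! constructor used by this module's tests):
//!
//! ```ignore
//! let cache = PageCache::new(1024);
//! cache.insert(1, Page::new(PageType::BTreeLeaf, 1));
//! assert!(cache.get(1).is_some()); // hit: sets the SIEVE visited bit
//! cache.mark_dirty(1);
//! for (page_id, page) in cache.flush_dirty() {
//!     // the caller persists `page` for `page_id` here
//! }
//! ```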

use std::collections::{HashMap, VecDeque};
use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};

use super::page::Page;

fn cache_read<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}

fn cache_write<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
    lock.write()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}

fn cache_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}

/// Default cache capacity (number of pages)
/// Turso uses 100,000 pages (~400MB for 4KB pages)
pub const DEFAULT_CACHE_CAPACITY: usize = 100_000;

/// Minimum cache capacity
pub const MIN_CACHE_CAPACITY: usize = 2;

/// Cache entry with SIEVE visited bit
struct CacheEntry {
    /// The cached page
    page: Page,
    /// Visited bit for SIEVE algorithm (true = recently accessed)
    visited: bool,
    /// Pin count (page cannot be evicted while pinned)
    pin_count: usize,
    /// Whether the page is dirty (modified)
    dirty: bool,
}

impl CacheEntry {
    fn new(page: Page) -> Self {
        Self {
            page,
            visited: false,
            pin_count: 0,
            dirty: false,
        }
    }
}

/// Cache statistics
#[derive(Debug, Default, Clone)]
pub struct CacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Number of evictions
    pub evictions: u64,
    /// Number of dirty page writebacks
    pub writebacks: u64,
}

impl CacheStats {
    /// Calculate hit rate (0.0 to 1.0)
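    ///
    /// For example, 3 hits and 1 miss give a hit rate of `0.75`; with
    /// no recorded accesses this returns `0.0` rather than dividing
    /// by zero.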
    pub fn hit_rate(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }
}

/// SIEVE-based page cache
///
/// Thread-safe page cache using the SIEVE eviction algorithm.
pub struct PageCacheShard {
    /// Maximum number of pages to cache
    capacity: usize,
    /// Page ID -> Entry index mapping
    index: RwLock<HashMap<u32, usize>>,
    /// FIFO queue of page IDs for eviction order
    fifo: Mutex<VecDeque<u32>>,
    /// Cache entries (indexed by slot)
    entries: RwLock<Vec<Option<CacheEntry>>>,
    /// Free slots
    free_slots: Mutex<Vec<usize>>,
    /// SIEVE eviction hand position
    hand: Mutex<usize>,
    /// Cache statistics
    stats: Mutex<CacheStats>,
}

impl PageCacheShard {
    /// Create a new page cache with specified capacity
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(MIN_CACHE_CAPACITY);

        Self {
            capacity,
            index: RwLock::new(HashMap::with_capacity(capacity)),
            fifo: Mutex::new(VecDeque::with_capacity(capacity)),
            entries: RwLock::new(Vec::with_capacity(capacity)),
            free_slots: Mutex::new(Vec::new()),
            hand: Mutex::new(0),
            stats: Mutex::new(CacheStats::default()),
        }
    }

    /// Create a page cache with default capacity
    pub fn with_default_capacity() -> Self {
        Self::new(DEFAULT_CACHE_CAPACITY)
    }

    /// Get the current number of cached pages
    pub fn len(&self) -> usize {
        cache_read(&self.index).len()
    }

    /// Check if cache is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get cache capacity
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Get cache statistics
    pub fn stats(&self) -> CacheStats {
        cache_lock(&self.stats).clone()
    }

    /// Reset statistics
    pub fn reset_stats(&self) {
        *cache_lock(&self.stats) = CacheStats::default();
    }

    /// Get a page from cache
    ///
    /// Returns None if page is not in cache (cache miss).
    /// On hit, marks the page as visited (SIEVE).
    pub fn get(&self, page_id: u32) -> Option<Page> {
        // Check if page is in cache
        let index = cache_read(&self.index);
        let slot = match index.get(&page_id) {
            Some(&s) => s,
            None => {
                drop(index);
                // Cache miss
                cache_lock(&self.stats).misses += 1;
                return None;
            }
        };
        drop(index);

        // Get entry and mark as visited. Clone the page under a read
        // lock; upgrade to a write lock *only* if the visited bit
        // actually needs flipping. Hot-loop reads of an already-visited
        // page (e.g. BTree inserts hammering the rightmost leaf) can
        // then stay on the read lock — a second write lock per insert
        // was one of the costliest steps on `insert_sequential`.
        let entries = cache_read(&self.entries);
        if let Some(entry) = entries.get(slot).and_then(|e| e.as_ref()) {
            let page = entry.page.clone();
            let needs_mark = !entry.visited;
            drop(entries);

            if needs_mark {
                let mut entries = cache_write(&self.entries);
                if let Some(Some(entry)) = entries.get_mut(slot) {
                    entry.visited = true;
                }
            }

            cache_lock(&self.stats).hits += 1;

            Some(page)
        } else {
            cache_lock(&self.stats).misses += 1;
            None
        }
    }

    /// Insert a page into cache
    ///
    /// May trigger eviction if cache is full.
    /// Returns the evicted page (if dirty) that needs to be written back.
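    ///
    /// A sketch of the expected call pattern (writeback is the
    /// caller's job; `write_page_to_disk` is a placeholder):
    ///
    /// ```ignore
    /// if let Some(dirty_page) = cache.insert(page_id, page) {
    ///     // SIEVE evicted a dirty page to make room; persist it.
    ///     write_page_to_disk(dirty_page);
    /// }
    /// ```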
    pub fn insert(&self, page_id: u32, page: Page) -> Option<Page> {
        // Check if already in cache
        {
            let index = cache_read(&self.index);
            if let Some(&slot) = index.get(&page_id) {
                drop(index);

                // Update existing entry
                let mut entries = cache_write(&self.entries);
                if let Some(Some(entry)) = entries.get_mut(slot) {
                    entry.page = page;
                    entry.visited = true;
                }
                return None;
            }
        }

        // Need to insert new entry
        let mut evicted = None;

        // Check if we need to evict before getting free_slots lock
        let current_len = self.len();
        if current_len >= self.capacity {
            // Need to evict first (no locks held)
            evicted = self.evict();
        }

        // Find or create a slot
        let slot = {
            let mut free_slots = cache_lock(&self.free_slots);
            if let Some(slot) = free_slots.pop() {
                slot
            } else {
                drop(free_slots);
                // Add a new slot
                let mut entries = cache_write(&self.entries);
                let slot = entries.len();
                entries.push(None);
                slot
            }
        };

        // Insert entry
        {
            let mut entries = cache_write(&self.entries);

            // Ensure slot exists
            while entries.len() <= slot {
                entries.push(None);
            }

            entries[slot] = Some(CacheEntry::new(page));
        }

        // Update index
        {
            let mut index = cache_write(&self.index);
            index.insert(page_id, slot);
        }

        // Add to FIFO
        {
            let mut fifo = cache_lock(&self.fifo);
            fifo.push_back(page_id);
        }

        evicted
    }

    /// Evict a page using SIEVE algorithm
    ///
    /// Returns the evicted page if it was dirty.
    fn evict(&self) -> Option<Page> {
        let mut fifo = cache_lock(&self.fifo);
        let mut hand = cache_lock(&self.hand);

        if fifo.is_empty() {
            return None;
        }

        let fifo_len = fifo.len();
        let mut attempts = 0;

        // Sweep from hand position
        loop {
            if attempts >= fifo_len * 2 {
                // Couldn't find anything to evict (all pinned?)
                return None;
            }

            // Wrap around
            if *hand >= fifo_len {
                *hand = 0;
            }

            let page_id = fifo[*hand];
            attempts += 1;

            // Get entry slot
            let slot = {
                let index = cache_read(&self.index);
                match index.get(&page_id) {
                    Some(&s) => s,
                    None => {
                        // Page was removed, skip
                        *hand += 1;
                        continue;
                    }
                }
            };

            // Check entry
            let (should_evict, dirty) = {
                let entries = cache_read(&self.entries);
                match entries.get(slot).and_then(|e| e.as_ref()) {
                    Some(entry) => {
                        if entry.pin_count > 0 {
                            // Pinned, can't evict
                            (false, false)
                        } else if entry.visited {
                            // Clear visited bit, skip (SIEVE)
                            (false, false)
                        } else {
                            // Can evict
                            (true, entry.dirty)
                        }
                    }
                    None => {
                        *hand += 1;
                        continue;
                    }
                }
            };

            if !should_evict {
                // Clear visited bit
                let mut entries = cache_write(&self.entries);
                if let Some(Some(entry)) = entries.get_mut(slot) {
                    entry.visited = false;
                }
                *hand += 1;
                continue;
            }

            // Evict this entry
            let evicted_page = {
                let mut entries = cache_write(&self.entries);
                let entry = entries[slot].take();
                entry.map(|e| e.page)
            };

            // Remove from index
            {
                let mut index = cache_write(&self.index);
                index.remove(&page_id);
            }

            // Remove from FIFO
            fifo.remove(*hand);

            // Add slot to free list
            {
                let mut free_slots = cache_lock(&self.free_slots);
                free_slots.push(slot);
            }

            // Update stats
            {
                let mut stats = cache_lock(&self.stats);
                stats.evictions += 1;
                if dirty {
                    stats.writebacks += 1;
                }
            }

            // Return evicted page if dirty
            if dirty {
                return evicted_page;
            } else {
                return None;
            }
        }
    }

    /// Mark a page as dirty
    pub fn mark_dirty(&self, page_id: u32) {
        let index = cache_read(&self.index);
        if let Some(&slot) = index.get(&page_id) {
            drop(index);

            let mut entries = cache_write(&self.entries);
            if let Some(Some(entry)) = entries.get_mut(slot) {
                entry.dirty = true;
            }
        }
    }

    /// Mark a page as clean
    pub fn mark_clean(&self, page_id: u32) {
        let index = cache_read(&self.index);
        if let Some(&slot) = index.get(&page_id) {
            drop(index);

            let mut entries = cache_write(&self.entries);
            if let Some(Some(entry)) = entries.get_mut(slot) {
                entry.dirty = false;
            }
        }
    }

    /// Pin a page (prevent eviction)
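    ///
    /// Pins nest: each successful `pin` must be balanced by an
    /// `unpin`. A sketch of the intended pattern:
    ///
    /// ```ignore
    /// if cache.pin(page_id) {
    ///     // The entry stays resident (dirty/visited state intact)
    ///     // across this multi-step operation.
    ///     // ... get(), mutate, mark_dirty() ...
    ///     cache.unpin(page_id);
    /// }
    /// ```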
    pub fn pin(&self, page_id: u32) -> bool {
        let index = cache_read(&self.index);
        if let Some(&slot) = index.get(&page_id) {
            drop(index);

            let mut entries = cache_write(&self.entries);
            if let Some(Some(entry)) = entries.get_mut(slot) {
                entry.pin_count += 1;
                return true;
            }
        }
        false
    }

    /// Unpin a page
    pub fn unpin(&self, page_id: u32) -> bool {
        let index = cache_read(&self.index);
        if let Some(&slot) = index.get(&page_id) {
            drop(index);

            let mut entries = cache_write(&self.entries);
            if let Some(Some(entry)) = entries.get_mut(slot) {
                if entry.pin_count > 0 {
                    entry.pin_count -= 1;
                    return true;
                }
            }
        }
        false
    }

    /// Remove a page from cache
    pub fn remove(&self, page_id: u32) -> Option<Page> {
        // Get and remove from index
        let slot = {
            let mut index = cache_write(&self.index);
            index.remove(&page_id)?
        };

        // Remove entry
        let entry = {
            let mut entries = cache_write(&self.entries);
            entries.get_mut(slot).and_then(|e| e.take())
        };

        // Remove from FIFO
        {
            let mut fifo = cache_lock(&self.fifo);
            fifo.retain(|&id| id != page_id);
        }

        // Add slot to free list
        {
            let mut free_slots = cache_lock(&self.free_slots);
            free_slots.push(slot);
        }

        entry.map(|e| e.page)
    }

    /// Flush all dirty pages
    ///
    /// Returns a vector of (page_id, page) pairs for dirty pages.
    pub fn flush_dirty(&self) -> Vec<(u32, Page)> {
        let mut dirty_pages = Vec::new();

        let index = cache_read(&self.index);
        let entries = cache_read(&self.entries);

        for (&page_id, &slot) in index.iter() {
            if let Some(Some(entry)) = entries.get(slot) {
                if entry.dirty {
                    dirty_pages.push((page_id, entry.page.clone()));
                }
            }
        }

        drop(entries);
        drop(index);

        // Mark all as clean
        for (page_id, _) in &dirty_pages {
            self.mark_clean(*page_id);
        }

        let count = dirty_pages.len();
        cache_lock(&self.stats).writebacks += count as u64;

        dirty_pages
    }

    /// Bounded counterpart of `flush_dirty` used by the background
    /// writer. Snapshots up to `max` dirty pages, marks them clean,
    /// and returns the (page_id, page) pairs for the caller to
    /// persist via the pager. Clamps to the current dirty set —
    /// returning fewer than `max` simply means we're caught up.
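    ///
    /// A sketch of the background-writer loop this is built for
    /// (`persist` stands in for the pager write path):
    ///
    /// ```ignore
    /// let budget = cache.dirty_count().min(256);
    /// for (page_id, page) in cache.flush_some_dirty(budget) {
    ///     persist(page_id, &page);
    /// }
    /// ```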
    pub fn flush_some_dirty(&self, max: usize) -> Vec<(u32, Page)> {
        if max == 0 {
            return Vec::new();
        }
        let mut dirty_pages = Vec::with_capacity(max);

        let index = cache_read(&self.index);
        let entries = cache_read(&self.entries);

        for (&page_id, &slot) in index.iter() {
            if dirty_pages.len() >= max {
                break;
            }
            if let Some(Some(entry)) = entries.get(slot) {
                if entry.dirty {
                    dirty_pages.push((page_id, entry.page.clone()));
                }
            }
        }

        drop(entries);
        drop(index);

        for (page_id, _) in &dirty_pages {
            self.mark_clean(*page_id);
        }

        let count = dirty_pages.len();
        cache_lock(&self.stats).writebacks += count as u64;

        dirty_pages
    }

    /// Count dirty pages currently in the cache. Used by the
    /// background writer to compute an adaptive flush budget.
    pub fn dirty_count(&self) -> usize {
        let index = cache_read(&self.index);
        let entries = cache_read(&self.entries);
        let mut count = 0;
        for (_, &slot) in index.iter() {
            if let Some(Some(entry)) = entries.get(slot) {
                if entry.dirty {
                    count += 1;
                }
            }
        }
        count
    }

    /// Clear all entries from cache
    pub fn clear(&self) {
        let mut index = cache_write(&self.index);
        let mut entries = cache_write(&self.entries);
        let mut fifo = cache_lock(&self.fifo);
        let mut free_slots = cache_lock(&self.free_slots);

        index.clear();
        entries.clear();
        fifo.clear();
        free_slots.clear();
        *cache_lock(&self.hand) = 0;
    }

    /// Check if a page is in cache
    pub fn contains(&self, page_id: u32) -> bool {
        cache_read(&self.index).contains_key(&page_id)
    }

    /// Get all cached page IDs
    pub fn page_ids(&self) -> Vec<u32> {
        cache_read(&self.index).keys().copied().collect()
    }
}

impl Default for PageCacheShard {
    fn default() -> Self {
        Self::with_default_capacity()
    }
}

/// Number of independent `PageCacheShard`s inside a `PageCache`. Must
/// be a power of two so `page_id & (NUM_SHARDS - 1)` is a valid shard
/// index. 8 was picked to cover typical bench concurrency (16 workers)
/// without making each shard's own SIEVE queue too small; bumping to
/// 16 is safe if profiling ever shows shard-level contention.
const NUM_SHARDS: usize = 8;
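
// Compile-time guard for the power-of-two requirement documented above
// (a suggested hardening: `is_power_of_two` is const-evaluable, so a
// bad NUM_SHARDS fails the build instead of corrupting shard routing).
const _: () = assert!(NUM_SHARDS.is_power_of_two());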

/// Sharded page cache.
///
/// Routes each `page_id` to one of `NUM_SHARDS` independent
/// [`PageCacheShard`]s by the low bits of the id. Readers and writers
/// for disjoint pages hit different shards and do not contend on the
/// shard-internal RwLocks/Mutexes. Each shard runs its own SIEVE
/// eviction loop over `capacity / NUM_SHARDS` slots; hot/cold
/// asymmetry across shards is accepted in exchange for the
/// contention win.
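///
/// # Example
///
/// A construction sketch: the requested capacity is rounded up so it
/// divides evenly across shards (`new(100)` with 8 shards gives 13
/// slots per shard, 104 total):
///
/// ```ignore
/// let cache = PageCache::new(100);
/// assert_eq!(cache.capacity(), 104);
/// ```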
pub struct PageCache {
    shards: Box<[PageCacheShard]>,
    capacity: usize,
}

impl PageCache {
    pub fn new(capacity: usize) -> Self {
        // Keep every shard above MIN_CACHE_CAPACITY so the inner
        // SIEVE invariants stay valid even when callers pass tiny
        // totals (tests frequently do).
        let per_shard = capacity.div_ceil(NUM_SHARDS).max(MIN_CACHE_CAPACITY);
        let total = per_shard * NUM_SHARDS;
        let shards: Vec<PageCacheShard> = (0..NUM_SHARDS)
            .map(|_| PageCacheShard::new(per_shard))
            .collect();
        Self {
            shards: shards.into_boxed_slice(),
            capacity: total,
        }
    }

    pub fn with_default_capacity() -> Self {
        Self::new(DEFAULT_CACHE_CAPACITY)
    }

    #[inline]
    fn shard_for(&self, page_id: u32) -> &PageCacheShard {
        &self.shards[(page_id as usize) & (NUM_SHARDS - 1)]
    }

    pub fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    pub fn is_empty(&self) -> bool {
        self.shards.iter().all(|s| s.is_empty())
    }

    pub fn capacity(&self) -> usize {
        self.capacity
    }

    pub fn stats(&self) -> CacheStats {
        let mut agg = CacheStats::default();
        for s in self.shards.iter() {
            let cs = s.stats();
            agg.hits += cs.hits;
            agg.misses += cs.misses;
            agg.evictions += cs.evictions;
            agg.writebacks += cs.writebacks;
        }
        agg
    }

    pub fn reset_stats(&self) {
        for s in self.shards.iter() {
            s.reset_stats();
        }
    }

    pub fn get(&self, page_id: u32) -> Option<Page> {
        self.shard_for(page_id).get(page_id)
    }

    pub fn insert(&self, page_id: u32, page: Page) -> Option<Page> {
        self.shard_for(page_id).insert(page_id, page)
    }

    pub fn mark_dirty(&self, page_id: u32) {
        self.shard_for(page_id).mark_dirty(page_id)
    }

    pub fn mark_clean(&self, page_id: u32) {
        self.shard_for(page_id).mark_clean(page_id)
    }

    pub fn pin(&self, page_id: u32) -> bool {
        self.shard_for(page_id).pin(page_id)
    }

    pub fn unpin(&self, page_id: u32) -> bool {
        self.shard_for(page_id).unpin(page_id)
    }

    pub fn remove(&self, page_id: u32) -> Option<Page> {
        self.shard_for(page_id).remove(page_id)
    }

    pub fn contains(&self, page_id: u32) -> bool {
        self.shard_for(page_id).contains(page_id)
    }

    pub fn flush_dirty(&self) -> Vec<(u32, Page)> {
        let mut out = Vec::new();
        for s in self.shards.iter() {
            out.extend(s.flush_dirty());
        }
        out
    }

    pub fn flush_some_dirty(&self, max: usize) -> Vec<(u32, Page)> {
        if max == 0 {
            return Vec::new();
        }
        let mut out = Vec::with_capacity(max);
        for s in self.shards.iter() {
            if out.len() >= max {
                break;
            }
            let budget = max - out.len();
            out.extend(s.flush_some_dirty(budget));
        }
        out
    }

    pub fn dirty_count(&self) -> usize {
        self.shards.iter().map(|s| s.dirty_count()).sum()
    }

    pub fn clear(&self) {
        for s in self.shards.iter() {
            s.clear();
        }
    }

    pub fn page_ids(&self) -> Vec<u32> {
        let mut out = Vec::with_capacity(self.len());
        for s in self.shards.iter() {
            out.extend(s.page_ids());
        }
        out
    }
}

impl Default for PageCache {
    fn default() -> Self {
        Self::with_default_capacity()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::page::PageType;

    fn make_page(id: u32) -> Page {
        Page::new(PageType::BTreeLeaf, id)
    }

    #[test]
    fn test_cache_basic() {
        let cache = PageCacheShard::new(100);

        assert!(cache.is_empty());
        assert_eq!(cache.capacity(), 100);

        // Insert
        cache.insert(1, make_page(1));
        assert_eq!(cache.len(), 1);
        assert!(cache.contains(1));

        // Get
        let page = cache.get(1);
        assert!(page.is_some());

        // Miss
        let page = cache.get(999);
        assert!(page.is_none());
    }

    #[test]
    fn test_cache_eviction() {
        let cache = PageCacheShard::new(4);

        // Fill cache
        for i in 0..4 {
            cache.insert(i, make_page(i));
        }

        assert_eq!(cache.len(), 4);

        // Insert one more, should trigger eviction
        cache.insert(100, make_page(100));

        // One of the original pages should be evicted
        assert_eq!(cache.len(), 4);
        assert!(cache.contains(100));
    }

    #[test]
    fn test_cache_sieve_visited() {
        let cache = PageCacheShard::new(4);

        // Fill cache
        for i in 0..4 {
            cache.insert(i, make_page(i));
        }

        // Access page 0 (marks as visited)
        cache.get(0);

        // Insert new page, should evict non-visited first
        cache.insert(100, make_page(100));

        // Page 0 should still be there (was visited)
        assert!(cache.contains(0));
        assert!(cache.contains(100));
    }

    #[test]
    fn test_cache_dirty() {
        let cache = PageCacheShard::new(100);

        cache.insert(1, make_page(1));
        cache.mark_dirty(1);

        let dirty = cache.flush_dirty();
        assert_eq!(dirty.len(), 1);
        assert_eq!(dirty[0].0, 1);

        // Should be clean now
        let dirty = cache.flush_dirty();
        assert_eq!(dirty.len(), 0);
    }

    #[test]
    fn test_cache_pin() {
        let cache = PageCacheShard::new(2);

        cache.insert(1, make_page(1));
        cache.insert(2, make_page(2));

        // Pin page 1
        assert!(cache.pin(1));

        // Fill cache to trigger eviction
        cache.insert(3, make_page(3));

        // Page 1 should still be there (pinned)
        assert!(cache.contains(1));

        // Unpin
        assert!(cache.unpin(1));
    }

    #[test]
    fn test_cache_remove() {
        let cache = PageCacheShard::new(100);

        cache.insert(1, make_page(1));
        assert!(cache.contains(1));

        let removed = cache.remove(1);
        assert!(removed.is_some());
        assert!(!cache.contains(1));
    }

    #[test]
    fn test_cache_stats() {
        let cache = PageCacheShard::new(100);

        cache.insert(1, make_page(1));

        // Hit
        cache.get(1);
        cache.get(1);

        // Miss
        cache.get(999);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert!((stats.hit_rate() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_cache_clear() {
        let cache = PageCacheShard::new(100);

        for i in 0..50 {
            cache.insert(i, make_page(i));
        }

        assert_eq!(cache.len(), 50);

        cache.clear();
        assert!(cache.is_empty());
    }

    #[test]
    fn test_cache_update_existing() {
        let cache = PageCacheShard::new(100);

        let mut page1 = make_page(1);
        page1.as_bytes_mut()[100] = 0xAA;
        cache.insert(1, page1);

        let mut page1_updated = make_page(1);
        page1_updated.as_bytes_mut()[100] = 0xBB;
        cache.insert(1, page1_updated);

        // Should still only have one entry
        assert_eq!(cache.len(), 1);

        // Should have updated value
        let retrieved = cache.get(1).unwrap();
        assert_eq!(retrieved.as_bytes()[100], 0xBB);
    }

    #[test]
    fn test_cache_recovers_after_index_lock_poisoning() {
        let cache = std::sync::Arc::new(PageCacheShard::new(8));
        let poison_target = std::sync::Arc::clone(&cache);
        let _ = std::thread::spawn(move || {
            let _guard = poison_target
                .index
                .write()
                .expect("index lock should be acquired");
            panic!("poison index lock");
        })
        .join();

        cache.insert(1, make_page(1));
        assert!(cache.contains(1));
        assert!(cache.get(1).is_some());
    }

    #[test]
    fn test_cache_recovers_after_stats_lock_poisoning() {
        let cache = std::sync::Arc::new(PageCacheShard::new(8));
        let poison_target = std::sync::Arc::clone(&cache);
        let _ = std::thread::spawn(move || {
            let _guard = poison_target
                .stats
                .lock()
                .expect("stats lock should be acquired");
            panic!("poison stats lock");
        })
        .join();

        assert!(cache.get(999).is_none());
        assert_eq!(cache.stats().misses, 1);
        cache.reset_stats();
        assert_eq!(cache.stats().misses, 0);
    }

    /// Legacy single-lock baseline used as a regression check for the
    /// sharded `PageCache`. Mirrors the pre-shard cache shape: a single
    /// `RwLock<HashMap<u32, Page>>` so every mutation serializes on one
    /// writer. We only need the surface used by the concurrency test
    /// (`insert` / `get`); keeping it minimal avoids accidentally
    /// drifting toward the sharded design and silently flattening the
    /// regression signal.
    mod legacy_baseline {
        use super::Page;
        use std::collections::HashMap;
        use std::sync::RwLock;

        pub struct LegacyPageCache {
            entries: RwLock<HashMap<u32, Page>>,
        }

        impl LegacyPageCache {
            pub fn new(_capacity: usize) -> Self {
                Self {
                    entries: RwLock::new(HashMap::new()),
                }
            }

            pub fn insert(&self, page_id: u32, page: Page) {
                let mut entries = self.entries.write().unwrap();
                entries.insert(page_id, page);
            }

            pub fn get(&self, page_id: u32) -> Option<Page> {
                let entries = self.entries.read().unwrap();
                entries.get(&page_id).cloned()
            }
        }
    }

    /// Workload shared by both the sharded and legacy runs of the
    /// concurrency property test below. Each worker churns through its
    /// own disjoint `page_id` range so any contention seen between
    /// workers is purely lock-induced, not data-induced.
    fn run_workload<F>(workers: usize, ops_per_worker: usize, run: F) -> std::time::Duration
    where
        F: Fn(u32, &Page) + Send + Sync + 'static + Clone,
    {
        use std::sync::Arc;
        use std::time::Instant;

        let run = Arc::new(run);
        let start = Instant::now();
        let mut handles = Vec::with_capacity(workers);
        for w in 0..workers {
            let run = Arc::clone(&run);
            handles.push(std::thread::spawn(move || {
                let base = (w as u32) * 1_000_000;
                let page = make_page(0);
                for i in 0..ops_per_worker {
                    let id = base + (i as u32);
                    run(id, &page);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        start.elapsed()
    }

    #[test]
    fn test_sharded_cache_scales_concurrently() {
        // Property: both runs perform the same total number of ops
        // (WORKERS * OPS), so with disjoint page_id ranges the
        // 10-worker run on the sharded cache should take at most
        // about as long as the 1-worker run: full serialization
        // through one lock is ~1x the serial wall time, perfect
        // scaling ~0.1x. The sharded cache should also beat the
        // legacy single-lock baseline at 10 workers, since the
        // baseline serializes every mutation through one global
        // RwLock.
        //
        // The scaling bound is asserted in a deliberately loose form
        // (parallel < 7x serial) so CI flakiness on busy runners
        // doesn't false-positive. The legacy comparison is the
        // stronger regression signal.
        use std::sync::Arc;

        const WORKERS: usize = 10;
        const OPS: usize = 5_000;
        const CAPACITY: usize = 200_000;

        // Sharded: 1 worker baseline.
        let sharded = Arc::new(PageCache::new(CAPACITY));
        let s1 = Arc::clone(&sharded);
        let sharded_serial = run_workload(1, OPS * WORKERS, move |id, page| {
            s1.insert(id, page.clone());
            let _ = s1.get(id);
        });

        // Sharded: WORKERS parallel.
        let sharded = Arc::new(PageCache::new(CAPACITY));
        let s2 = Arc::clone(&sharded);
        let sharded_parallel = run_workload(WORKERS, OPS, move |id, page| {
            s2.insert(id, page.clone());
            let _ = s2.get(id);
        });

        // Legacy: WORKERS parallel.
        let legacy = Arc::new(legacy_baseline::LegacyPageCache::new(CAPACITY));
        let l2 = Arc::clone(&legacy);
        let legacy_parallel = run_workload(WORKERS, OPS, move |id, page| {
            l2.insert(id, page.clone());
            let _ = l2.get(id);
        });

        eprintln!(
            "page_cache concurrency: sharded 1w={:?} sharded {}w={:?} legacy {}w={:?}",
            sharded_serial, WORKERS, sharded_parallel, WORKERS, legacy_parallel
        );

        // Contention guard: with equal total work, a healthy parallel
        // run lands at or below ~1x the serial wall time; the 7x
        // headroom only catches catastrophic lock-induced collapse.
        assert!(
            sharded_parallel.as_nanos() < sharded_serial.as_nanos() * 7,
            "sharded cache did not scale: 1w={:?} {}w={:?}",
            sharded_serial,
            WORKERS,
            sharded_parallel
        );

        // Regression check: sharded must beat the legacy single-lock
        // baseline at WORKERS workers, and by a required 1.2x margin:
        // a run where the two designs land within noise of each other
        // fails rather than passing by luck on a busy CI box (and
        // would itself indicate a sharding regression worth
        // investigating).
        assert!(
            sharded_parallel.as_nanos() * 12 < legacy_parallel.as_nanos() * 10,
            "sharded cache did not beat legacy baseline: sharded {}w={:?} legacy {}w={:?}",
            WORKERS,
            sharded_parallel,
            WORKERS,
            legacy_parallel
        );
    }
}