Skip to main content

page_db/
pool.rs

1//! [`BufferPool`]: a bounded in-memory cache of pages over a [`PageStore`].
2//!
3//! The pool keeps a fixed number of frames resident, each holding one page. A
4//! caller asks for a page by id and gets back a [`PageGuard`] that pins the
5//! frame — a pinned frame is never evicted — and dropping the guard unpins it.
6//! Writing through the guard marks the frame dirty; a dirty frame is always
7//! flushed to the store before its frame is reused. Those two rules — never
8//! evict a pinned page, never lose a dirty page without flushing — are the
9//! invariants the property tests and the loom model checks hold the pool to.
10//!
11//! Eviction is the clock (second-chance) algorithm: each frame carries a
12//! reference bit set when it is touched; the clock hand sweeps frames, clearing
13//! set bits and skipping pinned frames, and reuses the first unset, unpinned
14//! frame it finds. If every frame is pinned, admission fails with
15//! [`PageError::BufferPoolExhausted`] rather than evicting something it must
16//! not.
17//!
18//! For v0.3.0 the pool serializes its bookkeeping — and the miss-path store I/O
19//! — under a single mutex. That keeps the pin/evict/dirty logic small enough to
20//! verify exhaustively with loom; sharding the table to remove the single-lock
21//! bottleneck is a later, measured change behind the same API.
22
23use std::collections::HashMap;
24use std::ops::{Deref, DerefMut};
25use std::path::Path;
26
27use crate::error::{PageError, PageResult};
28use crate::file::PageFile;
29use crate::page::{Page, PageId, PageSize};
30use crate::store::PageStore;
31use crate::sync::{self, Arc, AtomicBool, AtomicU64, AtomicUsize, Mutex, Ordering, RwLock};
32use crate::sync::{RwLockReadGuard, RwLockWriteGuard};
33
34/// Sentinel id for a frame that holds no page yet.
35const NO_PAGE: u64 = u64::MAX;
36
37/// One cache frame: a reusable page buffer plus its residency bookkeeping.
38struct FrameInner {
39    /// The page bytes. Reused in place across the pages that occupy this frame.
40    page: RwLock<Page>,
41    /// The id of the page currently resident, or [`NO_PAGE`].
42    id: AtomicU64,
43    /// Outstanding pins. A frame with `pin > 0` is never evicted.
44    pin: AtomicUsize,
45    /// Set when the resident page has unflushed modifications.
46    dirty: AtomicBool,
47    /// The clock reference bit: set on access, cleared by the clock sweep.
48    referenced: AtomicBool,
49}
50
51impl FrameInner {
52    fn new(page: Page) -> Self {
53        Self {
54            page: RwLock::new(page),
55            id: AtomicU64::new(NO_PAGE),
56            pin: AtomicUsize::new(0),
57            dirty: AtomicBool::new(false),
58            referenced: AtomicBool::new(false),
59        }
60    }
61
62    #[inline]
63    fn resident_id(&self) -> PageId {
64        PageId::new(self.id.load(Ordering::Acquire))
65    }
66}
67
68/// The pool's mutable bookkeeping, guarded by one mutex.
69struct Core {
70    /// Resident page id to frame index.
71    map: HashMap<PageId, usize>,
72    /// Frame indices never yet filled, available without eviction.
73    free: Vec<usize>,
74    /// The clock hand.
75    hand: usize,
76}
77
78/// A bounded cache of pages over a [`PageStore`].
79///
80/// `BufferPool<S>` is generic over its backing store; the default is
81/// [`PageFile`], so `BufferPool` without a type parameter is a pool over a file
82/// of pages. The handle is `Send + Sync` and every method takes `&self`, so it
83/// is shared across threads behind an `Arc` with no outer lock.
84///
85/// # Examples
86///
87/// ```
88/// use page_db::{BufferPool, PageId, Lsn, DEFAULT_PAGE_SIZE};
89///
90/// # let dir = tempfile::tempdir().unwrap();
91/// # let path = dir.path().join("data.pages");
92/// // A pool of 128 frames over a 4 KiB-page file.
93/// let pool = BufferPool::open(&path, DEFAULT_PAGE_SIZE, 128)?;
94///
95/// // Create page 0, write to it (which marks it dirty), then release the pin.
96/// {
97///     let guard = pool.new_page(PageId::new(0))?;
98///     let mut page = guard.write();
99///     page.set_lsn(Lsn::new(1));
100///     page.payload_mut()[..5].copy_from_slice(b"hello");
101/// }
102///
103/// // Flush dirty pages to the file and make them durable.
104/// pool.flush_all()?;
105/// pool.sync()?;
106///
107/// // Fetch it back — served from cache if resident, else read from the file.
108/// let guard = pool.fetch(PageId::new(0))?;
109/// assert_eq!(&guard.read().payload()[..5], b"hello");
110/// # Ok::<(), page_db::PageError>(())
111/// ```
112pub struct BufferPool<S = PageFile> {
113    store: S,
114    frames: Vec<Arc<FrameInner>>,
115    core: Mutex<Core>,
116    capacity: usize,
117}
118
119impl BufferPool<PageFile> {
120    /// Open a page file and wrap it in a pool of `capacity` frames.
121    ///
122    /// A convenience over [`PageFile::open`] followed by [`BufferPool::new`].
123    ///
124    /// # Errors
125    ///
126    /// Returns [`PageError::Io`] if the file cannot be opened.
127    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize, capacity: usize) -> PageResult<Self> {
128        let file = PageFile::open(path, page_size)?;
129        Ok(Self::new(file, capacity))
130    }
131}
132
133impl<S: PageStore> BufferPool<S> {
134    /// Build a pool of `capacity` frames over `store`.
135    ///
136    /// `capacity` is the number of pages held resident; it is clamped up to at
137    /// least one. The frame buffers are allocated once, here — the pool does no
138    /// per-request allocation on the hot path.
139    #[must_use]
140    pub fn new(store: S, capacity: usize) -> Self {
141        let capacity = capacity.max(1);
142        let mut frames = Vec::with_capacity(capacity);
143        for _ in 0..capacity {
144            frames.push(Arc::new(FrameInner::new(store.allocate_page())));
145        }
146        let free = (0..capacity).collect();
147        Self {
148            store,
149            frames,
150            core: Mutex::new(Core {
151                map: HashMap::with_capacity(capacity),
152                free,
153                hand: 0,
154            }),
155            capacity,
156        }
157    }
158
159    /// The number of frames in the pool.
160    #[inline]
161    #[must_use]
162    pub fn capacity(&self) -> usize {
163        self.capacity
164    }
165
166    /// The number of pages currently resident.
167    #[must_use]
168    pub fn resident_len(&self) -> usize {
169        sync::lock(&self.core).map.len()
170    }
171
172    /// Whether page `id` is currently held in the pool.
173    #[must_use]
174    pub fn is_resident(&self, id: PageId) -> bool {
175        sync::lock(&self.core).map.contains_key(&id)
176    }
177
178    /// Fetch the page at `id`, pinning it and returning a guard.
179    ///
180    /// Served from cache if resident; otherwise a frame is found (a free one, or
181    /// an evicted victim, flushing it first if dirty) and the page is read from
182    /// the store into it. The returned [`PageGuard`] holds a pin for its
183    /// lifetime.
184    ///
185    /// # Errors
186    ///
187    /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
188    /// - Whatever the store's read returns (for a file:
189    ///   [`PageError::ShortRead`] past end-of-file, or an integrity error).
190    /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
191    pub fn fetch(&self, id: PageId) -> PageResult<PageGuard> {
192        let mut core = sync::lock(&self.core);
193
194        if let Some(&slot) = core.map.get(&id) {
195            let frame = self.frames[slot].clone();
196            let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
197            frame.referenced.store(true, Ordering::Release);
198            return Ok(PageGuard { frame });
199        }
200
201        let slot = self.take_slot(&mut core)?;
202        {
203            let frame = &self.frames[slot];
204            let mut page = sync::write(&frame.page);
205            if let Err(err) = self.store.read_into(id, &mut page) {
206                drop(page);
207                core.free.push(slot);
208                return Err(err);
209            }
210        }
211        Ok(self.install(&mut core, slot, id, false))
212    }
213
214    /// Introduce a fresh, zeroed page at `id`, pinning it and returning a guard.
215    ///
216    /// The page is created in memory and marked dirty, so it is written to the
217    /// store on the next flush; no read is performed. If `id` is already
218    /// resident it is reset to a blank page. The caller chooses the id — the
219    /// free-list allocator that picks ids is a later release.
220    ///
221    /// # Errors
222    ///
223    /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
224    /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
225    pub fn new_page(&self, id: PageId) -> PageResult<PageGuard> {
226        let mut core = sync::lock(&self.core);
227
228        if let Some(&slot) = core.map.get(&id) {
229            let frame = self.frames[slot].clone();
230            sync::write(&frame.page).reset();
231            let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
232            frame.dirty.store(true, Ordering::Release);
233            frame.referenced.store(true, Ordering::Release);
234            return Ok(PageGuard { frame });
235        }
236
237        let slot = self.take_slot(&mut core)?;
238        sync::write(&self.frames[slot].page).reset();
239        Ok(self.install(&mut core, slot, id, true))
240    }
241
242    /// Flush page `id` to the store if it is resident and dirty.
243    ///
244    /// This places the bytes in the store; call [`sync`](BufferPool::sync) to
245    /// make them durable. Do not call this while holding a write guard to the
246    /// same page on the same thread — flushing takes the frame's lock.
247    ///
248    /// # Errors
249    ///
250    /// Whatever the store's write returns.
251    pub fn flush(&self, id: PageId) -> PageResult<()> {
252        let core = sync::lock(&self.core);
253        if let Some(&slot) = core.map.get(&id) {
254            self.flush_slot(slot, id)?;
255        }
256        Ok(())
257    }
258
259    /// Flush every dirty resident page to the store.
260    ///
261    /// # Errors
262    ///
263    /// Whatever the store's write returns. On error, some pages may already have
264    /// been flushed; the operation is safe to retry.
265    pub fn flush_all(&self) -> PageResult<()> {
266        let core = sync::lock(&self.core);
267        for (&id, &slot) in core.map.iter() {
268            self.flush_slot(slot, id)?;
269        }
270        Ok(())
271    }
272
273    /// Flush all dirty pages, then make the store durable.
274    ///
275    /// Equivalent to [`flush_all`](BufferPool::flush_all) followed by
276    /// [`sync`](BufferPool::sync) — the common checkpoint sequence.
277    ///
278    /// # Errors
279    ///
280    /// Whatever flushing or the store's sync returns.
281    pub fn checkpoint(&self) -> PageResult<()> {
282        self.flush_all()?;
283        self.sync()
284    }
285
286    /// Make the store durable (the pages already written to it).
287    ///
288    /// This does not flush dirty cached pages first; use
289    /// [`flush_all`](BufferPool::flush_all) or
290    /// [`checkpoint`](BufferPool::checkpoint) for that.
291    ///
292    /// # Errors
293    ///
294    /// Whatever the store's sync returns.
295    pub fn sync(&self) -> PageResult<()> {
296        self.store.sync()
297    }
298
299    /// Install a freshly loaded or created page into `slot`, returning a pinned
300    /// guard. Caller holds `core` and has already populated the frame's buffer.
301    fn install(&self, core: &mut Core, slot: usize, id: PageId, dirty: bool) -> PageGuard {
302        let frame = &self.frames[slot];
303        frame.id.store(id.get(), Ordering::Release);
304        frame.dirty.store(dirty, Ordering::Release);
305        frame.referenced.store(true, Ordering::Release);
306        frame.pin.store(1, Ordering::Release);
307        let _ = core.map.insert(id, slot);
308        PageGuard {
309            frame: self.frames[slot].clone(),
310        }
311    }
312
313    /// Flush the page in `slot` (resident id `id`) if dirty.
314    fn flush_slot(&self, slot: usize, id: PageId) -> PageResult<()> {
315        let frame = &self.frames[slot];
316        if frame.dirty.load(Ordering::Acquire) {
317            let mut page = sync::write(&frame.page);
318            self.store.write_page(id, &mut page)?;
319            frame.dirty.store(false, Ordering::Release);
320        }
321        Ok(())
322    }
323
324    /// Obtain a frame slot to fill: a free one, or an evicted victim (flushed
325    /// first if dirty). Caller holds `core`.
326    fn take_slot(&self, core: &mut Core) -> PageResult<usize> {
327        if let Some(slot) = core.free.pop() {
328            return Ok(slot);
329        }
330        let slot = match self.find_victim(core) {
331            Some(slot) => slot,
332            None => {
333                return Err(PageError::BufferPoolExhausted {
334                    capacity: self.capacity,
335                });
336            }
337        };
338        let victim_id = self.frames[slot].resident_id();
339        self.flush_slot(slot, victim_id)?;
340        let _ = core.map.remove(&victim_id);
341        Ok(slot)
342    }
343
344    /// The clock sweep: return an unpinned frame to reuse, or `None` if all
345    /// frames are pinned. Caller holds `core`.
346    fn find_victim(&self, core: &mut Core) -> Option<usize> {
347        let n = self.capacity;
348        // Two full passes: the first clears reference bits, the second selects.
349        // If every frame stays pinned across both, the pool is exhausted.
350        let mut steps = 0;
351        while steps < 2 * n {
352            let slot = core.hand;
353            core.hand = (core.hand + 1) % n;
354            steps += 1;
355
356            let frame = &self.frames[slot];
357            if frame.pin.load(Ordering::Acquire) > 0 {
358                continue;
359            }
360            if frame.referenced.swap(false, Ordering::AcqRel) {
361                continue;
362            }
363            return Some(slot);
364        }
365        None
366    }
367}
368
369/// A pin on a cached page.
370///
371/// While a `PageGuard` is alive the page stays resident and unevictable. Read
372/// the page with [`read`](PageGuard::read) and write it with
373/// [`write`](PageGuard::write); taking a write guard marks the page dirty.
374/// Dropping the `PageGuard` releases the pin.
375pub struct PageGuard {
376    frame: Arc<FrameInner>,
377}
378
379impl PageGuard {
380    /// The id of the pinned page.
381    #[inline]
382    #[must_use]
383    pub fn id(&self) -> PageId {
384        self.frame.resident_id()
385    }
386
387    /// Whether the page has unflushed modifications.
388    #[inline]
389    #[must_use]
390    pub fn is_dirty(&self) -> bool {
391        self.frame.dirty.load(Ordering::Acquire)
392    }
393
394    /// Borrow the page for reading. Multiple readers of the same page proceed
395    /// concurrently.
396    #[inline]
397    #[must_use]
398    pub fn read(&self) -> PageRef<'_> {
399        PageRef {
400            guard: sync::read(&self.frame.page),
401        }
402    }
403
404    /// Borrow the page for writing, marking it dirty.
405    ///
406    /// The page is recorded dirty as soon as the write guard is taken, so it
407    /// will be flushed even if the actual mutation is conditional.
408    #[inline]
409    #[must_use]
410    pub fn write(&self) -> PageMut<'_> {
411        self.frame.dirty.store(true, Ordering::Release);
412        PageMut {
413            guard: sync::write(&self.frame.page),
414        }
415    }
416}
417
418impl Drop for PageGuard {
419    fn drop(&mut self) {
420        let _ = self.frame.pin.fetch_sub(1, Ordering::AcqRel);
421    }
422}
423
424impl std::fmt::Debug for PageGuard {
425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426        f.debug_struct("PageGuard")
427            .field("id", &self.id())
428            .field("dirty", &self.is_dirty())
429            .finish()
430    }
431}
432
433/// A shared read borrow of a pinned page. Dereferences to [`Page`].
434pub struct PageRef<'a> {
435    guard: RwLockReadGuard<'a, Page>,
436}
437
438impl Deref for PageRef<'_> {
439    type Target = Page;
440    #[inline]
441    fn deref(&self) -> &Page {
442        &self.guard
443    }
444}
445
446/// An exclusive write borrow of a pinned page. Dereferences to [`Page`].
447pub struct PageMut<'a> {
448    guard: RwLockWriteGuard<'a, Page>,
449}
450
451impl Deref for PageMut<'_> {
452    type Target = Page;
453    #[inline]
454    fn deref(&self) -> &Page {
455        &self.guard
456    }
457}
458
459impl DerefMut for PageMut<'_> {
460    #[inline]
461    fn deref_mut(&mut self) -> &mut Page {
462        &mut self.guard
463    }
464}
465
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use std::collections::HashMap;

    use proptest::prelude::*;

    use super::*;
    use crate::page::Lsn;
    use crate::test_store::MemStore;

    const PS: usize = 4096;

    fn pool(capacity: usize) -> BufferPool<MemStore> {
        BufferPool::new(MemStore::new(PS), capacity)
    }

    #[test]
    fn test_new_page_then_fetch_serves_from_cache() {
        let pool = pool(8);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            guard.write().payload_mut()[0] = 0x7A;
        }
        assert!(pool.is_resident(PageId::new(0)));
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert_eq!(guard.read().payload()[0], 0x7A);
    }

    #[test]
    fn test_capacity_is_clamped_up_to_one() {
        assert_eq!(pool(0).capacity(), 1);
    }

    #[test]
    fn test_pinned_page_is_never_evicted() {
        let pool = pool(1);
        let _held = pool.new_page(PageId::new(0)).unwrap();
        // The only frame is pinned, so admitting another page must fail rather
        // than evict the pinned one.
        assert!(matches!(
            pool.new_page(PageId::new(1)),
            Err(PageError::BufferPoolExhausted { capacity: 1 })
        ));
        assert!(pool.is_resident(PageId::new(0)));
    }

    #[test]
    fn test_dirty_page_is_flushed_before_eviction() {
        let pool = pool(1);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            guard.write().set_lsn(Lsn::new(9));
        } // page 0 is dirty and unpinned

        // Force eviction of page 0 by reusing the only frame.
        {
            let _ = pool.new_page(PageId::new(1)).unwrap();
        }
        // Page 0 must have been written to the store on its way out.
        assert!(pool.store_contains(0));
        // And it reads back with the data it held.
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert_eq!(guard.read().lsn(), Lsn::new(9));
    }

    #[test]
    fn test_clock_keeps_the_recently_used_page() {
        // Fill three frames. Admission raises each frame's reference bit.
        let pool = pool(3);
        for id in 0..3u64 {
            let _ = pool.new_page(PageId::new(id)).unwrap();
        }
        pool.flush_all().unwrap();

        // Every bit is raised, so admitting page 3 sweeps the whole clock once,
        // lowering all bits, before it evicts one of 0..3. The two survivors
        // are left with lowered bits; page 3 has its bit raised.
        let _ = pool.new_page(PageId::new(3)).unwrap();
        let survivors: Vec<u64> = (0..3u64)
            .filter(|&id| pool.is_resident(PageId::new(id)))
            .collect();
        assert_eq!(survivors.len(), 2);
        let (untouched, touched) = (survivors[0], survivors[1]);

        // Touch one survivor. Wherever the hand is, the next sweep lowers the
        // bits of `touched` and page 3 on first sight, then evicts `untouched`,
        // the only frame whose bit is already lowered.
        let _ = pool.fetch(PageId::new(touched)).unwrap();
        let _ = pool.new_page(PageId::new(4)).unwrap();

        assert!(pool.is_resident(PageId::new(touched)));
        assert!(!pool.is_resident(PageId::new(untouched)));
        assert!(pool.is_resident(PageId::new(3)));
        assert!(pool.is_resident(PageId::new(4)));
    }

    #[test]
    fn test_flush_clears_dirty() {
        let pool = pool(4);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            assert!(guard.is_dirty());
        }
        pool.flush(PageId::new(0)).unwrap();
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert!(!guard.is_dirty());
    }

    #[test]
    fn test_fetch_missing_unwritten_page_errors() {
        let pool = pool(4);
        assert!(matches!(
            pool.fetch(PageId::new(99)),
            Err(PageError::ShortRead { .. })
        ));
        // A failed miss must not leave the missing page resident...
        assert_eq!(pool.resident_len(), 0);
        assert!(!pool.is_resident(PageId::new(99)));
        // ...and the frame it borrowed must still be usable: every frame can
        // be pinned at the same time afterwards.
        let held: Vec<PageGuard> = (0..4u64)
            .map(|id| pool.new_page(PageId::new(id)).unwrap())
            .collect();
        assert_eq!(held.len(), 4);
        assert_eq!(pool.resident_len(), 4);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(48))]

        /// Through any sequence of fetches and dirtying writes against a pool
        /// smaller than the working set, every page always reads back the last
        /// value written to it — nothing is lost to eviction, nothing is stale.
        #[test]
        fn pool_never_loses_data(
            ops in proptest::collection::vec((0u8..6, any::<u8>(), any::<bool>()), 1..200),
        ) {
            const N: u64 = 6;
            let pool = pool(2);                       // 2 frames, 6 pages
            let mut expected: HashMap<u64, u8> = HashMap::new();

            // Seed every page so all fetches resolve.
            for id in 0..N {
                let guard = pool.new_page(PageId::new(id)).unwrap();
                guard.write().payload_mut()[0] = 0;
                let _ = expected.insert(id, 0);
                drop(guard);
            }
            pool.flush_all().unwrap();

            for (id, marker, write) in ops {
                let id = id as u64 % N;
                let guard = pool.fetch(PageId::new(id)).unwrap();
                // Read must match the model.
                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
                if write {
                    guard.write().payload_mut()[0] = marker;
                    let _ = expected.insert(id, marker);
                }
            }

            // After a checkpoint, a cold reread of every page still matches.
            pool.flush_all().unwrap();
            for id in 0..N {
                let guard = pool.fetch(PageId::new(id)).unwrap();
                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
            }
        }
    }

    // Helper hook used by the eviction test, kept here so the store stays
    // private to the crate.
    impl BufferPool<MemStore> {
        fn store_contains(&self, id: u64) -> bool {
            self.store.contains(id)
        }
    }
}
625
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    use crate::sync::Arc;
    use crate::test_store::MemStore;

    /// A shared pool with a single frame over a 4 KiB in-memory store.
    fn one_frame_pool() -> Arc<BufferPool<MemStore>> {
        Arc::new(BufferPool::new(MemStore::new(4096), 1))
    }

    /// Pin safety: the main thread pins the only frame while a second thread
    /// tries to admit another page. That admission must fail in every
    /// interleaving, and the pinned page must remain resident.
    #[test]
    fn loom_pinned_page_never_evicted() {
        loom::model(|| {
            let pool = one_frame_pool();
            let held = pool.new_page(PageId::new(0)).unwrap();

            let contender = {
                let pool = Arc::clone(&pool);
                loom::thread::spawn(move || pool.new_page(PageId::new(1)).is_err())
            };

            // Page 0 stays put no matter how the two threads interleave.
            assert!(pool.is_resident(PageId::new(0)));
            let rejected = contender.join().unwrap();
            assert!(rejected);
            assert_eq!(held.id(), PageId::new(0));
            drop(held);
        });
    }

    /// Dirty safety: when a second thread evicts an unpinned dirty page to
    /// make room, that page reaches the store first, in every interleaving.
    #[test]
    fn loom_dirty_page_flushed_on_eviction() {
        loom::model(|| {
            let flushed = {
                let pool = one_frame_pool();
                {
                    let guard = pool.new_page(PageId::new(0)).unwrap();
                    guard.write().payload_mut()[0] = 0x5A;
                }

                let evictor = {
                    let pool = Arc::clone(&pool);
                    loom::thread::spawn(move || {
                        // Page 1 needs the only frame, so dirty page 0 has to
                        // be flushed and evicted to make room.
                        let _ = pool.new_page(PageId::new(1)).unwrap();
                    })
                };
                evictor.join().unwrap();
                pool.store_contains_loom(0)
            };
            assert!(flushed, "evicted dirty page 0 was not flushed");
        });
    }

    impl BufferPool<MemStore> {
        fn store_contains_loom(&self, id: u64) -> bool {
            self.store.contains(id)
        }
    }
}