Skip to main content

page_db/
pool.rs

1//! [`BufferPool`]: a bounded in-memory cache of pages over a [`PageStore`].
2//!
3//! The pool keeps a fixed number of frames resident, each holding one page. A
4//! caller asks for a page by id and gets back a [`PageGuard`] that pins the
5//! frame — a pinned frame is never evicted — and dropping the guard unpins it.
6//! Writing through the guard marks the frame dirty; a dirty frame is always
7//! flushed to the store before its frame is reused. Those two rules — never
8//! evict a pinned page, never lose a dirty page without flushing — are the
9//! invariants the property tests and the loom model checks hold the pool to.
10//!
11//! Eviction is the clock (second-chance) algorithm: each frame carries a
12//! reference bit set when it is touched; the clock hand sweeps frames, clearing
13//! set bits and skipping pinned frames, and reuses the first unset, unpinned
14//! frame it finds. If every frame is pinned, admission fails with
15//! [`PageError::BufferPoolExhausted`] rather than evicting something it must
16//! not.
17//!
18//! For v0.3.0 the pool serializes its bookkeeping — and the miss-path store I/O
19//! — under a single mutex. That keeps the pin/evict/dirty logic small enough to
20//! verify exhaustively with loom; sharding the table to remove the single-lock
21//! bottleneck is a later, measured change behind the same API.
22
23use std::collections::HashMap;
24use std::ops::{Deref, DerefMut};
25use std::path::Path;
26
27use crate::error::{PageError, PageResult};
28use crate::file::PageFile;
29use crate::page::{Page, PageId, PageSize};
30use crate::store::PageStore;
31use crate::sync::{self, Arc, AtomicBool, AtomicU64, AtomicUsize, Mutex, Ordering, RwLock};
32use crate::sync::{RwLockReadGuard, RwLockWriteGuard};
33
34/// Sentinel id for a frame that holds no page yet.
35const NO_PAGE: u64 = u64::MAX;
36
37/// One cache frame: a reusable page buffer plus its residency bookkeeping.
38struct FrameInner {
39    /// The page bytes. Reused in place across the pages that occupy this frame.
40    page: RwLock<Page>,
41    /// The id of the page currently resident, or [`NO_PAGE`].
42    id: AtomicU64,
43    /// Outstanding pins. A frame with `pin > 0` is never evicted.
44    pin: AtomicUsize,
45    /// Set when the resident page has unflushed modifications.
46    dirty: AtomicBool,
47    /// The clock reference bit: set on access, cleared by the clock sweep.
48    referenced: AtomicBool,
49}
50
51impl FrameInner {
52    fn new(page: Page) -> Self {
53        Self {
54            page: RwLock::new(page),
55            id: AtomicU64::new(NO_PAGE),
56            pin: AtomicUsize::new(0),
57            dirty: AtomicBool::new(false),
58            referenced: AtomicBool::new(false),
59        }
60    }
61
62    #[inline]
63    fn resident_id(&self) -> PageId {
64        PageId::new(self.id.load(Ordering::Acquire))
65    }
66}
67
68/// The pool's mutable bookkeeping, guarded by one mutex.
69struct Core {
70    /// Resident page id to frame index.
71    map: HashMap<PageId, usize>,
72    /// Frame indices never yet filled, available without eviction.
73    free: Vec<usize>,
74    /// The clock hand.
75    hand: usize,
76}
77
78/// A bounded cache of pages over a [`PageStore`].
79///
80/// `BufferPool<S>` is generic over its backing store; the default is
81/// [`PageFile`], so `BufferPool` without a type parameter is a pool over a file
82/// of pages. The handle is `Send + Sync` and every method takes `&self`, so it
83/// is shared across threads behind an `Arc` with no outer lock.
84///
85/// # Examples
86///
87/// ```
88/// use page_db::{BufferPool, PageId, Lsn, DEFAULT_PAGE_SIZE};
89///
90/// # let dir = tempfile::tempdir().unwrap();
91/// # let path = dir.path().join("data.pages");
92/// // A pool of 128 frames over a 4 KiB-page file.
93/// let pool = BufferPool::open(&path, DEFAULT_PAGE_SIZE, 128)?;
94///
95/// // Create page 0, write to it (which marks it dirty), then release the pin.
96/// {
97///     let guard = pool.new_page(PageId::new(0))?;
98///     let mut page = guard.write();
99///     page.set_lsn(Lsn::new(1));
100///     page.payload_mut()[..5].copy_from_slice(b"hello");
101/// }
102///
103/// // Flush dirty pages to the file and make them durable.
104/// pool.flush_all()?;
105/// pool.sync()?;
106///
107/// // Fetch it back — served from cache if resident, else read from the file.
108/// let guard = pool.fetch(PageId::new(0))?;
109/// assert_eq!(&guard.read().payload()[..5], b"hello");
110/// # Ok::<(), page_db::PageError>(())
111/// ```
112pub struct BufferPool<S = PageFile> {
113    store: S,
114    frames: Vec<Arc<FrameInner>>,
115    core: Mutex<Core>,
116    capacity: usize,
117}
118
119impl BufferPool<PageFile> {
120    /// Open a page file and wrap it in a pool of `capacity` frames.
121    ///
122    /// A convenience over [`PageFile::open`] followed by [`BufferPool::new`].
123    ///
124    /// # Errors
125    ///
126    /// Returns [`PageError::Io`] if the file cannot be opened.
127    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize, capacity: usize) -> PageResult<Self> {
128        let file = PageFile::open(path, page_size)?;
129        Ok(Self::new(file, capacity))
130    }
131}
132
133impl<S: PageStore> BufferPool<S> {
134    /// Build a pool of `capacity` frames over `store`.
135    ///
136    /// `capacity` is the number of pages held resident; it is clamped up to at
137    /// least one. The frame buffers are allocated once, here — the pool does no
138    /// per-request allocation on the hot path.
139    #[must_use]
140    pub fn new(store: S, capacity: usize) -> Self {
141        let capacity = capacity.max(1);
142        let mut frames = Vec::with_capacity(capacity);
143        for _ in 0..capacity {
144            frames.push(Arc::new(FrameInner::new(store.allocate_page())));
145        }
146        let free = (0..capacity).collect();
147        Self {
148            store,
149            frames,
150            core: Mutex::new(Core {
151                map: HashMap::with_capacity(capacity),
152                free,
153                hand: 0,
154            }),
155            capacity,
156        }
157    }
158
159    /// The number of frames in the pool.
160    #[inline]
161    #[must_use]
162    pub fn capacity(&self) -> usize {
163        self.capacity
164    }
165
166    /// The number of pages currently resident.
167    #[must_use]
168    pub fn resident_len(&self) -> usize {
169        sync::lock(&self.core).map.len()
170    }
171
172    /// Whether page `id` is currently held in the pool.
173    #[must_use]
174    pub fn is_resident(&self, id: PageId) -> bool {
175        sync::lock(&self.core).map.contains_key(&id)
176    }
177
178    /// Fetch the page at `id`, pinning it and returning a guard.
179    ///
180    /// Served from cache if resident; otherwise a frame is found (a free one, or
181    /// an evicted victim, flushing it first if dirty) and the page is read from
182    /// the store into it. The returned [`PageGuard`] holds a pin for its
183    /// lifetime.
184    ///
185    /// # Errors
186    ///
187    /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
188    /// - Whatever the store's read returns (for a file:
189    ///   [`PageError::ShortRead`] past end-of-file, or an integrity error).
190    /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
191    pub fn fetch(&self, id: PageId) -> PageResult<PageGuard> {
192        let mut core = sync::lock(&self.core);
193
194        if let Some(&slot) = core.map.get(&id) {
195            let frame = self.frames[slot].clone();
196            let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
197            frame.referenced.store(true, Ordering::Release);
198            return Ok(PageGuard { frame });
199        }
200
201        let slot = self.take_slot(&mut core)?;
202        {
203            let frame = &self.frames[slot];
204            let mut page = sync::write(&frame.page);
205            if let Err(err) = self.store.read_into(id, &mut page) {
206                drop(page);
207                core.free.push(slot);
208                return Err(err);
209            }
210        }
211        Ok(self.install(&mut core, slot, id, false))
212    }
213
214    /// Introduce a fresh, zeroed page at `id`, pinning it and returning a guard.
215    ///
216    /// The page is created in memory and marked dirty, so it is written to the
217    /// store on the next flush; no read is performed. If `id` is already
218    /// resident it is reset to a blank page. The caller chooses the id — the
219    /// free-list allocator that picks ids is a later release.
220    ///
221    /// # Errors
222    ///
223    /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
224    /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
225    pub fn new_page(&self, id: PageId) -> PageResult<PageGuard> {
226        let mut core = sync::lock(&self.core);
227
228        if let Some(&slot) = core.map.get(&id) {
229            let frame = self.frames[slot].clone();
230            sync::write(&frame.page).reset();
231            let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
232            frame.dirty.store(true, Ordering::Release);
233            frame.referenced.store(true, Ordering::Release);
234            return Ok(PageGuard { frame });
235        }
236
237        let slot = self.take_slot(&mut core)?;
238        sync::write(&self.frames[slot].page).reset();
239        Ok(self.install(&mut core, slot, id, true))
240    }
241
242    /// Flush page `id` to the store if it is resident and dirty.
243    ///
244    /// This places the bytes in the store; call [`sync`](BufferPool::sync) to
245    /// make them durable. Do not call this while holding a write guard to the
246    /// same page on the same thread — flushing takes the frame's lock.
247    ///
248    /// # Errors
249    ///
250    /// Whatever the store's write returns.
251    pub fn flush(&self, id: PageId) -> PageResult<()> {
252        let core = sync::lock(&self.core);
253        if let Some(&slot) = core.map.get(&id) {
254            self.flush_slot(slot, id)?;
255        }
256        Ok(())
257    }
258
259    /// Flush every dirty resident page to the store.
260    ///
261    /// # Errors
262    ///
263    /// Whatever the store's write returns. On error, some pages may already have
264    /// been flushed; the operation is safe to retry.
265    pub fn flush_all(&self) -> PageResult<()> {
266        let core = sync::lock(&self.core);
267        for (&id, &slot) in core.map.iter() {
268            self.flush_slot(slot, id)?;
269        }
270        Ok(())
271    }
272
273    /// Flush all dirty pages, then make the store durable.
274    ///
275    /// Equivalent to [`flush_all`](BufferPool::flush_all) followed by
276    /// [`sync`](BufferPool::sync) — the common checkpoint sequence.
277    ///
278    /// # Errors
279    ///
280    /// Whatever flushing or the store's sync returns.
281    pub fn checkpoint(&self) -> PageResult<()> {
282        self.flush_all()?;
283        self.sync()
284    }
285
286    /// Make the store durable (the pages already written to it).
287    ///
288    /// This does not flush dirty cached pages first; use
289    /// [`flush_all`](BufferPool::flush_all) or
290    /// [`checkpoint`](BufferPool::checkpoint) for that.
291    ///
292    /// # Errors
293    ///
294    /// Whatever the store's sync returns.
295    pub fn sync(&self) -> PageResult<()> {
296        self.store.sync()
297    }
298
299    /// Install a freshly loaded or created page into `slot`, returning a pinned
300    /// guard. Caller holds `core` and has already populated the frame's buffer.
301    fn install(&self, core: &mut Core, slot: usize, id: PageId, dirty: bool) -> PageGuard {
302        let frame = &self.frames[slot];
303        frame.id.store(id.get(), Ordering::Release);
304        frame.dirty.store(dirty, Ordering::Release);
305        frame.referenced.store(true, Ordering::Release);
306        frame.pin.store(1, Ordering::Release);
307        let _ = core.map.insert(id, slot);
308        PageGuard {
309            frame: self.frames[slot].clone(),
310        }
311    }
312
313    /// Flush the page in `slot` (resident id `id`) if dirty.
314    fn flush_slot(&self, slot: usize, id: PageId) -> PageResult<()> {
315        let frame = &self.frames[slot];
316        if frame.dirty.load(Ordering::Acquire) {
317            let mut page = sync::write(&frame.page);
318            self.store.write_page(id, &mut page)?;
319            frame.dirty.store(false, Ordering::Release);
320        }
321        Ok(())
322    }
323
324    /// Obtain a frame slot to fill: a free one, or an evicted victim (flushed
325    /// first if dirty). Caller holds `core`.
326    fn take_slot(&self, core: &mut Core) -> PageResult<usize> {
327        if let Some(slot) = core.free.pop() {
328            return Ok(slot);
329        }
330        let slot = match self.find_victim(core) {
331            Some(slot) => slot,
332            None => {
333                return Err(PageError::BufferPoolExhausted {
334                    capacity: self.capacity,
335                });
336            }
337        };
338        let victim_id = self.frames[slot].resident_id();
339        self.flush_slot(slot, victim_id)?;
340        let _ = core.map.remove(&victim_id);
341        Ok(slot)
342    }
343
344    /// The clock sweep: return an unpinned frame to reuse, or `None` if all
345    /// frames are pinned. Caller holds `core`.
346    fn find_victim(&self, core: &mut Core) -> Option<usize> {
347        let n = self.capacity;
348        // Two full passes: the first clears reference bits, the second selects.
349        // If every frame stays pinned across both, the pool is exhausted.
350        let mut steps = 0;
351        while steps < 2 * n {
352            let slot = core.hand;
353            core.hand = (core.hand + 1) % n;
354            steps += 1;
355
356            let frame = &self.frames[slot];
357            if frame.pin.load(Ordering::Acquire) > 0 {
358                continue;
359            }
360            if frame.referenced.swap(false, Ordering::AcqRel) {
361                continue;
362            }
363            return Some(slot);
364        }
365        None
366    }
367}
368
369/// A pin on a cached page.
370///
371/// While a `PageGuard` is alive the page stays resident and unevictable. Read
372/// the page with [`read`](PageGuard::read) and write it with
373/// [`write`](PageGuard::write); taking a write guard marks the page dirty.
374/// Dropping the `PageGuard` releases the pin.
375pub struct PageGuard {
376    frame: Arc<FrameInner>,
377}
378
379impl PageGuard {
380    /// The id of the pinned page.
381    #[inline]
382    #[must_use]
383    pub fn id(&self) -> PageId {
384        self.frame.resident_id()
385    }
386
387    /// Whether the page has unflushed modifications.
388    #[inline]
389    #[must_use]
390    pub fn is_dirty(&self) -> bool {
391        self.frame.dirty.load(Ordering::Acquire)
392    }
393
394    /// Borrow the page for reading. Multiple readers of the same page proceed
395    /// concurrently.
396    #[inline]
397    #[must_use]
398    pub fn read(&self) -> PageRef<'_> {
399        PageRef {
400            guard: sync::read(&self.frame.page),
401        }
402    }
403
404    /// Borrow the page for writing, marking it dirty.
405    ///
406    /// The page is recorded dirty as soon as the write guard is taken, so it
407    /// will be flushed even if the actual mutation is conditional.
408    #[inline]
409    #[must_use]
410    pub fn write(&self) -> PageMut<'_> {
411        self.frame.dirty.store(true, Ordering::Release);
412        PageMut {
413            guard: sync::write(&self.frame.page),
414        }
415    }
416}
417
418impl Drop for PageGuard {
419    fn drop(&mut self) {
420        let _ = self.frame.pin.fetch_sub(1, Ordering::AcqRel);
421    }
422}
423
424impl std::fmt::Debug for PageGuard {
425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426        f.debug_struct("PageGuard")
427            .field("id", &self.id())
428            .field("dirty", &self.is_dirty())
429            .finish()
430    }
431}
432
433/// A shared read borrow of a pinned page. Dereferences to [`Page`].
434pub struct PageRef<'a> {
435    guard: RwLockReadGuard<'a, Page>,
436}
437
438impl Deref for PageRef<'_> {
439    type Target = Page;
440    #[inline]
441    fn deref(&self) -> &Page {
442        &self.guard
443    }
444}
445
446/// An exclusive write borrow of a pinned page. Dereferences to [`Page`].
447pub struct PageMut<'a> {
448    guard: RwLockWriteGuard<'a, Page>,
449}
450
451impl Deref for PageMut<'_> {
452    type Target = Page;
453    #[inline]
454    fn deref(&self) -> &Page {
455        &self.guard
456    }
457}
458
459impl DerefMut for PageMut<'_> {
460    #[inline]
461    fn deref_mut(&mut self) -> &mut Page {
462        &mut self.guard
463    }
464}
465
466#[cfg(test)]
467mod test_support {
468    #![allow(clippy::unwrap_used, clippy::expect_used)]
469
470    use std::collections::HashMap;
471
472    use crate::error::{PageError, PageResult};
473    use crate::page::{Page, PageId, PageSize};
474    use crate::store::PageStore;
475    use crate::sync::{self, Mutex};
476
477    /// An in-memory [`PageStore`] for tests and loom models. It records the
478    /// checksummed bytes of every written page, so a test can assert that an
479    /// evicted dirty page really was flushed before its frame was reused.
480    pub(super) struct MemStore {
481        page_size: usize,
482        pages: Mutex<HashMap<u64, Vec<u8>>>,
483    }
484
485    impl MemStore {
486        pub(super) fn new(page_size: usize) -> Self {
487            Self {
488                page_size,
489                pages: Mutex::new(HashMap::new()),
490            }
491        }
492
493        pub(super) fn contains(&self, id: u64) -> bool {
494            sync::lock(&self.pages).contains_key(&id)
495        }
496    }
497
498    impl PageStore for MemStore {
499        fn page_size(&self) -> usize {
500            self.page_size
501        }
502
503        fn allocate_page(&self) -> Page {
504            Page::new(PageSize::new(self.page_size).unwrap())
505        }
506
507        fn read_into(&self, id: PageId, page: &mut Page) -> PageResult<()> {
508            let pages = sync::lock(&self.pages);
509            match pages.get(&id.get()) {
510                Some(bytes) => {
511                    page.as_bytes_mut().copy_from_slice(bytes);
512                    page.verify(Some(id))
513                }
514                None => Err(PageError::ShortRead {
515                    page_id: id.get(),
516                    got: 0,
517                    page_size: self.page_size,
518                }),
519            }
520        }
521
522        fn write_page(&self, id: PageId, page: &mut Page) -> PageResult<()> {
523            page.stamp(id);
524            let mut pages = sync::lock(&self.pages);
525            let _ = pages.insert(id.get(), page.as_bytes().to_vec());
526            Ok(())
527        }
528
529        fn sync(&self) -> PageResult<()> {
530            Ok(())
531        }
532    }
533}
534
535#[cfg(all(test, not(loom)))]
536mod tests {
537    #![allow(clippy::unwrap_used, clippy::expect_used)]
538
539    use std::collections::HashMap;
540
541    use proptest::prelude::*;
542
543    use super::test_support::MemStore;
544    use super::*;
545    use crate::page::Lsn;
546
547    const PS: usize = 4096;
548
549    fn pool(capacity: usize) -> BufferPool<MemStore> {
550        BufferPool::new(MemStore::new(PS), capacity)
551    }
552
553    #[test]
554    fn test_new_page_then_fetch_serves_from_cache() {
555        let pool = pool(8);
556        {
557            let guard = pool.new_page(PageId::new(0)).unwrap();
558            guard.write().payload_mut()[0] = 0x7A;
559        }
560        assert!(pool.is_resident(PageId::new(0)));
561        let guard = pool.fetch(PageId::new(0)).unwrap();
562        assert_eq!(guard.read().payload()[0], 0x7A);
563    }
564
565    #[test]
566    fn test_capacity_is_clamped_up_to_one() {
567        assert_eq!(pool(0).capacity(), 1);
568    }
569
570    #[test]
571    fn test_pinned_page_is_never_evicted() {
572        let pool = pool(1);
573        let _held = pool.new_page(PageId::new(0)).unwrap();
574        // The only frame is pinned, so admitting another page must fail rather
575        // than evict the pinned one.
576        assert!(matches!(
577            pool.new_page(PageId::new(1)),
578            Err(PageError::BufferPoolExhausted { capacity: 1 })
579        ));
580        assert!(pool.is_resident(PageId::new(0)));
581    }
582
583    #[test]
584    fn test_dirty_page_is_flushed_before_eviction() {
585        let pool = pool(1);
586        {
587            let guard = pool.new_page(PageId::new(0)).unwrap();
588            guard.write().set_lsn(Lsn::new(9));
589        } // page 0 is dirty and unpinned
590
591        // Force eviction of page 0 by reusing the only frame.
592        {
593            let _ = pool.new_page(PageId::new(1)).unwrap();
594        }
595        // Page 0 must have been written to the store on its way out.
596        assert!(pool.store_contains(0));
597        // And it reads back with the data it held.
598        let guard = pool.fetch(PageId::new(0)).unwrap();
599        assert_eq!(guard.read().lsn(), Lsn::new(9));
600    }
601
602    #[test]
603    fn test_clock_keeps_the_recently_used_page() {
604        let pool = pool(2);
605        let _ = pool.new_page(PageId::new(0)).unwrap();
606        let _ = pool.new_page(PageId::new(1)).unwrap();
607        pool.flush_all().unwrap();
608
609        // Touch 0 so its reference bit is set, then admit 2: the clock should
610        // evict 1 (untouched), not 0.
611        let _ = pool.fetch(PageId::new(0)).unwrap();
612        let _ = pool.new_page(PageId::new(2)).unwrap();
613
614        assert!(pool.is_resident(PageId::new(0)));
615        assert!(!pool.is_resident(PageId::new(1)));
616        assert!(pool.is_resident(PageId::new(2)));
617    }
618
619    #[test]
620    fn test_flush_clears_dirty() {
621        let pool = pool(4);
622        {
623            let guard = pool.new_page(PageId::new(0)).unwrap();
624            assert!(guard.is_dirty());
625        }
626        pool.flush(PageId::new(0)).unwrap();
627        let guard = pool.fetch(PageId::new(0)).unwrap();
628        assert!(!guard.is_dirty());
629    }
630
631    #[test]
632    fn test_fetch_missing_unwritten_page_errors() {
633        let pool = pool(4);
634        assert!(matches!(
635            pool.fetch(PageId::new(99)),
636            Err(PageError::ShortRead { .. })
637        ));
638        // A failed miss must not leak the frame it borrowed.
639        assert_eq!(pool.resident_len(), 0);
640        assert_eq!(pool.capacity(), 4);
641    }
642
643    proptest! {
644        #![proptest_config(ProptestConfig::with_cases(48))]
645
646        /// Through any sequence of fetches and dirtying writes against a pool
647        /// smaller than the working set, every page always reads back the last
648        /// value written to it — nothing is lost to eviction, nothing is stale.
649        #[test]
650        fn pool_never_loses_data(
651            ops in proptest::collection::vec((0u8..6, any::<u8>(), any::<bool>()), 1..200),
652        ) {
653            const N: u64 = 6;
654            let pool = pool(2);                       // 2 frames, 6 pages
655            let mut expected: HashMap<u64, u8> = HashMap::new();
656
657            // Seed every page so all fetches resolve.
658            for id in 0..N {
659                let guard = pool.new_page(PageId::new(id)).unwrap();
660                guard.write().payload_mut()[0] = 0;
661                let _ = expected.insert(id, 0);
662                drop(guard);
663            }
664            pool.flush_all().unwrap();
665
666            for (id, marker, write) in ops {
667                let id = id as u64 % N;
668                let guard = pool.fetch(PageId::new(id)).unwrap();
669                // Read must match the model.
670                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
671                if write {
672                    guard.write().payload_mut()[0] = marker;
673                    let _ = expected.insert(id, marker);
674                }
675            }
676
677            // After a checkpoint, a cold reread of every page still matches.
678            pool.flush_all().unwrap();
679            for id in 0..N {
680                let guard = pool.fetch(PageId::new(id)).unwrap();
681                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
682            }
683        }
684    }
685
686    // Helper hook used by the eviction test, kept here so the store stays
687    // private to the crate.
688    impl BufferPool<MemStore> {
689        fn store_contains(&self, id: u64) -> bool {
690            self.store.contains(id)
691        }
692    }
693}
694
695#[cfg(all(test, loom))]
696mod loom_tests {
697    use super::test_support::MemStore;
698    use super::*;
699    use crate::sync::Arc;
700
701    /// A pinned page is never evicted: while one thread holds a pin on the only
702    /// frame, another thread's attempt to admit a different page fails rather
703    /// than evicting the pinned one, under every interleaving.
704    #[test]
705    fn loom_pinned_page_never_evicted() {
706        loom::model(|| {
707            let pool = Arc::new(BufferPool::new(MemStore::new(4096), 1));
708            let held = pool.new_page(PageId::new(0)).unwrap();
709
710            let p = Arc::clone(&pool);
711            let other = loom::thread::spawn(move || p.new_page(PageId::new(1)).is_err());
712
713            // The pinned page stays resident no matter how the threads interleave.
714            assert!(pool.is_resident(PageId::new(0)));
715            let admit_failed = other.join().unwrap();
716            assert!(admit_failed);
717            assert_eq!(held.id(), PageId::new(0));
718            drop(held);
719        });
720    }
721
722    /// A dirty page is never lost: when an unpinned dirty page is evicted to
723    /// make room, it is flushed to the store first, under every interleaving.
724    #[test]
725    fn loom_dirty_page_flushed_on_eviction() {
726        loom::model(|| {
727            let store_pages = {
728                let pool = Arc::new(BufferPool::new(MemStore::new(4096), 1));
729                {
730                    let guard = pool.new_page(PageId::new(0)).unwrap();
731                    guard.write().payload_mut()[0] = 0x5A;
732                }
733
734                let p = Arc::clone(&pool);
735                let t = loom::thread::spawn(move || {
736                    // Admitting page 1 reuses the only frame, evicting page 0,
737                    // which is dirty and so must be flushed first.
738                    let _ = p.new_page(PageId::new(1)).unwrap();
739                });
740                t.join().unwrap();
741                pool.store_contains_loom(0)
742            };
743            assert!(store_pages, "evicted dirty page 0 was not flushed");
744        });
745    }
746
747    impl BufferPool<MemStore> {
748        fn store_contains_loom(&self, id: u64) -> bool {
749            self.store.contains(id)
750        }
751    }
752}