page_db/pool.rs
1//! [`BufferPool`]: a bounded in-memory cache of pages over a [`PageStore`].
2//!
3//! The pool keeps a fixed number of frames resident, each holding one page. A
4//! caller asks for a page by id and gets back a [`PageGuard`] that pins the
5//! frame — a pinned frame is never evicted — and dropping the guard unpins it.
6//! Writing through the guard marks the frame dirty; a dirty frame is always
7//! flushed to the store before its frame is reused. Those two rules — never
8//! evict a pinned page, never lose a dirty page without flushing — are the
9//! invariants the property tests and the loom model checks hold the pool to.
10//!
11//! Eviction is the clock (second-chance) algorithm: each frame carries a
12//! reference bit set when it is touched; the clock hand sweeps frames, clearing
13//! set bits and skipping pinned frames, and reuses the first unset, unpinned
14//! frame it finds. If every frame is pinned, admission fails with
15//! [`PageError::BufferPoolExhausted`] rather than evicting something it must
16//! not.
17//!
18//! For v0.3.0 the pool serializes its bookkeeping — and the miss-path store I/O
19//! — under a single mutex. That keeps the pin/evict/dirty logic small enough to
20//! verify exhaustively with loom; sharding the table to remove the single-lock
21//! bottleneck is a later, measured change behind the same API.
22
23use std::collections::HashMap;
24use std::ops::{Deref, DerefMut};
25use std::path::Path;
26
27use crate::error::{PageError, PageResult};
28use crate::file::PageFile;
29use crate::page::{Page, PageId, PageSize};
30use crate::store::PageStore;
31use crate::sync::{self, Arc, AtomicBool, AtomicU64, AtomicUsize, Mutex, Ordering, RwLock};
32use crate::sync::{RwLockReadGuard, RwLockWriteGuard};
33
34/// Sentinel id for a frame that holds no page yet.
35const NO_PAGE: u64 = u64::MAX;
36
37/// One cache frame: a reusable page buffer plus its residency bookkeeping.
38struct FrameInner {
39 /// The page bytes. Reused in place across the pages that occupy this frame.
40 page: RwLock<Page>,
41 /// The id of the page currently resident, or [`NO_PAGE`].
42 id: AtomicU64,
43 /// Outstanding pins. A frame with `pin > 0` is never evicted.
44 pin: AtomicUsize,
45 /// Set when the resident page has unflushed modifications.
46 dirty: AtomicBool,
47 /// The clock reference bit: set on access, cleared by the clock sweep.
48 referenced: AtomicBool,
49}
50
51impl FrameInner {
52 fn new(page: Page) -> Self {
53 Self {
54 page: RwLock::new(page),
55 id: AtomicU64::new(NO_PAGE),
56 pin: AtomicUsize::new(0),
57 dirty: AtomicBool::new(false),
58 referenced: AtomicBool::new(false),
59 }
60 }
61
62 #[inline]
63 fn resident_id(&self) -> PageId {
64 PageId::new(self.id.load(Ordering::Acquire))
65 }
66}
67
68/// The pool's mutable bookkeeping, guarded by one mutex.
69struct Core {
70 /// Resident page id to frame index.
71 map: HashMap<PageId, usize>,
72 /// Frame indices never yet filled, available without eviction.
73 free: Vec<usize>,
74 /// The clock hand.
75 hand: usize,
76}
77
78/// A bounded cache of pages over a [`PageStore`].
79///
80/// `BufferPool<S>` is generic over its backing store; the default is
81/// [`PageFile`], so `BufferPool` without a type parameter is a pool over a file
82/// of pages. The handle is `Send + Sync` and every method takes `&self`, so it
83/// is shared across threads behind an `Arc` with no outer lock.
84///
85/// # Examples
86///
87/// ```
88/// use page_db::{BufferPool, PageId, Lsn, DEFAULT_PAGE_SIZE};
89///
90/// # let dir = tempfile::tempdir().unwrap();
91/// # let path = dir.path().join("data.pages");
92/// // A pool of 128 frames over a 4 KiB-page file.
93/// let pool = BufferPool::open(&path, DEFAULT_PAGE_SIZE, 128)?;
94///
95/// // Create page 0, write to it (which marks it dirty), then release the pin.
96/// {
97/// let guard = pool.new_page(PageId::new(0))?;
98/// let mut page = guard.write();
99/// page.set_lsn(Lsn::new(1));
100/// page.payload_mut()[..5].copy_from_slice(b"hello");
101/// }
102///
103/// // Flush dirty pages to the file and make them durable.
104/// pool.flush_all()?;
105/// pool.sync()?;
106///
107/// // Fetch it back — served from cache if resident, else read from the file.
108/// let guard = pool.fetch(PageId::new(0))?;
109/// assert_eq!(&guard.read().payload()[..5], b"hello");
110/// # Ok::<(), page_db::PageError>(())
111/// ```
112pub struct BufferPool<S = PageFile> {
113 store: S,
114 frames: Vec<Arc<FrameInner>>,
115 core: Mutex<Core>,
116 capacity: usize,
117}
118
119impl BufferPool<PageFile> {
120 /// Open a page file and wrap it in a pool of `capacity` frames.
121 ///
122 /// A convenience over [`PageFile::open`] followed by [`BufferPool::new`].
123 ///
124 /// # Errors
125 ///
126 /// Returns [`PageError::Io`] if the file cannot be opened.
127 pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize, capacity: usize) -> PageResult<Self> {
128 let file = PageFile::open(path, page_size)?;
129 Ok(Self::new(file, capacity))
130 }
131}
132
133impl<S: PageStore> BufferPool<S> {
134 /// Build a pool of `capacity` frames over `store`.
135 ///
136 /// `capacity` is the number of pages held resident; it is clamped up to at
137 /// least one. The frame buffers are allocated once, here — the pool does no
138 /// per-request allocation on the hot path.
139 #[must_use]
140 pub fn new(store: S, capacity: usize) -> Self {
141 let capacity = capacity.max(1);
142 let mut frames = Vec::with_capacity(capacity);
143 for _ in 0..capacity {
144 frames.push(Arc::new(FrameInner::new(store.allocate_page())));
145 }
146 let free = (0..capacity).collect();
147 Self {
148 store,
149 frames,
150 core: Mutex::new(Core {
151 map: HashMap::with_capacity(capacity),
152 free,
153 hand: 0,
154 }),
155 capacity,
156 }
157 }
158
159 /// The number of frames in the pool.
160 #[inline]
161 #[must_use]
162 pub fn capacity(&self) -> usize {
163 self.capacity
164 }
165
166 /// The number of pages currently resident.
167 #[must_use]
168 pub fn resident_len(&self) -> usize {
169 sync::lock(&self.core).map.len()
170 }
171
172 /// Whether page `id` is currently held in the pool.
173 #[must_use]
174 pub fn is_resident(&self, id: PageId) -> bool {
175 sync::lock(&self.core).map.contains_key(&id)
176 }
177
178 /// Fetch the page at `id`, pinning it and returning a guard.
179 ///
180 /// Served from cache if resident; otherwise a frame is found (a free one, or
181 /// an evicted victim, flushing it first if dirty) and the page is read from
182 /// the store into it. The returned [`PageGuard`] holds a pin for its
183 /// lifetime.
184 ///
185 /// # Errors
186 ///
187 /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
188 /// - Whatever the store's read returns (for a file:
189 /// [`PageError::ShortRead`] past end-of-file, or an integrity error).
190 /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
191 pub fn fetch(&self, id: PageId) -> PageResult<PageGuard> {
192 let mut core = sync::lock(&self.core);
193
194 if let Some(&slot) = core.map.get(&id) {
195 let frame = self.frames[slot].clone();
196 let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
197 frame.referenced.store(true, Ordering::Release);
198 return Ok(PageGuard { frame });
199 }
200
201 let slot = self.take_slot(&mut core)?;
202 {
203 let frame = &self.frames[slot];
204 let mut page = sync::write(&frame.page);
205 if let Err(err) = self.store.read_into(id, &mut page) {
206 drop(page);
207 core.free.push(slot);
208 return Err(err);
209 }
210 }
211 Ok(self.install(&mut core, slot, id, false))
212 }
213
214 /// Introduce a fresh, zeroed page at `id`, pinning it and returning a guard.
215 ///
216 /// The page is created in memory and marked dirty, so it is written to the
217 /// store on the next flush; no read is performed. If `id` is already
218 /// resident it is reset to a blank page. The caller chooses the id — the
219 /// free-list allocator that picks ids is a later release.
220 ///
221 /// # Errors
222 ///
223 /// - [`PageError::BufferPoolExhausted`] if every frame is pinned.
224 /// - [`PageError::Io`] if flushing an evicted dirty victim fails.
225 pub fn new_page(&self, id: PageId) -> PageResult<PageGuard> {
226 let mut core = sync::lock(&self.core);
227
228 if let Some(&slot) = core.map.get(&id) {
229 let frame = self.frames[slot].clone();
230 sync::write(&frame.page).reset();
231 let _ = frame.pin.fetch_add(1, Ordering::AcqRel);
232 frame.dirty.store(true, Ordering::Release);
233 frame.referenced.store(true, Ordering::Release);
234 return Ok(PageGuard { frame });
235 }
236
237 let slot = self.take_slot(&mut core)?;
238 sync::write(&self.frames[slot].page).reset();
239 Ok(self.install(&mut core, slot, id, true))
240 }
241
242 /// Flush page `id` to the store if it is resident and dirty.
243 ///
244 /// This places the bytes in the store; call [`sync`](BufferPool::sync) to
245 /// make them durable. Do not call this while holding a write guard to the
246 /// same page on the same thread — flushing takes the frame's lock.
247 ///
248 /// # Errors
249 ///
250 /// Whatever the store's write returns.
251 pub fn flush(&self, id: PageId) -> PageResult<()> {
252 let core = sync::lock(&self.core);
253 if let Some(&slot) = core.map.get(&id) {
254 self.flush_slot(slot, id)?;
255 }
256 Ok(())
257 }
258
259 /// Flush every dirty resident page to the store.
260 ///
261 /// # Errors
262 ///
263 /// Whatever the store's write returns. On error, some pages may already have
264 /// been flushed; the operation is safe to retry.
265 pub fn flush_all(&self) -> PageResult<()> {
266 let core = sync::lock(&self.core);
267 for (&id, &slot) in core.map.iter() {
268 self.flush_slot(slot, id)?;
269 }
270 Ok(())
271 }
272
273 /// Flush all dirty pages, then make the store durable.
274 ///
275 /// Equivalent to [`flush_all`](BufferPool::flush_all) followed by
276 /// [`sync`](BufferPool::sync) — the common checkpoint sequence.
277 ///
278 /// # Errors
279 ///
280 /// Whatever flushing or the store's sync returns.
281 pub fn checkpoint(&self) -> PageResult<()> {
282 self.flush_all()?;
283 self.sync()
284 }
285
286 /// Make the store durable (the pages already written to it).
287 ///
288 /// This does not flush dirty cached pages first; use
289 /// [`flush_all`](BufferPool::flush_all) or
290 /// [`checkpoint`](BufferPool::checkpoint) for that.
291 ///
292 /// # Errors
293 ///
294 /// Whatever the store's sync returns.
295 pub fn sync(&self) -> PageResult<()> {
296 self.store.sync()
297 }
298
299 /// Install a freshly loaded or created page into `slot`, returning a pinned
300 /// guard. Caller holds `core` and has already populated the frame's buffer.
301 fn install(&self, core: &mut Core, slot: usize, id: PageId, dirty: bool) -> PageGuard {
302 let frame = &self.frames[slot];
303 frame.id.store(id.get(), Ordering::Release);
304 frame.dirty.store(dirty, Ordering::Release);
305 frame.referenced.store(true, Ordering::Release);
306 frame.pin.store(1, Ordering::Release);
307 let _ = core.map.insert(id, slot);
308 PageGuard {
309 frame: self.frames[slot].clone(),
310 }
311 }
312
313 /// Flush the page in `slot` (resident id `id`) if dirty.
314 fn flush_slot(&self, slot: usize, id: PageId) -> PageResult<()> {
315 let frame = &self.frames[slot];
316 if frame.dirty.load(Ordering::Acquire) {
317 let mut page = sync::write(&frame.page);
318 self.store.write_page(id, &mut page)?;
319 frame.dirty.store(false, Ordering::Release);
320 }
321 Ok(())
322 }
323
324 /// Obtain a frame slot to fill: a free one, or an evicted victim (flushed
325 /// first if dirty). Caller holds `core`.
326 fn take_slot(&self, core: &mut Core) -> PageResult<usize> {
327 if let Some(slot) = core.free.pop() {
328 return Ok(slot);
329 }
330 let slot = match self.find_victim(core) {
331 Some(slot) => slot,
332 None => {
333 return Err(PageError::BufferPoolExhausted {
334 capacity: self.capacity,
335 });
336 }
337 };
338 let victim_id = self.frames[slot].resident_id();
339 self.flush_slot(slot, victim_id)?;
340 let _ = core.map.remove(&victim_id);
341 Ok(slot)
342 }
343
344 /// The clock sweep: return an unpinned frame to reuse, or `None` if all
345 /// frames are pinned. Caller holds `core`.
346 fn find_victim(&self, core: &mut Core) -> Option<usize> {
347 let n = self.capacity;
348 // Two full passes: the first clears reference bits, the second selects.
349 // If every frame stays pinned across both, the pool is exhausted.
350 let mut steps = 0;
351 while steps < 2 * n {
352 let slot = core.hand;
353 core.hand = (core.hand + 1) % n;
354 steps += 1;
355
356 let frame = &self.frames[slot];
357 if frame.pin.load(Ordering::Acquire) > 0 {
358 continue;
359 }
360 if frame.referenced.swap(false, Ordering::AcqRel) {
361 continue;
362 }
363 return Some(slot);
364 }
365 None
366 }
367}
368
369/// A pin on a cached page.
370///
371/// While a `PageGuard` is alive the page stays resident and unevictable. Read
372/// the page with [`read`](PageGuard::read) and write it with
373/// [`write`](PageGuard::write); taking a write guard marks the page dirty.
374/// Dropping the `PageGuard` releases the pin.
375pub struct PageGuard {
376 frame: Arc<FrameInner>,
377}
378
379impl PageGuard {
380 /// The id of the pinned page.
381 #[inline]
382 #[must_use]
383 pub fn id(&self) -> PageId {
384 self.frame.resident_id()
385 }
386
387 /// Whether the page has unflushed modifications.
388 #[inline]
389 #[must_use]
390 pub fn is_dirty(&self) -> bool {
391 self.frame.dirty.load(Ordering::Acquire)
392 }
393
394 /// Borrow the page for reading. Multiple readers of the same page proceed
395 /// concurrently.
396 #[inline]
397 #[must_use]
398 pub fn read(&self) -> PageRef<'_> {
399 PageRef {
400 guard: sync::read(&self.frame.page),
401 }
402 }
403
404 /// Borrow the page for writing, marking it dirty.
405 ///
406 /// The page is recorded dirty as soon as the write guard is taken, so it
407 /// will be flushed even if the actual mutation is conditional.
408 #[inline]
409 #[must_use]
410 pub fn write(&self) -> PageMut<'_> {
411 self.frame.dirty.store(true, Ordering::Release);
412 PageMut {
413 guard: sync::write(&self.frame.page),
414 }
415 }
416}
417
418impl Drop for PageGuard {
419 fn drop(&mut self) {
420 let _ = self.frame.pin.fetch_sub(1, Ordering::AcqRel);
421 }
422}
423
424impl std::fmt::Debug for PageGuard {
425 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426 f.debug_struct("PageGuard")
427 .field("id", &self.id())
428 .field("dirty", &self.is_dirty())
429 .finish()
430 }
431}
432
433/// A shared read borrow of a pinned page. Dereferences to [`Page`].
434pub struct PageRef<'a> {
435 guard: RwLockReadGuard<'a, Page>,
436}
437
438impl Deref for PageRef<'_> {
439 type Target = Page;
440 #[inline]
441 fn deref(&self) -> &Page {
442 &self.guard
443 }
444}
445
446/// An exclusive write borrow of a pinned page. Dereferences to [`Page`].
447pub struct PageMut<'a> {
448 guard: RwLockWriteGuard<'a, Page>,
449}
450
451impl Deref for PageMut<'_> {
452 type Target = Page;
453 #[inline]
454 fn deref(&self) -> &Page {
455 &self.guard
456 }
457}
458
459impl DerefMut for PageMut<'_> {
460 #[inline]
461 fn deref_mut(&mut self) -> &mut Page {
462 &mut self.guard
463 }
464}
465
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use std::collections::HashMap;

    use proptest::prelude::*;

    use super::*;
    use crate::page::Lsn;
    use crate::test_store::MemStore;

    const PS: usize = 4096;

    fn pool(capacity: usize) -> BufferPool<MemStore> {
        BufferPool::new(MemStore::new(PS), capacity)
    }

    #[test]
    fn test_new_page_then_fetch_serves_from_cache() {
        let pool = pool(8);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            guard.write().payload_mut()[0] = 0x7A;
        }
        assert!(pool.is_resident(PageId::new(0)));
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert_eq!(guard.read().payload()[0], 0x7A);
    }

    #[test]
    fn test_capacity_is_clamped_up_to_one() {
        assert_eq!(pool(0).capacity(), 1);
    }

    #[test]
    fn test_pinned_page_is_never_evicted() {
        let pool = pool(1);
        let _held = pool.new_page(PageId::new(0)).unwrap();
        // The only frame is pinned, so admitting another page must fail rather
        // than evict the pinned one.
        assert!(matches!(
            pool.new_page(PageId::new(1)),
            Err(PageError::BufferPoolExhausted { capacity: 1 })
        ));
        assert!(pool.is_resident(PageId::new(0)));
    }

    #[test]
    fn test_dirty_page_is_flushed_before_eviction() {
        let pool = pool(1);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            guard.write().set_lsn(Lsn::new(9));
        } // page 0 is dirty and unpinned

        // Force eviction of page 0 by reusing the only frame.
        {
            let _ = pool.new_page(PageId::new(1)).unwrap();
        }
        // Page 0 must have been written to the store on its way out.
        assert!(pool.store_contains(0));
        // And it reads back with the data it held.
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert_eq!(guard.read().lsn(), Lsn::new(9));
    }

    #[test]
    fn test_clock_keeps_the_recently_used_page() {
        let pool = pool(3);
        for id in 0..3 {
            let _ = pool.new_page(PageId::new(id)).unwrap();
        }
        // Every page's reference bit is set on admission, so admitting page 3
        // sweeps all three bits clear before evicting one page. The two
        // survivors are left with their bits unset.
        let _ = pool.new_page(PageId::new(3)).unwrap();
        assert_eq!(pool.resident_len(), 3);

        // Touch page 1 (releasing the pin at once) so only its reference bit
        // protects it. Without the touch, the hand reaches page 1 first and
        // evicts it; with it, the sweep passes page 1 over and evicts the other
        // untouched survivor.
        let _ = pool.fetch(PageId::new(1)).unwrap();
        let _ = pool.new_page(PageId::new(4)).unwrap();

        assert!(pool.is_resident(PageId::new(1)));
        assert!(!pool.is_resident(PageId::new(0)));
        assert!(!pool.is_resident(PageId::new(2)));
        assert!(pool.is_resident(PageId::new(3)));
        assert!(pool.is_resident(PageId::new(4)));
    }

    #[test]
    fn test_flush_clears_dirty() {
        let pool = pool(4);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            assert!(guard.is_dirty());
        }
        pool.flush(PageId::new(0)).unwrap();
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert!(!guard.is_dirty());
    }

    #[test]
    fn test_fetch_missing_unwritten_page_errors() {
        let pool = pool(4);
        assert!(matches!(
            pool.fetch(PageId::new(99)),
            Err(PageError::ShortRead { .. })
        ));
        assert_eq!(pool.resident_len(), 0);
        // A failed miss must not leak the frame it borrowed: all four frames
        // can still be filled and pinned at the same time.
        let _held: Vec<PageGuard> = (0..4)
            .map(|id| pool.new_page(PageId::new(id)).unwrap())
            .collect();
        assert_eq!(pool.resident_len(), 4);
    }

    #[test]
    fn test_failed_miss_after_eviction_keeps_victim_data() {
        let pool = pool(1);
        {
            let guard = pool.new_page(PageId::new(0)).unwrap();
            guard.write().set_lsn(Lsn::new(4));
        }
        // The miss evicts (and flushes) page 0 before its own read fails.
        assert!(pool.fetch(PageId::new(99)).is_err());
        assert!(!pool.is_resident(PageId::new(0)));
        // The frame was handed back, and page 0 reads back from the store.
        let guard = pool.fetch(PageId::new(0)).unwrap();
        assert_eq!(guard.read().lsn(), Lsn::new(4));
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(48))]

        /// Through any sequence of fetches and dirtying writes against a pool
        /// smaller than the working set, every page always reads back the last
        /// value written to it — nothing is lost to eviction, nothing is stale.
        #[test]
        fn pool_never_loses_data(
            ops in proptest::collection::vec((0u8..6, any::<u8>(), any::<bool>()), 1..200),
        ) {
            const N: u64 = 6;
            let pool = pool(2); // 2 frames, 6 pages
            let mut expected: HashMap<u64, u8> = HashMap::new();

            // Seed every page so all fetches resolve.
            for id in 0..N {
                let guard = pool.new_page(PageId::new(id)).unwrap();
                guard.write().payload_mut()[0] = 0;
                let _ = expected.insert(id, 0);
                drop(guard);
            }
            pool.flush_all().unwrap();

            for (id, marker, write) in ops {
                let id = id as u64 % N;
                let guard = pool.fetch(PageId::new(id)).unwrap();
                // Read must match the model.
                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
                if write {
                    guard.write().payload_mut()[0] = marker;
                    let _ = expected.insert(id, marker);
                }
            }

            // After a flush, every page still matches, whether it is served
            // from cache or reread from the store.
            pool.flush_all().unwrap();
            for id in 0..N {
                let guard = pool.fetch(PageId::new(id)).unwrap();
                prop_assert_eq!(guard.read().payload()[0], expected[&id]);
            }
        }
    }

    // Helper hook used by the eviction test, kept here so the store stays
    // private to the crate.
    impl BufferPool<MemStore> {
        fn store_contains(&self, id: u64) -> bool {
            self.store.contains(id)
        }
    }
}
625
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    use crate::sync::Arc;
    use crate::test_store::MemStore;

    /// A fresh single-frame pool over an in-memory store of 4096-byte pages.
    fn one_frame_pool() -> Arc<BufferPool<MemStore>> {
        Arc::new(BufferPool::new(MemStore::new(4096), 1))
    }

    /// Under every interleaving, a pinned page survives another thread's
    /// attempt to admit a different page into the only frame; that attempt
    /// fails instead.
    #[test]
    fn loom_pinned_page_never_evicted() {
        loom::model(|| {
            let pool = one_frame_pool();
            let held = pool.new_page(PageId::new(0)).unwrap();

            let contender = {
                let pool = Arc::clone(&pool);
                loom::thread::spawn(move || pool.new_page(PageId::new(1)).is_err())
            };

            // However the threads interleave, the pinned page stays resident.
            assert!(pool.is_resident(PageId::new(0)));
            assert!(contender.join().unwrap());
            assert_eq!(held.id(), PageId::new(0));
            drop(held);
        });
    }

    /// Under every interleaving, an unpinned dirty page that gets evicted to
    /// make room reaches the store first.
    #[test]
    fn loom_dirty_page_flushed_on_eviction() {
        loom::model(|| {
            let pool = one_frame_pool();
            {
                let guard = pool.new_page(PageId::new(0)).unwrap();
                guard.write().payload_mut()[0] = 0x5A;
            }

            let evictor = {
                let pool = Arc::clone(&pool);
                loom::thread::spawn(move || {
                    // Page 1 can only take the single frame by evicting page 0,
                    // which is dirty and so must be flushed on the way out.
                    let _ = pool.new_page(PageId::new(1)).unwrap();
                })
            };
            evictor.join().unwrap();

            let flushed = pool.store_contains_loom(0);
            drop(pool);
            assert!(flushed, "evicted dirty page 0 was not flushed");
        });
    }

    impl BufferPool<MemStore> {
        fn store_contains_loom(&self, id: u64) -> bool {
            self.store.contains(id)
        }
    }
}