Skip to main content

page_db/
alloc.rs

1//! [`PageAllocator`]: hand out and reclaim page ids over a [`PageStore`].
2//!
3//! The allocator turns a flat array of page slots into a managed space of ids:
4//! [`allocate`](PageAllocator::allocate) returns an unused id and
5//! [`free`](PageAllocator::free) returns one for reuse. Reuse is tracked with a
6//! free-list, and a high-water mark counts ids never yet handed out. Both
7//! `allocate` and `free` are pure in-memory operations on the hot path — no I/O.
8//!
9//! On disk the free-list is an intrusive chain: each free page stores the id of
10//! the next free page in its first eight bytes, with the chain head, the
11//! high-water mark, and the free count in a **superblock** at page 0, which the
12//! allocator reserves; ids it hands out start at 1.
13//!
14//! ## Durability
15//!
16//! `allocate` and `free` touch only memory. The on-disk state — the superblock
17//! and the free-list chain — is written by [`sync`](PageAllocator::sync). Call
18//! it as part of the same checkpoint that makes the pages it handed out durable:
19//! the high-water mark must reach stable storage no later than the data written
20//! to the ids beyond it, or a crash could re-hand-out an id whose page is
21//! already durable. As everywhere in page-db, the write-ahead log above is the
22//! authority on crash recovery; the superblock is a checkpoint of allocator
23//! state.
24
25use std::collections::HashSet;
26use std::path::Path;
27
28use crate::error::{PageError, PageResult};
29use crate::file::PageFile;
30use crate::page::{Page, PageId, PageSize};
31use crate::store::PageStore;
32use crate::sync::{self, Mutex};
33
34/// Sentinel for "no page" — the empty free-list and end-of-chain marker.
35const NO_PAGE: u64 = u64::MAX;
36
37/// Superblock magic: `b"PGSB"`.
38const SB_MAGIC: u32 = u32::from_le_bytes([b'P', b'G', b'S', b'B']);
39const SB_VERSION: u16 = 1;
40
41// Superblock field offsets within the page payload.
42const SB_MAGIC_OFF: usize = 0;
43const SB_VERSION_OFF: usize = 4;
44const SB_HEAD_OFF: usize = 8;
45const SB_NEXT_OFF: usize = 16;
46const SB_FREECOUNT_OFF: usize = 24;
47
48// Free-page link offset within the payload.
49const LINK_NEXT_OFF: usize = 0;
50
51/// The allocator's mutable state, behind one mutex.
52struct AllocState {
53    /// Ids available for reuse. `allocate` pops the back; `free` pushes it.
54    free_list: Vec<u64>,
55    /// The next id to hand out when the free-list is empty (high-water mark).
56    next_new: u64,
57    /// A reusable page buffer for chain and superblock I/O at sync time, so the
58    /// allocator does no per-call allocation.
59    scratch: Page,
60}
61
62/// A page-id allocator over a [`PageStore`].
63///
64/// `PageAllocator<S>` is generic over its backing store; the default is
65/// [`PageFile`], so `PageAllocator` with no type parameter manages ids in a file
66/// of pages. It is `Send + Sync` and every method takes `&self`.
67///
68/// The allocator owns page 0 (the superblock); the ids it returns are `>= 1`.
69/// Pair it with a [`BufferPool`](crate::BufferPool) over the same store — the
70/// allocator picks ids, the pool caches the pages at those ids — but do not use
71/// page 0 for data, and free a page only once it is no longer in use.
72///
73/// # Examples
74///
75/// ```
76/// use page_db::{PageAllocator, DEFAULT_PAGE_SIZE};
77///
78/// # let dir = tempfile::tempdir().unwrap();
79/// # let path = dir.path().join("data.pages");
80/// let alloc = PageAllocator::open(&path, DEFAULT_PAGE_SIZE)?;
81///
82/// let a = alloc.allocate()?;          // 1
83/// let b = alloc.allocate()?;          // 2
84/// alloc.free(a)?;                     // a goes on the free-list
85/// let c = alloc.allocate()?;          // reuses a
86/// assert_eq!(c, a);
87/// assert_ne!(b, c);
88/// alloc.sync()?;                      // persist the superblock durably
89/// # Ok::<(), page_db::PageError>(())
90/// ```
91pub struct PageAllocator<S = PageFile> {
92    store: S,
93    state: Mutex<AllocState>,
94}
95
96impl PageAllocator<PageFile> {
97    /// Open a page file and an allocator over it.
98    ///
99    /// Reads the superblock if the file already has one, or initializes a fresh
100    /// one. A convenience over [`PageFile::open`] and [`PageAllocator::new`].
101    ///
102    /// # Errors
103    ///
104    /// - [`PageError::Io`] if the file cannot be opened.
105    /// - [`PageError::InvalidSuperblock`] if page 0 exists but is not a
106    ///   superblock this allocator wrote.
107    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
108        let file = PageFile::open(path, page_size)?;
109        Self::new(file)
110    }
111}
112
113impl<S: PageStore> PageAllocator<S> {
114    /// Build an allocator over `store`, reading or initializing its superblock.
115    ///
116    /// # Errors
117    ///
118    /// - [`PageError::InvalidSuperblock`] if page 0 exists but is not a valid
119    ///   allocator superblock.
120    /// - Whatever the store's read or write returns.
121    pub fn new(store: S) -> PageResult<Self> {
122        let mut scratch = store.allocate_page();
123
124        let (free_list, next_new, fresh) = match store.read_into(PageId::new(0), &mut scratch) {
125            Ok(()) => {
126                let payload = scratch.payload();
127                if read_u32(payload, SB_MAGIC_OFF) != SB_MAGIC
128                    || read_u16(payload, SB_VERSION_OFF) != SB_VERSION
129                {
130                    return Err(PageError::InvalidSuperblock);
131                }
132                let head = read_u64(payload, SB_HEAD_OFF);
133                let next_new = read_u64(payload, SB_NEXT_OFF);
134                let free_count = read_u64(payload, SB_FREECOUNT_OFF);
135                if next_new < 1 {
136                    return Err(PageError::InvalidSuperblock);
137                }
138                let free_list = walk_free_chain(&store, &mut scratch, head, next_new, free_count)?;
139                (free_list, next_new, false)
140            }
141            // No page 0 yet: a fresh file. Reserve page 0 for the superblock and
142            // start handing out ids at 1.
143            Err(PageError::ShortRead { .. }) => (Vec::new(), 1, true),
144            Err(err) => return Err(err),
145        };
146
147        let allocator = Self {
148            store,
149            state: Mutex::new(AllocState {
150                free_list,
151                next_new,
152                scratch,
153            }),
154        };
155
156        if fresh {
157            let mut state = sync::lock(&allocator.state);
158            allocator.persist_superblock(&mut state)?;
159        }
160
161        Ok(allocator)
162    }
163
164    /// Allocate an unused page id.
165    ///
166    /// Reuses a freed id if the free-list is non-empty, otherwise extends the
167    /// high-water mark with a never-used id. This is an in-memory operation; the
168    /// returned id's page has no defined contents until written.
169    ///
170    /// # Errors
171    ///
172    /// [`PageError::InvalidPageId`] only if the id space is exhausted (reachable
173    /// at 2^64 pages).
174    pub fn allocate(&self) -> PageResult<PageId> {
175        let mut state = sync::lock(&self.state);
176
177        if let Some(id) = state.free_list.pop() {
178            return Ok(PageId::new(id));
179        }
180
181        let id = state.next_new;
182        match id.checked_add(1) {
183            Some(next) if next != NO_PAGE => {
184                state.next_new = next;
185                Ok(PageId::new(id))
186            }
187            _ => Err(PageError::InvalidPageId { page_id: id }),
188        }
189    }
190
191    /// Return a page id to the free-list for reuse.
192    ///
193    /// An in-memory push; the free-list is written to the store at the next
194    /// [`sync`](PageAllocator::sync). Freeing an id that is already free is a
195    /// caller error and corrupts the free-list, exactly as with a system
196    /// allocator.
197    ///
198    /// # Errors
199    ///
200    /// [`PageError::InvalidPageId`] if `id` is the reserved superblock (0) or was
201    /// never allocated (at or beyond the high-water mark).
202    pub fn free(&self, id: PageId) -> PageResult<()> {
203        let raw = id.get();
204        if raw == 0 {
205            return Err(PageError::InvalidPageId { page_id: 0 });
206        }
207        let mut state = sync::lock(&self.state);
208        if raw >= state.next_new {
209            return Err(PageError::InvalidPageId { page_id: raw });
210        }
211        state.free_list.push(raw);
212        Ok(())
213    }
214
215    /// The next id that would be newly allocated — one past the highest id ever
216    /// handed out. Pages `1..high_water()` have been allocated at some point.
217    #[must_use]
218    pub fn high_water(&self) -> u64 {
219        sync::lock(&self.state).next_new
220    }
221
222    /// The number of pages currently on the free-list, available for reuse
223    /// without extending the file.
224    #[must_use]
225    pub fn free_count(&self) -> u64 {
226        sync::lock(&self.state).free_list.len() as u64
227    }
228
229    /// Persist the superblock and free-list chain, then make the store durable.
230    ///
231    /// Writes the free-list onto its pages as an intrusive chain, records the
232    /// head and high-water mark in page 0, then syncs the store. Call it as part
233    /// of the checkpoint that makes the allocated pages durable (see the module
234    /// docs on ordering). Cost is proportional to the free-list length.
235    ///
236    /// # Errors
237    ///
238    /// Whatever the store's writes or sync return.
239    pub fn sync(&self) -> PageResult<()> {
240        {
241            let mut state = sync::lock(&self.state);
242            self.persist_superblock(&mut state)?;
243        }
244        self.store.sync()
245    }
246
247    /// Write the free-list chain and the superblock at page 0. Caller holds the
248    /// lock.
249    fn persist_superblock(&self, state: &mut AllocState) -> PageResult<()> {
250        // Write each free page's link so the on-disk chain matches `free_list`:
251        // free_list[0] -> free_list[1] -> ... -> free_list[n-1] -> NO_PAGE,
252        // with the superblock head pointing at free_list[0].
253        let len = state.free_list.len();
254        for i in 0..len {
255            let id = state.free_list[i];
256            let next = if i + 1 < len {
257                state.free_list[i + 1]
258            } else {
259                NO_PAGE
260            };
261            state.scratch.reset();
262            write_u64(state.scratch.payload_mut(), LINK_NEXT_OFF, next);
263            self.store.write_page(PageId::new(id), &mut state.scratch)?;
264        }
265        let head = state.free_list.first().copied().unwrap_or(NO_PAGE);
266
267        state.scratch.reset();
268        let payload = state.scratch.payload_mut();
269        write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
270        write_u16(payload, SB_VERSION_OFF, SB_VERSION);
271        write_u64(payload, SB_HEAD_OFF, head);
272        write_u64(payload, SB_NEXT_OFF, state.next_new);
273        write_u64(payload, SB_FREECOUNT_OFF, len as u64);
274        self.store.write_page(PageId::new(0), &mut state.scratch)
275    }
276}
277
278/// Follow the on-disk free-list chain from `head`, returning the ids in the same
279/// order [`persist_superblock`](PageAllocator::persist_superblock) wrote them.
280///
281/// This walks an untrusted, possibly corrupt superblock, so it is bounded
282/// against a malicious chain: every link must be a real allocated id
283/// (`1..next_new`), no id may repeat (a cycle is rejected, not followed), the
284/// walk never collects more than `free_count` ids, and the final length must
285/// match `free_count` exactly. Any deviation is [`PageError::InvalidSuperblock`].
286fn walk_free_chain<S: PageStore>(
287    store: &S,
288    scratch: &mut Page,
289    head: u64,
290    next_new: u64,
291    free_count: u64,
292) -> PageResult<Vec<u64>> {
293    let mut ids = Vec::new();
294    let mut seen = HashSet::new();
295    let mut cur = head;
296
297    while cur != NO_PAGE {
298        // Never collect more links than the superblock claims, so a corrupt
299        // count can never drive an unbounded walk.
300        if ids.len() as u64 >= free_count {
301            return Err(PageError::InvalidSuperblock);
302        }
303        // Every free id must be a real allocated page (page 0 is reserved).
304        if cur == 0 || cur >= next_new {
305            return Err(PageError::InvalidSuperblock);
306        }
307        // A repeat means the chain cycles; reject rather than loop. `seen` grows
308        // only to the number of distinct existing pages, so it cannot run away.
309        if !seen.insert(cur) {
310            return Err(PageError::InvalidSuperblock);
311        }
312        ids.push(cur);
313        store.read_into(PageId::new(cur), scratch)?;
314        cur = read_u64(scratch.payload(), LINK_NEXT_OFF);
315    }
316
317    // The chain must end exactly at the recorded length.
318    if ids.len() as u64 != free_count {
319        return Err(PageError::InvalidSuperblock);
320    }
321    Ok(ids)
322}
323
324#[inline]
325fn read_u16(bytes: &[u8], off: usize) -> u16 {
326    u16::from_le_bytes([bytes[off], bytes[off + 1]])
327}
328
329#[inline]
330fn read_u32(bytes: &[u8], off: usize) -> u32 {
331    u32::from_le_bytes([bytes[off], bytes[off + 1], bytes[off + 2], bytes[off + 3]])
332}
333
334#[inline]
335fn read_u64(bytes: &[u8], off: usize) -> u64 {
336    u64::from_le_bytes([
337        bytes[off],
338        bytes[off + 1],
339        bytes[off + 2],
340        bytes[off + 3],
341        bytes[off + 4],
342        bytes[off + 5],
343        bytes[off + 6],
344        bytes[off + 7],
345    ])
346}
347
348#[inline]
349fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
350    bytes[off..off + 2].copy_from_slice(&value.to_le_bytes());
351}
352
353#[inline]
354fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
355    bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
356}
357
358#[inline]
359fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
360    bytes[off..off + 8].copy_from_slice(&value.to_le_bytes());
361}
362
363#[cfg(all(test, not(loom)))]
364mod tests {
365    #![allow(clippy::unwrap_used, clippy::expect_used)]
366
367    use std::collections::HashSet;
368
369    use proptest::prelude::*;
370
371    use super::*;
372    use crate::test_store::MemStore;
373
374    const PS: usize = 4096;
375
376    fn allocator() -> PageAllocator<MemStore> {
377        PageAllocator::new(MemStore::new(PS)).unwrap()
378    }
379
380    #[test]
381    fn test_allocate_starts_at_one_and_increments() {
382        let alloc = allocator();
383        assert_eq!(alloc.allocate().unwrap(), PageId::new(1));
384        assert_eq!(alloc.allocate().unwrap(), PageId::new(2));
385        assert_eq!(alloc.allocate().unwrap(), PageId::new(3));
386        assert_eq!(alloc.high_water(), 4);
387    }
388
389    #[test]
390    fn test_free_then_allocate_reuses_id() {
391        let alloc = allocator();
392        let a = alloc.allocate().unwrap();
393        let b = alloc.allocate().unwrap();
394        alloc.free(a).unwrap();
395        assert_eq!(alloc.free_count(), 1);
396        let c = alloc.allocate().unwrap();
397        assert_eq!(c, a);
398        assert_ne!(c, b);
399        assert_eq!(alloc.free_count(), 0);
400    }
401
402    #[test]
403    fn test_free_list_is_lifo() {
404        let alloc = allocator();
405        let ids: Vec<_> = (0..4).map(|_| alloc.allocate().unwrap()).collect();
406        for &id in &ids {
407            alloc.free(id).unwrap();
408        }
409        // Freed 1,2,3,4; the list pops most-recently-freed first.
410        let mut reused = Vec::new();
411        for _ in 0..4 {
412            reused.push(alloc.allocate().unwrap());
413        }
414        let expected: Vec<_> = ids.into_iter().rev().collect();
415        assert_eq!(reused, expected);
416    }
417
418    #[test]
419    fn test_free_rejects_superblock_and_unallocated() {
420        let alloc = allocator();
421        let _ = alloc.allocate().unwrap(); // high_water now 2
422        assert!(matches!(
423            alloc.free(PageId::new(0)),
424            Err(PageError::InvalidPageId { page_id: 0 })
425        ));
426        assert!(matches!(
427            alloc.free(PageId::new(5)),
428            Err(PageError::InvalidPageId { page_id: 5 })
429        ));
430    }
431
432    #[test]
433    fn test_state_survives_reopen() {
434        let store = MemStore::new(PS);
435        {
436            let alloc = PageAllocator::new(store).unwrap();
437            let _ = alloc.allocate().unwrap(); // 1
438            let b = alloc.allocate().unwrap(); // 2
439            let _ = alloc.allocate().unwrap(); // 3
440            alloc.free(b).unwrap(); // 2 freed
441            alloc.sync().unwrap(); // persist superblock
442            // Recover the store from the allocator to reopen over it.
443            let alloc2 = PageAllocator::new(alloc.into_store()).unwrap();
444            assert_eq!(alloc2.high_water(), 4);
445            assert_eq!(alloc2.free_count(), 1);
446            // Next allocate reuses the freed id 2.
447            assert_eq!(alloc2.allocate().unwrap(), PageId::new(2));
448        }
449    }
450
451    #[test]
452    fn test_new_rejects_non_superblock_page_zero() {
453        let store = MemStore::new(PS);
454        // Write a non-superblock page at slot 0.
455        {
456            let mut page = store.allocate_page();
457            page.payload_mut()[0] = 0xFF;
458            store.write_page(PageId::new(0), &mut page).unwrap();
459        }
460        assert!(matches!(
461            PageAllocator::new(store),
462            Err(PageError::InvalidSuperblock)
463        ));
464    }
465
466    /// Write a raw superblock (no consistency checks) to drive the corrupt-input
467    /// rejection tests.
468    fn write_superblock(store: &MemStore, head: u64, next_new: u64, free_count: u64) {
469        let mut page = store.allocate_page();
470        let payload = page.payload_mut();
471        write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
472        write_u16(payload, SB_VERSION_OFF, SB_VERSION);
473        write_u64(payload, SB_HEAD_OFF, head);
474        write_u64(payload, SB_NEXT_OFF, next_new);
475        write_u64(payload, SB_FREECOUNT_OFF, free_count);
476        store.write_page(PageId::new(0), &mut page).unwrap();
477    }
478
479    fn write_link(store: &MemStore, id: u64, next: u64) {
480        let mut page = store.allocate_page();
481        write_u64(page.payload_mut(), LINK_NEXT_OFF, next);
482        store.write_page(PageId::new(id), &mut page).unwrap();
483    }
484
485    #[test]
486    fn test_new_rejects_cycled_free_chain() {
487        let store = MemStore::new(PS);
488        write_superblock(&store, 1, 3, 10); // claims 10 free in id space 1..3
489        write_link(&store, 1, 2);
490        write_link(&store, 2, 1); // 1 -> 2 -> 1 cycles
491        assert!(matches!(
492            PageAllocator::new(store),
493            Err(PageError::InvalidSuperblock)
494        ));
495    }
496
497    #[test]
498    fn test_new_rejects_out_of_range_link() {
499        let store = MemStore::new(PS);
500        write_superblock(&store, 5, 3, 1); // head 5 is past the high-water mark
501        assert!(matches!(
502            PageAllocator::new(store),
503            Err(PageError::InvalidSuperblock)
504        ));
505    }
506
507    #[test]
508    fn test_new_rejects_free_count_mismatch() {
509        let store = MemStore::new(PS);
510        write_superblock(&store, 1, 3, 5); // claims 5 free
511        write_link(&store, 1, NO_PAGE); // chain is only 1 long
512        assert!(matches!(
513            PageAllocator::new(store),
514            Err(PageError::InvalidSuperblock)
515        ));
516    }
517
518    // Test-only accessor to reopen over the same store.
519    impl PageAllocator<MemStore> {
520        fn into_store(self) -> MemStore {
521            self.store
522        }
523    }
524
525    proptest! {
526        #![proptest_config(ProptestConfig::with_cases(48))]
527
528        /// Allocated ids are always unique while live: through any sequence of
529        /// allocates and frees, two outstanding ids never coincide, and a freed
530        /// id is only ever handed back out after being freed.
531        #[test]
532        fn allocate_free_never_double_allocates(
533            ops in proptest::collection::vec(any::<bool>(), 1..200),
534        ) {
535            let alloc = allocator();
536            let mut live: HashSet<u64> = HashSet::new();
537            let mut freed_pool: Vec<u64> = Vec::new();
538
539            for want_alloc in ops {
540                if want_alloc || live.is_empty() {
541                    let id = alloc.allocate().unwrap().get();
542                    // The id must not already be live.
543                    prop_assert!(!live.contains(&id), "id {} double-allocated", id);
544                    prop_assert!(id >= 1, "id 0 is reserved");
545                    let _ = live.insert(id);
546                    freed_pool.retain(|&f| f != id);
547                } else {
548                    // Free an arbitrary live id (the first one).
549                    let victim = *live.iter().next().unwrap();
550                    let _ = live.remove(&victim);
551                    alloc.free(PageId::new(victim)).unwrap();
552                    freed_pool.push(victim);
553                }
554                prop_assert_eq!(alloc.free_count(), freed_pool.len() as u64);
555            }
556        }
557    }
558}
559
560#[cfg(all(test, loom))]
561mod loom_tests {
562    use super::*;
563    use crate::sync::Arc;
564    use crate::test_store::MemStore;
565
566    /// Concurrent allocations never collide: two threads each allocate, and the
567    /// two ids they get are always distinct, under every interleaving.
568    #[test]
569    fn loom_concurrent_allocate_is_unique() {
570        loom::model(|| {
571            let alloc = Arc::new(PageAllocator::new(MemStore::new(4096)).unwrap());
572
573            let a = Arc::clone(&alloc);
574            let t = loom::thread::spawn(move || a.allocate().unwrap());
575
576            let first = alloc.allocate().unwrap();
577            let second = t.join().unwrap();
578            assert_ne!(first, second);
579        });
580    }
581}