Skip to main content

page_db/
alloc.rs

1//! [`PageAllocator`]: hand out and reclaim page ids over a [`PageStore`].
2//!
3//! The allocator turns a flat array of page slots into a managed space of ids:
4//! [`allocate`](PageAllocator::allocate) returns an unused id and
5//! [`free`](PageAllocator::free) returns one for reuse. Reuse is tracked with a
6//! free-list, and a high-water mark counts ids never yet handed out. Both
7//! `allocate` and `free` are pure in-memory operations on the hot path — no I/O.
8//!
9//! On disk the free-list is an intrusive chain: each free page stores the id of
10//! the next free page in its first eight bytes, with the chain head, the
11//! high-water mark, and the free count in a **superblock** at page 0, which the
12//! allocator reserves; ids it hands out start at 1.
13//!
14//! ## Durability
15//!
16//! `allocate` and `free` touch only memory. The on-disk state — the superblock
17//! and the free-list chain — is written by [`sync`](PageAllocator::sync). Call
18//! it as part of the same checkpoint that makes the pages it handed out durable:
19//! the high-water mark must reach stable storage no later than the data written
20//! to the ids beyond it, or a crash could re-hand-out an id whose page is
21//! already durable. As everywhere in page-db, the write-ahead log above is the
22//! authority on crash recovery; the superblock is a checkpoint of allocator
23//! state.
24
25use std::path::Path;
26
27use crate::error::{PageError, PageResult};
28use crate::file::PageFile;
29use crate::page::{Page, PageId, PageSize};
30use crate::store::PageStore;
31use crate::sync::{self, Mutex};
32
/// Marker id meaning "no page": the head of an empty free-list and the link
/// stored in the last page of a chain.
const NO_PAGE: u64 = u64::MAX;

/// Superblock magic: the bytes `b"PGSB"` read as a little-endian `u32`.
const SB_MAGIC: u32 = u32::from_le_bytes(*b"PGSB");
/// Superblock format version this module writes and accepts.
const SB_VERSION: u16 = 1;

// Byte offsets of the superblock fields within the page-0 payload. All fields
// are little-endian.
/// Offset of the `u32` magic.
const SB_MAGIC_OFF: usize = 0;
/// Offset of the `u16` format version.
const SB_VERSION_OFF: usize = 4;
/// Offset of the `u64` free-list head id (`NO_PAGE` when the list is empty).
const SB_HEAD_OFF: usize = 8;
/// Offset of the `u64` high-water mark (the next never-used id).
const SB_NEXT_OFF: usize = 16;
/// Offset of the `u64` free-list length.
const SB_FREECOUNT_OFF: usize = 24;

/// Offset, within a free page's payload, of the `u64` id of the next free page.
const LINK_NEXT_OFF: usize = 0;
49
/// The allocator's mutable state, behind one mutex.
///
/// `free_list` and `next_new` are the in-memory truth that `allocate` and
/// `free` operate on; they are written to the store only by
/// `persist_superblock`.
struct AllocState {
    /// Ids available for reuse. `allocate` pops the back; `free` pushes it.
    /// The order is also the order of the on-disk chain: index 0 is the chain
    /// head recorded in the superblock.
    free_list: Vec<u64>,
    /// The next id to hand out when the free-list is empty (high-water mark).
    /// Starts at 1 on a fresh store because page 0 is the superblock.
    next_new: u64,
    /// A reusable page buffer for chain and superblock I/O at sync time, so the
    /// allocator does no per-call allocation.
    scratch: Page,
}
60
/// A page-id allocator over a [`PageStore`].
///
/// `PageAllocator<S>` is generic over its backing store; the default is
/// [`PageFile`], so `PageAllocator` with no type parameter manages ids in a file
/// of pages. It is `Send + Sync` and every method takes `&self`.
///
/// The allocator owns page 0 (the superblock); the ids it returns are `>= 1`.
/// Pair it with a [`BufferPool`](crate::BufferPool) over the same store — the
/// allocator picks ids, the pool caches the pages at those ids — but do not use
/// page 0 for data, and free a page only once it is no longer in use.
///
/// # Examples
///
/// ```
/// use page_db::{PageAllocator, DEFAULT_PAGE_SIZE};
///
/// # let dir = tempfile::tempdir().unwrap();
/// # let path = dir.path().join("data.pages");
/// let alloc = PageAllocator::open(&path, DEFAULT_PAGE_SIZE)?;
///
/// let a = alloc.allocate()?;          // 1
/// let b = alloc.allocate()?;          // 2
/// alloc.free(a)?;                     // a goes on the free-list
/// let c = alloc.allocate()?;          // reuses a
/// assert_eq!(c, a);
/// assert_ne!(b, c);
/// alloc.sync()?;                      // persist the superblock durably
/// # Ok::<(), page_db::PageError>(())
/// ```
pub struct PageAllocator<S = PageFile> {
    /// Backing store; holds the superblock at page 0 and the free-list chain.
    store: S,
    /// Free-list, high-water mark and scratch page, all guarded by one lock.
    state: Mutex<AllocState>,
}
94
95impl PageAllocator<PageFile> {
96    /// Open a page file and an allocator over it.
97    ///
98    /// Reads the superblock if the file already has one, or initializes a fresh
99    /// one. A convenience over [`PageFile::open`] and [`PageAllocator::new`].
100    ///
101    /// # Errors
102    ///
103    /// - [`PageError::Io`] if the file cannot be opened.
104    /// - [`PageError::InvalidSuperblock`] if page 0 exists but is not a
105    ///   superblock this allocator wrote.
106    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
107        let file = PageFile::open(path, page_size)?;
108        Self::new(file)
109    }
110}
111
112impl<S: PageStore> PageAllocator<S> {
113    /// Build an allocator over `store`, reading or initializing its superblock.
114    ///
115    /// # Errors
116    ///
117    /// - [`PageError::InvalidSuperblock`] if page 0 exists but is not a valid
118    ///   allocator superblock.
119    /// - Whatever the store's read or write returns.
120    pub fn new(store: S) -> PageResult<Self> {
121        let mut scratch = store.allocate_page();
122
123        let (free_list, next_new, fresh) = match store.read_into(PageId::new(0), &mut scratch) {
124            Ok(()) => {
125                let payload = scratch.payload();
126                if read_u32(payload, SB_MAGIC_OFF) != SB_MAGIC
127                    || read_u16(payload, SB_VERSION_OFF) != SB_VERSION
128                {
129                    return Err(PageError::InvalidSuperblock);
130                }
131                let head = read_u64(payload, SB_HEAD_OFF);
132                let next_new = read_u64(payload, SB_NEXT_OFF);
133                let free_count = read_u64(payload, SB_FREECOUNT_OFF);
134                let free_list = walk_free_chain(&store, &mut scratch, head, free_count)?;
135                (free_list, next_new, false)
136            }
137            // No page 0 yet: a fresh file. Reserve page 0 for the superblock and
138            // start handing out ids at 1.
139            Err(PageError::ShortRead { .. }) => (Vec::new(), 1, true),
140            Err(err) => return Err(err),
141        };
142
143        let allocator = Self {
144            store,
145            state: Mutex::new(AllocState {
146                free_list,
147                next_new,
148                scratch,
149            }),
150        };
151
152        if fresh {
153            let mut state = sync::lock(&allocator.state);
154            allocator.persist_superblock(&mut state)?;
155        }
156
157        Ok(allocator)
158    }
159
160    /// Allocate an unused page id.
161    ///
162    /// Reuses a freed id if the free-list is non-empty, otherwise extends the
163    /// high-water mark with a never-used id. This is an in-memory operation; the
164    /// returned id's page has no defined contents until written.
165    ///
166    /// # Errors
167    ///
168    /// [`PageError::InvalidPageId`] only if the id space is exhausted (reachable
169    /// at 2^64 pages).
170    pub fn allocate(&self) -> PageResult<PageId> {
171        let mut state = sync::lock(&self.state);
172
173        if let Some(id) = state.free_list.pop() {
174            return Ok(PageId::new(id));
175        }
176
177        let id = state.next_new;
178        match id.checked_add(1) {
179            Some(next) if next != NO_PAGE => {
180                state.next_new = next;
181                Ok(PageId::new(id))
182            }
183            _ => Err(PageError::InvalidPageId { page_id: id }),
184        }
185    }
186
187    /// Return a page id to the free-list for reuse.
188    ///
189    /// An in-memory push; the free-list is written to the store at the next
190    /// [`sync`](PageAllocator::sync). Freeing an id that is already free is a
191    /// caller error and corrupts the free-list, exactly as with a system
192    /// allocator.
193    ///
194    /// # Errors
195    ///
196    /// [`PageError::InvalidPageId`] if `id` is the reserved superblock (0) or was
197    /// never allocated (at or beyond the high-water mark).
198    pub fn free(&self, id: PageId) -> PageResult<()> {
199        let raw = id.get();
200        if raw == 0 {
201            return Err(PageError::InvalidPageId { page_id: 0 });
202        }
203        let mut state = sync::lock(&self.state);
204        if raw >= state.next_new {
205            return Err(PageError::InvalidPageId { page_id: raw });
206        }
207        state.free_list.push(raw);
208        Ok(())
209    }
210
211    /// The next id that would be newly allocated — one past the highest id ever
212    /// handed out. Pages `1..high_water()` have been allocated at some point.
213    #[must_use]
214    pub fn high_water(&self) -> u64 {
215        sync::lock(&self.state).next_new
216    }
217
218    /// The number of pages currently on the free-list, available for reuse
219    /// without extending the file.
220    #[must_use]
221    pub fn free_count(&self) -> u64 {
222        sync::lock(&self.state).free_list.len() as u64
223    }
224
225    /// Persist the superblock and free-list chain, then make the store durable.
226    ///
227    /// Writes the free-list onto its pages as an intrusive chain, records the
228    /// head and high-water mark in page 0, then syncs the store. Call it as part
229    /// of the checkpoint that makes the allocated pages durable (see the module
230    /// docs on ordering). Cost is proportional to the free-list length.
231    ///
232    /// # Errors
233    ///
234    /// Whatever the store's writes or sync return.
235    pub fn sync(&self) -> PageResult<()> {
236        {
237            let mut state = sync::lock(&self.state);
238            self.persist_superblock(&mut state)?;
239        }
240        self.store.sync()
241    }
242
243    /// Write the free-list chain and the superblock at page 0. Caller holds the
244    /// lock.
245    fn persist_superblock(&self, state: &mut AllocState) -> PageResult<()> {
246        // Write each free page's link so the on-disk chain matches `free_list`:
247        // free_list[0] -> free_list[1] -> ... -> free_list[n-1] -> NO_PAGE,
248        // with the superblock head pointing at free_list[0].
249        let len = state.free_list.len();
250        for i in 0..len {
251            let id = state.free_list[i];
252            let next = if i + 1 < len {
253                state.free_list[i + 1]
254            } else {
255                NO_PAGE
256            };
257            state.scratch.reset();
258            write_u64(state.scratch.payload_mut(), LINK_NEXT_OFF, next);
259            self.store.write_page(PageId::new(id), &mut state.scratch)?;
260        }
261        let head = state.free_list.first().copied().unwrap_or(NO_PAGE);
262
263        state.scratch.reset();
264        let payload = state.scratch.payload_mut();
265        write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
266        write_u16(payload, SB_VERSION_OFF, SB_VERSION);
267        write_u64(payload, SB_HEAD_OFF, head);
268        write_u64(payload, SB_NEXT_OFF, state.next_new);
269        write_u64(payload, SB_FREECOUNT_OFF, len as u64);
270        self.store.write_page(PageId::new(0), &mut state.scratch)
271    }
272}
273
274/// Follow the on-disk free-list chain from `head`, returning the ids in the same
275/// order [`persist_superblock`](PageAllocator::persist_superblock) wrote them.
276fn walk_free_chain<S: PageStore>(
277    store: &S,
278    scratch: &mut Page,
279    head: u64,
280    free_count: u64,
281) -> PageResult<Vec<u64>> {
282    let mut ids = Vec::new();
283    let mut cur = head;
284    for _ in 0..free_count {
285        if cur == NO_PAGE {
286            return Err(PageError::InvalidSuperblock);
287        }
288        ids.push(cur);
289        store.read_into(PageId::new(cur), scratch)?;
290        cur = read_u64(scratch.payload(), LINK_NEXT_OFF);
291    }
292    // The chain must end exactly at the recorded length.
293    if cur != NO_PAGE {
294        return Err(PageError::InvalidSuperblock);
295    }
296    Ok(ids)
297}
298
/// Decode a little-endian `u16` from `bytes[off..off + 2]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn read_u16(bytes: &[u8], off: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&bytes[off..off + 2]);
    u16::from_le_bytes(raw)
}
303
/// Decode a little-endian `u32` from `bytes[off..off + 4]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn read_u32(bytes: &[u8], off: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes[off..off + 4]);
    u32::from_le_bytes(raw)
}
308
/// Decode a little-endian `u64` from `bytes[off..off + 8]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn read_u64(bytes: &[u8], off: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[off..off + 8]);
    u64::from_le_bytes(raw)
}
322
/// Encode `value` as little-endian into `bytes[off..off + 2]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
    let encoded = value.to_le_bytes();
    bytes[off..off + encoded.len()].copy_from_slice(&encoded);
}
327
/// Encode `value` as little-endian into `bytes[off..off + 4]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
    let encoded = value.to_le_bytes();
    bytes[off..off + encoded.len()].copy_from_slice(&encoded);
}
332
/// Encode `value` as little-endian into `bytes[off..off + 8]`.
///
/// # Panics
///
/// Panics if that range is out of bounds for `bytes`.
#[inline]
fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
    let encoded = value.to_le_bytes();
    bytes[off..off + encoded.len()].copy_from_slice(&encoded);
}
337
#[cfg(all(test, not(loom)))]
mod tests {
    // Tests unwrap freely: a failed unwrap is a failed test.
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use std::collections::BTreeSet;

    use proptest::prelude::*;

    use super::*;
    use crate::test_store::MemStore;

    /// Page size used by every in-memory store in these tests.
    const PS: usize = 4096;

    /// A fresh allocator over an empty in-memory store.
    fn allocator() -> PageAllocator<MemStore> {
        PageAllocator::new(MemStore::new(PS)).unwrap()
    }

    #[test]
    fn test_allocate_starts_at_one_and_increments() {
        let alloc = allocator();
        assert_eq!(alloc.allocate().unwrap(), PageId::new(1));
        assert_eq!(alloc.allocate().unwrap(), PageId::new(2));
        assert_eq!(alloc.allocate().unwrap(), PageId::new(3));
        assert_eq!(alloc.high_water(), 4);
    }

    #[test]
    fn test_free_then_allocate_reuses_id() {
        let alloc = allocator();
        let a = alloc.allocate().unwrap();
        let b = alloc.allocate().unwrap();
        alloc.free(a).unwrap();
        assert_eq!(alloc.free_count(), 1);
        let c = alloc.allocate().unwrap();
        assert_eq!(c, a);
        assert_ne!(c, b);
        assert_eq!(alloc.free_count(), 0);
    }

    #[test]
    fn test_free_list_is_lifo() {
        let alloc = allocator();
        let ids: Vec<_> = (0..4).map(|_| alloc.allocate().unwrap()).collect();
        for &id in &ids {
            alloc.free(id).unwrap();
        }
        // Freed 1,2,3,4; the list pops most-recently-freed first.
        let mut reused = Vec::new();
        for _ in 0..4 {
            reused.push(alloc.allocate().unwrap());
        }
        let expected: Vec<_> = ids.into_iter().rev().collect();
        assert_eq!(reused, expected);
    }

    #[test]
    fn test_free_rejects_superblock_and_unallocated() {
        let alloc = allocator();
        let _ = alloc.allocate().unwrap(); // high_water now 2
        assert!(matches!(
            alloc.free(PageId::new(0)),
            Err(PageError::InvalidPageId { page_id: 0 })
        ));
        assert!(matches!(
            alloc.free(PageId::new(5)),
            Err(PageError::InvalidPageId { page_id: 5 })
        ));
    }

    #[test]
    fn test_state_survives_reopen() {
        let alloc = PageAllocator::new(MemStore::new(PS)).unwrap();
        let _ = alloc.allocate().unwrap(); // 1
        let b = alloc.allocate().unwrap(); // 2
        let _ = alloc.allocate().unwrap(); // 3
        alloc.free(b).unwrap(); // 2 freed
        alloc.sync().unwrap(); // persist superblock

        // Recover the store from the allocator to reopen over it.
        let alloc2 = PageAllocator::new(alloc.into_store()).unwrap();
        assert_eq!(alloc2.high_water(), 4);
        assert_eq!(alloc2.free_count(), 1);
        // Next allocate reuses the freed id 2.
        assert_eq!(alloc2.allocate().unwrap(), PageId::new(2));
    }

    #[test]
    fn test_new_rejects_non_superblock_page_zero() {
        let store = MemStore::new(PS);
        // Write a non-superblock page at slot 0.
        {
            let mut page = store.allocate_page();
            page.payload_mut()[0] = 0xFF;
            store.write_page(PageId::new(0), &mut page).unwrap();
        }
        assert!(matches!(
            PageAllocator::new(store),
            Err(PageError::InvalidSuperblock)
        ));
    }

    // Test-only accessor to reopen over the same store.
    impl PageAllocator<MemStore> {
        fn into_store(self) -> MemStore {
            self.store
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(48))]

        /// Allocated ids are always unique while live: through any sequence of
        /// allocates and frees, two outstanding ids never coincide, and a freed
        /// id is only ever handed back out after being freed.
        #[test]
        fn allocate_free_never_double_allocates(
            ops in proptest::collection::vec(any::<bool>(), 1..200),
        ) {
            let alloc = allocator();
            // An ordered set keeps the victim choice below a pure function of
            // `ops`, so a failing case replays identically. A hash set's
            // iteration order changes from process to process.
            let mut live: BTreeSet<u64> = BTreeSet::new();
            let mut freed_pool: Vec<u64> = Vec::new();

            for want_alloc in ops {
                if want_alloc || live.is_empty() {
                    let id = alloc.allocate().unwrap().get();
                    // The id must not already be live.
                    prop_assert!(!live.contains(&id), "id {} double-allocated", id);
                    prop_assert!(id >= 1, "id 0 is reserved");
                    let _ = live.insert(id);
                    freed_pool.retain(|&f| f != id);
                } else {
                    // Free the smallest live id.
                    let victim = *live.iter().next().unwrap();
                    let _ = live.remove(&victim);
                    alloc.free(PageId::new(victim)).unwrap();
                    freed_pool.push(victim);
                }
                prop_assert_eq!(alloc.free_count(), freed_pool.len() as u64);
            }
        }
    }
}
482
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    use crate::sync::Arc;
    use crate::test_store::MemStore;

    /// Two threads allocate from one shared allocator; under every
    /// interleaving loom explores, they must receive different ids.
    #[test]
    fn loom_concurrent_allocate_is_unique() {
        loom::model(|| {
            let shared = Arc::new(PageAllocator::new(MemStore::new(4096)).unwrap());

            // Spawn the second allocator thread before allocating here, so the
            // two calls can race.
            let worker = {
                let handle = Arc::clone(&shared);
                loom::thread::spawn(move || handle.allocate().unwrap())
            };

            let from_main = shared.allocate().unwrap();
            let from_worker = worker.join().unwrap();
            assert_ne!(from_main, from_worker);
        });
    }
}