1use std::path::Path;
26
27use crate::error::{PageError, PageResult};
28use crate::file::PageFile;
29use crate::page::{Page, PageId, PageSize};
30use crate::store::PageStore;
31use crate::sync::{self, Mutex};
32
/// Sentinel page id meaning "no page": it terminates the on-disk free chain and
/// is stored as the chain head when the free list is empty.
const NO_PAGE: u64 = u64::MAX;

/// Magic number at the start of the superblock: the ASCII bytes `PGSB`
/// interpreted as a little-endian `u32`.
const SB_MAGIC: u32 = u32::from_le_bytes(*b"PGSB");
/// Superblock format version written to, and required from, page 0.
const SB_VERSION: u16 = 1;

/// Payload offset of the superblock's `u32` magic number.
const SB_MAGIC_OFF: usize = 0;
/// Payload offset of the superblock's `u16` format version.
const SB_VERSION_OFF: usize = 4;
/// Payload offset of the `u64` id of the first page in the free chain.
const SB_HEAD_OFF: usize = 8;
/// Payload offset of the `u64` id that the next brand-new page will receive.
const SB_NEXT_OFF: usize = 16;
/// Payload offset of the `u64` number of pages in the free chain.
const SB_FREECOUNT_OFF: usize = 24;

/// Payload offset, inside a free page, of the `u64` id of the next free page.
const LINK_NEXT_OFF: usize = 0;
49
50struct AllocState {
52 free_list: Vec<u64>,
54 next_new: u64,
56 scratch: Page,
59}
60
61pub struct PageAllocator<S = PageFile> {
91 store: S,
92 state: Mutex<AllocState>,
93}
94
95impl PageAllocator<PageFile> {
96 pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
107 let file = PageFile::open(path, page_size)?;
108 Self::new(file)
109 }
110}
111
112impl<S: PageStore> PageAllocator<S> {
113 pub fn new(store: S) -> PageResult<Self> {
121 let mut scratch = store.allocate_page();
122
123 let (free_list, next_new, fresh) = match store.read_into(PageId::new(0), &mut scratch) {
124 Ok(()) => {
125 let payload = scratch.payload();
126 if read_u32(payload, SB_MAGIC_OFF) != SB_MAGIC
127 || read_u16(payload, SB_VERSION_OFF) != SB_VERSION
128 {
129 return Err(PageError::InvalidSuperblock);
130 }
131 let head = read_u64(payload, SB_HEAD_OFF);
132 let next_new = read_u64(payload, SB_NEXT_OFF);
133 let free_count = read_u64(payload, SB_FREECOUNT_OFF);
134 let free_list = walk_free_chain(&store, &mut scratch, head, free_count)?;
135 (free_list, next_new, false)
136 }
137 Err(PageError::ShortRead { .. }) => (Vec::new(), 1, true),
140 Err(err) => return Err(err),
141 };
142
143 let allocator = Self {
144 store,
145 state: Mutex::new(AllocState {
146 free_list,
147 next_new,
148 scratch,
149 }),
150 };
151
152 if fresh {
153 let mut state = sync::lock(&allocator.state);
154 allocator.persist_superblock(&mut state)?;
155 }
156
157 Ok(allocator)
158 }
159
160 pub fn allocate(&self) -> PageResult<PageId> {
171 let mut state = sync::lock(&self.state);
172
173 if let Some(id) = state.free_list.pop() {
174 return Ok(PageId::new(id));
175 }
176
177 let id = state.next_new;
178 match id.checked_add(1) {
179 Some(next) if next != NO_PAGE => {
180 state.next_new = next;
181 Ok(PageId::new(id))
182 }
183 _ => Err(PageError::InvalidPageId { page_id: id }),
184 }
185 }
186
187 pub fn free(&self, id: PageId) -> PageResult<()> {
199 let raw = id.get();
200 if raw == 0 {
201 return Err(PageError::InvalidPageId { page_id: 0 });
202 }
203 let mut state = sync::lock(&self.state);
204 if raw >= state.next_new {
205 return Err(PageError::InvalidPageId { page_id: raw });
206 }
207 state.free_list.push(raw);
208 Ok(())
209 }
210
211 #[must_use]
214 pub fn high_water(&self) -> u64 {
215 sync::lock(&self.state).next_new
216 }
217
218 #[must_use]
221 pub fn free_count(&self) -> u64 {
222 sync::lock(&self.state).free_list.len() as u64
223 }
224
225 pub fn sync(&self) -> PageResult<()> {
236 {
237 let mut state = sync::lock(&self.state);
238 self.persist_superblock(&mut state)?;
239 }
240 self.store.sync()
241 }
242
243 fn persist_superblock(&self, state: &mut AllocState) -> PageResult<()> {
246 let len = state.free_list.len();
250 for i in 0..len {
251 let id = state.free_list[i];
252 let next = if i + 1 < len {
253 state.free_list[i + 1]
254 } else {
255 NO_PAGE
256 };
257 state.scratch.reset();
258 write_u64(state.scratch.payload_mut(), LINK_NEXT_OFF, next);
259 self.store.write_page(PageId::new(id), &mut state.scratch)?;
260 }
261 let head = state.free_list.first().copied().unwrap_or(NO_PAGE);
262
263 state.scratch.reset();
264 let payload = state.scratch.payload_mut();
265 write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
266 write_u16(payload, SB_VERSION_OFF, SB_VERSION);
267 write_u64(payload, SB_HEAD_OFF, head);
268 write_u64(payload, SB_NEXT_OFF, state.next_new);
269 write_u64(payload, SB_FREECOUNT_OFF, len as u64);
270 self.store.write_page(PageId::new(0), &mut state.scratch)
271 }
272}
273
274fn walk_free_chain<S: PageStore>(
277 store: &S,
278 scratch: &mut Page,
279 head: u64,
280 free_count: u64,
281) -> PageResult<Vec<u64>> {
282 let mut ids = Vec::new();
283 let mut cur = head;
284 for _ in 0..free_count {
285 if cur == NO_PAGE {
286 return Err(PageError::InvalidSuperblock);
287 }
288 ids.push(cur);
289 store.read_into(PageId::new(cur), scratch)?;
290 cur = read_u64(scratch.payload(), LINK_NEXT_OFF);
291 }
292 if cur != NO_PAGE {
294 return Err(PageError::InvalidSuperblock);
295 }
296 Ok(ids)
297}
298
/// Decodes the little-endian `u16` stored at `bytes[off..off + 2]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u16(bytes: &[u8], off: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&bytes[off..off + 2]);
    u16::from_le_bytes(raw)
}
303
/// Decodes the little-endian `u32` stored at `bytes[off..off + 4]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u32(bytes: &[u8], off: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes[off..off + 4]);
    u32::from_le_bytes(raw)
}
308
/// Decodes the little-endian `u64` stored at `bytes[off..off + 8]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u64(bytes: &[u8], off: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[off..off + 8]);
    u64::from_le_bytes(raw)
}
322
/// Stores `value` as little-endian bytes into `bytes[off..off + 2]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
327
/// Stores `value` as little-endian bytes into `bytes[off..off + 4]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
332
/// Stores `value` as little-endian bytes into `bytes[off..off + 8]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
337
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use std::collections::HashSet;

    use proptest::prelude::*;

    use super::*;
    use crate::test_store::MemStore;

    /// Page size used for every in-memory store in these tests.
    const PS: usize = 4096;

    /// Builds an allocator over a brand-new in-memory store.
    fn allocator() -> PageAllocator<MemStore> {
        let store = MemStore::new(PS);
        PageAllocator::new(store).unwrap()
    }

    /// A fresh allocator hands out 1, 2, 3 and leaves the high-water mark at 4.
    #[test]
    fn test_allocate_starts_at_one_and_increments() {
        let alloc = allocator();
        for expected in 1..=3u64 {
            assert_eq!(alloc.allocate().unwrap(), PageId::new(expected));
        }
        assert_eq!(alloc.high_water(), 4);
    }

    /// A freed id is the next one returned by `allocate`.
    #[test]
    fn test_free_then_allocate_reuses_id() {
        let alloc = allocator();
        let first = alloc.allocate().unwrap();
        let second = alloc.allocate().unwrap();

        alloc.free(first).unwrap();
        assert_eq!(alloc.free_count(), 1);

        let recycled = alloc.allocate().unwrap();
        assert_eq!(recycled, first);
        assert_ne!(recycled, second);
        assert_eq!(alloc.free_count(), 0);
    }

    /// Freed ids come back in the reverse of the order they were freed in.
    #[test]
    fn test_free_list_is_lifo() {
        let alloc = allocator();

        let mut handed_out = Vec::with_capacity(4);
        while handed_out.len() < 4 {
            handed_out.push(alloc.allocate().unwrap());
        }
        for id in handed_out.iter().copied() {
            alloc.free(id).unwrap();
        }

        let reused: Vec<_> = std::iter::repeat_with(|| alloc.allocate().unwrap())
            .take(4)
            .collect();

        handed_out.reverse();
        assert_eq!(reused, handed_out);
    }

    /// Page 0 and ids that were never handed out cannot be freed.
    #[test]
    fn test_free_rejects_superblock_and_unallocated() {
        let alloc = allocator();
        let _ = alloc.allocate().unwrap();

        let superblock = alloc.free(PageId::new(0));
        assert!(matches!(
            superblock,
            Err(PageError::InvalidPageId { page_id: 0 })
        ));

        let never_allocated = alloc.free(PageId::new(5));
        assert!(matches!(
            never_allocated,
            Err(PageError::InvalidPageId { page_id: 5 })
        ));
    }

    /// After `sync`, a second allocator over the same store sees the same
    /// high-water mark and free list.
    #[test]
    fn test_state_survives_reopen() {
        let original = PageAllocator::new(MemStore::new(PS)).unwrap();
        let _ = original.allocate().unwrap();
        let middle = original.allocate().unwrap();
        let _ = original.allocate().unwrap();
        original.free(middle).unwrap();
        original.sync().unwrap();

        let reopened = PageAllocator::new(original.into_store()).unwrap();
        assert_eq!(reopened.high_water(), 4);
        assert_eq!(reopened.free_count(), 1);
        assert_eq!(reopened.allocate().unwrap(), PageId::new(2));
    }

    /// A page 0 that does not start with the superblock magic is rejected.
    #[test]
    fn test_new_rejects_non_superblock_page_zero() {
        let store = MemStore::new(PS);
        {
            // Plant a page 0 whose first payload byte cannot match the magic.
            let mut garbage = store.allocate_page();
            garbage.payload_mut()[0] = 0xFF;
            store.write_page(PageId::new(0), &mut garbage).unwrap();
        }

        let outcome = PageAllocator::new(store);
        assert!(matches!(outcome, Err(PageError::InvalidSuperblock)));
    }

    impl PageAllocator<MemStore> {
        /// Test-only: gives the backing store back so it can be reopened.
        fn into_store(self) -> MemStore {
            let Self { store, .. } = self;
            store
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(48))]

        // Random interleavings of allocate/free: a live id is never handed out
        // again, id 0 is never handed out, and `free_count` tracks the model.
        #[test]
        fn allocate_free_never_double_allocates(
            ops in proptest::collection::vec(any::<bool>(), 1..200),
        ) {
            let alloc = allocator();
            let mut live: HashSet<u64> = HashSet::new();
            let mut freed_pool: Vec<u64> = Vec::new();

            for want_alloc in ops {
                // Freeing is only possible while something is live.
                let do_free = !want_alloc && !live.is_empty();
                if do_free {
                    let victim = live.iter().next().copied().unwrap();
                    let _ = live.remove(&victim);
                    alloc.free(PageId::new(victim)).unwrap();
                    freed_pool.push(victim);
                } else {
                    let id = alloc.allocate().unwrap().get();
                    prop_assert!(!live.contains(&id), "id {} double-allocated", id);
                    prop_assert!(id >= 1, "id 0 is reserved");
                    let _ = live.insert(id);
                    freed_pool.retain(|&f| f != id);
                }
                prop_assert_eq!(alloc.free_count(), freed_pool.len() as u64);
            }
        }
    }
}
482
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    use crate::sync::Arc;
    use crate::test_store::MemStore;

    /// Two threads allocating from one shared allocator must receive
    /// different page ids under every interleaving loom explores.
    #[test]
    fn loom_concurrent_allocate_is_unique() {
        loom::model(|| {
            let shared = Arc::new(PageAllocator::new(MemStore::new(4096)).unwrap());

            let worker = {
                let handle = Arc::clone(&shared);
                loom::thread::spawn(move || handle.allocate().unwrap())
            };

            let from_main = shared.allocate().unwrap();
            let from_worker = worker.join().unwrap();
            assert_ne!(from_main, from_worker);
        });
    }
}