1use std::collections::HashSet;
26use std::path::Path;
27
28use crate::error::{PageError, PageResult};
29use crate::file::PageFile;
30use crate::page::{Page, PageId, PageSize};
31use crate::store::PageStore;
32use crate::sync::{self, Mutex};
33
/// Sentinel page id meaning "no page": it terminates a free chain and is stored as the
/// chain head when the free list is empty.
const NO_PAGE: u64 = u64::MAX;

/// Superblock magic: the ASCII bytes `PGSB` interpreted as a little-endian `u32`.
const SB_MAGIC: u32 = u32::from_le_bytes(*b"PGSB");
/// The only superblock format version this code accepts.
const SB_VERSION: u16 = 1;

/// Payload offset of the `u32` magic within page 0.
const SB_MAGIC_OFF: usize = 0;
/// Payload offset of the `u16` format version within page 0.
const SB_VERSION_OFF: usize = 4;
/// Payload offset of the `u64` id of the first page in the free chain.
const SB_HEAD_OFF: usize = 8;
/// Payload offset of the `u64` next never-allocated page id.
const SB_NEXT_OFF: usize = 16;
/// Payload offset of the `u64` number of pages in the free chain.
const SB_FREECOUNT_OFF: usize = 24;

/// Payload offset, within a free page, of the `u64` id of the next free page.
const LINK_NEXT_OFF: usize = 0;
50
51struct AllocState {
53 free_list: Vec<u64>,
55 next_new: u64,
57 scratch: Page,
60}
61
62pub struct PageAllocator<S = PageFile> {
92 store: S,
93 state: Mutex<AllocState>,
94}
95
96impl PageAllocator<PageFile> {
97 pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
108 let file = PageFile::open(path, page_size)?;
109 Self::new(file)
110 }
111}
112
113impl<S: PageStore> PageAllocator<S> {
114 pub fn new(store: S) -> PageResult<Self> {
122 let mut scratch = store.allocate_page();
123
124 let (free_list, next_new, fresh) = match store.read_into(PageId::new(0), &mut scratch) {
125 Ok(()) => {
126 let payload = scratch.payload();
127 if read_u32(payload, SB_MAGIC_OFF) != SB_MAGIC
128 || read_u16(payload, SB_VERSION_OFF) != SB_VERSION
129 {
130 return Err(PageError::InvalidSuperblock);
131 }
132 let head = read_u64(payload, SB_HEAD_OFF);
133 let next_new = read_u64(payload, SB_NEXT_OFF);
134 let free_count = read_u64(payload, SB_FREECOUNT_OFF);
135 if next_new < 1 {
136 return Err(PageError::InvalidSuperblock);
137 }
138 let free_list = walk_free_chain(&store, &mut scratch, head, next_new, free_count)?;
139 (free_list, next_new, false)
140 }
141 Err(PageError::ShortRead { .. }) => (Vec::new(), 1, true),
144 Err(err) => return Err(err),
145 };
146
147 let allocator = Self {
148 store,
149 state: Mutex::new(AllocState {
150 free_list,
151 next_new,
152 scratch,
153 }),
154 };
155
156 if fresh {
157 let mut state = sync::lock(&allocator.state);
158 allocator.persist_superblock(&mut state)?;
159 }
160
161 Ok(allocator)
162 }
163
164 pub fn allocate(&self) -> PageResult<PageId> {
175 let mut state = sync::lock(&self.state);
176
177 if let Some(id) = state.free_list.pop() {
178 return Ok(PageId::new(id));
179 }
180
181 let id = state.next_new;
182 match id.checked_add(1) {
183 Some(next) if next != NO_PAGE => {
184 state.next_new = next;
185 Ok(PageId::new(id))
186 }
187 _ => Err(PageError::InvalidPageId { page_id: id }),
188 }
189 }
190
191 pub fn free(&self, id: PageId) -> PageResult<()> {
203 let raw = id.get();
204 if raw == 0 {
205 return Err(PageError::InvalidPageId { page_id: 0 });
206 }
207 let mut state = sync::lock(&self.state);
208 if raw >= state.next_new {
209 return Err(PageError::InvalidPageId { page_id: raw });
210 }
211 state.free_list.push(raw);
212 Ok(())
213 }
214
215 #[must_use]
218 pub fn high_water(&self) -> u64 {
219 sync::lock(&self.state).next_new
220 }
221
222 #[must_use]
225 pub fn free_count(&self) -> u64 {
226 sync::lock(&self.state).free_list.len() as u64
227 }
228
229 pub fn sync(&self) -> PageResult<()> {
240 {
241 let mut state = sync::lock(&self.state);
242 self.persist_superblock(&mut state)?;
243 }
244 self.store.sync()
245 }
246
247 fn persist_superblock(&self, state: &mut AllocState) -> PageResult<()> {
250 let len = state.free_list.len();
254 for i in 0..len {
255 let id = state.free_list[i];
256 let next = if i + 1 < len {
257 state.free_list[i + 1]
258 } else {
259 NO_PAGE
260 };
261 state.scratch.reset();
262 write_u64(state.scratch.payload_mut(), LINK_NEXT_OFF, next);
263 self.store.write_page(PageId::new(id), &mut state.scratch)?;
264 }
265 let head = state.free_list.first().copied().unwrap_or(NO_PAGE);
266
267 state.scratch.reset();
268 let payload = state.scratch.payload_mut();
269 write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
270 write_u16(payload, SB_VERSION_OFF, SB_VERSION);
271 write_u64(payload, SB_HEAD_OFF, head);
272 write_u64(payload, SB_NEXT_OFF, state.next_new);
273 write_u64(payload, SB_FREECOUNT_OFF, len as u64);
274 self.store.write_page(PageId::new(0), &mut state.scratch)
275 }
276}
277
278fn walk_free_chain<S: PageStore>(
287 store: &S,
288 scratch: &mut Page,
289 head: u64,
290 next_new: u64,
291 free_count: u64,
292) -> PageResult<Vec<u64>> {
293 let mut ids = Vec::new();
294 let mut seen = HashSet::new();
295 let mut cur = head;
296
297 while cur != NO_PAGE {
298 if ids.len() as u64 >= free_count {
301 return Err(PageError::InvalidSuperblock);
302 }
303 if cur == 0 || cur >= next_new {
305 return Err(PageError::InvalidSuperblock);
306 }
307 if !seen.insert(cur) {
310 return Err(PageError::InvalidSuperblock);
311 }
312 ids.push(cur);
313 store.read_into(PageId::new(cur), scratch)?;
314 cur = read_u64(scratch.payload(), LINK_NEXT_OFF);
315 }
316
317 if ids.len() as u64 != free_count {
319 return Err(PageError::InvalidSuperblock);
320 }
321 Ok(ids)
322}
323
/// Decodes a little-endian `u16` from `bytes[off..off + 2]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u16(bytes: &[u8], off: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&bytes[off..off + 2]);
    u16::from_le_bytes(raw)
}
328
/// Decodes a little-endian `u32` from `bytes[off..off + 4]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u32(bytes: &[u8], off: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes[off..off + 4]);
    u32::from_le_bytes(raw)
}
333
/// Decodes a little-endian `u64` from `bytes[off..off + 8]`.
///
/// Panics if that range is out of bounds.
#[inline]
fn read_u64(bytes: &[u8], off: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[off..off + 8]);
    u64::from_le_bytes(raw)
}
347
/// Encodes `value` as little-endian into `bytes[off..off + 2]`.
///
/// Panics if that range is out of bounds; nothing is written in that case.
#[inline]
fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
352
/// Encodes `value` as little-endian into `bytes[off..off + 4]`.
///
/// Panics if that range is out of bounds; nothing is written in that case.
#[inline]
fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
357
/// Encodes `value` as little-endian into `bytes[off..off + 8]`.
///
/// Panics if that range is out of bounds; nothing is written in that case.
#[inline]
fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
    let encoded = value.to_le_bytes();
    let window = &mut bytes[off..off + encoded.len()];
    window.copy_from_slice(&encoded);
}
362
#[cfg(all(test, not(loom)))]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use std::collections::HashSet;

    use proptest::prelude::*;

    use super::*;
    use crate::test_store::MemStore;

    /// Page size used for every in-memory store in these tests.
    const PS: usize = 4096;

    impl PageAllocator<MemStore> {
        /// Gives the backing store back so a test can reopen it with a new allocator.
        fn into_store(self) -> MemStore {
            self.store
        }
    }

    /// Allocator over a brand-new in-memory store.
    fn allocator() -> PageAllocator<MemStore> {
        let store = MemStore::new(PS);
        PageAllocator::new(store).unwrap()
    }

    /// Writes a superblock with the given fields directly into page 0 of `store`.
    fn write_superblock(store: &MemStore, head: u64, next_new: u64, free_count: u64) {
        let mut page = store.allocate_page();
        {
            let payload = page.payload_mut();
            write_u32(payload, SB_MAGIC_OFF, SB_MAGIC);
            write_u16(payload, SB_VERSION_OFF, SB_VERSION);
            let fields = [
                (SB_HEAD_OFF, head),
                (SB_NEXT_OFF, next_new),
                (SB_FREECOUNT_OFF, free_count),
            ];
            for (off, value) in fields {
                write_u64(payload, off, value);
            }
        }
        store.write_page(PageId::new(0), &mut page).unwrap();
    }

    /// Writes a free-chain link page `id` whose successor is `next`.
    fn write_link(store: &MemStore, id: u64, next: u64) {
        let mut link = store.allocate_page();
        write_u64(link.payload_mut(), LINK_NEXT_OFF, next);
        store.write_page(PageId::new(id), &mut link).unwrap();
    }

    #[test]
    fn test_allocate_starts_at_one_and_increments() {
        let alloc = allocator();
        for expected in 1..=3u64 {
            assert_eq!(alloc.allocate().unwrap(), PageId::new(expected));
        }
        assert_eq!(alloc.high_water(), 4);
    }

    #[test]
    fn test_free_then_allocate_reuses_id() {
        let alloc = allocator();
        let first = alloc.allocate().unwrap();
        let second = alloc.allocate().unwrap();

        alloc.free(first).unwrap();
        assert_eq!(alloc.free_count(), 1);

        let recycled = alloc.allocate().unwrap();
        assert_eq!(recycled, first);
        assert_ne!(recycled, second);
        assert_eq!(alloc.free_count(), 0);
    }

    #[test]
    fn test_free_list_is_lifo() {
        let alloc = allocator();
        let ids: Vec<PageId> = (0..4).map(|_| alloc.allocate().unwrap()).collect();
        ids.iter().for_each(|&id| alloc.free(id).unwrap());

        // Ids must come back in the reverse of the order they were freed.
        let reused: Vec<PageId> = (0..4).map(|_| alloc.allocate().unwrap()).collect();
        let expected: Vec<PageId> = ids.iter().rev().copied().collect();
        assert_eq!(reused, expected);
    }

    #[test]
    fn test_free_rejects_superblock_and_unallocated() {
        let alloc = allocator();
        let _ = alloc.allocate().unwrap();

        let superblock = alloc.free(PageId::new(0));
        assert!(matches!(
            superblock,
            Err(PageError::InvalidPageId { page_id: 0 })
        ));

        let unallocated = alloc.free(PageId::new(5));
        assert!(matches!(
            unallocated,
            Err(PageError::InvalidPageId { page_id: 5 })
        ));
    }

    #[test]
    fn test_state_survives_reopen() {
        let alloc = allocator();
        let _ = alloc.allocate().unwrap();
        let middle = alloc.allocate().unwrap();
        let _ = alloc.allocate().unwrap();
        alloc.free(middle).unwrap();
        alloc.sync().unwrap();

        let reopened = PageAllocator::new(alloc.into_store()).unwrap();
        assert_eq!(reopened.high_water(), 4);
        assert_eq!(reopened.free_count(), 1);
        assert_eq!(reopened.allocate().unwrap(), PageId::new(2));
    }

    #[test]
    fn test_new_rejects_non_superblock_page_zero() {
        let store = MemStore::new(PS);
        let mut garbage = store.allocate_page();
        garbage.payload_mut()[0] = 0xFF;
        store.write_page(PageId::new(0), &mut garbage).unwrap();

        let opened = PageAllocator::new(store);
        assert!(matches!(opened, Err(PageError::InvalidSuperblock)));
    }

    #[test]
    fn test_new_rejects_cycled_free_chain() {
        let store = MemStore::new(PS);
        write_superblock(&store, 1, 3, 10);
        // 1 -> 2 -> 1 never reaches NO_PAGE.
        write_link(&store, 1, 2);
        write_link(&store, 2, 1);

        let opened = PageAllocator::new(store);
        assert!(matches!(opened, Err(PageError::InvalidSuperblock)));
    }

    #[test]
    fn test_new_rejects_out_of_range_link() {
        let store = MemStore::new(PS);
        // Head 5 is at or above the high-water mark 3.
        write_superblock(&store, 5, 3, 1);

        let opened = PageAllocator::new(store);
        assert!(matches!(opened, Err(PageError::InvalidSuperblock)));
    }

    #[test]
    fn test_new_rejects_free_count_mismatch() {
        let store = MemStore::new(PS);
        // The superblock claims 5 free pages but the chain holds only one.
        write_superblock(&store, 1, 3, 5);
        write_link(&store, 1, NO_PAGE);

        let opened = PageAllocator::new(store);
        assert!(matches!(opened, Err(PageError::InvalidSuperblock)));
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(48))]

        /// Random allocate/free sequences never hand out an id that is still live, and
        /// the allocator's free count tracks a reference model.
        #[test]
        fn allocate_free_never_double_allocates(
            ops in proptest::collection::vec(any::<bool>(), 1..200),
        ) {
            let alloc = allocator();
            let mut live: HashSet<u64> = HashSet::new();
            let mut freed_pool: Vec<u64> = Vec::new();

            for want_alloc in ops {
                // A free is only possible when requested and at least one id is live.
                let victim = if want_alloc {
                    None
                } else {
                    live.iter().next().copied()
                };

                match victim {
                    Some(victim) => {
                        let _ = live.remove(&victim);
                        alloc.free(PageId::new(victim)).unwrap();
                        freed_pool.push(victim);
                    }
                    None => {
                        let id = alloc.allocate().unwrap().get();
                        prop_assert!(!live.contains(&id), "id {} double-allocated", id);
                        prop_assert!(id >= 1, "id 0 is reserved");
                        let _ = live.insert(id);
                        freed_pool.retain(|&f| f != id);
                    }
                }

                prop_assert_eq!(alloc.free_count(), freed_pool.len() as u64);
            }
        }
    }
}
559
#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;
    use crate::sync::Arc;
    use crate::test_store::MemStore;

    /// Two threads allocating at the same time must receive different page ids.
    #[test]
    fn loom_concurrent_allocate_is_unique() {
        loom::model(|| {
            let shared = Arc::new(PageAllocator::new(MemStore::new(4096)).unwrap());

            let worker = {
                let alloc = Arc::clone(&shared);
                loom::thread::spawn(move || alloc.allocate().unwrap())
            };

            let from_main = shared.allocate().unwrap();
            let from_worker = worker.join().unwrap();
            assert_ne!(from_main, from_worker);
        });
    }
}