use citadel_buffer::allocator::PageAllocator;
use citadel_core::types::{PageId, PageType, TxnId};
use citadel_core::{Error, Result, PAGE_HEADER_SIZE, PENDING_FREE_ENTRY_SIZE, USABLE_SIZE};
use citadel_page::page::Page;
use rustc_hash::FxHashMap;
/// A page queued for reclamation, tagged with the transaction that freed it.
///
/// The page cannot be reused until every reader older than `freed_at_txn`
/// has finished (see `process_chain`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PendingFreeEntry {
    // Id of the page awaiting reclamation.
    pub page_id: PageId,
    // Transaction at which the page was freed.
    pub freed_at_txn: TxnId,
}
// Entries that fit in one chain page: usable space minus the 4-byte
// entry-count header, divided by the serialized entry size.
const MAX_ENTRIES_PER_PAGE: usize = (USABLE_SIZE - 4) / PENDING_FREE_ENTRY_SIZE;
pub fn read_chain(pages: &FxHashMap<PageId, Page>, root: PageId) -> Result<Vec<PendingFreeEntry>> {
if !root.is_valid() {
return Ok(Vec::new());
}
let mut entries = Vec::new();
let mut current = root;
while current.is_valid() {
let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
let entry_count = read_entry_count(page);
let data_start = PAGE_HEADER_SIZE + 4;
for i in 0..entry_count {
let offset = data_start + i * PENDING_FREE_ENTRY_SIZE;
entries.push(read_entry_at(&page.data, offset));
}
current = page.right_child();
if !current.is_valid() {
break;
}
}
Ok(entries)
}
/// Serializes `entries` into a freshly allocated linked chain of
/// `PendingFree` pages.
///
/// Each page holds up to `MAX_ENTRIES_PER_PAGE` entries preceded by a
/// little-endian `u32` count; pages are linked through their right-child
/// pointers and checksummed before insertion into `pages`. Returns the id of
/// the first chain page, or `PageId::INVALID` when `entries` is empty.
pub fn write_chain(
    pages: &mut FxHashMap<PageId, Page>,
    alloc: &mut PageAllocator,
    txn_id: TxnId,
    entries: &[PendingFreeEntry],
) -> PageId {
    if entries.is_empty() {
        return PageId::INVALID;
    }
    let page_count = entries.len().div_ceil(MAX_ENTRIES_PER_PAGE);
    let page_ids: Vec<PageId> = (0..page_count).map(|_| alloc.allocate()).collect();
    // `chunks` hands us exactly the per-page entry slices, so no manual
    // running index is needed.
    for (idx, (chunk, &page_id)) in entries
        .chunks(MAX_ENTRIES_PER_PAGE)
        .zip(page_ids.iter())
        .enumerate()
    {
        let mut page = Page::new(page_id, PageType::PendingFree, txn_id);
        // Link to the following chain page, or terminate the chain.
        let next = page_ids.get(idx + 1).copied().unwrap_or(PageId::INVALID);
        page.set_right_child(next);
        // Entry-count header (u32 LE) sits immediately after the page header.
        page.data[PAGE_HEADER_SIZE..PAGE_HEADER_SIZE + 4]
            .copy_from_slice(&(chunk.len() as u32).to_le_bytes());
        let data_start = PAGE_HEADER_SIZE + 4;
        for (j, entry) in chunk.iter().enumerate() {
            let offset = data_start + j * PENDING_FREE_ENTRY_SIZE;
            write_entry_at(&mut page.data, offset, entry);
        }
        page.update_checksum();
        pages.insert(page_id, page);
    }
    page_ids[0]
}
pub fn collect_chain_page_ids(
pages: &FxHashMap<PageId, Page>,
root: PageId,
) -> Result<Vec<PageId>> {
if !root.is_valid() {
return Ok(Vec::new());
}
let mut ids = Vec::new();
let mut current = root;
while current.is_valid() {
ids.push(current);
let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
current = page.right_child();
if !current.is_valid() {
break;
}
}
Ok(ids)
}
/// Advances the pending-free chain for a committing transaction.
///
/// Splits the existing chain's entries into pages that are now safe to
/// reclaim (freed before `oldest_active_reader`) and entries that must stay
/// pending, appends the pages freed by this transaction (`deferred_free`
/// first, then `freed_this_txn`, both stamped with `txn_id`), and writes a
/// fresh chain.
///
/// Returns `(new_root, reclaimable_page_ids, old_chain_page_ids)`; the caller
/// is responsible for returning both the reclaimed pages and the old chain's
/// own pages to the allocator.
///
/// # Errors
/// Propagates [`Error::PageOutOfBounds`] if the existing chain is broken.
pub fn process_chain(
    pages: &mut FxHashMap<PageId, Page>,
    alloc: &mut PageAllocator,
    txn_id: TxnId,
    current_root: PageId,
    freed_this_txn: &[PageId],
    deferred_free: &[PageId],
    oldest_active_reader: TxnId,
) -> Result<(PageId, Vec<PageId>, Vec<PageId>)> {
    let old_chain_pages = collect_chain_page_ids(pages, current_root)?;
    let mut still_pending = Vec::new();
    let mut reclaimed = Vec::new();
    // An entry becomes reclaimable once no active reader can still see it.
    for entry in read_chain(pages, current_root)? {
        if entry.freed_at_txn.as_u64() < oldest_active_reader.as_u64() {
            reclaimed.push(entry.page_id);
        } else {
            still_pending.push(entry);
        }
    }
    // Pages freed during this transaction enter the chain stamped with txn_id.
    let newly_freed = deferred_free.iter().chain(freed_this_txn.iter());
    still_pending.extend(newly_freed.map(|&page_id| PendingFreeEntry {
        page_id,
        freed_at_txn: txn_id,
    }));
    let new_root = write_chain(pages, alloc, txn_id, &still_pending);
    Ok((new_root, reclaimed, old_chain_pages))
}
/// Decodes the little-endian `u32` entry count stored immediately after the
/// page header.
fn read_entry_count(page: &Page) -> usize {
    let raw: [u8; 4] = page.data[PAGE_HEADER_SIZE..PAGE_HEADER_SIZE + 4]
        .try_into()
        .expect("slice is exactly 4 bytes");
    u32::from_le_bytes(raw) as usize
}
/// Deserializes one entry at `offset`: a little-endian `u32` page id followed
/// by a little-endian `u64` transaction id (12 bytes total).
fn read_entry_at(data: &[u8], offset: usize) -> PendingFreeEntry {
    let id_bytes: [u8; 4] = data[offset..offset + 4]
        .try_into()
        .expect("slice is exactly 4 bytes");
    let txn_bytes: [u8; 8] = data[offset + 4..offset + 12]
        .try_into()
        .expect("slice is exactly 8 bytes");
    PendingFreeEntry {
        page_id: PageId(u32::from_le_bytes(id_bytes)),
        freed_at_txn: TxnId(u64::from_le_bytes(txn_bytes)),
    }
}
/// Serializes `entry` at `offset` as a little-endian `u32` page id followed
/// by a little-endian `u64` transaction id (12 bytes total).
fn write_entry_at(data: &mut [u8], offset: usize, entry: &PendingFreeEntry) {
    let (id_dst, txn_dst) = data[offset..offset + 12].split_at_mut(4);
    id_dst.copy_from_slice(&entry.page_id.as_u32().to_le_bytes());
    txn_dst.copy_from_slice(&entry.freed_at_txn.as_u64().to_le_bytes());
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_chain() {
        // An invalid root decodes to an empty entry list.
        let pages = FxHashMap::default();
        assert!(read_chain(&pages, PageId::INVALID).unwrap().is_empty());
    }

    #[test]
    fn write_and_read_chain() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        let original = vec![
            PendingFreeEntry {
                page_id: PageId(10),
                freed_at_txn: TxnId(1),
            },
            PendingFreeEntry {
                page_id: PageId(20),
                freed_at_txn: TxnId(2),
            },
            PendingFreeEntry {
                page_id: PageId(30),
                freed_at_txn: TxnId(3),
            },
        ];
        let root = write_chain(&mut pages, &mut alloc, TxnId(5), &original);
        assert!(root.is_valid());
        // Round trip must preserve order and values exactly.
        assert_eq!(read_chain(&pages, root).unwrap(), original);
    }

    #[test]
    fn write_chain_multi_page() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        // One entry more than a single page can hold forces a second page.
        let count = MAX_ENTRIES_PER_PAGE + 10;
        let original: Vec<PendingFreeEntry> = (0..count)
            .map(|i| PendingFreeEntry {
                page_id: PageId(100 + i as u32),
                freed_at_txn: TxnId(i as u64),
            })
            .collect();
        let root = write_chain(&mut pages, &mut alloc, TxnId(999), &original);
        // Round trip across the page boundary preserves every entry in order.
        assert_eq!(read_chain(&pages, root).unwrap(), original);
    }

    #[test]
    fn write_empty_chain() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        let root = write_chain(&mut pages, &mut alloc, TxnId(1), &[]);
        assert_eq!(root, PageId::INVALID);
    }

    #[test]
    fn collect_chain_pages() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        let original: Vec<PendingFreeEntry> = (0..MAX_ENTRIES_PER_PAGE + 10)
            .map(|i| PendingFreeEntry {
                page_id: PageId(100 + i as u32),
                freed_at_txn: TxnId(1),
            })
            .collect();
        let root = write_chain(&mut pages, &mut alloc, TxnId(1), &original);
        // MAX_ENTRIES_PER_PAGE + 10 entries require exactly two chain pages.
        assert_eq!(collect_chain_page_ids(&pages, root).unwrap().len(), 2);
    }

    #[test]
    fn process_chain_gc() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        let initial = vec![
            PendingFreeEntry {
                page_id: PageId(10),
                freed_at_txn: TxnId(1),
            },
            PendingFreeEntry {
                page_id: PageId(20),
                freed_at_txn: TxnId(2),
            },
            PendingFreeEntry {
                page_id: PageId(30),
                freed_at_txn: TxnId(3),
            },
        ];
        let root = write_chain(&mut pages, &mut alloc, TxnId(3), &initial);
        let (new_root, reclaimed, old_chain) = process_chain(
            &mut pages,
            &mut alloc,
            TxnId(4),
            root,
            &[PageId(40)],
            &[],
            TxnId(3),
        )
        .unwrap();
        // Pages 10 and 20 were freed before reader txn 3, so they are
        // reclaimable; page 30 stays pending and page 40 joins the chain.
        assert_eq!(reclaimed.len(), 2);
        assert!(reclaimed.contains(&PageId(10)));
        assert!(reclaimed.contains(&PageId(20)));
        assert_eq!(read_chain(&pages, new_root).unwrap().len(), 2);
        assert!(!old_chain.is_empty());
    }

    #[test]
    fn process_chain_with_deferred_free() {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        // No existing chain: nothing to reclaim, all three freed pages pend.
        let (new_root, reclaimed, old_chain) = process_chain(
            &mut pages,
            &mut alloc,
            TxnId(1),
            PageId::INVALID,
            &[PageId(60)],
            &[PageId(50), PageId(51)],
            TxnId(1),
        )
        .unwrap();
        assert!(reclaimed.is_empty());
        assert!(old_chain.is_empty());
        assert_eq!(read_chain(&pages, new_root).unwrap().len(), 3);
    }

    #[test]
    fn max_entries_per_page_correct() {
        assert_eq!(MAX_ENTRIES_PER_PAGE, (8096 - 4) / 12);
    }
}