use crate::error::StoreError;
use crate::types::{CRC_SIZE, PAGE_HEADER_SIZE, PAGE_SIZE, PageId};
/// Magic bytes written at offsets 0..4 of every serialized page header.
const PAGE_MAGIC: [u8; 4] = *b"VDPG";
/// Header format version stored at byte 4; `PageHeader::deserialize` rejects any other value.
const PAGE_VERSION: u8 = 1;
/// Size in bytes of one slot-directory entry: a little-endian `u16` offset followed by a
/// little-endian `u16` length.
const SLOT_SIZE: usize = 4;
/// Byte offset of the CRC field, which occupies the last `CRC_SIZE` bytes of the page.
const CRC_OFFSET: usize = PAGE_SIZE - CRC_SIZE;
/// Kind of a page, stored as a single byte in the page header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Encoded as byte `0`.
    Leaf = 0,
    /// Encoded as byte `1`.
    Internal = 1,
    /// Encoded as byte `2`.
    Free = 2,
}

impl PageType {
    /// Decodes a page type from its on-disk byte.
    ///
    /// Returns `None` when `byte` is not the discriminant of any variant.
    pub fn from_byte(byte: u8) -> Option<Self> {
        // Scan the known variants and keep the one whose discriminant matches.
        [Self::Leaf, Self::Internal, Self::Free]
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == byte)
    }

    /// Returns the on-disk byte for this page type (its `repr(u8)` discriminant).
    pub fn as_byte(self) -> u8 {
        let encoded = self as u8;
        encoded
    }
}
/// Decoded form of the fixed-size header stored at the start of every page.
#[derive(Debug, Clone, Copy)]
pub struct PageHeader {
/// Kind of page this header belongs to.
pub page_type: PageType,
/// Number of entries in the slot directory.
pub item_count: u16,
/// Byte offset just past the slot directory; `PageHeader::new` initialises it to
/// `PAGE_HEADER_SIZE`.
pub free_offset: u16,
}
impl PageHeader {
    /// Creates the header for an empty page of the given type.
    ///
    /// `item_count` starts at zero and `free_offset` points just past the
    /// header, where the (still empty) slot directory begins.
    pub fn new(page_type: PageType) -> Self {
        Self {
            page_type,
            item_count: 0,
            free_offset: PAGE_HEADER_SIZE as u16,
        }
    }

    /// Encodes the header into `buf`.
    ///
    /// Layout (multi-byte integers are little-endian):
    /// - bytes 0..4: `PAGE_MAGIC`
    /// - byte 4: `PAGE_VERSION`
    /// - byte 5: page type
    /// - bytes 6..8: `item_count`
    /// - bytes 8..10: `free_offset`
    /// - all remaining header bytes: reserved, written as zero
    pub fn serialize(self, buf: &mut [u8; PAGE_HEADER_SIZE]) {
        buf[0..4].copy_from_slice(&PAGE_MAGIC);
        buf[4] = PAGE_VERSION;
        buf[5] = self.page_type.as_byte();
        buf[6..8].copy_from_slice(&self.item_count.to_le_bytes());
        buf[8..10].copy_from_slice(&self.free_offset.to_le_bytes());
        // Zero the reserved tail up to the actual header size instead of a
        // hard-coded end index, so the code stays correct if
        // PAGE_HEADER_SIZE ever differs from 32.
        buf[10..].fill(0);
    }

    /// Decodes a header previously written by [`PageHeader::serialize`].
    ///
    /// # Errors
    ///
    /// - `StoreError::InvalidPageMagic` when bytes 0..4 are not `PAGE_MAGIC`.
    /// - `StoreError::UnsupportedPageVersion` when byte 4 is not
    ///   `PAGE_VERSION`, and also when byte 5 is not a known page type.
    pub fn deserialize(buf: &[u8; PAGE_HEADER_SIZE]) -> Result<Self, StoreError> {
        let magic = [buf[0], buf[1], buf[2], buf[3]];
        if magic != PAGE_MAGIC {
            return Err(StoreError::InvalidPageMagic {
                expected: u32::from_le_bytes(PAGE_MAGIC),
                actual: u32::from_le_bytes(magic),
            });
        }

        let version = buf[4];
        if version != PAGE_VERSION {
            return Err(StoreError::UnsupportedPageVersion(version));
        }

        // NOTE(review): an unknown page-type byte is reported through the
        // version error variant; kept as-is because callers may match on it,
        // but a dedicated variant would be clearer.
        let page_type =
            PageType::from_byte(buf[5]).ok_or(StoreError::UnsupportedPageVersion(buf[5]))?;
        let item_count = u16::from_le_bytes([buf[6], buf[7]]);
        let free_offset = u16::from_le_bytes([buf[8], buf[9]]);

        Ok(Self {
            page_type,
            item_count,
            free_offset,
        })
    }
}
#[derive(Debug, Clone, Copy, Default)]
pub struct Slot {
pub offset: u16,
pub length: u16,
}
impl Slot {
pub fn new(offset: u16, length: u16) -> Self {
Self { offset, length }
}
pub fn serialize(self) -> [u8; SLOT_SIZE] {
let mut buf = [0u8; SLOT_SIZE];
buf[0..2].copy_from_slice(&self.offset.to_le_bytes());
buf[2..4].copy_from_slice(&self.length.to_le_bytes());
buf
}
pub fn deserialize(buf: [u8; SLOT_SIZE]) -> Self {
Self {
offset: u16::from_le_bytes(buf[0..2].try_into().unwrap()),
length: u16::from_le_bytes(buf[2..4].try_into().unwrap()),
}
}
}
/// A fixed-size page: a header, a slot directory that follows the header, item bytes
/// referenced by the slots, and a CRC stored in the last `CRC_SIZE` bytes.
#[derive(Clone)]
pub struct Page {
/// Identifier of this page.
pub id: PageId,
/// Full page image, including the serialized header and the CRC field.
data: [u8; PAGE_SIZE],
/// Decoded copy of the header; written back into `data` by `sync_header`.
header: PageHeader,
/// Set by every mutating operation and cleared by `mark_clean`.
dirty: bool,
/// Set by `set_raw_data`; while set, `as_bytes` returns `data` without refreshing the CRC.
is_raw: bool,
}
impl Page {
/// Creates an empty, dirty page of the given type.
///
/// The buffer starts zeroed, then receives the serialized header and a
/// CRC computed over the resulting contents.
pub fn new(id: PageId, page_type: PageType) -> Self {
    let mut fresh = Self {
        id,
        data: [0u8; PAGE_SIZE],
        header: PageHeader::new(page_type),
        dirty: true,
        is_raw: false,
    };
    // Mirror the header into the byte image before stamping the checksum.
    fresh.sync_header();
    fresh.update_crc();
    fresh
}
/// Reconstructs a page from its serialized image.
///
/// The CRC stored in the trailing `CRC_SIZE` bytes is verified against the
/// rest of the page before the header is decoded. The resulting page is
/// clean and not raw.
///
/// # Errors
///
/// - `StoreError::PageCorrupted` when the stored and computed CRCs differ.
/// - Any error returned by `PageHeader::deserialize`.
pub fn from_bytes(id: PageId, data: &[u8; PAGE_SIZE]) -> Result<Self, StoreError> {
    let (payload, trailer) = data.split_at(CRC_OFFSET);

    let mut crc_bytes = [0u8; 4];
    crc_bytes.copy_from_slice(trailer);
    let stored_crc = u32::from_le_bytes(crc_bytes);
    let computed_crc = kimberlite_crypto::crc32(payload);

    if computed_crc != stored_crc {
        return Err(StoreError::PageCorrupted {
            page_id: id,
            expected: stored_crc,
            actual: computed_crc,
        });
    }

    let mut header_bytes = [0u8; PAGE_HEADER_SIZE];
    header_bytes.copy_from_slice(&data[..PAGE_HEADER_SIZE]);
    let header = PageHeader::deserialize(&header_bytes)?;

    Ok(Self {
        id,
        data: *data,
        header,
        dirty: false,
        is_raw: false,
    })
}
/// Returns the page type recorded in the in-memory header.
pub fn page_type(&self) -> PageType {
self.header.page_type
}
/// Returns the number of items recorded in the in-memory header.
pub fn item_count(&self) -> usize {
    usize::from(self.header.item_count)
}
/// Returns `true` when the dirty flag is set (see `mark_clean`).
pub fn is_dirty(&self) -> bool {
self.dirty
}
/// Clears the dirty flag.
pub fn mark_clean(&mut self) {
self.dirty = false;
}
/// Returns the number of bytes between the end of the slot directory and
/// the lowest item offset, or zero if the two regions meet or overlap.
pub fn free_space(&self) -> usize {
    self.data_area_start()
        .saturating_sub(self.slot_directory_end())
}
/// Byte offset just past the last slot-directory entry.
fn slot_directory_end(&self) -> usize {
    let directory_len = (self.header.item_count as usize) * SLOT_SIZE;
    PAGE_HEADER_SIZE + directory_len
}
/// Lowest item offset referenced by any slot, capped at `CRC_OFFSET`.
///
/// With no items the fold never runs and the result is `CRC_OFFSET`.
fn data_area_start(&self) -> usize {
    (0..self.header.item_count as usize)
        .map(|slot_index| self.get_slot(slot_index).offset as usize)
        .fold(CRC_OFFSET, |lowest, offset| lowest.min(offset))
}
/// Reads the slot-directory entry at `index`.
///
/// # Panics
///
/// In debug builds, panics when `index` is not below the item count. In
/// any build, panics when the entry would lie outside the page buffer.
pub fn get_slot(&self, index: usize) -> Slot {
    debug_assert!(
        index < self.header.item_count as usize,
        "slot index out of bounds"
    );
    let entry_start = PAGE_HEADER_SIZE + (index * SLOT_SIZE);
    let mut raw_entry = [0u8; SLOT_SIZE];
    raw_entry.copy_from_slice(&self.data[entry_start..entry_start + SLOT_SIZE]);
    Slot::deserialize(raw_entry)
}
/// Returns the bytes of the item referenced by the slot at `index`.
///
/// # Panics
///
/// Panics when the slot's byte range lies outside the page buffer, plus
/// whatever `get_slot` panics on for an invalid `index`.
pub fn get_item(&self, index: usize) -> &[u8] {
    let slot = self.get_slot(index);
    // Widen before adding: `offset + length` computed in u16 overflows
    // (debug panic / release wrap-around) once the sum exceeds 65535.
    let start = usize::from(slot.offset);
    let end = start + usize::from(slot.length);
    &self.data[start..end]
}
/// Inserts `data` as a new item at slot position `index`, shifting the
/// slots at `index..` one position up.
///
/// The item bytes are placed immediately below the current lowest item
/// offset; the new slot entry is written into the directory at `index`.
///
/// # Errors
///
/// Returns `StoreError::PageOverflow` when the page lacks room for the
/// item bytes plus one slot entry.
///
/// # Panics
///
/// In debug builds, panics when `index` is greater than the current item
/// count, since that would leave uninitialised slots in the directory.
pub fn insert_item(&mut self, index: usize, data: &[u8]) -> Result<(), StoreError> {
    let item_count = self.header.item_count as usize;
    debug_assert!(index <= item_count, "insert index out of bounds");

    // Scan the slot directory once and reuse the result for both the
    // capacity check and the placement of the new item.
    let data_start = self.data_area_start();
    let slot_dir_end = PAGE_HEADER_SIZE + (item_count * SLOT_SIZE);
    let available = data_start.saturating_sub(slot_dir_end);
    let needed = SLOT_SIZE + data.len();
    if needed > available {
        return Err(StoreError::PageOverflow { needed, available });
    }

    let data_offset = data_start - data.len();
    let slot_offset = PAGE_HEADER_SIZE + (index * SLOT_SIZE);

    // Open a gap in the directory for the new entry.
    if index < item_count {
        self.data
            .copy_within(slot_offset..slot_dir_end, slot_offset + SLOT_SIZE);
    }

    // Offsets and lengths are stored as u16; both values are bounded by
    // the page size after the capacity check above (assumes PAGE_SIZE
    // fits in 16 bits — TODO confirm).
    let slot = Slot::new(data_offset as u16, data.len() as u16);
    self.data[slot_offset..slot_offset + SLOT_SIZE].copy_from_slice(&slot.serialize());
    self.data[data_offset..data_offset + data.len()].copy_from_slice(data);

    self.header.item_count += 1;
    self.header.free_offset =
        (PAGE_HEADER_SIZE + (self.header.item_count as usize * SLOT_SIZE)) as u16;
    self.sync_header();
    self.dirty = true;

    debug_assert_eq!(self.header.item_count as usize, item_count + 1);
    Ok(())
}
/// Removes the slot at `index`, shifting later slots one position down.
///
/// Only the slot directory and header change; the removed item's bytes
/// are left in place in the buffer.
///
/// # Panics
///
/// In debug builds, panics when `index` is not below the item count.
pub fn remove_item(&mut self, index: usize) {
    debug_assert!(
        index < self.header.item_count as usize,
        "slot index out of bounds"
    );
    let count_before = self.header.item_count as usize;
    let removing_last = index >= count_before - 1;
    if !removing_last {
        // Slide the trailing entries over the removed one.
        let trailing_start = PAGE_HEADER_SIZE + ((index + 1) * SLOT_SIZE);
        let trailing_end = PAGE_HEADER_SIZE + (count_before * SLOT_SIZE);
        let vacated = PAGE_HEADER_SIZE + (index * SLOT_SIZE);
        self.data.copy_within(trailing_start..trailing_end, vacated);
    }

    self.header.item_count -= 1;
    let directory_end = PAGE_HEADER_SIZE + (self.header.item_count as usize * SLOT_SIZE);
    self.header.free_offset = directory_end as u16;
    self.sync_header();
    self.dirty = true;
}
/// Replaces the bytes of the item at `index` with `data`.
///
/// When `data` is no longer than the existing item it is written in place
/// and the slot length is shrunk if needed. Otherwise the item is
/// relocated: the new bytes are written immediately below the current
/// lowest item offset and the slot is repointed. The old bytes are not
/// reclaimed by this method.
///
/// # Errors
///
/// Returns `StoreError::PageOverflow` when a relocation is required and
/// the page has fewer than `data.len()` free bytes.
#[allow(dead_code)]
pub fn update_item(&mut self, index: usize, data: &[u8]) -> Result<(), StoreError> {
    let slot = self.get_slot(index);
    let slot_offset = PAGE_HEADER_SIZE + (index * SLOT_SIZE);
    let old_start = usize::from(slot.offset);
    let old_len = usize::from(slot.length);

    if data.len() <= old_len {
        // Fits in the existing region: overwrite in place.
        self.data[old_start..old_start + data.len()].copy_from_slice(data);
        if data.len() < old_len {
            let shrunk = Slot::new(slot.offset, data.len() as u16);
            self.data[slot_offset..slot_offset + SLOT_SIZE]
                .copy_from_slice(&shrunk.serialize());
        }
        self.dirty = true;
        return Ok(());
    }

    // Relocation writes the whole payload into free space, so the full
    // length must fit there. Checking only the size delta would let the
    // write run into the slot directory or underflow the offset.
    let needed = data.len();
    let data_start = self.data_area_start();
    let available = data_start.saturating_sub(self.slot_directory_end());
    if needed > available {
        return Err(StoreError::PageOverflow { needed, available });
    }

    let new_offset = data_start - needed;
    self.data[new_offset..new_offset + needed].copy_from_slice(data);
    let moved = Slot::new(new_offset as u16, needed as u16);
    self.data[slot_offset..slot_offset + SLOT_SIZE].copy_from_slice(&moved.serialize());
    self.dirty = true;
    Ok(())
}
/// Writes the in-memory header into the first `PAGE_HEADER_SIZE` bytes of
/// the page buffer.
fn sync_header(&mut self) {
    let mut encoded = [0u8; PAGE_HEADER_SIZE];
    self.header.serialize(&mut encoded);
    let (header_region, _) = self.data.split_at_mut(PAGE_HEADER_SIZE);
    header_region.copy_from_slice(&encoded);
}
/// Re-serializes the header into the buffer, then recomputes the CRC over
/// everything before `CRC_OFFSET` and stores it little-endian in the
/// trailing bytes.
pub fn update_crc(&mut self) {
    self.sync_header();
    let (payload, trailer) = self.data.split_at_mut(CRC_OFFSET);
    let checksum = kimberlite_crypto::crc32(&*payload);
    trailer.copy_from_slice(&checksum.to_le_bytes());
}
/// Returns the page image.
///
/// For pages not populated through `set_raw_data`, the header and CRC in
/// the buffer are refreshed first; raw pages are returned untouched.
pub fn as_bytes(&mut self) -> &[u8; PAGE_SIZE] {
    let managed = !self.is_raw;
    if managed {
        self.update_crc();
    }
    &self.data
}
/// Returns the page buffer as currently held, without refreshing the header or CRC.
#[allow(dead_code)]
pub fn data(&self) -> &[u8; PAGE_SIZE] {
&self.data
}
/// Replaces the whole page buffer with `data` and marks the page dirty
/// and raw.
///
/// The in-memory header is not re-derived from the new bytes.
pub fn set_raw_data(&mut self, data: &[u8; PAGE_SIZE]) {
    self.data.copy_from_slice(data);
    self.is_raw = true;
    self.dirty = true;
}
/// Repacks all items contiguously at the end of the data area.
///
/// Items are copied out, the region between the slot directory and
/// `CRC_OFFSET` is zeroed, and the items are written back from
/// `CRC_OFFSET` downwards in slot order with every slot repointed.
/// Does nothing (and does not mark the page dirty) when the page is empty.
#[allow(dead_code)]
pub fn compact(&mut self) {
    let live = self.header.item_count as usize;
    if live == 0 {
        return;
    }

    // Snapshot the payloads first: repacking overwrites the bytes they
    // currently occupy.
    let mut payloads: Vec<Vec<u8>> = Vec::with_capacity(live);
    for slot_index in 0..live {
        payloads.push(self.get_item(slot_index).to_vec());
    }

    let directory_end = PAGE_HEADER_SIZE + (live * SLOT_SIZE);
    self.data[directory_end..CRC_OFFSET].fill(0);

    let mut cursor = CRC_OFFSET;
    for (slot_index, payload) in payloads.iter().enumerate() {
        cursor -= payload.len();
        let payload_end = cursor + payload.len();
        self.data[cursor..payload_end].copy_from_slice(payload);

        let entry = Slot::new(cursor as u16, payload.len() as u16).serialize();
        let entry_start = PAGE_HEADER_SIZE + (slot_index * SLOT_SIZE);
        self.data[entry_start..entry_start + SLOT_SIZE].copy_from_slice(&entry);
    }

    self.dirty = true;
}
}
/// Compact debug view: prints a summary of the page instead of dumping the
/// whole byte buffer.
impl std::fmt::Debug for Page {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let free_space = self.free_space();
        let mut summary = f.debug_struct("Page");
        summary.field("id", &self.id);
        summary.field("type", &self.header.page_type);
        summary.field("items", &self.header.item_count);
        summary.field("free_space", &free_space);
        summary.field("dirty", &self.dirty);
        // Remaining fields are deliberately elided.
        summary.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod page_tests {
    use super::*;

    /// Fresh, empty leaf page with id 1, shared by most tests.
    fn empty_leaf() -> Page {
        Page::new(PageId::new(1), PageType::Leaf)
    }

    /// A new page is typed, empty, dirty, and has room for items.
    #[test]
    fn test_new_page() {
        let fresh = empty_leaf();
        assert_eq!(fresh.page_type(), PageType::Leaf);
        assert_eq!(fresh.item_count(), 0);
        assert!(fresh.is_dirty());
        assert!(fresh.free_space() > 0);
    }

    /// Appending items keeps earlier items readable at their indexes.
    #[test]
    fn test_insert_and_get() {
        let mut leaf = empty_leaf();

        leaf.insert_item(0, b"hello").unwrap();
        assert_eq!(leaf.item_count(), 1);
        assert_eq!(leaf.get_item(0), b"hello");

        leaf.insert_item(1, b"world").unwrap();
        assert_eq!(leaf.item_count(), 2);
        assert_eq!(leaf.get_item(0), b"hello");
        assert_eq!(leaf.get_item(1), b"world");
    }

    /// Inserting at index 0 shifts the existing slot up by one.
    #[test]
    fn test_insert_at_beginning() {
        let mut leaf = empty_leaf();
        leaf.insert_item(0, b"second").unwrap();
        leaf.insert_item(0, b"first").unwrap();

        assert_eq!(leaf.item_count(), 2);
        assert_eq!(leaf.get_item(0), b"first");
        assert_eq!(leaf.get_item(1), b"second");
    }

    /// Removing a middle slot closes the gap in the directory.
    #[test]
    fn test_remove_item() {
        let mut leaf = empty_leaf();
        leaf.insert_item(0, b"a").unwrap();
        leaf.insert_item(1, b"b").unwrap();
        leaf.insert_item(2, b"c").unwrap();

        leaf.remove_item(1);

        assert_eq!(leaf.item_count(), 2);
        assert_eq!(leaf.get_item(0), b"a");
        assert_eq!(leaf.get_item(1), b"c");
    }

    /// A page written with `as_bytes` loads back with the same contents.
    #[test]
    fn test_serialization_roundtrip() {
        let id = PageId::new(42);
        let mut original = Page::new(id, PageType::Internal);
        original.insert_item(0, b"test data").unwrap();

        let image = *original.as_bytes();
        let reloaded = Page::from_bytes(PageId::new(42), &image).unwrap();

        assert_eq!(reloaded.page_type(), PageType::Internal);
        assert_eq!(reloaded.item_count(), 1);
        assert_eq!(reloaded.get_item(0), b"test data");
    }

    /// Flipping bits in the page body makes the CRC check fail on load.
    #[test]
    fn test_crc_corruption_detection() {
        let mut leaf = empty_leaf();
        leaf.insert_item(0, b"data").unwrap();

        let mut image = *leaf.as_bytes();
        image[100] ^= 0xFF;

        let outcome = Page::from_bytes(PageId::new(1), &image);
        assert!(matches!(outcome, Err(StoreError::PageCorrupted { .. })));
    }
}