use crate::error::{GrumpyError, Result};
use crate::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PageHeader, PageType, SLOT_SIZE};
pub struct SlottedPage {
pub data: [u8; PAGE_SIZE],
}
impl SlottedPage {
pub fn new(page_id: u32) -> Self {
let mut data = [0u8; PAGE_SIZE];
let header = PageHeader::new(page_id, PageType::Data);
header.write_to(&mut data);
Self { data }
}
pub fn from_bytes(data: [u8; PAGE_SIZE]) -> Self {
Self { data }
}
pub fn header(&self) -> PageHeader {
PageHeader::read_from(&self.data)
}
pub fn num_slots(&self) -> u16 {
u16::from_le_bytes(self.data[6..8].try_into().unwrap())
}
fn free_space_start(&self) -> u16 {
u16::from_le_bytes(self.data[8..10].try_into().unwrap())
}
fn free_space_end(&self) -> u16 {
u16::from_le_bytes(self.data[10..12].try_into().unwrap())
}
fn set_num_slots(&mut self, n: u16) {
self.data[6..8].copy_from_slice(&n.to_le_bytes());
}
fn set_free_space_start(&mut self, offset: u16) {
self.data[8..10].copy_from_slice(&offset.to_le_bytes());
}
fn set_free_space_end(&mut self, offset: u16) {
self.data[10..12].copy_from_slice(&offset.to_le_bytes());
}
pub fn free_space(&self) -> usize {
let start = self.free_space_start() as usize;
let end = self.free_space_end() as usize;
end.saturating_sub(start)
}
fn slot_offset(slot_index: u16) -> usize {
PAGE_HEADER_SIZE + (slot_index as usize) * SLOT_SIZE
}
fn read_slot(&self, slot_index: u16) -> (u16, u16) {
let base = Self::slot_offset(slot_index);
let offset = u16::from_le_bytes(self.data[base..base + 2].try_into().unwrap());
let length = u16::from_le_bytes(self.data[base + 2..base + 4].try_into().unwrap());
(offset, length)
}
fn write_slot(&mut self, slot_index: u16, offset: u16, length: u16) {
let base = Self::slot_offset(slot_index);
self.data[base..base + 2].copy_from_slice(&offset.to_le_bytes());
self.data[base + 2..base + 4].copy_from_slice(&length.to_le_bytes());
}
pub fn insert(&mut self, tuple_data: &[u8]) -> Result<u16> {
let data_len = tuple_data.len();
let needed = data_len + SLOT_SIZE;
if self.free_space() < needed {
let header = self.header();
return Err(GrumpyError::PageFull(header.page_id));
}
let slot_index = self.find_tombstone_slot().unwrap_or_else(|| {
let idx = self.num_slots();
self.set_num_slots(idx + 1);
self.set_free_space_start(Self::slot_offset(idx + 1) as u16);
idx
});
let new_end = self.free_space_end() - data_len as u16;
self.set_free_space_end(new_end);
let offset = new_end as usize;
self.data[offset..offset + data_len].copy_from_slice(tuple_data);
self.write_slot(slot_index, new_end, data_len as u16);
Ok(slot_index)
}
fn find_tombstone_slot(&self) -> Option<u16> {
let num = self.num_slots();
for i in 0..num {
let (offset, _) = self.read_slot(i);
if offset == 0 {
return Some(i);
}
}
None
}
pub fn get(&self, slot_index: u16) -> Result<&[u8]> {
if slot_index >= self.num_slots() {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
let (offset, length) = self.read_slot(slot_index);
if offset == 0 {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
let start = offset as usize;
let end = start + length as usize;
Ok(&self.data[start..end])
}
pub fn delete(&mut self, slot_index: u16) -> Result<()> {
if slot_index >= self.num_slots() {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
let (offset, _) = self.read_slot(slot_index);
if offset == 0 {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
self.write_slot(slot_index, 0, 0);
Ok(())
}
pub fn update(&mut self, slot_index: u16, new_data: &[u8]) -> Result<u16> {
if slot_index >= self.num_slots() {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
let (offset, length) = self.read_slot(slot_index);
if offset == 0 {
let header = self.header();
return Err(GrumpyError::PageNotFound(header.page_id));
}
if new_data.len() <= length as usize {
let start = offset as usize;
self.data[start..start + new_data.len()].copy_from_slice(new_data);
self.write_slot(slot_index, offset, new_data.len() as u16);
Ok(slot_index)
} else {
self.delete(slot_index)?;
self.insert(new_data)
}
}
pub fn compact(&mut self) {
let num_slots = self.num_slots();
let mut live_tuples: Vec<(u16, Vec<u8>)> = Vec::new();
for i in 0..num_slots {
let (offset, length) = self.read_slot(i);
if offset != 0 {
let start = offset as usize;
let end = start + length as usize;
live_tuples.push((i, self.data[start..end].to_vec()));
}
}
let header_and_slots_end = Self::slot_offset(num_slots);
self.data[header_and_slots_end..PAGE_SIZE].fill(0);
let mut write_end = PAGE_SIZE as u16;
for (slot_index, tuple_data) in &live_tuples {
write_end -= tuple_data.len() as u16;
let start = write_end as usize;
self.data[start..start + tuple_data.len()].copy_from_slice(tuple_data);
self.write_slot(*slot_index, write_end, tuple_data.len() as u16);
}
self.set_free_space_end(write_end);
}
pub fn live_tuple_count(&self) -> usize {
let num = self.num_slots();
let mut count = 0;
for i in 0..num {
let (offset, _) = self.read_slot(i);
if offset != 0 {
count += 1;
}
}
count
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new page reports its id and type, has no slots, and everything after
    /// the header is free.
    #[test]
    fn test_slotted_page_new() {
        let fresh = SlottedPage::new(1);
        let header = fresh.header();
        assert_eq!(header.page_id, 1);
        assert_eq!(header.page_type, PageType::Data);
        assert_eq!(fresh.num_slots(), 0);
        assert_eq!(fresh.free_space(), PAGE_SIZE - PAGE_HEADER_SIZE);
    }

    /// The first inserted tuple lands in slot 0 and reads back unchanged.
    #[test]
    fn test_slotted_page_insert_and_get() {
        let payload = b"hello, grumpydb!";
        let mut page = SlottedPage::new(1);
        assert_eq!(page.insert(payload).unwrap(), 0);
        assert_eq!(page.get(0).unwrap(), payload);
    }

    /// Consecutive inserts receive consecutive slot indices.
    #[test]
    fn test_slotted_page_insert_multiple() {
        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let mut page = SlottedPage::new(1);
        for (expected_slot, payload) in payloads.iter().enumerate() {
            assert_eq!(page.insert(payload).unwrap(), expected_slot as u16);
        }
        assert_eq!(page.num_slots(), 3);
        for (slot, payload) in payloads.iter().enumerate() {
            assert_eq!(page.get(slot as u16).unwrap(), *payload);
        }
    }

    /// Inserting large tuples eventually fails with `PageFull`.
    #[test]
    fn test_slotted_page_full() {
        let payload = vec![0xAB; 1000];
        let mut page = SlottedPage::new(1);
        let mut inserted = 0;
        loop {
            match page.insert(&payload) {
                Err(GrumpyError::PageFull(_)) => break,
                Err(e) => panic!("unexpected error: {e}"),
                Ok(_) => inserted += 1,
            }
        }
        assert!(inserted > 0);
        assert!(inserted < 10);
    }

    /// Deleting one tuple leaves its neighbours readable.
    #[test]
    fn test_slotted_page_delete() {
        let mut page = SlottedPage::new(1);
        for payload in [&b"keep"[..], &b"delete_me"[..], &b"also_keep"[..]] {
            page.insert(payload).unwrap();
        }
        page.delete(1).unwrap();
        assert_eq!(page.get(0).unwrap(), b"keep");
        assert!(page.get(1).is_err());
        assert_eq!(page.get(2).unwrap(), b"also_keep");
        assert_eq!(page.live_tuple_count(), 2);
    }

    /// Deleting from an empty page is an error.
    #[test]
    fn test_slotted_page_delete_nonexistent() {
        let mut empty = SlottedPage::new(1);
        assert!(empty.delete(0).is_err());
    }

    /// A slot can be deleted only once.
    #[test]
    fn test_slotted_page_double_delete() {
        let mut page = SlottedPage::new(1);
        page.insert(b"data").unwrap();
        page.delete(0).unwrap();
        assert!(page.delete(0).is_err());
    }

    /// Compaction frees the space of deleted tuples without moving slot ids.
    #[test]
    fn test_slotted_page_compact() {
        let mut page = SlottedPage::new(1);
        page.insert(b"aaa").unwrap();
        page.insert(b"bbb").unwrap();
        page.insert(b"ccc").unwrap();
        let free_before = page.free_space();

        page.delete(1).unwrap();
        page.compact();

        assert!(page.free_space() > free_before);
        assert_eq!(page.get(0).unwrap(), b"aaa");
        assert!(page.get(1).is_err());
        assert_eq!(page.get(2).unwrap(), b"ccc");
        assert_eq!(page.live_tuple_count(), 2);
    }

    /// A shorter replacement is written in place and keeps its slot.
    #[test]
    fn test_slotted_page_update_in_place() {
        let mut page = SlottedPage::new(1);
        page.insert(b"hello world!!").unwrap();
        let slot_after = page.update(0, b"hi").unwrap();
        assert_eq!(slot_after, 0);
        assert_eq!(page.get(0).unwrap(), b"hi");
    }

    /// A longer replacement is readable through the returned slot.
    #[test]
    fn test_slotted_page_update_larger() {
        let mut page = SlottedPage::new(1);
        page.insert(b"hi").unwrap();
        let slot_after = page
            .update(0, b"hello world, this is much longer")
            .unwrap();
        assert_eq!(
            page.get(slot_after).unwrap(),
            b"hello world, this is much longer"
        );
    }

    /// A tombstoned slot is reused by the next insert.
    #[test]
    fn test_slotted_page_tombstone_reuse() {
        let mut page = SlottedPage::new(1);
        page.insert(b"first").unwrap();
        page.insert(b"second").unwrap();
        page.delete(0).unwrap();

        let reused = page.insert(b"third").unwrap();
        assert_eq!(reused, 0);
        assert_eq!(page.get(0).unwrap(), b"third");
        assert_eq!(page.num_slots(), 2);
    }

    /// Reading any slot of an empty page is an error.
    #[test]
    fn test_slotted_page_get_out_of_range() {
        let empty = SlottedPage::new(1);
        for slot in [0u16, 100] {
            assert!(empty.get(slot).is_err());
        }
    }

    /// A page rebuilt from its raw bytes exposes the same tuple and header.
    #[test]
    fn test_slotted_page_from_bytes_round_trip() {
        let mut original = SlottedPage::new(5);
        original.insert(b"test data").unwrap();

        let image = original.data;
        let restored = SlottedPage::from_bytes(image);

        assert_eq!(restored.get(0).unwrap(), b"test data");
        assert_eq!(restored.header().page_id, 5);
    }

    /// An insert into a fresh slot costs the payload plus one slot entry.
    #[test]
    fn test_slotted_page_free_space_decreases() {
        let mut page = SlottedPage::new(1);
        let initial = page.free_space();
        page.insert(b"some data").unwrap();
        let after = page.free_space();
        assert_eq!(initial - after, 9 + SLOT_SIZE);
    }

    /// One-byte tuples fill the page and all of them stay readable.
    #[test]
    fn test_slotted_page_many_small_tuples() {
        let mut page = SlottedPage::new(1);
        let mut inserted: u16 = 0;
        while page.insert(b"x").is_ok() {
            inserted += 1;
        }
        assert!(inserted > 1000);
        assert!(inserted <= 1632);
        for slot in 0..inserted {
            assert_eq!(page.get(slot).unwrap(), b"x");
        }
    }
}