use crate::error::{Result, SQLRiteError};
use crate::sql::pager::cell::Cell;
use crate::sql::pager::overflow::PagedEntry;
use crate::sql::pager::page::PAYLOAD_PER_PAGE;
/// Byte offset, within the page buffer, of the little-endian `u16` slot count.
const OFFSET_SLOT_COUNT: usize = 0;
/// Byte offset, within the page buffer, of the little-endian `u16` "cells top"
/// field (the offset of the lowest byte currently occupied by cell data).
const OFFSET_CELLS_TOP: usize = 2;
/// Size in bytes of the page header; the slot directory starts at this offset.
const PAGE_PAYLOAD_HEADER: usize = 4;
/// Size in bytes of one slot directory entry (a `u16` cell offset).
const SLOT_SIZE: usize = 2;
/// Outcome of searching a page's slot directory for a rowid.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Find {
/// The rowid is present; the payload is the index of its slot.
Found(usize),
/// The rowid is absent; the payload is the slot index at which it would
/// have to be inserted to keep the directory ordered by rowid.
NotFound(usize),
}
/// A table page payload held in an owned, heap-allocated buffer of
/// `PAYLOAD_PER_PAGE` bytes.
pub struct TablePage {
// Raw page bytes: header, slot directory and cell data are all read and
// written in place inside this buffer.
buf: Box<[u8; PAYLOAD_PER_PAGE]>,
}
/// Slotted-page operations over the fixed-size payload buffer.
///
/// Layout, as used by the methods below:
/// * bytes `0..2` — slot count (little-endian `u16`);
/// * bytes `2..4` — `cells_top`, the offset of the lowest byte occupied by cell data;
/// * bytes `4..`  — slot directory: one `u16` cell offset per slot, kept ordered by rowid;
/// * cell bodies are written downward, starting from the end of the buffer.
///
/// Pages built with [`TablePage::from_bytes`] are not validated up front, so
/// the methods that follow header-derived offsets check them and return an
/// error rather than indexing outside the buffer.
impl TablePage {
    /// Creates a page with zero slots and `cells_top` at the end of the buffer.
    pub fn empty() -> Self {
        let mut buf = Box::new([0u8; PAYLOAD_PER_PAGE]);
        write_u16(&mut buf[..], OFFSET_SLOT_COUNT, 0);
        // The on-page field is a u16, so the payload size has to fit in one.
        write_u16(&mut buf[..], OFFSET_CELLS_TOP, PAYLOAD_PER_PAGE as u16);
        Self { buf }
    }

    /// Wraps a copy of `bytes` as a page. The header is taken as-is; no
    /// validation happens here.
    pub fn from_bytes(bytes: &[u8; PAYLOAD_PER_PAGE]) -> Self {
        Self {
            buf: Box::new(*bytes),
        }
    }

    /// Borrows the raw page bytes.
    pub fn as_bytes(&self) -> &[u8; PAYLOAD_PER_PAGE] {
        &self.buf
    }

    /// Number of slots recorded in the page header.
    pub fn slot_count(&self) -> usize {
        read_u16(&self.buf[..], OFFSET_SLOT_COUNT) as usize
    }

    /// Stores the slot count in the header. Callers pass values derived from
    /// the current (u16) count, adjusted by one.
    fn set_slot_count(&mut self, n: usize) {
        write_u16(&mut self.buf[..], OFFSET_SLOT_COUNT, n as u16);
    }

    /// Offset of the lowest byte occupied by cell data, as recorded in the header.
    pub fn cells_top(&self) -> usize {
        read_u16(&self.buf[..], OFFSET_CELLS_TOP) as usize
    }

    /// Stores `cells_top` in the header. Callers pass a value no larger than
    /// the current (u16) `cells_top`.
    fn set_cells_top(&mut self, v: usize) {
        write_u16(&mut self.buf[..], OFFSET_CELLS_TOP, v as u16);
    }

    /// Byte offset at which the slot directory begins.
    const fn slots_start() -> usize {
        PAGE_PAYLOAD_HEADER
    }

    /// Byte offset one past the last slot directory entry.
    fn slots_end(&self) -> usize {
        Self::slots_start() + self.slot_count() * SLOT_SIZE
    }

    /// Bytes available between the end of the slot directory and `cells_top`
    /// (zero if the header claims they overlap).
    pub fn free_space(&self) -> usize {
        self.cells_top().saturating_sub(self.slots_end())
    }

    /// Returns `true` if a cell of `cell_encoded_size` bytes plus its slot
    /// entry fits in the current free space.
    pub fn would_fit(&self, cell_encoded_size: usize) -> bool {
        cell_encoded_size.saturating_add(SLOT_SIZE) <= self.free_space()
    }

    /// Public access to the cell offset stored in directory entry `slot`.
    ///
    /// # Errors
    /// Same as the private `slot_offset`: the slot index is out of range, or
    /// the directory entry lies outside the page buffer.
    pub fn slot_offset_raw(&self, slot: usize) -> Result<usize> {
        self.slot_offset(slot)
    }

    /// Reads the cell offset stored in directory entry `slot`.
    ///
    /// # Errors
    /// Returns `SQLRiteError::Internal` if `slot` is not below the slot count,
    /// or if the header's slot count is so large that the entry would lie
    /// outside the page buffer (a corrupt page).
    fn slot_offset(&self, slot: usize) -> Result<usize> {
        let count = self.slot_count();
        if slot >= count {
            return Err(SQLRiteError::Internal(format!(
                "slot {slot} out of bounds (count = {})",
                count
            )));
        }
        let at = Self::slots_start() + slot * SLOT_SIZE;
        // `count` comes straight from the header; on a corrupt page it can
        // describe a directory longer than the buffer. Fail instead of
        // panicking inside `read_u16`.
        if at + SLOT_SIZE > self.buf.len() {
            return Err(SQLRiteError::Internal(format!(
                "corrupt page: directory entry for slot {slot} at byte {at} lies outside the page"
            )));
        }
        Ok(read_u16(&self.buf[..], at) as usize)
    }

    /// Writes `offset` into directory entry `slot`. The caller guarantees the
    /// entry lies inside the buffer and that `offset` fits in a `u16`.
    fn set_slot_offset(&mut self, slot: usize, offset: usize) {
        let at = Self::slots_start() + slot * SLOT_SIZE;
        write_u16(&mut self.buf[..], at, offset as u16);
    }

    /// Returns the rowid of the cell referenced by `slot`, obtained through
    /// `Cell::peek_rowid`.
    ///
    /// # Errors
    /// Propagates slot lookup and `Cell::peek_rowid` failures.
    pub fn rowid_at(&self, slot: usize) -> Result<i64> {
        let offset = self.slot_offset(slot)?;
        Cell::peek_rowid(&self.buf[..], offset)
    }

    /// Decodes the cell referenced by `slot`.
    ///
    /// # Errors
    /// Propagates slot lookup and `Cell::decode` failures.
    pub fn cell_at(&self, slot: usize) -> Result<Cell> {
        let offset = self.slot_offset(slot)?;
        let (cell, _) = Cell::decode(&self.buf[..], offset)?;
        Ok(cell)
    }

    /// Binary-searches the slot directory for `rowid`.
    ///
    /// Returns [`Find::Found`] with the slot index when present, otherwise
    /// [`Find::NotFound`] with the index where the rowid would be inserted.
    ///
    /// # Errors
    /// Propagates any failure to read a probed slot's rowid.
    pub fn find(&self, rowid: i64) -> Result<Find> {
        use std::cmp::Ordering;

        let mut lo = 0usize;
        let mut hi = self.slot_count();
        while lo < hi {
            // Midpoint written this way so it cannot overflow.
            let mid = lo + (hi - lo) / 2;
            match self.rowid_at(mid)?.cmp(&rowid) {
                Ordering::Equal => return Ok(Find::Found(mid)),
                Ordering::Less => lo = mid + 1,
                Ordering::Greater => hi = mid,
            }
        }
        Ok(Find::NotFound(lo))
    }

    /// Returns the decoded cell for `rowid`, or `None` if it is not on this page.
    ///
    /// # Errors
    /// Propagates search and decode failures.
    pub fn lookup(&self, rowid: i64) -> Result<Option<Cell>> {
        match self.find(rowid)? {
            Find::Found(slot) => Ok(Some(self.cell_at(slot)?)),
            Find::NotFound(_) => Ok(None),
        }
    }

    /// Iterates over the page's cells in slot order, decoding each on demand.
    pub fn iter(&self) -> TablePageIter<'_> {
        TablePageIter { page: self, pos: 0 }
    }

    /// Encodes `cell` and inserts it under its own rowid.
    ///
    /// # Errors
    /// Propagates encoding failures and everything `insert_entry` can return.
    pub fn insert(&mut self, cell: &Cell) -> Result<()> {
        let encoded = cell.encode()?;
        self.insert_entry(cell.rowid, &encoded)
    }

    /// Encodes `entry` and inserts it under the rowid it reports.
    ///
    /// # Errors
    /// Propagates encoding failures and everything `insert_entry` can return.
    pub fn insert_paged_entry(&mut self, entry: &PagedEntry) -> Result<()> {
        let encoded = entry.encode()?;
        self.insert_entry(entry.rowid(), &encoded)
    }

    /// Inserts already-encoded bytes as the entry for `rowid`.
    ///
    /// The bytes are copied immediately below the current `cells_top`, and a
    /// directory entry pointing at them is spliced in at the position that
    /// keeps the directory ordered by rowid.
    ///
    /// # Errors
    /// Returns `SQLRiteError::Internal` when `rowid` is already present, when
    /// the entry plus its slot does not fit, or when the header's `cells_top`
    /// points past the end of the page buffer (a corrupt page).
    pub fn insert_entry(&mut self, rowid: i64, encoded: &[u8]) -> Result<()> {
        let insert_at = match self.find(rowid)? {
            Find::Found(_) => {
                return Err(SQLRiteError::Internal(format!(
                    "duplicate rowid {rowid} — caller must delete before re-inserting"
                )));
            }
            Find::NotFound(slot) => slot,
        };
        if !self.would_fit(encoded.len()) {
            return Err(SQLRiteError::Internal(format!(
                "page full: entry of {} bytes + slot wouldn't fit in {} bytes of free space",
                encoded.len(),
                self.free_space()
            )));
        }

        let cells_top = self.cells_top();
        // `would_fit` only compares header fields with each other. If the
        // header itself is corrupt, `cells_top` may point past the buffer and
        // the copy below would panic; report it instead.
        if cells_top > self.buf.len() {
            return Err(SQLRiteError::Internal(format!(
                "corrupt page: cells_top {cells_top} exceeds page payload size {}",
                self.buf.len()
            )));
        }

        // `would_fit` guarantees encoded.len() + SLOT_SIZE <= cells_top - slots_end,
        // so the subtraction cannot underflow and the grown directory stays
        // below the new cell.
        let new_cells_top = cells_top - encoded.len();
        self.buf[new_cells_top..cells_top].copy_from_slice(encoded);
        self.set_cells_top(new_cells_top);

        // Open a gap in the directory by moving entries [insert_at, count)
        // one slot towards the end.
        let old_count = self.slot_count();
        let shift_start = Self::slots_start() + insert_at * SLOT_SIZE;
        let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
        self.buf
            .copy_within(shift_start..shift_end, shift_start + SLOT_SIZE);
        self.set_slot_count(old_count + 1);
        self.set_slot_offset(insert_at, new_cells_top);
        Ok(())
    }

    /// Decodes the entry referenced by `slot` as a `PagedEntry`.
    ///
    /// # Errors
    /// Propagates slot lookup and `PagedEntry::decode` failures.
    pub fn entry_at(&self, slot: usize) -> Result<PagedEntry> {
        let offset = self.slot_offset(slot)?;
        let (entry, _) = PagedEntry::decode(&self.buf[..], offset)?;
        Ok(entry)
    }

    /// Removes the directory entry for `rowid`.
    ///
    /// Returns `Ok(true)` if an entry was removed and `Ok(false)` if the rowid
    /// was not present. Only the slot is removed: `cells_top` is left
    /// untouched, so the bytes of the deleted cell are not reclaimed.
    ///
    /// # Errors
    /// Propagates search failures, and returns `SQLRiteError::Internal` when
    /// the header describes a slot directory that extends past the page
    /// buffer (a corrupt page).
    pub fn delete(&mut self, rowid: i64) -> Result<bool> {
        let slot = match self.find(rowid)? {
            Find::Found(s) => s,
            Find::NotFound(_) => return Ok(false),
        };
        let old_count = self.slot_count();
        let shift_start = Self::slots_start() + (slot + 1) * SLOT_SIZE;
        let shift_end = Self::slots_start() + old_count * SLOT_SIZE;
        // The binary search may succeed without ever probing the tail of an
        // over-long directory, so check the range before moving bytes.
        if shift_end > self.buf.len() {
            return Err(SQLRiteError::Internal(format!(
                "corrupt page: slot directory of {old_count} slots extends past the page"
            )));
        }
        // Close the gap by moving entries (slot, count) one slot towards the front.
        self.buf
            .copy_within(shift_start..shift_end, shift_start - SLOT_SIZE);
        self.set_slot_count(old_count - 1);
        Ok(true)
    }
}
/// Iterator over the cells of a borrowed [`TablePage`], in slot order.
pub struct TablePageIter<'a> {
// Page being iterated.
page: &'a TablePage,
// Index of the next slot to decode.
pos: usize,
}
/// Yields one decoded cell per slot, from slot 0 up to the page's slot count.
/// A cell that fails to decode is yielded as `Err` and iteration moves on to
/// the next slot.
impl<'a> Iterator for TablePageIter<'a> {
    type Item = Result<Cell>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.page.slot_count() {
            return None;
        }
        let cell = self.page.cell_at(self.pos);
        self.pos += 1;
        Some(cell)
    }

    /// Exact number of items left: the page is borrowed immutably, so its slot
    /// count cannot change, and every remaining slot yields exactly one item
    /// (`Ok` or `Err`). This lets `collect`/`extend` allocate once.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.page.slot_count().saturating_sub(self.pos);
        (remaining, Some(remaining))
    }
}
/// Reads the little-endian `u16` stored at `buf[offset]` and `buf[offset + 1]`.
///
/// # Panics
/// Panics if either byte lies outside `buf`.
fn read_u16(buf: &[u8], offset: usize) -> u16 {
    let low = u16::from(buf[offset]);
    let high = u16::from(buf[offset + 1]);
    (high << 8) | low
}
/// Stores `value` as a little-endian `u16` at `buf[offset]` and `buf[offset + 1]`.
///
/// # Panics
/// Panics if either byte lies outside `buf`.
fn write_u16(buf: &mut [u8], offset: usize, value: u16) {
    let [low, high] = value.to_le_bytes();
    // Two separate stores, low byte first.
    buf[offset] = low;
    buf[offset + 1] = high;
}
// Unit tests for the slotted `TablePage`: header bookkeeping, ordered
// insertion, lookup, deletion and byte-level round-tripping.
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::db::table::Value;
// Builds a single-column cell holding an integer value.
fn int_cell(rowid: i64, v: i64) -> Cell {
Cell::new(rowid, vec![Some(Value::Integer(v))])
}
// Builds a single-column cell holding a text value.
fn text_cell(rowid: i64, s: &str) -> Cell {
Cell::new(rowid, vec![Some(Value::Text(s.to_string()))])
}
// A fresh page has no slots, `cells_top` at the end of the buffer, and all
// bytes after the header free.
#[test]
fn empty_page_metadata_matches_spec() {
let p = TablePage::empty();
assert_eq!(p.slot_count(), 0);
assert_eq!(p.cells_top(), PAYLOAD_PER_PAGE);
assert_eq!(p.free_space(), PAYLOAD_PER_PAGE - PAGE_PAYLOAD_HEADER);
}
// Cells inserted out of rowid order can be looked up individually and are
// iterated in ascending rowid order.
#[test]
fn insert_lookup_iterate() {
let mut p = TablePage::empty();
p.insert(&int_cell(2, 20)).unwrap();
p.insert(&int_cell(1, 10)).unwrap();
p.insert(&int_cell(3, 30)).unwrap();
assert_eq!(p.slot_count(), 3);
assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 20)));
assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));
assert_eq!(p.lookup(4).unwrap(), None);
let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
assert_eq!(got, vec![int_cell(1, 10), int_cell(2, 20), int_cell(3, 30)]);
}
// Inserting a rowid that is already present fails with a "duplicate rowid" error.
#[test]
fn duplicate_rowid_is_rejected() {
let mut p = TablePage::empty();
p.insert(&int_cell(1, 100)).unwrap();
let err = p.insert(&int_cell(1, 200)).unwrap_err();
assert!(format!("{err}").contains("duplicate rowid"));
}
// After deleting a rowid its neighbours stay reachable, and the same rowid
// can be inserted again with a different value.
#[test]
fn delete_then_reinsert() {
let mut p = TablePage::empty();
p.insert(&int_cell(1, 10)).unwrap();
p.insert(&int_cell(2, 20)).unwrap();
p.insert(&int_cell(3, 30)).unwrap();
assert!(p.delete(2).unwrap());
assert_eq!(p.slot_count(), 2);
assert_eq!(p.lookup(2).unwrap(), None);
assert_eq!(p.lookup(1).unwrap(), Some(int_cell(1, 10)));
assert_eq!(p.lookup(3).unwrap(), Some(int_cell(3, 30)));
p.insert(&int_cell(2, 999)).unwrap();
assert_eq!(p.lookup(2).unwrap(), Some(int_cell(2, 999)));
}
// Deleting an absent rowid returns `false` and leaves the slot count unchanged.
#[test]
fn delete_missing_rowid_reports_false() {
let mut p = TablePage::empty();
p.insert(&int_cell(1, 10)).unwrap();
assert!(!p.delete(999).unwrap());
assert_eq!(p.slot_count(), 1);
}
// A page rebuilt from the raw bytes of another page exposes the same cells.
#[test]
fn bytes_round_trip_through_from_bytes() {
let mut p = TablePage::empty();
p.insert(&text_cell(1, "alpha")).unwrap();
p.insert(&text_cell(2, "beta")).unwrap();
p.insert(&text_cell(3, "gamma")).unwrap();
let bytes = *p.as_bytes();
let p2 = TablePage::from_bytes(&bytes);
assert_eq!(p2.slot_count(), 3);
assert_eq!(p2.lookup(1).unwrap(), Some(text_cell(1, "alpha")));
assert_eq!(p2.lookup(2).unwrap(), Some(text_cell(2, "beta")));
assert_eq!(p2.lookup(3).unwrap(), Some(text_cell(3, "gamma")));
}
// `find` reports the insertion index for absent rowids (before, between and
// after the existing ones) and the slot index for a present rowid.
#[test]
fn find_returns_insertion_slot() {
let mut p = TablePage::empty();
p.insert(&int_cell(10, 0)).unwrap();
p.insert(&int_cell(20, 0)).unwrap();
p.insert(&int_cell(30, 0)).unwrap();
assert_eq!(p.find(5).unwrap(), Find::NotFound(0));
assert_eq!(p.find(15).unwrap(), Find::NotFound(1));
assert_eq!(p.find(999).unwrap(), Find::NotFound(3));
assert_eq!(p.find(20).unwrap(), Find::Found(1));
}
// Fills the page with 200-character text cells until `would_fit` says no,
// then checks that inserting anyway is rejected with a "page full" error.
#[test]
fn would_fit_gates_insert() {
let mut p = TablePage::empty();
let body = "x".repeat(200);
let mut rid = 0i64;
let mut inserted = 0usize;
loop {
rid += 1;
let c = text_cell(rid, &body);
let size = c.encoded_len().unwrap();
if !p.would_fit(size) {
break;
}
p.insert(&c).unwrap();
inserted += 1;
}
// Sanity bounds on how many such cells one page holds.
assert!(inserted > 10 && inserted < 50);
// `rid` is the rowid that did not fit, so it is not on the page and the
// failure below comes from the space check, not the duplicate check.
let overflow = text_cell(rid, &body);
let err = p.insert(&overflow).unwrap_err();
assert!(format!("{err}").contains("page full"));
}
// Insert consumes the cell bytes plus one slot; delete gives back only the
// slot, because the deleted cell's bytes are not reclaimed.
#[test]
fn free_space_tracks_inserts_and_deletes_by_slot_only() {
let mut p = TablePage::empty();
let initial = p.free_space();
let c = int_cell(1, 42);
let cell_size = c.encoded_len().unwrap();
p.insert(&c).unwrap();
let after_insert = p.free_space();
assert_eq!(after_insert, initial - cell_size - SLOT_SIZE);
p.delete(1).unwrap();
let after_delete = p.free_space();
assert_eq!(after_delete, initial - cell_size);
}
// Cells mixing integer, text, real, bool and NULL columns survive a trip
// through the page with their rowids and NULL positions intact.
#[test]
fn mixed_types_and_nulls_round_trip_on_a_page() {
let mut p = TablePage::empty();
p.insert(&Cell::new(
1,
vec![
Some(Value::Integer(10)),
Some(Value::Text("hi".to_string())),
None,
],
))
.unwrap();
p.insert(&Cell::new(
2,
vec![None, Some(Value::Real(2.5)), Some(Value::Bool(true))],
))
.unwrap();
let got: Vec<Cell> = p.iter().map(|r| r.unwrap()).collect();
assert_eq!(got.len(), 2);
assert_eq!(got[0].rowid, 1);
assert_eq!(got[0].values[2], None);
assert_eq!(got[1].rowid, 2);
assert_eq!(got[1].values[0], None);
}
// `rowid_at` returns rowids in ascending slot order regardless of the order
// in which the cells were inserted.
#[test]
fn peek_rowid_matches_cell_at() {
let mut p = TablePage::empty();
p.insert(&int_cell(42, 0)).unwrap();
p.insert(&int_cell(7, 0)).unwrap();
p.insert(&int_cell(100, 0)).unwrap();
assert_eq!(p.rowid_at(0).unwrap(), 7);
assert_eq!(p.rowid_at(1).unwrap(), 42);
assert_eq!(p.rowid_at(2).unwrap(), 100);
}
}