use crate::error::{KernelError, KernelResult, PageErrorKind};
use crate::kernel_api::PageId;
use crate::wal::LogSequenceNumber;
use bytes::{BufMut, Bytes, BytesMut};
pub const PAGE_SIZE: usize = 8192;
pub const PAGE_HEADER_SIZE: usize = 32;
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - PAGE_HEADER_SIZE;
pub const PAGE_MAGIC: u32 = 0x544F4F4E;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
Free = 0,
Data = 1,
Index = 2,
Overflow = 3,
Metadata = 4,
}
impl TryFrom<u8> for PageType {
type Error = KernelError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::Free),
1 => Ok(Self::Data),
2 => Ok(Self::Index),
3 => Ok(Self::Overflow),
4 => Ok(Self::Metadata),
_ => Err(KernelError::Page {
kind: PageErrorKind::InvalidSize,
}),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct PageHeader {
pub magic: u32,
pub page_id: PageId,
pub page_lsn: LogSequenceNumber,
pub page_type: PageType,
pub flags: u8,
pub free_space: u16,
pub checksum: u32,
}
impl PageHeader {
pub fn new(page_id: PageId, page_type: PageType) -> Self {
Self {
magic: PAGE_MAGIC,
page_id,
page_lsn: LogSequenceNumber::INVALID,
page_type,
flags: 0,
free_space: PAGE_DATA_SIZE as u16,
checksum: 0,
}
}
pub fn serialize(&self) -> [u8; PAGE_HEADER_SIZE] {
let mut buf = [0u8; PAGE_HEADER_SIZE];
let mut cursor = 0;
buf[cursor..cursor + 4].copy_from_slice(&self.magic.to_le_bytes());
cursor += 4;
buf[cursor..cursor + 8].copy_from_slice(&self.page_id.to_le_bytes());
cursor += 8;
buf[cursor..cursor + 8].copy_from_slice(&self.page_lsn.0.to_le_bytes());
cursor += 8;
buf[cursor] = self.page_type as u8;
cursor += 1;
buf[cursor] = self.flags;
cursor += 1;
buf[cursor..cursor + 2].copy_from_slice(&self.free_space.to_le_bytes());
cursor += 2;
buf[cursor..cursor + 4].copy_from_slice(&self.checksum.to_le_bytes());
buf
}
pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
if data.len() < PAGE_HEADER_SIZE {
return Err(KernelError::Page {
kind: PageErrorKind::InvalidSize,
});
}
let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
if magic != PAGE_MAGIC {
return Err(KernelError::Corruption {
details: format!(
"invalid page magic: expected {:#x}, got {:#x}",
PAGE_MAGIC, magic
),
});
}
let page_id = u64::from_le_bytes(data[4..12].try_into().unwrap());
let page_lsn = LogSequenceNumber(u64::from_le_bytes(data[12..20].try_into().unwrap()));
let page_type = PageType::try_from(data[20])?;
let flags = data[21];
let free_space = u16::from_le_bytes(data[22..24].try_into().unwrap());
let checksum = u32::from_le_bytes(data[24..28].try_into().unwrap());
Ok(Self {
magic,
page_id,
page_lsn,
page_type,
flags,
free_space,
checksum,
})
}
}
pub struct Page {
pub header: PageHeader,
pub data: BytesMut,
}
impl Page {
pub fn new(page_id: PageId, page_type: PageType) -> Self {
Self {
header: PageHeader::new(page_id, page_type),
data: BytesMut::zeroed(PAGE_DATA_SIZE),
}
}
pub fn from_bytes(bytes: &[u8]) -> KernelResult<Self> {
if bytes.len() != PAGE_SIZE {
return Err(KernelError::Page {
kind: PageErrorKind::InvalidSize,
});
}
let header = PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE])?;
let data = BytesMut::from(&bytes[PAGE_HEADER_SIZE..]);
let page = Self { header, data };
if !page.validate_checksum() {
return Err(KernelError::Corruption {
details: format!(
"invalid page checksum: expected {:#x}, computed {:#x}",
page.header.checksum,
page.compute_checksum()
),
});
}
Ok(page)
}
pub fn to_bytes(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(PAGE_SIZE);
buf.put_slice(&self.header.serialize());
buf.put_slice(&self.data);
buf.freeze()
}
pub fn page_id(&self) -> PageId {
self.header.page_id
}
pub fn lsn(&self) -> LogSequenceNumber {
self.header.page_lsn
}
pub fn set_lsn(&mut self, lsn: LogSequenceNumber) {
self.header.page_lsn = lsn;
}
pub fn needs_redo(&self, record_lsn: LogSequenceNumber) -> bool {
self.header.page_lsn < record_lsn
}
pub fn compute_checksum(&self) -> u32 {
let mut hasher = crc32fast::Hasher::new();
hasher.update(&self.header.magic.to_le_bytes());
hasher.update(&self.header.page_id.to_le_bytes());
hasher.update(&self.header.page_lsn.0.to_le_bytes());
hasher.update(&[self.header.page_type as u8, self.header.flags]);
hasher.update(&self.header.free_space.to_le_bytes());
hasher.update(&self.data);
hasher.finalize()
}
pub fn validate_checksum(&self) -> bool {
self.header.checksum == self.compute_checksum()
}
pub fn update_checksum(&mut self) {
self.header.checksum = self.compute_checksum();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_page_header_roundtrip() {
let header = PageHeader {
magic: PAGE_MAGIC,
page_id: 42,
page_lsn: LogSequenceNumber(100),
page_type: PageType::Data,
flags: 0x01,
free_space: 1234,
checksum: 0xDEADBEEF,
};
let serialized = header.serialize();
let deserialized = PageHeader::deserialize(&serialized).unwrap();
assert_eq!(header.magic, deserialized.magic);
assert_eq!(header.page_id, deserialized.page_id);
assert_eq!(header.page_lsn, deserialized.page_lsn);
assert_eq!(header.page_type, deserialized.page_type);
assert_eq!(header.flags, deserialized.flags);
assert_eq!(header.free_space, deserialized.free_space);
assert_eq!(header.checksum, deserialized.checksum);
}
#[test]
fn test_page_roundtrip() {
let mut page = Page::new(1, PageType::Data);
page.data[0..5].copy_from_slice(b"hello");
page.set_lsn(LogSequenceNumber(50));
page.update_checksum();
let bytes = page.to_bytes();
let restored = Page::from_bytes(&bytes).unwrap();
assert_eq!(restored.page_id(), 1);
assert_eq!(restored.lsn(), LogSequenceNumber(50));
assert!(restored.validate_checksum());
}
#[test]
fn test_needs_redo() {
let mut page = Page::new(1, PageType::Data);
page.set_lsn(LogSequenceNumber(100));
assert!(!page.needs_redo(LogSequenceNumber(50)));
assert!(!page.needs_redo(LogSequenceNumber(100)));
assert!(page.needs_redo(LogSequenceNumber(150)));
}
#[test]
fn test_page_from_bytes_rejects_corrupted_checksum() {
let mut page = Page::new(1, PageType::Data);
page.data[0..5].copy_from_slice(b"hello");
page.update_checksum();
let bytes = page.to_bytes();
let mut corrupted = bytes.to_vec();
corrupted[PAGE_HEADER_SIZE] = 0xFF;
let result = Page::from_bytes(&corrupted);
assert!(result.is_err());
}
}