use crate::buffer::AlignedBuffer;
use crate::checksum::Crc32c;
use crate::error::{PageError, PageResult};
pub const MIN_PAGE_SIZE: usize = 4096;
pub const MAX_PAGE_SIZE: usize = 1 << 20;
pub const PAGE_HEADER_SIZE: usize = 32;
pub const DEFAULT_PAGE_SIZE: PageSize = PageSize(4096);
const MAGIC: u32 = u32::from_le_bytes([b'P', b'G', b'D', b'B']);
const FORMAT_VERSION: u16 = 1;
const OFF_MAGIC: usize = 0;
const OFF_VERSION: usize = 4;
const OFF_PAGE_ID: usize = 8;
const OFF_LSN: usize = 16;
const OFF_CRC: usize = 24;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PageId(u64);
impl PageId {
#[inline]
#[must_use]
pub const fn new(id: u64) -> Self {
Self(id)
}
#[inline]
#[must_use]
pub const fn get(self) -> u64 {
self.0
}
#[inline]
#[must_use]
pub(crate) const fn byte_offset(self, page_size: usize) -> u64 {
self.0 * page_size as u64
}
}
impl std::fmt::Display for PageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Lsn(u64);
impl Lsn {
pub const ZERO: Lsn = Lsn(0);
#[inline]
#[must_use]
pub const fn new(lsn: u64) -> Self {
Self(lsn)
}
#[inline]
#[must_use]
pub const fn get(self) -> u64 {
self.0
}
}
impl std::fmt::Display for Lsn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageSize(usize);
impl PageSize {
pub const fn new(size: usize) -> PageResult<Self> {
if size < MIN_PAGE_SIZE || size > MAX_PAGE_SIZE || !size.is_power_of_two() {
return Err(PageError::InvalidPageSize { size });
}
Ok(Self(size))
}
#[inline]
#[must_use]
pub const fn get(self) -> usize {
self.0
}
#[inline]
#[must_use]
pub const fn payload_len(self) -> usize {
self.0 - PAGE_HEADER_SIZE
}
}
impl Default for PageSize {
#[inline]
fn default() -> Self {
DEFAULT_PAGE_SIZE
}
}
pub struct Page {
buf: AlignedBuffer,
size: usize,
}
impl Page {
#[must_use]
pub fn new(page_size: PageSize) -> Self {
let size = page_size.get();
let mut buf = AlignedBuffer::new_zeroed(size, size);
{
let bytes = buf.as_mut_slice();
write_u32(bytes, OFF_MAGIC, MAGIC);
write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
}
Self { buf, size }
}
pub fn from_bytes(page_size: PageSize, bytes: &[u8]) -> PageResult<Self> {
let size = page_size.get();
if bytes.len() != size {
return Err(PageError::ShortRead {
page_id: 0,
got: bytes.len(),
page_size: size,
});
}
let mut buf = AlignedBuffer::new_zeroed(size, size);
buf.as_mut_slice().copy_from_slice(bytes);
let page = Self { buf, size };
page.verify(None)?;
Ok(page)
}
#[inline]
#[must_use]
pub fn page_size(&self) -> usize {
self.size
}
#[inline]
#[must_use]
pub fn id(&self) -> PageId {
PageId(read_u64(self.buf.as_slice(), OFF_PAGE_ID))
}
#[inline]
#[must_use]
pub fn lsn(&self) -> Lsn {
Lsn(read_u64(self.buf.as_slice(), OFF_LSN))
}
#[inline]
pub fn set_lsn(&mut self, lsn: Lsn) {
write_u64(self.buf.as_mut_slice(), OFF_LSN, lsn.0);
}
#[inline]
#[must_use]
pub fn payload(&self) -> &[u8] {
&self.buf.as_slice()[PAGE_HEADER_SIZE..]
}
#[inline]
pub fn payload_mut(&mut self) -> &mut [u8] {
&mut self.buf.as_mut_slice()[PAGE_HEADER_SIZE..]
}
#[must_use]
pub fn to_checksummed_bytes(&self) -> Vec<u8> {
let mut out = self.buf.as_slice().to_vec();
let crc = compute_checksum(&out);
write_u32(&mut out, OFF_CRC, crc);
out
}
pub(crate) fn reset(&mut self) {
let bytes = self.buf.as_mut_slice();
bytes.fill(0);
write_u32(bytes, OFF_MAGIC, MAGIC);
write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
}
pub(crate) fn stamp(&mut self, id: PageId) {
{
let bytes = self.buf.as_mut_slice();
write_u32(bytes, OFF_MAGIC, MAGIC);
write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
write_u64(bytes, OFF_PAGE_ID, id.0);
}
let crc = compute_checksum(self.buf.as_slice());
write_u32(self.buf.as_mut_slice(), OFF_CRC, crc);
}
pub(crate) fn verify(&self, expected: Option<PageId>) -> PageResult<()> {
let bytes = self.buf.as_slice();
let magic = read_u32(bytes, OFF_MAGIC);
if magic != MAGIC {
return Err(PageError::BadMagic {
found: magic,
expected: MAGIC,
});
}
let version = read_u16(bytes, OFF_VERSION);
if version != FORMAT_VERSION {
return Err(PageError::UnsupportedVersion {
found: version,
supported: FORMAT_VERSION,
});
}
let stored = read_u32(bytes, OFF_CRC);
let computed = compute_checksum(bytes);
if stored != computed {
return Err(PageError::ChecksumMismatch {
page_id: read_u64(bytes, OFF_PAGE_ID),
stored,
computed,
});
}
if let Some(expected) = expected {
let found = read_u64(bytes, OFF_PAGE_ID);
if found != expected.0 {
return Err(PageError::MisdirectedPage {
requested: expected.0,
found,
});
}
}
Ok(())
}
#[inline]
pub(crate) fn as_bytes(&self) -> &[u8] {
self.buf.as_slice()
}
#[inline]
pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
self.buf.as_mut_slice()
}
}
impl Clone for Page {
fn clone(&self) -> Self {
Self {
buf: self.buf.clone(),
size: self.size,
}
}
}
impl std::fmt::Debug for Page {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Page")
.field("id", &self.id())
.field("lsn", &self.lsn())
.field("page_size", &self.size)
.finish()
}
}
fn compute_checksum(bytes: &[u8]) -> u32 {
let mut crc = Crc32c::new();
crc.update(&bytes[..OFF_CRC]);
crc.update(&bytes[OFF_CRC + 4..]);
crc.finalize()
}
#[inline]
fn read_u16(bytes: &[u8], off: usize) -> u16 {
u16::from_le_bytes([bytes[off], bytes[off + 1]])
}
#[inline]
fn read_u32(bytes: &[u8], off: usize) -> u32 {
u32::from_le_bytes([bytes[off], bytes[off + 1], bytes[off + 2], bytes[off + 3]])
}
#[inline]
fn read_u64(bytes: &[u8], off: usize) -> u64 {
u64::from_le_bytes([
bytes[off],
bytes[off + 1],
bytes[off + 2],
bytes[off + 3],
bytes[off + 4],
bytes[off + 5],
bytes[off + 6],
bytes[off + 7],
])
}
#[inline]
fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
bytes[off..off + 2].copy_from_slice(&value.to_le_bytes());
}
#[inline]
fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
}
#[inline]
fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
bytes[off..off + 8].copy_from_slice(&value.to_le_bytes());
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used)]
use super::*;
#[test]
fn test_page_size_rejects_non_power_of_two() {
assert!(matches!(
PageSize::new(5000),
Err(PageError::InvalidPageSize { size: 5000 })
));
}
#[test]
fn test_page_size_rejects_out_of_range() {
assert!(PageSize::new(2048).is_err());
assert!(PageSize::new(MAX_PAGE_SIZE * 2).is_err());
}
#[test]
fn test_page_size_payload_len() {
let ps = PageSize::new(4096).expect("valid");
assert_eq!(ps.payload_len(), 4096 - PAGE_HEADER_SIZE);
}
#[test]
fn test_new_page_has_valid_header() {
let page = Page::new(DEFAULT_PAGE_SIZE);
assert_eq!(page.id(), PageId::new(0));
assert_eq!(page.lsn(), Lsn::ZERO);
assert_eq!(page.page_size(), 4096);
}
#[test]
fn test_stamp_then_verify_roundtrips() {
let mut page = Page::new(DEFAULT_PAGE_SIZE);
page.set_lsn(Lsn::new(7));
page.payload_mut()[..4].copy_from_slice(b"data");
page.stamp(PageId::new(3));
assert_eq!(page.id(), PageId::new(3));
page.verify(Some(PageId::new(3))).expect("verifies");
assert_eq!(page.lsn(), Lsn::new(7));
}
#[test]
fn test_verify_detects_corruption() {
let mut page = Page::new(DEFAULT_PAGE_SIZE);
page.stamp(PageId::new(1));
page.payload_mut()[10] ^= 0xFF;
assert!(matches!(
page.verify(Some(PageId::new(1))),
Err(PageError::ChecksumMismatch { .. })
));
}
#[test]
fn test_verify_detects_misdirected_page() {
let mut page = Page::new(DEFAULT_PAGE_SIZE);
page.stamp(PageId::new(5));
assert!(matches!(
page.verify(Some(PageId::new(6))),
Err(PageError::MisdirectedPage {
requested: 6,
found: 5
})
));
}
#[test]
fn test_from_bytes_rejects_wrong_length() {
let bytes = vec![0u8; 100];
assert!(matches!(
Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
Err(PageError::ShortRead { .. })
));
}
#[test]
fn test_from_bytes_rejects_bad_magic() {
let bytes = vec![0u8; 4096];
assert!(matches!(
Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
Err(PageError::BadMagic { .. })
));
}
#[test]
fn test_to_bytes_from_bytes_roundtrips() {
let mut page = Page::new(DEFAULT_PAGE_SIZE);
page.set_lsn(Lsn::new(99));
page.payload_mut()[..5].copy_from_slice(b"hello");
let bytes = page.to_checksummed_bytes();
let loaded = Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes).expect("verifies");
assert_eq!(loaded.lsn(), Lsn::new(99));
assert_eq!(&loaded.payload()[..5], b"hello");
}
}