use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use mentedb_core::error::{MenteError, MenteResult};
use tracing::{debug, info, trace};
/// Size in bytes of one page of the page file (16 KiB).
pub const PAGE_SIZE: usize = 1 << 14;
/// Signature stored in the `magic` field of the file header and checked by `PageManager::open`.
///
/// Read as big-endian bytes the value spells the ASCII text `MENTEDB1`.
const MAGIC: u64 = u64::from_be_bytes(*b"MENTEDB1");
/// Page-file format version; `PageManager::open` rejects files carrying any other value.
const VERSION: u32 = 1;
/// Identifier of a page within the page file.
///
/// The wrapped value is the page's zero-based index: `PageManager` locates page `n` at byte
/// offset `n * PAGE_SIZE`. Page `0` is the page `PageManager::open` creates to hold the file
/// header.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageId(pub u64);
/// Kind of content a page holds, stored in `PageHeader::page_type` as its `u8` discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
/// Value `PageManager::free_page` stamps on pages it puts on the free list. It is also what
/// `PageType::from` yields for any byte that is not 1, 2 or 3.
Free = 0,
/// Type `PageManager::allocate_page` assigns to pages it appends to the file.
Data = 1,
/// Presumably a page belonging to an index structure; not used in this file — TODO confirm.
Index = 2,
/// Presumably a continuation page for oversized values; not used in this file — TODO confirm.
Overflow = 3,
}
impl From<u8> for PageType {
    /// Decodes a raw page-type byte.
    ///
    /// The bytes matching the `Data`, `Index` and `Overflow` discriminants map to those
    /// variants; every other byte (including `0`) maps to [`PageType::Free`].
    fn from(v: u8) -> Self {
        const DATA: u8 = PageType::Data as u8;
        const INDEX: u8 = PageType::Index as u8;
        const OVERFLOW: u8 = PageType::Overflow as u8;

        match v {
            DATA => Self::Data,
            INDEX => Self::Index,
            OVERFLOW => Self::Overflow,
            _ => Self::Free,
        }
    }
}
/// Fixed-size header stored at the start of every page.
///
/// `#[repr(C)]` pins the field order. The fields add up to 32 bytes
/// (8 + 8 + 4 + 2 + 2 + 1 + 7), so the struct contains no implicit padding.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct PageHeader {
/// Index of this page within the page file (the value wrapped by `PageId`).
pub page_id: u64,
/// Presumably a log sequence number — TODO confirm. In this file it is only fed into
/// `Page::compute_checksum`; nothing here assigns it.
pub lsn: u64,
/// Slot for a page checksum. `Page::compute_checksum` leaves this field out of its input;
/// nothing in this file stores or verifies it.
pub checksum: u32,
/// Set to `PAGE_DATA_SIZE` by `PageManager::allocate_page` on a newly appended page;
/// presumably the number of unused payload bytes — TODO confirm with the slot-management code.
pub free_space: u16,
/// Presumably the number of record slots in the page; not interpreted in this file.
pub num_slots: u16,
/// Raw `PageType` discriminant (`PageType::X as u8`); decode with `PageType::from`.
pub page_type: u8,
/// Explicit filler that rounds the header up to 32 bytes.
pub _padding: [u8; 7],
}
/// Size in bytes of `PageHeader` as laid out in memory.
pub const HEADER_SIZE: usize = std::mem::size_of::<PageHeader>();
/// Number of payload bytes in a page: what remains of `PAGE_SIZE` after the header.
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - HEADER_SIZE;
/// One page: a `PageHeader` immediately followed by `PAGE_DATA_SIZE` payload bytes.
///
/// `#[repr(C)]` keeps the header in front of the payload, and `align(4096)` gives every
/// in-memory `Page` 4 KiB alignment. `Page::as_bytes` / `Page::from_bytes` reinterpret the
/// whole struct as a `[u8; PAGE_SIZE]`, so the struct must stay exactly `PAGE_SIZE` bytes.
#[repr(C, align(4096))]
pub struct Page {
/// Per-page metadata.
pub header: PageHeader,
/// Page payload. `PageManager::free_page` stores the next free-list link in the first
/// 8 bytes of a freed page.
pub data: [u8; PAGE_DATA_SIZE],
}
impl std::fmt::Debug for Page {
    /// Formats the page as a struct with two fields: `header` (the full [`PageHeader`]) and
    /// `data_len` (the length of the payload array, not its contents).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let payload_len = self.data.len();
        let mut out = f.debug_struct("Page");
        out.field("header", &self.header);
        out.field("data_len", &payload_len);
        out.finish()
    }
}
impl Clone for Page {
fn clone(&self) -> Self {
let mut new_page = Page::zeroed();
new_page.header = self.header;
new_page.data.copy_from_slice(&self.data);
new_page
}
}
impl Page {
    /// Returns a page whose header fields and payload bytes are all zero.
    pub fn zeroed() -> Self {
        // SAFETY: `Page` is made up solely of integers and byte arrays, for which the all-zero
        // bit pattern is a valid value.
        unsafe { std::mem::zeroed() }
    }

    /// Views the page as the `PAGE_SIZE` raw bytes that are written to disk.
    ///
    /// Integer header fields appear in the machine's native byte order.
    fn as_bytes(&self) -> &[u8; PAGE_SIZE] {
        // The byte view (and `from_bytes`) is only sound while the struct is exactly one page.
        const _: () = assert!(std::mem::size_of::<Page>() == PAGE_SIZE);
        // SAFETY: `Page` is `repr(C)`, exactly `PAGE_SIZE` bytes long (checked above) and has
        // no padding (the header carries explicit padding), so every byte is initialised.
        // `[u8; PAGE_SIZE]` has alignment 1, and the returned borrow is tied to `&self`.
        unsafe { &*(self as *const Page).cast::<[u8; PAGE_SIZE]>() }
    }

    /// Rebuilds a page from the raw bytes produced by [`Page::as_bytes`].
    ///
    /// `bytes` may have any alignment.
    fn from_bytes(bytes: &[u8; PAGE_SIZE]) -> Self {
        // SAFETY: `bytes` provides `PAGE_SIZE == size_of::<Page>()` initialised bytes and every
        // bit pattern is a valid `Page`. The source is only byte-aligned while `Page` demands
        // 4096-byte alignment, so the copy has to be an *unaligned* read; an aligned
        // `ptr::read` here would be undefined behaviour.
        unsafe { std::ptr::read_unaligned(bytes.as_ptr().cast::<Page>()) }
    }

    /// Computes a CRC-32 over the page.
    ///
    /// The digest covers, in this order, `page_id`, `lsn`, `free_space` and `num_slots` (each
    /// as little-endian bytes), the `page_type` byte and the whole payload. The header's
    /// `checksum` and `_padding` fields are not part of the input. The result is returned only;
    /// this method does not store it in the header.
    pub fn compute_checksum(&self) -> u32 {
        let mut h = crc32fast::Hasher::new();
        h.update(&self.header.page_id.to_le_bytes());
        h.update(&self.header.lsn.to_le_bytes());
        h.update(&self.header.free_space.to_le_bytes());
        h.update(&self.header.num_slots.to_le_bytes());
        h.update(&[self.header.page_type]);
        h.update(&self.data);
        h.finalize()
    }
}
/// Header kept in the first 32 bytes of the page file (the start of page 0).
///
/// `PageManager::write_file_header` writes the struct's raw memory and `PageManager::open`
/// reads it back the same way, so the integers are stored in the machine's native byte order.
#[repr(C)]
struct FileHeader {
/// File signature; must equal `MAGIC`.
magic: u64,
/// Format version; must equal `VERSION`.
version: u32,
/// Explicit filler (always written as 0) that keeps the following `u64` fields 8-byte
/// aligned without implicit padding.
_pad: u32,
/// Number of pages in the file, counting page 0.
page_count: u64,
/// Page id of the first page on the free list; `0` means the list is empty.
free_list_head: u64,
}
/// Handle to the on-disk page file that allocates, frees, reads and writes whole pages.
///
/// `page_count` and `free_list_head` mirror the file header; `allocate_page` and `free_page`
/// rewrite the header whenever they change either value.
pub struct PageManager {
/// The open `pages.db` file.
file: File,
/// Number of pages in the file, counting the header page 0.
page_count: u64,
/// Page id at the head of the free list, or `0` when no page is free.
free_list_head: u64,
}
impl PageManager {
    /// Opens the page file `pages.db` inside `dir_path`, creating it when necessary.
    ///
    /// A missing or zero-length file is initialised with a single zeroed page 0 whose leading
    /// bytes hold the file header. For an existing file the header is read and its magic number
    /// and version are validated.
    ///
    /// # Errors
    ///
    /// Propagates I/O failures from opening, reading or writing the file, and returns
    /// [`MenteError::Storage`] when the magic number or version does not match.
    pub fn open(dir_path: &Path) -> MenteResult<Self> {
        let file_path = dir_path.join("pages.db");
        // `metadata` already fails for a missing path, so one call answers both
        // "does it exist" and "is it non-empty".
        let exists = std::fs::metadata(&file_path)
            .map(|m| m.len() > 0)
            .unwrap_or(false);
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&file_path)?;

        if !exists {
            let mut pm = Self {
                file,
                page_count: 1,
                free_list_head: 0,
            };
            // Page 0 is reserved for the file header; lay down a full zeroed page first so the
            // file is a whole number of pages, then stamp the header over its first bytes.
            pm.write_page_raw(PageId(0), &Page::zeroed())?;
            pm.write_file_header()?;
            info!("created new page file");
            return Ok(pm);
        }

        let mut buf = [0u8; std::mem::size_of::<FileHeader>()];
        file.seek(SeekFrom::Start(0))?;
        file.read_exact(&mut buf)?;
        // SAFETY: `buf` holds exactly `size_of::<FileHeader>()` initialised bytes and every bit
        // pattern is a valid `FileHeader` (integer fields only). `buf` is only byte-aligned
        // while `FileHeader` needs 8-byte alignment, so the read must be the unaligned variant.
        let hdr: FileHeader =
            unsafe { std::ptr::read_unaligned(buf.as_ptr().cast::<FileHeader>()) };
        if hdr.magic != MAGIC {
            return Err(MenteError::Storage("invalid page file magic number".into()));
        }
        if hdr.version != VERSION {
            return Err(MenteError::Storage(format!(
                "unsupported page file version: {}",
                hdr.version
            )));
        }
        info!(page_count = hdr.page_count, "opened existing page file");
        Ok(Self {
            file,
            page_count: hdr.page_count,
            free_list_head: hdr.free_list_head,
        })
    }

    /// Writes the in-memory `page_count` and `free_list_head` into the header at offset 0.
    fn write_file_header(&mut self) -> MenteResult<()> {
        let hdr = FileHeader {
            magic: MAGIC,
            version: VERSION,
            _pad: 0,
            page_count: self.page_count,
            free_list_head: self.free_list_head,
        };
        // SAFETY: `FileHeader` is `repr(C)` with an explicit `_pad` field, so it has no
        // uninitialised padding bytes; the slice covers exactly the struct and does not
        // outlive `hdr`.
        let bytes = unsafe {
            std::slice::from_raw_parts(
                &hdr as *const FileHeader as *const u8,
                std::mem::size_of::<FileHeader>(),
            )
        };
        self.file.seek(SeekFrom::Start(0))?;
        self.file.write_all(bytes)?;
        self.file.flush()?;
        Ok(())
    }

    /// Byte offset of `page_id` within the file, or an error if it does not fit in a `u64`.
    fn page_offset(page_id: PageId) -> MenteResult<u64> {
        page_id.0.checked_mul(PAGE_SIZE as u64).ok_or_else(|| {
            MenteError::Storage(format!("page {} offset overflows u64", page_id.0))
        })
    }

    /// Fails unless `page_id` refers to a page that is part of the file.
    fn ensure_in_range(&self, page_id: PageId) -> MenteResult<()> {
        if page_id.0 >= self.page_count {
            return Err(MenteError::Storage(format!(
                "page {} out of range (count={})",
                page_id.0, self.page_count
            )));
        }
        Ok(())
    }

    /// Builds the empty data page handed out by `allocate_page`.
    fn blank_data_page(page_id: PageId) -> Page {
        // `free_space` is a `u16`; make sure the payload size can never be truncated.
        const _: () = assert!(PAGE_DATA_SIZE <= u16::MAX as usize);
        let mut page = Page::zeroed();
        page.header.page_id = page_id.0;
        page.header.page_type = PageType::Data as u8;
        page.header.free_space = PAGE_DATA_SIZE as u16;
        page
    }

    /// Hands out a page, reusing the head of the free list when there is one and appending a
    /// new page to the file otherwise.
    ///
    /// In both cases the page on disk is an empty data page: zeroed payload, type
    /// [`PageType::Data`] and `free_space == PAGE_DATA_SIZE`.
    ///
    /// # Errors
    ///
    /// Propagates I/O failures, and returns [`MenteError::Storage`] when the free-list link
    /// stored in the reused page points outside the file or back at the page itself.
    pub fn allocate_page(&mut self) -> MenteResult<PageId> {
        if self.free_list_head != 0 {
            let page_id = PageId(self.free_list_head);
            let page = self.read_page(page_id)?;
            // A freed page keeps the id of the next free page in its first 8 payload bytes.
            let mut link = [0u8; 8];
            link.copy_from_slice(&page.data[..8]);
            let next_free = u64::from_le_bytes(link);
            if next_free >= self.page_count || next_free == page_id.0 {
                return Err(MenteError::Storage(format!(
                    "free list corrupted: page {} links to page {} (count={})",
                    page_id.0, next_free, self.page_count
                )));
            }
            self.free_list_head = next_free;
            self.write_file_header()?;
            // Re-initialise the page so a recycled page looks exactly like a newly appended
            // one instead of carrying the `Free` type and the stale free-list link.
            self.write_page_raw(page_id, &Self::blank_data_page(page_id))?;
            debug!(page_id = page_id.0, "allocated page from free list");
            return Ok(page_id);
        }

        let page_id = PageId(self.page_count);
        self.write_page_raw(page_id, &Self::blank_data_page(page_id))?;
        // Count the page only once it has actually been written.
        self.page_count += 1;
        self.write_file_header()?;
        debug!(page_id = page_id.0, "allocated new page");
        Ok(page_id)
    }

    /// Reads page `page_id` from disk.
    ///
    /// # Errors
    ///
    /// Returns [`MenteError::Storage`] when `page_id` is not below the current page count, and
    /// propagates I/O failures.
    pub fn read_page(&mut self, page_id: PageId) -> MenteResult<Box<Page>> {
        self.ensure_in_range(page_id)?;
        let offset = Self::page_offset(page_id)?;
        let mut buf = [0u8; PAGE_SIZE];
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.read_exact(&mut buf)?;
        trace!(page_id = page_id.0, "read page from disk");
        Ok(Box::new(Page::from_bytes(&buf)))
    }

    /// Writes `page` to the slot of `page_id`.
    ///
    /// # Errors
    ///
    /// Returns [`MenteError::Storage`] when `page_id` is not below the current page count (such
    /// a page could never be read back), and propagates I/O failures.
    pub fn write_page(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
        self.ensure_in_range(page_id)?;
        self.write_page_raw(page_id, page)
    }

    /// Writes `page` at the offset of `page_id` without a range check; this is what lets
    /// `open` and `allocate_page` append pages past the current end of the file.
    fn write_page_raw(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
        let offset = Self::page_offset(page_id)?;
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.write_all(page.as_bytes())?;
        trace!(page_id = page_id.0, "wrote page to disk");
        Ok(())
    }

    /// Puts `page_id` at the head of the free list.
    ///
    /// The page is overwritten with a zeroed page of type [`PageType::Free`] whose first
    /// 8 payload bytes hold the previous head of the list (little-endian).
    ///
    /// # Errors
    ///
    /// Returns [`MenteError::Storage`] when `page_id` is page 0 (the file header page, whose id
    /// doubles as the "empty list" marker), lies outside the file, or already is the head of
    /// the free list. Propagates I/O failures.
    pub fn free_page(&mut self, page_id: PageId) -> MenteResult<()> {
        if page_id.0 == 0 {
            return Err(MenteError::Storage(
                "cannot free page 0 (file header page)".into(),
            ));
        }
        self.ensure_in_range(page_id)?;
        if page_id.0 == self.free_list_head {
            // Freeing the head again would make the page link to itself, so the same page
            // would be handed out by every later allocation.
            return Err(MenteError::Storage(format!(
                "page {} is already free",
                page_id.0
            )));
        }

        let mut page = Page::zeroed();
        page.header.page_id = page_id.0;
        page.header.page_type = PageType::Free as u8;
        page.data[..8].copy_from_slice(&self.free_list_head.to_le_bytes());
        self.write_page_raw(page_id, &page)?;
        self.free_list_head = page_id.0;
        self.write_file_header()?;
        debug!(page_id = page_id.0, "freed page");
        Ok(())
    }

    /// Number of pages in the file, counting the header page 0.
    pub fn page_count(&self) -> u64 {
        self.page_count
    }

    /// Asks the operating system to flush the file's data to the storage device
    /// (`File::sync_data`).
    pub fn sync(&mut self) -> MenteResult<()> {
        self.file.sync_data()?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a brand-new page file inside a fresh temporary directory.
    ///
    /// The directory handle is returned together with the manager so the caller can keep it
    /// alive for the duration of the test.
    fn setup() -> (tempfile::TempDir, PageManager) {
        let tmp = tempfile::tempdir().unwrap();
        let manager = PageManager::open(tmp.path()).unwrap();
        (tmp, manager)
    }

    /// A page written through the manager reads back with the same payload.
    #[test]
    fn test_allocate_and_read_write() {
        let (_tmp, mut manager) = setup();

        let id = manager.allocate_page().unwrap();
        assert_eq!(id, PageId(1));

        let payload = b"hello";
        let mut outgoing = Page::zeroed();
        outgoing.header.page_id = id.0;
        outgoing.header.page_type = PageType::Data as u8;
        outgoing.data[..payload.len()].copy_from_slice(payload);
        manager.write_page(id, &outgoing).unwrap();

        let incoming = manager.read_page(id).unwrap();
        assert_eq!(&incoming.data[..payload.len()], payload);
    }

    /// A freed page is handed out again before the file grows.
    #[test]
    fn test_free_and_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        assert_eq!(first, PageId(1));
        assert_eq!(second, PageId(2));

        manager.free_page(first).unwrap();

        let recycled = manager.allocate_page().unwrap();
        assert_eq!(recycled, first);

        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended, PageId(3));
    }

    /// Several freed pages come back in last-freed-first order, then the file grows.
    #[test]
    fn test_multiple_free_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        let _third = manager.allocate_page().unwrap();

        manager.free_page(first).unwrap();
        manager.free_page(second).unwrap();

        let reuse_a = manager.allocate_page().unwrap();
        let reuse_b = manager.allocate_page().unwrap();
        assert_eq!(reuse_a, second);
        assert_eq!(reuse_b, first);

        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended, PageId(4));
    }

    /// Data written before the manager is dropped is visible after reopening the directory.
    #[test]
    fn test_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let payload = b"test";

        let id = {
            let mut writer = PageManager::open(tmp.path()).unwrap();
            let id = writer.allocate_page().unwrap();
            let mut outgoing = Page::zeroed();
            outgoing.header.page_id = id.0;
            outgoing.data[..payload.len()].copy_from_slice(payload);
            writer.write_page(id, &outgoing).unwrap();
            writer.sync().unwrap();
            id
        };

        let mut reader = PageManager::open(tmp.path()).unwrap();
        let incoming = reader.read_page(id).unwrap();
        assert_eq!(&incoming.data[..payload.len()], payload);
    }

    /// Reading a page id beyond the end of the file is an error.
    #[test]
    fn test_out_of_range() {
        let (_tmp, mut manager) = setup();
        let outcome = manager.read_page(PageId(999));
        assert!(outcome.is_err());
    }

    /// Changing a payload byte changes the checksum.
    #[test]
    fn test_checksum() {
        let mut page = Page::zeroed();
        page.header.page_id = 42;

        page.data[0] = 0xFF;
        let with_ff = page.compute_checksum();

        page.data[0] = 0x00;
        let with_zero = page.compute_checksum();

        assert_ne!(with_ff, with_zero);
    }
}