use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use cypherlite_core::{CypherLiteError, DatabaseConfig, PageId, Result};
use super::{DatabaseHeader, FORMAT_VERSION, FSM_PAGE_ID, MAGIC, PAGE_SIZE};
pub struct PageManager {
file: File,
header: DatabaseHeader,
path: PathBuf,
next_free_hint: u32,
}
impl PageManager {
pub fn create_database(config: &DatabaseConfig) -> Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&config.path)?;
let header = DatabaseHeader::new();
let header_page = header.to_page();
file.write_all(&header_page)?;
let mut fsm_page = [0u8; PAGE_SIZE];
fsm_page[0] = 0b0000_0011; file.write_all(&fsm_page)?;
file.sync_all()?;
Ok(Self {
file,
header,
path: config.path.clone(),
next_free_hint: super::FIRST_DATA_PAGE,
})
}
pub fn open_database(config: &DatabaseConfig) -> Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(&config.path)?;
let mut header_buf = [0u8; PAGE_SIZE];
file.read_exact(&mut header_buf)?;
let header = DatabaseHeader::from_page(&header_buf);
if header.magic != MAGIC {
return Err(CypherLiteError::InvalidMagicNumber);
}
if header.version == 0 || header.version > FORMAT_VERSION {
return Err(CypherLiteError::UnsupportedVersion {
found: header.version,
supported: FORMAT_VERSION,
});
}
let mut header = header;
if header.version < FORMAT_VERSION {
if header.version < 2 {
header.version_store_root_page = 0;
}
header.version = FORMAT_VERSION;
}
let compiled = super::DatabaseHeader::compiled_feature_flags();
let db_flags = header.feature_flags;
if (db_flags & !compiled) != 0 {
return Err(CypherLiteError::FeatureIncompatible {
db_flags,
compiled_flags: compiled,
});
}
Ok(Self {
file,
header,
path: config.path.clone(),
next_free_hint: super::FIRST_DATA_PAGE,
})
}
pub fn allocate_page(&mut self) -> Result<PageId> {
let mut fsm_buf = [0u8; PAGE_SIZE];
self.read_page_raw(FSM_PAGE_ID, &mut fsm_buf)?;
let start_byte = (self.next_free_hint / 8) as usize;
if let Some(page_id) = Self::find_free_page(&fsm_buf, start_byte) {
return self.complete_allocation(&mut fsm_buf, page_id);
}
if start_byte > 0 {
if let Some(page_id) = Self::find_free_page(&fsm_buf, 0) {
return self.complete_allocation(&mut fsm_buf, page_id);
}
}
Err(CypherLiteError::OutOfSpace)
}
fn find_free_page(fsm_buf: &[u8; PAGE_SIZE], start_byte: usize) -> Option<u32> {
for (offset, byte_val) in fsm_buf[start_byte..].iter().enumerate() {
if *byte_val != 0xFF {
let byte_idx = start_byte + offset;
for bit in 0..8u32 {
if (*byte_val & (1 << bit)) == 0 {
return Some((byte_idx as u32) * 8 + bit);
}
}
}
}
None
}
fn complete_allocation(
&mut self,
fsm_buf: &mut [u8; PAGE_SIZE],
page_id: u32,
) -> Result<PageId> {
let byte_idx = page_id as usize / 8;
let bit_idx = page_id % 8;
fsm_buf[byte_idx] |= 1 << bit_idx;
self.write_page_raw(FSM_PAGE_ID, fsm_buf)?;
self.next_free_hint = page_id + 1;
if page_id >= self.header.page_count {
self.header.page_count = page_id + 1;
self.flush_header()?;
}
let required_size = (page_id as u64 + 1) * PAGE_SIZE as u64;
let current_size = self.file.seek(SeekFrom::End(0))?;
if current_size < required_size {
self.file
.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))?;
self.file.write_all(&[0u8; PAGE_SIZE])?;
}
Ok(PageId(page_id))
}
pub fn deallocate_page(&mut self, page_id: PageId) -> Result<()> {
let mut fsm_buf = [0u8; PAGE_SIZE];
self.read_page_raw(FSM_PAGE_ID, &mut fsm_buf)?;
let byte_idx = page_id.0 as usize / 8;
let bit_idx = page_id.0 % 8;
fsm_buf[byte_idx] &= !(1 << bit_idx);
self.write_page_raw(FSM_PAGE_ID, &fsm_buf)?;
if page_id.0 < self.next_free_hint {
self.next_free_hint = page_id.0;
}
Ok(())
}
pub fn read_page(&mut self, page_id: PageId) -> Result<[u8; PAGE_SIZE]> {
let mut buf = [0u8; PAGE_SIZE];
self.read_page_raw(page_id.0, &mut buf)?;
Ok(buf)
}
pub fn write_page(&mut self, page_id: PageId, data: &[u8; PAGE_SIZE]) -> Result<()> {
self.write_page_raw(page_id.0, data)
}
pub fn flush_header(&mut self) -> Result<()> {
let page = self.header.to_page();
self.write_page_raw(0, &page)?;
Ok(())
}
pub fn header(&self) -> &DatabaseHeader {
&self.header
}
pub fn header_mut(&mut self) -> &mut DatabaseHeader {
&mut self.header
}
pub fn path(&self) -> &PathBuf {
&self.path
}
pub fn sync(&mut self) -> Result<()> {
self.file.sync_all()?;
Ok(())
}
fn read_page_raw(&mut self, page_id: u32, buf: &mut [u8; PAGE_SIZE]) -> Result<()> {
let offset = page_id as u64 * PAGE_SIZE as u64;
self.file.seek(SeekFrom::Start(offset))?;
self.file.read_exact(buf)?;
Ok(())
}
fn write_page_raw(&mut self, page_id: u32, data: &[u8; PAGE_SIZE]) -> Result<()> {
let offset = page_id as u64 * PAGE_SIZE as u64;
self.file.seek(SeekFrom::Start(offset))?;
self.file.write_all(data)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::page::{DatabaseHeader, FIRST_DATA_PAGE};
use tempfile::tempdir;
fn test_config(dir: &std::path::Path) -> DatabaseConfig {
DatabaseConfig {
path: dir.join("test.cyl"),
..Default::default()
}
}
#[test]
fn test_create_database_writes_header() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let pm = PageManager::create_database(&config).expect("create");
assert_eq!(pm.header().magic, MAGIC);
assert_eq!(pm.header().version, FORMAT_VERSION);
assert_eq!(pm.header().page_count, FIRST_DATA_PAGE);
}
#[test]
fn test_create_database_file_size() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let _pm = PageManager::create_database(&config).expect("create");
let file_size = std::fs::metadata(&config.path).expect("metadata").len();
assert_eq!(file_size, 2 * PAGE_SIZE as u64);
}
#[test]
fn test_open_invalid_magic_number() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut file = File::create(&config.path).expect("create");
let mut bad_page = [0u8; PAGE_SIZE];
bad_page[0..4].copy_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
file.write_all(&bad_page).expect("write");
file.write_all(&[0u8; PAGE_SIZE]).expect("write fsm");
drop(file);
let result = PageManager::open_database(&config);
assert!(matches!(result, Err(CypherLiteError::InvalidMagicNumber)));
}
#[test]
fn test_open_unsupported_version() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut hdr = DatabaseHeader::new();
hdr.version = 99;
let mut file = File::create(&config.path).expect("create");
file.write_all(&hdr.to_page()).expect("write");
file.write_all(&[0u8; PAGE_SIZE]).expect("write fsm");
drop(file);
let result = PageManager::open_database(&config);
assert!(matches!(
result,
Err(CypherLiteError::UnsupportedVersion {
found: 99,
supported
}) if supported == FORMAT_VERSION
));
}
#[test]
fn test_open_valid_database() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
drop(PageManager::create_database(&config).expect("create"));
let pm = PageManager::open_database(&config).expect("open");
assert_eq!(pm.header().magic, MAGIC);
}
#[test]
fn test_allocate_page_returns_first_free() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
let page = pm.allocate_page().expect("alloc");
assert_eq!(page, PageId(2));
let page2 = pm.allocate_page().expect("alloc");
assert_eq!(page2, PageId(3));
}
#[test]
fn test_deallocate_and_reallocate() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
let p1 = pm.allocate_page().expect("alloc");
let p2 = pm.allocate_page().expect("alloc");
assert_eq!(p1, PageId(2));
assert_eq!(p2, PageId(3));
pm.deallocate_page(p1).expect("dealloc");
let p3 = pm.allocate_page().expect("alloc");
assert_eq!(p3, PageId(2));
}
#[test]
fn test_read_write_page_roundtrip() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
let page_id = pm.allocate_page().expect("alloc");
let mut data = [0u8; PAGE_SIZE];
data[0] = 0xAB;
data[4095] = 0xCD;
pm.write_page(page_id, &data).expect("write");
let read_back = pm.read_page(page_id).expect("read");
assert_eq!(read_back[0], 0xAB);
assert_eq!(read_back[4095], 0xCD);
}
#[test]
fn test_allocate_multiple_pages() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
let mut pages = vec![];
for i in 0..10 {
let p = pm.allocate_page().expect("alloc");
assert_eq!(p, PageId(FIRST_DATA_PAGE + i));
pages.push(p);
}
assert_eq!(pages.len(), 10);
}
#[test]
fn test_fsm_initial_state() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
let fsm = pm.read_page(PageId(FSM_PAGE_ID)).expect("read fsm");
assert_eq!(fsm[0] & 0b11, 0b11);
assert_eq!(fsm[0] & 0b100, 0);
}
#[test]
fn test_flush_header_persists() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
{
let mut pm = PageManager::create_database(&config).expect("create");
pm.header_mut().next_node_id = 42;
pm.flush_header().expect("flush");
}
let pm = PageManager::open_database(&config).expect("open");
assert_eq!(pm.header().next_node_id, 42);
}
#[test]
fn test_open_v2_database_migrates_to_v3() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut file = File::create(&config.path).expect("create");
let mut page = [0u8; PAGE_SIZE];
page[0..4].copy_from_slice(&MAGIC.to_le_bytes());
page[4..8].copy_from_slice(&2u32.to_le_bytes());
page[8..12].copy_from_slice(&FIRST_DATA_PAGE.to_le_bytes());
page[20..28].copy_from_slice(&1u64.to_le_bytes());
page[28..36].copy_from_slice(&1u64.to_le_bytes());
file.write_all(&page).expect("write header");
file.write_all(&[0u8; PAGE_SIZE]).expect("write fsm");
drop(file);
let pm = PageManager::open_database(&config).expect("open v2");
assert_eq!(pm.header().version, FORMAT_VERSION);
assert!(pm.header().feature_flags & DatabaseHeader::FLAG_TEMPORAL_CORE != 0);
}
#[test]
fn test_open_database_with_unsupported_features() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut page = [0u8; PAGE_SIZE];
page[0..4].copy_from_slice(&MAGIC.to_le_bytes());
page[4..8].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
page[8..12].copy_from_slice(&FIRST_DATA_PAGE.to_le_bytes());
page[20..28].copy_from_slice(&1u64.to_le_bytes());
page[28..36].copy_from_slice(&1u64.to_le_bytes());
let bogus_flags = 0x8000_0000u32;
page[44..48].copy_from_slice(&bogus_flags.to_le_bytes());
let mut file = File::create(&config.path).expect("create");
file.write_all(&page).expect("write header");
file.write_all(&[0u8; PAGE_SIZE]).expect("write fsm");
drop(file);
let result = PageManager::open_database(&config);
assert!(matches!(
result,
Err(CypherLiteError::FeatureIncompatible { .. })
));
}
#[test]
fn test_new_database_has_current_format_version_header() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let pm = PageManager::create_database(&config).expect("create");
assert_eq!(pm.header().version, FORMAT_VERSION);
assert!(pm.header().feature_flags & DatabaseHeader::FLAG_TEMPORAL_CORE != 0);
}
#[test]
fn test_hint_skips_used_pages() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
for _ in 0..10 {
pm.allocate_page().expect("alloc");
}
pm.deallocate_page(PageId(5)).expect("dealloc");
let reused = pm.allocate_page().expect("alloc");
assert_eq!(reused, PageId(5));
let next = pm.allocate_page().expect("alloc");
assert_eq!(next, PageId(12));
}
#[test]
fn test_hint_update_on_dealloc() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
for _ in 0..4 {
pm.allocate_page().expect("alloc");
}
pm.deallocate_page(PageId(3)).expect("dealloc");
let reused = pm.allocate_page().expect("alloc");
assert_eq!(reused, PageId(3));
pm.deallocate_page(PageId(2)).expect("dealloc");
pm.deallocate_page(PageId(4)).expect("dealloc");
let first = pm.allocate_page().expect("alloc");
assert_eq!(first, PageId(2));
let second = pm.allocate_page().expect("alloc");
assert_eq!(second, PageId(4));
}
#[test]
fn test_sequential_allocation_with_hint() {
let dir = tempdir().expect("tempdir");
let config = test_config(dir.path());
let mut pm = PageManager::create_database(&config).expect("create");
for i in 0..50u32 {
let page = pm.allocate_page().expect("alloc");
assert_eq!(
page,
PageId(FIRST_DATA_PAGE + i),
"Page {} should have ID {}",
i,
FIRST_DATA_PAGE + i
);
}
}
}