use std::fs::File;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use crate::error::{HoraError, Result};
use super::mmap::write_pages_to_file;
use super::page::{crc32, PageAllocator, PageType};
use super::wal::{WalFrame, WalHeader, WriteAheadLog};
/// Magic bytes stored in the first four bytes of the file header;
/// `FileHeader::read_from` rejects any other value.
const FILE_MAGIC: [u8; 4] = *b"HORA";
/// The only on-disk format version that `FileHeader::read_from` accepts.
const FILE_VERSION: u16 = 1;
/// Number of leading bytes occupied by the serialized `FileHeader`
/// (`FileHeader::write_to` touches bytes `0..32` of the buffer it is given).
const FILE_HEADER_SIZE: usize = 32;
/// In-memory form of the fixed-size header serialized at the start of the
/// database file.
///
/// All multi-byte integers are stored little-endian. The byte offsets quoted on
/// each field are the ones used by `write_to` / `read_from`.
#[derive(Debug, Clone)]
pub struct FileHeader {
/// Bytes 0..4: file signature, expected to equal `FILE_MAGIC`.
pub magic: [u8; 4],
/// Bytes 4..6: format version, expected to equal `FILE_VERSION`.
pub version: u16,
/// Bytes 6..10: page size in bytes.
pub page_size: u32,
/// Bytes 10..14: number of pages recorded for the file.
pub page_count: u32,
/// Bytes 14..18: page number of the freelist head as reported by the allocator.
pub freelist_page: u32,
/// Bytes 18..22: number of freelist entries as reported by the allocator.
pub freelist_count: u32,
/// Bytes 22..26: `crc32` of bytes 0..22. `write_to` ignores this field and
/// recomputes the value; `read_from` fills it with the value found on disk.
pub header_checksum: u32,
}
impl FileHeader {
pub fn new(page_size: u32) -> Self {
Self {
magic: FILE_MAGIC,
version: FILE_VERSION,
page_size,
page_count: 1,
freelist_page: 0,
freelist_count: 0,
header_checksum: 0,
}
}
pub fn write_to(&self, buf: &mut [u8]) {
buf[0..4].copy_from_slice(&self.magic);
buf[4..6].copy_from_slice(&self.version.to_le_bytes());
buf[6..10].copy_from_slice(&self.page_size.to_le_bytes());
buf[10..14].copy_from_slice(&self.page_count.to_le_bytes());
buf[14..18].copy_from_slice(&self.freelist_page.to_le_bytes());
buf[18..22].copy_from_slice(&self.freelist_count.to_le_bytes());
let checksum = crc32(&buf[0..22]);
buf[22..26].copy_from_slice(&checksum.to_le_bytes());
buf[26..32].fill(0);
}
pub fn read_from(buf: &[u8]) -> Result<Self> {
if buf.len() < FILE_HEADER_SIZE {
return Err(HoraError::InvalidFile {
reason: "file too short for header",
});
}
let magic: [u8; 4] = buf[0..4].try_into().unwrap();
if magic != FILE_MAGIC {
return Err(HoraError::InvalidFile {
reason: "invalid magic bytes (expected HORA)",
});
}
let version = u16::from_le_bytes([buf[4], buf[5]]);
if version != FILE_VERSION {
return Err(HoraError::InvalidFile {
reason: "unsupported file version",
});
}
let page_size = u32::from_le_bytes(buf[6..10].try_into().unwrap());
let page_count = u32::from_le_bytes(buf[10..14].try_into().unwrap());
let freelist_page = u32::from_le_bytes(buf[14..18].try_into().unwrap());
let freelist_count = u32::from_le_bytes(buf[18..22].try_into().unwrap());
let stored_checksum = u32::from_le_bytes(buf[22..26].try_into().unwrap());
let computed_checksum = crc32(&buf[0..22]);
if stored_checksum != computed_checksum {
return Err(HoraError::InvalidFile {
reason: "file header checksum mismatch",
});
}
Ok(Self {
magic,
version,
page_size,
page_count,
freelist_page,
freelist_count,
header_checksum: stored_checksum,
})
}
}
/// Returns the path of the write-ahead-log file that accompanies `db_path`:
/// the same path with its extension replaced by (or, if it has none, extended
/// with) `wal`.
fn wal_path(db_path: &Path) -> PathBuf {
    let mut sibling = db_path.to_path_buf();
    sibling.set_extension("wal");
    sibling
}
/// Returns the path of the lock file that accompanies `db_path`: the same path
/// with its extension replaced by (or, if it has none, extended with) `lock`.
fn lock_path(db_path: &Path) -> PathBuf {
    let mut sibling = db_path.to_path_buf();
    sibling.set_extension("lock");
    sibling
}
/// Reports whether a process with the given PID currently exists.
///
/// Probes with `kill(pid, 0)`, which performs the existence and permission
/// checks without delivering a signal.
///
/// Returns `false` for PID 0 and for values that do not fit a positive `i32`:
/// `kill` interprets zero and negative arguments as process groups or "every
/// process", so such values never identify a single lock owner.
///
/// Returns `true` when the probe fails only because the caller lacks
/// permission to signal the target: the process exists, it just belongs to
/// another user.
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
    extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if pid == 0 || pid > i32::MAX as u32 {
        return false;
    }

    // SAFETY: `kill` takes two plain integers and dereferences no memory;
    // signal 0 only performs error checking and delivers nothing.
    let rc = unsafe { kill(pid as i32, 0) };
    if rc == 0 {
        return true;
    }

    // The probe failed: a permission error means the target exists, anything
    // else (no such process) means it does not.
    std::io::Error::last_os_error().kind() == std::io::ErrorKind::PermissionDenied
}
/// Fallback for non-unix targets, where no liveness probe is implemented:
/// every PID is reported as not alive, so `Database::acquire_lock` treats any
/// existing lock file as stale and replaces it.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
false }
/// Writes a WAL file at `path`, replacing any existing file: the serialized
/// `header` followed by every frame in `frames`, in order. The file is synced
/// to disk before returning.
///
/// # Errors
/// Propagates any I/O error from creating, writing, or syncing the file.
fn write_wal_file(path: &Path, header: &WalHeader, frames: &[WalFrame]) -> std::io::Result<()> {
    let mut out = File::create(path)?;
    out.write_all(&header.to_bytes())?;
    frames
        .iter()
        .try_for_each(|frame| out.write_all(&frame.to_bytes()))?;
    out.sync_all()
}
/// Reads the WAL file at `path` and returns the frames that pass validation.
///
/// The file is read whole. If it is shorter than 26 bytes, or
/// `WalHeader::from_bytes` rejects it, an empty list is returned. Frames are
/// then decoded at a fixed stride of `WalFrame::HEADER_SIZE + page_size`
/// starting at byte 26. A frame is kept only if it decodes, `verify()`
/// succeeds, and its salt equals the header's salt; a rejected frame is skipped
/// and scanning continues with the next slot. A trailing partial frame is
/// ignored.
///
/// # Errors
/// Propagates I/O errors from opening or reading the file.
fn read_wal_file(path: &Path, page_size: usize) -> std::io::Result<Vec<WalFrame>> {
    let mut bytes = Vec::new();
    File::open(path)?.read_to_end(&mut bytes)?;

    if bytes.len() < 26 {
        return Ok(Vec::new());
    }
    let wal_header = match WalHeader::from_bytes(&bytes) {
        None => return Ok(Vec::new()),
        Some(parsed) => parsed,
    };

    let stride = WalFrame::HEADER_SIZE + page_size;
    let mut accepted = Vec::new();
    let mut cursor = 26;
    loop {
        let next = cursor + stride;
        if next > bytes.len() {
            break;
        }
        match WalFrame::from_bytes(&bytes[cursor..], page_size) {
            Some(frame) if frame.verify() && frame.salt == wal_header.salt => {
                accepted.push(frame)
            }
            _ => {}
        }
        cursor = next;
    }
    Ok(accepted)
}
/// An open database: the main file's pages held in a `PageAllocator`, plus an
/// in-memory `WriteAheadLog`.
///
/// Opening a database creates a sibling lock file (see `lock_path`), which is
/// removed again when the value is dropped.
pub struct Database {
// Path of the main database file; the WAL and lock file paths are derived from it.
path: PathBuf,
// Page store that `checkpoint`, `compact` and `full_vacuum` write out to `path`.
alloc: PageAllocator,
// Write-ahead log consulted first by `read_page` and persisted by `flush_wal`.
wal: WriteAheadLog,
}
impl Database {
/// Opens the database at `path`, creating it if the file does not exist.
///
/// The lock file is acquired first. If opening or creating then fails, the
/// lock file is removed again before the error is returned.
///
/// # Errors
/// Returns the error from `acquire_lock`, `open_existing`, or `create_new`.
pub fn open(path: impl AsRef<Path>, page_size: usize) -> Result<Self> {
    let db_path = path.as_ref().to_path_buf();
    Self::acquire_lock(&db_path)?;

    let opened = match db_path.exists() {
        true => Self::open_existing(&db_path, page_size),
        false => Self::create_new(&db_path, page_size),
    };

    // No `Database` value exists on failure, so `Drop` will not release the
    // lock; do it here.
    opened.map_err(|err| {
        let _ = std::fs::remove_file(lock_path(&db_path));
        err
    })
}
fn acquire_lock(db_path: &Path) -> Result<()> {
let lock = lock_path(db_path);
if lock.exists() {
if let Ok(pid_str) = std::fs::read_to_string(&lock) {
if let Ok(pid) = pid_str.trim().parse::<u32>() {
if is_process_alive(pid) {
return Err(HoraError::InvalidFile {
reason: "database is locked by another process",
});
}
}
}
let _ = std::fs::remove_file(&lock);
}
std::fs::write(&lock, std::process::id().to_string().as_bytes()).map_err(|_| {
HoraError::InvalidFile {
reason: "cannot create lock file",
}
})?;
Ok(())
}
fn create_new(path: &Path, page_size: usize) -> Result<Self> {
let mut alloc = PageAllocator::new(page_size);
let fh = FileHeader {
page_count: alloc.page_count(),
..FileHeader::new(page_size as u32)
};
fh.write_to(alloc.write_page(0)?);
write_pages_to_file(&alloc, path).map_err(|_| HoraError::InvalidFile {
reason: "cannot write new database file",
})?;
Ok(Self {
path: path.to_path_buf(),
alloc,
wal: WriteAheadLog::new(page_size as u32),
})
}
fn open_existing(path: &Path, page_size: usize) -> Result<Self> {
let data = std::fs::read(path).map_err(|_| HoraError::InvalidFile {
reason: "cannot read database file",
})?;
if data.len() < page_size {
return Err(HoraError::InvalidFile {
reason: "database file too small",
});
}
let fh = FileHeader::read_from(&data)?;
if fh.page_size as usize != page_size {
return Err(HoraError::InvalidFile {
reason: "page size mismatch",
});
}
let mut alloc =
PageAllocator::from_file_data(page_size, &data, fh.freelist_page, fh.freelist_count);
let wal_p = wal_path(path);
if wal_p.exists() {
let frames = read_wal_file(&wal_p, page_size).map_err(|_| HoraError::InvalidFile {
reason: "cannot read WAL file",
})?;
Self::replay_frames(&mut alloc, &frames);
let _ = std::fs::remove_file(&wal_p);
}
Ok(Self {
path: path.to_path_buf(),
alloc,
wal: WriteAheadLog::new(page_size as u32),
})
}
/// Highest page number a replayed WAL frame may address. Frames beyond it are
/// skipped, which bounds how far `replay_frames` will grow the allocator.
const MAX_REPLAY_PAGE: u32 = 16_000_000;

/// Applies `frames` to `alloc` in order.
///
/// For each frame whose page number does not exceed `MAX_REPLAY_PAGE`, the
/// allocator is grown with `PageType::Free` pages until the target page
/// exists, and the frame data is copied over the page (truncated to the
/// shorter of the two lengths). A frame whose target page cannot be obtained
/// for writing is silently skipped.
fn replay_frames(alloc: &mut PageAllocator, frames: &[WalFrame]) {
    let replayable = frames
        .iter()
        .filter(|frame| frame.page_number <= Self::MAX_REPLAY_PAGE);

    for frame in replayable {
        let target = frame.page_number as usize;
        while target >= alloc.page_count() as usize {
            alloc.alloc_page(PageType::Free);
        }

        let page = match alloc.write_page(frame.page_number) {
            Ok(page) => page,
            Err(_) => continue,
        };
        let n = page.len().min(frame.data.len());
        page[..n].copy_from_slice(&frame.data[..n]);
    }
}
/// Returns a shared reference to the page allocator.
pub fn alloc(&self) -> &PageAllocator {
&self.alloc
}
/// Returns a mutable reference to the page allocator.
pub fn alloc_mut(&mut self) -> &mut PageAllocator {
&mut self.alloc
}
/// Returns a shared reference to the in-memory write-ahead log.
pub fn wal(&self) -> &WriteAheadLog {
&self.wal
}
/// Returns a mutable reference to the in-memory write-ahead log.
pub fn wal_mut(&mut self) -> &mut WriteAheadLog {
&mut self.wal
}
/// Appends a frame for `page_num` carrying `data` to the write-ahead log,
/// passing the allocator's current page count as the database size.
///
/// Returns whatever `WriteAheadLog::write_frame` returns
/// (presumably `true` on success — confirm against the WAL implementation).
pub fn write_frame(&mut self, page_num: u32, data: Vec<u8>) -> bool {
    self.wal
        .write_frame(page_num, self.alloc.page_count(), data)
}
/// Reads page `page_num`, preferring the write-ahead log's copy when it has
/// one and falling back to the allocator otherwise.
///
/// # Errors
/// Returns the allocator's error when the WAL has no copy and the allocator
/// cannot provide the page.
pub fn read_page(&self, page_num: u32) -> Result<&[u8]> {
    match self.wal.read_page(page_num) {
        Some(logged) => Ok(logged),
        None => self.alloc.read_page(page_num),
    }
}
/// Folds the write-ahead log into the allocator and writes the database file.
///
/// After the WAL has been checkpointed into the allocator, page 0 receives a
/// fresh file header built from the allocator's page count and freelist
/// state, all pages are written to the database file, and the on-disk WAL
/// file is removed (best-effort).
///
/// # Errors
/// Returns `HoraError::InvalidFile` if a transaction is active or the database
/// file cannot be written, and the allocator's error if page 0 is
/// inaccessible.
pub fn checkpoint(&mut self) -> Result<()> {
    if self.wal.in_transaction() {
        return Err(HoraError::InvalidFile {
            reason: "cannot checkpoint during active transaction",
        });
    }

    self.wal.checkpoint(&mut self.alloc);

    let mut header = FileHeader::new(self.alloc.page_size() as u32);
    header.page_count = self.alloc.page_count();
    header.freelist_page = self.alloc.freelist_head();
    header.freelist_count = self.alloc.freelist_count();
    header.write_to(self.alloc.write_page(0)?);

    if write_pages_to_file(&self.alloc, &self.path).is_err() {
        return Err(HoraError::InvalidFile {
            reason: "cannot write database file during checkpoint",
        });
    }

    // The WAL content now lives in the main file; a missing WAL file is fine.
    let _ = std::fs::remove_file(wal_path(&self.path));
    Ok(())
}
/// Writes the WAL's committed frames to the on-disk WAL file.
///
/// Does nothing (and leaves any existing WAL file untouched) when there are no
/// committed frames.
///
/// # Errors
/// Returns `HoraError::InvalidFile` if the WAL file cannot be written.
pub fn flush_wal(&self) -> Result<()> {
    let frames = self.wal.committed_frames();
    if !frames.is_empty() {
        let target = wal_path(&self.path);
        if write_wal_file(&target, self.wal.header(), frames).is_err() {
            return Err(HoraError::InvalidFile {
                reason: "cannot write WAL file",
            });
        }
    }
    Ok(())
}
/// Returns the path of the main database file.
pub fn path(&self) -> &Path {
&self.path
}
/// Starts a transaction on the write-ahead log.
///
/// # Errors
/// Returns `HoraError::InvalidFile` when the WAL refuses to start one, which
/// is reported as a transaction already being active.
pub fn begin_transaction(&mut self) -> Result<()> {
    match self.wal.begin_transaction() {
        true => Ok(()),
        false => Err(HoraError::InvalidFile {
            reason: "transaction already active",
        }),
    }
}
/// Commits the active transaction on the write-ahead log.
///
/// # Errors
/// Returns `HoraError::InvalidFile` when the WAL reports nothing to commit.
pub fn commit(&mut self) -> Result<()> {
    match self.wal.commit_transaction() {
        true => Ok(()),
        false => Err(HoraError::InvalidFile {
            reason: "no active transaction to commit",
        }),
    }
}
/// Rolls back the active transaction on the write-ahead log.
///
/// # Errors
/// Returns `HoraError::InvalidFile` when the WAL reports nothing to roll back.
pub fn rollback(&mut self) -> Result<()> {
    match self.wal.rollback_transaction() {
        true => Ok(()),
        false => Err(HoraError::InvalidFile {
            reason: "no active transaction to rollback",
        }),
    }
}
/// Reports whether the write-ahead log currently has an active transaction.
pub fn in_transaction(&self) -> bool {
self.wal.in_transaction()
}
/// Checkpoints, compacts the allocator in place, and rewrites the database
/// file.
///
/// The returned statistics carry the relocation list produced by
/// `PageAllocator::compact` together with the page counts before and after.
///
/// # Errors
/// Returns the error from `checkpoint`, the allocator's error if page 0 is
/// inaccessible, or `HoraError::InvalidFile` if the file cannot be written.
pub fn compact(&mut self) -> Result<CompactStats> {
    self.checkpoint()?;

    let before = self.alloc.page_count();
    let moved = self.alloc.compact();
    let after = self.alloc.page_count();

    let mut header = FileHeader::new(self.alloc.page_size() as u32);
    header.page_count = after;
    header.freelist_page = self.alloc.freelist_head();
    header.freelist_count = self.alloc.freelist_count();
    header.write_to(self.alloc.write_page(0)?);

    if write_pages_to_file(&self.alloc, &self.path).is_err() {
        return Err(HoraError::InvalidFile {
            reason: "cannot write database file during compact",
        });
    }

    let pages_relocated = moved.len();
    Ok(CompactStats {
        pages_relocated,
        pages_freed: (before - after) as usize,
        old_page_count: before,
        new_page_count: after,
        relocations: moved,
    })
}
/// Rebuilds the database into a fresh allocator that contains no free pages.
///
/// After a checkpoint, every page from 1 onward whose page header parses as
/// `PageType::Free` is dropped; every other page (including pages whose
/// header does not parse) is appended to a new allocator. The new allocator
/// gets a fresh file header with no freelist, is written to a sibling `.tmp`
/// file, and that file is renamed over the database file. Only then does the
/// new allocator replace the old one in memory.
///
/// The temporary file is removed if writing it or renaming it fails, so a
/// failed vacuum leaves no stray `.tmp` file behind.
///
/// `relocations` in the result lists `(old, new)` page numbers for pages whose
/// number changed.
///
/// # Errors
/// Returns the error from `checkpoint` or from reading/accessing a page, or
/// `HoraError::InvalidFile` if the temporary file cannot be written or
/// renamed. In those cases the in-memory allocator is left unchanged.
pub fn full_vacuum(&mut self) -> Result<CompactStats> {
    self.checkpoint()?;
    let old_count = self.alloc.page_count();
    let page_size = self.alloc.page_size();
    let mut new_alloc = PageAllocator::new(page_size);
    let mut relocations = Vec::new();

    // Page 0 holds the file header and is regenerated below.
    for old_num in 1..old_count {
        let page_data = self.alloc.read_page(old_num)?;
        if let Some(hdr) = super::page::PageHeader::read_from(page_data) {
            if hdr.page_type == PageType::Free {
                continue;
            }
        }
        let new_num = new_alloc.push_raw_page(page_data.to_vec());
        if old_num != new_num {
            relocations.push((old_num, new_num));
        }
    }

    let fh = FileHeader {
        page_count: new_alloc.page_count(),
        ..FileHeader::new(page_size as u32)
    };
    fh.write_to(new_alloc.write_page(0)?);

    let tmp_path = self.path.with_extension("tmp");
    if write_pages_to_file(&new_alloc, &tmp_path).is_err() {
        // A partially written temp file is useless; clean it up.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(HoraError::InvalidFile {
            reason: "cannot write temp file during full vacuum",
        });
    }
    if std::fs::rename(&tmp_path, &self.path).is_err() {
        // The original database file is untouched; drop the orphaned copy.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(HoraError::InvalidFile {
            reason: "cannot rename temp file during full vacuum",
        });
    }

    let new_count = new_alloc.page_count();
    self.alloc = new_alloc;
    Ok(CompactStats {
        pages_relocated: relocations.len(),
        pages_freed: (old_count - new_count) as usize,
        old_page_count: old_count,
        new_page_count: new_count,
        relocations,
    })
}
}
/// Outcome of `Database::compact` or `Database::full_vacuum`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactStats {
    /// Number of entries in `relocations`.
    pub pages_relocated: usize,
    /// `old_page_count - new_page_count`.
    pub pages_freed: usize,
    /// Allocator page count before the operation.
    pub old_page_count: u32,
    /// Allocator page count after the operation.
    pub new_page_count: u32,
    /// Page moves performed by the operation. `full_vacuum` records them as
    /// `(old, new)` page numbers; `compact` passes through the list returned
    /// by `PageAllocator::compact`.
    pub relocations: Vec<(u32, u32)>,
}
/// Releases the database lock when the handle goes away.
impl Drop for Database {
fn drop(&mut self) {
// Best-effort: a failure to remove the lock file is ignored.
let _ = std::fs::remove_file(lock_path(&self.path));
}
}
// Tests for header serialization, open/create, WAL recovery, locking,
// transactions, compaction and vacuum. Most tests create a scratch database
// inside a `tempfile::tempdir()`.
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::embedded::page::{PageType, DEFAULT_PAGE_SIZE, PAGE_HEADER_SIZE};
// A header written with `write_to` is read back with identical field values.
#[test]
fn test_file_header_roundtrip() {
let fh = FileHeader {
page_count: 42,
freelist_page: 5,
freelist_count: 3,
..FileHeader::new(4096)
};
let mut buf = [0u8; 32];
fh.write_to(&mut buf);
let decoded = FileHeader::read_from(&buf).unwrap();
assert_eq!(decoded.magic, *b"HORA");
assert_eq!(decoded.version, 1);
assert_eq!(decoded.page_size, 4096);
assert_eq!(decoded.page_count, 42);
assert_eq!(decoded.freelist_page, 5);
assert_eq!(decoded.freelist_count, 3);
}
// A buffer that does not start with "HORA" is rejected.
#[test]
fn test_file_header_bad_magic() {
let mut buf = [0u8; 32];
buf[0..4].copy_from_slice(b"NOPE");
assert!(FileHeader::read_from(&buf).is_err());
}
// Flipping a byte inside the checksummed region (byte 10, part of
// `page_count`) makes `read_from` fail.
#[test]
fn test_file_header_bad_checksum() {
let fh = FileHeader::new(4096);
let mut buf = [0u8; 32];
fh.write_to(&mut buf);
buf[10] = 0xFF; assert!(FileHeader::read_from(&buf).is_err());
}
// Opening a non-existent path creates a one-page file with a valid header.
#[test]
fn test_create_new_database() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert!(path.exists());
assert_eq!(db.alloc().page_count(), 1);
let data = std::fs::read(&path).unwrap();
let fh = FileHeader::read_from(&data).unwrap();
assert_eq!(fh.page_size, DEFAULT_PAGE_SIZE as u32);
assert_eq!(fh.page_count, 1);
}
// Data written before a checkpoint is visible after reopening the file.
#[test]
fn test_checkpoint_persists_data() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0xAA;
let page_data = db.alloc().read_page(p).unwrap().to_vec();
db.write_frame(p, page_data);
db.checkpoint().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().page_count(), 2);
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0xAA);
}
// A frame that was flushed to the WAL file but never checkpointed is replayed
// on the next open, and the WAL file is removed afterwards.
#[test]
fn test_wal_recovery_replays_frames() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.checkpoint().unwrap();
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0xBB;
let page_data = db.alloc().read_page(p).unwrap().to_vec();
db.write_frame(p, page_data);
db.flush_wal().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0xBB);
assert!(!wal_path(&path).exists());
}
// A WAL truncated in the middle of its second frame still yields the first
// frame on recovery.
#[test]
fn test_partial_wal_frame_ignored() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.checkpoint().unwrap();
let p1 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p1).unwrap()[PAGE_HEADER_SIZE] = 0x11;
let data1 = db.alloc().read_page(p1).unwrap().to_vec();
db.write_frame(p1, data1);
let p2 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p2).unwrap()[PAGE_HEADER_SIZE] = 0x22;
let data2 = db.alloc().read_page(p2).unwrap().to_vec();
db.write_frame(p2, data2);
db.flush_wal().unwrap();
}
let wal_p = wal_path(&path);
let wal_data = std::fs::read(&wal_p).unwrap();
let frame_size = WalFrame::HEADER_SIZE + DEFAULT_PAGE_SIZE;
// Keep the 26-byte WAL header, one whole frame, and half of the next one.
let truncated_len = 26 + frame_size + frame_size / 2;
std::fs::write(&wal_p, &wal_data[..truncated_len]).unwrap();
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0x11);
}
// A frame whose payload was modified on disk is not applied on recovery.
#[test]
fn test_corrupted_wal_frame_skipped() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.checkpoint().unwrap();
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0xCC;
let page_data = db.alloc().read_page(p).unwrap().to_vec();
db.write_frame(p, page_data);
db.flush_wal().unwrap();
}
let wal_p = wal_path(&path);
let mut wal_data = std::fs::read(&wal_p).unwrap();
// Overwrite the payload byte that held 0xCC in the first frame.
wal_data[26 + WalFrame::HEADER_SIZE + PAGE_HEADER_SIZE] = 0xFF;
std::fs::write(&wal_p, &wal_data).unwrap();
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let page = db2.alloc().read_page(1);
// Page 1 may not exist at all after the frame is rejected; if it does, it
// must not carry the frame's data.
if let Ok(p) = page {
assert_ne!(p[PAGE_HEADER_SIZE], 0xCC);
}
}
// After a checkpoint no WAL file is left and the database reopens with its data.
#[test]
fn test_no_wal_opens_normally() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0xDD;
let page_data = db.alloc().read_page(p).unwrap().to_vec();
db.write_frame(p, page_data);
db.checkpoint().unwrap();
}
assert!(!wal_path(&path).exists());
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().page_count(), 2);
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0xDD);
}
// A page-sized file of zeros fails with an `InvalidFile` error mentioning the magic.
#[test]
fn test_invalid_magic_error() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
std::fs::write(&path, vec![0u8; DEFAULT_PAGE_SIZE]).unwrap();
let result = Database::open(&path, DEFAULT_PAGE_SIZE);
match result {
Err(HoraError::InvalidFile { reason }) => {
assert!(reason.contains("magic"));
}
_ => panic!("expected InvalidFile error with magic"),
}
}
// While one handle is open, a second open of the same path fails. Unix only:
// the non-unix `is_process_alive` always reports the lock owner as dead.
#[test]
#[cfg(unix)]
fn test_write_lock_prevents_double_open() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let _db1 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let result = Database::open(&path, DEFAULT_PAGE_SIZE);
assert!(result.is_err());
}
// The lock file exists while the handle is alive and is gone after drop.
#[test]
fn test_lock_released_on_drop() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let _db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert!(lock_path(&path).exists());
}
assert!(!lock_path(&path).exists());
let _db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
}
// Replaying a frame for a page beyond the current page count grows the allocator.
#[test]
fn test_wal_recovery_extends_allocator() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.checkpoint().unwrap();
let mut page_data = vec![0u8; DEFAULT_PAGE_SIZE];
page_data[PAGE_HEADER_SIZE] = 0xEE;
// Write directly through the WAL: page 5 does not exist in the allocator.
db.wal_mut().write_frame(5, 6, page_data);
db.flush_wal().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert!(db2.alloc().page_count() > 5);
assert_eq!(db2.alloc().read_page(5).unwrap()[PAGE_HEADER_SIZE], 0xEE);
}
// `Database::read_page` returns the WAL's copy of a page over the allocator's.
#[test]
fn test_read_page_wal_first() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0x01;
let mut wal_data = vec![0u8; DEFAULT_PAGE_SIZE];
wal_data[PAGE_HEADER_SIZE] = 0x02;
db.write_frame(p, wal_data);
let read = db.read_page(p).unwrap();
assert_eq!(read[PAGE_HEADER_SIZE], 0x02);
}
// Compacting after freeing two of six data pages shrinks the page count, and
// the smaller count survives a reopen.
#[test]
fn test_database_compact_reduces_file() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
for i in 0u8..6 {
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0x10 + i;
}
db.alloc_mut().free_page(2).unwrap();
db.alloc_mut().free_page(4).unwrap();
let stats = db.compact().unwrap();
assert!(stats.pages_freed > 0);
assert!(stats.new_page_count < stats.old_page_count);
assert_eq!(db.alloc().freelist_count(), 0);
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
// Header page plus the four surviving data pages.
assert_eq!(db2.alloc().page_count(), 5); }
// Compaction keeps the payload of every live page (order may change).
#[test]
fn test_database_compact_data_intact() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p1 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p1).unwrap()[PAGE_HEADER_SIZE] = 0xAA;
let p2 = db.alloc_mut().alloc_page(PageType::EdgeData);
db.alloc_mut().write_page(p2).unwrap()[PAGE_HEADER_SIZE] = 0xBB;
let p3 = db.alloc_mut().alloc_page(PageType::VectorData);
db.alloc_mut().write_page(p3).unwrap()[PAGE_HEADER_SIZE] = 0xCC;
db.alloc_mut().free_page(p2).unwrap();
db.compact().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let mut data_bytes: Vec<u8> = Vec::new();
for i in 1..db2.alloc().page_count() {
data_bytes.push(db2.alloc().read_page(i).unwrap()[PAGE_HEADER_SIZE]);
}
// Sorted so the assertion does not depend on where pages were relocated.
data_bytes.sort();
assert_eq!(data_bytes, vec![0xAA, 0xCC]);
}
// A full vacuum drops the two freed pages and keeps the three live payloads.
#[test]
fn test_database_full_vacuum() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
for i in 0u8..5 {
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0x50 + i;
}
db.alloc_mut().free_page(1).unwrap();
db.alloc_mut().free_page(3).unwrap();
let stats = db.full_vacuum().unwrap();
assert_eq!(stats.pages_freed, 2);
// Header page plus three live data pages.
assert_eq!(stats.new_page_count, 4); assert_eq!(db.alloc().freelist_count(), 0);
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().page_count(), 4);
let mut data_bytes: Vec<u8> = Vec::new();
for i in 1..db2.alloc().page_count() {
data_bytes.push(db2.alloc().read_page(i).unwrap()[PAGE_HEADER_SIZE]);
}
data_bytes.sort();
assert_eq!(data_bytes, vec![0x51, 0x53, 0x54]);
}
// begin -> write -> commit -> checkpoint persists the data and toggles
// `in_transaction` as expected.
#[test]
fn test_database_begin_commit() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.begin_transaction().unwrap();
assert!(db.in_transaction());
let p = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p).unwrap()[PAGE_HEADER_SIZE] = 0xAA;
let page_data = db.alloc().read_page(p).unwrap().to_vec();
db.write_frame(p, page_data);
db.commit().unwrap();
assert!(!db.in_transaction());
db.checkpoint().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0xAA);
}
// A frame written inside a transaction is no longer in the WAL after rollback.
#[test]
fn test_database_rollback() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p1 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p1).unwrap()[PAGE_HEADER_SIZE] = 0x11;
let data1 = db.alloc().read_page(p1).unwrap().to_vec();
db.write_frame(p1, data1);
db.checkpoint().unwrap();
db.begin_transaction().unwrap();
let p2 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p2).unwrap()[PAGE_HEADER_SIZE] = 0x22;
let data2 = db.alloc().read_page(p2).unwrap().to_vec();
db.write_frame(p2, data2);
db.rollback().unwrap();
assert!(db.wal().read_page(p2).is_none());
}
// `checkpoint` refuses to run while a transaction is active.
#[test]
fn test_checkpoint_blocked_during_transaction() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.begin_transaction().unwrap();
let result = db.checkpoint();
assert!(result.is_err());
}
// `flush_wal` persists only committed frames: the frame written before the
// transaction is recovered, the uncommitted one is not.
#[test]
fn test_flush_only_committed_frames() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
{
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
let p1 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p1).unwrap()[PAGE_HEADER_SIZE] = 0xAA;
let data1 = db.alloc().read_page(p1).unwrap().to_vec();
db.write_frame(p1, data1);
db.begin_transaction().unwrap();
let p2 = db.alloc_mut().alloc_page(PageType::EntityLeaf);
db.alloc_mut().write_page(p2).unwrap()[PAGE_HEADER_SIZE] = 0xBB;
let data2 = db.alloc().read_page(p2).unwrap().to_vec();
db.write_frame(p2, data2);
db.flush_wal().unwrap();
}
let db2 = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert_eq!(db2.alloc().read_page(1).unwrap()[PAGE_HEADER_SIZE], 0xAA);
// Page 2 only exists if recovery grew the allocator; if so it must not hold
// the uncommitted payload.
if db2.alloc().page_count() > 2 {
assert_ne!(db2.alloc().read_page(2).unwrap()[PAGE_HEADER_SIZE], 0xBB);
}
}
// Starting a transaction while one is active is an error.
#[test]
fn test_nested_transaction_error() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
db.begin_transaction().unwrap();
assert!(db.begin_transaction().is_err());
}
// Committing with no active transaction is an error.
#[test]
fn test_commit_without_transaction_error() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert!(db.commit().is_err());
}
// Rolling back with no active transaction is an error.
#[test]
fn test_rollback_without_transaction_error() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.hora");
let mut db = Database::open(&path, DEFAULT_PAGE_SIZE).unwrap();
assert!(db.rollback().is_err());
}
}