use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
/// Default size of every page in the database file, in bytes.
pub const DEFAULT_PAGE_SIZE: u32 = 4096;
/// Magic bytes that open the header record at offset 0 of the file;
/// `DbHeader::validate` rejects headers that do not start with them.
pub const SOCHDB_MAGIC: [u8; 4] = *b"TOON";
/// On-disk format version stamped into new headers by `DbHeader::new`.
pub const FORMAT_VERSION: u32 = 1;
/// Zero-based page index; a page's byte offset in the file is
/// `page_id * page_size`.
pub type PageId = u64;
/// Database header record, stored at the start of page 0.
///
/// `DbHeader::to_bytes` encodes it as a fixed `DbHeader::SIZE`-byte,
/// little-endian record (fields in declaration order, zero padded);
/// `checksum` is a CRC-32 over all other fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbHeader {
/// Must equal `SOCHDB_MAGIC` for the header to validate.
pub magic: [u8; 4],
/// On-disk format version (`FORMAT_VERSION` for headers created here).
pub version: u32,
/// Size of every page in the file, in bytes.
pub page_size: u32,
/// Page id of the schema/catalog page (1 for newly created databases).
pub schema_page: PageId,
/// First page of the singly linked free list, or 0 when the list is empty.
pub free_list_head: PageId,
/// Number of pages in the database, including the header and catalog pages.
pub total_pages: u64,
/// Creation time, microseconds since the Unix epoch.
pub created_us: u64,
/// Time of the last header rewrite, microseconds since the Unix epoch.
pub modified_us: u64,
/// CRC-32 (via `crc32fast`) of every other field.
pub checksum: u32,
}
impl DbHeader {
pub const SIZE: usize = 128;
pub fn new(page_size: u32) -> Self {
let now = now_micros();
let mut header = Self {
magic: SOCHDB_MAGIC,
version: FORMAT_VERSION,
page_size,
schema_page: 1, free_list_head: 0, total_pages: 2, created_us: now,
modified_us: now,
checksum: 0,
};
header.checksum = header.compute_checksum();
header
}
fn compute_checksum(&self) -> u32 {
let mut hasher = crc32fast::Hasher::new();
hasher.update(&self.magic);
hasher.update(&self.version.to_le_bytes());
hasher.update(&self.page_size.to_le_bytes());
hasher.update(&self.schema_page.to_le_bytes());
hasher.update(&self.free_list_head.to_le_bytes());
hasher.update(&self.total_pages.to_le_bytes());
hasher.update(&self.created_us.to_le_bytes());
hasher.update(&self.modified_us.to_le_bytes());
hasher.finalize()
}
pub fn validate(&self) -> bool {
self.magic == SOCHDB_MAGIC && self.checksum == self.compute_checksum()
}
pub fn to_bytes(&self) -> [u8; Self::SIZE] {
let mut buf = [0u8; Self::SIZE];
let mut cursor = io::Cursor::new(&mut buf[..]);
cursor.write_all(&self.magic).unwrap();
cursor.write_u32::<LittleEndian>(self.version).unwrap();
cursor.write_u32::<LittleEndian>(self.page_size).unwrap();
cursor.write_u64::<LittleEndian>(self.schema_page).unwrap();
cursor
.write_u64::<LittleEndian>(self.free_list_head)
.unwrap();
cursor.write_u64::<LittleEndian>(self.total_pages).unwrap();
cursor.write_u64::<LittleEndian>(self.created_us).unwrap();
cursor.write_u64::<LittleEndian>(self.modified_us).unwrap();
cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
buf
}
pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
if buf.len() < Self::SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Buffer too short for header",
));
}
let mut cursor = io::Cursor::new(buf);
let mut magic = [0u8; 4];
cursor.read_exact(&mut magic)?;
let version = cursor.read_u32::<LittleEndian>()?;
let page_size = cursor.read_u32::<LittleEndian>()?;
let schema_page = cursor.read_u64::<LittleEndian>()?;
let free_list_head = cursor.read_u64::<LittleEndian>()?;
let total_pages = cursor.read_u64::<LittleEndian>()?;
let created_us = cursor.read_u64::<LittleEndian>()?;
let modified_us = cursor.read_u64::<LittleEndian>()?;
let checksum = cursor.read_u32::<LittleEndian>()?;
Ok(Self {
magic,
version,
page_size,
schema_page,
free_list_head,
total_pages,
created_us,
modified_us,
checksum,
})
}
}
/// Link record written at the start of a page when it is put on the free list.
#[derive(Debug, Clone)]
pub struct FreePageHeader {
/// Next page on the free list, or 0 at the end of the list.
pub next_free: PageId,
/// Always written as 1 by `PageManager::deallocate_page`, and nothing in
/// this file reads it back.
/// NOTE(review): presumably meant as a run length of free pages; confirm.
pub count: u32,
}
impl FreePageHeader {
pub const SIZE: usize = 12;
pub fn to_bytes(&self) -> [u8; Self::SIZE] {
let mut buf = [0u8; Self::SIZE];
let mut cursor = io::Cursor::new(&mut buf[..]);
cursor.write_u64::<LittleEndian>(self.next_free).unwrap();
cursor.write_u32::<LittleEndian>(self.count).unwrap();
buf
}
pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
let mut cursor = io::Cursor::new(buf);
Ok(Self {
next_free: cursor.read_u64::<LittleEndian>()?,
count: cursor.read_u32::<LittleEndian>()?,
})
}
}
/// Kind of content held by a page, represented as a single byte (`repr(u8)`).
///
/// NOTE(review): nothing in this file reads or writes a `PageType`. The
/// discriminants are explicit, which suggests they are persisted somewhere,
/// so confirm before renumbering or reordering the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
Header = 0,
Catalog = 1,
ColumnGroup = 2,
IndexInterior = 3,
IndexLeaf = 4,
Free = 5,
Overflow = 6,
}
/// Usage figures produced by `PageManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct PageManagerStats {
/// Page count recorded in the header, including header and catalog pages.
pub total_pages: u64,
/// `total_pages` minus the pages found on the free list.
pub used_pages: u64,
/// Number of pages found by walking the free list.
pub free_pages: u64,
/// Allocations recorded by this `PageManager` instance since it was
/// created or opened (kept in memory only).
pub allocations: u64,
/// Deallocations recorded by this instance since it was created or opened
/// (kept in memory only).
pub deallocations: u64,
/// Page size in bytes.
pub page_size: u32,
/// Current length of the database file in bytes.
pub file_size: u64,
/// `file_size / (used_pages * page_size)`, or 1.0 when that product is 0.
pub space_amplification: f64,
}
/// Manages fixed-size pages in a single database file.
///
/// Page 0 holds the `DbHeader` and page 1 the catalog. Later pages are handed
/// out by `allocate_page`, which prefers pages returned through
/// `deallocate_page` (kept in an on-disk free list) over growing the file.
pub struct PageManager {
/// Location of the database file.
path: PathBuf,
/// File handle. Seek-then-read/write sequences move the shared cursor, so
/// `read_page` takes the write lock even though it only reads.
file: RwLock<File>,
/// In-memory copy of the header; each mutating operation writes it back to
/// offset 0 of the file.
header: RwLock<DbHeader>,
/// Page size taken from the header when the manager was created or opened.
page_size: u32,
/// Allocation counter for this instance (not persisted).
allocations: AtomicU64,
/// Deallocation counter for this instance (not persisted).
deallocations: AtomicU64,
}
impl PageManager {
pub fn create<P: AsRef<Path>>(path: P, page_size: u32) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
let mut file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(true)
.open(&path)?;
let header = DbHeader::new(page_size);
let header_bytes = header.to_bytes();
file.write_all(&header_bytes)?;
let padding = vec![0u8; page_size as usize - DbHeader::SIZE];
file.write_all(&padding)?;
let catalog_page = vec![0u8; page_size as usize];
file.write_all(&catalog_page)?;
file.sync_all()?;
Ok(Self {
path,
file: RwLock::new(file),
header: RwLock::new(header),
page_size,
allocations: AtomicU64::new(0),
deallocations: AtomicU64::new(0),
})
}
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
let mut file = OpenOptions::new().read(true).write(true).open(&path)?;
let mut header_buf = [0u8; DbHeader::SIZE];
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut header_buf)?;
let header = DbHeader::from_bytes(&header_buf)?;
if !header.validate() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid database header or checksum",
));
}
let page_size = header.page_size;
Ok(Self {
path,
file: RwLock::new(file),
header: RwLock::new(header),
page_size,
allocations: AtomicU64::new(0),
deallocations: AtomicU64::new(0),
})
}
pub fn allocate_page(&self) -> io::Result<PageId> {
let mut header = self.header.write();
let mut file = self.file.write();
self.allocations.fetch_add(1, Ordering::Relaxed);
if header.free_list_head != 0 {
let page_id = header.free_list_head;
let offset = page_id * self.page_size as u64;
file.seek(SeekFrom::Start(offset))?;
let mut free_header_buf = [0u8; FreePageHeader::SIZE];
file.read_exact(&mut free_header_buf)?;
let free_header = FreePageHeader::from_bytes(&free_header_buf)?;
header.free_list_head = free_header.next_free;
header.modified_us = now_micros();
header.checksum = header.compute_checksum();
file.seek(SeekFrom::Start(0))?;
file.write_all(&header.to_bytes())?;
Ok(page_id)
} else {
let page_id = header.total_pages;
header.total_pages += 1;
header.modified_us = now_micros();
header.checksum = header.compute_checksum();
file.seek(SeekFrom::Start(0))?;
file.write_all(&header.to_bytes())?;
let offset = page_id * self.page_size as u64;
file.seek(SeekFrom::Start(offset))?;
let zero_page = vec![0u8; self.page_size as usize];
file.write_all(&zero_page)?;
Ok(page_id)
}
}
pub fn deallocate_page(&self, page_id: PageId) -> io::Result<()> {
if page_id < 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot deallocate header or catalog pages",
));
}
let mut header = self.header.write();
let mut file = self.file.write();
self.deallocations.fetch_add(1, Ordering::Relaxed);
let free_header = FreePageHeader {
next_free: header.free_list_head,
count: 1,
};
let offset = page_id * self.page_size as u64;
file.seek(SeekFrom::Start(offset))?;
file.write_all(&free_header.to_bytes())?;
header.free_list_head = page_id;
header.modified_us = now_micros();
header.checksum = header.compute_checksum();
file.seek(SeekFrom::Start(0))?;
file.write_all(&header.to_bytes())?;
Ok(())
}
pub fn read_page(&self, page_id: PageId) -> io::Result<Vec<u8>> {
let mut file = self.file.write();
let offset = page_id * self.page_size as u64;
file.seek(SeekFrom::Start(offset))?;
let mut buf = vec![0u8; self.page_size as usize];
file.read_exact(&mut buf)?;
Ok(buf)
}
pub fn write_page(&self, page_id: PageId, data: &[u8]) -> io::Result<()> {
if data.len() != self.page_size as usize {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Page data must be exactly {} bytes", self.page_size),
));
}
let mut file = self.file.write();
let offset = page_id * self.page_size as u64;
file.seek(SeekFrom::Start(offset))?;
file.write_all(data)?;
{
let mut header = self.header.write();
header.modified_us = now_micros();
header.checksum = header.compute_checksum();
file.seek(SeekFrom::Start(0))?;
file.write_all(&header.to_bytes())?;
}
Ok(())
}
pub fn sync(&self) -> io::Result<()> {
self.file.read().sync_all()
}
pub fn page_size(&self) -> u32 {
self.page_size
}
pub fn total_pages(&self) -> u64 {
self.header.read().total_pages
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn stats(&self) -> io::Result<PageManagerStats> {
let header = self.header.read();
let file = self.file.read();
let file_size = file.metadata()?.len();
let mut free_count = 0u64;
let mut current = header.free_list_head;
drop(header);
drop(file);
while current != 0 {
free_count += 1;
let page_data = self.read_page(current)?;
let free_header = FreePageHeader::from_bytes(&page_data)?;
current = free_header.next_free;
if free_count > 1_000_000 {
break;
}
}
let header = self.header.read();
let used_pages = header.total_pages - free_count;
let logical_size = used_pages * self.page_size as u64;
let space_amp = if logical_size > 0 {
file_size as f64 / logical_size as f64
} else {
1.0
};
Ok(PageManagerStats {
total_pages: header.total_pages,
used_pages,
free_pages: free_count,
allocations: self.allocations.load(Ordering::Relaxed),
deallocations: self.deallocations.load(Ordering::Relaxed),
page_size: self.page_size,
file_size,
space_amplification: space_amp,
})
}
}
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_micros() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_create_and_open() {
        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("test.sochdb");

        // A fresh database holds exactly the header and catalog pages.
        let created = PageManager::create(&db_path, DEFAULT_PAGE_SIZE).unwrap();
        assert_eq!(created.total_pages(), 2);
        assert_eq!(created.page_size(), DEFAULT_PAGE_SIZE);
        drop(created);

        // Reopening must read the persisted header back.
        let reopened = PageManager::open(&db_path).unwrap();
        assert_eq!(reopened.total_pages(), 2);
    }

    #[test]
    fn test_allocate_and_deallocate() {
        let scratch = tempdir().unwrap();
        let pm = PageManager::create(scratch.path().join("test.sochdb"), DEFAULT_PAGE_SIZE)
            .unwrap();

        let ids: Vec<PageId> = (0..3).map(|_| pm.allocate_page().unwrap()).collect();
        assert_eq!(ids, vec![2, 3, 4]);
        assert_eq!(pm.total_pages(), 5);

        // A freed page is handed out again before the file grows.
        pm.deallocate_page(ids[1]).unwrap();
        assert_eq!(pm.allocate_page().unwrap(), 3);

        let stats = pm.stats().unwrap();
        assert_eq!(stats.total_pages, 5);
        assert_eq!(stats.free_pages, 0);
        assert!(stats.space_amplification < 1.5);
    }

    #[test]
    fn test_read_write_page() {
        let scratch = tempdir().unwrap();
        let pm = PageManager::create(scratch.path().join("test.sochdb"), DEFAULT_PAGE_SIZE)
            .unwrap();
        let page_id = pm.allocate_page().unwrap();

        let mut payload = vec![0u8; DEFAULT_PAGE_SIZE as usize];
        payload[..4].copy_from_slice(b"TEST");
        payload[100..108].copy_from_slice(&12345u64.to_le_bytes());
        pm.write_page(page_id, &payload).unwrap();

        let round_trip = pm.read_page(page_id).unwrap();
        assert_eq!(&round_trip[..4], b"TEST");
        let mut word = [0u8; 8];
        word.copy_from_slice(&round_trip[100..108]);
        assert_eq!(u64::from_le_bytes(word), 12345);
    }

    #[test]
    fn test_header_validation() {
        let original = DbHeader::new(4096);
        assert!(original.validate());

        let encoded = original.to_bytes();
        assert!(DbHeader::from_bytes(&encoded).unwrap().validate());

        // Byte 10 lies inside the page_size field; the checksum must catch it.
        let mut tampered = encoded;
        tampered[10] = 0xFF;
        assert!(!DbHeader::from_bytes(&tampered).unwrap().validate());
    }

    #[test]
    fn test_cannot_deallocate_system_pages() {
        let scratch = tempdir().unwrap();
        let pm = PageManager::create(scratch.path().join("test.sochdb"), DEFAULT_PAGE_SIZE)
            .unwrap();
        for reserved in [0, 1] {
            assert!(pm.deallocate_page(reserved).is_err());
        }
    }
}