use std::collections::HashSet;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use crate::util::mmap::{map_file, Mmap};
use crate::constants::{
LOCK_BYTE_OFFSET, LOCK_BYTE_RANGE, MAX_PAGE_SIZE, MIN_PAGE_SIZE, OS_PAGE_SIZE,
};
use crate::error::{KiteError, Result};
/// Page-granular accessor for a single file.
///
/// Wraps an open [`File`] and offers reads, writes, allocation and relocation
/// in units of `page_size` bytes, plus a lazily created memory map of the file.
pub struct FilePager {
/// Open handle used for every seek, read, write and resize.
file: File,
/// Path supplied at construction; handed back by `file_path()`.
file_path: PathBuf,
/// Size of one page in bytes.
page_size: usize,
/// File length in bytes as tracked by the pager; updated whenever the pager
/// itself resizes the file.
file_size: u64,
/// Page numbers recorded as free through `free_pages()`.
free_pages: HashSet<u32>,
/// Cached mapping of the file; `None` until `mmap_file()` creates it and
/// again after any operation that invalidates the cache.
mmap: Option<Mmap>,
}
impl FilePager {
pub fn new(file: File, file_path: PathBuf, page_size: usize) -> Result<Self> {
let file_size = file.metadata()?.len();
Ok(Self {
file,
file_path,
page_size,
file_size,
free_pages: HashSet::new(),
mmap: None,
})
}
/// Builds a pager with a caller-supplied `file_size` instead of querying the
/// file's metadata.
///
/// The pager starts with an empty free-page set and no memory map.
pub fn with_size(file: File, file_path: PathBuf, page_size: usize, file_size: u64) -> Self {
    let free_pages = HashSet::new();
    let mmap = None;
    Self {
        file,
        file_path,
        page_size,
        file_size,
        free_pages,
        mmap,
    }
}
/// Returns the path this pager was constructed with.
pub fn file_path(&self) -> &Path {
    self.file_path.as_path()
}
/// Returns the size of one page in bytes.
pub fn page_size(&self) -> usize {
self.page_size
}
/// Returns the file length in bytes as tracked by the pager.
///
/// This is the cached value maintained by the pager's own resize operations;
/// it is not re-read from the filesystem on each call.
pub fn file_size(&self) -> u64 {
self.file_size
}
/// Returns the half-open page range `(start, end)` that overlaps the lock
/// byte region `LOCK_BYTE_OFFSET .. LOCK_BYTE_OFFSET + LOCK_BYTE_RANGE`.
///
/// `start` is rounded down and `end` is rounded up to whole pages.
fn lock_byte_page_range(&self) -> (u32, u32) {
    let page_bytes = self.page_size as u64;
    let first_page = (LOCK_BYTE_OFFSET / page_bytes) as u32;
    let lock_end_byte = LOCK_BYTE_OFFSET + LOCK_BYTE_RANGE as u64;
    let past_last_page = lock_end_byte.div_ceil(page_bytes) as u32;
    (first_page, past_last_page)
}
/// Reports whether `page_num` falls inside the lock byte page range.
fn is_lock_byte_page(&self, page_num: u32) -> bool {
    let (first_page, past_last_page) = self.lock_byte_page_range();
    (first_page..past_last_page).contains(&page_num)
}
/// Reads page `page_num` into a freshly allocated buffer of `page_size` bytes.
///
/// A page that starts at or beyond the tracked file size is returned as all
/// zeroes without touching the file. A page that is cut short by end-of-file
/// is zero-padded to the full page size.
///
/// # Errors
/// Propagates any I/O error from seeking or reading, except
/// `ErrorKind::Interrupted`, which is retried.
pub fn read_page(&mut self, page_num: u32) -> Result<Vec<u8>> {
    let mut buffer = vec![0u8; self.page_size];
    let offset = page_num as u64 * self.page_size as u64;
    if offset >= self.file_size {
        return Ok(buffer);
    }
    self.file.seek(SeekFrom::Start(offset))?;
    // A single `read` may legally return fewer bytes than requested even when
    // more data is available, so keep reading until the page is full or EOF.
    let mut filled = 0;
    while filled < buffer.len() {
        match self.file.read(&mut buffer[filled..]) {
            // End of file: the rest of the buffer stays zeroed.
            Ok(0) => break,
            Ok(read) => filled += read,
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err.into()),
        }
    }
    Ok(buffer)
}
/// Writes one full page of `data` at page `page_num`, growing the file first
/// when the page lies past the tracked end, then drops the cached memory map.
///
/// # Errors
/// Returns `KiteError::Internal` when `data` is not exactly `page_size` bytes
/// or when `page_num` lies in the lock byte page range. I/O errors from
/// resizing, seeking or writing are propagated.
pub fn write_page(&mut self, page_num: u32, data: &[u8]) -> Result<()> {
    let page_len = self.page_size;
    if data.len() != page_len {
        let message = format!(
            "Page data must be exactly {} bytes, got {}",
            self.page_size,
            data.len()
        );
        return Err(KiteError::Internal(message));
    }
    if self.is_lock_byte_page(page_num) {
        let message = format!("Cannot write to lock byte page range (page {page_num})");
        return Err(KiteError::Internal(message));
    }

    let page_start = page_num as u64 * page_len as u64;
    let page_end = page_start + page_len as u64;
    if self.file_size < page_end {
        self.file.set_len(page_end)?;
        self.file_size = page_end;
    }

    self.file.seek(SeekFrom::Start(page_start))?;
    self.file.write_all(data)?;
    self.invalidate_mmap_cache();
    Ok(())
}
/// Returns a memory map of the whole file, creating it on first use.
///
/// An existing map is discarded and rebuilt when its length no longer matches
/// the file length reported by the file's metadata.
///
/// # Errors
/// Propagates errors from reading the metadata or from `map_file`.
pub fn mmap_file(&mut self) -> Result<&Mmap> {
    let stale = match self.mmap.as_ref() {
        Some(existing) => self.file.metadata()?.len() as usize != existing.len(),
        None => false,
    };
    if stale {
        self.mmap = None;
    }
    if self.mmap.is_none() {
        self.mmap = Some(map_file(&self.file)?);
    }
    match self.mmap.as_ref() {
        Some(mapping) => Ok(mapping),
        None => Err(KiteError::Internal(
            "mmap not initialized after mapping".to_string(),
        )),
    }
}
/// Returns the mapped bytes for `page_count` pages starting at `start_page`.
///
/// # Errors
/// Returns `KiteError::Internal` when the byte range cannot be expressed in
/// `usize`, when the start offset is not a multiple of `OS_PAGE_SIZE`, or when
/// the range extends past the end of the mapping. Errors from `mmap_file` are
/// propagated.
pub fn mmap_range(&mut self, start_page: u32, page_count: u32) -> Result<&[u8]> {
    let page_size = self.page_size;
    // Checked arithmetic: on narrow targets (or with an unvalidated page size)
    // the byte offsets can exceed `usize`, and a wrapped value would pass the
    // bounds check below and yield the wrong slice.
    let bounds = (start_page as usize)
        .checked_mul(page_size)
        .and_then(|start| {
            let length = (page_count as usize).checked_mul(page_size)?;
            Some((start, start.checked_add(length)?))
        });
    let Some((start_offset, end_offset)) = bounds else {
        return Err(KiteError::Internal(format!(
            "mmap range of {page_count} pages at page {start_page} overflows usize"
        )));
    };
    if start_offset % OS_PAGE_SIZE != 0 {
        return Err(KiteError::Internal(format!(
            "mmap offset {start_offset} must be aligned to OS page size {OS_PAGE_SIZE}"
        )));
    }
    let mmap = self.mmap_file()?;
    if end_offset > mmap.len() {
        return Err(KiteError::Internal(format!(
            "mmap range {}..{} exceeds file size {}",
            start_offset,
            end_offset,
            mmap.len()
        )));
    }
    Ok(&mmap[start_offset..end_offset])
}
/// Extends the file by `count` pages and returns the first new page number.
///
/// New pages are placed at the current end of the file. If that placement
/// would overlap the lock byte page range, the allocation starts at the first
/// page after that range instead. The cached memory map is dropped.
///
/// # Errors
/// Returns `KiteError::Internal` when `count` is zero or when the resulting
/// page numbers would not fit in `u32`. I/O errors from resizing the file are
/// propagated.
pub fn allocate_pages(&mut self, count: u32) -> Result<u32> {
    if count == 0 {
        return Err(KiteError::Internal(
            "Must allocate at least 1 page".to_string(),
        ));
    }
    // All page arithmetic is done in u64: a truncated or wrapped u32 here
    // would produce a smaller `new_size`, and `set_len` would then shrink the
    // file and destroy data.
    let page_bytes = self.page_size as u64;
    let wanted = u64::from(count);
    let mut start_page = self.file_size.div_ceil(page_bytes);
    let (lock_start, lock_end) = self.lock_byte_page_range();
    if start_page < u64::from(lock_end)
        && start_page.saturating_add(wanted) > u64::from(lock_start)
    {
        start_page = u64::from(lock_end);
    }
    let end_page = start_page.saturating_add(wanted);
    let page_limit = u64::from(u32::MAX) + 1;
    let first_page = match u32::try_from(start_page) {
        Ok(page) if end_page <= page_limit => page,
        _ => {
            return Err(KiteError::Internal(format!(
                "Cannot allocate {count} pages at page {start_page}: page numbers must fit in u32"
            )));
        }
    };
    let new_size = end_page.checked_mul(page_bytes).ok_or_else(|| {
        KiteError::Internal(format!(
            "Cannot allocate {count} pages at page {start_page}: file size overflows u64"
        ))
    })?;
    self.file.set_len(new_size)?;
    self.file_size = new_size;
    self.invalidate_mmap_cache();
    Ok(first_page)
}
/// Records the `count` pages starting at `start_page` as free.
///
/// Pages already recorded are left as they are (the set ignores duplicates).
pub fn free_pages(&mut self, start_page: u32, count: u32) {
    self.free_pages
        .extend((0..count).map(|offset| start_page + offset));
}
/// Returns how many distinct page numbers are currently recorded as free.
pub fn free_page_count(&self) -> usize {
self.free_pages.len()
}
/// Sets the file length to exactly `page_count` pages and drops the cached
/// memory map.
///
/// NOTE(review): page numbers at or above `page_count` that were recorded as
/// free stay in the free set after this call — confirm whether callers rely
/// on that before pruning them here.
///
/// # Errors
/// Propagates the I/O error if the file cannot be resized.
pub fn truncate_pages(&mut self, page_count: u32) -> Result<()> {
    let target_len = page_count as u64 * self.page_size as u64;
    self.file.set_len(target_len)?;
    self.file_size = target_len;
    self.invalidate_mmap_cache();
    Ok(())
}
/// Flushes the file to storage.
///
/// On macOS this calls `libc::fsync` on the raw file descriptor; on every
/// other target it calls `File::sync_all`.
///
/// # Errors
/// Returns the OS error when the underlying sync call fails.
pub fn sync(&self) -> Result<()> {
#[cfg(target_os = "macos")]
{
use std::os::unix::io::AsRawFd;
// NOTE(review): presumably plain `fsync` is chosen here to avoid the cost of
// the full flush that std's `sync_all` requests on macOS — confirm that the
// weaker durability guarantee is intended.
// SAFETY: the descriptor comes from `self.file`, which is borrowed for the
// whole call and therefore still open; `fsync` takes no pointer arguments.
let result = unsafe { libc::fsync(self.file.as_raw_fd()) };
if result != 0 {
// fsync reports failure through errno, which `last_os_error` reads.
return Err(std::io::Error::last_os_error().into());
}
}
#[cfg(not(target_os = "macos"))]
{
self.file.sync_all()?;
}
Ok(())
}
/// Copies `page_count` pages starting at `src_page` to `dst_page`, syncs the
/// file, records the vacated source pages as free and drops the cached map.
///
/// Source and destination may overlap: pages are copied last-to-first when
/// moving towards higher page numbers and first-to-last otherwise, so no page
/// is overwritten before it has been read. Only source pages that are not
/// part of the destination range are recorded as free, because the
/// overlapping ones now hold relocated data.
///
/// Does nothing when `src_page == dst_page`.
///
/// # Errors
/// Returns `KiteError::Internal` when either range would exceed the `u32`
/// page number space or when the destination overlaps the lock byte page
/// range. I/O errors from copying or syncing are propagated; pages copied
/// before the failure remain written.
pub fn relocate_area(&mut self, src_page: u32, page_count: u32, dst_page: u32) -> Result<()> {
    if src_page == dst_page {
        return Ok(());
    }
    // Widen to u64 so `page + count` cannot wrap around u32.
    let count = u64::from(page_count);
    let src_start = u64::from(src_page);
    let dst_start = u64::from(dst_page);
    let src_end = src_start + count;
    let dst_end = dst_start + count;
    let page_limit = u64::from(u32::MAX) + 1;
    if src_end > page_limit || dst_end > page_limit {
        return Err(KiteError::Internal(format!(
            "Cannot relocate {page_count} pages from page {src_page} to page {dst_page}: page numbers must fit in u32"
        )));
    }
    let (lock_start, lock_end) = self.lock_byte_page_range();
    if dst_start < u64::from(lock_end) && dst_end > u64::from(lock_start) {
        return Err(KiteError::Internal(
            "Cannot relocate to lock byte range".to_string(),
        ));
    }

    // Moving to higher page numbers must copy the last page first, otherwise
    // an overlapping destination would clobber source pages not yet read.
    let moving_up = src_page < dst_page;
    let mut buffer = vec![0u8; self.page_size];
    for step in 0..count {
        let index = if moving_up { count - 1 - step } else { step };
        self.copy_page_within_file(src_start + index, dst_start + index, &mut buffer)?;
    }

    self.sync()?;
    // Pages shared by both ranges now contain live relocated data and must
    // not be marked free. Every value is below `page_limit`, so the cast back
    // to u32 is lossless.
    self.free_pages.extend(
        (src_start..src_end)
            .filter(|page| !(dst_start..dst_end).contains(page))
            .map(|page| page as u32),
    );
    self.invalidate_mmap_cache();
    Ok(())
}

/// Copies a single page from `src_page` to `dst_page` through `buffer`
/// (which must be `page_size` bytes), growing the file first when the
/// destination page ends past the tracked file size.
fn copy_page_within_file(&mut self, src_page: u64, dst_page: u64, buffer: &mut [u8]) -> Result<()> {
    let page_bytes = self.page_size as u64;
    let src_offset = src_page * page_bytes;
    let dst_offset = dst_page * page_bytes;

    self.file.seek(SeekFrom::Start(src_offset))?;
    self.file.read_exact(buffer)?;

    let required_size = dst_offset + page_bytes;
    if required_size > self.file_size {
        self.file.set_len(required_size)?;
        self.file_size = required_size;
    }
    self.file.seek(SeekFrom::Start(dst_offset))?;
    self.file.write_all(buffer)?;
    Ok(())
}
/// Drops the cached memory map, if any, so the next `mmap_file()` call maps
/// the file afresh.
fn invalidate_mmap_cache(&mut self) {
    drop(self.mmap.take());
}
/// Returns a shared reference to the underlying file handle.
pub fn file(&self) -> &File {
&self.file
}
/// Returns a mutable reference to the underlying file handle.
///
/// Operations performed directly through this handle do not update the
/// pager's tracked `file_size`.
pub fn file_mut(&mut self) -> &mut File {
&mut self.file
}
}
/// Opens an existing file for reading and writing and wraps it in a
/// [`FilePager`] whose tracked size comes from the file's metadata.
///
/// # Errors
/// Propagates the I/O error if the file cannot be opened or its metadata
/// cannot be read.
pub fn open_pager<P: AsRef<Path>>(file_path: P, page_size: usize) -> Result<FilePager> {
    let path = file_path.as_ref();
    let mut options = OpenOptions::new();
    options.read(true).write(true);
    let file = options.open(path)?;
    FilePager::new(file, path.to_path_buf(), page_size)
}
/// Creates the file (or truncates an existing one to zero length), opens it
/// for reading and writing, and wraps it in a [`FilePager`] with a tracked
/// size of 0.
///
/// # Errors
/// Propagates the I/O error if the file cannot be created or opened.
pub fn create_pager<P: AsRef<Path>>(file_path: P, page_size: usize) -> Result<FilePager> {
    let path = file_path.as_ref();
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(true);
    let file = options.open(path)?;
    let pager = FilePager::with_size(file, path.to_path_buf(), page_size, 0);
    Ok(pager)
}
/// Reports whether `page_size` is a power of two within
/// `MIN_PAGE_SIZE..=MAX_PAGE_SIZE`.
pub fn is_valid_page_size(page_size: usize) -> bool {
    let in_range = (MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&page_size);
    // Range check first so the power-of-two test only runs on in-range sizes.
    in_range && page_size.is_power_of_two()
}
/// Returns the number of `page_size`-byte pages needed to hold `byte_count`
/// bytes, rounding up.
///
/// The result saturates at `u32::MAX` when the page count does not fit in
/// `u32`, rather than wrapping around to a small (possibly zero) value.
///
/// # Panics
/// Panics if `page_size` is zero.
pub fn pages_to_store(byte_count: usize, page_size: usize) -> u32 {
    let pages = byte_count.div_ceil(page_size);
    u32::try_from(pages).unwrap_or(u32::MAX)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Page size used by every pager-backed test.
    const PAGE: usize = 4096;

    /// Creates a pager over a fresh temporary file.
    ///
    /// The temp-file handle is returned alongside the pager so the caller can
    /// keep it alive for the duration of the test.
    fn fresh_pager() -> (NamedTempFile, FilePager) {
        let temp_file = NamedTempFile::new().expect("expected value");
        let pager = create_pager(temp_file.path(), PAGE).expect("expected value");
        (temp_file, pager)
    }

    /// One page filled with the repeating byte pattern 0, 1, ..., 255.
    fn patterned_page() -> Vec<u8> {
        (0..PAGE).map(|i| (i % 256) as u8).collect()
    }

    #[test]
    fn test_is_valid_page_size() {
        for accepted in [4096, 8192, 16384, 32768, 65536] {
            assert!(is_valid_page_size(accepted));
        }
        for rejected in [2048, 131072, 5000, 6000] {
            assert!(!is_valid_page_size(rejected));
        }
    }

    #[test]
    fn test_pages_to_store() {
        let cases = [(0, 0), (1, 1), (4096, 1), (4097, 2), (8192, 2), (10000, 3)];
        for (byte_count, expected_pages) in cases {
            assert_eq!(pages_to_store(byte_count, 4096), expected_pages);
        }
    }

    #[test]
    fn test_read_write_page() {
        let (_temp_file, mut pager) = fresh_pager();
        let data = patterned_page();
        pager.write_page(0, &data).expect("expected value");
        let read_data = pager.read_page(0).expect("expected value");
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_read_empty_page() {
        let (_temp_file, mut pager) = fresh_pager();
        // Reading far past the end of an empty file yields a zeroed page.
        let read_data = pager.read_page(100).expect("expected value");
        assert_eq!(read_data, vec![0u8; 4096]);
    }

    #[test]
    fn test_allocate_pages() {
        let (_temp_file, mut pager) = fresh_pager();

        let first = pager.allocate_pages(5).expect("expected value");
        assert_eq!(first, 0);
        assert_eq!(pager.file_size(), 5 * 4096);

        let second = pager.allocate_pages(3).expect("expected value");
        assert_eq!(second, 5);
        assert_eq!(pager.file_size(), 8 * 4096);
    }

    #[test]
    fn test_free_pages() {
        let (_temp_file, mut pager) = fresh_pager();
        pager.allocate_pages(10).expect("expected value");
        pager.free_pages(2, 3);
        assert_eq!(pager.free_page_count(), 3);
    }

    #[test]
    fn test_mmap_file() {
        let (_temp_file, mut pager) = fresh_pager();
        let data = patterned_page();
        pager.write_page(0, &data).expect("expected value");
        pager.sync().expect("expected value");
        let mmap = pager.mmap_file().expect("expected value");
        assert_eq!(&mmap[..4096], &data[..]);
    }

    #[test]
    fn test_write_extends_file() {
        let (_temp_file, mut pager) = fresh_pager();
        assert_eq!(pager.file_size(), 0);
        let data = vec![0xAB; 4096];
        // Writing page 5 of an empty file grows it to six pages.
        pager.write_page(5, &data).expect("expected value");
        assert_eq!(pager.file_size(), 6 * 4096);
    }

    #[test]
    fn test_page_size_validation() {
        let (_temp_file, mut pager) = fresh_pager();
        let small_data = vec![0u8; 100];
        assert!(pager.write_page(0, &small_data).is_err());
        let large_data = vec![0u8; 8192];
        assert!(pager.write_page(0, &large_data).is_err());
    }
}