#[cfg(test)]
mod tests;
mod cache;
use self::cache::Cache;
use crate::{Page, Pid};
use eyre::{bail, ensure, OptionExt, Result, WrapErr};
use generic_array::{typenum::Unsigned, GenericArray};
use roaring::RoaringBitmap;
use std::{
fs::{self, File},
io::{self, Read, Seek, Write},
num::NonZeroUsize,
path::{Path, PathBuf},
};
/// Bookkeeping numbers maintained by a [`Store`].
///
/// All fields start at zero via `Default`. The extra derives make the counters
/// printable, clonable and comparable, which is handy for logging and tests.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Counter {
    /// Number of pages currently accounted for: bumped on insert, reduced on
    /// pop, and overridable through `Store::set_pages`.
    pub pages: usize,
    /// Number of pages read from the index file.
    pub reads: usize,
    /// Number of pages written to the index file.
    pub writes: usize,
}
/// A page store that combines an in-memory cache with a single index file
/// and a bitmap of vacant page ids.
pub struct Store<P: Page> {
// Cached pages keyed by page id. The `bool` is the "fragile" flag: it is set
// when a page is inserted or handed out mutably, and only flagged pages are
// written back to the file when they are flushed.
cache: Cache<Pid, (P, bool)>,
// Page count and read/write statistics.
counter: Counter,
// Directory that contains the index file.
dir: PathBuf,
// Handle to the index file (`index.bin` inside `dir`), opened read/write.
file: File,
// Vacant page ids. The largest member is used as the end marker of the
// file: `sync_file` sets the file length to `max * page size`.
vacants: RoaringBitmap,
}
impl<P: Page> Store<P> {
    /// Size in bytes of one encoded page, taken from `P::Size`.
    const PS: u64 = P::Size::U64;

    /// Returns whatever the cache reports as its margin.
    // NOTE(review): presumably the remaining free capacity — confirm in `cache`.
    #[inline]
    pub fn cache_margin(&self) -> usize {
        self.cache.margin()
    }

    /// Returns the page / read / write counters.
    #[inline]
    pub fn counter(&self) -> &Counter {
        &self.counter
    }

    /// Returns the directory that holds the index file.
    #[inline]
    pub fn dir(&self) -> &Path {
        self.dir.as_path()
    }

    /// Returns the current length of the index file in bytes.
    ///
    /// # Panics
    /// Panics if the file metadata cannot be fetched.
    #[inline]
    fn filelen(&self) -> u64 {
        self.file
            .metadata()
            .expect("failed to fetch file metadata")
            .len()
    }

    /// Shrinks the trailing run of vacancies down to its lowest id.
    ///
    /// While the id just below the current maximum is also vacant, the maximum
    /// is removed, so a contiguous vacant tail collapses into a single end
    /// marker.
    ///
    /// # Errors
    /// Fails if the vacancy bitmap is empty.
    fn minimize_vacancies(&mut self) -> Result<()> {
        let mut max = self.vacants.max().ok_or_eyre("no max vacancy found")?;
        while max > 0 && self.vacants.contains(max - 1) {
            self.vacants.remove(max);
            max -= 1;
        }
        Ok(())
    }

    /// Creates a store rooted at `index_dir` with a cache of capacity `cache_cap`.
    ///
    /// The directory is created if missing and `index.bin` inside it is opened
    /// read/write without truncation. The vacancy bitmap starts as `{0}`.
    ///
    /// # Errors
    /// Fails if the directory cannot be created or the file cannot be opened.
    pub fn new(index_dir: PathBuf, cache_cap: NonZeroUsize) -> Result<Self> {
        fs::create_dir_all(&index_dir)?;
        let file = fs::File::options()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(index_dir.join("index.bin"))?;
        Ok(Self {
            cache: Cache::new(cache_cap),
            counter: Counter::default(),
            // `index_dir` is owned and no longer needed here, so move it
            // instead of cloning.
            dir: index_dir,
            file,
            vacants: RoaringBitmap::from([0]),
        })
    }

    /// Returns the total number of page reads and writes performed so far.
    #[inline]
    pub fn num_io(&self) -> usize {
        self.counter.reads + self.counter.writes
    }

    /// Re-opens a store and restores its vacancy bitmap from `serial_vacants`.
    ///
    /// The page counter is left at its default; use [`Self::set_pages`] to
    /// restore it.
    ///
    /// # Errors
    /// Fails if the store cannot be opened, if the file length is not a
    /// multiple of the page size, or if the bitmap cannot be deserialized.
    pub fn resume(
        index_dir: PathBuf,
        cache_cap: NonZeroUsize,
        serial_vacants: Vec<u8>,
    ) -> Result<Self> {
        let mut store = Self::new(index_dir, cache_cap)?;
        ensure!(store.filelen() % Self::PS == 0, "Data file is corrupted");
        let mut buf = io::Cursor::new(serial_vacants);
        store.vacants = RoaringBitmap::deserialize_from(&mut buf)?;
        Ok(store)
    }

    /// Overrides the page counter with `num_pages`.
    #[inline]
    pub fn set_pages(&mut self, num_pages: usize) {
        self.counter.pages = num_pages;
    }

    /// Flushes every cached page, collapses trailing vacancies and resizes the
    /// file so that it ends at the highest remaining vacancy.
    ///
    /// # Errors
    /// Fails if flushing fails, if the vacancy bitmap is empty, or if the file
    /// cannot be resized.
    pub fn sync_file(&mut self) -> Result<()> {
        self.flush_all()?;
        self.minimize_vacancies()?;
        let max_vacant = self
            .vacants
            .max()
            .ok_or_eyre("no vacancies on the index file")?;
        // Lossless widening from the bitmap's u32 ids to a byte offset.
        self.file.set_len(u64::from(max_vacant) * Self::PS)?;
        Ok(())
    }

    /// Returns the bitmap of vacant page ids.
    #[inline]
    pub fn vacants(&self) -> &RoaringBitmap {
        &self.vacants
    }
}
impl<P: Page> Store<P> {
    /// Takes page `pid` out of the cache, runs `f` with a shared reference to
    /// it plus the store itself, then puts the page back.
    ///
    /// The page's fragile flag is preserved. If the page was pinned before the
    /// call it is pinned again afterwards.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched, detached, re-attached or re-pinned;
    /// those failures take precedence over the result of `f`, which is
    /// returned otherwise.
    pub fn detach<T>(&mut self, pid: Pid, f: impl FnOnce(&P, &mut Self) -> Result<T>) -> Result<T> {
        let is_pinned = self.is_pinned(&pid);
        self.fetch(pid).wrap_err("error while fetching a page")?;
        let (page, is_fragile) = self
            .cache
            .detach(&pid)
            .wrap_err("failed to detach the cache entry")?;
        // Run the closure first and only inspect its result after the page is
        // safely back in the cache.
        let t = f(&page, self);
        self.cache
            .attach(&pid, (page, is_fragile))
            .wrap_err("failed to attach the cache entry")?;
        if is_pinned {
            self.pin(pid)?;
        }
        t
    }

    /// Same as [`Self::detach`], but `f` receives the page mutably and the page
    /// is always re-attached with its fragile flag set, so it will be written
    /// back when flushed.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched, detached, re-attached or re-pinned;
    /// those failures take precedence over the result of `f`, which is
    /// returned otherwise.
    pub fn detach_mut<T>(
        &mut self,
        pid: Pid,
        f: impl FnOnce(&mut P, &mut Self) -> Result<T>,
    ) -> Result<T> {
        let is_pinned = self.is_pinned(&pid);
        self.fetch(pid).wrap_err("error while fetching a page")?;
        let (mut page, _) = self
            .cache
            .detach(&pid)
            .wrap_err("failed to detach the cache entry")?;
        let t = f(&mut page, self);
        self.cache
            .attach(&pid, (page, true))
            .wrap_err("failed to attach the cache entry")?;
        if is_pinned {
            self.pin(pid)?;
        }
        t
    }

    /// Makes sure page `pid` is in the cache, reading it from the file when it
    /// is not cached yet. A page loaded this way is not fragile.
    ///
    /// # Errors
    /// Fails if the page lies beyond the end of the file, if seeking, reading
    /// or decoding fails, or if the page cannot be pushed into the cache.
    ///
    /// # Panics
    /// Panics if the file metadata cannot be fetched.
    fn fetch(&mut self, pid: Pid) -> Result<()> {
        if self.cache.is_cached(&pid) {
            return Ok(());
        }
        let offset = u64::from(*pid) * Self::PS;
        let end = offset + Self::PS;
        let len = self.filelen();
        // A page outside the file is reported as an error rather than a panic,
        // since this function already returns `Result`.
        if end > len {
            bail!(
                "File is short of content: Offset + PS = {}, File Length = {}",
                end,
                len
            );
        }
        self.file.seek(io::SeekFrom::Start(offset))?;
        let mut buffer = GenericArray::<u8, P::Size>::default();
        self.file.read_exact(&mut buffer)?;
        let page = P::decode_from(&mut io::Cursor::new(buffer))?;
        self.push(pid, (page, false))?;
        self.counter.reads += 1;
        Ok(())
    }

    /// Writes a cache entry to its slot in the file if it is fragile; entries
    /// that are not fragile are dropped without I/O.
    ///
    /// # Errors
    /// Fails if encoding, seeking or writing fails.
    fn flush(&mut self, (pid, (page, is_fragile)): (Pid, (P, bool))) -> Result<()> {
        if is_fragile {
            // Encode into a page-sized buffer so a full slot is always written.
            let mut buffer = GenericArray::<u8, P::Size>::default();
            let mut writer = io::Cursor::new(buffer.as_mut_slice());
            page.encode_into(&mut writer)?;
            let offset = u64::from(*pid) * Self::PS;
            self.file.seek(io::SeekFrom::Start(offset))?;
            self.file.write_all(writer.get_ref())?;
            self.counter.writes += 1;
        }
        Ok(())
    }

    /// Empties the cache and flushes every entry that was in it.
    ///
    /// # Errors
    /// Fails if the cache cannot be emptied or any page fails to flush.
    #[inline]
    pub fn flush_all(&mut self) -> Result<()> {
        self.cache
            .empty()
            .wrap_err("failed to empty the cache")?
            .into_iter()
            .try_for_each(|entry| self.flush(entry).wrap_err("failed to flush the page"))
    }

    /// Returns a shared reference to page `pid`, loading it if necessary.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched or is absent from the cache.
    pub fn get(&mut self, pid: Pid) -> Result<&P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        Ok(&self
            .cache
            .get(&pid)
            .wrap_err("requested entry is absent from the cache")?
            .0)
    }

    /// Returns a mutable reference to page `pid`, loading it if necessary, and
    /// marks the page as fragile.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched or is absent from the cache.
    pub fn get_mut(&mut self, pid: Pid) -> Result<&mut P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        let (page, is_fragile) = self
            .cache
            .get_mut(&pid)
            .wrap_err("requested entry is absent from the cache")?;
        *is_fragile = true;
        Ok(page)
    }

    /// Stores `page` under the explicit id `pid`.
    ///
    /// If `pid` is at or beyond the current highest vacancy, every id from that
    /// vacancy up to `pid + 1` is marked vacant first, so `pid + 1` becomes the
    /// new end marker. The page enters the cache as fragile.
    ///
    /// # Errors
    /// Fails if the vacancy bitmap is empty, if `pid + 1` does not fit in a
    /// `u32`, if `pid` is already occupied, or if the page cannot be pushed
    /// into the cache.
    pub fn insert(&mut self, pid: u32, page: P) -> Result<Pid> {
        let cur_max = self
            .vacants
            .max()
            .ok_or_eyre("no max Pid found: empty bitmap")?;
        if cur_max <= pid {
            // Checked so that `pid == u32::MAX` is reported clearly instead of
            // panicking (debug) or wrapping to an empty range (release).
            let new_max = pid
                .checked_add(1)
                .ok_or_eyre("page id overflow: no room for the end marker")?;
            self.vacants.insert_range(cur_max..=new_max);
        }
        if !self.vacants.remove(pid) {
            bail!("overwriting an occupied page: {:?}", pid)
        }
        self.counter.pages += 1;
        self.push(Pid(pid), (page, true))
            .wrap_err("failed to push the page")?;
        Ok(Pid(pid))
    }

    /// Reports whether page `pid` is currently in the cache.
    #[inline]
    pub fn is_cached(&self, pid: &Pid) -> bool {
        self.cache.is_cached(pid)
    }

    /// Reports whether the cache considers page `pid` pinned.
    #[inline]
    pub fn is_pinned(&self, pid: &Pid) -> bool {
        self.cache.is_pinned(pid)
    }

    /// Loads page `pid` if necessary and pins it in the cache.
    // NOTE(review): presumably pinning protects the entry from eviction —
    // confirm in `cache`.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched or pinned.
    pub fn pin(&mut self, pid: Pid) -> Result<()> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        self.cache
            .pin(&pid)
            .wrap_err("failed to pin the cache entry")
    }

    /// Removes page `pid` from the store and returns it.
    ///
    /// The id is marked vacant, trailing vacancies are collapsed and the page
    /// counter is decremented.
    ///
    /// # Errors
    /// Fails if the page cannot be fetched or popped from the cache.
    pub fn pop(&mut self, pid: Pid) -> Result<P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        let page = self.cache.pop(&pid).wrap_err("failed to pop the cache")?.0;
        self.vacants.insert(*pid);
        self.minimize_vacancies()?;
        self.counter.pages -= 1;
        Ok(page)
    }

    /// Pushes an entry into the cache and flushes the entry the cache hands
    /// back in exchange, if any.
    ///
    /// # Errors
    /// Fails if the cache rejects the entry or the returned entry cannot be
    /// flushed.
    fn push(&mut self, pid: Pid, entry: (P, bool)) -> Result<()> {
        if let Some(stale) = self
            .cache
            .push(pid, entry)
            .wrap_err("failed to push the new cache entry")?
        {
            self.flush(stale)
                .wrap_err("failed to flush the stale page")?;
        }
        Ok(())
    }

    /// Stores `page` at the lowest vacant id and returns that id.
    ///
    /// When the only vacancy is the end marker, the page takes its place and
    /// [`Self::insert`] moves the marker one id up.
    ///
    /// # Errors
    /// Fails if the vacancy bitmap is empty or if [`Self::insert`] fails.
    pub fn save(&mut self, page: P) -> Result<Pid> {
        let at = self
            .vacants
            .min()
            .ok_or_eyre("no vacant Pid to return")?;
        self.insert(at, page)
    }

    /// Unpins page `pid` in the cache.
    ///
    /// # Errors
    /// Fails if the cache cannot unpin the entry.
    #[inline]
    pub fn unpin(&mut self, pid: Pid) -> Result<()> {
        self.cache
            .unpin(&pid)
            .wrap_err("failed to unpin the cache entry")
    }
}