//! pageman 0.1.0
//!
//! Disk-based page manager/store.
#[cfg(test)]
mod tests;

mod cache;

use self::cache::Cache;
use crate::{Page, Pid};
use eyre::{bail, ensure, OptionExt, Result, WrapErr};
use generic_array::{typenum::Unsigned, GenericArray};
use roaring::RoaringBitmap;
use std::{
    fs::{self, File},
    io::{self, Read, Seek, Write},
    num::NonZeroUsize,
    path::{Path, PathBuf},
};

/// Page-count and I/O statistics collected by the store.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Counter {
    /// Number of pages currently held by the store.
    pub pages: usize,
    /// Number of pages read from the index file.
    pub reads: usize,
    /// Number of pages written to the index file.
    pub writes: usize,
}

/**
Store is the internal API the index uses to manage its on-disk file. Besides performing all of the
page I/O, it maintains the usage statistics and the page cache.
*/
pub struct Store<P: Page> {
    /// Page cache keyed by `Pid`. The `bool` is the dirty ("fragile") flag: `true` means the
    /// cached page was modified and must be written back to the index file when flushed.
    cache: Cache<Pid, (P, bool)>,
    /// Page-count and read/write statistics.
    counter: Counter,
    /// Directory where the index file (`index.bin`) and the summary are located.
    dir: PathBuf,
    /// The index file that pages are read from and written to.
    file: File,
    /// Bitmap of vacant page ids. Its largest element is an end-of-file marker: every pid at or
    /// above it is free, and the file is truncated to `marker * page size` when synced.
    vacants: RoaringBitmap,
}

impl<P: Page> Store<P> {
    const PS: u64 = P::Size::U64;

    #[inline]
    pub fn cache_margin(&self) -> usize {
        self.cache.margin()
    }

    #[inline]
    pub fn counter(&self) -> &Counter {
        &self.counter
    }

    #[inline]
    pub fn dir(&self) -> &Path {
        self.dir.as_path()
    }

    #[inline]
    fn filelen(&self) -> u64 {
        self.file
            .metadata()
            .expect("failed to fetch file metadata")
            .len()
    }

    /// Reduce the trailing vacancies at the end of the index file.
    fn minimize_vacancies(&mut self) -> Result<()> {
        let mut max = self.vacants.max().ok_or_eyre("no max vacancy found")?;
        while max > 0 && self.vacants.contains(max - 1) {
            self.vacants.remove(max);
            max -= 1;
        }
        Ok(())
    }

    /// Create a new Store at the provided directory path, with a given cache capacity.
    pub fn new(index_dir: PathBuf, cache_cap: NonZeroUsize) -> Result<Self> {
        fs::create_dir_all(&index_dir)?;
        let index_path = index_dir.join("index.bin");
        let file = fs::File::options()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(index_path)?;
        // tracing::trace!(dir = ?index_dir);
        Ok(Self {
            cache: Cache::new(cache_cap),
            counter: Counter::default(),
            dir: index_dir.clone(),
            file,
            vacants: RoaringBitmap::from([0]),
        })
    }

    #[inline]
    pub fn num_io(&self) -> usize {
        self.counter.reads + self.counter.writes
    }

    /// Resume the store from a previously saved state.
    pub fn resume(
        index_dir: PathBuf,
        cache_cap: NonZeroUsize,
        serial_vacants: Vec<u8>,
    ) -> Result<Self> {
        let mut store = Self::new(index_dir, cache_cap)?;
        ensure!(store.filelen() % Self::PS == 0, "Data file is corrupted");
        let mut buf = io::Cursor::new(serial_vacants);
        store.vacants = RoaringBitmap::deserialize_from(&mut buf)?;
        Ok(store)
    }

    #[inline]
    pub fn set_pages(&mut self, num_pages: usize) {
        self.counter.pages = num_pages;
    }

    pub fn sync_file(&mut self) -> Result<()> {
        self.flush_all()?;
        self.minimize_vacancies()?;
        let max_vacant = self
            .vacants
            .max()
            .ok_or_eyre("no vacancies on the index file")? as u64;
        self.file.set_len(max_vacant * Self::PS)?;
        Ok(())
    }

    #[inline]
    pub fn vacants(&self) -> &RoaringBitmap {
        &self.vacants
    }
}

/**
This section implements the logic for managing (Pid, Page) in the cache
*/
impl<P: Page> Store<P> {
    /// This function allows executing a closure with access to a detached page.
    pub fn detach<T>(&mut self, pid: Pid, f: impl FnOnce(&P, &mut Self) -> Result<T>) -> Result<T> {
        let is_pinned = self.is_pinned(&pid);
        self.fetch(pid).wrap_err("error while fetching a page")?;
        let (page, is_fragile) = self
            .cache
            .detach(&pid)
            .wrap_err("failed to detach the cache entry")?;
        let t = f(&page, self);
        self.cache
            .attach(&pid, (page, is_fragile))
            .wrap_err("failed to attach the cache entry")?;
        if is_pinned {
            self.pin(pid)?;
        }
        t
    }

    /// This function allows executing a closure with access to a detached page (mutable).
    pub fn detach_mut<T>(
        &mut self,
        pid: Pid,
        f: impl FnOnce(&mut P, &mut Self) -> Result<T>,
    ) -> Result<T> {
        let is_pinned = self.is_pinned(&pid);
        self.fetch(pid).wrap_err("error while fetching a page")?;
        let (mut page, _) = self
            .cache
            .detach(&pid)
            .wrap_err("failed to detach the cache entry")?;
        let t = f(&mut page, self);
        self.cache
            .attach(&pid, (page, true))
            .wrap_err("failed to attach the cache entry")?;
        if is_pinned {
            self.pin(pid)?;
        }
        t
    }

    /// Fetch the page from the disk, if not cached.
    fn fetch(&mut self, pid: Pid) -> Result<()> {
        if !self.cache.is_cached(&pid) {
            let offset = (*pid as u64) * Self::PS;
            assert!(
                offset + Self::PS <= self.filelen(),
                "File is short of content: Offset + PS = {}, File Length = {}",
                offset + Self::PS,
                self.filelen()
            );
            self.file.seek(io::SeekFrom::Start(offset))?;
            let mut buffer = GenericArray::<u8, P::Size>::default();
            self.file.read_exact(&mut buffer)?;
            let page = P::decode_from(&mut io::Cursor::new(buffer))?;
            self.push(pid, (page, false))?;
            self.counter.reads += 1;
        }
        Ok(())
    }

    /// Flush the page to the disk.
    fn flush(&mut self, (pid, (page, is_fragile)): (Pid, (P, bool))) -> Result<()> {
        if is_fragile {
            let mut buffer = GenericArray::<u8, P::Size>::default();
            let mut writer = io::Cursor::new(buffer.as_mut_slice());
            page.encode_into(&mut writer)?;
            let offset = (*pid as u64) * Self::PS;
            self.file.seek(io::SeekFrom::Start(offset))?;
            self.file.write_all(writer.get_ref())?;
            self.counter.writes += 1;
        }
        Ok(())
    }

    /// Flush all pages in the cache.
    #[inline]
    pub fn flush_all(&mut self) -> Result<()> {
        self.cache
            .empty()
            .wrap_err("failed to empty the cache")?
            .into_iter()
            .try_for_each(|entry| self.flush(entry).wrap_err("failed to flush the page"))
    }

    /// If the page is uncached, fetch the page from the disk, and cache it.
    /// Returns a reference to the cached page.
    pub fn get(&mut self, pid: Pid) -> Result<&P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        Ok(&self
            .cache
            .get(&pid)
            .wrap_err("requested entry is absent from the cache")?
            .0)
    }

    /// If the page is uncached, fetch the page from the disk, and cache it.
    /// Returns a mutable reference to the cached page.
    pub fn get_mut(&mut self, pid: Pid) -> Result<&mut P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        let (page, is_fragile) = self
            .cache
            .get_mut(&pid)
            .wrap_err("requested entry is absent from the cache")?;
        // Mark the page as fragile before returning because
        // it is supposed to be modified
        *is_fragile = true;
        Ok(page)
    }

    /// Insert a new page at the provided location in the file.
    pub fn insert(&mut self, pid: u32, page: P) -> Result<Pid> {
        let cur_max = self
            .vacants
            .max()
            .ok_or_eyre("no max Pid found: empty bitmap")?;
        if cur_max <= pid {
            self.vacants.insert_range(cur_max..=pid + 1);
        }
        if !self.vacants.remove(pid) {
            bail!("overwriting an occupied page: {:?}", pid)
        }
        self.counter.pages += 1;
        self.push(Pid(pid), (page, true))
            .wrap_err("failed to push the page")?;
        Ok(Pid(pid))
    }

    /// Check if the page corresponding to the given Pid is cached.
    #[inline]
    pub fn is_cached(&self, pid: &Pid) -> bool {
        self.cache.is_cached(pid)
    }

    /// Check if the page corresponding to the given Pid is pinned.
    #[inline]
    pub fn is_pinned(&self, pid: &Pid) -> bool {
        self.cache.is_pinned(pid)
    }

    /// Pin the page corresponding to the given Pid in the cache.
    /// This prevents the page from getting automatically evicted when cache overflows.
    pub fn pin(&mut self, pid: Pid) -> Result<()> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        self.cache
            .pin(&pid)
            .wrap_err("failed to pin the cache entry")
    }

    /// If the page is cached, pop and return, otherwise fetch from the disk.
    pub fn pop(&mut self, pid: Pid) -> Result<P> {
        self.fetch(pid).wrap_err("failed to fetch the page")?;
        let page = self.cache.pop(&pid).wrap_err("failed to pop the cache")?.0;
        self.vacants.insert(*pid);
        self.minimize_vacancies()?;
        self.counter.pages -= 1;
        Ok(page)
    }

    /// Cache the new page. If cache is full, remove the LRU entry,
    /// while making sure it is synced to the disk.
    fn push(&mut self, pid: Pid, entry: (P, bool)) -> Result<()> {
        if let Some(stale) = self
            .cache
            .push(pid, entry)
            .wrap_err("failed to push the new cache entry")?
        {
            self.flush(stale)
                .wrap_err("failed to flush the stale page")?;
        }
        Ok(())
    }

    /// Save the given page anywhere in the file.
    pub fn save(&mut self, page: P) -> Result<Pid> {
        let at = if self.vacants.len() == 1 {
            let new_pid = self.vacants.max().expect("no vacant Pid to return");
            self.vacants.insert(new_pid + 1);
            new_pid
        } else {
            self.vacants.min().expect("no min Pid found: empty bitmap")
        };
        self.insert(at, page)
    }

    /// Unpid the page corresponding to the given Pid from the cache.
    #[inline]
    pub fn unpin(&mut self, pid: Pid) -> Result<()> {
        self.cache
            .unpin(&pid)
            .wrap_err("failed to unpin the cache entry")
    }
}