persy 1.8.0

Transactional Persistence Engine
Documentation
use crate::{
    error::PERes,
    util::io::{ArcSliceRead, InfallibleRead, InfallibleWrite, read_u64, write_u64},
};
use std::{
    cmp,
    fs::File,
    io::{self, Cursor, Read, Write},
    sync::Arc,
};
mod common;
mod file_device;
pub(crate) use file_device::FileDevice;
mod memory_device;
pub(crate) use memory_device::MemoryDevice;
#[cfg(test)]
mod tests;

/// Number of bytes of every page reserved for metadata: the size exponent in the first byte,
/// the flags in the second byte and a copy of the size exponent in the last byte.
pub const PAGE_METADATA_SIZE: u32 = 3;
/// Offset of the byte holding the page size exponent.
const SIZE_OFFSET: usize = 0;
/// Offset of the byte holding the page flags (free bit and free-list-head bit).
const FLAGS_OFFSET: usize = SIZE_OFFSET + 1;
// The data offset and the next offset are the same because they are used in different states:
// an allocated page stores its content there, a free page stores the free list links.
/// Offset at which the content of an allocated page starts.
const DATA_OFFSET: usize = FLAGS_OFFSET + 1;
/// Offset of the 8-byte "next free page" link stored in a free page.
const NEXT_OFFSET: usize = FLAGS_OFFSET + 1;
/// Offset of the 8-byte "previous free page" link stored in a free page, right after the
/// next link.
const PREV_OFFSET: usize = NEXT_OFFSET + 8;
/// Callback interface handed to [`Device::trim_or_free_page`] and [`Device::trim_end_pages`].
///
/// NOTE(review): no implementor is visible in this module; judging by the method names it
/// maintains one list of pages per size exponent (`exp`) — confirm against the implementors.
pub trait UpdateList {
    /// Registers `page` in the list for the size exponent `exp` and returns a page index
    /// (presumably the previous head of that list — TODO confirm).
    fn update(&mut self, exp: u8, page: u64) -> PERes<u64>;
    /// Removes `page` from the list for the size exponent `exp`; `next` is presumably the page
    /// that follows it in the list and `check` presumably enables extra validation —
    /// TODO confirm both against the implementors.
    fn remove(&mut self, exp: u8, page: u64, next: u64, check: bool) -> PERes<()>;
    /// Same parameters as [`UpdateList::remove`]; presumably used when `page` is the last
    /// element of the list — TODO confirm.
    fn remove_last(&mut self, exp: u8, page: u64, next: u64, check: bool) -> PERes<()>;
}

/// Page-oriented storage abstraction.
///
/// Pages have a power-of-two size expressed by an exponent (`exp`): a page is `1 << exp`
/// bytes long and reserves [`PAGE_METADATA_SIZE`] bytes of it for metadata.
pub trait Device: Sync + Send {
    /// Loads the page at index `page` as a read-only [`ReadPage`].
    fn load_page(&self, page: u64) -> PERes<ReadPage>;

    /// Like [`Device::load_page`], but may yield `None` instead of a page
    /// (presumably when `page` does not exist on the device — confirm in the implementors).
    fn load_page_if_exists(&self, page: u64) -> PERes<Option<ReadPage>>;

    /// Load a page without skipping the base page metadata; used for the root page or for
    /// metadata manipulation.
    ///
    /// `size_exp` is the size exponent of the page to load.
    fn load_page_raw(&self, page: u64, size_exp: u8) -> PERes<Page>;

    /// Writes the given page back to the device.
    fn flush_page(&self, page: &Page) -> PERes<()>;

    /// Create a page without setting metadata, used by root page
    fn create_page_raw(&self, exp: u8) -> PERes<u64>;

    /// Creates a new page of `1 << exp` bytes and returns it as a writable [`Page`].
    fn create_page(&self, exp: u8) -> PERes<Page>;

    /// Marks the page at index `page` as allocated and returns a page index
    /// (presumably the next free page linked from it — TODO confirm in the implementors).
    fn mark_allocated(&self, page: u64) -> PERes<u64>;

    /// Synchronizes the device (presumably flushing pending writes to the backing storage).
    fn sync(&self) -> PERes<()>;

    /// Releases `page`, reporting the resulting list changes through `update_list`.
    fn trim_or_free_page(&self, page: u64, update_list: &mut dyn UpdateList) -> PERes<()>;

    /// Trims pages at the end of the device, reporting the resulting list changes through
    /// `update_list` (exact policy is implementor-defined — see the implementors).
    fn trim_end_pages(&self, update_list: &mut dyn UpdateList) -> PERes<()>;

    /// Loads the page at index `page` as a [`FreePage`] (metadata and free list links only).
    fn load_free_page(&self, page: u64) -> PERes<FreePage>;

    /// Writes the given free page back to the device.
    fn flush_free_page(&self, page: &FreePage) -> PERes<()>;

    /// Computes the size exponent of the smallest page able to hold `size` bytes of content.
    ///
    /// The result is the smallest `exp` such that
    /// `max(size + PAGE_METADATA_SIZE, 32) < 2^exp`, so it is never lower than 6.
    /// Sizes so large that adding the metadata would overflow saturate to `u64::MAX`
    /// (yielding 64) instead of overflowing or looping forever.
    fn exp_from_content_size(&self, size: u64) -> u8 {
        // content + page metadata, never less than 32 bytes
        let final_size = size
            .saturating_add(u64::from(PAGE_METADATA_SIZE))
            .max(32);
        // The smallest `exp` with `final_size < 2^exp` is exactly the bit length of
        // `final_size`; the value is at most 64 so the cast cannot truncate.
        (64 - final_size.leading_zeros()) as u8
    }

    /// Test-only hook used to release the lock held on the underlying file.
    #[cfg(test)]
    fn release_file_lock(&self) -> PERes<()>;
}

/// Minimal abstraction over storages whose length can be queried and changed; implemented in
/// this module for [`File`] and for `Cursor<Vec<u8>>`.
pub(crate) trait SizeTool {
    /// Returns the current length of the storage in bytes.
    fn len(&self) -> PERes<u64>;
    /// Sets the length of the storage to `len` bytes.
    fn set_len(&mut self, len: u64) -> PERes<()>;
}

impl SizeTool for File {
    /// Length of the file in bytes, as reported by the file metadata.
    fn len(&self) -> PERes<u64> {
        let metadata = self.metadata()?;
        Ok(metadata.len())
    }
    /// Truncates or extends the file to `len` bytes through [`File::set_len`].
    fn set_len(&mut self, len: u64) -> PERes<()> {
        File::set_len(self, len)?;
        Ok(())
    }
}

impl SizeTool for Cursor<Vec<u8>> {
    /// Number of bytes currently held by the underlying vector.
    fn len(&self) -> PERes<u64> {
        let bytes = self.get_ref();
        Ok(bytes.len() as u64)
    }
    /// Resizes the underlying vector to `len` bytes, zero-filling any newly added byte.
    fn set_len(&mut self, len: u64) -> PERes<()> {
        let target = len as usize;
        let bytes = self.get_mut();
        bytes.resize(target, 0);
        Ok(())
    }
}

/// Read-only view over a page whose bytes are shared through an [`Arc`].
pub struct ReadPage {
    /// Whole page bytes, metadata included.
    buff: Arc<Vec<u8>>,
    /// Index of the page, as returned by `get_index`.
    index: u64,
    /// Size exponent of the page.
    exp: u8,
    /// Cursor position, expressed as an absolute offset inside `buff`.
    pos: usize,
}

impl ReadPage {
    pub fn new(buff: Arc<Vec<u8>>, pos: usize, index: u64, exp: u8) -> ReadPage {
        ReadPage { buff, index, exp, pos }
    }
}

impl ReadPage {
    /// Returns the size exponent of the page.
    pub fn get_size_exp(&mut self) -> u8 {
        self.exp
    }

    /// Borrows `len` bytes starting at the current cursor position, without moving the cursor.
    ///
    /// # Panics
    /// Panics if the requested range exceeds the page buffer; debug builds also panic when
    /// the range would include the trailing metadata byte.
    pub fn slice(&self, len: usize) -> &[u8] {
        let final_len = self.pos + len;
        // The last byte of the page is metadata, so the content ends (exclusive) at
        // `buff.len() - 1`; a slice ending exactly there is still valid content.
        debug_assert!(final_len < self.buff.len());
        &self.buff[self.pos..final_len]
    }
    /// Returns an [`ArcSliceRead`] sharing the page buffer, built from the current cursor
    /// position and `len`, without moving the cursor.
    ///
    /// NOTE(review): an end offset (`pos + len`) is passed as third argument here, while
    /// `InfallibleRead::read_slice` of this type passes the plain length; confirm which one
    /// `ArcSliceRead::new` expects.
    pub fn arc_slice(&self, len: usize) -> ArcSliceRead {
        ArcSliceRead::new(self.buff.clone(), self.pos, self.pos + len)
    }
    /// Copies the whole content area of the page, i.e. everything between the leading
    /// metadata bytes and the trailing metadata byte, regardless of the cursor position.
    pub fn content(&self) -> Vec<u8> {
        self.buff[DATA_OFFSET..self.buff.len() - 1].to_vec()
    }
}

impl Read for ReadPage {
    /// Reads from the current cursor position up to the end of the content area, advancing
    /// the cursor by the number of bytes read.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Last byte of the page is metadata so it is never exposed to the reader
        let end = self.buff.len() - 1;
        let start = cmp::min(self.pos, end);
        let mut remaining = &self.buff[start..end];
        let count = Read::read(&mut remaining, buf)?;
        self.pos += count;
        Ok(count)
    }
}
impl InfallibleRead for ReadPage {
    /// Fills `buf` from the page content, advancing the cursor.
    ///
    /// # Panics
    /// Panics if fewer than `buf.len()` bytes are left in the content area.
    fn read_exact(&mut self, buf: &mut [u8]) {
        Read::read_exact(self, buf).expect("in memory buff never fail");
    }

    /// Returns an [`ArcSliceRead`] sharing the page buffer for the next `size` bytes and
    /// advances the cursor by `size`.
    ///
    /// NOTE(review): the plain length is passed as third argument here, while
    /// `ReadPage::arc_slice` passes an end offset (`pos + len`); confirm which one
    /// `ArcSliceRead::new` expects.
    fn read_slice(&mut self, size: usize) -> ArcSliceRead {
        // The last byte of the page is metadata, so the content ends (exclusive) at
        // `buff.len() - 1`; a slice ending exactly there is still valid content.
        debug_assert!(self.pos + size < self.buff.len());
        let read = ArcSliceRead::new(self.buff.clone(), self.pos, size);
        self.pos += size;
        read
    }
}

impl PageOps for ReadPage {
    /// Moves the cursor to `pos`, expressed relative to the start of the content area.
    fn seek(&mut self, pos: u32) {
        debug_assert!(pos < self.get_content_size());
        let offset = pos as usize;
        self.pos = offset + DATA_OFFSET;
    }
    /// Index of this page.
    fn get_index(&self) -> u64 {
        self.index
    }
    /// Size exponent of this page.
    fn get_size_exp(&self) -> u8 {
        self.exp
    }
    /// Page size minus the bytes reserved for metadata.
    fn get_content_size(&self) -> u32 {
        let page_size = 1_u32 << self.exp as u32;
        page_size - PAGE_METADATA_SIZE
    }
    /// New read-only view sharing the same buffer, with the cursor at the content start.
    fn clone_read(&self) -> ReadPage {
        let shared = self.buff.clone();
        ReadPage::new(shared, DATA_OFFSET, self.index, self.exp)
    }
    /// Writable page owning a copy of the buffer, with the cursor at the content start.
    fn clone_write(&self) -> Page {
        let owned = (*self.buff).clone();
        Page::new(owned, DATA_OFFSET, self.index, self.exp)
    }
    /// Whether the free bit is set in the flags byte of the page.
    fn is_free(&self) -> PERes<bool> {
        let flags = self.buff[1];
        Ok(is_free(flags))
    }
    /// Cursor position relative to the content start (0 when the cursor is before it).
    fn cursor_pos(&self) -> usize {
        self.pos.saturating_sub(DATA_OFFSET)
    }
}

/// Writable page that owns its bytes.
pub struct Page {
    /// Whole page bytes, metadata included.
    buff: Vec<u8>,
    /// Index of the page, as returned by `get_index`.
    index: u64,
    /// Size exponent of the page.
    exp: u8,
    /// Cursor position, expressed as an absolute offset inside `buff`.
    pos: usize,
}

/// Operations shared by the read-only ([`ReadPage`]) and the writable ([`Page`]) page types.
pub trait PageOps {
    /// Seek the internal cursor of the page to the specified position, expressed relative to
    /// the start of the content area (the implementations in this module add the metadata
    /// offset themselves).
    fn seek(&mut self, pos: u32);
    /// Index of the page.
    fn get_index(&self) -> u64;
    /// Size exponent of the page.
    fn get_size_exp(&self) -> u8;
    /// Available space for the content excluding all the metadata
    fn get_content_size(&self) -> u32;
    /// Creates a read-only page over the same bytes with the cursor at the content start.
    fn clone_read(&self) -> ReadPage;
    /// Creates a writable page over a copy of the bytes with the cursor at the content start.
    fn clone_write(&self) -> Page;
    /// Whether the free flag is set in the page metadata.
    fn is_free(&self) -> PERes<bool>;
    /// Current cursor position relative to the start of the content area.
    fn cursor_pos(&self) -> usize;
}

/// Returns `cur` with the free-list-head bit (bit 6) set when `head` is `true` and cleared
/// otherwise; every other bit is left untouched.
fn free_list_head_flag_set(cur: u8, head: bool) -> u8 {
    const HEAD_BIT: u8 = 1 << 6;
    if head {
        cur | HEAD_BIT
    } else {
        cur & !HEAD_BIT
    }
}
/// Tells whether the free-list-head bit (bit 6) is set in the flags byte `cur`.
fn is_free_list_head(cur: u8) -> bool {
    const HEAD_BIT: u8 = 1 << 6;
    (cur & HEAD_BIT) == HEAD_BIT
}

/// Returns `cur` with the free bit (bit 7) set when `free` is `true` and cleared otherwise;
/// every other bit is left untouched.
fn free_flag_set(cur: u8, free: bool) -> u8 {
    const FREE_BIT: u8 = 1 << 7;
    if free {
        cur | FREE_BIT
    } else {
        cur & !FREE_BIT
    }
}
/// Tells whether the free bit (bit 7) is set in the flags byte `cur`.
fn is_free(cur: u8) -> bool {
    const FREE_BIT: u8 = 1 << 7;
    (cur & FREE_BIT) == FREE_BIT
}
impl Page {
    pub fn new(buff: Vec<u8>, pos: usize, index: u64, exp: u8) -> Page {
        Page { buff, index, exp, pos }
    }
    pub fn new_alloc(index: u64, exp: u8) -> Page {
        let size = 1 << exp;
        let mut buff = vec![0; size];
        buff[0] = exp;
        buff[1] = free_flag_set(0, false);
        buff[size - 1] = exp;
        Self::new(buff, DATA_OFFSET, index, exp)
    }
    pub fn clone_and_reset(&self) -> Page {
        Page {
            buff: self.buff.clone(),
            index: self.index,
            exp: self.exp,
            pos: 2,
        }
    }
}

impl Read for Page {
    /// Reads from the current cursor position up to the end of the content area, advancing
    /// the cursor by the number of bytes read.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // The trailing byte of the page is metadata, so it is excluded from the readable range
        let end = self.buff.len() - 1;
        let start = cmp::min(self.pos, end);
        let mut remaining = &self.buff[start..end];
        let count = Read::read(&mut remaining, buf)?;
        self.pos += count;
        Ok(count)
    }
}

impl InfallibleRead for Page {
    /// Fills `buf` from the page content, advancing the cursor.
    ///
    /// # Panics
    /// Panics if fewer than `buf.len()` bytes are left in the content area.
    fn read_exact(&mut self, buf: &mut [u8]) {
        <Self as Read>::read_exact(self, buf).expect("in memory buff never fail");
    }

    /// Copies the next `size` bytes into a fresh buffer, advancing the cursor, and wraps the
    /// copy in an [`ArcSliceRead`].
    fn read_slice(&mut self, size: usize) -> ArcSliceRead {
        let mut bytes = vec![0_u8; size];
        <Self as InfallibleRead>::read_exact(self, &mut bytes);
        ArcSliceRead::new_vec(bytes)
    }
}

impl Write for Page {
    /// Writes `buf` at the current cursor position and advances the cursor.
    ///
    /// # Panics
    /// Panics if `buf` does not fit in the space left before the trailing metadata byte.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // The trailing byte of the page is metadata, so it is never writable
        let limit = self.buff.len() - 1;
        let required = self.pos + buf.len();
        if required > limit {
            panic!(
                "Over page allowed content size:{}, data size: {}",
                limit,
                self.pos + buf.len()
            );
        }
        let start = cmp::min(self.pos, limit);
        let mut target = &mut self.buff[start..limit];
        let written = Write::write(&mut target, buf)?;
        self.pos += written;
        Ok(written)
    }

    /// Delegates to the flush of the in-memory buffer.
    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.buff)
    }
}
impl InfallibleWrite for Page {
    /// Writes the whole `buf` into the page through the [`Write`] implementation.
    ///
    /// # Panics
    /// Panics if the underlying write reports an error.
    fn write_all(&mut self, buf: &[u8]) {
        let outcome = <Self as Write>::write_all(self, buf);
        outcome.expect("in memory write should never fail")
    }
}

impl PageOps for Page {
    /// Moves the cursor to `pos`, expressed relative to the start of the content area.
    fn seek(&mut self, pos: u32) {
        debug_assert!(pos < self.get_content_size());
        let offset = pos as usize;
        self.pos = offset + DATA_OFFSET;
    }
    /// Index of this page.
    fn get_index(&self) -> u64 {
        self.index
    }
    /// Size exponent of this page.
    fn get_size_exp(&self) -> u8 {
        self.exp
    }
    /// Read-only page over a copy of the bytes, with the cursor at the content start.
    fn clone_read(&self) -> ReadPage {
        let shared = Arc::new(self.buff.clone());
        ReadPage::new(shared, DATA_OFFSET, self.index, self.exp)
    }
    /// Writable copy of this page, with the cursor at the content start.
    fn clone_write(&self) -> Page {
        self.clone_and_reset()
    }
    /// Whether the free bit is set in the flags byte of the page.
    fn is_free(&self) -> PERes<bool> {
        let flags = self.buff[1];
        Ok(is_free(flags))
    }

    /// Page size minus the bytes reserved for metadata.
    fn get_content_size(&self) -> u32 {
        let page_size = 1_u32 << self.exp as u32;
        page_size - PAGE_METADATA_SIZE
    }

    /// Cursor position relative to the content start (0 when the cursor is before it).
    fn cursor_pos(&self) -> usize {
        self.pos.saturating_sub(DATA_OFFSET)
    }
}

impl Page {
    pub fn reset(&mut self) -> PERes<()> {
        self.buff = vec![0; self.buff.len()];
        self.buff[0] = self.exp;
        let end = self.buff.len() - 1;
        self.buff[end] = self.exp;
        Ok(())
    }

    pub fn make_read(self) -> ReadPage {
        ReadPage::new(Arc::new(self.buff), DATA_OFFSET, self.index, self.exp)
    }
}

/// Page holding only the metadata and the data that are meaningful when a page is free:
/// the size exponent, the flags and the links to the next and previous free pages.
pub struct FreePage {
    /// Index of the page.
    index: u64,
    /// First 32 bytes of the page layout: size exponent at offset 0, flags at offset 1,
    /// followed by the 8-byte next link and the 8-byte previous link.
    buff: [u8; 32],
}

impl FreePage {
    fn new(index: u64, buffer: [u8; 32]) -> Self {
        Self { index, buff: buffer }
    }

    pub fn set_free(&mut self, free: bool) -> PERes<()> {
        self.buff[1] = free_flag_set(self.buff[1], free);
        Ok(())
    }

    pub fn set_next_free(&mut self, next: u64) {
        write_u64(&mut self.buff[NEXT_OFFSET..NEXT_OFFSET + 8], next)
    }

    pub fn get_next_free(&self) -> u64 {
        read_u64(&self.buff[NEXT_OFFSET..NEXT_OFFSET + 8])
    }

    pub fn set_prev_free(&mut self, next: u64) {
        write_u64(&mut self.buff[PREV_OFFSET..PREV_OFFSET + 8], next)
    }

    pub fn get_prev_free(&self) -> u64 {
        read_u64(&self.buff[PREV_OFFSET..PREV_OFFSET + 8])
    }

    pub fn is_free(&self) -> PERes<bool> {
        Ok(is_free(self.buff[1]))
    }

    pub fn get_index(&self) -> u64 {
        self.index
    }

    pub fn get_size_exp(&self) -> u8 {
        self.buff[0]
    }

    pub fn remove_free_list_head_flag(&mut self) {
        self.buff[1] = free_list_head_flag_set(self.buff[1], false);
    }

    pub fn is_free_list_head(&self) -> bool {
        is_free_list_head(self.buff[1])
    }
}