// persy 0.9.0
//
// Transactional Persistence Engine
// Documentation
use crate::error::PRes;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use std::{
    cmp,
    fs::File,
    io::{self, Read, Seek, SeekFrom, Write},
    sync::{Arc, Mutex},
};
/// Size, in bytes, of the metadata header of a page.
///
/// NOTE(review): the page code in this file uses the literal `2` for the same purpose
/// (byte 0 holds the size exponent, byte 1 holds the flags, cursors start at offset 2);
/// presumably this constant names that quantity — confirm against callers.
pub const PAGE_METADATA_SIZE: u32 = 2;

/// The backing file of a [`DiscRef`] together with its pending-sync state.
pub struct FileHandler {
    /// File that stores the pages.
    file: File,
    /// Set when a page is created (the file grows); read and cleared by `DiscRef::sync`,
    /// which uses it to choose between `sync_all` and `sync_data`.
    metadata_changed: bool,
}

/// Callback invoked by `Device::trim_or_free_page` when a page is put on a free list
/// instead of being trimmed from the file.
pub trait UpdateList {
    /// Registers `page` (a page whose size exponent is `exp`) as freed.
    ///
    /// The returned value is stored inside the freed page as its "next free" pointer;
    /// presumably it is the previous head of the free list for that size — confirm
    /// against the implementors.
    fn update(&mut self, exp: u8, page: u64) -> PRes<u64>;
}

/// Storage abstraction for pages addressed by a `u64` position.
///
/// The descriptions below follow the `DiscRef` implementation in this file, where a page
/// position is a byte offset into a file and a page is `1 << exp` bytes long.
pub trait Device: Sync + Send {
    /// Loads the page at `page` for reading; the size is taken from the page's first byte
    /// and the returned cursor is placed after the two metadata bytes.
    fn load_page(&self, page: u64) -> PRes<ReadPage>;

    /// Loads a page of `1 << size_exp` bytes without skipping the base page metadata
    /// (the cursor starts at offset 0). Used for the root page or for metadata
    /// manipulation.
    ///
    fn load_page_raw(&self, page: u64, size_exp: u8) -> PRes<Page>;

    /// Writes the whole buffer of `page` back at the page's own position.
    fn flush_page(&self, page: &Page) -> PRes<()>;

    /// Creates a page without setting its metadata and returns its position; used by the
    /// root page.
    fn create_page_raw(&self, exp: u8) -> PRes<u64>;

    /// Creates a new zeroed page of `1 << exp` bytes whose first byte is `exp`, and returns
    /// it with the cursor placed after the metadata.
    fn create_page(&self, exp: u8) -> PRes<Page>;

    /// Clears the free flag of the page at `page` and returns the "next free" pointer that
    /// was stored in it.
    fn mark_allocated(&self, page: u64) -> PRes<u64>;

    /// Forces pending writes down to the underlying storage.
    fn sync(&self) -> PRes<()>;

    /// Releases the page at `page`: the last page of the storage is trimmed away, any other
    /// page is flagged as free and handed to `update_list`.
    fn trim_or_free_page(&self, page: u64, update_list: &mut dyn UpdateList) -> PRes<()>;
}

/// File-backed [`Device`] implementation.
pub struct DiscRef {
    /// Backing file and its sync state; every operation locks this mutex before touching
    /// the file.
    file: Mutex<FileHandler>,
}

/// Read-only view over the bytes of a page.
///
/// The buffer is shared through an `Arc`, so creating another view of the same page does
/// not copy the page content.
pub struct ReadPage {
    /// Whole page content, metadata bytes included.
    buff: Arc<Vec<u8>>,
    /// Position of the page on the device.
    index: u64,
    /// Size exponent of the page.
    exp: u8,
    /// Read cursor, as an offset into `buff`.
    pos: usize,
}

impl ReadPage {
    /// Builds a view over `buff` with the read cursor at `pos`.
    ///
    /// `index` is the position of the page on the device and `exp` its size exponent.
    pub fn new(buff: Arc<Vec<u8>>, pos: usize, index: u64, exp: u8) -> ReadPage {
        ReadPage { buff, index, exp, pos }
    }

    /// Returns the size exponent of the page.
    ///
    /// Only a shared borrow is needed: nothing is modified.
    pub fn get_size_exp(&self) -> u8 {
        self.exp
    }

    /// Returns the `len` bytes that start at the current cursor, without moving the cursor.
    ///
    /// # Panics
    /// Panics if `pos + len` goes past the end of the page. A slice that ends exactly at
    /// the end of the page is valid.
    pub fn slice(&self, len: usize) -> &[u8] {
        let final_len = self.pos + len;
        // `final_len` is an exclusive upper bound, so it may be equal to the buffer length.
        debug_assert!(final_len <= self.buff.len());
        &self.buff[self.pos..final_len]
    }
}

impl Read for ReadPage {
    /// Copies bytes from the current cursor into `buf` and advances the cursor by the
    /// number of bytes copied; returns `Ok(0)` once the cursor is at or past the end.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Clamp the cursor so that a position past the end reads nothing instead of panicking.
        let start = cmp::min(self.pos, self.buff.len());
        let mut remaining = &self.buff[start..];
        let count = Read::read(&mut remaining, buf)?;
        self.pos += count;
        Ok(count)
    }
}

impl PageOps for ReadPage {
    fn seek(&mut self, pos: u32) -> PRes<()> {
        self.pos = (pos + 2) as usize;
        Ok(())
    }
    fn get_index(&self) -> u64 {
        self.index
    }
    fn get_size_exp(&self) -> u8 {
        self.exp
    }
    fn clone_read(&self) -> ReadPage {
        ReadPage::new(self.buff.clone(), 2, self.index, self.exp)
    }
    fn clone_write(&self) -> Page {
        Page::new(self.buff.as_ref().clone(), 2, self.index, self.exp)
    }
    fn is_free(&self) -> PRes<bool> {
        Ok((self.buff[1] & 0b1000_0000) != 0)
    }
}

/// Writable, owned copy of the bytes of a page.
#[derive(Clone)]
pub struct Page {
    /// Whole page content, metadata bytes included.
    buff: Vec<u8>,
    /// Position of the page on the device; `DiscRef::flush_page` writes the buffer there.
    index: u64,
    /// Size exponent of the page.
    exp: u8,
    /// Read/write cursor, as an offset into `buff`.
    pos: usize,
}

/// Operations shared by the read-only ([`ReadPage`]) and writable ([`Page`]) page types.
pub trait PageOps {
    /// Moves the cursor to content offset `pos`; both implementations in this file add the
    /// two metadata bytes to it.
    fn seek(&mut self, pos: u32) -> PRes<()>;
    /// Returns the position of the page on the device.
    fn get_index(&self) -> u64;
    /// Returns the size exponent of the page.
    fn get_size_exp(&self) -> u8;
    /// Returns a read-only view of this page with the cursor at the start of the content.
    fn clone_read(&self) -> ReadPage;
    /// Returns a writable copy of this page with the cursor at the start of the content.
    fn clone_write(&self) -> Page;
    /// Tells whether the page is flagged as free.
    fn is_free(&self) -> PRes<bool>;
}

/// Returns `cur` with its highest bit (the "free" flag) set when `free` is true and
/// cleared otherwise; the other seven bits are left untouched.
fn free_flag_set(cur: u8, free: bool) -> u8 {
    const FREE_BIT: u8 = 0b1000_0000;
    match free {
        true => cur | FREE_BIT,
        false => cur & !FREE_BIT,
    }
}
/// Tells whether the highest bit (the "free" flag) of `cur` is set.
fn is_free(cur: u8) -> bool {
    const FREE_BIT: u8 = 0b1000_0000;
    (cur & FREE_BIT) == FREE_BIT
}
impl Page {
    /// Builds a page over `buff` with the cursor at `pos`.
    ///
    /// `index` is the position of the page on the device and `exp` its size exponent.
    pub fn new(buff: Vec<u8>, pos: usize, index: u64, exp: u8) -> Page {
        Page {
            pos,
            exp,
            index,
            buff,
        }
    }

    /// Returns a copy of this page whose cursor is back at offset 2, right after the
    /// metadata bytes.
    pub fn clone_resetted(&self) -> Page {
        Page {
            pos: 2,
            ..self.clone()
        }
    }
}

impl Read for Page {
    /// Copies bytes from the current cursor into `buf` and advances the cursor by the
    /// number of bytes copied; returns `Ok(0)` once the cursor is at or past the end.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Clamp the cursor so that a position past the end reads nothing.
        let start = cmp::min(self.pos, self.buff.len());
        let mut remaining = &self.buff[start..];
        let count = Read::read(&mut remaining, buf)?;
        self.pos += count;
        Ok(count)
    }
}

impl Write for Page {
    /// Copies `buf` into the page at the current cursor and advances the cursor.
    ///
    /// # Panics
    /// Panics when `buf` does not fit between the cursor and the end of the page.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let capacity = self.buff.len();
        let needed = self.pos + buf.len();
        if needed > capacity {
            panic!(
                "Over page allowed content size:{}, data size: {}",
                capacity, needed
            );
        }
        let start = cmp::min(self.pos, capacity);
        let target = &mut self.buff[start..];
        let written = Write::write(&mut { target }, buf)?;
        self.pos += written;
        Ok(written)
    }

    /// Delegates to the flush of the in-memory buffer.
    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.buff)
    }
}

impl PageOps for Page {
    fn seek(&mut self, pos: u32) -> PRes<()> {
        self.pos = (pos + 2) as usize;
        Ok(())
    }
    fn get_index(&self) -> u64 {
        self.index
    }
    fn get_size_exp(&self) -> u8 {
        self.exp
    }
    fn clone_read(&self) -> ReadPage {
        ReadPage::new(Arc::new(self.buff.clone()), 2, self.index, self.exp)
    }
    fn clone_write(&self) -> Page {
        self.clone_resetted()
    }
    fn is_free(&self) -> PRes<bool> {
        Ok(is_free(self.buff[1]))
    }
}

impl Page {
    pub fn set_free(&mut self, free: bool) -> PRes<()> {
        self.buff[1] = free_flag_set(self.buff[1], free);
        Ok(())
    }

    pub fn set_next_free(&mut self, next: u64) -> PRes<()> {
        let pre = self.pos;
        self.pos = 2;
        self.write_u64::<BigEndian>(next)?;
        self.pos = pre;
        Ok(())
    }

    pub fn get_next_free(&mut self) -> PRes<u64> {
        let pre = self.pos;
        self.pos = 2;
        let val = self.read_u64::<BigEndian>()?;
        self.pos = pre;
        Ok(val)
    }

    pub fn reset(&mut self) -> PRes<()> {
        self.buff = vec![0; self.buff.len()];
        self.buff[0] = self.exp;
        Ok(())
    }

    pub fn make_read(self) -> ReadPage {
        ReadPage::new(Arc::new(self.buff), 2, self.index, self.exp)
    }
}

impl DiscRef {
    /// Wraps `file` as a page device; the handler starts with no pending metadata change.
    pub fn new(file: File) -> DiscRef {
        let handler = FileHandler {
            file,
            metadata_changed: false,
        };
        DiscRef {
            file: Mutex::new(handler),
        }
    }
}

impl Device for DiscRef {
    /// Loads the page that starts at byte offset `page` for reading.
    ///
    /// The first byte at that offset is read as the size exponent `exp`; the page is
    /// `1 << exp` bytes long. The returned [`ReadPage`] holds the whole page with its cursor
    /// at offset 2, so the two leading metadata bytes are skipped.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/reading fails, which includes
    /// the file ending before the full page has been read.
    fn load_page(&self, page: u64) -> PRes<ReadPage> {
        let mut ve;
        let exp;
        {
            let fl = &mut self.file.lock()?.file;
            fl.seek(SeekFrom::Start(page))?;
            exp = fl.read_u8()?;
            // Shift a u64: an untyped `1` would be an i32 and overflow from exp = 31 on.
            let size = 1u64 << exp;
            ve = vec![0u8; size as usize];
            // The exponent byte has already been consumed from the file: store it by hand
            // and read the rest of the page after it.
            ve[0] = exp;
            fl.read_exact(&mut ve[1..])?;
        }
        Ok(ReadPage::new(Arc::new(ve), 2, page, exp))
    }

    /// Loads `1 << size_exp` bytes starting at byte offset `page` without skipping the base
    /// page metadata: the returned [`Page`] has its cursor at offset 0. Used for the root
    /// page or for metadata manipulation.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/reading fails.
    fn load_page_raw(&self, page: u64, size_exp: u8) -> PRes<Page> {
        let size = 1u64 << size_exp;
        let mut ve = vec![0u8; size as usize];
        {
            let fl = &mut self.file.lock()?.file;
            fl.seek(SeekFrom::Start(page))?;
            fl.read_exact(&mut ve)?;
        }
        Ok(Page::new(ve, 0, page, size_exp))
    }

    /// Writes the whole buffer of `page` at the byte offset recorded in the page itself.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/writing fails.
    fn flush_page(&self, page: &Page) -> PRes<()> {
        let fl = &mut self.file.lock()?.file;
        fl.seek(SeekFrom::Start(page.index))?;
        fl.write_all(&page.buff)?;
        Ok(())
    }

    /// Creates a page without setting its metadata, used by the root page.
    ///
    /// The file is extended by `1 << exp` bytes and the offset of the new area (the old end
    /// of the file) is returned.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/resizing fails.
    fn create_page_raw(&self, exp: u8) -> PRes<u64> {
        let lock = &mut self.file.lock()?;
        // The file length changes, so the next `sync` has to flush the file metadata too.
        lock.metadata_changed = true;
        let fl = &mut lock.file;
        let offset = fl.seek(SeekFrom::End(0))?;
        let size = 1u64 << exp;
        fl.set_len(offset + size)?;
        Ok(offset)
    }

    /// Appends a new zeroed page of `1 << exp` bytes, whose first byte is `exp`, at the end
    /// of the file and returns it with the cursor at offset 2.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/writing fails.
    fn create_page(&self, exp: u8) -> PRes<Page> {
        let size = 1u64 << exp;
        let mut ve = vec![0u8; size as usize];
        ve[0] = exp;
        let lock = &mut self.file.lock()?;
        // The file length changes, so the next `sync` has to flush the file metadata too.
        lock.metadata_changed = true;
        let fl = &mut lock.file;
        let offset = fl.seek(SeekFrom::End(0))?;
        fl.write_all(&ve)?;
        Ok(Page::new(ve, 2, offset, exp))
    }

    /// Clears the free flag of the page at byte offset `page`, directly on the file, and
    /// returns the "next free" pointer stored at offset 2 of that page.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired or if seeking/reading/writing fails.
    fn mark_allocated(&self, page: u64) -> PRes<u64> {
        let fl = &mut self.file.lock()?.file;
        // Free pages form a linked list: read the pointer to the next one before
        // touching the flags.
        fl.seek(SeekFrom::Start(page + 2))?;
        let next = fl.read_u64::<BigEndian>()?;
        fl.seek(SeekFrom::Start(page + 1))?;
        let mut moderator = fl.read_u8()?;
        moderator = free_flag_set(moderator, false);
        // The read moved the file cursor: go back to the flags byte to overwrite it.
        fl.seek(SeekFrom::Start(page + 1))?;
        fl.write_u8(moderator)?;
        Ok(next)
    }

    /// Flushes the file to disk.
    ///
    /// Uses `sync_all` when the file length changed since the previous call and `sync_data`
    /// otherwise. The sync itself runs on a cloned handle, after the lock has been
    /// released.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired, if the handle cannot be cloned or if the
    /// sync fails.
    fn sync(&self) -> PRes<()> {
        let to_sync;
        let metadata_changed;
        {
            let mut lock = self.file.lock()?;
            to_sync = lock.file.try_clone()?;
            metadata_changed = lock.metadata_changed;
            lock.metadata_changed = false;
        }
        if metadata_changed {
            to_sync.sync_all()?;
        } else {
            to_sync.sync_data()?;
        }
        Ok(())
    }

    /// Releases the page at byte offset `page`.
    ///
    /// When the page is the last one of the file, the file is truncated at the start of the
    /// page, removing it entirely. Otherwise the page is handed to `update_list`, flagged as
    /// free, and the value returned by `update_list` is stored in it as the "next free"
    /// pointer.
    ///
    /// # Errors
    /// Fails if the file lock cannot be acquired, if `update_list` fails or if any file
    /// operation fails.
    fn trim_or_free_page(&self, page: u64, update_list: &mut dyn UpdateList) -> PRes<()> {
        let lock = &mut self.file.lock()?;
        let fl = &mut lock.file;
        fl.seek(SeekFrom::Start(page))?;
        let exp = fl.read_u8()?;
        let size = 1u64 << exp;
        if page + size == fl.metadata()?.len() {
            // Cut the file exactly where the page starts: keeping even one byte of it would
            // make the next appended page start in the middle of the old one.
            fl.set_len(page)?;
            // The file length changed, as it does when a page is created.
            lock.metadata_changed = true;
        } else {
            let old = update_list.update(exp, page)?;
            fl.seek(SeekFrom::Start(page + 1))?;
            let mut moderator = fl.read_u8()?;
            debug_assert!(!is_free(moderator), "freeing: {} already freed ", page);
            moderator = free_flag_set(moderator, true);
            // Flags byte followed by the big-endian pointer to the next free page.
            let mut data: [u8; 9] = [0; 9];
            data[0] = moderator;
            BigEndian::write_u64(&mut data[1..9], old);
            fl.seek(SeekFrom::Start(page + 1))?;
            fl.write_all(&data)?;
            // Pages of at least 32 bytes also get the 16 bytes after the pointer zeroed;
            // presumably this wipes the beginning of the old content — confirm.
            if exp >= 5 {
                fl.write_all(&[0u8; 16])?;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::{Device, DiscRef, PageOps, UpdateList};
    use crate::error::PRes;
    use byteorder::{ReadBytesExt, WriteBytesExt};
    use tempfile::Builder;

    /// Opens a `DiscRef` over a fresh temporary file whose name starts with `prefix`.
    fn temp_disc(prefix: &str) -> DiscRef {
        let handle = Builder::new()
            .prefix(prefix)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        DiscRef::new(handle)
    }

    /// A created page can be loaded and flushed back.
    #[test]
    fn create_load_flush_page() {
        let disc = temp_disc("disc_ref.raw");
        let created = disc.create_page(5).unwrap();
        let offset = created.get_index();
        let loaded = disc.load_page(offset).unwrap();
        let writable = loaded.clone_write();
        disc.flush_page(&writable).unwrap();
    }

    /// The "next free" pointer survives a flush and a reload.
    #[test]
    fn set_get_next_free() {
        let disc = temp_disc("set_free.raw");
        let offset = disc.create_page(5).unwrap().get_index();

        let mut to_write = disc.load_page(offset).unwrap().clone_write();
        to_write.set_next_free(30).unwrap();
        disc.flush_page(&to_write).unwrap();

        let mut reloaded = disc.load_page(offset).unwrap().clone_write();
        let next = reloaded.get_next_free().unwrap();
        assert_eq!(next, 30);
    }

    /// A loaded page reports the exponent it was created with.
    #[test]
    fn get_size_page() {
        let disc = temp_disc("get_size.raw");
        let offset = disc.create_page(5).unwrap().get_index();
        let loaded = &mut disc.load_page(offset).unwrap();
        let exp = loaded.get_size_exp();
        assert_eq!(exp, 5);
    }

    /// A byte written and flushed is read back from the reloaded page.
    #[test]
    fn write_read_page() {
        let disc = temp_disc("write_read.raw");
        let offset = disc.create_page(5).unwrap().get_index();
        {
            let mut writable = disc.load_page(offset).unwrap().clone_write();
            writable.write_u8(10).unwrap();
            disc.flush_page(&writable).unwrap();
        }
        {
            let loaded = &mut disc.load_page(offset).unwrap();
            let value = loaded.read_u8().unwrap();
            assert_eq!(value, 10);
            let exp = loaded.get_size_exp();
            assert_eq!(exp, 5);
        }
    }

    /// Update list that must never be reached: the freed page is expected to be trimmed.
    struct PanicCase {}
    impl UpdateList for PanicCase {
        fn update(&mut self, _: u8, _: u64) -> PRes<u64> {
            panic!("should not put the free in the free list")
        }
    }

    /// Freeing the last page of the file trims it, so it cannot be loaded any more.
    #[test]
    fn create_load_trim() {
        let disc = temp_disc("disc_ref_trim.raw");
        let offset = disc.create_page(5).unwrap().get_index();
        let writable = disc.load_page(offset).unwrap().clone_write();
        disc.flush_page(&writable).unwrap();

        let mut never_called = PanicCase {};
        disc.trim_or_free_page(offset, &mut never_called).unwrap();

        let reloaded = disc.load_page(offset);
        assert!(reloaded.is_err());
    }

    /// Update list that accepts every page and reports no previous free page.
    struct IgnoreCase {}
    impl UpdateList for IgnoreCase {
        fn update(&mut self, _: u8, _: u64) -> PRes<u64> {
            Ok(0)
        }
    }

    /// Freeing a page that is not the last one keeps it in the file, flagged as free.
    #[test]
    fn create_not_trim_not_last() {
        let disc = temp_disc("disc_ref_no_trim.raw");
        let offset = disc.create_page(5).unwrap().get_index();
        let _page_after = disc.create_page(5).unwrap();
        let writable = disc.load_page(offset).unwrap().clone_write();
        disc.flush_page(&writable).unwrap();

        let mut accept_all = IgnoreCase {};
        disc.trim_or_free_page(offset, &mut accept_all).unwrap();

        let reloaded = disc.load_page(offset);
        assert!(reloaded.is_ok());
        assert!(reloaded.unwrap().is_free().unwrap());
    }
}