//! persy 0.7.0 — Transactional Persistence Engine.
//!
//! Page allocator: per-size free lists persisted with a double-buffered
//! flush scheme, fronted by an LRU page cache.
use crate::{
    config::Config,
    discref::{DiscRef, Page, PageIndex, PageSeek},
    error::PRes,
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
};
use byteorder::{BigEndian, ByteOrder};
use linked_hash_map::LinkedHashMap;
use std::{
    io::{self, Read, Write},
    sync::Mutex,
};

// Size exponent of the allocator's own state page: 2^10 = 1024 bytes,
// enough to hold both 259-byte double-buffered free-list copies.
const ALLOCATOR_PAGE_EXP: u8 = 10;

/// LRU cache of pages, bounded by the total byte size of the cached pages.
pub struct Cache {
    // Access-ordered map: the front entry is the least recently used.
    cache: LinkedHashMap<u64, Page>,
    // Running sum of `1 << page.get_size_exp()` over all cached pages (bytes).
    size: u64,
    // Maximum allowed `size` before eviction kicks in (bytes).
    limit: u64,
}

impl Cache {
    /// Creates an empty cache that will hold at most `limit` bytes of pages.
    pub fn new(limit: u64) -> Cache {
        Cache {
            cache: LinkedHashMap::new(),
            size: 0,
            limit,
        }
    }

    /// Returns a clone of the cached page, refreshing its LRU position.
    fn get(&mut self, key: u64) -> Option<Page> {
        self.cache.get_refresh(&key).map(|val| val.clone_resetted())
    }

    /// Inserts (or replaces) a page, then evicts least-recently-used entries
    /// until the accounted size is back under the limit.
    fn put(&mut self, key: u64, value: &Page) {
        // If the key was already cached, discount the replaced entry first:
        // otherwise re-inserting the same page (as `flush_page` does) would
        // inflate `size` on every call and force spurious evictions.
        if let Some(old) = self.cache.insert(key, value.clone_resetted()) {
            self.size -= 1 << old.get_size_exp();
        }
        self.size += 1 << value.get_size_exp();
        while self.size > self.limit {
            if let Some(en) = self.cache.pop_front() {
                self.size -= 1 << en.1.get_size_exp();
            } else {
                break;
            }
        }
    }
}

// In-memory copy of the persisted free-list state.
struct FreeList {
    // list[exp] is the page index of the first free page of size 2^exp;
    // 0 means the list for that size class is empty.
    list: [u64; 32],
    // Flush counter for the double-buffer checksum scheme (see flush_free_list).
    last_flush: u8,
}

// TODO: Manage defragmentation by merging/splitting pages in the free list
/// Page allocator: hands out disc pages, recycling freed pages through
/// per-size free lists, with an LRU page cache in front of the disc.
pub struct Allocator {
    // Underlying disc access.
    disc: DiscRef,
    // Free pages grouped by size exponent, persisted on `page`.
    freelist: Mutex<FreeList>,
    // LRU cache of loaded pages.
    cache: Mutex<Cache>,
    // Index of the page where the free-list state itself is persisted.
    page: u64,
}

/// Read-oriented wrapper around a loaded page, exposing only the
/// `Read`/seek/index surface of the underlying `Page`.
pub struct ReadPage {
    page: Page,
}

impl ReadPage {
    /// Returns the page-size exponent: the page spans `1 << exp` bytes.
    ///
    /// Takes `&self` instead of the previous `&mut self`: reading the size
    /// exponent needs only shared access (`Page::get_size_exp` is called
    /// through `&Page` in `Cache::put`), and `&self` is strictly more
    /// permissive for callers.
    pub fn get_size_exp(&self) -> u8 {
        self.page.get_size_exp()
    }
}

impl Read for ReadPage {
    // Delegates to the underlying page's read cursor.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.page.read(buf)
    }
}

impl PageSeek for ReadPage {
    // Positions the read cursor within the page (delegated to `Page`).
    fn seek(&mut self, pos: u32) -> PRes<()> {
        self.page.seek(pos)
    }
}
impl PageIndex for ReadPage {
    // The on-disc index of the wrapped page.
    fn get_index(&self) -> u64 {
        self.page.get_index()
    }
}

impl Allocator {
    /// Loads allocator state from the given `page` of the disc.
    ///
    /// The free list is persisted twice (double buffering);
    /// `double_buffer_check` picks the copy whose checksum is valid and
    /// reports the flush counter to continue from.
    pub fn new(dr: DiscRef, config: &Config, page: u64) -> PRes<Allocator> {
        let mut buffer_0 = [0; 259];
        let mut buffer_1 = [0; 259];
        let freelist;
        let last_flush;
        {
            let mut state_page = dr.load_page(page)?;
            state_page.read_exact(&mut buffer_0)?;
            state_page.read_exact(&mut buffer_1)?;
            let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
            last_flush = flush_number;
            // Decode whichever of the two persisted copies won the check.
            freelist = if first {
                Allocator::read_free_list(&buffer_0)
            } else {
                Allocator::read_free_list(&buffer_1)
            };
        }
        let cache_size = config.cache_size();
        Ok(Allocator {
            disc: dr,
            freelist: Mutex::new(FreeList {
                list: freelist,
                last_flush,
            }),
            cache: Mutex::new(Cache::new(cache_size)),
            page,
        })
    }

    /// Decodes the 32 big-endian `u64` free-list heads from `buffer`.
    /// Entry `i` lives at byte offset `8 * i`, mirroring `write_free_list`.
    pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
        let mut freelist = [0; 32];
        for (i, head) in freelist.iter_mut().enumerate() {
            let pos = 8 * i;
            *head = BigEndian::read_u64(&buffer[pos..pos + 8]);
        }
        freelist
    }

    /// Creates and persists an empty free-list page, returning its index.
    pub fn init(dr: &DiscRef) -> PRes<u64> {
        let page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut buffer = Allocator::write_free_list(&[0; 32]);
        prepare_buffer_flush(&mut buffer, 0);
        let mut pag = dr.load_page(page)?;
        pag.write_all(&buffer)?;
        dr.flush_page(&pag)?;
        Ok(page)
    }

    /// Loads a page for reading (served from the cache when possible).
    pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
        Ok(ReadPage {
            page: self.write_page(page)?,
        })
    }

    /// Loads a page for writing; the page must not be on a free list.
    pub fn write_page(&self, page: u64) -> PRes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?);
        Ok(load)
    }

    /// Cache-aware page load: try the LRU cache first, fall back to the
    /// disc and populate the cache on a miss.
    fn write_page_int(&self, page: u64) -> PRes<Page> {
        {
            let mut cache = self.cache.lock()?;
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, &load);
        }
        Ok(load)
    }

    /// Allocates a page of `1 << exp` bytes, reusing the head of the
    /// matching free list when available, otherwise growing the disc.
    pub fn allocate(&self, exp: u8) -> PRes<u64> {
        {
            let mut fl = self.freelist.lock()?;
            let page = fl.list[exp as usize];
            if page != 0 {
                // Pop the head: its "next free" pointer becomes the new head.
                let next;
                {
                    let mut pg = self.write_page_int(page)?;
                    next = pg.get_next_free()?;
                    debug_assert!(pg.is_free()?);
                    pg.reset()?;
                    pg.set_free(false)?;
                    self.flush_page(&mut pg)?;
                }
                fl.list[exp as usize] = next;
                return Ok(page);
            }
        }
        self.disc.create_page(exp)
    }

    /// Flushes a page to disc and refreshes the cached copy.
    pub fn flush_page(&self, page: &mut Page) -> PRes<()> {
        self.disc.flush_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page.get_index(), page);
        }
        Ok(())
    }

    /// Unlinks `page` from the free list of size class `exp`.
    ///
    /// NOTE(review): removal truncates the chain at the removed node (the
    /// head case resets to 0, the walk case writes next_free = 0), so pages
    /// after the removed one drop off the list. The existing test relies on
    /// this behavior and the TODO below tracks the resulting leak.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
        //TODO: this at the moment may leak free pages on recover after crash
        let mut fl = self.freelist.lock()?;
        if fl.list[exp as usize] == page {
            fl.list[exp as usize] = 0;
        } else {
            // Walk the chain until the predecessor of `page`, then cut there.
            let mut p = fl.list[exp as usize];
            while p != 0 {
                let mut pg = self.write_page_int(p)?;
                p = pg.get_next_free()?;
                if p == page {
                    pg.set_next_free(0)?;
                    self.flush_page(&mut pg)?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Returns `page` to the free list of its size class (LIFO push),
    /// unless the disc chose to trim it instead of loading it.
    pub fn free(&self, page: u64) -> PRes<()> {
        let mut fl = self.freelist.lock()?;
        let result_load = self.disc.trim_or_load_page(page)?;
        if let Some(mut pag) = result_load {
            debug_assert!(!pag.is_free()?, "freeing: {} already freed ", page);
            pag.set_free(true)?;
            let size = pag.get_size_exp();
            let old = fl.list[size as usize];
            debug_assert!(old != page, "freeing: {} already free: {} ", page, old);
            // Push on the head: the freed page points at the previous head.
            fl.list[size as usize] = page;
            pag.set_next_free(old)?;
            self.flush_page(&mut pag)?
        };
        Ok(())
    }

    /// Encodes the free-list heads as 32 big-endian `u64` values.
    ///
    /// Entry `i` is written at byte offset `8 * i` so that `read_free_list`
    /// decodes it back. (Fixed: the previous code sliced `buffer[pos..pos+8]`
    /// with the bare entry index, so each entry overwrote all but the last
    /// byte of the previous one and the encoding never round-tripped.)
    pub fn write_free_list(list: &[u64]) -> [u8; 259] {
        let mut buffer = [0; 259];
        for (i, page) in list.iter().enumerate() {
            let pos = 8 * i;
            BigEndian::write_u64(&mut buffer[pos..pos + 8], *page);
        }
        buffer
    }

    /// Persists the free list using the double-buffer scheme: each flush
    /// alternates between the two on-page copies, so a crash mid-write
    /// leaves the other copy intact for `double_buffer_check` on reopen.
    pub fn flush_free_list(&self) -> PRes<()> {
        let mut lock = self.freelist.lock()?;
        let mut buffer = Allocator::write_free_list(&lock.list);
        let (last_flush, offset) = prepare_buffer_flush(&mut buffer, lock.last_flush);
        lock.last_flush = last_flush;
        let mut pag = self.disc.load_page(self.page)?;
        pag.seek(offset)?;
        pag.write_all(&buffer)?;
        self.disc.flush_page(&pag)?;
        // I do not do the disk sync here because is every time done by the caller.
        Ok(())
    }

    /// Direct access to the underlying disc reference.
    pub fn disc(&self) -> &DiscRef {
        &self.disc
    }
}

#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, Page},
    };
    use std::rc::Rc;
    use tempfile::Builder;

    /// Builds an allocator backed by a fresh temporary file.
    fn open_allocator(prefix: &str) -> Allocator {
        let file = Builder::new()
            .prefix(prefix)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = DiscRef::new(file);
        let start = Allocator::init(&disc).unwrap();
        Allocator::new(disc, &Rc::new(Config::new()), start).unwrap()
    }

    #[test]
    fn test_reuse_freed_page() {
        let allocator = open_allocator("all_reuse_test");
        // Burn one allocation so no interesting page lands at index 0.
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap();
        let second = allocator.allocate(10).unwrap();
        let third = allocator.allocate(11).unwrap();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();

        // Free lists are LIFO: the most recently freed page comes back first.
        assert_eq!(allocator.allocate(10).unwrap(), second);
        assert_eq!(allocator.allocate(10).unwrap(), first);
        let fresh = allocator.allocate(10).unwrap();
        assert!(fresh != first);
        assert!(fresh != second);
        assert_eq!(allocator.allocate(11).unwrap(), third);
        assert!(allocator.allocate(11).unwrap() != third);
    }

    #[test]
    fn test_remove_freed_page() {
        let allocator = open_allocator("remove_free_test");
        // Burn one allocation so no interesting page lands at index 0.
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap();
        let second = allocator.allocate(10).unwrap();
        let third = allocator.allocate(10).unwrap();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();
        allocator.remove_from_free(second, 10).unwrap();

        // Only the head (third) survives the removal; first is dropped
        // from the list along with second, so fresh pages follow.
        assert_eq!(allocator.allocate(10).unwrap(), third);
        let fresh = allocator.allocate(10).unwrap();
        assert!(fresh != first);
        assert!(fresh != second);
        assert!(fresh != third);
    }

    #[test]
    fn test_cache_limit_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, &Page::new(Vec::new(), 0, 10, 9));
        cache.put(20, &Page::new(Vec::new(), 0, 10, 9));
        cache.put(30, &Page::new(Vec::new(), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        // The oldest entry (10) was evicted; the newer two survive.
        assert!(cache.get(10).is_none());
        assert!(cache.get(20).is_some());
        assert!(cache.get(30).is_some());
    }

    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, &Page::new(Vec::new(), 0, 10, 9));
        cache.put(20, &Page::new(Vec::new(), 0, 10, 9));
        // Touch 10 so that 20 becomes the least recently used entry.
        assert!(cache.get(10).is_some());

        cache.put(30, &Page::new(Vec::new(), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        // 20 (LRU after the refresh of 10) was evicted.
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_none());
        assert!(cache.get(30).is_some());
    }
}