//! persy 0.9.0 — Transactional Persistence Engine.
//!
//! Page allocator and page cache: tracks free pages per size class and
//! keeps a bounded LRU cache of recently loaded pages.
use crate::{
    config::Config,
    discref::{Device, DiscRef, Page, PageOps, ReadPage, UpdateList},
    error::PRes,
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
};
use byteorder::{BigEndian, ByteOrder};
use linked_hash_map::LinkedHashMap;
use std::{
    io::{Read, Write},
    sync::Mutex,
};

const ALLOCATOR_PAGE_EXP: u8 = 10; // 2^10 = 1024 bytes: fits the two 259-byte free-list buffers

/// Bounded LRU cache of pages, keyed by page index.
pub struct Cache {
    // Access-ordered map: the front entry is the least recently used.
    cache: LinkedHashMap<u64, ReadPage>,
    // Accounted bytes currently cached (sum of 1 << size_exp per page).
    size: u64,
    // Byte budget; entries are evicted from the front while size > limit.
    limit: u64,
}

impl Cache {
    /// Creates a cache that keeps at most `limit` bytes of page data.
    pub fn new(limit: u64) -> Cache {
        Cache {
            cache: LinkedHashMap::new(),
            size: 0,
            limit,
        }
    }

    /// Returns a clone of the cached page and refreshes its LRU position.
    fn get(&mut self, key: u64) -> Option<ReadPage> {
        self.cache.get_refresh(&key).map(|val| val.clone_read())
    }

    /// Inserts `value`, then evicts least-recently-used entries until the
    /// accounted size fits within the limit again.
    fn put(&mut self, key: u64, value: ReadPage) {
        self.size += 1 << value.get_size_exp();
        // Fix: `insert` replaces any page already stored under `key`; the
        // replaced page's size must be subtracted, otherwise `size` drifts
        // upward forever and triggers premature evictions.
        if let Some(old) = self.cache.insert(key, value) {
            self.size -= 1 << old.get_size_exp();
        }
        while self.size > self.limit {
            if let Some(en) = self.cache.pop_front() {
                self.size -= 1 << en.1.get_size_exp();
            } else {
                break;
            }
        }
    }

    /// Removes the entry for `key`, keeping the size accounting in sync.
    fn remove(&mut self, key: u64) {
        // Fix: the removed page's size was previously never subtracted,
        // leaking accounted bytes on every `free`.
        if let Some(old) = self.cache.remove(&key) {
            self.size -= 1 << old.get_size_exp();
        }
    }
}

/// In-memory free-list state: one list head per page-size exponent, plus the
/// flush counter used by the double-buffered on-disk representation.
struct FreeList {
    // list[exp] = index of the first free page of size 2^exp (0 = empty list).
    list: [u64; 32],
    // Counter of the last persisted flush; selects/validates the newest of
    // the two on-page buffer copies.
    last_flush: u8,
}

// TODO: Manage defragmentation by merging/splitting pages in the free list
/// Page allocator: hands out pages of power-of-two sizes, reusing freed pages
/// tracked in a per-size free list, and caches loaded pages.
pub struct Allocator {
    // Underlying storage device.
    disc: Box<dyn Device>,
    // Free-list heads + flush counter, guarded for concurrent allocate/free.
    freelist: Mutex<FreeList>,
    // Bounded LRU page cache.
    cache: Mutex<Cache>,
    // Index of the metadata page holding the persisted free list.
    page: u64,
}

impl Allocator {
    /// Restores the allocator from its metadata `page` on `dr`.
    ///
    /// The metadata page stores two consecutive 259-byte copies of the
    /// serialized free list (double buffering); `double_buffer_check` picks
    /// the newest valid copy and returns the flush counter to continue from.
    pub fn new(dr: DiscRef, config: &Config, page: u64) -> PRes<Allocator> {
        let mut buffer_0 = [0; 259];
        let mut buffer_1 = [0; 259];
        let freelist;
        let last_flush;

        {
            let mut meta = dr.load_page(page)?;
            meta.read_exact(&mut buffer_0)?;
            meta.read_exact(&mut buffer_1)?;
            let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
            last_flush = flush_number;
            freelist = if first {
                Allocator::read_free_list(&buffer_0)
            } else {
                Allocator::read_free_list(&buffer_1)
            };
        }
        let cache_size = config.cache_size();
        Ok(Allocator {
            disc: Box::new(dr),
            freelist: Mutex::new(FreeList {
                list: freelist,
                last_flush,
            }),
            cache: Mutex::new(Cache::new(cache_size)),
            page,
        })
    }

    /// Deserializes the 32 big-endian u64 free-list heads from `buffer`.
    pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
        let mut freelist = [0; 32];
        for (i, slot) in freelist.iter_mut().enumerate() {
            let pos = 8 * i;
            *slot = BigEndian::read_u64(&buffer[pos..pos + 8]);
        }
        freelist
    }

    /// Creates and flushes a fresh (all-empty) allocator metadata page,
    /// returning its index.
    pub fn init(dr: &DiscRef) -> PRes<u64> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut buffer = Allocator::write_free_list(&[0; 32]);
        prepare_buffer_flush(&mut buffer, 0);
        page.write_all(&buffer)?;
        dr.flush_page(&page)?;
        Ok(page.get_index())
    }

    /// Loads a page, returning `None` when it is currently marked free.
    pub fn load_page_not_free(&self, page: u64) -> PRes<Option<ReadPage>> {
        let load = self.read_page_int(page)?;
        if load.is_free()? {
            Ok(None)
        } else {
            Ok(Some(load))
        }
    }

    /// Loads a page for reading; the caller guarantees it is allocated.
    pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
        let load = self.read_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Loads a page for writing; the caller guarantees it is allocated.
    pub fn write_page(&self, page: u64) -> PRes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Cache-through read: serves from the cache when possible, otherwise
    /// loads from disk and populates the cache.
    fn read_page_int(&self, page: u64) -> PRes<ReadPage> {
        {
            let mut cache = self.cache.lock()?;
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        // Lock released during the disk read to avoid blocking other readers.
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load)
    }

    /// Cache-through load of a writable clone of the page.
    fn write_page_int(&self, page: u64) -> PRes<Page> {
        let cache_result;
        {
            let mut cache = self.cache.lock()?;
            cache_result = cache.get(page);
        }
        if let Some(pg) = cache_result {
            return Ok(pg.clone_write());
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load.clone_write())
    }

    /// Returns a zeroed page of size 2^`exp`, reusing the head of the free
    /// list for that size class when available, otherwise growing the file.
    pub fn allocate(&self, exp: u8) -> PRes<Page> {
        {
            let mut fl = self.freelist.lock()?;
            let head = fl.list[exp as usize];
            if head != 0 {
                // Pop the list head: the next free page becomes the new head.
                let next = self.disc.mark_allocated(head)?;
                fl.list[exp as usize] = next;
                let mut pg = self.write_page_int(head)?;
                pg.reset()?;
                return Ok(pg);
            }
        }
        // Free list empty for this size: create a brand new page
        // (free-list lock already released).
        Ok(self.disc.create_page(exp)?)
    }

    /// Writes the page to disk and refreshes the cached copy.
    pub fn flush_page(&self, page: Page) -> PRes<()> {
        self.disc.flush_page(&page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page.get_index(), page.make_read());
        }
        Ok(())
    }

    /// Unlinks `page` from the free list of size class `exp`, marking it
    /// allocated again.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
        //TODO: this at the moment may leak free pages on recover after crash
        let mut fl = self.freelist.lock()?;
        if fl.list[exp as usize] == page {
            fl.list[exp as usize] = 0;
        } else {
            // Walk the singly linked free list until the predecessor of
            // `page` is found, then splice the target out.
            let mut p = fl.list[exp as usize];
            while p != 0 {
                let mut pg = self.write_page_int(p)?;
                p = pg.get_next_free()?;
                if p == page {
                    pg.set_free(false)?;
                    pg.set_next_free(0)?;
                    self.flush_page(pg)?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Releases `page`: evicts it from the cache and either trims the file
    /// or links the page into the appropriate free list.
    pub fn free(&self, page: u64) -> PRes<()> {
        self.cache.lock()?.remove(page);
        let mut fl = self.freelist.lock()?;
        self.disc.trim_or_free_page(page, &mut fl.list)?;
        Ok(())
    }

    /// Serializes the 32 free-list heads as big-endian u64 values.
    pub fn write_free_list(list: &[u64]) -> [u8; 259] {
        let mut buffer = [0; 259];
        for (i, page) in list.iter().enumerate() {
            // Fix: the offset must be scaled by 8 (`8*i..8*i+8`); the previous
            // `i..i+8` slices overlapped, corrupting the serialized list so a
            // reload (`read_free_list` reads at 8*i) returned garbage.
            let pos = 8 * i;
            BigEndian::write_u64(&mut buffer[pos..pos + 8], *page);
        }
        buffer
    }

    /// Persists the free list using double buffering: `prepare_buffer_flush`
    /// stamps the flush counter/checksum and returns which of the two
    /// on-page copies to overwrite.
    pub fn flush_free_list(&self) -> PRes<()> {
        let mut lock = self.freelist.lock()?;
        let mut buffer = Allocator::write_free_list(&lock.list);
        let (last_flush, offset) = prepare_buffer_flush(&mut buffer, lock.last_flush);
        lock.last_flush = last_flush;
        let mut pag = self.disc.load_page(self.page)?.clone_write();
        pag.seek(offset)?;
        pag.write_all(&buffer)?;
        self.disc.flush_page(&pag)?;
        // The disk sync is intentionally left out: every caller performs it.
        Ok(())
    }

    /// Direct access to the underlying storage device.
    pub fn disc(&self) -> &dyn Device {
        &*self.disc
    }
}

impl UpdateList for [u64; 32] {
    /// Installs `page` as the new head of the free list for size class
    /// `size`, returning the previous head (which becomes the next link).
    fn update(&mut self, size: u8, page: u64) -> PRes<u64> {
        let old = std::mem::replace(&mut self[size as usize], page);
        debug_assert!(old != page, "freeing: {} already free: {} ", page, old);
        Ok(old)
    }
}

#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, PageOps, ReadPage},
    };
    use std::rc::Rc;
    use std::sync::Arc;
    use tempfile::Builder;

    /// Builds a device backed by a fresh temp file with the given prefix.
    fn temp_disc(prefix: &str) -> DiscRef {
        let file = Builder::new()
            .prefix(prefix)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        DiscRef::new(file)
    }

    /// A dummy cached page accounting for 2^9 = 512 bytes.
    fn page_of_512(index: u64) -> ReadPage {
        ReadPage::new(Arc::new(Vec::new()), 0, index, 9)
    }

    #[test]
    fn test_reuse_freed_page() {
        let disc = temp_disc("all_reuse_test");
        let pg = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &Rc::new(Config::new()), pg).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(11).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();

        // Freed pages are reused LIFO, per size class.
        assert_eq!(allocator.allocate(10).unwrap().get_index(), second);
        assert_eq!(allocator.allocate(10).unwrap().get_index(), first);
        let fresh = allocator.allocate(10).unwrap().get_index();
        assert!(fresh != first);
        assert!(fresh != second);
        assert_eq!(allocator.allocate(11).unwrap().get_index(), third);
        assert!(allocator.allocate(11).unwrap().get_index() != third);
    }

    #[test]
    fn test_remove_freed_page() {
        let disc = temp_disc("remove_free_test");
        let pg = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &Rc::new(Config::new()), pg).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(10).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();
        println!("{} {} {}", first, second, third);
        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();
        // Unlink `second` from the middle of the free list; it must never
        // be handed out again.
        allocator.remove_from_free(second, 10).unwrap();
        assert_eq!(allocator.allocate(10).unwrap().get_index(), third);
        let reused = allocator.allocate(10).unwrap().get_index();
        assert!(reused != first);
        assert!(reused != second);
        assert!(reused != third);
    }

    #[test]
    fn test_cache_limit_evict() {
        // Three 512-byte pages exceed the 1050-byte budget: the oldest goes.
        let mut cache = Cache::new(1050);
        cache.put(10, page_of_512(10));
        cache.put(20, page_of_512(20));
        cache.put(30, page_of_512(30));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_none());
        assert!(cache.get(20).is_some());
        assert!(cache.get(30).is_some());
    }

    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, page_of_512(10));
        cache.put(20, page_of_512(20));
        // Touch 10 so that 20 becomes the least recently used entry.
        assert!(cache.get(10).is_some());

        cache.put(30, page_of_512(30));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_none());
        assert!(cache.get(30).is_some());
    }
}