persy 0.2.0

Transactional Persistence Engine
Documentation
extern crate rand;

use persy::{PRes, RecRef, PersyError};
use allocator::Allocator;
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
use std::sync::Arc;
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hasher, Hash};
use address::{Address, ADDRESS_PAGE_EXP, ADDRESS_ENTRY_SIZE, FLAG_EXISTS, ADDRESS_PAGE_SIZE, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET};
use std::io::{Write, Read};
use discref::PageSeek;
use std::vec;
use std::str;

/// Offset, inside a segments root page, of the first serialized segment entry.
/// The bytes before it hold the big-endian `u64` next-page pointer.
const SEGMENT_CONTENT_OFFSET: u32 = 8;
/// Offset, inside a segments root page, of the big-endian `u64` pointer to the next
/// root page of the chain; a value of 0 means there is no next page.
const SEGMENT_NEXT_PAGE_OFFSET: u32 = 0;

/// In-memory descriptor of one segment: its chain of address pages and the cursor
/// used to hand out new address entries.
pub struct Segment {
    /// First address page of the segment; `free_segment_pages` starts walking from here.
    pub first_page: u64,
    /// Page stored in the segments root page by `Segments::flush_segments`;
    /// `free_segment_pages` treats it as the last page to release.
    pub persistent_page: u64,
    /// Entry position stored alongside `persistent_page`.
    pub persistent_pos: u32,
    /// Address page from which `allocate_internal` hands out entries.
    pub alloc_page: u64,
    /// Offset in `alloc_page` of the next entry `allocate_internal` will return.
    pub alloc_pos: u32,
    /// Segment identifier; address pages store it and readers compare it against the
    /// value found at `SEGMENT_HASH_OFFSET`.
    pub segment_id: u32,
    /// Segment name.
    pub name: String,
}

impl Segment {
    /// Builds a segment descriptor from its page chain and cursor positions.
    ///
    /// `name` is cloned into the returned value.
    pub fn new(first_page: u64, persistent_page: u64, persistent_pos: u32, alloc_page: u64, alloc_pos: u32, segment_id: u32, name: &String) -> Segment {
        Segment {
            first_page: first_page,
            persistent_page: persistent_page,
            persistent_pos: persistent_pos,
            alloc_page: alloc_page,
            alloc_pos: alloc_pos,
            segment_id: segment_id,
            name: name.clone(),
        }
    }

    /// Reserves the next free address entry of this segment and returns its reference.
    ///
    /// When `alloc_page` has no room for another `ADDRESS_ENTRY_SIZE` entry, a new
    /// address page is allocated and linked from the current one. The new page gets a
    /// header (next = 0, previous = current page, this segment's id), and its first
    /// entry is returned.
    ///
    /// # Errors
    /// Propagates any error returned by the allocator.
    pub fn allocate_internal(&mut self, allocator: &Arc<Allocator>) -> PRes<RecRef> {
        let page = self.alloc_page;
        let pos = self.alloc_pos;
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
        if new_pos > ADDRESS_PAGE_SIZE {
            let new_page = allocator.allocate(ADDRESS_PAGE_EXP)?;
            // Link the current page to the new one (next pointer is the first u64).
            let mut pg = allocator.write_page(page)?;
            pg.write_u64::<BigEndian>(new_page)?;
            allocator.flush_page(&mut pg)?;
            // Header of the new page: next page, previous page, segment id stamp.
            let mut new_pg = allocator.write_page(new_page)?;
            new_pg.write_u64::<BigEndian>(0)?;
            new_pg.write_u64::<BigEndian>(page)?;
            new_pg.write_u32::<BigEndian>(self.segment_id)?;
            allocator.flush_page(&mut new_pg)?;
            self.alloc_page = new_page;
            // The first entry of the new page is handed out right away.
            self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
            return Ok(RecRef {
                page: new_page,
                pos: SEGMENT_DATA_OFFSET,
            });
        }

        self.alloc_pos = new_pos;
        Ok(RecRef {
            page: page,
            pos: pos,
        })
    }

    /// Releases the pages owned by this segment.
    ///
    /// Walks the address-page chain from `first_page` up to and including
    /// `persistent_page`. For every address page it:
    /// - zeroes the segment id stamp, so a stale read cannot match this segment;
    /// - frees the record page of every entry flagged `FLAG_EXISTS`;
    /// - frees the address page itself.
    ///
    /// The walk also stops if the chain ends (next pointer 0) before reaching
    /// `persistent_page`, instead of reading page 0.
    ///
    /// NOTE(review): pages linked after `persistent_page` (e.g. allocated by
    /// `allocate_internal` but not yet persisted) are not released here.
    ///
    /// # Errors
    /// Propagates any error returned by the allocator.
    pub fn free_segment_pages(&self, allocator: &Arc<Allocator>) -> PRes<()> {
        let mut page = self.first_page;
        let last = self.persistent_page;
        loop {
            let mut pag = allocator.write_page(page)?;
            let next = pag.read_u64::<BigEndian>()?;
            // Zero the segment hash so a later read of this (possibly reused) page
            // is not mistaken for an entry of this segment.
            pag.seek(SEGMENT_HASH_OFFSET)?;
            pag.write_u32::<BigEndian>(0)?;
            let mut pos = SEGMENT_DATA_OFFSET;
            loop {
                pag.seek(pos)?;
                let data_page = pag.read_u64::<BigEndian>()?;
                let flag = pag.read_u8()?;
                if flag & FLAG_EXISTS != 0 {
                    allocator.free(data_page)?;
                }
                pos += ADDRESS_ENTRY_SIZE;
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                    break;
                }
            }
            allocator.flush_page(&mut pag)?;
            // Each address page is released exactly once.
            allocator.free(page)?;
            if page == last || next == 0 {
                break;
            }
            page = next;
        }
        Ok(())
    }
}

/// Read access to the address entries of a segment address page.
///
/// In the blanket implementation below, an entry at offset `pos` is a big-endian `u64`
/// record page, followed by a `u8` flag byte and a big-endian `u16` version.
pub trait SegmentPageRead {
    /// Reads the entry at `pos` of a page expected to belong to `segment_id`.
    ///
    /// Returns `Some((record_page, version))` for an existing entry. Returns `None` when
    /// the page carries a different segment id or the entry is not flagged as existing.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
    /// Returns the page's next-page pointer together with the offsets of all entries
    /// flagged as existing.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
}
/// Mutating access to the address entries of a segment address page.
///
/// Each operation first checks that the page carries `segment_id` and returns an error
/// otherwise (see the blanket implementation for the exact error variants).
pub trait SegmentPage: SegmentPageRead {
    /// Writes a new entry at `pos` pointing to `record_page`, flagged as existing.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Points the entry at `pos` to `record_page` and bumps its version.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Deletes the entry at `pos` by updating its flag byte.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<()>;
}

impl<T: ReadBytesExt + PageSeek> SegmentPageRead for T {
    /// Reads the address entry at `pos`.
    ///
    /// Entry layout (big-endian): `u64` record page, `u8` flag byte, `u16` version.
    /// Returns `Ok(None)` when the page is not stamped with `segment_id` or when the
    /// entry is not flagged `FLAG_EXISTS`.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Ok(None);
        }
        self.seek(pos)?;
        let record = self.read_u64::<BigEndian>()?;
        let flag = self.read_u8()?;
        let version = self.read_u16::<BigEndian>()?;
        if flag & FLAG_EXISTS == 0 {
            Ok(None)
        } else {
            Ok(Some((record, version)))
        }
    }

    /// Returns `(next_page, offsets)`: the page's next-page pointer and the offsets of
    /// every entry flagged `FLAG_EXISTS`.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
        // The next-page pointer is the first u64 of the page. Seek explicitly rather
        // than relying on where the caller left the cursor.
        self.seek(0)?;
        let next_page = self.read_u64::<BigEndian>()?;
        let mut pos = SEGMENT_DATA_OFFSET;
        let mut recs = Vec::new();
        loop {
            // The flag byte follows the 8-byte record page pointer of the entry.
            self.seek(pos + 8)?;
            let flag = self.read_u8()?;
            if flag & FLAG_EXISTS != 0 {
                recs.push(pos);
            }
            pos += ADDRESS_ENTRY_SIZE;
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        Ok((next_page, recs))
    }
}

/// Returns the version that follows `version`.
///
/// The sequence wraps around from 65535 to 1; 0 is never returned.
fn inc_version(version: u16) -> u16 {
    // `wrapping_add` avoids the debug-build overflow panic at 65535, and the wrapped
    // result 0 is mapped to 1.
    match version.wrapping_add(1) {
        0 => 1,
        next => next,
    }
}

impl<T: WriteBytesExt + ReadBytesExt + PageSeek> SegmentPage for T {
    /// Writes a new entry at `pos`: `record_page`, flag `FLAG_EXISTS`, version 1.
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` if the page is not stamped with `segment_id`.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::SegmentNotFound);
        }
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.write_u8(FLAG_EXISTS)?;
        self.write_u16::<BigEndian>(1)?;
        Ok(())
    }

    /// Points the entry at `pos` to `record_page` and increments its version
    /// (wrapping to 1 after 65535).
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` if the page is not stamped with `segment_id`.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound);
        }
        // The version lives after the 8-byte record page and the 1-byte flag.
        self.seek(pos + 9)?;
        let version = self.read_u16::<BigEndian>()?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.seek(pos + 9)?;
        self.write_u16::<BigEndian>(inc_version(version))?;
        Ok(())
    }

    /// Marks the entry at `pos` as deleted by clearing its `FLAG_EXISTS` bit.
    ///
    /// Clearing (rather than toggling) makes the operation idempotent: deleting an
    /// entry that is already deleted leaves it deleted.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` if the page is not stamped with `segment_id`.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound);
        }
        // The flag byte follows the 8-byte record page pointer.
        self.seek(pos + 8)?;
        let flag = self.read_u8()?;
        self.seek(pos + 8)?;
        self.write_u8(flag & !FLAG_EXISTS)?;
        Ok(())
    }
}

/// Registry of the segments known to the store.
///
/// `segments`/`segments_id` hold the segments written to the root page chain by
/// `flush_segments`. `temp_segments`/`temp_segments_id` hold segments created by
/// `create_temp_segment` that have not yet been moved over by `create_segment`.
pub struct Segments {
    /// First page of the chain storing the serialized segment list.
    pub root_page: u64,
    /// Persisted segments, keyed by segment id.
    pub segments: HashMap<u32, Segment>,
    /// Persisted segment ids, keyed by segment name.
    pub segments_id: HashMap<String, u32>,
    /// Temporary segments, keyed by segment id.
    pub temp_segments: HashMap<u32, Segment>,
    /// Temporary segment ids, keyed by segment name.
    pub temp_segments_id: HashMap<String, u32>,
}

/// Generates a new, non-zero segment id for the segment named `segment`.
///
/// The upper 16 bits come from hashing the name and the lower 16 bits are random. Two
/// segments created with the same name therefore still get different ids with high
/// probability. The id is never 0: `Segment::free_segment_pages` zeroes the id stamp
/// of released pages, so a segment with id 0 would match those pages.
pub fn segment_hash(segment: &String) -> u32 {
    let mut hasher = DefaultHasher::new();
    segment.hash(&mut hasher);
    // Keep the low 32 bits of the 64-bit hash and move them into the upper half
    // (the low 16 of those bits are shifted out).
    let name_part = (hasher.finish() as u32) << 16;
    loop {
        let id = name_part | rand::random::<u16>() as u32;
        // Only possible to retry when `name_part` is 0 and the random half was 0 too.
        if id != 0 {
            return id;
        }
    }
}

impl Segments {
    /// Loads the segment registry from the root page chain starting at `root_page`.
    ///
    /// Root page layout:
    /// - the next-page pointer (`u64`) is at `SEGMENT_NEXT_PAGE_OFFSET`;
    /// - entries start at `SEGMENT_CONTENT_OFFSET`. Each entry is a `1` flag byte followed
    ///   by first page (`u64`), persistent page (`u64`), persistent position (`u32`),
    ///   segment id (`u32`), name length (`u16`) and the UTF-8 name bytes;
    /// - any other flag value (`flush_segments` writes 0) ends the entries of a page;
    /// - a next-page pointer of 0 ends the chain.
    ///
    /// # Errors
    /// Propagates I/O errors and invalid UTF-8 in a stored name.
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
        let mut root = allocator.load_page(root_page)?;
        root.seek(SEGMENT_CONTENT_OFFSET)?;
        let mut segments = HashMap::new();
        let mut segments_id = HashMap::new();
        loop {
            let flag = root.read_u8()?;
            if flag == 1 {
                let first_page = root.read_u64::<BigEndian>()?;
                let persistent_page = root.read_u64::<BigEndian>()?;
                let persistent_pos = root.read_u32::<BigEndian>()?;
                let pers_hash = root.read_u32::<BigEndian>()?;
                let name_size = root.read_u16::<BigEndian>()? as usize;

                let mut slice: Vec<u8> = vec![0; name_size];
                root.read_exact(&mut slice)?;
                let name: String = str::from_utf8(&slice)?.into();
                // A loaded segment resumes allocation from its persisted position.
                segments.insert(pers_hash,
                                Segment::new(first_page,
                                             persistent_page,
                                             persistent_pos,
                                             persistent_page,
                                             persistent_pos,
                                             pers_hash,
                                             &name));
                segments_id.insert(name, pers_hash);
            } else {
                root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
                let next_page = root.read_u64::<BigEndian>()?;
                if next_page == 0 {
                    break;
                }
                root = allocator.load_page(next_page)?;
                // Entries of the next page start after its next-page pointer, exactly as
                // on the first page.
                root.seek(SEGMENT_CONTENT_OFFSET)?;
            }
        }
        Ok(Segments {
            root_page: root_page,
            segments: segments,
            segments_id: segments_id,
            temp_segments: HashMap::new(),
            temp_segments_id: HashMap::new(),
        })
    }

    /// Writes an empty segment list to `root_page`: no entries and no next page.
    ///
    /// # Errors
    /// Propagates any error returned by the allocator.
    pub fn init(root_page: u64, allocator: &Allocator) -> PRes<()> {
        let mut root = allocator.write_page(root_page)?;
        // A next pointer of 0 terminates the chain; any non-zero value would be followed
        // by `Segments::new`.
        root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
        root.write_u64::<BigEndian>(0)?;
        root.seek(SEGMENT_CONTENT_OFFSET)?;
        root.write_u8(0)?;
        allocator.flush_page(&mut root)?;
        Ok(())
    }

    /// Returns the id of the persisted segment named `segment`, if any.
    pub fn segment_id(&self, segment: &String) -> Option<u32> {
        self.segments_id
            .get(segment)
            .and_then(|id| self.segments.get(id))
            .map(|x| x.segment_id)
    }

    /// Returns the persisted segment with id `id`, if any.
    pub fn segment_by_id<'a>(&'a self, id: u32) -> Option<&'a Segment> {
        self.segments.get(&id)
    }

    /// Returns the temporary segment with id `id`, if any.
    pub fn segment_by_id_temp<'a>(&'a self, id: u32) -> Option<&'a Segment> {
        self.temp_segments.get(&id)
    }

    /// Tells whether a persisted segment named `segment` exists.
    pub fn has_segment(&self, segment: &String) -> bool {
        self.segments_id.contains_key(segment)
    }

    /// Creates a temporary segment named `segment` and returns its new id.
    ///
    /// A fresh address page is allocated and initialized with next = 0, previous = 0
    /// and the segment id stamp. The segment is registered only after that page has
    /// been flushed, so an I/O error does not leave a registered segment whose page
    /// was never initialized.
    ///
    /// # Errors
    /// Propagates any error returned by the allocator.
    pub fn create_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: &String) -> PRes<u32> {
        let allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
        let segment_id = segment_hash(segment);
        let mut pag = allocator.write_page(allocated)?;
        pag.write_u64::<BigEndian>(0)?;
        pag.write_u64::<BigEndian>(0)?;
        pag.write_u32::<BigEndian>(segment_id)?;
        allocator.flush_page(&mut pag)?;
        let seg = Segment::new(allocated,
                               allocated,
                               SEGMENT_DATA_OFFSET,
                               allocated,
                               SEGMENT_DATA_OFFSET,
                               segment_id,
                               segment);
        self.temp_segments.insert(segment_id, seg);
        self.temp_segments_id.insert(segment.clone(), segment_id);
        Ok(segment_id)
    }

    /// Returns a mutable reference to the temporary segment with id `segment`, if any.
    pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
        self.temp_segments.get_mut(&segment)
    }

    /// Unregisters the temporary segment with id `segment` and frees its pages.
    /// Unknown ids are ignored.
    ///
    /// # Errors
    /// Propagates any error from `Segment::free_segment_pages`.
    pub fn drop_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: u32) -> PRes<()> {
        if let Some(segment) = self.temp_segments.remove(&segment) {
            self.temp_segments_id.remove(&segment.name);
            segment.free_segment_pages(allocator)?;
        }
        Ok(())
    }

    /// Tells whether `segment` is the id of a persisted or temporary segment.
    pub fn exists_real_or_temp(&self, segment: &u32) -> bool {
        self.segments.contains_key(segment) || self.temp_segments.contains_key(segment)
    }

    /// Moves the temporary segment with id `segment` into the persisted set.
    /// Unknown ids are ignored. Changes reach disk on the next `flush_segments`.
    pub fn create_segment(&mut self, segment: u32) -> PRes<()> {
        if let Some(s) = self.temp_segments.remove(&segment) {
            self.temp_segments_id.remove(&s.name);
            self.segments_id.insert(s.name.clone(), s.segment_id);
            self.segments.insert(s.segment_id, s);
        }
        Ok(())
    }

    /// Unregisters the persisted segment named `segment` and frees its pages.
    /// Unknown names are ignored.
    ///
    /// # Errors
    /// Propagates any error from `Segment::free_segment_pages`.
    pub fn drop_segment(&mut self, allocator: &Arc<Allocator>, segment: &String) -> PRes<()> {
        // If a crash happens while this runs and a released page has already been
        // reused, this can lead to a disk leak; to review.
        if let Some(seg) = self.segments_id.remove(segment) {
            if let Some(segment) = self.segments.remove(&seg) {
                segment.free_segment_pages(allocator)?;
            }
        }
        Ok(())
    }

    /// Serializes every persisted segment into the root page chain, in the format read
    /// back by `Segments::new`.
    ///
    /// If the current page cannot hold the next entry plus the terminating 0 flag byte,
    /// a new page is allocated and linked from it, and the entry is written there.
    ///
    /// NOTE(review): pages chained by a previous flush are neither reused nor freed. An
    /// entry larger than a whole page (very long name) is not handled. Names longer than
    /// 65535 bytes have their length truncated by the `u16` cast.
    ///
    /// # Errors
    /// Propagates any error returned by the allocator.
    pub fn flush_segments(&self, allocator: &Arc<Allocator>) -> PRes<()> {
        let mut root = allocator.write_page(self.root_page)?;
        root.seek(SEGMENT_CONTENT_OFFSET)?;
        // Write position inside the current root page.
        let mut cursor = SEGMENT_CONTENT_OFFSET;
        for segment in self.segments.values() {
            let mut buffer = Vec::<u8>::new();
            buffer.write_u64::<BigEndian>(segment.first_page)?;
            buffer.write_u64::<BigEndian>(segment.persistent_page)?;
            buffer.write_u32::<BigEndian>(segment.persistent_pos)?;
            buffer.write_u32::<BigEndian>(segment.segment_id)?;
            buffer.write_u16::<BigEndian>(segment.name.len() as u16)?;
            buffer.write_all(segment.name.as_bytes())?;

            // One flag byte plus the entry itself.
            let entry_size = 1 + buffer.len() as u32;
            // Keep one byte free for the terminating 0 flag of this page.
            if cursor + entry_size + 1 > ADDRESS_PAGE_SIZE {
                let next = allocator.allocate(ADDRESS_PAGE_EXP)?;
                root.write_u8(0)?;
                root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
                root.write_u64::<BigEndian>(next)?;
                allocator.flush_page(&mut root)?;
                root = allocator.write_page(next)?;
                root.seek(SEGMENT_CONTENT_OFFSET)?;
                cursor = SEGMENT_CONTENT_OFFSET;
            }
            root.write_u8(1)?;
            root.write_all(&buffer)?;
            cursor += entry_size;
        }
        root.write_u8(0)?;
        root.seek(SEGMENT_NEXT_PAGE_OFFSET)?;
        root.write_u64::<BigEndian>(0)?;
        allocator.flush_page(&mut root)?;
        Ok(())
    }
}

/// Entry point for iterating over the record references of one segment.
///
/// Converting it with `into_iter` yields a `SegmentIterator` that starts at `page`.
pub struct SegmentScanner<'a> {
    // Address layer used to read the segment's address pages.
    address: &'a Address,
    // First address page to scan.
    page: u64,
}

impl<'a> SegmentScanner<'a> {
    /// Builds a scanner over `address` that starts reading at address page `page`.
    pub fn new<'b>(address: &'b Address, page: u64) -> SegmentScanner<'b> {
        SegmentScanner {
            page: page,
            address: address,
        }
    }
}

/// Iterator over the existing record references of a segment, produced by
/// `SegmentScanner::into_iter`.
pub struct SegmentIterator<'a> {
    // Address layer used to scan each page (`Address::scan_page`).
    address: &'a Address,
    // Page whose entry offsets are currently being yielded.
    cur_page: u64,
    // Page to scan once `per_page_iterator` is exhausted; 0 means there is none.
    next_page: u64,
    // Remaining existing-entry offsets of `cur_page`.
    per_page_iterator: vec::IntoIter<u32>,
}

impl<'a> IntoIterator for SegmentScanner<'a> {
    type Item = RecRef;
    type IntoIter = SegmentIterator<'a>;

    /// Starts iteration with an empty per-page buffer. The first `next` call therefore
    /// scans `self.page` (unless it is 0, which yields nothing).
    fn into_iter(self) -> Self::IntoIter {
        let start = self.page;
        SegmentIterator {
            per_page_iterator: Vec::new().into_iter(),
            next_page: start,
            cur_page: start,
            address: self.address,
        }
    }
}

impl<'a> Iterator for SegmentIterator<'a> {
    type Item = RecRef;

    /// Returns the next existing record reference of the segment.
    ///
    /// Address pages are scanned on demand through `Address::scan_page`, and pages with
    /// no existing entries are skipped. `Iterator` cannot report a scan error, so an
    /// error ends the iteration. The iterator is also fused: every later call returns
    /// `None` instead of retrying the failing page.
    fn next(&mut self) -> Option<RecRef> {
        loop {
            if let Some(pos) = self.per_page_iterator.next() {
                return Some(RecRef::new(self.cur_page, pos));
            }
            if self.next_page == 0 {
                return None;
            }
            self.cur_page = self.next_page;
            match self.address.scan_page(self.cur_page) {
                Ok(scan) => {
                    self.next_page = scan.0;
                    self.per_page_iterator = scan.1.into_iter();
                }
                Err(_) => {
                    // NOTE(review): the error is dropped; a fallible iteration API would
                    // let callers see it.
                    self.next_page = 0;
                    return None;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::fs::OpenOptions;
    use std::sync::Arc;
    use std::io;
    use discref::{Page, DiscRef};
    use allocator::Allocator;
    use config::Config;
    use address::ADDRESS_PAGE_EXP;
    use super::{Segments, SegmentPage, SegmentPageRead, segment_hash};

    /// Opens (creating it if needed) `file_name` and builds an allocator on top of it.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = OpenOptions::new().read(true).write(true).create(true).open(file_name).unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &config, pa).unwrap();
        // Burn one allocation so test pages never start at 0, which cannot happen
        // outside of tests.
        allocator.allocate(5).unwrap();
        allocator
    }

    /// Allocates a segments root page, initializes it and returns its page number.
    fn init_root(allocator: &Arc<Allocator>) -> u64 {
        let root = allocator.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, allocator).unwrap();
        root
    }

    #[test]
    fn test_create_drop_segment() {
        let all = Arc::new(create_allocator("./raw_segment_create_delete.persy"));
        let root = init_root(&all);
        let mut segments = Segments::new(root, &all).unwrap();

        let id = segments.create_temp_segment(&all, &"some".into()).unwrap();
        segments.create_segment(id).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));

        segments.drop_segment(&all, &"some".into()).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));

        fs::remove_file("./raw_segment_create_delete.persy").unwrap();
    }

    #[test]
    fn test_create_drop_temp_segment() {
        let all = Arc::new(create_allocator("./segment_create_delete.persy"));
        let root = init_root(&all);
        let mut segments = Segments::new(root, &all).unwrap();

        let id = segments.create_temp_segment(&all, &"some".into()).unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));

        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));

        fs::remove_file("./segment_create_delete.persy").unwrap();
    }

    #[test]
    fn test_create_close_drop_close_segment() {
        let all = Arc::new(create_allocator("./segment_pers_create_delete.persy"));
        let root = init_root(&all);

        // First session: create and persist the segment.
        let id = {
            let mut segments = Segments::new(root, &all).unwrap();
            let created = segments.create_temp_segment(&all, &"some".into()).unwrap();
            segments.create_segment(created).unwrap();
            segments.flush_segments(&all).unwrap();
            created
        };

        // Second session: the segment is reloaded, then dropped and persisted.
        {
            let mut segments = Segments::new(root, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment(&all, &"some".into()).unwrap();
            segments.flush_segments(&all).unwrap();
        }

        // Third session: the segment is gone.
        {
            let segments = Segments::new(root, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }

        fs::remove_file("./segment_pers_create_delete.persy").unwrap();
    }

    #[test]
    fn test_seg_insert_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|(record, _)| record), Some(10));
    }

    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_update_entry(0, 30, 15).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|(record, _)| record), Some(15));
    }

    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(io::Cursor::new(vec![0; 1024]), 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_delete_entry(0, 30).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert!(read.is_none());
    }

    #[test]
    fn test_hash_id_generator() {
        let id = segment_hash(&"some".into());
        assert!(id != 0);
    }
}