// persy 0.2.0
//
// Transactional Persistence Engine
// Documentation

use persy::{PRes, RecRef, PersyError};
use allocator::Allocator;
use std::sync::{Condvar, Mutex, RwLock, Arc};
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use transaction::{InsertRecord, UpdateRecord, DeleteRecord, SegmentOperation};
use config::Config;
use segment::{Segments, SegmentScanner, SegmentPage, SegmentPageRead};

/// Size exponent handed to the allocator when an address page is created (2^10).
pub const ADDRESS_PAGE_EXP: u8 = 10; // 2^10
/// Usable size of an address page: 2^10 minus 2 for the page header.
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - 2; // 2^10 - 2: page size minus page header
/// Flag value 1. NOTE(review): presumably marks an address entry as existing; not used in
/// this part of the file — confirm against the page code.
pub const FLAG_EXISTS: u8 = 1;
/// NOTE(review): presumably the offset of the segment hash inside an address page; not used
/// in this part of the file — confirm.
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// NOTE(review): presumably the offset of the first address entry inside an address page;
/// not used in this part of the file — confirm.
pub const SEGMENT_DATA_OFFSET: u32 = 20;
/// Size of one address entry: 8 + 1 + 2.
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; // pointer to data page + flags + version management (not yet used)


/// The address segment keeps the basic addressing of the data in the data segment for a
/// specific data block.
pub struct Address {
    // Shared configuration; read here for the transaction lock timeout.
    config: Arc<Config>,
    // Allocator used to allocate, load, write and flush address pages.
    allocator: Arc<Allocator>,
    // Records currently locked, each with the condvar waiters block on.
    record_locks: Mutex<HashMap<RecRef, Arc<Condvar>>>,
    // Segments currently locked, keyed by segment id.
    segment_locks: Mutex<HashMap<u32, SegmentLock>>,
    // Segment table; the RwLock also serialises allocation and apply against readers.
    segments: RwLock<Segments>,
}


/// Lock state of a single segment: either one exclusive (write) holder or a
/// counted group of shared (read) holders, plus the condvar waiters block on.
struct SegmentLock {
    write: bool,
    read_count: u32,
    cond: Arc<Condvar>,
}

impl SegmentLock {
    /// Builds a lock state with the given mode and holder count and a fresh condvar.
    fn with_state(exclusive: bool, readers: u32) -> SegmentLock {
        SegmentLock {
            write: exclusive,
            read_count: readers,
            cond: Arc::new(Condvar::new()),
        }
    }

    /// Lock state for an exclusive holder: write mode, no readers.
    fn new_write() -> SegmentLock {
        SegmentLock::with_state(true, 0)
    }

    /// Lock state for a first shared holder: read mode, one reader.
    fn new_read() -> SegmentLock {
        SegmentLock::with_state(false, 1)
    }

    /// Registers one more shared holder.
    fn inc_read(&mut self) {
        self.read_count += 1;
    }

    /// Unregisters one shared holder and reports whether it was the last one.
    fn dec_read(&mut self) -> bool {
        let remaining = self.read_count - 1;
        self.read_count = remaining;
        remaining == 0
    }
}


impl Address {
    /// Opens the address structure whose segment table is rooted at `page`.
    ///
    /// The segment table is loaded through `Segments::new`; both lock tables start empty
    /// and the allocator and configuration handles are shared with the caller.
    pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
        let address = Address {
            segments: RwLock::new(Segments::new(page, all)?),
            segment_locks: Mutex::new(HashMap::new()),
            record_locks: Mutex::new(HashMap::new()),
            allocator: Arc::clone(all),
            config: Arc::clone(config),
        };
        Ok(address)
    }

    /// Allocates the root address page and initialises an empty segment table on it.
    ///
    /// Returns the id of the allocated page, to be passed to `Address::new` later.
    pub fn init(all: &Allocator) -> PRes<u64> {
        let root_page = all.allocate(ADDRESS_PAGE_EXP)?;
        Segments::init(root_page, all)?;
        Ok(root_page)
    }

    /// Creates a scanner over the address entries of `segment`.
    ///
    /// The id is looked up among the persistent segments first and among the temporary
    /// ones second; `PersyError::SegmentNotFound` is returned when neither knows it.
    pub fn scan<'a>(&'a self, segment: u32) -> PRes<SegmentScanner<'a>> {
        let segments = self.segments.read()?;
        let first_page = match segments.segment_by_id(segment) {
            Some(persistent) => persistent.first_page,
            None => match segments.segment_by_id_temp(segment) {
                Some(temporary) => temporary.first_page,
                None => return Err(PersyError::SegmentNotFound),
            },
        };
        Ok(SegmentScanner::<'a>::new(self, first_page))
    }

    /// Reads the scan entries stored in the address page `cur_page`.
    ///
    /// The shared lock on the segment table is held for the whole page access so the page
    /// cannot be read while a writer (allocation or `apply`) holds the table exclusively.
    ///
    /// Returns whatever `segment_scan_entries` yields for the page.
    /// NOTE(review): presumably the next page id and the positions of the entries — confirm.
    pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
        // The guard must be bound to a named variable: `let _ = ...` does not bind, so the
        // guard would be dropped at once and the page read would run without the lock.
        let _segments_guard = self.segments.read()?;
        let mut page = self.allocator.load_page(cur_page)?;
        page.segment_scan_entries()
    }

    /// Allocates a new record address inside the temporary segment `segment`.
    ///
    /// Returns `PersyError::SegmentNotFound` when no temporary segment has that id.
    pub fn allocate_temp(&self, segment: u32) -> PRes<RecRef> {
        let mut segments = self.segments.write()?;
        match segments.get_temp_segment_mut(segment) {
            Some(temp_segment) => temp_segment.allocate_internal(&self.allocator),
            None => Err(PersyError::SegmentNotFound),
        }
    }

    /// Creates a temporary segment named `segment` and returns its id.
    ///
    /// Runs under the exclusive lock on the segment table.
    pub fn create_temp_segment(&self, segment: &String) -> PRes<u32> {
        let mut segments = self.segments.write()?;
        segments.create_temp_segment(&self.allocator, segment)
    }

    /// Drops the temporary segment with id `segment`.
    ///
    /// Runs under the exclusive lock on the segment table.
    pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
        let mut segments = self.segments.write()?;
        segments.drop_temp_segment(&self.allocator, segment)
    }

    /// Allocates a new record address inside the persistent segment `segment`.
    ///
    /// Returns `PersyError::SegmentNotFound` when no persistent segment has that id.
    pub fn allocate(&self, segment: u32) -> PRes<RecRef> {
        let mut segments = self.segments.write()?;
        match segments.segments.get_mut(&segment) {
            Some(target) => target.allocate_internal(&self.allocator),
            None => Err(PersyError::SegmentNotFound),
        }
    }

    /// Acquires every lock a transaction needs and then validates the locked state.
    ///
    /// Locks are taken in a fixed order:
    /// 1. an exclusive (write) lock on each segment in `deleted`;
    /// 2. a shared (read) lock on each segment in `created_updated`;
    /// 3. an exclusive lock on each record in `records`, keyed by its `RecRef`.
    ///
    /// Each element of `records` is `(segment id, record reference, expected version)`.
    ///
    /// After locking, every segment in `created_updated` must exist (real or temporary),
    /// otherwise `PersyError::SegmentNotFound` is returned. Every record must be readable,
    /// otherwise `PersyError::RecordNotFound` is returned. When `check_version` is true the
    /// stored version must equal the expected one, otherwise
    /// `PersyError::VersionNotLastest` is returned.
    ///
    /// NOTE(review): on these error paths the locks already taken are not released here;
    /// presumably the caller is expected to call `release_locks` — confirm.
    /// NOTE(review): the result of `wait_timeout` is discarded, so an elapsed timeout only
    /// triggers another attempt and never surfaces as an error.
    pub fn acquire_locks(&self, records: &Vec<(u32, RecRef, u16)>, created_updated: &Vec<u32>, deleted: &Vec<u32>, check_version: bool) -> PRes<()> {

        // The loop inside the for may not be needed because of notify_one

        // Exclusive segment locks: wait until nobody (reader or writer) holds the segment.
        for segment in deleted {
            let seg_lock = SegmentLock::new_write();
            loop {
                let mut lock_manager = self.segment_locks.lock()?;
                let cond = match lock_manager.entry(segment.clone()) {
                    // Somebody holds the segment: wait on the holder's condvar and retry.
                    Entry::Occupied(o) => o.get().cond.clone(),
                    Entry::Vacant(v) => {
                        v.insert(seg_lock);
                        break;
                    }
                };
                // Releases the lock table while waiting; wakes on notify or on timeout.
                cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
            }
        }

        // Shared segment locks: join existing readers, wait only while a writer holds it.
        for segment in created_updated {
            loop {
                let mut lock_manager = self.segment_locks.lock()?;
                let cond;
                match lock_manager.entry(segment.clone()) {
                    Entry::Occupied(mut o) => {
                        if o.get().write {
                            cond = o.get().cond.clone();
                        } else {
                            o.get_mut().inc_read();
                            break;
                        }
                    }
                    Entry::Vacant(v) => {
                        v.insert(SegmentLock::new_read());
                        break;
                    }
                };
                cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
            }
        }
        // Record locks: the presence of an entry means the record is locked.
        for rec in records {
            let cond = Arc::new(Condvar::new());
            loop {
                let mut lock_manager = self.record_locks.lock()?;
                let cond = match lock_manager.entry(rec.1.clone()) {
                    Entry::Occupied(o) => o.get().clone(),
                    Entry::Vacant(v) => {
                        v.insert(cond);
                        break;
                    }
                };
                cond.wait_timeout(lock_manager, self.config.transaction_lock_timeout().clone())?;
            }
        }
        // With the locks held, check that the touched segments still exist.
        {
            let segs = self.segments.read()?;
            for segment in created_updated {
                if !segs.exists_real_or_temp(segment) {
                    return Err(PersyError::SegmentNotFound);
                }
            }
        }
        // Check that every locked record still exists and, if requested, is at the expected version.
        for &(segment, ref recref, version) in records {
            let val = self.read(recref, segment)?;
            if let Some((_, pers_version)) = val {
                if check_version && pers_version != version {
                    return Err(PersyError::VersionNotLastest);
                }
            } else {
                return Err(PersyError::RecordNotFound);
            }
        }
        Ok(())
    }

    pub fn release_locks(&self, records: Vec<(u32, RecRef, u16)>, created_updated: &Vec<u32>, deleted: &Vec<u32>) -> PRes<()> {

        for rec in records {
            let mut lock_manager = self.record_locks.lock()?;
            if let Some(cond) = lock_manager.remove(&rec.1) {
                cond.notify_one();
            }
        }

        for segment in created_updated {
            let mut lock_manager = self.segment_locks.lock()?;
            if let Entry::Occupied(mut lock) = lock_manager.entry(segment.clone()) {
                if lock.get_mut().dec_read() {
                    let cond = lock.get().cond.clone();
                    lock.remove();
                    cond.notify_one();
                }
            }
        }

        for segment in deleted {
            let mut lock_manager = self.segment_locks.lock()?;
            if let Some(lock) = lock_manager.remove(segment) {
                lock.cond.notify_one();
            }
        }

        Ok(())
    }

    /// Rolls back the allocation of a record address.
    ///
    /// Currently a no-op: the reference is ignored and `Ok(())` is always returned.
    pub fn rollback(&self, _: &RecRef) -> PRes<()> {
        // TODO: make the address available again
        Ok(())
    }

    pub fn apply(&self, inserts: &Vec<InsertRecord>, updates: &Vec<UpdateRecord>, deletes: &Vec<DeleteRecord>, seg_ops: &Vec<SegmentOperation>) -> PRes<()> {
        let mut segments = self.segments.write()?;
        let mut dropped = HashSet::new();
        for seg_op in seg_ops {
            match *seg_op {
                SegmentOperation::CREATE(_) => {}
                SegmentOperation::DROP(ref op) => {
                    dropped.insert(op.segment_id);
                }
            }
        }
        let mut segs = Vec::new();
        let mut pages = HashMap::new();
        for insert in inserts {
            if !dropped.contains(&insert.segment) {
                segs.push(&insert.segment);
                match pages.entry(&insert.recref.page) {
                    Entry::Vacant(o) => {
                        let mut page = self.allocator.write_page(insert.recref.page)?;
                        page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
                        o.insert(page);
                    }
                    Entry::Occupied(mut o) => {
                        o.get_mut().segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
                    }
                }
            }

        }

        for update in updates {
            if !dropped.contains(&update.segment) {
                segs.push(&update.segment);
                match pages.entry(&update.recref.page) {
                    Entry::Vacant(o) => {
                        let mut page = self.allocator.write_page(update.recref.page)?;
                        page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
                        o.insert(page);
                    }
                    Entry::Occupied(mut o) => {
                        o.get_mut().segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
                    }
                }
            }
        }

        for delete in deletes {
            if !dropped.contains(&delete.segment) {
                segs.push(&delete.segment);
                match pages.entry(&delete.recref.page) {
                    Entry::Vacant(o) => {
                        let mut page = self.allocator.write_page(delete.recref.page)?;
                        page.segment_delete_entry(delete.segment, delete.recref.pos)?;
                        o.insert(page);
                    }
                    Entry::Occupied(mut o) => {
                        o.get_mut().segment_delete_entry(delete.segment, delete.recref.pos)?;
                    }
                }
            }
        }

        for (_, v) in &mut pages {
            self.allocator.flush_page(v)?;
        }

        for seg_op in seg_ops {
            match *seg_op {
                SegmentOperation::CREATE(_) => {}
                SegmentOperation::DROP(ref op) => {
                    segments.drop_segment(&self.allocator, &op.name)?;
                }
            }
        }

        for seg_op in seg_ops {
            match *seg_op {
                SegmentOperation::CREATE(ref op) => {
                    segments.create_segment(op.segment_id)?;
                }
                SegmentOperation::DROP(_) => {}
            }
        }


        for seg in segs {
            // If happen that a segment is not found something went wrong and is better crash
            let mut segment = segments.segments.get_mut(seg).unwrap();
            segment.persistent_page = segment.alloc_page;
            segment.persistent_pos = segment.alloc_pos;
        }

        segments.flush_segments(&self.allocator)?;

        Ok(())
    }

    /// Reports whether the segment table has a segment named `segment`.
    pub fn exists_segment(&self, segment: &String) -> PRes<bool> {
        let segments = self.segments.read()?;
        Ok(segments.has_segment(segment))
    }

    /// Looks up the id of the segment named `segment`, `None` when the table has no such name.
    pub fn segment_id(&self, segment: &String) -> PRes<Option<u32>> {
        let segments = self.segments.read()?;
        Ok(segments.segment_id(segment))
    }

    /// Writes the address entry for `recref`, pointing it at `record_page`, and flushes
    /// the address page right away.
    pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
        let mut address_page = self.allocator.write_page(recref.page)?;
        address_page.segment_insert_entry(segment_id, recref.pos, record_page)?;
        self.allocator.flush_page(&mut address_page)?;
        Ok(())
    }

    /// Reads the address entry of `recref` in `segment`.
    ///
    /// Returns the record page and the stored version, or `None` when the page reports
    /// no entry for that position.
    pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
        let mut address_page = self.allocator.load_page(recref.page)?;
        address_page.segment_read_entry(segment, recref.pos)
    }
}

#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::fs;
    use discref::DiscRef;
    use allocator::Allocator;
    use address::Address;
    use std::sync::Arc;
    use config::Config;
    use transaction::{InsertRecord, UpdateRecord, DeleteRecord, CreateSegment, SegmentOperation};

    /// Builds an `Address` on a file at `file_name` and registers a segment named "def",
    /// returning the address together with the segment id.
    fn init_test_address(file_name: &str) -> (Address, u32) {
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true);
        let file = options.open(file_name).unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file);
        let root_page = Allocator::init(&disc).unwrap();
        let allocator = Arc::new(Allocator::new(disc, &config, root_page).unwrap());
        let address_page = Address::init(&allocator).unwrap();
        let address = Address::new(&allocator, &config, address_page).unwrap();
        let segment_id = address.create_temp_segment(&"def".into()).unwrap();
        address.segments.write().unwrap().create_segment(segment_id).unwrap();
        (address, segment_id)
    }

    /// A freshly created segment starts allocating at page 1536, position 20.
    #[test]
    fn test_init_and_new_address() {
        let (add, segment_id) = init_test_address("./addr_test");
        fs::remove_file("./addr_test").unwrap();
        let segments = add.segments.read().unwrap();
        let segment = segments.segment_by_id(segment_id).unwrap();
        assert_eq!(segment.alloc_page, 1536);
        assert_eq!(segment.alloc_pos, 20);
    }

    /// `apply` must make an insert and an update readable and a delete unreadable.
    #[test]
    fn test_insert_update_delete_read_apply_pointer() {
        let path = "./addr_insert_update_delete_apply_test.persy";
        let (add, segment_id) = init_test_address(path);

        // Two entries written directly, one to delete and one to update.
        let to_delete = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &to_delete, 10).unwrap();
        let to_update = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &to_update, 20).unwrap();

        // A third address that only gets its entry through `apply`.
        let to_insert = add.allocate(segment_id).unwrap();

        let inserted = vec![InsertRecord::new(segment_id, &to_insert, 30)];
        let updated = vec![UpdateRecord::new(segment_id, &to_update, 40, 20, 1)];
        let deleted = vec![DeleteRecord::new(segment_id, &to_delete, 10, 1)];
        let seg_ops = vec![SegmentOperation::CREATE(CreateSegment::new("def", 20))];

        add.apply(&inserted, &updated, &deleted, &seg_ops).unwrap();

        let read_deleted = add.read(&to_delete, segment_id).unwrap();
        let read_updated = add.read(&to_update, segment_id).unwrap();
        let read_inserted = add.read(&to_insert, segment_id).unwrap();
        fs::remove_file(path).unwrap();

        assert!(read_deleted.is_none());
        assert_eq!(read_updated.map(|entry| entry.0), Some(40));
        assert_eq!(read_inserted.map(|entry| entry.0), Some(30));
    }

    /// A scan must return the inserted addresses in insertion order.
    #[test]
    fn test_insert_scan() {
        let path = "./addr_scan_test.persy";
        let (add, segment_id) = init_test_address(path);
        let first = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &first, 10).unwrap();
        let second = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &second, 20).unwrap();

        let scanned = add.scan(segment_id).unwrap().into_iter().count();
        assert_eq!(scanned, 2);

        let mut found = add.scan(segment_id).unwrap().into_iter();
        for expected in &[&first, &second] {
            let actual = found.next().unwrap();
            assert_eq!(actual.page, expected.page);
            assert_eq!(actual.pos, expected.pos);
        }
        fs::remove_file(path).unwrap();
    }

    /// Inserting 1000 entries must still yield all 1000 of them from a scan.
    #[test]
    fn test_insert_over_page() {
        let path = "./addr_insert_over_page.persy";
        let (add, segment_id) = init_test_address(path);
        for record_page in 0..1000 {
            let recref = add.allocate(segment_id).unwrap();
            add.insert(segment_id, &recref, record_page).unwrap();
        }

        let scanned = add.scan(segment_id).unwrap().into_iter().count();
        assert_eq!(scanned, 1000);

        fs::remove_file(path).unwrap();
    }
}