persy 0.7.0

Transactional Persistence Engine
Documentation
use crate::{
    allocator::Allocator,
    config::Config,
    discref::{Page, PAGE_METADATA_SIZE},
    error::{PRes, PersyError},
    id::{PersyId, RecRef},
    locks::{LockManager, RwLockManager},
    segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
    transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
};
use std::{
    collections::{hash_map::Entry, HashMap, HashSet},
    sync::{Arc, RwLock},
};

/// Size exponent passed to `Allocator::allocate` when `Address::init` creates the address
/// root page.
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 5; // 2^5
/// Size exponent of an address page; `ADDRESS_PAGE_SIZE` is derived from it.
pub const ADDRESS_PAGE_EXP: u8 = 10; // 2^10
/// Usable bytes of an address page: 2^ADDRESS_PAGE_EXP minus the page metadata header
/// (`PAGE_METADATA_SIZE`).
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 minus the page metadata header
/// Flag bit of an address entry; NOTE(review): presumably marks the entry as holding a live
/// record — the code that sets/tests it is not in this file.
pub const FLAG_EXISTS: u8 = 0b000_0001;
/// Byte offset of the segment hash inside a segment page (presumably 4 bytes, given the
/// next field starts at 20 — confirm against the segment page layout).
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// Byte offset of the entry-count field inside a segment page (presumably 1 byte, given the
/// data starts at 21).
pub const SEGMENT_PAGE_ENTRY_COUNT_OFFSET: u32 = 20;
/// Byte offset at which address entries start inside a segment page.
pub const SEGMENT_DATA_OFFSET: u32 = 21;
/// Size in bytes of one address entry: 8-byte data page pointer + 1 flags byte + 2 bytes of
/// version. NOTE(review): the old remark said the version was "not yet used", but
/// `Address::read` returns a `u16` version that `check_persistent_records` compares — confirm.
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; // Pointer to data page + flags + version

/// Snapshot of a record's persisted address entry, as collected by
/// `Address::check_persistent_records`: the record reference, its segment, the data page the
/// entry points to and the version stored with it.
pub struct OldRecordInfo {
    pub recref: RecRef,
    pub segment: u32,
    pub record_page: u64,
    pub version: u16,
}

impl OldRecordInfo {
    /// Builds a snapshot; `recref` is cloned so the snapshot owns its reference.
    fn new(recref: &RecRef, segment: u32, record_page: u64, version: u16) -> Self {
        let recref = recref.clone();
        Self {
            version,
            record_page,
            segment,
            recref,
        }
    }
}

/// Address layer: for each segment it maps a record reference (`RecRef`) to the page holding
/// the record's data plus a version number, and owns the record and segment locks that
/// transactions take through `acquire_locks` / `release_locks`.
pub struct Address {
    /// Shared configuration; `acquire_locks` reads the transaction lock timeout from it.
    config: Arc<Config>,
    /// Page allocator used to allocate, load, write, flush and free address pages.
    allocator: Arc<Allocator>,
    /// Per-record locks keyed by record reference.
    record_locks: LockManager<RecRef>,
    /// Per-segment reader/writer locks keyed by segment id: `acquire_locks` write-locks the
    /// `deleted` ids and read-locks the `created_updated` ids.
    segment_locks: RwLockManager<u32>,
    /// Regular and temporary segment metadata behind a reader/writer lock.
    segments: RwLock<Segments>,
}

impl Address {
    /// Opens the address layer rooted at `page`, loading the segment metadata stored there
    /// through `all`.
    pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
        let segments = Segments::new(page, all)?;
        Ok(Address {
            config: config.clone(),
            allocator: all.clone(),
            record_locks: Default::default(),
            segment_locks: Default::default(),
            segments: RwLock::new(segments),
        })
    }

    /// Allocates the address root page (size exponent `ADDRESS_ROOT_PAGE_EXP`), initializes
    /// the segment metadata in it and returns its page id, to be handed to `new` later.
    pub fn init(all: &Allocator) -> PRes<u64> {
        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
        Segments::init(page, all)?;
        Ok(page)
    }

    /// Returns an iterator starting at the first page of `segment`, which may be a regular
    /// segment or a temporary one (regular segments are looked up first).
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` when neither kind of segment has that id.
    pub fn scan(&self, segment: u32) -> PRes<SegmentPageIterator> {
        let segments = self.segments.read()?;
        if let Some(segment) = segments.segment_by_id(segment) {
            Ok(SegmentPageIterator::new(segment.first_page))
        } else if let Some(temp_segment) = segments.segment_by_id_temp(segment) {
            Ok(SegmentPageIterator::new(temp_segment.first_page))
        } else {
            Err(PersyError::SegmentNotFound)
        }
    }

    /// Loads page `cur_page` and returns what `segment_scan_entries` reports for it.
    /// NOTE(review): presumably the next page id and the positions of the live entries —
    /// confirm against the segment page implementation.
    pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
        // The guard is held only so this read cannot interleave with methods that hold the
        // segments write lock (e.g. `apply`, `clear_empty`).
        let _lock = self.segments.read()?;
        let mut page = self.allocator.load_page(cur_page)?;
        page.segment_scan_entries()
    }

    /// Reserves a new address slot in the temporary segment `segment`; the second element is
    /// `Some` when a new segment page had to be allocated for it.
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` when no temporary segment has that id.
    pub fn allocate_temp(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
        if let Some(found) = self.segments.write()?.get_temp_segment_mut(segment) {
            found.allocate_internal(&self.allocator)
        } else {
            Err(PersyError::SegmentNotFound)
        }
    }

    /// Creates a temporary segment named `segment`, returning `(segment id, first page)`.
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(u32, u64)> {
        self.segments.write()?.create_temp_segment(&self.allocator, segment)
    }

    /// Drops the temporary segment with id `segment`.
    pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
        self.segments.write()?.drop_temp_segment(&self.allocator, segment)
    }

    /// Reserves a new address slot in the regular segment `segment`; the second element is
    /// `Some` when a new segment page had to be allocated for it.
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` when no regular segment has that id.
    pub fn allocate(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
        if let Some(found) = self.segments.write()?.segments.get_mut(&segment) {
            found.allocate_internal(&self.allocator)
        } else {
            Err(PersyError::SegmentNotFound)
        }
    }

    /// Takes every lock a transaction needs, in this order: write locks on `deleted`
    /// segments, read locks on `created_updated` segments, then the record locks of
    /// `records`. Afterwards it verifies that every `created_updated` segment still exists.
    ///
    /// On any failure all locks acquired so far are released (unlock failures during that
    /// cleanup are only reported, never returned) and the original error is returned.
    ///
    /// # Errors
    /// Lock-acquisition errors (e.g. timeout), `PersyError::SegmentNotFound` when a
    /// `created_updated` segment no longer exists, or a poisoned segments lock.
    pub fn acquire_locks(&self, records: &[(u32, RecRef, u16)], created_updated: &[u32], deleted: &[u32]) -> PRes<()> {
        let timeout = self.config.transaction_lock_timeout();
        self.segment_locks.lock_all_write(&deleted, timeout.clone())?;
        if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout.clone()) {
            Self::report_unlock_error(self.segment_locks.unlock_all_write(&deleted));
            return Err(x);
        }

        let to_lock: Vec<_> = records.iter().map(|(_, id, _)| id.clone()).collect();
        if let Err(x) = self.record_locks.lock_all(&to_lock, timeout.clone()) {
            self.release_segment_locks_best_effort(created_updated, deleted);
            return Err(x);
        }

        // Every lock is held from here on, so any failure (including a poisoned segments
        // lock) must release all of them instead of propagating with `?`.
        let check = match self.segments.read() {
            Ok(segs) => Ok(created_updated
                .iter()
                .all(|segment| segs.exists_real_or_temp(*segment))),
            Err(e) => Err(PersyError::from(e)),
        };
        let failure = match check {
            Ok(true) => return Ok(()),
            Ok(false) => PersyError::SegmentNotFound,
            Err(e) => e,
        };
        self.release_segment_locks_best_effort(created_updated, deleted);
        Self::report_unlock_error(self.record_locks.unlock_all(&to_lock));
        Err(failure)
    }

    /// Persists the pending allocations of segments `segs` (with recovery semantics when
    /// `recover` is set) and flushes the segment metadata.
    pub fn confirm_allocations(&self, segs: &[u32], recover: bool) -> PRes<()> {
        let mut segments = self.segments.write()?;
        segments.confirm_allocations(&segs, &self.allocator, recover)?;
        segments.flush_segments(&self.allocator)?;
        Ok(())
    }

    /// Reads the persisted address entry of every `(segment, recref, version)` in `records`
    /// and returns a snapshot of each, in input order.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` when an entry is missing; `PersyError::VersionNotLastest`
    /// when `check_version` is set and the stored version differs from the given one.
    pub fn check_persistent_records(
        &self,
        records: &[(u32, RecRef, u16)],
        check_version: bool,
    ) -> PRes<Vec<OldRecordInfo>> {
        let mut current_record_pages = Vec::with_capacity(records.len());
        for &(segment, ref recref, version) in records {
            match self.read(recref, segment)? {
                Some((record, pers_version)) => {
                    if check_version && pers_version != version {
                        return Err(PersyError::VersionNotLastest);
                    }
                    current_record_pages.push(OldRecordInfo::new(recref, segment, record, pers_version));
                }
                None => return Err(PersyError::RecordNotFound(PersyId(recref.clone()))),
            }
        }
        Ok(current_record_pages)
    }

    /// Releases the locks taken by `acquire_locks`: record locks, then segment read locks,
    /// then segment write locks.
    ///
    /// Every release is attempted even if an earlier one fails, so one failure cannot leave
    /// the remaining locks held; the first error encountered is returned.
    pub fn release_locks(&self, records: &[(u32, RecRef, u16)], created_updated: &[u32], deleted: &[u32]) -> PRes<()> {
        let records_res = self.record_locks.unlock_all_iter(records.iter().map(|(_, id, _)| id));
        let read_res = self.segment_locks.unlock_all_read(&created_updated);
        let write_res = self.segment_locks.unlock_all_write(&deleted);
        records_res?;
        read_res?;
        write_res?;
        Ok(())
    }

    /// Currently a no-op: the address is not made available again yet.
    pub fn rollback(&self, _: &RecRef) -> PRes<()> {
        // TODO:make address re-available
        Ok(())
    }

    /// Applies a batch of address changes under the segments write lock:
    /// optionally relinks new segment pages (recovery only), writes inserts, updates and
    /// deletes (skipping segments dropped in this same batch), flushes every touched page,
    /// then drops and creates segments as listed in `seg_ops` and flushes segment metadata.
    ///
    /// Returns the `(segment, page)` pairs for which `segment_delete_entry` returned `true`.
    /// NOTE(review): presumably pages left empty — candidates for `clear_empty`.
    pub fn apply(
        &self,
        segs_new_pages: &[NewSegmentPage],
        inserts: &[InsertRecord],
        updates: &[UpdateRecord],
        deletes: &[DeleteRecord],
        seg_ops: &[SegmentOperation],
        recover: bool,
    ) -> PRes<Vec<(u32, u64)>> {
        let mut segments = self.segments.write()?;
        // Record operations on segments dropped by this same batch are skipped.
        let dropped: HashSet<_> = seg_ops
            .iter()
            .filter_map(|seg_op| {
                if let SegmentOperation::DROP(ref op) = *seg_op {
                    Some(op.segment_id)
                } else {
                    None
                }
            })
            .collect();
        let mut pages = HashMap::new();

        if recover {
            // Re-link each new segment page to its predecessor and re-stamp its segment id.
            for new_page in segs_new_pages {
                let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
                p_page.set_next(new_page.page)?;
                let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
                n_page.set_prev(new_page.previous)?;
                n_page.set_segment_id(new_page.segment)?;
            }
        }
        for insert in inserts {
            if !dropped.contains(&insert.segment) {
                let page = insert.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
            }
        }

        for update in updates {
            if !dropped.contains(&update.segment) {
                let page = update.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
            }
        }
        let mut pages_to_remove = Vec::new();

        for delete in deletes {
            if !dropped.contains(&delete.segment) {
                let page = delete.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos)? {
                    pages_to_remove.push((delete.segment, page));
                }
            }
        }

        if recover {
            for v in pages.values_mut() {
                v.recalc_count()?;
            }
            // Data pages referenced by replayed inserts/updates are in use, so take them out
            // of the allocator's free list.
            let recover_page = |record_page: u64| {
                let mut page = self.allocator.load_page(record_page)?;
                self.allocator.remove_from_free(record_page, page.get_size_exp())
            };
            for insert in inserts {
                recover_page(insert.record_page)?;
            }
            for update in updates {
                recover_page(update.record_page)?;
            }
        }

        for to_flush in pages.values_mut() {
            self.allocator.flush_page(to_flush)?;
        }

        for seg_op in seg_ops {
            if let SegmentOperation::DROP(ref op) = *seg_op {
                segments.drop_segment(&op.name)?;
            }
        }

        for seg_op in seg_ops {
            if let SegmentOperation::CREATE(ref op) = *seg_op {
                segments.create_segment(op.segment_id, op.first_page)?;
            }
        }
        segments.flush_segments(&self.allocator)?;

        Ok(pages_to_remove)
    }

    /// Returns the pages belonging to `segment`, as collected by the segment metadata.
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
        let segments = self.segments.read()?;
        segments.collect_segment_pages(allocator, segment)
    }

    /// Unlinks each `(segment, page)` from its segment's page list, updating the
    /// neighbouring pages (or the segment's first page / resetting the segment), then frees
    /// the page.
    // Kept but unused: this is not safe because pages cannot be unlinked concurrently with a
    // scan of the same segment.
    #[allow(dead_code)]
    pub fn clear_empty(&self, empty: Vec<(u32, u64)>) -> PRes<()> {
        let mut segments = self.segments.write()?;
        for (segment, page) in empty {
            let mut p = self.allocator.write_page(page)?;
            let next = p.get_next()?;
            let prev = p.get_prev()?;

            if next != 0 {
                let mut next_page = self.allocator.write_page(next)?;
                // if (prev == 0), this is like doing setPrev(0)
                next_page.set_prev(prev)?;
                self.allocator.flush_page(&mut next_page)?;
            }

            if prev != 0 {
                let mut prev_page = self.allocator.write_page(prev)?;
                prev_page.set_next(next)?;
                self.allocator.flush_page(&mut prev_page)?;
            } else if next != 0 {
                segments.set_first_page(segment, next, &self.allocator)?;
            } else {
                segments.reset(segment, &self.allocator)?;
            }
            self.allocator.flush_page(&mut p)?;
            self.allocator.free(page)?;
        }

        Ok(())
    }

    /// Returns whether a segment named `segment` exists.
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
        Ok(self.segments.read()?.has_segment(segment))
    }

    /// Returns the id of the segment named `segment`, if any.
    pub fn segment_id(&self, segment: &str) -> PRes<Option<u32>> {
        Ok(self.segments.read()?.segment_id(segment))
    }

    /// Writes an address entry directly, bypassing transactions and the segments lock.
    // Used only from the tests
    #[allow(dead_code)]
    pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
        let mut page = self.allocator.write_page(recref.page)?;
        page.segment_insert_entry(segment_id, recref.pos, record_page)?;
        self.allocator.flush_page(&mut page)?;
        Ok(())
    }

    /// Reads the address entry of `recref` in `segment`: `Some((record page, version))`, or
    /// `None` as reported by `segment_read_entry` when there is no such entry.
    pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
        let mut page = self.allocator.load_page(recref.page)?;
        page.segment_read_entry(segment, recref.pos)
    }

    /// Returns the cached writable page `k`, loading it with `write_page` on first access.
    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PRes<&'a mut Page> {
        Ok(match map.entry(k) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
        })
    }

    /// Lists the segments as `(name, id)` pairs.
    pub fn list(&self) -> PRes<Vec<(String, u32)>> {
        Ok(self.segments.read()?.list())
    }

    /// Best-effort release of the segment locks taken by `acquire_locks`
    /// (write locks first, then read locks); failures are only reported.
    fn release_segment_locks_best_effort(&self, created_updated: &[u32], deleted: &[u32]) {
        Self::report_unlock_error(self.segment_locks.unlock_all_write(&deleted));
        Self::report_unlock_error(self.segment_locks.unlock_all_read(&created_updated));
    }

    /// Reports an unlock failure that happens while unwinding a failed lock acquisition.
    /// The caller is already returning the original error, so this one is only logged.
    fn report_unlock_error<T, E: std::fmt::Debug>(result: Result<T, E>) {
        if let Err(e) = result {
            eprintln!("unlock error: {:?}", e);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::Address;
    use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
    use crate::{allocator::Allocator, config::Config, discref::DiscRef};
    use std::sync::Arc;
    use tempfile::Builder;

    /// Builds an `Address` over a fresh temporary file whose name starts with `file_name`
    /// (the builder appends ".persy"), and registers one regular segment named "def".
    /// Returns the address and the id of that segment.
    fn init_test_address(file_name: &str) -> (Address, u32) {
        let file = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file.reopen().unwrap());
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &config, pa).unwrap();
        let page = Address::init(&allocator).unwrap();
        let addr = Address::new(&Arc::new(allocator), &config, page).unwrap();
        let (id, fp) = addr.create_temp_segment("def").unwrap();
        addr.segments.write().unwrap().create_segment(id, fp).unwrap();
        (addr, id)
    }

    #[test]
    fn test_init_and_new_address() {
        let (add, segment_id) = init_test_address("addr_test");
        let segments = add.segments.read().unwrap();
        let segment = segments.segment_by_id(segment_id).unwrap();
        assert_eq!(segment.alloc_page, 1056);
        assert_eq!(segment.alloc_pos, 21);
    }

    #[test]
    fn test_insert_update_delete_read_apply_pointer() {
        let (add, segment_id) = init_test_address("addr_insert_update_delete_apply_test");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();

        let (recref_2, _) = add.allocate(segment_id).unwrap();
        let inserted = vec![InsertRecord::new(segment_id, &recref_2, 30)];
        let updated = vec![UpdateRecord::new(segment_id, &recref_1, 40, 1)];
        let deleted = vec![DeleteRecord::new(segment_id, &recref, 1)];
        let seg_ops = vec![SegmentOperation::CREATE(CreateSegment::new("def", 20, 20))];

        add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();

        // Deleted entry is gone, updated entry points to its new page, inserted entry exists.
        assert!(add.read(&recref, segment_id).unwrap().is_none());
        assert_eq!(add.read(&recref_1, segment_id).unwrap().map(|(page, _)| page), Some(40));
        assert_eq!(add.read(&recref_2, segment_id).unwrap().map(|(page, _)| page), Some(30));
    }

    #[test]
    fn test_insert_scan() {
        let (add, segment_id) = init_test_address("addr_scan_test");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();

        let mut to_iter = add.scan(segment_id).unwrap();
        let mut count = 0;
        while to_iter.next(&add).is_some() {
            count += 1;
        }
        assert_eq!(count, 2);

        // Entries come back in allocation order.
        let mut iter = add.scan(segment_id).unwrap();
        let re = iter.next(&add).unwrap();
        assert_eq!(re.page, recref.page);
        assert_eq!(re.pos, recref.pos);
        let re_1 = iter.next(&add).unwrap();
        assert_eq!(re_1.page, recref_1.page);
        assert_eq!(re_1.pos, recref_1.pos);
    }

    #[test]
    fn test_insert_over_page() {
        let (add, segment_id) = init_test_address("addr_insert_over_page");
        // Enough entries to spill over several address pages.
        for z in 0..1000 {
            let (recref, _) = add.allocate(segment_id).unwrap();
            add.insert(segment_id, &recref, z).unwrap();
        }
        let mut to_iter = add.scan(segment_id).unwrap();
        let mut count = 0;
        while to_iter.next(&add).is_some() {
            count += 1;
        }
        assert_eq!(count, 1000);
    }
}