persy 0.7.0

Transactional Persistence Engine
Documentation
use crate::{
    address::{
        Address, ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_EXP, ADDRESS_PAGE_SIZE, FLAG_EXISTS, SEGMENT_DATA_OFFSET,
        SEGMENT_HASH_OFFSET, SEGMENT_PAGE_ENTRY_COUNT_OFFSET,
    },
    allocator::Allocator,
    discref::{PageIndex, PageSeek},
    error::{PRes, PersyError},
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
    id::{PersyId, RecRef},
    persy::exp_from_content_size,
};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use std::{
    collections::{hash_map::DefaultHasher, HashMap},
    hash::{Hash, Hasher},
    io::{Read, Write},
    str,
    sync::Arc,
    vec,
};

/// Describes the link created when a segment's chain of address pages grows
/// by one page.
pub struct AllocatedSegmentPage {
    /// Id of the address page that was just allocated and appended.
    pub new_page: u64,
    /// Id of the page that the new page was linked after (spelling of the
    /// field name is part of the public interface and is kept as is).
    pub previus_page: u64,
}

/// In-memory descriptor of a segment and of its chain of address pages.
pub struct Segment {
    /// First address page of the segment chain.
    pub first_page: u64,
    /// Page of the last confirmed allocation position; written out on flush.
    persistent_page: u64,
    /// Offset (inside `persistent_page`) of the last confirmed allocation position.
    persistent_pos: u32,
    /// Page in which the next address entry will be handed out.
    pub alloc_page: u64,
    /// Offset (inside `alloc_page`) of the next address entry to hand out.
    pub alloc_pos: u32,
    /// Numeric id of the segment; also written in the header of its address pages.
    segment_id: u32,
    /// Name of the segment.
    name: String,
}

impl Segment {
    /// Builds a segment descriptor from its raw parts; `name` is copied into
    /// an owned `String`.
    pub fn new(
        first_page: u64,
        persistent_page: u64,
        persistent_pos: u32,
        alloc_page: u64,
        alloc_pos: u32,
        segment_id: u32,
        name: &str,
    ) -> Segment {
        Segment {
            first_page,
            persistent_page,
            persistent_pos,
            alloc_page,
            alloc_pos,
            segment_id,
            name: name.to_string(),
        }
    }

    /// Reserves the next address-entry slot of the segment.
    ///
    /// When the slot still fits in the current allocation page only
    /// `alloc_pos` advances and the second element of the result is `None`.
    /// Otherwise a new address page is allocated, linked after the current
    /// one, tagged with the segment id and flushed; the returned `RecRef` is
    /// then the first slot of the new page and the second element describes
    /// the new link.
    ///
    /// # Errors
    /// Propagates any allocator / page I/O error.
    pub fn allocate_internal(&mut self, allocator: &Arc<Allocator>) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
        let page = self.alloc_page;
        let pos = self.alloc_pos;
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
        Ok(if new_pos > ADDRESS_PAGE_SIZE {
            // The entry would cross the end of the page: grow the chain.
            let new_page = allocator.allocate(ADDRESS_PAGE_EXP)?;
            // Link the old tail to the new page first ...
            let mut pg = allocator.write_page(page)?;
            pg.set_next(new_page)?;
            allocator.flush_page(&mut pg)?;
            // ... then initialize the header of the new tail.
            let mut new_pg = allocator.write_page(new_page)?;
            new_pg.set_next(0)?;
            new_pg.set_prev(page)?;
            new_pg.set_segment_id(self.segment_id)?;
            allocator.flush_page(&mut new_pg)?;
            self.alloc_page = new_page;
            // The first slot of the new page is the one being returned, so the
            // next free slot is the second one.
            self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
            (
                RecRef {
                    page: self.alloc_page,
                    pos: SEGMENT_DATA_OFFSET,
                },
                Some(AllocatedSegmentPage {
                    new_page,
                    previus_page: page,
                }),
            )
        } else {
            self.alloc_pos = new_pos;
            (RecRef { page, pos }, None)
        })
    }

    /// Collects the ids of every page owned by the segment: the record pages
    /// referenced by existing address entries and the address pages
    /// themselves, walking the chain from `first_page` to `persistent_page`.
    ///
    /// The walk also stops when a page has no successor (next pointer `0`),
    /// so a chain that never reaches `persistent_page` cannot make the scan
    /// run into page `0` and report unrelated pages.
    ///
    /// # Errors
    /// Propagates any page load / read error.
    pub fn collect_segment_pages(&self, allocator: &Allocator) -> PRes<Vec<u64>> {
        let mut pages = Vec::new();
        let mut page = self.first_page;
        let last = self.persistent_page;
        loop {
            let mut pag = allocator.load_page(page)?;
            // A freshly loaded page is read from its start, where the next
            // pointer is stored.
            let next = pag.read_u64::<BigEndian>()?;
            let mut pos = SEGMENT_DATA_OFFSET;
            loop {
                pag.seek(pos)?;
                let data_page = pag.read_u64::<BigEndian>()?;
                let flag = pag.read_u8()?;
                // Same bit test used by `segment_read_entry`.
                if flag & FLAG_EXISTS != 0 {
                    pages.push(data_page);
                }
                pos += ADDRESS_ENTRY_SIZE;
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                    break;
                }
            }
            pages.push(page);
            // `0` is the end-of-chain marker written by `set_next(0)`.
            if page == last || next == 0 {
                break;
            }
            page = next;
        }
        Ok(pages)
    }
}

/// Read-only operations on a segment address page.
pub trait SegmentPageRead: PageIndex {
    /// Reads the entry stored at `pos`: returns the record page and the entry
    /// version, or `None` when the page does not carry `segment_id` or the
    /// entry is not flagged as existing.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
    /// Returns the next-page pointer of the page together with the offsets of
    /// all the entries flagged as existing.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
    /// Returns the offset that follows the last entry flagged as existing.
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
}
/// Read/write operations on a segment address page.
pub trait SegmentPage: SegmentPageRead {
    /// Writes a new entry at `pos` pointing to `record_page` and bumps the
    /// page entry counter.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Re-points the entry at `pos` to `record_page` and advances its version.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
    /// Removes the entry at `pos`; returns `true` when the page entry counter
    /// reaches zero.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool>;
    /// Stores the owning segment id in the page header.
    fn set_segment_id(&mut self, id: u32) -> PRes<()>;
    /// Reads the id of the next page of the chain (stored at offset 0).
    fn get_next(&mut self) -> PRes<u64>;
    /// Writes the id of the next page of the chain (stored at offset 0).
    fn set_next(&mut self, next: u64) -> PRes<()>;
    /// Reads the id of the previous page of the chain (stored at offset 8).
    fn get_prev(&mut self) -> PRes<u64>;
    /// Writes the id of the previous page of the chain (stored at offset 8).
    fn set_prev(&mut self, prev: u64) -> PRes<()>;
    /// Recomputes the page entry counter from the entry flags.
    fn recalc_count(&mut self) -> PRes<()>;
}

// Entry layout used below, relative to the entry offset `pos`:
// `u64` record page at `pos`, `u8` flags at `pos + 8`, `u16` version at `pos + 9`.
impl<T: ReadBytesExt + PageSeek + PageIndex> SegmentPageRead for T {
    /// Reads the entry at `pos`.
    ///
    /// Returns `Ok(None)` when the segment id stored in the page header is
    /// not `segment_id`, or when the entry is not flagged as existing;
    /// otherwise returns the record page and the version of the entry.
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Ok(None);
        }
        self.seek(pos)?;
        let record = self.read_u64::<BigEndian>()?;
        let flag = self.read_u8()?;
        let version = self.read_u16::<BigEndian>()?;
        Ok(if flag & FLAG_EXISTS == 0 {
            None
        } else {
            Some((record, version))
        })
    }

    /// Returns the next-page pointer of this page and the offsets of all the
    /// entries flagged as existing.
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
        // The next pointer lives at the very start of the page: position the
        // cursor explicitly instead of relying on where a previous operation
        // left it.
        self.seek(0)?;
        let next_page = self.read_u64::<BigEndian>()?;
        let mut pos = SEGMENT_DATA_OFFSET;
        let mut recs = Vec::new();
        loop {
            // Only the flag byte of each entry is needed.
            self.seek(pos + 8)?;
            let flag = self.read_u8()?;
            if flag & FLAG_EXISTS != 0 {
                recs.push(pos);
            }
            pos += ADDRESS_ENTRY_SIZE;
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        Ok((next_page, recs))
    }

    /// Returns the offset that follows the last entry flagged as existing.
    ///
    /// A page without any existing entry yields `SEGMENT_DATA_OFFSET` (the
    /// first entry slot) and never an offset inside the page header, so the
    /// value can always be used as the next allocation position.
    fn segment_first_available_pos(&mut self) -> PRes<u32> {
        let mut pos = SEGMENT_DATA_OFFSET;
        // Default for an empty page: the first entry slot.
        let mut empty_pos = SEGMENT_DATA_OFFSET;
        loop {
            self.seek(pos + 8)?;
            let flag = self.read_u8()?;
            // `pos` now points to the slot after the one just inspected.
            pos += ADDRESS_ENTRY_SIZE;
            if flag & FLAG_EXISTS != 0 {
                empty_pos = pos;
            }
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
                break;
            }
        }
        Ok(empty_pos)
    }
}

/// Returns the version that follows `version`.
///
/// The counter wraps around at `u16::MAX` and skips `0`, so the result is
/// always in `1..=u16::MAX`. The increment uses wrapping arithmetic so that
/// reaching the maximum does not panic in overflow-checked builds.
fn inc_version(version: u16) -> u16 {
    match version.wrapping_add(1) {
        0 => 1,
        next => next,
    }
}

// Entry layout used below, relative to the entry offset `pos`:
// `u64` record page at `pos`, `u8` flags at `pos + 8`, `u16` version at `pos + 9`.
impl<T: WriteBytesExt + ReadBytesExt + PageSeek + PageIndex> SegmentPage for T {
    /// Writes a new entry at `pos` pointing to `record_page`, flagged as
    /// existing and with version `1`, and increments the page entry counter.
    ///
    /// # Errors
    /// `PersyError::SegmentNotFound` when the page header does not carry
    /// `segment_id`; any page I/O error is propagated.
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::SegmentNotFound);
        }
        // TODO: In case of restore this may get a wrong value, so on restore should be
        // re-calculated from the persistent flags.
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        let count = self.read_u8()?;
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        self.write_u8(count + 1)?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        self.write_u8(FLAG_EXISTS)?;
        self.write_u16::<BigEndian>(1)?;
        Ok(())
    }

    /// Re-points the entry at `pos` to `record_page` and advances its version.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` when the page header does not carry
    /// `segment_id`; any page I/O error is propagated.
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
        }
        self.seek(pos + 9)?;
        let version = self.read_u16::<BigEndian>()?;
        self.seek(pos)?;
        self.write_u64::<BigEndian>(record_page)?;
        // Skip the flag byte, which is left untouched.
        self.seek(pos + 9)?;
        self.write_u16::<BigEndian>(inc_version(version))?;
        Ok(())
    }

    /// Removes the entry at `pos` by clearing its exists flag and decrements
    /// the page entry counter.
    ///
    /// Returns `true` when the counter reaches zero.
    ///
    /// The flag is cleared rather than toggled, so deleting an entry that is
    /// already deleted cannot bring it back to life.
    ///
    /// # Errors
    /// `PersyError::RecordNotFound` when the page header does not carry
    /// `segment_id`; any page I/O error is propagated.
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool> {
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
        self.seek(SEGMENT_HASH_OFFSET)?;
        let persistent_id = self.read_u32::<BigEndian>()?;
        if persistent_id != segment_id {
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
        }
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        // NOTE(review): a stored counter of 0 makes this subtraction overflow;
        // presumably the counter is kept consistent through `recalc_count` - confirm.
        let count = self.read_u8()? - 1;
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        self.write_u8(count)?;
        self.seek(pos + 8)?;
        let flag = self.read_u8()?;
        self.seek(pos + 8)?;
        self.write_u8(flag & !FLAG_EXISTS)?;
        Ok(count == 0)
    }

    /// Stores `id` as the owning segment id in the page header.
    fn set_segment_id(&mut self, id: u32) -> PRes<()> {
        self.seek(SEGMENT_HASH_OFFSET)?;
        self.write_u32::<BigEndian>(id)?;
        Ok(())
    }

    /// Reads the next-page pointer, stored at offset 0.
    fn get_next(&mut self) -> PRes<u64> {
        self.seek(0)?;
        self.read_u64::<BigEndian>().map_err(<_>::into)
    }

    /// Writes the next-page pointer, stored at offset 0.
    fn set_next(&mut self, next: u64) -> PRes<()> {
        self.seek(0)?;
        self.write_u64::<BigEndian>(next)?;
        Ok(())
    }

    /// Reads the previous-page pointer, stored at offset 8.
    fn get_prev(&mut self) -> PRes<u64> {
        self.seek(8)?;
        self.read_u64::<BigEndian>().map_err(<_>::into)
    }

    /// Writes the previous-page pointer, stored at offset 8.
    fn set_prev(&mut self, prev: u64) -> PRes<()> {
        self.seek(8)?;
        self.write_u64::<BigEndian>(prev)?;
        Ok(())
    }

    /// Recomputes the page entry counter by counting the entries flagged as
    /// existing and stores it in the page header.
    fn recalc_count(&mut self) -> PRes<()> {
        // Start from the page head so that the leading next-pointer read done
        // by the scan does not depend on where the cursor was left.
        self.seek(0)?;
        let count = self.segment_scan_entries()?.1.len();
        self.seek(SEGMENT_PAGE_ENTRY_COUNT_OFFSET)?;
        self.write_u8(count as u8)?;
        Ok(())
    }
}

/// In-memory registry of the segments, loaded from and flushed to disc
/// through a root page.
pub struct Segments {
    /// Page holding the pointer to the current content page.
    pub root_page: u64,
    /// Page holding the serialized segment list; `0` when none was written yet.
    content_page: u64,
    /// Flush marker produced by the flush-checksum helpers, fed back on the next flush.
    last_flush: u8,
    /// Confirmed segments, by segment id.
    pub segments: HashMap<u32, Segment>,
    /// Ids of the confirmed segments, by name.
    pub segments_id: HashMap<String, u32>,
    /// Segments created but not yet promoted by `create_segment`, by segment id.
    pub temp_segments: HashMap<u32, Segment>,
    /// Ids of the temporary segments, by name.
    pub temp_segments_id: HashMap<String, u32>,
}

/// Generates a new id for the segment called `segment`.
///
/// The upper 16 bits come from the low bits of the name hash, the lower 16
/// bits are random, so two calls with the same name usually return
/// different ids.
pub fn segment_hash(segment: &str) -> u32 {
    let mut hasher = DefaultHasher::new();
    segment.hash(&mut hasher);
    // Truncate the 64 bit hash first, then make room for the random part.
    let name_bits = (hasher.finish() as u32) << 16;
    let random_bits = u32::from(rand::random::<u16>());
    name_bits | random_bits
}

impl Segments {
    /// Loads the segment registry referenced by `root_page`.
    ///
    /// The root page stores two 11 byte buffers; the one selected by
    /// `double_buffer_check` holds the id of the content page, and a content
    /// page id of `0` means that no segment list was ever flushed.
    ///
    /// # Errors
    /// Propagates page I/O errors and invalid UTF-8 in a stored segment name.
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
        let (last_flush, page_id) = {
            let mut buffer_0 = [0; 11];
            let mut buffer_1 = [0; 11];
            let mut root = allocator.load_page(root_page)?;
            root.read_exact(&mut buffer_0)?;
            root.read_exact(&mut buffer_1)?;
            let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
            let valid = if first { &buffer_0 } else { &buffer_1 };
            (flush, BigEndian::read_u64(&valid[0..8]))
        };
        let mut segments = HashMap::new();
        let mut segments_id = HashMap::new();
        if page_id != 0 {
            let mut page = allocator.load_page(page_id)?;
            // Every record is introduced by a flag byte of 1; any other value
            // terminates the list.
            while page.read_u8()? == 1 {
                let first_page = page.read_u64::<BigEndian>()?;
                let persistent_page = page.read_u64::<BigEndian>()?;
                let persistent_pos = page.read_u32::<BigEndian>()?;
                let pers_hash = page.read_u32::<BigEndian>()?;
                let name_size = page.read_u16::<BigEndian>()? as usize;

                let mut raw_name: Vec<u8> = vec![0; name_size];
                page.read_exact(&mut raw_name)?;
                let name = str::from_utf8(&raw_name)?.to_string();
                // The allocation position restarts from the persisted one.
                let loaded = Segment::new(
                    first_page,
                    persistent_page,
                    persistent_pos,
                    persistent_page,
                    persistent_pos,
                    pers_hash,
                    &name,
                );
                segments.insert(pers_hash, loaded);
                segments_id.insert(name, pers_hash);
            }
        }
        Ok(Segments {
            root_page,
            content_page: page_id,
            last_flush,
            segments,
            segments_id,
            temp_segments: HashMap::new(),
            temp_segments_id: HashMap::new(),
        })
    }

    /// Initializes `root_page` as an empty registry root: a content page id
    /// of `0` prepared for flush marker `0`.
    pub fn init(root_page: u64, allocator: &Allocator) -> PRes<()> {
        let mut root_buffer = [0; 11];
        BigEndian::write_u64(&mut root_buffer[0..8], 0);
        prepare_buffer_flush(&mut root_buffer, 0);
        let mut root = allocator.write_page(root_page)?;
        root.write_all(&root_buffer)?;
        allocator.flush_page(&mut root)?;
        Ok(())
    }

    /// Returns the id of the confirmed segment called `segment`, if any.
    pub fn segment_id(&self, segment: &str) -> Option<u32> {
        self.segments_id
            .get(segment)
            .and_then(|id| self.segments.get(id))
            .map(|found| found.segment_id)
    }

    /// Looks up a confirmed segment by id.
    pub fn segment_by_id(&self, id: u32) -> Option<&Segment> {
        self.segments.get(&id)
    }

    /// Looks up a temporary segment by id.
    pub fn segment_by_id_temp(&self, id: u32) -> Option<&Segment> {
        self.temp_segments.get(&id)
    }

    /// Tells whether a confirmed segment called `segment` exists.
    pub fn has_segment(&self, segment: &str) -> bool {
        self.segments_id.contains_key(segment)
    }

    /// Creates a temporary segment called `segment`: allocates its first
    /// address page, registers the segment among the temporary ones and
    /// writes the page header (next `0`, prev `0`, segment id).
    ///
    /// Returns the new segment id and the id of its first page.
    pub fn create_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: &str) -> PRes<(u32, u64)> {
        let first_page = allocator.allocate(ADDRESS_PAGE_EXP)?;
        let segment_id = segment_hash(segment);
        let created = Segment::new(
            first_page,
            first_page,
            SEGMENT_DATA_OFFSET,
            first_page,
            SEGMENT_DATA_OFFSET,
            segment_id,
            segment,
        );
        self.temp_segments.insert(segment_id, created);
        self.temp_segments_id.insert(segment.to_string(), segment_id);
        let mut header = allocator.write_page(first_page)?;
        header.write_u64::<BigEndian>(0)?;
        header.write_u64::<BigEndian>(0)?;
        header.write_u32::<BigEndian>(segment_id)?;
        allocator.flush_page(&mut header)?;
        Ok((segment_id, first_page))
    }

    /// Mutable access to a temporary segment by id.
    pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
        self.temp_segments.get_mut(&segment)
    }

    /// Forgets the temporary segment `segment` and frees the pages it owns;
    /// does nothing when the id is unknown.
    pub fn drop_temp_segment(&mut self, allocator: &Arc<Allocator>, segment: u32) -> PRes<()> {
        let dropped = match self.temp_segments.remove(&segment) {
            Some(found) => found,
            None => return Ok(()),
        };
        self.temp_segments_id.remove(&dropped.name);
        //TODO: to review this, may cause disc leaks in case of crash on rollback of segment
        // creation
        for page in dropped.collect_segment_pages(allocator)? {
            allocator.free(page)?;
        }
        Ok(())
    }

    /// Tells whether `segment` is known, either as confirmed or as temporary.
    pub fn exists_real_or_temp(&self, segment: u32) -> bool {
        self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
    }

    /// Promotes the temporary segment `segment` to a confirmed one, using
    /// `first_page` as its first page; does nothing when the id is not a
    /// temporary segment.
    pub fn create_segment(&mut self, segment: u32, first_page: u64) -> PRes<()> {
        if let Some(mut promoted) = self.temp_segments.remove(&segment) {
            // This is needed mainly for recover
            promoted.first_page = first_page;
            self.temp_segments_id.remove(&promoted.name);
            self.segments_id.insert(promoted.name.clone(), promoted.segment_id);
            self.segments.insert(promoted.segment_id, promoted);
        }
        Ok(())
    }

    /// Removes the confirmed segment called `segment` from the registry; its
    /// pages are not touched here.
    pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
        if let Some(id) = self.segments_id.remove(segment) {
            self.segments.remove(&id);
        }
        Ok(())
    }

    /// Collects the pages owned by the confirmed segment `segment`; an
    /// unknown id yields an empty list.
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
        match self.segments.get(&segment) {
            Some(found) => found.collect_segment_pages(allocator),
            None => Ok(Vec::new()),
        }
    }

    /// Changes the first page of the confirmed segment `segment` (when it
    /// exists), then flushes the registry and syncs the disc.
    pub fn set_first_page(&mut self, segment: u32, first_page: u64, allocator: &Arc<Allocator>) -> PRes<()> {
        if let Some(target) = self.segments.get_mut(&segment) {
            target.first_page = first_page;
        }
        self.flush_segments(allocator)?;
        allocator.disc().sync()?;
        Ok(())
    }

    /// Rewinds both the persistent and the allocation position of the
    /// confirmed segment `segment` (when it exists) to the first slot of its
    /// first page, then flushes the registry and syncs the disc.
    pub fn reset(&mut self, segment: u32, allocator: &Arc<Allocator>) -> PRes<()> {
        if let Some(target) = self.segments.get_mut(&segment) {
            let start_page = target.first_page;
            target.persistent_page = start_page;
            target.persistent_pos = SEGMENT_DATA_OFFSET;
            target.alloc_page = start_page;
            target.alloc_pos = SEGMENT_DATA_OFFSET;
        }
        self.flush_segments(allocator)?;
        allocator.disc().sync()?;
        Ok(())
    }

    /// Makes the current allocation position of every listed segment its
    /// persistent position. With `recover` set, the allocation offset is
    /// first recomputed from the entry flags of the allocation page.
    ///
    /// # Panics
    /// When a listed id is neither a confirmed nor a temporary segment.
    pub fn confirm_allocations(&mut self, segments: &[u32], allocator: &Allocator, recover: bool) -> PRes<()> {
        for id in segments {
            // If happen that a segment is not found something went wrong and is better crash
            let segment = match self.segments.get_mut(id) {
                Some(confirmed) => confirmed,
                None => match self.temp_segments.get_mut(id) {
                    Some(temporary) => temporary,
                    None => panic!("segment operation when segment do not exists"),
                },
            };
            segment.persistent_page = segment.alloc_page;
            if recover {
                // In case of recover recalculate the last available position
                let mut page = allocator.load_page(segment.alloc_page)?;
                segment.alloc_pos = page.segment_first_available_pos()?;
            }
            segment.persistent_pos = segment.alloc_pos;
        }
        Ok(())
    }

    /// Serializes the confirmed segments into a freshly allocated content
    /// page, points the root page to it and only then frees the previous
    /// content page.
    pub fn flush_segments(&mut self, allocator: &Arc<Allocator>) -> PRes<()> {
        // One record per segment, each introduced by a flag byte of 1; a
        // final 0 byte closes the list.
        let mut content = Vec::<u8>::new();
        for seg in self.segments.values() {
            content.write_u8(1)?;
            content.write_u64::<BigEndian>(seg.first_page)?;
            content.write_u64::<BigEndian>(seg.persistent_page)?;
            content.write_u32::<BigEndian>(seg.persistent_pos)?;
            content.write_u32::<BigEndian>(seg.segment_id)?;
            content.write_u16::<BigEndian>(seg.name.len() as u16)?;
            content.write_all(seg.name.as_bytes())?;
        }
        content.write_u8(0)?;

        let exp = exp_from_content_size(content.len() as u64);
        let content_page_id = allocator.allocate(exp)?;
        {
            let mut content_page = allocator.write_page(content_page_id)?;
            content_page.write_all(&content)?;
            allocator.flush_page(&mut content_page)?;
        }

        let mut root_buffer = [0; 11];
        BigEndian::write_u64(&mut root_buffer[0..8], content_page_id);
        let (last_flush, offset) = prepare_buffer_flush(&mut root_buffer, self.last_flush);
        self.last_flush = last_flush;
        {
            let mut root = allocator.write_page(self.root_page)?;
            root.seek(offset)?;
            root.write_all(&root_buffer)?;
            allocator.flush_page(&mut root)?;
        }

        // The old content page is released only after the root points to the new one.
        let previous_content = self.content_page;
        if previous_content != 0 {
            allocator.free(previous_content)?;
        }
        self.content_page = content_page_id;
        Ok(())
    }

    /// Lists name and id of every confirmed segment.
    pub fn list(&self) -> Vec<(String, u32)> {
        let mut listed = Vec::with_capacity(self.segments_id.len());
        for (name, id) in &self.segments_id {
            listed.push((name.clone(), *id));
        }
        listed
    }
}

/// Cursor over the address entries of a segment, moving page by page along
/// the chain of address pages.
pub struct SegmentPageIterator {
    /// Page the offsets in `per_page_iterator` belong to.
    cur_page: u64,
    /// Next page to scan; `0` when the chain is finished.
    next_page: u64,
    /// Entry offsets of `cur_page` not yet returned.
    per_page_iterator: vec::IntoIter<u32>,
}

impl SegmentPageIterator {
    /// Creates a cursor that will start scanning from `first_page` on the
    /// first call to `next`.
    pub fn new(first_page: u64) -> SegmentPageIterator {
        SegmentPageIterator {
            cur_page: first_page,
            next_page: first_page,
            // Nothing is buffered until the first page is scanned.
            per_page_iterator: Vec::new().into_iter(),
        }
    }

    /// Returns the reference of the next existing entry, or `None` when the
    /// chain is finished or a page scan fails.
    pub fn next(&mut self, address: &Address) -> Option<RecRef> {
        // This loop is needed because some pages may be empty
        loop {
            if let Some(pos) = self.per_page_iterator.next() {
                return Some(RecRef::new(self.cur_page, pos));
            }
            if self.next_page == 0 {
                return None;
            }
            self.cur_page = self.next_page;
            match address.scan_page(self.cur_page) {
                Ok((following, positions)) => {
                    self.next_page = following;
                    self.per_page_iterator = positions.into_iter();
                }
                Err(_) => return None,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{segment_hash, SegmentPage, SegmentPageRead, Segments};
    use crate::{
        address::ADDRESS_PAGE_EXP,
        allocator::Allocator,
        config::Config,
        discref::{DiscRef, Page},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    /// Builds an allocator backed by a temporary file whose name starts with
    /// `file_name`.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = Builder::new()
            .prefix(file_name)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let config = Arc::new(Config::new());
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &config, pa).unwrap();
        // Allocate one page up front so that test pages never get id 0, which
        // cannot happen outside of tests.
        allocator.allocate(5).unwrap(); //Just to be sure it not start from 0, it cannot happen in not test cases.

        allocator
    }

    /// A segment promoted from temporary is registered, and dropping it
    /// removes it from both maps.
    #[test]
    fn test_create_drop_segment() {
        let allocator = create_allocator("./raw_segment_create_delete.persy");
        let all = Arc::new(allocator);
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        let mut segments = Segments::new(root, &all).unwrap();

        let (id, fp) = segments.create_temp_segment(&all, "some").unwrap();
        segments.create_segment(id, fp).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));
        segments.drop_segment("some").unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));
    }

    /// A temporary segment is tracked in the temporary maps until dropped.
    #[test]
    fn test_create_drop_temp_segment() {
        let allocator = create_allocator("./segment_create_delete.persy");
        let all = Arc::new(allocator);
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        let mut segments = Segments::new(root, &all).unwrap();

        let (id, _) = segments.create_temp_segment(&all, "some").unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));
        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));
    }

    /// Creation and drop of a segment both survive a reload of the registry
    /// from the root page.
    #[test]
    fn test_create_close_drop_close_segment() {
        let allocator = create_allocator("./segment_pers_create_delete.persy");
        let all = Arc::new(allocator);
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        let id;
        {
            let mut segments = Segments::new(root, &all).unwrap();
            let (c_id, fp) = segments.create_temp_segment(&all, "some").unwrap();
            id = c_id;
            segments.create_segment(id, fp).unwrap();
            segments.flush_segments(&all).unwrap();
        }
        {
            // Reload: the flushed segment must be there.
            let mut segments = Segments::new(root, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment("some").unwrap();
            segments.flush_segments(&all).unwrap();
        }
        {
            // Reload again: the drop must have been persisted.
            let segments = Segments::new(root, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }
    }

    /// Same as above with 100 segments, so that the serialized list is much
    /// larger than a single record.
    #[test]
    fn test_create_close_drop_close_segment_off_page() {
        let allocator = create_allocator("./segment_pers_create_delete_off_page.persy");
        let all = Arc::new(allocator);
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        Segments::init(root, &all).unwrap();
        {
            let mut segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                let (id, fp) = segments.create_temp_segment(&all, &format!("some{}", i)).unwrap();
                segments.create_segment(id, fp).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let mut segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                assert!(segments.segments_id.contains_key(&format!("some{}", i)));
                segments.drop_segment(&format!("some{}", i)).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let segments = Segments::new(root, &all).unwrap();
            for i in 0..100 {
                assert!(!segments.segments_id.contains_key(&format!("some{}", i)));
            }
        }
    }

    /// An inserted entry can be read back with the record page it points to.
    #[test]
    fn test_seg_insert_read_pointer() {
        // Zeroed page: the stored segment id is 0, matching the id used below.
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        match read {
            Some(val) => assert_eq!(val.0, 10),
            None => assert!(false),
        }
    }

    /// An updated entry reads back the new record page.
    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_update_entry(0, 30, 15).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        match read {
            Some(val) => assert_eq!(val.0, 15),
            None => assert!(false),
        }
    }

    /// A deleted entry is no longer readable.
    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_delete_entry(0, 30).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        match read {
            Some(_) => assert!(false),
            None => assert!(true),
        }
    }

    /// The generated id is expected to be non zero.
    #[test]
    fn test_hash_id_generator() {
        assert!(0 != segment_hash("some"));
    }
}