persy 0.7.0

Transactional Persistence Engine
Documentation
use crate::transaction::{
    Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage, PrepareCommit,
    ReadRecord, Rollback, Transaction, UpdateRecord,
};
use crate::{
    allocator::Allocator,
    config::TxStrategy,
    discref::{Page, PageSeek, PAGE_METADATA_SIZE},
    error::PRes,
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
    id::RecRef,
    persy::RecoverStatus,
};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use std::{
    collections::hash_map::HashMap,
    io::{Read, Write},
    str,
    sync::{Arc, Mutex, MutexGuard},
};

/// Base-2 exponent of the raw size of a journal page, as passed to the allocator.
pub const JOURNAL_PAGE_EXP: u8 = 10; // 2^10
/// Bytes of a journal page that remain once the page metadata header is subtracted.
const JOURNAL_PAGE_SIZE: u32 = (1 << JOURNAL_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 minus the page metadata header
/// Offset inside a journal page of the `u64` id of the next page (0 when there is none).
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
/// Offset inside a journal page of the `u64` id of the previous page.
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
/// Offset inside a journal page where the first log entry is stored.
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;

/// Links of one started transaction inside `StartList`: the ids of the
/// transactions pushed right after and right before it.
struct StartListEntry {
    // Id pushed after this one; `None` while this entry is the most recent.
    next: Option<JournalId>,
    // Id pushed before this one; `None` when this entry is the oldest still tracked.
    prev: Option<JournalId>,
}

impl StartListEntry {
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
        StartListEntry { next: None, prev }
    }
}

/// Doubly linked list of the ids of the started transactions, kept in push
/// order and indexed by id through a hash map.
struct StartList {
    // Every tracked start id with the links to its neighbours.
    transactions: HashMap<JournalId, StartListEntry>,
    // Most recently pushed id that has not been removed, if any.
    last: Option<JournalId>,
}

impl StartList {
    fn new() -> StartList {
        StartList {
            transactions: HashMap::new(),
            last: None,
        }
    }

    fn push(&mut self, id: &JournalId) {
        self.transactions
            .insert(id.clone(), StartListEntry::new(self.last.clone()));
        if let Some(ref lst) = self.last {
            self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
        }
        self.last = Some(id.clone());
    }

    fn remove(&mut self, id: &JournalId) -> bool {
        if let Some(entry) = self.transactions.remove(id) {
            if let Some(ref next) = entry.next {
                self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
            }
            if let Some(ref prev) = entry.prev {
                self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
            }
            if let Some(ref l) = self.last {
                if l == id {
                    self.last = entry.prev.clone();
                }
            }
            entry.prev.is_none()
        } else {
            false
        }
    }
}

/// Mutable state of the journal, kept behind the mutex owned by `Journal`.
struct JournalShared {
    // Id of the root page, where the id of the first journal page is persisted.
    root: u64,
    // Id of the first page of the journal page chain.
    first_page: u64,
    // Id of the page new entries are currently appended to.
    last_page: u64,
    // Offset inside `last_page` where the next entry will be written.
    last_pos: u32,
    // Ids of the started transactions, in start order.
    starts: StartList,
    // Page handle used for appending entries.
    current: Page,
    // Value returned by the last `double_buffer_check`/`prepare_buffer_flush` call,
    // handed back to `prepare_buffer_flush` on the next root page update.
    last_flush: u8,
}

/// Journal segment is the area where the transactional log is kept
pub struct Journal {
    // Allocator used to load, write, flush, allocate and free the journal pages.
    allocator: Arc<Allocator>,
    // Journal cursor and bookkeeping, guarded by a mutex.
    journal: Mutex<JournalShared>,
}

/// A record that can be appended to the journal and read back during recovery.
pub trait JournalEntry {
    /// One-byte tag written ahead of the entry; `Journal::recover` uses it to
    /// pick the concrete entry type to decode.
    fn get_type(&self) -> u8;
    /// Serializes the entry payload (without tag and transaction id) into `buffer`.
    fn write(&self, buffer: &mut dyn Write) -> PRes<()>;
    /// Fills the entry from `buffer` and returns the number of payload bytes
    /// consumed, which `Journal::recover` adds to its cursor.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32>;
    /// Applies the entry to the transaction `tx` being rebuilt and reports the
    /// resulting recover status.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus>;
}

/// Payload-less entry logged by `Journal::start` to mark the beginning of a transaction.
pub struct Start {}

/// Identifier of a transaction in the journal: `Journal::start` builds it from
/// the page and position at which the transaction's `Start` entry was written.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
    // Id of the journal page.
    page: u64,
    // Offset inside the page.
    pos: u32,
}

impl Journal {
    pub fn new(all: &Arc<Allocator>, page: u64) -> PRes<Journal> {
        let first_page;
        let last_flush;
        let mut buffer_0 = [0; 11];
        let mut buffer_1 = [0; 11];

        let current;
        {
            let mut page = all.load_page(page)?;
            page.read_exact(&mut buffer_0)?;
            page.read_exact(&mut buffer_1)?;
            let (last_flush_ret, first) = double_buffer_check(&buffer_0, &buffer_1);
            last_flush = last_flush_ret;
            if first {
                first_page = BigEndian::read_u64(&buffer_0[0..8]);
            } else {
                first_page = BigEndian::read_u64(&buffer_1[0..8]);
            }
        }
        if first_page != 0 {
            current = all.write_page(first_page)?;
        } else {
            // Empty 0 sized page
            current = Page::new(Vec::new(), 0, 0, 0);
        }
        let journal = JournalShared {
            root: page,
            first_page,
            last_page: first_page,
            last_pos: 0,
            starts: StartList::new(),
            current,
            last_flush,
        };

        Ok(Journal {
            allocator: all.clone(),
            journal: Mutex::new(journal),
        })
    }

    /// Allocates the pages of a brand new journal and returns the id of its root page.
    ///
    /// Two pages are allocated: the root page (allocation exponent 5) and the first
    /// journal page (`JOURNAL_PAGE_EXP`). The id of the first journal page is written,
    /// big endian, into an 11-byte header prepared with `prepare_buffer_flush` and
    /// flushed at the start of the root page.
    pub fn init(allocator: &Allocator) -> PRes<u64> {
        let root_id = allocator.allocate(5)?;
        let first_journal_page = allocator.allocate(JOURNAL_PAGE_EXP)?;

        let mut header = [0; 11];
        BigEndian::write_u64(&mut header[0..8], first_journal_page);
        prepare_buffer_flush(&mut header, 0);

        let mut root = allocator.write_page(root_id)?;
        root.write_all(&header)?;
        allocator.flush_page(&mut root)?;
        Ok(root_id)
    }

    /// Logs a `Start` entry (without flushing) and registers the new transaction.
    ///
    /// The returned id is the page and position at which the `Start` entry was
    /// written; the entry itself is logged with the placeholder id `(0, 0)`.
    pub fn start(&self) -> PRes<JournalId> {
        let placeholder = JournalId::new(0, 0);
        let (page, pos) = self.internal_log(&Start::new(), &placeholder, false)?;
        let id = JournalId::new(page, pos);
        {
            let mut shared = self.journal.lock()?;
            shared.starts.push(&id);
        }
        Ok(id)
    }

    /// Appends `entry` for the transaction `id` and flushes the current journal page.
    pub fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, true).map(|_| ())
    }

    /// Appends the closing `entry` of the transaction `id` and flushes the
    /// current journal page.
    pub fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        // TODO:optimization keep in memory last page and move the flush here.
        self.internal_log(entry, id, true).map(|_| ())
    }

    /// Forgets the transactions in `ids` and releases the journal pages that
    /// precede the oldest one.
    ///
    /// For every id that `StartList::remove` reports as the oldest tracked start,
    /// the page chain is walked backwards (through the "previous page" links) from
    /// the page of the id down to the current first page; every page visited
    /// except the page of the id itself is freed. The page of the id then becomes
    /// the first page and is persisted in the root page.
    ///
    /// Returns `true` when the root page was rewritten at least once.
    pub fn clear_all(&self, ids: &[JournalId]) -> PRes<bool> {
        let mut shared = self.journal.lock()?;
        let mut root_updated = false;
        for id in ids {
            if !shared.starts.remove(id) {
                continue;
            }
            let old_first = shared.first_page;
            let mut cursor = id.page;
            loop {
                // Read the back link before the page is possibly freed.
                let previous = {
                    let mut page = self.allocator.load_page(cursor)?;
                    page.seek(JOURNAL_PAGE_PREV_OFFSET)?;
                    let link = page.read_u64::<BigEndian>()?;
                    link
                };
                if cursor != id.page {
                    self.allocator.free(cursor)?;
                }
                if cursor == old_first {
                    break;
                }
                cursor = previous;
            }

            // Persist the new first page in the root page.
            let mut header = [0; 11];
            BigEndian::write_u64(&mut header[0..8], id.page);
            let (flush, offset) = prepare_buffer_flush(&mut header, shared.last_flush);
            shared.last_flush = flush;
            let mut root_page = self.allocator.write_page(shared.root)?;
            root_page.seek(offset)?;
            root_page.write_all(&header)?;
            self.allocator.flush_page(&mut root_page)?;
            shared.first_page = id.page;
            root_updated = true;
        }
        Ok(root_updated)
    }

    /// Appends `entry` for the transaction `id` without flushing the journal page.
    pub fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, false).map(|_| ())
    }

    /// Serializes `entry` and appends it to the journal.
    ///
    /// The record is the entry type (1 byte), the page and position of `id`
    /// (8 + 4 bytes, big endian) and then the entry payload. The current page is
    /// flushed only when `flush` is set.
    ///
    /// Returns the page id and the offset at which the record was written.
    fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PRes<(u64, u32)> {
        let mut record = Vec::<u8>::new();
        record.write_u8(entry.get_type())?;
        record.write_u64::<BigEndian>(id.page)?;
        record.write_u32::<BigEndian>(id.pos)?;
        entry.write(&mut record)?;
        let record_len = record.len() as u32;

        let mut jr = self.journal.lock()?;
        // May switch to a freshly allocated page, updating last_page/last_pos.
        self.required_space(record_len, &mut jr)?;
        let written_at = (jr.last_page, jr.last_pos);
        jr.current.seek(written_at.1)?;
        jr.current.write_all(&record)?;
        if flush {
            self.allocator.flush_page(&mut jr.current)?;
        }
        jr.last_pos += record_len;
        Ok(written_at)
    }

    /// Scans the whole journal, starting from the first page, and hands every
    /// decoded entry (except `Start` entries) to `found` together with the id of
    /// the transaction it belongs to.
    ///
    /// While scanning, the write cursor of the journal (`last_page`, `last_pos`
    /// and the `current` page) is repositioned after the last entry found, so that
    /// new entries are appended there.
    ///
    /// Returns the ids of all the journal pages visited, in chain order.
    ///
    /// # Panics
    /// Panics when an entry type that is not known is found in the log.
    pub fn recover<T>(&self, mut found: T) -> PRes<Vec<u64>>
    where
        T: FnMut(&dyn JournalEntry, &JournalId),
    {
        let mut journal_pages = Vec::new();
        let mut jr = self.journal.lock()?;
        let mut cur_page = jr.first_page;
        jr.last_page = jr.first_page;
        journal_pages.push(cur_page);
        let mut cur_cursor = JOURNAL_PAGE_CONTENT_OFFSET;
        loop {
            let mut page = self.allocator.load_page(cur_page)?;
            page.seek(cur_cursor)?;
            let tp = page.read_u8()?;
            if tp == 0 {
                // A zero type byte marks the end of the entries of this page:
                // follow the link to the next page, if any.
                page.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
                cur_page = page.read_u64::<BigEndian>()?;
                if cur_page == 0 {
                    // No next page: the journal ends here, new entries go at this offset.
                    jr.last_pos = cur_cursor;
                    break;
                }
                journal_pages.push(cur_page);
                cur_cursor = JOURNAL_PAGE_CONTENT_OFFSET;
                jr.last_page = cur_page;
            } else {
                let mut read_size = 0;
                // Every entry carries the id (page, pos) of its transaction right after the type.
                let page_id = page.read_u64::<BigEndian>()?;
                let pos = page.read_u32::<BigEndian>()?;
                // Decodes the payload into `entry`, records its size and notifies the caller.
                let mut recover_entry = |entry: &mut dyn JournalEntry| -> PRes<()> {
                    read_size = entry.read(&mut page)?;
                    found(entry, &JournalId::new(page_id, pos));
                    Ok(())
                };
                // The type values match what each `JournalEntry::get_type` returns.
                match tp {
                    1 => {
                        let mut entry = Start::new();
                        read_size = entry.read(&mut page)?;
                        //The Start entry has no valid id, should not be recovered
                    }
                    2 => {
                        let mut entry = InsertRecord::new(0, &RecRef::new(0, 0), 0);
                        recover_entry(&mut entry)?;
                    }
                    3 => {
                        let mut entry = PrepareCommit::new();
                        recover_entry(&mut entry)?;
                    }
                    4 => {
                        let mut entry = Commit::new();
                        recover_entry(&mut entry)?;
                    }
                    5 => {
                        let mut entry = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0);
                        recover_entry(&mut entry)?;
                    }
                    6 => {
                        let mut entry = DeleteRecord::new(0, &RecRef::new(0, 0), 0);
                        recover_entry(&mut entry)?;
                    }
                    7 => {
                        let mut entry = Rollback::new();
                        recover_entry(&mut entry)?;
                    }
                    8 => {
                        let mut entry = CreateSegment::new("", 0, 0);
                        recover_entry(&mut entry)?;
                    }
                    9 => {
                        let mut entry = DropSegment::new("", 0);
                        recover_entry(&mut entry)?;
                    }
                    10 => {
                        let mut entry = ReadRecord::new(0, &RecRef::new(0, 0), 0);
                        recover_entry(&mut entry)?;
                    }
                    11 => {
                        let mut entry = Metadata::new(&TxStrategy::LastWin, Vec::new());
                        recover_entry(&mut entry)?;
                    }
                    12 => {
                        let mut entry = FreedPage::new(0);
                        recover_entry(&mut entry)?;
                    }
                    13 => {
                        let mut entry = NewSegmentPage::new(0, 0, 0);
                        recover_entry(&mut entry)?;
                    }
                    _ => panic!(" wrong log entry {} ", tp),
                };
                // Skip the payload plus the 13-byte prefix: type (1) + page (8) + pos (4).
                cur_cursor = read_size + cur_cursor + 13;
            }
        }
        // Reopen the last page for writing so that logging continues from there.
        jr.current = self.allocator.write_page(jr.last_page)?;
        Ok(journal_pages)
    }

    /// Makes sure the current journal page can host `space` more bytes plus the
    /// one-byte end marker, switching to a newly allocated page otherwise.
    ///
    /// When a new page is needed and a previous page exists, the previous page
    /// gets the link to the new page and a zero end marker after its last entry,
    /// and is flushed. The new page gets the back link to the previous page, is
    /// flushed, and becomes the current page with the cursor at
    /// `JOURNAL_PAGE_CONTENT_OFFSET`.
    ///
    /// NOTE(review): the check only looks at the cursor position; it does not
    /// test whether a journal page exists at all (`last_page == 0`) — confirm
    /// that callers never reach this point without a first page.
    fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PRes<()> {
        let fits = jr.last_pos + space + 1 < JOURNAL_PAGE_SIZE;
        if fits {
            return Ok(());
        }

        let prev = jr.last_page;
        let end_of_entries = jr.last_pos;
        let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
        if prev != 0 {
            // Close the old page: forward link first, then the end marker.
            jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
            jr.current.write_u64::<BigEndian>(new_page)?;
            jr.current.seek(end_of_entries)?;
            jr.current.write_u8(0)?;
            self.allocator.flush_page(&mut jr.current)?;
        }

        jr.current = self.allocator.write_page(new_page)?;
        jr.current.seek(JOURNAL_PAGE_PREV_OFFSET)?;
        jr.current.write_u64::<BigEndian>(prev)?;
        self.allocator.flush_page(&mut jr.current)?;
        jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
        jr.last_page = new_page;
        Ok(())
    }
}

/// Journal encoding of a record deletion.
impl JournalEntry for DeleteRecord {
    /// Entry type `6`.
    fn get_type(&self) -> u8 {
        6
    }

    /// Payload, big endian: segment (u32), record page (u64), record pos (u32),
    /// version (u16).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let rec = &self.recref;
        buffer.write_u32::<BigEndian>(self.segment)?;
        buffer.write_u64::<BigEndian>(rec.page)?;
        buffer.write_u32::<BigEndian>(rec.pos)?;
        buffer.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment = buffer.read_u32::<BigEndian>()?;
        self.recref.page = buffer.read_u64::<BigEndian>()?;
        self.recref.pos = buffer.read_u32::<BigEndian>()?;
        self.version = buffer.read_u16::<BigEndian>()?;
        // segment + page + pos + version
        Ok(4 + 8 + 4 + 2)
    }

    /// Registers the deletion in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_delete(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}

/// Journal encoding of a record read.
impl JournalEntry for ReadRecord {
    /// Entry type `10`.
    fn get_type(&self) -> u8 {
        10
    }

    /// Payload, big endian: segment (u32), record page (u64), record pos (u32),
    /// version (u16).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let rec = &self.recref;
        buffer.write_u32::<BigEndian>(self.segment)?;
        buffer.write_u64::<BigEndian>(rec.page)?;
        buffer.write_u32::<BigEndian>(rec.pos)?;
        buffer.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment = buffer.read_u32::<BigEndian>()?;
        self.recref.page = buffer.read_u64::<BigEndian>()?;
        self.recref.pos = buffer.read_u32::<BigEndian>()?;
        self.version = buffer.read_u16::<BigEndian>()?;
        // segment + page + pos + version
        Ok(4 + 8 + 4 + 2)
    }

    /// Registers the read in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_read(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}
/// Journal encoding of a record update.
impl JournalEntry for UpdateRecord {
    /// Entry type `5`.
    fn get_type(&self) -> u8 {
        5
    }

    /// Payload, big endian: segment (u32), record page (u64), record pos (u32),
    /// page of the new record content (u64), version (u16).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let rec = &self.recref;
        buffer.write_u32::<BigEndian>(self.segment)?;
        buffer.write_u64::<BigEndian>(rec.page)?;
        buffer.write_u32::<BigEndian>(rec.pos)?;
        buffer.write_u64::<BigEndian>(self.record_page)?;
        buffer.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment = buffer.read_u32::<BigEndian>()?;
        self.recref.page = buffer.read_u64::<BigEndian>()?;
        self.recref.pos = buffer.read_u32::<BigEndian>()?;
        self.record_page = buffer.read_u64::<BigEndian>()?;
        self.version = buffer.read_u16::<BigEndian>()?;
        // segment + page + pos + record_page + version
        Ok(4 + 8 + 4 + 8 + 2)
    }

    /// Registers the update in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_update(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}

/// Journal encoding of a record insertion.
impl JournalEntry for InsertRecord {
    /// Entry type `2`.
    fn get_type(&self) -> u8 {
        2
    }

    /// Payload, big endian: segment (u32), record page (u64), record pos (u32),
    /// page of the record content (u64).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let rec = &self.recref;
        buffer.write_u32::<BigEndian>(self.segment)?;
        buffer.write_u64::<BigEndian>(rec.page)?;
        buffer.write_u32::<BigEndian>(rec.pos)?;
        buffer.write_u64::<BigEndian>(self.record_page)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment = buffer.read_u32::<BigEndian>()?;
        self.recref.page = buffer.read_u64::<BigEndian>()?;
        self.recref.pos = buffer.read_u32::<BigEndian>()?;
        self.record_page = buffer.read_u64::<BigEndian>()?;
        // segment + page + pos + record_page
        Ok(4 + 8 + 4 + 8)
    }

    /// Registers the insertion in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_insert(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}

/// Journal encoding of the prepare-commit marker: a type byte with no payload.
impl JournalEntry for PrepareCommit {
    /// Entry type `3`.
    fn get_type(&self) -> u8 {
        3
    }

    /// Nothing to serialize.
    fn write(&self, _buffer: &mut dyn Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is zero bytes long.
    fn read(&mut self, _buffer: &mut dyn Read) -> PRes<u32> {
        let payload_len = 0;
        Ok(payload_len)
    }

    /// Reports that the transaction reached the prepare-commit state.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        let status = RecoverStatus::PrepareCommit;
        Ok(status)
    }
}

/// Journal encoding of the commit marker: a type byte with no payload.
impl JournalEntry for Commit {
    /// Entry type `4`.
    fn get_type(&self) -> u8 {
        4
    }

    /// Nothing to serialize.
    fn write(&self, _buffer: &mut dyn Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is zero bytes long.
    fn read(&mut self, _buffer: &mut dyn Read) -> PRes<u32> {
        let payload_len = 0;
        Ok(payload_len)
    }

    /// Reports that the transaction reached the commit state.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        let status = RecoverStatus::Commit;
        Ok(status)
    }
}

/// Journal encoding of the transaction metadata.
impl JournalEntry for Metadata {
    /// Entry type `11`.
    fn get_type(&self) -> u8 {
        11
    }

    /// Payload: strategy value (u8), length of the meta id (u16, big endian),
    /// then the meta id bytes.
    ///
    /// NOTE(review): the length is cast to `u16`, so a meta id longer than
    /// 65535 bytes would be written with a truncated length — confirm that
    /// callers bound the meta id size.
    fn write(&self, write: &mut dyn Write) -> PRes<()> {
        write.write_u8(self.strategy.value())?;
        let len = self.meta_id.len();
        write.write_u16::<BigEndian>(len as u16)?;
        write.write_all(&self.meta_id)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    ///
    /// # Errors
    /// Fails when the source ends before the announced number of meta id bytes
    /// has been read, instead of silently accepting a shorter meta id while
    /// reporting the full size.
    fn read(&mut self, read: &mut dyn Read) -> PRes<u32> {
        self.strategy = TxStrategy::from_value(read.read_u8()?);
        let len = read.read_u16::<BigEndian>()?;
        let mut meta_id = vec![0; usize::from(len)];
        read.read_exact(&mut meta_id)?;
        self.meta_id = meta_id;
        // Sum in u32: `1 + 2 + len` in u16 overflows for lengths close to u16::MAX.
        Ok(1 + 2 + u32::from(len))
    }

    /// Registers the metadata in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_metadata(self);
        Ok(RecoverStatus::Started)
    }
}

/// Journal encoding of the transaction start marker: a type byte with no payload.
impl JournalEntry for Start {
    /// Entry type `1`.
    fn get_type(&self) -> u8 {
        1
    }

    /// Nothing to serialize.
    fn write(&self, _buffer: &mut dyn Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is zero bytes long.
    fn read(&mut self, _buffer: &mut dyn Read) -> PRes<u32> {
        let payload_len = 0;
        Ok(payload_len)
    }

    /// Never expected to run: `Journal::recover` does not hand `Start` entries
    /// to its callback.
    ///
    /// # Panics
    /// Always panics.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        panic!("this should never be called")
    }
}

/// Journal encoding of the rollback marker: a type byte with no payload.
impl JournalEntry for Rollback {
    /// Entry type `7`.
    fn get_type(&self) -> u8 {
        7
    }

    /// Nothing to serialize.
    fn write(&self, _buffer: &mut dyn Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is zero bytes long.
    fn read(&mut self, _buffer: &mut dyn Read) -> PRes<u32> {
        let payload_len = 0;
        Ok(payload_len)
    }

    /// Reports that the transaction was rolled back.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        let status = RecoverStatus::Rollback;
        Ok(status)
    }
}

/// Journal encoding of a segment creation.
impl JournalEntry for CreateSegment {
    /// Entry type `8`.
    fn get_type(&self) -> u8 {
        8
    }

    /// Payload, big endian: segment id (u32), first page (u64), name length
    /// (u16), then the UTF-8 bytes of the name.
    ///
    /// NOTE(review): the length is cast to `u16`, so a name longer than 65535
    /// bytes would be written with a truncated length — confirm that segment
    /// names are bounded by the callers.
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        buffer.write_u32::<BigEndian>(self.segment_id)?;
        buffer.write_u64::<BigEndian>(self.first_page)?;
        buffer.write_u16::<BigEndian>(self.name.len() as u16)?;
        buffer.write_all(self.name.as_bytes())?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    ///
    /// # Errors
    /// Fails when the source ends before the announced number of name bytes has
    /// been read (this used to panic on the slice bounds), or when the name is
    /// not valid UTF-8.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment_id = buffer.read_u32::<BigEndian>()?;
        self.first_page = buffer.read_u64::<BigEndian>()?;
        let string_size = buffer.read_u16::<BigEndian>()?;
        let mut raw_name = vec![0; usize::from(string_size)];
        buffer.read_exact(&mut raw_name)?;
        self.name = str::from_utf8(&raw_name)?.into();
        // Sum in u32: the same sum in u16 overflows for lengths close to u16::MAX.
        Ok(4 + 8 + 2 + u32::from(string_size))
    }

    /// Registers the created segment in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_add(self);
        Ok(RecoverStatus::Started)
    }
}

/// Journal encoding of a segment drop.
impl JournalEntry for DropSegment {
    /// Entry type `9`.
    fn get_type(&self) -> u8 {
        9
    }

    /// Payload, big endian: segment id (u32), name length (u16), then the UTF-8
    /// bytes of the name.
    ///
    /// NOTE(review): the length is cast to `u16`, so a name longer than 65535
    /// bytes would be written with a truncated length — confirm that segment
    /// names are bounded by the callers.
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        buffer.write_u32::<BigEndian>(self.segment_id)?;
        buffer.write_u16::<BigEndian>(self.name.len() as u16)?;
        buffer.write_all(self.name.as_bytes())?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    ///
    /// # Errors
    /// Fails when the source ends before the announced number of name bytes has
    /// been read (this used to panic on the slice bounds), or when the name is
    /// not valid UTF-8.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment_id = buffer.read_u32::<BigEndian>()?;
        let string_size = buffer.read_u16::<BigEndian>()?;
        let mut raw_name = vec![0; usize::from(string_size)];
        buffer.read_exact(&mut raw_name)?;
        self.name = str::from_utf8(&raw_name)?.into();
        // Sum in u32: the same sum in u16 overflows for lengths close to u16::MAX.
        Ok(4 + 2 + u32::from(string_size))
    }

    /// Registers the dropped segment in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_drop(self);
        Ok(RecoverStatus::Started)
    }
}

/// Journal encoding of a freed page.
impl JournalEntry for FreedPage {
    /// Entry type `12`.
    fn get_type(&self) -> u8 {
        12
    }

    /// Payload: the page id (u64, big endian).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let page = self.page;
        buffer.write_u64::<BigEndian>(page)?;
        Ok(())
    }

    /// Reads back the page id and returns the payload size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.page = buffer.read_u64::<BigEndian>()?;
        Ok(8)
    }

    /// Registers the freed page in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_freed_page(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}

/// Journal encoding of a page added to a segment.
impl JournalEntry for NewSegmentPage {
    /// Entry type `13`.
    fn get_type(&self) -> u8 {
        13
    }

    /// Payload, big endian: segment (u32), page (u64), previous page (u64).
    fn write(&self, buffer: &mut dyn Write) -> PRes<()> {
        let (segment, page, previous) = (self.segment, self.page, self.previous);
        buffer.write_u32::<BigEndian>(segment)?;
        buffer.write_u64::<BigEndian>(page)?;
        buffer.write_u64::<BigEndian>(previous)?;
        Ok(())
    }

    /// Reads back the payload written by `write` and returns its size in bytes.
    fn read(&mut self, buffer: &mut dyn Read) -> PRes<u32> {
        self.segment = buffer.read_u32::<BigEndian>()?;
        self.page = buffer.read_u64::<BigEndian>()?;
        self.previous = buffer.read_u64::<BigEndian>()?;
        // segment + page + previous
        Ok(4 + 8 + 8)
    }

    /// Registers the new segment page in `tx`; the transaction stays in the started state.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_new_segment_page(self);
        let status = RecoverStatus::Started;
        Ok(status)
    }
}
impl JournalId {
    pub fn new(page: u64, pos: u32) -> JournalId {
        JournalId { page, pos }
    }
}

impl Start {
    fn new() -> Start {
        Start {}
    }
}

#[cfg(test)]
mod tests {
    use super::{Journal, JournalEntry, JournalId, StartList};
    use crate::transaction::{
        CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage, ReadRecord,
        UpdateRecord,
    };
    use crate::{
        allocator::Allocator,
        config::{Config, TxStrategy},
        discref::DiscRef,
        id::RecRef,
    };
    use std::{io::Cursor, sync::Arc};
    use tempfile::Builder;

    /// Serializes `to_write`, reads the bytes back into `to_read` and checks that
    /// the read consumed the whole buffer and reported exactly that many bytes.
    ///
    /// The reported size matters because `Journal::recover` uses it to find the
    /// next entry in a page.
    fn round_trip(to_write: &dyn JournalEntry, to_read: &mut dyn JournalEntry) {
        let mut buffer = Vec::<u8>::new();
        to_write.write(&mut buffer).unwrap();

        let mut cursor = Cursor::new(&buffer);
        let reported = to_read.read(&mut cursor).unwrap();
        assert_eq!(cursor.position(), buffer.len() as u64);
        assert_eq!(reported as usize, buffer.len());
    }

    /// `remove` returns true only for the oldest start still tracked.
    #[test]
    fn start_list_add_remove() {
        let mut start = StartList::new();
        start.push(&JournalId::new(1, 2));
        start.push(&JournalId::new(1, 4));
        assert!(start.remove(&JournalId::new(1, 2)));
        assert!(start.remove(&JournalId::new(1, 4)));

        start.push(&JournalId::new(1, 2));
        start.push(&JournalId::new(1, 4));
        assert!(!start.remove(&JournalId::new(1, 4)));
        assert!(start.remove(&JournalId::new(1, 2)));
    }

    /// Removing newer starts first never reports the oldest as cleared.
    #[test]
    fn start_list_add_remove_other_order() {
        let mut start = StartList::new();
        start.push(&JournalId::new(1, 1));
        start.push(&JournalId::new(1, 2));
        assert!(!start.remove(&JournalId::new(1, 2)));
        start.push(&JournalId::new(1, 3));
        assert!(!start.remove(&JournalId::new(1, 3)));
        assert!(start.remove(&JournalId::new(1, 1)));
    }

    #[test]
    fn read_write_insert_record() {
        let to_write = InsertRecord::new(10, &RecRef::new(20, 10), 3);
        let mut to_read = InsertRecord::new(0, &RecRef::new(0, 0), 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
    }

    /// Round trip of a `ReadRecord` entry.
    #[test]
    fn read_write_insert_read() {
        let to_write = ReadRecord::new(10, &RecRef::new(20, 10), 3);
        let mut to_read = ReadRecord::new(0, &RecRef::new(0, 0), 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.version, 3);
    }

    #[test]
    fn read_write_update_record() {
        let to_write = UpdateRecord::new(10, &RecRef::new(20, 10), 3, 1);
        let mut to_read = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
        assert_eq!(to_read.version, 1);
    }

    #[test]
    fn read_write_delete_record() {
        let to_write = DeleteRecord::new(10, &RecRef::new(20, 10), 1);
        let mut to_read = DeleteRecord::new(0, &RecRef::new(0, 0), 1);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.version, 1);
    }

    #[test]
    fn read_write_create_segment() {
        let to_write = CreateSegment::new("some", 10, 20);
        let mut to_read = CreateSegment::new("", 0, 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, 10);
        assert_eq!(to_read.first_page, 20);
    }

    #[test]
    fn read_write_drop_segment() {
        let to_write = DropSegment::new("some", 20);
        let mut to_read = DropSegment::new("", 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, 20);
    }

    #[test]
    fn read_write_metadata() {
        let meta_id = vec![10, 3];
        let to_write = Metadata::new(&TxStrategy::VersionOnWrite, meta_id.clone());
        let mut to_read = Metadata::new(&TxStrategy::LastWin, Vec::new());
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.strategy, TxStrategy::VersionOnWrite);
        assert_eq!(to_read.meta_id, meta_id);
    }

    #[test]
    fn read_write_freed_page() {
        let to_write = FreedPage::new(30);
        let mut to_read = FreedPage::new(0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.page, 30);
    }

    #[test]
    fn read_write_new_segment_page() {
        let to_write = NewSegmentPage::new(10, 20, 30);
        let mut to_read = NewSegmentPage::new(0, 0, 0);
        round_trip(&to_write, &mut to_read);
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.page, 20);
        assert_eq!(to_read.previous, 30);
    }

    /// Logs two insert entries on a journal backed by a temporary file and checks
    /// that recovery only reports insert entries.
    #[test]
    fn journal_log_and_recover() {
        let file = Builder::new()
            .prefix("journal_test")
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &Arc::new(Config::new()), pa).unwrap();
        let rp = Journal::init(&allocator).unwrap();
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
        let rec = InsertRecord::new(10, &RecRef::new(1, 1), 1);
        let id = JournalId::new(1, 20);
        journal.log(&rec, &id).unwrap();
        journal.log(&rec, &id).unwrap();
        journal.recover(|e, _| assert_eq!(e.get_type(), 2)).unwrap();
    }
}