use crate::transaction_impl::{
Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
PrepareCommit, ReadRecord, Rollback, TransactionImpl, UpdateRecord,
};
use crate::{
allocator::Allocator,
config::TxStrategy,
discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
error::PERes,
flush_checksum::double_buffer_check,
id::SegmentId,
io::{
read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
InfallibleWriteFormat, InfallibleWriteVarInt,
},
RecoverStatus,
};
use std::{
collections::hash_map::HashMap,
str,
sync::{Arc, Mutex, MutexGuard},
};
pub const JOURNAL_PAGE_EXP: u8 = 10; const JOURNAL_PAGE_SIZE: u32 = (1 << JOURNAL_PAGE_EXP) - PAGE_METADATA_SIZE; const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
const JOURNAL_ROOT_VERSION_0: u8 = 0;
const JOURNAL_ROOT_VERSION: u8 = JOURNAL_ROOT_VERSION_0;
struct StartListEntry {
next: Option<JournalId>,
prev: Option<JournalId>,
}
impl StartListEntry {
pub fn new(prev: Option<JournalId>) -> StartListEntry {
StartListEntry { next: None, prev }
}
}
struct StartList {
transactions: HashMap<JournalId, StartListEntry>,
last: Option<JournalId>,
}
impl StartList {
fn new() -> StartList {
StartList {
transactions: HashMap::new(),
last: None,
}
}
fn push(&mut self, id: &JournalId) {
self.transactions
.insert(id.clone(), StartListEntry::new(self.last.clone()));
if let Some(ref lst) = self.last {
self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
}
self.last = Some(id.clone());
}
fn remove(&mut self, id: &JournalId) -> bool {
if let Some(entry) = self.transactions.remove(id) {
if let Some(ref next) = entry.next {
self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
}
if let Some(ref prev) = entry.prev {
self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
}
if let Some(ref l) = self.last {
if l == id {
self.last = entry.prev.clone();
}
}
entry.prev.is_none()
} else {
false
}
}
}
struct JournalPagesToFree {
free_tx_id: JournalId,
pages: Vec<u64>,
}
struct JournalShared {
root: u64,
first_page: u64,
last_page: u64,
last_pos: u32,
starts: StartList,
current: Page,
last_flush: u8,
to_clear: Vec<JournalId>,
to_free: Option<JournalPagesToFree>,
}
pub struct Journal {
allocator: Arc<Allocator>,
journal: Mutex<JournalShared>,
}
pub(crate) trait JournalEntry {
fn get_type(&self) -> u8;
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()>;
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()>;
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus>;
}
pub struct Start {}
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
page: u64,
pos: u32,
}
fn recover_entry<T>(entry: &mut dyn JournalEntry, page: &mut ReadPage, found: &mut T, id: &JournalId) -> PERes<()>
where
T: FnMut(&dyn JournalEntry, &JournalId),
{
entry.read(page)?;
found(entry, id);
Ok(())
}
impl Journal {
pub fn new(all: &Arc<Allocator>, page: u64) -> PERes<Journal> {
let mut page = all.load_page(page)?;
let journal = match page.read_u8() {
JOURNAL_ROOT_VERSION_0 => Self::new_version_0(page, all)?,
_ => panic!("version not supported"),
};
Ok(Journal {
allocator: all.clone(),
journal: Mutex::new(journal),
})
}
fn new_version_0(mut page: ReadPage, all: &Allocator) -> PERes<JournalShared> {
let first_page;
let last_flush;
let mut buffer_0 = [0; 11];
let mut buffer_1 = [0; 11];
let current;
{
page.read_exact(&mut buffer_0);
page.read_exact(&mut buffer_1);
let (last_flush_ret, first) = double_buffer_check(&buffer_0, &buffer_1);
last_flush = last_flush_ret;
first_page = read_u64(if first { &buffer_0[0..8] } else { &buffer_1[0..8] });
}
current = if first_page != 0 {
all.write_page(first_page)?
} else {
Page::new(Vec::new(), 0, 0, 0)
};
Ok(JournalShared {
root: page.get_index(),
first_page,
last_page: first_page,
last_pos: 0,
starts: StartList::new(),
current,
last_flush,
to_clear: Vec::new(),
to_free: None,
})
}
pub fn init(allocator: &Allocator) -> PERes<u64> {
let root_page = allocator.allocate(5)?;
let root_page_index = root_page.get_index();
let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
let mut buffer = [0; 11];
write_u64(&mut buffer[0..8], first_page.get_index());
allocator.write_juornal_root(root_page, &mut buffer, JOURNAL_ROOT_VERSION, 0)?;
Ok(root_page_index)
}
pub fn start(&self) -> PERes<JournalId> {
let val = self.internal_log(&Start::new(), &JournalId::new(0, 0), false)?;
let id = JournalId::new(val.0, val.1);
self.journal.lock()?.starts.push(&id);
Ok(id)
}
pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
self.internal_log(entry, id, true)?;
Ok(())
}
pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
self.internal_log(entry, id, true)?;
Ok(())
}
pub fn clear_in_queque(&self) -> PERes<()> {
let mut pages_to_free = Vec::new();
{
let mut lock = self.journal.lock()?;
let ids = lock.to_clear.clone();
lock.to_clear.clear();
let mut new_first = None;
for id in ids {
if lock.starts.remove(&id) {
let first_page = lock.first_page;
let mut free_cursor = id.page;
loop {
let read;
{
let mut cur = self.allocator.load_page(free_cursor)?;
cur.seek(JOURNAL_PAGE_PREV_OFFSET);
read = cur.read_u64();
}
if free_cursor != id.page {
pages_to_free.push(free_cursor)
}
if free_cursor == first_page {
break;
}
free_cursor = read;
}
new_first = Some(id.page);
lock.first_page = id.page;
}
}
if let Some(first) = new_first {
let mut buffer = [0; 11];
write_u64(&mut buffer[0..8], first);
let root = self.allocator.write_page(lock.root)?;
lock.last_flush =
self.allocator
.write_juornal_root(root, &mut buffer, JOURNAL_ROOT_VERSION, lock.last_flush)?;
}
}
self.free_pages_tx(pages_to_free)?;
Ok(())
}
pub fn free_pages_tx(&self, pages_to_free: Vec<u64>) -> PERes<()> {
let mut prev_tx_id = None;
{
let mut lock = self.journal.lock()?;
if let Some(to_free) = &lock.to_free {
for page in &to_free.pages {
self.allocator.free(*page)?;
}
prev_tx_id = Some(to_free.free_tx_id.clone());
}
lock.to_free = None;
}
if let Some(id) = prev_tx_id {
self.finished_to_clean(&[id])?;
}
if !pages_to_free.is_empty() {
let id = self.start()?;
for page in &pages_to_free {
self.log(&FreedPage::new(*page), &id)?;
}
self.log(&PrepareCommit::new(), &id)?;
self.end(&Commit::new(), &id)?;
let mut lock = self.journal.lock()?;
lock.to_free = Some(JournalPagesToFree {
free_tx_id: id,
pages: pages_to_free,
});
}
Ok(())
}
pub fn finished_to_clean(&self, ids: &[JournalId]) -> PERes<()> {
for id in ids {
self.log(&Cleanup::new(), id)?;
}
let mut lock = self.journal.lock()?;
lock.to_clear.extend_from_slice(ids);
Ok(())
}
pub fn cleaned_to_trim(&self, ids: &[JournalId]) -> PERes<()> {
let mut lock = self.journal.lock()?;
lock.to_clear.extend_from_slice(ids);
Ok(())
}
pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
self.internal_log(entry, id, false)?;
Ok(())
}
fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PERes<(u64, u32)> {
let mut buffer = Vec::<u8>::new();
buffer.write_u8(entry.get_type());
buffer.write_varint_u64(id.page);
buffer.write_varint_u32(id.pos);
entry.write(&mut buffer)?;
let cur_page;
let cur_pos;
{
let mut jr = self.journal.lock()?;
self.required_space(buffer.len() as u32, &mut jr)?;
cur_page = jr.last_page;
cur_pos = jr.last_pos;
jr.current.seek(cur_pos);
jr.current.write_all(&*buffer);
if flush {
self.allocator.flush_page(jr.current.clone())?;
}
jr.last_pos += buffer.len() as u32;
}
Ok((cur_page, cur_pos))
}
pub(crate) fn recover<T>(&self, mut found: T) -> PERes<()>
where
T: FnMut(&dyn JournalEntry, &JournalId),
{
let mut jr = self.journal.lock()?;
let mut cur_page = jr.first_page;
jr.last_page = jr.first_page;
let mut page = self.allocator.load_page(cur_page)?;
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
loop {
let cursor_pos = page.cursor_pos();
let tp = page.read_u8();
if tp == 0 {
let last_pos = page.cursor_pos() as u32;
page.seek(JOURNAL_PAGE_NEXT_OFFSET);
cur_page = page.read_u64();
if cur_page == 0 {
jr.last_pos = last_pos - 1;
break;
}
page = self.allocator.load_page(cur_page)?;
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
jr.last_page = cur_page;
} else {
let page_id = page.read_varint_u64();
let pos = page.read_varint_u32();
let id = JournalId::new(page_id, pos);
let ref_page = &mut page;
let ref_found = &mut found;
match tp {
1 => {
Start::new().read(&mut page)?;
let tx_id = JournalId::new(jr.last_page, cursor_pos as u32);
jr.starts.push(&tx_id);
}
2 => recover_entry(&mut InsertRecord::default(), ref_page, ref_found, &id)?,
3 => recover_entry(&mut PrepareCommit::default(), ref_page, ref_found, &id)?,
4 => recover_entry(&mut Commit::default(), ref_page, ref_found, &id)?,
5 => recover_entry(&mut UpdateRecord::default(), ref_page, ref_found, &id)?,
6 => recover_entry(&mut DeleteRecord::default(), ref_page, ref_found, &id)?,
7 => recover_entry(&mut Rollback::default(), ref_page, ref_found, &id)?,
8 => recover_entry(&mut CreateSegment::default(), ref_page, ref_found, &id)?,
9 => recover_entry(&mut DropSegment::default(), ref_page, ref_found, &id)?,
10 => recover_entry(&mut ReadRecord::default(), ref_page, ref_found, &id)?,
11 => recover_entry(&mut Metadata::default(), ref_page, ref_found, &id)?,
12 => recover_entry(&mut FreedPage::default(), ref_page, ref_found, &id)?,
13 => recover_entry(&mut NewSegmentPage::default(), ref_page, ref_found, &id)?,
14 => recover_entry(&mut Cleanup::default(), ref_page, ref_found, &id)?,
_ => panic!(" wrong log entry {} ", tp),
};
}
}
jr.current = self.allocator.write_page(jr.last_page)?;
Ok(())
}
fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PERes<()> {
if jr.last_pos + space + 1 >= JOURNAL_PAGE_SIZE as u32 {
let prev = jr.last_page;
let last_pos = jr.last_pos;
let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
if prev != 0 {
jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET);
jr.current.write_u64(new_page.get_index());
jr.current.seek(last_pos);
jr.current.write_u8(0);
self.allocator.flush_page(jr.current.clone())?;
}
jr.last_page = new_page.get_index();
jr.current = new_page;
jr.current.seek(JOURNAL_PAGE_PREV_OFFSET);
jr.current.write_u64(prev);
self.allocator.flush_page(jr.current.clone())?;
jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
}
Ok(())
}
}
impl JournalEntry for DeleteRecord {
fn get_type(&self) -> u8 {
6
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment.write_varint(buffer);
buffer.write_varint_u64(self.recref.page);
buffer.write_varint_u32(self.recref.pos);
buffer.write_varint_u16(self.version);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment = SegmentId::read_varint(buffer);
self.recref.page = buffer.read_varint_u64();
self.recref.pos = buffer.read_varint_u32();
self.version = buffer.read_varint_u16();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_delete(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for ReadRecord {
fn get_type(&self) -> u8 {
10
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment.write_varint(buffer);
buffer.write_varint_u64(self.recref.page);
buffer.write_varint_u32(self.recref.pos);
buffer.write_varint_u16(self.version);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment = SegmentId::read_varint(buffer);
self.recref.page = buffer.read_varint_u64();
self.recref.pos = buffer.read_varint_u32();
self.version = buffer.read_varint_u16();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_read(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for UpdateRecord {
fn get_type(&self) -> u8 {
5
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment.write_varint(buffer);
buffer.write_varint_u64(self.recref.page);
buffer.write_varint_u32(self.recref.pos);
buffer.write_varint_u64(self.record_page);
buffer.write_varint_u16(self.version);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment = SegmentId::read_varint(buffer);
self.recref.page = buffer.read_varint_u64();
self.recref.pos = buffer.read_varint_u32();
self.record_page = buffer.read_varint_u64();
self.version = buffer.read_varint_u16();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_update(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for InsertRecord {
fn get_type(&self) -> u8 {
2
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment.write_varint(buffer);
buffer.write_varint_u64(self.recref.page);
buffer.write_varint_u32(self.recref.pos);
buffer.write_varint_u64(self.record_page);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment = SegmentId::read_varint(buffer);
self.recref.page = buffer.read_varint_u64();
self.recref.pos = buffer.read_varint_u32();
self.record_page = buffer.read_varint_u64();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_insert(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for PrepareCommit {
fn get_type(&self) -> u8 {
3
}
fn write(&self, _: &mut dyn InfallibleWrite) -> PERes<()> {
Ok(())
}
fn read(&mut self, _: &mut dyn InfallibleRead) -> PERes<()> {
Ok(())
}
fn recover(&self, _: &mut TransactionImpl) -> PERes<RecoverStatus> {
Ok(RecoverStatus::PrepareCommit)
}
}
impl JournalEntry for Commit {
fn get_type(&self) -> u8 {
4
}
fn write(&self, _: &mut dyn InfallibleWrite) -> PERes<()> {
Ok(())
}
fn read(&mut self, _: &mut dyn InfallibleRead) -> PERes<()> {
Ok(())
}
fn recover(&self, _: &mut TransactionImpl) -> PERes<RecoverStatus> {
Ok(RecoverStatus::Commit)
}
}
impl JournalEntry for Cleanup {
fn get_type(&self) -> u8 {
14
}
fn write(&self, _: &mut dyn InfallibleWrite) -> PERes<()> {
Ok(())
}
fn read(&mut self, _: &mut dyn InfallibleRead) -> PERes<()> {
Ok(())
}
fn recover(&self, _: &mut TransactionImpl) -> PERes<RecoverStatus> {
Ok(RecoverStatus::Cleanup)
}
}
impl JournalEntry for Metadata {
fn get_type(&self) -> u8 {
11
}
fn write(&self, write: &mut dyn InfallibleWrite) -> PERes<()> {
write.write_varint_u8(self.strategy.value());
let len = self.meta_id.len();
write.write_varint_u16(len as u16);
write.write_all(&self.meta_id);
Ok(())
}
fn read(&mut self, read: &mut dyn InfallibleRead) -> PERes<()> {
self.strategy = TxStrategy::from_value(read.read_varint_u8());
let len = read.read_varint_u16();
let mut slice: Vec<u8> = vec![0; len as usize];
read.read_exact(&mut slice[0..len as usize]);
self.meta_id = slice;
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_metadata(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for Start {
fn get_type(&self) -> u8 {
1
}
fn write(&self, _: &mut dyn InfallibleWrite) -> PERes<()> {
Ok(())
}
fn read(&mut self, _: &mut dyn InfallibleRead) -> PERes<()> {
Ok(())
}
fn recover(&self, _: &mut TransactionImpl) -> PERes<RecoverStatus> {
panic!("this should never be called")
}
}
impl JournalEntry for Rollback {
fn get_type(&self) -> u8 {
7
}
fn write(&self, _: &mut dyn InfallibleWrite) -> PERes<()> {
Ok(())
}
fn read(&mut self, _: &mut dyn InfallibleRead) -> PERes<()> {
Ok(())
}
fn recover(&self, _: &mut TransactionImpl) -> PERes<RecoverStatus> {
Ok(RecoverStatus::Rollback)
}
}
impl JournalEntry for CreateSegment {
fn get_type(&self) -> u8 {
8
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment_id.write_varint(buffer);
buffer.write_varint_u64(self.first_page);
buffer.write_varint_u16(self.name.len() as u16);
buffer.write_all(self.name.as_bytes());
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment_id = SegmentId::read_varint(buffer);
self.first_page = buffer.read_varint_u64();
let string_size = buffer.read_varint_u16();
let mut slice: Vec<u8> = vec![0; string_size as usize];
buffer.read_exact(&mut slice[0..string_size as usize]);
self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_add(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for DropSegment {
fn get_type(&self) -> u8 {
9
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment_id.write_varint(buffer);
buffer.write_varint_u16(self.name.len() as u16);
buffer.write_all(self.name.as_bytes());
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment_id = SegmentId::read_varint(buffer);
let string_size = buffer.read_varint_u16();
let mut slice: Vec<u8> = vec![0; string_size as usize];
buffer.read_exact(&mut slice[0..string_size as usize]);
self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_drop(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for FreedPage {
fn get_type(&self) -> u8 {
12
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
buffer.write_varint_u64(self.page);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.page = buffer.read_varint_u64();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_freed_page(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for NewSegmentPage {
fn get_type(&self) -> u8 {
13
}
fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
self.segment.write_varint(buffer);
buffer.write_varint_u64(self.page);
buffer.write_varint_u64(self.previous);
Ok(())
}
fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PERes<()> {
self.segment = SegmentId::read_varint(buffer);
self.page = buffer.read_varint_u64();
self.previous = buffer.read_varint_u64();
Ok(())
}
fn recover(&self, tx: &mut TransactionImpl) -> PERes<RecoverStatus> {
tx.recover_new_segment_page(self);
Ok(RecoverStatus::Started)
}
}
impl JournalId {
pub fn new(page: u64, pos: u32) -> JournalId {
JournalId { page, pos }
}
}
impl Start {
fn new() -> Start {
Start {}
}
}
#[cfg(test)]
mod tests {
use super::{Journal, JournalEntry, JournalId, StartList};
use crate::transaction_impl::{
CreateSegment, DeleteRecord, DropSegment, InsertRecord, Metadata, NewSegmentPage, ReadRecord, UpdateRecord,
};
use crate::{
allocator::Allocator,
config::{Config, TxStrategy},
discref::DiscRef,
id::{RecRef, SegmentId},
io::ArcSliceRead,
};
use std::sync::Arc;
use tempfile::Builder;
#[test]
fn start_list_add_remove() {
let mut start = StartList::new();
start.push(&JournalId::new(1, 2));
start.push(&JournalId::new(1, 4));
assert!(start.remove(&JournalId::new(1, 2)));
assert!(start.remove(&JournalId::new(1, 4)));
start.push(&JournalId::new(1, 2));
start.push(&JournalId::new(1, 4));
assert!(!start.remove(&JournalId::new(1, 4)));
assert!(start.remove(&JournalId::new(1, 2)));
}
#[test]
fn start_list_add_remove_other_order() {
let mut start = StartList::new();
start.push(&JournalId::new(1, 1));
start.push(&JournalId::new(1, 2));
assert!(!start.remove(&JournalId::new(1, 2)));
start.push(&JournalId::new(1, 3));
assert!(!start.remove(&JournalId::new(1, 3)));
assert!(start.remove(&JournalId::new(1, 1)));
}
#[test]
fn read_write_insert_record() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = InsertRecord::new(seg_id, &RecRef::new(20, 10), 3);
to_write.write(&mut buffer).unwrap();
let mut to_read = InsertRecord::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.segment, seg_id);
assert_eq!(to_read.recref.page, 20);
assert_eq!(to_read.recref.pos, 10);
assert_eq!(to_read.record_page, 3);
}
#[test]
fn read_write_insert_read() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = ReadRecord::new(seg_id, &RecRef::new(20, 10), 3);
to_write.write(&mut buffer).unwrap();
let mut to_read = ReadRecord::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.segment, seg_id);
assert_eq!(to_read.recref.page, 20);
assert_eq!(to_read.recref.pos, 10);
assert_eq!(to_read.version, 3);
}
#[test]
fn read_write_update_record() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = UpdateRecord::new(seg_id, &RecRef::new(20, 10), 3, 1);
to_write.write(&mut buffer).unwrap();
let mut to_read = UpdateRecord::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.segment, seg_id);
assert_eq!(to_read.recref.page, 20);
assert_eq!(to_read.recref.pos, 10);
assert_eq!(to_read.record_page, 3);
assert_eq!(to_read.version, 1);
}
#[test]
fn read_write_delete_record() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = DeleteRecord::new(seg_id, &RecRef::new(20, 10), 1);
to_write.write(&mut buffer).unwrap();
let mut to_read = DeleteRecord::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.segment, seg_id);
assert_eq!(to_read.recref.page, 20);
assert_eq!(to_read.recref.pos, 10);
assert_eq!(to_read.version, 1);
}
#[test]
fn read_write_create_segment() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = CreateSegment::new("some", seg_id, 20);
to_write.write(&mut buffer).unwrap();
let mut to_read = CreateSegment::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.name, "some");
assert_eq!(to_read.segment_id, seg_id);
assert_eq!(to_read.first_page, 20);
}
#[test]
fn read_write_drop_segment() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(20);
let to_write = DropSegment::new("some", seg_id);
to_write.write(&mut buffer).unwrap();
let mut to_read = DropSegment::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.name, "some");
assert_eq!(to_read.segment_id, seg_id);
}
#[test]
fn read_write_metadata() {
let mut buffer = Vec::<u8>::new();
let meta_id = vec![10, 3];
let to_write = Metadata::new(&TxStrategy::VersionOnWrite, meta_id.clone());
to_write.write(&mut buffer).unwrap();
let mut to_read = Metadata::new(&TxStrategy::LastWin, Vec::new());
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.strategy, TxStrategy::VersionOnWrite);
assert_eq!(to_read.meta_id, meta_id);
}
#[test]
fn read_write_new_segment_page() {
let mut buffer = Vec::<u8>::new();
let seg_id = SegmentId::new(10);
let to_write = NewSegmentPage::new(seg_id, 20, 30);
to_write.write(&mut buffer).unwrap();
let mut to_read = NewSegmentPage::default();
let len = buffer.len();
let mut reader = ArcSliceRead::new_vec(buffer);
to_read.read(&mut reader).unwrap();
assert_eq!(reader.cursor(), len);
assert_eq!(to_read.segment, seg_id);
assert_eq!(to_read.page, 20);
assert_eq!(to_read.previous, 30);
}
#[test]
fn journal_log_and_recover() {
let file = Builder::new()
.prefix("journal_test")
.suffix(".persy")
.tempfile()
.unwrap()
.reopen()
.unwrap();
let disc = Box::new(DiscRef::new(file).unwrap());
let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
let rp = Journal::init(&allocator).unwrap();
let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
let seg_id = SegmentId::new(10);
let rec = InsertRecord::new(seg_id, &RecRef::new(1, 1), 1);
let id = JournalId::new(1, 20);
journal.log(&rec, &id).unwrap();
journal.log(&rec, &id).unwrap();
journal.recover(|e, _| assert_eq!(e.get_type(), 2)).unwrap();
}
}