use config::TxStrategy;
use persy::{RecRef, PRes, RecoverStatus};
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
use allocator::Allocator;
use transaction::{InsertRecord, PrepareCommit, Commit, UpdateRecord, DeleteRecord, Rollback, CreateSegment, DropSegment, Transaction, ReadRecord};
use std::collections::hash_map::HashMap;
use std::io::{Write, Read, Cursor};
use std::sync::{Mutex, Arc};
use discref::{Page, PageSeek};
use std::str;
/// Size exponent passed to `allocate` for every journal page (2^10 bytes).
const JOURNAL_PAGE_EXP: u8 = 10;
/// Bytes of a journal page usable for the journal layout: 2^JOURNAL_PAGE_EXP minus 2.
/// NOTE(review): the 2 bytes are presumably reserved by the page layer — TODO confirm.
const JOURNAL_PAGE_SIZE: u32 = (1u32 << JOURNAL_PAGE_EXP) - 2;
/// Offset of the u64 link to the following journal page (0 when there is none).
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
/// Offset of the u64 link to the preceding journal page, right after the next link.
const JOURNAL_PAGE_PREV_OFFSET: u32 = JOURNAL_PAGE_NEXT_OFFSET + 8;
/// Offset where journal entries begin, right after the two page links.
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = JOURNAL_PAGE_PREV_OFFSET + 8;
/// Node of the doubly-linked list kept by `StartList`: the neighbouring open
/// transactions, in the order they were pushed.
struct StartListEntry {
    next: Option<JournalId>,
    prev: Option<JournalId>,
}

impl StartListEntry {
    /// Creates a node appended after `prev`, with no successor yet.
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
        StartListEntry {
            next: None,
            prev: prev,
        }
    }
}

/// Transactions that have started and not yet ended, kept in start order.
///
/// Nodes live in a `HashMap` keyed by transaction id and are linked through their
/// `prev`/`next` ids, so push and remove are O(1).
struct StartList {
    transactions: HashMap<JournalId, StartListEntry>,
    /// Most recently pushed transaction still in the list (tail), if any.
    last: Option<JournalId>,
}

impl StartList {
    /// Creates an empty list.
    fn new() -> StartList {
        StartList {
            transactions: HashMap::new(),
            last: None,
        }
    }

    /// Appends `id` as the most recently started transaction.
    fn push(&mut self, id: &JournalId) {
        self.transactions.insert(id.clone(), StartListEntry::new(self.last.clone()));
        if let Some(ref lst) = self.last {
            self.transactions
                .get_mut(lst)
                .expect("start list tail must be present in the map")
                .next = Some(id.clone());
        }
        self.last = Some(id.clone());
    }

    /// Unlinks `id` from the list.
    ///
    /// Returns `true` when `id` had no predecessor, i.e. it was the oldest transaction
    /// still in the list.
    ///
    /// # Panics
    /// Panics if `id` is not in the list.
    fn remove(&mut self, id: &JournalId) -> bool {
        let entry = self.transactions
            .remove(id)
            .expect("removing a transaction that is not in the start list");
        match entry.next {
            Some(ref next) => {
                self.transactions
                    .get_mut(next)
                    .expect("start list successor must be present in the map")
                    .prev = entry.prev.clone();
            }
            // The tail was removed: its predecessor (if any) becomes the new tail.
            // Resetting to `None` here would detach the remaining entries from later pushes.
            None => self.last = entry.prev.clone(),
        }
        if let Some(ref prev) = entry.prev {
            self.transactions
                .get_mut(prev)
                .expect("start list predecessor must be present in the map")
                .next = entry.next.clone();
        }
        entry.prev.is_none()
    }
}
/// Mutable journal state, guarded by the mutex held in `Journal`.
struct JournalShared {
/// Journal root page: first journal page at offset 0, last journal page at offset 8.
root: u64,
/// Oldest journal page still in use; `recover` starts reading from here.
first_page: u64,
/// Journal page that new entries are appended to.
last_page: u64,
/// Offset in `last_page` where the next entry is written. 0 means "not known yet",
/// which makes the next append open a fresh page.
last_pos: u32,
/// Transactions that have started and not yet ended, in start order.
starts: StartList,
/// Writable handle on `last_page` (a detached in-memory page while `last_page` is 0).
current: Page,
}
/// Journal of transaction entries, stored as a chain of pages linked both forward
/// (next page) and backward (previous page).
pub struct Journal {
/// Allocator used to allocate, load, write, flush and free journal pages.
allocator: Arc<Allocator>,
/// Shared journal state; the methods that touch it take this lock.
journal: Mutex<JournalShared>,
}
/// A record that can be appended to the journal and replayed by `Journal::recover`.
pub trait JournalEntry {
/// Type tag stored as the first byte of the entry; `recover` dispatches on it.
/// The value 0 is not usable: `recover` treats it as the end-of-page marker.
fn get_type(&self) -> u8;
/// Writes the entry payload (without the common type/id header) to `buffer`.
fn write(&self, buffer: &mut Write) -> PRes<()>;
/// Reads back a payload produced by `write` and returns how many bytes it consumed;
/// `recover` uses this count to find the next entry.
fn read(&mut self, buffer: &mut Read) -> PRes<u32>;
/// Feeds the entry into the transaction being rebuilt and reports the resulting status.
fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus>;
}
/// Journal entry that opens a transaction; it records the transaction's `TxStrategy`.
pub struct Start {
strategy: TxStrategy,
}
/// Identifies a transaction by the journal page and byte offset of its `Start` entry.
///
/// Every later entry of the transaction carries this id in its header.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
    page: u64,
    pos: u32,
}
impl Journal {
    /// Opens the journal whose root page is `page`.
    ///
    /// The root page stores the first journal page at offset 0 and the last one at
    /// offset 8. A writable handle on the last page is kept as the append target.
    /// `last_pos` starts at 0, so the first append opens a fresh page unless
    /// `recover` runs first and restores the real append position.
    pub fn new(all: &Arc<Allocator>, page: u64) -> PRes<Journal> {
        let (first_page, last_page) = {
            let mut root = all.load_page(page)?;
            let first = root.read_u64::<BigEndian>()?;
            let last = root.read_u64::<BigEndian>()?;
            (first, last)
        };
        let current = if last_page != 0 {
            all.write_page(last_page)?
        } else {
            // Nothing to append to yet: a detached in-memory placeholder page.
            Page::new(Cursor::new(Vec::new()), 0, 0)
        };
        let journal = JournalShared {
            root: page,
            first_page: first_page,
            last_page: last_page,
            last_pos: 0,
            starts: StartList::new(),
            current: current,
        };
        Ok(Journal {
            allocator: all.clone(),
            journal: Mutex::new(journal),
        })
    }

    /// Creates an empty journal and returns its root page.
    ///
    /// Allocates the root page and one journal page, and stores that journal page as
    /// both the first (offset 0) and the last (offset 8) page in the root.
    pub fn init(allocator: &Allocator) -> PRes<u64> {
        let root_page = allocator.allocate(5)?;
        let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
        {
            let mut page = allocator.write_page(root_page)?;
            page.write_u64::<BigEndian>(first_page)?;
            page.write_u64::<BigEndian>(first_page)?;
            allocator.flush_page(&mut page)?;
        }
        Ok(root_page)
    }

    /// Logs a `Start` entry for a new transaction and registers it as open.
    ///
    /// The returned id is the location of that `Start` entry.
    pub fn start(&self, strategy: &TxStrategy) -> PRes<JournalId> {
        let val = self.internal_log(&Start::new(strategy), &JournalId::new(0, 0), false)?;
        let id = JournalId::new(val.0, val.1);
        self.journal.lock()?.starts.push(&id);
        Ok(id)
    }

    /// Logs `entry` for transaction `id` and flushes the journal page.
    pub fn prepare(&self, entry: &JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, true)?;
        Ok(())
    }

    /// Logs `entry` for transaction `id`, flushes it and unregisters the transaction.
    ///
    /// Returns `true` when the transaction had no older open transaction before it.
    pub fn end(&self, entry: &JournalEntry, id: &JournalId) -> PRes<bool> {
        self.internal_log(entry, id, true)?;
        Ok(self.journal.lock()?.starts.remove(id))
    }

    /// Makes `id.page` the first journal page and frees all pages before it.
    ///
    /// "Before it" means the pages reached by following the previous-page links from
    /// `id.page` back to the current first page.
    pub fn clear(&self, id: &JournalId) -> PRes<()> {
        let mut lock = self.journal.lock()?;
        let first_page = lock.first_page;
        // Repoint the root before freeing anything, so a crash part-way through never
        // leaves the root referencing a released page (at worst some pages leak).
        {
            let mut root_page = self.allocator.write_page(lock.root)?;
            root_page.write_u64::<BigEndian>(id.page)?;
            self.allocator.flush_page(&mut root_page)?;
        }
        lock.first_page = id.page;
        // Walk the prev links from `id.page` back to the old first page, freeing every
        // visited page except `id.page` itself.
        let mut free_cursor = id.page;
        loop {
            let prev = {
                let mut cur = self.allocator.load_page(free_cursor)?;
                cur.seek(JOURNAL_PAGE_PREV_OFFSET)?;
                cur.read_u64::<BigEndian>()?
            };
            if free_cursor != id.page {
                self.allocator.free(free_cursor)?;
            }
            if free_cursor == first_page {
                break;
            }
            free_cursor = prev;
        }
        Ok(())
    }

    /// Logs `entry` for transaction `id` without flushing.
    pub fn log(&self, entry: &JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, false)?;
        Ok(())
    }

    /// Appends `entry`, tagged with transaction `id`, at the end of the journal.
    ///
    /// On disk an entry is: type (u8), `id.page` (u64), `id.pos` (u32), then the payload
    /// produced by `JournalEntry::write`. When `flush` is true the page is flushed right
    /// after the write. Returns the page and offset where the entry starts.
    fn internal_log(&self, entry: &JournalEntry, id: &JournalId, flush: bool) -> PRes<(u64, u32)> {
        let mut buffer = Vec::<u8>::new();
        buffer.write_u8(entry.get_type())?;
        buffer.write_u64::<BigEndian>(id.page)?;
        buffer.write_u32::<BigEndian>(id.pos)?;
        entry.write(&mut buffer)?;
        // Reserve space and write under a single lock acquisition. With two separate
        // acquisitions another thread could use the reserved space in between and the
        // entry could run past the end of the page.
        let mut jr = self.journal.lock()?;
        self.required_space(&mut *jr, buffer.len() as u32)?;
        let cur_page = jr.last_page;
        let cur_pos = jr.last_pos;
        jr.current.seek(cur_pos)?;
        // `write` may legally write only part of the buffer; `write_all` writes it all.
        jr.current.write_all(&buffer)?;
        if flush {
            self.allocator.flush_page(&mut jr.current)?;
        }
        jr.last_pos += buffer.len() as u32;
        Ok((cur_page, cur_pos))
    }

    /// Replays the whole journal from the first page.
    ///
    /// Calls `found` with every entry and the id of the transaction it belongs to, then
    /// records the end of the last page as the append position (`last_pos`). The journal
    /// lock is held while `found` runs, so `found` must not call back into this journal.
    ///
    /// # Panics
    /// Panics on an unknown entry type.
    pub fn recover<T>(&self, mut found: T) -> PRes<()>
        where T: FnMut(&JournalEntry, &JournalId)
    {
        // Common entry header: type (1) + transaction page (8) + transaction offset (4).
        const ENTRY_HEADER_SIZE: u32 = 13;
        let mut jr = self.journal.lock()?;
        let mut cur_page = jr.first_page;
        let mut cur_cursor = JOURNAL_PAGE_CONTENT_OFFSET;
        loop {
            let mut page = self.allocator.load_page(cur_page)?;
            page.seek(cur_cursor)?;
            let tp = page.read_u8()?;
            if tp == 0 {
                // End of this page's entries: follow the next link, or stop at the last page.
                page.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
                cur_page = page.read_u64::<BigEndian>()?;
                if cur_page == 0 {
                    jr.last_pos = cur_cursor;
                    break;
                }
                cur_cursor = JOURNAL_PAGE_CONTENT_OFFSET;
            } else {
                let page_id = page.read_u64::<BigEndian>()?;
                let pos = page.read_u32::<BigEndian>()?;
                let mut entry = Journal::empty_entry(tp);
                let read_size = entry.read(&mut page)?;
                found(&*entry, &JournalId::new(page_id, pos));
                cur_cursor += ENTRY_HEADER_SIZE + read_size;
            }
        }
        Ok(())
    }

    /// Builds a blank entry for the type tag `tp`, ready to be filled by `JournalEntry::read`.
    ///
    /// # Panics
    /// Panics when `tp` is not a known entry type.
    fn empty_entry(tp: u8) -> Box<JournalEntry> {
        match tp {
            1 => Box::new(Start::new(&TxStrategy::LastWin)),
            2 => Box::new(InsertRecord::new(0, &RecRef::new(0, 0), 0)),
            3 => Box::new(PrepareCommit::new()),
            4 => Box::new(Commit::new()),
            5 => Box::new(UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0, 0)),
            6 => Box::new(DeleteRecord::new(0, &RecRef::new(0, 0), 0, 0)),
            7 => Box::new(Rollback::new()),
            8 => Box::new(CreateSegment::new("", 0)),
            9 => Box::new(DropSegment::new("", 0)),
            10 => Box::new(ReadRecord::new(0, &RecRef::new(0, 0), 0)),
            _ => panic!(" wrong log entry {} ", tp),
        }
    }

    /// Makes sure `space` more bytes fit in the current journal page.
    ///
    /// Switches to a newly allocated page when they do not fit, or when no append
    /// position is known yet (`last_pos == 0`). One byte is always kept free for the 0
    /// end-of-page marker that `recover` stops on. `jr` is the state guarded by the
    /// journal lock, which the caller must hold.
    fn required_space(&self, jr: &mut JournalShared, space: u32) -> PRes<()> {
        if jr.last_pos != 0 && jr.last_pos + space + 1 < JOURNAL_PAGE_SIZE {
            return Ok(());
        }
        let prev = jr.last_page;
        let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
        // Initialise and persist the new page (its back link) before the old page or the
        // root points at it, so a crash never exposes an uninitialised page.
        let mut fresh = self.allocator.write_page(new_page)?;
        fresh.reset()?;
        fresh.seek(JOURNAL_PAGE_PREV_OFFSET)?;
        fresh.write_u64::<BigEndian>(prev)?;
        self.allocator.flush_page(&mut fresh)?;
        if prev != 0 {
            // Link the old page forward and terminate its entries with a 0 type byte.
            let end_pos = jr.last_pos;
            jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
            jr.current.write_u64::<BigEndian>(new_page)?;
            jr.current.seek(end_pos)?;
            jr.current.write_u8(0)?;
            self.allocator.flush_page(&mut jr.current)?;
            // Root offset 8 holds the last journal page.
            let mut root = self.allocator.write_page(jr.root)?;
            root.seek(8)?;
            root.write_u64::<BigEndian>(new_page)?;
            self.allocator.flush_page(&mut root)?;
        } else {
            // No previous page: the new page is both the first and the last one.
            let mut root = self.allocator.write_page(jr.root)?;
            root.write_u64::<BigEndian>(new_page)?;
            root.write_u64::<BigEndian>(new_page)?;
            self.allocator.flush_page(&mut root)?;
            jr.first_page = new_page;
        }
        // Commit the in-memory state only once the on-disk switch is done.
        jr.last_page = new_page;
        jr.current = fresh;
        jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
        Ok(())
    }
}
impl JournalEntry for DeleteRecord {
    /// Type tag of a record deletion entry.
    fn get_type(&self) -> u8 {
        6
    }

    /// Payload (big-endian): segment, record page, record pos, old record page, version.
    fn write(&self, out: &mut Write) -> PRes<()> {
        out.write_u32::<BigEndian>(self.segment)?;
        out.write_u64::<BigEndian>(self.recref.page)?;
        out.write_u32::<BigEndian>(self.recref.pos)?;
        out.write_u64::<BigEndian>(self.record_old_page)?;
        out.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads the payload written by `write`; returns its size in bytes.
    fn read(&mut self, input: &mut Read) -> PRes<u32> {
        self.segment = input.read_u32::<BigEndian>()?;
        self.recref.page = input.read_u64::<BigEndian>()?;
        self.recref.pos = input.read_u32::<BigEndian>()?;
        self.record_old_page = input.read_u64::<BigEndian>()?;
        self.version = input.read_u16::<BigEndian>()?;
        // u32 + u64 + u32 + u64 + u16
        Ok(4 + 8 + 4 + 8 + 2)
    }

    /// Hands the deletion to the transaction being rebuilt.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_delete(self);
        Ok(RecoverStatus::Started)
    }
}
impl JournalEntry for ReadRecord {
    /// Type tag of a record read entry.
    fn get_type(&self) -> u8 {
        10
    }

    /// Payload (big-endian): segment, record page, record pos, version.
    fn write(&self, out: &mut Write) -> PRes<()> {
        out.write_u32::<BigEndian>(self.segment)?;
        out.write_u64::<BigEndian>(self.recref.page)?;
        out.write_u32::<BigEndian>(self.recref.pos)?;
        out.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads the payload written by `write`; returns its size in bytes.
    fn read(&mut self, input: &mut Read) -> PRes<u32> {
        self.segment = input.read_u32::<BigEndian>()?;
        self.recref.page = input.read_u64::<BigEndian>()?;
        self.recref.pos = input.read_u32::<BigEndian>()?;
        self.version = input.read_u16::<BigEndian>()?;
        // u32 + u64 + u32 + u16
        Ok(4 + 8 + 4 + 2)
    }

    /// Hands the read to the transaction being rebuilt.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_read(self);
        Ok(RecoverStatus::Started)
    }
}
impl JournalEntry for UpdateRecord {
    /// Type tag of a record update entry.
    fn get_type(&self) -> u8 {
        5
    }

    /// Payload (big-endian): segment, record page, record pos, new record page,
    /// old record page, version.
    fn write(&self, out: &mut Write) -> PRes<()> {
        out.write_u32::<BigEndian>(self.segment)?;
        out.write_u64::<BigEndian>(self.recref.page)?;
        out.write_u32::<BigEndian>(self.recref.pos)?;
        out.write_u64::<BigEndian>(self.record_page)?;
        out.write_u64::<BigEndian>(self.record_old_page)?;
        out.write_u16::<BigEndian>(self.version)?;
        Ok(())
    }

    /// Reads the payload written by `write`; returns its size in bytes.
    fn read(&mut self, input: &mut Read) -> PRes<u32> {
        self.segment = input.read_u32::<BigEndian>()?;
        self.recref.page = input.read_u64::<BigEndian>()?;
        self.recref.pos = input.read_u32::<BigEndian>()?;
        self.record_page = input.read_u64::<BigEndian>()?;
        self.record_old_page = input.read_u64::<BigEndian>()?;
        self.version = input.read_u16::<BigEndian>()?;
        // u32 + u64 + u32 + u64 + u64 + u16
        Ok(4 + 8 + 4 + 8 + 8 + 2)
    }

    /// Hands the update to the transaction being rebuilt.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_update(self);
        Ok(RecoverStatus::Started)
    }
}
impl JournalEntry for InsertRecord {
    /// Type tag of a record insertion entry.
    fn get_type(&self) -> u8 {
        2
    }

    /// Payload (big-endian): segment, record page, record pos, record content page.
    fn write(&self, out: &mut Write) -> PRes<()> {
        out.write_u32::<BigEndian>(self.segment)?;
        out.write_u64::<BigEndian>(self.recref.page)?;
        out.write_u32::<BigEndian>(self.recref.pos)?;
        out.write_u64::<BigEndian>(self.record_page)?;
        Ok(())
    }

    /// Reads the payload written by `write`; returns its size in bytes.
    fn read(&mut self, input: &mut Read) -> PRes<u32> {
        self.segment = input.read_u32::<BigEndian>()?;
        self.recref.page = input.read_u64::<BigEndian>()?;
        self.recref.pos = input.read_u32::<BigEndian>()?;
        self.record_page = input.read_u64::<BigEndian>()?;
        // u32 + u64 + u32 + u64
        Ok(4 + 8 + 4 + 8)
    }

    /// Hands the insertion to the transaction being rebuilt.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_insert(self);
        Ok(RecoverStatus::Started)
    }
}
impl JournalEntry for PrepareCommit {
    /// Type tag of a prepare-commit marker.
    fn get_type(&self) -> u8 {
        3
    }

    /// The marker has no payload.
    fn write(&self, _out: &mut Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is empty.
    fn read(&mut self, _input: &mut Read) -> PRes<u32> {
        Ok(0)
    }

    /// Reports that the transaction reached the prepare-commit stage.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        Ok(RecoverStatus::PrepareCommit)
    }
}
impl JournalEntry for Commit {
    /// Type tag of a commit marker.
    fn get_type(&self) -> u8 {
        4
    }

    /// The marker has no payload.
    fn write(&self, _out: &mut Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is empty.
    fn read(&mut self, _input: &mut Read) -> PRes<u32> {
        Ok(0)
    }

    /// Reports that the transaction was committed.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        Ok(RecoverStatus::Commit)
    }
}
impl JournalEntry for Start {
    /// Type tag of a transaction start entry.
    fn get_type(&self) -> u8 {
        1
    }

    /// Payload: the transaction strategy encoded as a single byte.
    fn write(&self, out: &mut Write) -> PRes<()> {
        let encoded = self.strategy.value();
        out.write_u8(encoded)?;
        Ok(())
    }

    /// Reads the one-byte strategy written by `write`.
    fn read(&mut self, input: &mut Read) -> PRes<u32> {
        let encoded = input.read_u8()?;
        self.strategy = TxStrategy::from_value(encoded);
        Ok(1)
    }

    /// Passes the recorded strategy to the transaction being rebuilt.
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
        tx.recover_strategy(&self.strategy);
        Ok(RecoverStatus::Started)
    }
}
impl JournalEntry for Rollback {
    /// Type tag of a rollback marker.
    fn get_type(&self) -> u8 {
        7
    }

    /// The marker has no payload.
    fn write(&self, _out: &mut Write) -> PRes<()> {
        Ok(())
    }

    /// Nothing to read: the payload is empty.
    fn read(&mut self, _input: &mut Read) -> PRes<u32> {
        Ok(0)
    }

    /// Reports that the transaction was rolled back.
    fn recover(&self, _tx: &mut Transaction) -> PRes<RecoverStatus> {
        Ok(RecoverStatus::Rollback)
    }
}
impl JournalEntry for CreateSegment {
fn get_type(&self) -> u8 {
8
}
fn write(&self, buffer: &mut Write) -> PRes<()> {
buffer.write_u32::<BigEndian>(self.segment_id)?;
buffer.write_u16::<BigEndian>(self.name.len() as u16)?;
buffer.write_all(self.name.as_bytes())?;
Ok(())
}
fn read(&mut self, buffer: &mut Read) -> PRes<u32> {
self.segment_id = buffer.read_u32::<BigEndian>()?;
let string_size = buffer.read_u16::<BigEndian>()?;
let mut slice: Vec<u8> = Vec::new();
try!(buffer.take(string_size as u64).read_to_end(&mut slice));
self.name = try!(str::from_utf8(&slice[0..string_size as usize])).into();
Ok((4 + string_size + 2) as u32)
}
fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
tx.recover_add(self);
Ok(RecoverStatus::Started)
}
}
impl JournalEntry for DropSegment {
fn get_type(&self) -> u8 {
9
}
fn write(&self, buffer: &mut Write) -> PRes<()> {
buffer.write_u32::<BigEndian>(self.segment_id)?;
buffer.write_u16::<BigEndian>(self.name.len() as u16)?;
buffer.write_all(self.name.as_bytes())?;
Ok(())
}
fn read(&mut self, buffer: &mut Read) -> PRes<u32> {
self.segment_id = buffer.read_u32::<BigEndian>()?;
let string_size = buffer.read_u16::<BigEndian>()?;
let mut slice: Vec<u8> = Vec::new();
try!(buffer.take(string_size as u64).read_to_end(&mut slice));
self.name = try!(str::from_utf8(&slice[0..string_size as usize])).into();
Ok((4 + string_size + 2) as u32)
}
fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
tx.recover_drop(self);
Ok(RecoverStatus::Started)
}
}
impl JournalId {
    /// Builds the id of the transaction whose `Start` entry sits at byte `pos` of
    /// journal page `page`.
    pub fn new(page: u64, pos: u32) -> Self {
        JournalId { pos: pos, page: page }
    }
}
impl Start {
    /// Builds a `Start` entry holding its own copy of `strategy`.
    fn new(strategy: &TxStrategy) -> Start {
        let owned = strategy.clone();
        Start { strategy: owned }
    }
}
#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::fs;
    use std::io::Cursor;
    use persy::RecRef;
    use discref::DiscRef;
    use allocator::Allocator;
    use super::{Journal, JournalId, JournalEntry, StartList};
    use transaction::{InsertRecord, UpdateRecord, DeleteRecord, CreateSegment, DropSegment, ReadRecord};
    use std::sync::Arc;
    use config::Config;

    /// Removing in start order reports each transaction as the oldest open one;
    /// removing the newest first does not.
    #[test()]
    fn start_list_add_remove() {
        let mut start = StartList::new();
        start.push(&JournalId::new(1, 2));
        start.push(&JournalId::new(1, 4));
        assert!(start.remove(&JournalId::new(1, 2)));
        assert!(start.remove(&JournalId::new(1, 4)));
        start.push(&JournalId::new(1, 2));
        start.push(&JournalId::new(1, 4));
        assert!(!start.remove(&JournalId::new(1, 4)));
        assert!(start.remove(&JournalId::new(1, 2)));
    }

    // In the round-trip tests below, `recover` advances by the size returned from
    // `read`, so that size must equal the number of bytes `write` produced.

    #[test()]
    fn read_write_insert_record() {
        let mut buffer = Vec::<u8>::new();
        let to_write = InsertRecord::new(10, &RecRef::new(20, 10), 3);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = InsertRecord::new(0, &RecRef::new(0, 0), 0);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
    }

    #[test()]
    fn read_write_read_record() {
        let mut buffer = Vec::<u8>::new();
        let to_write = ReadRecord::new(10, &RecRef::new(20, 10), 3);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = ReadRecord::new(0, &RecRef::new(0, 0), 0);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.version, 3);
    }

    #[test()]
    fn read_write_update_record() {
        let mut buffer = Vec::<u8>::new();
        let to_write = UpdateRecord::new(10, &RecRef::new(20, 10), 3, 5, 1);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0, 0);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
        assert_eq!(to_read.record_old_page, 5);
        assert_eq!(to_read.version, 1);
    }

    #[test()]
    fn read_write_delete_record() {
        let mut buffer = Vec::<u8>::new();
        let to_write = DeleteRecord::new(10, &RecRef::new(20, 10), 5, 1);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = DeleteRecord::new(0, &RecRef::new(0, 0), 0, 1);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.segment, 10);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_old_page, 5);
        assert_eq!(to_read.version, 1);
    }

    #[test()]
    fn read_write_create_segment() {
        let mut buffer = Vec::<u8>::new();
        let to_write = CreateSegment::new("some", 10);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = CreateSegment::new("", 0);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, 10);
    }

    #[test()]
    fn read_write_drop_segment() {
        let mut buffer = Vec::<u8>::new();
        let to_write = DropSegment::new("some", 20);
        to_write.write(&mut buffer).unwrap();
        let mut to_read = DropSegment::new("", 0);
        let size = to_read.read(&mut Cursor::<&Vec<u8>>::new(&buffer)).unwrap();
        assert_eq!(size as usize, buffer.len());
        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, 20);
    }

    /// Logs two entries on a fresh journal and checks that `recover` replays exactly them.
    #[test()]
    fn journal_log_and_recover() {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open("./journal_test.persy")
            .unwrap();
        let disc = DiscRef::new(file);
        let pa = Allocator::init(&disc).unwrap();
        let allocator = Allocator::new(disc, &Arc::new(Config::new()), pa).unwrap();
        let rp = Journal::init(&allocator).unwrap();
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
        let rec = InsertRecord::new(10, &RecRef::new(1, 1), 1);
        let id = JournalId::new(1, 20);
        journal.log(&rec, &id).unwrap();
        journal.log(&rec, &id).unwrap();
        let mut found = 0;
        journal.recover(|e, _| {
                assert_eq!(e.get_type(), 2);
                found += 1;
            })
            .unwrap();
        assert_eq!(found, 2);
        fs::remove_file("./journal_test.persy").unwrap();
    }
}