use crate::{
config::TxStrategy,
error::PERes,
id::{RecRef, SegmentId},
io::{InfallibleWrite, InfallibleWriteVarInt, ReadVarInt},
journal::{recover_impl::RecoverRefs, JournalEntry, RecoverStatus},
};
use std::{io::Read, str};
/// Journal entry without payload, reported with entry-type code 1.
#[derive(Default)]
pub struct Start {}

impl JournalEntry for Start {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 1;
        ENTRY_TYPE
    }

    /// Writes nothing: the entry carries no payload.
    fn write(&self, _out: &mut dyn InfallibleWrite) -> PERes<()> {
        Ok(())
    }

    /// Reads nothing: the entry carries no payload.
    fn read(&mut self, _input: &mut dyn Read) -> PERes<()> {
        Ok(())
    }

    /// Not expected to run for this entry kind.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn recover(&self, _refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        panic!("this should never be called")
    }
}
/// Journal entry describing a page added to a segment.
#[derive(Clone, Default)]
pub struct NewSegmentPage {
    /// Segment the page belongs to.
    pub segment: SegmentId,
    /// The page itself; reported as a new allocation during recovery.
    pub page: u64,
    /// NOTE(review): presumably the page preceding `page` in the segment — confirm.
    pub previous: u64,
}

impl NewSegmentPage {
    /// Builds an entry from its three components, stored unchanged.
    pub fn new(segment: SegmentId, page: u64, previous: u64) -> NewSegmentPage {
        Self {
            segment,
            page,
            previous,
        }
    }
}
impl JournalEntry for NewSegmentPage {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 13;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, `page`, `previous` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        self.segment.write_varint(out);
        out.write_varint_u64(self.page);
        out.write_varint_u64(self.previous);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        self.page = input.read_varint_u64()?;
        self.previous = input.read_varint_u64()?;
        Ok(())
    }

    /// Passes this entry to `recover_new_segment_page` on the object returned
    /// by `tx()`, then reports `page` through `new_allocation`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_new_segment_page(self);
        refs.new_allocation(self.page);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing a record insertion.
#[derive(Clone, Default)]
pub struct InsertRecord {
    /// Segment the record belongs to.
    pub segment: SegmentId,
    /// Reference (page and position) identifying the record.
    pub recref: RecRef,
    /// Page associated with the record; reported as a new allocation during recovery.
    pub record_page: u64,
}

impl InsertRecord {
    /// Builds an entry, copying `rec_ref` and storing `record` as `record_page`.
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64) -> InsertRecord {
        let recref = *rec_ref;
        Self {
            segment,
            recref,
            record_page: record,
        }
    }
}
impl JournalEntry for InsertRecord {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 2;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, record reference page, record
    /// reference position, `record_page` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        let rec = &self.recref;
        self.segment.write_varint(out);
        out.write_varint_u64(rec.page);
        out.write_varint_u32(rec.pos);
        out.write_varint_u64(self.record_page);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        let rec = &mut self.recref;
        rec.page = input.read_varint_u64()?;
        rec.pos = input.read_varint_u32()?;
        self.record_page = input.read_varint_u64()?;
        Ok(())
    }

    /// Passes this entry to `recover_insert` on the object returned by
    /// `tx()`, then reports `record_page` through `new_allocation`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_insert(self);
        refs.new_allocation(self.record_page);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing a record update.
#[derive(Clone, Default)]
pub struct UpdateRecord {
    /// Segment the record belongs to.
    pub segment: SegmentId,
    /// Reference (page and position) identifying the record.
    pub recref: RecRef,
    /// Page associated with the update; reported as a new allocation during recovery.
    pub record_page: u64,
    /// Version value stored with the entry.
    pub version: u16,
}

impl UpdateRecord {
    /// Builds an entry, copying `rec_ref` and storing `record` as `record_page`.
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
        let recref = *rec_ref;
        Self {
            segment,
            recref,
            record_page: record,
            version,
        }
    }
}
impl JournalEntry for UpdateRecord {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 5;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, record reference page, record
    /// reference position, `record_page`, `version` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        let rec = &self.recref;
        self.segment.write_varint(out);
        out.write_varint_u64(rec.page);
        out.write_varint_u32(rec.pos);
        out.write_varint_u64(self.record_page);
        out.write_varint_u16(self.version);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        let rec = &mut self.recref;
        rec.page = input.read_varint_u64()?;
        rec.pos = input.read_varint_u32()?;
        self.record_page = input.read_varint_u64()?;
        self.version = input.read_varint_u16()?;
        Ok(())
    }

    /// Passes this entry to `recover_update` on the object returned by
    /// `tx()`, then reports `record_page` through `new_allocation`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_update(self);
        refs.new_allocation(self.record_page);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing a record read.
#[derive(Clone, Default)]
pub struct ReadRecord {
    /// Segment the record belongs to.
    pub segment: SegmentId,
    /// Reference (page and position) identifying the record.
    pub recref: RecRef,
    /// Version value stored with the entry.
    pub version: u16,
}

impl ReadRecord {
    /// Builds an entry, copying the referenced `recref`.
    pub fn new(segment: SegmentId, recref: &RecRef, version: u16) -> ReadRecord {
        let recref = *recref;
        Self {
            segment,
            recref,
            version,
        }
    }
}
impl JournalEntry for ReadRecord {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 10;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, record reference page, record
    /// reference position, `version` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        let rec = &self.recref;
        self.segment.write_varint(out);
        out.write_varint_u64(rec.page);
        out.write_varint_u32(rec.pos);
        out.write_varint_u16(self.version);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        let rec = &mut self.recref;
        rec.page = input.read_varint_u64()?;
        rec.pos = input.read_varint_u32()?;
        self.version = input.read_varint_u16()?;
        Ok(())
    }

    /// Passes this entry to `recover_read` on the object returned by `tx()`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_read(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing a record deletion.
#[derive(Clone, Default)]
pub struct DeleteRecord {
    /// Segment the record belongs to.
    pub segment: SegmentId,
    /// Reference (page and position) identifying the record.
    pub recref: RecRef,
    /// Version value stored with the entry.
    pub version: u16,
}

impl DeleteRecord {
    /// Builds an entry, copying `rec_ref`.
    pub fn new(segment: SegmentId, rec_ref: &RecRef, version: u16) -> DeleteRecord {
        let recref = *rec_ref;
        Self {
            segment,
            recref,
            version,
        }
    }
}
impl JournalEntry for DeleteRecord {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 6;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, record reference page, record
    /// reference position, `version` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        let rec = &self.recref;
        self.segment.write_varint(out);
        out.write_varint_u64(rec.page);
        out.write_varint_u32(rec.pos);
        out.write_varint_u16(self.version);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        let rec = &mut self.recref;
        rec.page = input.read_varint_u64()?;
        rec.pos = input.read_varint_u32()?;
        self.version = input.read_varint_u16()?;
        Ok(())
    }

    /// Passes this entry to `recover_delete` on the object returned by `tx()`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_delete(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing the creation of a named segment.
#[derive(Clone, Default)]
pub struct CreateSegment {
    /// Segment name; serialized as UTF-8 bytes behind a length prefix.
    pub name: String,
    /// Identifier of the segment.
    pub segment_id: SegmentId,
    /// NOTE(review): presumably the first page allocated to the segment — confirm.
    pub first_page: u64,
}

impl CreateSegment {
    /// Builds an entry, taking an owned copy of `name`.
    pub fn new(name: &str, segment_id: SegmentId, first_page: u64) -> CreateSegment {
        let name = String::from(name);
        Self {
            name,
            segment_id,
            first_page,
        }
    }
}
impl JournalEntry for CreateSegment {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        8
    }

    /// Serializes, in order: segment id, `first_page`, the name length as a
    /// `u16` varint, then the raw UTF-8 bytes of the name.
    ///
    /// # Panics
    ///
    /// Panics if the name is longer than `u16::MAX` bytes. Such a length
    /// cannot be represented in the prefix; a plain `as u16` cast would wrap
    /// it while every byte is still written, producing an entry that cannot
    /// be read back.
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
        let name = self.name.as_bytes();
        // Checked before anything is emitted so that a violation never leaves
        // a half-written entry in the buffer.
        assert!(
            name.len() <= usize::from(u16::MAX),
            "segment name of {} bytes does not fit the u16 length prefix",
            name.len()
        );
        self.segment_id.write_varint(buffer);
        buffer.write_varint_u64(self.first_page);
        // Lossless: the length was bounds-checked above.
        buffer.write_varint_u16(name.len() as u16);
        buffer.write_all(name);
        Ok(())
    }

    /// Fills the fields from `buffer` in the same order `write` emits them.
    ///
    /// Returns an error if the input cannot be read or the name bytes are not
    /// valid UTF-8. Fields are assigned as they are decoded, so an error
    /// part-way through leaves the earlier fields already overwritten.
    fn read(&mut self, buffer: &mut dyn Read) -> PERes<()> {
        self.segment_id = SegmentId::read_varint(buffer)?;
        self.first_page = buffer.read_varint_u64()?;
        let name_len = usize::from(buffer.read_varint_u16()?);
        let mut raw = vec![0u8; name_len];
        buffer.read_exact(&mut raw)?;
        self.name = str::from_utf8(&raw)?.into();
        Ok(())
    }

    /// Passes this entry to `recover_add` on the object returned by `tx()`.
    fn recover(&self, recover: &mut RecoverRefs) -> PERes<RecoverStatus> {
        recover.tx().recover_add(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry describing the removal of a named segment.
#[derive(Clone, Default)]
pub struct DropSegment {
    /// Segment name; serialized as UTF-8 bytes behind a length prefix.
    pub name: String,
    /// Identifier of the segment.
    pub segment_id: SegmentId,
}

impl DropSegment {
    /// Builds an entry, taking an owned copy of `name`.
    pub fn new(name: &str, segment_id: SegmentId) -> DropSegment {
        let name = String::from(name);
        Self { name, segment_id }
    }
}
impl JournalEntry for DropSegment {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        9
    }

    /// Serializes, in order: segment id, the name length as a `u16` varint,
    /// then the raw UTF-8 bytes of the name.
    ///
    /// # Panics
    ///
    /// Panics if the name is longer than `u16::MAX` bytes. Such a length
    /// cannot be represented in the prefix; a plain `as u16` cast would wrap
    /// it while every byte is still written, producing an entry that cannot
    /// be read back.
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()> {
        let name = self.name.as_bytes();
        // Checked before anything is emitted so that a violation never leaves
        // a half-written entry in the buffer.
        assert!(
            name.len() <= usize::from(u16::MAX),
            "segment name of {} bytes does not fit the u16 length prefix",
            name.len()
        );
        self.segment_id.write_varint(buffer);
        // Lossless: the length was bounds-checked above.
        buffer.write_varint_u16(name.len() as u16);
        buffer.write_all(name);
        Ok(())
    }

    /// Fills the fields from `buffer` in the same order `write` emits them.
    ///
    /// Returns an error if the input cannot be read or the name bytes are not
    /// valid UTF-8. Fields are assigned as they are decoded, so an error
    /// part-way through leaves the earlier fields already overwritten.
    fn read(&mut self, buffer: &mut dyn Read) -> PERes<()> {
        self.segment_id = SegmentId::read_varint(buffer)?;
        let name_len = usize::from(buffer.read_varint_u16()?);
        let mut raw = vec![0u8; name_len];
        buffer.read_exact(&mut raw)?;
        self.name = str::from_utf8(&raw)?.into();
        Ok(())
    }

    /// Passes this entry to `recover_drop` on the object returned by `tx()`.
    fn recover(&self, recover: &mut RecoverRefs) -> PERes<RecoverStatus> {
        recover.tx().recover_drop(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry naming a single page by number.
///
/// NOTE(review): the name suggests the page was released — confirm against
/// `recover_freed_page`.
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq, Default)]
pub struct FreedPage {
    /// Number of the page this entry refers to.
    pub page: u64,
}

impl FreedPage {
    /// Wraps `page` in a new entry.
    pub fn new(page: u64) -> FreedPage {
        Self { page }
    }
}
impl JournalEntry for FreedPage {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 12;
        ENTRY_TYPE
    }

    /// Serializes `page` as a single varint.
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        out.write_varint_u64(self.page);
        Ok(())
    }

    /// Replaces `page` with the varint decoded from `input`.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.page = input.read_varint_u64()?;
        Ok(())
    }

    /// Passes this entry to `recover_freed_page` on the object returned by `tx()`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_freed_page(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry without payload; recovery reports `RecoverStatus::PrepareCommit` for it.
#[derive(Default)]
pub struct PrepareCommit {}

impl PrepareCommit {
    /// Creates the (field-less) entry.
    pub fn new() -> PrepareCommit {
        Self {}
    }
}
impl JournalEntry for PrepareCommit {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 3;
        ENTRY_TYPE
    }

    /// Writes nothing: the entry carries no payload.
    fn write(&self, _out: &mut dyn InfallibleWrite) -> PERes<()> {
        Ok(())
    }

    /// Reads nothing: the entry carries no payload.
    fn read(&mut self, _input: &mut dyn Read) -> PERes<()> {
        Ok(())
    }

    /// Leaves the recovery state untouched and reports `RecoverStatus::PrepareCommit`.
    fn recover(&self, _refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        let status = RecoverStatus::PrepareCommit;
        Ok(status)
    }
}
/// Journal entry without payload; recovery reports `RecoverStatus::Commit` for it.
#[derive(Default)]
pub struct Commit {}

impl Commit {
    /// Creates the (field-less) entry.
    pub fn new() -> Commit {
        Self {}
    }
}
impl JournalEntry for Commit {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 4;
        ENTRY_TYPE
    }

    /// Writes nothing: the entry carries no payload.
    fn write(&self, _out: &mut dyn InfallibleWrite) -> PERes<()> {
        Ok(())
    }

    /// Reads nothing: the entry carries no payload.
    fn read(&mut self, _input: &mut dyn Read) -> PERes<()> {
        Ok(())
    }

    /// Leaves the recovery state untouched and reports `RecoverStatus::Commit`.
    fn recover(&self, _refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        let status = RecoverStatus::Commit;
        Ok(status)
    }
}
/// Journal entry without payload; recovery reports `RecoverStatus::Cleanup` for it.
#[derive(Default)]
pub struct Cleanup {}

impl Cleanup {
    /// Creates the (field-less) entry.
    pub fn new() -> Cleanup {
        Self {}
    }
}
impl JournalEntry for Cleanup {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 14;
        ENTRY_TYPE
    }

    /// Writes nothing: the entry carries no payload.
    fn write(&self, _out: &mut dyn InfallibleWrite) -> PERes<()> {
        Ok(())
    }

    /// Reads nothing: the entry carries no payload.
    fn read(&mut self, _input: &mut dyn Read) -> PERes<()> {
        Ok(())
    }

    /// Leaves the recovery state untouched and reports `RecoverStatus::Cleanup`.
    fn recover(&self, _refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        let status = RecoverStatus::Cleanup;
        Ok(status)
    }
}
/// Journal entry without payload; recovery reports `RecoverStatus::Rollback` for it.
#[derive(Default)]
pub struct Rollback {}

impl Rollback {
    /// Creates the (field-less) entry.
    pub fn new() -> Rollback {
        Self {}
    }
}
impl JournalEntry for Rollback {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 7;
        ENTRY_TYPE
    }

    /// Writes nothing: the entry carries no payload.
    fn write(&self, _out: &mut dyn InfallibleWrite) -> PERes<()> {
        Ok(())
    }

    /// Reads nothing: the entry carries no payload.
    fn read(&mut self, _input: &mut dyn Read) -> PERes<()> {
        Ok(())
    }

    /// Leaves the recovery state untouched and reports `RecoverStatus::Rollback`.
    fn recover(&self, _refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        let status = RecoverStatus::Rollback;
        Ok(status)
    }
}
/// Journal entry carrying a transaction strategy and an opaque identifier.
#[derive(Default)]
pub struct Metadata {
    /// Transaction strategy; serialized through its `value()` byte.
    pub strategy: TxStrategy,
    /// Identifier bytes; serialized behind a length prefix.
    pub meta_id: Vec<u8>,
}

impl Metadata {
    /// Builds an entry from a clone of `strategy` and the given `meta_id`.
    pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
        let strategy = strategy.clone();
        Self { strategy, meta_id }
    }
}
impl JournalEntry for Metadata {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        11
    }

    /// Serializes, in order: the strategy byte (`strategy.value()`), the
    /// length of `meta_id` as a `u16` varint, then the `meta_id` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `meta_id` is longer than `u16::MAX` bytes. Such a length
    /// cannot be represented in the prefix; a plain `as u16` cast would wrap
    /// it while every byte is still written, producing an entry that cannot
    /// be read back.
    fn write(&self, write: &mut dyn InfallibleWrite) -> PERes<()> {
        let len = self.meta_id.len();
        // Checked before anything is emitted so that a violation never leaves
        // a half-written entry in the buffer.
        assert!(
            len <= usize::from(u16::MAX),
            "meta id of {} bytes does not fit the u16 length prefix",
            len
        );
        write.write_varint_u8(self.strategy.value());
        // Lossless: the length was bounds-checked above.
        write.write_varint_u16(len as u16);
        write.write_all(&self.meta_id);
        Ok(())
    }

    /// Fills the fields from `read` in the same order `write` emits them.
    ///
    /// The strategy is assigned before the identifier is decoded, so an error
    /// while reading the identifier leaves `strategy` already overwritten.
    fn read(&mut self, read: &mut dyn Read) -> PERes<()> {
        self.strategy = TxStrategy::from_value(read.read_varint_u8()?);
        let len = usize::from(read.read_varint_u16()?);
        let mut id = vec![0u8; len];
        read.read_exact(&mut id)?;
        self.meta_id = id;
        Ok(())
    }

    /// Passes this entry to `recover_metadata` on the object returned by `tx()`.
    fn recover(&self, recover: &mut RecoverRefs) -> PERes<RecoverStatus> {
        recover.tx().recover_metadata(self);
        Ok(RecoverStatus::Started)
    }
}
/// Journal entry tying a record reference to a page, handled by
/// `recover_rollback_page` during recovery.
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq, Default)]
pub struct RollbackPage {
    /// Segment the record belongs to.
    pub segment: SegmentId,
    /// Reference (page and position) identifying the record.
    pub recref: RecRef,
    /// NOTE(review): presumably the page holding the record content to roll back — confirm.
    pub record_page: u64,
}

impl RollbackPage {
    /// Builds an entry, copying `rec_ref` and storing `record` as `record_page`.
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64) -> Self {
        let recref = *rec_ref;
        RollbackPage {
            segment,
            recref,
            record_page: record,
        }
    }
}
impl JournalEntry for RollbackPage {
    /// Entry-type code of this record kind.
    fn get_type(&self) -> u8 {
        const ENTRY_TYPE: u8 = 15;
        ENTRY_TYPE
    }

    /// Serializes, in order: segment id, record reference page, record
    /// reference position, `record_page` (all varints).
    fn write(&self, out: &mut dyn InfallibleWrite) -> PERes<()> {
        let rec = &self.recref;
        self.segment.write_varint(out);
        out.write_varint_u64(rec.page);
        out.write_varint_u32(rec.pos);
        out.write_varint_u64(self.record_page);
        Ok(())
    }

    /// Fills the fields from `input` in the same order `write` emits them.
    ///
    /// Fields are assigned as they are decoded, so an error part-way through
    /// leaves the earlier fields already overwritten.
    fn read(&mut self, input: &mut dyn Read) -> PERes<()> {
        self.segment = SegmentId::read_varint(input)?;
        let rec = &mut self.recref;
        rec.page = input.read_varint_u64()?;
        rec.pos = input.read_varint_u32()?;
        self.record_page = input.read_varint_u64()?;
        Ok(())
    }

    /// Passes this entry to `recover_rollback_page` on the object returned by `tx()`.
    fn recover(&self, refs: &mut RecoverRefs) -> PERes<RecoverStatus> {
        refs.tx().recover_rollback_page(self);
        Ok(RecoverStatus::Started)
    }
}