use crate::{
allocator::Allocator,
device::{Page, PageOps, ReadPage},
error::PERes,
journal::{
records::{
Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata,
NewSegmentPage, PrepareCommit, ReadRecord, Rollback, RollbackPage, Start, UpdateRecord,
},
recover_impl::RecoverImpl,
},
snapshot::data::CleanInfo,
snapshots::Snapshots,
util::{
io::{
InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat, InfallibleWriteVarInt, ReadVarInt, read_u64,
write_u64,
},
vec_set::VecSet,
},
};
use std::{
collections::hash_set::HashSet,
io::Read,
sync::{Arc, Mutex},
};
pub(crate) mod records;
pub(crate) mod recover_impl;
mod startlist;
use startlist::StartList;
#[cfg(test)]
mod tests;
// `JOURNAL_PAGE_EXP` is the value handed to `Allocator::allocate` for every journal page
// (presumably the page size exponent — confirm against the allocator).
// `JOURNAL_PAGE_NEXT_OFFSET` is the in-page offset of the u64 index of the next journal page.
pub const JOURNAL_PAGE_EXP: u8 = 10; const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
// In-page offset of the u64 index of the previous journal page.
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
// In-page offset at which the journal records of a page begin.
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
// Version byte of the only journal root layout this module can read.
const JOURNAL_ROOT_VERSION_0: u8 = 0;
// Version passed to `write_journal_root` whenever this module writes the journal root.
const JOURNAL_ROOT_VERSION: u8 = JOURNAL_ROOT_VERSION_0;
/// Mutable state of the journal; `Journal` keeps one instance behind a mutex.
pub(crate) struct JournalShared {
/// Index of the journal root page.
root: u64,
/// Index of the first page of the journal chain, as recorded in the journal root.
first_page: u64,
/// Index of the journal page records are currently appended to.
last_page: u64,
/// Offset inside `current` at which the next record will be written.
last_pos: u32,
/// Identifiers pushed by `start` and removed again by `clear_in_queue`.
starts: StartList,
/// Writable page matching `last_page`.
current: Page,
/// Identifiers queued by `cleaned_to_trim` and consumed by `clear_in_queue`.
to_clear: Vec<JournalId>,
}
impl JournalShared {
/// Builds the in-memory journal state from a version 0 journal root page.
///
/// The first 8 bytes of the 11 byte root content hold the index of the first journal
/// page. When that index is `0` an empty placeholder page is used as current page,
/// otherwise the first journal page is opened for writing.
///
/// # Errors
/// Propagates the failure of `Allocator::write_page`.
fn new_version_0(mut page: ReadPage, all: &Allocator) -> PERes<JournalShared> {
    let first_page = {
        let root_content = all.read_root_journal(&mut page, 11);
        read_u64(&root_content[0..8])
    };
    let current = match first_page {
        0 => Page::new(Vec::new(), 0, 0, 0),
        index => all.write_page(index)?,
    };
    let shared = JournalShared {
        root: page.get_index(),
        first_page,
        last_page: first_page,
        last_pos: JOURNAL_PAGE_CONTENT_OFFSET,
        starts: StartList::new(),
        current,
        to_clear: Vec::new(),
    };
    Ok(shared)
}
/// Makes sure `space` bytes plus one terminator byte fit in the current journal page.
///
/// When they do not fit, a new journal page is allocated and becomes the current page:
/// its previous link is set and flushed first, then (if there was a previous page) the
/// old page gets its next link, a `0` terminator at the old write position, and is
/// flushed. The write position restarts at the content offset of the new page.
///
/// # Errors
/// Propagates allocation and flush failures.
fn required_space(&mut self, space: u32, allocator: &Allocator) -> PERes<()> {
    let fits = self.last_pos + space + 1 < self.current.get_content_size();
    if fits {
        return Ok(());
    }
    let previous_index = self.last_page;
    let terminator_pos = self.last_pos;
    let fresh_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
    let fresh_index = fresh_page.get_index();
    let mut full_page = std::mem::replace(&mut self.current, fresh_page);
    self.last_page = fresh_index;
    // Link the new page back to the previous one and persist it before touching the old page.
    self.current.seek(JOURNAL_PAGE_PREV_OFFSET);
    self.current.write_u64(previous_index);
    allocator.flush_journal(&self.current)?;
    if previous_index != 0 {
        full_page.seek(JOURNAL_PAGE_NEXT_OFFSET);
        full_page.write_u64(fresh_index);
        // A zero byte where the next record would start marks the end of the old page.
        full_page.seek(terminator_pos);
        full_page.write_u8(0);
        allocator.flush_page(full_page)?;
    }
    self.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
    Ok(())
}
/// Writes `buffer` at the current write position of the current page and advances it.
///
/// Returns the page index and the in-page position at which the buffer was written.
/// No space check is done here; callers run `required_space` first.
fn append_buffer(&mut self, buffer: &[u8]) -> PERes<(u64, u32)> {
    let written_at = (self.last_page, self.last_pos);
    self.current.seek(written_at.1);
    self.current.write_all(buffer);
    self.last_pos += buffer.len() as u32;
    Ok(written_at)
}
/// Appends an already encoded record to the journal.
///
/// Space is reserved first (possibly switching to a new journal page), then the bytes
/// are written; the current page is flushed only when `flush` is `true`.
///
/// # Errors
/// Propagates failures of the space reservation, the append and the flush.
fn log_bytes(&mut self, buffer: &[u8], allocator: &Allocator, flush: bool) -> PERes<()> {
    let needed = buffer.len() as u32;
    self.required_space(needed, allocator)?;
    let _written_at = self.append_buffer(buffer)?;
    if flush {
        allocator.flush_journal(&self.current)?;
    }
    Ok(())
}
/// Queues the given identifiers so that the next `clear_in_queue` processes them.
pub fn cleaned_to_trim(&mut self, ids: &[JournalId]) {
    self.to_clear.extend(ids.iter().cloned());
}
/// Appends a start record and returns the identifier of the new journal transaction.
///
/// The record is encoded with a placeholder identifier `(0, 0)`; the returned identifier
/// is the page and position at which the record was written, and it is also pushed on
/// the start list. The record is not flushed here.
///
/// # Errors
/// Propagates encoding, space reservation and append failures.
pub fn start(&mut self, allocator: &Allocator) -> PERes<JournalId> {
    let placeholder = JournalId::new(0, 0);
    let encoded = Start::default().to_buffer(&placeholder)?;
    self.required_space(encoded.len() as u32, allocator)?;
    let (page, pos) = self.append_buffer(&encoded)?;
    let id = JournalId::new(page, pos);
    self.starts.push(&id);
    Ok(id)
}
/// Collects the journal pages preceding `till` and moves the journal start forward.
///
/// Starting at the first journal page, next links are followed until `till` or the
/// current last page is reached, a page cannot be loaded as a non-free page, or
/// `VecSet::insert` reports `false` for a page (presumably an already collected page,
/// i.e. a cycle — confirm against `VecSet`). The page where the walk stopped becomes
/// the new first page and is written to the journal root.
///
/// Returns the indexes of the pages walked over; they are not freed here.
///
/// # Errors
/// Propagates page loading and journal root writing failures.
pub fn recover_clean(&mut self, allocator: &Allocator, till: u64) -> PERes<Vec<u64>> {
    let mut pages_to_free = Vec::new();
    let mut cursor = self.first_page;
    loop {
        if cursor == till || cursor == self.last_page {
            break;
        }
        let next = match allocator.load_page_not_free(cursor)? {
            Some(mut journal_page) => {
                journal_page.seek(JOURNAL_PAGE_NEXT_OFFSET);
                journal_page.read_u64()
            }
            None => break,
        };
        if !VecSet::insert(&mut pages_to_free, cursor) {
            break;
        }
        cursor = next;
    }
    self.first_page = cursor;
    let mut root_content = [0; 11];
    write_u64(&mut root_content[0..8], cursor);
    let root = allocator.write_page(self.root)?;
    allocator.write_journal_root(root, &mut root_content, JOURNAL_ROOT_VERSION)?;
    Ok(pages_to_free)
}
/// Trims the journal pages that precede the transactions queued by `cleaned_to_trim`.
///
/// The queue is drained. For every queued identifier that is still in the start list,
/// the journal chain is walked backwards through the previous links, from the page of
/// the identifier down to the current first page; every page visited except the page of
/// the identifier itself is collected. The page of the last processed identifier becomes
/// the new first page, which is written to the journal root, and the collected pages are
/// handed to `free_pages_tx`.
///
/// # Errors
/// Propagates page loading, journal root writing and `free_pages_tx` failures. The queue
/// stays drained even when an error is returned.
pub fn clear_in_queue(&mut self, allocator: &Allocator, snapshots: &Arc<Snapshots>) -> PERes<()> {
    let mut pages_to_free = Vec::new();
    // Take the queue instead of cloning and clearing it: same result, no copy.
    let ids = std::mem::take(&mut self.to_clear);
    let mut new_first = None;
    for id in ids {
        if self.starts.remove(&id) {
            let first_page = new_first.unwrap_or(self.first_page);
            let mut free_cursor = id.page;
            loop {
                let read = if free_cursor == self.last_page {
                    // The last page is held in `current`: read its previous link from
                    // there and restore the cursor so that appends are not disturbed.
                    let pos = self.current.cursor_pos();
                    self.current.seek(JOURNAL_PAGE_PREV_OFFSET);
                    let prev = self.current.read_u64();
                    self.current.seek(pos as u32);
                    prev
                } else if let Some(mut cur) = allocator.load_page_not_free(free_cursor)? {
                    cur.seek(JOURNAL_PAGE_PREV_OFFSET);
                    cur.read_u64()
                } else {
                    break;
                };
                // The page of the identifier stays in the journal as the new first page.
                if free_cursor != id.page && !VecSet::insert(&mut pages_to_free, free_cursor) {
                    break;
                }
                if free_cursor == first_page {
                    break;
                }
                free_cursor = read;
            }
            new_first = Some(id.page);
        }
    }
    if let Some(first) = new_first {
        self.first_page = first;
        let mut buffer = [0; 11];
        write_u64(&mut buffer[0..8], first);
        let root = allocator.write_page(self.root)?;
        allocator.write_journal_root(root, &mut buffer, JOURNAL_ROOT_VERSION)?;
    }
    self.free_pages_tx(pages_to_free, allocator, snapshots)?;
    Ok(())
}
/// Logs a journal transaction that frees the given pages and registers it as a snapshot.
///
/// Nothing happens for an empty list. Otherwise a transaction is started, one freed-page
/// record per page is logged, followed by a prepare-commit and a commit record (only the
/// commit is flushed). A snapshot carrying the freed pages as clean info is then created
/// and, when it has a pending clean, that is handed to `Allocator::to_release_next_sync`.
///
/// # Errors
/// Propagates failures while starting the transaction, encoding or logging the records.
pub fn free_pages_tx(
    &mut self,
    pages_to_free: Vec<u64>,
    allocator: &Allocator,
    snapshots: &Arc<Snapshots>,
) -> PERes<()> {
    if pages_to_free.is_empty() {
        return Ok(());
    }
    let id = self.start(allocator)?;
    let mut freed = Vec::new();
    for page in pages_to_free.iter().copied() {
        let record = FreedPage::new(page);
        let encoded = record.to_buffer(&id)?;
        self.log_bytes(&encoded, allocator, false)?;
        freed.push(record);
    }
    let prepare = PrepareCommit::new().to_buffer(&id)?;
    self.log_bytes(&prepare, allocator, false)?;
    let commit = Commit::new().to_buffer(&id)?;
    self.log_bytes(&commit, allocator, true)?;
    let clean_info = CleanInfo::new(freed, Vec::new());
    let snapshot_ref = snapshots.snapshot(Vec::new(), clean_info, id);
    if let Some(pending) = snapshots.pending_clean(snapshot_ref.id()) {
        allocator.to_release_next_sync(pending);
    }
    Ok(())
}
/// Logs a cleanup record for every identifier and queues the identifiers for trimming.
///
/// Only the last record triggers a flush of the current journal page.
///
/// # Errors
/// Propagates encoding and logging failures; in that case nothing is queued.
pub fn finished_to_clean(&mut self, ids: &[JournalId], allocator: &Allocator) -> PERes<()> {
    let total = ids.len();
    for (index, id) in ids.iter().enumerate() {
        let is_last = index + 1 == total;
        let encoded = Cleanup::new().to_buffer(id)?;
        self.log_bytes(&encoded, allocator, is_last)?;
    }
    self.cleaned_to_trim(ids);
    Ok(())
}
/// Decodes the record of type `tp` found at the cursor of `ref_page` and dispatches it.
///
/// The identifier stored in the record is always read first. For a start record
/// (`tp == 1`) it is ignored and the identifier is built from `cur_page` and
/// `cursor_pos`; for every other type the stored identifier is used.
///
/// Returns `Ok(true)` when the record was dispatched to `journal_read` and `Ok(false)`
/// when `tp` is not a known record type.
///
/// # Errors
/// Propagates failures while reading the identifier or the record content.
fn read_journal_record<JR: JournalRead>(
    tp: u8,
    cur_page: u64,
    cursor_pos: u32,
    ref_page: &mut ReadPage,
    journal_read: &mut JR,
) -> PERes<bool> {
    let stored_page = ref_page.read_varint_u64()?;
    let stored_pos = ref_page.read_varint_u32()?;
    let id = match tp {
        1 => JournalId::new(cur_page, cursor_pos),
        _ => JournalId::new(stored_page, stored_pos),
    };
    match tp {
        1 => journal_read.start(id, &Start::read_new(ref_page)?),
        2 => journal_read.insert_record(id, &InsertRecord::read_new(ref_page)?),
        3 => journal_read.prepare_commit(id, &PrepareCommit::read_new(ref_page)?),
        4 => journal_read.commit(id, &Commit::read_new(ref_page)?),
        5 => journal_read.update_record(id, &UpdateRecord::read_new(ref_page)?),
        6 => journal_read.delete_record(id, &DeleteRecord::read_new(ref_page)?),
        7 => journal_read.rollback(id, &Rollback::read_new(ref_page)?),
        8 => journal_read.create_segment(id, &CreateSegment::read_new(ref_page)?),
        9 => journal_read.drop_segment(id, &DropSegment::read_new(ref_page)?),
        10 => journal_read.read_record(id, &ReadRecord::read_new(ref_page)?),
        11 => journal_read.metadata(id, &Metadata::read_new(ref_page)?),
        12 => journal_read.free_page(id, &FreedPage::read_new(ref_page)?),
        13 => journal_read.new_segment_page(id, &NewSegmentPage::read_new(ref_page)?),
        14 => journal_read.cleanup(id, &Cleanup::read_new(ref_page)?),
        15 => journal_read.rollback_page(id, &RollbackPage::read_new(ref_page)?),
        // Unknown type byte: tell the caller to stop reading.
        _ => return Ok(false),
    };
    Ok(true)
}
/// Replays the journal from the first page, feeding every page and record to `journal_read`.
///
/// While reading, `last_page` and `last_pos` are updated so that, once done, `current`
/// is the writable last journal page and new records continue where reading stopped.
///
/// # Errors
/// Propagates page loading failures. A record that cannot be decoded is not reported as
/// an error: reading simply stops at that record.
fn recover<JR: JournalRead>(&mut self, journal_read: &mut JR, allocator: &Allocator) -> PERes<()> {
let mut cur_page = self.first_page;
// Pages reached through a next link, used to stop when the chain loops.
let mut journal_pages = HashSet::new();
self.last_page = self.first_page;
journal_read.journal_page(cur_page);
let mut page = allocator.load_page(cur_page)?;
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
// Set when reading stopped early and the next link of the last page must be zeroed.
let mut reset_next = false;
loop {
let cursor_pos = page.cursor_pos();
let tp = page.read_u8();
let last_pos = page.cursor_pos() as u32;
if tp == 0 {
// A zero type byte ends the records of this page: follow the next link.
page.seek(JOURNAL_PAGE_NEXT_OFFSET);
cur_page = page.read_u64();
if cur_page == 0 {
// No next page: writing resumes on the byte just read
// (assuming `read_u8` advanced the cursor by one).
self.last_pos = last_pos - 1;
break;
}
if let Some(pg) = allocator.load_page_not_free(cur_page)? {
page = pg;
} else {
// The linked page is not available as a non-free page: cut the chain here.
reset_next = true;
self.last_pos = last_pos - 1;
break;
}
journal_read.journal_page(cur_page);
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
self.last_page = cur_page;
// NOTE(review): on a loop `last_pos` is left untouched and the first page is
// never inserted in `journal_pages` — confirm this is intended.
if journal_pages.contains(&cur_page) {
break;
}
journal_pages.insert(cur_page);
} else {
let res = Self::read_journal_record(tp, cur_page, cursor_pos as u32, &mut page, journal_read);
// Both a decoding error and an unknown record type stop the replay at this record.
if !res.unwrap_or(false) {
self.last_pos = last_pos - 1;
reset_next = true;
break;
}
}
}
self.current = allocator.write_page(self.last_page)?;
if reset_next {
// Clear the next link of the last page and put the cursor back on the write position.
self.current.seek(JOURNAL_PAGE_NEXT_OFFSET);
self.current.write_u64(0);
self.current.seek(self.last_pos);
}
Ok(())
}
/// Returns the index of the journal page records are currently appended to.
fn last_page(&self) -> u64 {
self.last_page
}
/// Reads the journal chain starting at `first_page` straight from a device, feeding
/// every page and record to `journal_read`, without touching any journal state.
///
/// Reading stops at a zero next link, at a missing or free page, at a page already
/// reached through a next link, or at a record that cannot be decoded.
///
/// # Errors
/// Propagates page loading failures and failures of the free-page check.
#[cfg(feature = "experimental_inspect")]
pub fn inspect<I>(first_page: u64, journal_read: &mut I, device_read: impl crate::device::Device) -> PERes<()>
where
I: JournalRead,
{
let mut cur_page = first_page;
// Pages reached through a next link, used to stop when the chain loops.
let mut journal_pages = HashSet::new();
journal_read.journal_page(cur_page);
let mut page = device_read.load_page(cur_page)?;
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
loop {
let cursor_pos = page.cursor_pos();
let tp = page.read_u8();
if tp == 0 {
// A zero type byte ends the records of this page: follow the next link.
page.seek(JOURNAL_PAGE_NEXT_OFFSET);
cur_page = page.read_u64();
if cur_page == 0 {
break;
}
if let Some(pg) = device_read.load_page_if_exists(cur_page)? {
if pg.is_free()? {
break;
} else {
page = pg;
}
} else {
break;
}
journal_read.journal_page(cur_page);
page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
if journal_pages.contains(&cur_page) {
break;
}
journal_pages.insert(cur_page);
} else {
let res = Self::read_journal_record(tp, cur_page, cursor_pos as u32, &mut page, journal_read);
// Both a decoding error and an unknown record type end the inspection.
if !res.unwrap_or(false) {
break;
}
}
}
Ok(())
}
}
/// Receiver of the pages and records met while the journal is read.
///
/// Each record callback gets the identifier of the journal transaction the record
/// belongs to, together with the decoded record.
pub trait JournalRead {
/// Called with the index of every journal page that is read.
fn journal_page(&mut self, allocation: u64);
/// Called for a start record (type 1); `journal_id` is the position of the record itself.
fn start(&mut self, journal_id: JournalId, start: &Start);
/// Called for an insert record (type 2).
fn insert_record(&mut self, journal_id: JournalId, insert: &InsertRecord);
/// Called for a prepare-commit record (type 3).
fn prepare_commit(&mut self, journal_id: JournalId, prepare_commit: &PrepareCommit);
/// Called for a commit record (type 4).
fn commit(&mut self, journal_id: JournalId, commit: &Commit);
/// Called for an update record (type 5).
fn update_record(&mut self, journal_id: JournalId, update_record: &UpdateRecord);
/// Called for a delete record (type 6).
fn delete_record(&mut self, journal_id: JournalId, delete_record: &DeleteRecord);
/// Called for a rollback record (type 7).
fn rollback(&mut self, journal_id: JournalId, rollback: &Rollback);
/// Called for a create-segment record (type 8).
fn create_segment(&mut self, journal_id: JournalId, create_segment: &CreateSegment);
/// Called for a drop-segment record (type 9).
fn drop_segment(&mut self, journal_id: JournalId, drop_segment: &DropSegment);
/// Called for a read record (type 10).
fn read_record(&mut self, journal_id: JournalId, read_record: &ReadRecord);
/// Called for a metadata record (type 11).
fn metadata(&mut self, journal_id: JournalId, metadata: &Metadata);
/// Called for a freed-page record (type 12).
fn free_page(&mut self, journal_id: JournalId, free_page: &FreedPage);
/// Called for a new-segment-page record (type 13).
fn new_segment_page(&mut self, journal_id: JournalId, new_segment_page: &NewSegmentPage);
/// Called for a cleanup record (type 14).
fn cleanup(&mut self, journal_id: JournalId, cleanup: &Cleanup);
/// Called for a rollback-page record (type 15).
fn rollback_page(&mut self, journal_id: JournalId, rollback_page: &RollbackPage);
}
/// Journal handle: pairs the allocator with the mutex-protected journal state.
pub struct Journal {
/// Allocator handed to every operation on the journal state.
allocator: Arc<Allocator>,
/// Journal state; every method locks it for the duration of the call.
journal: Mutex<JournalShared>,
}
/// A record that can be written to and read back from the journal.
pub(crate) trait JournalEntry {
    /// Type byte written in front of the record.
    fn get_type(&self) -> u8;
    /// Decodes the content of a record (without type byte and identifier) from `buffer`.
    fn read_new(buffer: &mut dyn Read) -> PERes<Self>
    where
        Self: Sized;
    /// Encodes the content of the record (without type byte and identifier) into `buffer`.
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()>;
    /// Encodes the full record: type byte, then `id`, then the record content.
    fn to_buffer(&self, id: &JournalId) -> PERes<Vec<u8>> {
        let mut encoded: Vec<u8> = Vec::new();
        encoded.write_u8(self.get_type());
        id.write(&mut encoded);
        self.write(&mut encoded)?;
        Ok(encoded)
    }
}
/// Identifier of a journal transaction: the location of its start record.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
/// Index of the journal page holding the start record.
page: u64,
/// Position of the start record inside that page.
pos: u32,
}
impl JournalId {
    /// Creates an identifier from a journal page index and a position inside that page.
    pub fn new(page: u64, pos: u32) -> JournalId {
        Self { page, pos }
    }
    /// Writes the identifier as a varint page index followed by a varint position.
    fn write(&self, w: &mut dyn InfallibleWrite) {
        let JournalId { page, pos } = self;
        w.write_varint_u64(*page);
        w.write_varint_u32(*pos);
    }
}
impl Journal {
/// Opens the journal whose root is stored in the page with index `page`.
///
/// # Errors
/// Propagates failures while loading the root page or building the journal state.
///
/// # Panics
/// Panics when the version byte of the root page is not a supported version.
pub fn new(all: &Arc<Allocator>, page: u64) -> PERes<Journal> {
    let mut root = all.load_page(page)?;
    let version = root.read_u8();
    let shared = match version {
        JOURNAL_ROOT_VERSION_0 => JournalShared::new_version_0(root, all)?,
        _ => panic!("version not supported"),
    };
    let journal = Journal {
        allocator: Arc::clone(all),
        journal: Mutex::new(shared),
    };
    Ok(journal)
}
/// Creates a new journal: allocates the root page and the first journal page, then
/// writes the index of the first journal page into the root.
///
/// Returns the index of the root page.
///
/// # Errors
/// Propagates allocation and journal root writing failures.
pub fn init(allocator: &Allocator) -> PERes<u64> {
    let root = allocator.allocate(5)?;
    let root_index = root.get_index();
    let first_journal_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
    let mut root_content = [0; 11];
    write_u64(&mut root_content[0..8], first_journal_page.get_index());
    allocator.write_journal_root(root, &mut root_content, JOURNAL_ROOT_VERSION)?;
    Ok(root_index)
}
/// Starts a new journal transaction and returns its identifier.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn start(&self) -> PERes<JournalId> {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .start(&self.allocator)
}
/// Logs `entry` for the transaction `id` and flushes the current journal page.
pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
    let flush = true;
    self.internal_log(entry, id, flush)
}
/// Logs the closing `entry` of the transaction `id` and flushes the current journal page.
pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
    let flush = true;
    self.internal_log(entry, id, flush)
}
/// Trims the journal for the identifiers queued so far, under the journal lock.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn clear_in_queue(&self, snapshots: &Arc<Snapshots>) -> PERes<()> {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .clear_in_queue(&self.allocator, snapshots)
}
/// Returns the index of the journal page records are currently appended to.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn last_page(&self) -> u64 {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .last_page()
}
/// Moves the journal start forward up to `till` and returns the pages left behind.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn recover_clean(&self, till: u64) -> PERes<Vec<u64>> {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .recover_clean(&self.allocator, till)
}
/// Logs a cleanup record for every identifier and queues the identifiers for trimming.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn finished_to_clean(&self, ids: &[JournalId]) -> PERes<()> {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .finished_to_clean(ids, &self.allocator)
}
/// Queues the given identifiers so that the next `clear_in_queue` processes them.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn cleaned_to_trim(&self, ids: &[JournalId]) {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .cleaned_to_trim(ids)
}
/// Logs `entry` for the transaction `id` without flushing the current journal page.
pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
    let flush = false;
    self.internal_log(entry, id, flush)
}
/// Encodes `entry` for the transaction `id` and appends it to the journal.
///
/// The record is encoded before the journal lock is taken.
///
/// # Panics
/// Panics when the journal lock is poisoned.
fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PERes<()> {
    let encoded = entry.to_buffer(id)?;
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .log_bytes(&encoded, &self.allocator, flush)
}
/// Asks the start list whether `page` is in it.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub fn is_page_in_start_list(&self, page: u64) -> bool {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .starts
        .is_page_in_start_list(page)
}
/// Replays the journal into `recover`, under the journal lock.
///
/// # Panics
/// Panics when the journal lock is poisoned.
pub(crate) fn recover(&self, recover: &mut RecoverImpl) -> PERes<()> {
    self.journal
        .lock()
        .expect("journal lock not poisoned")
        .recover(recover, &self.allocator)
}
}