use crate::{
RecoverStatus,
error::PERes,
journal::{
JournalId, JournalRead,
records::{
Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata,
NewSegmentPage, PrepareCommit, ReadRecord, Rollback, RollbackPage, Start, UpdateRecord,
},
},
persy::{CommitStatus, PersyImpl},
transaction::recover_tx::RecoverTransaction,
};
use std::collections::{HashMap, HashSet};
#[derive(Default)]
pub(crate) struct PagesUseTracking {
freed_page: HashSet<u64>,
journal_used_page: HashSet<u64>,
}
impl PagesUseTracking {
pub(crate) fn freed_page(&mut self, page: u64) {
self.freed_page.insert(page);
}
pub(crate) fn used_page(&mut self, page: u64) {
self.freed_page.remove(&page);
}
pub(crate) fn freed_pages(&self) -> &HashSet<u64> {
&self.freed_page
}
}
#[derive(Default)]
pub struct RecoverImpl {
pub(crate) tx_id: HashMap<Vec<u8>, JournalId>,
pub(crate) transactions: HashMap<JournalId, (RecoverTransaction, Option<CommitStatus>)>,
pub(crate) in_cleaning_reallocated: HashMap<JournalId, Vec<u64>>,
pub(crate) pages_use_tracking: PagesUseTracking,
pub(crate) commit_order: Vec<JournalId>,
}
impl std::fmt::Debug for RecoverImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:?} {}, {:?}",
self.tx_id,
self.transactions.len(),
self.commit_order
)
}
}
fn new_allocation(in_cleaning: &mut HashMap<JournalId, Vec<u64>>, allocation: u64) {
for v in in_cleaning.values_mut() {
v.push(allocation)
}
}
impl RecoverImpl {
pub fn apply<C>(&mut self, recover: C) -> PERes<()>
where
C: Fn(&Vec<u8>) -> bool,
{
for (id, status) in self.list_transactions() {
if status == RecoverStatus::PrepareCommit {
if recover(&id) {
self.commit(id);
} else {
self.rollback(id);
}
}
}
Ok(())
}
pub(crate) fn new_allocation(&mut self, journal_id: &JournalId, allocation: u64) {
if let Some(tx_alloc) = self.in_cleaning_reallocated.get_mut(journal_id) {
tx_alloc.push(allocation);
}
}
pub fn list_transactions(&self) -> Vec<(Vec<u8>, RecoverStatus)> {
let mut res = Vec::new();
for id in &self.commit_order {
if let Some((id, status)) = self
.transactions
.get(id)
.map(|(s, _)| (s.meta_id().clone(), s.recover_status().clone()))
{
res.push((id, status));
}
}
res
}
pub fn status(&self, tx_id: Vec<u8>) -> Option<RecoverStatus> {
if let Some(id) = self.tx_id.get(&tx_id) {
self.transactions.get(id).map(|(t, _)| t.recover_status().clone())
} else {
None
}
}
pub fn commit(&mut self, tx_id: Vec<u8>) {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.1 = Some(CommitStatus::Commit);
}
}
}
pub fn rollback(&mut self, tx_id: Vec<u8>) {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.1 = Some(CommitStatus::Rollback);
}
}
}
pub fn get_tx(&mut self, journal_id: &JournalId) -> &mut RecoverTransaction {
let transactions = &mut self.transactions;
let (tx, _) = transactions.entry(journal_id.clone()).or_insert_with(|| {
(
RecoverTransaction::recover(journal_id.clone(), RecoverStatus::Partial),
None,
)
});
tx
}
fn set_status(&mut self, id: &JournalId, recover_status: RecoverStatus) {
match recover_status {
RecoverStatus::PrepareCommit => {
self.commit_order.push(id.clone());
}
RecoverStatus::Commit => {
self.in_cleaning_reallocated.insert(id.clone(), Vec::new());
}
RecoverStatus::Cleanup => {
self.in_cleaning_reallocated.remove(id);
}
_ => {}
}
}
pub(crate) fn finish_journal_read(&mut self) {
for (id, pages) in self.in_cleaning_reallocated.drain() {
if let Some((tx, _)) = self.transactions.get_mut(&id) {
tx.remove_free_pages(pages);
}
}
for (id, (tx, _)) in &self.transactions {
self.tx_id.insert(tx.meta_id().clone(), id.clone());
}
}
pub fn final_recover(mut self, persy: &PersyImpl) -> PERes<()> {
let allocator = &persy.allocator();
let journal = &persy.journal();
let address = &persy.address();
let last_page_before_apply = journal.last_page();
let mut to_clean_tx = Vec::new();
for id in self.commit_order {
if let Some((mut tx, chosen)) = self.transactions.remove(&id) {
let res = tx.final_recover(persy, chosen, &mut self.pages_use_tracking)?;
if res {
to_clean_tx.push(tx);
}
}
}
for (_, (mut tx, _)) in self.transactions.drain() {
let res = tx.final_recover(persy, Some(CommitStatus::Rollback), &mut self.pages_use_tracking)?;
if res {
to_clean_tx.push(tx);
}
}
for page in self.pages_use_tracking.freed_pages() {
if !self.pages_use_tracking.journal_used_page.contains(page) {
allocator.recover_free(*page)?;
}
}
for tx in to_clean_tx {
tx.log_cleanup(persy)?;
}
let pages = journal.recover_clean(last_page_before_apply)?;
let last = journal.last_page();
for page in pages {
allocator.recover_free(page)?;
}
address.recompute_last_pages()?;
address.flush_segments()?;
allocator.recover_sync()?;
let pages = journal.recover_clean(last)?;
for page in pages {
allocator.recover_free(page)?;
}
allocator.trim_free_at_end()?;
allocator.recover_sync()?;
Ok(())
}
}
impl JournalRead for RecoverImpl {
fn journal_page(&mut self, allocation: u64) {
self.pages_use_tracking.journal_used_page.insert(allocation);
new_allocation(&mut self.in_cleaning_reallocated, allocation)
}
fn start(&mut self, journal_id: JournalId, _start: &Start) {
self.transactions.insert(
journal_id.clone(),
(
RecoverTransaction::recover(journal_id.clone(), RecoverStatus::Started),
None,
),
);
}
fn insert_record(&mut self, journal_id: JournalId, insert: &InsertRecord) {
self.get_tx(&journal_id).recover_insert(insert);
self.new_allocation(&journal_id, insert.record_page);
}
fn prepare_commit(&mut self, journal_id: JournalId, _prepare_commit: &PrepareCommit) {
self.get_tx(&journal_id).recover_prepare_commit();
self.set_status(&journal_id, RecoverStatus::PrepareCommit);
}
fn commit(&mut self, journal_id: JournalId, _commit: &Commit) {
self.get_tx(&journal_id).recover_commit();
self.set_status(&journal_id, RecoverStatus::Commit);
}
fn update_record(&mut self, journal_id: JournalId, update_record: &UpdateRecord) {
self.get_tx(&journal_id).recover_update(update_record);
self.new_allocation(&journal_id, update_record.record_page);
}
fn delete_record(&mut self, journal_id: JournalId, delete_record: &DeleteRecord) {
self.get_tx(&journal_id).recover_delete(delete_record);
}
fn rollback(&mut self, journal_id: JournalId, _rollback: &Rollback) {
self.get_tx(&journal_id).recover_rollback_log();
self.set_status(&journal_id, RecoverStatus::Rollback);
}
fn create_segment(&mut self, journal_id: JournalId, create_segment: &CreateSegment) {
self.get_tx(&journal_id).recover_add(create_segment);
}
fn drop_segment(&mut self, journal_id: JournalId, drop_segment: &DropSegment) {
self.get_tx(&journal_id).recover_drop(drop_segment);
}
fn read_record(&mut self, journal_id: JournalId, read_record: &ReadRecord) {
self.get_tx(&journal_id).recover_read(read_record);
}
fn metadata(&mut self, journal_id: JournalId, metadata: &Metadata) {
self.get_tx(&journal_id).recover_metadata(metadata);
}
fn free_page(&mut self, journal_id: JournalId, free_page: &FreedPage) {
self.get_tx(&journal_id).recover_freed_page(free_page);
}
fn new_segment_page(&mut self, journal_id: JournalId, new_segment_page: &NewSegmentPage) {
self.get_tx(&journal_id).recover_new_segment_page(new_segment_page);
self.new_allocation(&journal_id, new_segment_page.page);
}
fn cleanup(&mut self, journal_id: JournalId, _cleanup: &Cleanup) {
self.get_tx(&journal_id).recover_cleanup();
self.set_status(&journal_id, RecoverStatus::Cleanup);
}
fn rollback_page(&mut self, journal_id: JournalId, rollback_page: &RollbackPage) {
self.get_tx(&journal_id).recover_rollback_page(rollback_page);
}
}