use crate::{
error::PERes,
journal::{JournalEntry, JournalId},
persy::{CommitStatus, PersyImpl},
snapshots::release_snapshot,
transaction::tx_impl::TransactionImpl,
RecoverStatus,
};
use std::collections::HashMap;
#[derive(Default)]
pub struct RecoverImpl {
pub(crate) tx_id: HashMap<Vec<u8>, JournalId>,
pub(crate) transactions: HashMap<JournalId, (RecoverStatus, TransactionImpl, Option<CommitStatus>)>,
pub(crate) in_cleaning_reallocated: HashMap<JournalId, Vec<u64>>,
pub(crate) order: Vec<JournalId>,
}
impl std::fmt::Debug for RecoverImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?} {}, {:?}", self.tx_id, self.transactions.len(), self.order)
}
}
fn new_allocation(in_cleaning: &mut HashMap<JournalId, Vec<u64>>, allocation: u64) {
for v in in_cleaning.values_mut() {
v.push(allocation)
}
}
pub(crate) struct RecoverRefs<'a> {
tx: &'a mut TransactionImpl,
in_cleaning_reallocated: &'a mut HashMap<JournalId, Vec<u64>>,
}
impl<'a> RecoverRefs<'a> {
pub(crate) fn tx(&mut self) -> &mut TransactionImpl {
self.tx
}
pub(crate) fn new_allocation(&mut self, allocation: u64) {
new_allocation(self.in_cleaning_reallocated, allocation)
}
}
impl RecoverImpl {
pub fn apply<C>(&mut self, recover: C) -> PERes<()>
where
C: Fn(&Vec<u8>) -> bool,
{
for (id, status) in self.list_transactions() {
if status == RecoverStatus::PrepareCommit {
if recover(&id) {
self.commit(id);
} else {
self.rollback(id);
}
}
}
Ok(())
}
pub fn list_transactions(&self) -> Vec<(Vec<u8>, RecoverStatus)> {
let mut res = Vec::new();
for id in &self.order {
if let Some((id, status)) = self
.transactions
.get(id)
.map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
{
res.push((id, status));
}
}
res
}
pub fn status(&self, tx_id: Vec<u8>) -> Option<RecoverStatus> {
if let Some(id) = self.tx_id.get(&tx_id) {
self.transactions.get(id).map(|(s, _, _)| s.clone())
} else {
None
}
}
pub fn commit(&mut self, tx_id: Vec<u8>) {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.2 = Some(CommitStatus::Commit);
}
}
}
pub fn rollback(&mut self, tx_id: Vec<u8>) {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.2 = Some(CommitStatus::Rollback);
}
}
}
pub(crate) fn recover_journal_entry(&mut self, record: &dyn JournalEntry, id: &JournalId) {
let transactions = &mut self.transactions;
let (status, tx, _) = transactions
.entry(id.clone())
.or_insert_with(|| (RecoverStatus::Started, TransactionImpl::recover(id.clone()), None));
let cl = &mut self.in_cleaning_reallocated;
let mut rec_ref = RecoverRefs {
tx,
in_cleaning_reallocated: cl,
};
*status = match record.recover(&mut rec_ref) {
Err(_) => RecoverStatus::Rollback,
Ok(_) if *status == RecoverStatus::Rollback => RecoverStatus::Rollback,
Ok(x) => match x {
RecoverStatus::Started => RecoverStatus::Started,
RecoverStatus::PrepareCommit => {
self.order.push(id.clone());
RecoverStatus::PrepareCommit
}
RecoverStatus::Rollback => RecoverStatus::Rollback,
RecoverStatus::Commit => {
self.in_cleaning_reallocated.insert(id.clone(), Vec::new());
RecoverStatus::Commit
}
RecoverStatus::Cleanup => {
self.in_cleaning_reallocated.remove(id);
RecoverStatus::Cleanup
}
},
}
}
pub(crate) fn finish_journal_read(&mut self) {
for (id, pages) in self.in_cleaning_reallocated.drain() {
if let Some((_, tx, _)) = self.transactions.get_mut(&id) {
tx.remove_free_pages(pages);
}
}
for (id, (_, tx, _)) in &self.transactions {
self.tx_id.insert(tx.meta_id().clone(), id.clone());
}
}
pub fn final_recover(mut self, persy: &PersyImpl) -> PERes<()> {
let allocator = &persy.allocator();
let journal = &persy.journal();
let address = &persy.address();
let snapshots = &persy.snapshots();
for id in self.order {
if let Some((status, mut tx, choosed)) = self.transactions.remove(&id) {
match status {
RecoverStatus::PrepareCommit => match choosed {
Some(CommitStatus::Commit) | None => {
let prepared_result = tx.recover_prepare(persy);
if let Ok(prepared) = prepared_result {
tx.recover_commit(persy, prepared)?;
journal.finished_to_clean(&[id])?;
}
}
Some(CommitStatus::Rollback) => {
tx.recover_rollback(persy)?;
}
},
RecoverStatus::Commit => {
tx.recover_cleanup(persy)?;
journal.finished_to_clean(&[id])?;
}
RecoverStatus::Cleanup => {
journal.cleaned_to_trim(&[id])?;
}
RecoverStatus::Started | RecoverStatus::Rollback => {
tx.recover_rollback(persy)?;
}
}
}
}
for (_, (_, tx, _)) in self.transactions.iter_mut() {
tx.recover_rollback(persy)?;
}
address.recompute_last_pages()?;
address.flush_segments()?;
let to_release = allocator.recover_sync()?;
for snap in to_release {
release_snapshot(snap, snapshots, allocator, journal)?;
}
journal.clear_in_queue(snapshots)?;
allocator.trim_free_at_end()?;
Ok(())
}
pub(crate) fn journal_page(&mut self, allocation: u64) {
new_allocation(&mut self.in_cleaning_reallocated, allocation)
}
}