// persy 1.3.0
//
// Transactional Persistence Engine
// Documentation
use crate::{
    error::PERes,
    journal::{JournalEntry, JournalId},
    persy::{CommitStatus, PersyImpl},
    snapshots::release_snapshot,
    transaction::tx_impl::TransactionImpl,
    RecoverStatus,
};
use std::collections::HashMap;
/// State accumulated while replaying the journal, used to decide and then
/// apply the final outcome of every transaction found in it.
#[derive(Default)]
pub struct RecoverImpl {
    /// Maps a transaction's meta id (the bytes returned by `meta_id()`) to its
    /// journal id. Filled by `finish_journal_read`.
    pub(crate) tx_id: HashMap<Vec<u8>, JournalId>,
    /// Per journal id: the status reached while replaying the journal, the
    /// rebuilt transaction, and the optional commit/rollback decision set
    /// through `commit` / `rollback`.
    pub(crate) transactions: HashMap<JournalId, (RecoverStatus, TransactionImpl, Option<CommitStatus>)>,
    /// Allocations observed while a transaction sits between its `Commit` and
    /// `Cleanup` records: an empty entry is inserted on `Commit`, removed on
    /// `Cleanup`, and every reported allocation is appended to all entries
    /// present at that moment.
    pub(crate) in_cleaning_reallocated: HashMap<JournalId, Vec<u64>>,
    /// Journal ids in the order their `PrepareCommit` records were replayed.
    pub(crate) order: Vec<JournalId>,
}
/// Compact debug view: the meta-id lookup table, the number of tracked
/// transactions, and the prepare order.
impl std::fmt::Debug for RecoverImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            tx_id,
            transactions,
            order,
            ..
        } = self;
        let tracked = transactions.len();
        f.write_fmt(format_args!("{:?} {}, {:?}", tx_id, tracked, order))
    }
}

/// Appends `allocation` to the list of every journal id currently present in
/// `in_cleaning`. Does nothing when the map is empty.
fn new_allocation(in_cleaning: &mut HashMap<JournalId, Vec<u64>>, allocation: u64) {
    in_cleaning
        .values_mut()
        .for_each(|reallocated| reallocated.push(allocation));
}

/// Mutable view handed to a journal entry while it is being replayed: the
/// transaction being rebuilt plus the shared table of allocations seen while
/// other transactions are waiting for cleanup.
pub(crate) struct RecoverRefs<'a> {
    /// Transaction the replayed journal entry belongs to.
    tx: &'a mut TransactionImpl,
    /// Borrow of `RecoverImpl::in_cleaning_reallocated`.
    in_cleaning_reallocated: &'a mut HashMap<JournalId, Vec<u64>>,
}
impl RecoverRefs<'_> {
    /// Gives mutable access to the transaction being rebuilt.
    pub(crate) fn tx(&mut self) -> &mut TransactionImpl {
        &mut *self.tx
    }

    /// Records `allocation` against every journal id currently tracked in the
    /// borrowed cleaning table.
    pub(crate) fn new_allocation(&mut self, allocation: u64) {
        let tracked = &mut *self.in_cleaning_reallocated;
        new_allocation(tracked, allocation);
    }
}

impl RecoverImpl {
    /// Asks `recover` what to do with every transaction that stopped at
    /// `PrepareCommit`, visiting them in the order reported by
    /// `list_transactions`.
    ///
    /// `recover` receives the transaction meta id; `true` marks the
    /// transaction for commit, `false` marks it for rollback. Transactions in
    /// any other status are left untouched. Always returns `Ok(())`.
    pub fn apply<C>(&mut self, recover: C) -> PERes<()>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let prepared = self
            .list_transactions()
            .into_iter()
            .filter(|(_, status)| *status == RecoverStatus::PrepareCommit);
        for (meta_id, _) in prepared {
            let should_commit = recover(&meta_id);
            if should_commit {
                self.commit(meta_id);
            } else {
                self.rollback(meta_id);
            }
        }
        Ok(())
    }

    /// Returns `(meta id, status)` for every journal id in `order` that still
    /// has an entry in `transactions`, preserving the `order` sequence.
    pub fn list_transactions(&self) -> Vec<(Vec<u8>, RecoverStatus)> {
        self.order
            .iter()
            .filter_map(|journal_id| self.transactions.get(journal_id))
            .map(|(status, tx, _)| (tx.meta_id().clone(), status.clone()))
            .collect()
    }

    /// Looks up the recovered status of the transaction with meta id `tx_id`.
    ///
    /// Returns `None` when the meta id is unknown or no transaction is stored
    /// for the journal id it maps to.
    pub fn status(&self, tx_id: Vec<u8>) -> Option<RecoverStatus> {
        self.tx_id
            .get(&tx_id)
            .and_then(|journal_id| self.transactions.get(journal_id))
            .map(|(status, _, _)| status.clone())
    }

    /// Marks the transaction with meta id `tx_id` to be committed by
    /// `final_recover`. Unknown meta ids are silently ignored.
    pub fn commit(&mut self, tx_id: Vec<u8>) {
        let journal_id = match self.tx_id.get(&tx_id) {
            Some(found) => found,
            None => return,
        };
        if let Some((_, _, decision)) = self.transactions.get_mut(journal_id) {
            *decision = Some(CommitStatus::Commit);
        }
    }

    /// Marks the transaction with meta id `tx_id` to be rolled back by
    /// `final_recover`. Unknown meta ids are silently ignored.
    pub fn rollback(&mut self, tx_id: Vec<u8>) {
        let journal_id = match self.tx_id.get(&tx_id) {
            Some(found) => found,
            None => return,
        };
        if let Some((_, _, decision)) = self.transactions.get_mut(journal_id) {
            *decision = Some(CommitStatus::Rollback);
        }
    }

    /// Replays one journal `record` belonging to journal id `id` and updates
    /// the tracked status of that transaction.
    ///
    /// The transaction entry is created on first sight with status `Started`.
    /// A replay error, or a transaction already marked `Rollback`, results in
    /// `Rollback`. Otherwise the status returned by the record is stored, with
    /// these side effects: `PrepareCommit` appends `id` to `order`, `Commit`
    /// starts an empty reallocation list for `id`, and `Cleanup` drops it.
    pub(crate) fn recover_journal_entry(&mut self, record: &dyn JournalEntry, id: &JournalId) {
        let (status, tx, _) = self
            .transactions
            .entry(id.clone())
            .or_insert_with(|| (RecoverStatus::Started, TransactionImpl::recover(id.clone()), None));

        let mut refs = RecoverRefs {
            tx,
            in_cleaning_reallocated: &mut self.in_cleaning_reallocated,
        };
        let outcome = record.recover(&mut refs);

        let next = match outcome {
            Err(_) => RecoverStatus::Rollback,
            // Once rolled back, a transaction stays rolled back.
            Ok(_) if *status == RecoverStatus::Rollback => RecoverStatus::Rollback,
            Ok(RecoverStatus::Started) => RecoverStatus::Started,
            Ok(RecoverStatus::PrepareCommit) => {
                self.order.push(id.clone());
                RecoverStatus::PrepareCommit
            }
            Ok(RecoverStatus::Rollback) => RecoverStatus::Rollback,
            Ok(RecoverStatus::Commit) => {
                self.in_cleaning_reallocated.insert(id.clone(), Vec::new());
                RecoverStatus::Commit
            }
            Ok(RecoverStatus::Cleanup) => {
                self.in_cleaning_reallocated.remove(id);
                RecoverStatus::Cleanup
            }
        };
        *status = next;
    }

    /// Finalises the state once the whole journal has been replayed.
    ///
    /// Empties `in_cleaning_reallocated`, handing each collected allocation
    /// list to `remove_free_pages` on the matching transaction (lists without
    /// a matching transaction are dropped), then fills `tx_id` with the
    /// meta id -> journal id mapping of every tracked transaction.
    pub(crate) fn finish_journal_read(&mut self) {
        for (journal_id, reallocated) in self.in_cleaning_reallocated.drain() {
            if let Some(entry) = self.transactions.get_mut(&journal_id) {
                entry.1.remove_free_pages(reallocated);
            }
        }
        self.tx_id.extend(
            self.transactions
                .iter()
                .map(|(journal_id, (_, tx, _))| (tx.meta_id().clone(), journal_id.clone())),
        );
    }

    /// Consumes the recovery state and applies the outcome of every tracked
    /// transaction against `persy`.
    ///
    /// Transactions listed in `order` are processed first, in that order,
    /// according to their recovered status and the decision recorded through
    /// `commit` / `rollback`. Every transaction still left in `transactions`
    /// afterwards is rolled back. Finally the address, allocator, snapshot and
    /// journal state are brought back in sync.
    ///
    /// # Errors
    ///
    /// Returns the first error propagated with `?` by any of the steps below.
    pub fn final_recover(mut self, persy: &PersyImpl) -> PERes<()> {
        let allocator = &persy.allocator();
        let journal = &persy.journal();
        let address = &persy.address();
        let snapshots = &persy.snapshots();
        for id in self.order {
            // `remove` takes the entry out of the map, so anything handled here
            // is not visited again by the rollback loop further down. A journal
            // id appearing more than once in `order` is a no-op the second time.
            if let Some((status, mut tx, choosed)) = self.transactions.remove(&id) {
                match status {
                    RecoverStatus::PrepareCommit => match choosed {
                        // No explicit decision is treated the same as a commit.
                        Some(CommitStatus::Commit) | None => {
                            let prepared_result = tx.recover_prepare(persy);
                            // NOTE(review): when `recover_prepare` fails the error is
                            // dropped and neither commit nor rollback is run for this
                            // transaction — confirm this is intended.
                            if let Ok(prepared) = prepared_result {
                                tx.recover_commit(persy, prepared)?;
                                journal.finished_to_clean(&[id])?;
                            }
                        }
                        Some(CommitStatus::Rollback) => {
                            tx.recover_rollback(persy)?;
                        }
                    },
                    // Commit was already journaled: only the cleanup step remains.
                    RecoverStatus::Commit => {
                        tx.recover_cleanup(persy)?;
                        journal.finished_to_clean(&[id])?;
                    }
                    RecoverStatus::Cleanup => {
                        journal.cleaned_to_trim(&[id])?;
                    }
                    RecoverStatus::Started | RecoverStatus::Rollback => {
                        tx.recover_rollback(persy)?;
                    }
                }
            }
        }
        // Everything that never appeared in `order` is rolled back.
        for (_, (_, tx, _)) in self.transactions.iter_mut() {
            tx.recover_rollback(persy)?;
        }
        address.recompute_last_pages()?;
        address.flush_segments()?;
        let to_release = allocator.recover_sync()?;
        for snap in to_release {
            release_snapshot(snap, snapshots, allocator, journal)?;
        }
        journal.clear_in_queue(snapshots)?;
        allocator.trim_free_at_end()?;
        Ok(())
    }

    /// Records `allocation` against every transaction currently tracked in
    /// `in_cleaning_reallocated`.
    pub(crate) fn journal_page(&mut self, allocation: u64) {
        let tracked = &mut self.in_cleaning_reallocated;
        new_allocation(tracked, allocation);
    }
}