// persy 1.8.0
//
// Transactional Persistence Engine
// Documentation
use crate::{
    RecoverStatus,
    error::PERes,
    journal::{
        JournalId, JournalRead,
        records::{
            Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata,
            NewSegmentPage, PrepareCommit, ReadRecord, Rollback, RollbackPage, Start, UpdateRecord,
        },
    },
    persy::{CommitStatus, PersyImpl},
    transaction::recover_tx::RecoverTransaction,
};
use std::collections::{HashMap, HashSet};

/// Book-keeping of page numbers seen while recovering.
///
/// Two independent sets are kept: pages reported as freed and pages reported as
/// being used by the journal.
#[derive(Default)]
pub(crate) struct PagesUseTracking {
    /// Pages currently considered freed; entries are added by `freed_page` and
    /// taken out again by `used_page`.
    freed_page: HashSet<u64>,
    /// Pages used by the journal. No method of this type writes to it; it is
    /// filled and read directly by code in the same module.
    journal_used_page: HashSet<u64>,
}

impl PagesUseTracking {
    /// Marks `page` as freed. Marking the same page twice has no further effect.
    pub(crate) fn freed_page(&mut self, page: u64) {
        let freed = &mut self.freed_page;
        freed.insert(page);
    }

    /// Marks `page` as used again by dropping it from the freed set.
    /// A page that was never freed is silently ignored.
    pub(crate) fn used_page(&mut self, page: u64) {
        let freed = &mut self.freed_page;
        freed.remove(&page);
    }

    /// Borrows the set of pages that are still marked as freed.
    pub(crate) fn freed_pages(&self) -> &HashSet<u64> {
        let Self { freed_page, .. } = self;
        freed_page
    }
}

/// State accumulated while the journal is read back, later consumed by
/// `final_recover` to bring the storage to a consistent state.
#[derive(Default)]
pub struct RecoverImpl {
    /// Transaction metadata id (the bytes returned by `meta_id()`) mapped to its
    /// journal id. Filled by `finish_journal_read`, so lookups by metadata id
    /// only work after that call.
    pub(crate) tx_id: HashMap<Vec<u8>, JournalId>,
    /// Recovered transaction per journal id, paired with the outcome chosen for
    /// it through `commit`/`rollback` (`None` while no choice has been made).
    pub(crate) transactions: HashMap<JournalId, (RecoverTransaction, Option<CommitStatus>)>,
    /// Pages allocated while a transaction sits between its `Commit` and
    /// `Cleanup` journal entries: the entry is created on `Commit`, removed on
    /// `Cleanup`, and whatever remains is handed to `remove_free_pages` in
    /// `finish_journal_read`.
    pub(crate) in_cleaning_reallocated: HashMap<JournalId, Vec<u64>>,
    /// Freed / journal-used page sets shared with every transaction recovery.
    pub(crate) pages_use_tracking: PagesUseTracking,
    /// Journal ids in the order their `PrepareCommit` entries were read.
    pub(crate) commit_order: Vec<JournalId>,
}
/// Compact debug output: the metadata-id map, the number of tracked
/// transactions (not their contents) and the commit order.
impl std::fmt::Debug for RecoverImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tx_count = self.transactions.len();
        write!(f, "{:?} {}, {:?}", &self.tx_id, tx_count, &self.commit_order)
    }
}

/// Appends `allocation` to the page list of every entry in `in_cleaning`.
///
/// An empty map is left untouched; no entry is ever created here.
fn new_allocation(in_cleaning: &mut HashMap<JournalId, Vec<u64>>, allocation: u64) {
    in_cleaning
        .values_mut()
        .for_each(|pages| pages.push(allocation));
}

impl RecoverImpl {
    /// Resolves every listed transaction whose status is `PrepareCommit`.
    ///
    /// `recover` is called with the transaction's metadata id; returning `true`
    /// selects commit for that transaction, `false` selects rollback.
    /// Transactions in any other status are left as they are.
    ///
    /// As written this never fails and always returns `Ok(())`.
    pub fn apply<C>(&mut self, recover: C) -> PERes<()>
    where
        C: Fn(&Vec<u8>) -> bool,
    {
        let pending = self
            .list_transactions()
            .into_iter()
            .filter(|(_, status)| *status == RecoverStatus::PrepareCommit);
        for (tx_id, _) in pending {
            let keep = recover(&tx_id);
            if keep {
                self.commit(tx_id);
            } else {
                self.rollback(tx_id);
            }
        }
        Ok(())
    }

    /// Records `allocation` for `journal_id`, but only if that transaction
    /// currently has an entry in `in_cleaning_reallocated`; otherwise nothing
    /// happens.
    pub(crate) fn new_allocation(&mut self, journal_id: &JournalId, allocation: u64) {
        let tracked = self.in_cleaning_reallocated.get_mut(journal_id);
        if let Some(pages) = tracked {
            pages.push(allocation);
        }
    }

    /// Lists `(metadata id, recover status)` for the transactions recorded in
    /// `commit_order`, preserving that order.
    ///
    /// Journal ids in `commit_order` with no entry in `transactions` are skipped.
    pub fn list_transactions(&self) -> Vec<(Vec<u8>, RecoverStatus)> {
        self.commit_order
            .iter()
            .filter_map(|journal_id| self.transactions.get(journal_id))
            .map(|(tx, _)| (tx.meta_id().clone(), tx.recover_status().clone()))
            .collect()
    }

    /// Looks up the recover status of the transaction with metadata id `tx_id`.
    ///
    /// Returns `None` when the metadata id is unknown or when no transaction is
    /// stored for the journal id it maps to.
    pub fn status(&self, tx_id: Vec<u8>) -> Option<RecoverStatus> {
        let journal_id = self.tx_id.get(&tx_id)?;
        let (tx, _) = self.transactions.get(journal_id)?;
        Some(tx.recover_status().clone())
    }

    /// Chooses `CommitStatus::Commit` as the outcome for the transaction with
    /// metadata id `tx_id`.
    ///
    /// Unknown ids are ignored. The choice is only stored here; it is handed to
    /// the transaction later, in `final_recover`.
    pub fn commit(&mut self, tx_id: Vec<u8>) {
        let entry = match self.tx_id.get(&tx_id) {
            Some(journal_id) => self.transactions.get_mut(journal_id),
            None => None,
        };
        if let Some((_, chosen)) = entry {
            *chosen = Some(CommitStatus::Commit);
        }
    }

    /// Chooses `CommitStatus::Rollback` as the outcome for the transaction with
    /// metadata id `tx_id`.
    ///
    /// Unknown ids are ignored. The choice is only stored here; it is handed to
    /// the transaction later, in `final_recover`.
    pub fn rollback(&mut self, tx_id: Vec<u8>) {
        let entry = match self.tx_id.get(&tx_id) {
            Some(journal_id) => self.transactions.get_mut(journal_id),
            None => None,
        };
        if let Some((_, chosen)) = entry {
            *chosen = Some(CommitStatus::Rollback);
        }
    }

    /// Returns the recovered transaction for `journal_id`, creating it first if
    /// it does not exist yet.
    ///
    /// A transaction created here starts in `RecoverStatus::Partial` with no
    /// chosen outcome.
    pub fn get_tx(&mut self, journal_id: &JournalId) -> &mut RecoverTransaction {
        let entry = self.transactions.entry(journal_id.clone()).or_insert_with(|| {
            let fresh = RecoverTransaction::recover(journal_id.clone(), RecoverStatus::Partial);
            (fresh, None)
        });
        &mut entry.0
    }

    /// Updates the recovery book-keeping for a status change of `id`.
    ///
    /// Note that this does not touch the transaction itself:
    /// - `PrepareCommit` appends `id` to `commit_order`;
    /// - `Commit` starts tracking reallocated pages for `id` with an empty list;
    /// - `Cleanup` stops tracking them;
    /// - every other status is a no-op.
    fn set_status(&mut self, id: &JournalId, recover_status: RecoverStatus) {
        match recover_status {
            RecoverStatus::Cleanup => {
                self.in_cleaning_reallocated.remove(id);
            }
            RecoverStatus::Commit => {
                let tracked_pages = Vec::new();
                self.in_cleaning_reallocated.insert(id.clone(), tracked_pages);
            }
            RecoverStatus::PrepareCommit => self.commit_order.push(id.clone()),
            _ => {}
        }
    }

    /// Finalises the state once the whole journal has been read.
    ///
    /// Every page list still held in `in_cleaning_reallocated` is passed to its
    /// transaction's `remove_free_pages` (lists whose transaction is missing are
    /// dropped), leaving the map empty. The metadata-id index `tx_id` is then
    /// filled from all known transactions.
    pub(crate) fn finish_journal_read(&mut self) {
        for (journal_id, reallocated) in self.in_cleaning_reallocated.drain() {
            match self.transactions.get_mut(&journal_id) {
                Some((tx, _)) => tx.remove_free_pages(reallocated),
                None => {}
            }
        }
        self.tx_id.extend(
            self.transactions
                .iter()
                .map(|(journal_id, (tx, _))| (tx.meta_id().clone(), journal_id.clone())),
        );
    }

    /// Consumes the recovery state and applies it to `persy`.
    ///
    /// Sequence, as visible in this function:
    /// 1. transactions listed in `commit_order` are finalised in that order with
    ///    the outcome chosen for them (possibly `None`);
    /// 2. every remaining transaction is finalised with `CommitStatus::Rollback`;
    /// 3. pages still marked as freed and not used by the journal are returned to
    ///    the allocator;
    /// 4. a cleanup is logged for every transaction whose `final_recover`
    ///    returned `true`;
    /// 5. the journal is cleaned twice, around an address/allocator flush, and
    ///    the pages each cleaning returns are freed.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the transaction, allocator,
    /// journal or address calls; the remaining steps are then skipped.
    pub fn final_recover(mut self, persy: &PersyImpl) -> PERes<()> {
        let allocator = &persy.allocator();
        let journal = &persy.journal();
        let address = &persy.address();
        // Captured before any transaction is applied; used as the starting point
        // of the first journal cleaning below.
        let last_page_before_apply = journal.last_page();
        let mut to_clean_tx = Vec::new();
        // Transactions that reached prepare-commit, in the order they did so.
        for id in self.commit_order {
            if let Some((mut tx, chosen)) = self.transactions.remove(&id) {
                let res = tx.final_recover(persy, chosen, &mut self.pages_use_tracking)?;
                if res {
                    to_clean_tx.push(tx);
                }
            }
        }
        // Whatever is left was not in `commit_order`: force a rollback.
        for (_, (mut tx, _)) in self.transactions.drain() {
            let res = tx.final_recover(persy, Some(CommitStatus::Rollback), &mut self.pages_use_tracking)?;
            if res {
                to_clean_tx.push(tx);
            }
        }
        // Do not hand pages to the allocator that the journal itself occupies.
        for page in self.pages_use_tracking.freed_pages() {
            if !self.pages_use_tracking.journal_used_page.contains(page) {
                allocator.recover_free(*page)?;
            }
        }
        for tx in to_clean_tx {
            tx.log_cleanup(persy)?;
        }

        // NOTE(review): `recover_clean` presumably yields journal pages that are
        // no longer needed up to the given page — confirm against the journal.
        let pages = journal.recover_clean(last_page_before_apply)?;
        // Read after the first cleaning; used as the bound of the second one.
        let last = journal.last_page();
        for page in pages {
            allocator.recover_free(page)?;
        }
        address.recompute_last_pages()?;
        address.flush_segments()?;
        allocator.recover_sync()?;
        // Second cleaning pass, bounded by the journal position recorded above.
        let pages = journal.recover_clean(last)?;
        for page in pages {
            allocator.recover_free(page)?;
        }
        allocator.trim_free_at_end()?;
        allocator.recover_sync()?;
        Ok(())
    }
}

/// Journal replay callbacks.
///
/// Each callback forwards the journal entry to the matching `recover_*` method
/// of the transaction owning `journal_id` (created on demand by `get_tx`), and
/// where needed also updates the page / status book-keeping of `RecoverImpl`.
impl JournalRead for RecoverImpl {
    /// Notes that the journal itself uses page `allocation` and records the page
    /// for every transaction currently tracked in `in_cleaning_reallocated`.
    fn journal_page(&mut self, allocation: u64) {
        let tracking = &mut self.pages_use_tracking;
        tracking.journal_used_page.insert(allocation);
        new_allocation(&mut self.in_cleaning_reallocated, allocation);
    }

    /// Registers a transaction in `Started` status with no chosen outcome,
    /// replacing any entry already stored for `journal_id`.
    fn start(&mut self, journal_id: JournalId, _start: &Start) {
        let tx = RecoverTransaction::recover(journal_id.clone(), RecoverStatus::Started);
        self.transactions.insert(journal_id, (tx, None));
    }

    /// Replays a record insert and tracks the page the record was written to.
    fn insert_record(&mut self, journal_id: JournalId, insert: &InsertRecord) {
        let tx = self.get_tx(&journal_id);
        tx.recover_insert(insert);
        self.new_allocation(&journal_id, insert.record_page);
    }

    /// Replays a prepare-commit and appends the transaction to `commit_order`.
    fn prepare_commit(&mut self, journal_id: JournalId, _prepare_commit: &PrepareCommit) {
        let tx = self.get_tx(&journal_id);
        tx.recover_prepare_commit();
        self.set_status(&journal_id, RecoverStatus::PrepareCommit);
    }

    /// Replays a commit and starts tracking pages reallocated from now on.
    fn commit(&mut self, journal_id: JournalId, _commit: &Commit) {
        let tx = self.get_tx(&journal_id);
        tx.recover_commit();
        self.set_status(&journal_id, RecoverStatus::Commit);
    }

    /// Replays a record update and tracks the page the record was written to.
    fn update_record(&mut self, journal_id: JournalId, update_record: &UpdateRecord) {
        let tx = self.get_tx(&journal_id);
        tx.recover_update(update_record);
        self.new_allocation(&journal_id, update_record.record_page);
    }

    /// Replays a record delete.
    fn delete_record(&mut self, journal_id: JournalId, delete_record: &DeleteRecord) {
        let tx = self.get_tx(&journal_id);
        tx.recover_delete(delete_record);
    }

    /// Replays a rollback entry; `set_status` does no extra book-keeping for
    /// `Rollback`.
    fn rollback(&mut self, journal_id: JournalId, _rollback: &Rollback) {
        let tx = self.get_tx(&journal_id);
        tx.recover_rollback_log();
        self.set_status(&journal_id, RecoverStatus::Rollback);
    }

    /// Replays a segment creation.
    fn create_segment(&mut self, journal_id: JournalId, create_segment: &CreateSegment) {
        let tx = self.get_tx(&journal_id);
        tx.recover_add(create_segment);
    }

    /// Replays a segment drop.
    fn drop_segment(&mut self, journal_id: JournalId, drop_segment: &DropSegment) {
        let tx = self.get_tx(&journal_id);
        tx.recover_drop(drop_segment);
    }

    /// Replays a record read entry.
    fn read_record(&mut self, journal_id: JournalId, read_record: &ReadRecord) {
        let tx = self.get_tx(&journal_id);
        tx.recover_read(read_record);
    }

    /// Replays the transaction metadata entry.
    fn metadata(&mut self, journal_id: JournalId, metadata: &Metadata) {
        let tx = self.get_tx(&journal_id);
        tx.recover_metadata(metadata);
    }

    /// Replays a freed-page entry.
    fn free_page(&mut self, journal_id: JournalId, free_page: &FreedPage) {
        let tx = self.get_tx(&journal_id);
        tx.recover_freed_page(free_page);
    }

    /// Replays a new segment page entry and tracks the allocated page.
    fn new_segment_page(&mut self, journal_id: JournalId, new_segment_page: &NewSegmentPage) {
        let tx = self.get_tx(&journal_id);
        tx.recover_new_segment_page(new_segment_page);
        self.new_allocation(&journal_id, new_segment_page.page);
    }

    /// Replays a cleanup entry and stops tracking reallocated pages for the
    /// transaction.
    fn cleanup(&mut self, journal_id: JournalId, _cleanup: &Cleanup) {
        let tx = self.get_tx(&journal_id);
        tx.recover_cleanup();
        self.set_status(&journal_id, RecoverStatus::Cleanup);
    }

    /// Replays a rollback-page entry.
    fn rollback_page(&mut self, journal_id: JournalId, rollback_page: &RollbackPage) {
        let tx = self.get_tx(&journal_id);
        tx.recover_rollback_page(rollback_page);
    }
}