citadeldb-txn 0.9.0

Transaction manager with MVCC, commit protocol, and pending-free chain for Citadel
Documentation
use std::collections::HashSet;

use citadel_core::types::{PageId, PageType, ValueType};
use citadel_core::Result;
use citadel_page::page::Page;
use citadel_page::{branch_node, leaf_node};

use crate::catalog::TableDescriptor;
use crate::manager::TxnManager;

#[derive(Debug)]
pub struct IntegrityReport {
    pub pages_checked: u64,
    pub errors: Vec<IntegrityError>,
}

impl IntegrityReport {
    pub fn is_ok(&self) -> bool {
        self.errors.is_empty()
    }
}

#[derive(Debug)]
pub enum IntegrityError {
    PageReadFailed {
        page: PageId,
        error: String,
    },
    KeyOrderViolation {
        page: PageId,
        index: usize,
    },
    DuplicatePageRef(PageId),
    EntryCountMismatch {
        expected: u64,
        actual: u64,
    },
    InvalidPageType {
        page: PageId,
        expected: &'static str,
    },
}

pub(crate) fn run_integrity_check(mgr: &TxnManager) -> Result<IntegrityReport> {
    let slot = mgr.current_slot();
    let mut visited = HashSet::new();
    let mut errors = Vec::new();
    let mut pages_checked: u64 = 0;

    let default_count = walk_tree(
        mgr,
        slot.tree_root,
        &mut visited,
        &mut errors,
        &mut pages_checked,
    );

    if default_count != slot.tree_entries {
        errors.push(IntegrityError::EntryCountMismatch {
            expected: slot.tree_entries,
            actual: default_count,
        });
    }

    if slot.catalog_root.is_valid() {
        walk_catalog(
            mgr,
            slot.catalog_root,
            &mut visited,
            &mut errors,
            &mut pages_checked,
        );
    }

    if slot.pending_free_root.is_valid() {
        walk_chain(
            mgr,
            slot.pending_free_root,
            &mut visited,
            &mut errors,
            &mut pages_checked,
        );
    }

    Ok(IntegrityReport {
        pages_checked,
        errors,
    })
}

fn walk_tree(
    mgr: &TxnManager,
    root: PageId,
    visited: &mut HashSet<PageId>,
    errors: &mut Vec<IntegrityError>,
    pages_checked: &mut u64,
) -> u64 {
    let mut entry_count: u64 = 0;
    let mut stack = vec![root];

    while let Some(page_id) = stack.pop() {
        if !visited.insert(page_id) {
            errors.push(IntegrityError::DuplicatePageRef(page_id));
            continue;
        }

        let page = match mgr.read_page_from_disk(page_id) {
            Ok(p) => p,
            Err(e) => {
                errors.push(IntegrityError::PageReadFailed {
                    page: page_id,
                    error: e.to_string(),
                });
                continue;
            }
        };
        *pages_checked += 1;

        match page.page_type() {
            Some(PageType::Leaf) => {
                let n = page.num_cells();
                entry_count += n as u64;
                check_leaf_ordering(&page, page_id, errors);
            }
            Some(PageType::Branch) => {
                for i in 0..page.num_cells() as usize {
                    stack.push(branch_node::get_child(&page, i));
                }
                let right = page.right_child();
                if right.is_valid() {
                    stack.push(right);
                }
            }
            _ => {
                errors.push(IntegrityError::InvalidPageType {
                    page: page_id,
                    expected: "Leaf or Branch",
                });
            }
        }
    }

    entry_count
}

fn check_leaf_ordering(page: &Page, page_id: PageId, errors: &mut Vec<IntegrityError>) {
    let n = page.num_cells();
    for i in 1..n {
        let prev = leaf_node::read_cell(page, i - 1);
        let curr = leaf_node::read_cell(page, i);
        if prev.key >= curr.key {
            errors.push(IntegrityError::KeyOrderViolation {
                page: page_id,
                index: i as usize,
            });
        }
    }
}

fn walk_catalog(
    mgr: &TxnManager,
    catalog_root: PageId,
    visited: &mut HashSet<PageId>,
    errors: &mut Vec<IntegrityError>,
    pages_checked: &mut u64,
) {
    let table_roots = collect_named_table_roots(mgr, catalog_root, visited, errors, pages_checked);

    for root in table_roots {
        walk_tree(mgr, root, visited, errors, pages_checked);
    }
}

fn collect_named_table_roots(
    mgr: &TxnManager,
    catalog_root: PageId,
    visited: &mut HashSet<PageId>,
    errors: &mut Vec<IntegrityError>,
    pages_checked: &mut u64,
) -> Vec<PageId> {
    let mut roots = Vec::new();
    let mut stack = vec![catalog_root];

    while let Some(page_id) = stack.pop() {
        if !visited.insert(page_id) {
            errors.push(IntegrityError::DuplicatePageRef(page_id));
            continue;
        }

        let page = match mgr.read_page_from_disk(page_id) {
            Ok(p) => p,
            Err(e) => {
                errors.push(IntegrityError::PageReadFailed {
                    page: page_id,
                    error: e.to_string(),
                });
                continue;
            }
        };
        *pages_checked += 1;

        match page.page_type() {
            Some(PageType::Leaf) => {
                for i in 0..page.num_cells() {
                    let cell = leaf_node::read_cell(&page, i);
                    if cell.val_type != ValueType::Tombstone && cell.value.len() >= 4 {
                        let desc = TableDescriptor::deserialize(cell.value);
                        if desc.root_page.is_valid() {
                            roots.push(desc.root_page);
                        }
                    }
                }
            }
            Some(PageType::Branch) => {
                for i in 0..page.num_cells() as usize {
                    stack.push(branch_node::get_child(&page, i));
                }
                let right = page.right_child();
                if right.is_valid() {
                    stack.push(right);
                }
            }
            _ => {
                errors.push(IntegrityError::InvalidPageType {
                    page: page_id,
                    expected: "Leaf or Branch (catalog)",
                });
            }
        }
    }

    roots
}

fn walk_chain(
    mgr: &TxnManager,
    root: PageId,
    visited: &mut HashSet<PageId>,
    errors: &mut Vec<IntegrityError>,
    pages_checked: &mut u64,
) {
    let mut current = root;
    while current.is_valid() {
        if !visited.insert(current) {
            errors.push(IntegrityError::DuplicatePageRef(current));
            break;
        }

        let page = match mgr.read_page_from_disk(current) {
            Ok(p) => p,
            Err(e) => {
                errors.push(IntegrityError::PageReadFailed {
                    page: current,
                    error: e.to_string(),
                });
                break;
            }
        };
        *pages_checked += 1;

        current = page.right_child();
    }
}