//! citadeldb-txn 0.11.0
//!
//! Transaction manager with MVCC, commit protocol, and pending-free chain
//! for Citadel. This module: tests for Merkle-hash computation over pages.
use super::*;
use citadel_core::types::{PageType, ValueType};
use citadel_page::leaf_node;

/// Build an in-memory leaf page owned by `txn_id`, appending each
/// `(key, value)` pair from `entries` as an inline cell, in order.
fn make_leaf(page_id: PageId, txn_id: TxnId, entries: &[(&[u8], &[u8])]) -> Page {
    let mut leaf = Page::new(page_id, PageType::Leaf, txn_id);
    entries.iter().for_each(|(key, value)| {
        let cell = leaf_node::build_cell(key, ValueType::Inline, value);
        let slot = leaf.num_cells();
        leaf.insert_cell_at(slot, &cell);
    });
    leaf
}

/// Build an in-memory branch page owned by `txn_id`.
/// `children` supplies `(child page, separator key)` cells in order;
/// `right_child` becomes the rightmost child pointer.
fn make_branch(
    page_id: PageId,
    txn_id: TxnId,
    children: &[(PageId, &[u8])],
    right_child: PageId,
) -> Page {
    let mut branch = Page::new(page_id, PageType::Branch, txn_id);
    for &(child, sep_key) in children {
        let cell = citadel_page::branch_node::build_cell(child, sep_key);
        let slot = branch.num_cells();
        branch.insert_cell_at(slot, &cell);
    }
    branch.set_right_child(right_child);
    branch
}

#[test]
fn leaf_hash_deterministic() {
    // Two independently built, byte-identical leaves must hash the same.
    let txn = TxnId(1);
    let entries: &[(&[u8], &[u8])] = &[(b"key1", b"val1"), (b"key2", b"val2")];

    let first = compute_leaf_hash(&make_leaf(PageId(0), txn, entries));
    let second = compute_leaf_hash(&make_leaf(PageId(0), txn, entries));

    assert_eq!(first, second);
    // And the result must not be the all-zero sentinel.
    assert_ne!(first, [0u8; MERKLE_HASH_SIZE]);
}

#[test]
fn leaf_hash_changes_with_data() {
    // Any difference in key bytes or value bytes must change the leaf hash.
    let txn = TxnId(1);
    let hashes = [
        compute_leaf_hash(&make_leaf(PageId(0), txn, &[(b"key1", b"val1")])),
        compute_leaf_hash(&make_leaf(PageId(0), txn, &[(b"key1", b"val2")])),
        compute_leaf_hash(&make_leaf(PageId(0), txn, &[(b"key2", b"val1")])),
    ];

    // All three hashes must be pairwise distinct.
    for i in 0..hashes.len() {
        for j in (i + 1)..hashes.len() {
            assert_ne!(hashes[i], hashes[j]);
        }
    }
}

#[test]
fn empty_leaf_hash() {
    // A leaf holding no cells still hashes to something other than all zeros.
    let hash = compute_leaf_hash(&make_leaf(PageId(0), TxnId(1), &[]));
    assert_ne!(hash, [0u8; MERKLE_HASH_SIZE]);
}

#[test]
fn single_leaf_tree() {
    // A tree whose root is a lone leaf: the returned root hash matches the
    // hash stamped onto the leaf, and no disk reads are required since the
    // page is already in the pool.
    let txn = TxnId(1);
    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
    pool.insert(PageId(0), make_leaf(PageId(0), txn, &[(b"a", b"1")]));

    let root_hash = compute_tree_merkle(&mut pool, PageId(0), txn, &|_| {
        panic!("should not read disk")
    })
    .unwrap();

    assert_ne!(root_hash, [0u8; MERKLE_HASH_SIZE]);
    assert_eq!(pool[&PageId(0)].merkle_hash(), root_hash);
}

#[test]
fn branch_hash_from_children() {
    // One branch over two leaves: after the walk, every page in the pool
    // carries a non-zero merkle hash.
    let txn = TxnId(1);
    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
    pool.insert(PageId(1), make_leaf(PageId(1), txn, &[(b"a", b"1")]));
    pool.insert(PageId(2), make_leaf(PageId(2), txn, &[(b"c", b"3")]));
    pool.insert(
        PageId(0),
        make_branch(PageId(0), txn, &[(PageId(1), b"b")], PageId(2)),
    );

    let root_hash = compute_tree_merkle(&mut pool, PageId(0), txn, &|_| {
        panic!("should not read disk")
    })
    .unwrap();

    assert_ne!(root_hash, [0u8; MERKLE_HASH_SIZE]);
    for id in 0..3 {
        assert_ne!(pool[&PageId(id)].merkle_hash(), [0u8; MERKLE_HASH_SIZE]);
    }
}

#[test]
fn branch_hash_changes_when_child_changes() {
    // Changing a single leaf value must propagate to a different root hash.
    let txn = TxnId(1);

    // Build the same two-leaf tree, varying only the left leaf's value,
    // and return its root hash.
    let root_hash_with_left_value = |left_val: &[u8]| {
        let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
        pool.insert(PageId(1), make_leaf(PageId(1), txn, &[(b"a", left_val)]));
        pool.insert(PageId(2), make_leaf(PageId(2), txn, &[(b"c", b"3")]));
        pool.insert(
            PageId(0),
            make_branch(PageId(0), txn, &[(PageId(1), b"b")], PageId(2)),
        );
        compute_tree_merkle(&mut pool, PageId(0), txn, &|_| panic!("no disk")).unwrap()
    };

    assert_ne!(root_hash_with_left_value(b"1"), root_hash_with_left_value(b"2"));
}

#[test]
fn clean_page_uses_existing_hash() {
    // A page last written by an older transaction than the current one is
    // clean: its stored merkle hash must be reused as-is, not recomputed.
    let dirty_txn = TxnId(5);
    let clean_txn = TxnId(3);
    let expected_hash = [0xAB; MERKLE_HASH_SIZE];

    let mut clean_leaf = make_leaf(PageId(2), clean_txn, &[(b"x", b"y")]);
    clean_leaf.set_merkle_hash(&expected_hash);

    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
    pool.insert(
        PageId(0),
        make_branch(PageId(0), dirty_txn, &[(PageId(1), b"m")], PageId(2)),
    );
    pool.insert(PageId(1), make_leaf(PageId(1), dirty_txn, &[(b"a", b"1")]));
    pool.insert(PageId(2), clean_leaf);

    let root_hash = compute_tree_merkle(&mut pool, PageId(0), dirty_txn, &|_| {
        panic!("should not read disk")
    })
    .unwrap();

    assert_ne!(root_hash, [0u8; MERKLE_HASH_SIZE]);

    // The clean page's stored hash was not overwritten.
    assert_eq!(pool[&PageId(2)].merkle_hash(), expected_hash);
}

#[test]
fn reads_clean_hash_from_pool() {
    // A child that is absent from the pool is resolved through the read
    // callback, which supplies its precomputed hash; the page itself is
    // never materialized in the pool.
    let dirty_txn = TxnId(5);
    let clean_txn = TxnId(3);

    let mut absent_leaf = make_leaf(PageId(2), clean_txn, &[(b"x", b"y")]);
    let precomputed_hash = compute_leaf_hash(&absent_leaf);
    absent_leaf.set_merkle_hash(&precomputed_hash);

    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
    pool.insert(PageId(1), make_leaf(PageId(1), dirty_txn, &[(b"a", b"1")]));
    pool.insert(
        PageId(0),
        make_branch(PageId(0), dirty_txn, &[(PageId(1), b"m")], PageId(2)),
    );

    let root_hash = compute_tree_merkle(&mut pool, PageId(0), dirty_txn, &|page_id| {
        // Only the missing child should ever be requested.
        assert_eq!(page_id, PageId(2));
        Ok(precomputed_hash)
    })
    .unwrap();

    assert_ne!(root_hash, [0u8; MERKLE_HASH_SIZE]);

    // The callback path must not insert the page into the pool.
    assert!(!pool.contains_key(&PageId(2)));
}

#[test]
fn merkle_hash_page_header_roundtrip() {
    // A freshly created page starts with an all-zero hash, and
    // set_merkle_hash / merkle_hash round-trip the bytes exactly.
    let mut page = Page::new(PageId(0), PageType::Leaf, TxnId(1));
    assert_eq!(page.merkle_hash(), [0u8; MERKLE_HASH_SIZE]);

    let stored = [0x42u8; MERKLE_HASH_SIZE];
    page.set_merkle_hash(&stored);
    assert_eq!(page.merkle_hash(), stored);
}

#[test]
fn hash_covered_by_page_checksum() {
    // The merkle-hash bytes lie inside the region the page checksum covers:
    // mutating them after update_checksum() must invalidate verification.
    let mut page = Page::new(PageId(0), PageType::Leaf, TxnId(1));
    let original = [0x42u8; MERKLE_HASH_SIZE];
    page.set_merkle_hash(&original);
    page.update_checksum();
    assert!(page.verify_checksum());

    // Flip every bit of the first byte and write the tampered hash back.
    let mut tampered = original;
    tampered[0] = !tampered[0];
    page.set_merkle_hash(&tampered);
    assert!(!page.verify_checksum());
}

#[test]
fn three_level_tree() {
    // Root -> two branches -> four leaves; after the walk, all seven pages
    // carry non-zero hashes.
    let txn = TxnId(1);
    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();

    // Leaves 3..=6, one entry each.
    let leaf_specs: [(u32, &[u8], &[u8]); 4] =
        [(3, b"a", b"1"), (4, b"c", b"3"), (5, b"e", b"5"), (6, b"g", b"7")];
    for (id, key, val) in leaf_specs {
        let page_id = PageId(id.into());
        pool.insert(page_id, make_leaf(page_id, txn, &[(key, val)]));
    }

    // Interior branches and the root.
    pool.insert(
        PageId(1),
        make_branch(PageId(1), txn, &[(PageId(3), b"b")], PageId(4)),
    );
    pool.insert(
        PageId(2),
        make_branch(PageId(2), txn, &[(PageId(5), b"f")], PageId(6)),
    );
    pool.insert(
        PageId(0),
        make_branch(PageId(0), txn, &[(PageId(1), b"d")], PageId(2)),
    );

    let root_hash = compute_tree_merkle(&mut pool, PageId(0), txn, &|_| {
        panic!("no disk reads expected")
    })
    .unwrap();

    assert_ne!(root_hash, [0u8; MERKLE_HASH_SIZE]);

    for i in 0..7 {
        assert_ne!(
            pool[&PageId(i)].merkle_hash(),
            [0u8; MERKLE_HASH_SIZE],
            "page {i} should have a hash"
        );
    }
}

#[test]
fn identical_trees_same_hash() {
    // Two separately-constructed single-leaf trees with the same contents
    // must yield the same root hash.
    let txn = TxnId(1);
    let entries: &[(&[u8], &[u8])] = &[(b"alpha", b"one"), (b"beta", b"two"), (b"gamma", b"three")];

    let mut pool_a: FxHashMap<PageId, Page> = FxHashMap::default();
    pool_a.insert(PageId(0), make_leaf(PageId(0), txn, entries));
    let hash_a = compute_tree_merkle(&mut pool_a, PageId(0), txn, &|_| panic!()).unwrap();

    let mut pool_b: FxHashMap<PageId, Page> = FxHashMap::default();
    pool_b.insert(PageId(0), make_leaf(PageId(0), txn, entries));
    let hash_b = compute_tree_merkle(&mut pool_b, PageId(0), txn, &|_| panic!()).unwrap();

    assert_eq!(hash_a, hash_b);
}

#[test]
fn leaf_with_tombstone() {
    // A tombstone cell still contributes to the leaf hash: a leaf holding a
    // live cell plus a tombstone hashes differently from the live cell alone.
    let txn = TxnId(1);

    let mut with_tombstone = Page::new(PageId(0), PageType::Leaf, txn);
    let live_cell = leaf_node::build_cell(b"alive", ValueType::Inline, b"data");
    with_tombstone.insert_cell_at(0, &live_cell);
    let dead_cell = leaf_node::build_cell(b"dead", ValueType::Tombstone, &[]);
    with_tombstone.insert_cell_at(1, &dead_cell);

    let mut live_only = Page::new(PageId(0), PageType::Leaf, txn);
    let same_live_cell = leaf_node::build_cell(b"alive", ValueType::Inline, b"data");
    live_only.insert_cell_at(0, &same_live_cell);

    assert_ne!(
        compute_leaf_hash(&with_tombstone),
        compute_leaf_hash(&live_only)
    );
}

#[test]
fn truncation_is_28_bytes() {
    // truncate_hash keeps exactly the first 28 bytes of the blake3 output.
    let full = blake3::hash(b"test");
    let short = truncate_hash(&full);
    assert_eq!(short.len(), 28);
    assert_eq!(&short[..], &full.as_bytes()[..28]);
}

#[test]
fn large_leaf_hash() {
    // A leaf built from 100 distinct key/value pairs still produces a
    // non-zero hash.
    let txn = TxnId(1);

    // Owned backing storage for the generated pairs...
    let owned: Vec<(Vec<u8>, Vec<u8>)> = (0..100u32)
        .map(|i| {
            let key = format!("key-{i:05}").into_bytes();
            let val = format!("val-{i:05}").into_bytes();
            (key, val)
        })
        .collect();
    // ...borrowed as slices for make_leaf.
    let borrowed: Vec<(&[u8], &[u8])> = owned
        .iter()
        .map(|(k, v)| (k.as_slice(), v.as_slice()))
        .collect();

    let hash = compute_leaf_hash(&make_leaf(PageId(0), txn, &borrowed));
    assert_ne!(hash, [0u8; MERKLE_HASH_SIZE]);
}

#[test]
fn recomputing_gives_same_hash() {
    // Zeroing the stored hash and recomputing must reproduce the same root.
    let txn = TxnId(1);
    let mut pool: FxHashMap<PageId, Page> = FxHashMap::default();
    pool.insert(PageId(0), make_leaf(PageId(0), txn, &[(b"k", b"v")]));

    let first = compute_tree_merkle(&mut pool, PageId(0), txn, &|_| panic!()).unwrap();

    // Wipe the stamped hash, then run the computation again.
    let zero = [0u8; MERKLE_HASH_SIZE];
    pool.get_mut(&PageId(0)).unwrap().set_merkle_hash(&zero);
    let second = compute_tree_merkle(&mut pool, PageId(0), txn, &|_| panic!()).unwrap();

    assert_eq!(first, second);
}