Skip to main content

citadel_sync/
local_reader.rs

1use citadel_core::types::{PageId, PageType};
2use citadel_core::{Result, MERKLE_HASH_SIZE};
3use citadel_page::{branch_node, leaf_node};
4use citadel_txn::manager::TxnManager;
5
6use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
7
8/// `TreeReader` implementation for a local database.
9pub struct LocalTreeReader<'a> {
10    manager: &'a TxnManager,
11    root_page: PageId,
12    root_hash: MerkleHash,
13}
14
15impl<'a> LocalTreeReader<'a> {
16    /// Create a reader for the default (main) tree.
17    pub fn new(manager: &'a TxnManager) -> Self {
18        let slot = manager.current_slot();
19        Self {
20            manager,
21            root_page: slot.tree_root,
22            root_hash: slot.merkle_root,
23        }
24    }
25
26    /// Create a reader for a named table's tree.
27    pub fn for_table(manager: &'a TxnManager, root_page: PageId) -> Result<Self> {
28        let root_hash = if root_page.is_valid() {
29            manager.read_page_from_disk(root_page)?.merkle_hash()
30        } else {
31            [0u8; MERKLE_HASH_SIZE]
32        };
33        Ok(Self {
34            manager,
35            root_page,
36            root_hash,
37        })
38    }
39}
40
41impl<'a> TreeReader for LocalTreeReader<'a> {
42    fn root_info(&self) -> Result<(PageId, MerkleHash)> {
43        Ok((self.root_page, self.root_hash))
44    }
45
46    fn page_digest(&self, page_id: PageId) -> Result<PageDigest> {
47        let page = self.manager.read_page_from_disk(page_id)?;
48        let page_type = page
49            .page_type()
50            .ok_or_else(|| citadel_core::Error::InvalidPageType(page.page_type_raw(), page_id))?;
51        let merkle_hash: [u8; MERKLE_HASH_SIZE] = page.merkle_hash();
52        let mut children = Vec::new();
53
54        if page_type == PageType::Branch {
55            for i in 0..page.num_cells() as usize {
56                children.push(branch_node::get_child(&page, i));
57            }
58            let right = page.right_child();
59            if right.is_valid() {
60                children.push(right);
61            }
62        }
63
64        Ok(PageDigest {
65            page_id,
66            page_type,
67            merkle_hash,
68            children,
69        })
70    }
71
72    fn leaf_entries(&self, page_id: PageId) -> Result<Vec<DiffEntry>> {
73        let page = self.manager.read_page_from_disk(page_id)?;
74        let mut entries = Vec::with_capacity(page.num_cells() as usize);
75        for i in 0..page.num_cells() {
76            let cell = leaf_node::read_cell(&page, i);
77            entries.push(DiffEntry {
78                key: cell.key.to_vec(),
79                value: cell.value.to_vec(),
80                val_type: cell.val_type as u8,
81            });
82        }
83        Ok(entries)
84    }
85}