// citadeldb_sync/diff.rs

use std::collections::VecDeque;

use citadel_core::types::{PageId, PageType};
use citadel_core::{Result, MERKLE_HASH_SIZE};

6/// 28-byte BLAKE3 Merkle hash.
7pub type MerkleHash = [u8; MERKLE_HASH_SIZE];
8
9/// Digest of a single page — hash, type, and children.
10#[derive(Debug, Clone)]
11pub struct PageDigest {
12    pub page_id: PageId,
13    pub page_type: PageType,
14    pub merkle_hash: MerkleHash,
15    /// Child page IDs for branch pages. Empty for leaves.
16    pub children: Vec<PageId>,
17}
18
/// A key-value entry from a leaf page.
///
/// Two entries compare equal only when key, value, and value-type tag all
/// match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiffEntry {
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Value-type tag carried alongside the value. Its encoding is defined
    /// outside this module.
    pub val_type: u8,
}

27/// Result of a Merkle diff operation.
28#[derive(Debug, Clone)]
29pub struct DiffResult {
30    /// Entries from source that differ from target.
31    pub entries: Vec<DiffEntry>,
32    /// Number of pages whose hashes were compared.
33    pub pages_compared: u64,
34    /// Number of subtrees skipped because hashes matched.
35    pub subtrees_skipped: u64,
36}
37
38impl DiffResult {
39    pub fn is_empty(&self) -> bool {
40        self.entries.is_empty()
41    }
42
43    pub fn len(&self) -> usize {
44        self.entries.len()
45    }
46}
47
48/// Abstraction for reading tree structure during diff.
49///
50/// For local databases, `LocalTreeReader` reads from `TxnManager` directly.
51/// For remote databases, the transport layer implements this via message exchange.
52pub trait TreeReader {
53    /// Root page ID and its Merkle hash.
54    fn root_info(&self) -> Result<(PageId, MerkleHash)>;
55
56    /// Read a page digest (hash + type + children).
57    fn page_digest(&self, page_id: PageId) -> Result<PageDigest>;
58
59    /// Read all leaf entries from a leaf page.
60    fn leaf_entries(&self, page_id: PageId) -> Result<Vec<DiffEntry>>;
61
62    /// Collect all leaf entries from a subtree.
63    fn subtree_entries(&self, page_id: PageId) -> Result<Vec<DiffEntry>> {
64        let digest = self.page_digest(page_id)?;
65        match digest.page_type {
66            PageType::Leaf => self.leaf_entries(page_id),
67            PageType::Branch => {
68                let mut entries = Vec::new();
69                for child in &digest.children {
70                    entries.extend(self.subtree_entries(*child)?);
71                }
72                Ok(entries)
73            }
74            _ => Ok(Vec::new()),
75        }
76    }
77}
78
79/// Compute the Merkle diff between two trees.
80///
81/// Returns entries from `source` that are different from or missing in `target`.
82/// Walks both trees in parallel using BFS, skipping entire subtrees when
83/// Merkle hashes match.
84pub fn merkle_diff(
85    source: &dyn TreeReader,
86    target: &dyn TreeReader,
87) -> Result<DiffResult> {
88    let (src_root, src_root_hash) = source.root_info()?;
89    let (tgt_root, tgt_root_hash) = target.root_info()?;
90
91    let mut result = DiffResult {
92        entries: Vec::new(),
93        pages_compared: 0,
94        subtrees_skipped: 0,
95    };
96
97    // Roots match — databases are identical
98    if src_root_hash == tgt_root_hash {
99        return Ok(result);
100    }
101
102    let mut queue: VecDeque<(PageId, PageId)> = VecDeque::new();
103    queue.push_back((src_root, tgt_root));
104
105    while let Some((src_pid, tgt_pid)) = queue.pop_front() {
106        let src_digest = source.page_digest(src_pid)?;
107        let tgt_digest = target.page_digest(tgt_pid)?;
108        result.pages_compared += 1;
109
110        if src_digest.merkle_hash == tgt_digest.merkle_hash {
111            result.subtrees_skipped += 1;
112            continue;
113        }
114
115        match (src_digest.page_type, tgt_digest.page_type) {
116            (PageType::Leaf, PageType::Leaf) => {
117                result.entries.extend(source.leaf_entries(src_pid)?);
118            }
119            (PageType::Branch, PageType::Branch) => {
120                if src_digest.children.len() == tgt_digest.children.len() {
121                    for (sc, tc) in src_digest.children.iter().zip(&tgt_digest.children) {
122                        queue.push_back((*sc, *tc));
123                    }
124                } else {
125                    result.entries.extend(source.subtree_entries(src_pid)?);
126                }
127            }
128            _ => {
129                result.entries.extend(source.subtree_entries(src_pid)?);
130            }
131        }
132    }
133
134    Ok(result)
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Entries are equal only when every field matches.
    #[test]
    fn diff_entry_equality() {
        let a = DiffEntry {
            key: b"key1".to_vec(),
            value: b"val1".to_vec(),
            val_type: 0,
        };
        let b = DiffEntry {
            key: b"key1".to_vec(),
            value: b"val1".to_vec(),
            val_type: 0,
        };
        let c = DiffEntry {
            key: b"key1".to_vec(),
            value: b"val2".to_vec(),
            val_type: 0,
        };
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    /// A result with no entries reports empty / zero length.
    #[test]
    fn diff_result_empty() {
        let r = DiffResult {
            entries: vec![],
            pages_compared: 0,
            subtrees_skipped: 0,
        };
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
    }

    /// A result with one entry reports non-empty / length one.
    #[test]
    fn diff_result_non_empty() {
        let r = DiffResult {
            entries: vec![DiffEntry {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
                val_type: 0,
            }],
            pages_compared: 1,
            subtrees_skipped: 0,
        };
        assert!(!r.is_empty());
        assert_eq!(r.len(), 1);
    }

    /// A leaf digest can be built with an empty child list.
    #[test]
    fn page_digest_leaf_has_no_children() {
        let d = PageDigest {
            page_id: PageId(0),
            page_type: PageType::Leaf,
            merkle_hash: [0u8; MERKLE_HASH_SIZE],
            children: vec![],
        };
        assert!(d.children.is_empty());
    }

    /// A branch digest keeps the children it was built with.
    #[test]
    fn page_digest_branch_has_children() {
        let d = PageDigest {
            page_id: PageId(0),
            page_type: PageType::Branch,
            merkle_hash: [1u8; MERKLE_HASH_SIZE],
            children: vec![PageId(1), PageId(2), PageId(3)],
        };
        assert_eq!(d.children.len(), 3);
    }
}