Skip to main content

citadel_sync/
diff.rs

1use std::collections::VecDeque;
2
3use citadel_core::types::{PageId, PageType};
4use citadel_core::{Result, MERKLE_HASH_SIZE};
5
/// 28-byte BLAKE3 Merkle hash.
///
/// The array length is taken from `citadel_core::MERKLE_HASH_SIZE`, so this
/// alias always matches the hash width used by the core crate.
pub type MerkleHash = [u8; MERKLE_HASH_SIZE];
8
9/// Digest of a single page — hash, type, and children.
10#[derive(Debug, Clone)]
11pub struct PageDigest {
12    pub page_id: PageId,
13    pub page_type: PageType,
14    pub merkle_hash: MerkleHash,
15    /// Child page IDs for branch pages. Empty for leaves.
16    pub children: Vec<PageId>,
17}
18
/// One key-value record read out of a leaf page.
///
/// Two entries compare equal only when key, value, and value type all match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiffEntry {
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// Raw value bytes.
    pub value: Vec<u8>,
    /// Value type tag stored alongside the value.
    pub val_type: u8,
}
26
27/// Result of a Merkle diff operation.
28#[derive(Debug, Clone)]
29pub struct DiffResult {
30    /// Entries from source that differ from target.
31    pub entries: Vec<DiffEntry>,
32    /// Number of pages whose hashes were compared.
33    pub pages_compared: u64,
34    /// Number of subtrees skipped because hashes matched.
35    pub subtrees_skipped: u64,
36}
37
38impl DiffResult {
39    pub fn is_empty(&self) -> bool {
40        self.entries.is_empty()
41    }
42
43    pub fn len(&self) -> usize {
44        self.entries.len()
45    }
46}
47
48/// Abstraction for reading tree structure during diff.
49///
50/// For local databases, `LocalTreeReader` reads from `TxnManager` directly.
51/// For remote databases, the transport layer implements this via message exchange.
52pub trait TreeReader {
53    /// Root page ID and its Merkle hash.
54    fn root_info(&self) -> Result<(PageId, MerkleHash)>;
55
56    /// Read a page digest (hash + type + children).
57    fn page_digest(&self, page_id: PageId) -> Result<PageDigest>;
58
59    /// Read all leaf entries from a leaf page.
60    fn leaf_entries(&self, page_id: PageId) -> Result<Vec<DiffEntry>>;
61
62    /// Collect all leaf entries from a subtree.
63    fn subtree_entries(&self, page_id: PageId) -> Result<Vec<DiffEntry>> {
64        let digest = self.page_digest(page_id)?;
65        match digest.page_type {
66            PageType::Leaf => self.leaf_entries(page_id),
67            PageType::Branch => {
68                let mut entries = Vec::new();
69                for child in &digest.children {
70                    entries.extend(self.subtree_entries(*child)?);
71                }
72                Ok(entries)
73            }
74            _ => Ok(Vec::new()),
75        }
76    }
77}
78
79/// Compute the Merkle diff between two trees.
80///
81/// Returns entries from `source` that are different from or missing in `target`.
82/// Walks both trees in parallel using BFS, skipping entire subtrees when
83/// Merkle hashes match.
84pub fn merkle_diff(source: &dyn TreeReader, target: &dyn TreeReader) -> Result<DiffResult> {
85    let (src_root, src_root_hash) = source.root_info()?;
86    let (tgt_root, tgt_root_hash) = target.root_info()?;
87
88    let mut result = DiffResult {
89        entries: Vec::new(),
90        pages_compared: 0,
91        subtrees_skipped: 0,
92    };
93
94    // Roots match — databases are identical
95    if src_root_hash == tgt_root_hash {
96        return Ok(result);
97    }
98
99    let mut queue: VecDeque<(PageId, PageId)> = VecDeque::new();
100    queue.push_back((src_root, tgt_root));
101
102    while let Some((src_pid, tgt_pid)) = queue.pop_front() {
103        let src_digest = source.page_digest(src_pid)?;
104        let tgt_digest = target.page_digest(tgt_pid)?;
105        result.pages_compared += 1;
106
107        if src_digest.merkle_hash == tgt_digest.merkle_hash {
108            result.subtrees_skipped += 1;
109            continue;
110        }
111
112        match (src_digest.page_type, tgt_digest.page_type) {
113            (PageType::Leaf, PageType::Leaf) => {
114                result.entries.extend(source.leaf_entries(src_pid)?);
115            }
116            (PageType::Branch, PageType::Branch) => {
117                if src_digest.children.len() == tgt_digest.children.len() {
118                    for (sc, tc) in src_digest.children.iter().zip(&tgt_digest.children) {
119                        queue.push_back((*sc, *tc));
120                    }
121                } else {
122                    result.entries.extend(source.subtree_entries(src_pid)?);
123                }
124            }
125            _ => {
126                result.entries.extend(source.subtree_entries(src_pid)?);
127            }
128        }
129    }
130
131    Ok(result)
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `DiffEntry` with `val_type` 0 from the given key and value bytes.
    fn entry(key: &[u8], value: &[u8]) -> DiffEntry {
        DiffEntry {
            key: key.to_vec(),
            value: value.to_vec(),
            val_type: 0,
        }
    }

    /// Entries with identical fields are equal; a differing value breaks equality.
    #[test]
    fn diff_entry_equality() {
        let first = entry(b"key1", b"val1");
        let twin = entry(b"key1", b"val1");
        let other_value = entry(b"key1", b"val2");

        assert_eq!(first, twin);
        assert_ne!(first, other_value);
    }

    /// A result without entries reports itself as empty with length zero.
    #[test]
    fn diff_result_empty() {
        let result = DiffResult {
            entries: Vec::new(),
            pages_compared: 0,
            subtrees_skipped: 0,
        };

        assert!(result.is_empty());
        assert_eq!(result.len(), 0);
    }

    /// A result holding one entry is non-empty with length one.
    #[test]
    fn diff_result_non_empty() {
        let result = DiffResult {
            entries: vec![entry(b"k", b"v")],
            pages_compared: 1,
            subtrees_skipped: 0,
        };

        assert!(!result.is_empty());
        assert_eq!(result.len(), 1);
    }

    /// A leaf digest built with no children keeps an empty child list.
    #[test]
    fn page_digest_leaf_has_no_children() {
        let leaf = PageDigest {
            page_id: PageId(0),
            page_type: PageType::Leaf,
            merkle_hash: [0u8; MERKLE_HASH_SIZE],
            children: Vec::new(),
        };

        assert!(leaf.children.is_empty());
    }

    /// A branch digest built with three children reports all three.
    #[test]
    fn page_digest_branch_has_children() {
        let child_ids = vec![PageId(1), PageId(2), PageId(3)];
        let branch = PageDigest {
            page_id: PageId(0),
            page_type: PageType::Branch,
            merkle_hash: [1u8; MERKLE_HASH_SIZE],
            children: child_ids,
        };

        assert_eq!(branch.children.len(), 3);
    }
}