sync_engine/merkle/
path_tree.rs

1//! Path-based Merkle tree for hierarchical sync verification.
2//!
3//! Uses the natural tree structure from reverse DNS object IDs:
4//! `uk.nhs.patient.record.123` becomes a path in the tree.
5//!
6//! Stored in Redis as a "shadow" alongside data - data can be evicted,
7//! but Merkle nodes persist (they're tiny: 32 bytes per node).
8
9use sha2::{Digest, Sha256};
10use std::collections::BTreeMap;
11use tracing::instrument;
12
13/// A node in the path-based Merkle tree.
14#[derive(Debug, Clone)]
15pub struct MerkleNode {
16    /// Hash of this node (computed from children or leaf value)
17    pub hash: [u8; 32],
18    /// Child segment -> child hash (empty for leaves)
19    pub children: BTreeMap<String, [u8; 32]>,
20    /// True if this is a leaf (actual item), false if interior node
21    pub is_leaf: bool,
22}
23
24impl MerkleNode {
25    /// Create a leaf node from a version hash.
26    pub fn leaf(version_hash: [u8; 32]) -> Self {
27        Self {
28            hash: version_hash,
29            children: BTreeMap::new(),
30            is_leaf: true,
31        }
32    }
33
34    /// Create an interior node, computing hash from children.
35    pub fn interior(children: BTreeMap<String, [u8; 32]>) -> Self {
36        let hash = Self::compute_hash(&children);
37        Self {
38            hash,
39            children,
40            is_leaf: false,
41        }
42    }
43
44    /// Compute hash from sorted children (deterministic).
45    fn compute_hash(children: &BTreeMap<String, [u8; 32]>) -> [u8; 32] {
46        let mut hasher = Sha256::new();
47        // BTreeMap is already sorted by key
48        for (segment, child_hash) in children {
49            hasher.update(segment.as_bytes());
50            hasher.update(b":");
51            hasher.update(child_hash);
52            hasher.update(b";");
53        }
54        hasher.finalize().into()
55    }
56
57    /// Recompute hash after children changed.
58    pub fn recompute_hash(&mut self) {
59        if !self.is_leaf {
60            self.hash = Self::compute_hash(&self.children);
61        }
62    }
63
64    /// Add or update a child, returns true if hash changed.
65    pub fn set_child(&mut self, segment: String, child_hash: [u8; 32]) -> bool {
66        let old_hash = self.hash;
67        self.children.insert(segment, child_hash);
68        self.recompute_hash();
69        self.hash != old_hash
70    }
71
72    /// Remove a child, returns true if hash changed.
73    pub fn remove_child(&mut self, segment: &str) -> bool {
74        let old_hash = self.hash;
75        self.children.remove(segment);
76        self.recompute_hash();
77        self.hash != old_hash
78    }
79}
80
81/// Manages path-based Merkle tree operations.
82///
83/// This is the local computation layer - storage is handled by RedisMerkleStore.
84#[derive(Debug)]
85pub struct PathMerkle;
86
87impl PathMerkle {
88    /// Split an object_id into path segments.
89    ///
90    /// `uk.nhs.patient.record.123` -> `["uk", "nhs", "patient", "record", "123"]`
91    #[inline]
92    pub fn split_path(object_id: &str) -> Vec<&str> {
93        object_id.split('.').collect()
94    }
95
96    /// Get the parent prefix for a path.
97    ///
98    /// `uk.nhs.patient.record.123` -> `uk.nhs.patient.record`
99    /// `uk` -> `""` (root)
100    pub fn parent_prefix(path: &str) -> &str {
101        match path.rfind('.') {
102            Some(idx) => &path[..idx],
103            None => "",
104        }
105    }
106
107    /// Get all ancestor prefixes for a path (excluding root, including self).
108    ///
109    /// `uk.nhs.patient` -> `["uk", "uk.nhs", "uk.nhs.patient"]`
110    pub fn ancestor_prefixes(path: &str) -> Vec<String> {
111        let segments: Vec<&str> = path.split('.').collect();
112        let mut prefixes = Vec::with_capacity(segments.len());
113        let mut current = String::new();
114        
115        for (i, segment) in segments.iter().enumerate() {
116            if i > 0 {
117                current.push('.');
118            }
119            current.push_str(segment);
120            prefixes.push(current.clone());
121        }
122        
123        prefixes
124    }
125
126    /// Compute the leaf hash for a sync item.
127    ///
128    /// Hash = SHA256(object_id || version || timestamp || payload_hash)
129    #[instrument(skip(payload_hash))]
130    pub fn leaf_hash(
131        object_id: &str,
132        version: u64,
133        timestamp: i64,
134        payload_hash: &[u8; 32],
135    ) -> [u8; 32] {
136        let mut hasher = Sha256::new();
137        hasher.update(object_id.as_bytes());
138        hasher.update(b"|");
139        hasher.update(version.to_le_bytes());
140        hasher.update(b"|");
141        hasher.update(timestamp.to_le_bytes());
142        hasher.update(b"|");
143        hasher.update(payload_hash);
144        hasher.finalize().into()
145    }
146
147    /// Compute hash of a payload (for leaf_hash input).
148    pub fn payload_hash(payload: &[u8]) -> [u8; 32] {
149        Sha256::digest(payload).into()
150    }
151}
152
153/// Batch of Merkle updates to apply atomically.
154#[derive(Debug, Default)]
155pub struct MerkleBatch {
156    /// Leaf updates: object_id -> new hash (or None to delete)
157    pub leaves: BTreeMap<String, Option<[u8; 32]>>,
158}
159
160impl MerkleBatch {
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Add a leaf update.
166    pub fn insert(&mut self, object_id: String, hash: [u8; 32]) {
167        self.leaves.insert(object_id, Some(hash));
168    }
169
170    /// Mark a leaf for deletion.
171    pub fn delete(&mut self, object_id: String) {
172        self.leaves.insert(object_id, None);
173    }
174
175    pub fn is_empty(&self) -> bool {
176        self.leaves.is_empty()
177    }
178
179    pub fn len(&self) -> usize {
180        self.leaves.len()
181    }
182
183    /// Get all affected prefixes (for bubble-up updates).
184    ///
185    /// Returns prefixes in bottom-up order (leaves first, root last).
186    pub fn affected_prefixes(&self) -> Vec<String> {
187        use std::collections::BTreeSet;
188        
189        let mut all_prefixes = BTreeSet::new();
190        
191        for object_id in self.leaves.keys() {
192            for prefix in PathMerkle::ancestor_prefixes(object_id) {
193                all_prefixes.insert(prefix);
194            }
195        }
196        
197        // Convert to vec and reverse (we want bottom-up for processing)
198        let mut prefixes: Vec<_> = all_prefixes.into_iter().collect();
199        // Sort by depth (more dots = deeper = process first)
200        prefixes.sort_by(|a, b| {
201            let depth_a = a.matches('.').count();
202            let depth_b = b.matches('.').count();
203            depth_b.cmp(&depth_a) // Reverse: deeper first
204        });
205        
206        prefixes
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213
214    #[test]
215    fn test_split_path() {
216        assert_eq!(
217            PathMerkle::split_path("uk.nhs.patient.record.123"),
218            vec!["uk", "nhs", "patient", "record", "123"]
219        );
220    }
221
222    #[test]
223    fn test_parent_prefix() {
224        assert_eq!(PathMerkle::parent_prefix("uk.nhs.patient"), "uk.nhs");
225        assert_eq!(PathMerkle::parent_prefix("uk"), "");
226    }
227
228    #[test]
229    fn test_ancestor_prefixes() {
230        assert_eq!(
231            PathMerkle::ancestor_prefixes("uk.nhs.patient"),
232            vec!["uk", "uk.nhs", "uk.nhs.patient"]
233        );
234    }
235
236    #[test]
237    fn test_merkle_node_hash_deterministic() {
238        let mut children = BTreeMap::new();
239        children.insert("a".to_string(), [1u8; 32]);
240        children.insert("b".to_string(), [2u8; 32]);
241        
242        let node1 = MerkleNode::interior(children.clone());
243        let node2 = MerkleNode::interior(children);
244        
245        assert_eq!(node1.hash, node2.hash);
246    }
247
248    #[test]
249    fn test_merkle_batch_affected_prefixes() {
250        let mut batch = MerkleBatch::new();
251        batch.insert("uk.nhs.patient.123".to_string(), [1u8; 32]);
252        batch.insert("uk.nhs.doctor.456".to_string(), [2u8; 32]);
253        
254        let prefixes = batch.affected_prefixes();
255        
256        // Should be sorted deepest first
257        assert!(prefixes.iter().position(|p| p == "uk.nhs.patient")
258            < prefixes.iter().position(|p| p == "uk.nhs"));
259        assert!(prefixes.iter().position(|p| p == "uk.nhs")
260            < prefixes.iter().position(|p| p == "uk"));
261    }
262}