sync_engine/coordinator/merkle_api.rs
//! Public Merkle tree accessor API for external sync.
//!
//! This module exposes Merkle tree queries for use by external nodes in a
//! multi-instance deployment. In the typical sync flow below, the remote node
//! walks down from the root, comparing hashes level by level, until it finds
//! the individual items that differ:
//!
//! ```text
//! Remote Node                           Local Node
//!     │                                      │
//!     ├──── "What's your root?" ────────────►│
//!     │◄──── "0xABCD" ───────────────────────┤
//!     │                                      │
//!     │ (Hmm, mine is 0x1234, different!)    │
//!     │                                      │
//!     ├──── "Children of root?" ────────────►│
//!     │◄──── [{path:"A",hash:"..."}, ...] ───┤
//!     │                                      │
//!     │ (A matches, B differs!)              │
//!     │                                      │
//!     ├──── "Children of B?" ───────────────►│
//!     │◄──── [{path:"BA",hash:"..."}, ...] ──┤
//!     │                                      │
//!     │ (Found it! BA.leaf.123 differs)      │
//!     │                                      │
//!     ├──── "Get sync_item BA.leaf.123" ────►│
//!     │◄──── { full CRDT data } ─────────────┤
//! ```
27
28use std::collections::BTreeMap;
29use tracing::{debug, instrument};
30
31use crate::merkle::MerkleNode;
32use crate::storage::traits::StorageError;
33
34use super::SyncEngine;
35
/// Result of comparing Merkle trees between two nodes.
///
/// Every field holds dot-separated Merkle paths (e.g. `"uk.nhs"`).
/// `Default` yields an empty diff (nothing to sync); `PartialEq`/`Eq` allow
/// diffs to be compared directly, e.g. in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MerkleDiff {
    /// Paths whose local state does not match the remote's hash (need sync).
    pub divergent_paths: Vec<String>,
    /// Paths that exist locally but not remotely (local additions the remote needs).
    pub local_only: Vec<String>,
    /// Paths that exist remotely but not locally (remote additions we need).
    pub remote_only: Vec<String>,
}
46
47impl SyncEngine {
48 // ═══════════════════════════════════════════════════════════════════════════
49 // Public Merkle API: For external sync between nodes
50 // ═══════════════════════════════════════════════════════════════════════════
51
52 /// Get the current merkle root hash.
53 ///
54 /// Returns the root hash from SQL (ground truth), or from Redis if they match.
55 /// This is the starting point for sync verification.
56 ///
57 /// # Returns
58 /// - `Some([u8; 32])` - The root hash
59 /// - `None` - No data has been written yet (empty tree)
60 #[instrument(skip(self))]
61 pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError> {
62 // SQL is ground truth - always prefer it
63 if let Some(ref sql_merkle) = self.sql_merkle {
64 return sql_merkle.root_hash().await;
65 }
66
67 // Fall back to Redis if no SQL
68 if let Some(ref redis_merkle) = self.redis_merkle {
69 return redis_merkle.get_hash("").await;
70 }
71
72 Ok(None)
73 }
74
75 /// Get the hash for a specific merkle path.
76 ///
77 /// Serves from Redis if `redis_root == sql_root` (fast path),
78 /// otherwise falls back to SQL (slow path).
79 ///
80 /// # Arguments
81 /// * `path` - The merkle path (e.g., "uk.nhs.patient")
82 ///
83 /// # Returns
84 /// - `Some([u8; 32])` - The hash at this path
85 /// - `None` - Path doesn't exist in the tree
86 #[instrument(skip(self))]
87 pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
88 // Check if Redis shadow is in sync with SQL ground truth
89 if self.is_merkle_synced().await? {
90 // Fast path: serve from Redis
91 if let Some(ref redis_merkle) = self.redis_merkle {
92 return redis_merkle.get_hash(path).await;
93 }
94 }
95
96 // Slow path: query SQL
97 if let Some(ref sql_merkle) = self.sql_merkle {
98 return sql_merkle.get_hash(path).await;
99 }
100
101 Ok(None)
102 }
103
104 /// Get a full merkle node (hash + children).
105 ///
106 /// Use this for tree traversal during sync.
107 ///
108 /// # Arguments
109 /// * `path` - The merkle path (use "" for root)
110 #[instrument(skip(self))]
111 pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
112 // Check if Redis shadow is in sync
113 if self.is_merkle_synced().await? {
114 if let Some(ref redis_merkle) = self.redis_merkle {
115 return redis_merkle.get_node(path).await;
116 }
117 }
118
119 // Fall back to SQL
120 if let Some(ref sql_merkle) = self.sql_merkle {
121 return sql_merkle.get_node(path).await;
122 }
123
124 Ok(None)
125 }
126
127 /// Get children of a merkle path.
128 ///
129 /// Returns a map of `segment -> hash` for all direct children.
130 /// Use this to traverse the tree level by level.
131 ///
132 /// # Arguments
133 /// * `path` - Parent path (use "" for top-level children)
134 ///
135 /// # Example
136 /// ```text
137 /// path="" → {"uk": 0xABC, "us": 0xDEF}
138 /// path="uk" → {"nhs": 0x123, "private": 0x456}
139 /// ```
140 #[instrument(skip(self))]
141 pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
142 // Check if Redis shadow is in sync
143 if self.is_merkle_synced().await? {
144 if let Some(ref redis_merkle) = self.redis_merkle {
145 return redis_merkle.get_children(path).await;
146 }
147 }
148
149 // Fall back to SQL
150 if let Some(ref sql_merkle) = self.sql_merkle {
151 return sql_merkle.get_children(path).await;
152 }
153
154 Ok(BTreeMap::new())
155 }
156
157 /// Find paths where local and remote merkle trees diverge.
158 ///
159 /// This is the main sync algorithm - given remote node's hashes,
160 /// find the minimal set of paths that need to be synced.
161 ///
162 /// # Arguments
163 /// * `remote_nodes` - Pairs of (path, hash) from the remote node
164 ///
165 /// # Returns
166 /// Paths where hashes differ (need to fetch/push data)
167 ///
168 /// # Algorithm
169 /// For each remote (path, hash) pair:
170 /// 1. Get local hash for that path
171 /// 2. If hashes match → subtree is synced, skip
172 /// 3. If hashes differ → add to divergent list
173 /// 4. If local missing → add to remote_only
174 /// 5. If remote missing → add to local_only
175 #[instrument(skip(self, remote_nodes), fields(remote_count = remote_nodes.len()))]
176 pub async fn find_divergent_paths(
177 &self,
178 remote_nodes: &[(String, [u8; 32])],
179 ) -> Result<MerkleDiff, StorageError> {
180 let mut divergent_paths = Vec::new();
181 let remote_only = Vec::new(); // Paths remote has, we don't
182 let local_only = Vec::new(); // Paths we have, remote doesn't
183
184 for (path, remote_hash) in remote_nodes {
185 let local_hash = self.get_merkle_hash(path).await?;
186
187 match local_hash {
188 Some(local) if local == *remote_hash => {
189 // Hashes match - this subtree is synced
190 debug!(path = %path, "Merkle path synced");
191 }
192 Some(local) => {
193 // Hashes differ - need to sync this path
194 debug!(
195 path = %path,
196 local = %hex::encode(local),
197 remote = %hex::encode(remote_hash),
198 "Merkle path diverged"
199 );
200 divergent_paths.push(path.clone());
201 }
202 None => {
203 // We don't have this path at all
204 debug!(path = %path, "Path exists on remote only");
205 divergent_paths.push(path.clone());
206 }
207 }
208 }
209
210 Ok(MerkleDiff {
211 divergent_paths,
212 local_only,
213 remote_only,
214 })
215 }
216
217 /// Check if Redis merkle tree is in sync with SQL ground truth.
218 ///
219 /// This is used to determine whether we can serve merkle queries from Redis
220 /// (fast) or need to fall back to SQL (slow but authoritative).
221 #[instrument(skip(self))]
222 pub async fn is_merkle_synced(&self) -> Result<bool, StorageError> {
223 let sql_root = if let Some(ref sql_merkle) = self.sql_merkle {
224 sql_merkle.root_hash().await?
225 } else {
226 return Ok(false);
227 };
228
229 let redis_root = if let Some(ref redis_merkle) = self.redis_merkle {
230 redis_merkle.get_hash("").await?
231 } else {
232 return Ok(false);
233 };
234
235 Ok(sql_root == redis_root)
236 }
237
238 /// Drill down the merkle tree to find all divergent leaf paths.
239 ///
240 /// Starting from a known-divergent path, recursively descend until
241 /// we find the actual leaves that need syncing.
242 ///
243 /// # Arguments
244 /// * `start_path` - A path known to have divergent hashes
245 /// * `remote_children` - Function to get children from remote node
246 ///
247 /// # Returns
248 /// List of leaf paths that need to be synced
249 #[instrument(skip(self, remote_children))]
250 pub async fn drill_down_divergence<F, Fut>(
251 &self,
252 start_path: &str,
253 remote_children: F,
254 ) -> Result<Vec<String>, StorageError>
255 where
256 F: Fn(String) -> Fut,
257 Fut: std::future::Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,
258 {
259 let mut divergent_leaves = Vec::new();
260 let mut paths_to_check = vec![start_path.to_string()];
261
262 while let Some(path) = paths_to_check.pop() {
263 let local_children = self.get_merkle_children(&path).await?;
264 let remote = remote_children(path.clone()).await?;
265
266 // Find which children differ
267 for (segment, remote_hash) in &remote {
268 let child_path = if path.is_empty() {
269 segment.clone()
270 } else {
271 format!("{}.{}", path, segment)
272 };
273
274 match local_children.get(segment) {
275 Some(local_hash) if local_hash == remote_hash => {
276 // Child is synced
277 }
278 Some(_) => {
279 // Child differs - check if leaf or drill deeper
280 let node = self.get_merkle_node(&child_path).await?;
281 if node.map(|n| n.is_leaf).unwrap_or(true) {
282 divergent_leaves.push(child_path);
283 } else {
284 paths_to_check.push(child_path);
285 }
286 }
287 None => {
288 // We don't have this child - it's a missing subtree
289 divergent_leaves.push(child_path);
290 }
291 }
292 }
293
294 // Check for children we have that remote doesn't
295 for segment in local_children.keys() {
296 if !remote.contains_key(segment) {
297 let child_path = if path.is_empty() {
298 segment.clone()
299 } else {
300 format!("{}.{}", path, segment)
301 };
302 // Remote missing this - they need it from us
303 divergent_leaves.push(child_path);
304 }
305 }
306 }
307
308 Ok(divergent_leaves)
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built `MerkleDiff` keeps each category's paths in its own field.
    #[test]
    fn test_merkle_diff_struct() {
        let diff = MerkleDiff {
            remote_only: vec![String::from("us.medicare")],
            local_only: vec![String::from("uk.private")],
            divergent_paths: vec![String::from("uk.nhs")],
        };

        let lengths = [
            diff.divergent_paths.len(),
            diff.local_only.len(),
            diff.remote_only.len(),
        ];
        assert_eq!(lengths, [1, 1, 1]);
    }
}