sync_engine/coordinator/merkle_api.rs
1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
//! Public Merkle tree accessor API for external sync.
//!
//! This module exposes merkle tree queries for use by external nodes in a
//! multi-instance deployment. The typical sync flow:
//!
//! ```text
//! Remote Node                            Local Node
//!      │                                      │
//!      ├──── "What's your root?" ────────────►│
//!      │◄──── "0xABCD" ───────────────────────┤
//!      │                                      │
//!      │ (Hmm, mine is 0x1234, different!)    │
//!      │                                      │
//!      ├──── "Children of root?" ────────────►│
//!      │◄──── [{path:"A",hash:"..."}, ...] ───┤
//!      │                                      │
//!      │ (A matches, B differs!)              │
//!      │                                      │
//!      ├──── "Children of B?" ───────────────►│
//!      │◄──── [{path:"BA",hash:"..."}, ...] ──┤
//!      │                                      │
//!      │ (Found it! BA.leaf.123 differs)      │
//!      │                                      │
//!      ├──── "Get sync_item BA.leaf.123" ────►│
//!      │◄──── { full CRDT data } ─────────────┤
//! ```
30
31use std::collections::BTreeMap;
32use tracing::{debug, instrument};
33
34use crate::merkle::MerkleNode;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Result of comparing merkle trees between two nodes.
///
/// Every field holds dot-separated merkle paths (e.g. `"uk.nhs.patient"`).
/// `Default` yields an empty diff (no differences recorded).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MerkleDiff {
    /// Paths whose hashes differ between the two nodes (need sync)
    pub divergent_paths: Vec<String>,
    /// Paths that exist locally but not remotely (local additions)
    pub local_only: Vec<String>,
    /// Paths that exist remotely but not locally (remote additions)
    pub remote_only: Vec<String>,
}

impl MerkleDiff {
    /// Returns `true` when no differences were recorded in any category.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.divergent_paths.is_empty() && self.local_only.is_empty() && self.remote_only.is_empty()
    }
}
49
50impl SyncEngine {
51 // ═══════════════════════════════════════════════════════════════════════════
52 // Public Merkle API: For external sync between nodes
53 // ═══════════════════════════════════════════════════════════════════════════
54
55 /// Get the current merkle root hash.
56 ///
57 /// Returns the root hash from SQL (ground truth), or from Redis if they match.
58 /// This is the starting point for sync verification.
59 ///
60 /// # Returns
61 /// - `Some([u8; 32])` - The root hash
62 /// - `None` - No data has been written yet (empty tree)
63 #[instrument(skip(self))]
64 pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError> {
65 // SQL is ground truth - always prefer it
66 if let Some(ref sql_merkle) = self.sql_merkle {
67 return sql_merkle.root_hash().await;
68 }
69
70 // Fall back to Redis if no SQL
71 if let Some(ref merkle_cache) = self.merkle_cache {
72 return merkle_cache.get_hash("").await;
73 }
74
75 Ok(None)
76 }
77
78 /// Get the hash for a specific merkle path.
79 ///
80 /// Serves from Redis if `redis_root == sql_root` (fast path),
81 /// otherwise falls back to SQL (slow path).
82 ///
83 /// # Arguments
84 /// * `path` - The merkle path (e.g., "uk.nhs.patient")
85 ///
86 /// # Returns
87 /// - `Some([u8; 32])` - The hash at this path
88 /// - `None` - Path doesn't exist in the tree
89 #[instrument(skip(self))]
90 pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
91 // Check if Redis shadow is in sync with SQL ground truth
92 if self.is_merkle_synced().await? {
93 // Fast path: serve from Redis
94 if let Some(ref merkle_cache) = self.merkle_cache {
95 return merkle_cache.get_hash(path).await;
96 }
97 }
98
99 // Slow path: query SQL
100 if let Some(ref sql_merkle) = self.sql_merkle {
101 return sql_merkle.get_hash(path).await;
102 }
103
104 Ok(None)
105 }
106
107 /// Get a full merkle node (hash + children).
108 ///
109 /// Use this for tree traversal during sync.
110 ///
111 /// # Arguments
112 /// * `path` - The merkle path (use "" for root)
113 #[instrument(skip(self))]
114 pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
115 // Check if Redis shadow is in sync
116 if self.is_merkle_synced().await? {
117 if let Some(ref merkle_cache) = self.merkle_cache {
118 return merkle_cache.get_node(path).await;
119 }
120 }
121
122 // Fall back to SQL
123 if let Some(ref sql_merkle) = self.sql_merkle {
124 return sql_merkle.get_node(path).await;
125 }
126
127 Ok(None)
128 }
129
130 /// Get children of a merkle path.
131 ///
132 /// Returns a map of `segment -> hash` for all direct children.
133 /// Use this to traverse the tree level by level.
134 ///
135 /// # Arguments
136 /// * `path` - Parent path (use "" for top-level children)
137 ///
138 /// # Example
139 /// ```text
140 /// path="" → {"uk": 0xABC, "us": 0xDEF}
141 /// path="uk" → {"nhs": 0x123, "private": 0x456}
142 /// ```
143 #[instrument(skip(self))]
144 pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
145 // Check if Redis shadow is in sync
146 if self.is_merkle_synced().await? {
147 if let Some(ref merkle_cache) = self.merkle_cache {
148 return merkle_cache.get_children(path).await;
149 }
150 }
151
152 // Fall back to SQL
153 if let Some(ref sql_merkle) = self.sql_merkle {
154 return sql_merkle.get_children(path).await;
155 }
156
157 Ok(BTreeMap::new())
158 }
159
160 /// Find paths where local and remote merkle trees diverge.
161 ///
162 /// This is the main sync algorithm - given remote node's hashes,
163 /// find the minimal set of paths that need to be synced.
164 ///
165 /// # Arguments
166 /// * `remote_nodes` - Pairs of (path, hash) from the remote node
167 ///
168 /// # Returns
169 /// Paths where hashes differ (need to fetch/push data)
170 ///
171 /// # Algorithm
172 /// For each remote (path, hash) pair:
173 /// 1. Get local hash for that path
174 /// 2. If hashes match → subtree is synced, skip
175 /// 3. If hashes differ → add to divergent list
176 /// 4. If local missing → add to remote_only
177 /// 5. If remote missing → add to local_only
178 #[instrument(skip(self, remote_nodes), fields(remote_count = remote_nodes.len()))]
179 pub async fn find_divergent_paths(
180 &self,
181 remote_nodes: &[(String, [u8; 32])],
182 ) -> Result<MerkleDiff, StorageError> {
183 let mut divergent_paths = Vec::new();
184 let remote_only = Vec::new(); // Paths remote has, we don't
185 let local_only = Vec::new(); // Paths we have, remote doesn't
186
187 for (path, remote_hash) in remote_nodes {
188 let local_hash = self.get_merkle_hash(path).await?;
189
190 match local_hash {
191 Some(local) if local == *remote_hash => {
192 // Hashes match - this subtree is synced
193 debug!(path = %path, "Merkle path synced");
194 }
195 Some(local) => {
196 // Hashes differ - need to sync this path
197 debug!(
198 path = %path,
199 local = %hex::encode(local),
200 remote = %hex::encode(remote_hash),
201 "Merkle path diverged"
202 );
203 divergent_paths.push(path.clone());
204 }
205 None => {
206 // We don't have this path at all
207 debug!(path = %path, "Path exists on remote only");
208 divergent_paths.push(path.clone());
209 }
210 }
211 }
212
213 Ok(MerkleDiff {
214 divergent_paths,
215 local_only,
216 remote_only,
217 })
218 }
219
220 /// Check if Redis merkle tree is in sync with SQL ground truth.
221 ///
222 /// This is used to determine whether we can serve merkle queries from Redis
223 /// (fast) or need to fall back to SQL (slow but authoritative).
224 #[instrument(skip(self))]
225 pub async fn is_merkle_synced(&self) -> Result<bool, StorageError> {
226 let sql_root = if let Some(ref sql_merkle) = self.sql_merkle {
227 sql_merkle.root_hash().await?
228 } else {
229 return Ok(false);
230 };
231
232 let redis_root = if let Some(ref merkle_cache) = self.merkle_cache {
233 merkle_cache.get_hash("").await?
234 } else {
235 return Ok(false);
236 };
237
238 Ok(sql_root == redis_root)
239 }
240
241 /// Drill down the merkle tree to find all divergent leaf paths.
242 ///
243 /// Starting from a known-divergent path, recursively descend until
244 /// we find the actual leaves that need syncing.
245 ///
246 /// # Arguments
247 /// * `start_path` - A path known to have divergent hashes
248 /// * `remote_children` - Function to get children from remote node
249 ///
250 /// # Returns
251 /// List of leaf paths that need to be synced
252 #[instrument(skip(self, remote_children))]
253 pub async fn drill_down_divergence<F, Fut>(
254 &self,
255 start_path: &str,
256 remote_children: F,
257 ) -> Result<Vec<String>, StorageError>
258 where
259 F: Fn(String) -> Fut,
260 Fut: std::future::Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,
261 {
262 let mut divergent_leaves = Vec::new();
263 let mut paths_to_check = vec![start_path.to_string()];
264
265 while let Some(path) = paths_to_check.pop() {
266 let local_children = self.get_merkle_children(&path).await?;
267 let remote = remote_children(path.clone()).await?;
268
269 // Find which children differ
270 for (segment, remote_hash) in &remote {
271 let child_path = if path.is_empty() {
272 segment.clone()
273 } else {
274 format!("{}.{}", path, segment)
275 };
276
277 match local_children.get(segment) {
278 Some(local_hash) if local_hash == remote_hash => {
279 // Child is synced
280 }
281 Some(_) => {
282 // Child differs - check if leaf or drill deeper
283 let node = self.get_merkle_node(&child_path).await?;
284 if node.map(|n| n.is_leaf).unwrap_or(true) {
285 divergent_leaves.push(child_path);
286 } else {
287 paths_to_check.push(child_path);
288 }
289 }
290 None => {
291 // We don't have this child - it's a missing subtree
292 divergent_leaves.push(child_path);
293 }
294 }
295 }
296
297 // Check for children we have that remote doesn't
298 for segment in local_children.keys() {
299 if !remote.contains_key(segment) {
300 let child_path = if path.is_empty() {
301 segment.clone()
302 } else {
303 format!("{}.{}", path, segment)
304 };
305 // Remote missing this - they need it from us
306 divergent_leaves.push(child_path);
307 }
308 }
309 }
310
311 Ok(divergent_leaves)
312 }
313
314 // ═══════════════════════════════════════════════════════════════════════════
315 // Branch Hygiene API: For safe comparison during active writes
316 // ═══════════════════════════════════════════════════════════════════════════
317
318 /// Count dirty items within a branch prefix.
319 ///
320 /// Returns 0 if the branch is "clean" (all merkle hashes up-to-date),
321 /// meaning it's safe to compare with peers without churn concerns.
322 ///
323 /// # Arguments
324 /// * `prefix` - Branch prefix (e.g., "uk.nhs" matches "uk.nhs.patient.123")
325 ///
326 /// # Example
327 /// ```rust,no_run
328 /// # async fn example(engine: &sync_engine::SyncEngine) {
329 /// let dirty = engine.branch_dirty_count("uk.nhs").await.unwrap();
330 /// if dirty == 0 {
331 /// // Safe to compare this branch with peer
332 /// }
333 /// # }
334 /// ```
335 #[instrument(skip(self))]
336 pub async fn branch_dirty_count(&self, prefix: &str) -> Result<u64, StorageError> {
337 if let Some(ref sql_store) = self.sql_store {
338 return sql_store.branch_dirty_count(prefix).await;
339 }
340 // No SQL store means no ground truth - assume clean
341 Ok(0)
342 }
343
344 /// Get top-level prefixes that have dirty items pending merkle recalc.
345 ///
346 /// Branches NOT in this list are "clean" and safe to compare with peers.
347 /// Use this for opportunistic branch sync - sync what's stable, skip what's churning.
348 ///
349 /// # Returns
350 /// List of dirty top-level prefixes (e.g., ["uk", "us"] if those have pending writes)
351 ///
352 /// # Example
353 /// ```rust,no_run
354 /// # async fn example(engine: &sync_engine::SyncEngine) {
355 /// let dirty_branches = engine.get_dirty_prefixes().await.unwrap();
356 /// let all_branches = engine.get_top_level_branches().await.unwrap();
357 ///
358 /// for (branch, hash) in all_branches {
359 /// if !dirty_branches.contains(&branch) {
360 /// // This branch is clean - safe to compare with peer
361 /// }
362 /// }
363 /// # }
364 /// ```
365 #[instrument(skip(self))]
366 pub async fn get_dirty_prefixes(&self) -> Result<Vec<String>, StorageError> {
367 if let Some(ref sql_store) = self.sql_store {
368 return sql_store.get_dirty_prefixes().await;
369 }
370 Ok(Vec::new())
371 }
372
373 /// Get top-level branches with their hashes.
374 ///
375 /// Returns children of the root node - the first level of the merkle tree.
376 /// Combine with [`get_dirty_prefixes()`] to find clean branches for comparison.
377 #[instrument(skip(self))]
378 pub async fn get_top_level_branches(&self) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
379 self.get_merkle_children("").await
380 }
381
382 /// Get clean branches with their hashes.
383 ///
384 /// Convenience method that returns only branches that are fully up-to-date
385 /// (no pending merkle recalculations). These are safe to compare with peers.
386 #[instrument(skip(self))]
387 pub async fn get_clean_branches(&self) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
388 let all_branches = self.get_top_level_branches().await?;
389 let dirty_prefixes = self.get_dirty_prefixes().await?;
390
391 let clean: BTreeMap<String, [u8; 32]> = all_branches
392 .into_iter()
393 .filter(|(branch, _)| !dirty_prefixes.contains(branch))
394 .collect();
395
396 debug!(
397 total = clean.len() + dirty_prefixes.len(),
398 clean = clean.len(),
399 dirty = dirty_prefixes.len(),
400 "Branch hygiene check"
401 );
402
403 Ok(clean)
404 }
405
406 /// Check if the entire merkle tree is clean (no pending recalculations).
407 ///
408 /// Returns `true` only when all items have `merkle_dirty = 0`.
409 /// This is stricter than [`is_merkle_synced()`] which only compares roots.
410 #[instrument(skip(self))]
411 pub async fn is_fully_clean(&self) -> Result<bool, StorageError> {
412 if let Some(ref sql_store) = self.sql_store {
413 return sql_store.has_dirty_merkle().await.map(|has_dirty| !has_dirty);
414 }
415 Ok(true)
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `MerkleDiff` category keeps exactly the paths it was built with.
    #[test]
    fn test_merkle_diff_struct() {
        let divergent = vec!["uk.nhs".to_string()];
        let only_here = vec!["uk.private".to_string()];
        let only_there = vec!["us.medicare".to_string()];

        let diff = MerkleDiff {
            remote_only: only_there,
            local_only: only_here,
            divergent_paths: divergent,
        };

        let expected_len = 1;
        assert_eq!(diff.divergent_paths.len(), expected_len);
        assert_eq!(diff.local_only.len(), expected_len);
        assert_eq!(diff.remote_only.len(), expected_len);
    }
}