sync_engine/coordinator/merkle_api.rs
1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
//! Public Merkle tree accessor API for external sync.
//!
//! This module exposes merkle tree queries for use by external nodes in a
//! multi-instance deployment. The typical sync flow:
//!
//! ```text
//! Remote Node                            Local Node
//!     │                                       │
//!     ├──── "What's your root?" ─────────────►│
//!     │◄──── "0xABCD" ────────────────────────┤
//!     │                                       │
//!     │  (Hmm, mine is 0x1234, different!)    │
//!     │                                       │
//!     ├──── "Children of root?" ─────────────►│
//!     │◄──── [{path:"A",hash:"..."}, ...] ────┤
//!     │                                       │
//!     │  (A matches, B differs!)              │
//!     │                                       │
//!     ├──── "Children of B?" ────────────────►│
//!     │◄──── [{path:"B.A",hash:"..."}, ...] ──┤
//!     │                                       │
//!     │  (Found it! B.A.leaf.123 differs)     │
//!     │                                       │
//!     ├──── "Get sync_item B.A.leaf.123" ────►│
//!     │◄──── { full CRDT data } ──────────────┤
//! ```
30
31use std::collections::BTreeMap;
32use tracing::{debug, instrument};
33
34use crate::merkle::MerkleNode;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Result of comparing merkle trees between two nodes.
///
/// Every entry is a dotted merkle path (e.g. `"uk.nhs"`). `Default` yields an
/// empty diff, i.e. "nothing to sync".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MerkleDiff {
    /// Paths where hashes differ (needs sync)
    pub divergent_paths: Vec<String>,
    /// Paths that exist locally but not remotely (local additions)
    pub local_only: Vec<String>,
    /// Paths that exist remotely but not locally (remote additions)
    pub remote_only: Vec<String>,
}

impl MerkleDiff {
    /// Returns `true` when all three path lists are empty, i.e. the diff
    /// reports nothing to sync in either direction.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.divergent_paths.is_empty() && self.local_only.is_empty() && self.remote_only.is_empty()
    }
}
49
50impl SyncEngine {
51 // ═══════════════════════════════════════════════════════════════════════════
52 // Public Merkle API: For external sync between nodes
53 // ═══════════════════════════════════════════════════════════════════════════
54
55 /// Get the current merkle root hash.
56 ///
57 /// Returns the root hash from SQL (ground truth), or from Redis if they match.
58 /// This is the starting point for sync verification.
59 ///
60 /// # Returns
61 /// - `Some([u8; 32])` - The root hash
62 /// - `None` - No data has been written yet (empty tree)
63 #[instrument(skip(self))]
64 pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError> {
65 // SQL is ground truth - always prefer it
66 if let Some(ref sql_merkle) = self.sql_merkle {
67 return sql_merkle.root_hash().await;
68 }
69
70 // Fall back to Redis if no SQL
71 if let Some(ref redis_merkle) = self.redis_merkle {
72 return redis_merkle.get_hash("").await;
73 }
74
75 Ok(None)
76 }
77
78 /// Get the hash for a specific merkle path.
79 ///
80 /// Serves from Redis if `redis_root == sql_root` (fast path),
81 /// otherwise falls back to SQL (slow path).
82 ///
83 /// # Arguments
84 /// * `path` - The merkle path (e.g., "uk.nhs.patient")
85 ///
86 /// # Returns
87 /// - `Some([u8; 32])` - The hash at this path
88 /// - `None` - Path doesn't exist in the tree
89 #[instrument(skip(self))]
90 pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
91 // Check if Redis shadow is in sync with SQL ground truth
92 if self.is_merkle_synced().await? {
93 // Fast path: serve from Redis
94 if let Some(ref redis_merkle) = self.redis_merkle {
95 return redis_merkle.get_hash(path).await;
96 }
97 }
98
99 // Slow path: query SQL
100 if let Some(ref sql_merkle) = self.sql_merkle {
101 return sql_merkle.get_hash(path).await;
102 }
103
104 Ok(None)
105 }
106
107 /// Get a full merkle node (hash + children).
108 ///
109 /// Use this for tree traversal during sync.
110 ///
111 /// # Arguments
112 /// * `path` - The merkle path (use "" for root)
113 #[instrument(skip(self))]
114 pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
115 // Check if Redis shadow is in sync
116 if self.is_merkle_synced().await? {
117 if let Some(ref redis_merkle) = self.redis_merkle {
118 return redis_merkle.get_node(path).await;
119 }
120 }
121
122 // Fall back to SQL
123 if let Some(ref sql_merkle) = self.sql_merkle {
124 return sql_merkle.get_node(path).await;
125 }
126
127 Ok(None)
128 }
129
130 /// Get children of a merkle path.
131 ///
132 /// Returns a map of `segment -> hash` for all direct children.
133 /// Use this to traverse the tree level by level.
134 ///
135 /// # Arguments
136 /// * `path` - Parent path (use "" for top-level children)
137 ///
138 /// # Example
139 /// ```text
140 /// path="" → {"uk": 0xABC, "us": 0xDEF}
141 /// path="uk" → {"nhs": 0x123, "private": 0x456}
142 /// ```
143 #[instrument(skip(self))]
144 pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
145 // Check if Redis shadow is in sync
146 if self.is_merkle_synced().await? {
147 if let Some(ref redis_merkle) = self.redis_merkle {
148 return redis_merkle.get_children(path).await;
149 }
150 }
151
152 // Fall back to SQL
153 if let Some(ref sql_merkle) = self.sql_merkle {
154 return sql_merkle.get_children(path).await;
155 }
156
157 Ok(BTreeMap::new())
158 }
159
160 /// Find paths where local and remote merkle trees diverge.
161 ///
162 /// This is the main sync algorithm - given remote node's hashes,
163 /// find the minimal set of paths that need to be synced.
164 ///
165 /// # Arguments
166 /// * `remote_nodes` - Pairs of (path, hash) from the remote node
167 ///
168 /// # Returns
169 /// Paths where hashes differ (need to fetch/push data)
170 ///
171 /// # Algorithm
172 /// For each remote (path, hash) pair:
173 /// 1. Get local hash for that path
174 /// 2. If hashes match → subtree is synced, skip
175 /// 3. If hashes differ → add to divergent list
176 /// 4. If local missing → add to remote_only
177 /// 5. If remote missing → add to local_only
178 #[instrument(skip(self, remote_nodes), fields(remote_count = remote_nodes.len()))]
179 pub async fn find_divergent_paths(
180 &self,
181 remote_nodes: &[(String, [u8; 32])],
182 ) -> Result<MerkleDiff, StorageError> {
183 let mut divergent_paths = Vec::new();
184 let remote_only = Vec::new(); // Paths remote has, we don't
185 let local_only = Vec::new(); // Paths we have, remote doesn't
186
187 for (path, remote_hash) in remote_nodes {
188 let local_hash = self.get_merkle_hash(path).await?;
189
190 match local_hash {
191 Some(local) if local == *remote_hash => {
192 // Hashes match - this subtree is synced
193 debug!(path = %path, "Merkle path synced");
194 }
195 Some(local) => {
196 // Hashes differ - need to sync this path
197 debug!(
198 path = %path,
199 local = %hex::encode(local),
200 remote = %hex::encode(remote_hash),
201 "Merkle path diverged"
202 );
203 divergent_paths.push(path.clone());
204 }
205 None => {
206 // We don't have this path at all
207 debug!(path = %path, "Path exists on remote only");
208 divergent_paths.push(path.clone());
209 }
210 }
211 }
212
213 Ok(MerkleDiff {
214 divergent_paths,
215 local_only,
216 remote_only,
217 })
218 }
219
220 /// Check if Redis merkle tree is in sync with SQL ground truth.
221 ///
222 /// This is used to determine whether we can serve merkle queries from Redis
223 /// (fast) or need to fall back to SQL (slow but authoritative).
224 #[instrument(skip(self))]
225 pub async fn is_merkle_synced(&self) -> Result<bool, StorageError> {
226 let sql_root = if let Some(ref sql_merkle) = self.sql_merkle {
227 sql_merkle.root_hash().await?
228 } else {
229 return Ok(false);
230 };
231
232 let redis_root = if let Some(ref redis_merkle) = self.redis_merkle {
233 redis_merkle.get_hash("").await?
234 } else {
235 return Ok(false);
236 };
237
238 Ok(sql_root == redis_root)
239 }
240
241 /// Drill down the merkle tree to find all divergent leaf paths.
242 ///
243 /// Starting from a known-divergent path, recursively descend until
244 /// we find the actual leaves that need syncing.
245 ///
246 /// # Arguments
247 /// * `start_path` - A path known to have divergent hashes
248 /// * `remote_children` - Function to get children from remote node
249 ///
250 /// # Returns
251 /// List of leaf paths that need to be synced
252 #[instrument(skip(self, remote_children))]
253 pub async fn drill_down_divergence<F, Fut>(
254 &self,
255 start_path: &str,
256 remote_children: F,
257 ) -> Result<Vec<String>, StorageError>
258 where
259 F: Fn(String) -> Fut,
260 Fut: std::future::Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,
261 {
262 let mut divergent_leaves = Vec::new();
263 let mut paths_to_check = vec![start_path.to_string()];
264
265 while let Some(path) = paths_to_check.pop() {
266 let local_children = self.get_merkle_children(&path).await?;
267 let remote = remote_children(path.clone()).await?;
268
269 // Find which children differ
270 for (segment, remote_hash) in &remote {
271 let child_path = if path.is_empty() {
272 segment.clone()
273 } else {
274 format!("{}.{}", path, segment)
275 };
276
277 match local_children.get(segment) {
278 Some(local_hash) if local_hash == remote_hash => {
279 // Child is synced
280 }
281 Some(_) => {
282 // Child differs - check if leaf or drill deeper
283 let node = self.get_merkle_node(&child_path).await?;
284 if node.map(|n| n.is_leaf).unwrap_or(true) {
285 divergent_leaves.push(child_path);
286 } else {
287 paths_to_check.push(child_path);
288 }
289 }
290 None => {
291 // We don't have this child - it's a missing subtree
292 divergent_leaves.push(child_path);
293 }
294 }
295 }
296
297 // Check for children we have that remote doesn't
298 for segment in local_children.keys() {
299 if !remote.contains_key(segment) {
300 let child_path = if path.is_empty() {
301 segment.clone()
302 } else {
303 format!("{}.{}", path, segment)
304 };
305 // Remote missing this - they need it from us
306 divergent_leaves.push(child_path);
307 }
308 }
309 }
310
311 Ok(divergent_leaves)
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// A `MerkleDiff` built from three one-element lists keeps one path in
    /// each category.
    #[test]
    fn test_merkle_diff_struct() {
        let divergent_paths = vec![String::from("uk.nhs")];
        let local_only = vec![String::from("uk.private")];
        let remote_only = vec![String::from("us.medicare")];

        let diff = MerkleDiff {
            divergent_paths,
            local_only,
            remote_only,
        };

        for bucket in [&diff.divergent_paths, &diff.local_only, &diff.remote_only] {
            assert_eq!(bucket.len(), 1);
        }
    }
}