sync_engine/coordinator/
merkle_api.rs

1//! Public Merkle tree accessor API for external sync.
2//!
3//! This module exposes merkle tree queries for use by external nodes in a 
4//! multi-instance deployment. The typical sync flow:
5//!
6//! ```text
7//! Remote Node                           Local Node
8//!     │                                      │
9//!     ├──── "What's your root?" ────────────►│
10//!     │◄──── "0xABCD" ─────────────────────┤
11//!     │                                      │
12//!     │  (Hmm, mine is 0x1234, different!)   │
13//!     │                                      │
14//!     ├──── "Children of root?" ────────────►│
15//!     │◄──── [{path:"A",hash:"..."}, ...] ───┤
16//!     │                                      │
17//!     │  (A matches, B differs!)             │
18//!     │                                      │
19//!     ├──── "Children of B?" ───────────────►│
20//!     │◄──── [{path:"BA",hash:"..."}, ...] ──┤
21//!     │                                      │
22//!     │  (Found it! BA.leaf.123 differs)     │
23//!     │                                      │
24//!     ├──── "Get sync_item BA.leaf.123" ────►│
25//!     │◄──── { full CRDT data } ─────────────┤
26//! ```
27
28use std::collections::BTreeMap;
29use tracing::{debug, instrument};
30
31use crate::merkle::MerkleNode;
32use crate::storage::traits::StorageError;
33
34use super::SyncEngine;
35
/// Outcome of comparing the local merkle tree against a remote node's hashes.
///
/// Each field is a list of merkle paths. `Default` yields a diff with all three
/// lists empty, and `PartialEq`/`Eq` allow two diffs to be compared directly
/// (element order is significant).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MerkleDiff {
    /// Paths where hashes differ (needs sync)
    pub divergent_paths: Vec<String>,
    /// Paths that exist locally but not remotely (local additions)
    pub local_only: Vec<String>,
    /// Paths that exist remotely but not locally (remote additions)
    pub remote_only: Vec<String>,
}
46
47impl SyncEngine {
48    // ═══════════════════════════════════════════════════════════════════════════
49    // Public Merkle API: For external sync between nodes
50    // ═══════════════════════════════════════════════════════════════════════════
51
52    /// Get the current merkle root hash.
53    ///
54    /// Returns the root hash from SQL (ground truth), or from Redis if they match.
55    /// This is the starting point for sync verification.
56    ///
57    /// # Returns
58    /// - `Some([u8; 32])` - The root hash
59    /// - `None` - No data has been written yet (empty tree)
60    #[instrument(skip(self))]
61    pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError> {
62        // SQL is ground truth - always prefer it
63        if let Some(ref sql_merkle) = self.sql_merkle {
64            return sql_merkle.root_hash().await;
65        }
66        
67        // Fall back to Redis if no SQL
68        if let Some(ref redis_merkle) = self.redis_merkle {
69            return redis_merkle.get_hash("").await;
70        }
71        
72        Ok(None)
73    }
74    
75    /// Get the hash for a specific merkle path.
76    ///
77    /// Serves from Redis if `redis_root == sql_root` (fast path), 
78    /// otherwise falls back to SQL (slow path).
79    ///
80    /// # Arguments
81    /// * `path` - The merkle path (e.g., "uk.nhs.patient")
82    ///
83    /// # Returns
84    /// - `Some([u8; 32])` - The hash at this path
85    /// - `None` - Path doesn't exist in the tree
86    #[instrument(skip(self))]
87    pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
88        // Check if Redis shadow is in sync with SQL ground truth
89        if self.is_merkle_synced().await? {
90            // Fast path: serve from Redis
91            if let Some(ref redis_merkle) = self.redis_merkle {
92                return redis_merkle.get_hash(path).await;
93            }
94        }
95        
96        // Slow path: query SQL
97        if let Some(ref sql_merkle) = self.sql_merkle {
98            return sql_merkle.get_hash(path).await;
99        }
100        
101        Ok(None)
102    }
103    
104    /// Get a full merkle node (hash + children).
105    ///
106    /// Use this for tree traversal during sync.
107    ///
108    /// # Arguments
109    /// * `path` - The merkle path (use "" for root)
110    #[instrument(skip(self))]
111    pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
112        // Check if Redis shadow is in sync
113        if self.is_merkle_synced().await? {
114            if let Some(ref redis_merkle) = self.redis_merkle {
115                return redis_merkle.get_node(path).await;
116            }
117        }
118        
119        // Fall back to SQL
120        if let Some(ref sql_merkle) = self.sql_merkle {
121            return sql_merkle.get_node(path).await;
122        }
123        
124        Ok(None)
125    }
126    
127    /// Get children of a merkle path.
128    ///
129    /// Returns a map of `segment -> hash` for all direct children.
130    /// Use this to traverse the tree level by level.
131    ///
132    /// # Arguments
133    /// * `path` - Parent path (use "" for top-level children)
134    ///
135    /// # Example
136    /// ```text
137    /// path="" → {"uk": 0xABC, "us": 0xDEF}
138    /// path="uk" → {"nhs": 0x123, "private": 0x456}
139    /// ```
140    #[instrument(skip(self))]
141    pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
142        // Check if Redis shadow is in sync
143        if self.is_merkle_synced().await? {
144            if let Some(ref redis_merkle) = self.redis_merkle {
145                return redis_merkle.get_children(path).await;
146            }
147        }
148        
149        // Fall back to SQL
150        if let Some(ref sql_merkle) = self.sql_merkle {
151            return sql_merkle.get_children(path).await;
152        }
153        
154        Ok(BTreeMap::new())
155    }
156    
157    /// Find paths where local and remote merkle trees diverge.
158    ///
159    /// This is the main sync algorithm - given remote node's hashes,
160    /// find the minimal set of paths that need to be synced.
161    ///
162    /// # Arguments
163    /// * `remote_nodes` - Pairs of (path, hash) from the remote node
164    ///
165    /// # Returns
166    /// Paths where hashes differ (need to fetch/push data)
167    ///
168    /// # Algorithm
169    /// For each remote (path, hash) pair:
170    /// 1. Get local hash for that path
171    /// 2. If hashes match → subtree is synced, skip
172    /// 3. If hashes differ → add to divergent list
173    /// 4. If local missing → add to remote_only
174    /// 5. If remote missing → add to local_only
175    #[instrument(skip(self, remote_nodes), fields(remote_count = remote_nodes.len()))]
176    pub async fn find_divergent_paths(
177        &self,
178        remote_nodes: &[(String, [u8; 32])],
179    ) -> Result<MerkleDiff, StorageError> {
180        let mut divergent_paths = Vec::new();
181        let remote_only = Vec::new(); // Paths remote has, we don't
182        let local_only = Vec::new();  // Paths we have, remote doesn't
183        
184        for (path, remote_hash) in remote_nodes {
185            let local_hash = self.get_merkle_hash(path).await?;
186            
187            match local_hash {
188                Some(local) if local == *remote_hash => {
189                    // Hashes match - this subtree is synced
190                    debug!(path = %path, "Merkle path synced");
191                }
192                Some(local) => {
193                    // Hashes differ - need to sync this path
194                    debug!(
195                        path = %path, 
196                        local = %hex::encode(local),
197                        remote = %hex::encode(remote_hash),
198                        "Merkle path diverged"
199                    );
200                    divergent_paths.push(path.clone());
201                }
202                None => {
203                    // We don't have this path at all
204                    debug!(path = %path, "Path exists on remote only");
205                    divergent_paths.push(path.clone());
206                }
207            }
208        }
209        
210        Ok(MerkleDiff {
211            divergent_paths,
212            local_only,
213            remote_only,
214        })
215    }
216    
217    /// Check if Redis merkle tree is in sync with SQL ground truth.
218    ///
219    /// This is used to determine whether we can serve merkle queries from Redis
220    /// (fast) or need to fall back to SQL (slow but authoritative).
221    #[instrument(skip(self))]
222    pub async fn is_merkle_synced(&self) -> Result<bool, StorageError> {
223        let sql_root = if let Some(ref sql_merkle) = self.sql_merkle {
224            sql_merkle.root_hash().await?
225        } else {
226            return Ok(false);
227        };
228        
229        let redis_root = if let Some(ref redis_merkle) = self.redis_merkle {
230            redis_merkle.get_hash("").await?
231        } else {
232            return Ok(false);
233        };
234        
235        Ok(sql_root == redis_root)
236    }
237    
238    /// Drill down the merkle tree to find all divergent leaf paths.
239    ///
240    /// Starting from a known-divergent path, recursively descend until
241    /// we find the actual leaves that need syncing.
242    ///
243    /// # Arguments
244    /// * `start_path` - A path known to have divergent hashes
245    /// * `remote_children` - Function to get children from remote node
246    ///
247    /// # Returns
248    /// List of leaf paths that need to be synced
249    #[instrument(skip(self, remote_children))]
250    pub async fn drill_down_divergence<F, Fut>(
251        &self,
252        start_path: &str,
253        remote_children: F,
254    ) -> Result<Vec<String>, StorageError>
255    where
256        F: Fn(String) -> Fut,
257        Fut: std::future::Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,
258    {
259        let mut divergent_leaves = Vec::new();
260        let mut paths_to_check = vec![start_path.to_string()];
261        
262        while let Some(path) = paths_to_check.pop() {
263            let local_children = self.get_merkle_children(&path).await?;
264            let remote = remote_children(path.clone()).await?;
265            
266            // Find which children differ
267            for (segment, remote_hash) in &remote {
268                let child_path = if path.is_empty() {
269                    segment.clone()
270                } else {
271                    format!("{}.{}", path, segment)
272                };
273                
274                match local_children.get(segment) {
275                    Some(local_hash) if local_hash == remote_hash => {
276                        // Child is synced
277                    }
278                    Some(_) => {
279                        // Child differs - check if leaf or drill deeper
280                        let node = self.get_merkle_node(&child_path).await?;
281                        if node.map(|n| n.is_leaf).unwrap_or(true) {
282                            divergent_leaves.push(child_path);
283                        } else {
284                            paths_to_check.push(child_path);
285                        }
286                    }
287                    None => {
288                        // We don't have this child - it's a missing subtree
289                        divergent_leaves.push(child_path);
290                    }
291                }
292            }
293            
294            // Check for children we have that remote doesn't
295            for segment in local_children.keys() {
296                if !remote.contains_key(segment) {
297                    let child_path = if path.is_empty() {
298                        segment.clone()
299                    } else {
300                        format!("{}.{}", path, segment)
301                    };
302                    // Remote missing this - they need it from us
303                    divergent_leaves.push(child_path);
304                }
305            }
306        }
307        
308        Ok(divergent_leaves)
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built `MerkleDiff` keeps exactly one path in each of its three lists.
    #[test]
    fn test_merkle_diff_struct() {
        let divergent_paths = vec![String::from("uk.nhs")];
        let local_only = vec![String::from("uk.private")];
        let remote_only = vec![String::from("us.medicare")];

        let diff = MerkleDiff {
            divergent_paths,
            local_only,
            remote_only,
        };

        assert_eq!(diff.divergent_paths.len(), 1);
        assert_eq!(diff.local_only.len(), 1);
        assert_eq!(diff.remote_only.len(), 1);
    }
}