sync_engine/coordinator/merkle_api.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Public Merkle tree accessor API for external sync.
5//!
6//! This module exposes merkle tree queries for use by external nodes in a 
7//! multi-instance deployment. The typical sync flow:
8//!
9//! ```text
10//! Remote Node                           Local Node
11//!     │                                      │
12//!     ├──── "What's your root?" ────────────►│
13//!     │◄──── "0xABCD" ─────────────────────┤
14//!     │                                      │
15//!     │  (Hmm, mine is 0x1234, different!)   │
16//!     │                                      │
17//!     ├──── "Children of root?" ────────────►│
18//!     │◄──── [{path:"A",hash:"..."}, ...] ───┤
19//!     │                                      │
20//!     │  (A matches, B differs!)             │
21//!     │                                      │
22//!     ├──── "Children of B?" ───────────────►│
23//!     │◄──── [{path:"BA",hash:"..."}, ...] ──┤
24//!     │                                      │
25//!     │  (Found it! BA.leaf.123 differs)     │
26//!     │                                      │
27//!     ├──── "Get sync_item BA.leaf.123" ────►│
28//!     │◄──── { full CRDT data } ─────────────┤
29//! ```
30
31use std::collections::BTreeMap;
32use tracing::{debug, instrument};
33
34use crate::merkle::MerkleNode;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Result of comparing merkle trees between two nodes.
///
/// Each field is a list of dotted merkle paths (e.g. `"uk.nhs"`).
/// `Default` yields a diff with all three lists empty.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MerkleDiff {
    /// Paths where hashes differ (needs sync)
    pub divergent_paths: Vec<String>,
    /// Paths that exist locally but not remotely (local additions)
    pub local_only: Vec<String>,
    /// Paths that exist remotely but not locally (remote additions)
    pub remote_only: Vec<String>,
}

impl MerkleDiff {
    /// Returns `true` when none of the three path lists contain any entries.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.divergent_paths.is_empty() && self.local_only.is_empty() && self.remote_only.is_empty()
    }
}
49
50impl SyncEngine {
51    // ═══════════════════════════════════════════════════════════════════════════
52    // Public Merkle API: For external sync between nodes
53    // ═══════════════════════════════════════════════════════════════════════════
54
55    /// Get the current merkle root hash.
56    ///
57    /// Returns the root hash from SQL (ground truth), or from Redis if they match.
58    /// This is the starting point for sync verification.
59    ///
60    /// # Returns
61    /// - `Some([u8; 32])` - The root hash
62    /// - `None` - No data has been written yet (empty tree)
63    #[instrument(skip(self))]
64    pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError> {
65        // SQL is ground truth - always prefer it
66        if let Some(ref sql_merkle) = self.sql_merkle {
67            return sql_merkle.root_hash().await;
68        }
69        
70        // Fall back to Redis if no SQL
71        if let Some(ref redis_merkle) = self.redis_merkle {
72            return redis_merkle.get_hash("").await;
73        }
74        
75        Ok(None)
76    }
77    
78    /// Get the hash for a specific merkle path.
79    ///
80    /// Serves from Redis if `redis_root == sql_root` (fast path), 
81    /// otherwise falls back to SQL (slow path).
82    ///
83    /// # Arguments
84    /// * `path` - The merkle path (e.g., "uk.nhs.patient")
85    ///
86    /// # Returns
87    /// - `Some([u8; 32])` - The hash at this path
88    /// - `None` - Path doesn't exist in the tree
89    #[instrument(skip(self))]
90    pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
91        // Check if Redis shadow is in sync with SQL ground truth
92        if self.is_merkle_synced().await? {
93            // Fast path: serve from Redis
94            if let Some(ref redis_merkle) = self.redis_merkle {
95                return redis_merkle.get_hash(path).await;
96            }
97        }
98        
99        // Slow path: query SQL
100        if let Some(ref sql_merkle) = self.sql_merkle {
101            return sql_merkle.get_hash(path).await;
102        }
103        
104        Ok(None)
105    }
106    
107    /// Get a full merkle node (hash + children).
108    ///
109    /// Use this for tree traversal during sync.
110    ///
111    /// # Arguments
112    /// * `path` - The merkle path (use "" for root)
113    #[instrument(skip(self))]
114    pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
115        // Check if Redis shadow is in sync
116        if self.is_merkle_synced().await? {
117            if let Some(ref redis_merkle) = self.redis_merkle {
118                return redis_merkle.get_node(path).await;
119            }
120        }
121        
122        // Fall back to SQL
123        if let Some(ref sql_merkle) = self.sql_merkle {
124            return sql_merkle.get_node(path).await;
125        }
126        
127        Ok(None)
128    }
129    
130    /// Get children of a merkle path.
131    ///
132    /// Returns a map of `segment -> hash` for all direct children.
133    /// Use this to traverse the tree level by level.
134    ///
135    /// # Arguments
136    /// * `path` - Parent path (use "" for top-level children)
137    ///
138    /// # Example
139    /// ```text
140    /// path="" → {"uk": 0xABC, "us": 0xDEF}
141    /// path="uk" → {"nhs": 0x123, "private": 0x456}
142    /// ```
143    #[instrument(skip(self))]
144    pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
145        // Check if Redis shadow is in sync
146        if self.is_merkle_synced().await? {
147            if let Some(ref redis_merkle) = self.redis_merkle {
148                return redis_merkle.get_children(path).await;
149            }
150        }
151        
152        // Fall back to SQL
153        if let Some(ref sql_merkle) = self.sql_merkle {
154            return sql_merkle.get_children(path).await;
155        }
156        
157        Ok(BTreeMap::new())
158    }
159    
160    /// Find paths where local and remote merkle trees diverge.
161    ///
162    /// This is the main sync algorithm - given remote node's hashes,
163    /// find the minimal set of paths that need to be synced.
164    ///
165    /// # Arguments
166    /// * `remote_nodes` - Pairs of (path, hash) from the remote node
167    ///
168    /// # Returns
169    /// Paths where hashes differ (need to fetch/push data)
170    ///
171    /// # Algorithm
172    /// For each remote (path, hash) pair:
173    /// 1. Get local hash for that path
174    /// 2. If hashes match → subtree is synced, skip
175    /// 3. If hashes differ → add to divergent list
176    /// 4. If local missing → add to remote_only
177    /// 5. If remote missing → add to local_only
178    #[instrument(skip(self, remote_nodes), fields(remote_count = remote_nodes.len()))]
179    pub async fn find_divergent_paths(
180        &self,
181        remote_nodes: &[(String, [u8; 32])],
182    ) -> Result<MerkleDiff, StorageError> {
183        let mut divergent_paths = Vec::new();
184        let remote_only = Vec::new(); // Paths remote has, we don't
185        let local_only = Vec::new();  // Paths we have, remote doesn't
186        
187        for (path, remote_hash) in remote_nodes {
188            let local_hash = self.get_merkle_hash(path).await?;
189            
190            match local_hash {
191                Some(local) if local == *remote_hash => {
192                    // Hashes match - this subtree is synced
193                    debug!(path = %path, "Merkle path synced");
194                }
195                Some(local) => {
196                    // Hashes differ - need to sync this path
197                    debug!(
198                        path = %path, 
199                        local = %hex::encode(local),
200                        remote = %hex::encode(remote_hash),
201                        "Merkle path diverged"
202                    );
203                    divergent_paths.push(path.clone());
204                }
205                None => {
206                    // We don't have this path at all
207                    debug!(path = %path, "Path exists on remote only");
208                    divergent_paths.push(path.clone());
209                }
210            }
211        }
212        
213        Ok(MerkleDiff {
214            divergent_paths,
215            local_only,
216            remote_only,
217        })
218    }
219    
220    /// Check if Redis merkle tree is in sync with SQL ground truth.
221    ///
222    /// This is used to determine whether we can serve merkle queries from Redis
223    /// (fast) or need to fall back to SQL (slow but authoritative).
224    #[instrument(skip(self))]
225    pub async fn is_merkle_synced(&self) -> Result<bool, StorageError> {
226        let sql_root = if let Some(ref sql_merkle) = self.sql_merkle {
227            sql_merkle.root_hash().await?
228        } else {
229            return Ok(false);
230        };
231        
232        let redis_root = if let Some(ref redis_merkle) = self.redis_merkle {
233            redis_merkle.get_hash("").await?
234        } else {
235            return Ok(false);
236        };
237        
238        Ok(sql_root == redis_root)
239    }
240    
241    /// Drill down the merkle tree to find all divergent leaf paths.
242    ///
243    /// Starting from a known-divergent path, recursively descend until
244    /// we find the actual leaves that need syncing.
245    ///
246    /// # Arguments
247    /// * `start_path` - A path known to have divergent hashes
248    /// * `remote_children` - Function to get children from remote node
249    ///
250    /// # Returns
251    /// List of leaf paths that need to be synced
252    #[instrument(skip(self, remote_children))]
253    pub async fn drill_down_divergence<F, Fut>(
254        &self,
255        start_path: &str,
256        remote_children: F,
257    ) -> Result<Vec<String>, StorageError>
258    where
259        F: Fn(String) -> Fut,
260        Fut: std::future::Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,
261    {
262        let mut divergent_leaves = Vec::new();
263        let mut paths_to_check = vec![start_path.to_string()];
264        
265        while let Some(path) = paths_to_check.pop() {
266            let local_children = self.get_merkle_children(&path).await?;
267            let remote = remote_children(path.clone()).await?;
268            
269            // Find which children differ
270            for (segment, remote_hash) in &remote {
271                let child_path = if path.is_empty() {
272                    segment.clone()
273                } else {
274                    format!("{}.{}", path, segment)
275                };
276                
277                match local_children.get(segment) {
278                    Some(local_hash) if local_hash == remote_hash => {
279                        // Child is synced
280                    }
281                    Some(_) => {
282                        // Child differs - check if leaf or drill deeper
283                        let node = self.get_merkle_node(&child_path).await?;
284                        if node.map(|n| n.is_leaf).unwrap_or(true) {
285                            divergent_leaves.push(child_path);
286                        } else {
287                            paths_to_check.push(child_path);
288                        }
289                    }
290                    None => {
291                        // We don't have this child - it's a missing subtree
292                        divergent_leaves.push(child_path);
293                    }
294                }
295            }
296            
297            // Check for children we have that remote doesn't
298            for segment in local_children.keys() {
299                if !remote.contains_key(segment) {
300                    let child_path = if path.is_empty() {
301                        segment.clone()
302                    } else {
303                        format!("{}.{}", path, segment)
304                    };
305                    // Remote missing this - they need it from us
306                    divergent_leaves.push(child_path);
307                }
308            }
309        }
310        
311        Ok(divergent_leaves)
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built diff keeps exactly the single path placed in each list.
    #[test]
    fn test_merkle_diff_struct() {
        let diff = MerkleDiff {
            remote_only: vec![String::from("us.medicare")],
            local_only: vec![String::from("uk.private")],
            divergent_paths: vec![String::from("uk.nhs")],
        };

        let MerkleDiff {
            divergent_paths,
            local_only,
            remote_only,
        } = &diff;

        assert_eq!(divergent_paths.len(), 1, "divergent_paths");
        assert_eq!(local_only.len(), 1, "local_only");
        assert_eq!(remote_only.len(), 1, "remote_only");
    }
}
329        assert_eq!(diff.remote_only.len(), 1);
330    }
331}