sync_engine/merkle/cache_store.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Redis cache for SQL Merkle tree (shadow copy).
//!
//! This is a **read cache** of the SQL Merkle tree, NOT a Merkle tree of Redis data.
//! SQL is the ground truth - this just provides fast reads for cold path sync.
//!
//! # Design
//!
//! - SQL Merkle store does all tree computation (hashing, bubble-up)
//! - After each SQL batch, we copy the affected nodes to Redis
//! - Cold path queries read from this cache instead of SQL
//! - If the cache is empty or stale, we fall back to SQL
//!
//! # Keys
//!
//! - `{prefix}merkle:hash:{path}` → 32-byte hash (hex-encoded)
//! - `{prefix}merkle:children:{path}` → sorted set of `segment:hash` pairs
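//!
//! # Example
//!
//! A minimal usage sketch (illustrative; assumes a connected Redis
//! `ConnectionManager` and that the SQL side has already computed the node):
//!
//! ```ignore
//! let cache = MerkleCacheStore::with_prefix(conn, Some("node-a:"));
//!
//! // After an SQL merkle batch, copy the affected node into the cache.
//! cache.cache_node("uk.nhs.patient", hash, &children).await?;
//!
//! // Cold path reads hit the cache; `None` means fall back to SQL.
//! if let Some(root) = cache.root_hash().await? {
//!     // compare against a peer's root hash...
//! }
//! ```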

use super::path_tree::MerkleNode;
use crate::StorageError;
use redis::aio::ConnectionManager;
use redis::AsyncCommands;
use std::collections::BTreeMap;
use tracing::{debug, instrument};

/// Redis key prefixes for Merkle cache.
const MERKLE_HASH_PREFIX: &str = "merkle:hash:";
const MERKLE_CHILDREN_PREFIX: &str = "merkle:children:";

/// Redis cache for SQL Merkle tree.
///
/// This is a **dumb cache** - no tree computation happens here.
/// All tree logic (hashing, bubble-up) is handled by SqlMerkleStore.
#[derive(Clone)]
pub struct MerkleCacheStore {
    conn: ConnectionManager,
    /// Key prefix for namespacing (e.g., "node-a:" → "node-a:merkle:hash:...")
    prefix: String,
}

impl MerkleCacheStore {
    /// Create a new cache store without a prefix.
    pub fn new(conn: ConnectionManager) -> Self {
        Self::with_prefix(conn, None)
    }

    /// Create a new cache store with an optional prefix.
    pub fn with_prefix(conn: ConnectionManager, prefix: Option<&str>) -> Self {
        Self {
            conn,
            prefix: prefix.unwrap_or("").to_string(),
        }
    }

    /// Build the full key with prefix.
    #[inline]
    fn prefixed_key(&self, suffix: &str) -> String {
        if self.prefix.is_empty() {
            suffix.to_string()
        } else {
            format!("{}{}", self.prefix, suffix)
        }
    }

    /// Get the prefix used for all merkle keys.
    pub fn key_prefix(&self) -> &str {
        &self.prefix
    }

    // =========================================================================
    // Read Operations (for cold path sync)
    // =========================================================================

    /// Get the hash for a path (root = "").
    #[instrument(skip(self))]
    pub async fn get_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
        let key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
        let mut conn = self.conn.clone();

        let result: Option<String> = conn.get(&key).await.map_err(|e| {
            StorageError::Backend(format!("Failed to get merkle hash from cache: {}", e))
        })?;

        match result {
            Some(hex_str) => {
                let bytes = hex::decode(&hex_str).map_err(|e| {
                    StorageError::Backend(format!("Invalid merkle hash hex in cache: {}", e))
                })?;
                if bytes.len() != 32 {
                    return Err(StorageError::Backend(format!(
                        "Invalid merkle hash length in cache: {}",
                        bytes.len()
                    )));
                }
                let mut hash = [0u8; 32];
                hash.copy_from_slice(&bytes);
                Ok(Some(hash))
            }
            None => Ok(None),
        }
    }

    /// Get the root hash (path = "").
    pub async fn root_hash(&self) -> Result<Option<[u8; 32]>, StorageError> {
        self.get_hash("").await
    }

    /// Get children of an interior node.
    #[instrument(skip(self))]
    pub async fn get_children(
        &self,
        path: &str,
    ) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
        let key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
        let mut conn = self.conn.clone();

        // ZRANGE returns members as strings
        let members: Vec<String> = conn.zrange(&key, 0, -1).await.map_err(|e| {
            StorageError::Backend(format!("Failed to get merkle children from cache: {}", e))
        })?;

        let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
        for member in &members {
            // member format: "segment:hexhash"
            if let Some((segment, hash_hex)) = member.split_once(':') {
                if let Ok(bytes) = hex::decode(hash_hex) {
                    if bytes.len() == 32 {
                        let mut hash = [0u8; 32];
                        hash.copy_from_slice(&bytes);
                        children.insert(segment.to_string(), hash);
                    }
                }
            }
        }

        Ok(children)
    }

    /// Get a full node (hash + children).
    pub async fn get_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
        let hash = self.get_hash(path).await?;

        match hash {
            Some(h) => {
                let children = self.get_children(path).await?;
                Ok(Some(if children.is_empty() {
                    MerkleNode::leaf(h)
                } else {
                    MerkleNode {
                        hash: h,
                        children,
                        is_leaf: false,
                    }
                }))
            }
            None => Ok(None),
        }
    }

    // =========================================================================
    // Write Operations (copy from SQL)
    // =========================================================================

    /// Copy a node from SQL to this cache.
    ///
    /// Called after an SQL merkle batch completes. Copies both hash and children.
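    ///
    /// For a node at `"uk.nhs"` this writes `{prefix}merkle:hash:uk.nhs` and
    /// rebuilds `{prefix}merkle:children:uk.nhs` with members like
    /// `patient:<hex hash>` (illustrative path; see the module docs for the key layout).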
    pub async fn cache_node(
        &self,
        path: &str,
        hash: [u8; 32],
        children: &BTreeMap<String, [u8; 32]>,
    ) -> Result<(), StorageError> {
        let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
        let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));

        let hash_hex = hex::encode(hash);
        let mut conn = self.conn.clone();

        let mut pipe = redis::pipe();
        pipe.atomic();

        // Set hash
        pipe.set(&hash_key, &hash_hex);

        // Clear and rebuild children set (if any)
        pipe.del(&children_key);
        for (segment, child_hash) in children {
            let member = format!("{}:{}", segment, hex::encode(child_hash));
            pipe.zadd(&children_key, &member, 0i64);
        }

        pipe.query_async::<()>(&mut conn).await.map_err(|e| {
            StorageError::Backend(format!("Failed to cache merkle node: {}", e))
        })?;

        Ok(())
    }

    /// Delete a node from cache.
    pub async fn delete_node(&self, path: &str) -> Result<(), StorageError> {
        let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
        let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));

        let mut conn = self.conn.clone();
        let mut pipe = redis::pipe();
        pipe.del(&hash_key);
        pipe.del(&children_key);

        pipe.query_async::<()>(&mut conn).await.map_err(|e| {
            StorageError::Backend(format!("Failed to delete cached merkle node: {}", e))
        })?;

        Ok(())
    }

    /// Sync entire SQL merkle tree to cache.
    ///
    /// Useful on startup or after cache invalidation.
    /// Reads all nodes from SQL and copies to Redis.
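    ///
    /// A minimal call sketch (illustrative):
    ///
    /// ```ignore
    /// // e.g. at startup, once both stores are connected:
    /// let nodes_cached = cache.sync_from_sql(&sql_store).await?;
    /// ```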
    #[instrument(skip(self, sql_store))]
    pub async fn sync_from_sql(
        &self,
        sql_store: &super::SqlMerkleStore,
    ) -> Result<usize, StorageError> {
        // Get all nodes from SQL
        let nodes = sql_store.get_all_nodes().await?;
        let count = nodes.len();

        if count == 0 {
            debug!("No SQL merkle nodes to cache");
            return Ok(0);
        }

        // Copy each node to Redis
        let mut conn = self.conn.clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        for (path, hash, children) in &nodes {
            let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
            let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));

            pipe.set(&hash_key, hex::encode(hash));
            pipe.del(&children_key);

            for (segment, child_hash) in children {
                let member = format!("{}:{}", segment, hex::encode(child_hash));
                pipe.zadd(&children_key, &member, 0i64);
            }
        }

        pipe.query_async::<()>(&mut conn).await.map_err(|e| {
            StorageError::Backend(format!("Failed to sync merkle cache from SQL: {}", e))
        })?;

        debug!(nodes_cached = count, "Synced SQL merkle tree to cache");
        Ok(count)
    }

    /// Sync only affected paths from SQL to cache.
    ///
    /// More efficient than full sync for incremental updates.
    /// Syncs the leaves, their ancestors, and the root.
    #[instrument(skip(self, sql_store, affected_paths))]
    pub async fn sync_affected_from_sql(
        &self,
        sql_store: &super::SqlMerkleStore,
        affected_paths: &[String],
    ) -> Result<usize, StorageError> {
        use std::collections::HashSet;
        use super::PathMerkle;

        if affected_paths.is_empty() {
            return Ok(0);
        }

        // Collect all paths that need syncing (leaves + ancestors + root)
        let mut paths_to_sync: HashSet<String> = HashSet::new();
        paths_to_sync.insert(String::new()); // Always sync root

        for path in affected_paths {
            paths_to_sync.insert(path.clone());
            // Add all ancestor prefixes
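            // (e.g. "uk.nhs.patient" is expected to yield "uk" and "uk.nhs";
            // the root "" was already inserted above)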
            for ancestor in PathMerkle::ancestor_prefixes(path) {
                paths_to_sync.insert(ancestor);
            }
        }

        let mut conn = self.conn.clone();
        let mut pipe = redis::pipe();
        pipe.atomic();
        let mut count = 0;

        for path in &paths_to_sync {
            // Get hash and children from SQL
            if let Ok(Some(hash)) = sql_store.get_hash(path).await {
                let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
                pipe.set(&hash_key, hex::encode(hash));
                count += 1;

                // Sync children for interior nodes
                if let Ok(children) = sql_store.get_children(path).await {
                    if !children.is_empty() {
                        let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
                        pipe.del(&children_key);
                        for (segment, child_hash) in &children {
                            let member = format!("{}:{}", segment, hex::encode(child_hash));
                            pipe.zadd(&children_key, &member, 0i64);
                        }
                    }
                }
            }
        }

        if count > 0 {
            pipe.query_async::<()>(&mut conn).await.map_err(|e| {
                StorageError::Backend(format!("Failed to sync affected merkle nodes to cache: {}", e))
            })?;
        }

        debug!(paths_synced = count, "Synced affected merkle paths to cache");
        Ok(count)
    }

    /// Compare hashes and find differing branches.
    ///
    /// Returns child paths where the two sides' hashes differ, including
    /// children that only one side has.
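    ///
    /// For example (illustrative), if our children under `"uk"` are
    /// `{ "dwp": h1, "nhs": h2 }` and theirs are `{ "dwp": h3, "hmrc": h4, "nhs": h2 }`,
    /// the result is `["uk.dwp", "uk.hmrc"]`.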
    #[instrument(skip(self, their_children))]
    pub async fn diff_children(
        &self,
        prefix: &str,
        their_children: &BTreeMap<String, [u8; 32]>,
    ) -> Result<Vec<String>, StorageError> {
        let our_children = self.get_children(prefix).await?;
        let mut diffs = Vec::new();

        let prefix_with_dot = if prefix.is_empty() {
            String::new()
        } else {
            format!("{}.", prefix)
        };

        // Find segments where hashes differ, or that we have but they don't
        for (segment, our_hash) in &our_children {
            match their_children.get(segment) {
                Some(their_hash) if their_hash != our_hash => {
                    diffs.push(format!("{}{}", prefix_with_dot, segment));
                }
                None => {
                    diffs.push(format!("{}{}", prefix_with_dot, segment));
                }
                _ => {}
            }
        }

        // Find segments they have but we don't
        for segment in their_children.keys() {
            if !our_children.contains_key(segment) {
                diffs.push(format!("{}{}", prefix_with_dot, segment));
            }
        }

        Ok(diffs)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_prefixes() {
        assert_eq!(
            format!("{}{}", MERKLE_HASH_PREFIX, "uk.nhs.patient"),
            "merkle:hash:uk.nhs.patient"
        );
        assert_eq!(
            format!("{}{}", MERKLE_CHILDREN_PREFIX, "uk.nhs"),
            "merkle:children:uk.nhs"
        );
    }
}