sync_engine/merkle/
redis_store.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Redis storage for Merkle tree nodes.
5//!
6//! Merkle nodes are stored separately from data and are NEVER evicted.
7//! They're tiny (32 bytes + overhead) and critical for sync verification.
8
9use super::path_tree::{MerkleBatch, MerkleNode};
10use crate::StorageError;
11use redis::aio::ConnectionManager;
12use redis::AsyncCommands;
13use std::collections::BTreeMap;
14use tracing::{debug, instrument};
15
/// Key fragment placed before a node path to address that node's hex-encoded hash string.
const MERKLE_HASH_PREFIX: &str = "merkle:hash:";
/// Key fragment placed before a node path to address that node's sorted set of children.
const MERKLE_CHILDREN_PREFIX: &str = "merkle:children:";
19
20/// Redis-backed Merkle tree storage.
21///
22/// Uses two key patterns:
23/// - `{prefix}merkle:hash:{path}` -> 32-byte hash (string, hex-encoded)
24/// - `{prefix}merkle:children:{path}` -> sorted set of `segment:hash` pairs
25///
26/// The optional prefix enables namespacing when sharing Redis with other apps.
27#[derive(Clone)]
28pub struct RedisMerkleStore {
29    conn: ConnectionManager,
30    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:merkle:hash:...")
31    prefix: String,
32}
33
34impl RedisMerkleStore {
35    /// Create a new merkle store without a prefix.
36    pub fn new(conn: ConnectionManager) -> Self {
37        Self::with_prefix(conn, None)
38    }
39    
40    /// Create a new merkle store with an optional prefix.
41    pub fn with_prefix(conn: ConnectionManager, prefix: Option<&str>) -> Self {
42        Self { 
43            conn,
44            prefix: prefix.unwrap_or("").to_string(),
45        }
46    }
47    
48    /// Build the full key with prefix.
49    #[inline]
50    fn prefixed_key(&self, suffix: &str) -> String {
51        if self.prefix.is_empty() {
52            suffix.to_string()
53        } else {
54            format!("{}{}", self.prefix, suffix)
55        }
56    }
57    
58    /// Get the prefix used for all merkle keys.
59    pub fn key_prefix(&self) -> &str {
60        &self.prefix
61    }
62
63    /// Get the hash for a prefix (interior node or leaf).
64    #[instrument(skip(self))]
65    pub async fn get_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
66        let key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
67        let mut conn = self.conn.clone();
68        
69        let result: Option<String> = conn.get(&key).await.map_err(|e| {
70            StorageError::Backend(format!("Failed to get merkle hash: {}", e))
71        })?;
72        
73        match result {
74            Some(hex_str) => {
75                let bytes = hex::decode(&hex_str).map_err(|e| {
76                    StorageError::Backend(format!("Invalid merkle hash hex: {}", e))
77                })?;
78                if bytes.len() != 32 {
79                    return Err(StorageError::Backend(format!(
80                        "Invalid merkle hash length: {}",
81                        bytes.len()
82                    )));
83                }
84                let mut hash = [0u8; 32];
85                hash.copy_from_slice(&bytes);
86                Ok(Some(hash))
87            }
88            None => Ok(None),
89        }
90    }
91
92    /// Get children of an interior node.
93    #[instrument(skip(self))]
94    pub async fn get_children(
95        &self,
96        path: &str,
97    ) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
98        let key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
99        let mut conn = self.conn.clone();
100        
101        // ZRANGE returns members as strings
102        let members: Vec<String> = conn.zrange(&key, 0, -1).await.map_err(|e| {
103            StorageError::Backend(format!("Failed to get merkle children: {}", e))
104        })?;
105        
106        let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
107        for member in &members {
108            // member format: "segment:hexhash"
109            let member_str: &str = member.as_str();
110            if let Some((segment, hash_hex)) = member_str.split_once(':') {
111                let bytes = hex::decode(hash_hex).map_err(|e| {
112                    StorageError::Backend(format!("Invalid child hash hex: {}", e))
113                })?;
114                if bytes.len() == 32 {
115                    let mut hash = [0u8; 32];
116                    hash.copy_from_slice(&bytes);
117                    children.insert(segment.to_string(), hash);
118                }
119            }
120        }
121        
122        Ok(children)
123    }
124
125    /// Get a full node (hash + children).
126    pub async fn get_node(&self, prefix: &str) -> Result<Option<MerkleNode>, StorageError> {
127        let hash = self.get_hash(prefix).await?;
128        
129        match hash {
130            Some(h) => {
131                let children: BTreeMap<String, [u8; 32]> = self.get_children(prefix).await?;
132                Ok(Some(if children.is_empty() {
133                    MerkleNode::leaf(h)
134                } else {
135                    MerkleNode {
136                        hash: h,
137                        children,
138                        is_leaf: false,
139                    }
140                }))
141            }
142            None => Ok(None),
143        }
144    }
145
146    /// Apply a batch of Merkle updates atomically.
147    ///
148    /// This handles the full bubble-up: updates leaves, then recomputes
149    /// all affected interior nodes bottom-up.
150    #[instrument(skip(self, batch), fields(batch_size = batch.len()))]
151    pub async fn apply_batch(&self, batch: &MerkleBatch) -> Result<(), StorageError> {
152        if batch.is_empty() {
153            return Ok(());
154        }
155
156        let mut conn = self.conn.clone();
157        let mut pipe = redis::pipe();
158        pipe.atomic();
159
160        // Step 1: Apply leaf updates
161        for (object_id, maybe_hash) in &batch.leaves {
162            let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, object_id));
163            
164            match maybe_hash {
165                Some(hash) => {
166                    let hex_str = hex::encode(hash);
167                    pipe.set(&hash_key, &hex_str);
168                    debug!(object_id = %object_id, "Setting leaf hash");
169                }
170                None => {
171                    pipe.del(&hash_key);
172                    debug!(object_id = %object_id, "Deleting leaf hash");
173                }
174            }
175        }
176
177        // Execute leaf updates first
178        pipe.query_async::<()>(&mut conn).await.map_err(|e| {
179            StorageError::Backend(format!("Failed to apply merkle leaf updates: {}", e))
180        })?;
181
182        // Step 2: Bubble up - recompute interior nodes bottom-up
183        let affected_prefixes = batch.affected_prefixes();
184        
185        for prefix in affected_prefixes {
186            self.recompute_interior_node(&prefix).await?;
187        }
188
189        Ok(())
190    }
191
192    /// Recompute an interior node's hash from its children.
193    #[instrument(skip(self))]
194    async fn recompute_interior_node(&self, prefix: &str) -> Result<(), StorageError> {
195        let mut conn = self.conn.clone();
196        
197        // Build the prefix for finding direct children
198        let prefix_with_dot = if prefix.is_empty() {
199            String::new()
200        } else {
201            format!("{}.", prefix)
202        };
203        
204        // Use SCAN instead of KEYS to avoid blocking Redis
205        let scan_pattern = if prefix.is_empty() {
206            self.prefixed_key(&format!("{}*", MERKLE_HASH_PREFIX))
207        } else {
208            self.prefixed_key(&format!("{}{}.*", MERKLE_HASH_PREFIX, prefix))
209        };
210        
211        // For stripping keys later, we need the full prefix
212        let full_hash_prefix = self.prefixed_key(MERKLE_HASH_PREFIX);
213        
214        let mut keys: Vec<String> = Vec::new();
215        let mut cursor = 0u64;
216        
217        loop {
218            let (new_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
219                .arg(cursor)
220                .arg("MATCH")
221                .arg(&scan_pattern)
222                .arg("COUNT")
223                .arg(100)  // Fetch 100 keys at a time
224                .query_async(&mut conn)
225                .await
226                .map_err(|e| StorageError::Backend(format!("Failed to scan merkle keys: {}", e)))?;
227            
228            keys.extend(batch);
229            cursor = new_cursor;
230            
231            if cursor == 0 {
232                break;
233            }
234        }
235        
236        let mut direct_children: Vec<(String, String)> = Vec::new(); // (segment, full_key)
237        
238        for key in &keys {
239            // Extract the path from the key
240            let path: &str = key.strip_prefix(&full_hash_prefix).unwrap_or(key.as_str());
241            
242            // Check if this is a direct child
243            let suffix: &str = if prefix.is_empty() {
244                path
245            } else {
246                match path.strip_prefix(&prefix_with_dot) {
247                    Some(s) => s,
248                    None => continue,
249                }
250            };
251            
252            // Direct child has no dots in suffix (take first segment only)
253            if let Some(segment) = suffix.split('.').next() {
254                // Only if segment IS the whole suffix (no more dots)
255                if segment == suffix || !suffix.contains('.') {
256                    direct_children.push((segment.to_string(), key.clone()));
257                }
258            }
259        }
260
261        if direct_children.is_empty() {
262            // No children, this might be a leaf or deleted node
263            return Ok(());
264        }
265
266        // Batch fetch hashes (MGET)
267        let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
268        
269        // Redis MGET is O(n) - chunk to avoid blocking the server too long
270        const MGET_CHUNK_SIZE: usize = 1000;
271        for chunk in direct_children.chunks(MGET_CHUNK_SIZE) {
272            let keys: Vec<String> = chunk.iter().map(|(_, k)| k.clone()).collect();
273            let segments: Vec<String> = chunk.iter().map(|(s, _)| s.clone()).collect();
274            
275            let hex_hashes: Vec<Option<String>> = conn.mget(&keys).await.map_err(|e| {
276                StorageError::Backend(format!("Failed to batch get merkle hashes: {}", e))
277            })?;
278            
279            for (i, maybe_hex) in hex_hashes.into_iter().enumerate() {
280                if let Some(hex_str) = maybe_hex {
281                    if let Ok(bytes) = hex::decode(&hex_str) {
282                        if bytes.len() == 32 {
283                            let mut hash = [0u8; 32];
284                            hash.copy_from_slice(&bytes);
285                            children.insert(segments[i].clone(), hash);
286                        }
287                    }
288                }
289            }
290        }
291
292        if children.is_empty() {
293            // No children, this might be a leaf or deleted node
294            return Ok(());
295        }
296
297        // Compute new hash
298        let node = MerkleNode::interior(children.clone());
299        let hash_hex = hex::encode(node.hash);
300        
301        // Update hash and children set
302        let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, prefix));
303        let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, prefix));
304        
305        let mut pipe = redis::pipe();
306        pipe.atomic();
307        pipe.set(&hash_key, &hash_hex);
308        
309        // Clear and rebuild children set
310        pipe.del(&children_key);
311        for (segment, hash) in &children {
312            let member = format!("{}:{}", segment, hex::encode(hash));
313            pipe.zadd(&children_key, &member, 0i64);
314        }
315        
316        pipe.query_async::<()>(&mut conn).await.map_err(|e| {
317            StorageError::Backend(format!("Failed to update interior node: {}", e))
318        })?;
319
320        debug!(prefix = %prefix, children_count = children.len(), "Recomputed interior node");
321        
322        Ok(())
323    }
324
325    /// Get the root hash (empty prefix = root of tree).
326    pub async fn root_hash(&self) -> Result<Option<[u8; 32]>, StorageError> {
327        // The root is the hash of all top-level segments
328        self.recompute_interior_node("").await?;
329        
330        // Root hash is stored at ""
331        let key = self.prefixed_key(MERKLE_HASH_PREFIX);
332        let mut conn = self.conn.clone();
333        
334        let result: Option<String> = conn.get(&key).await.map_err(|e| {
335            StorageError::Backend(format!("Failed to get root hash: {}", e))
336        })?;
337        
338        match result {
339            Some(hex_str) => {
340                let bytes = hex::decode(&hex_str).map_err(|e| {
341                    StorageError::Backend(format!("Invalid root hash hex: {}", e))
342                })?;
343                if bytes.len() != 32 {
344                    return Err(StorageError::Backend(format!(
345                        "Invalid root hash length: {}",
346                        bytes.len()
347                    )));
348                }
349                let mut hash = [0u8; 32];
350                hash.copy_from_slice(&bytes);
351                Ok(Some(hash))
352            }
353            None => Ok(None),
354        }
355    }
356
357    /// Compare hashes and find differing branches.
358    ///
359    /// Returns prefixes where our hash differs from theirs.
360    #[instrument(skip(self, their_children))]
361    pub async fn diff_children(
362        &self,
363        prefix: &str,
364        their_children: &BTreeMap<String, [u8; 32]>,
365    ) -> Result<Vec<String>, StorageError> {
366        let our_children: BTreeMap<String, [u8; 32]> = self.get_children(prefix).await?;
367        let mut diffs = Vec::new();
368        
369        let prefix_with_dot = if prefix.is_empty() {
370            String::new()
371        } else {
372            format!("{}.", prefix)
373        };
374
375        // Find segments where hashes differ or we have but they don't
376        for (segment, our_hash) in &our_children {
377            match their_children.get(segment) {
378                Some(their_hash) if their_hash != our_hash => {
379                    diffs.push(format!("{}{}", prefix_with_dot, segment));
380                }
381                None => {
382                    // We have it, they don't
383                    diffs.push(format!("{}{}", prefix_with_dot, segment));
384                }
385                _ => {} // Hashes match
386            }
387        }
388
389        // Find segments they have but we don't
390        for segment in their_children.keys() {
391            if !our_children.contains_key(segment) {
392                diffs.push(format!("{}{}", prefix_with_dot, segment));
393            }
394        }
395
396        Ok(diffs)
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// The key prefixes join with dotted paths into the documented key layout.
    #[test]
    fn test_key_prefixes() {
        let hash_key = [MERKLE_HASH_PREFIX, "uk.nhs.patient"].concat();
        assert_eq!(hash_key, "merkle:hash:uk.nhs.patient");

        let children_key = [MERKLE_CHILDREN_PREFIX, "uk.nhs"].concat();
        assert_eq!(children_key, "merkle:children:uk.nhs");
    }
}