1use super::path_tree::{MerkleBatch, MerkleNode};
10use crate::StorageError;
11use redis::aio::ConnectionManager;
12use redis::AsyncCommands;
13use std::collections::BTreeMap;
14use tracing::{debug, instrument};
15
/// Key namespace for the sorted sets that list an interior node's children
/// as `segment:hex_hash` members.
const MERKLE_CHILDREN_PREFIX: &str = "merkle:children:";

/// Key namespace for the string values that hold a node's hex-encoded hash.
const MERKLE_HASH_PREFIX: &str = "merkle:hash:";
19
20#[derive(Clone)]
28pub struct RedisMerkleStore {
29 conn: ConnectionManager,
30 prefix: String,
32}
33
34impl RedisMerkleStore {
35 pub fn new(conn: ConnectionManager) -> Self {
37 Self::with_prefix(conn, None)
38 }
39
40 pub fn with_prefix(conn: ConnectionManager, prefix: Option<&str>) -> Self {
42 Self {
43 conn,
44 prefix: prefix.unwrap_or("").to_string(),
45 }
46 }
47
48 #[inline]
50 fn prefixed_key(&self, suffix: &str) -> String {
51 if self.prefix.is_empty() {
52 suffix.to_string()
53 } else {
54 format!("{}{}", self.prefix, suffix)
55 }
56 }
57
58 pub fn key_prefix(&self) -> &str {
60 &self.prefix
61 }
62
63 #[instrument(skip(self))]
65 pub async fn get_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
66 let key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
67 let mut conn = self.conn.clone();
68
69 let result: Option<String> = conn.get(&key).await.map_err(|e| {
70 StorageError::Backend(format!("Failed to get merkle hash: {}", e))
71 })?;
72
73 match result {
74 Some(hex_str) => {
75 let bytes = hex::decode(&hex_str).map_err(|e| {
76 StorageError::Backend(format!("Invalid merkle hash hex: {}", e))
77 })?;
78 if bytes.len() != 32 {
79 return Err(StorageError::Backend(format!(
80 "Invalid merkle hash length: {}",
81 bytes.len()
82 )));
83 }
84 let mut hash = [0u8; 32];
85 hash.copy_from_slice(&bytes);
86 Ok(Some(hash))
87 }
88 None => Ok(None),
89 }
90 }
91
92 #[instrument(skip(self))]
94 pub async fn get_children(
95 &self,
96 path: &str,
97 ) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
98 let key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
99 let mut conn = self.conn.clone();
100
101 let members: Vec<String> = conn.zrange(&key, 0, -1).await.map_err(|e| {
103 StorageError::Backend(format!("Failed to get merkle children: {}", e))
104 })?;
105
106 let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
107 for member in &members {
108 let member_str: &str = member.as_str();
110 if let Some((segment, hash_hex)) = member_str.split_once(':') {
111 let bytes = hex::decode(hash_hex).map_err(|e| {
112 StorageError::Backend(format!("Invalid child hash hex: {}", e))
113 })?;
114 if bytes.len() == 32 {
115 let mut hash = [0u8; 32];
116 hash.copy_from_slice(&bytes);
117 children.insert(segment.to_string(), hash);
118 }
119 }
120 }
121
122 Ok(children)
123 }
124
125 pub async fn get_node(&self, prefix: &str) -> Result<Option<MerkleNode>, StorageError> {
127 let hash = self.get_hash(prefix).await?;
128
129 match hash {
130 Some(h) => {
131 let children: BTreeMap<String, [u8; 32]> = self.get_children(prefix).await?;
132 Ok(Some(if children.is_empty() {
133 MerkleNode::leaf(h)
134 } else {
135 MerkleNode {
136 hash: h,
137 children,
138 is_leaf: false,
139 }
140 }))
141 }
142 None => Ok(None),
143 }
144 }
145
146 #[instrument(skip(self, batch), fields(batch_size = batch.len()))]
151 pub async fn apply_batch(&self, batch: &MerkleBatch) -> Result<(), StorageError> {
152 if batch.is_empty() {
153 return Ok(());
154 }
155
156 let mut conn = self.conn.clone();
157 let mut pipe = redis::pipe();
158 pipe.atomic();
159
160 for (object_id, maybe_hash) in &batch.leaves {
162 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, object_id));
163
164 match maybe_hash {
165 Some(hash) => {
166 let hex_str = hex::encode(hash);
167 pipe.set(&hash_key, &hex_str);
168 debug!(object_id = %object_id, "Setting leaf hash");
169 }
170 None => {
171 pipe.del(&hash_key);
172 debug!(object_id = %object_id, "Deleting leaf hash");
173 }
174 }
175 }
176
177 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
179 StorageError::Backend(format!("Failed to apply merkle leaf updates: {}", e))
180 })?;
181
182 let affected_prefixes = batch.affected_prefixes();
184
185 for prefix in affected_prefixes {
186 self.recompute_interior_node(&prefix).await?;
187 }
188
189 Ok(())
190 }
191
192 #[instrument(skip(self))]
194 async fn recompute_interior_node(&self, prefix: &str) -> Result<(), StorageError> {
195 let mut conn = self.conn.clone();
196
197 let prefix_with_dot = if prefix.is_empty() {
199 String::new()
200 } else {
201 format!("{}.", prefix)
202 };
203
204 let scan_pattern = if prefix.is_empty() {
206 self.prefixed_key(&format!("{}*", MERKLE_HASH_PREFIX))
207 } else {
208 self.prefixed_key(&format!("{}{}.*", MERKLE_HASH_PREFIX, prefix))
209 };
210
211 let full_hash_prefix = self.prefixed_key(MERKLE_HASH_PREFIX);
213
214 let mut keys: Vec<String> = Vec::new();
215 let mut cursor = 0u64;
216
217 loop {
218 let (new_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
219 .arg(cursor)
220 .arg("MATCH")
221 .arg(&scan_pattern)
222 .arg("COUNT")
223 .arg(100) .query_async(&mut conn)
225 .await
226 .map_err(|e| StorageError::Backend(format!("Failed to scan merkle keys: {}", e)))?;
227
228 keys.extend(batch);
229 cursor = new_cursor;
230
231 if cursor == 0 {
232 break;
233 }
234 }
235
236 let mut direct_children: Vec<(String, String)> = Vec::new(); for key in &keys {
239 let path: &str = key.strip_prefix(&full_hash_prefix).unwrap_or(key.as_str());
241
242 let suffix: &str = if prefix.is_empty() {
244 path
245 } else {
246 match path.strip_prefix(&prefix_with_dot) {
247 Some(s) => s,
248 None => continue,
249 }
250 };
251
252 if let Some(segment) = suffix.split('.').next() {
254 if segment == suffix || !suffix.contains('.') {
256 direct_children.push((segment.to_string(), key.clone()));
257 }
258 }
259 }
260
261 if direct_children.is_empty() {
262 return Ok(());
264 }
265
266 let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
268
269 const MGET_CHUNK_SIZE: usize = 1000;
271 for chunk in direct_children.chunks(MGET_CHUNK_SIZE) {
272 let keys: Vec<String> = chunk.iter().map(|(_, k)| k.clone()).collect();
273 let segments: Vec<String> = chunk.iter().map(|(s, _)| s.clone()).collect();
274
275 let hex_hashes: Vec<Option<String>> = conn.mget(&keys).await.map_err(|e| {
276 StorageError::Backend(format!("Failed to batch get merkle hashes: {}", e))
277 })?;
278
279 for (i, maybe_hex) in hex_hashes.into_iter().enumerate() {
280 if let Some(hex_str) = maybe_hex {
281 if let Ok(bytes) = hex::decode(&hex_str) {
282 if bytes.len() == 32 {
283 let mut hash = [0u8; 32];
284 hash.copy_from_slice(&bytes);
285 children.insert(segments[i].clone(), hash);
286 }
287 }
288 }
289 }
290 }
291
292 if children.is_empty() {
293 return Ok(());
295 }
296
297 let node = MerkleNode::interior(children.clone());
299 let hash_hex = hex::encode(node.hash);
300
301 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, prefix));
303 let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, prefix));
304
305 let mut pipe = redis::pipe();
306 pipe.atomic();
307 pipe.set(&hash_key, &hash_hex);
308
309 pipe.del(&children_key);
311 for (segment, hash) in &children {
312 let member = format!("{}:{}", segment, hex::encode(hash));
313 pipe.zadd(&children_key, &member, 0i64);
314 }
315
316 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
317 StorageError::Backend(format!("Failed to update interior node: {}", e))
318 })?;
319
320 debug!(prefix = %prefix, children_count = children.len(), "Recomputed interior node");
321
322 Ok(())
323 }
324
325 pub async fn root_hash(&self) -> Result<Option<[u8; 32]>, StorageError> {
327 self.recompute_interior_node("").await?;
329
330 let key = self.prefixed_key(MERKLE_HASH_PREFIX);
332 let mut conn = self.conn.clone();
333
334 let result: Option<String> = conn.get(&key).await.map_err(|e| {
335 StorageError::Backend(format!("Failed to get root hash: {}", e))
336 })?;
337
338 match result {
339 Some(hex_str) => {
340 let bytes = hex::decode(&hex_str).map_err(|e| {
341 StorageError::Backend(format!("Invalid root hash hex: {}", e))
342 })?;
343 if bytes.len() != 32 {
344 return Err(StorageError::Backend(format!(
345 "Invalid root hash length: {}",
346 bytes.len()
347 )));
348 }
349 let mut hash = [0u8; 32];
350 hash.copy_from_slice(&bytes);
351 Ok(Some(hash))
352 }
353 None => Ok(None),
354 }
355 }
356
357 #[instrument(skip(self, their_children))]
361 pub async fn diff_children(
362 &self,
363 prefix: &str,
364 their_children: &BTreeMap<String, [u8; 32]>,
365 ) -> Result<Vec<String>, StorageError> {
366 let our_children: BTreeMap<String, [u8; 32]> = self.get_children(prefix).await?;
367 let mut diffs = Vec::new();
368
369 let prefix_with_dot = if prefix.is_empty() {
370 String::new()
371 } else {
372 format!("{}.", prefix)
373 };
374
375 for (segment, our_hash) in &our_children {
377 match their_children.get(segment) {
378 Some(their_hash) if their_hash != our_hash => {
379 diffs.push(format!("{}{}", prefix_with_dot, segment));
380 }
381 None => {
382 diffs.push(format!("{}{}", prefix_with_dot, segment));
384 }
385 _ => {} }
387 }
388
389 for segment in their_children.keys() {
391 if !our_children.contains_key(segment) {
392 diffs.push(format!("{}{}", prefix_with_dot, segment));
393 }
394 }
395
396 Ok(diffs)
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// The key namespaces concatenate directly with a dotted node path.
    #[test]
    fn test_key_prefixes() {
        let hash_key = [MERKLE_HASH_PREFIX, "uk.nhs.patient"].concat();
        assert_eq!(hash_key, "merkle:hash:uk.nhs.patient");

        let children_key = [MERKLE_CHILDREN_PREFIX, "uk.nhs"].concat();
        assert_eq!(children_key, "merkle:children:uk.nhs");
    }
}