1use super::path_tree::MerkleNode;
22use crate::StorageError;
23use redis::aio::ConnectionManager;
24use redis::AsyncCommands;
25use std::collections::BTreeMap;
26use tracing::{debug, instrument};
27
/// Key prefix for a node's hash entry; the node path is appended to it and
/// the value stored under the resulting key is the hex-encoded hash.
const MERKLE_HASH_PREFIX: &str = "merkle:hash:";
/// Key prefix for a node's children entry; the node path is appended to it and
/// the resulting key holds a sorted set of `segment:hex_hash` members.
const MERKLE_CHILDREN_PREFIX: &str = "merkle:children:";
31
/// Redis-backed cache of merkle tree nodes.
///
/// Each node is stored under two keys: a string key holding the hex-encoded
/// hash, and a sorted-set key holding one `segment:hex_hash` member per child.
#[derive(Clone)]
pub struct MerkleCacheStore {
    /// Connection handle; every operation works on its own clone of it.
    conn: ConnectionManager,
    /// String prepended to every key this store builds; empty means no prefix.
    prefix: String,
}
42
43impl MerkleCacheStore {
44 pub fn new(conn: ConnectionManager) -> Self {
46 Self::with_prefix(conn, None)
47 }
48
49 pub fn with_prefix(conn: ConnectionManager, prefix: Option<&str>) -> Self {
51 Self {
52 conn,
53 prefix: prefix.unwrap_or("").to_string(),
54 }
55 }
56
57 #[inline]
59 fn prefixed_key(&self, suffix: &str) -> String {
60 if self.prefix.is_empty() {
61 suffix.to_string()
62 } else {
63 format!("{}{}", self.prefix, suffix)
64 }
65 }
66
67 pub fn key_prefix(&self) -> &str {
69 &self.prefix
70 }
71
72 #[instrument(skip(self))]
78 pub async fn get_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError> {
79 let key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
80 let mut conn = self.conn.clone();
81
82 let result: Option<String> = conn.get(&key).await.map_err(|e| {
83 StorageError::Backend(format!("Failed to get merkle hash from cache: {}", e))
84 })?;
85
86 match result {
87 Some(hex_str) => {
88 let bytes = hex::decode(&hex_str).map_err(|e| {
89 StorageError::Backend(format!("Invalid merkle hash hex in cache: {}", e))
90 })?;
91 if bytes.len() != 32 {
92 return Err(StorageError::Backend(format!(
93 "Invalid merkle hash length in cache: {}",
94 bytes.len()
95 )));
96 }
97 let mut hash = [0u8; 32];
98 hash.copy_from_slice(&bytes);
99 Ok(Some(hash))
100 }
101 None => Ok(None),
102 }
103 }
104
105 pub async fn root_hash(&self) -> Result<Option<[u8; 32]>, StorageError> {
107 self.get_hash("").await
108 }
109
110 #[instrument(skip(self))]
112 pub async fn get_children(
113 &self,
114 path: &str,
115 ) -> Result<BTreeMap<String, [u8; 32]>, StorageError> {
116 let key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
117 let mut conn = self.conn.clone();
118
119 let members: Vec<String> = conn.zrange(&key, 0, -1).await.map_err(|e| {
121 StorageError::Backend(format!("Failed to get merkle children from cache: {}", e))
122 })?;
123
124 let mut children: BTreeMap<String, [u8; 32]> = BTreeMap::new();
125 for member in &members {
126 if let Some((segment, hash_hex)) = member.split_once(':') {
128 if let Ok(bytes) = hex::decode(hash_hex) {
129 if bytes.len() == 32 {
130 let mut hash = [0u8; 32];
131 hash.copy_from_slice(&bytes);
132 children.insert(segment.to_string(), hash);
133 }
134 }
135 }
136 }
137
138 Ok(children)
139 }
140
141 pub async fn get_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError> {
143 let hash = self.get_hash(path).await?;
144
145 match hash {
146 Some(h) => {
147 let children = self.get_children(path).await?;
148 Ok(Some(if children.is_empty() {
149 MerkleNode::leaf(h)
150 } else {
151 MerkleNode {
152 hash: h,
153 children,
154 is_leaf: false,
155 }
156 }))
157 }
158 None => Ok(None),
159 }
160 }
161
162 pub async fn cache_node(
170 &self,
171 path: &str,
172 hash: [u8; 32],
173 children: &BTreeMap<String, [u8; 32]>,
174 ) -> Result<(), StorageError> {
175 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
176 let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
177
178 let hash_hex = hex::encode(hash);
179 let mut conn = self.conn.clone();
180
181 let mut pipe = redis::pipe();
182 pipe.atomic();
183
184 pipe.set(&hash_key, &hash_hex);
186
187 pipe.del(&children_key);
189 for (segment, child_hash) in children {
190 let member = format!("{}:{}", segment, hex::encode(child_hash));
191 pipe.zadd(&children_key, &member, 0i64);
192 }
193
194 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
195 StorageError::Backend(format!("Failed to cache merkle node: {}", e))
196 })?;
197
198 Ok(())
199 }
200
201 pub async fn delete_node(&self, path: &str) -> Result<(), StorageError> {
203 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
204 let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
205
206 let mut conn = self.conn.clone();
207 let mut pipe = redis::pipe();
208 pipe.del(&hash_key);
209 pipe.del(&children_key);
210
211 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
212 StorageError::Backend(format!("Failed to delete cached merkle node: {}", e))
213 })?;
214
215 Ok(())
216 }
217
218 #[instrument(skip(self, sql_store))]
223 pub async fn sync_from_sql(
224 &self,
225 sql_store: &super::SqlMerkleStore,
226 ) -> Result<usize, StorageError> {
227 let nodes = sql_store.get_all_nodes().await?;
229 let count = nodes.len();
230
231 if count == 0 {
232 debug!("No SQL merkle nodes to cache");
233 return Ok(0);
234 }
235
236 let mut conn = self.conn.clone();
238 let mut pipe = redis::pipe();
239 pipe.atomic();
240
241 for (path, hash, children) in &nodes {
242 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
243 let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
244
245 pipe.set(&hash_key, hex::encode(hash));
246 pipe.del(&children_key);
247
248 for (segment, child_hash) in children {
249 let member = format!("{}:{}", segment, hex::encode(child_hash));
250 pipe.zadd(&children_key, &member, 0i64);
251 }
252 }
253
254 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
255 StorageError::Backend(format!("Failed to sync merkle cache from SQL: {}", e))
256 })?;
257
258 debug!(nodes_cached = count, "Synced SQL merkle tree to cache");
259 Ok(count)
260 }
261
262 #[instrument(skip(self, sql_store, affected_paths))]
267 pub async fn sync_affected_from_sql(
268 &self,
269 sql_store: &super::SqlMerkleStore,
270 affected_paths: &[String],
271 ) -> Result<usize, StorageError> {
272 use std::collections::HashSet;
273 use super::PathMerkle;
274
275 if affected_paths.is_empty() {
276 return Ok(0);
277 }
278
279 let mut paths_to_sync: HashSet<String> = HashSet::new();
281 paths_to_sync.insert(String::new()); for path in affected_paths {
284 paths_to_sync.insert(path.clone());
285 for ancestor in PathMerkle::ancestor_prefixes(path) {
287 paths_to_sync.insert(ancestor);
288 }
289 }
290
291 let mut conn = self.conn.clone();
292 let mut pipe = redis::pipe();
293 pipe.atomic();
294 let mut count = 0;
295
296 for path in &paths_to_sync {
297 if let Ok(Some(hash)) = sql_store.get_hash(path).await {
299 let hash_key = self.prefixed_key(&format!("{}{}", MERKLE_HASH_PREFIX, path));
300 pipe.set(&hash_key, hex::encode(hash));
301 count += 1;
302
303 if let Ok(children) = sql_store.get_children(path).await {
305 if !children.is_empty() {
306 let children_key = self.prefixed_key(&format!("{}{}", MERKLE_CHILDREN_PREFIX, path));
307 pipe.del(&children_key);
308 for (segment, child_hash) in &children {
309 let member = format!("{}:{}", segment, hex::encode(child_hash));
310 pipe.zadd(&children_key, &member, 0i64);
311 }
312 }
313 }
314 }
315 }
316
317 if count > 0 {
318 pipe.query_async::<()>(&mut conn).await.map_err(|e| {
319 StorageError::Backend(format!("Failed to sync affected merkle nodes to cache: {}", e))
320 })?;
321 }
322
323 debug!(paths_synced = count, "Synced affected merkle paths to cache");
324 Ok(count)
325 }
326
327 #[instrument(skip(self, their_children))]
331 pub async fn diff_children(
332 &self,
333 prefix: &str,
334 their_children: &BTreeMap<String, [u8; 32]>,
335 ) -> Result<Vec<String>, StorageError> {
336 let our_children = self.get_children(prefix).await?;
337 let mut diffs = Vec::new();
338
339 let prefix_with_dot = if prefix.is_empty() {
340 String::new()
341 } else {
342 format!("{}.", prefix)
343 };
344
345 for (segment, our_hash) in &our_children {
347 match their_children.get(segment) {
348 Some(their_hash) if their_hash != our_hash => {
349 diffs.push(format!("{}{}", prefix_with_dot, segment));
350 }
351 None => {
352 diffs.push(format!("{}{}", prefix_with_dot, segment));
353 }
354 _ => {}
355 }
356 }
357
358 for segment in their_children.keys() {
360 if !our_children.contains_key(segment) {
361 diffs.push(format!("{}{}", prefix_with_dot, segment));
362 }
363 }
364
365 Ok(diffs)
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// The key prefixes, joined with a node path, produce the expected cache keys.
    #[test]
    fn test_key_prefixes() {
        let hash_key = [MERKLE_HASH_PREFIX, "uk.nhs.patient"].concat();
        let children_key = [MERKLE_CHILDREN_PREFIX, "uk.nhs"].concat();

        assert_eq!(hash_key, "merkle:hash:uk.nhs.patient");
        assert_eq!(children_key, "merkle:children:uk.nhs");
    }
}