sync_engine/storage/redis.rs

//! Redis storage backend for L2 cache.
//!
//! Content-type aware storage using RedisJSON (Redis Stack):
//! - **JSON content** → `JSON.SET` with full structure preserved → RediSearch-indexable
//! - **Binary content** → Redis STRING (`SET`) for efficient blob storage
//!
//! JSON documents are stored with a clean, flat structure:
//! ```json
//! {
//!   "version": 1,
//!   "timestamp": 1767084657058,
//!   "payload_hash": "abc123...",
//!   "payload": {"name": "Alice", "role": "admin"},
//!   "audit": {"batch": "...", "trace": "...", "home": "..."}
//! }
//! ```
//!
//! This enables powerful search with a RediSearch `ON JSON` index:
//! ```text
//! FT.CREATE idx ON JSON PREFIX 1 sync: SCHEMA $.payload.name AS name TEXT
//! FT.SEARCH idx '@name:Alice'
//! ```
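//!
//! A minimal usage sketch (the import paths assume `RedisStore` and the
//! `CacheStore` trait are exported as shown; adjust to the crate's layout):
//! ```rust,no_run
//! # use sync_engine::storage::redis::RedisStore;
//! # use sync_engine::storage::traits::CacheStore;
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let store = RedisStore::new("redis://localhost").await?;
//! let cached = store.get("user.alice").await?;
//! println!("user.alice cached: {}", cached.is_some());
//! # Ok(())
//! # }
//! ```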

use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{AsyncCommands, Client, cmd, pipe};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{BatchWriteResult, CacheStore, StorageError};
use crate::resilience::retry::{retry, RetryConfig};

pub struct RedisStore {
    connection: ConnectionManager,
    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:user.alice")
    prefix: String,
}

impl RedisStore {
    /// Create a new Redis store without a key prefix.
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }

    /// Create a new Redis store with an optional key prefix.
    ///
    /// The prefix is prepended to all keys, enabling namespacing when
    /// sharing a Redis instance with other applications.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::storage::redis::RedisStore;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// // Keys will be prefixed: "myapp:user.alice", "myapp:config.app"
    /// let store = RedisStore::with_prefix("redis://localhost", Some("myapp:")).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        // Use startup config: fast-fail after ~30s, don't hang forever
        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self {
            connection,
            prefix: prefix.unwrap_or("").to_string(),
        })
    }

    /// Apply the prefix to a key.
    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }

    /// Strip the prefix from a key (for returning clean IDs).
    /// Will be used when implementing key iteration/scanning.
    #[inline]
    #[allow(dead_code)]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

    /// Get a clone of the connection manager (for sharing with MerkleStore).
    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }

    /// Get the configured prefix.
    pub fn prefix(&self) -> &str {
        &self.prefix
    }

    /// Build the JSON document for RedisJSON storage.
    ///
    /// Structure (flat, with a nested audit object):
    /// ```json
    /// {
    ///   "version": 1,
    ///   "timestamp": 1767084657058,
    ///   "payload_hash": "abc123...",
    ///   "payload": {"name": "Alice", ...},
    ///   "audit": {"batch": "...", "trace": "...", "home": "..."}
    /// }
    /// ```
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        // Parse user content as JSON
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;

        // Build the audit object (internal operational metadata)
        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        // Build the final document (flat structure)
        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "payload": payload
        });

        // Only include payload_hash if non-empty
        if !item.merkle_root.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.merkle_root.clone());
        }

        // Only include audit if there's something in it
        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }

        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }

    /// Parse a RedisJSON document back into a SyncItem.
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;

        // Top-level fields
        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let merkle_root = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();

        // Audit fields (nested)
        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);

        // Extract the payload and serialize it back to bytes
        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            merkle_root,
            home_instance_id,
        ))
    }
}

#[async_trait]
impl CacheStore for RedisStore {
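    /// Fetch an item by id, dispatching on the Redis key type: `ReJSON-RL`
    /// keys are read with `JSON.GET` and unwrapped from the document
    /// structure; plain `string` keys are deserialized as binary/legacy items.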
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);
        let original_id = id.to_string();

        // Check the type of the key to determine how to read it
        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let t: String = cmd("TYPE").arg(&key).query_async(&mut conn).await?;
                Ok(if t == "none" { None } else { Some(t) })
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        match key_type.as_deref() {
            None => Ok(None), // Key doesn't exist
            Some("ReJSON-RL") => {
                // RedisJSON document - use JSON.GET
                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<String> = cmd("JSON.GET")
                            .arg(&key)
                            .query_async(&mut conn)
                            .await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                match json_str {
                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
                    None => Ok(None),
                }
            }
            Some("string") => {
                // Binary content or legacy format - read raw bytes
                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<Vec<u8>> = conn.get(&key).await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
                    .transpose()
            }
            Some(other) => {
                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
            }
        }
    }

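    /// Write a single item: JSON content is stored as a structured document
    /// via `JSON.SET`; binary content is serialized and stored with a plain `SET`.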
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(&item.object_id);

        match item.content_type {
            ContentType::Json => {
                // Build the JSON document with its metadata wrapper
                let json_doc = Self::build_json_document(item)?;

                retry("redis_json_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let doc = json_doc.clone();
                    async move {
                        // JSON.SET key $ <json>
                        let _: () = cmd("JSON.SET")
                            .arg(&key)
                            .arg("$")
                            .arg(&doc)
                            .query_async(&mut conn)
                            .await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
            ContentType::Binary => {
                // Store as a serialized blob (binary content)
                let data = serde_json::to_vec(item)
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                retry("redis_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let data = data.clone();
                    async move {
                        let _: () = conn.set(&key, &data).await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
        }
    }

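    /// Delete an item by id. Deleting a missing key is not an error.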
    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_delete", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let _: () = conn.del(&key).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

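    /// Check whether a key exists, regardless of how it is stored.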
    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_exists", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let exists: bool = conn.exists(&key).await?;
                Ok(exists)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    /// Write a batch of items using a Redis pipeline (one round trip; much
    /// faster than individual SETs, though not atomic).
    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    /// Write a batch of items with an optional TTL (in seconds).
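    ///
    /// A sketch of expiring cache writes (import paths assumed, as in the
    /// module-level example):
    ///
    /// ```rust,no_run
    /// # use sync_engine::storage::redis::RedisStore;
    /// # use sync_engine::storage::traits::CacheStore;
    /// # use sync_engine::sync_item::SyncItem;
    /// # async fn example(store: RedisStore, items: Vec<SyncItem>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Entries expire after one hour (3600 seconds).
    /// store.put_batch_with_ttl(&items, Some(3600)).await?;
    /// # Ok(())
    /// # }
    /// ```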
    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }
}

impl RedisStore {
    /// Pipelined batch-write implementation with content-type aware storage.
    /// Uses `JSON.SET` for JSON content and `SET`/`SETEX` for binary blobs.
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Prepare items: JSON → JSON.SET document, Binary → serialized bytes
        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, doc: String },
            Blob { key: String, data: Vec<u8> },
        }

        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();

        retry("redis_put_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            async move {
                let mut pipeline = pipe();

                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, doc } => {
                            // JSON.SET key $ <json>, plus a separate EXPIRE when a TTL is requested
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                        }
                        PreparedItem::Blob { key, data } => {
                            // SETEX sets value and TTL in one command; plain SET otherwise
                            if let Some(ttl) = ttl_secs {
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                        }
                    }
                }

                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }

    /// Check if multiple keys exist in Redis (pipelined).
    /// Returns a vec of bools matching the input order.
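    ///
    /// A minimal sketch (import path assumed):
    ///
    /// ```rust,no_run
    /// # use sync_engine::storage::redis::RedisStore;
    /// # async fn example(store: RedisStore) -> Result<(), Box<dyn std::error::Error>> {
    /// let ids = vec!["user.alice".to_string(), "user.bob".to_string()];
    /// let present = store.exists_batch(&ids).await?;
    /// for (id, exists) in ids.iter().zip(&present) {
    ///     println!("{id}: {exists}");
    /// }
    /// # Ok(())
    /// # }
    /// ```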
    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let conn = self.connection.clone();
        // Apply the prefix to all keys
        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();

        retry("redis_exists_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prefixed_ids = prefixed_ids.clone();
            async move {
                let mut pipeline = pipe();
                for key in &prefixed_ids {
                    pipeline.exists(key);
                }

                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
                Ok(results)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }
}
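
// A round-trip sketch for the JSON document helpers: no live Redis required.
// It assumes `SyncItem::reconstruct` accepts the same arguments used in
// `parse_json_document` above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn json_document_round_trip() {
        let item = SyncItem::reconstruct(
            "user.alice".to_string(),
            1,
            1767084657058,
            ContentType::Json,
            br#"{"name":"Alice","role":"admin"}"#.to_vec(),
            Some("batch-1".to_string()),
            None,
            "abc123".to_string(),
            None,
        );

        let doc = RedisStore::build_json_document(&item).expect("valid JSON content");
        let parsed = RedisStore::parse_json_document("user.alice", &doc).expect("well-formed document");

        assert_eq!(parsed.version, 1);
        assert_eq!(parsed.updated_at, 1767084657058);
        assert_eq!(parsed.merkle_root, "abc123");
        assert_eq!(parsed.batch_id.as_deref(), Some("batch-1"));

        // The payload survives semantically; byte layout may differ after re-serialization.
        let payload: serde_json::Value = serde_json::from_slice(&parsed.content).unwrap();
        assert_eq!(payload["name"], "Alice");
    }
}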