sync_engine/storage/
redis.rs

//! Redis storage backend for L2 cache.
//!
//! Content-type aware storage using RedisJSON (Redis Stack):
//! - **JSON content** → `JSON.SET` with full structure preserved → RediSearch indexable!
//! - **Binary content** → Redis STRING (SET) for efficient blob storage
//!
//! JSON documents are stored with a clean, flat structure:
//! ```json
//! {
//!   "version": 1,
//!   "timestamp": 1767084657058,
//!   "payload_hash": "abc123...",
//!   "state": "default",
//!   "payload": {"name": "Alice", "role": "admin"},
//!   "audit": {"batch": "...", "trace": "...", "home": "..."}
//! }
//! ```
//!
//! This enables powerful search with RediSearch ON JSON:
//! ```text
//! FT.CREATE idx ON JSON PREFIX 1 sync: SCHEMA $.payload.name AS name TEXT
//! FT.SEARCH idx '@name:Alice'
//! ```
23
24use async_trait::async_trait;
25use redis::aio::ConnectionManager;
26use redis::{Client, AsyncCommands, pipe, cmd};
27use crate::sync_item::{SyncItem, ContentType};
28use super::traits::{BatchWriteResult, CacheStore, StorageError};
29use crate::resilience::retry::{retry, RetryConfig};
30
31pub struct RedisStore {
32    connection: ConnectionManager,
33    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:user.alice")
34    prefix: String,
35}
36
37impl RedisStore {
38    /// Create a new Redis store without a key prefix.
39    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
40        Self::with_prefix(connection_string, None).await
41    }
42    
43    /// Create a new Redis store with an optional key prefix.
44    /// 
45    /// The prefix is prepended to all keys, enabling namespacing when
46    /// sharing a Redis instance with other applications.
47    /// 
48    /// # Example
49    /// 
50    /// ```rust,no_run
51    /// # use sync_engine::storage::redis::RedisStore;
52    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53    /// // Keys will be prefixed: "myapp:user.alice", "myapp:config.app"
54    /// let store = RedisStore::with_prefix("redis://localhost", Some("myapp:")).await?;
55    /// # Ok(())
56    /// # }
57    /// ```
58    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
59        let client = Client::open(connection_string)
60            .map_err(|e| StorageError::Backend(e.to_string()))?;
61
62        // Use startup config: fast-fail after ~30s, don't hang forever
63        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
64            ConnectionManager::new(client.clone()).await
65        })
66        .await
67        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
68
69        Ok(Self { 
70            connection,
71            prefix: prefix.unwrap_or("").to_string(),
72        })
73    }
74    
75    /// Apply the prefix to a key.
76    #[inline]
77    fn prefixed_key(&self, key: &str) -> String {
78        if self.prefix.is_empty() {
79            key.to_string()
80        } else {
81            format!("{}{}", self.prefix, key)
82        }
83    }
84    
85    /// Strip the prefix from a key (for returning clean IDs).
86    /// Will be used when implementing key iteration/scanning.
87    #[inline]
88    #[allow(dead_code)]
89    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
90        if self.prefix.is_empty() {
91            key
92        } else {
93            key.strip_prefix(&self.prefix).unwrap_or(key)
94        }
95    }
96
97    /// Get a clone of the connection manager (for sharing with MerkleStore)
98    pub fn connection(&self) -> ConnectionManager {
99        self.connection.clone()
100    }
101    
102    /// Get the configured prefix
103    pub fn prefix(&self) -> &str {
104        &self.prefix
105    }
106    
107    /// Build the JSON document for RedisJSON storage.
108    /// 
109    /// Structure (flat with nested audit):
110    /// ```json
111    /// {
112    ///   "version": 1,
113    ///   "timestamp": 1767084657058,
114    ///   "payload_hash": "abc123...",
115    ///   "state": "default",
116    ///   "payload": {"name": "Alice", ...},
117    ///   "audit": {"batch": "...", "trace": "...", "home": "..."}
118    /// }
119    /// ```
120    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
121        // Parse user content as JSON
122        let payload: serde_json::Value = serde_json::from_slice(&item.content)
123            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;
124        
125        // Build audit object (internal operational metadata)
126        let mut audit = serde_json::Map::new();
127        if let Some(ref batch_id) = item.batch_id {
128            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
129        }
130        if let Some(ref trace_parent) = item.trace_parent {
131            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
132        }
133        if let Some(ref home) = item.home_instance_id {
134            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
135        }
136        
137        // Build final document (flat structure)
138        let mut doc = serde_json::json!({
139            "version": item.version,
140            "timestamp": item.updated_at,
141            "state": item.state,
142            "payload": payload
143        });
144        
145        // Only include payload_hash if non-empty
146        if !item.content_hash.is_empty() {
147            doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
148        }
149        
150        // Only include audit if there's something in it
151        if !audit.is_empty() {
152            doc["audit"] = serde_json::Value::Object(audit);
153        }
154        
155        serde_json::to_string(&doc)
156            .map_err(|e| StorageError::Backend(e.to_string()))
157    }
158    
159    /// Parse a RedisJSON document back into a SyncItem.
160    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
161        let doc: serde_json::Value = serde_json::from_str(json_str)
162            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;
163        
164        // Top-level fields
165        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
166        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
167        let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
168        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();
169        
170        // Audit fields (nested)
171        let audit = doc.get("audit");
172        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
173        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
174        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);
175        
176        // Extract payload and serialize back to bytes
177        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
178        let content = serde_json::to_vec(&payload)
179            .map_err(|e| StorageError::Backend(e.to_string()))?;
180        
181        Ok(SyncItem::reconstruct(
182            id.to_string(),
183            version,
184            updated_at,
185            ContentType::Json,
186            content,
187            batch_id,
188            trace_parent,
189            content_hash,
190            home_instance_id,
191            state,
192        ))
193    }
194
195    /// Parse FT.SEARCH NOCONTENT response into a list of keys.
196    /// Response format: [count, key1, key2, ...]
197    fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
198        match value {
199            redis::Value::Array(arr) => {
200                if arr.is_empty() {
201                    return Ok(vec![]);
202                }
203                // First element is count, rest are keys
204                let keys: Vec<String> = arr.into_iter()
205                    .skip(1) // Skip count
206                    .filter_map(|v| match v {
207                        redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
208                        redis::Value::SimpleString(s) => Some(s),
209                        _ => None,
210                    })
211                    .collect();
212                Ok(keys)
213            }
214            _ => Ok(vec![]),
215        }
216    }
217}
218
219#[async_trait]
220impl CacheStore for RedisStore {
221    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
222        let conn = self.connection.clone();
223        let prefixed_id = self.prefixed_key(id);
224        let original_id = id.to_string();
225        
226        // Check the type of the key to determine how to read it
227        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
228            let mut conn = conn.clone();
229            let key = prefixed_id.clone();
230            async move {
231                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
232                Ok(if t == "none" { None } else { Some(t) })
233            }
234        })
235        .await
236        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
237        
238        match key_type.as_deref() {
239            None => Ok(None), // Key doesn't exist
240            Some("ReJSON-RL") => {
241                // RedisJSON document - use JSON.GET
242                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
243                    let mut conn = conn.clone();
244                    let key = prefixed_id.clone();
245                    async move {
246                        let data: Option<String> = cmd("JSON.GET")
247                            .arg(&key)
248                            .query_async(&mut conn)
249                            .await?;
250                        Ok(data)
251                    }
252                })
253                .await
254                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
255                
256                match json_str {
257                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
258                    None => Ok(None),
259                }
260            }
261            Some("string") => {
262                // Binary content or legacy format - read as bytes
263                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
264                    let mut conn = conn.clone();
265                    let key = prefixed_id.clone();
266                    async move {
267                        let data: Option<Vec<u8>> = conn.get(&key).await?;
268                        Ok(data)
269                    }
270                })
271                .await
272                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
273                
274                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
275                    .transpose()
276            }
277            Some(other) => {
278                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
279            }
280        }
281    }
282
283    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
284        let conn = self.connection.clone();
285        let prefixed_id = self.prefixed_key(&item.object_id);
286        
287        match item.content_type {
288            ContentType::Json => {
289                // Build JSON document with metadata wrapper
290                let json_doc = Self::build_json_document(item)?;
291                
292                retry("redis_json_set", &RetryConfig::query(), || {
293                    let mut conn = conn.clone();
294                    let key = prefixed_id.clone();
295                    let doc = json_doc.clone();
296                    async move {
297                        // JSON.SET key $ <json>
298                        let _: () = cmd("JSON.SET")
299                            .arg(&key)
300                            .arg("$")
301                            .arg(&doc)
302                            .query_async(&mut conn)
303                            .await?;
304                        Ok(())
305                    }
306                })
307                .await
308                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
309            }
310            ContentType::Binary => {
311                // Store as serialized blob (binary content)
312                let data = serde_json::to_vec(item)
313                    .map_err(|e| StorageError::Backend(e.to_string()))?;
314
315                retry("redis_set", &RetryConfig::query(), || {
316                    let mut conn = conn.clone();
317                    let key = prefixed_id.clone();
318                    let data = data.clone();
319                    async move {
320                        let _: () = conn.set(&key, &data).await?;
321                        Ok(())
322                    }
323                })
324                .await
325                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
326            }
327        }
328    }
329
330    async fn delete(&self, id: &str) -> Result<(), StorageError> {
331        let conn = self.connection.clone();
332        let prefixed_id = self.prefixed_key(id);
333
334        retry("redis_delete", &RetryConfig::query(), || {
335            let mut conn = conn.clone();
336            let key = prefixed_id.clone();
337            async move {
338                let _: () = conn.del(&key).await?;
339                Ok(())
340            }
341        })
342        .await
343        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
344    }
345
346    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
347        let conn = self.connection.clone();
348        let prefixed_id = self.prefixed_key(id);
349
350        retry("redis_exists", &RetryConfig::query(), || {
351            let mut conn = conn.clone();
352            let key = prefixed_id.clone();
353            async move {
354                let exists: bool = conn.exists(&key).await?;
355                Ok(exists)
356            }
357        })
358        .await
359        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
360    }
361
362    /// Write a batch of items using Redis pipeline (atomic, much faster than individual SETs).
363    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
364        self.put_batch_with_ttl(items, None).await
365    }
366    
367    /// Write a batch of items with optional TTL.
368    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
369        self.put_batch_impl(items, ttl_secs).await
370    }
371
372    /// Create a RediSearch index (FT.CREATE).
373    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
374        let conn = self.connection.clone();
375
376        retry("redis_ft_create", &RetryConfig::query(), || {
377            let mut conn = conn.clone();
378            let args = args.to_vec();
379            async move {
380                let mut cmd = cmd("FT.CREATE");
381                for arg in &args {
382                    cmd.arg(arg);
383                }
384                let _: () = cmd.query_async(&mut conn).await?;
385                Ok(())
386            }
387        })
388        .await
389        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
390    }
391
392    /// Drop a RediSearch index (FT.DROPINDEX).
393    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
394        let conn = self.connection.clone();
395        let index = index.to_string();
396
397        retry("redis_ft_dropindex", &RetryConfig::query(), || {
398            let mut conn = conn.clone();
399            let index = index.clone();
400            async move {
401                let _: () = cmd("FT.DROPINDEX")
402                    .arg(&index)
403                    .query_async(&mut conn)
404                    .await?;
405                Ok(())
406            }
407        })
408        .await
409        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
410    }
411
412    /// Search using RediSearch (FT.SEARCH).
413    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
414        let conn = self.connection.clone();
415        let index = index.to_string();
416        let query = query.to_string();
417
418        retry("redis_ft_search", &RetryConfig::query(), || {
419            let mut conn = conn.clone();
420            let index = index.clone();
421            let query = query.clone();
422            async move {
423                // FT.SEARCH index query LIMIT 0 limit NOCONTENT
424                // NOCONTENT returns only keys, not document content
425                let result: redis::Value = cmd("FT.SEARCH")
426                    .arg(&index)
427                    .arg(&query)
428                    .arg("LIMIT")
429                    .arg(0)
430                    .arg(limit)
431                    .arg("NOCONTENT")
432                    .query_async(&mut conn)
433                    .await?;
434
435                // Parse FT.SEARCH response: [count, key1, key2, ...]
436                Self::parse_ft_search_response(result)
437            }
438        })
439        .await
440        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
441    }
442}
443
444impl RedisStore {
445    /// Pipelined batch write implementation with content-type aware storage.
446    /// Uses JSON.SET for JSON content, SET for binary blobs.
447    /// Also adds items to state SETs for fast state-based queries.
448    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
449        if items.is_empty() {
450            return Ok(BatchWriteResult {
451                batch_id: String::new(),
452                written: 0,
453                verified: true,
454            });
455        }
456
457        // Prepare items: JSON → JSON.SET document, Binary → serialized bytes
458        #[derive(Clone)]
459        enum PreparedItem {
460            Json { key: String, id: String, state: String, doc: String },
461            Blob { key: String, id: String, state: String, data: Vec<u8> },
462        }
463        
464        let prepared: Result<Vec<_>, _> = items.iter()
465            .map(|item| {
466                let prefixed_key = self.prefixed_key(&item.object_id);
467                let id = item.object_id.clone();
468                let state = item.state.clone();
469                match item.content_type {
470                    ContentType::Json => {
471                        Self::build_json_document(item)
472                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
473                    }
474                    ContentType::Binary => {
475                        serde_json::to_vec(item)
476                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
477                            .map_err(|e| StorageError::Backend(e.to_string()))
478                    }
479                }
480            })
481            .collect();
482        let prepared = prepared?;
483        let count = prepared.len();
484
485        let conn = self.connection.clone();
486        let prefix = self.prefix.clone();
487        
488        retry("redis_put_batch", &RetryConfig::query(), || {
489            let mut conn = conn.clone();
490            let prepared = prepared.clone();
491            let prefix = prefix.clone();
492            async move {
493                let mut pipeline = pipe();
494                
495                for item in &prepared {
496                    match item {
497                        PreparedItem::Json { key, id, state, doc } => {
498                            // JSON.SET key $ <json>
499                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
500                            if let Some(ttl) = ttl_secs {
501                                pipeline.expire(key, ttl as i64);
502                            }
503                            // Add to state SET: sync:state:{state}
504                            let state_key = format!("{}state:{}", prefix, state);
505                            pipeline.cmd("SADD").arg(&state_key).arg(id);
506                        }
507                        PreparedItem::Blob { key, id, state, data } => {
508                            // SET for binary content
509                            if let Some(ttl) = ttl_secs {
510                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
511                            } else {
512                                pipeline.set(key, data.as_slice());
513                            }
514                            // Add to state SET
515                            let state_key = format!("{}state:{}", prefix, state);
516                            pipeline.cmd("SADD").arg(&state_key).arg(id);
517                        }
518                    }
519                }
520                
521                pipeline.query_async::<()>(&mut conn).await?;
522                Ok(())
523            }
524        })
525        .await
526        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
527
528        Ok(BatchWriteResult {
529            batch_id: String::new(),
530            written: count,
531            verified: true,
532        })
533    }
534
535    /// Check if multiple keys exist in Redis (pipelined).
536    /// Returns a vec of bools matching the input order.
537    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
538        if ids.is_empty() {
539            return Ok(vec![]);
540        }
541
542        let conn = self.connection.clone();
543        // Apply prefix to all keys
544        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();
545
546        retry("redis_exists_batch", &RetryConfig::query(), || {
547            let mut conn = conn.clone();
548            let prefixed_ids = prefixed_ids.clone();
549            async move {
550                let mut pipeline = pipe();
551                for key in &prefixed_ids {
552                    pipeline.exists(key);
553                }
554                
555                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
556                Ok(results)
557            }
558        })
559        .await
560        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
561    }
562    
563    // ═══════════════════════════════════════════════════════════════════════════
564    // State SET operations: O(1) membership, fast iteration by state
565    // ═══════════════════════════════════════════════════════════════════════════
566    
567    /// Get all IDs in a given state (from Redis SET).
568    ///
569    /// Returns IDs without prefix - ready to use with `get()`.
570    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
571        let mut conn = self.connection.clone();
572        let state_key = format!("{}state:{}", self.prefix, state);
573        
574        let ids: Vec<String> = cmd("SMEMBERS")
575            .arg(&state_key)
576            .query_async(&mut conn)
577            .await
578            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
579        
580        Ok(ids)
581    }
582    
583    /// Count items in a given state (SET cardinality).
584    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
585        let mut conn = self.connection.clone();
586        let state_key = format!("{}state:{}", self.prefix, state);
587        
588        let count: u64 = cmd("SCARD")
589            .arg(&state_key)
590            .query_async(&mut conn)
591            .await
592            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;
593        
594        Ok(count)
595    }
596    
597    /// Check if an ID is in a given state (SET membership).
598    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
599        let mut conn = self.connection.clone();
600        let state_key = format!("{}state:{}", self.prefix, state);
601        
602        let is_member: bool = cmd("SISMEMBER")
603            .arg(&state_key)
604            .arg(id)
605            .query_async(&mut conn)
606            .await
607            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;
608        
609        Ok(is_member)
610    }
611    
612    /// Move an ID from one state to another (atomic SMOVE).
613    ///
614    /// Returns true if the item was moved, false if it wasn't in the source state.
615    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
616        let mut conn = self.connection.clone();
617        let from_key = format!("{}state:{}", self.prefix, from_state);
618        let to_key = format!("{}state:{}", self.prefix, to_state);
619        
620        let moved: bool = cmd("SMOVE")
621            .arg(&from_key)
622            .arg(&to_key)
623            .arg(id)
624            .query_async(&mut conn)
625            .await
626            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;
627        
628        Ok(moved)
629    }
630    
631    /// Remove an ID from a state SET.
632    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
633        let mut conn = self.connection.clone();
634        let state_key = format!("{}state:{}", self.prefix, state);
635        
636        let removed: u32 = cmd("SREM")
637            .arg(&state_key)
638            .arg(id)
639            .query_async(&mut conn)
640            .await
641            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;
642        
643        Ok(removed > 0)
644    }
645    
646    /// Delete all items in a state (both the SET and the actual keys).
647    ///
648    /// Returns the number of items deleted.
649    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
650        let mut conn = self.connection.clone();
651        let state_key = format!("{}state:{}", self.prefix, state);
652        
653        // Get all IDs in this state
654        let ids: Vec<String> = cmd("SMEMBERS")
655            .arg(&state_key)
656            .query_async(&mut conn)
657            .await
658            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
659        
660        if ids.is_empty() {
661            return Ok(0);
662        }
663        
664        let count = ids.len() as u64;
665        
666        // Delete all the keys and the state SET
667        let mut pipeline = pipe();
668        for id in &ids {
669            let key = self.prefixed_key(id);
670            pipeline.del(&key);
671        }
672        pipeline.del(&state_key);
673        
674        pipeline.query_async::<()>(&mut conn)
675            .await
676            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;
677        
678        Ok(count)
679    }
680    
681    /// Scan items by ID prefix using Redis SCAN.
682    ///
683    /// Uses cursor-based SCAN with MATCH pattern for safe iteration.
684    /// Does NOT block the server (unlike KEYS).
685    ///
686    /// # Example
687    /// ```rust,ignore
688    /// // Get all deltas for object user.123
689    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
690    /// ```
691    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
692        let mut conn = self.connection.clone();
693        
694        // Build match pattern: "{store_prefix}{user_prefix}*"
695        let pattern = format!("{}{}*", self.prefix, prefix);
696        
697        let mut items = Vec::new();
698        let mut cursor: u64 = 0;
699        
700        // SCAN iteration (cursor-based, non-blocking)
701        loop {
702            // SCAN cursor MATCH pattern COUNT batch_size
703            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
704                .arg(cursor)
705                .arg("MATCH")
706                .arg(&pattern)
707                .arg("COUNT")
708                .arg(100) // Batch size per iteration
709                .query_async(&mut conn)
710                .await
711                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
712            
713            // Fetch each key using JSON.GET
714            for key in keys {
715                if items.len() >= limit {
716                    break;
717                }
718                
719                let json_opt: Option<String> = cmd("JSON.GET")
720                    .arg(&key)
721                    .query_async(&mut conn)
722                    .await
723                    .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;
724                
725                if let Some(json_str) = json_opt {
726                    // Strip prefix to get clean ID
727                    let id = self.strip_prefix(&key);
728                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
729                        items.push(item);
730                    }
731                }
732            }
733            
734            cursor = new_cursor;
735            
736            // Stop if cursor is 0 (complete) or we have enough items
737            if cursor == 0 || items.len() >= limit {
738                break;
739            }
740        }
741        
742        Ok(items)
743    }
744    
745    /// Count items matching an ID prefix.
746    ///
747    /// Note: This requires scanning all matching keys, so use sparingly.
748    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
749        let mut conn = self.connection.clone();
750        let pattern = format!("{}{}*", self.prefix, prefix);
751        
752        let mut count: u64 = 0;
753        let mut cursor: u64 = 0;
754        
755        loop {
756            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
757                .arg(cursor)
758                .arg("MATCH")
759                .arg(&pattern)
760                .arg("COUNT")
761                .arg(1000)
762                .query_async(&mut conn)
763                .await
764                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
765            
766            count += keys.len() as u64;
767            cursor = new_cursor;
768            
769            if cursor == 0 {
770                break;
771            }
772        }
773        
774        Ok(count)
775    }
776    
777    /// Delete all items matching an ID prefix.
778    ///
779    /// Returns the number of deleted items.
780    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
781        let mut conn = self.connection.clone();
782        let pattern = format!("{}{}*", self.prefix, prefix);
783        
784        let mut deleted: u64 = 0;
785        let mut cursor: u64 = 0;
786        
787        loop {
788            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
789                .arg(cursor)
790                .arg("MATCH")
791                .arg(&pattern)
792                .arg("COUNT")
793                .arg(1000)
794                .query_async(&mut conn)
795                .await
796                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
797            
798            if !keys.is_empty() {
799                // Batch delete
800                let mut pipeline = pipe();
801                for key in &keys {
802                    pipeline.del(key);
803                }
804                pipeline.query_async::<()>(&mut conn)
805                    .await
806                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;
807                
808                deleted += keys.len() as u64;
809            }
810            
811            cursor = new_cursor;
812            
813            if cursor == 0 {
814                break;
815            }
816        }
817        
818        Ok(deleted)
819    }
820    
821    // ========================================================================
822    // CDC Stream Methods
823    // ========================================================================
824    
825    /// Write a CDC entry to the stream.
826    /// 
827    /// Uses XADD with MAXLEN ~ for bounded stream size.
828    /// The stream key is `{prefix}__local__:cdc`.
829    pub async fn xadd_cdc(
830        &self, 
831        entry: &crate::cdc::CdcEntry, 
832        maxlen: u64
833    ) -> Result<String, StorageError> {
834        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
835        let fields = entry.to_redis_fields();
836        
837        let mut conn = self.connection.clone();
838        
839        // Build XADD command: XADD key MAXLEN ~ maxlen * field1 value1 field2 value2 ...
840        let mut command = cmd("XADD");
841        command.arg(&stream_key);
842        command.arg("MAXLEN");
843        command.arg("~");
844        command.arg(maxlen);
845        command.arg("*"); // Auto-generate ID
846        
847        for (field, value) in fields {
848            command.arg(field);
849            command.arg(value.as_bytes());
850        }
851        
852        let entry_id: String = command
853            .query_async(&mut conn)
854            .await
855            .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;
856        
857        Ok(entry_id)
858    }
859    
860    /// Write multiple CDC entries to the stream in a pipeline.
861    /// 
862    /// Returns the stream entry IDs for each write.
863    pub async fn xadd_cdc_batch(
864        &self, 
865        entries: &[crate::cdc::CdcEntry], 
866        maxlen: u64
867    ) -> Result<Vec<String>, StorageError> {
868        if entries.is_empty() {
869            return Ok(vec![]);
870        }
871        
872        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
873        let mut conn = self.connection.clone();
874        
875        let mut pipeline = pipe();
876        
877        for entry in entries {
878            let fields = entry.to_redis_fields();
879            
880            // Start XADD command in pipeline
881            let mut command = cmd("XADD");
882            command.arg(&stream_key);
883            command.arg("MAXLEN");
884            command.arg("~");
885            command.arg(maxlen);
886            command.arg("*");
887            
888            for (field, value) in fields {
889                command.arg(field);
890                command.arg(value.as_bytes());
891            }
892            
893            pipeline.add_command(command);
894        }
895        
896        let ids: Vec<String> = pipeline
897            .query_async(&mut conn)
898            .await
899            .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;
900        
901        Ok(ids)
902    }
903}