sync_engine/storage/
redis.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Redis storage backend for L2 cache.
5//!
6//! Content-type aware storage using RedisJSON (Redis Stack):
//! - **JSON content** → `JSON.SET` with full structure preserved → RediSearch indexable!
8//! - **Binary content** → Redis STRING (SET) for efficient blob storage
9//!
//! JSON documents are stored with a clean, flat structure (`payload_hash` and
//! `audit` are omitted when they would be empty):
//! ```json
//! {
//!   "version": 1,
//!   "timestamp": 1767084657058,
//!   "payload_hash": "abc123...",
//!   "state": "default",
//!   "access_count": 5,
//!   "last_accessed": 1767084660000,
//!   "payload": {"name": "Alice", "role": "admin"},
//!   "audit": {"batch": "...", "trace": "...", "home": "..."}
//! }
//! ```
20//!
21//! This enables powerful search with RediSearch ON JSON:
22//! ```text
23//! FT.CREATE idx ON JSON PREFIX 1 sync: SCHEMA $.payload.name AS name TEXT
24//! FT.SEARCH idx '@name:Alice'
25//! ```
26
27use async_trait::async_trait;
28use redis::aio::ConnectionManager;
29use redis::{Client, AsyncCommands, pipe, cmd};
30use crate::sync_item::{SyncItem, ContentType};
31use super::traits::{BatchWriteResult, CacheStore, StorageError};
32use crate::resilience::retry::{retry, RetryConfig};
33
/// Redis-backed store: a managed connection plus the key prefix applied to item keys.
pub struct RedisStore {
    /// Managed Redis connection; cloned for each operation and exposed via `connection()`.
    connection: ConnectionManager,
    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:user.alice").
    /// Stored as an empty string when no prefix was configured.
    prefix: String,
}
39
40impl RedisStore {
    /// Create a new Redis store without a key prefix.
    ///
    /// Delegates to `with_prefix` with `None` as the prefix, so keys are used verbatim.
    ///
    /// # Errors
    ///
    /// Propagates the `StorageError` returned by `with_prefix`.
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }
45    
46    /// Create a new Redis store with an optional key prefix.
47    /// 
48    /// The prefix is prepended to all keys, enabling namespacing when
49    /// sharing a Redis instance with other applications.
50    /// 
51    /// # Example
52    /// 
53    /// ```rust,no_run
54    /// # use sync_engine::storage::redis::RedisStore;
55    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
56    /// // Keys will be prefixed: "myapp:user.alice", "myapp:config.app"
57    /// let store = RedisStore::with_prefix("redis://localhost", Some("myapp:")).await?;
58    /// # Ok(())
59    /// # }
60    /// ```
61    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
62        let client = Client::open(connection_string)
63            .map_err(|e| StorageError::Backend(e.to_string()))?;
64
65        // Use startup config: fast-fail after ~30s, don't hang forever
66        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
67            ConnectionManager::new(client.clone()).await
68        })
69        .await
70        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
71
72        Ok(Self { 
73            connection,
74            prefix: prefix.unwrap_or("").to_string(),
75        })
76    }
77    
78    /// Apply the prefix to a key.
79    #[inline]
80    fn prefixed_key(&self, key: &str) -> String {
81        if self.prefix.is_empty() {
82            key.to_string()
83        } else {
84            format!("{}{}", self.prefix, key)
85        }
86    }
87    
88    /// Strip the prefix from a key (for returning clean IDs).
89    /// Will be used when implementing key iteration/scanning.
90    #[inline]
91    #[allow(dead_code)]
92    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
93        if self.prefix.is_empty() {
94            key
95        } else {
96            key.strip_prefix(&self.prefix).unwrap_or(key)
97        }
98    }
99
    /// Get a clone of the connection manager (for sharing with MerkleStore).
    ///
    /// Returns an owned `ConnectionManager` produced by cloning the one held by this store.
    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }
104    
    /// Get the configured key prefix.
    ///
    /// Returns an empty string when the store was created without a prefix.
    pub fn prefix(&self) -> &str {
        &self.prefix
    }
109    
110    /// Build the JSON document for RedisJSON storage.
111    /// 
112    /// Structure (flat with nested audit):
113    /// ```json
114    /// {
115    ///   "version": 1,
116    ///   "timestamp": 1767084657058,
117    ///   "payload_hash": "abc123...",
118    ///   "state": "default",
119    ///   "access_count": 5,
120    ///   "last_accessed": 1767084660000,
121    ///   "payload": {"name": "Alice", ...},
122    ///   "audit": {"batch": "...", "trace": "...", "home": "..."}
123    /// }
124    /// ```
125    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
126        // Parse user content as JSON
127        let payload: serde_json::Value = serde_json::from_slice(&item.content)
128            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;
129        
130        // Build audit object (internal operational metadata)
131        let mut audit = serde_json::Map::new();
132        if let Some(ref batch_id) = item.batch_id {
133            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
134        }
135        if let Some(ref trace_parent) = item.trace_parent {
136            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
137        }
138        if let Some(ref home) = item.home_instance_id {
139            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
140        }
141        
142        // Build final document (flat structure)
143        let mut doc = serde_json::json!({
144            "version": item.version,
145            "timestamp": item.updated_at,
146            "state": item.state,
147            "access_count": item.access_count,
148            "last_accessed": item.last_accessed,
149            "payload": payload
150        });
151        
152        // Only include payload_hash if non-empty
153        if !item.content_hash.is_empty() {
154            doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
155        }
156        
157        // Only include audit if there's something in it
158        if !audit.is_empty() {
159            doc["audit"] = serde_json::Value::Object(audit);
160        }
161        
162        serde_json::to_string(&doc)
163            .map_err(|e| StorageError::Backend(e.to_string()))
164    }
165    
166    /// Parse a RedisJSON document back into a SyncItem.
167    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
168        let doc: serde_json::Value = serde_json::from_str(json_str)
169            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;
170        
171        // Top-level fields
172        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
173        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
174        let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
175        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();
176        
177        // Access metadata (local-only, not replicated)
178        let access_count = doc.get("access_count").and_then(|v| v.as_u64()).unwrap_or(0);
179        let last_accessed = doc.get("last_accessed").and_then(|v| v.as_u64()).unwrap_or(0);
180        
181        // Audit fields (nested)
182        let audit = doc.get("audit");
183        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
184        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
185        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);
186        
187        // Extract payload and serialize back to bytes
188        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
189        let content = serde_json::to_vec(&payload)
190            .map_err(|e| StorageError::Backend(e.to_string()))?;
191        
192        Ok(SyncItem::reconstruct(
193            id.to_string(),
194            version,
195            updated_at,
196            ContentType::Json,
197            content,
198            batch_id,
199            trace_parent,
200            content_hash,
201            home_instance_id,
202            state,
203            access_count,
204            last_accessed,
205        ))
206    }
207
208    /// Parse FT.SEARCH NOCONTENT response into a list of keys.
209    /// Response format: [count, key1, key2, ...]
210    fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
211        match value {
212            redis::Value::Array(arr) => {
213                if arr.is_empty() {
214                    return Ok(vec![]);
215                }
216                // First element is count, rest are keys
217                let keys: Vec<String> = arr.into_iter()
218                    .skip(1) // Skip count
219                    .filter_map(|v| match v {
220                        redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
221                        redis::Value::SimpleString(s) => Some(s),
222                        _ => None,
223                    })
224                    .collect();
225                Ok(keys)
226            }
227            _ => Ok(vec![]),
228        }
229    }
230}
231
232#[async_trait]
233impl CacheStore for RedisStore {
234    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
235        let conn = self.connection.clone();
236        let prefixed_id = self.prefixed_key(id);
237        let original_id = id.to_string();
238        
239        // Check the type of the key to determine how to read it
240        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
241            let mut conn = conn.clone();
242            let key = prefixed_id.clone();
243            async move {
244                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
245                Ok(if t == "none" { None } else { Some(t) })
246            }
247        })
248        .await
249        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
250        
251        match key_type.as_deref() {
252            None => Ok(None), // Key doesn't exist
253            Some("ReJSON-RL") => {
254                // RedisJSON document - use JSON.GET
255                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
256                    let mut conn = conn.clone();
257                    let key = prefixed_id.clone();
258                    async move {
259                        let data: Option<String> = cmd("JSON.GET")
260                            .arg(&key)
261                            .query_async(&mut conn)
262                            .await?;
263                        Ok(data)
264                    }
265                })
266                .await
267                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
268                
269                match json_str {
270                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
271                    None => Ok(None),
272                }
273            }
274            Some("string") => {
275                // Binary content or legacy format - read as bytes
276                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
277                    let mut conn = conn.clone();
278                    let key = prefixed_id.clone();
279                    async move {
280                        let data: Option<Vec<u8>> = conn.get(&key).await?;
281                        Ok(data)
282                    }
283                })
284                .await
285                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
286                
287                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
288                    .transpose()
289            }
290            Some(other) => {
291                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
292            }
293        }
294    }
295
296    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
297        let conn = self.connection.clone();
298        let prefixed_id = self.prefixed_key(&item.object_id);
299        
300        match item.content_type {
301            ContentType::Json => {
302                // Build JSON document with metadata wrapper
303                let json_doc = Self::build_json_document(item)?;
304                
305                retry("redis_json_set", &RetryConfig::query(), || {
306                    let mut conn = conn.clone();
307                    let key = prefixed_id.clone();
308                    let doc = json_doc.clone();
309                    async move {
310                        // JSON.SET key $ <json>
311                        let _: () = cmd("JSON.SET")
312                            .arg(&key)
313                            .arg("$")
314                            .arg(&doc)
315                            .query_async(&mut conn)
316                            .await?;
317                        Ok(())
318                    }
319                })
320                .await
321                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
322            }
323            ContentType::Binary => {
324                // Store as serialized blob (binary content)
325                let data = serde_json::to_vec(item)
326                    .map_err(|e| StorageError::Backend(e.to_string()))?;
327
328                retry("redis_set", &RetryConfig::query(), || {
329                    let mut conn = conn.clone();
330                    let key = prefixed_id.clone();
331                    let data = data.clone();
332                    async move {
333                        let _: () = conn.set(&key, &data).await?;
334                        Ok(())
335                    }
336                })
337                .await
338                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
339            }
340        }
341    }
342
343    async fn delete(&self, id: &str) -> Result<(), StorageError> {
344        let conn = self.connection.clone();
345        let prefixed_id = self.prefixed_key(id);
346
347        retry("redis_delete", &RetryConfig::query(), || {
348            let mut conn = conn.clone();
349            let key = prefixed_id.clone();
350            async move {
351                let _: () = conn.del(&key).await?;
352                Ok(())
353            }
354        })
355        .await
356        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
357    }
358
359    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
360        let conn = self.connection.clone();
361        let prefixed_id = self.prefixed_key(id);
362
363        retry("redis_exists", &RetryConfig::query(), || {
364            let mut conn = conn.clone();
365            let key = prefixed_id.clone();
366            async move {
367                let exists: bool = conn.exists(&key).await?;
368                Ok(exists)
369            }
370        })
371        .await
372        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
373    }
374
    /// Write a batch of items using a Redis pipeline (much faster than individual SETs).
    ///
    /// Delegates to `put_batch_with_ttl` with no TTL.
    ///
    /// NOTE: the pipeline built in `put_batch_impl` is a plain `pipe()` with no
    /// `.atomic()` call, so the batch is pipelined rather than transactional.
    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }
379    
    /// Write a batch of items with optional TTL.
    ///
    /// `ttl_secs` is the expiry in seconds applied to each item key; `None`
    /// writes the items without an expiry. Delegates to `put_batch_impl`.
    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }
384
385    /// Create a RediSearch index (FT.CREATE).
386    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
387        let conn = self.connection.clone();
388
389        retry("redis_ft_create", &RetryConfig::query(), || {
390            let mut conn = conn.clone();
391            let args = args.to_vec();
392            async move {
393                let mut cmd = cmd("FT.CREATE");
394                for arg in &args {
395                    cmd.arg(arg);
396                }
397                let _: () = cmd.query_async(&mut conn).await?;
398                Ok(())
399            }
400        })
401        .await
402        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
403    }
404
405    /// Drop a RediSearch index (FT.DROPINDEX).
406    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
407        let conn = self.connection.clone();
408        let index = index.to_string();
409
410        retry("redis_ft_dropindex", &RetryConfig::query(), || {
411            let mut conn = conn.clone();
412            let index = index.clone();
413            async move {
414                let _: () = cmd("FT.DROPINDEX")
415                    .arg(&index)
416                    .query_async(&mut conn)
417                    .await?;
418                Ok(())
419            }
420        })
421        .await
422        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
423    }
424
425    /// Search using RediSearch (FT.SEARCH).
426    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
427        let conn = self.connection.clone();
428        let index = index.to_string();
429        let query = query.to_string();
430
431        retry("redis_ft_search", &RetryConfig::query(), || {
432            let mut conn = conn.clone();
433            let index = index.clone();
434            let query = query.clone();
435            async move {
436                // FT.SEARCH index query LIMIT 0 limit NOCONTENT
437                // NOCONTENT returns only keys, not document content
438                let result: redis::Value = cmd("FT.SEARCH")
439                    .arg(&index)
440                    .arg(&query)
441                    .arg("LIMIT")
442                    .arg(0)
443                    .arg(limit)
444                    .arg("NOCONTENT")
445                    .query_async(&mut conn)
446                    .await?;
447
448                // Parse FT.SEARCH response: [count, key1, key2, ...]
449                Self::parse_ft_search_response(result)
450            }
451        })
452        .await
453        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
454    }
455}
456
457impl RedisStore {
458    /// Pipelined batch write implementation with content-type aware storage.
459    /// Uses JSON.SET for JSON content, SET for binary blobs.
460    /// Also adds items to state SETs for fast state-based queries.
461    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
462        if items.is_empty() {
463            return Ok(BatchWriteResult {
464                batch_id: String::new(),
465                written: 0,
466                verified: true,
467            });
468        }
469
470        // Prepare items: JSON → JSON.SET document, Binary → serialized bytes
471        #[derive(Clone)]
472        enum PreparedItem {
473            Json { key: String, id: String, state: String, doc: String },
474            Blob { key: String, id: String, state: String, data: Vec<u8> },
475        }
476        
477        let prepared: Result<Vec<_>, _> = items.iter()
478            .map(|item| {
479                let prefixed_key = self.prefixed_key(&item.object_id);
480                let id = item.object_id.clone();
481                let state = item.state.clone();
482                match item.content_type {
483                    ContentType::Json => {
484                        Self::build_json_document(item)
485                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
486                    }
487                    ContentType::Binary => {
488                        serde_json::to_vec(item)
489                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
490                            .map_err(|e| StorageError::Backend(e.to_string()))
491                    }
492                }
493            })
494            .collect();
495        let prepared = prepared?;
496        let count = prepared.len();
497
498        let conn = self.connection.clone();
499        let prefix = self.prefix.clone();
500        
501        retry("redis_put_batch", &RetryConfig::query(), || {
502            let mut conn = conn.clone();
503            let prepared = prepared.clone();
504            let prefix = prefix.clone();
505            async move {
506                let mut pipeline = pipe();
507                
508                for item in &prepared {
509                    match item {
510                        PreparedItem::Json { key, id, state, doc } => {
511                            // JSON.SET key $ <json>
512                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
513                            if let Some(ttl) = ttl_secs {
514                                pipeline.expire(key, ttl as i64);
515                            }
516                            // Add to state SET: sync:state:{state}
517                            let state_key = format!("{}state:{}", prefix, state);
518                            pipeline.cmd("SADD").arg(&state_key).arg(id);
519                        }
520                        PreparedItem::Blob { key, id, state, data } => {
521                            // SET for binary content
522                            if let Some(ttl) = ttl_secs {
523                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
524                            } else {
525                                pipeline.set(key, data.as_slice());
526                            }
527                            // Add to state SET
528                            let state_key = format!("{}state:{}", prefix, state);
529                            pipeline.cmd("SADD").arg(&state_key).arg(id);
530                        }
531                    }
532                }
533                
534                pipeline.query_async::<()>(&mut conn).await?;
535                Ok(())
536            }
537        })
538        .await
539        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
540
541        Ok(BatchWriteResult {
542            batch_id: String::new(),
543            written: count,
544            verified: true,
545        })
546    }
547
548    /// Check if multiple keys exist in Redis (pipelined).
549    /// Returns a vec of bools matching the input order.
550    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
551        if ids.is_empty() {
552            return Ok(vec![]);
553        }
554
555        let conn = self.connection.clone();
556        // Apply prefix to all keys
557        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();
558
559        retry("redis_exists_batch", &RetryConfig::query(), || {
560            let mut conn = conn.clone();
561            let prefixed_ids = prefixed_ids.clone();
562            async move {
563                let mut pipeline = pipe();
564                for key in &prefixed_ids {
565                    pipeline.exists(key);
566                }
567                
568                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
569                Ok(results)
570            }
571        })
572        .await
573        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
574    }
575    
576    // ═══════════════════════════════════════════════════════════════════════════
577    // State SET operations: O(1) membership, fast iteration by state
578    // ═══════════════════════════════════════════════════════════════════════════
579    
580    /// Get all IDs in a given state (from Redis SET).
581    ///
582    /// Returns IDs without prefix - ready to use with `get()`.
583    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
584        let mut conn = self.connection.clone();
585        let state_key = format!("{}state:{}", self.prefix, state);
586        
587        let ids: Vec<String> = cmd("SMEMBERS")
588            .arg(&state_key)
589            .query_async(&mut conn)
590            .await
591            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
592        
593        Ok(ids)
594    }
595    
596    /// Count items in a given state (SET cardinality).
597    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
598        let mut conn = self.connection.clone();
599        let state_key = format!("{}state:{}", self.prefix, state);
600        
601        let count: u64 = cmd("SCARD")
602            .arg(&state_key)
603            .query_async(&mut conn)
604            .await
605            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;
606        
607        Ok(count)
608    }
609    
610    /// Check if an ID is in a given state (SET membership).
611    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
612        let mut conn = self.connection.clone();
613        let state_key = format!("{}state:{}", self.prefix, state);
614        
615        let is_member: bool = cmd("SISMEMBER")
616            .arg(&state_key)
617            .arg(id)
618            .query_async(&mut conn)
619            .await
620            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;
621        
622        Ok(is_member)
623    }
624    
625    /// Move an ID from one state to another (atomic SMOVE).
626    ///
627    /// Returns true if the item was moved, false if it wasn't in the source state.
628    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
629        let mut conn = self.connection.clone();
630        let from_key = format!("{}state:{}", self.prefix, from_state);
631        let to_key = format!("{}state:{}", self.prefix, to_state);
632        
633        let moved: bool = cmd("SMOVE")
634            .arg(&from_key)
635            .arg(&to_key)
636            .arg(id)
637            .query_async(&mut conn)
638            .await
639            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;
640        
641        Ok(moved)
642    }
643    
644    /// Remove an ID from a state SET.
645    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
646        let mut conn = self.connection.clone();
647        let state_key = format!("{}state:{}", self.prefix, state);
648        
649        let removed: u32 = cmd("SREM")
650            .arg(&state_key)
651            .arg(id)
652            .query_async(&mut conn)
653            .await
654            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;
655        
656        Ok(removed > 0)
657    }
658    
659    /// Delete all items in a state (both the SET and the actual keys).
660    ///
661    /// Returns the number of items deleted.
662    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
663        let mut conn = self.connection.clone();
664        let state_key = format!("{}state:{}", self.prefix, state);
665        
666        // Get all IDs in this state
667        let ids: Vec<String> = cmd("SMEMBERS")
668            .arg(&state_key)
669            .query_async(&mut conn)
670            .await
671            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
672        
673        if ids.is_empty() {
674            return Ok(0);
675        }
676        
677        let count = ids.len() as u64;
678        
679        // Delete all the keys and the state SET
680        let mut pipeline = pipe();
681        for id in &ids {
682            let key = self.prefixed_key(id);
683            pipeline.del(&key);
684        }
685        pipeline.del(&state_key);
686        
687        pipeline.query_async::<()>(&mut conn)
688            .await
689            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;
690        
691        Ok(count)
692    }
693    
694    /// Scan items by ID prefix using Redis SCAN.
695    ///
696    /// Uses cursor-based SCAN with MATCH pattern for safe iteration.
697    /// Does NOT block the server (unlike KEYS).
698    ///
699    /// # Example
700    /// ```rust,ignore
701    /// // Get all deltas for object user.123
702    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
703    /// ```
704    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
705        let mut conn = self.connection.clone();
706        
707        // Build match pattern: "{store_prefix}{user_prefix}*"
708        let pattern = format!("{}{}*", self.prefix, prefix);
709        
710        let mut items = Vec::new();
711        let mut cursor: u64 = 0;
712        
713        // SCAN iteration (cursor-based, non-blocking)
714        loop {
715            // SCAN cursor MATCH pattern COUNT batch_size
716            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
717                .arg(cursor)
718                .arg("MATCH")
719                .arg(&pattern)
720                .arg("COUNT")
721                .arg(100) // Batch size per iteration
722                .query_async(&mut conn)
723                .await
724                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
725            
726            // Fetch each key using JSON.GET
727            for key in keys {
728                if items.len() >= limit {
729                    break;
730                }
731                
732                let json_opt: Option<String> = cmd("JSON.GET")
733                    .arg(&key)
734                    .query_async(&mut conn)
735                    .await
736                    .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;
737                
738                if let Some(json_str) = json_opt {
739                    // Strip prefix to get clean ID
740                    let id = self.strip_prefix(&key);
741                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
742                        items.push(item);
743                    }
744                }
745            }
746            
747            cursor = new_cursor;
748            
749            // Stop if cursor is 0 (complete) or we have enough items
750            if cursor == 0 || items.len() >= limit {
751                break;
752            }
753        }
754        
755        Ok(items)
756    }
757    
758    /// Count items matching an ID prefix.
759    ///
760    /// Note: This requires scanning all matching keys, so use sparingly.
761    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
762        let mut conn = self.connection.clone();
763        let pattern = format!("{}{}*", self.prefix, prefix);
764        
765        let mut count: u64 = 0;
766        let mut cursor: u64 = 0;
767        
768        loop {
769            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
770                .arg(cursor)
771                .arg("MATCH")
772                .arg(&pattern)
773                .arg("COUNT")
774                .arg(1000)
775                .query_async(&mut conn)
776                .await
777                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
778            
779            count += keys.len() as u64;
780            cursor = new_cursor;
781            
782            if cursor == 0 {
783                break;
784            }
785        }
786        
787        Ok(count)
788    }
789    
790    /// Delete all items matching an ID prefix.
791    ///
792    /// Returns the number of deleted items.
793    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
794        let mut conn = self.connection.clone();
795        let pattern = format!("{}{}*", self.prefix, prefix);
796        
797        let mut deleted: u64 = 0;
798        let mut cursor: u64 = 0;
799        
800        loop {
801            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
802                .arg(cursor)
803                .arg("MATCH")
804                .arg(&pattern)
805                .arg("COUNT")
806                .arg(1000)
807                .query_async(&mut conn)
808                .await
809                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
810            
811            if !keys.is_empty() {
812                // Batch delete
813                let mut pipeline = pipe();
814                for key in &keys {
815                    pipeline.del(key);
816                }
817                pipeline.query_async::<()>(&mut conn)
818                    .await
819                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;
820                
821                deleted += keys.len() as u64;
822            }
823            
824            cursor = new_cursor;
825            
826            if cursor == 0 {
827                break;
828            }
829        }
830        
831        Ok(deleted)
832    }
833    
834    // ========================================================================
835    // CDC Stream Methods
836    // ========================================================================
837    
838    /// Write a CDC entry to the stream.
839    /// 
840    /// Uses XADD with MAXLEN ~ for bounded stream size.
841    /// The stream key is `{prefix}__local__:cdc`.
842    pub async fn xadd_cdc(
843        &self, 
844        entry: &crate::cdc::CdcEntry, 
845        maxlen: u64
846    ) -> Result<String, StorageError> {
847        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
848        let fields = entry.to_redis_fields();
849        
850        let mut conn = self.connection.clone();
851        
852        // Build XADD command: XADD key MAXLEN ~ maxlen * field1 value1 field2 value2 ...
853        let mut command = cmd("XADD");
854        command.arg(&stream_key);
855        command.arg("MAXLEN");
856        command.arg("~");
857        command.arg(maxlen);
858        command.arg("*"); // Auto-generate ID
859        
860        for (field, value) in fields {
861            command.arg(field);
862            command.arg(value.as_bytes());
863        }
864        
865        let entry_id: String = command
866            .query_async(&mut conn)
867            .await
868            .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;
869        
870        Ok(entry_id)
871    }
872    
873    /// Write multiple CDC entries to the stream in a pipeline.
874    /// 
875    /// Returns the stream entry IDs for each write.
876    pub async fn xadd_cdc_batch(
877        &self, 
878        entries: &[crate::cdc::CdcEntry], 
879        maxlen: u64
880    ) -> Result<Vec<String>, StorageError> {
881        if entries.is_empty() {
882            return Ok(vec![]);
883        }
884        
885        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
886        let mut conn = self.connection.clone();
887        
888        let mut pipeline = pipe();
889        
890        for entry in entries {
891            let fields = entry.to_redis_fields();
892            
893            // Start XADD command in pipeline
894            let mut command = cmd("XADD");
895            command.arg(&stream_key);
896            command.arg("MAXLEN");
897            command.arg("~");
898            command.arg(maxlen);
899            command.arg("*");
900            
901            for (field, value) in fields {
902                command.arg(field);
903                command.arg(value.as_bytes());
904            }
905            
906            pipeline.add_command(command);
907        }
908        
909        let ids: Vec<String> = pipeline
910            .query_async(&mut conn)
911            .await
912            .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;
913        
914        Ok(ids)
915    }
916}