sync_engine/storage/
redis.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Redis storage backend for L2 cache.
//!
//! Content-type aware storage using RedisJSON (Redis Stack):
//! - **JSON content** → `JSON.SET` with full structure preserved → RediSearch indexable!
//! - **Binary content** → Redis STRING (SET) for efficient blob storage
//!
//! JSON documents are stored with a clean, flat structure:
//! ```json
//! {
//!   "version": 1,
//!   "timestamp": 1767084657058,
//!   "payload_hash": "abc123...",
//!   "payload": {"name": "Alice", "role": "admin"},
//!   "audit": {"batch": "...", "trace": "...", "home": "..."}
//! }
//! ```
//!
//! This enables powerful search with RediSearch ON JSON:
//! ```text
//! FT.CREATE idx ON JSON PREFIX 1 sync: SCHEMA $.payload.name AS name TEXT
//! FT.SEARCH idx '@name:Alice'
//! ```
27use async_trait::async_trait;
28use redis::aio::ConnectionManager;
29use redis::{Client, AsyncCommands, pipe, cmd};
30use crate::sync_item::{SyncItem, ContentType};
31use super::traits::{BatchWriteResult, CacheStore, StorageError};
32use crate::resilience::retry::{retry, RetryConfig};
33
/// Redis-backed L2 cache store.
///
/// JSON content is stored as RedisJSON documents (RediSearch-indexable);
/// binary content is stored as serialized blobs in plain Redis STRINGs.
/// Every operation clones the `ConnectionManager` handle, so a single
/// `RedisStore` can be shared across tasks.
pub struct RedisStore {
    /// Shared async connection handle; cloned per operation.
    connection: ConnectionManager,
    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:user.alice")
    prefix: String,
}
39
impl RedisStore {
    /// Create a new Redis store without a key prefix.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Backend`] if the connection string cannot be
    /// parsed or the initial connection fails after the startup retry budget.
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }
    
    /// Create a new Redis store with an optional key prefix.
    /// 
    /// The prefix is prepended to all keys, enabling namespacing when
    /// sharing a Redis instance with other applications.
    /// 
    /// # Example
    /// 
    /// ```rust,no_run
    /// # use sync_engine::storage::redis::RedisStore;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// // Keys will be prefixed: "myapp:user.alice", "myapp:config.app"
    /// let store = RedisStore::with_prefix("redis://localhost", Some("myapp:")).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        // Parsing the URL is synchronous and fails fast on malformed input.
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        // Use startup config: fast-fail after ~30s, don't hang forever
        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self { 
            connection,
            // No prefix configured → empty string, which `prefixed_key` treats
            // as "pass keys through unchanged".
            prefix: prefix.unwrap_or("").to_string(),
        })
    }
    
    /// Apply the prefix to a key.
    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }
    
    /// Strip the prefix from a key (for returning clean IDs).
    /// Used by `scan_prefix` to turn scanned Redis keys back into
    /// caller-facing IDs; falls back to the raw key if the prefix is absent.
    #[inline]
    #[allow(dead_code)]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

    /// Get a clone of the connection manager (for sharing with MerkleStore)
    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }
    
    /// Get the configured prefix
    pub fn prefix(&self) -> &str {
        &self.prefix
    }
    
    /// Build the JSON document for RedisJSON storage.
    /// 
    /// The item's `object_id` is NOT embedded — the Redis key carries identity,
    /// and `parse_json_document` re-attaches the ID on read.
    ///
    /// Structure (flat with nested audit):
    /// ```json
    /// {
    ///   "version": 1,
    ///   "timestamp": 1767084657058,
    ///   "payload_hash": "abc123...",
    ///   "state": "default",
    ///   "access_count": 5,
    ///   "last_accessed": 1767084660000,
    ///   "payload": {"name": "Alice", ...},
    ///   "audit": {"batch": "...", "trace": "...", "home": "..."}
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// Fails with `StorageError::Backend` if `item.content` is not valid JSON
    /// (callers must route non-JSON content through the Binary path).
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        // Parse user content as JSON
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;
        
        // Build audit object (internal operational metadata)
        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }
        
        // Build final document (flat structure)
        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "state": item.state,
            "access_count": item.access_count,
            "last_accessed": item.last_accessed,
            "payload": payload
        });
        
        // Only include payload_hash if non-empty
        if !item.content_hash.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
        }
        
        // Only include audit if there's something in it
        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }
        
        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }
    
    /// Parse a RedisJSON document back into a SyncItem.
    ///
    /// Missing fields fall back to defaults (version → 1, timestamps → 0,
    /// state → "default", payload → JSON null) so partial/legacy documents
    /// still load instead of erroring.
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;
        
        // Top-level fields
        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();
        
        // Access metadata (local-only, not replicated)
        let access_count = doc.get("access_count").and_then(|v| v.as_u64()).unwrap_or(0);
        let last_accessed = doc.get("last_accessed").and_then(|v| v.as_u64()).unwrap_or(0);
        
        // Audit fields (nested)
        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);
        
        // Extract payload and serialize back to bytes
        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;
        
        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            content_hash,
            home_instance_id,
            state,
            access_count,
            last_accessed,
        ))
    }

    /// Parse FT.SEARCH NOCONTENT response into a list of keys.
    /// Response format: [count, key1, key2, ...]
    ///
    /// Any non-array reply (e.g. nil) maps to an empty key list rather than
    /// an error; elements that are not strings are silently dropped.
    fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
        match value {
            redis::Value::Array(arr) => {
                if arr.is_empty() {
                    return Ok(vec![]);
                }
                // First element is count, rest are keys
                let keys: Vec<String> = arr.into_iter()
                    .skip(1) // Skip count
                    .filter_map(|v| match v {
                        redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
                        redis::Value::SimpleString(s) => Some(s),
                        _ => None,
                    })
                    .collect();
                Ok(keys)
            }
            _ => Ok(vec![]),
        }
    }
}
231
232#[async_trait]
233impl CacheStore for RedisStore {
234    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
235        let conn = self.connection.clone();
236        let prefixed_id = self.prefixed_key(id);
237        let original_id = id.to_string();
238        
239        // Check the type of the key to determine how to read it
240        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
241            let mut conn = conn.clone();
242            let key = prefixed_id.clone();
243            async move {
244                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
245                Ok(if t == "none" { None } else { Some(t) })
246            }
247        })
248        .await
249        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
250        
251        match key_type.as_deref() {
252            None => Ok(None), // Key doesn't exist
253            Some("ReJSON-RL") => {
254                // RedisJSON document - use JSON.GET
255                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
256                    let mut conn = conn.clone();
257                    let key = prefixed_id.clone();
258                    async move {
259                        let data: Option<String> = cmd("JSON.GET")
260                            .arg(&key)
261                            .query_async(&mut conn)
262                            .await?;
263                        Ok(data)
264                    }
265                })
266                .await
267                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
268                
269                match json_str {
270                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
271                    None => Ok(None),
272                }
273            }
274            Some("string") => {
275                // Binary content or legacy format - read as bytes
276                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
277                    let mut conn = conn.clone();
278                    let key = prefixed_id.clone();
279                    async move {
280                        let data: Option<Vec<u8>> = conn.get(&key).await?;
281                        Ok(data)
282                    }
283                })
284                .await
285                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
286                
287                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
288                    .transpose()
289            }
290            Some(other) => {
291                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
292            }
293        }
294    }
295
296    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
297        let conn = self.connection.clone();
298        let prefixed_id = self.prefixed_key(&item.object_id);
299        
300        match item.content_type {
301            ContentType::Json => {
302                // Build JSON document with metadata wrapper
303                let json_doc = Self::build_json_document(item)?;
304                
305                retry("redis_json_set", &RetryConfig::query(), || {
306                    let mut conn = conn.clone();
307                    let key = prefixed_id.clone();
308                    let doc = json_doc.clone();
309                    async move {
310                        // JSON.SET key $ <json>
311                        let _: () = cmd("JSON.SET")
312                            .arg(&key)
313                            .arg("$")
314                            .arg(&doc)
315                            .query_async(&mut conn)
316                            .await?;
317                        Ok(())
318                    }
319                })
320                .await
321                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
322            }
323            ContentType::Binary => {
324                // Store as serialized blob (binary content)
325                let data = serde_json::to_vec(item)
326                    .map_err(|e| StorageError::Backend(e.to_string()))?;
327
328                retry("redis_set", &RetryConfig::query(), || {
329                    let mut conn = conn.clone();
330                    let key = prefixed_id.clone();
331                    let data = data.clone();
332                    async move {
333                        let _: () = conn.set(&key, &data).await?;
334                        Ok(())
335                    }
336                })
337                .await
338                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
339            }
340        }
341    }
342
343    async fn delete(&self, id: &str) -> Result<(), StorageError> {
344        let conn = self.connection.clone();
345        let prefixed_id = self.prefixed_key(id);
346
347        retry("redis_delete", &RetryConfig::query(), || {
348            let mut conn = conn.clone();
349            let key = prefixed_id.clone();
350            async move {
351                let _: () = conn.del(&key).await?;
352                Ok(())
353            }
354        })
355        .await
356        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
357    }
358
359    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
360        let conn = self.connection.clone();
361        let prefixed_id = self.prefixed_key(id);
362
363        retry("redis_exists", &RetryConfig::query(), || {
364            let mut conn = conn.clone();
365            let key = prefixed_id.clone();
366            async move {
367                let exists: bool = conn.exists(&key).await?;
368                Ok(exists)
369            }
370        })
371        .await
372        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
373    }
374
375    /// Write a batch of items using Redis pipeline (atomic, much faster than individual SETs).
376    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
377        self.put_batch_with_ttl(items, None).await
378    }
379    
380    /// Write a batch of items with optional TTL.
381    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
382        self.put_batch_impl(items, ttl_secs).await
383    }
384
385    /// Create a RediSearch index (FT.CREATE).
386    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
387        let conn = self.connection.clone();
388
389        retry("redis_ft_create", &RetryConfig::query(), || {
390            let mut conn = conn.clone();
391            let args = args.to_vec();
392            async move {
393                let mut cmd = cmd("FT.CREATE");
394                for arg in &args {
395                    cmd.arg(arg);
396                }
397                let _: () = cmd.query_async(&mut conn).await?;
398                Ok(())
399            }
400        })
401        .await
402        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
403    }
404
405    /// Drop a RediSearch index (FT.DROPINDEX).
406    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
407        let conn = self.connection.clone();
408        let index = index.to_string();
409
410        retry("redis_ft_dropindex", &RetryConfig::query(), || {
411            let mut conn = conn.clone();
412            let index = index.clone();
413            async move {
414                let _: () = cmd("FT.DROPINDEX")
415                    .arg(&index)
416                    .query_async(&mut conn)
417                    .await?;
418                Ok(())
419            }
420        })
421        .await
422        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
423    }
424
425    /// Search using RediSearch (FT.SEARCH).
426    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
427        let conn = self.connection.clone();
428        let index = index.to_string();
429        let query = query.to_string();
430
431        retry("redis_ft_search", &RetryConfig::query(), || {
432            let mut conn = conn.clone();
433            let index = index.clone();
434            let query = query.clone();
435            async move {
436                // FT.SEARCH index query LIMIT 0 limit NOCONTENT
437                // NOCONTENT returns only keys, not document content
438                let result: redis::Value = cmd("FT.SEARCH")
439                    .arg(&index)
440                    .arg(&query)
441                    .arg("LIMIT")
442                    .arg(0)
443                    .arg(limit)
444                    .arg("NOCONTENT")
445                    .query_async(&mut conn)
446                    .await?;
447
448                // Parse FT.SEARCH response: [count, key1, key2, ...]
449                Self::parse_ft_search_response(result)
450            }
451        })
452        .await
453        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
454    }
455
456    /// Search using RediSearch with binary parameters (for vector KNN search).
457    /// Uses FT.SEARCH index query PARAMS n name blob... LIMIT offset count NOCONTENT
458    async fn ft_search_with_params(
459        &self,
460        index: &str,
461        query: &str,
462        params: &[(String, Vec<u8>)],
463        limit: usize,
464    ) -> Result<Vec<String>, StorageError> {
465        let conn = self.connection.clone();
466        let index = index.to_string();
467        let query = query.to_string();
468        let params: Vec<(String, Vec<u8>)> = params.to_vec();
469
470        retry("redis_ft_search_knn", &RetryConfig::query(), || {
471            let mut conn = conn.clone();
472            let index = index.clone();
473            let query = query.clone();
474            let params = params.clone();
475            async move {
476                // FT.SEARCH index query PARAMS n name1 blob1 name2 blob2... LIMIT 0 limit NOCONTENT
477                let mut command = cmd("FT.SEARCH");
478                command.arg(&index).arg(&query);
479
480                // Add PARAMS section: PARAMS {count} {name} {blob}...
481                if !params.is_empty() {
482                    command.arg("PARAMS").arg(params.len() * 2);
483                    for (name, blob) in &params {
484                        command.arg(name).arg(blob.as_slice());
485                    }
486                }
487
488                command.arg("LIMIT").arg(0).arg(limit).arg("NOCONTENT");
489
490                let result: redis::Value = command.query_async(&mut conn).await?;
491                Self::parse_ft_search_response(result)
492            }
493        })
494        .await
495        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
496    }
497}
498
499impl RedisStore {
    /// Pipelined batch write implementation with content-type aware storage.
    /// Uses JSON.SET for JSON content, SET for binary blobs.
    /// Also adds items to state SETs for fast state-based queries.
    ///
    /// All serialization happens up front so a retried attempt replays the
    /// exact same bytes; the whole pipeline is one retryable unit.
    ///
    /// NOTE(review): state SET members are only ever added here. If an item
    /// is re-written with a different `state`, its old SET keeps a stale
    /// member — presumably `move_state` is the intended transition path;
    /// confirm with callers.
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        // Empty batch: nothing to write, report trivially-verified success.
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Prepare items: JSON → JSON.SET document, Binary → serialized bytes
        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, id: String, state: String, doc: String },
            Blob { key: String, id: String, state: String, data: Vec<u8> },
        }
        
        // Collect into Result so the first serialization failure aborts the
        // whole batch before anything is sent to Redis.
        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                let id = item.object_id.clone();
                let state = item.state.clone();
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();
        let prefix = self.prefix.clone();
        
        retry("redis_put_batch", &RetryConfig::batch_write(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            let prefix = prefix.clone();
            async move {
                let mut pipeline = pipe();
                
                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, id, state, doc } => {
                            // JSON.SET key $ <json>
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            // RedisJSON has no SETEX equivalent: TTL is applied
                            // with a follow-up EXPIRE in the same pipeline.
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                            // Add to state SET: sync:state:{state}
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                        PreparedItem::Blob { key, id, state, data } => {
                            // SET for binary content
                            if let Some(ttl) = ttl_secs {
                                // SETEX sets value and TTL in a single command.
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                            // Add to state SET
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                    }
                }
                
                // Per-command replies are discarded; an error on any command
                // fails the pipeline and triggers a retry.
                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }
589
590    /// Check if multiple keys exist in Redis (pipelined).
591    /// Returns a vec of bools matching the input order.
592    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
593        if ids.is_empty() {
594            return Ok(vec![]);
595        }
596
597        let conn = self.connection.clone();
598        // Apply prefix to all keys
599        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();
600
601        retry("redis_exists_batch", &RetryConfig::query(), || {
602            let mut conn = conn.clone();
603            let prefixed_ids = prefixed_ids.clone();
604            async move {
605                let mut pipeline = pipe();
606                for key in &prefixed_ids {
607                    pipeline.exists(key);
608                }
609                
610                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
611                Ok(results)
612            }
613        })
614        .await
615        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
616    }
617    
618    // ═══════════════════════════════════════════════════════════════════════════
619    // State SET operations: O(1) membership, fast iteration by state
620    // ═══════════════════════════════════════════════════════════════════════════
621    
622    /// Get all IDs in a given state (from Redis SET).
623    ///
624    /// Returns IDs without prefix - ready to use with `get()`.
625    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
626        let mut conn = self.connection.clone();
627        let state_key = format!("{}state:{}", self.prefix, state);
628        
629        let ids: Vec<String> = cmd("SMEMBERS")
630            .arg(&state_key)
631            .query_async(&mut conn)
632            .await
633            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
634        
635        Ok(ids)
636    }
637    
638    /// Count items in a given state (SET cardinality).
639    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
640        let mut conn = self.connection.clone();
641        let state_key = format!("{}state:{}", self.prefix, state);
642        
643        let count: u64 = cmd("SCARD")
644            .arg(&state_key)
645            .query_async(&mut conn)
646            .await
647            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;
648        
649        Ok(count)
650    }
651    
652    /// Check if an ID is in a given state (SET membership).
653    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
654        let mut conn = self.connection.clone();
655        let state_key = format!("{}state:{}", self.prefix, state);
656        
657        let is_member: bool = cmd("SISMEMBER")
658            .arg(&state_key)
659            .arg(id)
660            .query_async(&mut conn)
661            .await
662            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;
663        
664        Ok(is_member)
665    }
666    
    /// Move an ID from one state to another (atomic SMOVE).
    ///
    /// Returns true if the item was moved, false if it wasn't in the source state.
    ///
    /// NOTE(review): unlike most operations in this file, this is not wrapped
    /// in `retry(...)`. Deliberately left that way here: if an attempt
    /// succeeded but the reply was lost, a blind retry would return `false`
    /// despite the move having happened — confirm desired semantics before
    /// adding retries.
    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        // Source/destination membership SETs: "{prefix}state:{name}"
        let from_key = format!("{}state:{}", self.prefix, from_state);
        let to_key = format!("{}state:{}", self.prefix, to_state);
        
        let moved: bool = cmd("SMOVE")
            .arg(&from_key)
            .arg(&to_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;
        
        Ok(moved)
    }
685    
    /// Remove an ID from a state SET.
    ///
    /// Returns true if the member was actually removed (SREM count > 0),
    /// false if it wasn't in the SET.
    ///
    /// NOTE(review): not wrapped in `retry(...)` like the CacheStore ops —
    /// a retried SREM after a lost reply would report `false` even though
    /// the removal happened. Confirm semantics before adding retries.
    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);
        
        // SREM returns the number of members removed (0 or 1 here).
        let removed: u32 = cmd("SREM")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;
        
        Ok(removed > 0)
    }
700    
    /// Delete all items in a state (both the SET and the actual keys).
    ///
    /// Returns the number of items deleted.
    ///
    /// NOTE(review): SMEMBERS and the DEL pipeline are two separate round
    /// trips, not a transaction. An item added to the state between the two
    /// steps loses its SET membership (the SET is deleted wholesale) while
    /// its data key survives — acceptable only if callers serialize state
    /// mutations; confirm.
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);
        
        // Get all IDs in this state
        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
        
        if ids.is_empty() {
            return Ok(0);
        }
        
        let count = ids.len() as u64;
        
        // Delete all the keys and the state SET
        // (SET members are clean IDs, so each must be re-prefixed.)
        let mut pipeline = pipe();
        for id in &ids {
            let key = self.prefixed_key(id);
            pipeline.del(&key);
        }
        pipeline.del(&state_key);
        
        pipeline.query_async::<()>(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;
        
        Ok(count)
    }
735    
736    /// Scan items by ID prefix using Redis SCAN.
737    ///
738    /// Uses cursor-based SCAN with MATCH pattern for safe iteration.
739    /// Does NOT block the server (unlike KEYS).
740    ///
741    /// # Example
742    /// ```rust,ignore
743    /// // Get all deltas for object user.123
744    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
745    /// ```
746    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
747        let mut conn = self.connection.clone();
748        
749        // Build match pattern: "{store_prefix}{user_prefix}*"
750        let pattern = format!("{}{}*", self.prefix, prefix);
751        
752        let mut items = Vec::new();
753        let mut cursor: u64 = 0;
754        
755        // SCAN iteration (cursor-based, non-blocking)
756        loop {
757            // SCAN cursor MATCH pattern COUNT batch_size
758            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
759                .arg(cursor)
760                .arg("MATCH")
761                .arg(&pattern)
762                .arg("COUNT")
763                .arg(100) // Batch size per iteration
764                .query_async(&mut conn)
765                .await
766                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
767            
768            // Fetch each key using JSON.GET
769            for key in keys {
770                if items.len() >= limit {
771                    break;
772                }
773                
774                let json_opt: Option<String> = cmd("JSON.GET")
775                    .arg(&key)
776                    .query_async(&mut conn)
777                    .await
778                    .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;
779                
780                if let Some(json_str) = json_opt {
781                    // Strip prefix to get clean ID
782                    let id = self.strip_prefix(&key);
783                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
784                        items.push(item);
785                    }
786                }
787            }
788            
789            cursor = new_cursor;
790            
791            // Stop if cursor is 0 (complete) or we have enough items
792            if cursor == 0 || items.len() >= limit {
793                break;
794            }
795        }
796        
797        Ok(items)
798    }
799    
800    /// Count items matching an ID prefix.
801    ///
802    /// Note: This requires scanning all matching keys, so use sparingly.
803    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
804        let mut conn = self.connection.clone();
805        let pattern = format!("{}{}*", self.prefix, prefix);
806        
807        let mut count: u64 = 0;
808        let mut cursor: u64 = 0;
809        
810        loop {
811            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
812                .arg(cursor)
813                .arg("MATCH")
814                .arg(&pattern)
815                .arg("COUNT")
816                .arg(1000)
817                .query_async(&mut conn)
818                .await
819                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
820            
821            count += keys.len() as u64;
822            cursor = new_cursor;
823            
824            if cursor == 0 {
825                break;
826            }
827        }
828        
829        Ok(count)
830    }
831    
832    /// Delete all items matching an ID prefix.
833    ///
834    /// Returns the number of deleted items.
835    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
836        let mut conn = self.connection.clone();
837        let pattern = format!("{}{}*", self.prefix, prefix);
838        
839        let mut deleted: u64 = 0;
840        let mut cursor: u64 = 0;
841        
842        loop {
843            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
844                .arg(cursor)
845                .arg("MATCH")
846                .arg(&pattern)
847                .arg("COUNT")
848                .arg(1000)
849                .query_async(&mut conn)
850                .await
851                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
852            
853            if !keys.is_empty() {
854                // Batch delete
855                let mut pipeline = pipe();
856                for key in &keys {
857                    pipeline.del(key);
858                }
859                pipeline.query_async::<()>(&mut conn)
860                    .await
861                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;
862                
863                deleted += keys.len() as u64;
864            }
865            
866            cursor = new_cursor;
867            
868            if cursor == 0 {
869                break;
870            }
871        }
872        
873        Ok(deleted)
874    }
875    
876    // ========================================================================
877    // CDC Stream Methods
878    // ========================================================================
879    
880    /// Write a CDC entry to the stream.
881    /// 
882    /// Uses XADD with MAXLEN ~ for bounded stream size.
883    /// The stream key is `{prefix}__local__:cdc`.
884    pub async fn xadd_cdc(
885        &self, 
886        entry: &crate::cdc::CdcEntry, 
887        maxlen: u64
888    ) -> Result<String, StorageError> {
889        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
890        let fields = entry.to_redis_fields();
891        
892        let mut conn = self.connection.clone();
893        
894        // Build XADD command: XADD key MAXLEN ~ maxlen * field1 value1 field2 value2 ...
895        let mut command = cmd("XADD");
896        command.arg(&stream_key);
897        command.arg("MAXLEN");
898        command.arg("~");
899        command.arg(maxlen);
900        command.arg("*"); // Auto-generate ID
901        
902        for (field, value) in fields {
903            command.arg(field);
904            command.arg(value.as_bytes());
905        }
906        
907        let entry_id: String = command
908            .query_async(&mut conn)
909            .await
910            .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;
911        
912        Ok(entry_id)
913    }
914    
915    /// Write multiple CDC entries to the stream in a pipeline.
916    /// 
917    /// Returns the stream entry IDs for each write.
918    pub async fn xadd_cdc_batch(
919        &self, 
920        entries: &[crate::cdc::CdcEntry], 
921        maxlen: u64
922    ) -> Result<Vec<String>, StorageError> {
923        if entries.is_empty() {
924            return Ok(vec![]);
925        }
926        
927        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
928        let mut conn = self.connection.clone();
929        
930        let mut pipeline = pipe();
931        
932        for entry in entries {
933            let fields = entry.to_redis_fields();
934            
935            // Start XADD command in pipeline
936            let mut command = cmd("XADD");
937            command.arg(&stream_key);
938            command.arg("MAXLEN");
939            command.arg("~");
940            command.arg(maxlen);
941            command.arg("*");
942            
943            for (field, value) in fields {
944                command.arg(field);
945                command.arg(value.as_bytes());
946            }
947            
948            pipeline.add_command(command);
949        }
950        
951        let ids: Vec<String> = pipeline
952            .query_async(&mut conn)
953            .await
954            .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;
955        
956        Ok(ids)
957    }
958}