sync_engine/storage/
redis.rs

//! Redis storage backend for L2 cache.
//!
//! Content-type aware storage using RedisJSON (Redis Stack):
//! - **JSON content** → `JSON.SET` with full structure preserved → RediSearch indexable!
//! - **Binary content** → Redis STRING (SET) for efficient blob storage
//!
//! JSON documents are stored with a clean, flat structure:
//! ```json
//! {
//!   "version": 1,
//!   "timestamp": 1767084657058,
//!   "payload_hash": "abc123...",
//!   "state": "default",
//!   "payload": {"name": "Alice", "role": "admin"},
//!   "audit": {"batch": "...", "trace": "...", "home": "..."}
//! }
//! ```
//!
//! This enables powerful search with RediSearch ON JSON:
//! ```text
//! FT.CREATE idx ON JSON PREFIX 1 sync: SCHEMA $.payload.name AS name TEXT
//! FT.SEARCH idx '@name:Alice'
//! ```
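//!
//! A minimal end-to-end sketch (the `CacheStore` and `SyncItem` import paths
//! are assumptions based on this crate's module layout):
//! ```rust,ignore
//! use sync_engine::storage::redis::RedisStore;
//! use sync_engine::storage::traits::CacheStore;
//!
//! let store = RedisStore::new("redis://localhost").await?;
//! store.put(&item).await?;                  // JSON.SET or SET, by content type
//! let cached = store.get(&item.object_id).await?;
//! ```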

use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{Client, AsyncCommands, pipe, cmd};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{BatchWriteResult, CacheStore, StorageError};
use crate::resilience::retry::{retry, RetryConfig};

pub struct RedisStore {
    connection: ConnectionManager,
    /// Optional key prefix for namespacing (e.g., "myapp:" → "myapp:user.alice")
    prefix: String,
}

impl RedisStore {
    /// Create a new Redis store without a key prefix.
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }

    /// Create a new Redis store with an optional key prefix.
    ///
    /// The prefix is prepended to all keys, enabling namespacing when
    /// sharing a Redis instance with other applications.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::storage::redis::RedisStore;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// // Keys will be prefixed: "myapp:user.alice", "myapp:config.app"
    /// let store = RedisStore::with_prefix("redis://localhost", Some("myapp:")).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        // Use startup config: fast-fail after ~30s, don't hang forever
        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self {
            connection,
            prefix: prefix.unwrap_or("").to_string(),
        })
    }

    /// Apply the prefix to a key.
    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }

    /// Strip the prefix from a key (used by `scan_prefix` to return clean IDs).
    #[inline]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

    /// Get a clone of the connection manager (for sharing with MerkleStore).
    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }

    /// Get the configured prefix.
    pub fn prefix(&self) -> &str {
        &self.prefix
    }

    /// Build the JSON document for RedisJSON storage.
    ///
    /// Structure (flat with nested audit):
    /// ```json
    /// {
    ///   "version": 1,
    ///   "timestamp": 1767084657058,
    ///   "payload_hash": "abc123...",
    ///   "state": "default",
    ///   "payload": {"name": "Alice", ...},
    ///   "audit": {"batch": "...", "trace": "...", "home": "..."}
    /// }
    /// ```
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        // Parse user content as JSON
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;

        // Build audit object (internal operational metadata)
        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        // Build final document (flat structure)
        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "state": item.state,
            "payload": payload
        });

        // Only include payload_hash if non-empty
        if !item.merkle_root.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.merkle_root.clone());
        }

        // Only include audit if there's something in it
        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }

        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }

    /// Parse a RedisJSON document back into a SyncItem.
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;

        // Top-level fields
        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let merkle_root = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();

        // Audit fields (nested)
        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);

        // Extract payload and serialize back to bytes
        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            merkle_root,
            home_instance_id,
            state,
        ))
    }
}

#[async_trait]
impl CacheStore for RedisStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);
        let original_id = id.to_string();

        // Check the type of the key to determine how to read it
        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
                Ok(if t == "none" { None } else { Some(t) })
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        match key_type.as_deref() {
            None => Ok(None), // Key doesn't exist
            Some("ReJSON-RL") => {
                // RedisJSON document - use JSON.GET
                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<String> = cmd("JSON.GET")
                            .arg(&key)
                            .query_async(&mut conn)
                            .await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                match json_str {
                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
                    None => Ok(None),
                }
            }
            Some("string") => {
                // Binary content or legacy format - read as bytes
                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<Vec<u8>> = conn.get(&key).await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
                    .transpose()
            }
            Some(other) => {
                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
            }
        }
    }

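    /// Write a single item (JSON content → RedisJSON document, binary → string blob).
    ///
    /// Note: unlike the batch path, this does not add the item's ID to a state SET.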
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(&item.object_id);

        match item.content_type {
            ContentType::Json => {
                // Build JSON document with metadata wrapper
                let json_doc = Self::build_json_document(item)?;

                retry("redis_json_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let doc = json_doc.clone();
                    async move {
                        // JSON.SET key $ <json>
                        let _: () = cmd("JSON.SET")
                            .arg(&key)
                            .arg("$")
                            .arg(&doc)
                            .query_async(&mut conn)
                            .await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
            ContentType::Binary => {
                // Store as serialized blob (binary content)
                let data = serde_json::to_vec(item)
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                retry("redis_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let data = data.clone();
                    async move {
                        let _: () = conn.set(&key, &data).await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
        }
    }

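    /// Delete an item by ID.
    ///
    /// Note: this removes the key only; any state SET entries for the ID
    /// must be cleaned up separately (see `remove_from_state`).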
    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_delete", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let _: () = conn.del(&key).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_exists", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let exists: bool = conn.exists(&key).await?;
                Ok(exists)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    /// Write a batch of items using a Redis pipeline (one round trip; much
    /// faster than individual SETs). Note: the pipeline is not atomic.
    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    /// Write a batch of items with optional TTL.
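    ///
    /// A sketch: cache a batch for one hour (the TTL applies per key):
    /// ```rust,ignore
    /// store.put_batch_with_ttl(&items, Some(3600)).await?;
    /// ```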
    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }
}

impl RedisStore {
    /// Pipelined batch write implementation with content-type aware storage.
    /// Uses JSON.SET for JSON content, SET for binary blobs.
    /// Also adds items to state SETs for fast state-based queries.
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        // Prepare items: JSON → JSON.SET document, Binary → serialized bytes
        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, id: String, state: String, doc: String },
            Blob { key: String, id: String, state: String, data: Vec<u8> },
        }

        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                let id = item.object_id.clone();
                let state = item.state.clone();
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();
        let prefix = self.prefix.clone();

        retry("redis_put_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            let prefix = prefix.clone();
            async move {
                let mut pipeline = pipe();

                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, id, state, doc } => {
                            // JSON.SET key $ <json>
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                            // Add to state SET: {prefix}state:{state}
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                        PreparedItem::Blob { key, id, state, data } => {
                            // SETEX for binary content with TTL, plain SET otherwise
                            if let Some(ttl) = ttl_secs {
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                            // Add to state SET: {prefix}state:{state}
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                    }
                }

                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }

    /// Check if multiple keys exist in Redis (pipelined).
    /// Returns a vec of bools matching the input order.
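    ///
    /// A sketch with illustrative IDs:
    /// ```rust,ignore
    /// let present = store.exists_batch(&["user.alice".into(), "user.bob".into()]).await?;
    /// assert_eq!(present.len(), 2);
    /// ```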
    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let conn = self.connection.clone();
        // Apply prefix to all keys
        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();

        retry("redis_exists_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prefixed_ids = prefixed_ids.clone();
            async move {
                let mut pipeline = pipe();
                for key in &prefixed_ids {
                    pipeline.exists(key);
                }

                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
                Ok(results)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State SET operations: O(1) membership, fast iteration by state
    // ═══════════════════════════════════════════════════════════════════════════
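    //
    // Key layout (illustrative, with store prefix "myapp:"):
    //   myapp:state:pending → SET of unprefixed item IDs {"user.alice", ...}
    //   myapp:state:synced  → SET {...}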

    /// Get all IDs in a given state (from Redis SET).
    ///
    /// Returns IDs without prefix - ready to use with `get()`.
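    ///
    /// A sketch, assuming a hypothetical "pending" state:
    /// ```rust,ignore
    /// for id in store.list_state_ids("pending").await? {
    ///     if let Some(item) = store.get(&id).await? {
    ///         // process item...
    ///     }
    /// }
    /// ```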
    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        Ok(ids)
    }

    /// Count items in a given state (SET cardinality).
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let count: u64 = cmd("SCARD")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;

        Ok(count)
    }

    /// Check if an ID is in a given state (SET membership).
    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let is_member: bool = cmd("SISMEMBER")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;

        Ok(is_member)
    }

    /// Move an ID from one state to another (atomic SMOVE).
    ///
    /// Returns true if the item was moved, false if it wasn't in the source state.
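    ///
    /// A sketch: promote an item once processing succeeds (state names illustrative):
    /// ```rust,ignore
    /// let moved = store.move_state("user.alice", "pending", "synced").await?;
    /// ```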
    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let from_key = format!("{}state:{}", self.prefix, from_state);
        let to_key = format!("{}state:{}", self.prefix, to_state);

        let moved: bool = cmd("SMOVE")
            .arg(&from_key)
            .arg(&to_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;

        Ok(moved)
    }

    /// Remove an ID from a state SET.
    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let removed: u32 = cmd("SREM")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;

        Ok(removed > 0)
    }

    /// Delete all items in a state (both the SET and the actual keys).
    ///
    /// Returns the number of items deleted.
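    ///
    /// A sketch, purging a hypothetical "failed" state:
    /// ```rust,ignore
    /// let purged = store.delete_by_state("failed").await?;
    /// ```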
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        // Get all IDs in this state
        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        if ids.is_empty() {
            return Ok(0);
        }

        let count = ids.len() as u64;

        // Delete all the keys and the state SET
        let mut pipeline = pipe();
        for id in &ids {
            let key = self.prefixed_key(id);
            pipeline.del(&key);
        }
        pipeline.del(&state_key);

        pipeline.query_async::<()>(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;

        Ok(count)
    }

    /// Scan items by ID prefix using Redis SCAN.
    ///
    /// Uses cursor-based SCAN with MATCH pattern for safe iteration.
    /// Does NOT block the server (unlike KEYS).
    /// Keys that do not hold RedisJSON documents (e.g. binary blobs) are skipped.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Get all deltas for object user.123
    /// let deltas = store.scan_prefix("delta:user.123:", 1000).await?;
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let mut conn = self.connection.clone();

        // Build match pattern: "{store_prefix}{user_prefix}*"
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut items = Vec::new();
        let mut cursor: u64 = 0;

        // SCAN iteration (cursor-based, non-blocking)
        loop {
            // SCAN cursor MATCH pattern COUNT batch_size
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100) // Batch size per iteration
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            // Fetch each key using JSON.GET; keys holding non-JSON values
            // (binary blobs, state SETs) answer with an error and are skipped
            for key in keys {
                if items.len() >= limit {
                    break;
                }

                let json_opt: Option<String> = match cmd("JSON.GET")
                    .arg(&key)
                    .query_async(&mut conn)
                    .await
                {
                    Ok(data) => data,
                    Err(_) => continue, // not a RedisJSON document
                };

                if let Some(json_str) = json_opt {
                    // Strip prefix to get clean ID
                    let id = self.strip_prefix(&key);
                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
                        items.push(item);
                    }
                }
            }

            cursor = new_cursor;

            // Stop if cursor is 0 (complete) or we have enough items
            if cursor == 0 || items.len() >= limit {
                break;
            }
        }

        Ok(items)
    }

    /// Count items matching an ID prefix.
    ///
    /// Note: This requires scanning all matching keys, so use sparingly.
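    ///
    /// A sketch (prefix illustrative):
    /// ```rust,ignore
    /// let pending_deltas = store.count_prefix("delta:user.123:").await?;
    /// ```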
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut count: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            count += keys.len() as u64;
            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(count)
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Returns the number of deleted items.
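    ///
    /// A sketch (prefix illustrative):
    /// ```rust,ignore
    /// let removed = store.delete_prefix("delta:user.123:").await?;
    /// ```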
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut deleted: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            if !keys.is_empty() {
                // Batch delete
                let mut pipeline = pipe();
                for key in &keys {
                    pipeline.del(key);
                }
                pipeline.query_async::<()>(&mut conn)
                    .await
                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;

                deleted += keys.len() as u64;
            }

            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(deleted)
    }
}
726}