warpdrive_proxy/cache/
redis.rs

1//! Redis-based distributed cache implementation
2//!
3//! This module provides a high-performance Redis cache backend with:
//! - Connection reuse via a shared, cloneable ConnectionManager
5//! - Async/await with tokio
6//! - Key prefixing for namespace isolation
7//! - Batch operations for efficiency
8//! - Cluster support (when enabled via Cargo features)
9//!
10//! # Performance Characteristics
11//!
//! - Connection reuse: every operation clones the same ConnectionManager handle
//! - Zero-copy when possible: Uses bytes for efficient data transfer
//! - Batched commands: `MGET` and multi-key `DEL` reduce round trips
15//! - Minimal allocations in hot paths
16//!
17//! # Example
18//!
19//! ```no_run
//! use warpdrive::cache::Cache; // brings `get` / `set` / `delete` into scope
//! use warpdrive::cache::redis::RedisCache;
21//!
22//! # async fn example() -> anyhow::Result<()> {
23//! let cache = RedisCache::from_url(
24//!     "redis://localhost:6379",
25//!     "warpdrive:cache:".to_string()
26//! ).await?;
27//!
28//! // Single operations
29//! cache.set("user:123", b"user_data", 60).await?;
30//! let data = cache.get("user:123").await?;
31//! cache.delete("user:123").await?;
32//!
33//! // Batch operations
34//! let values = cache.batch_get(&[
35//!     "user:123".to_string(),
36//!     "user:456".to_string()
37//! ]).await?;
38//!
39//! // Clear by pattern
40//! let deleted = cache.clear_prefix("user:*").await?;
41//! println!("Deleted {} keys", deleted);
42//! # Ok(())
43//! # }
44//! ```
45
46use async_trait::async_trait;
47use redis::AsyncCommands;
48use redis::aio::ConnectionManager;
49
50use crate::cache::Cache;
51
52/// Redis cache implementation with connection pooling
53///
54/// This struct wraps a Redis ConnectionManager and provides cache operations
55/// with automatic key prefixing for namespace isolation.
56///
57/// # Connection Management
58///
59/// Uses `ConnectionManager` which provides:
60/// - Automatic reconnection on connection loss
61/// - Connection health checks
62/// - Async operation support
63/// - Internal connection pooling
64///
65/// # Key Prefixing
66///
67/// All keys are automatically prefixed to avoid collisions:
68/// - User provides: `"user:123"`
69/// - Stored as: `"warpdrive:cache:user:123"` (if prefix is `"warpdrive:cache:"`)
70#[derive(Clone)]
71pub struct RedisCache {
72    /// Connection manager for async Redis operations
73    client: ConnectionManager,
74
75    /// Key prefix for namespace isolation
76    prefix: String,
77}
78
79impl RedisCache {
80    /// Create a new Redis cache from connection URL
81    ///
82    /// # Parameters
83    ///
84    /// - `url`: Redis connection URL (e.g., `"redis://localhost:6379"`)
85    /// - `prefix`: Key prefix for namespace isolation (e.g., `"warpdrive:cache:"`)
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if:
90    /// - URL is invalid
91    /// - Cannot connect to Redis server
92    /// - Authentication fails
93    ///
94    /// # Example
95    ///
96    /// ```no_run
97    /// # use warpdrive::cache::redis::RedisCache;
98    /// # async fn example() -> anyhow::Result<()> {
99    /// let cache = RedisCache::from_url(
100    ///     "redis://localhost:6379/0",
101    ///     "myapp:".to_string()
102    /// ).await?;
103    /// # Ok(())
104    /// # }
105    /// ```
106    pub async fn from_url(url: &str, prefix: String) -> anyhow::Result<Self> {
107        let client = redis::Client::open(url)?;
108        let connection_manager = ConnectionManager::new(client).await?;
109
110        Ok(RedisCache {
111            client: connection_manager,
112            prefix,
113        })
114    }
115
116    /// Build the full key with prefix
117    ///
118    /// Internal helper to construct namespaced keys.
119    #[inline]
120    fn prefixed_key(&self, key: &str) -> String {
121        format!("{}{}", self.prefix, key)
122    }
123
124    /// Batch get multiple keys
125    ///
126    /// Retrieves multiple values in a single round trip using Redis pipelining.
127    /// This is significantly more efficient than multiple individual GET operations.
128    ///
129    /// # Returns
130    ///
131    /// A vector of `Option<Vec<u8>>` corresponding to the input keys.
132    /// `None` indicates the key was not found or expired.
133    ///
134    /// # Example
135    ///
136    /// ```no_run
137    /// # use warpdrive::cache::redis::RedisCache;
138    /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
139    /// let keys = vec!["user:1".to_string(), "user:2".to_string()];
140    /// let values = cache.batch_get(&keys).await?;
141    ///
142    /// for (key, value) in keys.iter().zip(values.iter()) {
143    ///     match value {
144    ///         Some(data) => println!("{}: {} bytes", key, data.len()),
145    ///         None => println!("{}: not found", key),
146    ///     }
147    /// }
148    /// # Ok(())
149    /// # }
150    /// ```
151    pub async fn batch_get(&self, keys: &[String]) -> anyhow::Result<Vec<Option<Vec<u8>>>> {
152        if keys.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let mut conn = self.client.clone();
157
158        // Build prefixed keys
159        let prefixed_keys: Vec<String> = keys.iter().map(|k| self.prefixed_key(k)).collect();
160
161        // Use MGET for batch retrieval - need to build the command manually
162        let mut cmd = redis::cmd("MGET");
163        for key in &prefixed_keys {
164            cmd.arg(key);
165        }
166
167        let values: Vec<Option<Vec<u8>>> = cmd.query_async(&mut conn).await?;
168
169        Ok(values)
170    }
171
172    /// Batch delete multiple keys
173    ///
174    /// Deletes multiple keys in a single round trip using Redis DEL.
175    /// Returns `Ok(())` regardless of whether keys existed.
176    ///
177    /// # Example
178    ///
179    /// ```no_run
180    /// # use warpdrive::cache::redis::RedisCache;
181    /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
182    /// let keys = vec!["user:1".to_string(), "user:2".to_string()];
183    /// cache.batch_delete(&keys).await?;
184    /// # Ok(())
185    /// # }
186    /// ```
187    pub async fn batch_delete(&self, keys: &[String]) -> anyhow::Result<()> {
188        if keys.is_empty() {
189            return Ok(());
190        }
191
192        let mut conn = self.client.clone();
193
194        // Build prefixed keys
195        let prefixed_keys: Vec<String> = keys.iter().map(|k| self.prefixed_key(k)).collect();
196
197        // Use DEL for batch deletion
198        let _: () = conn.del(&prefixed_keys).await?;
199
200        Ok(())
201    }
202
203    /// Clear all keys matching a pattern
204    ///
205    /// Deletes all keys matching the given pattern (after prefixing).
206    /// Uses SCAN + DEL for safe iteration without blocking Redis.
207    ///
208    /// # Warning
209    ///
210    /// This operation scans the entire keyspace and can be slow on large datasets.
211    /// Use sparingly and consider the impact on Redis performance.
212    ///
213    /// # Parameters
214    ///
215    /// - `pattern`: Glob-style pattern (e.g., `"user:*"`, `"session:*"`)
216    ///
217    /// # Returns
218    ///
219    /// The number of keys deleted.
220    ///
221    /// # Example
222    ///
223    /// ```no_run
224    /// # use warpdrive::cache::redis::RedisCache;
225    /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
226    /// // Delete all user cache entries
227    /// let count = cache.clear_prefix("user:*").await?;
228    /// println!("Cleared {} user entries", count);
229    /// # Ok(())
230    /// # }
231    /// ```
232    pub async fn clear_prefix(&self, pattern: &str) -> anyhow::Result<usize> {
233        let mut conn = self.client.clone();
234        let full_pattern = self.prefixed_key(pattern);
235
236        // Use SCAN to iterate over matching keys without blocking
237        let mut cursor = 0;
238        let mut total_deleted = 0;
239
240        loop {
241            // SCAN returns (cursor, keys)
242            let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
243                .arg(cursor)
244                .arg("MATCH")
245                .arg(&full_pattern)
246                .arg("COUNT")
247                .arg(100) // Scan 100 keys per iteration
248                .query_async(&mut conn)
249                .await?;
250
251            if !keys.is_empty() {
252                let deleted: usize = conn.del(&keys).await?;
253                total_deleted += deleted;
254            }
255
256            cursor = new_cursor;
257
258            // Cursor of 0 means we've completed the scan
259            if cursor == 0 {
260                break;
261            }
262        }
263
264        Ok(total_deleted)
265    }
266
267    /// Get connection manager for advanced operations
268    ///
269    /// Provides access to the underlying ConnectionManager for operations
270    /// not covered by the Cache trait (e.g., INCR, ZADD, etc.).
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// # use warpdrive::cache::redis::RedisCache;
276    /// # use redis::AsyncCommands;
277    /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
278    /// let mut conn = cache.connection();
279    /// let count: i64 = conn.incr("page_views", 1).await?;
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub fn connection(&self) -> ConnectionManager {
284        self.client.clone()
285    }
286}
287
288#[async_trait]
289impl Cache for RedisCache {
290    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
291        let mut conn = self.client.clone();
292        let full_key = self.prefixed_key(key);
293
294        // GET returns Option<Vec<u8>>
295        let value: Option<Vec<u8>> = conn.get(&full_key).await?;
296
297        Ok(value)
298    }
299
300    async fn set(&self, key: &str, value: &[u8], ttl_seconds: u64) -> anyhow::Result<()> {
301        let mut conn = self.client.clone();
302        let full_key = self.prefixed_key(key);
303
304        if ttl_seconds > 0 {
305            // SETEX: Set with expiration
306            let _: () = conn.set_ex(&full_key, value, ttl_seconds).await?;
307        } else {
308            // SET: No expiration
309            let _: () = conn.set(&full_key, value).await?;
310        }
311
312        Ok(())
313    }
314
315    async fn delete(&self, key: &str) -> anyhow::Result<()> {
316        let mut conn = self.client.clone();
317        let full_key = self.prefixed_key(key);
318
319        // DEL returns number of keys deleted, we ignore it
320        let _: () = conn.del(&full_key).await?;
321
322        Ok(())
323    }
324
325    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
326        let mut conn = self.client.clone();
327        let full_key = self.prefixed_key(key);
328
329        // EXISTS returns 1 if key exists, 0 otherwise
330        let exists: bool = conn.exists(&full_key).await?;
331
332        Ok(exists)
333    }
334}
335
#[cfg(test)]
mod tests {
    /// Stand-in for the key-prefixing logic so the naming scheme can be
    /// checked without a real Redis connection.
    fn mock_redis_cache(prefix: String) -> impl Fn(&str) -> String {
        move |key: &str| {
            let mut full = prefix.clone();
            full.push_str(key);
            full
        }
    }

    #[test]
    fn test_prefixed_key() {
        let prefixer = mock_redis_cache(String::from("warpdrive:cache:"));

        // (input key, expected namespaced key)
        let cases = [
            ("user:123", "warpdrive:cache:user:123"),
            ("session:abc", "warpdrive:cache:session:abc"),
        ];
        for (key, expected) in cases {
            assert_eq!(prefixer(key), expected);
        }
    }

    #[test]
    fn test_prefixed_key_empty_prefix() {
        // With no prefix the key must pass through unchanged.
        let prefixer = mock_redis_cache(String::new());

        assert_eq!(prefixer("user:123"), "user:123");
    }
}