warpdrive_proxy/cache/redis.rs
1//! Redis-based distributed cache implementation
2//!
3//! This module provides a high-performance Redis cache backend with:
//! - A shared, auto-reconnecting multiplexed connection via ConnectionManager
5//! - Async/await with tokio
6//! - Key prefixing for namespace isolation
7//! - Batch operations for efficiency
8//! - Cluster support (when enabled via Cargo features)
9//!
10//! # Performance Characteristics
11//!
//! - Connection reuse: clones of ConnectionManager share one multiplexed connection
13//! - Zero-copy when possible: Uses bytes for efficient data transfer
//! - Multi-key batch operations (MGET / DEL): Reduces round trips
15//! - Minimal allocations in hot paths
16//!
17//! # Example
18//!
19//! ```no_run
//! use warpdrive::cache::Cache; // trait providing `get` / `set` / `delete`
//! use warpdrive::cache::redis::RedisCache;
21//!
22//! # async fn example() -> anyhow::Result<()> {
23//! let cache = RedisCache::from_url(
24//! "redis://localhost:6379",
25//! "warpdrive:cache:".to_string()
26//! ).await?;
27//!
28//! // Single operations
29//! cache.set("user:123", b"user_data", 60).await?;
30//! let data = cache.get("user:123").await?;
31//! cache.delete("user:123").await?;
32//!
33//! // Batch operations
34//! let values = cache.batch_get(&[
35//! "user:123".to_string(),
36//! "user:456".to_string()
37//! ]).await?;
38//!
39//! // Clear by pattern
40//! let deleted = cache.clear_prefix("user:*").await?;
41//! println!("Deleted {} keys", deleted);
42//! # Ok(())
43//! # }
44//! ```
45
46use async_trait::async_trait;
47use redis::AsyncCommands;
48use redis::aio::ConnectionManager;
49
50use crate::cache::Cache;
51
52/// Redis cache implementation with connection pooling
53///
54/// This struct wraps a Redis ConnectionManager and provides cache operations
55/// with automatic key prefixing for namespace isolation.
56///
57/// # Connection Management
58///
59/// Uses `ConnectionManager` which provides:
60/// - Automatic reconnection on connection loss
61/// - Connection health checks
62/// - Async operation support
63/// - Internal connection pooling
64///
65/// # Key Prefixing
66///
67/// All keys are automatically prefixed to avoid collisions:
68/// - User provides: `"user:123"`
69/// - Stored as: `"warpdrive:cache:user:123"` (if prefix is `"warpdrive:cache:"`)
70#[derive(Clone)]
71pub struct RedisCache {
72 /// Connection manager for async Redis operations
73 client: ConnectionManager,
74
75 /// Key prefix for namespace isolation
76 prefix: String,
77}
78
79impl RedisCache {
80 /// Create a new Redis cache from connection URL
81 ///
82 /// # Parameters
83 ///
84 /// - `url`: Redis connection URL (e.g., `"redis://localhost:6379"`)
85 /// - `prefix`: Key prefix for namespace isolation (e.g., `"warpdrive:cache:"`)
86 ///
87 /// # Errors
88 ///
89 /// Returns an error if:
90 /// - URL is invalid
91 /// - Cannot connect to Redis server
92 /// - Authentication fails
93 ///
94 /// # Example
95 ///
96 /// ```no_run
97 /// # use warpdrive::cache::redis::RedisCache;
98 /// # async fn example() -> anyhow::Result<()> {
99 /// let cache = RedisCache::from_url(
100 /// "redis://localhost:6379/0",
101 /// "myapp:".to_string()
102 /// ).await?;
103 /// # Ok(())
104 /// # }
105 /// ```
106 pub async fn from_url(url: &str, prefix: String) -> anyhow::Result<Self> {
107 let client = redis::Client::open(url)?;
108 let connection_manager = ConnectionManager::new(client).await?;
109
110 Ok(RedisCache {
111 client: connection_manager,
112 prefix,
113 })
114 }
115
116 /// Build the full key with prefix
117 ///
118 /// Internal helper to construct namespaced keys.
119 #[inline]
120 fn prefixed_key(&self, key: &str) -> String {
121 format!("{}{}", self.prefix, key)
122 }
123
124 /// Batch get multiple keys
125 ///
126 /// Retrieves multiple values in a single round trip using Redis pipelining.
127 /// This is significantly more efficient than multiple individual GET operations.
128 ///
129 /// # Returns
130 ///
131 /// A vector of `Option<Vec<u8>>` corresponding to the input keys.
132 /// `None` indicates the key was not found or expired.
133 ///
134 /// # Example
135 ///
136 /// ```no_run
137 /// # use warpdrive::cache::redis::RedisCache;
138 /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
139 /// let keys = vec!["user:1".to_string(), "user:2".to_string()];
140 /// let values = cache.batch_get(&keys).await?;
141 ///
142 /// for (key, value) in keys.iter().zip(values.iter()) {
143 /// match value {
144 /// Some(data) => println!("{}: {} bytes", key, data.len()),
145 /// None => println!("{}: not found", key),
146 /// }
147 /// }
148 /// # Ok(())
149 /// # }
150 /// ```
151 pub async fn batch_get(&self, keys: &[String]) -> anyhow::Result<Vec<Option<Vec<u8>>>> {
152 if keys.is_empty() {
153 return Ok(vec![]);
154 }
155
156 let mut conn = self.client.clone();
157
158 // Build prefixed keys
159 let prefixed_keys: Vec<String> = keys.iter().map(|k| self.prefixed_key(k)).collect();
160
161 // Use MGET for batch retrieval - need to build the command manually
162 let mut cmd = redis::cmd("MGET");
163 for key in &prefixed_keys {
164 cmd.arg(key);
165 }
166
167 let values: Vec<Option<Vec<u8>>> = cmd.query_async(&mut conn).await?;
168
169 Ok(values)
170 }
171
172 /// Batch delete multiple keys
173 ///
174 /// Deletes multiple keys in a single round trip using Redis DEL.
175 /// Returns `Ok(())` regardless of whether keys existed.
176 ///
177 /// # Example
178 ///
179 /// ```no_run
180 /// # use warpdrive::cache::redis::RedisCache;
181 /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
182 /// let keys = vec!["user:1".to_string(), "user:2".to_string()];
183 /// cache.batch_delete(&keys).await?;
184 /// # Ok(())
185 /// # }
186 /// ```
187 pub async fn batch_delete(&self, keys: &[String]) -> anyhow::Result<()> {
188 if keys.is_empty() {
189 return Ok(());
190 }
191
192 let mut conn = self.client.clone();
193
194 // Build prefixed keys
195 let prefixed_keys: Vec<String> = keys.iter().map(|k| self.prefixed_key(k)).collect();
196
197 // Use DEL for batch deletion
198 let _: () = conn.del(&prefixed_keys).await?;
199
200 Ok(())
201 }
202
203 /// Clear all keys matching a pattern
204 ///
205 /// Deletes all keys matching the given pattern (after prefixing).
206 /// Uses SCAN + DEL for safe iteration without blocking Redis.
207 ///
208 /// # Warning
209 ///
210 /// This operation scans the entire keyspace and can be slow on large datasets.
211 /// Use sparingly and consider the impact on Redis performance.
212 ///
213 /// # Parameters
214 ///
215 /// - `pattern`: Glob-style pattern (e.g., `"user:*"`, `"session:*"`)
216 ///
217 /// # Returns
218 ///
219 /// The number of keys deleted.
220 ///
221 /// # Example
222 ///
223 /// ```no_run
224 /// # use warpdrive::cache::redis::RedisCache;
225 /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
226 /// // Delete all user cache entries
227 /// let count = cache.clear_prefix("user:*").await?;
228 /// println!("Cleared {} user entries", count);
229 /// # Ok(())
230 /// # }
231 /// ```
232 pub async fn clear_prefix(&self, pattern: &str) -> anyhow::Result<usize> {
233 let mut conn = self.client.clone();
234 let full_pattern = self.prefixed_key(pattern);
235
236 // Use SCAN to iterate over matching keys without blocking
237 let mut cursor = 0;
238 let mut total_deleted = 0;
239
240 loop {
241 // SCAN returns (cursor, keys)
242 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
243 .arg(cursor)
244 .arg("MATCH")
245 .arg(&full_pattern)
246 .arg("COUNT")
247 .arg(100) // Scan 100 keys per iteration
248 .query_async(&mut conn)
249 .await?;
250
251 if !keys.is_empty() {
252 let deleted: usize = conn.del(&keys).await?;
253 total_deleted += deleted;
254 }
255
256 cursor = new_cursor;
257
258 // Cursor of 0 means we've completed the scan
259 if cursor == 0 {
260 break;
261 }
262 }
263
264 Ok(total_deleted)
265 }
266
267 /// Get connection manager for advanced operations
268 ///
269 /// Provides access to the underlying ConnectionManager for operations
270 /// not covered by the Cache trait (e.g., INCR, ZADD, etc.).
271 ///
272 /// # Example
273 ///
274 /// ```no_run
275 /// # use warpdrive::cache::redis::RedisCache;
276 /// # use redis::AsyncCommands;
277 /// # async fn example(cache: &RedisCache) -> anyhow::Result<()> {
278 /// let mut conn = cache.connection();
279 /// let count: i64 = conn.incr("page_views", 1).await?;
280 /// # Ok(())
281 /// # }
282 /// ```
283 pub fn connection(&self) -> ConnectionManager {
284 self.client.clone()
285 }
286}
287
288#[async_trait]
289impl Cache for RedisCache {
290 async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
291 let mut conn = self.client.clone();
292 let full_key = self.prefixed_key(key);
293
294 // GET returns Option<Vec<u8>>
295 let value: Option<Vec<u8>> = conn.get(&full_key).await?;
296
297 Ok(value)
298 }
299
300 async fn set(&self, key: &str, value: &[u8], ttl_seconds: u64) -> anyhow::Result<()> {
301 let mut conn = self.client.clone();
302 let full_key = self.prefixed_key(key);
303
304 if ttl_seconds > 0 {
305 // SETEX: Set with expiration
306 let _: () = conn.set_ex(&full_key, value, ttl_seconds).await?;
307 } else {
308 // SET: No expiration
309 let _: () = conn.set(&full_key, value).await?;
310 }
311
312 Ok(())
313 }
314
315 async fn delete(&self, key: &str) -> anyhow::Result<()> {
316 let mut conn = self.client.clone();
317 let full_key = self.prefixed_key(key);
318
319 // DEL returns number of keys deleted, we ignore it
320 let _: () = conn.del(&full_key).await?;
321
322 Ok(())
323 }
324
325 async fn exists(&self, key: &str) -> anyhow::Result<bool> {
326 let mut conn = self.client.clone();
327 let full_key = self.prefixed_key(key);
328
329 // EXISTS returns 1 if key exists, 0 otherwise
330 let exists: bool = conn.exists(&full_key).await?;
331
332 Ok(exists)
333 }
334}
335
#[cfg(test)]
mod tests {
    /// Returns a closure that joins `prefix` and a key with no separator.
    ///
    /// This stands in for the cache's key-prefixing logic so it can be
    /// checked without a live Redis connection.
    fn mock_redis_cache(prefix: String) -> impl Fn(&str) -> String {
        move |key: &str| {
            let mut namespaced = String::with_capacity(prefix.len() + key.len());
            namespaced.push_str(&prefix);
            namespaced.push_str(key);
            namespaced
        }
    }

    #[test]
    fn test_prefixed_key() {
        let prefixer = mock_redis_cache(String::from("warpdrive:cache:"));

        // (input key, expected namespaced key)
        let cases = [
            ("user:123", "warpdrive:cache:user:123"),
            ("session:abc", "warpdrive:cache:session:abc"),
        ];

        for (key, expected) in cases {
            assert_eq!(prefixer(key), expected);
        }
    }

    #[test]
    fn test_prefixed_key_empty_prefix() {
        // With an empty prefix the key must pass through unchanged.
        let prefixer = mock_redis_cache(String::new());

        assert_eq!(prefixer("user:123"), "user:123");
    }
}
359}