Skip to main content

prax_query/data_cache/
redis.rs

1//! Redis cache backend for distributed caching.
2//!
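//! This module provides a Redis-based cache backend. The connection layer in this
//! file is currently a placeholder (no commands are sent to a server yet); the
//! intended design covers: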
4//!
5//! - **Connection pooling** using bb8 or deadpool
6//! - **Cluster support** for horizontal scaling
7//! - **Pipelining** for batch operations
8//! - **Lua scripting** for atomic operations
9//! - **Pub/Sub** for cache invalidation across instances
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use prax_query::data_cache::redis::{RedisCache, RedisCacheConfig};
15//!
16//! let cache = RedisCache::new(RedisCacheConfig {
17//!     url: "redis://localhost:6379".to_string(),
18//!     pool_size: 10,
19//!     ..Default::default()
20//! }).await?;
21//! ```
22
23use std::time::Duration;
24
25use super::backend::{BackendStats, CacheBackend, CacheError, CacheResult};
26use super::invalidation::EntityTag;
27use super::key::{CacheKey, KeyPattern};
28
/// Configuration for Redis cache.
///
/// All fields are public, so a value can be written out literally (optionally on
/// top of `..Default::default()`) or assembled with the builder-style methods.
///
/// `Debug` is implemented by hand instead of derived: the `password` field and any
/// `user:password@` section embedded in `url` are replaced with `<redacted>` so
/// that credentials do not end up in logs or panic messages.
#[derive(Clone)]
pub struct RedisCacheConfig {
    /// Redis connection URL.
    pub url: String,
    /// Connection pool size.
    pub pool_size: u32,
    /// Connection timeout.
    pub connection_timeout: Duration,
    /// Command timeout.
    pub command_timeout: Duration,
    /// Key prefix for all entries.
    pub key_prefix: String,
    /// Default TTL.
    pub default_ttl: Option<Duration>,
    /// Enable cluster mode.
    pub cluster_mode: bool,
    /// Database number (0-15).
    pub database: u8,
    /// Enable TLS.
    pub tls: bool,
    /// Username for AUTH.
    pub username: Option<String>,
    /// Password for AUTH.
    pub password: Option<String>,
}

impl std::fmt::Debug for RedisCacheConfig {
    /// Formats the configuration like a derived `Debug` would, except that secrets
    /// are masked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";

        /// Masks the userinfo (`user:password@`) part of a URL, if there is one.
        ///
        /// Only the authority section (between `://` and the first `/`, `?` or `#`)
        /// is inspected, so an `@` in the path or query is left untouched.
        fn redact_userinfo(url: &str) -> String {
            let Some(scheme_end) = url.find("://") else {
                return url.to_string();
            };
            let authority_start = scheme_end + 3;
            let rest = &url[authority_start..];
            let authority_end = rest
                .find(|c: char| matches!(c, '/' | '?' | '#'))
                .unwrap_or(rest.len());

            match rest[..authority_end].rfind('@') {
                Some(at) => format!(
                    "{}{}@{}",
                    &url[..authority_start],
                    REDACTED,
                    &rest[at + 1..]
                ),
                None => url.to_string(),
            }
        }

        f.debug_struct("RedisCacheConfig")
            .field("url", &redact_userinfo(&self.url))
            .field("pool_size", &self.pool_size)
            .field("connection_timeout", &self.connection_timeout)
            .field("command_timeout", &self.command_timeout)
            .field("key_prefix", &self.key_prefix)
            .field("default_ttl", &self.default_ttl)
            .field("cluster_mode", &self.cluster_mode)
            .field("database", &self.database)
            .field("tls", &self.tls)
            .field("username", &self.username)
            // Keep Some/None visible so it is still clear whether a password is set.
            .field("password", &self.password.as_ref().map(|_| REDACTED))
            .finish()
    }
}
55
56impl Default for RedisCacheConfig {
57    fn default() -> Self {
58        Self {
59            url: "redis://localhost:6379".to_string(),
60            pool_size: 10,
61            connection_timeout: Duration::from_secs(5),
62            command_timeout: Duration::from_secs(2),
63            key_prefix: "prax:cache".to_string(),
64            default_ttl: Some(Duration::from_secs(300)),
65            cluster_mode: false,
66            database: 0,
67            tls: false,
68            username: None,
69            password: None,
70        }
71    }
72}
73
74impl RedisCacheConfig {
75    /// Create a new config with the given URL.
76    pub fn new(url: impl Into<String>) -> Self {
77        Self {
78            url: url.into(),
79            ..Default::default()
80        }
81    }
82
83    /// Set pool size.
84    pub fn with_pool_size(mut self, size: u32) -> Self {
85        self.pool_size = size;
86        self
87    }
88
89    /// Set key prefix.
90    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
91        self.key_prefix = prefix.into();
92        self
93    }
94
95    /// Set default TTL.
96    pub fn with_ttl(mut self, ttl: Duration) -> Self {
97        self.default_ttl = Some(ttl);
98        self
99    }
100
101    /// Enable cluster mode.
102    pub fn cluster(mut self) -> Self {
103        self.cluster_mode = true;
104        self
105    }
106
107    /// Set database number.
108    pub fn database(mut self, db: u8) -> Self {
109        self.database = db;
110        self
111    }
112
113    /// Set authentication.
114    pub fn auth(mut self, username: Option<String>, password: impl Into<String>) -> Self {
115        self.username = username;
116        self.password = Some(password.into());
117        self
118    }
119
120    /// Build the full key with prefix.
121    fn full_key(&self, key: &CacheKey) -> String {
122        format!("{}:{}", self.key_prefix, key.as_str())
123    }
124}
125
126/// Represents a Redis connection (placeholder for actual implementation).
127///
128/// In a real implementation, this would use `redis-rs` or `fred` crate.
129#[derive(Clone)]
130pub struct RedisConnection {
131    config: RedisCacheConfig,
132    // In real impl: pool: Pool<RedisConnectionManager>
133}
134
135impl RedisConnection {
136    /// Create a new connection.
137    pub async fn new(config: RedisCacheConfig) -> CacheResult<Self> {
138        // In real implementation:
139        // - Create connection pool using bb8 or deadpool
140        // - Establish initial connections
141        // - Verify connectivity
142
143        Ok(Self { config })
144    }
145
146    /// Get the config.
147    pub fn config(&self) -> &RedisCacheConfig {
148        &self.config
149    }
150
151    /// Execute a Redis command (placeholder).
152    async fn execute<T>(&self, _cmd: &str, _args: &[&str]) -> CacheResult<T>
153    where
154        T: Default,
155    {
156        // Placeholder - real impl would use redis-rs
157        // Example with redis-rs:
158        // let mut conn = self.pool.get().await?;
159        // redis::cmd(cmd).arg(args).query_async(&mut *conn).await
160        Ok(T::default())
161    }
162
163    /// GET command.
164    pub async fn get(&self, key: &str) -> CacheResult<Option<Vec<u8>>> {
165        // Placeholder
166        let _ = key;
167        Ok(None)
168    }
169
170    /// SET command with optional TTL.
171    pub async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> CacheResult<()> {
172        // Placeholder
173        let _ = (key, value, ttl);
174        Ok(())
175    }
176
177    /// DEL command.
178    pub async fn del(&self, key: &str) -> CacheResult<bool> {
179        // Placeholder
180        let _ = key;
181        Ok(false)
182    }
183
184    /// EXISTS command.
185    pub async fn exists(&self, key: &str) -> CacheResult<bool> {
186        // Placeholder
187        let _ = key;
188        Ok(false)
189    }
190
191    /// KEYS command (use SCAN in production).
192    pub async fn keys(&self, pattern: &str) -> CacheResult<Vec<String>> {
193        // Placeholder - use SCAN in production for large datasets
194        let _ = pattern;
195        Ok(Vec::new())
196    }
197
198    /// MGET command.
199    pub async fn mget(&self, keys: &[String]) -> CacheResult<Vec<Option<Vec<u8>>>> {
200        // Placeholder
201        Ok(vec![None; keys.len()])
202    }
203
204    /// MSET command.
205    pub async fn mset(&self, pairs: &[(String, Vec<u8>)]) -> CacheResult<()> {
206        // Placeholder
207        let _ = pairs;
208        Ok(())
209    }
210
211    /// FLUSHDB command.
212    pub async fn flush(&self) -> CacheResult<()> {
213        // Placeholder
214        Ok(())
215    }
216
217    /// DBSIZE command.
218    pub async fn dbsize(&self) -> CacheResult<usize> {
219        // Placeholder
220        Ok(0)
221    }
222
223    /// INFO command.
224    pub async fn info(&self) -> CacheResult<String> {
225        // Placeholder
226        Ok(String::new())
227    }
228
229    /// SCAN for pattern matching.
230    pub async fn scan(&self, pattern: &str, count: usize) -> CacheResult<Vec<String>> {
231        // Placeholder - in real impl, iterate through all matches
232        let _ = (pattern, count);
233        Ok(Vec::new())
234    }
235
236    /// Pipeline multiple commands.
237    pub fn pipeline(&self) -> RedisPipeline {
238        RedisPipeline::new(self.clone())
239    }
240}
241
/// A Redis pipeline for batching commands.
///
/// Commands are queued with the builder-style methods and kept in the order in
/// which they were added.
pub struct RedisPipeline {
    /// Connection the pipeline was created from.
    conn: RedisConnection,
    /// Queued commands, in insertion order.
    commands: Vec<PipelineCommand>,
}
247
/// A single command queued in a [`RedisPipeline`].
enum PipelineCommand {
    /// GET for the given key.
    Get(String),
    /// SET for the given key, value bytes and optional TTL.
    Set(String, Vec<u8>, Option<Duration>),
    /// DEL for the given key.
    Del(String),
}
253
254impl RedisPipeline {
255    fn new(conn: RedisConnection) -> Self {
256        Self {
257            conn,
258            commands: Vec::new(),
259        }
260    }
261
262    /// Add a GET command.
263    pub fn get(mut self, key: impl Into<String>) -> Self {
264        self.commands.push(PipelineCommand::Get(key.into()));
265        self
266    }
267
268    /// Add a SET command.
269    pub fn set(mut self, key: impl Into<String>, value: Vec<u8>, ttl: Option<Duration>) -> Self {
270        self.commands
271            .push(PipelineCommand::Set(key.into(), value, ttl));
272        self
273    }
274
275    /// Add a DEL command.
276    pub fn del(mut self, key: impl Into<String>) -> Self {
277        self.commands.push(PipelineCommand::Del(key.into()));
278        self
279    }
280
281    /// Execute the pipeline.
282    pub async fn execute(self) -> CacheResult<Vec<PipelineResult>> {
283        // Placeholder - real impl would batch execute
284        Ok(vec![PipelineResult::Ok; self.commands.len()])
285    }
286}
287
/// Result of a pipeline command.
///
/// Implements `PartialEq`/`Eq` so results can be compared directly, e.g. with
/// `assert_eq!` or `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineResult {
    /// The command completed and produced no payload.
    Ok,
    /// The command produced an optional byte payload.
    // NOTE(review): presumably `None` means "key not found" for GET; confirm once a
    // real backend is wired in.
    Value(Option<Vec<u8>>),
    /// The command failed; the string carries the error message.
    Error(String),
}
295
/// Redis cache backend.
///
/// Holds the connection together with a copy of the configuration it was built
/// from; the configuration supplies the key prefix and the default TTL.
#[derive(Clone)]
pub struct RedisCache {
    /// Connection used for every Redis command.
    conn: RedisConnection,
    /// Configuration the cache was created with.
    config: RedisCacheConfig,
}
302
303impl RedisCache {
304    /// Create a new Redis cache.
305    pub async fn new(config: RedisCacheConfig) -> CacheResult<Self> {
306        let conn = RedisConnection::new(config.clone()).await?;
307        Ok(Self { conn, config })
308    }
309
310    /// Create from a URL.
311    pub async fn from_url(url: &str) -> CacheResult<Self> {
312        Self::new(RedisCacheConfig::new(url)).await
313    }
314
315    /// Get the connection.
316    pub fn connection(&self) -> &RedisConnection {
317        &self.conn
318    }
319
320    /// Get the config.
321    pub fn config(&self) -> &RedisCacheConfig {
322        &self.config
323    }
324
325    /// Build the full key with prefix.
326    fn full_key(&self, key: &CacheKey) -> String {
327        self.config.full_key(key)
328    }
329}
330
331impl CacheBackend for RedisCache {
332    async fn get<T>(&self, key: &CacheKey) -> CacheResult<Option<T>>
333    where
334        T: serde::de::DeserializeOwned,
335    {
336        let full_key = self.full_key(key);
337
338        match self.conn.get(&full_key).await? {
339            Some(data) => {
340                let value: T = serde_json::from_slice(&data)
341                    .map_err(|e| CacheError::Deserialization(e.to_string()))?;
342                Ok(Some(value))
343            }
344            None => Ok(None),
345        }
346    }
347
348    async fn set<T>(&self, key: &CacheKey, value: &T, ttl: Option<Duration>) -> CacheResult<()>
349    where
350        T: serde::Serialize + Sync,
351    {
352        let full_key = self.full_key(key);
353        let data =
354            serde_json::to_vec(value).map_err(|e| CacheError::Serialization(e.to_string()))?;
355
356        let effective_ttl = ttl.or(self.config.default_ttl);
357        self.conn.set(&full_key, &data, effective_ttl).await
358    }
359
360    async fn delete(&self, key: &CacheKey) -> CacheResult<bool> {
361        let full_key = self.full_key(key);
362        self.conn.del(&full_key).await
363    }
364
365    async fn exists(&self, key: &CacheKey) -> CacheResult<bool> {
366        let full_key = self.full_key(key);
367        self.conn.exists(&full_key).await
368    }
369
370    async fn get_many<T>(&self, keys: &[CacheKey]) -> CacheResult<Vec<Option<T>>>
371    where
372        T: serde::de::DeserializeOwned,
373    {
374        let full_keys: Vec<String> = keys.iter().map(|k| self.full_key(k)).collect();
375        let results = self.conn.mget(&full_keys).await?;
376
377        results
378            .into_iter()
379            .map(|opt| {
380                opt.map(|data| {
381                    serde_json::from_slice(&data)
382                        .map_err(|e| CacheError::Deserialization(e.to_string()))
383                })
384                .transpose()
385            })
386            .collect()
387    }
388
389    async fn invalidate_pattern(&self, pattern: &KeyPattern) -> CacheResult<u64> {
390        let full_pattern = format!("{}:{}", self.config.key_prefix, pattern.to_redis_pattern());
391
392        // Use SCAN to find matching keys
393        let keys = self.conn.scan(&full_pattern, 1000).await?;
394
395        if keys.is_empty() {
396            return Ok(0);
397        }
398
399        // Delete in batches
400        let mut deleted = 0u64;
401        for key in keys {
402            if self.conn.del(&key).await? {
403                deleted += 1;
404            }
405        }
406
407        Ok(deleted)
408    }
409
410    async fn invalidate_tags(&self, tags: &[EntityTag]) -> CacheResult<u64> {
411        // Tags stored as sets: tag:<tag_value> -> [key1, key2, ...]
412        let mut total = 0u64;
413
414        for tag in tags {
415            let tag_key = format!("{}:tag:{}", self.config.key_prefix, tag.value());
416            // In real impl: SMEMBERS to get keys, then DEL
417            let _ = tag_key;
418            total += 0; // Placeholder
419        }
420
421        Ok(total)
422    }
423
424    async fn clear(&self) -> CacheResult<()> {
425        // In production, use SCAN + DEL with prefix
426        // FLUSHDB would clear everything
427        self.conn.flush().await
428    }
429
430    async fn len(&self) -> CacheResult<usize> {
431        self.conn.dbsize().await
432    }
433
434    async fn stats(&self) -> CacheResult<BackendStats> {
435        let info = self.conn.info().await?;
436        let entries = self.conn.dbsize().await?;
437
438        Ok(BackendStats {
439            entries,
440            memory_bytes: None, // Parse from INFO
441            connections: Some(self.config.pool_size as usize),
442            info: Some(info),
443        })
444    }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods must overwrite exactly the fields they are named after.
    #[test]
    fn test_redis_config() {
        let expected_ttl = Duration::from_secs(600);

        let base = RedisCacheConfig::new("redis://localhost:6379");
        let config = base
            .with_pool_size(20)
            .with_prefix("myapp")
            .with_ttl(expected_ttl);

        assert_eq!(config.pool_size, 20);
        assert_eq!(config.key_prefix, "myapp");
        assert_eq!(config.default_ttl, Some(expected_ttl));
    }

    /// The full key is the configured prefix followed by the key's own string form.
    #[test]
    fn test_full_key() {
        let config = RedisCacheConfig::new("redis://localhost").with_prefix("app:cache");
        let key = CacheKey::new("User", "id:123");

        assert_eq!(config.full_key(&key), "app:cache:prax:User:id:123");
    }

    /// Smoke test for the construction API; no Redis instance is required because
    /// the connection layer is a placeholder.
    #[tokio::test]
    async fn test_redis_cache_creation() {
        let cache = RedisCache::new(RedisCacheConfig::default())
            .await
            .unwrap();

        assert_eq!(cache.config().pool_size, 10);
    }
}