prax_query/data_cache/
redis.rs

1//! Redis cache backend for distributed caching.
2//!
3//! This module provides a Redis-based cache implementation with:
4//!
5//! - **Connection pooling** using bb8 or deadpool
6//! - **Cluster support** for horizontal scaling
7//! - **Pipelining** for batch operations
8//! - **Lua scripting** for atomic operations
9//! - **Pub/Sub** for cache invalidation across instances
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use prax_query::data_cache::redis::{RedisCache, RedisCacheConfig};
15//!
16//! let cache = RedisCache::new(RedisCacheConfig {
17//!     url: "redis://localhost:6379".to_string(),
18//!     pool_size: 10,
19//!     ..Default::default()
20//! }).await?;
21//! ```
22
23use std::time::Duration;
24
25use super::backend::{BackendStats, CacheBackend, CacheError, CacheResult};
26use super::invalidation::EntityTag;
27use super::key::{CacheKey, KeyPattern};
28
29/// Configuration for Redis cache.
30#[derive(Debug, Clone)]
31pub struct RedisCacheConfig {
32    /// Redis connection URL.
33    pub url: String,
34    /// Connection pool size.
35    pub pool_size: u32,
36    /// Connection timeout.
37    pub connection_timeout: Duration,
38    /// Command timeout.
39    pub command_timeout: Duration,
40    /// Key prefix for all entries.
41    pub key_prefix: String,
42    /// Default TTL.
43    pub default_ttl: Option<Duration>,
44    /// Enable cluster mode.
45    pub cluster_mode: bool,
46    /// Database number (0-15).
47    pub database: u8,
48    /// Enable TLS.
49    pub tls: bool,
50    /// Username for AUTH.
51    pub username: Option<String>,
52    /// Password for AUTH.
53    pub password: Option<String>,
54}
55
56impl Default for RedisCacheConfig {
57    fn default() -> Self {
58        Self {
59            url: "redis://localhost:6379".to_string(),
60            pool_size: 10,
61            connection_timeout: Duration::from_secs(5),
62            command_timeout: Duration::from_secs(2),
63            key_prefix: "prax:cache".to_string(),
64            default_ttl: Some(Duration::from_secs(300)),
65            cluster_mode: false,
66            database: 0,
67            tls: false,
68            username: None,
69            password: None,
70        }
71    }
72}
73
74impl RedisCacheConfig {
75    /// Create a new config with the given URL.
76    pub fn new(url: impl Into<String>) -> Self {
77        Self {
78            url: url.into(),
79            ..Default::default()
80        }
81    }
82
83    /// Set pool size.
84    pub fn with_pool_size(mut self, size: u32) -> Self {
85        self.pool_size = size;
86        self
87    }
88
89    /// Set key prefix.
90    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
91        self.key_prefix = prefix.into();
92        self
93    }
94
95    /// Set default TTL.
96    pub fn with_ttl(mut self, ttl: Duration) -> Self {
97        self.default_ttl = Some(ttl);
98        self
99    }
100
101    /// Enable cluster mode.
102    pub fn cluster(mut self) -> Self {
103        self.cluster_mode = true;
104        self
105    }
106
107    /// Set database number.
108    pub fn database(mut self, db: u8) -> Self {
109        self.database = db;
110        self
111    }
112
113    /// Set authentication.
114    pub fn auth(mut self, username: Option<String>, password: impl Into<String>) -> Self {
115        self.username = username;
116        self.password = Some(password.into());
117        self
118    }
119
120    /// Build the full key with prefix.
121    fn full_key(&self, key: &CacheKey) -> String {
122        format!("{}:{}", self.key_prefix, key.as_str())
123    }
124}
125
126/// Represents a Redis connection (placeholder for actual implementation).
127///
128/// In a real implementation, this would use `redis-rs` or `fred` crate.
129#[derive(Clone)]
130pub struct RedisConnection {
131    config: RedisCacheConfig,
132    // In real impl: pool: Pool<RedisConnectionManager>
133}
134
135impl RedisConnection {
136    /// Create a new connection.
137    pub async fn new(config: RedisCacheConfig) -> CacheResult<Self> {
138        // In real implementation:
139        // - Create connection pool using bb8 or deadpool
140        // - Establish initial connections
141        // - Verify connectivity
142
143        Ok(Self { config })
144    }
145
146    /// Get the config.
147    pub fn config(&self) -> &RedisCacheConfig {
148        &self.config
149    }
150
151    /// Execute a Redis command (placeholder).
152    async fn execute<T>(&self, _cmd: &str, _args: &[&str]) -> CacheResult<T>
153    where
154        T: Default,
155    {
156        // Placeholder - real impl would use redis-rs
157        // Example with redis-rs:
158        // let mut conn = self.pool.get().await?;
159        // redis::cmd(cmd).arg(args).query_async(&mut *conn).await
160        Ok(T::default())
161    }
162
163    /// GET command.
164    pub async fn get(&self, key: &str) -> CacheResult<Option<Vec<u8>>> {
165        // Placeholder
166        let _ = key;
167        Ok(None)
168    }
169
170    /// SET command with optional TTL.
171    pub async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> CacheResult<()> {
172        // Placeholder
173        let _ = (key, value, ttl);
174        Ok(())
175    }
176
177    /// DEL command.
178    pub async fn del(&self, key: &str) -> CacheResult<bool> {
179        // Placeholder
180        let _ = key;
181        Ok(false)
182    }
183
184    /// EXISTS command.
185    pub async fn exists(&self, key: &str) -> CacheResult<bool> {
186        // Placeholder
187        let _ = key;
188        Ok(false)
189    }
190
191    /// KEYS command (use SCAN in production).
192    pub async fn keys(&self, pattern: &str) -> CacheResult<Vec<String>> {
193        // Placeholder - use SCAN in production for large datasets
194        let _ = pattern;
195        Ok(Vec::new())
196    }
197
198    /// MGET command.
199    pub async fn mget(&self, keys: &[String]) -> CacheResult<Vec<Option<Vec<u8>>>> {
200        // Placeholder
201        Ok(vec![None; keys.len()])
202    }
203
204    /// MSET command.
205    pub async fn mset(&self, pairs: &[(String, Vec<u8>)]) -> CacheResult<()> {
206        // Placeholder
207        let _ = pairs;
208        Ok(())
209    }
210
211    /// FLUSHDB command.
212    pub async fn flush(&self) -> CacheResult<()> {
213        // Placeholder
214        Ok(())
215    }
216
217    /// DBSIZE command.
218    pub async fn dbsize(&self) -> CacheResult<usize> {
219        // Placeholder
220        Ok(0)
221    }
222
223    /// INFO command.
224    pub async fn info(&self) -> CacheResult<String> {
225        // Placeholder
226        Ok(String::new())
227    }
228
229    /// SCAN for pattern matching.
230    pub async fn scan(&self, pattern: &str, count: usize) -> CacheResult<Vec<String>> {
231        // Placeholder - in real impl, iterate through all matches
232        let _ = (pattern, count);
233        Ok(Vec::new())
234    }
235
236    /// Pipeline multiple commands.
237    pub fn pipeline(&self) -> RedisPipeline {
238        RedisPipeline::new(self.clone())
239    }
240}
241
242/// A Redis pipeline for batching commands.
243pub struct RedisPipeline {
244    conn: RedisConnection,
245    commands: Vec<PipelineCommand>,
246}
247
248enum PipelineCommand {
249    Get(String),
250    Set(String, Vec<u8>, Option<Duration>),
251    Del(String),
252}
253
254impl RedisPipeline {
255    fn new(conn: RedisConnection) -> Self {
256        Self {
257            conn,
258            commands: Vec::new(),
259        }
260    }
261
262    /// Add a GET command.
263    pub fn get(mut self, key: impl Into<String>) -> Self {
264        self.commands.push(PipelineCommand::Get(key.into()));
265        self
266    }
267
268    /// Add a SET command.
269    pub fn set(mut self, key: impl Into<String>, value: Vec<u8>, ttl: Option<Duration>) -> Self {
270        self.commands.push(PipelineCommand::Set(key.into(), value, ttl));
271        self
272    }
273
274    /// Add a DEL command.
275    pub fn del(mut self, key: impl Into<String>) -> Self {
276        self.commands.push(PipelineCommand::Del(key.into()));
277        self
278    }
279
280    /// Execute the pipeline.
281    pub async fn execute(self) -> CacheResult<Vec<PipelineResult>> {
282        // Placeholder - real impl would batch execute
283        Ok(vec![PipelineResult::Ok; self.commands.len()])
284    }
285}
286
287/// Result of a pipeline command.
288#[derive(Debug, Clone)]
289pub enum PipelineResult {
290    Ok,
291    Value(Option<Vec<u8>>),
292    Error(String),
293}
294
295/// Redis cache backend.
296#[derive(Clone)]
297pub struct RedisCache {
298    conn: RedisConnection,
299    config: RedisCacheConfig,
300}
301
302impl RedisCache {
303    /// Create a new Redis cache.
304    pub async fn new(config: RedisCacheConfig) -> CacheResult<Self> {
305        let conn = RedisConnection::new(config.clone()).await?;
306        Ok(Self { conn, config })
307    }
308
309    /// Create from a URL.
310    pub async fn from_url(url: &str) -> CacheResult<Self> {
311        Self::new(RedisCacheConfig::new(url)).await
312    }
313
314    /// Get the connection.
315    pub fn connection(&self) -> &RedisConnection {
316        &self.conn
317    }
318
319    /// Get the config.
320    pub fn config(&self) -> &RedisCacheConfig {
321        &self.config
322    }
323
324    /// Build the full key with prefix.
325    fn full_key(&self, key: &CacheKey) -> String {
326        self.config.full_key(key)
327    }
328}
329
330impl CacheBackend for RedisCache {
331    async fn get<T>(&self, key: &CacheKey) -> CacheResult<Option<T>>
332    where
333        T: serde::de::DeserializeOwned,
334    {
335        let full_key = self.full_key(key);
336
337        match self.conn.get(&full_key).await? {
338            Some(data) => {
339                let value: T = serde_json::from_slice(&data)
340                    .map_err(|e| CacheError::Deserialization(e.to_string()))?;
341                Ok(Some(value))
342            }
343            None => Ok(None),
344        }
345    }
346
347    async fn set<T>(
348        &self,
349        key: &CacheKey,
350        value: &T,
351        ttl: Option<Duration>,
352    ) -> CacheResult<()>
353    where
354        T: serde::Serialize + Sync,
355    {
356        let full_key = self.full_key(key);
357        let data = serde_json::to_vec(value)
358            .map_err(|e| CacheError::Serialization(e.to_string()))?;
359
360        let effective_ttl = ttl.or(self.config.default_ttl);
361        self.conn.set(&full_key, &data, effective_ttl).await
362    }
363
364    async fn delete(&self, key: &CacheKey) -> CacheResult<bool> {
365        let full_key = self.full_key(key);
366        self.conn.del(&full_key).await
367    }
368
369    async fn exists(&self, key: &CacheKey) -> CacheResult<bool> {
370        let full_key = self.full_key(key);
371        self.conn.exists(&full_key).await
372    }
373
374    async fn get_many<T>(&self, keys: &[CacheKey]) -> CacheResult<Vec<Option<T>>>
375    where
376        T: serde::de::DeserializeOwned,
377    {
378        let full_keys: Vec<String> = keys.iter().map(|k| self.full_key(k)).collect();
379        let results = self.conn.mget(&full_keys).await?;
380
381        results
382            .into_iter()
383            .map(|opt| {
384                opt.map(|data| {
385                    serde_json::from_slice(&data)
386                        .map_err(|e| CacheError::Deserialization(e.to_string()))
387                })
388                .transpose()
389            })
390            .collect()
391    }
392
393    async fn invalidate_pattern(&self, pattern: &KeyPattern) -> CacheResult<u64> {
394        let full_pattern = format!("{}:{}", self.config.key_prefix, pattern.to_redis_pattern());
395
396        // Use SCAN to find matching keys
397        let keys = self.conn.scan(&full_pattern, 1000).await?;
398
399        if keys.is_empty() {
400            return Ok(0);
401        }
402
403        // Delete in batches
404        let mut deleted = 0u64;
405        for key in keys {
406            if self.conn.del(&key).await? {
407                deleted += 1;
408            }
409        }
410
411        Ok(deleted)
412    }
413
414    async fn invalidate_tags(&self, tags: &[EntityTag]) -> CacheResult<u64> {
415        // Tags stored as sets: tag:<tag_value> -> [key1, key2, ...]
416        let mut total = 0u64;
417
418        for tag in tags {
419            let tag_key = format!("{}:tag:{}", self.config.key_prefix, tag.value());
420            // In real impl: SMEMBERS to get keys, then DEL
421            let _ = tag_key;
422            total += 0; // Placeholder
423        }
424
425        Ok(total)
426    }
427
428    async fn clear(&self) -> CacheResult<()> {
429        // In production, use SCAN + DEL with prefix
430        // FLUSHDB would clear everything
431        self.conn.flush().await
432    }
433
434    async fn len(&self) -> CacheResult<usize> {
435        self.conn.dbsize().await
436    }
437
438    async fn stats(&self) -> CacheResult<BackendStats> {
439        let info = self.conn.info().await?;
440        let entries = self.conn.dbsize().await?;
441
442        Ok(BackendStats {
443            entries,
444            memory_bytes: None, // Parse from INFO
445            connections: Some(self.config.pool_size as usize),
446            info: Some(info),
447        })
448    }
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454
455    #[test]
456    fn test_redis_config() {
457        let config = RedisCacheConfig::new("redis://localhost:6379")
458            .with_pool_size(20)
459            .with_prefix("myapp")
460            .with_ttl(Duration::from_secs(600));
461
462        assert_eq!(config.pool_size, 20);
463        assert_eq!(config.key_prefix, "myapp");
464        assert_eq!(config.default_ttl, Some(Duration::from_secs(600)));
465    }
466
467    #[test]
468    fn test_full_key() {
469        let config = RedisCacheConfig::new("redis://localhost")
470            .with_prefix("app:cache");
471
472        let key = CacheKey::new("User", "id:123");
473        let full = config.full_key(&key);
474
475        assert_eq!(full, "app:cache:prax:User:id:123");
476    }
477
478    #[tokio::test]
479    async fn test_redis_cache_creation() {
480        // This test just verifies the API works
481        // Real tests would need a Redis instance
482        let config = RedisCacheConfig::default();
483        let cache = RedisCache::new(config).await.unwrap();
484
485        assert_eq!(cache.config().pool_size, 10);
486    }
487}
488