// multi_tier_cache/backends/redis_cache.rs

1//! Redis Cache - Distributed Cache Backend
2//!
3//! Redis-based distributed cache for warm data storage with persistence.
4
5use anyhow::{Context, Result};
6use redis::aio::ConnectionManager;
7use redis::{AsyncCommands, Client};
8use serde_json;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tracing::{debug, info};
13
/// Redis distributed cache with `ConnectionManager` for automatic reconnection
///
/// This is the default L2 (warm tier) cache backend, providing:
/// - Distributed caching across multiple instances
/// - Persistence to disk
/// - Automatic reconnection via `ConnectionManager`
/// - TTL introspection for cache promotion
/// - Pattern-based key scanning
///
/// Cloning the inner `ConnectionManager` is cheap and is done per operation;
/// the atomic counters are shared so statistics survive across clones.
pub struct RedisCache {
    /// Redis connection manager - handles reconnection automatically
    conn_manager: ConnectionManager,
    /// Hit counter - incremented when a key exists AND its payload parses as JSON
    hits: Arc<AtomicU64>,
    /// Miss counter - incremented on missing keys, Redis errors, or corrupt payloads
    misses: Arc<AtomicU64>,
    /// Set counter - incremented on every successful `set_with_ttl`
    sets: Arc<AtomicU64>,
}
32
33impl RedisCache {
34    /// Create new Redis cache with `ConnectionManager` for automatic reconnection
35    /// # Errors
36    ///
37    /// Returns an error if the Redis client cannot be created or connection fails.
38    pub async fn new() -> Result<Self> {
39        let redis_url =
40            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
41        Self::with_url(&redis_url).await
42    }
43
44    /// Create new Redis cache with custom URL
45    ///
46    /// # Arguments
47    ///
48    /// * `redis_url` - Redis connection string (e.g., `<redis://localhost:6379>`)
49    /// # Errors
50    ///
51    /// Returns an error if the Redis client cannot be created or connection fails.
52    pub async fn with_url(redis_url: &str) -> Result<Self> {
53        info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
54
55        let client = Client::open(redis_url)
56            .with_context(|| format!("Failed to create Redis client with URL: {redis_url}"))?;
57
58        // Create ConnectionManager - handles reconnection automatically
59        let conn_manager = ConnectionManager::new(client)
60            .await
61            .context("Failed to establish Redis connection manager")?;
62
63        // Test connection
64        let mut conn = conn_manager.clone();
65        let _: String = redis::cmd("PING")
66            .query_async(&mut conn)
67            .await
68            .context("Redis PING health check failed")?;
69
70        info!(redis_url = %redis_url, "Redis Cache connected successfully (ConnectionManager enabled)");
71
72        Ok(Self {
73            conn_manager,
74            hits: Arc::new(AtomicU64::new(0)),
75            misses: Arc::new(AtomicU64::new(0)),
76            sets: Arc::new(AtomicU64::new(0)),
77        })
78    }
79
80    /// Scan keys matching a pattern (glob-style: *, ?, [])
81    ///
82    /// Uses Redis SCAN command (non-blocking, cursor-based iteration)
83    /// This is safe for production use, unlike KEYS command.
84    ///
85    /// # Arguments
86    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
87    ///
88    /// # Returns
89    /// Vector of matching key names
90    ///
91    /// # Examples
92    /// ```no_run
93    /// # use multi_tier_cache::backends::RedisCache;
94    /// # async fn example() -> anyhow::Result<()> {
95    /// # let cache = RedisCache::new().await?;
96    /// // Find all user cache keys
97    /// let keys = cache.scan_keys("user:*").await?;
98    ///
99    /// // Find specific user's cache keys
100    /// let keys = cache.scan_keys("user:123:*").await?;
101    /// # Ok(())
102    /// # }
103    /// ```
104    /// # Errors
105    ///
106    /// Returns an error if the Redis command fails.
107    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
108        let mut conn = self.conn_manager.clone();
109        let mut keys = Vec::new();
110        let mut cursor: u64 = 0;
111
112        loop {
113            // SCAN cursor MATCH pattern COUNT 100
114            let result: (u64, Vec<String>) = redis::cmd("SCAN")
115                .arg(cursor)
116                .arg("MATCH")
117                .arg(pattern)
118                .arg("COUNT")
119                .arg(100) // Fetch 100 keys per iteration
120                .query_async(&mut conn)
121                .await?;
122
123            cursor = result.0;
124            keys.extend(result.1);
125
126            // Cursor 0 means iteration is complete
127            if cursor == 0 {
128                break;
129            }
130        }
131
132        debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
133        Ok(keys)
134    }
135
136    /// Remove multiple keys at once (bulk delete)
137    ///
138    /// More efficient than calling `remove()` multiple times
139    /// # Errors
140    ///
141    /// Returns an error if the Redis command fails.
142    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
143        if keys.is_empty() {
144            return Ok(0);
145        }
146
147        let mut conn = self.conn_manager.clone();
148        let count: usize = conn.del(keys).await?;
149        debug!(count = count, "[Redis] Removed keys in bulk");
150        Ok(count)
151    }
152}
153
154// ===== Trait Implementations =====
155
156use crate::traits::{CacheBackend, L2CacheBackend};
157use async_trait::async_trait;
158
159/// Implement `CacheBackend` trait for `RedisCache`
160///
161/// This allows `RedisCache` to be used as a pluggable backend in the multi-tier cache system.
162#[async_trait]
163impl CacheBackend for RedisCache {
164    async fn get(&self, key: &str) -> Option<serde_json::Value> {
165        let mut conn = self.conn_manager.clone();
166
167        if let Ok(json_str) = conn.get::<_, String>(key).await {
168            if let Ok(value) = serde_json::from_str(&json_str) {
169                self.hits.fetch_add(1, Ordering::Relaxed);
170                Some(value)
171            } else {
172                self.misses.fetch_add(1, Ordering::Relaxed);
173                None
174            }
175        } else {
176            self.misses.fetch_add(1, Ordering::Relaxed);
177            None
178        }
179    }
180
181    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
182        let json_str = serde_json::to_string(&value)?;
183        let mut conn = self.conn_manager.clone();
184
185        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
186        self.sets.fetch_add(1, Ordering::Relaxed);
187        debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Redis] Cached key with TTL");
188        Ok(())
189    }
190
191    async fn remove(&self, key: &str) -> Result<()> {
192        let mut conn = self.conn_manager.clone();
193        let _: () = conn.del(key).await?;
194        Ok(())
195    }
196
197    async fn health_check(&self) -> bool {
198        let test_key = "health_check_redis";
199        let timestamp = std::time::SystemTime::now()
200            .duration_since(std::time::UNIX_EPOCH)
201            .unwrap_or(Duration::from_secs(0))
202            .as_secs();
203        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});
204
205        match self
206            .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10))
207            .await
208        {
209            Ok(()) => match self.get(test_key).await {
210                Some(retrieved) => {
211                    let _ = self.remove(test_key).await;
212                    retrieved
213                        .get("test")
214                        .and_then(serde_json::Value::as_bool)
215                        .unwrap_or(false)
216                }
217                None => false,
218            },
219            Err(_) => false,
220        }
221    }
222
223    fn name(&self) -> &'static str {
224        "Redis"
225    }
226}
227
228/// Implement `L2CacheBackend` trait for `RedisCache`
229///
230/// This extends `CacheBackend` with TTL introspection capabilities needed for L2->L1 promotion.
231#[async_trait]
232impl L2CacheBackend for RedisCache {
233    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
234        let mut conn = self.conn_manager.clone();
235
236        // Get value
237        let json_str: String = if let Ok(s) = conn.get(key).await {
238            s
239        } else {
240            self.misses.fetch_add(1, Ordering::Relaxed);
241            return None;
242        };
243
244        // Parse JSON
245        let value: serde_json::Value = if let Ok(v) = serde_json::from_str(&json_str) {
246            v
247        } else {
248            self.misses.fetch_add(1, Ordering::Relaxed);
249            return None;
250        };
251
252        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
253        let ttl_secs: i64 = redis::cmd("TTL")
254            .arg(key)
255            .query_async(&mut conn)
256            .await
257            .unwrap_or(-1);
258
259        self.hits.fetch_add(1, Ordering::Relaxed);
260
261        let ttl = if ttl_secs > 0 {
262            Some(Duration::from_secs(ttl_secs.unsigned_abs()))
263        } else {
264            None // No expiry or error
265        };
266
267        Some((value, ttl))
268    }
269}