multi_tier_cache/backends/redis_cache.rs

//! Redis Cache - Distributed Cache Backend
//!
//! Redis-based distributed cache for warm data storage with persistence.
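//!
//! A minimal usage sketch (illustrative only; assumes a Redis instance reachable via
//! `REDIS_URL` or at `redis://127.0.0.1:6379`, and that the crate's `traits` module is
//! publicly exported):
//!
//! ```no_run
//! use std::time::Duration;
//! use multi_tier_cache::backends::RedisCache;
//! use multi_tier_cache::traits::CacheBackend; // path assumed; brings get/set_with_ttl into scope
//!
//! # async fn example() -> anyhow::Result<()> {
//! let cache = RedisCache::new().await?;
//! cache.set_with_ttl("user:123", serde_json::json!({"name": "Alice"}), Duration::from_secs(300)).await?;
//! let cached = cache.get("user:123").await;
//! # Ok(())
//! # }
//! ```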

use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use redis::{Client, AsyncCommands};
use redis::aio::ConnectionManager;
use serde_json;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{info, debug};

/// Redis distributed cache with ConnectionManager for automatic reconnection
///
/// This is the default L2 (warm tier) cache backend, providing:
/// - Distributed caching across multiple instances
/// - Persistence to disk
/// - Automatic reconnection via ConnectionManager
/// - TTL introspection for cache promotion
/// - Pattern-based key scanning
pub struct RedisCache {
    /// Redis connection manager - handles reconnection automatically
    conn_manager: ConnectionManager,
    /// Hit counter
    hits: Arc<AtomicU64>,
    /// Miss counter
    misses: Arc<AtomicU64>,
    /// Set counter
    sets: Arc<AtomicU64>,
}

impl RedisCache {
    /// Create new Redis cache with ConnectionManager for automatic reconnection
    pub async fn new() -> Result<Self> {
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        Self::with_url(&redis_url).await
    }

    /// Create new Redis cache with custom URL
    ///
    /// # Arguments
    ///
    /// * `redis_url` - Redis connection string (e.g., "redis://localhost:6379")
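    ///
    /// # Examples
    ///
    /// A sketch of connecting to an explicit endpoint (illustrative; compiled but not run):
    ///
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// let cache = RedisCache::with_url("redis://127.0.0.1:6379").await?;
    /// # Ok(())
    /// # }
    /// ```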
    pub async fn with_url(redis_url: &str) -> Result<Self> {
        info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");

        let client = Client::open(redis_url)
            .with_context(|| format!("Failed to create Redis client with URL: {}", redis_url))?;

        // Create ConnectionManager - handles reconnection automatically
        let conn_manager = ConnectionManager::new(client).await
            .context("Failed to establish Redis connection manager")?;

        // Test connection
        let mut conn = conn_manager.clone();
        let _: String = redis::cmd("PING")
            .query_async(&mut conn)
            .await
            .context("Redis PING health check failed")?;

        info!(redis_url = %redis_url, "Redis Cache connected successfully (ConnectionManager enabled)");

        Ok(Self {
            conn_manager,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Scan keys matching a pattern (glob-style: *, ?, [])
    ///
    /// Uses the Redis SCAN command (non-blocking, cursor-based iteration).
    /// This is safe for production use, unlike the KEYS command.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Returns
    /// Vector of matching key names
    ///
    /// # Examples
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = RedisCache::new().await?;
    /// // Find all user cache keys
    /// let keys = cache.scan_keys("user:*").await?;
    ///
    /// // Find specific user's cache keys
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
        let mut conn = self.conn_manager.clone();
        let mut keys = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            // SCAN cursor MATCH pattern COUNT 100
            let result: (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(100) // Fetch 100 keys per iteration
                .query_async(&mut conn)
                .await?;

            cursor = result.0;
            keys.extend(result.1);

            // Cursor 0 means iteration is complete
            if cursor == 0 {
                break;
            }
        }

        debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
        Ok(keys)
    }

    /// Remove multiple keys at once (bulk delete)
    ///
    /// More efficient than calling remove() multiple times
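    ///
    /// # Examples
    ///
    /// A sketch of pattern-based invalidation that pairs `scan_keys` with
    /// `remove_bulk` (illustrative; assumes a reachable Redis instance):
    ///
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = RedisCache::new().await?;
    /// // Invalidate every cached entry for user 123
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// let removed = cache.remove_bulk(&keys).await?;
    /// # Ok(())
    /// # }
    /// ```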
    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }

        let mut conn = self.conn_manager.clone();
        let count: usize = conn.del(keys).await?;
        debug!(count = count, "[Redis] Removed keys in bulk");
        Ok(count)
    }
}

// ===== Trait Implementations =====

use crate::traits::{CacheBackend, L2CacheBackend};
use async_trait::async_trait;

/// Implement CacheBackend trait for RedisCache
///
/// This allows RedisCache to be used as a pluggable backend in the multi-tier cache system.
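///
/// A sketch of driving `RedisCache` through the trait object (illustrative; the
/// `multi_tier_cache::traits` re-export path is assumed):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use std::time::Duration;
/// # use multi_tier_cache::backends::RedisCache;
/// # use multi_tier_cache::traits::CacheBackend; // path assumed
/// # async fn example() -> anyhow::Result<()> {
/// let backend: Arc<dyn CacheBackend> = Arc::new(RedisCache::new().await?);
/// backend.set_with_ttl("greeting", serde_json::json!("hello"), Duration::from_secs(60)).await?;
/// assert_eq!(backend.name(), "Redis");
/// # Ok(())
/// # }
/// ```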
#[async_trait]
impl CacheBackend for RedisCache {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let mut conn = self.conn_manager.clone();

        // A missing key (nil) fails the String conversion and lands in the Err arm,
        // so genuine misses and connection errors are both counted as misses
        match conn.get::<_, String>(key).await {
            Ok(json_str) => {
                match serde_json::from_str(&json_str) {
                    Ok(value) => {
                        self.hits.fetch_add(1, Ordering::Relaxed);
                        Some(value)
                    }
                    Err(_) => {
                        // Stored payload is not valid JSON - treat as a miss
                        self.misses.fetch_add(1, Ordering::Relaxed);
                        None
                    }
                }
            }
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    async fn set_with_ttl(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Duration,
    ) -> Result<()> {
        let json_str = serde_json::to_string(&value)?;
        let mut conn = self.conn_manager.clone();

        // SETEX rejects an expiry of 0, so clamp sub-second TTLs up to one second
        let ttl_secs = ttl.as_secs().max(1);
        let _: () = conn.set_ex(key, json_str, ttl_secs).await?;
        self.sets.fetch_add(1, Ordering::Relaxed);
        debug!(key = %key, ttl_secs = %ttl_secs, "[Redis] Cached key with TTL");
        Ok(())
    }

    async fn remove(&self, key: &str) -> Result<()> {
        let mut conn = self.conn_manager.clone();
        let _: () = conn.del(key).await?;
        Ok(())
    }

    async fn health_check(&self) -> bool {
        // Round-trip probe: write a short-lived key, read it back, then clean up
        let test_key = "health_check_redis";
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or(Duration::from_secs(0))
            .as_secs();
        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});

        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
            Ok(_) => {
                match self.get(test_key).await {
                    Some(retrieved) => {
                        let _ = self.remove(test_key).await;
                        retrieved["test"].as_bool().unwrap_or(false)
                    }
                    None => false
                }
            }
            Err(_) => false
        }
    }

    fn name(&self) -> &str {
        "Redis"
    }
}

/// Implement L2CacheBackend trait for RedisCache
///
/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
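///
/// A sketch of a TTL-aware read as used for promotion decisions (illustrative;
/// the `multi_tier_cache::traits` re-export path is assumed):
///
/// ```no_run
/// # use multi_tier_cache::backends::RedisCache;
/// # use multi_tier_cache::traits::L2CacheBackend; // path assumed
/// # async fn example() -> anyhow::Result<()> {
/// # let cache = RedisCache::new().await?;
/// if let Some((value, ttl)) = cache.get_with_ttl("user:123").await {
///     // `ttl` is None when the key has no expiry; otherwise it is the remaining
///     // lifetime, which the multi-tier layer can reuse when promoting to L1
///     println!("value = {value}, remaining ttl = {ttl:?}");
/// }
/// # Ok(())
/// # }
/// ```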
#[async_trait]
impl L2CacheBackend for RedisCache {
    async fn get_with_ttl(
        &self,
        key: &str,
    ) -> Option<(serde_json::Value, Option<Duration>)> {
        let mut conn = self.conn_manager.clone();

        // Get value
        let json_str: String = match conn.get(key).await {
            Ok(s) => s,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Parse JSON
        let value: serde_json::Value = match serde_json::from_str(&json_str) {
            Ok(v) => v,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
        let ttl_secs: i64 = redis::cmd("TTL")
            .arg(key)
            .query_async(&mut conn)
            .await
            .unwrap_or(-1);

        self.hits.fetch_add(1, Ordering::Relaxed);

        let ttl = if ttl_secs > 0 {
            Some(Duration::from_secs(ttl_secs as u64))
        } else {
            None // No expiry or error
        };

        Some((value, ttl))
    }
}
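
// ===== Tests =====

// A minimal integration-test sketch (added for illustration, not part of the original
// backend). It assumes a Redis instance reachable via REDIS_URL (default
// redis://127.0.0.1:6379) and tokio as the async test runtime, and is marked
// #[ignore] so it only runs via `cargo test -- --ignored`.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore = "requires a running Redis instance"]
    async fn set_get_remove_roundtrip() -> Result<()> {
        let cache = RedisCache::new().await?;
        let key = "test:redis_cache:roundtrip";
        let value = serde_json::json!({"hello": "world"});

        // Store with a short TTL, read it back, then clean up
        cache.set_with_ttl(key, value.clone(), Duration::from_secs(30)).await?;
        assert_eq!(cache.get(key).await, Some(value));

        cache.remove(key).await?;
        assert_eq!(cache.get(key).await, None);
        Ok(())
    }
}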