multi_tier_cache/backends/redis_cache.rs

//! Redis Cache - Distributed Cache Backend
//!
//! Redis-based distributed cache for warm data storage with persistence.

use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use redis::{Client, AsyncCommands};
use redis::aio::ConnectionManager;
use serde_json;
use std::sync::atomic::{AtomicU64, Ordering};

/// Redis distributed cache with ConnectionManager for automatic reconnection
///
/// This is the default L2 (warm tier) cache backend, providing:
/// - Distributed caching across multiple instances
/// - Persistence to disk
/// - Automatic reconnection via ConnectionManager
/// - TTL introspection for cache promotion
/// - Pattern-based key scanning
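///
/// # Example
///
/// A minimal usage sketch (assumes a reachable Redis instance at `REDIS_URL`,
/// falling back to `redis://127.0.0.1:6379`; key and value are illustrative):
///
/// ```no_run
/// # use multi_tier_cache::backends::RedisCache;
/// # use std::time::Duration;
/// # async fn example() -> anyhow::Result<()> {
/// let cache = RedisCache::new().await?;
///
/// // Cache a JSON value for 60 seconds, then read it back
/// let value = serde_json::json!({"name": "alice"});
/// cache.set_with_ttl("user:1", value, Duration::from_secs(60)).await?;
/// assert!(cache.get("user:1").await.is_some());
/// # Ok(())
/// # }
/// ```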
pub struct RedisCache {
    /// Redis connection manager - handles reconnection automatically
    conn_manager: ConnectionManager,
    /// Hit counter
    hits: Arc<AtomicU64>,
    /// Miss counter
    misses: Arc<AtomicU64>,
    /// Set counter
    sets: Arc<AtomicU64>,
}

impl RedisCache {
    /// Create new Redis cache with ConnectionManager for automatic reconnection
    pub async fn new() -> Result<Self> {
        println!("  🔴 Initializing Redis Cache (with ConnectionManager)...");

        // Try to connect to Redis
        let redis_url = std::env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

        let client = Client::open(redis_url.as_str())?;

        // Create ConnectionManager - handles reconnection automatically
        let conn_manager = ConnectionManager::new(client).await?;

        // Test connection
        let mut conn = conn_manager.clone();
        let _: String = redis::cmd("PING").query_async(&mut conn).await?;

        println!("  ✅ Redis Cache connected at {} (ConnectionManager enabled)", redis_url);

        Ok(Self {
            conn_manager,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Get value from Redis cache using persistent ConnectionManager
    pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let mut conn = self.conn_manager.clone();

        match conn.get::<_, String>(key).await {
            Ok(json_str) => {
                match serde_json::from_str(&json_str) {
                    Ok(value) => {
                        self.hits.fetch_add(1, Ordering::Relaxed);
                        Some(value)
                    }
                    Err(_) => {
                        self.misses.fetch_add(1, Ordering::Relaxed);
                        None
                    }
                }
            }
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Get value with its remaining TTL from Redis cache
    ///
    /// Returns `Some((value, ttl))` if the key exists and holds valid JSON;
    /// `ttl` is `None` when the key has no expiration. Returns `None` if the key is missing.
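    ///
    /// # Examples
    ///
    /// An illustrative sketch of reading a value together with its remaining TTL,
    /// e.g. before promoting it to a faster tier (key name is made up):
    ///
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = RedisCache::new().await?;
    /// if let Some((value, ttl)) = cache.get_with_ttl("user:1").await {
    ///     // `ttl` is the remaining lifetime, or None for keys without expiry
    ///     println!("value = {value}, remaining ttl = {ttl:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```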
    pub async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        let mut conn = self.conn_manager.clone();

        // Get value
        let json_str: String = match conn.get(key).await {
            Ok(s) => s,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Parse JSON
        let value: serde_json::Value = match serde_json::from_str(&json_str) {
            Ok(v) => v,
            Err(_) => {
                self.misses.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };

        // Get TTL (in seconds, -1 = no expiry, -2 = key doesn't exist)
        let ttl_secs: i64 = match redis::cmd("TTL").arg(key).query_async(&mut conn).await {
            Ok(ttl) => ttl,
            Err(_) => -1, // Fallback: treat as no expiry
        };

        self.hits.fetch_add(1, Ordering::Relaxed);

        let ttl = if ttl_secs > 0 {
            Some(Duration::from_secs(ttl_secs as u64))
        } else {
            None // No expiry or error
        };

        Some((value, ttl))
    }

    /// Set value with custom TTL using persistent ConnectionManager
    pub async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        let json_str = serde_json::to_string(&value)?;
        let mut conn = self.conn_manager.clone();

        let _: () = conn.set_ex(key, json_str, ttl.as_secs()).await?;
        self.sets.fetch_add(1, Ordering::Relaxed);
        println!("💾 [Redis] Cached '{}' with TTL {:?}", key, ttl);
        Ok(())
    }

    /// Remove value from cache using persistent ConnectionManager
    pub async fn remove(&self, key: &str) -> Result<()> {
        let mut conn = self.conn_manager.clone();
        let _: () = conn.del(key).await?;
        Ok(())
    }

    /// Scan keys matching a pattern (glob-style: *, ?, [])
    ///
    /// Uses Redis SCAN command (non-blocking, cursor-based iteration)
    /// This is safe for production use, unlike KEYS command.
    ///
    /// # Arguments
    /// * `pattern` - Glob-style pattern (e.g., "user:*", "product:123:*")
    ///
    /// # Returns
    /// Vector of matching key names
    ///
    /// # Examples
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = RedisCache::new().await?;
    /// // Find all user cache keys
    /// let keys = cache.scan_keys("user:*").await?;
    ///
    /// // Find specific user's cache keys
    /// let keys = cache.scan_keys("user:123:*").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
        let mut conn = self.conn_manager.clone();
        let mut keys = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            // SCAN cursor MATCH pattern COUNT 100
            let result: (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(100) // Fetch 100 keys per iteration
                .query_async(&mut conn)
                .await?;

            cursor = result.0;
            keys.extend(result.1);

            // Cursor 0 means iteration is complete
            if cursor == 0 {
                break;
            }
        }

        println!("🔍 [Redis] Scanned keys matching '{}': {} found", pattern, keys.len());
        Ok(keys)
    }

    /// Remove multiple keys at once (bulk delete)
    ///
    /// More efficient than calling remove() multiple times
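    ///
    /// # Examples
    ///
    /// A sketch of pattern-based invalidation combining `scan_keys` with
    /// `remove_bulk` (the `user:*` pattern is illustrative):
    ///
    /// ```no_run
    /// # use multi_tier_cache::backends::RedisCache;
    /// # async fn example() -> anyhow::Result<()> {
    /// # let cache = RedisCache::new().await?;
    /// let keys = cache.scan_keys("user:*").await?;
    /// let removed = cache.remove_bulk(&keys).await?;
    /// println!("invalidated {removed} cached entries");
    /// # Ok(())
    /// # }
    /// ```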
    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }

        let mut conn = self.conn_manager.clone();
        let count: usize = conn.del(keys).await?;
        println!("🗑️  [Redis] Removed {} keys", count);
        Ok(count)
    }

    /// Health check: writes, reads, and removes a short-lived test key to verify connectivity
    pub async fn health_check(&self) -> bool {
        let test_key = "health_check_redis";
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or(Duration::from_secs(0))
            .as_secs();
        let test_value = serde_json::json!({"test": true, "timestamp": timestamp});

        match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(10)).await {
            Ok(_) => {
                match self.get(test_key).await {
                    Some(retrieved) => {
                        let _ = self.remove(test_key).await;
                        retrieved["test"].as_bool().unwrap_or(false)
                    }
                    None => false
                }
            }
            Err(_) => false
        }
    }
}

// ===== Trait Implementations =====

use crate::traits::{CacheBackend, L2CacheBackend};
use async_trait::async_trait;

/// Implement CacheBackend trait for RedisCache
///
/// This allows RedisCache to be used as a pluggable backend in the multi-tier cache system.
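///
/// # Example
///
/// A sketch of driving the cache through the trait object; it assumes the
/// `traits` module is publicly exported from the crate root:
///
/// ```no_run
/// # use std::sync::Arc;
/// # use multi_tier_cache::backends::RedisCache;
/// # use multi_tier_cache::traits::CacheBackend;
/// # async fn example() -> anyhow::Result<()> {
/// let backend: Arc<dyn CacheBackend> = Arc::new(RedisCache::new().await?);
/// println!("backend '{}' healthy: {}", backend.name(), backend.health_check().await);
/// # Ok(())
/// # }
/// ```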
#[async_trait]
impl CacheBackend for RedisCache {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        RedisCache::get(self, key).await
    }

    async fn set_with_ttl(
        &self,
        key: &str,
        value: serde_json::Value,
        ttl: Duration,
    ) -> Result<()> {
        RedisCache::set_with_ttl(self, key, value, ttl).await
    }

    async fn remove(&self, key: &str) -> Result<()> {
        RedisCache::remove(self, key).await
    }

    async fn health_check(&self) -> bool {
        RedisCache::health_check(self).await
    }

    fn name(&self) -> &str {
        "Redis"
    }
}

/// Implement L2CacheBackend trait for RedisCache
///
/// This extends CacheBackend with TTL introspection capabilities needed for L2->L1 promotion.
#[async_trait]
impl L2CacheBackend for RedisCache {
    async fn get_with_ttl(
        &self,
        key: &str,
    ) -> Option<(serde_json::Value, Option<Duration>)> {
        RedisCache::get_with_ttl(self, key).await
    }
}