Skip to main content

multi_tier_cache/backends/
redis_cache.rs

1use crate::traits::{CacheBackend, L2CacheBackend};
2use anyhow::{Context, Result};
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use redis::aio::ConnectionManager;
6use redis::{AsyncCommands, Client};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::Duration;
10use tracing::{debug, info};
11
12/// Redis distributed cache with `ConnectionManager` for automatic reconnection
13pub struct RedisCache {
14    /// Redis connection manager
15    conn_manager: ConnectionManager,
16    /// Hit counter
17    hits: Arc<AtomicU64>,
18    /// Miss counter
19    misses: Arc<AtomicU64>,
20    /// Set counter
21    sets: Arc<AtomicU64>,
22}
23
24impl RedisCache {
25    /// Create new Redis cache
26    ///
27    /// # Errors
28    ///
29    /// Returns an error if the Redis URL is invalid or connection fails.
30    pub async fn new() -> Result<Self> {
31        let redis_url =
32            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33        Self::with_url(&redis_url).await
34    }
35
36    /// Create new Redis cache with custom URL
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the Redis URL is invalid or connection fails.
41    pub async fn with_url(redis_url: &str) -> Result<Self> {
42        info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
43
44        let client = Client::open(redis_url)
45            .with_context(|| format!("Failed to create Redis client with URL: {redis_url}"))?;
46
47        let conn_manager = ConnectionManager::new(client)
48            .await
49            .context("Failed to establish Redis connection manager")?;
50
51        let mut conn = conn_manager.clone();
52        let _: String = redis::cmd("PING")
53            .query_async(&mut conn)
54            .await
55            .context("Redis PING health check failed")?;
56
57        info!(redis_url = %redis_url, "Redis Cache connected successfully");
58
59        Ok(Self {
60            conn_manager,
61            hits: Arc::new(AtomicU64::new(0)),
62            misses: Arc::new(AtomicU64::new(0)),
63            sets: Arc::new(AtomicU64::new(0)),
64        })
65    }
66
67    /// Scan keys matching a pattern
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the SCAN command fails.
72    pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
73        let mut conn = self.conn_manager.clone();
74        let mut keys = Vec::new();
75        let mut cursor: u64 = 0;
76
77        loop {
78            let result: (u64, Vec<String>) = redis::cmd("SCAN")
79                .arg(cursor)
80                .arg("MATCH")
81                .arg(pattern)
82                .arg("COUNT")
83                .arg(100)
84                .query_async(&mut conn)
85                .await?;
86
87            cursor = result.0;
88            keys.extend(result.1);
89
90            if cursor == 0 {
91                break;
92            }
93        }
94
95        debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
96        Ok(keys)
97    }
98
99    /// Remove multiple keys at once
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the DEL command fails.
104    pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
105        if keys.is_empty() {
106            return Ok(0);
107        }
108
109        let mut conn = self.conn_manager.clone();
110        let count: usize = conn.del(keys).await?;
111        debug!(count = count, "[Redis] Removed keys in bulk");
112        Ok(count)
113    }
114}
115
116// ===== Trait Implementations =====
117
118/// Implement `CacheBackend` trait for `RedisCache`
119impl CacheBackend for RedisCache {
120    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
121        Box::pin(async move {
122            let mut conn = self.conn_manager.clone();
123            let result: redis::RedisResult<Vec<u8>> = conn.get(key).await;
124            match result {
125                Ok(bytes) if !bytes.is_empty() => {
126                    self.hits.fetch_add(1, Ordering::Relaxed);
127                    Some(Bytes::from(bytes))
128                }
129                _ => {
130                    self.misses.fetch_add(1, Ordering::Relaxed);
131                    None
132                }
133            }
134        })
135    }
136
137    fn set_with_ttl<'a>(
138        &'a self,
139        key: &'a str,
140        value: Bytes,
141        ttl: Duration,
142    ) -> BoxFuture<'a, Result<()>> {
143        Box::pin(async move {
144            let mut conn = self.conn_manager.clone();
145            let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
146            let result = conn.pset_ex(key, value.to_vec(), ttl_ms).await;
147            if result.is_ok() {
148                self.sets.fetch_add(1, Ordering::Relaxed);
149                debug!(key = %key, ttl_ms = %ttl.as_millis(), "[Redis] Cached key bytes with TTL");
150            }
151            result.map_err(|e| anyhow::anyhow!("Redis set failed: {e}"))
152        })
153    }
154
155    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
156        Box::pin(async move {
157            let mut conn = self.conn_manager.clone();
158            conn.del(key)
159                .await
160                .map_err(|e| anyhow::anyhow!("Redis del failed: {e}"))
161        })
162    }
163
164    fn health_check(&self) -> BoxFuture<'_, bool> {
165        Box::pin(async move {
166            let mut conn = self.conn_manager.clone();
167            let result: redis::RedisResult<String> =
168                redis::cmd("PING").query_async(&mut conn).await;
169            result.is_ok()
170        })
171    }
172
173    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
174        Box::pin(async move {
175            let keys = self.scan_keys(pattern).await?;
176            if !keys.is_empty() {
177                self.remove_bulk(&keys).await?;
178            }
179            Ok(())
180        })
181    }
182
183    fn name(&self) -> &'static str {
184        "Redis"
185    }
186}
187
188impl L2CacheBackend for RedisCache {
189    fn get_with_ttl<'a>(
190        &'a self,
191        key: &'a str,
192    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
193        Box::pin(async move {
194            let mut conn = self.conn_manager.clone();
195            // Pipelining is better:
196            let (bytes, ttl_secs): (Option<Vec<u8>>, i64) =
197                match redis::pipe().get(key).ttl(key).query_async(&mut conn).await {
198                    Ok(res) => res,
199                    Err(_) => return None,
200                };
201
202            match bytes {
203                Some(b) if !b.is_empty() => {
204                    self.hits.fetch_add(1, Ordering::Relaxed);
205                    let ttl = if ttl_secs > 0 {
206                        Some(Duration::from_secs(ttl_secs.unsigned_abs()))
207                    } else {
208                        None
209                    };
210                    Some((Bytes::from(b), ttl))
211                }
212                _ => {
213                    self.misses.fetch_add(1, Ordering::Relaxed);
214                    None
215                }
216            }
217        })
218    }
219}