Skip to main content

multi_tier_cache/backends/
redis_cache.rs

1use crate::error::CacheResult;
2use crate::traits::{CacheBackend, L2CacheBackend};
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use redis::aio::ConnectionManager;
6use redis::{AsyncCommands, Client};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::Duration;
10use tracing::{debug, info};
11
12/// Redis distributed cache with `ConnectionManager` for automatic reconnection
13pub struct RedisCache {
14    /// Redis connection manager
15    conn_manager: ConnectionManager,
16    /// Hit counter
17    hits: Arc<AtomicU64>,
18    /// Miss counter
19    misses: Arc<AtomicU64>,
20    /// Set counter
21    sets: Arc<AtomicU64>,
22}
23
24impl RedisCache {
25    /// Create new Redis cache
26    ///
27    /// # Errors
28    ///
29    /// Returns an error if the Redis URL is invalid or connection fails.
30    pub async fn new() -> CacheResult<Self> {
31        let redis_url =
32            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33        Self::with_url(&redis_url).await
34    }
35
36    /// Create new Redis cache with custom URL
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the Redis URL is invalid or connection fails.
41    pub async fn with_url(redis_url: &str) -> CacheResult<Self> {
42        info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
43
44        let client = Client::open(redis_url).map_err(|e| {
45            crate::error::CacheError::ConfigError(format!("Failed to create Redis client: {e}"))
46        })?;
47
48        let conn_manager = ConnectionManager::new(client).await.map_err(|e| {
49            crate::error::CacheError::BackendError(format!(
50                "Failed to establish Redis connection manager: {e}"
51            ))
52        })?;
53
54        let mut conn = conn_manager.clone();
55        let _: String = redis::cmd("PING")
56            .query_async(&mut conn)
57            .await
58            .map_err(|e| {
59                crate::error::CacheError::BackendError(format!(
60                    "Redis PING health check failed: {e}"
61                ))
62            })?;
63
64        info!(redis_url = %redis_url, "Redis Cache connected successfully");
65
66        Ok(Self {
67            conn_manager,
68            hits: Arc::new(AtomicU64::new(0)),
69            misses: Arc::new(AtomicU64::new(0)),
70            sets: Arc::new(AtomicU64::new(0)),
71        })
72    }
73
74    /// Scan keys matching a pattern
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the SCAN command fails.
79    pub async fn scan_keys(&self, pattern: &str) -> CacheResult<Vec<String>> {
80        let mut conn = self.conn_manager.clone();
81        let mut keys = Vec::new();
82        let mut cursor: u64 = 0;
83
84        loop {
85            let result: (u64, Vec<String>) = redis::cmd("SCAN")
86                .arg(cursor)
87                .arg("MATCH")
88                .arg(pattern)
89                .arg("COUNT")
90                .arg(100)
91                .query_async(&mut conn)
92                .await?;
93
94            cursor = result.0;
95            keys.extend(result.1);
96
97            if cursor == 0 {
98                break;
99            }
100        }
101
102        debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
103        Ok(keys)
104    }
105
106    /// Remove multiple keys at once
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the DEL command fails.
111    pub async fn remove_bulk(&self, keys: &[String]) -> CacheResult<usize> {
112        if keys.is_empty() {
113            return Ok(0);
114        }
115
116        let mut conn = self.conn_manager.clone();
117        let count: usize = conn.del(keys).await?;
118        debug!(count = count, "[Redis] Removed keys in bulk");
119        Ok(count)
120    }
121}
122
123// ===== Trait Implementations =====
124
125/// Implement `CacheBackend` trait for `RedisCache`
126impl CacheBackend for RedisCache {
127    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
128        Box::pin(async move {
129            let mut conn = self.conn_manager.clone();
130            let result: redis::RedisResult<Vec<u8>> = conn.get(key).await;
131            match result {
132                Ok(bytes) if !bytes.is_empty() => {
133                    self.hits.fetch_add(1, Ordering::Relaxed);
134                    Some(Bytes::from(bytes))
135                }
136                _ => {
137                    self.misses.fetch_add(1, Ordering::Relaxed);
138                    None
139                }
140            }
141        })
142    }
143
144    fn set_with_ttl<'a>(
145        &'a self,
146        key: &'a str,
147        value: Bytes,
148        ttl: Duration,
149    ) -> BoxFuture<'a, CacheResult<()>> {
150        Box::pin(async move {
151            let mut conn = self.conn_manager.clone();
152            let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
153            let result = conn.pset_ex(key, value.to_vec(), ttl_ms).await;
154            if result.is_ok() {
155                self.sets.fetch_add(1, Ordering::Relaxed);
156                debug!(key = %key, ttl_ms = %ttl.as_millis(), "[Redis] Cached key bytes with TTL");
157            }
158            result.map_err(|e| {
159                crate::error::CacheError::BackendError(format!("Redis set failed: {e}"))
160            })
161        })
162    }
163
164    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
165        Box::pin(async move {
166            let mut conn = self.conn_manager.clone();
167            let _: usize = conn.del(key).await?;
168            Ok(())
169        })
170    }
171
172    fn health_check(&self) -> BoxFuture<'_, bool> {
173        Box::pin(async move {
174            let mut conn = self.conn_manager.clone();
175            let result: redis::RedisResult<String> =
176                redis::cmd("PING").query_async(&mut conn).await;
177            result.is_ok()
178        })
179    }
180
181    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
182        Box::pin(async move {
183            let keys = self.scan_keys(pattern).await?;
184            if !keys.is_empty() {
185                self.remove_bulk(&keys).await?;
186            }
187            Ok(())
188        })
189    }
190
191    fn name(&self) -> &'static str {
192        "Redis"
193    }
194}
195
196impl L2CacheBackend for RedisCache {
197    fn get_with_ttl<'a>(
198        &'a self,
199        key: &'a str,
200    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
201        Box::pin(async move {
202            let mut conn = self.conn_manager.clone();
203            // Pipelining is better:
204            let (bytes, ttl_secs): (Option<Vec<u8>>, i64) =
205                match redis::pipe().get(key).ttl(key).query_async(&mut conn).await {
206                    Ok(res) => res,
207                    Err(_) => return None,
208                };
209
210            match bytes {
211                Some(b) if !b.is_empty() => {
212                    self.hits.fetch_add(1, Ordering::Relaxed);
213                    let ttl = if ttl_secs > 0 {
214                        Some(Duration::from_secs(ttl_secs.unsigned_abs()))
215                    } else {
216                        None
217                    };
218                    Some((Bytes::from(b), ttl))
219                }
220                _ => {
221                    self.misses.fetch_add(1, Ordering::Relaxed);
222                    None
223                }
224            }
225        })
226    }
227}