multi_tier_cache/backends/
redis_cache.rs1use crate::traits::{CacheBackend, L2CacheBackend};
2use anyhow::{Context, Result};
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use redis::aio::ConnectionManager;
6use redis::{AsyncCommands, Client};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::Duration;
10use tracing::{debug, info};
11
12pub struct RedisCache {
14 conn_manager: ConnectionManager,
16 hits: Arc<AtomicU64>,
18 misses: Arc<AtomicU64>,
20 sets: Arc<AtomicU64>,
22}
23
24impl RedisCache {
25 pub async fn new() -> Result<Self> {
31 let redis_url =
32 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
33 Self::with_url(&redis_url).await
34 }
35
36 pub async fn with_url(redis_url: &str) -> Result<Self> {
42 info!(redis_url = %redis_url, "Initializing Redis Cache with ConnectionManager");
43
44 let client = Client::open(redis_url)
45 .with_context(|| format!("Failed to create Redis client with URL: {redis_url}"))?;
46
47 let conn_manager = ConnectionManager::new(client)
48 .await
49 .context("Failed to establish Redis connection manager")?;
50
51 let mut conn = conn_manager.clone();
52 let _: String = redis::cmd("PING")
53 .query_async(&mut conn)
54 .await
55 .context("Redis PING health check failed")?;
56
57 info!(redis_url = %redis_url, "Redis Cache connected successfully");
58
59 Ok(Self {
60 conn_manager,
61 hits: Arc::new(AtomicU64::new(0)),
62 misses: Arc::new(AtomicU64::new(0)),
63 sets: Arc::new(AtomicU64::new(0)),
64 })
65 }
66
67 pub async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>> {
73 let mut conn = self.conn_manager.clone();
74 let mut keys = Vec::new();
75 let mut cursor: u64 = 0;
76
77 loop {
78 let result: (u64, Vec<String>) = redis::cmd("SCAN")
79 .arg(cursor)
80 .arg("MATCH")
81 .arg(pattern)
82 .arg("COUNT")
83 .arg(100)
84 .query_async(&mut conn)
85 .await?;
86
87 cursor = result.0;
88 keys.extend(result.1);
89
90 if cursor == 0 {
91 break;
92 }
93 }
94
95 debug!(pattern = %pattern, count = keys.len(), "[Redis] Scanned keys matching pattern");
96 Ok(keys)
97 }
98
99 pub async fn remove_bulk(&self, keys: &[String]) -> Result<usize> {
105 if keys.is_empty() {
106 return Ok(0);
107 }
108
109 let mut conn = self.conn_manager.clone();
110 let count: usize = conn.del(keys).await?;
111 debug!(count = count, "[Redis] Removed keys in bulk");
112 Ok(count)
113 }
114}
115
116impl CacheBackend for RedisCache {
120 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
121 Box::pin(async move {
122 let mut conn = self.conn_manager.clone();
123 let result: redis::RedisResult<Vec<u8>> = conn.get(key).await;
124 match result {
125 Ok(bytes) if !bytes.is_empty() => {
126 self.hits.fetch_add(1, Ordering::Relaxed);
127 Some(Bytes::from(bytes))
128 }
129 _ => {
130 self.misses.fetch_add(1, Ordering::Relaxed);
131 None
132 }
133 }
134 })
135 }
136
137 fn set_with_ttl<'a>(
138 &'a self,
139 key: &'a str,
140 value: Bytes,
141 ttl: Duration,
142 ) -> BoxFuture<'a, Result<()>> {
143 Box::pin(async move {
144 let mut conn = self.conn_manager.clone();
145 let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
146 let result = conn.pset_ex(key, value.to_vec(), ttl_ms).await;
147 if result.is_ok() {
148 self.sets.fetch_add(1, Ordering::Relaxed);
149 debug!(key = %key, ttl_ms = %ttl.as_millis(), "[Redis] Cached key bytes with TTL");
150 }
151 result.map_err(|e| anyhow::anyhow!("Redis set failed: {e}"))
152 })
153 }
154
155 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
156 Box::pin(async move {
157 let mut conn = self.conn_manager.clone();
158 conn.del(key)
159 .await
160 .map_err(|e| anyhow::anyhow!("Redis del failed: {e}"))
161 })
162 }
163
164 fn health_check(&self) -> BoxFuture<'_, bool> {
165 Box::pin(async move {
166 let mut conn = self.conn_manager.clone();
167 let result: redis::RedisResult<String> =
168 redis::cmd("PING").query_async(&mut conn).await;
169 result.is_ok()
170 })
171 }
172
173 fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
174 Box::pin(async move {
175 let keys = self.scan_keys(pattern).await?;
176 if !keys.is_empty() {
177 self.remove_bulk(&keys).await?;
178 }
179 Ok(())
180 })
181 }
182
183 fn name(&self) -> &'static str {
184 "Redis"
185 }
186}
187
188impl L2CacheBackend for RedisCache {
189 fn get_with_ttl<'a>(
190 &'a self,
191 key: &'a str,
192 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
193 Box::pin(async move {
194 let mut conn = self.conn_manager.clone();
195 let (bytes, ttl_secs): (Option<Vec<u8>>, i64) =
197 match redis::pipe().get(key).ttl(key).query_async(&mut conn).await {
198 Ok(res) => res,
199 Err(_) => return None,
200 };
201
202 match bytes {
203 Some(b) if !b.is_empty() => {
204 self.hits.fetch_add(1, Ordering::Relaxed);
205 let ttl = if ttl_secs > 0 {
206 Some(Duration::from_secs(ttl_secs.unsigned_abs()))
207 } else {
208 None
209 };
210 Some((Bytes::from(b), ttl))
211 }
212 _ => {
213 self.misses.fetch_add(1, Ordering::Relaxed);
214 None
215 }
216 }
217 })
218 }
219}