1use std::sync::Arc;
10
11use redis::aio::ConnectionManager;
12use redis::{AsyncCommands, Client};
13use serde::{de::DeserializeOwned, Serialize};
14use uuid::Uuid;
15
16use crate::error::{DbError, Result};
17
/// Namespace prefixes that are prepended to identifiers to form cache keys.
///
/// Every constant ends with `:` so an identifier can be appended directly.
pub mod keys {
    /// Prefix for session entries (`session:<session_id>`).
    pub const USER_SESSION: &str = "session:";

    /// Prefix for cached user profiles (`user:<user_id>`).
    pub const USER_PROFILE: &str = "user:";

    /// Prefix for cached token prices (`price:<token_id>`).
    pub const TOKEN_PRICE: &str = "price:";

    /// Prefix for cached token metadata (`token:<token_id>`).
    pub const TOKEN_META: &str = "token:";

    /// Prefix for cached balances (`balance:<user_id>:<token_id>`).
    pub const USER_BALANCE: &str = "balance:";

    /// Prefix for rate-limit counters (`ratelimit:<identifier>`).
    pub const RATE_LIMIT: &str = "ratelimit:";

    /// Prefix for distributed-lock keys.
    pub const LOCK: &str = "lock:";
}
28
/// Connection and expiry settings for the Redis cache.
#[derive(Clone, Debug)]
pub struct CacheConfig {
    /// Redis connection URL.
    pub redis_url: String,
    /// Default time-to-live, in seconds.
    pub default_ttl_secs: u64,
    /// Time-to-live for session entries, in seconds.
    pub session_ttl_secs: u64,
    /// Time-to-live for token price entries, in seconds.
    pub price_ttl_secs: u64,
    /// Time-to-live for user profile entries, in seconds.
    pub profile_ttl_secs: u64,
    /// Time-to-live for balance entries, in seconds.
    pub balance_ttl_secs: u64,
    /// Prefix prepended to every key written by the cache.
    pub key_prefix: String,
}

impl Default for CacheConfig {
    /// Local Redis on the standard port, the `kaccy:` key prefix, and TTLs of
    /// 1 hour (default), 24 hours (sessions), 5 seconds (prices),
    /// 5 minutes (profiles) and 30 seconds (balances).
    fn default() -> Self {
        CacheConfig {
            redis_url: String::from("redis://127.0.0.1:6379"),
            default_ttl_secs: 3600,
            session_ttl_secs: 86400,
            price_ttl_secs: 5,
            profile_ttl_secs: 300,
            balance_ttl_secs: 30,
            key_prefix: String::from("kaccy:"),
        }
    }
}

impl CacheConfig {
    /// Builds a configuration from the process environment.
    ///
    /// Only `REDIS_URL` is consulted; when it is unset (or not valid Unicode)
    /// the default URL is kept. Every other field takes its [`Default`] value.
    pub fn from_env() -> Self {
        let mut config = Self::default();
        if let Ok(url) = std::env::var("REDIS_URL") {
            config.redis_url = url;
        }
        config
    }

    /// Returns the configuration with all five TTLs (in seconds) replaced.
    ///
    /// `redis_url` and `key_prefix` are carried over unchanged.
    pub fn with_ttls(
        self,
        default: u64,
        session: u64,
        price: u64,
        profile: u64,
        balance: u64,
    ) -> Self {
        Self {
            default_ttl_secs: default,
            session_ttl_secs: session,
            price_ttl_secs: price,
            profile_ttl_secs: profile,
            balance_ttl_secs: balance,
            ..self
        }
    }
}
91
92#[derive(Clone)]
94pub struct RedisCache {
95 conn: ConnectionManager,
96 config: CacheConfig,
97}
98
99impl RedisCache {
100 pub async fn new(config: CacheConfig) -> Result<Self> {
102 let client = Client::open(config.redis_url.as_str())
103 .map_err(|e| DbError::Connection(format!("Redis client error: {}", e)))?;
104
105 let conn = ConnectionManager::new(client)
106 .await
107 .map_err(|e| DbError::Connection(format!("Redis connection error: {}", e)))?;
108
109 tracing::info!("Redis cache connected to {}", config.redis_url);
110
111 Ok(Self { conn, config })
112 }
113
114 fn full_key(&self, key: &str) -> String {
116 format!("{}{}", self.config.key_prefix, key)
117 }
118
119 pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
121 let full_key = self.full_key(key);
122 let mut conn = self.conn.clone();
123
124 let value: Option<String> = conn
125 .get(&full_key)
126 .await
127 .map_err(|e| DbError::Cache(format!("Redis GET error: {}", e)))?;
128
129 match value {
130 Some(json) => {
131 let parsed: T = serde_json::from_str(&json)
132 .map_err(|e| DbError::Cache(format!("Deserialization error: {}", e)))?;
133 Ok(Some(parsed))
134 }
135 None => Ok(None),
136 }
137 }
138
139 pub async fn set<T: Serialize>(&self, key: &str, value: &T, ttl_secs: u64) -> Result<()> {
141 let full_key = self.full_key(key);
142 let json = serde_json::to_string(value)
143 .map_err(|e| DbError::Cache(format!("Serialization error: {}", e)))?;
144
145 let mut conn = self.conn.clone();
146 let _: () = conn
147 .set_ex(&full_key, json, ttl_secs)
148 .await
149 .map_err(|e| DbError::Cache(format!("Redis SET error: {}", e)))?;
150
151 Ok(())
152 }
153
154 pub async fn set_default<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
156 self.set(key, value, self.config.default_ttl_secs).await
157 }
158
159 pub async fn delete(&self, key: &str) -> Result<bool> {
161 let full_key = self.full_key(key);
162 let mut conn = self.conn.clone();
163
164 let deleted: i64 = conn
165 .del(&full_key)
166 .await
167 .map_err(|e| DbError::Cache(format!("Redis DEL error: {}", e)))?;
168
169 Ok(deleted > 0)
170 }
171
172 pub async fn delete_pattern(&self, pattern: &str) -> Result<u64> {
174 let full_pattern = self.full_key(pattern);
175 let mut conn = self.conn.clone();
176
177 let keys: Vec<String> = conn
178 .keys(&full_pattern)
179 .await
180 .map_err(|e| DbError::Cache(format!("Redis KEYS error: {}", e)))?;
181
182 if keys.is_empty() {
183 return Ok(0);
184 }
185
186 let deleted: i64 = conn
187 .del(&keys)
188 .await
189 .map_err(|e| DbError::Cache(format!("Redis DEL error: {}", e)))?;
190
191 Ok(deleted as u64)
192 }
193
194 pub async fn exists(&self, key: &str) -> Result<bool> {
196 let full_key = self.full_key(key);
197 let mut conn = self.conn.clone();
198
199 let exists: bool = conn
200 .exists(&full_key)
201 .await
202 .map_err(|e| DbError::Cache(format!("Redis EXISTS error: {}", e)))?;
203
204 Ok(exists)
205 }
206
207 pub async fn incr(&self, key: &str) -> Result<i64> {
209 let full_key = self.full_key(key);
210 let mut conn = self.conn.clone();
211
212 let value: i64 = conn
213 .incr(&full_key, 1)
214 .await
215 .map_err(|e| DbError::Cache(format!("Redis INCR error: {}", e)))?;
216
217 Ok(value)
218 }
219
220 pub async fn incr_with_expiry(&self, key: &str, ttl_secs: u64) -> Result<i64> {
222 let full_key = self.full_key(key);
223 let mut conn = self.conn.clone();
224
225 let (value,): (i64,) = redis::pipe()
227 .atomic()
228 .incr(&full_key, 1)
229 .expire(&full_key, ttl_secs as i64)
230 .ignore()
231 .query_async(&mut conn)
232 .await
233 .map_err(|e| DbError::Cache(format!("Redis INCR/EXPIRE error: {}", e)))?;
234
235 Ok(value)
236 }
237
238 pub async fn expire(&self, key: &str, ttl_secs: u64) -> Result<bool> {
240 let full_key = self.full_key(key);
241 let mut conn = self.conn.clone();
242
243 let set: bool = conn
244 .expire(&full_key, ttl_secs as i64)
245 .await
246 .map_err(|e| DbError::Cache(format!("Redis EXPIRE error: {}", e)))?;
247
248 Ok(set)
249 }
250
251 pub async fn ttl(&self, key: &str) -> Result<i64> {
253 let full_key = self.full_key(key);
254 let mut conn = self.conn.clone();
255
256 let ttl: i64 = conn
257 .ttl(&full_key)
258 .await
259 .map_err(|e| DbError::Cache(format!("Redis TTL error: {}", e)))?;
260
261 Ok(ttl)
262 }
263
264 pub async fn try_lock(&self, resource: &str, ttl_secs: u64) -> Result<Option<String>> {
266 let key = format!("{}{}:{}", keys::LOCK, resource, Uuid::new_v4());
267 let full_key = self.full_key(&key);
268 let lock_id = Uuid::new_v4().to_string();
269 let mut conn = self.conn.clone();
270
271 let set: bool = conn
272 .set_nx(&full_key, &lock_id)
273 .await
274 .map_err(|e| DbError::Cache(format!("Redis SETNX error: {}", e)))?;
275
276 if set {
277 let _: () = conn
278 .expire(&full_key, ttl_secs as i64)
279 .await
280 .map_err(|e| DbError::Cache(format!("Redis EXPIRE error: {}", e)))?;
281 Ok(Some(lock_id))
282 } else {
283 Ok(None)
284 }
285 }
286
287 pub async fn release_lock(&self, resource: &str, lock_id: &str) -> Result<bool> {
289 let key = format!("{}{}:{}", keys::LOCK, resource, lock_id);
290 self.delete(&key).await
291 }
292
293 pub async fn health_check(&self) -> Result<bool> {
295 let mut conn = self.conn.clone();
296 let pong: String = redis::cmd("PING")
297 .query_async(&mut conn)
298 .await
299 .map_err(|e| DbError::Cache(format!("Redis PING error: {}", e)))?;
300
301 Ok(pong == "PONG")
302 }
303}
304
305impl RedisCache {
309 pub async fn set_session(&self, session_id: &str, user_id: Uuid) -> Result<()> {
311 let key = format!("{}{}", keys::USER_SESSION, session_id);
312 self.set(&key, &user_id, self.config.session_ttl_secs).await
313 }
314
315 pub async fn get_session(&self, session_id: &str) -> Result<Option<Uuid>> {
317 let key = format!("{}{}", keys::USER_SESSION, session_id);
318 self.get(&key).await
319 }
320
321 pub async fn invalidate_session(&self, session_id: &str) -> Result<bool> {
323 let key = format!("{}{}", keys::USER_SESSION, session_id);
324 self.delete(&key).await
325 }
326
327 #[allow(dead_code)]
329 pub async fn invalidate_user_sessions(&self, user_id: Uuid) -> Result<u64> {
330 tracing::warn!(
334 "invalidate_user_sessions: scanning all sessions for user {}",
335 user_id
336 );
337 Ok(0) }
339}
340
341impl RedisCache {
343 pub async fn set_user_profile<T: Serialize>(&self, user_id: Uuid, profile: &T) -> Result<()> {
345 let key = format!("{}{}", keys::USER_PROFILE, user_id);
346 self.set(&key, profile, self.config.profile_ttl_secs).await
347 }
348
349 pub async fn get_user_profile<T: DeserializeOwned>(&self, user_id: Uuid) -> Result<Option<T>> {
351 let key = format!("{}{}", keys::USER_PROFILE, user_id);
352 self.get(&key).await
353 }
354
355 pub async fn invalidate_user_profile(&self, user_id: Uuid) -> Result<bool> {
357 let key = format!("{}{}", keys::USER_PROFILE, user_id);
358 self.delete(&key).await
359 }
360}
361
362impl RedisCache {
364 pub async fn set_token_price(&self, token_id: Uuid, price_btc: f64) -> Result<()> {
366 let key = format!("{}{}", keys::TOKEN_PRICE, token_id);
367 self.set(&key, &price_btc, self.config.price_ttl_secs).await
368 }
369
370 pub async fn get_token_price(&self, token_id: Uuid) -> Result<Option<f64>> {
372 let key = format!("{}{}", keys::TOKEN_PRICE, token_id);
373 self.get(&key).await
374 }
375
376 pub async fn set_token_prices(&self, prices: &[(Uuid, f64)]) -> Result<()> {
378 for (token_id, price) in prices {
379 self.set_token_price(*token_id, *price).await?;
380 }
381 Ok(())
382 }
383}
384
385impl RedisCache {
387 pub async fn set_token_meta<T: Serialize>(&self, token_id: Uuid, meta: &T) -> Result<()> {
389 let key = format!("{}{}", keys::TOKEN_META, token_id);
390 self.set(&key, meta, self.config.default_ttl_secs).await
391 }
392
393 pub async fn get_token_meta<T: DeserializeOwned>(&self, token_id: Uuid) -> Result<Option<T>> {
395 let key = format!("{}{}", keys::TOKEN_META, token_id);
396 self.get(&key).await
397 }
398
399 pub async fn invalidate_token_meta(&self, token_id: Uuid) -> Result<bool> {
401 let key = format!("{}{}", keys::TOKEN_META, token_id);
402 self.delete(&key).await
403 }
404}
405
406impl RedisCache {
408 pub async fn set_balance(&self, user_id: Uuid, token_id: Uuid, balance: f64) -> Result<()> {
410 let key = format!("{}{}:{}", keys::USER_BALANCE, user_id, token_id);
411 self.set(&key, &balance, self.config.balance_ttl_secs).await
412 }
413
414 pub async fn get_balance(&self, user_id: Uuid, token_id: Uuid) -> Result<Option<f64>> {
416 let key = format!("{}{}:{}", keys::USER_BALANCE, user_id, token_id);
417 self.get(&key).await
418 }
419
420 pub async fn invalidate_user_balances(&self, user_id: Uuid) -> Result<u64> {
422 let pattern = format!("{}{}:*", keys::USER_BALANCE, user_id);
423 self.delete_pattern(&pattern).await
424 }
425
426 pub async fn invalidate_token_balances(&self, token_id: Uuid) -> Result<u64> {
428 let pattern = format!("{}*:{}", keys::USER_BALANCE, token_id);
429 self.delete_pattern(&pattern).await
430 }
431}
432
433impl RedisCache {
435 pub async fn check_rate_limit(
438 &self,
439 identifier: &str,
440 limit: u64,
441 window_secs: u64,
442 ) -> Result<(u64, bool)> {
443 let key = format!("{}{}", keys::RATE_LIMIT, identifier);
444 let count = self.incr_with_expiry(&key, window_secs).await? as u64;
445 Ok((count, count <= limit))
446 }
447
448 pub async fn get_rate_limit_count(&self, identifier: &str) -> Result<u64> {
450 let key = format!("{}{}", keys::RATE_LIMIT, identifier);
451 let count: Option<u64> = self.get(&key).await?;
452 Ok(count.unwrap_or(0))
453 }
454
455 pub async fn reset_rate_limit(&self, identifier: &str) -> Result<bool> {
457 let key = format!("{}{}", keys::RATE_LIMIT, identifier);
458 self.delete(&key).await
459 }
460}
461
462pub struct CachedRepository<R> {
464 cache: Arc<RedisCache>,
465 repo: R,
466}
467
468impl<R> CachedRepository<R> {
469 pub fn new(cache: Arc<RedisCache>, repo: R) -> Self {
470 Self { cache, repo }
471 }
472
473 pub fn repo(&self) -> &R {
475 &self.repo
476 }
477
478 pub fn cache(&self) -> &RedisCache {
480 &self.cache
481 }
482}
483
484#[derive(Debug, Clone, Default, serde::Serialize)]
486pub struct CacheStats {
487 pub hits: u64,
488 pub misses: u64,
489 pub sets: u64,
490 pub deletes: u64,
491}
492
493impl CacheStats {
494 pub fn hit_rate(&self) -> f64 {
495 let total = self.hits + self.misses;
496 if total == 0 {
497 0.0
498 } else {
499 self.hits as f64 / total as f64
500 }
501 }
502}