kaccy_db/
cache.rs

1//! Redis caching layer for high-performance data access
2//!
3//! Provides caching for:
4//! - User sessions and profiles
5//! - Token prices and metadata
6//! - User balances (hot data)
7//! - Rate limiting counters
8
9use std::sync::Arc;
10
11use redis::aio::ConnectionManager;
12use redis::{AsyncCommands, Client};
13use serde::{de::DeserializeOwned, Serialize};
14use uuid::Uuid;
15
16use crate::error::{DbError, Result};
17
/// Namespace prefixes that are prepended to logical cache keys.
///
/// Each constant ends in `:` so that an identifier can be appended directly.
pub mod keys {
    /// Prefix for session entries (session id -> user id).
    pub const USER_SESSION: &str = "session:";
    /// Prefix for cached user profiles, keyed by user id.
    pub const USER_PROFILE: &str = "user:";
    /// Prefix for cached token prices, keyed by token id.
    pub const TOKEN_PRICE: &str = "price:";
    /// Prefix for cached token metadata, keyed by token id.
    pub const TOKEN_META: &str = "token:";
    /// Prefix for cached balances, keyed by `<user id>:<token id>`.
    pub const USER_BALANCE: &str = "balance:";
    /// Prefix for rate-limit counters, keyed by a caller-chosen identifier.
    pub const RATE_LIMIT: &str = "ratelimit:";
    /// Prefix for distributed-lock entries.
    pub const LOCK: &str = "lock:";
}
28
/// Settings that control how the cache connects to Redis and how long
/// each category of entry lives.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// URL used to open the Redis client.
    pub redis_url: String,
    /// Time-to-live, in seconds, applied when no category-specific TTL is used.
    pub default_ttl_secs: u64,
    /// Time-to-live, in seconds, for user sessions.
    pub session_ttl_secs: u64,
    /// Time-to-live, in seconds, for token prices (kept short on purpose).
    pub price_ttl_secs: u64,
    /// Time-to-live, in seconds, for user profiles.
    pub profile_ttl_secs: u64,
    /// Time-to-live, in seconds, for user balances.
    pub balance_ttl_secs: u64,
    /// String prepended to every key written by this application instance.
    pub key_prefix: String,
}

impl Default for CacheConfig {
    /// Local Redis on the standard port with the stock TTLs:
    /// 1 hour default, 24 hour sessions, 5 second prices,
    /// 5 minute profiles and 30 second balances.
    fn default() -> Self {
        CacheConfig {
            redis_url: String::from("redis://127.0.0.1:6379"),
            default_ttl_secs: 60 * 60,
            session_ttl_secs: 24 * 60 * 60,
            price_ttl_secs: 5,
            profile_ttl_secs: 5 * 60,
            balance_ttl_secs: 30,
            key_prefix: String::from("kaccy:"),
        }
    }
}

impl CacheConfig {
    /// Builds a configuration whose Redis URL comes from the `REDIS_URL`
    /// environment variable.
    ///
    /// When the variable is unset (or not valid Unicode) the default URL is
    /// kept. Every other field takes its [`Default`] value.
    pub fn from_env() -> Self {
        let mut config = Self::default();
        if let Ok(url) = std::env::var("REDIS_URL") {
            config.redis_url = url;
        }
        config
    }

    /// Returns the configuration with all five TTLs replaced.
    ///
    /// All arguments are in seconds. `redis_url` and `key_prefix` are left
    /// untouched.
    pub fn with_ttls(
        self,
        default: u64,
        session: u64,
        price: u64,
        profile: u64,
        balance: u64,
    ) -> Self {
        Self {
            default_ttl_secs: default,
            session_ttl_secs: session,
            price_ttl_secs: price,
            profile_ttl_secs: profile,
            balance_ttl_secs: balance,
            ..self
        }
    }
}
91
92/// Redis cache client with connection management
93#[derive(Clone)]
94pub struct RedisCache {
95    conn: ConnectionManager,
96    config: CacheConfig,
97}
98
99impl RedisCache {
100    /// Create a new Redis cache connection
101    pub async fn new(config: CacheConfig) -> Result<Self> {
102        let client = Client::open(config.redis_url.as_str())
103            .map_err(|e| DbError::Connection(format!("Redis client error: {}", e)))?;
104
105        let conn = ConnectionManager::new(client)
106            .await
107            .map_err(|e| DbError::Connection(format!("Redis connection error: {}", e)))?;
108
109        tracing::info!("Redis cache connected to {}", config.redis_url);
110
111        Ok(Self { conn, config })
112    }
113
114    /// Get the full key with prefix
115    fn full_key(&self, key: &str) -> String {
116        format!("{}{}", self.config.key_prefix, key)
117    }
118
119    /// Get a value from cache
120    pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
121        let full_key = self.full_key(key);
122        let mut conn = self.conn.clone();
123
124        let value: Option<String> = conn
125            .get(&full_key)
126            .await
127            .map_err(|e| DbError::Cache(format!("Redis GET error: {}", e)))?;
128
129        match value {
130            Some(json) => {
131                let parsed: T = serde_json::from_str(&json)
132                    .map_err(|e| DbError::Cache(format!("Deserialization error: {}", e)))?;
133                Ok(Some(parsed))
134            }
135            None => Ok(None),
136        }
137    }
138
139    /// Set a value in cache with TTL
140    pub async fn set<T: Serialize>(&self, key: &str, value: &T, ttl_secs: u64) -> Result<()> {
141        let full_key = self.full_key(key);
142        let json = serde_json::to_string(value)
143            .map_err(|e| DbError::Cache(format!("Serialization error: {}", e)))?;
144
145        let mut conn = self.conn.clone();
146        let _: () = conn
147            .set_ex(&full_key, json, ttl_secs)
148            .await
149            .map_err(|e| DbError::Cache(format!("Redis SET error: {}", e)))?;
150
151        Ok(())
152    }
153
154    /// Set a value with default TTL
155    pub async fn set_default<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
156        self.set(key, value, self.config.default_ttl_secs).await
157    }
158
159    /// Delete a key from cache
160    pub async fn delete(&self, key: &str) -> Result<bool> {
161        let full_key = self.full_key(key);
162        let mut conn = self.conn.clone();
163
164        let deleted: i64 = conn
165            .del(&full_key)
166            .await
167            .map_err(|e| DbError::Cache(format!("Redis DEL error: {}", e)))?;
168
169        Ok(deleted > 0)
170    }
171
172    /// Delete multiple keys matching a pattern
173    pub async fn delete_pattern(&self, pattern: &str) -> Result<u64> {
174        let full_pattern = self.full_key(pattern);
175        let mut conn = self.conn.clone();
176
177        let keys: Vec<String> = conn
178            .keys(&full_pattern)
179            .await
180            .map_err(|e| DbError::Cache(format!("Redis KEYS error: {}", e)))?;
181
182        if keys.is_empty() {
183            return Ok(0);
184        }
185
186        let deleted: i64 = conn
187            .del(&keys)
188            .await
189            .map_err(|e| DbError::Cache(format!("Redis DEL error: {}", e)))?;
190
191        Ok(deleted as u64)
192    }
193
194    /// Check if a key exists
195    pub async fn exists(&self, key: &str) -> Result<bool> {
196        let full_key = self.full_key(key);
197        let mut conn = self.conn.clone();
198
199        let exists: bool = conn
200            .exists(&full_key)
201            .await
202            .map_err(|e| DbError::Cache(format!("Redis EXISTS error: {}", e)))?;
203
204        Ok(exists)
205    }
206
207    /// Increment a counter (for rate limiting)
208    pub async fn incr(&self, key: &str) -> Result<i64> {
209        let full_key = self.full_key(key);
210        let mut conn = self.conn.clone();
211
212        let value: i64 = conn
213            .incr(&full_key, 1)
214            .await
215            .map_err(|e| DbError::Cache(format!("Redis INCR error: {}", e)))?;
216
217        Ok(value)
218    }
219
220    /// Increment with expiry (for rate limiting windows)
221    pub async fn incr_with_expiry(&self, key: &str, ttl_secs: u64) -> Result<i64> {
222        let full_key = self.full_key(key);
223        let mut conn = self.conn.clone();
224
225        // Use MULTI/EXEC for atomicity
226        let (value,): (i64,) = redis::pipe()
227            .atomic()
228            .incr(&full_key, 1)
229            .expire(&full_key, ttl_secs as i64)
230            .ignore()
231            .query_async(&mut conn)
232            .await
233            .map_err(|e| DbError::Cache(format!("Redis INCR/EXPIRE error: {}", e)))?;
234
235        Ok(value)
236    }
237
238    /// Set TTL on existing key
239    pub async fn expire(&self, key: &str, ttl_secs: u64) -> Result<bool> {
240        let full_key = self.full_key(key);
241        let mut conn = self.conn.clone();
242
243        let set: bool = conn
244            .expire(&full_key, ttl_secs as i64)
245            .await
246            .map_err(|e| DbError::Cache(format!("Redis EXPIRE error: {}", e)))?;
247
248        Ok(set)
249    }
250
251    /// Get remaining TTL for a key
252    pub async fn ttl(&self, key: &str) -> Result<i64> {
253        let full_key = self.full_key(key);
254        let mut conn = self.conn.clone();
255
256        let ttl: i64 = conn
257            .ttl(&full_key)
258            .await
259            .map_err(|e| DbError::Cache(format!("Redis TTL error: {}", e)))?;
260
261        Ok(ttl)
262    }
263
264    /// Try to acquire a distributed lock
265    pub async fn try_lock(&self, resource: &str, ttl_secs: u64) -> Result<Option<String>> {
266        let key = format!("{}{}:{}", keys::LOCK, resource, Uuid::new_v4());
267        let full_key = self.full_key(&key);
268        let lock_id = Uuid::new_v4().to_string();
269        let mut conn = self.conn.clone();
270
271        let set: bool = conn
272            .set_nx(&full_key, &lock_id)
273            .await
274            .map_err(|e| DbError::Cache(format!("Redis SETNX error: {}", e)))?;
275
276        if set {
277            let _: () = conn
278                .expire(&full_key, ttl_secs as i64)
279                .await
280                .map_err(|e| DbError::Cache(format!("Redis EXPIRE error: {}", e)))?;
281            Ok(Some(lock_id))
282        } else {
283            Ok(None)
284        }
285    }
286
287    /// Release a distributed lock
288    pub async fn release_lock(&self, resource: &str, lock_id: &str) -> Result<bool> {
289        let key = format!("{}{}:{}", keys::LOCK, resource, lock_id);
290        self.delete(&key).await
291    }
292
293    /// Health check
294    pub async fn health_check(&self) -> Result<bool> {
295        let mut conn = self.conn.clone();
296        let pong: String = redis::cmd("PING")
297            .query_async(&mut conn)
298            .await
299            .map_err(|e| DbError::Cache(format!("Redis PING error: {}", e)))?;
300
301        Ok(pong == "PONG")
302    }
303}
304
305// ============== Specialized Cache Operations ==============
306
307/// User session cache operations
308impl RedisCache {
309    /// Cache a user session
310    pub async fn set_session(&self, session_id: &str, user_id: Uuid) -> Result<()> {
311        let key = format!("{}{}", keys::USER_SESSION, session_id);
312        self.set(&key, &user_id, self.config.session_ttl_secs).await
313    }
314
315    /// Get user ID from session
316    pub async fn get_session(&self, session_id: &str) -> Result<Option<Uuid>> {
317        let key = format!("{}{}", keys::USER_SESSION, session_id);
318        self.get(&key).await
319    }
320
321    /// Invalidate a session
322    pub async fn invalidate_session(&self, session_id: &str) -> Result<bool> {
323        let key = format!("{}{}", keys::USER_SESSION, session_id);
324        self.delete(&key).await
325    }
326
327    /// Invalidate all sessions for a user
328    #[allow(dead_code)]
329    pub async fn invalidate_user_sessions(&self, user_id: Uuid) -> Result<u64> {
330        // This requires scanning - consider using a SET per user for production
331        // Note: This is a simplified implementation
332        // In production, maintain a set of session IDs per user
333        tracing::warn!(
334            "invalidate_user_sessions: scanning all sessions for user {}",
335            user_id
336        );
337        Ok(0) // Placeholder - implement with user session tracking
338    }
339}
340
341/// User profile cache operations
342impl RedisCache {
343    /// Cache user profile
344    pub async fn set_user_profile<T: Serialize>(&self, user_id: Uuid, profile: &T) -> Result<()> {
345        let key = format!("{}{}", keys::USER_PROFILE, user_id);
346        self.set(&key, profile, self.config.profile_ttl_secs).await
347    }
348
349    /// Get cached user profile
350    pub async fn get_user_profile<T: DeserializeOwned>(&self, user_id: Uuid) -> Result<Option<T>> {
351        let key = format!("{}{}", keys::USER_PROFILE, user_id);
352        self.get(&key).await
353    }
354
355    /// Invalidate user profile cache
356    pub async fn invalidate_user_profile(&self, user_id: Uuid) -> Result<bool> {
357        let key = format!("{}{}", keys::USER_PROFILE, user_id);
358        self.delete(&key).await
359    }
360}
361
362/// Token price cache operations
363impl RedisCache {
364    /// Cache token price
365    pub async fn set_token_price(&self, token_id: Uuid, price_btc: f64) -> Result<()> {
366        let key = format!("{}{}", keys::TOKEN_PRICE, token_id);
367        self.set(&key, &price_btc, self.config.price_ttl_secs).await
368    }
369
370    /// Get cached token price
371    pub async fn get_token_price(&self, token_id: Uuid) -> Result<Option<f64>> {
372        let key = format!("{}{}", keys::TOKEN_PRICE, token_id);
373        self.get(&key).await
374    }
375
376    /// Cache multiple token prices
377    pub async fn set_token_prices(&self, prices: &[(Uuid, f64)]) -> Result<()> {
378        for (token_id, price) in prices {
379            self.set_token_price(*token_id, *price).await?;
380        }
381        Ok(())
382    }
383}
384
385/// Token metadata cache operations
386impl RedisCache {
387    /// Cache token metadata
388    pub async fn set_token_meta<T: Serialize>(&self, token_id: Uuid, meta: &T) -> Result<()> {
389        let key = format!("{}{}", keys::TOKEN_META, token_id);
390        self.set(&key, meta, self.config.default_ttl_secs).await
391    }
392
393    /// Get cached token metadata
394    pub async fn get_token_meta<T: DeserializeOwned>(&self, token_id: Uuid) -> Result<Option<T>> {
395        let key = format!("{}{}", keys::TOKEN_META, token_id);
396        self.get(&key).await
397    }
398
399    /// Invalidate token metadata cache
400    pub async fn invalidate_token_meta(&self, token_id: Uuid) -> Result<bool> {
401        let key = format!("{}{}", keys::TOKEN_META, token_id);
402        self.delete(&key).await
403    }
404}
405
406/// User balance cache operations
407impl RedisCache {
408    /// Cache user balance for a token
409    pub async fn set_balance(&self, user_id: Uuid, token_id: Uuid, balance: f64) -> Result<()> {
410        let key = format!("{}{}:{}", keys::USER_BALANCE, user_id, token_id);
411        self.set(&key, &balance, self.config.balance_ttl_secs).await
412    }
413
414    /// Get cached balance
415    pub async fn get_balance(&self, user_id: Uuid, token_id: Uuid) -> Result<Option<f64>> {
416        let key = format!("{}{}:{}", keys::USER_BALANCE, user_id, token_id);
417        self.get(&key).await
418    }
419
420    /// Invalidate all balances for a user
421    pub async fn invalidate_user_balances(&self, user_id: Uuid) -> Result<u64> {
422        let pattern = format!("{}{}:*", keys::USER_BALANCE, user_id);
423        self.delete_pattern(&pattern).await
424    }
425
426    /// Invalidate all balances for a token
427    pub async fn invalidate_token_balances(&self, token_id: Uuid) -> Result<u64> {
428        let pattern = format!("{}*:{}", keys::USER_BALANCE, token_id);
429        self.delete_pattern(&pattern).await
430    }
431}
432
433/// Rate limiting operations
434impl RedisCache {
435    /// Check and increment rate limit
436    /// Returns (current_count, is_allowed)
437    pub async fn check_rate_limit(
438        &self,
439        identifier: &str,
440        limit: u64,
441        window_secs: u64,
442    ) -> Result<(u64, bool)> {
443        let key = format!("{}{}", keys::RATE_LIMIT, identifier);
444        let count = self.incr_with_expiry(&key, window_secs).await? as u64;
445        Ok((count, count <= limit))
446    }
447
448    /// Get current rate limit count
449    pub async fn get_rate_limit_count(&self, identifier: &str) -> Result<u64> {
450        let key = format!("{}{}", keys::RATE_LIMIT, identifier);
451        let count: Option<u64> = self.get(&key).await?;
452        Ok(count.unwrap_or(0))
453    }
454
455    /// Reset rate limit for identifier
456    pub async fn reset_rate_limit(&self, identifier: &str) -> Result<bool> {
457        let key = format!("{}{}", keys::RATE_LIMIT, identifier);
458        self.delete(&key).await
459    }
460}
461
462/// Cached repository wrapper for read-through caching
463pub struct CachedRepository<R> {
464    cache: Arc<RedisCache>,
465    repo: R,
466}
467
468impl<R> CachedRepository<R> {
469    pub fn new(cache: Arc<RedisCache>, repo: R) -> Self {
470        Self { cache, repo }
471    }
472
473    /// Get the underlying repository
474    pub fn repo(&self) -> &R {
475        &self.repo
476    }
477
478    /// Get the cache
479    pub fn cache(&self) -> &RedisCache {
480        &self.cache
481    }
482}
483
484/// Cache statistics
485#[derive(Debug, Clone, Default, serde::Serialize)]
486pub struct CacheStats {
487    pub hits: u64,
488    pub misses: u64,
489    pub sets: u64,
490    pub deletes: u64,
491}
492
493impl CacheStats {
494    pub fn hit_rate(&self) -> f64 {
495        let total = self.hits + self.misses;
496        if total == 0 {
497            0.0
498        } else {
499            self.hits as f64 / total as f64
500        }
501    }
502}