Skip to main content

ferro_rs/cache/
redis.rs

1//! Redis-backed cache implementation
2
3use async_trait::async_trait;
4use redis::{aio::ConnectionManager, AsyncCommands, Client};
5use std::time::Duration;
6
7use super::config::CacheConfig;
8use super::store::CacheStore;
9use crate::error::FrameworkError;
10
11/// Redis cache implementation
12///
13/// Uses redis-rs with async/tokio runtime for high-performance caching.
14pub struct RedisCache {
15    conn: ConnectionManager,
16    prefix: String,
17    default_ttl: Option<Duration>,
18}
19
20impl RedisCache {
21    /// Create a new Redis cache connection with a 2-second timeout
22    pub async fn connect(config: &CacheConfig) -> Result<Self, FrameworkError> {
23        let client = Client::open(config.url.as_str())
24            .map_err(|e| FrameworkError::internal(format!("Redis connection error: {e}")))?;
25
26        // Timeout after 2 seconds to avoid hanging when Redis is unavailable
27        let conn = tokio::time::timeout(Duration::from_secs(2), ConnectionManager::new(client))
28            .await
29            .map_err(|_| FrameworkError::internal("Redis connection timeout".to_string()))?
30            .map_err(|e| {
31                FrameworkError::internal(format!("Redis connection manager error: {e}"))
32            })?;
33
34        let default_ttl = if config.default_ttl > 0 {
35            Some(Duration::from_secs(config.default_ttl))
36        } else {
37            None
38        };
39
40        Ok(Self {
41            conn,
42            prefix: config.prefix.clone(),
43            default_ttl,
44        })
45    }
46
47    fn prefixed_key(&self, key: &str) -> String {
48        format!("{}{}", self.prefix, key)
49    }
50}
51
52#[async_trait]
53impl CacheStore for RedisCache {
54    async fn get_raw(&self, key: &str) -> Result<Option<String>, FrameworkError> {
55        let mut conn = self.conn.clone();
56        let key = self.prefixed_key(key);
57
58        let value: Option<String> = conn
59            .get(&key)
60            .await
61            .map_err(|e| FrameworkError::internal(format!("Cache get error: {e}")))?;
62
63        Ok(value)
64    }
65
66    async fn put_raw(
67        &self,
68        key: &str,
69        value: &str,
70        ttl: Option<Duration>,
71    ) -> Result<(), FrameworkError> {
72        let mut conn = self.conn.clone();
73        let key = self.prefixed_key(key);
74
75        let effective_ttl = ttl.or(self.default_ttl);
76
77        if let Some(duration) = effective_ttl {
78            conn.set_ex::<_, _, ()>(&key, value, duration.as_secs())
79                .await
80                .map_err(|e| FrameworkError::internal(format!("Cache set error: {e}")))?;
81        } else {
82            conn.set::<_, _, ()>(&key, value)
83                .await
84                .map_err(|e| FrameworkError::internal(format!("Cache set error: {e}")))?;
85        }
86
87        Ok(())
88    }
89
90    async fn has(&self, key: &str) -> Result<bool, FrameworkError> {
91        let mut conn = self.conn.clone();
92        let key = self.prefixed_key(key);
93
94        let exists: bool = conn
95            .exists(&key)
96            .await
97            .map_err(|e| FrameworkError::internal(format!("Cache exists error: {e}")))?;
98
99        Ok(exists)
100    }
101
102    async fn forget(&self, key: &str) -> Result<bool, FrameworkError> {
103        let mut conn = self.conn.clone();
104        let key = self.prefixed_key(key);
105
106        let deleted: i64 = conn
107            .del(&key)
108            .await
109            .map_err(|e| FrameworkError::internal(format!("Cache delete error: {e}")))?;
110
111        Ok(deleted > 0)
112    }
113
114    async fn flush(&self) -> Result<(), FrameworkError> {
115        let mut conn = self.conn.clone();
116
117        // Use KEYS to find and delete all keys with our prefix
118        // Note: KEYS is O(N) and should be used carefully in production
119        let pattern = format!("{}*", self.prefix);
120        let keys: Vec<String> = redis::cmd("KEYS")
121            .arg(&pattern)
122            .query_async(&mut conn)
123            .await
124            .map_err(|e| FrameworkError::internal(format!("Cache flush scan error: {e}")))?;
125
126        if !keys.is_empty() {
127            conn.del::<_, ()>(keys)
128                .await
129                .map_err(|e| FrameworkError::internal(format!("Cache flush delete error: {e}")))?;
130        }
131
132        Ok(())
133    }
134
135    async fn increment(&self, key: &str, amount: i64) -> Result<i64, FrameworkError> {
136        let mut conn = self.conn.clone();
137        let key = self.prefixed_key(key);
138
139        let value: i64 = conn
140            .incr(&key, amount)
141            .await
142            .map_err(|e| FrameworkError::internal(format!("Cache increment error: {e}")))?;
143
144        Ok(value)
145    }
146
147    async fn decrement(&self, key: &str, amount: i64) -> Result<i64, FrameworkError> {
148        let mut conn = self.conn.clone();
149        let key = self.prefixed_key(key);
150
151        let value: i64 = conn
152            .decr(&key, amount)
153            .await
154            .map_err(|e| FrameworkError::internal(format!("Cache decrement error: {e}")))?;
155
156        Ok(value)
157    }
158
159    async fn expire(&self, key: &str, ttl: Duration) -> Result<bool, FrameworkError> {
160        let mut conn = self.conn.clone();
161        let key = self.prefixed_key(key);
162
163        let result: bool = conn
164            .expire(&key, ttl.as_secs() as i64)
165            .await
166            .map_err(|e| FrameworkError::internal(format!("Cache expire error: {e}")))?;
167
168        Ok(result)
169    }
170}