tibba_cache/
cache.rs

1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, RedisClient, RedisClientConn};
16use deadpool_redis::redis::{cmd, pipe};
17use redis::{AsyncCommands, RedisError};
18use serde::{Serialize, de::DeserializeOwned};
19use std::{borrow::Cow, time::Duration};
20use tibba_util::{Algorithm, compress, decompress};
21
22type Result<T> = std::result::Result<T, Error>;
23
24fn map_err(category: &str, e: RedisError) -> Error {
25    Error::Redis {
26        category: category.to_string(),
27        source: e,
28    }
29}
30
31/// Redis cache implementation that provides various caching operations
32pub struct RedisCache {
33    /// Time-to-live duration for cache entries
34    ttl: Duration,
35    /// Prefix added to all cache keys
36    prefix: String,
37    /// Redis connection pool
38    client: &'static RedisClient,
39}
40
41impl RedisCache {
42    #[inline]
43    pub async fn conn(&self) -> Result<RedisClientConn> {
44        self.client.conn().await
45    }
46    /// Creates a new RedisCacheBuilder with default settings:
47    /// - TTL: 10 minutes
48    /// - Empty prefix
49    /// - Given Redis pool
50    pub fn new(client: &'static RedisClient) -> Self {
51        Self {
52            ttl: Duration::from_secs(10 * 60),
53            prefix: "".to_string(),
54            client,
55        }
56    }
57
58    /// Sets the time-to-live duration for cache entries
59    /// Returns self for method chaining
60    pub fn with_ttl(mut self, ttl: Duration) -> Self {
61        self.ttl = ttl;
62        self
63    }
64
65    /// Sets the prefix for all cache keys
66    /// Returns self for method chaining
67    pub fn with_prefix(mut self, prefix: String) -> Self {
68        self.prefix = prefix;
69        self
70    }
71
72    /// Generates the full cache key by combining prefix (if any) with the provided key
73    /// # Arguments
74    /// * `key` - The base key to be prefixed
75    /// # Returns
76    /// * If prefix is empty: returns the original key
77    /// * If prefix exists: returns prefix + key
78    fn get_key<'a>(&'a self, key: &'a str) -> Cow<'a, str> {
79        if self.prefix.is_empty() {
80            Cow::Borrowed(key)
81        } else {
82            Cow::Owned(format!("{}{}", self.prefix, key))
83        }
84    }
85    /// Pings the Redis server to check connection
86    /// # Returns
87    /// * `Ok(())` - Connection is successful
88    /// * `Err(Error)` - Redis operation failed
89    pub async fn ping(&self) -> Result<()> {
90        let () = self
91            .conn()
92            .await?
93            .ping()
94            .await
95            .map_err(|e| map_err("ping", e))?;
96        Ok(())
97    }
98    /// Retrieves a raw value from Redis for the given key
99    /// # Type Parameters
100    /// * `T` - The type to deserialize the Redis value into
101    /// # Arguments
102    /// * `key` - The key to retrieve
103    /// # Returns
104    /// * `Ok(T)` - Successfully retrieved and converted value
105    /// * `Err(Error)` - Redis error or value conversion error
106    async fn get_value<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
107        let result = self
108            .conn()
109            .await?
110            .get(key)
111            .await
112            .map_err(|e| map_err("get", e))?;
113
114        Ok(result)
115    }
116    /// Stores a raw value in Redis with optional TTL
117    /// # Type Parameters
118    /// * `T` - The type of value to store, must be convertible to Redis data
119    /// # Arguments
120    /// * `key` - The key under which to store the value
121    /// * `value` - The value to store
122    /// * `ttl` - Optional time-to-live duration (uses instance default if None)
123    async fn set_value<T: redis::ToRedisArgs + Send + Sync>(
124        &self,
125        key: &str,
126        value: T,
127        ttl: Option<Duration>,
128    ) -> Result<()> {
129        let seconds = ttl.unwrap_or(self.ttl).as_secs();
130        let () = self
131            .conn()
132            .await?
133            .set_ex(key, value, seconds)
134            .await
135            .map_err(|e| map_err("set", e))?;
136        Ok(())
137    }
138    /// Attempts to acquire a distributed lock using Redis SET NX command
139    /// # Arguments
140    /// * `key` - The lock key
141    /// * `ttl` - Optional lock duration (uses instance default if None)
142    /// # Returns
143    /// * `Ok(true)` - Lock was successfully acquired
144    /// * `Ok(false)` - Lock already exists
145    /// * `Err(Error)` - Redis operation failed
146    pub async fn lock(&self, key: &str, ttl: Option<Duration>) -> Result<bool> {
147        let mut conn = self.conn().await?;
148
149        let result = cmd("SET")
150            .arg(self.get_key(key))
151            .arg(true)
152            .arg("NX")
153            .arg("EX")
154            .arg(ttl.unwrap_or(self.ttl).as_secs())
155            .query_async(&mut conn)
156            .await
157            .map_err(|e| map_err("lock", e))?;
158        Ok(result)
159    }
160    /// Removes a key and its value from Redis
161    /// # Arguments
162    /// * `key` - The key to delete
163    /// # Returns
164    /// * `Ok(())` - Key was successfully deleted (or didn't exist)
165    /// * `Err(Error)` - Redis operation failed
166    pub async fn del(&self, key: &str) -> Result<()> {
167        let () = self
168            .conn()
169            .await?
170            .del(self.get_key(key))
171            .await
172            .map_err(|e| map_err("del", e))?;
173
174        Ok(())
175    }
176    /// Atomically increments a counter by delta
177    /// # Arguments
178    /// * `key` - The counter key
179    /// * `delta` - Amount to increment by (can be negative)
180    /// * `ttl` - Optional time-to-live for the counter
181    /// # Returns
182    /// * `Ok(i64)` - The new value after incrementing
183    /// * `Err(Error)` - Redis operation failed
184    /// # Notes
185    /// If the key doesn't exist, it's initialized to 0 with ttl before incrementing
186    pub async fn incr(&self, key: &str, delta: i64, ttl: Option<Duration>) -> Result<i64> {
187        let mut conn = self.conn().await?;
188        let k = self.get_key(key);
189        let (_, count) = pipe()
190            .cmd("SET")
191            .arg(&k)
192            .arg(0)
193            .arg("NX")
194            .arg("EX")
195            .arg(ttl.unwrap_or(self.ttl).as_secs())
196            .cmd("INCRBY")
197            .arg(&k)
198            .arg(delta)
199            .query_async::<(bool, i64)>(&mut conn)
200            .await
201            .map_err(|e| map_err("incr", e))?;
202        Ok(count)
203    }
204    /// Sets a value in Redis with an optional TTL
205    /// - If TTL is None, uses the default TTL configured for this cache
206    /// - Value type must implement ToRedisArgs trait
207    /// - Key will be automatically prefixed if a prefix is configured
208    pub async fn set<T: redis::ToRedisArgs + Send + Sync>(
209        &self,
210        key: &str,
211        value: T,
212        ttl: Option<Duration>,
213    ) -> Result<()> {
214        self.set_value(&self.get_key(key), value, ttl).await
215    }
216    /// Retrieves a value from Redis
217    /// - Value type must implement FromRedisValue trait
218    /// - Key will be automatically prefixed if a prefix is configured
219    /// - Returns Error if key doesn't exist or value can't be converted to T
220    pub async fn get<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
221        self.get_value::<T>(&self.get_key(key)).await
222    }
223    /// Serializes and stores a struct in Redis as JSON
224    /// - Value must implement Serialize trait
225    /// - Optional TTL (uses default if None)
226    /// - Key will be automatically prefixed
227    pub async fn set_struct<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
228    where
229        T: ?Sized + Serialize,
230    {
231        let value = serde_json::to_vec(&value).map_err(|e| Error::Common {
232            category: "set_struct".to_string(),
233            message: e.to_string(),
234        })?;
235        self.set_value(&self.get_key(key), &value, ttl).await?;
236        Ok(())
237    }
238    /// Retrieves and deserializes a struct from Redis
239    /// - Type must implement DeserializeOwned trait
240    /// - Returns None if key doesn't exist
241    /// - Returns Error if deserialization fails
242    /// - Key will be automatically prefixed
243    pub async fn get_struct<T>(&self, key: &str) -> Result<Option<T>>
244    where
245        T: DeserializeOwned,
246    {
247        let buf: Vec<u8> = self.get_value(&self.get_key(key)).await?;
248
249        if buf.is_empty() {
250            return Ok(None);
251        }
252
253        let deserializer = &mut serde_json::Deserializer::from_slice(&buf);
254        let result = T::deserialize(deserializer).map_err(|e| Error::Common {
255            category: "get_struct".to_string(),
256            message: e.to_string(),
257        })?;
258
259        Ok(Some(result))
260    }
261    /// Gets the remaining time-to-live for a key
262    /// # Arguments
263    /// * `key` - The key to check
264    /// # Returns
265    /// * `Ok(seconds)` where:
266    ///   * `seconds > 0` - Remaining time in seconds
267    ///   * `seconds = -2` - Key does not exist
268    ///   * `seconds = -1` - Key exists but has no expiry
269    /// * `Err(Error)` - Redis operation failed
270    pub async fn ttl(&self, key: &str) -> Result<i32> {
271        let result = self
272            .conn()
273            .await?
274            .ttl(self.get_key(key))
275            .await
276            .map_err(|e| map_err("ttl", e))?;
277
278        Ok(result)
279    }
280    /// Atomically retrieves a value and deletes it from Redis(>=6.2.0)
281    /// # Type Parameters
282    /// * `T` - The type to deserialize the Redis value into
283    /// # Arguments
284    /// * `key` - The key to get and delete
285    /// # Returns
286    /// * `Ok(T)` - The value before deletion
287    /// * `Err(Error)` - Redis operation failed or value conversion error
288    pub async fn get_del<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
289        let result = self
290            .conn()
291            .await?
292            .get_del(self.get_key(key))
293            .await
294            .map_err(|e| map_err("get_del", e))?;
295
296        Ok(result)
297    }
298    async fn set_struct_compressed<T>(
299        &self,
300        key: &str,
301        value: &T,
302        ttl: Option<Duration>,
303        algorithm: Algorithm,
304    ) -> Result<()>
305    where
306        T: ?Sized + Serialize,
307    {
308        let value = serde_json::to_vec(value).map_err(|e| Error::Common {
309            category: "serde_json".to_string(),
310            message: e.to_string(),
311        })?;
312        let buf = compress(&value, algorithm).map_err(|e| Error::Compression { source: e })?;
313        self.set_value(&self.get_key(key), &buf, ttl).await
314    }
315
316    async fn get_struct_compressed<T>(&self, key: &str, algorithm: Algorithm) -> Result<Option<T>>
317    where
318        T: DeserializeOwned,
319    {
320        let value: Option<Vec<u8>> = self.get_value(&self.get_key(key)).await?;
321        match value {
322            None => Ok(None),
323            Some(compressed_buf) => {
324                let buf = decompress(&compressed_buf, algorithm)
325                    .map_err(|e| Error::Compression { source: e })?;
326                serde_json::from_slice(&buf)
327                    .map_err(|e| Error::Common {
328                        category: "serde_json".to_string(),
329                        message: e.to_string(),
330                    })
331                    .map(Some)
332            }
333        }
334    }
335    /// Serializes a struct to JSON, compresses it with LZ4, and stores in Redis
336    /// # Type Parameters
337    /// * `T` - The struct type to serialize
338    /// # Arguments
339    /// * `key` - The key under which to store the compressed data
340    /// * `value` - The struct to serialize and compress
341    /// * `ttl` - Optional time-to-live duration
342    /// # Notes
343    /// Uses LZ4 compression which favors speed over compression ratio
344    pub async fn set_struct_lz4<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
345    where
346        T: ?Sized + Serialize,
347    {
348        self.set_struct_compressed(key, value, ttl, Algorithm::Lz4)
349            .await
350    }
351    /// Retrieves, decompresses (LZ4), and deserializes a struct from Redis
352    /// # Type Parameters
353    /// * `T` - The struct type to deserialize into
354    /// # Arguments
355    /// * `key` - The key to retrieve
356    /// # Returns
357    /// * `Ok(Some(T))` - Successfully retrieved and deserialized value
358    /// * `Ok(None)` - Key doesn't exist
359    /// * `Err(Error)` - Redis, decompression, or deserialization error
360    pub async fn get_struct_lz4<T>(&self, key: &str) -> Result<Option<T>>
361    where
362        T: DeserializeOwned,
363    {
364        self.get_struct_compressed(key, Algorithm::Lz4).await
365    }
366    /// Serializes a struct to JSON, compresses it with Zstd, and stores in Redis
367    /// # Type Parameters
368    /// * `T` - The struct type to serialize
369    /// # Arguments
370    /// * `key` - The key under which to store the compressed data
371    /// * `value` - The struct to serialize and compress
372    /// * `ttl` - Optional time-to-live duration
373    /// # Notes
374    /// Uses Zstd compression which provides better compression ratios than LZ4
375    pub async fn set_struct_zstd<T>(
376        &self,
377        key: &str,
378        value: &T,
379        ttl: Option<Duration>,
380    ) -> Result<()>
381    where
382        T: ?Sized + Serialize,
383    {
384        self.set_struct_compressed(key, value, ttl, Algorithm::default())
385            .await
386    }
387    /// Retrieves, decompresses (Zstd), and deserializes a struct from Redis
388    /// # Type Parameters
389    /// * `T` - The struct type to deserialize into
390    /// # Arguments
391    /// * `key` - The key to retrieve
392    /// # Returns
393    /// * `Ok(Some(T))` - Successfully retrieved and deserialized value
394    /// * `Ok(None)` - Key doesn't exist
395    /// * `Err(Error)` - Redis, decompression, or deserialization error
396    pub async fn get_struct_zstd<T>(&self, key: &str) -> Result<Option<T>>
397    where
398        T: DeserializeOwned,
399    {
400        self.get_struct_compressed(key, Algorithm::default()).await
401    }
402}