tibba_cache/cache.rs
1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, RedisClient, RedisClientConn};
16use deadpool_redis::redis::{cmd, pipe};
17use redis::{AsyncCommands, RedisError};
18use serde::{Serialize, de::DeserializeOwned};
19use std::{borrow::Cow, time::Duration};
20use tibba_util::{Algorithm, compress, decompress};
21
/// Module-local result alias: every fallible operation in this file reports
/// its failure through the crate's [`Error`] type.
type Result<T> = std::result::Result<T, Error>;
23
24fn map_err(category: &str, e: RedisError) -> Error {
25 Error::Redis {
26 category: category.to_string(),
27 source: e,
28 }
29}
30
/// Redis cache implementation that provides various caching operations.
///
/// Public key-based methods pass the caller's key through `get_key`, so the
/// configured prefix is applied before the key reaches Redis.
pub struct RedisCache {
    /// Default time-to-live, used whenever a call does not supply its own TTL
    ttl: Duration,
    /// Prefix prepended to all cache keys (an empty prefix leaves keys unchanged)
    prefix: String,
    /// Redis client handle from which connections are obtained
    client: &'static RedisClient,
}
40
41impl RedisCache {
42 #[inline]
43 pub async fn conn(&self) -> Result<RedisClientConn> {
44 self.client.conn().await
45 }
46 /// Creates a new RedisCacheBuilder with default settings:
47 /// - TTL: 10 minutes
48 /// - Empty prefix
49 /// - Given Redis pool
50 pub fn new(client: &'static RedisClient) -> Self {
51 Self {
52 ttl: Duration::from_secs(10 * 60),
53 prefix: "".to_string(),
54 client,
55 }
56 }
57
58 /// Sets the time-to-live duration for cache entries
59 /// Returns self for method chaining
60 pub fn with_ttl(mut self, ttl: Duration) -> Self {
61 self.ttl = ttl;
62 self
63 }
64
65 /// Sets the prefix for all cache keys
66 /// Returns self for method chaining
67 pub fn with_prefix(mut self, prefix: String) -> Self {
68 self.prefix = prefix;
69 self
70 }
71
72 /// Generates the full cache key by combining prefix (if any) with the provided key
73 /// # Arguments
74 /// * `key` - The base key to be prefixed
75 /// # Returns
76 /// * If prefix is empty: returns the original key
77 /// * If prefix exists: returns prefix + key
78 fn get_key<'a>(&'a self, key: &'a str) -> Cow<'a, str> {
79 if self.prefix.is_empty() {
80 Cow::Borrowed(key)
81 } else {
82 Cow::Owned(format!("{}{}", self.prefix, key))
83 }
84 }
85 /// Pings the Redis server to check connection
86 /// # Returns
87 /// * `Ok(())` - Connection is successful
88 /// * `Err(Error)` - Redis operation failed
89 pub async fn ping(&self) -> Result<()> {
90 let () = self
91 .conn()
92 .await?
93 .ping()
94 .await
95 .map_err(|e| map_err("ping", e))?;
96 Ok(())
97 }
98 /// Retrieves a raw value from Redis for the given key
99 /// # Type Parameters
100 /// * `T` - The type to deserialize the Redis value into
101 /// # Arguments
102 /// * `key` - The key to retrieve
103 /// # Returns
104 /// * `Ok(T)` - Successfully retrieved and converted value
105 /// * `Err(Error)` - Redis error or value conversion error
106 async fn get_value<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
107 let result = self
108 .conn()
109 .await?
110 .get(key)
111 .await
112 .map_err(|e| map_err("get", e))?;
113
114 Ok(result)
115 }
116 /// Stores a raw value in Redis with optional TTL
117 /// # Type Parameters
118 /// * `T` - The type of value to store, must be convertible to Redis data
119 /// # Arguments
120 /// * `key` - The key under which to store the value
121 /// * `value` - The value to store
122 /// * `ttl` - Optional time-to-live duration (uses instance default if None)
123 async fn set_value<T: redis::ToRedisArgs + Send + Sync>(
124 &self,
125 key: &str,
126 value: T,
127 ttl: Option<Duration>,
128 ) -> Result<()> {
129 let seconds = ttl.unwrap_or(self.ttl).as_secs();
130 let () = self
131 .conn()
132 .await?
133 .set_ex(key, value, seconds)
134 .await
135 .map_err(|e| map_err("set", e))?;
136 Ok(())
137 }
138 /// Attempts to acquire a distributed lock using Redis SET NX command
139 /// # Arguments
140 /// * `key` - The lock key
141 /// * `ttl` - Optional lock duration (uses instance default if None)
142 /// # Returns
143 /// * `Ok(true)` - Lock was successfully acquired
144 /// * `Ok(false)` - Lock already exists
145 /// * `Err(Error)` - Redis operation failed
146 pub async fn lock(&self, key: &str, ttl: Option<Duration>) -> Result<bool> {
147 let mut conn = self.conn().await?;
148
149 let result = cmd("SET")
150 .arg(self.get_key(key))
151 .arg(true)
152 .arg("NX")
153 .arg("EX")
154 .arg(ttl.unwrap_or(self.ttl).as_secs())
155 .query_async(&mut conn)
156 .await
157 .map_err(|e| map_err("lock", e))?;
158 Ok(result)
159 }
160 /// Removes a key and its value from Redis
161 /// # Arguments
162 /// * `key` - The key to delete
163 /// # Returns
164 /// * `Ok(())` - Key was successfully deleted (or didn't exist)
165 /// * `Err(Error)` - Redis operation failed
166 pub async fn del(&self, key: &str) -> Result<()> {
167 let () = self
168 .conn()
169 .await?
170 .del(self.get_key(key))
171 .await
172 .map_err(|e| map_err("del", e))?;
173
174 Ok(())
175 }
176 /// Atomically increments a counter by delta
177 /// # Arguments
178 /// * `key` - The counter key
179 /// * `delta` - Amount to increment by (can be negative)
180 /// * `ttl` - Optional time-to-live for the counter
181 /// # Returns
182 /// * `Ok(i64)` - The new value after incrementing
183 /// * `Err(Error)` - Redis operation failed
184 /// # Notes
185 /// If the key doesn't exist, it's initialized to 0 with ttl before incrementing
186 pub async fn incr(&self, key: &str, delta: i64, ttl: Option<Duration>) -> Result<i64> {
187 let mut conn = self.conn().await?;
188 let k = self.get_key(key);
189 let (_, count) = pipe()
190 .cmd("SET")
191 .arg(&k)
192 .arg(0)
193 .arg("NX")
194 .arg("EX")
195 .arg(ttl.unwrap_or(self.ttl).as_secs())
196 .cmd("INCRBY")
197 .arg(&k)
198 .arg(delta)
199 .query_async::<(bool, i64)>(&mut conn)
200 .await
201 .map_err(|e| map_err("incr", e))?;
202 Ok(count)
203 }
204 /// Sets a value in Redis with an optional TTL
205 /// - If TTL is None, uses the default TTL configured for this cache
206 /// - Value type must implement ToRedisArgs trait
207 /// - Key will be automatically prefixed if a prefix is configured
208 pub async fn set<T: redis::ToRedisArgs + Send + Sync>(
209 &self,
210 key: &str,
211 value: T,
212 ttl: Option<Duration>,
213 ) -> Result<()> {
214 self.set_value(&self.get_key(key), value, ttl).await
215 }
216 /// Retrieves a value from Redis
217 /// - Value type must implement FromRedisValue trait
218 /// - Key will be automatically prefixed if a prefix is configured
219 /// - Returns Error if key doesn't exist or value can't be converted to T
220 pub async fn get<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
221 self.get_value::<T>(&self.get_key(key)).await
222 }
223 /// Serializes and stores a struct in Redis as JSON
224 /// - Value must implement Serialize trait
225 /// - Optional TTL (uses default if None)
226 /// - Key will be automatically prefixed
227 pub async fn set_struct<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
228 where
229 T: ?Sized + Serialize,
230 {
231 let value = serde_json::to_vec(&value).map_err(|e| Error::Common {
232 category: "set_struct".to_string(),
233 message: e.to_string(),
234 })?;
235 self.set_value(&self.get_key(key), &value, ttl).await?;
236 Ok(())
237 }
238 /// Retrieves and deserializes a struct from Redis
239 /// - Type must implement DeserializeOwned trait
240 /// - Returns None if key doesn't exist
241 /// - Returns Error if deserialization fails
242 /// - Key will be automatically prefixed
243 pub async fn get_struct<T>(&self, key: &str) -> Result<Option<T>>
244 where
245 T: DeserializeOwned,
246 {
247 let buf: Vec<u8> = self.get_value(&self.get_key(key)).await?;
248
249 if buf.is_empty() {
250 return Ok(None);
251 }
252
253 let deserializer = &mut serde_json::Deserializer::from_slice(&buf);
254 let result = T::deserialize(deserializer).map_err(|e| Error::Common {
255 category: "get_struct".to_string(),
256 message: e.to_string(),
257 })?;
258
259 Ok(Some(result))
260 }
261 /// Gets the remaining time-to-live for a key
262 /// # Arguments
263 /// * `key` - The key to check
264 /// # Returns
265 /// * `Ok(seconds)` where:
266 /// * `seconds > 0` - Remaining time in seconds
267 /// * `seconds = -2` - Key does not exist
268 /// * `seconds = -1` - Key exists but has no expiry
269 /// * `Err(Error)` - Redis operation failed
270 pub async fn ttl(&self, key: &str) -> Result<i32> {
271 let result = self
272 .conn()
273 .await?
274 .ttl(self.get_key(key))
275 .await
276 .map_err(|e| map_err("ttl", e))?;
277
278 Ok(result)
279 }
280 /// Atomically retrieves a value and deletes it from Redis(>=6.2.0)
281 /// # Type Parameters
282 /// * `T` - The type to deserialize the Redis value into
283 /// # Arguments
284 /// * `key` - The key to get and delete
285 /// # Returns
286 /// * `Ok(T)` - The value before deletion
287 /// * `Err(Error)` - Redis operation failed or value conversion error
288 pub async fn get_del<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
289 let result = self
290 .conn()
291 .await?
292 .get_del(self.get_key(key))
293 .await
294 .map_err(|e| map_err("get_del", e))?;
295
296 Ok(result)
297 }
298 async fn set_struct_compressed<T>(
299 &self,
300 key: &str,
301 value: &T,
302 ttl: Option<Duration>,
303 algorithm: Algorithm,
304 ) -> Result<()>
305 where
306 T: ?Sized + Serialize,
307 {
308 let value = serde_json::to_vec(value).map_err(|e| Error::Common {
309 category: "serde_json".to_string(),
310 message: e.to_string(),
311 })?;
312 let buf = compress(&value, algorithm).map_err(|e| Error::Compression { source: e })?;
313 self.set_value(&self.get_key(key), &buf, ttl).await
314 }
315
316 async fn get_struct_compressed<T>(&self, key: &str, algorithm: Algorithm) -> Result<Option<T>>
317 where
318 T: DeserializeOwned,
319 {
320 let value: Option<Vec<u8>> = self.get_value(&self.get_key(key)).await?;
321 match value {
322 None => Ok(None),
323 Some(compressed_buf) => {
324 let buf = decompress(&compressed_buf, algorithm)
325 .map_err(|e| Error::Compression { source: e })?;
326 serde_json::from_slice(&buf)
327 .map_err(|e| Error::Common {
328 category: "serde_json".to_string(),
329 message: e.to_string(),
330 })
331 .map(Some)
332 }
333 }
334 }
335 /// Serializes a struct to JSON, compresses it with LZ4, and stores in Redis
336 /// # Type Parameters
337 /// * `T` - The struct type to serialize
338 /// # Arguments
339 /// * `key` - The key under which to store the compressed data
340 /// * `value` - The struct to serialize and compress
341 /// * `ttl` - Optional time-to-live duration
342 /// # Notes
343 /// Uses LZ4 compression which favors speed over compression ratio
344 pub async fn set_struct_lz4<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
345 where
346 T: ?Sized + Serialize,
347 {
348 self.set_struct_compressed(key, value, ttl, Algorithm::Lz4)
349 .await
350 }
351 /// Retrieves, decompresses (LZ4), and deserializes a struct from Redis
352 /// # Type Parameters
353 /// * `T` - The struct type to deserialize into
354 /// # Arguments
355 /// * `key` - The key to retrieve
356 /// # Returns
357 /// * `Ok(Some(T))` - Successfully retrieved and deserialized value
358 /// * `Ok(None)` - Key doesn't exist
359 /// * `Err(Error)` - Redis, decompression, or deserialization error
360 pub async fn get_struct_lz4<T>(&self, key: &str) -> Result<Option<T>>
361 where
362 T: DeserializeOwned,
363 {
364 self.get_struct_compressed(key, Algorithm::Lz4).await
365 }
366 /// Serializes a struct to JSON, compresses it with Zstd, and stores in Redis
367 /// # Type Parameters
368 /// * `T` - The struct type to serialize
369 /// # Arguments
370 /// * `key` - The key under which to store the compressed data
371 /// * `value` - The struct to serialize and compress
372 /// * `ttl` - Optional time-to-live duration
373 /// # Notes
374 /// Uses Zstd compression which provides better compression ratios than LZ4
375 pub async fn set_struct_zstd<T>(
376 &self,
377 key: &str,
378 value: &T,
379 ttl: Option<Duration>,
380 ) -> Result<()>
381 where
382 T: ?Sized + Serialize,
383 {
384 self.set_struct_compressed(key, value, ttl, Algorithm::default())
385 .await
386 }
387 /// Retrieves, decompresses (Zstd), and deserializes a struct from Redis
388 /// # Type Parameters
389 /// * `T` - The struct type to deserialize into
390 /// # Arguments
391 /// * `key` - The key to retrieve
392 /// # Returns
393 /// * `Ok(Some(T))` - Successfully retrieved and deserialized value
394 /// * `Ok(None)` - Key doesn't exist
395 /// * `Err(Error)` - Redis, decompression, or deserialization error
396 pub async fn get_struct_zstd<T>(&self, key: &str) -> Result<Option<T>>
397 where
398 T: DeserializeOwned,
399 {
400 self.get_struct_compressed(key, Algorithm::default()).await
401 }
402}