use super::{Error, RedisClient, RedisClientConn};
use deadpool_redis::redis::{cmd, pipe};
use redis::{AsyncCommands, RedisError};
use serde::{Serialize, de::DeserializeOwned};
use std::{borrow::Cow, time::Duration};
use tibba_util::{Algorithm, compress, decompress};
type Result<T> = std::result::Result<T, Error>;
/// Wraps a [`RedisError`] in [`Error::Redis`], tagging it with the name of
/// the operation (`category`) that produced it.
fn map_err(category: &str, e: RedisError) -> Error {
    let category = String::from(category);
    Error::Redis {
        category,
        source: e,
    }
}
/// Cache helper built on a shared [`RedisClient`], carrying a default TTL
/// and an optional key prefix.
pub struct RedisCache {
/// TTL applied when a call passes `None` for its `ttl` argument.
ttl: Duration,
/// Prepended to caller-supplied keys; an empty prefix leaves keys unchanged.
prefix: String,
/// Client from which connections are obtained.
client: &'static RedisClient,
}
impl RedisCache {
/// Obtains a connection from the underlying client.
///
/// # Errors
/// Propagates whatever error the client reports while handing out a
/// connection.
#[inline]
pub async fn conn(&self) -> Result<RedisClientConn> {
    let connection = self.client.conn().await?;
    Ok(connection)
}
/// Creates a cache bound to `client` with a ten-minute default TTL and no
/// key prefix.
pub fn new(client: &'static RedisClient) -> Self {
    const DEFAULT_TTL: Duration = Duration::from_secs(10 * 60);
    Self {
        ttl: DEFAULT_TTL,
        prefix: String::new(),
        client,
    }
}
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
pub fn with_prefix(mut self, prefix: String) -> Self {
self.prefix = prefix;
self
}
/// Builds the full Redis key for `key`.
///
/// Borrows `key` unchanged when no prefix is configured; otherwise
/// allocates a new string consisting of the prefix followed by `key`.
fn get_key<'a>(&'a self, key: &'a str) -> Cow<'a, str> {
    match self.prefix.as_str() {
        "" => Cow::Borrowed(key),
        prefix => Cow::Owned([prefix, key].concat()),
    }
}
pub async fn ping(&self) -> Result<()> {
let () = self
.conn()
.await?
.ping()
.await
.map_err(|e| map_err("ping", e))?;
Ok(())
}
/// Runs `GET` for `key` exactly as given (no prefix is applied here) and
/// converts the reply into `T`.
///
/// # Errors
/// Fails if a connection cannot be obtained, or with an [`Error::Redis`]
/// tagged `"get"` if the command or the reply conversion fails.
async fn get_value<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
    let mut conn = self.conn().await?;
    conn.get(key).await.map_err(|e| map_err("get", e))
}
/// Stores `value` under `key` exactly as given (no prefix is applied here)
/// with an expiry, using `SETEX`.
///
/// `ttl` falls back to the cache's default TTL when `None`. The expiry is
/// expressed in whole seconds and is never less than one second.
///
/// # Errors
/// Fails if a connection cannot be obtained, or with an [`Error::Redis`]
/// tagged `"set"` if the command fails.
async fn set_value<T: redis::ToRedisArgs + Send + Sync>(
    &self,
    key: &str,
    value: T,
    ttl: Option<Duration>,
) -> Result<()> {
    // `as_secs` truncates, so a sub-second TTL would become 0, and Redis
    // rejects a zero expiry. Clamp to the smallest valid value instead of
    // failing the write.
    let seconds = ttl.unwrap_or(self.ttl).as_secs().max(1);
    let () = self
        .conn()
        .await?
        .set_ex(key, value, seconds)
        .await
        .map_err(|e| map_err("set", e))?;
    Ok(())
}
/// Tries to take a lock on the (prefixed) `key` by issuing
/// `SET key <true> NX EX <seconds>`.
///
/// `ttl` falls back to the cache's default TTL when `None`. The expiry is
/// expressed in whole seconds and is never less than one second.
///
/// Returns the command reply converted to `bool`; with `NX`, Redis only
/// sets the key when it does not already exist, so `true` is expected to
/// mean the lock was acquired by this call.
///
/// # Errors
/// Fails if a connection cannot be obtained, or with an [`Error::Redis`]
/// tagged `"lock"` if the command fails.
pub async fn lock(&self, key: &str, ttl: Option<Duration>) -> Result<bool> {
    // `as_secs` truncates, so a sub-second TTL would become `EX 0`, which
    // Redis rejects. Clamp to the smallest valid expiry.
    let seconds = ttl.unwrap_or(self.ttl).as_secs().max(1);
    let mut conn = self.conn().await?;
    let acquired: bool = cmd("SET")
        .arg(self.get_key(key))
        .arg(true)
        .arg("NX")
        .arg("EX")
        .arg(seconds)
        .query_async(&mut conn)
        .await
        .map_err(|e| map_err("lock", e))?;
    Ok(acquired)
}
pub async fn del(&self, key: &str) -> Result<()> {
let () = self
.conn()
.await?
.del(self.get_key(key))
.await
.map_err(|e| map_err("del", e))?;
Ok(())
}
/// Adds `delta` to the counter stored at the (prefixed) `key` and returns
/// the new value.
///
/// Two commands are sent in one pipeline:
/// 1. `SET key 0 NX EX <seconds>` seeds the counter with an expiry, and
///    only takes effect when the key does not exist yet, so an existing
///    counter keeps its current expiry.
/// 2. `INCRBY key delta` applies the increment.
///
/// `ttl` falls back to the cache's default TTL when `None`. The expiry is
/// expressed in whole seconds and is never less than one second.
///
/// NOTE(review): the pipeline is built without `.atomic()`, so the two
/// commands are not wrapped in a transaction; confirm that is intended.
///
/// # Errors
/// Fails if a connection cannot be obtained, or with an [`Error::Redis`]
/// tagged `"incr"` if the pipeline fails.
pub async fn incr(&self, key: &str, delta: i64, ttl: Option<Duration>) -> Result<i64> {
    // `as_secs` truncates, so a sub-second TTL would become `EX 0`, which
    // Redis rejects and which would fail the whole call. Clamp to the
    // smallest valid expiry.
    let seconds = ttl.unwrap_or(self.ttl).as_secs().max(1);
    let mut conn = self.conn().await?;
    let k = self.get_key(key);
    // The first reply (whether the seed SET applied) is not needed.
    let (_, count) = pipe()
        .cmd("SET")
        .arg(&k)
        .arg(0)
        .arg("NX")
        .arg("EX")
        .arg(seconds)
        .cmd("INCRBY")
        .arg(&k)
        .arg(delta)
        .query_async::<(bool, i64)>(&mut conn)
        .await
        .map_err(|e| map_err("incr", e))?;
    Ok(count)
}
/// Stores `value` under the (prefixed) `key` with an expiry.
///
/// `ttl` falls back to the cache's default TTL when `None`.
///
/// # Errors
/// Propagates any error from the underlying write.
pub async fn set<T: redis::ToRedisArgs + Send + Sync>(
    &self,
    key: &str,
    value: T,
    ttl: Option<Duration>,
) -> Result<()> {
    let full_key = self.get_key(key);
    self.set_value(&full_key, value, ttl).await
}
/// Reads the value stored under the (prefixed) `key` and converts it
/// into `T`.
///
/// # Errors
/// Propagates any error from the underlying read.
pub async fn get<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
    let full_key = self.get_key(key);
    self.get_value::<T>(&full_key).await
}
pub async fn set_struct<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
where
T: ?Sized + Serialize,
{
let value = serde_json::to_vec(&value).map_err(|e| Error::Common {
category: "set_struct".to_string(),
message: e.to_string(),
})?;
self.set_value(&self.get_key(key), &value, ttl).await?;
Ok(())
}
pub async fn get_struct<T>(&self, key: &str) -> Result<Option<T>>
where
T: DeserializeOwned,
{
let buf: Vec<u8> = self.get_value(&self.get_key(key)).await?;
if buf.is_empty() {
return Ok(None);
}
let deserializer = &mut serde_json::Deserializer::from_slice(&buf);
let result = T::deserialize(deserializer).map_err(|e| Error::Common {
category: "get_struct".to_string(),
message: e.to_string(),
})?;
Ok(Some(result))
}
pub async fn ttl(&self, key: &str) -> Result<i32> {
let result = self
.conn()
.await?
.ttl(self.get_key(key))
.await
.map_err(|e| map_err("ttl", e))?;
Ok(result)
}
/// Runs `GETDEL` on the (prefixed) `key`: fetches the value and removes
/// the key in one command, converting the reply into `T`.
///
/// # Errors
/// Fails if a connection cannot be obtained, or with an [`Error::Redis`]
/// tagged `"get_del"` if the command or the reply conversion fails.
pub async fn get_del<T: redis::FromRedisValue>(&self, key: &str) -> Result<T> {
    let mut conn = self.conn().await?;
    conn.get_del(self.get_key(key))
        .await
        .map_err(|e| map_err("get_del", e))
}
async fn set_struct_compressed<T>(
&self,
key: &str,
value: &T,
ttl: Option<Duration>,
algorithm: Algorithm,
) -> Result<()>
where
T: ?Sized + Serialize,
{
let value = serde_json::to_vec(value).map_err(|e| Error::Common {
category: "serde_json".to_string(),
message: e.to_string(),
})?;
let buf = compress(&value, algorithm).map_err(|e| Error::Compression { source: e })?;
self.set_value(&self.get_key(key), &buf, ttl).await
}
async fn get_struct_compressed<T>(&self, key: &str, algorithm: Algorithm) -> Result<Option<T>>
where
T: DeserializeOwned,
{
let value: Option<Vec<u8>> = self.get_value(&self.get_key(key)).await?;
match value {
None => Ok(None),
Some(compressed_buf) => {
let buf = decompress(&compressed_buf, algorithm)
.map_err(|e| Error::Compression { source: e })?;
serde_json::from_slice(&buf)
.map_err(|e| Error::Common {
category: "serde_json".to_string(),
message: e.to_string(),
})
.map(Some)
}
}
}
/// Stores `value` as LZ4-compressed JSON under the (prefixed) `key`.
///
/// `ttl` falls back to the cache's default TTL when `None`.
///
/// # Errors
/// Propagates serialization, compression, and write errors.
pub async fn set_struct_lz4<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let algorithm = Algorithm::Lz4;
    self.set_struct_compressed(key, value, ttl, algorithm).await
}
/// Loads a value previously stored as LZ4-compressed JSON under the
/// (prefixed) `key`; yields `Ok(None)` when the read produces no value.
///
/// # Errors
/// Propagates read, decompression, and deserialization errors.
pub async fn get_struct_lz4<T>(&self, key: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let algorithm = Algorithm::Lz4;
    self.get_struct_compressed(key, algorithm).await
}
/// Stores `value` as compressed JSON under the (prefixed) `key`, using
/// `Algorithm::default()`.
///
/// NOTE(review): the method name implies the default algorithm is zstd;
/// confirm against the `Algorithm` definition.
///
/// `ttl` falls back to the cache's default TTL when `None`.
///
/// # Errors
/// Propagates serialization, compression, and write errors.
pub async fn set_struct_zstd<T>(
    &self,
    key: &str,
    value: &T,
    ttl: Option<Duration>,
) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let algorithm = Algorithm::default();
    self.set_struct_compressed(key, value, ttl, algorithm).await
}
/// Loads a value previously stored as compressed JSON under the
/// (prefixed) `key`, using `Algorithm::default()`; yields `Ok(None)` when
/// the read produces no value.
///
/// NOTE(review): the method name implies the default algorithm is zstd;
/// confirm against the `Algorithm` definition.
///
/// # Errors
/// Propagates read, decompression, and deserialization errors.
pub async fn get_struct_zstd<T>(&self, key: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let algorithm = Algorithm::default();
    self.get_struct_compressed(key, algorithm).await
}
}