use super::{CompressionSnafu, Error, RedisClient, RedisClientConn, RedisSnafu, SerdeJsonSnafu};
use deadpool_redis::redis::{cmd, pipe};
use redis::AsyncCommands;
use serde::{Serialize, de::DeserializeOwned};
use snafu::ResultExt;
use std::{borrow::Cow, time::Duration};
use tibba_util::{Algorithm, compress, decompress};
/// Zstd algorithm setting shared by `set_struct_zstd` and `get_struct_zstd`.
/// NOTE(review): the `3` is presumably the compression level — confirm against `tibba_util`.
const DEFAULT_ZSTD: Algorithm = Algorithm::Zstd(3);
/// Module-local `Result` alias with the error type fixed to [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Cache helper layered on top of a shared [`RedisClient`].
///
/// Public methods pass the caller's key through `get_key`, which prepends
/// `prefix`, and fall back to `ttl` when no per-call expiry is supplied.
pub struct RedisCache {
/// Default expiry used when a call passes `None` for its TTL.
ttl: Duration,
/// Text prepended to every key; when empty, keys are used unchanged.
prefix: String,
/// Client from which connections are obtained for every command.
client: &'static RedisClient,
}
impl RedisCache {
/// Obtains a connection from the underlying Redis client.
///
/// # Errors
/// Propagates whatever error the client's `conn()` call reports.
#[inline]
pub async fn conn(&self) -> Result<RedisClientConn> {
    let connection = self.client.conn().await?;
    Ok(connection)
}
/// Creates a cache bound to `client` with an empty key prefix and a
/// default TTL of ten minutes.
pub fn new(client: &'static RedisClient) -> Self {
    // 600 seconds == 10 minutes.
    let default_ttl = Duration::from_secs(600);
    Self {
        client,
        prefix: String::new(),
        ttl: default_ttl,
    }
}
#[must_use]
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
#[must_use]
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = prefix.into();
self
}
/// Resolves the expiry for a call, in whole seconds.
///
/// Uses `ttl` when given, otherwise the cache-wide default. Any sub-second
/// part is discarded by `Duration::as_secs`.
#[inline]
fn get_ttl(&self, ttl: Option<Duration>) -> u64 {
    let effective = match ttl {
        Some(custom) => custom,
        None => self.ttl,
    };
    effective.as_secs()
}
/// Builds the key actually sent to Redis.
///
/// With an empty prefix the caller's `key` is borrowed as-is; otherwise a
/// new string `prefix + key` is allocated.
#[inline]
fn get_key<'a>(&'a self, key: &'a str) -> Cow<'a, str> {
    if self.prefix.is_empty() {
        return Cow::Borrowed(key);
    }
    Cow::Owned([self.prefix.as_str(), key].concat())
}
/// Sends `PING` over a connection from the client and discards the reply.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"ping"`.
pub async fn ping(&self) -> Result<()> {
    let mut conn = self.conn().await?;
    conn.ping().await.context(RedisSnafu { category: "ping" })
}
/// Reads `key` exactly as given (no prefix is added here) and converts the
/// reply into `T`.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"get"`.
async fn get_value<T>(&self, key: &str) -> Result<T>
where
    T: redis::FromRedisValue,
{
    let mut conn = self.conn().await?;
    conn.get(key).await.context(RedisSnafu { category: "get" })
}
/// Writes `value` under `key` exactly as given (no prefix is added here),
/// with an expiry of `ttl` seconds.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"set"`.
async fn set_value<T>(&self, key: &str, value: T, ttl: u64) -> Result<()>
where
    T: redis::ToSingleRedisArg + Send + Sync,
{
    let mut conn = self.conn().await?;
    conn.set_ex(key, value, ttl)
        .await
        .context(RedisSnafu { category: "set" })
}
/// Tries to take a lock by issuing `SET <key> true NX EX <seconds>`.
///
/// The key is prefixed via `get_key`; the expiry is `ttl`, or the cache
/// default when `ttl` is `None`. The reply is converted to `bool` —
/// presumably `true` when the key was newly set and `false` when it already
/// existed (verify against the redis crate's `bool` conversion).
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"lock"`.
pub async fn lock(&self, key: &str, ttl: Option<Duration>) -> Result<bool> {
    let mut conn = self.conn().await?;
    let lock_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);

    let mut command = cmd("SET");
    command
        .arg(lock_key)
        .arg(true)
        .arg("NX")
        .arg("EX")
        .arg(expire_secs);

    command
        .query_async(&mut conn)
        .await
        .context(RedisSnafu { category: "lock" })
}
/// Deletes the prefixed `key`; the reply from Redis is discarded.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"del"`.
pub async fn del(&self, key: &str) -> Result<()> {
    let mut conn = self.conn().await?;
    conn.del(self.get_key(key))
        .await
        .context(RedisSnafu { category: "del" })
}
/// Adds `delta` to the counter stored at the prefixed `key` and returns the
/// new value.
///
/// A single pipeline carries two commands: `INCRBY`, then `EXPIRE ... NX`.
/// The `NX` flag asks Redis to set the expiry only when the key has none, so
/// repeated increments do not push the deadline forward.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"incr"`.
pub async fn incr(&self, key: &str, delta: i64, ttl: Option<Duration>) -> Result<i64> {
    let mut conn = self.conn().await?;
    let full_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);

    let mut pipeline = pipe();
    pipeline.cmd("INCRBY").arg(&full_key).arg(delta);
    pipeline
        .cmd("EXPIRE")
        .arg(&full_key)
        .arg(expire_secs)
        .arg("NX");

    // Only the INCRBY reply matters; the EXPIRE reply is ignored.
    let (count, _expire_applied): (i64, bool) = pipeline
        .query_async(&mut conn)
        .await
        .context(RedisSnafu { category: "incr" })?;
    Ok(count)
}
/// Stores `value` under the prefixed `key`, expiring after `ttl` (or the
/// cache default when `ttl` is `None`).
///
/// # Errors
/// Propagates any error from `set_value`.
pub async fn set<T>(&self, key: &str, value: T, ttl: Option<Duration>) -> Result<()>
where
    T: redis::ToSingleRedisArg + Send + Sync,
{
    let full_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);
    self.set_value(&full_key, value, expire_secs).await
}
/// Reads the prefixed `key` and converts the reply into `T`.
///
/// # Errors
/// Propagates any error from `get_value`.
pub async fn get<T>(&self, key: &str) -> Result<T>
where
    T: redis::FromRedisValue,
{
    let full_key = self.get_key(key);
    self.get_value(&full_key).await
}
/// Serializes `value` to JSON bytes and stores them under the prefixed `key`.
///
/// The entry expires after `ttl`, or the cache default when `ttl` is `None`.
///
/// # Errors
/// Returns a JSON serialization error, or any error from `set_value`.
pub async fn set_struct<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let payload = serde_json::to_vec(value).context(SerdeJsonSnafu)?;
    let full_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);
    self.set_value(&full_key, &payload, expire_secs).await
}
/// Reads the prefixed `key` and deserializes its JSON bytes into `T`.
///
/// Returns `Ok(None)` when the read yields no value.
///
/// # Errors
/// Returns any error from `get_value`, or a JSON deserialization error.
pub async fn get_struct<T>(&self, key: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let full_key = self.get_key(key);
    let stored: Option<Vec<u8>> = self.get_value(&full_key).await?;
    let Some(bytes) = stored else {
        return Ok(None);
    };
    let decoded = serde_json::from_slice::<T>(&bytes).context(SerdeJsonSnafu)?;
    Ok(Some(decoded))
}
/// Issues `TTL` for the prefixed `key` and returns the reply as `i32`.
///
/// Per the Redis `TTL` command, negative replies signal a key that is
/// missing or has no expiry rather than a remaining time.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"ttl"`.
pub async fn ttl(&self, key: &str) -> Result<i32> {
    let mut conn = self.conn().await?;
    conn.ttl(self.get_key(key))
        .await
        .context(RedisSnafu { category: "ttl" })
}
/// Fetches the prefixed `key` and deletes it in the same command (`GETDEL`),
/// converting the reply into `T`.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"get_del"`.
pub async fn get_del<T>(&self, key: &str) -> Result<T>
where
    T: redis::FromRedisValue,
{
    let mut conn = self.conn().await?;
    conn.get_del(self.get_key(key)).await.context(RedisSnafu {
        category: "get_del",
    })
}
/// Reports whether the prefixed `key` exists, as returned by `EXISTS`.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"exists"`.
pub async fn exists(&self, key: &str) -> Result<bool> {
    let mut conn = self.conn().await?;
    conn.exists(self.get_key(key))
        .await
        .context(RedisSnafu { category: "exists" })
}
/// Sets the expiry of the prefixed `key` to `ttl` (or the cache default when
/// `ttl` is `None`) and returns the `EXPIRE` reply as `bool`.
///
/// # Errors
/// Returns the connection error, or a Redis error tagged with category `"expire"`.
pub async fn expire(&self, key: &str, ttl: Option<Duration>) -> Result<bool> {
    // A plain `as i64` cast wraps second counts above `i64::MAX` (for example
    // `Duration::MAX`) into a negative number, and Redis treats a negative
    // EXPIRE timeout as "delete the key". Saturate instead of wrapping.
    let seconds = i64::try_from(self.get_ttl(ttl)).unwrap_or(i64::MAX);
    let result = self
        .conn()
        .await?
        .expire(self.get_key(key), seconds)
        .await
        .context(RedisSnafu { category: "expire" })?;
    Ok(result)
}
/// Serializes `value` to JSON, compresses the bytes with `algorithm`, and
/// stores the result under `key` with an expiry of `ttl` seconds.
///
/// `key` is passed to `set_value` unchanged, so the caller must supply the
/// already-prefixed key.
///
/// # Errors
/// Returns a JSON serialization error, a compression error, or any error
/// from `set_value`.
async fn set_struct_compressed<T>(
    &self,
    key: &str,
    value: &T,
    ttl: u64,
    algorithm: Algorithm,
) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let json = serde_json::to_vec(value).context(SerdeJsonSnafu)?;
    let compressed = compress(&json, algorithm).context(CompressionSnafu)?;
    self.set_value(key, &compressed, ttl).await
}
/// Reads `key`, decompresses the stored bytes with `algorithm`, and
/// deserializes the JSON into `T`.
///
/// This helper applies the prefix itself via `get_key`, so callers pass the
/// unprefixed key. Returns `Ok(None)` when the read yields no value.
///
/// # Errors
/// Returns any error from `get_value`, a decompression error, or a JSON
/// deserialization error.
async fn get_struct_compressed<T>(&self, key: &str, algorithm: Algorithm) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let full_key = self.get_key(key);
    let stored: Option<Vec<u8>> = self.get_value(&full_key).await?;
    let Some(compressed) = stored else {
        return Ok(None);
    };
    let json = decompress(&compressed, algorithm).context(CompressionSnafu)?;
    let decoded = serde_json::from_slice::<T>(&json).context(SerdeJsonSnafu)?;
    Ok(Some(decoded))
}
/// Stores `value` as LZ4-compressed JSON under the prefixed `key`.
///
/// The entry expires after `ttl`, or the cache default when `ttl` is `None`.
///
/// # Errors
/// Propagates any error from `set_struct_compressed`.
pub async fn set_struct_lz4<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let full_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);
    self.set_struct_compressed(&full_key, value, expire_secs, Algorithm::Lz4)
        .await
}
/// Loads the LZ4-compressed JSON stored for `key` and deserializes it into `T`.
///
/// The raw `key` is handed on; `get_struct_compressed` applies the prefix.
///
/// # Errors
/// Propagates any error from `get_struct_compressed`.
pub async fn get_struct_lz4<T>(&self, key: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let algorithm = Algorithm::Lz4;
    self.get_struct_compressed(key, algorithm).await
}
/// Stores `value` as Zstd-compressed JSON (using `DEFAULT_ZSTD`) under the
/// prefixed `key`.
///
/// The entry expires after `ttl`, or the cache default when `ttl` is `None`.
///
/// # Errors
/// Propagates any error from `set_struct_compressed`.
pub async fn set_struct_zstd<T>(
    &self,
    key: &str,
    value: &T,
    ttl: Option<Duration>,
) -> Result<()>
where
    T: ?Sized + Serialize,
{
    let full_key = self.get_key(key);
    let expire_secs = self.get_ttl(ttl);
    self.set_struct_compressed(&full_key, value, expire_secs, DEFAULT_ZSTD)
        .await
}
/// Loads the Zstd-compressed JSON stored for `key` (using `DEFAULT_ZSTD`)
/// and deserializes it into `T`.
///
/// The raw `key` is handed on; `get_struct_compressed` applies the prefix.
///
/// # Errors
/// Propagates any error from `get_struct_compressed`.
pub async fn get_struct_zstd<T>(&self, key: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let algorithm = DEFAULT_ZSTD;
    self.get_struct_compressed(key, algorithm).await
}
}