use std::collections::HashMap;
use deadpool_redis::Pool;
use redis::AsyncCommands;
use crate::error::RedisResult;
#[derive(Clone)]
pub struct RedisClient {
pool: Pool,
}
impl RedisClient {
pub fn new(pool: Pool) -> Self {
Self { pool }
}
pub fn pool(&self) -> &Pool {
&self.pool
}
pub async fn get(&self, key: &str) -> RedisResult<Option<String>> {
let mut conn = self.pool.get().await?;
let val: Option<String> = conn.get(key).await?;
Ok(val)
}
pub async fn set(&self, key: &str, value: &str, ttl_seconds: Option<u64>) -> RedisResult<()> {
let mut conn = self.pool.get().await?;
match ttl_seconds {
Some(ttl) => {
let _: () = conn.set_ex(key, value, ttl).await?;
}
None => {
let _: () = conn.set(key, value).await?;
}
}
Ok(())
}
pub async fn set_nx(&self, key: &str, value: &str, ttl_seconds: u64) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let result: Option<()> = redis::cmd("SET")
.arg(key)
.arg(value)
.arg("NX")
.arg("EX")
.arg(ttl_seconds)
.query_async(&mut *conn)
.await?;
Ok(result.is_some())
}
pub async fn delete(&self, key: &str) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let deleted: i64 = conn.del(key).await?;
Ok(deleted > 0)
}
pub async fn exists(&self, key: &str) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let exists: bool = conn.exists(key).await?;
Ok(exists)
}
pub async fn expire(&self, key: &str, ttl_seconds: i64) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let set: bool = conn.expire(key, ttl_seconds).await?;
Ok(set)
}
pub async fn increment(&self, key: &str) -> RedisResult<i64> {
let mut conn = self.pool.get().await?;
let val: i64 = conn.incr(key, 1i64).await?;
Ok(val)
}
pub async fn increment_by(&self, key: &str, amount: i64) -> RedisResult<i64> {
let mut conn = self.pool.get().await?;
let val: i64 = conn.incr(key, amount).await?;
Ok(val)
}
pub async fn decrement_by(&self, key: &str, amount: i64) -> RedisResult<i64> {
let mut conn = self.pool.get().await?;
let val: i64 = conn.decr(key, amount).await?;
Ok(val)
}
pub async fn ttl(&self, key: &str) -> RedisResult<i64> {
let mut conn = self.pool.get().await?;
let val: i64 = conn.ttl(key).await?;
Ok(val)
}
pub async fn set_json<T: serde::Serialize>(
&self,
key: &str,
value: &T,
ttl_seconds: Option<u64>,
) -> RedisResult<()> {
let json = serde_json::to_string(value)?;
self.set(key, &json, ttl_seconds).await
}
pub async fn get_json<T: serde::de::DeserializeOwned>(
&self,
key: &str,
) -> RedisResult<Option<T>> {
match self.get(key).await? {
Some(raw) => Ok(Some(serde_json::from_str(&raw)?)),
None => Ok(None),
}
}
pub async fn set_multiple(&self, pairs: &[(String, String)]) -> RedisResult<()> {
if pairs.is_empty() {
return Ok(());
}
let mut conn = self.pool.get().await?;
let _: () = conn.mset(pairs).await?;
Ok(())
}
pub async fn mget(&self, keys: &[String]) -> RedisResult<Vec<Option<String>>> {
if keys.is_empty() {
return Ok(Vec::new());
}
let mut conn = self.pool.get().await?;
let values: Vec<Option<String>> = conn.mget(keys).await?;
Ok(values)
}
pub async fn scan_keys(&self, pattern: &str) -> RedisResult<Vec<String>> {
let mut conn = self.pool.get().await?;
let mut keys = Vec::new();
let mut cursor: u64 = 0;
loop {
let (next_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(pattern)
.arg("COUNT")
.arg(1000)
.query_async(&mut *conn)
.await?;
keys.extend(batch);
cursor = next_cursor;
if cursor == 0 {
break;
}
}
keys.sort_unstable();
keys.dedup();
Ok(keys)
}
pub async fn set_add(&self, key: &str, members: &[String]) -> RedisResult<usize> {
if members.is_empty() {
return Ok(0);
}
let mut conn = self.pool.get().await?;
let count: usize = conn.sadd(key, members).await?;
Ok(count)
}
pub async fn set_members(&self, key: &str) -> RedisResult<Vec<String>> {
let mut conn = self.pool.get().await?;
let members: Vec<String> = conn.smembers(key).await?;
Ok(members)
}
pub async fn set_is_member(&self, key: &str, member: &str) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let is_member: bool = conn.sismember(key, member).await?;
Ok(is_member)
}
pub async fn hash_set(&self, key: &str, field: &str, value: &str) -> RedisResult<()> {
let mut conn = self.pool.get().await?;
let _: () = conn.hset(key, field, value).await?;
Ok(())
}
pub async fn hash_get(&self, key: &str, field: &str) -> RedisResult<Option<String>> {
let mut conn = self.pool.get().await?;
let val: Option<String> = conn.hget(key, field).await?;
Ok(val)
}
pub async fn hash_get_all(&self, key: &str) -> RedisResult<HashMap<String, String>> {
let mut conn = self.pool.get().await?;
let map: HashMap<String, String> = conn.hgetall(key).await?;
Ok(map)
}
pub async fn xadd(
&self,
key: &str,
max_len: usize,
fields: &[(&str, &str)],
) -> RedisResult<String> {
let mut conn = self.pool.get().await?;
let mut cmd = redis::cmd("XADD");
cmd.arg(key).arg("MAXLEN").arg("~").arg(max_len).arg("*");
for &(field, value) in fields {
cmd.arg(field).arg(value);
}
let id: String = cmd.query_async(&mut *conn).await?;
Ok(id)
}
pub async fn list_push(&self, key: &str, values: &[String]) -> RedisResult<i64> {
if values.is_empty() {
return Ok(0);
}
let mut conn = self.pool.get().await?;
let len: i64 = conn.rpush(key, values).await?;
Ok(len)
}
pub async fn list_pop(&self, key: &str) -> RedisResult<Option<String>> {
let mut conn = self.pool.get().await?;
let val: Option<String> = conn.lpop(key, None).await?;
Ok(val)
}
pub async fn list_length(&self, key: &str) -> RedisResult<i64> {
let mut conn = self.pool.get().await?;
let len: i64 = conn.llen(key).await?;
Ok(len)
}
pub async fn eval<T: redis::FromRedisValue>(
&self,
script: &str,
keys: &[&str],
args: &[&str],
) -> RedisResult<T> {
let mut conn = self.pool.get().await?;
let result: T = redis::cmd("EVAL")
.arg(script)
.arg(keys.len())
.arg(keys)
.arg(args)
.query_async(&mut *conn)
.await?;
Ok(result)
}
pub async fn ping(&self) -> RedisResult<bool> {
let mut conn = self.pool.get().await?;
let response: String = redis::cmd("PING").query_async(&mut *conn).await?;
Ok(response == "PONG")
}
pub async fn health_check(&self) -> bool {
self.get("health_check").await.is_ok()
}
pub fn pool_size(&self) -> usize {
self.pool.status().size
}
pub fn available_connections(&self) -> usize {
self.pool.status().available
}
pub fn waiting_connections(&self) -> usize {
self.pool.status().waiting
}
}