mycommon-utils 0.2.1

Common utilities library for database operations, Redis caching and system utilities
Documentation
use std::sync::Arc;
use redis::{FromRedisValue, RedisResult, ToRedisArgs};
use crate::database::config::{ REDIS_ASYNC_POOL};

/// 同步 Redis 工具类,连接在实例 drop 时释放
// pub struct RedisUtil {
//     connection: PooledConnection<Client>
// }

// impl RedisUtil {
//     pub fn new() -> Result<RedisUtil> {
//         let client = REDIS_POOL.get().ok_or_else(|| {
//             tracing::error!("Failed to get Redis connection pool");
//             Error::NotFound()
//         })?;

//         let connection = client.get().map_err(|e| {
//             tracing::error!("Failed to obtain Redis connection: {:?}", e);
//             Error::NotFound()
//         })?;

//         Ok(RedisUtil { connection })
//     }

//     pub fn set<T: ToRedisArgs>(&mut self, key: &str, value: &T) -> RedisResult<bool> {
//         let result = self.connection.set(key, value)?;
//         Ok(result)
//     }

//     pub fn set_with_expiry<T: ToRedisArgs>(&mut self, key: &str, value: &T, expiry_seconds: u64) -> RedisResult<()> {
//         let result = self.connection.set_ex(key, value, expiry_seconds)?;
//         Ok(result)
//     }

//     pub fn set_nx_ex<T: ToRedisArgs>(&mut self, key: &str, value: &T, expiry_seconds: u64) -> RedisResult<bool> {
//         let mut cmd = redis::Cmd::set(key, value);
//         cmd.arg("NX").arg("EX").arg(expiry_seconds);
//         let result: String = cmd.query(&mut self.connection)?;
//         Ok(result == "OK")
//     }

//     pub fn expire(&mut self, key: &str, expiry_seconds: i64) -> RedisResult<bool> {
//         let result = self.connection.expire(key, expiry_seconds)?;
//         Ok(result)
//     }

//     pub fn get<T: FromRedisValue>(&mut self, key: &str) -> RedisResult<Option<T>> {
//         let value: Option<T> = self.connection.get(key)?;
//         Ok(value)
//     }

//     pub fn pipeline_operations<T: ToRedisArgs>(&mut self, keys: Vec<&str>, values: Vec<&T>) -> RedisResult<()> {
//         if keys.len() != values.len() {
//             return Err(redis::RedisError::from((
//                 redis::ErrorKind::TypeError,
//                 "Keys and values must have the same length",
//             )));
//         }
//         let mut pipe = redis::pipe();
//         for (key, value) in keys.iter().zip(values.iter()) {
//             pipe.set(*key, *value).ignore();
//         }
//         pipe.query::<()>(&mut self.connection)?;
//         Ok(())
//     }

//     pub fn exists(&mut self, key: &str) -> RedisResult<bool> {
//         let exists = self.connection.exists(key)?;
//         Ok(exists)
//     }

//     pub fn del_vec(&mut self, keys: Vec<&str>) -> RedisResult<usize> {
//         self.connection.del(keys)
//     }

//     pub fn del(&mut self, key: &str) -> RedisResult<usize> {
//         self.connection.del(key)
//     }

//     pub fn ttl(&mut self, key: &str) -> RedisResult<isize> {
//         self.connection.ttl(key)
//     }

//     pub fn incr(&mut self, key: &str) -> RedisResult<isize> {
//         self.connection.incr(key, 1)
//     }

//     pub fn decr(&mut self, key: &str) -> RedisResult<isize> {
//         self.connection.decr(key, 1)
//     }

//     pub fn hset(&mut self, key: &str, field: &str, value: &str) -> RedisResult<()> {
//         self.connection.hset(key, field, value)
//     }

//     pub fn hget(&mut self, key: &str, field: &str) -> RedisResult<Option<String>> {
//         self.connection.hget(key, field)
//     }

//     pub fn lpush<T: FromRedisValue>(&mut self, key: &str, value: &str) -> RedisResult<T> {
//         self.connection.lpush(key, value)
//     }

//     pub fn rpop<T: ToString>(&mut self, key: &str, count: Option<usize>) -> RedisResult<Option<Vec<String>>> {
//         use std::num::NonZeroUsize;
//         let non_zero_count = count.and_then(NonZeroUsize::new);
//         self.connection.rpop(key, non_zero_count)
//     }
// }

/// 异步 Redis 工具类,适用于 Tokio 异步运行时
/// 推荐在 Axum 中间件和异步代码中使用
/// 每个方法独立获取和释放连接,避免连接长时间占用
pub struct RedisAsync;

impl RedisAsync {
    fn pool() -> &'static Arc<deadpool_redis::Pool> {
        REDIS_ASYNC_POOL.get().expect("Redis async pool not initialized")
    }

    /// 异步设置键值对
    pub async fn set<T: ToRedisArgs + Send + Sync>(key: &str, value: &T) -> RedisResult<()> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("SET").arg(key).arg(value).query_async(&mut *conn).await
    }

    /// 异步设置带过期时间的键值对
    pub async fn set_with_expiry<T: ToRedisArgs + Send + Sync>(key: &str, value: &T, expiry_seconds: u64) -> RedisResult<()> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("SET")
            .arg(key)
            .arg(value)
            .arg("EX")
            .arg(expiry_seconds)
            .query_async(&mut *conn).await
    }

    /// 异步设置带 NX 和 EX 的键值对(分布式锁)
    pub async fn set_nx_ex<T: ToRedisArgs + Send + Sync>(key: &str, value: &T, expiry_seconds: u64) -> RedisResult<bool> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        let result: Option<String> = redis::cmd("SET")
            .arg(key)
            .arg(value)
            .arg("NX")
            .arg("EX")
            .arg(expiry_seconds)
            .query_async(&mut *conn).await?;
        Ok(result == Some("OK".to_string()))
    }

    /// 异步获取键对应的值
    pub async fn get<T: FromRedisValue + Send>(key: &str) -> RedisResult<Option<T>> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("GET").arg(key).query_async(&mut *conn).await
    }

    /// 异步设置过期时间
    pub async fn expire(key: &str, expiry_seconds: i64) -> RedisResult<bool> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        let result: i32 = redis::cmd("EXPIRE").arg(key).arg(expiry_seconds).query_async(&mut *conn).await?;
        Ok(result == 1)
    }

    /// 异步检查键是否存在
    pub async fn exists(key: &str) -> RedisResult<bool> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        let exists: i32 = redis::cmd("EXISTS").arg(key).query_async(&mut *conn).await?;
        Ok(exists == 1)
    }

    /// 异步删除键
    pub async fn del(key: &str) -> RedisResult<i32> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("DEL").arg(key).query_async(&mut *conn).await
    }

    /// 异步删除多个键
    pub async fn del_vec(keys: &[&str]) -> RedisResult<i32> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("DEL").arg(keys).query_async(&mut *conn).await
    }

    /// 异步获取 TTL
    pub async fn ttl(key: &str) -> RedisResult<i64> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("TTL").arg(key).query_async(&mut *conn).await
    }

    /// 异步自增
    pub async fn incr(key: &str) -> RedisResult<i64> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("INCR").arg(key).query_async(&mut *conn).await
    }

    /// 异步自减
    pub async fn decr(key: &str) -> RedisResult<i64> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("DECR").arg(key).query_async(&mut *conn).await
    }

    /// 异步设置哈希字段
    pub async fn hset<K: ToRedisArgs + Send + Sync, V: ToRedisArgs + Send + Sync>(key: &str, field: K, value: V) -> RedisResult<()> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("HSET").arg(key).arg(field).arg(value).query_async(&mut *conn).await
    }

    /// 异步获取哈希字段
    pub async fn hget<T: FromRedisValue + Send>(key: &str, field: &str) -> RedisResult<Option<T>> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("HGET").arg(key).arg(field).query_async(&mut *conn).await
    }

    /// 异步向列表左侧推入元素
    pub async fn lpush<T: ToRedisArgs + Send + Sync>(key: &str, value: T) -> RedisResult<i32> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("LPUSH").arg(key).arg(value).query_async(&mut *conn).await
    }

    /// 异步从列表右侧弹出元素
    pub async fn rpop(key: &str) -> RedisResult<Option<String>> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("RPOP").arg(key).query_async(&mut *conn).await
    }

    /// 异步添加 Stream 消息(带自动清理)
    pub async fn xadd_maxlen<K: ToRedisArgs + Send + Sync, V: ToRedisArgs + Send + Sync>(
        key: &str,
        maxlen: usize,
        field: K,
        value: V,
    ) -> RedisResult<String> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("XADD")
            .arg(key)
            .arg("MAXLEN")
            .arg("~")
            .arg(maxlen)
            .arg("*")
            .arg(field)
            .arg(value)
            .query_async(&mut *conn)
            .await
    }
    /// 异步删除 Stream 消息
    pub async fn xdel<ID: ToRedisArgs + Send + Sync>(key: &str, ids: &[&ID]) -> RedisResult<i32> {
        let mut conn = Self::pool().get().await.map_err(|e| {
            redis::RedisError::from((redis::ErrorKind::IoError, "Pool error", format!("{:?}", e)))
        })?;
        redis::cmd("XDEL").arg(key).arg(ids).query_async(&mut *conn).await
    }
}