use crate::{DbError, RedisClient, RedisValue, Result};
use redis::FromRedisValue;
/// Builder that queues Redis commands and later runs them together on a
/// connection taken from the client's pool.
///
/// Commands are only accumulated by the builder methods; they are sent when
/// `query` or `execute` is awaited, which consumes the pipeline.
pub struct RedisPipeline {
/// Underlying `redis` pipeline holding the commands queued so far.
pipe: redis::Pipeline,
/// Client whose pool supplies the connection used at execution time.
client: RedisClient,
}
impl RedisPipeline {
    /// Creates an empty pipeline bound to `client`.
    ///
    /// No command is sent until [`query`](Self::query) or
    /// [`execute`](Self::execute) is awaited.
    pub fn new(client: RedisClient) -> Self {
        let pipe = redis::pipe();
        Self { pipe, client }
    }

    /// Queues a `set` of `key` to `value`.
    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
        let (owned_key, owned_value): (String, String) = (key.into(), value.into());
        self.pipe.set(owned_key, owned_value);
        self
    }

    /// Queues a `get` of `key`.
    pub fn get(&mut self, key: impl Into<String>) -> &mut Self {
        let owned_key: String = key.into();
        self.pipe.get(owned_key);
        self
    }

    /// Queues one `del` call covering every key in `keys`.
    ///
    /// NOTE(review): an empty slice still queues a command; presumably the
    /// server rejects a key-less delete — confirm against callers.
    pub fn del(&mut self, keys: &[String]) -> &mut Self {
        self.pipe.del(keys);
        self
    }

    /// Queues an increment of `key` by 1.
    pub fn incr(&mut self, key: impl Into<String>) -> &mut Self {
        let owned_key: String = key.into();
        self.pipe.incr(owned_key, 1);
        self
    }

    /// Queues an `hset` of `field` to `value` inside the hash at `key`.
    pub fn hset(
        &mut self,
        key: impl Into<String>,
        field: impl Into<String>,
        value: impl Into<String>,
    ) -> &mut Self {
        let hash_key: String = key.into();
        let hash_field: String = field.into();
        let hash_value: String = value.into();
        self.pipe.hset(hash_key, hash_field, hash_value);
        self
    }

    /// Queues an `hget` of `field` from the hash at `key`.
    pub fn hget(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
        let hash_key: String = key.into();
        let hash_field: String = field.into();
        self.pipe.hget(hash_key, hash_field);
        self
    }

    /// Queues one `lpush` command per element of `values`, in slice order.
    ///
    /// Each value is its own queued command rather than a single variadic
    /// push, so an empty slice queues nothing.
    pub fn lpush(&mut self, key: impl Into<String>, values: &[String]) -> &mut Self {
        let list_key: String = key.into();
        values.iter().for_each(|item| {
            self.pipe.lpush(&list_key, item);
        });
        self
    }

    /// Queues one `rpush` command per element of `values`, in slice order.
    ///
    /// Each value is its own queued command rather than a single variadic
    /// push, so an empty slice queues nothing.
    pub fn rpush(&mut self, key: impl Into<String>, values: &[String]) -> &mut Self {
        let list_key: String = key.into();
        values.iter().for_each(|item| {
            self.pipe.rpush(&list_key, item);
        });
        self
    }

    /// Queues one `sadd` command per element of `members`, in slice order.
    pub fn sadd(&mut self, key: impl Into<String>, members: &[String]) -> &mut Self {
        let set_key: String = key.into();
        for entry in members.iter() {
            self.pipe.sadd(&set_key, entry);
        }
        self
    }

    /// Queues one `zadd` command per `(score, member)` pair, in slice order.
    pub fn zadd(&mut self, key: impl Into<String>, members: &[(f64, String)]) -> &mut Self {
        let zset_key: String = key.into();
        for pair in members.iter() {
            // The tuple is (score, member); `zadd` takes the member before the score.
            self.pipe.zadd(&zset_key, &pair.1, pair.0);
        }
        self
    }

    /// Appends an arbitrary, pre-built command to the pipeline.
    pub fn cmd(&mut self, cmd: redis::Cmd) -> &mut Self {
        self.pipe.add_command(cmd);
        self
    }

    /// Runs the queued commands and converts the replies into a `Vec<T>`.
    ///
    /// Consumes the pipeline. All replies are converted to the same `T`.
    ///
    /// # Errors
    ///
    /// * `DbError::RedisPoolError` when no connection can be obtained from
    ///   the client's pool.
    /// * `DbError::RedisCommandError` when `query_async` reports an error.
    pub async fn query<T: FromRedisValue>(self) -> Result<Vec<T>> {
        let mut pooled = self
            .client
            .pool()
            .get()
            .await
            .map_err(|pool_err| DbError::RedisPoolError(format!("获取连接失败: {}", pool_err)))?;

        let replies: Vec<T> = self
            .pipe
            .query_async(&mut *pooled)
            .await
            .map_err(|cmd_err| {
                DbError::RedisCommandError(format!("Pipeline 执行失败: {}", cmd_err))
            })?;

        Ok(replies)
    }

    /// Runs the queued commands and returns each raw reply as a `RedisValue`.
    ///
    /// Consumes the pipeline. Replies are fetched as `redis::Value` through
    /// [`query`](Self::query) and then converted with `RedisValue::from`.
    ///
    /// # Errors
    ///
    /// Same as [`query`](Self::query): `DbError::RedisPoolError` when the
    /// pool yields no connection, `DbError::RedisCommandError` when the
    /// pipeline execution fails.
    pub async fn execute(self) -> Result<Vec<RedisValue>> {
        let raw_replies: Vec<redis::Value> = self.query().await?;
        let converted = raw_replies.into_iter().map(RedisValue::from).collect();
        Ok(converted)
    }

    /// Returns the number of commands currently queued.
    pub fn len(&self) -> usize {
        let queued = self.pipe.cmd_iter();
        queued.count()
    }

    /// Returns `true` when no command has been queued.
    pub fn is_empty(&self) -> bool {
        self.pipe.cmd_iter().next().is_none()
    }
}
#[cfg(test)]
mod tests {
// Placeholder: the body is empty, so this test passes without touching any
// pipeline code.
// TODO: construct a pipeline and assert on it once a client can be built in tests.
#[test]
fn test_pipeline_creation() {
}
// Placeholder: the body is empty, so nothing about `len`/`is_empty` is checked yet.
// TODO: assert that a freshly created pipeline reports zero queued commands.
#[test]
fn test_pipeline_len_empty() {
}
}