use crate::core::{
error::{RedisError, RedisResult},
value::RespValue,
};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
/// A command that can be queued on a `Pipeline` and handed to a `PipelineExecutor`.
///
/// The `Send + Sync` supertraits let boxed commands be moved into, and shared
/// with, the async executor.
pub trait PipelineCommand: Send + Sync {
/// Returns the command's name.
// NOTE(review): presumably the Redis command verb (e.g. "GET") - confirm against implementors.
fn name(&self) -> &str;
/// Returns the command's arguments as RESP values.
// NOTE(review): whether the name itself is included is not visible here - confirm.
fn args(&self) -> Vec<RespValue>;
/// Returns the key associated with this command, or `None` when there is none.
// NOTE(review): which key multi-key commands report is up to the implementor - confirm.
fn key(&self) -> Option<String>;
}
/// Collects commands and submits them as one batch through a shared `PipelineExecutor`.
pub struct Pipeline {
/// Commands queued so far, oldest first (new commands are pushed at the back).
commands: VecDeque<Box<dyn PipelineCommand>>,
/// Executor the batch is submitted to; the async mutex is locked for each submission.
connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>,
}
#[async_trait::async_trait]
/// Backend that can run a batch of queued commands.
pub trait PipelineExecutor {
/// Runs `commands` and returns the replies, or the error that stopped the batch.
///
/// The commands are passed by value, so the caller gives up ownership of them.
// NOTE(review): presumably yields one reply per command, in submission order - confirm
// against implementors.
async fn execute_pipeline(
&mut self,
commands: Vec<Box<dyn PipelineCommand>>,
) -> RedisResult<Vec<RespValue>>;
}
impl Pipeline {
    /// Creates an empty pipeline whose batches will be submitted through `connection`.
    pub fn new(connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>) -> Self {
        Self {
            commands: VecDeque::new(),
            connection,
        }
    }

    /// Appends `command` to the back of the queue and returns `self` for chaining.
    pub fn add_command(&mut self, command: Box<dyn PipelineCommand>) -> &mut Self {
        self.commands.push_back(command);
        self
    }

    /// Queues a `SetCommand` built from `key` and `value`.
    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
        use crate::commands::SetCommand;
        self.add_command(Box::new(SetCommand::new(key.into(), value.into())))
    }

    /// Queues a `GetCommand` for `key`.
    pub fn get(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::GetCommand;
        self.add_command(Box::new(GetCommand::new(key.into())))
    }

    /// Queues a `DelCommand` covering all of `keys`.
    pub fn del(&mut self, keys: Vec<String>) -> &mut Self {
        use crate::commands::DelCommand;
        self.add_command(Box::new(DelCommand::new(keys)))
    }

    /// Queues an `IncrCommand` for `key`.
    pub fn incr(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::IncrCommand;
        self.add_command(Box::new(IncrCommand::new(key.into())))
    }

    /// Queues a `DecrCommand` for `key`.
    pub fn decr(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::DecrCommand;
        self.add_command(Box::new(DecrCommand::new(key.into())))
    }

    /// Queues an `IncrByCommand` for `key` with the given `increment`.
    pub fn incr_by(&mut self, key: impl Into<String>, increment: i64) -> &mut Self {
        use crate::commands::IncrByCommand;
        self.add_command(Box::new(IncrByCommand::new(key.into(), increment)))
    }

    /// Queues a `DecrByCommand` for `key` with the given `decrement`.
    pub fn decr_by(&mut self, key: impl Into<String>, decrement: i64) -> &mut Self {
        use crate::commands::DecrByCommand;
        self.add_command(Box::new(DecrByCommand::new(key.into(), decrement)))
    }

    /// Queues an `ExistsCommand` covering all of `keys`.
    pub fn exists(&mut self, keys: Vec<String>) -> &mut Self {
        use crate::commands::ExistsCommand;
        self.add_command(Box::new(ExistsCommand::new(keys)))
    }

    /// Queues an `ExpireCommand` for `key`.
    ///
    /// Despite its name, `seconds` is a full `Duration`; it is forwarded unchanged.
    // NOTE(review): how sub-second precision is handled is decided by `ExpireCommand` - confirm.
    pub fn expire(&mut self, key: impl Into<String>, seconds: std::time::Duration) -> &mut Self {
        use crate::commands::ExpireCommand;
        self.add_command(Box::new(ExpireCommand::new(key.into(), seconds)))
    }

    /// Queues a `TtlCommand` for `key`.
    pub fn ttl(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::TtlCommand;
        self.add_command(Box::new(TtlCommand::new(key.into())))
    }

    /// Queues an `HGetCommand` for `field` of the hash at `key`.
    pub fn hget(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
        use crate::commands::HGetCommand;
        self.add_command(Box::new(HGetCommand::new(key.into(), field.into())))
    }

    /// Queues an `HSetCommand` assigning `value` to `field` of the hash at `key`.
    pub fn hset(
        &mut self,
        key: impl Into<String>,
        field: impl Into<String>,
        value: impl Into<String>,
    ) -> &mut Self {
        use crate::commands::HSetCommand;
        self.add_command(Box::new(HSetCommand::new(key.into(), field.into(), value.into())))
    }

    /// Queues an `HDelCommand` for the given `fields` of the hash at `key`.
    pub fn hdel(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
        use crate::commands::HDelCommand;
        self.add_command(Box::new(HDelCommand::new(key.into(), fields)))
    }

    /// Queues an `HGetAllCommand` for the hash at `key`.
    pub fn hgetall(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::HGetAllCommand;
        self.add_command(Box::new(HGetAllCommand::new(key.into())))
    }

    /// Queues an `HMGetCommand` for the given `fields` of the hash at `key`.
    pub fn hmget(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
        use crate::commands::HMGetCommand;
        self.add_command(Box::new(HMGetCommand::new(key.into(), fields)))
    }

    /// Queues an `HMSetCommand` writing the `fields` map into the hash at `key`.
    pub fn hmset(
        &mut self,
        key: impl Into<String>,
        fields: std::collections::HashMap<String, String>,
    ) -> &mut Self {
        use crate::commands::HMSetCommand;
        self.add_command(Box::new(HMSetCommand::new(key.into(), fields)))
    }

    /// Queues an `HLenCommand` for the hash at `key`.
    pub fn hlen(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::HLenCommand;
        self.add_command(Box::new(HLenCommand::new(key.into())))
    }

    /// Queues an `LPushCommand` pushing `values` onto the list at `key`.
    pub fn lpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
        use crate::commands::LPushCommand;
        self.add_command(Box::new(LPushCommand::new(key.into(), values)))
    }

    /// Queues an `RPushCommand` pushing `values` onto the list at `key`.
    pub fn rpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
        use crate::commands::RPushCommand;
        self.add_command(Box::new(RPushCommand::new(key.into(), values)))
    }

    /// Queues an `LRangeCommand` for the list at `key` between `start` and `stop`.
    pub fn lrange(&mut self, key: impl Into<String>, start: i64, stop: i64) -> &mut Self {
        use crate::commands::LRangeCommand;
        self.add_command(Box::new(LRangeCommand::new(key.into(), start, stop)))
    }

    /// Queues an `LLenCommand` for the list at `key`.
    pub fn llen(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::LLenCommand;
        self.add_command(Box::new(LLenCommand::new(key.into())))
    }

    /// Queues an `SAddCommand` adding `members` to the set at `key`.
    pub fn sadd(&mut self, key: impl Into<String>, members: Vec<String>) -> &mut Self {
        use crate::commands::SAddCommand;
        self.add_command(Box::new(SAddCommand::new(key.into(), members)))
    }

    /// Queues an `SMembersCommand` for the set at `key`.
    pub fn smembers(&mut self, key: impl Into<String>) -> &mut Self {
        use crate::commands::SMembersCommand;
        self.add_command(Box::new(SMembersCommand::new(key.into())))
    }

    /// Queues an `HExistsCommand` for `field` of the hash at `key`.
    pub fn hexists(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
        use crate::commands::HExistsCommand;
        self.add_command(Box::new(HExistsCommand::new(key.into(), field.into())))
    }

    /// Returns the number of commands currently queued.
    #[must_use]
    pub fn len(&self) -> usize {
        self.commands.len()
    }

    /// Returns `true` when no commands are queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.commands.is_empty()
    }

    /// Discards every queued command without executing it.
    pub fn clear(&mut self) {
        self.commands.clear();
    }

    /// Submits every queued command to the executor as one batch and returns its replies.
    ///
    /// The queue is emptied as part of the call. Commands are handed to the
    /// executor by value, so they are not restored if the executor fails.
    ///
    /// # Errors
    ///
    /// Returns `RedisError::Protocol` when the pipeline is empty, and forwards
    /// any error reported by the executor.
    pub async fn execute(&mut self) -> RedisResult<Vec<RespValue>> {
        if self.commands.is_empty() {
            return Err(RedisError::Protocol("Pipeline is empty".to_string()));
        }
        // Take the lock *before* draining: if this future is dropped while it is
        // still waiting for the connection, the queued commands must stay in the
        // pipeline instead of being silently thrown away.
        let mut connection = self.connection.lock().await;
        let commands: Vec<Box<dyn PipelineCommand>> = self.commands.drain(..).collect();
        connection.execute_pipeline(commands).await
    }

    /// Executes the pipeline and converts every reply into `T`.
    ///
    /// # Errors
    ///
    /// Returns whatever `execute` returns on failure, or the first conversion
    /// error; replies after a failed conversion are not converted.
    pub async fn execute_typed<T>(&mut self) -> RedisResult<Vec<T>>
    where
        T: TryFrom<RespValue>,
        T::Error: Into<RedisError>,
    {
        self.execute()
            .await?
            .into_iter()
            .map(|reply| T::try_from(reply).map_err(Into::into))
            .collect::<RedisResult<Vec<T>>>()
    }
}
/// A list of pipeline replies together with a cursor for reading them one after another.
#[derive(Debug, Clone)]
pub struct PipelineResult {
/// The replies held by this wrapper.
results: Vec<RespValue>,
/// Position in `results` that the next sequential read will return.
index: usize,
}
impl PipelineResult {
    /// Wraps `results`, placing the sequential cursor on the first reply.
    #[must_use]
    pub fn new(results: Vec<RespValue>) -> Self {
        Self { index: 0, results }
    }

    /// Converts the reply under the cursor into `T` and moves the cursor forward.
    ///
    /// The cursor advances even when the conversion itself fails, so a
    /// mismatched reply is skipped rather than retried.
    ///
    /// # Errors
    ///
    /// Returns `RedisError::Protocol` once every reply has been consumed, or
    /// the conversion error produced by `T::try_from`.
    pub fn next<T>(&mut self) -> RedisResult<T>
    where
        T: TryFrom<RespValue>,
        T::Error: Into<RedisError>,
    {
        let reply = match self.results.get(self.index) {
            Some(value) => value.clone(),
            None => {
                return Err(RedisError::Protocol(
                    "No more results in pipeline".to_string(),
                ))
            }
        };
        self.index += 1;
        T::try_from(reply).map_err(Into::into)
    }

    /// Converts the reply at `index` into `T` without touching the cursor.
    ///
    /// # Errors
    ///
    /// Returns `RedisError::Protocol` when `index` is past the last reply, or
    /// the conversion error produced by `T::try_from`.
    pub fn get<T>(&self, index: usize) -> RedisResult<T>
    where
        T: TryFrom<RespValue>,
        T::Error: Into<RedisError>,
    {
        match self.results.get(index) {
            Some(value) => T::try_from(value.clone()).map_err(Into::into),
            None => Err(RedisError::Protocol(format!(
                "Index {} out of bounds",
                index
            ))),
        }
    }

    /// Returns the total number of replies, consumed or not.
    #[must_use]
    pub fn len(&self) -> usize {
        self.results.len()
    }

    /// Returns `true` when there are no replies at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.results.is_empty()
    }

    /// Consumes the wrapper and returns the raw replies.
    #[must_use]
    pub fn into_results(self) -> Vec<RespValue> {
        self.results
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Executor double: checks the batch size and answers `OK` to every command.
    struct MockExecutor {
        expected_commands: usize,
    }

    #[async_trait::async_trait]
    impl PipelineExecutor for MockExecutor {
        async fn execute_pipeline(
            &mut self,
            commands: Vec<Box<dyn PipelineCommand>>,
        ) -> RedisResult<Vec<RespValue>> {
            assert_eq!(commands.len(), self.expected_commands);
            Ok(commands
                .iter()
                .map(|_| RespValue::SimpleString("OK".to_string()))
                .collect())
        }
    }

    /// Builds a pipeline backed by a mock that expects `expected_commands` commands.
    fn pipeline_expecting(expected_commands: usize) -> Pipeline {
        Pipeline::new(Arc::new(Mutex::new(MockExecutor { expected_commands })))
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let pipeline = pipeline_expecting(0);
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_add_commands() {
        let mut pipeline = pipeline_expecting(2);
        pipeline.set("key1", "value1").get("key1");
        assert_eq!(pipeline.len(), 2);
        assert!(!pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_execute() {
        let mut pipeline = pipeline_expecting(2);
        pipeline.set("key1", "value1").get("key1");
        let replies = pipeline.execute().await.unwrap();
        assert_eq!(replies.len(), 2);
        // Executing hands the queue over to the executor.
        assert!(pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_clear() {
        // The mock is never invoked here; clearing must not touch the executor.
        let mut pipeline = pipeline_expecting(0);
        pipeline.set("key1", "value1").get("key1");
        assert_eq!(pipeline.len(), 2);
        pipeline.clear();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_result() {
        let mut replies = PipelineResult::new(vec![
            RespValue::SimpleString("OK".to_string()),
            RespValue::BulkString(b"value1".to_vec().into()),
            RespValue::Integer(42),
        ]);
        assert_eq!(replies.len(), 3);
        assert!(!replies.is_empty());

        let first: String = replies.next().unwrap();
        assert_eq!(first, "OK");

        let second: String = replies.get(1).unwrap();
        assert_eq!(second, "value1");
    }
}