redis_oxide/
pipeline.rs

//! Pipeline support for batching Redis commands
//!
//! This module provides functionality to batch multiple Redis commands together
//! and execute them in a single network round-trip, improving performance
//! when executing multiple operations.
//!
//! # Examples
//!
//! ```no_run
//! use redis_oxide::{Client, ConnectionConfig};
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let config = ConnectionConfig::new("redis://localhost:6379");
//! let client = Client::connect(config).await?;
//!
//! // Create a pipeline
//! let mut pipeline = client.pipeline();
//!
//! // Add commands to the pipeline
//! pipeline.set("key1", "value1");
//! pipeline.set("key2", "value2");
//! pipeline.get("key1");
//! pipeline.incr("counter");
//!
//! // Execute all commands at once
//! let results = pipeline.execute().await?;
//! println!("Pipeline results: {:?}", results);
//! # Ok(())
//! # }
//! ```
32
33use crate::core::{
34    error::{RedisError, RedisResult},
35    value::RespValue,
36};
37use std::collections::VecDeque;
38use std::sync::Arc;
39use tokio::sync::Mutex;
40
41/// Trait for commands that can be used in pipelines
42pub trait PipelineCommand: Send + Sync {
43    /// Get the command name
44    fn name(&self) -> &str;
45
46    /// Get the command arguments
47    fn args(&self) -> Vec<RespValue>;
48
49    /// Get the key(s) involved in this command (for cluster routing)
50    fn key(&self) -> Option<String>;
51}
52
53/// A pipeline for batching Redis commands
54///
55/// Pipeline allows you to send multiple commands to Redis in a single
56/// network round-trip, which can significantly improve performance when
57/// executing many operations.
58pub struct Pipeline {
59    commands: VecDeque<Box<dyn PipelineCommand>>,
60    connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>,
61}
62
63/// Trait for executing pipelined commands
64#[async_trait::async_trait]
65pub trait PipelineExecutor {
66    /// Execute a batch of commands and return their results
67    async fn execute_pipeline(
68        &mut self,
69        commands: Vec<Box<dyn PipelineCommand>>,
70    ) -> RedisResult<Vec<RespValue>>;
71}
72
73impl Pipeline {
74    /// Create a new pipeline
75    pub fn new(connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>) -> Self {
76        Self {
77            commands: VecDeque::new(),
78            connection,
79        }
80    }
81
82    /// Add a command to the pipeline
83    pub fn add_command(&mut self, command: Box<dyn PipelineCommand>) -> &mut Self {
84        self.commands.push_back(command);
85        self
86    }
87
88    /// Add a SET command to the pipeline
89    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
90        use crate::commands::SetCommand;
91        let cmd = SetCommand::new(key.into(), value.into());
92        self.add_command(Box::new(cmd))
93    }
94
95    /// Add a GET command to the pipeline
96    pub fn get(&mut self, key: impl Into<String>) -> &mut Self {
97        use crate::commands::GetCommand;
98        let cmd = GetCommand::new(key.into());
99        self.add_command(Box::new(cmd))
100    }
101
102    /// Add a DEL command to the pipeline
103    pub fn del(&mut self, keys: Vec<String>) -> &mut Self {
104        use crate::commands::DelCommand;
105        let cmd = DelCommand::new(keys);
106        self.add_command(Box::new(cmd))
107    }
108
109    /// Add an INCR command to the pipeline
110    pub fn incr(&mut self, key: impl Into<String>) -> &mut Self {
111        use crate::commands::IncrCommand;
112        let cmd = IncrCommand::new(key.into());
113        self.add_command(Box::new(cmd))
114    }
115
116    /// Add a DECR command to the pipeline
117    pub fn decr(&mut self, key: impl Into<String>) -> &mut Self {
118        use crate::commands::DecrCommand;
119        let cmd = DecrCommand::new(key.into());
120        self.add_command(Box::new(cmd))
121    }
122
123    /// Add an INCRBY command to the pipeline
124    pub fn incr_by(&mut self, key: impl Into<String>, increment: i64) -> &mut Self {
125        use crate::commands::IncrByCommand;
126        let cmd = IncrByCommand::new(key.into(), increment);
127        self.add_command(Box::new(cmd))
128    }
129
130    /// Add a DECRBY command to the pipeline
131    pub fn decr_by(&mut self, key: impl Into<String>, decrement: i64) -> &mut Self {
132        use crate::commands::DecrByCommand;
133        let cmd = DecrByCommand::new(key.into(), decrement);
134        self.add_command(Box::new(cmd))
135    }
136
137    /// Add an EXISTS command to the pipeline
138    pub fn exists(&mut self, keys: Vec<String>) -> &mut Self {
139        use crate::commands::ExistsCommand;
140        let cmd = ExistsCommand::new(keys);
141        self.add_command(Box::new(cmd))
142    }
143
144    /// Add an EXPIRE command to the pipeline
145    pub fn expire(&mut self, key: impl Into<String>, seconds: std::time::Duration) -> &mut Self {
146        use crate::commands::ExpireCommand;
147        let cmd = ExpireCommand::new(key.into(), seconds);
148        self.add_command(Box::new(cmd))
149    }
150
151    /// Add a TTL command to the pipeline
152    pub fn ttl(&mut self, key: impl Into<String>) -> &mut Self {
153        use crate::commands::TtlCommand;
154        let cmd = TtlCommand::new(key.into());
155        self.add_command(Box::new(cmd))
156    }
157
158    // Hash commands
159
160    /// Add an HGET command to the pipeline
161    pub fn hget(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
162        use crate::commands::HGetCommand;
163        let cmd = HGetCommand::new(key.into(), field.into());
164        self.add_command(Box::new(cmd))
165    }
166
167    /// Add an HSET command to the pipeline
168    pub fn hset(
169        &mut self,
170        key: impl Into<String>,
171        field: impl Into<String>,
172        value: impl Into<String>,
173    ) -> &mut Self {
174        use crate::commands::HSetCommand;
175        let cmd = HSetCommand::new(key.into(), field.into(), value.into());
176        self.add_command(Box::new(cmd))
177    }
178
179    /// Add an HDEL command to the pipeline
180    pub fn hdel(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
181        use crate::commands::HDelCommand;
182        let cmd = HDelCommand::new(key.into(), fields);
183        self.add_command(Box::new(cmd))
184    }
185
186    /// Add an HGETALL command to the pipeline
187    pub fn hgetall(&mut self, key: impl Into<String>) -> &mut Self {
188        use crate::commands::HGetAllCommand;
189        let cmd = HGetAllCommand::new(key.into());
190        self.add_command(Box::new(cmd))
191    }
192
193    /// Add an HMGET command to the pipeline
194    pub fn hmget(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
195        use crate::commands::HMGetCommand;
196        let cmd = HMGetCommand::new(key.into(), fields);
197        self.add_command(Box::new(cmd))
198    }
199
200    /// Add an HMSET command to the pipeline
201    pub fn hmset(
202        &mut self,
203        key: impl Into<String>,
204        fields: std::collections::HashMap<String, String>,
205    ) -> &mut Self {
206        use crate::commands::HMSetCommand;
207        let cmd = HMSetCommand::new(key.into(), fields);
208        self.add_command(Box::new(cmd))
209    }
210
211    /// Add an HLEN command to the pipeline
212    pub fn hlen(&mut self, key: impl Into<String>) -> &mut Self {
213        use crate::commands::HLenCommand;
214        let cmd = HLenCommand::new(key.into());
215        self.add_command(Box::new(cmd))
216    }
217
218    // List commands
219
220    /// Add an LPUSH command to the pipeline
221    pub fn lpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
222        use crate::commands::LPushCommand;
223        let cmd = LPushCommand::new(key.into(), values);
224        self.add_command(Box::new(cmd))
225    }
226
227    /// Add an RPUSH command to the pipeline
228    pub fn rpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
229        use crate::commands::RPushCommand;
230        let cmd = RPushCommand::new(key.into(), values);
231        self.add_command(Box::new(cmd))
232    }
233
234    /// Add an LRANGE command to the pipeline
235    pub fn lrange(&mut self, key: impl Into<String>, start: i64, stop: i64) -> &mut Self {
236        use crate::commands::LRangeCommand;
237        let cmd = LRangeCommand::new(key.into(), start, stop);
238        self.add_command(Box::new(cmd))
239    }
240
241    /// Add an LLEN command to the pipeline
242    pub fn llen(&mut self, key: impl Into<String>) -> &mut Self {
243        use crate::commands::LLenCommand;
244        let cmd = LLenCommand::new(key.into());
245        self.add_command(Box::new(cmd))
246    }
247
248    // Set commands
249
250    /// Add an SADD command to the pipeline
251    pub fn sadd(&mut self, key: impl Into<String>, members: Vec<String>) -> &mut Self {
252        use crate::commands::SAddCommand;
253        let cmd = SAddCommand::new(key.into(), members);
254        self.add_command(Box::new(cmd))
255    }
256
257    /// Add an SMEMBERS command to the pipeline
258    pub fn smembers(&mut self, key: impl Into<String>) -> &mut Self {
259        use crate::commands::SMembersCommand;
260        let cmd = SMembersCommand::new(key.into());
261        self.add_command(Box::new(cmd))
262    }
263
264    /// Add an HEXISTS command to the pipeline
265    pub fn hexists(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
266        use crate::commands::HExistsCommand;
267        let cmd = HExistsCommand::new(key.into(), field.into());
268        self.add_command(Box::new(cmd))
269    }
270
271    /// Get the number of commands in the pipeline
272    #[must_use]
273    pub fn len(&self) -> usize {
274        self.commands.len()
275    }
276
277    /// Check if the pipeline is empty
278    #[must_use]
279    pub fn is_empty(&self) -> bool {
280        self.commands.is_empty()
281    }
282
283    /// Clear all commands from the pipeline
284    pub fn clear(&mut self) {
285        self.commands.clear();
286    }
287
288    /// Execute all commands in the pipeline
289    ///
290    /// This sends all batched commands to Redis in a single network round-trip
291    /// and returns their results in the same order they were added.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if:
296    /// - The pipeline is empty
297    /// - Network communication fails
298    /// - Any command in the pipeline fails
299    ///
300    /// # Examples
301    ///
302    /// ```no_run
303    /// # use redis_oxide::{Client, ConnectionConfig};
304    /// # #[tokio::main]
305    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
306    /// # let config = ConnectionConfig::new("redis://localhost:6379");
307    /// # let client = Client::connect(config).await?;
308    /// let mut pipeline = client.pipeline();
309    /// pipeline.set("key1", "value1");
310    /// pipeline.get("key1");
311    ///
312    /// let results = pipeline.execute().await?;
313    /// assert_eq!(results.len(), 2);
314    /// # Ok(())
315    /// # }
316    /// ```
317    pub async fn execute(&mut self) -> RedisResult<Vec<RespValue>> {
318        if self.commands.is_empty() {
319            return Err(RedisError::Protocol("Pipeline is empty".to_string()));
320        }
321
322        // Convert VecDeque to Vec for execution
323        let commands: Vec<Box<dyn PipelineCommand>> = self.commands.drain(..).collect();
324
325        // Execute the pipeline
326        let mut connection = self.connection.lock().await;
327        let results = connection.execute_pipeline(commands).await?;
328
329        Ok(results)
330    }
331
332    /// Execute the pipeline and return typed results
333    ///
334    /// This is a convenience method that executes the pipeline and attempts
335    /// to convert the results to the expected types.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if execution fails or type conversion fails.
340    pub async fn execute_typed<T>(&mut self) -> RedisResult<Vec<T>>
341    where
342        T: TryFrom<RespValue>,
343        T::Error: Into<RedisError>,
344    {
345        let results = self.execute().await?;
346        let mut typed_results = Vec::with_capacity(results.len());
347
348        for result in results {
349            let typed_result = T::try_from(result).map_err(Into::into)?;
350            typed_results.push(typed_result);
351        }
352
353        Ok(typed_results)
354    }
355}
356
357/// Pipeline result wrapper for easier handling
358#[derive(Debug, Clone)]
359pub struct PipelineResult {
360    results: Vec<RespValue>,
361    index: usize,
362}
363
364impl PipelineResult {
365    /// Create a new pipeline result
366    #[must_use]
367    pub fn new(results: Vec<RespValue>) -> Self {
368        Self { results, index: 0 }
369    }
370
371    /// Get the next result from the pipeline
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if there are no more results or type conversion fails.
376    pub fn next<T>(&mut self) -> RedisResult<T>
377    where
378        T: TryFrom<RespValue>,
379        T::Error: Into<RedisError>,
380    {
381        if self.index >= self.results.len() {
382            return Err(RedisError::Protocol(
383                "No more results in pipeline".to_string(),
384            ));
385        }
386
387        let result = self.results[self.index].clone();
388        self.index += 1;
389
390        T::try_from(result).map_err(Into::into)
391    }
392
393    /// Get a result at a specific index
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the index is out of bounds or type conversion fails.
398    pub fn get<T>(&self, index: usize) -> RedisResult<T>
399    where
400        T: TryFrom<RespValue>,
401        T::Error: Into<RedisError>,
402    {
403        if index >= self.results.len() {
404            return Err(RedisError::Protocol(format!(
405                "Index {} out of bounds",
406                index
407            )));
408        }
409
410        let result = self.results[index].clone();
411        T::try_from(result).map_err(Into::into)
412    }
413
414    /// Get the number of results
415    #[must_use]
416    pub fn len(&self) -> usize {
417        self.results.len()
418    }
419
420    /// Check if there are no results
421    #[must_use]
422    pub fn is_empty(&self) -> bool {
423        self.results.is_empty()
424    }
425
426    /// Get all results as a vector
427    #[must_use]
428    pub fn into_results(self) -> Vec<RespValue> {
429        self.results
430    }
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Executor stub that checks the batch size and answers every command with `OK`.
    struct MockExecutor {
        expected_commands: usize,
    }

    #[async_trait::async_trait]
    impl PipelineExecutor for MockExecutor {
        async fn execute_pipeline(
            &mut self,
            commands: Vec<Box<dyn PipelineCommand>>,
        ) -> RedisResult<Vec<RespValue>> {
            assert_eq!(commands.len(), self.expected_commands);

            // One mock reply per submitted command.
            let replies: Vec<RespValue> = commands
                .iter()
                .map(|_| RespValue::SimpleString("OK".to_string()))
                .collect();
            Ok(replies)
        }
    }

    /// Builds a pipeline backed by a mock that expects `expected_commands` commands.
    fn pipeline_expecting(expected_commands: usize) -> Pipeline {
        Pipeline::new(Arc::new(Mutex::new(MockExecutor { expected_commands })))
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let pipeline = pipeline_expecting(0);

        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_add_commands() {
        let mut pipeline = pipeline_expecting(2);

        pipeline.set("key1", "value1").get("key1");

        assert_eq!(pipeline.len(), 2);
        assert!(!pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_execute() {
        let mut pipeline = pipeline_expecting(2);

        pipeline.set("key1", "value1").get("key1");

        let results = pipeline.execute().await.unwrap();
        assert_eq!(results.len(), 2);
        // Executing consumes the queued commands.
        assert!(pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_clear() {
        let mut pipeline = pipeline_expecting(0);

        pipeline.set("key1", "value1").get("key1");
        assert_eq!(pipeline.len(), 2);

        pipeline.clear();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_result() {
        let mut pipeline_result = PipelineResult::new(vec![
            RespValue::SimpleString("OK".to_string()),
            RespValue::BulkString(b"value1".to_vec().into()),
            RespValue::Integer(42),
        ]);

        assert_eq!(pipeline_result.len(), 3);
        assert!(!pipeline_result.is_empty());

        // Sequential access yields the first entry.
        let first: String = pipeline_result.next().unwrap();
        assert_eq!(first, "OK");

        // Indexed access is independent of the cursor.
        let second: String = pipeline_result.get(1).unwrap();
        assert_eq!(second, "value1");
    }
}