1use crate::core::{
34 error::{RedisError, RedisResult},
35 value::RespValue,
36};
37use std::collections::VecDeque;
38use std::sync::Arc;
39use tokio::sync::Mutex;
40
41pub trait PipelineCommand: Send + Sync {
43 fn name(&self) -> &str;
45
46 fn args(&self) -> Vec<RespValue>;
48
49 fn key(&self) -> Option<String>;
51}
52
53pub struct Pipeline {
59 commands: VecDeque<Box<dyn PipelineCommand>>,
60 connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>,
61}
62
63#[async_trait::async_trait]
65pub trait PipelineExecutor {
66 async fn execute_pipeline(
68 &mut self,
69 commands: Vec<Box<dyn PipelineCommand>>,
70 ) -> RedisResult<Vec<RespValue>>;
71}
72
73impl Pipeline {
74 pub fn new(connection: Arc<Mutex<dyn PipelineExecutor + Send + Sync>>) -> Self {
76 Self {
77 commands: VecDeque::new(),
78 connection,
79 }
80 }
81
82 pub fn add_command(&mut self, command: Box<dyn PipelineCommand>) -> &mut Self {
84 self.commands.push_back(command);
85 self
86 }
87
88 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
90 use crate::commands::SetCommand;
91 let cmd = SetCommand::new(key.into(), value.into());
92 self.add_command(Box::new(cmd))
93 }
94
95 pub fn get(&mut self, key: impl Into<String>) -> &mut Self {
97 use crate::commands::GetCommand;
98 let cmd = GetCommand::new(key.into());
99 self.add_command(Box::new(cmd))
100 }
101
102 pub fn del(&mut self, keys: Vec<String>) -> &mut Self {
104 use crate::commands::DelCommand;
105 let cmd = DelCommand::new(keys);
106 self.add_command(Box::new(cmd))
107 }
108
109 pub fn incr(&mut self, key: impl Into<String>) -> &mut Self {
111 use crate::commands::IncrCommand;
112 let cmd = IncrCommand::new(key.into());
113 self.add_command(Box::new(cmd))
114 }
115
116 pub fn decr(&mut self, key: impl Into<String>) -> &mut Self {
118 use crate::commands::DecrCommand;
119 let cmd = DecrCommand::new(key.into());
120 self.add_command(Box::new(cmd))
121 }
122
123 pub fn incr_by(&mut self, key: impl Into<String>, increment: i64) -> &mut Self {
125 use crate::commands::IncrByCommand;
126 let cmd = IncrByCommand::new(key.into(), increment);
127 self.add_command(Box::new(cmd))
128 }
129
130 pub fn decr_by(&mut self, key: impl Into<String>, decrement: i64) -> &mut Self {
132 use crate::commands::DecrByCommand;
133 let cmd = DecrByCommand::new(key.into(), decrement);
134 self.add_command(Box::new(cmd))
135 }
136
137 pub fn exists(&mut self, keys: Vec<String>) -> &mut Self {
139 use crate::commands::ExistsCommand;
140 let cmd = ExistsCommand::new(keys);
141 self.add_command(Box::new(cmd))
142 }
143
144 pub fn expire(&mut self, key: impl Into<String>, seconds: std::time::Duration) -> &mut Self {
146 use crate::commands::ExpireCommand;
147 let cmd = ExpireCommand::new(key.into(), seconds);
148 self.add_command(Box::new(cmd))
149 }
150
151 pub fn ttl(&mut self, key: impl Into<String>) -> &mut Self {
153 use crate::commands::TtlCommand;
154 let cmd = TtlCommand::new(key.into());
155 self.add_command(Box::new(cmd))
156 }
157
158 pub fn hget(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
162 use crate::commands::HGetCommand;
163 let cmd = HGetCommand::new(key.into(), field.into());
164 self.add_command(Box::new(cmd))
165 }
166
167 pub fn hset(
169 &mut self,
170 key: impl Into<String>,
171 field: impl Into<String>,
172 value: impl Into<String>,
173 ) -> &mut Self {
174 use crate::commands::HSetCommand;
175 let cmd = HSetCommand::new(key.into(), field.into(), value.into());
176 self.add_command(Box::new(cmd))
177 }
178
179 pub fn hdel(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
181 use crate::commands::HDelCommand;
182 let cmd = HDelCommand::new(key.into(), fields);
183 self.add_command(Box::new(cmd))
184 }
185
186 pub fn hgetall(&mut self, key: impl Into<String>) -> &mut Self {
188 use crate::commands::HGetAllCommand;
189 let cmd = HGetAllCommand::new(key.into());
190 self.add_command(Box::new(cmd))
191 }
192
193 pub fn hmget(&mut self, key: impl Into<String>, fields: Vec<String>) -> &mut Self {
195 use crate::commands::HMGetCommand;
196 let cmd = HMGetCommand::new(key.into(), fields);
197 self.add_command(Box::new(cmd))
198 }
199
200 pub fn hmset(
202 &mut self,
203 key: impl Into<String>,
204 fields: std::collections::HashMap<String, String>,
205 ) -> &mut Self {
206 use crate::commands::HMSetCommand;
207 let cmd = HMSetCommand::new(key.into(), fields);
208 self.add_command(Box::new(cmd))
209 }
210
211 pub fn hlen(&mut self, key: impl Into<String>) -> &mut Self {
213 use crate::commands::HLenCommand;
214 let cmd = HLenCommand::new(key.into());
215 self.add_command(Box::new(cmd))
216 }
217
218 pub fn lpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
222 use crate::commands::LPushCommand;
223 let cmd = LPushCommand::new(key.into(), values);
224 self.add_command(Box::new(cmd))
225 }
226
227 pub fn rpush(&mut self, key: impl Into<String>, values: Vec<String>) -> &mut Self {
229 use crate::commands::RPushCommand;
230 let cmd = RPushCommand::new(key.into(), values);
231 self.add_command(Box::new(cmd))
232 }
233
234 pub fn lrange(&mut self, key: impl Into<String>, start: i64, stop: i64) -> &mut Self {
236 use crate::commands::LRangeCommand;
237 let cmd = LRangeCommand::new(key.into(), start, stop);
238 self.add_command(Box::new(cmd))
239 }
240
241 pub fn llen(&mut self, key: impl Into<String>) -> &mut Self {
243 use crate::commands::LLenCommand;
244 let cmd = LLenCommand::new(key.into());
245 self.add_command(Box::new(cmd))
246 }
247
248 pub fn sadd(&mut self, key: impl Into<String>, members: Vec<String>) -> &mut Self {
252 use crate::commands::SAddCommand;
253 let cmd = SAddCommand::new(key.into(), members);
254 self.add_command(Box::new(cmd))
255 }
256
257 pub fn smembers(&mut self, key: impl Into<String>) -> &mut Self {
259 use crate::commands::SMembersCommand;
260 let cmd = SMembersCommand::new(key.into());
261 self.add_command(Box::new(cmd))
262 }
263
264 pub fn hexists(&mut self, key: impl Into<String>, field: impl Into<String>) -> &mut Self {
266 use crate::commands::HExistsCommand;
267 let cmd = HExistsCommand::new(key.into(), field.into());
268 self.add_command(Box::new(cmd))
269 }
270
271 #[must_use]
273 pub fn len(&self) -> usize {
274 self.commands.len()
275 }
276
277 #[must_use]
279 pub fn is_empty(&self) -> bool {
280 self.commands.is_empty()
281 }
282
283 pub fn clear(&mut self) {
285 self.commands.clear();
286 }
287
288 pub async fn execute(&mut self) -> RedisResult<Vec<RespValue>> {
318 if self.commands.is_empty() {
319 return Err(RedisError::Protocol("Pipeline is empty".to_string()));
320 }
321
322 let commands: Vec<Box<dyn PipelineCommand>> = self.commands.drain(..).collect();
324
325 let mut connection = self.connection.lock().await;
327 let results = connection.execute_pipeline(commands).await?;
328
329 Ok(results)
330 }
331
332 pub async fn execute_typed<T>(&mut self) -> RedisResult<Vec<T>>
341 where
342 T: TryFrom<RespValue>,
343 T::Error: Into<RedisError>,
344 {
345 let results = self.execute().await?;
346 let mut typed_results = Vec::with_capacity(results.len());
347
348 for result in results {
349 let typed_result = T::try_from(result).map_err(Into::into)?;
350 typed_results.push(typed_result);
351 }
352
353 Ok(typed_results)
354 }
355}
356
357#[derive(Debug, Clone)]
359pub struct PipelineResult {
360 results: Vec<RespValue>,
361 index: usize,
362}
363
364impl PipelineResult {
365 #[must_use]
367 pub fn new(results: Vec<RespValue>) -> Self {
368 Self { results, index: 0 }
369 }
370
371 pub fn next<T>(&mut self) -> RedisResult<T>
377 where
378 T: TryFrom<RespValue>,
379 T::Error: Into<RedisError>,
380 {
381 if self.index >= self.results.len() {
382 return Err(RedisError::Protocol(
383 "No more results in pipeline".to_string(),
384 ));
385 }
386
387 let result = self.results[self.index].clone();
388 self.index += 1;
389
390 T::try_from(result).map_err(Into::into)
391 }
392
393 pub fn get<T>(&self, index: usize) -> RedisResult<T>
399 where
400 T: TryFrom<RespValue>,
401 T::Error: Into<RedisError>,
402 {
403 if index >= self.results.len() {
404 return Err(RedisError::Protocol(format!(
405 "Index {} out of bounds",
406 index
407 )));
408 }
409
410 let result = self.results[index].clone();
411 T::try_from(result).map_err(Into::into)
412 }
413
414 #[must_use]
416 pub fn len(&self) -> usize {
417 self.results.len()
418 }
419
420 #[must_use]
422 pub fn is_empty(&self) -> bool {
423 self.results.is_empty()
424 }
425
426 #[must_use]
428 pub fn into_results(self) -> Vec<RespValue> {
429 self.results
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Executor stub: checks the batch size and answers `OK` for each command.
    struct MockExecutor {
        expected_commands: usize,
    }

    #[async_trait::async_trait]
    impl PipelineExecutor for MockExecutor {
        async fn execute_pipeline(
            &mut self,
            commands: Vec<Box<dyn PipelineCommand>>,
        ) -> RedisResult<Vec<RespValue>> {
            assert_eq!(commands.len(), self.expected_commands);

            let replies: Vec<RespValue> = commands
                .iter()
                .map(|_| RespValue::SimpleString("OK".to_string()))
                .collect();
            Ok(replies)
        }
    }

    /// Builds a pipeline whose mock executor expects `expected_commands` entries.
    fn pipeline_expecting(expected_commands: usize) -> Pipeline {
        Pipeline::new(Arc::new(Mutex::new(MockExecutor { expected_commands })))
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let pipeline = pipeline_expecting(0);

        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_add_commands() {
        let mut pipeline = pipeline_expecting(2);

        pipeline.set("key1", "value1").get("key1");

        assert_eq!(pipeline.len(), 2);
        assert!(!pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_execute() {
        let mut pipeline = pipeline_expecting(2);

        pipeline.set("key1", "value1").get("key1");

        let results = pipeline.execute().await.unwrap();
        assert_eq!(results.len(), 2);
        // Executing drains the queue.
        assert!(pipeline.is_empty());
    }

    #[tokio::test]
    async fn test_pipeline_clear() {
        let mut pipeline = pipeline_expecting(0);

        pipeline.set("key1", "value1").get("key1");
        assert_eq!(pipeline.len(), 2);

        pipeline.clear();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    #[tokio::test]
    async fn test_pipeline_result() {
        let mut pipeline_result = PipelineResult::new(vec![
            RespValue::SimpleString("OK".to_string()),
            RespValue::BulkString(b"value1".to_vec().into()),
            RespValue::Integer(42),
        ]);

        assert_eq!(pipeline_result.len(), 3);
        assert!(!pipeline_result.is_empty());

        // Sequential read consumes the first reply.
        let first: String = pipeline_result.next().unwrap();
        assert_eq!(first, "OK");

        // Indexed read does not depend on the cursor.
        let second: String = pipeline_result.get(1).unwrap();
        assert_eq!(second, "value1");
    }
}
535}