redis_driver/clients/
pipeline.rs1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10 resp::{Command, FromValue, ResultValueExt, Value},
11 BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
12 HashCommands, HyperLogLogCommands, InnerClient, ListCommands, PreparedCommand, Result,
13 ScriptingCommands, ServerCommands, SetCommands, SortedSetCommands, StreamCommands,
14 StringCommands,
15};
16#[cfg(feature = "redis-bloom")]
17use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
18use std::iter::zip;
19
20pub struct Pipeline {
21 client: InnerClient,
22 commands: Vec<Command>,
23 forget_flags: Vec<bool>,
24}
25
26impl Pipeline {
27 pub(crate) fn new(client: InnerClient) -> Pipeline {
28 Pipeline {
29 client,
30 commands: Vec::new(),
31 forget_flags: Vec::new(),
32 }
33 }
34
35 pub fn queue(&mut self, command: Command) {
37 self.commands.push(command);
38 self.forget_flags.push(false);
39 }
40
41 pub fn forget(&mut self, command: Command) {
43 self.commands.push(command);
44 self.forget_flags.push(true);
45 }
46
47 pub async fn execute<T: FromValue>(mut self) -> Result<T> {
48 let num_commands = self.commands.len();
49 let result = self.client.send_batch(self.commands).await?;
50
51 match result {
52 Value::Array(Some(results)) if num_commands > 1 => {
53 let mut filtered_results = zip(results, self.forget_flags.iter())
54 .filter_map(
55 |(value, forget_flag)| if *forget_flag { None } else { Some(value) },
56 )
57 .collect::<Vec<_>>();
58
59 if filtered_results.len() == 1 {
60 let value = filtered_results.pop().unwrap();
61 Ok(value).into_result()?.into()
62 } else {
63 Value::Array(Some(filtered_results)).into()
64 }
65 }
66 _ => Ok(result).into_result()?.into(),
67 }
68 }
69}
70
71pub trait PipelinePreparedCommand<'a, R>
72where
73 R: FromValue,
74{
75 fn queue(self);
77
78 fn forget(self);
80}
81
82impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Pipeline, R>
83where
84 R: FromValue + Send + 'a,
85{
86 fn queue(self) {
88 self.executor.queue(self.command)
89 }
90
91 fn forget(self) {
93 self.executor.forget(self.command)
94 }
95}
96
97impl BitmapCommands for Pipeline {}
98#[cfg(feature = "redis-bloom")]
99impl BloomCommands for Pipeline {}
100impl ClusterCommands for Pipeline {}
101impl ConnectionCommands for Pipeline {}
102#[cfg(feature = "redis-bloom")]
103impl CountMinSketchCommands for Pipeline {}
104#[cfg(feature = "redis-bloom")]
105impl CuckooCommands for Pipeline {}
106impl GenericCommands for Pipeline {}
107impl GeoCommands for Pipeline {}
108#[cfg(feature = "redis-graph")]
109impl GraphCommands for Pipeline {}
110impl HashCommands for Pipeline {}
111impl HyperLogLogCommands for Pipeline {}
112#[cfg(feature = "redis-json")]
113impl JsonCommands for Pipeline {}
114impl ListCommands for Pipeline {}
115#[cfg(feature = "redis-search")]
116impl SearchCommands for Pipeline {}
117impl SetCommands for Pipeline {}
118impl ScriptingCommands for Pipeline {}
119impl ServerCommands for Pipeline {}
120impl SortedSetCommands for Pipeline {}
121impl StreamCommands for Pipeline {}
122impl StringCommands for Pipeline {}
123#[cfg(feature = "redis-bloom")]
124impl TDigestCommands for Pipeline {}
125#[cfg(feature = "redis-time-series")]
126impl TimeSeriesCommands for Pipeline {}
127#[cfg(feature = "redis-bloom")]
128impl TopKCommands for Pipeline {}