redis_driver/clients/
pipeline.rs

1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10    resp::{Command, FromValue, ResultValueExt, Value},
11    BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
12    HashCommands, HyperLogLogCommands, InnerClient, ListCommands, PreparedCommand, Result,
13    ScriptingCommands, ServerCommands, SetCommands, SortedSetCommands, StreamCommands,
14    StringCommands,
15};
16#[cfg(feature = "redis-bloom")]
17use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
18use std::iter::zip;
19
20pub struct Pipeline {
21    client: InnerClient,
22    commands: Vec<Command>,
23    forget_flags: Vec<bool>,
24}
25
26impl Pipeline {
27    pub(crate) fn new(client: InnerClient) -> Pipeline {
28        Pipeline {
29            client,
30            commands: Vec::new(),
31            forget_flags: Vec::new(),
32        }
33    }
34
35    /// Queue a command
36    pub fn queue(&mut self, command: Command) {
37        self.commands.push(command);
38        self.forget_flags.push(false);
39    }
40
41    /// Queue a command and forget its response
42    pub fn forget(&mut self, command: Command) {
43        self.commands.push(command);
44        self.forget_flags.push(true);
45    }
46
47    pub async fn execute<T: FromValue>(mut self) -> Result<T> {
48        let num_commands = self.commands.len();
49        let result = self.client.send_batch(self.commands).await?;
50
51        match result {
52            Value::Array(Some(results)) if num_commands > 1 => {
53                let mut filtered_results = zip(results, self.forget_flags.iter())
54                    .filter_map(
55                        |(value, forget_flag)| if *forget_flag { None } else { Some(value) },
56                    )
57                    .collect::<Vec<_>>();
58
59                if filtered_results.len() == 1 {
60                    let value = filtered_results.pop().unwrap();
61                    Ok(value).into_result()?.into()
62                } else {
63                    Value::Array(Some(filtered_results)).into()
64                }
65            }
66            _ => Ok(result).into_result()?.into(),
67        }
68    }
69}
70
71pub trait PipelinePreparedCommand<'a, R>
72where
73    R: FromValue,
74{
75    /// Queue a command.
76    fn queue(self);
77
78    /// Queue a command and forget its response.
79    fn forget(self);
80}
81
82impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Pipeline, R>
83where
84    R: FromValue + Send + 'a,
85{
86    /// Queue a command.
87    fn queue(self) {
88        self.executor.queue(self.command)
89    }
90
91    /// Queue a command and forget its response.
92    fn forget(self) {
93        self.executor.forget(self.command)
94    }
95}
96
97impl BitmapCommands for Pipeline {}
98#[cfg(feature = "redis-bloom")]
99impl BloomCommands for Pipeline {}
100impl ClusterCommands for Pipeline {}
101impl ConnectionCommands for Pipeline {}
102#[cfg(feature = "redis-bloom")]
103impl CountMinSketchCommands for Pipeline {}
104#[cfg(feature = "redis-bloom")]
105impl CuckooCommands for Pipeline {}
106impl GenericCommands for Pipeline {}
107impl GeoCommands for Pipeline {}
108#[cfg(feature = "redis-graph")]
109impl GraphCommands for Pipeline {}
110impl HashCommands for Pipeline {}
111impl HyperLogLogCommands for Pipeline {}
112#[cfg(feature = "redis-json")]
113impl JsonCommands for Pipeline {}
114impl ListCommands for Pipeline {}
115#[cfg(feature = "redis-search")]
116impl SearchCommands for Pipeline {}
117impl SetCommands for Pipeline {}
118impl ScriptingCommands for Pipeline {}
119impl ServerCommands for Pipeline {}
120impl SortedSetCommands for Pipeline {}
121impl StreamCommands for Pipeline {}
122impl StringCommands for Pipeline {}
123#[cfg(feature = "redis-bloom")]
124impl TDigestCommands for Pipeline {}
125#[cfg(feature = "redis-time-series")]
126impl TimeSeriesCommands for Pipeline {}
127#[cfg(feature = "redis-bloom")]
128impl TopKCommands for Pipeline {}