1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::iter::zip;
use crate::{
resp::{Array, Command, FromValue, ResultValueExt, Value},
BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
HashCommands, HyperLogLogCommands, InnerClient, ListCommands, PreparedCommand, Result,
ScriptingCommands, ServerCommands, SetCommands, SortedSetCommands, StreamCommands,
StringCommands,
};
pub struct Pipeline {
client: InnerClient,
commands: Vec<Command>,
forget_flags: Vec<bool>,
}
impl Pipeline {
pub(crate) fn new(client: InnerClient) -> Pipeline {
Pipeline {
client,
commands: Vec::new(),
forget_flags: Vec::new(),
}
}
pub fn queue(&mut self, command: Command) {
self.commands.push(command);
self.forget_flags.push(false);
}
pub fn forget(&mut self, command: Command) {
self.commands.push(command);
self.forget_flags.push(true);
}
pub async fn execute<T: FromValue>(mut self) -> Result<T> {
let result = self.client.send_batch(self.commands).await?;
match result {
Value::Array(Array::Vec(results)) => {
let mut filtered_results = zip(results, self.forget_flags.iter())
.filter_map(
|(value, forget_flag)| if *forget_flag { None } else { Some(value) },
)
.collect::<Vec<_>>();
if filtered_results.len() == 1 {
let value = filtered_results.pop().unwrap();
Ok(value).into_result()?.into()
} else {
Value::Array(Array::Vec(filtered_results)).into()
}
}
_ => Ok(result).into_result()?.into(),
}
}
}
pub trait PipelinePreparedCommand<'a, R>
where
R: FromValue,
{
fn queue(self);
fn forget(self);
}
impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Pipeline, R>
where
R: FromValue + Send + 'a,
{
fn queue(self) {
self.executor.queue(self.command)
}
fn forget(self) {
self.executor.forget(self.command)
}
}
impl BitmapCommands for Pipeline {}
impl ClusterCommands for Pipeline {}
impl ConnectionCommands for Pipeline {}
impl GenericCommands for Pipeline {}
impl GeoCommands for Pipeline {}
impl HashCommands for Pipeline {}
impl HyperLogLogCommands for Pipeline {}
impl ListCommands for Pipeline {}
impl SetCommands for Pipeline {}
impl ScriptingCommands for Pipeline {}
impl ServerCommands for Pipeline {}
impl SortedSetCommands for Pipeline {}
impl StreamCommands for Pipeline {}
impl StringCommands for Pipeline {}