redis_driver/clients/
transaction.rs1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10 resp::{cmd, Command, FromValue, ResultValueExt, Value},
11 BitmapCommands, Error, GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands,
12 InnerClient, ListCommands, PipelinePreparedCommand, PreparedCommand, Result, ScriptingCommands,
13 ServerCommands, SetCommands, SortedSetCommands, StreamCommands, StringCommands,
14};
15#[cfg(feature = "redis-bloom")]
16use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
17use std::iter::zip;
18
19pub struct Transaction {
21 client: InnerClient,
22 commands: Vec<Command>,
23 forget_flags: Vec<bool>,
24}
25
26impl Transaction {
27 pub(crate) fn new(client: InnerClient) -> Transaction {
28 let mut transaction = Transaction {
29 client,
30 commands: Vec::new(),
31 forget_flags: Vec::new(),
32 };
33
34 transaction.queue(cmd("MULTI"));
35 transaction
36 }
37
38 pub fn queue(&mut self, command: Command) {
40 self.commands.push(command);
41 self.forget_flags.push(false);
42 }
43
44 pub fn forget(&mut self, command: Command) {
46 self.commands.push(command);
47 self.forget_flags.push(true);
48 }
49
50 pub async fn execute<T: FromValue>(mut self) -> Result<T> {
51 self.queue(cmd("EXEC"));
52
53 let num_commands = self.commands.len();
54
55 let values: Vec<Value> = self.client.send_batch(self.commands).await?.into()?;
56 let mut iter = values.into_iter();
57
58 for _ in 0..num_commands - 1 {
60 if let Some(Value::Error(e)) = iter.next() {
61 return Err(Error::Redis(e));
62 }
63 }
64
65 if let Some(result) = iter.next() {
67 match result {
68 Value::Array(Some(results)) => {
69 let mut filtered_results = zip(results, self.forget_flags.iter().skip(1))
70 .filter_map(
71 |(value, forget_flag)| if *forget_flag { None } else { Some(value) },
72 )
73 .collect::<Vec<_>>();
74
75 if filtered_results.len() == 1 {
76 let value = filtered_results.pop().unwrap();
77 Ok(value).into_result()?.into()
78 } else {
79 Value::Array(Some(filtered_results)).into()
80 }
81 }
82 Value::Array(None) | Value::BulkString(None) => Err(Error::Aborted),
83 _ => Err(Error::Client("Unexpected transaction reply".to_owned())),
84 }
85 } else {
86 Err(Error::Client(
87 "Unexpected result for transaction".to_owned(),
88 ))
89 }
90 }
91}
92
93impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Transaction, R>
94where
95 R: FromValue + Send + 'a,
96{
97 fn queue(self) {
99 self.executor.queue(self.command)
100 }
101
102 fn forget(self) {
104 self.executor.forget(self.command)
105 }
106}
107
108impl BitmapCommands for Transaction {}
109#[cfg(feature = "redis-bloom")]
110impl BloomCommands for Transaction {}
111#[cfg(feature = "redis-bloom")]
112impl CountMinSketchCommands for Transaction {}
113#[cfg(feature = "redis-bloom")]
114impl CuckooCommands for Transaction {}
115impl GenericCommands for Transaction {}
116impl GeoCommands for Transaction {}
117#[cfg(feature = "redis-graph")]
118impl GraphCommands for Transaction {}
119impl HashCommands for Transaction {}
120impl HyperLogLogCommands for Transaction {}
121#[cfg(feature = "redis-json")]
122impl JsonCommands for Transaction {}
123impl ListCommands for Transaction {}
124#[cfg(feature = "redis-search")]
125impl SearchCommands for Transaction {}
126impl SetCommands for Transaction {}
127impl ScriptingCommands for Transaction {}
128impl ServerCommands for Transaction {}
129impl SortedSetCommands for Transaction {}
130impl StreamCommands for Transaction {}
131impl StringCommands for Transaction {}
132#[cfg(feature = "redis-bloom")]
133impl TDigestCommands for Transaction {}
134#[cfg(feature = "redis-time-series")]
135impl TimeSeriesCommands for Transaction {}
136#[cfg(feature = "redis-bloom")]
137impl TopKCommands for Transaction {}