redis_driver/clients/
transaction.rs

1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10    resp::{cmd, Command, FromValue, ResultValueExt, Value},
11    BitmapCommands, Error, GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands,
12    InnerClient, ListCommands, PipelinePreparedCommand, PreparedCommand, Result, ScriptingCommands,
13    ServerCommands, SetCommands, SortedSetCommands, StreamCommands, StringCommands,
14};
15#[cfg(feature = "redis-bloom")]
16use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
17use std::iter::zip;
18
19/// Represents an on-going [`transaction`](https://redis.io/docs/manual/transactions/) on a specific client instance.
20pub struct Transaction {
21    client: InnerClient,
22    commands: Vec<Command>,
23    forget_flags: Vec<bool>,
24}
25
26impl Transaction {
27    pub(crate) fn new(client: InnerClient) -> Transaction {
28        let mut transaction = Transaction {
29            client,
30            commands: Vec::new(),
31            forget_flags: Vec::new(),
32        };
33
34        transaction.queue(cmd("MULTI"));
35        transaction
36    }
37
38    /// Queue a command into the transaction.
39    pub fn queue(&mut self, command: Command) {
40        self.commands.push(command);
41        self.forget_flags.push(false);
42    }
43
44    /// Queue a command into the transaction and forget its response.
45    pub fn forget(&mut self, command: Command) {
46        self.commands.push(command);
47        self.forget_flags.push(true);
48    }
49
50    pub async fn execute<T: FromValue>(mut self) -> Result<T> {
51        self.queue(cmd("EXEC"));
52
53        let num_commands = self.commands.len();
54
55        let values: Vec<Value> = self.client.send_batch(self.commands).await?.into()?;
56        let mut iter = values.into_iter();
57
58        // MULTI + QUEUED commands
59        for _ in 0..num_commands - 1 {
60            if let Some(Value::Error(e)) = iter.next() {
61                return Err(Error::Redis(e));
62            }
63        }
64
65        // EXEC
66        if let Some(result) = iter.next() {
67            match result {
68                Value::Array(Some(results)) => {
69                    let mut filtered_results = zip(results, self.forget_flags.iter().skip(1))
70                        .filter_map(
71                            |(value, forget_flag)| if *forget_flag { None } else { Some(value) },
72                        )
73                        .collect::<Vec<_>>();
74
75                    if filtered_results.len() == 1 {
76                        let value = filtered_results.pop().unwrap();
77                        Ok(value).into_result()?.into()
78                    } else {
79                        Value::Array(Some(filtered_results)).into()
80                    }
81                }
82                Value::Array(None) | Value::BulkString(None) => Err(Error::Aborted),
83                _ => Err(Error::Client("Unexpected transaction reply".to_owned())),
84            }
85        } else {
86            Err(Error::Client(
87                "Unexpected result for transaction".to_owned(),
88            ))
89        }
90    }
91}
92
93impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Transaction, R>
94where
95    R: FromValue + Send + 'a,
96{
97    /// Queue a command into the transaction.
98    fn queue(self) {
99        self.executor.queue(self.command)
100    }
101
102    /// Queue a command into the transaction and forget its response.
103    fn forget(self) {
104        self.executor.forget(self.command)
105    }
106}
107
108impl BitmapCommands for Transaction {}
109#[cfg(feature = "redis-bloom")]
110impl BloomCommands for Transaction {}
111#[cfg(feature = "redis-bloom")]
112impl CountMinSketchCommands for Transaction {}
113#[cfg(feature = "redis-bloom")]
114impl CuckooCommands for Transaction {}
115impl GenericCommands for Transaction {}
116impl GeoCommands for Transaction {}
117#[cfg(feature = "redis-graph")]
118impl GraphCommands for Transaction {}
119impl HashCommands for Transaction {}
120impl HyperLogLogCommands for Transaction {}
121#[cfg(feature = "redis-json")]
122impl JsonCommands for Transaction {}
123impl ListCommands for Transaction {}
124#[cfg(feature = "redis-search")]
125impl SearchCommands for Transaction {}
126impl SetCommands for Transaction {}
127impl ScriptingCommands for Transaction {}
128impl ServerCommands for Transaction {}
129impl SortedSetCommands for Transaction {}
130impl StreamCommands for Transaction {}
131impl StringCommands for Transaction {}
132#[cfg(feature = "redis-bloom")]
133impl TDigestCommands for Transaction {}
134#[cfg(feature = "redis-time-series")]
135impl TimeSeriesCommands for Transaction {}
136#[cfg(feature = "redis-bloom")]
137impl TopKCommands for Transaction {}