1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::{
cmd,
resp::{Array, ResultValueExt, Value},
Command, CommandSend, Database, Error, GenericCommands, HashCommands, ListCommands, Result,
ServerCommands, SetCommands, StringCommands, ValueReceiver, ValueSender,
};
use futures::{channel::oneshot, Future};
use std::{collections::VecDeque, pin::Pin, sync::Mutex};
pub struct Transaction {
database: Database,
command_queue: Mutex<VecDeque<Command>>,
value_sender_queue: Mutex<VecDeque<ValueSender>>,
}
impl Transaction {
pub fn new(database: Database) -> Self {
Self {
database,
command_queue: Mutex::new(VecDeque::new()),
value_sender_queue: Mutex::new(VecDeque::new()),
}
}
pub fn send<'a>(&'a self, command: Command) -> impl Future<Output = Result<Value>> + 'a {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
self.command_queue.lock().unwrap().push_back(command);
self.value_sender_queue
.lock()
.unwrap()
.push_back(value_sender);
async fn await_for_result(value_receiver: ValueReceiver) -> Result<Value> {
let value = value_receiver.await?;
value.into_result()
}
await_for_result(value_receiver)
}
pub async fn execute(&self) -> Result<()> {
let mut commands = self
.command_queue
.lock()
.unwrap()
.drain(..)
.collect::<VecDeque<_>>();
let mut value_senders = self
.value_sender_queue
.lock()
.unwrap()
.drain(..)
.collect::<VecDeque<_>>();
self.database.send(cmd("MULTI")).await?;
while let Some(command) = commands.pop_front() {
self.database.send(command).await.into_result()?;
}
let result = self.database.send(cmd("EXEC")).await?;
match result {
Value::Array(Array::Vec(results)) => {
for value in results.into_iter() {
match value_senders.pop_front() {
Some(value_sender) => {
let _ = value_sender.send(value.into());
}
None => {
return Err(Error::Internal("Unexpected transaction reply".to_owned()))
}
}
}
}
Value::Error(e) => return Err(Error::Redis(e)),
_ => return Err(Error::Internal("Unexpected transaction reply".to_owned())),
}
Ok(())
}
}
impl CommandSend for Transaction {
fn send(&self, command: Command) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
Box::pin(self.send(command))
}
}
impl GenericCommands for Transaction {}
impl HashCommands for Transaction {}
impl ListCommands for Transaction {}
impl SetCommands for Transaction {}
impl ServerCommands for Transaction {}
impl StringCommands for Transaction {}