1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::{
    cmd,
    resp::{Array, ResultValueExt, Value},
    Command, CommandSend, Database, Error, GenericCommands, HashCommands, ListCommands, Result,
    ServerCommands, SetCommands, SortedSetCommands, StringCommands, ValueReceiver, ValueSender,
};
use futures::{channel::oneshot, Future};
use std::{collections::VecDeque, pin::Pin, sync::Mutex};

pub struct Transaction {
    database: Database,
    command_queue: Mutex<VecDeque<Command>>,
    value_sender_queue: Mutex<VecDeque<ValueSender>>,
}

impl Transaction {
    pub fn new(database: Database) -> Self {
        Self {
            database,
            command_queue: Mutex::new(VecDeque::new()),
            value_sender_queue: Mutex::new(VecDeque::new()),
        }
    }

    pub fn send<'a>(&'a self, command: Command) -> impl Future<Output = Result<Value>> + 'a {
        let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();

        self.command_queue.lock().unwrap().push_back(command);
        self.value_sender_queue
            .lock()
            .unwrap()
            .push_back(value_sender);

        async fn await_for_result(value_receiver: ValueReceiver) -> Result<Value> {
            let value = value_receiver.await?;
            value.into_result()
        }

        await_for_result(value_receiver)
    }

    pub async fn execute(&self) -> Result<()> {
        let mut commands = self
            .command_queue
            .lock()
            .unwrap()
            .drain(..)
            .collect::<VecDeque<_>>();
        let mut value_senders = self
            .value_sender_queue
            .lock()
            .unwrap()
            .drain(..)
            .collect::<VecDeque<_>>();

        self.database.send(cmd("MULTI")).await?;

        while let Some(command) = commands.pop_front() {
            self.database.send(command).await.into_result()?;
        }

        let result = self.database.send(cmd("EXEC")).await?;

        match result {
            Value::Array(Array::Vec(results)) => {
                for value in results.into_iter() {
                    match value_senders.pop_front() {
                        Some(value_sender) => {
                            let _ = value_sender.send(value.into());
                        }
                        None => {
                            return Err(Error::Internal("Unexpected transaction reply".to_owned()))
                        }
                    }
                }
            }
            Value::Error(e) => return Err(Error::Redis(e)),
            _ => return Err(Error::Internal("Unexpected transaction reply".to_owned())),
        }

        Ok(())
    }
}

impl CommandSend for Transaction {
    fn send(&self, command: Command) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
        Box::pin(self.send(command))
    }
}

impl GenericCommands for Transaction {}
impl HashCommands for Transaction {}
impl ListCommands for Transaction {}
impl SetCommands for Transaction {}
impl SortedSetCommands for Transaction {}
impl ServerCommands for Transaction {}
impl StringCommands for Transaction {}