1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use crate::cluster::ClusterConnection;
use crate::cmd::{cmd, Cmd};
use crate::types::{
    from_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};

/// Error kind and base message reported for a command that cannot be routed
/// in cluster mode. `ClusterPipeline::query` combines this pair with a
/// per-command detail string when it rejects a pipeline.
pub(crate) const UNROUTABLE_ERROR: (ErrorKind, &str) = (
    ErrorKind::ClientError,
    "This command cannot be safely routed in cluster mode",
);

/// Reports whether `cmd` is one of the command names a cluster pipeline rejects.
///
/// The comparison is an exact, case-sensitive string match against the table
/// below; no trimming or case folding happens here.
fn is_illegal_cmd(cmd: &str) -> bool {
    // Kept in alphabetical groups; command families list the bare family name
    // followed by its known sub-commands.
    const REJECTED: &[&str] = &[
        "BGREWRITEAOF",
        "BGSAVE",
        "BITOP",
        "BRPOPLPUSH",
        // CLIENT family
        "CLIENT",
        "CLIENT GETNAME",
        "CLIENT KILL",
        "CLIENT LIST",
        "CLIENT SETNAME",
        // CONFIG family
        "CONFIG",
        "CONFIG GET",
        "CONFIG RESETSTAT",
        "CONFIG REWRITE",
        "CONFIG SET",
        "DBSIZE",
        "ECHO",
        "EVALSHA",
        "FLUSHALL",
        "FLUSHDB",
        "INFO",
        "KEYS",
        "LASTSAVE",
        "MGET",
        "MOVE",
        "MSET",
        "MSETNX",
        "PFMERGE",
        "PFCOUNT",
        "PING",
        "PUBLISH",
        "RANDOMKEY",
        "RENAME",
        "RENAMENX",
        "RPOPLPUSH",
        "SAVE",
        "SCAN",
        // SCRIPT family
        "SCRIPT",
        "SCRIPT EXISTS",
        "SCRIPT FLUSH",
        "SCRIPT KILL",
        "SCRIPT LOAD",
        "SDIFF",
        "SDIFFSTORE",
        // SENTINEL family
        "SENTINEL",
        "SENTINEL GET MASTER ADDR BY NAME",
        "SENTINEL MASTER",
        "SENTINEL MASTERS",
        "SENTINEL MONITOR",
        "SENTINEL REMOVE",
        "SENTINEL SENTINELS",
        "SENTINEL SET",
        "SENTINEL SLAVES",
        "SHUTDOWN",
        "SINTER",
        "SINTERSTORE",
        "SLAVEOF",
        // SLOWLOG family
        "SLOWLOG",
        "SLOWLOG GET",
        "SLOWLOG LEN",
        "SLOWLOG RESET",
        "SMOVE",
        "SORT",
        "SUNION",
        "SUNIONSTORE",
        "TIME",
    ];

    REJECTED.contains(&cmd)
}

/// Represents a Redis Cluster command pipeline.
///
/// The queued commands are validated and handed to a `ClusterConnection`
/// when `query` (or `execute`) is called.
#[derive(Clone)]
pub struct ClusterPipeline {
    /// Commands queued on this pipeline, in the order they are stored.
    commands: Vec<Cmd>,
    // NOTE(review): presumably the indices into `commands` whose replies are
    // dropped from the result (cf. `.ignore()` in the doc examples). Nothing in
    // this file writes to it directly — confirm against
    // `implement_pipeline_commands!`.
    ignored_commands: HashSet<usize>,
}

/// A cluster pipeline behaves almost like a normal [Pipeline](crate::pipeline::Pipeline), with two differences:
/// * Transactions are not supported
/// * The commands listed below are rejected when the pipeline is queried:
/// ```text
/// BGREWRITEAOF, BGSAVE, BITOP, BRPOPLPUSH
/// CLIENT GETNAME, CLIENT KILL, CLIENT LIST, CLIENT SETNAME, CONFIG GET,
/// CONFIG RESETSTAT, CONFIG REWRITE, CONFIG SET
/// DBSIZE
/// ECHO, EVALSHA
/// FLUSHALL, FLUSHDB
/// INFO
/// KEYS
/// LASTSAVE
/// MGET, MOVE, MSET, MSETNX
/// PFMERGE, PFCOUNT, PING, PUBLISH
/// RANDOMKEY, RENAME, RENAMENX, RPOPLPUSH
/// SAVE, SCAN, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, SDIFF, SDIFFSTORE,
/// SENTINEL GET MASTER ADDR BY NAME, SENTINEL MASTER, SENTINEL MASTERS, SENTINEL MONITOR,
/// SENTINEL REMOVE, SENTINEL SENTINELS, SENTINEL SET, SENTINEL SLAVES, SHUTDOWN, SINTER,
/// SINTERSTORE, SLAVEOF, SLOWLOG GET, SLOWLOG LEN, SLOWLOG RESET, SMOVE, SORT, SUNION, SUNIONSTORE
/// TIME
/// ```
impl ClusterPipeline {
    /// Builds a pipeline that holds no commands.
    pub fn new() -> ClusterPipeline {
        Self::with_capacity(0)
    }

    /// Builds a pipeline that holds no commands but has room reserved for
    /// `capacity` of them.
    pub fn with_capacity(capacity: usize) -> ClusterPipeline {
        let commands = Vec::with_capacity(capacity);
        let ignored_commands = HashSet::new();
        ClusterPipeline {
            commands,
            ignored_commands,
        }
    }

    /// Crate-internal read access to the queued commands.
    pub(crate) fn commands(&self) -> &Vec<Cmd> {
        &self.commands
    }

    /// Runs the pipeline on `con` and converts the replies into `T`:
    ///
    /// ```rust,no_run
    /// # let nodes = vec!["redis://127.0.0.1:6379/"];
    /// # let client = redis::cluster::ClusterClient::new(nodes).unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let mut pipe = redis::cluster::cluster_pipe();
    /// let (k1, k2) : (i32, i32) = pipe
    ///     .cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("SET").arg("key_2").arg(43).ignore()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// Fails before anything is sent if the first argument of any queued
    /// command (trimmed and upper-cased) is rejected by `is_illegal_cmd`.
    /// Errors from executing the pipeline or converting the replies are
    /// propagated as well.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &mut ClusterConnection) -> RedisResult<T> {
        // Look for the first queued command whose normalized name is not allowed.
        // A missing or non-UTF-8 first argument is treated as the empty name.
        let rejected = self
            .commands
            .iter()
            .map(|queued| {
                let raw_name = queued.arg_idx(0).unwrap_or(b"");
                std::str::from_utf8(raw_name)
                    .unwrap_or("")
                    .trim()
                    .to_ascii_uppercase()
            })
            .find(|name| is_illegal_cmd(name));

        if let Some(cmd_name) = rejected {
            fail!((
                UNROUTABLE_ERROR.0,
                UNROUTABLE_ERROR.1,
                format!("Command '{cmd_name}' can't be executed in a cluster pipeline.")
            ))
        }

        // An empty pipeline never touches the connection; it converts an
        // empty bulk reply instead.
        if self.commands.is_empty() {
            return from_redis_value(&Value::Bulk(vec![]));
        }

        let replies = con.execute_pipeline(self)?;
        from_redis_value(&self.make_pipeline_results(replies))
    }

    /// Convenience wrapper around `query()` that discards the result.
    ///
    /// # Panics
    ///
    /// Panics if the underlying query fails.
    ///
    /// It is equivalent to calling `query` like this:
    ///
    /// ```rust,no_run
    /// # let nodes = vec!["redis://127.0.0.1:6379/"];
    /// # let client = redis::cluster::ClusterClient::new(nodes).unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let mut pipe = redis::cluster::cluster_pipe();
    /// let _ : () = pipe.cmd("SET").arg("key_1").arg(42).ignore().query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn execute(&self, con: &mut ClusterConnection) {
        self.query::<()>(con).unwrap()
    }
}

/// Shortcut for creating a new cluster pipeline.
///
/// Returns the same value as `ClusterPipeline::new()`.
pub fn cluster_pipe() -> ClusterPipeline {
    ClusterPipeline::new()
}

// Expands the pipeline command macro for `ClusterPipeline`.
// NOTE(review): the macro is defined elsewhere. It presumably supplies the
// builder methods used in the doc examples (`cmd`, `arg`, `ignore`) and the
// `make_pipeline_results` helper that `query` calls — confirm at its definition.
implement_pipeline_commands!(ClusterPipeline);