mco_redis_rs/cluster_pipeline.rs
1use crate::cluster::ClusterConnection;
2use crate::cmd::{cmd, Cmd};
3use crate::types::{from_redis_value, ErrorKind, FromRedisValue, RedisResult, ToRedisArgs, Value};
4use std::collections::HashSet;
5
/// Error kind and static message reported when a command is rejected because
/// it cannot be safely routed in cluster mode.
pub(crate) const UNROUTABLE_ERROR: (ErrorKind, &str) = (
    ErrorKind::ClientError,
    "This command cannot be safely routed in cluster mode",
);
10
/// Reports whether `cmd` is on the deny-list of commands that a cluster
/// pipeline refuses to send.
///
/// The lookup is an exact, case-sensitive string comparison against the table
/// below; no trimming or case folding is performed here.
fn is_illegal_cmd(cmd: &str) -> bool {
    // Grouped alphabetically; families of sub-commands are listed together
    // with their bare prefix.
    const DENIED: &[&str] = &[
        "BGREWRITEAOF",
        "BGSAVE",
        "BITOP",
        "BRPOPLPUSH",
        // All commands that start with "CLIENT"
        "CLIENT",
        "CLIENT GETNAME",
        "CLIENT KILL",
        "CLIENT LIST",
        "CLIENT SETNAME",
        // All commands that start with "CONFIG"
        "CONFIG",
        "CONFIG GET",
        "CONFIG RESETSTAT",
        "CONFIG REWRITE",
        "CONFIG SET",
        "DBSIZE",
        "ECHO",
        "EVALSHA",
        "FLUSHALL",
        "FLUSHDB",
        "INFO",
        "KEYS",
        "LASTSAVE",
        "MGET",
        "MOVE",
        "MSET",
        "MSETNX",
        "PFMERGE",
        "PFCOUNT",
        "PING",
        "PUBLISH",
        "RANDOMKEY",
        "RENAME",
        "RENAMENX",
        "RPOPLPUSH",
        "SAVE",
        "SCAN",
        // All commands that start with "SCRIPT"
        "SCRIPT",
        "SCRIPT EXISTS",
        "SCRIPT FLUSH",
        "SCRIPT KILL",
        "SCRIPT LOAD",
        "SDIFF",
        "SDIFFSTORE",
        // All commands that start with "SENTINEL"
        "SENTINEL",
        "SENTINEL GET MASTER ADDR BY NAME",
        "SENTINEL MASTER",
        "SENTINEL MASTERS",
        "SENTINEL MONITOR",
        "SENTINEL REMOVE",
        "SENTINEL SENTINELS",
        "SENTINEL SET",
        "SENTINEL SLAVES",
        "SHUTDOWN",
        "SINTER",
        "SINTERSTORE",
        "SLAVEOF",
        // All commands that start with "SLOWLOG"
        "SLOWLOG",
        "SLOWLOG GET",
        "SLOWLOG LEN",
        "SLOWLOG RESET",
        "SMOVE",
        "SORT",
        "SUNION",
        "SUNIONSTORE",
        "TIME",
    ];

    DENIED.contains(&cmd)
}
42
/// Represents a Redis Cluster command pipeline.
///
/// Holds the commands queued on the pipeline; they are handed to the cluster
/// connection when the pipeline is queried.
#[derive(Clone)]
pub struct ClusterPipeline {
    // Commands queued on this pipeline.
    commands: Vec<Cmd>,
    // NOTE(review): presumably the positions in `commands` whose replies are
    // dropped from the result (cf. `.ignore()` in the `query` usage example).
    // Nothing visible here inserts into this set — confirm against
    // `implement_pipeline_commands!`.
    ignored_commands: HashSet<usize>,
}
49
50/// A cluster pipeline is almost identical to a normal [Pipeline](Pipeline), with two exceptions:
51/// * It does not support transactions
52/// * The following commands can not be used in a cluster pipeline:
53/// ```text
54/// BGREWRITEAOF, BGSAVE, BITOP, BRPOPLPUSH
55/// CLIENT GETNAME, CLIENT KILL, CLIENT LIST, CLIENT SETNAME, CONFIG GET,
56/// CONFIG RESETSTAT, CONFIG REWRITE, CONFIG SET
57/// DBSIZE
58/// ECHO, EVALSHA
59/// FLUSHALL, FLUSHDB
60/// INFO
61/// KEYS
62/// LASTSAVE
63/// MGET, MOVE, MSET, MSETNX
64/// PFMERGE, PFCOUNT, PING, PUBLISH
65/// RANDOMKEY, RENAME, RENAMENX, RPOPLPUSH
66/// SAVE, SCAN, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, SDIFF, SDIFFSTORE,
67/// SENTINEL GET MASTER ADDR BY NAME, SENTINEL MASTER, SENTINEL MASTERS, SENTINEL MONITOR,
68/// SENTINEL REMOVE, SENTINEL SENTINELS, SENTINEL SET, SENTINEL SLAVES, SHUTDOWN, SINTER,
69/// SINTERSTORE, SLAVEOF, SLOWLOG GET, SLOWLOG LEN, SLOWLOG RESET, SMOVE, SORT, SUNION, SUNIONSTORE
70/// TIME
71/// ```
72impl ClusterPipeline {
73 /// Create an empty pipeline.
74 pub fn new() -> ClusterPipeline {
75 Self::with_capacity(0)
76 }
77
78 /// Creates an empty pipeline with pre-allocated capacity.
79 pub fn with_capacity(capacity: usize) -> ClusterPipeline {
80 ClusterPipeline {
81 commands: Vec::with_capacity(capacity),
82 ignored_commands: HashSet::new(),
83 }
84 }
85
86 pub(crate) fn commands(&self) -> &Vec<Cmd> {
87 &self.commands
88 }
89
90 /// Executes the pipeline and fetches the return values:
91 ///
92 /// ```rust,no_run
93 /// # let nodes = vec!["redis://127.0.0.1:6379/"];
94 /// # let client = redis::cluster::ClusterClient::open(nodes).unwrap();
95 /// # let mut con = client.get_connection().unwrap();
96 /// let mut pipe = redis::cluster::cluster_pipe();
97 /// let (k1, k2) : (i32, i32) = pipe
98 /// .cmd("SET").arg("key_1").arg(42).ignore()
99 /// .cmd("SET").arg("key_2").arg(43).ignore()
100 /// .cmd("GET").arg("key_1")
101 /// .cmd("GET").arg("key_2").query(&mut con).unwrap();
102 /// ```
103 #[inline]
104 pub fn query<T: FromRedisValue>(&self, con: &mut ClusterConnection) -> RedisResult<T> {
105 for cmd in &self.commands {
106 let cmd_name = std::str::from_utf8(cmd.arg_idx(0).unwrap_or(b""))
107 .unwrap_or("")
108 .trim()
109 .to_ascii_uppercase();
110
111 if is_illegal_cmd(&cmd_name) {
112 fail!((
113 UNROUTABLE_ERROR.0,
114 UNROUTABLE_ERROR.1,
115 format!(
116 "Command '{}' can't be executed in a cluster pipeline.",
117 cmd_name
118 )
119 ))
120 }
121 }
122
123 from_redis_value(
124 &(if self.commands.is_empty() {
125 Value::Bulk(vec![])
126 } else {
127 self.make_pipeline_results(con.execute_pipeline(self)?)
128 }),
129 )
130 }
131
132 /// This is a shortcut to `query()` that does not return a value and
133 /// will fail the task if the query of the pipeline fails.
134 ///
135 /// This is equivalent to a call to query like this:
136 ///
137 /// ```rust,no_run
138 /// # let nodes = vec!["redis://127.0.0.1:6379/"];
139 /// # let client = redis::cluster::ClusterClient::open(nodes).unwrap();
140 /// # let mut con = client.get_connection().unwrap();
141 /// let mut pipe = redis::cluster::cluster_pipe();
142 /// let _ : () = pipe.cmd("SET").arg("key_1").arg(42).ignore().query(&mut con).unwrap();
143 /// ```
144 #[inline]
145 pub fn execute(&self, con: &mut ClusterConnection) {
146 self.query::<()>(con).unwrap();
147 }
148}
149
/// Shortcut for creating a new cluster pipeline.
///
/// Equivalent to calling [`ClusterPipeline::new`].
pub fn cluster_pipe() -> ClusterPipeline {
    ClusterPipeline::new()
}
154
// NOTE(review): presumably generates the builder methods used in the doc
// examples (`cmd`, `arg`, `ignore`) and the `make_pipeline_results` helper
// that `query` calls, none of which are defined in this file — confirm
// against the macro definition.
implement_pipeline_commands!(ClusterPipeline);