use crate::cluster::ClusterConnection;
use crate::cmd::{cmd, Cmd};
use crate::types::{
    from_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};
pub(crate) const UNROUTABLE_ERROR: (ErrorKind, &str) = (
    ErrorKind::ClientError,
    "This command cannot be safely routed in cluster mode",
);
/// Returns `true` if `cmd` names a command that cannot be safely routed in
/// cluster mode and is therefore rejected by `ClusterPipeline::query`.
///
/// The match is exact, so `cmd` must already be trimmed and upper-cased.
fn is_illegal_cmd(cmd: &str) -> bool {
    matches!(
        cmd,
        "BGREWRITEAOF" | "BGSAVE" | "BITOP" | "BRPOPLPUSH" |
        // All commands that start with "CLIENT"
        "CLIENT" | "CLIENT GETNAME" | "CLIENT KILL" | "CLIENT LIST" | "CLIENT SETNAME" |
        // All commands that start with "CONFIG"
        "CONFIG" | "CONFIG GET" | "CONFIG RESETSTAT" | "CONFIG REWRITE" | "CONFIG SET" |
        "DBSIZE" |
        "ECHO" | "EVALSHA" |
        "FLUSHALL" | "FLUSHDB" |
        "INFO" |
        "KEYS" |
        "LASTSAVE" |
        "MGET" | "MOVE" | "MSET" | "MSETNX" |
        "PFMERGE" | "PFCOUNT" | "PING" | "PUBLISH" |
        "RANDOMKEY" | "RENAME" | "RENAMENX" | "RPOPLPUSH" |
        "SAVE" | "SCAN" |
        // All commands that start with "SCRIPT"
        "SCRIPT" | "SCRIPT EXISTS" | "SCRIPT FLUSH" | "SCRIPT KILL" | "SCRIPT LOAD" |
        "SDIFF" | "SDIFFSTORE" |
        // All commands that start with "SENTINEL"
        "SENTINEL" | "SENTINEL GET MASTER ADDR BY NAME" | "SENTINEL MASTER" | "SENTINEL MASTERS" |
        "SENTINEL MONITOR" | "SENTINEL REMOVE" | "SENTINEL SENTINELS" | "SENTINEL SET" |
        "SENTINEL SLAVES" | "SHUTDOWN" | "SINTER" | "SINTERSTORE" | "SLAVEOF" |
        // All commands that start with "SLOWLOG"
        "SLOWLOG" | "SLOWLOG GET" | "SLOWLOG LEN" | "SLOWLOG RESET" |
        "SMOVE" | "SORT" | "SUNION" | "SUNIONSTORE" |
        "TIME"
    )
}
/// Represents a Redis Cluster command pipeline.
#[derive(Clone)]
pub struct ClusterPipeline {
    commands: Vec<Cmd>,
    ignored_commands: HashSet<usize>,
}
/// A cluster pipeline is almost identical to a normal [Pipeline](crate::pipeline::Pipeline), with two exceptions:
/// * It does not support transactions.
/// * The following commands cannot be used in a cluster pipeline:
/// ```text
/// BGREWRITEAOF, BGSAVE, BITOP, BRPOPLPUSH
/// CLIENT GETNAME, CLIENT KILL, CLIENT LIST, CLIENT SETNAME, CONFIG GET,
/// CONFIG RESETSTAT, CONFIG REWRITE, CONFIG SET
/// DBSIZE
/// ECHO, EVALSHA
/// FLUSHALL, FLUSHDB
/// INFO
/// KEYS
/// LASTSAVE
/// MGET, MOVE, MSET, MSETNX
/// PFMERGE, PFCOUNT, PING, PUBLISH
/// RANDOMKEY, RENAME, RENAMENX, RPOPLPUSH
/// SAVE, SCAN, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, SDIFF, SDIFFSTORE,
/// SENTINEL GET MASTER ADDR BY NAME, SENTINEL MASTER, SENTINEL MASTERS, SENTINEL MONITOR,
/// SENTINEL REMOVE, SENTINEL SENTINELS, SENTINEL SET, SENTINEL SLAVES, SHUTDOWN, SINTER,
/// SINTERSTORE, SLAVEOF, SLOWLOG GET, SLOWLOG LEN, SLOWLOG RESET, SMOVE, SORT, SUNION, SUNIONSTORE
/// TIME
/// ```
impl ClusterPipeline {
    /// Creates an empty pipeline.
    pub fn new() -> ClusterPipeline {
        Self::with_capacity(0)
    }
    /// Creates an empty pipeline with pre-allocated capacity.
    pub fn with_capacity(capacity: usize) -> ClusterPipeline {
        ClusterPipeline {
            commands: Vec::with_capacity(capacity),
            ignored_commands: HashSet::new(),
        }
    }
    /// Returns the commands queued in this pipeline, in insertion order.
    pub(crate) fn commands(&self) -> &Vec<Cmd> {
        &self.commands
    }
    /// Executes the pipeline and fetches the return values.
    ///
    /// Every queued command is checked first. If any of them cannot be safely
    /// routed in cluster mode, an error is returned and nothing is sent.
    /// An empty pipeline yields an empty result without contacting the cluster.
    ///
    /// ```rust,no_run
    /// # let nodes = vec!["redis://127.0.0.1:6379/"];
    /// # let client = redis::cluster::ClusterClient::new(nodes).unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let mut pipe = redis::cluster::cluster_pipe();
    /// let (k1, k2) : (i32, i32) = pipe
    ///     .cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("SET").arg("key_2").arg(43).ignore()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &mut ClusterConnection) -> RedisResult<T> {
        // Reject the whole pipeline up front if any command is not routable.
        for cmd in &self.commands {
            // The command name is the first argument; a missing or non-UTF-8
            // name becomes "" and therefore never matches the deny list.
            let cmd_name = std::str::from_utf8(cmd.arg_idx(0).unwrap_or(b""))
                .unwrap_or("")
                .trim()
                .to_ascii_uppercase();
            if is_illegal_cmd(&cmd_name) {
                fail!((
                    UNROUTABLE_ERROR.0,
                    UNROUTABLE_ERROR.1,
                    format!("Command '{cmd_name}' can't be executed in a cluster pipeline.")
                ))
            }
        }
        from_redis_value(
            // An empty pipeline short-circuits to an empty bulk reply.
            &(if self.commands.is_empty() {
                Value::Bulk(vec![])
            } else {
                self.make_pipeline_results(con.execute_pipeline(self)?)
            }),
        )
    }
    /// This is a shortcut to `query()` that does not return a value and
    /// panics if the pipeline query fails.
    ///
    /// It is equivalent to calling `query` like this:
    ///
    /// ```rust,no_run
    /// # let nodes = vec!["redis://127.0.0.1:6379/"];
    /// # let client = redis::cluster::ClusterClient::new(nodes).unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let mut pipe = redis::cluster::cluster_pipe();
    /// let _ : () = pipe.cmd("SET").arg("key_1").arg(42).ignore().query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn execute(&self, con: &mut ClusterConnection) {
        self.query::<()>(con).unwrap();
    }
}
/// Shortcut for creating a new cluster pipeline.
pub fn cluster_pipe() -> ClusterPipeline {
    ClusterPipeline::new()
}
implement_pipeline_commands!(ClusterPipeline);