use super::ClusterConnection;
use crate::RedisError;
use crate::cmd::{Cmd, cmd};
use crate::errors::ErrorKind;
use crate::types::{FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value, from_redis_value};
/// Error kind and static description used when a command is rejected because
/// it cannot be routed in cluster mode; `ClusterPipeline::query` reports both
/// parts together with a per-command detail message.
pub(crate) const UNROUTABLE_ERROR: (ErrorKind, &str) = (
ErrorKind::Client,
"This command cannot be safely routed in cluster mode",
);
/// Reports whether `cmd` appears on the list of command names that a cluster
/// pipeline refuses to execute.
///
/// The lookup is an exact, case-sensitive string comparison: no trimming or
/// case folding happens here, so the caller must normalise the name first.
fn is_illegal_cmd(cmd: &str) -> bool {
    // Names are grouped alphabetically; multi-word entries are kept alongside
    // their bare parent command.
    const REJECTED: &[&str] = &[
        "BGREWRITEAOF",
        "BGSAVE",
        "BITOP",
        "BRPOPLPUSH",
        "CLIENT",
        "CLIENT GETNAME",
        "CLIENT KILL",
        "CLIENT LIST",
        "CLIENT SETNAME",
        "CONFIG",
        "CONFIG GET",
        "CONFIG RESETSTAT",
        "CONFIG REWRITE",
        "CONFIG SET",
        "DBSIZE",
        "ECHO",
        "FLUSHALL",
        "FLUSHDB",
        "INFO",
        "KEYS",
        "LASTSAVE",
        "MGET",
        "MOVE",
        "MSET",
        "MSETNX",
        "MSETEX",
        "PING",
        "PUBLISH",
        "RANDOMKEY",
        "RENAME",
        "RENAMENX",
        "RPOPLPUSH",
        "SAVE",
        "SCAN",
        "SCRIPT",
        "SCRIPT EXISTS",
        "SCRIPT FLUSH",
        "SCRIPT KILL",
        "SCRIPT LOAD",
        "SDIFF",
        "SDIFFSTORE",
        "SENTINEL",
        "SENTINEL GET MASTER ADDR BY NAME",
        "SENTINEL MASTER",
        "SENTINEL MASTERS",
        "SENTINEL MONITOR",
        "SENTINEL REMOVE",
        "SENTINEL SENTINELS",
        "SENTINEL SET",
        "SENTINEL SLAVES",
        "SHUTDOWN",
        "SINTER",
        "SINTERSTORE",
        "SLAVEOF",
        "SLOWLOG",
        "SLOWLOG GET",
        "SLOWLOG LEN",
        "SLOWLOG RESET",
        "SMOVE",
        "SORT",
        "SUNION",
        "SUNIONSTORE",
        "TIME",
    ];

    REJECTED.iter().any(|&blocked| blocked == cmd)
}
/// A batch of commands that is validated and then handed to a
/// `ClusterConnection` by `ClusterPipeline::query` / `ClusterPipeline::exec`.
#[derive(Clone)]
pub struct ClusterPipeline {
// Commands held by this pipeline; `query` checks each one before execution.
commands: Vec<Cmd>,
// NOTE(review): presumably indices into `commands` whose replies are left out
// of the final result; this section only initialises it to an empty set, so
// confirm against the code that reads it.
ignored_commands: HashSet<usize>,
// NOTE(review): starts as `false`; looks like a switch for tolerating
// per-command errors, but it is not read in this section, so confirm.
ignore_errors: bool,
}
impl ClusterPipeline {
pub fn new() -> ClusterPipeline {
Self::with_capacity(0)
}
pub fn with_capacity(capacity: usize) -> ClusterPipeline {
ClusterPipeline {
commands: Vec::with_capacity(capacity),
ignored_commands: HashSet::new(),
ignore_errors: false,
}
}
pub(crate) fn commands(&self) -> &Vec<Cmd> {
&self.commands
}
#[inline]
pub fn query<T: FromRedisValue>(&self, con: &mut ClusterConnection) -> RedisResult<T> {
for cmd in &self.commands {
let cmd_name = std::str::from_utf8(cmd.arg_idx(0).unwrap_or(b""))
.unwrap_or("")
.trim()
.to_ascii_uppercase();
if is_illegal_cmd(&cmd_name) {
fail!((
UNROUTABLE_ERROR.0,
UNROUTABLE_ERROR.1,
format!("Command '{cmd_name}' can't be executed in a cluster pipeline.")
))
}
}
if self.commands.is_empty() {
Err(RedisError::make_empty_command())
} else {
self.compose_response(con.execute_pipeline(self)?)
}
}
#[inline]
pub fn exec(&self, con: &mut ClusterConnection) -> RedisResult<()> {
self.query::<()>(con)
}
}
/// Shorthand for `ClusterPipeline::new()`: returns a new, empty cluster pipeline.
pub fn cluster_pipe() -> ClusterPipeline {
ClusterPipeline::new()
}
// NOTE(review): this macro is defined outside this section. It presumably
// generates the command-building methods for `ClusterPipeline`, and possibly
// helpers such as `compose_response`, which `query` calls but this section
// does not define. Confirm against the macro definition.
implement_pipeline_commands!(ClusterPipeline);