use std::path::PathBuf;
use std::process::{Command, Output, Stdio};
use tokio::process::Command as TokioCommand;
use crate::error::{Error, Result};
/// RESP protocol version requested from `redis-cli`.
///
/// `PartialEq`/`Eq` are derived so values can be compared directly and used in
/// `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RespProtocol {
    /// Emitted on the command line as `-2`.
    Resp2,
    /// Emitted on the command line as `-3`.
    Resp3,
}
/// Address-family preference passed to `redis-cli`.
///
/// `PartialEq`/`Eq` are derived so values can be compared directly and used in
/// `assert_eq!`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum IpPreference {
    /// No address-family flag is emitted.
    #[default]
    Default,
    /// Emitted on the command line as `-4`.
    Ipv4,
    /// Emitted on the command line as `-6`.
    Ipv6,
}
/// Output formatting mode requested from `redis-cli`.
///
/// Derives `Default` (yielding [`OutputFormat::Default`]) for parity with
/// [`IpPreference`], plus `PartialEq`/`Eq` so values can be compared.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OutputFormat {
    /// No output-format flag is emitted.
    #[default]
    Default,
    /// Emitted on the command line as `--raw`.
    Raw,
    /// Emitted on the command line as `--csv`.
    Csv,
    /// Emitted on the command line as `--json`.
    Json,
    /// Emitted on the command line as `--quoted-json`.
    QuotedJson,
}
/// Builder-style description of a `redis-cli` invocation.
///
/// Each field corresponds to one command-line option; `base_args` turns the
/// populated fields into the argument vector that precedes the actual command.
#[derive(Debug, Clone)]
pub struct RedisCli {
// Executable handed to `Command::new`.
bin: String,
// Connection target: `-h`/`-p`, used only when neither `uri` nor `unixsocket` is set.
host: String,
port: u16,
// Authentication and database selection: `-a`, `--user`, `-n`.
password: Option<String>,
user: Option<String>,
db: Option<u32>,
// `-s <path>`; takes precedence over host/port but not over `uri`.
unixsocket: Option<PathBuf>,
// TLS options: `--tls`, `--sni`, `--cacert`, `--cacertdir`, `--cert`, `--key`,
// `--insecure`, `--tls-ciphers`, `--tls-ciphersuites`.
tls: bool,
sni: Option<String>,
cacert: Option<PathBuf>,
cacertdir: Option<PathBuf>,
cert: Option<PathBuf>,
key: Option<PathBuf>,
insecure: bool,
tls_ciphers: Option<String>,
tls_ciphersuites: Option<String>,
// Protocol / mode / output selection: `-2`|`-3`, `-c`, `--raw`|`--csv`|`--json`|`--quoted-json`.
resp: Option<RespProtocol>,
cluster_mode: bool,
output_format: OutputFormat,
// `--no-auth-warning`.
no_auth_warning: bool,
// `-u <uri>`; when set, host/port and unixsocket are not emitted.
uri: Option<String>,
// `-t <seconds>`.
timeout: Option<f64>,
// `--askpass`.
askpass: bool,
// `--name <client name>`.
client_name: Option<String>,
// `-4` / `-6`.
ip_preference: IpPreference,
// `-r <count>` and `-i <seconds>`.
repeat: Option<u32>,
interval: Option<f64>,
// Input/output handling: `-x`, `-X`, `-d`, `-D`, `-e`, `--no-raw`, `--quoted-input`.
stdin_last_arg: bool,
stdin_tag_arg: bool,
multi_bulk_delimiter: Option<String>,
output_delimiter: Option<String>,
exit_error_code: bool,
no_raw: bool,
quoted_input: bool,
// `--show-pushes yes|no`; `None` omits the option entirely.
show_pushes: Option<bool>,
// Statistics / latency modes: `--stat`, `--latency`, `--latency-history`, `--latency-dist`.
stat: bool,
latency: bool,
latency_history: bool,
latency_dist: bool,
// Keyspace analysis: `--bigkeys`, `--memkeys`, `--memkeys-samples`, `--keystats`,
// `--keystats-samples`, `--hotkeys`.
bigkeys: bool,
memkeys: bool,
memkeys_samples: Option<u32>,
keystats: bool,
keystats_samples: Option<u32>,
hotkeys: bool,
// Scanning: `--scan`, `--pattern`, `--count`, `--quoted-pattern`, `--cursor`, `--top`.
scan: bool,
pattern: Option<String>,
count: Option<u32>,
quoted_pattern: Option<String>,
cursor: Option<u64>,
top: Option<u32>,
// Diagnostics: `--intrinsic-latency <seconds>`, `--lru-test <keys>`, `--verbose`.
intrinsic_latency: Option<u32>,
lru_test: Option<u64>,
verbose: bool,
// Scripting: `--eval <file>`, `--ldb`, `--ldb-sync-mode`.
eval_file: Option<PathBuf>,
ldb: bool,
ldb_sync_mode: bool,
// Bulk transfer / replication: `--pipe`, `--pipe-timeout`, `--rdb`, `--functions-rdb`, `--replica`.
pipe: bool,
pipe_timeout: Option<u32>,
rdb: Option<PathBuf>,
functions_rdb: Option<PathBuf>,
replica: bool,
}
impl RedisCli {
/// Creates a client description with the defaults: the `redis-cli` binary,
/// `127.0.0.1:6379`, every optional value unset and every flag disabled.
pub fn new() -> Self {
    Self {
        // Executable and connection target.
        bin: String::from("redis-cli"),
        host: String::from("127.0.0.1"),
        port: 6379,
        password: None,
        user: None,
        db: None,
        unixsocket: None,

        // TLS.
        tls: false,
        sni: None,
        cacert: None,
        cacertdir: None,
        cert: None,
        key: None,
        insecure: false,
        tls_ciphers: None,
        tls_ciphersuites: None,

        // Protocol, mode and output.
        resp: None,
        cluster_mode: false,
        output_format: OutputFormat::Default,
        no_auth_warning: false,
        uri: None,
        timeout: None,
        askpass: false,
        client_name: None,
        ip_preference: IpPreference::Default,
        repeat: None,
        interval: None,

        // Input/output handling.
        stdin_last_arg: false,
        stdin_tag_arg: false,
        multi_bulk_delimiter: None,
        output_delimiter: None,
        exit_error_code: false,
        no_raw: false,
        quoted_input: false,
        show_pushes: None,

        // Statistics and keyspace analysis.
        stat: false,
        latency: false,
        latency_history: false,
        latency_dist: false,
        bigkeys: false,
        memkeys: false,
        memkeys_samples: None,
        keystats: false,
        keystats_samples: None,
        hotkeys: false,

        // Scanning.
        scan: false,
        pattern: None,
        count: None,
        quoted_pattern: None,
        cursor: None,
        top: None,

        // Diagnostics and scripting.
        intrinsic_latency: None,
        lru_test: None,
        verbose: false,
        eval_file: None,
        ldb: false,
        ldb_sync_mode: false,

        // Bulk transfer and replication.
        pipe: false,
        pipe_timeout: None,
        rdb: None,
        functions_rdb: None,
        replica: false,
    }
}
pub fn bin(mut self, bin: impl Into<String>) -> Self {
self.bin = bin.into();
self
}
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = host.into();
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn password(mut self, password: impl Into<String>) -> Self {
self.password = Some(password.into());
self
}
pub fn user(mut self, user: impl Into<String>) -> Self {
self.user = Some(user.into());
self
}
pub fn db(mut self, db: u32) -> Self {
self.db = Some(db);
self
}
pub fn unixsocket(mut self, path: impl Into<PathBuf>) -> Self {
self.unixsocket = Some(path.into());
self
}
pub fn tls(mut self, enable: bool) -> Self {
self.tls = enable;
self
}
pub fn sni(mut self, hostname: impl Into<String>) -> Self {
self.sni = Some(hostname.into());
self
}
pub fn cacert(mut self, path: impl Into<PathBuf>) -> Self {
self.cacert = Some(path.into());
self
}
pub fn cert(mut self, path: impl Into<PathBuf>) -> Self {
self.cert = Some(path.into());
self
}
pub fn key(mut self, path: impl Into<PathBuf>) -> Self {
self.key = Some(path.into());
self
}
pub fn cacertdir(mut self, path: impl Into<PathBuf>) -> Self {
self.cacertdir = Some(path.into());
self
}
pub fn insecure(mut self, enable: bool) -> Self {
self.insecure = enable;
self
}
pub fn tls_ciphers(mut self, ciphers: impl Into<String>) -> Self {
self.tls_ciphers = Some(ciphers.into());
self
}
pub fn tls_ciphersuites(mut self, ciphersuites: impl Into<String>) -> Self {
self.tls_ciphersuites = Some(ciphersuites.into());
self
}
pub fn uri(mut self, uri: impl Into<String>) -> Self {
self.uri = Some(uri.into());
self
}
pub fn timeout(mut self, seconds: f64) -> Self {
self.timeout = Some(seconds);
self
}
pub fn askpass(mut self, enable: bool) -> Self {
self.askpass = enable;
self
}
pub fn client_name(mut self, name: impl Into<String>) -> Self {
self.client_name = Some(name.into());
self
}
pub fn ip_preference(mut self, preference: IpPreference) -> Self {
self.ip_preference = preference;
self
}
pub fn repeat(mut self, count: u32) -> Self {
self.repeat = Some(count);
self
}
pub fn interval(mut self, seconds: f64) -> Self {
self.interval = Some(seconds);
self
}
pub fn resp(mut self, protocol: RespProtocol) -> Self {
self.resp = Some(protocol);
self
}
pub fn cluster_mode(mut self, enable: bool) -> Self {
self.cluster_mode = enable;
self
}
pub fn output_format(mut self, format: OutputFormat) -> Self {
self.output_format = format;
self
}
pub fn no_auth_warning(mut self, suppress: bool) -> Self {
self.no_auth_warning = suppress;
self
}
pub fn stdin_last_arg(mut self, enable: bool) -> Self {
self.stdin_last_arg = enable;
self
}
pub fn stdin_tag_arg(mut self, enable: bool) -> Self {
self.stdin_tag_arg = enable;
self
}
pub fn multi_bulk_delimiter(mut self, delim: impl Into<String>) -> Self {
self.multi_bulk_delimiter = Some(delim.into());
self
}
pub fn output_delimiter(mut self, delim: impl Into<String>) -> Self {
self.output_delimiter = Some(delim.into());
self
}
pub fn exit_error_code(mut self, enable: bool) -> Self {
self.exit_error_code = enable;
self
}
pub fn no_raw(mut self, enable: bool) -> Self {
self.no_raw = enable;
self
}
pub fn quoted_input(mut self, enable: bool) -> Self {
self.quoted_input = enable;
self
}
pub fn show_pushes(mut self, enable: bool) -> Self {
self.show_pushes = Some(enable);
self
}
pub fn stat(mut self, enable: bool) -> Self {
self.stat = enable;
self
}
pub fn latency(mut self, enable: bool) -> Self {
self.latency = enable;
self
}
pub fn latency_history(mut self, enable: bool) -> Self {
self.latency_history = enable;
self
}
pub fn latency_dist(mut self, enable: bool) -> Self {
self.latency_dist = enable;
self
}
pub fn bigkeys(mut self, enable: bool) -> Self {
self.bigkeys = enable;
self
}
pub fn memkeys(mut self, enable: bool) -> Self {
self.memkeys = enable;
self
}
pub fn memkeys_samples(mut self, n: u32) -> Self {
self.memkeys_samples = Some(n);
self
}
pub fn keystats(mut self, enable: bool) -> Self {
self.keystats = enable;
self
}
pub fn keystats_samples(mut self, n: u32) -> Self {
self.keystats_samples = Some(n);
self
}
pub fn hotkeys(mut self, enable: bool) -> Self {
self.hotkeys = enable;
self
}
pub fn scan(mut self, enable: bool) -> Self {
self.scan = enable;
self
}
pub fn pattern(mut self, pat: impl Into<String>) -> Self {
self.pattern = Some(pat.into());
self
}
pub fn count(mut self, n: u32) -> Self {
self.count = Some(n);
self
}
pub fn quoted_pattern(mut self, pat: impl Into<String>) -> Self {
self.quoted_pattern = Some(pat.into());
self
}
pub fn cursor(mut self, n: u64) -> Self {
self.cursor = Some(n);
self
}
pub fn top(mut self, n: u32) -> Self {
self.top = Some(n);
self
}
pub fn intrinsic_latency(mut self, seconds: u32) -> Self {
self.intrinsic_latency = Some(seconds);
self
}
pub fn lru_test(mut self, keys: u64) -> Self {
self.lru_test = Some(keys);
self
}
pub fn verbose(mut self, enable: bool) -> Self {
self.verbose = enable;
self
}
pub fn eval_file(mut self, path: impl Into<PathBuf>) -> Self {
self.eval_file = Some(path.into());
self
}
pub fn ldb(mut self, enable: bool) -> Self {
self.ldb = enable;
self
}
pub fn ldb_sync_mode(mut self, enable: bool) -> Self {
self.ldb_sync_mode = enable;
self
}
pub fn pipe(mut self, enable: bool) -> Self {
self.pipe = enable;
self
}
pub fn pipe_timeout(mut self, seconds: u32) -> Self {
self.pipe_timeout = Some(seconds);
self
}
pub fn rdb(mut self, path: impl Into<PathBuf>) -> Self {
self.rdb = Some(path.into());
self
}
pub fn functions_rdb(mut self, path: impl Into<PathBuf>) -> Self {
self.functions_rdb = Some(path.into());
self
}
pub fn replica(mut self, enable: bool) -> Self {
self.replica = enable;
self
}
pub async fn cluster_command(&self, command: &str, args: &[&str]) -> Result<String> {
let mut cli_args = self.base_args();
cli_args.push("--cluster".into());
cli_args.push(command.into());
cli_args.extend(args.iter().map(|s| s.to_string()));
let str_args: Vec<&str> = cli_args.iter().map(|s| s.as_str()).collect();
let output = TokioCommand::new(&self.bin)
.args(&str_args)
.output()
.await?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).into_owned())
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
Err(Error::Cli {
host: self.host.clone(),
port: self.port,
detail: stderr.into_owned(),
})
}
}
pub async fn run(&self, args: &[&str]) -> Result<String> {
let output = self.raw_output(args).await?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).to_string())
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
Err(Error::Cli {
host: self.host.clone(),
port: self.port,
detail: stderr.into_owned(),
})
}
}
/// Runs `<bin> <base args> <args...>` synchronously, discarding its output and result.
///
/// Stdout and stderr are redirected to null. Despite the name, this blocks the
/// calling thread until the process exits, because `std::process::Command::status`
/// waits for the child. A failure to spawn or a non-zero exit is ignored.
pub fn fire_and_forget(&self, args: &[&str]) {
    let mut invocation = Command::new(&self.bin);
    invocation
        .args(self.base_args())
        .args(args)
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    // Best effort: the caller has no way to act on a failure here.
    let _ignored = invocation.status();
}
/// Sends `PING` and reports whether the trimmed reply is exactly `PONG`.
///
/// Any error from [`RedisCli::run`] is treated as "not reachable" and yields `false`.
pub async fn ping(&self) -> bool {
    match self.run(&["PING"]).await {
        Ok(reply) => reply.trim() == "PONG",
        Err(_) => false,
    }
}
/// Issues `SHUTDOWN NOSAVE` through [`RedisCli::fire_and_forget`]; failures are ignored.
pub fn shutdown(&self) {
    let shutdown_command = ["SHUTDOWN", "NOSAVE"];
    self.fire_and_forget(&shutdown_command);
}
/// Polls [`RedisCli::ping`] until it succeeds or `timeout` has elapsed.
///
/// At least one ping is always attempted. Between attempts the task sleeps for
/// up to 250 ms, but never longer than the time remaining before the deadline,
/// so the function does not overshoot `timeout` by a full poll interval.
///
/// A single ping attempt is not itself bounded by `timeout`.
///
/// # Errors
///
/// Returns [`Error::Timeout`] once more than `timeout` has elapsed without a
/// successful ping.
pub async fn wait_for_ready(&self, timeout: std::time::Duration) -> Result<()> {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);

    let start = std::time::Instant::now();
    loop {
        if self.ping().await {
            return Ok(());
        }

        let elapsed = start.elapsed();
        if elapsed > timeout {
            return Err(Error::Timeout {
                message: format!(
                    "{}:{} did not respond within {timeout:?}",
                    self.host, self.port
                ),
            });
        }

        // Clamp the pause to the time left so the final attempt happens at the
        // deadline rather than up to a full interval after it.
        let remaining = timeout.saturating_sub(elapsed);
        tokio::time::sleep(POLL_INTERVAL.min(remaining)).await;
    }
}
/// Runs `<bin> <base args> --cluster create <node_addrs...> [--cluster-replicas N] --cluster-yes`.
///
/// `--cluster-replicas` is only added when `replicas_per_master` is greater than zero.
///
/// # Errors
///
/// Propagates the I/O error if the process cannot be run. If the process
/// exits unsuccessfully, returns [`Error::ClusterCreate`] with the lossily
/// decoded stdout and stderr.
pub async fn cluster_create(
    &self,
    node_addrs: &[String],
    replicas_per_master: u16,
) -> Result<()> {
    let mut invocation = TokioCommand::new(&self.bin);
    invocation
        .args(self.base_args())
        .arg("--cluster")
        .arg("create")
        .args(node_addrs);
    if replicas_per_master > 0 {
        invocation
            .arg("--cluster-replicas")
            .arg(replicas_per_master.to_string());
    }
    invocation.arg("--cluster-yes");

    let finished = invocation.output().await?;
    if finished.status.success() {
        return Ok(());
    }
    Err(Error::ClusterCreate {
        stdout: String::from_utf8_lossy(&finished.stdout).into_owned(),
        stderr: String::from_utf8_lossy(&finished.stderr).into_owned(),
    })
}
fn base_args(&self) -> Vec<String> {
let mut args = Vec::new();
if let Some(ref uri) = self.uri {
args.push("-u".to_string());
args.push(uri.clone());
} else if let Some(ref path) = self.unixsocket {
args.push("-s".to_string());
args.push(path.display().to_string());
} else {
args.push("-h".to_string());
args.push(self.host.clone());
args.push("-p".to_string());
args.push(self.port.to_string());
}
if let Some(ref user) = self.user {
args.push("--user".to_string());
args.push(user.clone());
}
if let Some(ref pw) = self.password {
args.push("-a".to_string());
args.push(pw.clone());
}
if self.askpass {
args.push("--askpass".to_string());
}
if let Some(db) = self.db {
args.push("-n".to_string());
args.push(db.to_string());
}
if let Some(ref name) = self.client_name {
args.push("--name".to_string());
args.push(name.clone());
}
match self.ip_preference {
IpPreference::Default => {}
IpPreference::Ipv4 => args.push("-4".to_string()),
IpPreference::Ipv6 => args.push("-6".to_string()),
}
if let Some(t) = self.timeout {
args.push("-t".to_string());
args.push(t.to_string());
}
if let Some(r) = self.repeat {
args.push("-r".to_string());
args.push(r.to_string());
}
if let Some(i) = self.interval {
args.push("-i".to_string());
args.push(i.to_string());
}
if self.tls {
args.push("--tls".to_string());
}
if let Some(ref sni) = self.sni {
args.push("--sni".to_string());
args.push(sni.clone());
}
if let Some(ref path) = self.cacert {
args.push("--cacert".to_string());
args.push(path.display().to_string());
}
if let Some(ref path) = self.cacertdir {
args.push("--cacertdir".to_string());
args.push(path.display().to_string());
}
if let Some(ref path) = self.cert {
args.push("--cert".to_string());
args.push(path.display().to_string());
}
if let Some(ref path) = self.key {
args.push("--key".to_string());
args.push(path.display().to_string());
}
if self.insecure {
args.push("--insecure".to_string());
}
if let Some(ref ciphers) = self.tls_ciphers {
args.push("--tls-ciphers".to_string());
args.push(ciphers.clone());
}
if let Some(ref suites) = self.tls_ciphersuites {
args.push("--tls-ciphersuites".to_string());
args.push(suites.clone());
}
if let Some(ref proto) = self.resp {
match proto {
RespProtocol::Resp2 => args.push("-2".to_string()),
RespProtocol::Resp3 => args.push("-3".to_string()),
}
}
if self.cluster_mode {
args.push("-c".to_string());
}
match self.output_format {
OutputFormat::Default => {}
OutputFormat::Raw => args.push("--raw".to_string()),
OutputFormat::Csv => args.push("--csv".to_string()),
OutputFormat::Json => args.push("--json".to_string()),
OutputFormat::QuotedJson => args.push("--quoted-json".to_string()),
}
if self.no_auth_warning {
args.push("--no-auth-warning".to_string());
}
if self.stdin_last_arg {
args.push("-x".to_string());
}
if self.stdin_tag_arg {
args.push("-X".to_string());
}
if let Some(ref delim) = self.multi_bulk_delimiter {
args.push("-d".to_string());
args.push(delim.clone());
}
if let Some(ref delim) = self.output_delimiter {
args.push("-D".to_string());
args.push(delim.clone());
}
if self.exit_error_code {
args.push("-e".to_string());
}
if self.no_raw {
args.push("--no-raw".to_string());
}
if self.quoted_input {
args.push("--quoted-input".to_string());
}
if let Some(enable) = self.show_pushes {
args.push("--show-pushes".to_string());
args.push(if enable { "yes" } else { "no" }.to_string());
}
if self.stat {
args.push("--stat".to_string());
}
if self.latency {
args.push("--latency".to_string());
}
if self.latency_history {
args.push("--latency-history".to_string());
}
if self.latency_dist {
args.push("--latency-dist".to_string());
}
if self.bigkeys {
args.push("--bigkeys".to_string());
}
if self.memkeys {
args.push("--memkeys".to_string());
}
if let Some(n) = self.memkeys_samples {
args.push("--memkeys-samples".to_string());
args.push(n.to_string());
}
if self.keystats {
args.push("--keystats".to_string());
}
if let Some(n) = self.keystats_samples {
args.push("--keystats-samples".to_string());
args.push(n.to_string());
}
if self.hotkeys {
args.push("--hotkeys".to_string());
}
if self.scan {
args.push("--scan".to_string());
}
if let Some(ref pat) = self.pattern {
args.push("--pattern".to_string());
args.push(pat.clone());
}
if let Some(n) = self.count {
args.push("--count".to_string());
args.push(n.to_string());
}
if let Some(ref pat) = self.quoted_pattern {
args.push("--quoted-pattern".to_string());
args.push(pat.clone());
}
if let Some(n) = self.cursor {
args.push("--cursor".to_string());
args.push(n.to_string());
}
if let Some(n) = self.top {
args.push("--top".to_string());
args.push(n.to_string());
}
if let Some(seconds) = self.intrinsic_latency {
args.push("--intrinsic-latency".to_string());
args.push(seconds.to_string());
}
if let Some(keys) = self.lru_test {
args.push("--lru-test".to_string());
args.push(keys.to_string());
}
if self.verbose {
args.push("--verbose".to_string());
}
if let Some(ref path) = self.eval_file {
args.push("--eval".to_string());
args.push(path.display().to_string());
}
if self.ldb {
args.push("--ldb".to_string());
}
if self.ldb_sync_mode {
args.push("--ldb-sync-mode".to_string());
}
if self.pipe {
args.push("--pipe".to_string());
}
if let Some(n) = self.pipe_timeout {
args.push("--pipe-timeout".to_string());
args.push(n.to_string());
}
if let Some(ref path) = self.rdb {
args.push("--rdb".to_string());
args.push(path.display().to_string());
}
if let Some(ref path) = self.functions_rdb {
args.push("--functions-rdb".to_string());
args.push(path.display().to_string());
}
if self.replica {
args.push("--replica".to_string());
}
args
}
/// Runs `<bin> <base args> <args...>` and returns the captured process output unmodified.
///
/// The exit status is not inspected here; callers decide how to interpret it.
async fn raw_output(&self, args: &[&str]) -> std::io::Result<Output> {
    let mut invocation = TokioCommand::new(&self.bin);
    invocation.args(self.base_args()).args(args);
    invocation.output().await
}
}
impl Default for RedisCli {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed client targets the local default endpoint.
    #[test]
    fn default_config() {
        let RedisCli { host, port, .. } = RedisCli::new();
        assert_eq!(host, "127.0.0.1");
        assert_eq!(port, 6379);
    }

    /// Chained setters each overwrite exactly the field they are named after.
    #[test]
    fn builder_chain() {
        let configured = RedisCli::new()
            .host("10.0.0.1")
            .port(6380)
            .password("secret")
            .bin("/usr/local/bin/redis-cli");

        let RedisCli {
            host,
            port,
            password,
            bin,
            ..
        } = configured;
        assert_eq!(host, "10.0.0.1");
        assert_eq!(port, 6380);
        assert_eq!(password.as_deref(), Some("secret"));
        assert_eq!(bin, "/usr/local/bin/redis-cli");
    }
}