1use std::path::PathBuf;
4use std::process::{Command, Output, Stdio};
5
6use tokio::process::Command as TokioCommand;
7
8use crate::error::{Error, Result};
9
10#[derive(Debug, Clone, Copy)]
12pub enum RespProtocol {
13 Resp2,
15 Resp3,
17}
18
19#[derive(Debug, Clone, Copy)]
21pub enum OutputFormat {
22 Default,
24 Raw,
26 Csv,
28 Json,
30 QuotedJson,
32}
33
34#[derive(Debug, Clone)]
36pub struct RedisCli {
37 bin: String,
38 host: String,
39 port: u16,
40 password: Option<String>,
41 user: Option<String>,
42 db: Option<u32>,
43 unixsocket: Option<PathBuf>,
44 tls: bool,
45 sni: Option<String>,
46 cacert: Option<PathBuf>,
47 cert: Option<PathBuf>,
48 key: Option<PathBuf>,
49 resp: Option<RespProtocol>,
50 cluster_mode: bool,
51 output_format: OutputFormat,
52 no_auth_warning: bool,
53}
54
55impl RedisCli {
56 pub fn new() -> Self {
58 Self {
59 bin: "redis-cli".into(),
60 host: "127.0.0.1".into(),
61 port: 6379,
62 password: None,
63 user: None,
64 db: None,
65 unixsocket: None,
66 tls: false,
67 sni: None,
68 cacert: None,
69 cert: None,
70 key: None,
71 resp: None,
72 cluster_mode: false,
73 output_format: OutputFormat::Default,
74 no_auth_warning: false,
75 }
76 }
77
78 pub fn bin(mut self, bin: impl Into<String>) -> Self {
80 self.bin = bin.into();
81 self
82 }
83
84 pub fn host(mut self, host: impl Into<String>) -> Self {
86 self.host = host.into();
87 self
88 }
89
90 pub fn port(mut self, port: u16) -> Self {
92 self.port = port;
93 self
94 }
95
96 pub fn password(mut self, password: impl Into<String>) -> Self {
98 self.password = Some(password.into());
99 self
100 }
101
102 pub fn user(mut self, user: impl Into<String>) -> Self {
104 self.user = Some(user.into());
105 self
106 }
107
108 pub fn db(mut self, db: u32) -> Self {
110 self.db = Some(db);
111 self
112 }
113
114 pub fn unixsocket(mut self, path: impl Into<PathBuf>) -> Self {
116 self.unixsocket = Some(path.into());
117 self
118 }
119
120 pub fn tls(mut self, enable: bool) -> Self {
122 self.tls = enable;
123 self
124 }
125
126 pub fn sni(mut self, hostname: impl Into<String>) -> Self {
128 self.sni = Some(hostname.into());
129 self
130 }
131
132 pub fn cacert(mut self, path: impl Into<PathBuf>) -> Self {
134 self.cacert = Some(path.into());
135 self
136 }
137
138 pub fn cert(mut self, path: impl Into<PathBuf>) -> Self {
140 self.cert = Some(path.into());
141 self
142 }
143
144 pub fn key(mut self, path: impl Into<PathBuf>) -> Self {
146 self.key = Some(path.into());
147 self
148 }
149
150 pub fn resp(mut self, protocol: RespProtocol) -> Self {
152 self.resp = Some(protocol);
153 self
154 }
155
156 pub fn cluster_mode(mut self, enable: bool) -> Self {
158 self.cluster_mode = enable;
159 self
160 }
161
162 pub fn output_format(mut self, format: OutputFormat) -> Self {
164 self.output_format = format;
165 self
166 }
167
168 pub fn no_auth_warning(mut self, suppress: bool) -> Self {
170 self.no_auth_warning = suppress;
171 self
172 }
173
174 pub async fn run(&self, args: &[&str]) -> Result<String> {
176 let output = self.raw_output(args).await?;
177 if output.status.success() {
178 Ok(String::from_utf8_lossy(&output.stdout).to_string())
179 } else {
180 let stderr = String::from_utf8_lossy(&output.stderr);
181 Err(Error::Cli {
182 host: self.host.clone(),
183 port: self.port,
184 detail: stderr.into_owned(),
185 })
186 }
187 }
188
189 pub fn fire_and_forget(&self, args: &[&str]) {
191 let _ = Command::new(&self.bin)
192 .args(self.base_args())
193 .args(args)
194 .stdout(Stdio::null())
195 .stderr(Stdio::null())
196 .status();
197 }
198
199 pub async fn ping(&self) -> bool {
201 self.run(&["PING"])
202 .await
203 .map(|r| r.trim() == "PONG")
204 .unwrap_or(false)
205 }
206
207 pub fn shutdown(&self) {
209 self.fire_and_forget(&["SHUTDOWN", "NOSAVE"]);
210 }
211
212 pub async fn wait_for_ready(&self, timeout: std::time::Duration) -> Result<()> {
214 let start = std::time::Instant::now();
215 loop {
216 if self.ping().await {
217 return Ok(());
218 }
219 if start.elapsed() > timeout {
220 return Err(Error::Timeout {
221 message: format!(
222 "{}:{} did not respond within {timeout:?}",
223 self.host, self.port
224 ),
225 });
226 }
227 tokio::time::sleep(std::time::Duration::from_millis(250)).await;
228 }
229 }
230
231 pub async fn cluster_create(
233 &self,
234 node_addrs: &[String],
235 replicas_per_master: u16,
236 ) -> Result<()> {
237 let mut args: Vec<String> = vec!["--cluster".into(), "create".into()];
238 args.extend(node_addrs.iter().cloned());
239 if replicas_per_master > 0 {
240 args.push("--cluster-replicas".into());
241 args.push(replicas_per_master.to_string());
242 }
243 args.push("--cluster-yes".into());
244
245 let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
246 let output = TokioCommand::new(&self.bin)
247 .args(&str_args)
248 .output()
249 .await?;
250
251 if output.status.success() {
252 Ok(())
253 } else {
254 Err(Error::ClusterCreate {
255 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
256 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
257 })
258 }
259 }
260
261 fn base_args(&self) -> Vec<String> {
262 let mut args = Vec::new();
263
264 if let Some(ref path) = self.unixsocket {
265 args.push("-s".to_string());
266 args.push(path.display().to_string());
267 } else {
268 args.push("-h".to_string());
269 args.push(self.host.clone());
270 args.push("-p".to_string());
271 args.push(self.port.to_string());
272 }
273
274 if let Some(ref user) = self.user {
275 args.push("--user".to_string());
276 args.push(user.clone());
277 }
278 if let Some(ref pw) = self.password {
279 args.push("-a".to_string());
280 args.push(pw.clone());
281 }
282 if let Some(db) = self.db {
283 args.push("-n".to_string());
284 args.push(db.to_string());
285 }
286
287 if self.tls {
289 args.push("--tls".to_string());
290 }
291 if let Some(ref sni) = self.sni {
292 args.push("--sni".to_string());
293 args.push(sni.clone());
294 }
295 if let Some(ref path) = self.cacert {
296 args.push("--cacert".to_string());
297 args.push(path.display().to_string());
298 }
299 if let Some(ref path) = self.cert {
300 args.push("--cert".to_string());
301 args.push(path.display().to_string());
302 }
303 if let Some(ref path) = self.key {
304 args.push("--key".to_string());
305 args.push(path.display().to_string());
306 }
307
308 if let Some(ref proto) = self.resp {
310 match proto {
311 RespProtocol::Resp2 => args.push("-2".to_string()),
312 RespProtocol::Resp3 => args.push("-3".to_string()),
313 }
314 }
315
316 if self.cluster_mode {
318 args.push("-c".to_string());
319 }
320
321 match self.output_format {
323 OutputFormat::Default => {}
324 OutputFormat::Raw => args.push("--raw".to_string()),
325 OutputFormat::Csv => args.push("--csv".to_string()),
326 OutputFormat::Json => args.push("--json".to_string()),
327 OutputFormat::QuotedJson => args.push("--quoted-json".to_string()),
328 }
329
330 if self.no_auth_warning {
331 args.push("--no-auth-warning".to_string());
332 }
333
334 args
335 }
336
337 async fn raw_output(&self, args: &[&str]) -> std::io::Result<Output> {
338 TokioCommand::new(&self.bin)
339 .args(self.base_args())
340 .args(args)
341 .output()
342 .await
343 }
344}
345
346impl Default for RedisCli {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[test]
357 fn default_config() {
358 let cli = RedisCli::new();
359 assert_eq!(cli.host, "127.0.0.1");
360 assert_eq!(cli.port, 6379);
361 }
362
363 #[test]
364 fn builder_chain() {
365 let cli = RedisCli::new()
366 .host("10.0.0.1")
367 .port(6380)
368 .password("secret")
369 .bin("/usr/local/bin/redis-cli");
370 assert_eq!(cli.host, "10.0.0.1");
371 assert_eq!(cli.port, 6380);
372 assert_eq!(cli.password.as_deref(), Some("secret"));
373 assert_eq!(cli.bin, "/usr/local/bin/redis-cli");
374 }
375}