1use std::path::PathBuf;
4use std::process::{Command, Output, Stdio};
5
6use tokio::process::Command as TokioCommand;
7
8use crate::error::{Error, Result};
9
/// RESP protocol version requested from `redis-cli`.
///
/// The argument builder emits `-2` for [`RespProtocol::Resp2`] and `-3` for
/// [`RespProtocol::Resp3`]; when no protocol is configured neither flag is passed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RespProtocol {
    /// Emitted as `-2`.
    Resp2,
    /// Emitted as `-3`.
    Resp3,
}
18
/// Address-family preference passed to `redis-cli`.
///
/// The argument builder emits `-4` for [`IpPreference::Ipv4`], `-6` for
/// [`IpPreference::Ipv6`], and no flag for [`IpPreference::Default`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum IpPreference {
    /// No preference flag is emitted.
    #[default]
    Default,
    /// Emitted as `-4`.
    Ipv4,
    /// Emitted as `-6`.
    Ipv6,
}
30
/// Output formatting mode passed to `redis-cli`.
///
/// Every variant except [`OutputFormat::Default`] maps to a single command-line
/// flag; `Default` emits nothing. `Default` is also the value returned by the
/// `Default` trait implementation, mirroring [`IpPreference`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OutputFormat {
    /// No formatting flag is emitted.
    #[default]
    Default,
    /// Emitted as `--raw`.
    Raw,
    /// Emitted as `--csv`.
    Csv,
    /// Emitted as `--json`.
    Json,
    /// Emitted as `--quoted-json`.
    QuotedJson,
}
45
/// Builder-style description of a `redis-cli` invocation.
///
/// Each field is translated into command-line arguments by `base_args`; the
/// per-field comments name the flag that is emitted. Boolean fields emit their
/// flag only when `true`, and `Option` fields only when `Some`.
#[derive(Debug, Clone)]
pub struct RedisCli {
    // --- Connection and general options ---
    /// Program launched for every invocation.
    bin: String,
    /// Server host (`-h`); only emitted when neither `uri` nor `unixsocket` is set.
    host: String,
    /// Server port (`-p`); only emitted when neither `uri` nor `unixsocket` is set.
    port: u16,
    /// Password (`-a`).
    password: Option<String>,
    /// User name (`--user`).
    user: Option<String>,
    /// Database number (`-n`).
    db: Option<u32>,
    /// Unix socket path (`-s`); used instead of host/port when `uri` is unset.
    unixsocket: Option<PathBuf>,
    /// `--tls`.
    tls: bool,
    /// `--sni`.
    sni: Option<String>,
    /// `--cacert`.
    cacert: Option<PathBuf>,
    /// `--cacertdir`.
    cacertdir: Option<PathBuf>,
    /// `--cert`.
    cert: Option<PathBuf>,
    /// `--key`.
    key: Option<PathBuf>,
    /// `--insecure`.
    insecure: bool,
    /// `--tls-ciphers`.
    tls_ciphers: Option<String>,
    /// `--tls-ciphersuites`.
    tls_ciphersuites: Option<String>,
    /// Protocol selection (`-2` or `-3`).
    resp: Option<RespProtocol>,
    /// `-c`.
    cluster_mode: bool,
    /// Output formatting flag (`--raw`, `--csv`, `--json`, `--quoted-json`, or none).
    output_format: OutputFormat,
    /// `--no-auth-warning`.
    no_auth_warning: bool,
    /// Connection URI (`-u`); takes precedence over `unixsocket` and host/port.
    uri: Option<String>,
    /// `-t`, formatted with `f64`'s `Display`.
    timeout: Option<f64>,
    /// `--askpass`.
    askpass: bool,
    /// `--name`.
    client_name: Option<String>,
    /// Address-family flag (`-4`, `-6`, or none).
    ip_preference: IpPreference,
    /// `-r`.
    repeat: Option<u32>,
    /// `-i`, formatted with `f64`'s `Display`.
    interval: Option<f64>,

    // --- Input / output handling ---
    /// `-x`.
    stdin_last_arg: bool,
    /// `-X` (emitted as a bare flag with no following value).
    stdin_tag_arg: bool,
    /// `-d`.
    multi_bulk_delimiter: Option<String>,
    /// `-D`.
    output_delimiter: Option<String>,
    /// `-e`.
    exit_error_code: bool,
    /// `--no-raw`.
    no_raw: bool,
    /// `--quoted-input`.
    quoted_input: bool,
    /// `--show-pushes yes` / `--show-pushes no`.
    show_pushes: Option<bool>,

    // --- Diagnostic and scanning modes ---
    /// `--stat`.
    stat: bool,
    /// `--latency`.
    latency: bool,
    /// `--latency-history`.
    latency_history: bool,
    /// `--latency-dist`.
    latency_dist: bool,
    /// `--bigkeys`.
    bigkeys: bool,
    /// `--memkeys`.
    memkeys: bool,
    /// `--memkeys-samples`.
    memkeys_samples: Option<u32>,
    /// `--keystats`.
    keystats: bool,
    /// `--keystats-samples`.
    keystats_samples: Option<u32>,
    /// `--hotkeys`.
    hotkeys: bool,
    /// `--scan`.
    scan: bool,
    /// `--pattern`.
    pattern: Option<String>,
    /// `--count`.
    count: Option<u32>,
    /// `--quoted-pattern`.
    quoted_pattern: Option<String>,
    /// `--cursor`.
    cursor: Option<u64>,
    /// `--top`.
    top: Option<u32>,
    /// `--intrinsic-latency`.
    intrinsic_latency: Option<u32>,
    /// `--lru-test`.
    lru_test: Option<u64>,
    /// `--verbose`.
    verbose: bool,

    // --- Scripting ---
    /// `--eval`.
    eval_file: Option<PathBuf>,
    /// `--ldb`.
    ldb: bool,
    /// `--ldb-sync-mode`.
    ldb_sync_mode: bool,

    // --- Bulk transfer ---
    /// `--pipe`.
    pipe: bool,
    /// `--pipe-timeout`.
    pipe_timeout: Option<u32>,
    /// `--rdb`.
    rdb: Option<PathBuf>,
    /// `--functions-rdb`.
    functions_rdb: Option<PathBuf>,

    // --- Replication ---
    /// `--replica`.
    replica: bool,
}
122
123impl RedisCli {
124 pub fn new() -> Self {
126 Self {
127 bin: "redis-cli".into(),
128 host: "127.0.0.1".into(),
129 port: 6379,
130 password: None,
131 user: None,
132 db: None,
133 unixsocket: None,
134 tls: false,
135 sni: None,
136 cacert: None,
137 cacertdir: None,
138 cert: None,
139 key: None,
140 insecure: false,
141 tls_ciphers: None,
142 tls_ciphersuites: None,
143 resp: None,
144 cluster_mode: false,
145 output_format: OutputFormat::Default,
146 no_auth_warning: false,
147 uri: None,
148 timeout: None,
149 askpass: false,
150 client_name: None,
151 ip_preference: IpPreference::Default,
152 repeat: None,
153 interval: None,
154 stdin_last_arg: false,
155 stdin_tag_arg: false,
156 multi_bulk_delimiter: None,
157 output_delimiter: None,
158 exit_error_code: false,
159 no_raw: false,
160 quoted_input: false,
161 show_pushes: None,
162 stat: false,
163 latency: false,
164 latency_history: false,
165 latency_dist: false,
166 bigkeys: false,
167 memkeys: false,
168 memkeys_samples: None,
169 keystats: false,
170 keystats_samples: None,
171 hotkeys: false,
172 scan: false,
173 pattern: None,
174 count: None,
175 quoted_pattern: None,
176 cursor: None,
177 top: None,
178 intrinsic_latency: None,
179 lru_test: None,
180 verbose: false,
181 eval_file: None,
182 ldb: false,
183 ldb_sync_mode: false,
184 pipe: false,
185 pipe_timeout: None,
186 rdb: None,
187 functions_rdb: None,
188 replica: false,
189 }
190 }
191
192 pub fn bin(mut self, bin: impl Into<String>) -> Self {
194 self.bin = bin.into();
195 self
196 }
197
198 pub fn host(mut self, host: impl Into<String>) -> Self {
200 self.host = host.into();
201 self
202 }
203
204 pub fn port(mut self, port: u16) -> Self {
206 self.port = port;
207 self
208 }
209
210 pub fn password(mut self, password: impl Into<String>) -> Self {
212 self.password = Some(password.into());
213 self
214 }
215
216 pub fn user(mut self, user: impl Into<String>) -> Self {
218 self.user = Some(user.into());
219 self
220 }
221
222 pub fn db(mut self, db: u32) -> Self {
224 self.db = Some(db);
225 self
226 }
227
228 pub fn unixsocket(mut self, path: impl Into<PathBuf>) -> Self {
230 self.unixsocket = Some(path.into());
231 self
232 }
233
234 pub fn tls(mut self, enable: bool) -> Self {
236 self.tls = enable;
237 self
238 }
239
240 pub fn sni(mut self, hostname: impl Into<String>) -> Self {
242 self.sni = Some(hostname.into());
243 self
244 }
245
246 pub fn cacert(mut self, path: impl Into<PathBuf>) -> Self {
248 self.cacert = Some(path.into());
249 self
250 }
251
252 pub fn cert(mut self, path: impl Into<PathBuf>) -> Self {
254 self.cert = Some(path.into());
255 self
256 }
257
258 pub fn key(mut self, path: impl Into<PathBuf>) -> Self {
260 self.key = Some(path.into());
261 self
262 }
263
264 pub fn cacertdir(mut self, path: impl Into<PathBuf>) -> Self {
266 self.cacertdir = Some(path.into());
267 self
268 }
269
270 pub fn insecure(mut self, enable: bool) -> Self {
272 self.insecure = enable;
273 self
274 }
275
276 pub fn tls_ciphers(mut self, ciphers: impl Into<String>) -> Self {
278 self.tls_ciphers = Some(ciphers.into());
279 self
280 }
281
282 pub fn tls_ciphersuites(mut self, ciphersuites: impl Into<String>) -> Self {
284 self.tls_ciphersuites = Some(ciphersuites.into());
285 self
286 }
287
288 pub fn uri(mut self, uri: impl Into<String>) -> Self {
290 self.uri = Some(uri.into());
291 self
292 }
293
294 pub fn timeout(mut self, seconds: f64) -> Self {
296 self.timeout = Some(seconds);
297 self
298 }
299
300 pub fn askpass(mut self, enable: bool) -> Self {
302 self.askpass = enable;
303 self
304 }
305
306 pub fn client_name(mut self, name: impl Into<String>) -> Self {
308 self.client_name = Some(name.into());
309 self
310 }
311
312 pub fn ip_preference(mut self, preference: IpPreference) -> Self {
314 self.ip_preference = preference;
315 self
316 }
317
318 pub fn repeat(mut self, count: u32) -> Self {
320 self.repeat = Some(count);
321 self
322 }
323
324 pub fn interval(mut self, seconds: f64) -> Self {
326 self.interval = Some(seconds);
327 self
328 }
329
330 pub fn resp(mut self, protocol: RespProtocol) -> Self {
332 self.resp = Some(protocol);
333 self
334 }
335
336 pub fn cluster_mode(mut self, enable: bool) -> Self {
338 self.cluster_mode = enable;
339 self
340 }
341
342 pub fn output_format(mut self, format: OutputFormat) -> Self {
344 self.output_format = format;
345 self
346 }
347
348 pub fn no_auth_warning(mut self, suppress: bool) -> Self {
350 self.no_auth_warning = suppress;
351 self
352 }
353
354 pub fn stdin_last_arg(mut self, enable: bool) -> Self {
358 self.stdin_last_arg = enable;
359 self
360 }
361
362 pub fn stdin_tag_arg(mut self, enable: bool) -> Self {
364 self.stdin_tag_arg = enable;
365 self
366 }
367
368 pub fn multi_bulk_delimiter(mut self, delim: impl Into<String>) -> Self {
370 self.multi_bulk_delimiter = Some(delim.into());
371 self
372 }
373
374 pub fn output_delimiter(mut self, delim: impl Into<String>) -> Self {
376 self.output_delimiter = Some(delim.into());
377 self
378 }
379
380 pub fn exit_error_code(mut self, enable: bool) -> Self {
382 self.exit_error_code = enable;
383 self
384 }
385
386 pub fn no_raw(mut self, enable: bool) -> Self {
388 self.no_raw = enable;
389 self
390 }
391
392 pub fn quoted_input(mut self, enable: bool) -> Self {
394 self.quoted_input = enable;
395 self
396 }
397
398 pub fn show_pushes(mut self, enable: bool) -> Self {
400 self.show_pushes = Some(enable);
401 self
402 }
403
404 pub fn stat(mut self, enable: bool) -> Self {
408 self.stat = enable;
409 self
410 }
411
412 pub fn latency(mut self, enable: bool) -> Self {
414 self.latency = enable;
415 self
416 }
417
418 pub fn latency_history(mut self, enable: bool) -> Self {
420 self.latency_history = enable;
421 self
422 }
423
424 pub fn latency_dist(mut self, enable: bool) -> Self {
426 self.latency_dist = enable;
427 self
428 }
429
430 pub fn bigkeys(mut self, enable: bool) -> Self {
432 self.bigkeys = enable;
433 self
434 }
435
436 pub fn memkeys(mut self, enable: bool) -> Self {
438 self.memkeys = enable;
439 self
440 }
441
442 pub fn memkeys_samples(mut self, n: u32) -> Self {
444 self.memkeys_samples = Some(n);
445 self
446 }
447
448 pub fn keystats(mut self, enable: bool) -> Self {
450 self.keystats = enable;
451 self
452 }
453
454 pub fn keystats_samples(mut self, n: u32) -> Self {
456 self.keystats_samples = Some(n);
457 self
458 }
459
460 pub fn hotkeys(mut self, enable: bool) -> Self {
462 self.hotkeys = enable;
463 self
464 }
465
466 pub fn scan(mut self, enable: bool) -> Self {
468 self.scan = enable;
469 self
470 }
471
472 pub fn pattern(mut self, pat: impl Into<String>) -> Self {
474 self.pattern = Some(pat.into());
475 self
476 }
477
478 pub fn count(mut self, n: u32) -> Self {
480 self.count = Some(n);
481 self
482 }
483
484 pub fn quoted_pattern(mut self, pat: impl Into<String>) -> Self {
486 self.quoted_pattern = Some(pat.into());
487 self
488 }
489
490 pub fn cursor(mut self, n: u64) -> Self {
492 self.cursor = Some(n);
493 self
494 }
495
496 pub fn top(mut self, n: u32) -> Self {
498 self.top = Some(n);
499 self
500 }
501
502 pub fn intrinsic_latency(mut self, seconds: u32) -> Self {
504 self.intrinsic_latency = Some(seconds);
505 self
506 }
507
508 pub fn lru_test(mut self, keys: u64) -> Self {
510 self.lru_test = Some(keys);
511 self
512 }
513
514 pub fn verbose(mut self, enable: bool) -> Self {
516 self.verbose = enable;
517 self
518 }
519
520 pub fn eval_file(mut self, path: impl Into<PathBuf>) -> Self {
524 self.eval_file = Some(path.into());
525 self
526 }
527
528 pub fn ldb(mut self, enable: bool) -> Self {
530 self.ldb = enable;
531 self
532 }
533
534 pub fn ldb_sync_mode(mut self, enable: bool) -> Self {
536 self.ldb_sync_mode = enable;
537 self
538 }
539
540 pub fn pipe(mut self, enable: bool) -> Self {
544 self.pipe = enable;
545 self
546 }
547
548 pub fn pipe_timeout(mut self, seconds: u32) -> Self {
550 self.pipe_timeout = Some(seconds);
551 self
552 }
553
554 pub fn rdb(mut self, path: impl Into<PathBuf>) -> Self {
556 self.rdb = Some(path.into());
557 self
558 }
559
560 pub fn functions_rdb(mut self, path: impl Into<PathBuf>) -> Self {
562 self.functions_rdb = Some(path.into());
563 self
564 }
565
566 pub fn replica(mut self, enable: bool) -> Self {
570 self.replica = enable;
571 self
572 }
573
574 pub async fn cluster_command(&self, command: &str, args: &[&str]) -> Result<String> {
591 let mut cli_args = self.base_args();
592 cli_args.push("--cluster".into());
593 cli_args.push(command.into());
594 cli_args.extend(args.iter().map(|s| s.to_string()));
595
596 let str_args: Vec<&str> = cli_args.iter().map(|s| s.as_str()).collect();
597 let output = TokioCommand::new(&self.bin)
598 .args(&str_args)
599 .output()
600 .await?;
601
602 if output.status.success() {
603 Ok(String::from_utf8_lossy(&output.stdout).into_owned())
604 } else {
605 let stderr = String::from_utf8_lossy(&output.stderr);
606 Err(Error::Cli {
607 host: self.host.clone(),
608 port: self.port,
609 detail: stderr.into_owned(),
610 })
611 }
612 }
613
614 pub async fn run(&self, args: &[&str]) -> Result<String> {
616 let output = self.raw_output(args).await?;
617 if output.status.success() {
618 Ok(String::from_utf8_lossy(&output.stdout).to_string())
619 } else {
620 let stderr = String::from_utf8_lossy(&output.stderr);
621 Err(Error::Cli {
622 host: self.host.clone(),
623 port: self.port,
624 detail: stderr.into_owned(),
625 })
626 }
627 }
628
629 pub fn fire_and_forget(&self, args: &[&str]) {
631 let _ = Command::new(&self.bin)
632 .args(self.base_args())
633 .args(args)
634 .stdout(Stdio::null())
635 .stderr(Stdio::null())
636 .status();
637 }
638
639 pub async fn ping(&self) -> bool {
641 self.run(&["PING"])
642 .await
643 .map(|r| r.trim() == "PONG")
644 .unwrap_or(false)
645 }
646
647 pub fn shutdown(&self) {
649 self.fire_and_forget(&["SHUTDOWN", "NOSAVE"]);
650 }
651
652 pub async fn wait_for_ready(&self, timeout: std::time::Duration) -> Result<()> {
654 let start = std::time::Instant::now();
655 loop {
656 if self.ping().await {
657 return Ok(());
658 }
659 if start.elapsed() > timeout {
660 return Err(Error::Timeout {
661 message: format!(
662 "{}:{} did not respond within {timeout:?}",
663 self.host, self.port
664 ),
665 });
666 }
667 tokio::time::sleep(std::time::Duration::from_millis(250)).await;
668 }
669 }
670
671 pub async fn cluster_create(
673 &self,
674 node_addrs: &[String],
675 replicas_per_master: u16,
676 ) -> Result<()> {
677 let mut args = self.base_args();
678 args.push("--cluster".into());
679 args.push("create".into());
680 args.extend(node_addrs.iter().cloned());
681 if replicas_per_master > 0 {
682 args.push("--cluster-replicas".into());
683 args.push(replicas_per_master.to_string());
684 }
685 args.push("--cluster-yes".into());
686
687 let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
688 let output = TokioCommand::new(&self.bin)
689 .args(&str_args)
690 .output()
691 .await?;
692
693 if output.status.success() {
694 Ok(())
695 } else {
696 Err(Error::ClusterCreate {
697 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
698 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
699 })
700 }
701 }
702
703 fn base_args(&self) -> Vec<String> {
704 let mut args = Vec::new();
705
706 if let Some(ref uri) = self.uri {
708 args.push("-u".to_string());
709 args.push(uri.clone());
710 } else if let Some(ref path) = self.unixsocket {
711 args.push("-s".to_string());
712 args.push(path.display().to_string());
713 } else {
714 args.push("-h".to_string());
715 args.push(self.host.clone());
716 args.push("-p".to_string());
717 args.push(self.port.to_string());
718 }
719
720 if let Some(ref user) = self.user {
722 args.push("--user".to_string());
723 args.push(user.clone());
724 }
725 if let Some(ref pw) = self.password {
726 args.push("-a".to_string());
727 args.push(pw.clone());
728 }
729 if self.askpass {
730 args.push("--askpass".to_string());
731 }
732 if let Some(db) = self.db {
733 args.push("-n".to_string());
734 args.push(db.to_string());
735 }
736
737 if let Some(ref name) = self.client_name {
739 args.push("--name".to_string());
740 args.push(name.clone());
741 }
742
743 match self.ip_preference {
745 IpPreference::Default => {}
746 IpPreference::Ipv4 => args.push("-4".to_string()),
747 IpPreference::Ipv6 => args.push("-6".to_string()),
748 }
749
750 if let Some(t) = self.timeout {
752 args.push("-t".to_string());
753 args.push(t.to_string());
754 }
755
756 if let Some(r) = self.repeat {
758 args.push("-r".to_string());
759 args.push(r.to_string());
760 }
761 if let Some(i) = self.interval {
762 args.push("-i".to_string());
763 args.push(i.to_string());
764 }
765
766 if self.tls {
768 args.push("--tls".to_string());
769 }
770 if let Some(ref sni) = self.sni {
771 args.push("--sni".to_string());
772 args.push(sni.clone());
773 }
774 if let Some(ref path) = self.cacert {
775 args.push("--cacert".to_string());
776 args.push(path.display().to_string());
777 }
778 if let Some(ref path) = self.cacertdir {
779 args.push("--cacertdir".to_string());
780 args.push(path.display().to_string());
781 }
782 if let Some(ref path) = self.cert {
783 args.push("--cert".to_string());
784 args.push(path.display().to_string());
785 }
786 if let Some(ref path) = self.key {
787 args.push("--key".to_string());
788 args.push(path.display().to_string());
789 }
790 if self.insecure {
791 args.push("--insecure".to_string());
792 }
793 if let Some(ref ciphers) = self.tls_ciphers {
794 args.push("--tls-ciphers".to_string());
795 args.push(ciphers.clone());
796 }
797 if let Some(ref suites) = self.tls_ciphersuites {
798 args.push("--tls-ciphersuites".to_string());
799 args.push(suites.clone());
800 }
801
802 if let Some(ref proto) = self.resp {
804 match proto {
805 RespProtocol::Resp2 => args.push("-2".to_string()),
806 RespProtocol::Resp3 => args.push("-3".to_string()),
807 }
808 }
809
810 if self.cluster_mode {
812 args.push("-c".to_string());
813 }
814
815 match self.output_format {
817 OutputFormat::Default => {}
818 OutputFormat::Raw => args.push("--raw".to_string()),
819 OutputFormat::Csv => args.push("--csv".to_string()),
820 OutputFormat::Json => args.push("--json".to_string()),
821 OutputFormat::QuotedJson => args.push("--quoted-json".to_string()),
822 }
823
824 if self.no_auth_warning {
825 args.push("--no-auth-warning".to_string());
826 }
827
828 if self.stdin_last_arg {
830 args.push("-x".to_string());
831 }
832 if self.stdin_tag_arg {
833 args.push("-X".to_string());
834 }
835 if let Some(ref delim) = self.multi_bulk_delimiter {
836 args.push("-d".to_string());
837 args.push(delim.clone());
838 }
839 if let Some(ref delim) = self.output_delimiter {
840 args.push("-D".to_string());
841 args.push(delim.clone());
842 }
843 if self.exit_error_code {
844 args.push("-e".to_string());
845 }
846 if self.no_raw {
847 args.push("--no-raw".to_string());
848 }
849 if self.quoted_input {
850 args.push("--quoted-input".to_string());
851 }
852 if let Some(enable) = self.show_pushes {
853 args.push("--show-pushes".to_string());
854 args.push(if enable { "yes" } else { "no" }.to_string());
855 }
856
857 if self.stat {
859 args.push("--stat".to_string());
860 }
861 if self.latency {
862 args.push("--latency".to_string());
863 }
864 if self.latency_history {
865 args.push("--latency-history".to_string());
866 }
867 if self.latency_dist {
868 args.push("--latency-dist".to_string());
869 }
870 if self.bigkeys {
871 args.push("--bigkeys".to_string());
872 }
873 if self.memkeys {
874 args.push("--memkeys".to_string());
875 }
876 if let Some(n) = self.memkeys_samples {
877 args.push("--memkeys-samples".to_string());
878 args.push(n.to_string());
879 }
880 if self.keystats {
881 args.push("--keystats".to_string());
882 }
883 if let Some(n) = self.keystats_samples {
884 args.push("--keystats-samples".to_string());
885 args.push(n.to_string());
886 }
887 if self.hotkeys {
888 args.push("--hotkeys".to_string());
889 }
890 if self.scan {
891 args.push("--scan".to_string());
892 }
893 if let Some(ref pat) = self.pattern {
894 args.push("--pattern".to_string());
895 args.push(pat.clone());
896 }
897 if let Some(n) = self.count {
898 args.push("--count".to_string());
899 args.push(n.to_string());
900 }
901 if let Some(ref pat) = self.quoted_pattern {
902 args.push("--quoted-pattern".to_string());
903 args.push(pat.clone());
904 }
905 if let Some(n) = self.cursor {
906 args.push("--cursor".to_string());
907 args.push(n.to_string());
908 }
909 if let Some(n) = self.top {
910 args.push("--top".to_string());
911 args.push(n.to_string());
912 }
913 if let Some(seconds) = self.intrinsic_latency {
914 args.push("--intrinsic-latency".to_string());
915 args.push(seconds.to_string());
916 }
917 if let Some(keys) = self.lru_test {
918 args.push("--lru-test".to_string());
919 args.push(keys.to_string());
920 }
921 if self.verbose {
922 args.push("--verbose".to_string());
923 }
924
925 if let Some(ref path) = self.eval_file {
927 args.push("--eval".to_string());
928 args.push(path.display().to_string());
929 }
930 if self.ldb {
931 args.push("--ldb".to_string());
932 }
933 if self.ldb_sync_mode {
934 args.push("--ldb-sync-mode".to_string());
935 }
936
937 if self.pipe {
939 args.push("--pipe".to_string());
940 }
941 if let Some(n) = self.pipe_timeout {
942 args.push("--pipe-timeout".to_string());
943 args.push(n.to_string());
944 }
945 if let Some(ref path) = self.rdb {
946 args.push("--rdb".to_string());
947 args.push(path.display().to_string());
948 }
949 if let Some(ref path) = self.functions_rdb {
950 args.push("--functions-rdb".to_string());
951 args.push(path.display().to_string());
952 }
953
954 if self.replica {
956 args.push("--replica".to_string());
957 }
958
959 args
960 }
961
962 async fn raw_output(&self, args: &[&str]) -> std::io::Result<Output> {
963 TokioCommand::new(&self.bin)
964 .args(self.base_args())
965 .args(args)
966 .output()
967 .await
968 }
969}
970
971impl Default for RedisCli {
972 fn default() -> Self {
973 Self::new()
974 }
975}
976
#[cfg(test)]
mod tests {
    use super::RedisCli;

    /// A freshly constructed client targets the local default endpoint.
    #[test]
    fn default_config() {
        let RedisCli { host, port, .. } = RedisCli::new();
        assert_eq!(host, "127.0.0.1");
        assert_eq!(port, 6379);
    }

    /// Chained builder calls each overwrite the field they are named after.
    #[test]
    fn builder_chain() {
        let configured = RedisCli::new()
            .host("10.0.0.1")
            .port(6380)
            .password("secret")
            .bin("/usr/local/bin/redis-cli");
        let RedisCli {
            bin,
            host,
            port,
            password,
            ..
        } = configured;
        assert_eq!(host, "10.0.0.1");
        assert_eq!(port, 6380);
        assert_eq!(password.as_deref(), Some("secret"));
        assert_eq!(bin, "/usr/local/bin/redis-cli");
    }
}