1use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::cli::RedisCli;
8use crate::error::{Error, Result};
9use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
10
11pub struct NodeContext {
17 pub server: RedisServer,
22 pub index: usize,
27 pub port: u16,
29 pub total_nodes: u16,
31 pub masters: u16,
33 pub replicas_per_master: u16,
35}
36
37impl NodeContext {
38 pub fn is_master(&self) -> bool {
40 self.index < self.masters as usize
41 }
42
43 pub fn is_replica(&self) -> bool {
45 !self.is_master()
46 }
47}
48
49pub struct RedisClusterBuilder {
70 masters: u16,
71 replicas_per_master: u16,
72 base_port: u16,
73 bind: String,
74 password: Option<String>,
75 logfile: Option<String>,
76 save: Option<SavePolicy>,
77 appendonly: Option<bool>,
78 cluster_node_timeout: Option<u64>,
79 cluster_require_full_coverage: Option<bool>,
80 cluster_allow_reads_when_down: Option<bool>,
81 cluster_allow_pubsubshard_when_down: Option<bool>,
82 cluster_allow_replica_migration: Option<bool>,
83 cluster_migration_barrier: Option<u32>,
84 cluster_announce_hostname: Option<String>,
85 cluster_announce_human_nodename: Option<String>,
86 cluster_preferred_endpoint_type: Option<String>,
87 cluster_replica_no_failover: Option<bool>,
88 cluster_replica_validity_factor: Option<u32>,
89 cluster_announce_ip: Option<String>,
90 cluster_announce_port: Option<u16>,
91 cluster_announce_bus_port: Option<u16>,
92 cluster_announce_tls_port: Option<u16>,
93 cluster_port: Option<u16>,
94 cluster_link_sendbuf_limit: Option<u64>,
95 cluster_compatibility_sample_ratio: Option<u32>,
96 cluster_slot_migration_handoff_max_lag_bytes: Option<u64>,
97 cluster_slot_migration_write_pause_timeout: Option<u64>,
98 cluster_slot_stats_enabled: Option<bool>,
99 min_replicas_to_write: Option<u32>,
100 min_replicas_max_lag: Option<u32>,
101 repl_diskless_sync: Option<bool>,
102 repl_diskless_sync_delay: Option<u32>,
103 repl_ping_replica_period: Option<u32>,
104 repl_timeout: Option<u32>,
105 tls_port: Option<u16>,
106 tls_cert_file: Option<PathBuf>,
107 tls_key_file: Option<PathBuf>,
108 tls_ca_cert_file: Option<PathBuf>,
109 tls_ca_cert_dir: Option<PathBuf>,
110 tls_auth_clients: Option<bool>,
111 tls_replication: Option<bool>,
112 tls_cluster: Option<bool>,
113 extra: HashMap<String, String>,
114 redis_server_bin: String,
115 redis_cli_bin: String,
116 node_config_fn: Option<Box<dyn FnMut(NodeContext) -> RedisServer + Send>>,
117}
118
119impl RedisClusterBuilder {
120 pub fn masters(mut self, n: u16) -> Self {
122 self.masters = n;
123 self
124 }
125
126 pub fn replicas_per_master(mut self, n: u16) -> Self {
128 self.replicas_per_master = n;
129 self
130 }
131
132 pub fn base_port(mut self, port: u16) -> Self {
136 self.base_port = port;
137 self
138 }
139
140 pub fn bind(mut self, bind: impl Into<String>) -> Self {
142 self.bind = bind.into();
143 self
144 }
145
146 pub fn password(mut self, password: impl Into<String>) -> Self {
148 self.password = Some(password.into());
149 self
150 }
151
152 pub fn logfile(mut self, path: impl Into<String>) -> Self {
154 self.logfile = Some(path.into());
155 self
156 }
157
158 pub fn save(mut self, save: bool) -> Self {
163 self.save = Some(if save {
164 SavePolicy::Default
165 } else {
166 SavePolicy::Disabled
167 });
168 self
169 }
170
171 pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
173 self.save = Some(SavePolicy::Custom(schedule));
174 self
175 }
176
177 pub fn appendonly(mut self, appendonly: bool) -> Self {
179 self.appendonly = Some(appendonly);
180 self
181 }
182
183 pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
185 self.cluster_node_timeout = Some(ms);
186 self
187 }
188
189 pub fn cluster_require_full_coverage(mut self, require: bool) -> Self {
191 self.cluster_require_full_coverage = Some(require);
192 self
193 }
194
195 pub fn cluster_allow_reads_when_down(mut self, allow: bool) -> Self {
197 self.cluster_allow_reads_when_down = Some(allow);
198 self
199 }
200
201 pub fn cluster_allow_pubsubshard_when_down(mut self, allow: bool) -> Self {
203 self.cluster_allow_pubsubshard_when_down = Some(allow);
204 self
205 }
206
207 pub fn cluster_allow_replica_migration(mut self, allow: bool) -> Self {
209 self.cluster_allow_replica_migration = Some(allow);
210 self
211 }
212
213 pub fn cluster_migration_barrier(mut self, barrier: u32) -> Self {
215 self.cluster_migration_barrier = Some(barrier);
216 self
217 }
218
219 pub fn cluster_announce_hostname(mut self, hostname: impl Into<String>) -> Self {
221 self.cluster_announce_hostname = Some(hostname.into());
222 self
223 }
224
225 pub fn cluster_preferred_endpoint_type(mut self, endpoint_type: impl Into<String>) -> Self {
227 self.cluster_preferred_endpoint_type = Some(endpoint_type.into());
228 self
229 }
230
231 pub fn cluster_replica_no_failover(mut self, no_failover: bool) -> Self {
235 self.cluster_replica_no_failover = Some(no_failover);
236 self
237 }
238
239 pub fn cluster_replica_validity_factor(mut self, factor: u32) -> Self {
245 self.cluster_replica_validity_factor = Some(factor);
246 self
247 }
248
249 pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
251 self.cluster_announce_ip = Some(ip.into());
252 self
253 }
254
255 pub fn cluster_announce_port(mut self, port: u16) -> Self {
257 self.cluster_announce_port = Some(port);
258 self
259 }
260
261 pub fn cluster_announce_bus_port(mut self, port: u16) -> Self {
263 self.cluster_announce_bus_port = Some(port);
264 self
265 }
266
267 pub fn cluster_announce_tls_port(mut self, port: u16) -> Self {
269 self.cluster_announce_tls_port = Some(port);
270 self
271 }
272
273 pub fn cluster_announce_human_nodename(mut self, name: impl Into<String>) -> Self {
275 self.cluster_announce_human_nodename = Some(name.into());
276 self
277 }
278
279 pub fn cluster_port(mut self, port: u16) -> Self {
281 self.cluster_port = Some(port);
282 self
283 }
284
285 pub fn cluster_link_sendbuf_limit(mut self, limit: u64) -> Self {
289 self.cluster_link_sendbuf_limit = Some(limit);
290 self
291 }
292
293 pub fn cluster_compatibility_sample_ratio(mut self, ratio: u32) -> Self {
295 self.cluster_compatibility_sample_ratio = Some(ratio);
296 self
297 }
298
299 pub fn cluster_slot_migration_handoff_max_lag_bytes(mut self, bytes: u64) -> Self {
301 self.cluster_slot_migration_handoff_max_lag_bytes = Some(bytes);
302 self
303 }
304
305 pub fn cluster_slot_migration_write_pause_timeout(mut self, ms: u64) -> Self {
307 self.cluster_slot_migration_write_pause_timeout = Some(ms);
308 self
309 }
310
311 pub fn cluster_slot_stats_enabled(mut self, enable: bool) -> Self {
313 self.cluster_slot_stats_enabled = Some(enable);
314 self
315 }
316
317 pub fn min_replicas_to_write(mut self, n: u32) -> Self {
324 self.min_replicas_to_write = Some(n);
325 self
326 }
327
328 pub fn min_replicas_max_lag(mut self, seconds: u32) -> Self {
332 self.min_replicas_max_lag = Some(seconds);
333 self
334 }
335
336 pub fn repl_diskless_sync(mut self, enable: bool) -> Self {
340 self.repl_diskless_sync = Some(enable);
341 self
342 }
343
344 pub fn repl_diskless_sync_delay(mut self, seconds: u32) -> Self {
348 self.repl_diskless_sync_delay = Some(seconds);
349 self
350 }
351
352 pub fn repl_ping_replica_period(mut self, seconds: u32) -> Self {
357 self.repl_ping_replica_period = Some(seconds);
358 self
359 }
360
361 pub fn repl_timeout(mut self, seconds: u32) -> Self {
365 self.repl_timeout = Some(seconds);
366 self
367 }
368
369 pub fn tls_port(mut self, port: u16) -> Self {
373 self.tls_port = Some(port);
374 self
375 }
376
377 pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
379 self.tls_cert_file = Some(path.into());
380 self
381 }
382
383 pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
385 self.tls_key_file = Some(path.into());
386 self
387 }
388
389 pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
391 self.tls_ca_cert_file = Some(path.into());
392 self
393 }
394
395 pub fn tls_ca_cert_dir(mut self, path: impl Into<PathBuf>) -> Self {
397 self.tls_ca_cert_dir = Some(path.into());
398 self
399 }
400
401 pub fn tls_auth_clients(mut self, auth: bool) -> Self {
403 self.tls_auth_clients = Some(auth);
404 self
405 }
406
407 pub fn tls_replication(mut self, enable: bool) -> Self {
409 self.tls_replication = Some(enable);
410 self
411 }
412
413 pub fn tls_cluster(mut self, enable: bool) -> Self {
415 self.tls_cluster = Some(enable);
416 self
417 }
418
419 pub fn with_node_config(
454 mut self,
455 f: impl FnMut(NodeContext) -> RedisServer + Send + 'static,
456 ) -> Self {
457 self.node_config_fn = Some(Box::new(f));
458 self
459 }
460
461 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
463 self.extra.insert(key.into(), value.into());
464 self
465 }
466
467 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
469 self.redis_server_bin = bin.into();
470 self
471 }
472
473 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
475 self.redis_cli_bin = bin.into();
476 self
477 }
478
479 fn total_nodes(&self) -> u16 {
480 self.masters * (1 + self.replicas_per_master)
481 }
482
483 fn ports(&self) -> impl Iterator<Item = u16> {
484 let base = self.base_port;
485 let total = self.total_nodes();
486 (0..total).map(move |i| base + i)
487 }
488
489 fn has_tls(&self) -> bool {
491 self.tls_cert_file.is_some() && self.tls_key_file.is_some()
492 }
493
494 fn apply_tls_to_cli(&self, mut cli: RedisCli) -> RedisCli {
496 if self.has_tls() {
497 cli = cli.tls(true);
498 if let Some(ref ca) = self.tls_ca_cert_file {
499 cli = cli.cacert(ca);
500 } else {
501 cli = cli.insecure(true);
502 }
503 if let Some(ref cert) = self.tls_cert_file {
504 cli = cli.cert(cert);
505 }
506 if let Some(ref key) = self.tls_key_file {
507 cli = cli.key(key);
508 }
509 }
510 cli
511 }
512
513 pub async fn start(mut self) -> Result<RedisClusterHandle> {
515 for port in self.ports() {
517 let mut cli = RedisCli::new()
518 .bin(&self.redis_cli_bin)
519 .host(&self.bind)
520 .port(port);
521 if let Some(ref password) = self.password {
522 cli = cli.password(password);
523 }
524 cli = self.apply_tls_to_cli(cli);
525 cli.shutdown();
526 }
527 tokio::time::sleep(Duration::from_millis(500)).await;
528
529 let total_nodes = self.total_nodes();
531 let ports: Vec<u16> = self.ports().collect();
532 let mut nodes = Vec::new();
533 for (index, port) in ports.into_iter().enumerate() {
534 let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
535 let _ = std::fs::remove_dir_all(&node_dir);
536 let mut server = RedisServer::new()
537 .port(port)
538 .bind(&self.bind)
539 .dir(node_dir)
540 .cluster_enabled(true)
541 .cluster_node_timeout(self.cluster_node_timeout.unwrap_or(5000))
542 .redis_server_bin(&self.redis_server_bin)
543 .redis_cli_bin(&self.redis_cli_bin);
544 if let Some(v) = self.cluster_require_full_coverage {
545 server = server.cluster_require_full_coverage(v);
546 }
547 if let Some(v) = self.cluster_allow_reads_when_down {
548 server = server.cluster_allow_reads_when_down(v);
549 }
550 if let Some(v) = self.cluster_allow_pubsubshard_when_down {
551 server = server.cluster_allow_pubsubshard_when_down(v);
552 }
553 if let Some(v) = self.cluster_allow_replica_migration {
554 server = server.cluster_allow_replica_migration(v);
555 }
556 if let Some(barrier) = self.cluster_migration_barrier {
557 server = server.cluster_migration_barrier(barrier);
558 }
559 if let Some(ref hostname) = self.cluster_announce_hostname {
560 server = server.cluster_announce_hostname(hostname.clone());
561 }
562 if let Some(ref endpoint_type) = self.cluster_preferred_endpoint_type {
563 server = server.cluster_preferred_endpoint_type(endpoint_type.clone());
564 }
565 if let Some(v) = self.cluster_replica_no_failover {
566 server = server.cluster_replica_no_failover(v);
567 }
568 if let Some(factor) = self.cluster_replica_validity_factor {
569 server = server.cluster_replica_validity_factor(factor);
570 }
571 if let Some(ref ip) = self.cluster_announce_ip {
572 server = server.cluster_announce_ip(ip.clone());
573 }
574 if let Some(port) = self.cluster_announce_port {
575 server = server.cluster_announce_port(port);
576 }
577 if let Some(port) = self.cluster_announce_bus_port {
578 server = server.cluster_announce_bus_port(port);
579 }
580 if let Some(port) = self.cluster_announce_tls_port {
581 server = server.cluster_announce_tls_port(port);
582 }
583 if let Some(ref name) = self.cluster_announce_human_nodename {
584 server = server.cluster_announce_human_nodename(name.clone());
585 }
586 if let Some(port) = self.cluster_port {
587 server = server.cluster_port(port);
588 }
589 if let Some(limit) = self.cluster_link_sendbuf_limit {
590 server = server.cluster_link_sendbuf_limit(limit);
591 }
592 if let Some(ratio) = self.cluster_compatibility_sample_ratio {
593 server = server.cluster_compatibility_sample_ratio(ratio);
594 }
595 if let Some(bytes) = self.cluster_slot_migration_handoff_max_lag_bytes {
596 server = server.cluster_slot_migration_handoff_max_lag_bytes(bytes);
597 }
598 if let Some(ms) = self.cluster_slot_migration_write_pause_timeout {
599 server = server.cluster_slot_migration_write_pause_timeout(ms);
600 }
601 if let Some(v) = self.cluster_slot_stats_enabled {
602 server = server.cluster_slot_stats_enabled(v);
603 }
604 if let Some(n) = self.min_replicas_to_write {
605 server = server.min_replicas_to_write(n);
606 }
607 if let Some(seconds) = self.min_replicas_max_lag {
608 server = server.min_replicas_max_lag(seconds);
609 }
610 if let Some(v) = self.repl_diskless_sync {
611 server = server.repl_diskless_sync(v);
612 }
613 if let Some(seconds) = self.repl_diskless_sync_delay {
614 server = server.repl_diskless_sync_delay(seconds);
615 }
616 if let Some(seconds) = self.repl_ping_replica_period {
617 server = server.repl_ping_replica_period(seconds);
618 }
619 if let Some(seconds) = self.repl_timeout {
620 server = server.repl_timeout(seconds);
621 }
622 if let Some(ref password) = self.password {
623 server = server.password(password).masterauth(password);
624 }
625 if let Some(ref logfile) = self.logfile {
626 server = server.logfile(logfile.clone());
627 }
628 if let Some(ref save) = self.save {
629 match save {
630 SavePolicy::Disabled => server = server.save(false),
631 SavePolicy::Default => server = server.save(true),
632 SavePolicy::Custom(pairs) => {
633 server = server.save_schedule(pairs.clone());
634 }
635 }
636 }
637 if let Some(appendonly) = self.appendonly {
638 server = server.appendonly(appendonly);
639 }
640 if let Some(port) = self.tls_port {
642 server = server.tls_port(port + index as u16);
643 }
644 if let Some(ref path) = self.tls_cert_file {
645 server = server.tls_cert_file(path);
646 }
647 if let Some(ref path) = self.tls_key_file {
648 server = server.tls_key_file(path);
649 }
650 if let Some(ref path) = self.tls_ca_cert_file {
651 server = server.tls_ca_cert_file(path);
652 }
653 if let Some(ref path) = self.tls_ca_cert_dir {
654 server = server.tls_ca_cert_dir(path);
655 }
656 if let Some(v) = self.tls_auth_clients {
657 server = server.tls_auth_clients(v);
658 }
659 if let Some(v) = self.tls_replication {
660 server = server.tls_replication(v);
661 }
662 if let Some(v) = self.tls_cluster {
663 server = server.tls_cluster(v);
664 }
665 for (key, value) in &self.extra {
666 server = server.extra(key.clone(), value.clone());
667 }
668 if let Some(ref mut f) = self.node_config_fn {
670 server = f(NodeContext {
671 server,
672 index,
673 port,
674 total_nodes,
675 masters: self.masters,
676 replicas_per_master: self.replicas_per_master,
677 });
678 }
679 let handle = server.start().await?;
680 nodes.push(handle);
681 }
682
683 let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
685 let mut cli = RedisCli::new()
686 .bin(&self.redis_cli_bin)
687 .host(&self.bind)
688 .port(self.base_port);
689 if let Some(ref password) = self.password {
690 cli = cli.password(password);
691 }
692 cli = self.apply_tls_to_cli(cli);
693 cli.cluster_create(&node_addrs, self.replicas_per_master)
694 .await?;
695
696 tokio::time::sleep(Duration::from_secs(2)).await;
698
699 Ok(RedisClusterHandle {
700 nodes,
701 num_masters: self.masters,
702 bind: self.bind,
703 base_port: self.base_port,
704 password: self.password,
705 redis_cli_bin: self.redis_cli_bin,
706 tls: TlsConfig {
707 cert_file: self.tls_cert_file,
708 key_file: self.tls_key_file,
709 ca_cert_file: self.tls_ca_cert_file,
710 },
711 })
712 }
713}
714
715#[derive(Clone, Debug, Default)]
717struct TlsConfig {
718 cert_file: Option<PathBuf>,
719 key_file: Option<PathBuf>,
720 ca_cert_file: Option<PathBuf>,
721}
722
723impl TlsConfig {
724 fn has_tls(&self) -> bool {
725 self.cert_file.is_some() && self.key_file.is_some()
726 }
727
728 fn apply(&self, mut cli: RedisCli) -> RedisCli {
729 if self.has_tls() {
730 cli = cli.tls(true);
731 if let Some(ref ca) = self.ca_cert_file {
732 cli = cli.cacert(ca);
733 } else {
734 cli = cli.insecure(true);
735 }
736 if let Some(ref cert) = self.cert_file {
737 cli = cli.cert(cert);
738 }
739 if let Some(ref key) = self.key_file {
740 cli = cli.key(key);
741 }
742 }
743 cli
744 }
745}
746
747pub struct RedisClusterHandle {
749 nodes: Vec<RedisServerHandle>,
750 num_masters: u16,
751 bind: String,
752 base_port: u16,
753 password: Option<String>,
754 redis_cli_bin: String,
755 tls: TlsConfig,
756}
757
758pub struct RedisCluster;
763
764impl RedisCluster {
765 pub fn builder() -> RedisClusterBuilder {
767 RedisClusterBuilder {
768 masters: 3,
769 replicas_per_master: 0,
770 base_port: 7000,
771 bind: "127.0.0.1".into(),
772 password: None,
773 logfile: None,
774 save: None,
775 appendonly: None,
776 cluster_node_timeout: None,
777 cluster_require_full_coverage: None,
778 cluster_allow_reads_when_down: None,
779 cluster_allow_pubsubshard_when_down: None,
780 cluster_allow_replica_migration: None,
781 cluster_migration_barrier: None,
782 cluster_announce_hostname: None,
783 cluster_announce_human_nodename: None,
784 cluster_preferred_endpoint_type: None,
785 cluster_replica_no_failover: None,
786 cluster_replica_validity_factor: None,
787 cluster_announce_ip: None,
788 cluster_announce_port: None,
789 cluster_announce_bus_port: None,
790 cluster_announce_tls_port: None,
791 cluster_port: None,
792 cluster_link_sendbuf_limit: None,
793 cluster_compatibility_sample_ratio: None,
794 cluster_slot_migration_handoff_max_lag_bytes: None,
795 cluster_slot_migration_write_pause_timeout: None,
796 cluster_slot_stats_enabled: None,
797 min_replicas_to_write: None,
798 min_replicas_max_lag: None,
799 repl_diskless_sync: None,
800 repl_diskless_sync_delay: None,
801 repl_ping_replica_period: None,
802 repl_timeout: None,
803 tls_port: None,
804 tls_cert_file: None,
805 tls_key_file: None,
806 tls_ca_cert_file: None,
807 tls_ca_cert_dir: None,
808 tls_auth_clients: None,
809 tls_replication: None,
810 tls_cluster: None,
811 extra: HashMap::new(),
812 redis_server_bin: "redis-server".into(),
813 redis_cli_bin: "redis-cli".into(),
814 node_config_fn: None,
815 }
816 }
817}
818
819impl RedisClusterHandle {
820 pub fn addr(&self) -> String {
822 format!("{}:{}", self.bind, self.base_port)
823 }
824
825 pub fn node_addrs(&self) -> Vec<String> {
827 self.nodes.iter().map(|n| n.addr()).collect()
828 }
829
830 pub fn pids(&self) -> Vec<u32> {
832 self.nodes.iter().map(|n| n.pid()).collect()
833 }
834
835 pub async fn all_alive(&self) -> bool {
837 for node in &self.nodes {
838 if !node.is_alive().await {
839 return false;
840 }
841 }
842 true
843 }
844
845 pub async fn is_healthy(&self) -> bool {
847 for node in &self.nodes {
848 if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await
849 && info.contains("cluster_state:ok")
850 && info.contains("cluster_slots_ok:16384")
851 {
852 return true;
853 }
854 }
855 false
856 }
857
858 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
860 let start = std::time::Instant::now();
861 loop {
862 if self.is_healthy().await {
863 return Ok(());
864 }
865 if start.elapsed() > timeout {
866 return Err(Error::Timeout {
867 message: "cluster did not become healthy in time".into(),
868 });
869 }
870 tokio::time::sleep(Duration::from_millis(500)).await;
871 }
872 }
873
874 pub fn node(&self, index: usize) -> &RedisServerHandle {
883 &self.nodes[index]
884 }
885
886 pub fn nodes(&self) -> &[RedisServerHandle] {
888 &self.nodes
889 }
890
891 pub fn num_masters(&self) -> u16 {
893 self.num_masters
894 }
895
896 pub fn master_nodes(&self) -> &[RedisServerHandle] {
901 &self.nodes[..self.num_masters as usize]
902 }
903
904 pub fn replica_nodes(&self) -> &[RedisServerHandle] {
909 &self.nodes[self.num_masters as usize..]
910 }
911
912 pub async fn config_set_all(&self, key: &str, value: &str) -> Result<()> {
914 for node in &self.nodes {
915 node.run(&["CONFIG", "SET", key, value]).await?;
916 }
917 Ok(())
918 }
919
920 pub async fn config_set_masters(&self, key: &str, value: &str) -> Result<()> {
922 for node in self.master_nodes() {
923 node.run(&["CONFIG", "SET", key, value]).await?;
924 }
925 Ok(())
926 }
927
928 pub async fn config_set_replicas(&self, key: &str, value: &str) -> Result<()> {
930 for node in self.replica_nodes() {
931 node.run(&["CONFIG", "SET", key, value]).await?;
932 }
933 Ok(())
934 }
935
936 pub fn cli(&self) -> RedisCli {
938 let mut cli = RedisCli::new()
939 .bin(&self.redis_cli_bin)
940 .host(&self.bind)
941 .port(self.base_port);
942 if let Some(ref password) = self.password {
943 cli = cli.password(password);
944 }
945 cli = self.tls.apply(cli);
946 cli
947 }
948}
949
950impl Drop for RedisClusterHandle {
951 fn drop(&mut self) {
952 }
954}
955
956#[cfg(test)]
957mod tests {
958 use super::*;
959
960 #[test]
961 fn builder_defaults() {
962 let b = RedisCluster::builder();
963 assert_eq!(b.masters, 3);
964 assert_eq!(b.replicas_per_master, 0);
965 assert_eq!(b.base_port, 7000);
966 assert_eq!(b.password, None);
967 assert!(b.logfile.is_none());
968 assert!(b.extra.is_empty());
969 assert_eq!(b.total_nodes(), 3);
970 assert!(b.cluster_node_timeout.is_none());
971 assert!(b.cluster_require_full_coverage.is_none());
972 assert!(b.cluster_allow_reads_when_down.is_none());
973 assert!(b.cluster_allow_pubsubshard_when_down.is_none());
974 assert!(b.cluster_allow_replica_migration.is_none());
975 assert!(b.cluster_migration_barrier.is_none());
976 assert!(b.cluster_announce_hostname.is_none());
977 assert!(b.cluster_preferred_endpoint_type.is_none());
978 }
979
980 #[test]
981 fn builder_with_replicas() {
982 let b = RedisCluster::builder().masters(3).replicas_per_master(1);
983 assert_eq!(b.total_nodes(), 6);
984 let ports: Vec<u16> = b.ports().collect();
985 assert_eq!(ports, vec![7000, 7001, 7002, 7003, 7004, 7005]);
986 }
987
988 #[test]
989 fn builder_password() {
990 let b = RedisCluster::builder().password("secret");
991 assert_eq!(b.password.as_deref(), Some("secret"));
992 }
993
994 #[test]
995 fn builder_cluster_directives() {
996 let b = RedisCluster::builder()
997 .cluster_node_timeout(10000)
998 .cluster_require_full_coverage(false)
999 .cluster_allow_reads_when_down(true)
1000 .cluster_allow_pubsubshard_when_down(true)
1001 .cluster_allow_replica_migration(false)
1002 .cluster_migration_barrier(2)
1003 .cluster_announce_hostname("node.example.com")
1004 .cluster_preferred_endpoint_type("hostname")
1005 .cluster_replica_no_failover(true)
1006 .cluster_replica_validity_factor(0)
1007 .cluster_announce_ip("10.0.0.1")
1008 .cluster_announce_port(7000)
1009 .cluster_announce_bus_port(17000)
1010 .cluster_announce_tls_port(7100)
1011 .cluster_announce_human_nodename("node-1")
1012 .cluster_port(17000)
1013 .cluster_link_sendbuf_limit(67108864)
1014 .cluster_compatibility_sample_ratio(50)
1015 .cluster_slot_migration_handoff_max_lag_bytes(1048576)
1016 .cluster_slot_migration_write_pause_timeout(5000)
1017 .cluster_slot_stats_enabled(true);
1018 assert_eq!(b.cluster_node_timeout, Some(10000));
1019 assert_eq!(b.cluster_require_full_coverage, Some(false));
1020 assert_eq!(b.cluster_allow_reads_when_down, Some(true));
1021 assert_eq!(b.cluster_allow_pubsubshard_when_down, Some(true));
1022 assert_eq!(b.cluster_allow_replica_migration, Some(false));
1023 assert_eq!(b.cluster_migration_barrier, Some(2));
1024 assert_eq!(
1025 b.cluster_announce_hostname.as_deref(),
1026 Some("node.example.com")
1027 );
1028 assert_eq!(
1029 b.cluster_preferred_endpoint_type.as_deref(),
1030 Some("hostname")
1031 );
1032 assert_eq!(b.cluster_replica_no_failover, Some(true));
1033 assert_eq!(b.cluster_replica_validity_factor, Some(0));
1034 assert_eq!(b.cluster_announce_ip.as_deref(), Some("10.0.0.1"));
1035 assert_eq!(b.cluster_announce_port, Some(7000));
1036 assert_eq!(b.cluster_announce_bus_port, Some(17000));
1037 assert_eq!(b.cluster_announce_tls_port, Some(7100));
1038 assert_eq!(b.cluster_announce_human_nodename.as_deref(), Some("node-1"));
1039 assert_eq!(b.cluster_port, Some(17000));
1040 assert_eq!(b.cluster_link_sendbuf_limit, Some(67108864));
1041 assert_eq!(b.cluster_compatibility_sample_ratio, Some(50));
1042 assert_eq!(
1043 b.cluster_slot_migration_handoff_max_lag_bytes,
1044 Some(1048576)
1045 );
1046 assert_eq!(b.cluster_slot_migration_write_pause_timeout, Some(5000));
1047 assert_eq!(b.cluster_slot_stats_enabled, Some(true));
1048 }
1049
1050 #[test]
1051 fn builder_replication_directives() {
1052 let b = RedisCluster::builder()
1053 .min_replicas_to_write(1)
1054 .min_replicas_max_lag(10)
1055 .repl_diskless_sync(true)
1056 .repl_diskless_sync_delay(0)
1057 .repl_ping_replica_period(5)
1058 .repl_timeout(30);
1059 assert_eq!(b.min_replicas_to_write, Some(1));
1060 assert_eq!(b.min_replicas_max_lag, Some(10));
1061 assert_eq!(b.repl_diskless_sync, Some(true));
1062 assert_eq!(b.repl_diskless_sync_delay, Some(0));
1063 assert_eq!(b.repl_ping_replica_period, Some(5));
1064 assert_eq!(b.repl_timeout, Some(30));
1065 }
1066
1067 #[test]
1068 fn builder_logfile_and_extra() {
1069 let b = RedisCluster::builder()
1070 .logfile("/tmp/cluster.log")
1071 .extra("maxmemory", "10mb");
1072 assert_eq!(b.logfile.as_deref(), Some("/tmp/cluster.log"));
1073 assert_eq!(b.extra.get("maxmemory").map(String::as_str), Some("10mb"));
1074 }
1075}