1use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::cli::RedisCli;
7use crate::error::{Error, Result};
8use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
9
/// Per-node information handed to the callback registered with
/// `RedisClusterBuilder::with_node_config`.
///
/// The callback receives this value by move and must return the
/// `RedisServer` that should actually be started for the node.
pub struct NodeContext {
    /// The server builder for this node, already carrying the cluster-wide
    /// settings applied by the cluster builder.
    pub server: RedisServer,
    /// Zero-based position of this node in port order.
    pub index: usize,
    /// Port assigned to this node.
    pub port: u16,
    /// Total number of nodes in the cluster being started.
    pub total_nodes: u16,
    /// Number of master nodes configured on the cluster builder.
    pub masters: u16,
    /// Number of replicas per master configured on the cluster builder.
    pub replicas_per_master: u16,
}
35
36impl NodeContext {
37 pub fn is_master(&self) -> bool {
39 self.index < self.masters as usize
40 }
41
42 pub fn is_replica(&self) -> bool {
44 !self.is_master()
45 }
46}
47
/// Builder that collects cluster-wide settings and then launches every node
/// with `RedisClusterBuilder::start`.
///
/// Obtain one from `RedisCluster::builder`. Each setter consumes and returns
/// the builder so calls can be chained.
pub struct RedisClusterBuilder {
    // Topology and addressing: node `i` listens on `base_port + i`.
    masters: u16,
    replicas_per_master: u16,
    base_port: u16,
    bind: String,
    // When set, applied to every node as both `password` and `masterauth`,
    // and passed to the `redis-cli` invocations.
    password: Option<String>,
    logfile: Option<String>,
    save: Option<SavePolicy>,
    appendonly: Option<bool>,
    // Optional settings forwarded to the same-named `RedisServer` setter on
    // every node. `None` means the setter is not called, except for
    // `cluster_node_timeout`, which falls back to 5000 in `start`.
    cluster_node_timeout: Option<u64>,
    cluster_require_full_coverage: Option<bool>,
    cluster_allow_reads_when_down: Option<bool>,
    cluster_allow_pubsubshard_when_down: Option<bool>,
    cluster_allow_replica_migration: Option<bool>,
    cluster_migration_barrier: Option<u32>,
    cluster_announce_hostname: Option<String>,
    cluster_announce_human_nodename: Option<String>,
    cluster_preferred_endpoint_type: Option<String>,
    cluster_replica_no_failover: Option<bool>,
    cluster_replica_validity_factor: Option<u32>,
    cluster_announce_ip: Option<String>,
    cluster_announce_port: Option<u16>,
    cluster_announce_bus_port: Option<u16>,
    cluster_announce_tls_port: Option<u16>,
    cluster_port: Option<u16>,
    cluster_link_sendbuf_limit: Option<u64>,
    cluster_compatibility_sample_ratio: Option<u32>,
    cluster_slot_migration_handoff_max_lag_bytes: Option<u64>,
    cluster_slot_migration_write_pause_timeout: Option<u64>,
    cluster_slot_stats_enabled: Option<bool>,
    min_replicas_to_write: Option<u32>,
    min_replicas_max_lag: Option<u32>,
    repl_diskless_sync: Option<bool>,
    repl_diskless_sync_delay: Option<u32>,
    repl_ping_replica_period: Option<u32>,
    repl_timeout: Option<u32>,
    // Arbitrary key/value pairs passed to `RedisServer::extra` on every node.
    extra: HashMap<String, String>,
    // Executables used to run the servers and the CLI.
    redis_server_bin: String,
    redis_cli_bin: String,
    // Optional per-node hook; called once per node after the settings above
    // have been applied.
    node_config_fn: Option<Box<dyn FnMut(NodeContext) -> RedisServer + Send>>,
}
109
110impl RedisClusterBuilder {
111 pub fn masters(mut self, n: u16) -> Self {
113 self.masters = n;
114 self
115 }
116
117 pub fn replicas_per_master(mut self, n: u16) -> Self {
119 self.replicas_per_master = n;
120 self
121 }
122
123 pub fn base_port(mut self, port: u16) -> Self {
127 self.base_port = port;
128 self
129 }
130
131 pub fn bind(mut self, bind: impl Into<String>) -> Self {
133 self.bind = bind.into();
134 self
135 }
136
137 pub fn password(mut self, password: impl Into<String>) -> Self {
139 self.password = Some(password.into());
140 self
141 }
142
143 pub fn logfile(mut self, path: impl Into<String>) -> Self {
145 self.logfile = Some(path.into());
146 self
147 }
148
149 pub fn save(mut self, save: bool) -> Self {
154 self.save = Some(if save {
155 SavePolicy::Default
156 } else {
157 SavePolicy::Disabled
158 });
159 self
160 }
161
162 pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
164 self.save = Some(SavePolicy::Custom(schedule));
165 self
166 }
167
168 pub fn appendonly(mut self, appendonly: bool) -> Self {
170 self.appendonly = Some(appendonly);
171 self
172 }
173
174 pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
176 self.cluster_node_timeout = Some(ms);
177 self
178 }
179
180 pub fn cluster_require_full_coverage(mut self, require: bool) -> Self {
182 self.cluster_require_full_coverage = Some(require);
183 self
184 }
185
186 pub fn cluster_allow_reads_when_down(mut self, allow: bool) -> Self {
188 self.cluster_allow_reads_when_down = Some(allow);
189 self
190 }
191
192 pub fn cluster_allow_pubsubshard_when_down(mut self, allow: bool) -> Self {
194 self.cluster_allow_pubsubshard_when_down = Some(allow);
195 self
196 }
197
198 pub fn cluster_allow_replica_migration(mut self, allow: bool) -> Self {
200 self.cluster_allow_replica_migration = Some(allow);
201 self
202 }
203
204 pub fn cluster_migration_barrier(mut self, barrier: u32) -> Self {
206 self.cluster_migration_barrier = Some(barrier);
207 self
208 }
209
210 pub fn cluster_announce_hostname(mut self, hostname: impl Into<String>) -> Self {
212 self.cluster_announce_hostname = Some(hostname.into());
213 self
214 }
215
216 pub fn cluster_preferred_endpoint_type(mut self, endpoint_type: impl Into<String>) -> Self {
218 self.cluster_preferred_endpoint_type = Some(endpoint_type.into());
219 self
220 }
221
222 pub fn cluster_replica_no_failover(mut self, no_failover: bool) -> Self {
226 self.cluster_replica_no_failover = Some(no_failover);
227 self
228 }
229
230 pub fn cluster_replica_validity_factor(mut self, factor: u32) -> Self {
236 self.cluster_replica_validity_factor = Some(factor);
237 self
238 }
239
240 pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
242 self.cluster_announce_ip = Some(ip.into());
243 self
244 }
245
246 pub fn cluster_announce_port(mut self, port: u16) -> Self {
248 self.cluster_announce_port = Some(port);
249 self
250 }
251
252 pub fn cluster_announce_bus_port(mut self, port: u16) -> Self {
254 self.cluster_announce_bus_port = Some(port);
255 self
256 }
257
258 pub fn cluster_announce_tls_port(mut self, port: u16) -> Self {
260 self.cluster_announce_tls_port = Some(port);
261 self
262 }
263
264 pub fn cluster_announce_human_nodename(mut self, name: impl Into<String>) -> Self {
266 self.cluster_announce_human_nodename = Some(name.into());
267 self
268 }
269
270 pub fn cluster_port(mut self, port: u16) -> Self {
272 self.cluster_port = Some(port);
273 self
274 }
275
276 pub fn cluster_link_sendbuf_limit(mut self, limit: u64) -> Self {
280 self.cluster_link_sendbuf_limit = Some(limit);
281 self
282 }
283
284 pub fn cluster_compatibility_sample_ratio(mut self, ratio: u32) -> Self {
286 self.cluster_compatibility_sample_ratio = Some(ratio);
287 self
288 }
289
290 pub fn cluster_slot_migration_handoff_max_lag_bytes(mut self, bytes: u64) -> Self {
292 self.cluster_slot_migration_handoff_max_lag_bytes = Some(bytes);
293 self
294 }
295
296 pub fn cluster_slot_migration_write_pause_timeout(mut self, ms: u64) -> Self {
298 self.cluster_slot_migration_write_pause_timeout = Some(ms);
299 self
300 }
301
302 pub fn cluster_slot_stats_enabled(mut self, enable: bool) -> Self {
304 self.cluster_slot_stats_enabled = Some(enable);
305 self
306 }
307
308 pub fn min_replicas_to_write(mut self, n: u32) -> Self {
315 self.min_replicas_to_write = Some(n);
316 self
317 }
318
319 pub fn min_replicas_max_lag(mut self, seconds: u32) -> Self {
323 self.min_replicas_max_lag = Some(seconds);
324 self
325 }
326
327 pub fn repl_diskless_sync(mut self, enable: bool) -> Self {
331 self.repl_diskless_sync = Some(enable);
332 self
333 }
334
335 pub fn repl_diskless_sync_delay(mut self, seconds: u32) -> Self {
339 self.repl_diskless_sync_delay = Some(seconds);
340 self
341 }
342
343 pub fn repl_ping_replica_period(mut self, seconds: u32) -> Self {
348 self.repl_ping_replica_period = Some(seconds);
349 self
350 }
351
352 pub fn repl_timeout(mut self, seconds: u32) -> Self {
356 self.repl_timeout = Some(seconds);
357 self
358 }
359
360 pub fn with_node_config(
395 mut self,
396 f: impl FnMut(NodeContext) -> RedisServer + Send + 'static,
397 ) -> Self {
398 self.node_config_fn = Some(Box::new(f));
399 self
400 }
401
402 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
404 self.extra.insert(key.into(), value.into());
405 self
406 }
407
408 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
410 self.redis_server_bin = bin.into();
411 self
412 }
413
414 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
416 self.redis_cli_bin = bin.into();
417 self
418 }
419
420 fn total_nodes(&self) -> u16 {
421 self.masters * (1 + self.replicas_per_master)
422 }
423
424 fn ports(&self) -> impl Iterator<Item = u16> {
425 let base = self.base_port;
426 let total = self.total_nodes();
427 (0..total).map(move |i| base + i)
428 }
429
430 pub async fn start(mut self) -> Result<RedisClusterHandle> {
432 for port in self.ports() {
434 let mut cli = RedisCli::new()
435 .bin(&self.redis_cli_bin)
436 .host(&self.bind)
437 .port(port);
438 if let Some(ref password) = self.password {
439 cli = cli.password(password);
440 }
441 cli.shutdown();
442 }
443 tokio::time::sleep(Duration::from_millis(500)).await;
444
445 let total_nodes = self.total_nodes();
447 let ports: Vec<u16> = self.ports().collect();
448 let mut nodes = Vec::new();
449 for (index, port) in ports.into_iter().enumerate() {
450 let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
451 let _ = std::fs::remove_dir_all(&node_dir);
452 let mut server = RedisServer::new()
453 .port(port)
454 .bind(&self.bind)
455 .dir(node_dir)
456 .cluster_enabled(true)
457 .cluster_node_timeout(self.cluster_node_timeout.unwrap_or(5000))
458 .redis_server_bin(&self.redis_server_bin)
459 .redis_cli_bin(&self.redis_cli_bin);
460 if let Some(v) = self.cluster_require_full_coverage {
461 server = server.cluster_require_full_coverage(v);
462 }
463 if let Some(v) = self.cluster_allow_reads_when_down {
464 server = server.cluster_allow_reads_when_down(v);
465 }
466 if let Some(v) = self.cluster_allow_pubsubshard_when_down {
467 server = server.cluster_allow_pubsubshard_when_down(v);
468 }
469 if let Some(v) = self.cluster_allow_replica_migration {
470 server = server.cluster_allow_replica_migration(v);
471 }
472 if let Some(barrier) = self.cluster_migration_barrier {
473 server = server.cluster_migration_barrier(barrier);
474 }
475 if let Some(ref hostname) = self.cluster_announce_hostname {
476 server = server.cluster_announce_hostname(hostname.clone());
477 }
478 if let Some(ref endpoint_type) = self.cluster_preferred_endpoint_type {
479 server = server.cluster_preferred_endpoint_type(endpoint_type.clone());
480 }
481 if let Some(v) = self.cluster_replica_no_failover {
482 server = server.cluster_replica_no_failover(v);
483 }
484 if let Some(factor) = self.cluster_replica_validity_factor {
485 server = server.cluster_replica_validity_factor(factor);
486 }
487 if let Some(ref ip) = self.cluster_announce_ip {
488 server = server.cluster_announce_ip(ip.clone());
489 }
490 if let Some(port) = self.cluster_announce_port {
491 server = server.cluster_announce_port(port);
492 }
493 if let Some(port) = self.cluster_announce_bus_port {
494 server = server.cluster_announce_bus_port(port);
495 }
496 if let Some(port) = self.cluster_announce_tls_port {
497 server = server.cluster_announce_tls_port(port);
498 }
499 if let Some(ref name) = self.cluster_announce_human_nodename {
500 server = server.cluster_announce_human_nodename(name.clone());
501 }
502 if let Some(port) = self.cluster_port {
503 server = server.cluster_port(port);
504 }
505 if let Some(limit) = self.cluster_link_sendbuf_limit {
506 server = server.cluster_link_sendbuf_limit(limit);
507 }
508 if let Some(ratio) = self.cluster_compatibility_sample_ratio {
509 server = server.cluster_compatibility_sample_ratio(ratio);
510 }
511 if let Some(bytes) = self.cluster_slot_migration_handoff_max_lag_bytes {
512 server = server.cluster_slot_migration_handoff_max_lag_bytes(bytes);
513 }
514 if let Some(ms) = self.cluster_slot_migration_write_pause_timeout {
515 server = server.cluster_slot_migration_write_pause_timeout(ms);
516 }
517 if let Some(v) = self.cluster_slot_stats_enabled {
518 server = server.cluster_slot_stats_enabled(v);
519 }
520 if let Some(n) = self.min_replicas_to_write {
521 server = server.min_replicas_to_write(n);
522 }
523 if let Some(seconds) = self.min_replicas_max_lag {
524 server = server.min_replicas_max_lag(seconds);
525 }
526 if let Some(v) = self.repl_diskless_sync {
527 server = server.repl_diskless_sync(v);
528 }
529 if let Some(seconds) = self.repl_diskless_sync_delay {
530 server = server.repl_diskless_sync_delay(seconds);
531 }
532 if let Some(seconds) = self.repl_ping_replica_period {
533 server = server.repl_ping_replica_period(seconds);
534 }
535 if let Some(seconds) = self.repl_timeout {
536 server = server.repl_timeout(seconds);
537 }
538 if let Some(ref password) = self.password {
539 server = server.password(password).masterauth(password);
540 }
541 if let Some(ref logfile) = self.logfile {
542 server = server.logfile(logfile.clone());
543 }
544 if let Some(ref save) = self.save {
545 match save {
546 SavePolicy::Disabled => server = server.save(false),
547 SavePolicy::Default => server = server.save(true),
548 SavePolicy::Custom(pairs) => {
549 server = server.save_schedule(pairs.clone());
550 }
551 }
552 }
553 if let Some(appendonly) = self.appendonly {
554 server = server.appendonly(appendonly);
555 }
556 for (key, value) in &self.extra {
557 server = server.extra(key.clone(), value.clone());
558 }
559 if let Some(ref mut f) = self.node_config_fn {
561 server = f(NodeContext {
562 server,
563 index,
564 port,
565 total_nodes,
566 masters: self.masters,
567 replicas_per_master: self.replicas_per_master,
568 });
569 }
570 let handle = server.start().await?;
571 nodes.push(handle);
572 }
573
574 let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
576 let mut cli = RedisCli::new()
577 .bin(&self.redis_cli_bin)
578 .host(&self.bind)
579 .port(self.base_port);
580 if let Some(ref password) = self.password {
581 cli = cli.password(password);
582 }
583 cli.cluster_create(&node_addrs, self.replicas_per_master)
584 .await?;
585
586 tokio::time::sleep(Duration::from_secs(2)).await;
588
589 Ok(RedisClusterHandle {
590 nodes,
591 num_masters: self.masters,
592 bind: self.bind,
593 base_port: self.base_port,
594 password: self.password,
595 redis_cli_bin: self.redis_cli_bin,
596 })
597 }
598}
599
/// Handle to a started cluster, returned by `RedisClusterBuilder::start`.
///
/// Owns the handle of every node along with the connection details needed
/// to build a `RedisCli` for the cluster.
pub struct RedisClusterHandle {
    // One handle per node, in port order (index 0 listens on `base_port`).
    nodes: Vec<RedisServerHandle>,
    // Number of masters requested on the builder.
    num_masters: u16,
    bind: String,
    base_port: u16,
    password: Option<String>,
    redis_cli_bin: String,
}
609
/// Entry point for configuring a cluster; see `RedisCluster::builder`.
pub struct RedisCluster;
615
616impl RedisCluster {
617 pub fn builder() -> RedisClusterBuilder {
619 RedisClusterBuilder {
620 masters: 3,
621 replicas_per_master: 0,
622 base_port: 7000,
623 bind: "127.0.0.1".into(),
624 password: None,
625 logfile: None,
626 save: None,
627 appendonly: None,
628 cluster_node_timeout: None,
629 cluster_require_full_coverage: None,
630 cluster_allow_reads_when_down: None,
631 cluster_allow_pubsubshard_when_down: None,
632 cluster_allow_replica_migration: None,
633 cluster_migration_barrier: None,
634 cluster_announce_hostname: None,
635 cluster_announce_human_nodename: None,
636 cluster_preferred_endpoint_type: None,
637 cluster_replica_no_failover: None,
638 cluster_replica_validity_factor: None,
639 cluster_announce_ip: None,
640 cluster_announce_port: None,
641 cluster_announce_bus_port: None,
642 cluster_announce_tls_port: None,
643 cluster_port: None,
644 cluster_link_sendbuf_limit: None,
645 cluster_compatibility_sample_ratio: None,
646 cluster_slot_migration_handoff_max_lag_bytes: None,
647 cluster_slot_migration_write_pause_timeout: None,
648 cluster_slot_stats_enabled: None,
649 min_replicas_to_write: None,
650 min_replicas_max_lag: None,
651 repl_diskless_sync: None,
652 repl_diskless_sync_delay: None,
653 repl_ping_replica_period: None,
654 repl_timeout: None,
655 extra: HashMap::new(),
656 redis_server_bin: "redis-server".into(),
657 redis_cli_bin: "redis-cli".into(),
658 node_config_fn: None,
659 }
660 }
661}
662
663impl RedisClusterHandle {
664 pub fn addr(&self) -> String {
666 format!("{}:{}", self.bind, self.base_port)
667 }
668
669 pub fn node_addrs(&self) -> Vec<String> {
671 self.nodes.iter().map(|n| n.addr()).collect()
672 }
673
674 pub fn pids(&self) -> Vec<u32> {
676 self.nodes.iter().map(|n| n.pid()).collect()
677 }
678
679 pub async fn all_alive(&self) -> bool {
681 for node in &self.nodes {
682 if !node.is_alive().await {
683 return false;
684 }
685 }
686 true
687 }
688
689 pub async fn is_healthy(&self) -> bool {
691 for node in &self.nodes {
692 if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
693 if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
694 return true;
695 }
696 }
697 }
698 false
699 }
700
701 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
703 let start = std::time::Instant::now();
704 loop {
705 if self.is_healthy().await {
706 return Ok(());
707 }
708 if start.elapsed() > timeout {
709 return Err(Error::Timeout {
710 message: "cluster did not become healthy in time".into(),
711 });
712 }
713 tokio::time::sleep(Duration::from_millis(500)).await;
714 }
715 }
716
717 pub fn node(&self, index: usize) -> &RedisServerHandle {
726 &self.nodes[index]
727 }
728
729 pub fn nodes(&self) -> &[RedisServerHandle] {
731 &self.nodes
732 }
733
734 pub fn num_masters(&self) -> u16 {
736 self.num_masters
737 }
738
739 pub fn master_nodes(&self) -> &[RedisServerHandle] {
744 &self.nodes[..self.num_masters as usize]
745 }
746
747 pub fn replica_nodes(&self) -> &[RedisServerHandle] {
752 &self.nodes[self.num_masters as usize..]
753 }
754
755 pub async fn config_set_all(&self, key: &str, value: &str) -> Result<()> {
757 for node in &self.nodes {
758 node.run(&["CONFIG", "SET", key, value]).await?;
759 }
760 Ok(())
761 }
762
763 pub async fn config_set_masters(&self, key: &str, value: &str) -> Result<()> {
765 for node in self.master_nodes() {
766 node.run(&["CONFIG", "SET", key, value]).await?;
767 }
768 Ok(())
769 }
770
771 pub async fn config_set_replicas(&self, key: &str, value: &str) -> Result<()> {
773 for node in self.replica_nodes() {
774 node.run(&["CONFIG", "SET", key, value]).await?;
775 }
776 Ok(())
777 }
778
779 pub fn cli(&self) -> RedisCli {
781 let mut cli = RedisCli::new()
782 .bin(&self.redis_cli_bin)
783 .host(&self.bind)
784 .port(self.base_port);
785 if let Some(ref password) = self.password {
786 cli = cli.password(password);
787 }
788 cli
789 }
790}
791
impl Drop for RedisClusterHandle {
    /// Performs no work of its own. The owned node handles are dropped
    /// automatically once this returns.
    fn drop(&mut self) {
        // NOTE(review): intentionally empty as far as this file shows.
        // Presumably each `RedisServerHandle` stops its own server when it
        // is dropped; confirm against its `Drop` impl.
    }
}
797
#[cfg(test)]
mod tests {
    //! Unit tests for the builder: they inspect stored configuration only
    //! and never start a server.

    use super::*;

    /// A fresh builder carries the documented defaults and leaves every
    /// optional setting unset.
    #[test]
    fn builder_defaults() {
        let b = RedisCluster::builder();
        assert_eq!(b.masters, 3);
        assert_eq!(b.replicas_per_master, 0);
        assert_eq!(b.base_port, 7000);
        assert_eq!(b.bind, "127.0.0.1");
        assert_eq!(b.redis_server_bin, "redis-server");
        assert_eq!(b.redis_cli_bin, "redis-cli");
        assert_eq!(b.password, None);
        assert!(b.logfile.is_none());
        assert!(b.save.is_none());
        assert!(b.appendonly.is_none());
        assert!(b.extra.is_empty());
        assert!(b.node_config_fn.is_none());
        assert_eq!(b.total_nodes(), 3);
        assert!(b.cluster_node_timeout.is_none());
        assert!(b.cluster_require_full_coverage.is_none());
        assert!(b.cluster_allow_reads_when_down.is_none());
        assert!(b.cluster_allow_pubsubshard_when_down.is_none());
        assert!(b.cluster_allow_replica_migration.is_none());
        assert!(b.cluster_migration_barrier.is_none());
        assert!(b.cluster_announce_hostname.is_none());
        assert!(b.cluster_preferred_endpoint_type.is_none());
    }

    /// Replicas multiply the node count and ports are assigned
    /// consecutively from the base port.
    #[test]
    fn builder_with_replicas() {
        let b = RedisCluster::builder().masters(3).replicas_per_master(1);
        assert_eq!(b.total_nodes(), 6);
        let ports: Vec<u16> = b.ports().collect();
        assert_eq!(ports, vec![7000, 7001, 7002, 7003, 7004, 7005]);
    }

    /// A custom base port shifts every assigned port.
    #[test]
    fn builder_custom_base_port() {
        let b = RedisCluster::builder().base_port(8000).masters(2);
        assert_eq!(b.total_nodes(), 2);
        let ports: Vec<u16> = b.ports().collect();
        assert_eq!(ports, vec![8000, 8001]);
    }

    #[test]
    fn builder_password() {
        let b = RedisCluster::builder().password("secret");
        assert_eq!(b.password.as_deref(), Some("secret"));
    }

    /// The bind address and both executables are stored as given.
    #[test]
    fn builder_bind_and_binaries() {
        let b = RedisCluster::builder()
            .bind("0.0.0.0")
            .redis_server_bin("/opt/redis/redis-server")
            .redis_cli_bin("/opt/redis/redis-cli");
        assert_eq!(b.bind, "0.0.0.0");
        assert_eq!(b.redis_server_bin, "/opt/redis/redis-server");
        assert_eq!(b.redis_cli_bin, "/opt/redis/redis-cli");
    }

    /// `save` maps the flag onto the matching policy and `save_schedule`
    /// stores the custom pairs unchanged.
    #[test]
    fn builder_save_policy() {
        let disabled = RedisCluster::builder().save(false);
        assert!(matches!(disabled.save, Some(SavePolicy::Disabled)));

        let default = RedisCluster::builder().save(true);
        assert!(matches!(default.save, Some(SavePolicy::Default)));

        let custom = RedisCluster::builder().save_schedule(vec![(900, 1), (300, 10)]);
        assert!(matches!(
            &custom.save,
            Some(SavePolicy::Custom(pairs)) if *pairs == vec![(900, 1), (300, 10)]
        ));
    }

    #[test]
    fn builder_appendonly() {
        let b = RedisCluster::builder().appendonly(true);
        assert_eq!(b.appendonly, Some(true));
    }

    #[test]
    fn builder_cluster_directives() {
        let b = RedisCluster::builder()
            .cluster_node_timeout(10000)
            .cluster_require_full_coverage(false)
            .cluster_allow_reads_when_down(true)
            .cluster_allow_pubsubshard_when_down(true)
            .cluster_allow_replica_migration(false)
            .cluster_migration_barrier(2)
            .cluster_announce_hostname("node.example.com")
            .cluster_preferred_endpoint_type("hostname")
            .cluster_replica_no_failover(true)
            .cluster_replica_validity_factor(0)
            .cluster_announce_ip("10.0.0.1")
            .cluster_announce_port(7000)
            .cluster_announce_bus_port(17000)
            .cluster_announce_tls_port(7100)
            .cluster_announce_human_nodename("node-1")
            .cluster_port(17000)
            .cluster_link_sendbuf_limit(67108864)
            .cluster_compatibility_sample_ratio(50)
            .cluster_slot_migration_handoff_max_lag_bytes(1048576)
            .cluster_slot_migration_write_pause_timeout(5000)
            .cluster_slot_stats_enabled(true);
        assert_eq!(b.cluster_node_timeout, Some(10000));
        assert_eq!(b.cluster_require_full_coverage, Some(false));
        assert_eq!(b.cluster_allow_reads_when_down, Some(true));
        assert_eq!(b.cluster_allow_pubsubshard_when_down, Some(true));
        assert_eq!(b.cluster_allow_replica_migration, Some(false));
        assert_eq!(b.cluster_migration_barrier, Some(2));
        assert_eq!(
            b.cluster_announce_hostname.as_deref(),
            Some("node.example.com")
        );
        assert_eq!(
            b.cluster_preferred_endpoint_type.as_deref(),
            Some("hostname")
        );
        assert_eq!(b.cluster_replica_no_failover, Some(true));
        assert_eq!(b.cluster_replica_validity_factor, Some(0));
        assert_eq!(b.cluster_announce_ip.as_deref(), Some("10.0.0.1"));
        assert_eq!(b.cluster_announce_port, Some(7000));
        assert_eq!(b.cluster_announce_bus_port, Some(17000));
        assert_eq!(b.cluster_announce_tls_port, Some(7100));
        assert_eq!(b.cluster_announce_human_nodename.as_deref(), Some("node-1"));
        assert_eq!(b.cluster_port, Some(17000));
        assert_eq!(b.cluster_link_sendbuf_limit, Some(67108864));
        assert_eq!(b.cluster_compatibility_sample_ratio, Some(50));
        assert_eq!(
            b.cluster_slot_migration_handoff_max_lag_bytes,
            Some(1048576)
        );
        assert_eq!(b.cluster_slot_migration_write_pause_timeout, Some(5000));
        assert_eq!(b.cluster_slot_stats_enabled, Some(true));
    }

    #[test]
    fn builder_replication_directives() {
        let b = RedisCluster::builder()
            .min_replicas_to_write(1)
            .min_replicas_max_lag(10)
            .repl_diskless_sync(true)
            .repl_diskless_sync_delay(0)
            .repl_ping_replica_period(5)
            .repl_timeout(30);
        assert_eq!(b.min_replicas_to_write, Some(1));
        assert_eq!(b.min_replicas_max_lag, Some(10));
        assert_eq!(b.repl_diskless_sync, Some(true));
        assert_eq!(b.repl_diskless_sync_delay, Some(0));
        assert_eq!(b.repl_ping_replica_period, Some(5));
        assert_eq!(b.repl_timeout, Some(30));
    }

    #[test]
    fn builder_logfile_and_extra() {
        let b = RedisCluster::builder()
            .logfile("/tmp/cluster.log")
            .extra("maxmemory", "10mb");
        assert_eq!(b.logfile.as_deref(), Some("/tmp/cluster.log"));
        assert_eq!(b.extra.get("maxmemory").map(String::as_str), Some("10mb"));
    }

    /// Setting the same extra key twice keeps only the latest value.
    #[test]
    fn builder_extra_overwrites_same_key() {
        let b = RedisCluster::builder()
            .extra("maxmemory", "10mb")
            .extra("maxmemory", "20mb");
        assert_eq!(b.extra.len(), 1);
        assert_eq!(b.extra.get("maxmemory").map(String::as_str), Some("20mb"));
    }
}