Skip to main content

redis_server_wrapper/
cluster.rs

1//! Redis Cluster lifecycle management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::cli::RedisCli;
7use crate::error::{Error, Result};
8use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
9
/// Context passed to the per-node configuration callback.
///
/// Provides information about the node being configured so the callback
/// can make per-node decisions (e.g., different config for masters vs. replicas,
/// or for a specific node index).
pub struct NodeContext {
    /// The pre-configured [`RedisServer`] builder for this node.
    ///
    /// All uniform cluster-level settings have already been applied.
    /// The callback should modify and return this builder.
    pub server: RedisServer,
    /// Zero-based index of this node in the cluster.
    ///
    /// Nodes are ordered by port: masters occupy indices `0..masters`,
    /// replicas occupy indices `masters..total`.
    pub index: usize,
    /// The port assigned to this node.
    pub port: u16,
    /// Total number of nodes in the cluster
    /// (equal to `masters * (1 + replicas_per_master)`).
    pub total_nodes: u16,
    /// Number of master nodes.
    pub masters: u16,
    /// Number of replicas per master.
    pub replicas_per_master: u16,
}
35
36impl NodeContext {
37    /// Whether this node is a master (by initial topology order).
38    pub fn is_master(&self) -> bool {
39        self.index < self.masters as usize
40    }
41
42    /// Whether this node is a replica (by initial topology order).
43    pub fn is_replica(&self) -> bool {
44        !self.is_master()
45    }
46}
47
/// Builder for a Redis Cluster.
///
/// # Example
///
/// ```no_run
/// use redis_server_wrapper::RedisCluster;
///
/// # async fn example() {
/// let cluster = RedisCluster::builder()
///     .masters(3)
///     .replicas_per_master(1)
///     .base_port(7000)
///     .start()
///     .await
///     .unwrap();
///
/// assert!(cluster.is_healthy().await);
/// // Stopped automatically on Drop.
/// # }
/// ```
pub struct RedisClusterBuilder {
    // -- topology --
    masters: u16,
    replicas_per_master: u16,
    base_port: u16,
    bind: String,
    // -- uniform node settings (applied identically to every node) --
    password: Option<String>,
    logfile: Option<String>,
    save: Option<SavePolicy>,
    appendonly: Option<bool>,
    // -- `cluster-*` directives; `None` leaves the Redis default in effect --
    cluster_node_timeout: Option<u64>,
    cluster_require_full_coverage: Option<bool>,
    cluster_allow_reads_when_down: Option<bool>,
    cluster_allow_pubsubshard_when_down: Option<bool>,
    cluster_allow_replica_migration: Option<bool>,
    cluster_migration_barrier: Option<u32>,
    cluster_announce_hostname: Option<String>,
    cluster_announce_human_nodename: Option<String>,
    cluster_preferred_endpoint_type: Option<String>,
    cluster_replica_no_failover: Option<bool>,
    cluster_replica_validity_factor: Option<u32>,
    cluster_announce_ip: Option<String>,
    cluster_announce_port: Option<u16>,
    cluster_announce_bus_port: Option<u16>,
    cluster_announce_tls_port: Option<u16>,
    cluster_port: Option<u16>,
    cluster_link_sendbuf_limit: Option<u64>,
    cluster_compatibility_sample_ratio: Option<u32>,
    cluster_slot_migration_handoff_max_lag_bytes: Option<u64>,
    cluster_slot_migration_write_pause_timeout: Option<u64>,
    cluster_slot_stats_enabled: Option<bool>,
    // -- replication directives relevant in cluster mode --
    min_replicas_to_write: Option<u32>,
    min_replicas_max_lag: Option<u32>,
    repl_diskless_sync: Option<bool>,
    repl_diskless_sync_delay: Option<u32>,
    repl_ping_replica_period: Option<u32>,
    repl_timeout: Option<u32>,
    // -- escape hatch for arbitrary config directives --
    extra: HashMap<String, String>,
    // -- binary paths (resolved via PATH unless absolute) --
    redis_server_bin: String,
    redis_cli_bin: String,
    // Per-node customization callback; see `with_node_config`.
    node_config_fn: Option<Box<dyn FnMut(NodeContext) -> RedisServer + Send>>,
}
109
impl RedisClusterBuilder {
    /// Set the number of master nodes (default: `3`).
    pub fn masters(mut self, n: u16) -> Self {
        self.masters = n;
        self
    }

    /// Set the number of replicas per master (default: `0`).
    pub fn replicas_per_master(mut self, n: u16) -> Self {
        self.replicas_per_master = n;
        self
    }

    /// Set the base port for cluster nodes (default: `7000`).
    ///
    /// Nodes are assigned consecutive ports starting at this value.
    pub fn base_port(mut self, port: u16) -> Self {
        self.base_port = port;
        self
    }

    /// Set the bind address for all cluster nodes (default: `"127.0.0.1"`).
    pub fn bind(mut self, bind: impl Into<String>) -> Self {
        self.bind = bind.into();
        self
    }

    /// Set a `requirepass` password for all cluster nodes.
    pub fn password(mut self, password: impl Into<String>) -> Self {
        self.password = Some(password.into());
        self
    }

    /// Set the log file path for all cluster nodes.
    pub fn logfile(mut self, path: impl Into<String>) -> Self {
        self.logfile = Some(path.into());
        self
    }

    /// Set the RDB save policy for all cluster nodes.
    ///
    /// `true` omits the `save` directive (Redis defaults apply).
    /// `false` emits `save ""` to disable RDB entirely.
    pub fn save(mut self, save: bool) -> Self {
        self.save = Some(if save {
            SavePolicy::Default
        } else {
            SavePolicy::Disabled
        });
        self
    }

    /// Set a custom RDB save schedule for all cluster nodes.
    pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
        self.save = Some(SavePolicy::Custom(schedule));
        self
    }

    /// Enable or disable AOF persistence for all cluster nodes.
    pub fn appendonly(mut self, appendonly: bool) -> Self {
        self.appendonly = Some(appendonly);
        self
    }

    /// Set the cluster node timeout in milliseconds for all nodes (default: `5000`).
    pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
        self.cluster_node_timeout = Some(ms);
        self
    }

    /// Require full hash slot coverage for the cluster to accept writes.
    pub fn cluster_require_full_coverage(mut self, require: bool) -> Self {
        self.cluster_require_full_coverage = Some(require);
        self
    }

    /// Allow reads when the cluster is down.
    pub fn cluster_allow_reads_when_down(mut self, allow: bool) -> Self {
        self.cluster_allow_reads_when_down = Some(allow);
        self
    }

    /// Allow pubsub shard channels when the cluster is down.
    pub fn cluster_allow_pubsubshard_when_down(mut self, allow: bool) -> Self {
        self.cluster_allow_pubsubshard_when_down = Some(allow);
        self
    }

    /// Allow automatic replica migration between masters.
    pub fn cluster_allow_replica_migration(mut self, allow: bool) -> Self {
        self.cluster_allow_replica_migration = Some(allow);
        self
    }

    /// Set the minimum number of replicas a master must retain before one can migrate.
    pub fn cluster_migration_barrier(mut self, barrier: u32) -> Self {
        self.cluster_migration_barrier = Some(barrier);
        self
    }

    /// Set the hostname each node announces to the cluster.
    pub fn cluster_announce_hostname(mut self, hostname: impl Into<String>) -> Self {
        self.cluster_announce_hostname = Some(hostname.into());
        self
    }

    /// Set the preferred endpoint type for cluster redirections (e.g. `"ip"`, `"hostname"`).
    pub fn cluster_preferred_endpoint_type(mut self, endpoint_type: impl Into<String>) -> Self {
        self.cluster_preferred_endpoint_type = Some(endpoint_type.into());
        self
    }

    /// Prevent replicas from attempting automatic failover.
    ///
    /// Manual failover via `CLUSTER FAILOVER` still works.
    pub fn cluster_replica_no_failover(mut self, no_failover: bool) -> Self {
        self.cluster_replica_no_failover = Some(no_failover);
        self
    }

    /// Set the replica validity factor for failover eligibility.
    ///
    /// A replica will not failover if it has been disconnected from the master
    /// for more than `(node-timeout * factor) + repl-ping-replica-period` seconds.
    /// Set to `0` to allow any replica to failover regardless of staleness.
    pub fn cluster_replica_validity_factor(mut self, factor: u32) -> Self {
        self.cluster_replica_validity_factor = Some(factor);
        self
    }

    /// Set the IP address nodes announce for client redirects (MOVED/ASKING).
    pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
        self.cluster_announce_ip = Some(ip.into());
        self
    }

    /// Set the client port nodes announce for redirects.
    pub fn cluster_announce_port(mut self, port: u16) -> Self {
        self.cluster_announce_port = Some(port);
        self
    }

    /// Set the cluster bus port nodes announce for gossip.
    pub fn cluster_announce_bus_port(mut self, port: u16) -> Self {
        self.cluster_announce_bus_port = Some(port);
        self
    }

    /// Set the TLS client port nodes announce for redirects.
    pub fn cluster_announce_tls_port(mut self, port: u16) -> Self {
        self.cluster_announce_tls_port = Some(port);
        self
    }

    /// Set a friendly node name broadcast for debugging/admin display.
    pub fn cluster_announce_human_nodename(mut self, name: impl Into<String>) -> Self {
        self.cluster_announce_human_nodename = Some(name.into());
        self
    }

    /// Set a dedicated cluster bus port (default: client port + 10000).
    pub fn cluster_port(mut self, port: u16) -> Self {
        self.cluster_port = Some(port);
        self
    }

    /// Set the maximum memory for a cluster bus link's output buffer.
    ///
    /// When exceeded, the link is disconnected. Set to `0` for unlimited.
    pub fn cluster_link_sendbuf_limit(mut self, limit: u64) -> Self {
        self.cluster_link_sendbuf_limit = Some(limit);
        self
    }

    /// Set the cluster compatibility sample ratio.
    pub fn cluster_compatibility_sample_ratio(mut self, ratio: u32) -> Self {
        self.cluster_compatibility_sample_ratio = Some(ratio);
        self
    }

    /// Set the maximum replication lag in bytes before slot migration handoff.
    pub fn cluster_slot_migration_handoff_max_lag_bytes(mut self, bytes: u64) -> Self {
        self.cluster_slot_migration_handoff_max_lag_bytes = Some(bytes);
        self
    }

    /// Set the write pause timeout in milliseconds during slot migration.
    pub fn cluster_slot_migration_write_pause_timeout(mut self, ms: u64) -> Self {
        self.cluster_slot_migration_write_pause_timeout = Some(ms);
        self
    }

    /// Enable per-slot statistics tracking.
    pub fn cluster_slot_stats_enabled(mut self, enable: bool) -> Self {
        self.cluster_slot_stats_enabled = Some(enable);
        self
    }

    // -- replication directives relevant in cluster mode --

    /// Set the minimum number of connected replicas before the master accepts writes.
    ///
    /// Useful for split-brain protection: a partitioned master with no reachable
    /// replicas stops accepting writes, reducing data loss during partitions.
    pub fn min_replicas_to_write(mut self, n: u32) -> Self {
        self.min_replicas_to_write = Some(n);
        self
    }

    /// Set the maximum replication lag (seconds) before a replica is considered disconnected.
    ///
    /// Used with `min_replicas_to_write` to determine if enough replicas are connected.
    pub fn min_replicas_max_lag(mut self, seconds: u32) -> Self {
        self.min_replicas_max_lag = Some(seconds);
        self
    }

    /// Enable or disable diskless replication sync.
    ///
    /// Diskless sync is faster but uses more memory during transfer.
    pub fn repl_diskless_sync(mut self, enable: bool) -> Self {
        self.repl_diskless_sync = Some(enable);
        self
    }

    /// Set the delay in seconds before starting a diskless replication transfer.
    ///
    /// Allows batching multiple replicas syncing at once.
    pub fn repl_diskless_sync_delay(mut self, seconds: u32) -> Self {
        self.repl_diskless_sync_delay = Some(seconds);
        self
    }

    /// Set how often replicas ping the master (seconds).
    ///
    /// Used in the replica validity calculation:
    /// `(node-timeout * validity-factor) + repl-ping-replica-period`.
    pub fn repl_ping_replica_period(mut self, seconds: u32) -> Self {
        self.repl_ping_replica_period = Some(seconds);
        self
    }

    /// Set the replication timeout in seconds.
    ///
    /// If a replica doesn't hear from master for this long, it considers the link dead.
    pub fn repl_timeout(mut self, seconds: u32) -> Self {
        self.repl_timeout = Some(seconds);
        self
    }

    /// Set a per-node configuration callback.
    ///
    /// The callback receives a [`NodeContext`] containing the pre-configured
    /// [`RedisServer`] builder (with all uniform settings already applied) and
    /// metadata about the node's position in the cluster. It must return the
    /// (possibly modified) `RedisServer` builder.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use redis_server_wrapper::RedisCluster;
    ///
    /// # async fn example() {
    /// let cluster = RedisCluster::builder()
    ///     .masters(3)
    ///     .replicas_per_master(1)
    ///     .base_port(7000)
    ///     .with_node_config(|ctx| {
    ///         let is_replica = ctx.is_replica();
    ///         let index = ctx.index;
    ///         let mut server = ctx.server;
    ///         if is_replica {
    ///             server = server.cluster_replica_no_failover(true);
    ///         }
    ///         if index == 0 {
    ///             server = server.maxmemory("512mb");
    ///         }
    ///         server
    ///     })
    ///     .start()
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```
    pub fn with_node_config(
        mut self,
        f: impl FnMut(NodeContext) -> RedisServer + Send + 'static,
    ) -> Self {
        self.node_config_fn = Some(Box::new(f));
        self
    }

    /// Set an arbitrary config directive for all cluster nodes.
    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra.insert(key.into(), value.into());
        self
    }

    /// Set a custom `redis-server` binary path.
    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_server_bin = bin.into();
        self
    }

    /// Set a custom `redis-cli` binary path.
    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_cli_bin = bin.into();
        self
    }

    /// Total node count: each master plus its replicas.
    fn total_nodes(&self) -> u16 {
        self.masters * (1 + self.replicas_per_master)
    }

    /// Consecutive ports starting at `base_port`, one per node.
    ///
    /// NOTE(review): `base + i` can overflow `u16` when `base_port` is near
    /// `u16::MAX` and the cluster is large (panic in debug builds, wrap in
    /// release) — consider validating `base_port + total_nodes` up front.
    fn ports(&self) -> impl Iterator<Item = u16> {
        let base = self.base_port;
        let total = self.total_nodes();
        (0..total).map(move |i| base + i)
    }

    /// Start all nodes and form the cluster.
    pub async fn start(mut self) -> Result<RedisClusterHandle> {
        // Stop any leftover nodes from previous runs.
        for port in self.ports() {
            let mut cli = RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port);
            if let Some(ref password) = self.password {
                cli = cli.password(password);
            }
            // Best-effort cleanup: failures (e.g. no server on this port) are
            // deliberately ignored.
            // NOTE(review): `shutdown()` is not awaited — confirm it is a
            // synchronous call; if it returns a Future, this line is a no-op.
            cli.shutdown();
        }
        // Give any just-stopped processes a moment to release their ports.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Start each node.
        let total_nodes = self.total_nodes();
        let ports: Vec<u16> = self.ports().collect();
        let mut nodes = Vec::new();
        for (index, port) in ports.into_iter().enumerate() {
            // Fresh per-node working directory: stale state from a previous
            // run (e.g. an old nodes.conf) would interfere with cluster formation.
            let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
            let _ = std::fs::remove_dir_all(&node_dir);
            let mut server = RedisServer::new()
                .port(port)
                .bind(&self.bind)
                .dir(node_dir)
                .cluster_enabled(true)
                .cluster_node_timeout(self.cluster_node_timeout.unwrap_or(5000))
                .redis_server_bin(&self.redis_server_bin)
                .redis_cli_bin(&self.redis_cli_bin);
            // Forward every uniform directive that was explicitly set; unset
            // options are omitted so the Redis defaults apply.
            if let Some(v) = self.cluster_require_full_coverage {
                server = server.cluster_require_full_coverage(v);
            }
            if let Some(v) = self.cluster_allow_reads_when_down {
                server = server.cluster_allow_reads_when_down(v);
            }
            if let Some(v) = self.cluster_allow_pubsubshard_when_down {
                server = server.cluster_allow_pubsubshard_when_down(v);
            }
            if let Some(v) = self.cluster_allow_replica_migration {
                server = server.cluster_allow_replica_migration(v);
            }
            if let Some(barrier) = self.cluster_migration_barrier {
                server = server.cluster_migration_barrier(barrier);
            }
            if let Some(ref hostname) = self.cluster_announce_hostname {
                server = server.cluster_announce_hostname(hostname.clone());
            }
            if let Some(ref endpoint_type) = self.cluster_preferred_endpoint_type {
                server = server.cluster_preferred_endpoint_type(endpoint_type.clone());
            }
            if let Some(v) = self.cluster_replica_no_failover {
                server = server.cluster_replica_no_failover(v);
            }
            if let Some(factor) = self.cluster_replica_validity_factor {
                server = server.cluster_replica_validity_factor(factor);
            }
            if let Some(ref ip) = self.cluster_announce_ip {
                server = server.cluster_announce_ip(ip.clone());
            }
            // Note: the bindings below shadow the loop's `port` only inside
            // each `if let` body; the loop variable is unaffected elsewhere.
            if let Some(port) = self.cluster_announce_port {
                server = server.cluster_announce_port(port);
            }
            if let Some(port) = self.cluster_announce_bus_port {
                server = server.cluster_announce_bus_port(port);
            }
            if let Some(port) = self.cluster_announce_tls_port {
                server = server.cluster_announce_tls_port(port);
            }
            if let Some(ref name) = self.cluster_announce_human_nodename {
                server = server.cluster_announce_human_nodename(name.clone());
            }
            if let Some(port) = self.cluster_port {
                server = server.cluster_port(port);
            }
            if let Some(limit) = self.cluster_link_sendbuf_limit {
                server = server.cluster_link_sendbuf_limit(limit);
            }
            if let Some(ratio) = self.cluster_compatibility_sample_ratio {
                server = server.cluster_compatibility_sample_ratio(ratio);
            }
            if let Some(bytes) = self.cluster_slot_migration_handoff_max_lag_bytes {
                server = server.cluster_slot_migration_handoff_max_lag_bytes(bytes);
            }
            if let Some(ms) = self.cluster_slot_migration_write_pause_timeout {
                server = server.cluster_slot_migration_write_pause_timeout(ms);
            }
            if let Some(v) = self.cluster_slot_stats_enabled {
                server = server.cluster_slot_stats_enabled(v);
            }
            if let Some(n) = self.min_replicas_to_write {
                server = server.min_replicas_to_write(n);
            }
            if let Some(seconds) = self.min_replicas_max_lag {
                server = server.min_replicas_max_lag(seconds);
            }
            if let Some(v) = self.repl_diskless_sync {
                server = server.repl_diskless_sync(v);
            }
            if let Some(seconds) = self.repl_diskless_sync_delay {
                server = server.repl_diskless_sync_delay(seconds);
            }
            if let Some(seconds) = self.repl_ping_replica_period {
                server = server.repl_ping_replica_period(seconds);
            }
            if let Some(seconds) = self.repl_timeout {
                server = server.repl_timeout(seconds);
            }
            // The same password is used for client auth (`requirepass`) and
            // inter-node replication auth (`masterauth`).
            if let Some(ref password) = self.password {
                server = server.password(password).masterauth(password);
            }
            if let Some(ref logfile) = self.logfile {
                server = server.logfile(logfile.clone());
            }
            if let Some(ref save) = self.save {
                match save {
                    SavePolicy::Disabled => server = server.save(false),
                    SavePolicy::Default => server = server.save(true),
                    SavePolicy::Custom(pairs) => {
                        server = server.save_schedule(pairs.clone());
                    }
                }
            }
            if let Some(appendonly) = self.appendonly {
                server = server.appendonly(appendonly);
            }
            for (key, value) in &self.extra {
                server = server.extra(key.clone(), value.clone());
            }
            // Apply per-node customization if configured.
            if let Some(ref mut f) = self.node_config_fn {
                server = f(NodeContext {
                    server,
                    index,
                    port,
                    total_nodes,
                    masters: self.masters,
                    replicas_per_master: self.replicas_per_master,
                });
            }
            let handle = server.start().await?;
            nodes.push(handle);
        }

        // Form the cluster: issue `--cluster create` against the seed node
        // with every node's address and the requested replica factor.
        let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
        let mut cli = RedisCli::new()
            .bin(&self.redis_cli_bin)
            .host(&self.bind)
            .port(self.base_port);
        if let Some(ref password) = self.password {
            cli = cli.password(password);
        }
        cli.cluster_create(&node_addrs, self.replicas_per_master)
            .await?;

        // Wait for convergence. Fixed grace period; callers needing a hard
        // guarantee should also use `RedisClusterHandle::wait_for_healthy`.
        tokio::time::sleep(Duration::from_secs(2)).await;

        Ok(RedisClusterHandle {
            nodes,
            num_masters: self.masters,
            bind: self.bind,
            base_port: self.base_port,
            password: self.password,
            redis_cli_bin: self.redis_cli_bin,
        })
    }
}
599
/// A running Redis Cluster. Stops all nodes on Drop.
pub struct RedisClusterHandle {
    // Handles for every node, ordered by port: masters first, then replicas.
    nodes: Vec<RedisServerHandle>,
    // Number of masters in the initial topology (first `num_masters` of `nodes`).
    num_masters: u16,
    // Bind address shared by all nodes.
    bind: String,
    // Port of the first (seed) node.
    base_port: u16,
    // `requirepass` password, if one was configured for the cluster.
    password: Option<String>,
    // Path/name of the `redis-cli` binary used for cluster commands.
    redis_cli_bin: String,
}
609
/// Entry point for building a Redis Cluster topology.
///
/// Call [`RedisCluster::builder`] to obtain a [`RedisClusterBuilder`], then
/// configure it and call [`RedisClusterBuilder::start`] to launch the cluster.
///
/// This is a zero-sized marker type; all configuration state lives in the builder.
pub struct RedisCluster;
615
616impl RedisCluster {
617    /// Create a new cluster builder with defaults (3 masters, 0 replicas, port 7000).
618    pub fn builder() -> RedisClusterBuilder {
619        RedisClusterBuilder {
620            masters: 3,
621            replicas_per_master: 0,
622            base_port: 7000,
623            bind: "127.0.0.1".into(),
624            password: None,
625            logfile: None,
626            save: None,
627            appendonly: None,
628            cluster_node_timeout: None,
629            cluster_require_full_coverage: None,
630            cluster_allow_reads_when_down: None,
631            cluster_allow_pubsubshard_when_down: None,
632            cluster_allow_replica_migration: None,
633            cluster_migration_barrier: None,
634            cluster_announce_hostname: None,
635            cluster_announce_human_nodename: None,
636            cluster_preferred_endpoint_type: None,
637            cluster_replica_no_failover: None,
638            cluster_replica_validity_factor: None,
639            cluster_announce_ip: None,
640            cluster_announce_port: None,
641            cluster_announce_bus_port: None,
642            cluster_announce_tls_port: None,
643            cluster_port: None,
644            cluster_link_sendbuf_limit: None,
645            cluster_compatibility_sample_ratio: None,
646            cluster_slot_migration_handoff_max_lag_bytes: None,
647            cluster_slot_migration_write_pause_timeout: None,
648            cluster_slot_stats_enabled: None,
649            min_replicas_to_write: None,
650            min_replicas_max_lag: None,
651            repl_diskless_sync: None,
652            repl_diskless_sync_delay: None,
653            repl_ping_replica_period: None,
654            repl_timeout: None,
655            extra: HashMap::new(),
656            redis_server_bin: "redis-server".into(),
657            redis_cli_bin: "redis-cli".into(),
658            node_config_fn: None,
659        }
660    }
661}
662
663impl RedisClusterHandle {
664    /// The seed address (first node).
665    pub fn addr(&self) -> String {
666        format!("{}:{}", self.bind, self.base_port)
667    }
668
669    /// All node addresses.
670    pub fn node_addrs(&self) -> Vec<String> {
671        self.nodes.iter().map(|n| n.addr()).collect()
672    }
673
674    /// The PIDs of all `redis-server` processes in the cluster.
675    pub fn pids(&self) -> Vec<u32> {
676        self.nodes.iter().map(|n| n.pid()).collect()
677    }
678
679    /// Check if all nodes are alive.
680    pub async fn all_alive(&self) -> bool {
681        for node in &self.nodes {
682            if !node.is_alive().await {
683                return false;
684            }
685        }
686        true
687    }
688
689    /// Check CLUSTER INFO for state=ok and all slots assigned.
690    pub async fn is_healthy(&self) -> bool {
691        for node in &self.nodes {
692            if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
693                if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
694                    return true;
695                }
696            }
697        }
698        false
699    }
700
701    /// Wait until the cluster is healthy or timeout.
702    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
703        let start = std::time::Instant::now();
704        loop {
705            if self.is_healthy().await {
706                return Ok(());
707            }
708            if start.elapsed() > timeout {
709                return Err(Error::Timeout {
710                    message: "cluster did not become healthy in time".into(),
711                });
712            }
713            tokio::time::sleep(Duration::from_millis(500)).await;
714        }
715    }
716
717    /// Access a specific node by index.
718    ///
719    /// Nodes are ordered by port: masters first (indices `0..masters`),
720    /// then replicas (indices `masters..total`).
721    ///
722    /// # Panics
723    ///
724    /// Panics if `index >= total_nodes`.
725    pub fn node(&self, index: usize) -> &RedisServerHandle {
726        &self.nodes[index]
727    }
728
729    /// All node handles.
730    pub fn nodes(&self) -> &[RedisServerHandle] {
731        &self.nodes
732    }
733
734    /// The number of master nodes in the cluster.
735    pub fn num_masters(&self) -> u16 {
736        self.num_masters
737    }
738
739    /// Handles for the master nodes (the first `masters` nodes by port order).
740    ///
741    /// Note: this reflects the *initial* topology. After a failover, the actual
742    /// roles may differ from the startup ordering.
743    pub fn master_nodes(&self) -> &[RedisServerHandle] {
744        &self.nodes[..self.num_masters as usize]
745    }
746
747    /// Handles for the replica nodes (all nodes after the masters by port order).
748    ///
749    /// Note: this reflects the *initial* topology. After a failover, the actual
750    /// roles may differ from the startup ordering.
751    pub fn replica_nodes(&self) -> &[RedisServerHandle] {
752        &self.nodes[self.num_masters as usize..]
753    }
754
755    /// Run `CONFIG SET` on all nodes.
756    pub async fn config_set_all(&self, key: &str, value: &str) -> Result<()> {
757        for node in &self.nodes {
758            node.run(&["CONFIG", "SET", key, value]).await?;
759        }
760        Ok(())
761    }
762
763    /// Run `CONFIG SET` on master nodes only (initial topology).
764    pub async fn config_set_masters(&self, key: &str, value: &str) -> Result<()> {
765        for node in self.master_nodes() {
766            node.run(&["CONFIG", "SET", key, value]).await?;
767        }
768        Ok(())
769    }
770
771    /// Run `CONFIG SET` on replica nodes only (initial topology).
772    pub async fn config_set_replicas(&self, key: &str, value: &str) -> Result<()> {
773        for node in self.replica_nodes() {
774            node.run(&["CONFIG", "SET", key, value]).await?;
775        }
776        Ok(())
777    }
778
779    /// Get a `RedisCli` for the seed node.
780    pub fn cli(&self) -> RedisCli {
781        let mut cli = RedisCli::new()
782            .bin(&self.redis_cli_bin)
783            .host(&self.bind)
784            .port(self.base_port);
785        if let Some(ref password) = self.password {
786            cli = cli.password(password);
787        }
788        cli
789    }
790}
791
impl Drop for RedisClusterHandle {
    fn drop(&mut self) {
        // Intentionally empty: dropping `self.nodes` runs each
        // `RedisServerHandle`'s own Drop, which stops that node's process.
        // The explicit impl documents that cleanup happens here by design.
    }
}
797
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn builder_defaults() {
        // A fresh builder describes a 3-master, zero-replica cluster on
        // base port 7000 with no optional directive set.
        let builder = RedisCluster::builder();
        assert_eq!(builder.masters, 3);
        assert_eq!(builder.replicas_per_master, 0);
        assert_eq!(builder.base_port, 7000);
        assert_eq!(builder.password, None);
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
        assert_eq!(builder.total_nodes(), 3);
        assert!(builder.cluster_node_timeout.is_none());
        assert!(builder.cluster_require_full_coverage.is_none());
        assert!(builder.cluster_allow_reads_when_down.is_none());
        assert!(builder.cluster_allow_pubsubshard_when_down.is_none());
        assert!(builder.cluster_allow_replica_migration.is_none());
        assert!(builder.cluster_migration_barrier.is_none());
        assert!(builder.cluster_announce_hostname.is_none());
        assert!(builder.cluster_preferred_endpoint_type.is_none());
    }

    #[test]
    fn builder_with_replicas() {
        // 3 masters + 1 replica each = 6 nodes on consecutive ports.
        let builder = RedisCluster::builder().masters(3).replicas_per_master(1);
        assert_eq!(builder.total_nodes(), 6);
        let assigned: Vec<u16> = builder.ports().collect();
        assert_eq!(assigned, vec![7000, 7001, 7002, 7003, 7004, 7005]);
    }

    #[test]
    fn builder_password() {
        let builder = RedisCluster::builder().password("secret");
        assert_eq!(builder.password.as_deref(), Some("secret"));
    }

    #[test]
    fn builder_cluster_directives() {
        // Every cluster-* setter should round-trip into its builder field.
        let builder = RedisCluster::builder()
            .cluster_node_timeout(10000)
            .cluster_require_full_coverage(false)
            .cluster_allow_reads_when_down(true)
            .cluster_allow_pubsubshard_when_down(true)
            .cluster_allow_replica_migration(false)
            .cluster_migration_barrier(2)
            .cluster_announce_hostname("node.example.com")
            .cluster_preferred_endpoint_type("hostname")
            .cluster_replica_no_failover(true)
            .cluster_replica_validity_factor(0)
            .cluster_announce_ip("10.0.0.1")
            .cluster_announce_port(7000)
            .cluster_announce_bus_port(17000)
            .cluster_announce_tls_port(7100)
            .cluster_announce_human_nodename("node-1")
            .cluster_port(17000)
            .cluster_link_sendbuf_limit(67108864)
            .cluster_compatibility_sample_ratio(50)
            .cluster_slot_migration_handoff_max_lag_bytes(1048576)
            .cluster_slot_migration_write_pause_timeout(5000)
            .cluster_slot_stats_enabled(true);
        assert_eq!(builder.cluster_node_timeout, Some(10000));
        assert_eq!(builder.cluster_require_full_coverage, Some(false));
        assert_eq!(builder.cluster_allow_reads_when_down, Some(true));
        assert_eq!(builder.cluster_allow_pubsubshard_when_down, Some(true));
        assert_eq!(builder.cluster_allow_replica_migration, Some(false));
        assert_eq!(builder.cluster_migration_barrier, Some(2));
        assert_eq!(
            builder.cluster_announce_hostname.as_deref(),
            Some("node.example.com")
        );
        assert_eq!(
            builder.cluster_preferred_endpoint_type.as_deref(),
            Some("hostname")
        );
        assert_eq!(builder.cluster_replica_no_failover, Some(true));
        assert_eq!(builder.cluster_replica_validity_factor, Some(0));
        assert_eq!(builder.cluster_announce_ip.as_deref(), Some("10.0.0.1"));
        assert_eq!(builder.cluster_announce_port, Some(7000));
        assert_eq!(builder.cluster_announce_bus_port, Some(17000));
        assert_eq!(builder.cluster_announce_tls_port, Some(7100));
        assert_eq!(
            builder.cluster_announce_human_nodename.as_deref(),
            Some("node-1")
        );
        assert_eq!(builder.cluster_port, Some(17000));
        assert_eq!(builder.cluster_link_sendbuf_limit, Some(67108864));
        assert_eq!(builder.cluster_compatibility_sample_ratio, Some(50));
        assert_eq!(
            builder.cluster_slot_migration_handoff_max_lag_bytes,
            Some(1048576)
        );
        assert_eq!(builder.cluster_slot_migration_write_pause_timeout, Some(5000));
        assert_eq!(builder.cluster_slot_stats_enabled, Some(true));
    }

    #[test]
    fn builder_replication_directives() {
        // Replication tuning knobs should likewise round-trip.
        let builder = RedisCluster::builder()
            .min_replicas_to_write(1)
            .min_replicas_max_lag(10)
            .repl_diskless_sync(true)
            .repl_diskless_sync_delay(0)
            .repl_ping_replica_period(5)
            .repl_timeout(30);
        assert_eq!(builder.min_replicas_to_write, Some(1));
        assert_eq!(builder.min_replicas_max_lag, Some(10));
        assert_eq!(builder.repl_diskless_sync, Some(true));
        assert_eq!(builder.repl_diskless_sync_delay, Some(0));
        assert_eq!(builder.repl_ping_replica_period, Some(5));
        assert_eq!(builder.repl_timeout, Some(30));
    }

    #[test]
    fn builder_logfile_and_extra() {
        let builder = RedisCluster::builder()
            .logfile("/tmp/cluster.log")
            .extra("maxmemory", "10mb");
        assert_eq!(builder.logfile.as_deref(), Some("/tmp/cluster.log"));
        assert_eq!(
            builder.extra.get("maxmemory").map(String::as_str),
            Some("10mb")
        );
    }
}