// redis_server_wrapper/cluster.rs
//! Redis Cluster lifecycle management built on `RedisServer`.

3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use crate::cli::RedisCli;
8use crate::error::{Error, Result};
9use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
10
11/// Context passed to the per-node configuration callback.
12///
13/// Provides information about the node being configured so the callback
14/// can make per-node decisions (e.g., different config for masters vs. replicas,
15/// or for a specific node index).
16pub struct NodeContext {
17    /// The pre-configured [`RedisServer`] builder for this node.
18    ///
19    /// All uniform cluster-level settings have already been applied.
20    /// The callback should modify and return this builder.
21    pub server: RedisServer,
22    /// Zero-based index of this node in the cluster.
23    ///
24    /// Nodes are ordered by port: masters occupy indices `0..masters`,
25    /// replicas occupy indices `masters..total`.
26    pub index: usize,
27    /// The port assigned to this node.
28    pub port: u16,
29    /// Total number of nodes in the cluster.
30    pub total_nodes: u16,
31    /// Number of master nodes.
32    pub masters: u16,
33    /// Number of replicas per master.
34    pub replicas_per_master: u16,
35}
36
37impl NodeContext {
38    /// Whether this node is a master (by initial topology order).
39    pub fn is_master(&self) -> bool {
40        self.index < self.masters as usize
41    }
42
43    /// Whether this node is a replica (by initial topology order).
44    pub fn is_replica(&self) -> bool {
45        !self.is_master()
46    }
47}
48
49/// Builder for a Redis Cluster.
50///
51/// # Example
52///
53/// ```no_run
54/// use redis_server_wrapper::RedisCluster;
55///
56/// # async fn example() {
57/// let cluster = RedisCluster::builder()
58///     .masters(3)
59///     .replicas_per_master(1)
60///     .base_port(7000)
61///     .start()
62///     .await
63///     .unwrap();
64///
65/// assert!(cluster.is_healthy().await);
66/// // Stopped automatically on Drop.
67/// # }
68/// ```
69pub struct RedisClusterBuilder {
70    masters: u16,
71    replicas_per_master: u16,
72    base_port: u16,
73    bind: String,
74    password: Option<String>,
75    logfile: Option<String>,
76    save: Option<SavePolicy>,
77    appendonly: Option<bool>,
78    cluster_node_timeout: Option<u64>,
79    cluster_require_full_coverage: Option<bool>,
80    cluster_allow_reads_when_down: Option<bool>,
81    cluster_allow_pubsubshard_when_down: Option<bool>,
82    cluster_allow_replica_migration: Option<bool>,
83    cluster_migration_barrier: Option<u32>,
84    cluster_announce_hostname: Option<String>,
85    cluster_announce_human_nodename: Option<String>,
86    cluster_preferred_endpoint_type: Option<String>,
87    cluster_replica_no_failover: Option<bool>,
88    cluster_replica_validity_factor: Option<u32>,
89    cluster_announce_ip: Option<String>,
90    cluster_announce_port: Option<u16>,
91    cluster_announce_bus_port: Option<u16>,
92    cluster_announce_tls_port: Option<u16>,
93    cluster_port: Option<u16>,
94    cluster_link_sendbuf_limit: Option<u64>,
95    cluster_compatibility_sample_ratio: Option<u32>,
96    cluster_slot_migration_handoff_max_lag_bytes: Option<u64>,
97    cluster_slot_migration_write_pause_timeout: Option<u64>,
98    cluster_slot_stats_enabled: Option<bool>,
99    min_replicas_to_write: Option<u32>,
100    min_replicas_max_lag: Option<u32>,
101    repl_diskless_sync: Option<bool>,
102    repl_diskless_sync_delay: Option<u32>,
103    repl_ping_replica_period: Option<u32>,
104    repl_timeout: Option<u32>,
105    tls_port: Option<u16>,
106    tls_cert_file: Option<PathBuf>,
107    tls_key_file: Option<PathBuf>,
108    tls_ca_cert_file: Option<PathBuf>,
109    tls_ca_cert_dir: Option<PathBuf>,
110    tls_auth_clients: Option<bool>,
111    tls_replication: Option<bool>,
112    tls_cluster: Option<bool>,
113    extra: HashMap<String, String>,
114    redis_server_bin: String,
115    redis_cli_bin: String,
116    node_config_fn: Option<Box<dyn FnMut(NodeContext) -> RedisServer + Send>>,
117}
118
impl RedisClusterBuilder {
    /// Set the number of master nodes (default: `3`).
    pub fn masters(mut self, n: u16) -> Self {
        self.masters = n;
        self
    }

    /// Set the number of replicas per master (default: `0`).
    pub fn replicas_per_master(mut self, n: u16) -> Self {
        self.replicas_per_master = n;
        self
    }

    /// Set the base port for cluster nodes (default: `7000`).
    ///
    /// Nodes are assigned consecutive ports starting at this value.
    pub fn base_port(mut self, port: u16) -> Self {
        self.base_port = port;
        self
    }

    /// Set the bind address for all cluster nodes (default: `"127.0.0.1"`).
    pub fn bind(mut self, bind: impl Into<String>) -> Self {
        self.bind = bind.into();
        self
    }

    /// Set a `requirepass` password for all cluster nodes.
    pub fn password(mut self, password: impl Into<String>) -> Self {
        self.password = Some(password.into());
        self
    }

    /// Set the log file path for all cluster nodes.
    pub fn logfile(mut self, path: impl Into<String>) -> Self {
        self.logfile = Some(path.into());
        self
    }

    /// Set the RDB save policy for all cluster nodes.
    ///
    /// `true` omits the `save` directive (Redis defaults apply).
    /// `false` emits `save ""` to disable RDB entirely.
    pub fn save(mut self, save: bool) -> Self {
        self.save = Some(if save {
            SavePolicy::Default
        } else {
            SavePolicy::Disabled
        });
        self
    }

    /// Set a custom RDB save schedule for all cluster nodes.
    pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
        self.save = Some(SavePolicy::Custom(schedule));
        self
    }

    /// Enable or disable AOF persistence for all cluster nodes.
    pub fn appendonly(mut self, appendonly: bool) -> Self {
        self.appendonly = Some(appendonly);
        self
    }

    /// Set the cluster node timeout in milliseconds for all nodes (default: `5000`).
    pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
        self.cluster_node_timeout = Some(ms);
        self
    }

    /// Require full hash slot coverage for the cluster to accept writes.
    pub fn cluster_require_full_coverage(mut self, require: bool) -> Self {
        self.cluster_require_full_coverage = Some(require);
        self
    }

    /// Allow reads when the cluster is down.
    pub fn cluster_allow_reads_when_down(mut self, allow: bool) -> Self {
        self.cluster_allow_reads_when_down = Some(allow);
        self
    }

    /// Allow pubsub shard channels when the cluster is down.
    pub fn cluster_allow_pubsubshard_when_down(mut self, allow: bool) -> Self {
        self.cluster_allow_pubsubshard_when_down = Some(allow);
        self
    }

    /// Allow automatic replica migration between masters.
    pub fn cluster_allow_replica_migration(mut self, allow: bool) -> Self {
        self.cluster_allow_replica_migration = Some(allow);
        self
    }

    /// Set the minimum number of replicas a master must retain before one can migrate.
    pub fn cluster_migration_barrier(mut self, barrier: u32) -> Self {
        self.cluster_migration_barrier = Some(barrier);
        self
    }

    /// Set the hostname each node announces to the cluster.
    pub fn cluster_announce_hostname(mut self, hostname: impl Into<String>) -> Self {
        self.cluster_announce_hostname = Some(hostname.into());
        self
    }

    /// Set the preferred endpoint type for cluster redirections (e.g. `"ip"`, `"hostname"`).
    pub fn cluster_preferred_endpoint_type(mut self, endpoint_type: impl Into<String>) -> Self {
        self.cluster_preferred_endpoint_type = Some(endpoint_type.into());
        self
    }

    /// Prevent replicas from attempting automatic failover.
    ///
    /// Manual failover via `CLUSTER FAILOVER` still works.
    pub fn cluster_replica_no_failover(mut self, no_failover: bool) -> Self {
        self.cluster_replica_no_failover = Some(no_failover);
        self
    }

    /// Set the replica validity factor for failover eligibility.
    ///
    /// A replica will not failover if it has been disconnected from the master
    /// for more than `(node-timeout * factor) + repl-ping-replica-period` seconds.
    /// Set to `0` to allow any replica to failover regardless of staleness.
    pub fn cluster_replica_validity_factor(mut self, factor: u32) -> Self {
        self.cluster_replica_validity_factor = Some(factor);
        self
    }

    /// Set the IP address nodes announce for client redirects (MOVED/ASKING).
    pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
        self.cluster_announce_ip = Some(ip.into());
        self
    }

    /// Set the client port nodes announce for redirects.
    pub fn cluster_announce_port(mut self, port: u16) -> Self {
        self.cluster_announce_port = Some(port);
        self
    }

    /// Set the cluster bus port nodes announce for gossip.
    pub fn cluster_announce_bus_port(mut self, port: u16) -> Self {
        self.cluster_announce_bus_port = Some(port);
        self
    }

    /// Set the TLS client port nodes announce for redirects.
    pub fn cluster_announce_tls_port(mut self, port: u16) -> Self {
        self.cluster_announce_tls_port = Some(port);
        self
    }

    /// Set a friendly node name broadcast for debugging/admin display.
    pub fn cluster_announce_human_nodename(mut self, name: impl Into<String>) -> Self {
        self.cluster_announce_human_nodename = Some(name.into());
        self
    }

    /// Set a dedicated cluster bus port (default: client port + 10000).
    pub fn cluster_port(mut self, port: u16) -> Self {
        self.cluster_port = Some(port);
        self
    }

    /// Set the maximum memory for a cluster bus link's output buffer.
    ///
    /// When exceeded, the link is disconnected. Set to `0` for unlimited.
    pub fn cluster_link_sendbuf_limit(mut self, limit: u64) -> Self {
        self.cluster_link_sendbuf_limit = Some(limit);
        self
    }

    /// Set the cluster compatibility sample ratio.
    pub fn cluster_compatibility_sample_ratio(mut self, ratio: u32) -> Self {
        self.cluster_compatibility_sample_ratio = Some(ratio);
        self
    }

    /// Set the maximum replication lag in bytes before slot migration handoff.
    pub fn cluster_slot_migration_handoff_max_lag_bytes(mut self, bytes: u64) -> Self {
        self.cluster_slot_migration_handoff_max_lag_bytes = Some(bytes);
        self
    }

    /// Set the write pause timeout in milliseconds during slot migration.
    pub fn cluster_slot_migration_write_pause_timeout(mut self, ms: u64) -> Self {
        self.cluster_slot_migration_write_pause_timeout = Some(ms);
        self
    }

    /// Enable per-slot statistics tracking.
    pub fn cluster_slot_stats_enabled(mut self, enable: bool) -> Self {
        self.cluster_slot_stats_enabled = Some(enable);
        self
    }

    // -- replication directives relevant in cluster mode --

    /// Set the minimum number of connected replicas before the master accepts writes.
    ///
    /// Useful for split-brain protection: a partitioned master with no reachable
    /// replicas stops accepting writes, reducing data loss during partitions.
    pub fn min_replicas_to_write(mut self, n: u32) -> Self {
        self.min_replicas_to_write = Some(n);
        self
    }

    /// Set the maximum replication lag (seconds) before a replica is considered disconnected.
    ///
    /// Used with `min_replicas_to_write` to determine if enough replicas are connected.
    pub fn min_replicas_max_lag(mut self, seconds: u32) -> Self {
        self.min_replicas_max_lag = Some(seconds);
        self
    }

    /// Enable or disable diskless replication sync.
    ///
    /// Diskless sync is faster but uses more memory during transfer.
    pub fn repl_diskless_sync(mut self, enable: bool) -> Self {
        self.repl_diskless_sync = Some(enable);
        self
    }

    /// Set the delay in seconds before starting a diskless replication transfer.
    ///
    /// Allows batching multiple replicas syncing at once.
    pub fn repl_diskless_sync_delay(mut self, seconds: u32) -> Self {
        self.repl_diskless_sync_delay = Some(seconds);
        self
    }

    /// Set how often replicas ping the master (seconds).
    ///
    /// Used in the replica validity calculation:
    /// `(node-timeout * validity-factor) + repl-ping-replica-period`.
    pub fn repl_ping_replica_period(mut self, seconds: u32) -> Self {
        self.repl_ping_replica_period = Some(seconds);
        self
    }

    /// Set the replication timeout in seconds.
    ///
    /// If a replica doesn't hear from master for this long, it considers the link dead.
    pub fn repl_timeout(mut self, seconds: u32) -> Self {
        self.repl_timeout = Some(seconds);
        self
    }

    // -- TLS directives --

    /// Set the TLS listening port for all cluster nodes.
    ///
    /// This is a *base* value: node `i` actually listens on `port + i`
    /// (see `start()`), so each node gets a distinct TLS port.
    pub fn tls_port(mut self, port: u16) -> Self {
        self.tls_port = Some(port);
        self
    }

    /// Set the TLS certificate file path for all cluster nodes.
    pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_cert_file = Some(path.into());
        self
    }

    /// Set the TLS private key file path for all cluster nodes.
    pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_key_file = Some(path.into());
        self
    }

    /// Set the TLS CA certificate file path for all cluster nodes.
    pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_ca_cert_file = Some(path.into());
        self
    }

    /// Set the TLS CA certificate directory for all cluster nodes.
    pub fn tls_ca_cert_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_ca_cert_dir = Some(path.into());
        self
    }

    /// Require TLS client authentication for all cluster nodes.
    pub fn tls_auth_clients(mut self, auth: bool) -> Self {
        self.tls_auth_clients = Some(auth);
        self
    }

    /// Use TLS for replication traffic between cluster nodes.
    pub fn tls_replication(mut self, enable: bool) -> Self {
        self.tls_replication = Some(enable);
        self
    }

    /// Use TLS for cluster bus communication between nodes.
    pub fn tls_cluster(mut self, enable: bool) -> Self {
        self.tls_cluster = Some(enable);
        self
    }

    /// Set a per-node configuration callback.
    ///
    /// The callback receives a [`NodeContext`] containing the pre-configured
    /// [`RedisServer`] builder (with all uniform settings already applied) and
    /// metadata about the node's position in the cluster. It must return the
    /// (possibly modified) `RedisServer` builder.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use redis_server_wrapper::RedisCluster;
    ///
    /// # async fn example() {
    /// let cluster = RedisCluster::builder()
    ///     .masters(3)
    ///     .replicas_per_master(1)
    ///     .base_port(7000)
    ///     .with_node_config(|ctx| {
    ///         let is_replica = ctx.is_replica();
    ///         let index = ctx.index;
    ///         let mut server = ctx.server;
    ///         if is_replica {
    ///             server = server.cluster_replica_no_failover(true);
    ///         }
    ///         if index == 0 {
    ///             server = server.maxmemory("512mb");
    ///         }
    ///         server
    ///     })
    ///     .start()
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```
    pub fn with_node_config(
        mut self,
        f: impl FnMut(NodeContext) -> RedisServer + Send + 'static,
    ) -> Self {
        self.node_config_fn = Some(Box::new(f));
        self
    }

    /// Set an arbitrary config directive for all cluster nodes.
    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra.insert(key.into(), value.into());
        self
    }

    /// Set a custom `redis-server` binary path.
    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_server_bin = bin.into();
        self
    }

    /// Set a custom `redis-cli` binary path.
    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_cli_bin = bin.into();
        self
    }

    /// Total node count: every master plus its replicas.
    ///
    /// NOTE(review): plain `u16` multiplication — panics on overflow in debug
    /// builds for absurdly large topologies; assumed fine for test clusters.
    fn total_nodes(&self) -> u16 {
        self.masters * (1 + self.replicas_per_master)
    }

    /// Consecutive client ports, one per node, starting at `base_port`.
    fn ports(&self) -> impl Iterator<Item = u16> {
        let base = self.base_port;
        let total = self.total_nodes();
        (0..total).map(move |i| base + i)
    }

    /// Whether TLS is configured (cert + key present).
    fn has_tls(&self) -> bool {
        self.tls_cert_file.is_some() && self.tls_key_file.is_some()
    }

    /// Apply TLS flags to a CLI instance based on builder config.
    ///
    /// Without a CA file, certificate verification is skipped (`insecure`).
    fn apply_tls_to_cli(&self, mut cli: RedisCli) -> RedisCli {
        if self.has_tls() {
            cli = cli.tls(true);
            if let Some(ref ca) = self.tls_ca_cert_file {
                cli = cli.cacert(ca);
            } else {
                cli = cli.insecure(true);
            }
            if let Some(ref cert) = self.tls_cert_file {
                cli = cli.cert(cert);
            }
            if let Some(ref key) = self.tls_key_file {
                cli = cli.key(key);
            }
        }
        cli
    }

    /// Start all nodes and form the cluster.
    ///
    /// Best-effort shuts down any leftover nodes on the target ports, wipes
    /// each node's data directory under the system temp dir, starts every
    /// node (applying uniform settings, then the optional per-node callback),
    /// and finally forms the cluster via the seed node.
    ///
    /// # Errors
    ///
    /// Returns an error if any node fails to start or cluster creation fails.
    pub async fn start(mut self) -> Result<RedisClusterHandle> {
        // Stop any leftover nodes from previous runs.
        for port in self.ports() {
            let mut cli = RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port);
            if let Some(ref password) = self.password {
                cli = cli.password(password);
            }
            cli = self.apply_tls_to_cli(cli);
            // Best-effort: the result is intentionally discarded (the port may
            // simply have nothing listening). NOTE(review): if
            // `RedisCli::shutdown` is async, this unawaited future is dropped
            // unpolled and the call is a no-op — confirm its signature.
            cli.shutdown();
        }
        // Give any shut-down processes a moment to release their ports.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Start each node.
        let total_nodes = self.total_nodes();
        let ports: Vec<u16> = self.ports().collect();
        let mut nodes = Vec::new();
        for (index, port) in ports.into_iter().enumerate() {
            // Fresh per-node data dir; stale state from prior runs is removed
            // (errors ignored: the dir may not exist yet).
            let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
            let _ = std::fs::remove_dir_all(&node_dir);
            let mut server = RedisServer::new()
                .port(port)
                .bind(&self.bind)
                .dir(node_dir)
                .cluster_enabled(true)
                .cluster_node_timeout(self.cluster_node_timeout.unwrap_or(5000))
                .redis_server_bin(&self.redis_server_bin)
                .redis_cli_bin(&self.redis_cli_bin);
            // Forward every explicitly-set builder option; unset options keep
            // the server's own defaults.
            if let Some(v) = self.cluster_require_full_coverage {
                server = server.cluster_require_full_coverage(v);
            }
            if let Some(v) = self.cluster_allow_reads_when_down {
                server = server.cluster_allow_reads_when_down(v);
            }
            if let Some(v) = self.cluster_allow_pubsubshard_when_down {
                server = server.cluster_allow_pubsubshard_when_down(v);
            }
            if let Some(v) = self.cluster_allow_replica_migration {
                server = server.cluster_allow_replica_migration(v);
            }
            if let Some(barrier) = self.cluster_migration_barrier {
                server = server.cluster_migration_barrier(barrier);
            }
            if let Some(ref hostname) = self.cluster_announce_hostname {
                server = server.cluster_announce_hostname(hostname.clone());
            }
            if let Some(ref endpoint_type) = self.cluster_preferred_endpoint_type {
                server = server.cluster_preferred_endpoint_type(endpoint_type.clone());
            }
            if let Some(v) = self.cluster_replica_no_failover {
                server = server.cluster_replica_no_failover(v);
            }
            if let Some(factor) = self.cluster_replica_validity_factor {
                server = server.cluster_replica_validity_factor(factor);
            }
            if let Some(ref ip) = self.cluster_announce_ip {
                server = server.cluster_announce_ip(ip.clone());
            }
            if let Some(port) = self.cluster_announce_port {
                server = server.cluster_announce_port(port);
            }
            if let Some(port) = self.cluster_announce_bus_port {
                server = server.cluster_announce_bus_port(port);
            }
            if let Some(port) = self.cluster_announce_tls_port {
                server = server.cluster_announce_tls_port(port);
            }
            if let Some(ref name) = self.cluster_announce_human_nodename {
                server = server.cluster_announce_human_nodename(name.clone());
            }
            if let Some(port) = self.cluster_port {
                server = server.cluster_port(port);
            }
            if let Some(limit) = self.cluster_link_sendbuf_limit {
                server = server.cluster_link_sendbuf_limit(limit);
            }
            if let Some(ratio) = self.cluster_compatibility_sample_ratio {
                server = server.cluster_compatibility_sample_ratio(ratio);
            }
            if let Some(bytes) = self.cluster_slot_migration_handoff_max_lag_bytes {
                server = server.cluster_slot_migration_handoff_max_lag_bytes(bytes);
            }
            if let Some(ms) = self.cluster_slot_migration_write_pause_timeout {
                server = server.cluster_slot_migration_write_pause_timeout(ms);
            }
            if let Some(v) = self.cluster_slot_stats_enabled {
                server = server.cluster_slot_stats_enabled(v);
            }
            if let Some(n) = self.min_replicas_to_write {
                server = server.min_replicas_to_write(n);
            }
            if let Some(seconds) = self.min_replicas_max_lag {
                server = server.min_replicas_max_lag(seconds);
            }
            if let Some(v) = self.repl_diskless_sync {
                server = server.repl_diskless_sync(v);
            }
            if let Some(seconds) = self.repl_diskless_sync_delay {
                server = server.repl_diskless_sync_delay(seconds);
            }
            if let Some(seconds) = self.repl_ping_replica_period {
                server = server.repl_ping_replica_period(seconds);
            }
            if let Some(seconds) = self.repl_timeout {
                server = server.repl_timeout(seconds);
            }
            if let Some(ref password) = self.password {
                // `masterauth` too, so replicas can authenticate to masters.
                server = server.password(password).masterauth(password);
            }
            if let Some(ref logfile) = self.logfile {
                server = server.logfile(logfile.clone());
            }
            if let Some(ref save) = self.save {
                match save {
                    SavePolicy::Disabled => server = server.save(false),
                    SavePolicy::Default => server = server.save(true),
                    SavePolicy::Custom(pairs) => {
                        server = server.save_schedule(pairs.clone());
                    }
                }
            }
            if let Some(appendonly) = self.appendonly {
                server = server.appendonly(appendonly);
            }
            // TLS directives.
            if let Some(port) = self.tls_port {
                // Distinct TLS port per node: base TLS port + node index.
                // NOTE(review): wraps on u16 overflow and can collide with the
                // client-port range if the bases are chosen too close — confirm.
                server = server.tls_port(port + index as u16);
            }
            if let Some(ref path) = self.tls_cert_file {
                server = server.tls_cert_file(path);
            }
            if let Some(ref path) = self.tls_key_file {
                server = server.tls_key_file(path);
            }
            if let Some(ref path) = self.tls_ca_cert_file {
                server = server.tls_ca_cert_file(path);
            }
            if let Some(ref path) = self.tls_ca_cert_dir {
                server = server.tls_ca_cert_dir(path);
            }
            if let Some(v) = self.tls_auth_clients {
                server = server.tls_auth_clients(v);
            }
            if let Some(v) = self.tls_replication {
                server = server.tls_replication(v);
            }
            if let Some(v) = self.tls_cluster {
                server = server.tls_cluster(v);
            }
            for (key, value) in &self.extra {
                server = server.extra(key.clone(), value.clone());
            }
            // Apply per-node customization if configured.
            if let Some(ref mut f) = self.node_config_fn {
                server = f(NodeContext {
                    server,
                    index,
                    port,
                    total_nodes,
                    masters: self.masters,
                    replicas_per_master: self.replicas_per_master,
                });
            }
            let handle = server.start().await?;
            nodes.push(handle);
        }

        // Form the cluster: redis-cli assigns hash slots and replica roles
        // across all the freshly started nodes via the seed node.
        let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
        let mut cli = RedisCli::new()
            .bin(&self.redis_cli_bin)
            .host(&self.bind)
            .port(self.base_port);
        if let Some(ref password) = self.password {
            cli = cli.password(password);
        }
        cli = self.apply_tls_to_cli(cli);
        cli.cluster_create(&node_addrs, self.replicas_per_master)
            .await?;

        // Wait for convergence. Fixed grace period; callers needing a hard
        // guarantee can use `RedisClusterHandle::wait_for_healthy`.
        tokio::time::sleep(Duration::from_secs(2)).await;

        Ok(RedisClusterHandle {
            nodes,
            num_masters: self.masters,
            bind: self.bind,
            base_port: self.base_port,
            password: self.password,
            redis_cli_bin: self.redis_cli_bin,
            tls: TlsConfig {
                cert_file: self.tls_cert_file,
                key_file: self.tls_key_file,
                ca_cert_file: self.tls_ca_cert_file,
            },
        })
    }
}

/// Minimal TLS settings retained by the handle so `redis-cli` invocations
/// can still be built after the builder has been consumed.
#[derive(Clone, Debug, Default)]
struct TlsConfig {
    cert_file: Option<PathBuf>,
    key_file: Option<PathBuf>,
    ca_cert_file: Option<PathBuf>,
}

723impl TlsConfig {
724    fn has_tls(&self) -> bool {
725        self.cert_file.is_some() && self.key_file.is_some()
726    }
727
728    fn apply(&self, mut cli: RedisCli) -> RedisCli {
729        if self.has_tls() {
730            cli = cli.tls(true);
731            if let Some(ref ca) = self.ca_cert_file {
732                cli = cli.cacert(ca);
733            } else {
734                cli = cli.insecure(true);
735            }
736            if let Some(ref cert) = self.cert_file {
737                cli = cli.cert(cert);
738            }
739            if let Some(ref key) = self.key_file {
740                cli = cli.key(key);
741            }
742        }
743        cli
744    }
745}
746
747/// A running Redis Cluster. Stops all nodes on Drop.
748pub struct RedisClusterHandle {
749    nodes: Vec<RedisServerHandle>,
750    num_masters: u16,
751    bind: String,
752    base_port: u16,
753    password: Option<String>,
754    redis_cli_bin: String,
755    tls: TlsConfig,
756}
757
/// Entry point for building a Redis Cluster topology.
///
/// Obtain a [`RedisClusterBuilder`] via [`RedisCluster::builder`], configure
/// it, then call [`RedisClusterBuilder::start`] to launch the cluster.
pub struct RedisCluster;

764impl RedisCluster {
765    /// Create a new cluster builder with defaults (3 masters, 0 replicas, port 7000).
766    pub fn builder() -> RedisClusterBuilder {
767        RedisClusterBuilder {
768            masters: 3,
769            replicas_per_master: 0,
770            base_port: 7000,
771            bind: "127.0.0.1".into(),
772            password: None,
773            logfile: None,
774            save: None,
775            appendonly: None,
776            cluster_node_timeout: None,
777            cluster_require_full_coverage: None,
778            cluster_allow_reads_when_down: None,
779            cluster_allow_pubsubshard_when_down: None,
780            cluster_allow_replica_migration: None,
781            cluster_migration_barrier: None,
782            cluster_announce_hostname: None,
783            cluster_announce_human_nodename: None,
784            cluster_preferred_endpoint_type: None,
785            cluster_replica_no_failover: None,
786            cluster_replica_validity_factor: None,
787            cluster_announce_ip: None,
788            cluster_announce_port: None,
789            cluster_announce_bus_port: None,
790            cluster_announce_tls_port: None,
791            cluster_port: None,
792            cluster_link_sendbuf_limit: None,
793            cluster_compatibility_sample_ratio: None,
794            cluster_slot_migration_handoff_max_lag_bytes: None,
795            cluster_slot_migration_write_pause_timeout: None,
796            cluster_slot_stats_enabled: None,
797            min_replicas_to_write: None,
798            min_replicas_max_lag: None,
799            repl_diskless_sync: None,
800            repl_diskless_sync_delay: None,
801            repl_ping_replica_period: None,
802            repl_timeout: None,
803            tls_port: None,
804            tls_cert_file: None,
805            tls_key_file: None,
806            tls_ca_cert_file: None,
807            tls_ca_cert_dir: None,
808            tls_auth_clients: None,
809            tls_replication: None,
810            tls_cluster: None,
811            extra: HashMap::new(),
812            redis_server_bin: "redis-server".into(),
813            redis_cli_bin: "redis-cli".into(),
814            node_config_fn: None,
815        }
816    }
817}
818
impl RedisClusterHandle {
    /// Address of the seed node — the first node, listening on `base_port`.
    pub fn addr(&self) -> String {
        format!("{bind}:{port}", bind = self.bind, port = self.base_port)
    }

825    /// All node addresses.
826    pub fn node_addrs(&self) -> Vec<String> {
827        self.nodes.iter().map(|n| n.addr()).collect()
828    }
829
830    /// The PIDs of all `redis-server` processes in the cluster.
831    pub fn pids(&self) -> Vec<u32> {
832        self.nodes.iter().map(|n| n.pid()).collect()
833    }
834
835    /// Check if all nodes are alive.
836    pub async fn all_alive(&self) -> bool {
837        for node in &self.nodes {
838            if !node.is_alive().await {
839                return false;
840            }
841        }
842        true
843    }
844
845    /// Check CLUSTER INFO for state=ok and all slots assigned.
846    pub async fn is_healthy(&self) -> bool {
847        for node in &self.nodes {
848            if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await
849                && info.contains("cluster_state:ok")
850                && info.contains("cluster_slots_ok:16384")
851            {
852                return true;
853            }
854        }
855        false
856    }
857
858    /// Wait until the cluster is healthy or timeout.
859    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
860        let start = std::time::Instant::now();
861        loop {
862            if self.is_healthy().await {
863                return Ok(());
864            }
865            if start.elapsed() > timeout {
866                return Err(Error::Timeout {
867                    message: "cluster did not become healthy in time".into(),
868                });
869            }
870            tokio::time::sleep(Duration::from_millis(500)).await;
871        }
872    }
873
874    /// Access a specific node by index.
875    ///
876    /// Nodes are ordered by port: masters first (indices `0..masters`),
877    /// then replicas (indices `masters..total`).
878    ///
879    /// # Panics
880    ///
881    /// Panics if `index >= total_nodes`.
882    pub fn node(&self, index: usize) -> &RedisServerHandle {
883        &self.nodes[index]
884    }
885
886    /// All node handles.
887    pub fn nodes(&self) -> &[RedisServerHandle] {
888        &self.nodes
889    }
890
891    /// The number of master nodes in the cluster.
892    pub fn num_masters(&self) -> u16 {
893        self.num_masters
894    }
895
896    /// Handles for the master nodes (the first `masters` nodes by port order).
897    ///
898    /// Note: this reflects the *initial* topology. After a failover, the actual
899    /// roles may differ from the startup ordering.
900    pub fn master_nodes(&self) -> &[RedisServerHandle] {
901        &self.nodes[..self.num_masters as usize]
902    }
903
904    /// Handles for the replica nodes (all nodes after the masters by port order).
905    ///
906    /// Note: this reflects the *initial* topology. After a failover, the actual
907    /// roles may differ from the startup ordering.
908    pub fn replica_nodes(&self) -> &[RedisServerHandle] {
909        &self.nodes[self.num_masters as usize..]
910    }
911
912    /// Run `CONFIG SET` on all nodes.
913    pub async fn config_set_all(&self, key: &str, value: &str) -> Result<()> {
914        for node in &self.nodes {
915            node.run(&["CONFIG", "SET", key, value]).await?;
916        }
917        Ok(())
918    }
919
920    /// Run `CONFIG SET` on master nodes only (initial topology).
921    pub async fn config_set_masters(&self, key: &str, value: &str) -> Result<()> {
922        for node in self.master_nodes() {
923            node.run(&["CONFIG", "SET", key, value]).await?;
924        }
925        Ok(())
926    }
927
928    /// Run `CONFIG SET` on replica nodes only (initial topology).
929    pub async fn config_set_replicas(&self, key: &str, value: &str) -> Result<()> {
930        for node in self.replica_nodes() {
931            node.run(&["CONFIG", "SET", key, value]).await?;
932        }
933        Ok(())
934    }
935
936    /// Get a `RedisCli` for the seed node.
937    pub fn cli(&self) -> RedisCli {
938        let mut cli = RedisCli::new()
939            .bin(&self.redis_cli_bin)
940            .host(&self.bind)
941            .port(self.base_port);
942        if let Some(ref password) = self.password {
943            cli = cli.password(password);
944        }
945        cli = self.tls.apply(cli);
946        cli
947    }
948}
949
impl Drop for RedisClusterHandle {
    fn drop(&mut self) {
        // Intentionally empty: dropping `self.nodes` runs each
        // `RedisServerHandle`'s own `Drop` impl, which performs the per-node
        // teardown. No cluster-level cleanup is needed beyond that.
    }
}
955
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder has 3 masters, no replicas, base port 7000, and every
    /// optional directive unset.
    #[test]
    fn builder_defaults() {
        let builder = RedisCluster::builder();
        assert_eq!(builder.masters, 3);
        assert_eq!(builder.replicas_per_master, 0);
        assert_eq!(builder.base_port, 7000);
        assert_eq!(builder.password, None);
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
        assert_eq!(builder.total_nodes(), 3);
        assert!(builder.cluster_node_timeout.is_none());
        assert!(builder.cluster_require_full_coverage.is_none());
        assert!(builder.cluster_allow_reads_when_down.is_none());
        assert!(builder.cluster_allow_pubsubshard_when_down.is_none());
        assert!(builder.cluster_allow_replica_migration.is_none());
        assert!(builder.cluster_migration_barrier.is_none());
        assert!(builder.cluster_announce_hostname.is_none());
        assert!(builder.cluster_preferred_endpoint_type.is_none());
    }

    /// Replicas multiply the node count and ports are assigned contiguously
    /// from the base port.
    #[test]
    fn builder_with_replicas() {
        let builder = RedisCluster::builder().masters(3).replicas_per_master(1);
        assert_eq!(builder.total_nodes(), 6);
        let assigned: Vec<u16> = builder.ports().collect();
        assert_eq!(assigned, vec![7000, 7001, 7002, 7003, 7004, 7005]);
    }

    /// The password setter stores the given string.
    #[test]
    fn builder_password() {
        let builder = RedisCluster::builder().password("secret");
        assert_eq!(builder.password.as_deref(), Some("secret"));
    }

    /// Every cluster-* directive setter records its value on the builder.
    #[test]
    fn builder_cluster_directives() {
        let builder = RedisCluster::builder()
            .cluster_node_timeout(10000)
            .cluster_require_full_coverage(false)
            .cluster_allow_reads_when_down(true)
            .cluster_allow_pubsubshard_when_down(true)
            .cluster_allow_replica_migration(false)
            .cluster_migration_barrier(2)
            .cluster_announce_hostname("node.example.com")
            .cluster_preferred_endpoint_type("hostname")
            .cluster_replica_no_failover(true)
            .cluster_replica_validity_factor(0)
            .cluster_announce_ip("10.0.0.1")
            .cluster_announce_port(7000)
            .cluster_announce_bus_port(17000)
            .cluster_announce_tls_port(7100)
            .cluster_announce_human_nodename("node-1")
            .cluster_port(17000)
            .cluster_link_sendbuf_limit(67108864)
            .cluster_compatibility_sample_ratio(50)
            .cluster_slot_migration_handoff_max_lag_bytes(1048576)
            .cluster_slot_migration_write_pause_timeout(5000)
            .cluster_slot_stats_enabled(true);

        assert_eq!(builder.cluster_node_timeout, Some(10000));
        assert_eq!(builder.cluster_require_full_coverage, Some(false));
        assert_eq!(builder.cluster_allow_reads_when_down, Some(true));
        assert_eq!(builder.cluster_allow_pubsubshard_when_down, Some(true));
        assert_eq!(builder.cluster_allow_replica_migration, Some(false));
        assert_eq!(builder.cluster_migration_barrier, Some(2));
        assert_eq!(
            builder.cluster_announce_hostname.as_deref(),
            Some("node.example.com")
        );
        assert_eq!(
            builder.cluster_preferred_endpoint_type.as_deref(),
            Some("hostname")
        );
        assert_eq!(builder.cluster_replica_no_failover, Some(true));
        assert_eq!(builder.cluster_replica_validity_factor, Some(0));
        assert_eq!(builder.cluster_announce_ip.as_deref(), Some("10.0.0.1"));
        assert_eq!(builder.cluster_announce_port, Some(7000));
        assert_eq!(builder.cluster_announce_bus_port, Some(17000));
        assert_eq!(builder.cluster_announce_tls_port, Some(7100));
        assert_eq!(
            builder.cluster_announce_human_nodename.as_deref(),
            Some("node-1")
        );
        assert_eq!(builder.cluster_port, Some(17000));
        assert_eq!(builder.cluster_link_sendbuf_limit, Some(67108864));
        assert_eq!(builder.cluster_compatibility_sample_ratio, Some(50));
        assert_eq!(
            builder.cluster_slot_migration_handoff_max_lag_bytes,
            Some(1048576)
        );
        assert_eq!(builder.cluster_slot_migration_write_pause_timeout, Some(5000));
        assert_eq!(builder.cluster_slot_stats_enabled, Some(true));
    }

    /// Replication-related setters record their values on the builder.
    #[test]
    fn builder_replication_directives() {
        let builder = RedisCluster::builder()
            .min_replicas_to_write(1)
            .min_replicas_max_lag(10)
            .repl_diskless_sync(true)
            .repl_diskless_sync_delay(0)
            .repl_ping_replica_period(5)
            .repl_timeout(30);

        assert_eq!(builder.min_replicas_to_write, Some(1));
        assert_eq!(builder.min_replicas_max_lag, Some(10));
        assert_eq!(builder.repl_diskless_sync, Some(true));
        assert_eq!(builder.repl_diskless_sync_delay, Some(0));
        assert_eq!(builder.repl_ping_replica_period, Some(5));
        assert_eq!(builder.repl_timeout, Some(30));
    }

    /// Logfile and free-form extra directives are stored verbatim.
    #[test]
    fn builder_logfile_and_extra() {
        let builder = RedisCluster::builder()
            .logfile("/tmp/cluster.log")
            .extra("maxmemory", "10mb");
        assert_eq!(builder.logfile.as_deref(), Some("/tmp/cluster.log"));
        assert_eq!(
            builder.extra.get("maxmemory").map(String::as_str),
            Some("10mb")
        );
    }
}