Skip to main content

redis_server_wrapper/
cluster.rs

1//! Redis Cluster lifecycle management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::cli::RedisCli;
7use crate::error::{Error, Result};
8use crate::server::{RedisServer, RedisServerHandle};
9
/// Builder for a Redis Cluster.
///
/// Obtain one from [`RedisCluster::builder`], adjust it with the chained
/// setters, and finish with [`RedisClusterBuilder::start`]. Every setter
/// consumes the builder and returns it, so a builder that is configured but
/// never started does nothing; the type is `#[must_use]` to flag that mistake.
///
/// `Debug` is intentionally not derived: the builder may hold a password.
///
/// # Example
///
/// ```no_run
/// use redis_server_wrapper::RedisCluster;
///
/// # async fn example() {
/// let cluster = RedisCluster::builder()
///     .masters(3)
///     .replicas_per_master(1)
///     .base_port(7000)
///     .start()
///     .await
///     .unwrap();
///
/// assert!(cluster.is_healthy().await);
/// // Stopped automatically on Drop.
/// # }
/// ```
#[must_use = "a cluster builder does nothing until `start` is called"]
pub struct RedisClusterBuilder {
    /// Number of master nodes.
    masters: u16,
    /// Number of replicas attached to each master.
    replicas_per_master: u16,
    /// Port of the first node; the remaining nodes use the following ports.
    base_port: u16,
    /// Address every node binds to.
    bind: String,
    /// Optional password applied to every node.
    password: Option<String>,
    /// Optional log file path handed to every node.
    logfile: Option<String>,
    /// Additional config directives (key -> value) applied to every node.
    extra: HashMap<String, String>,
    /// Path or name of the `redis-server` executable.
    redis_server_bin: String,
    /// Path or name of the `redis-cli` executable.
    redis_cli_bin: String,
}
41
42impl RedisClusterBuilder {
43    /// Set the number of master nodes (default: `3`).
44    pub fn masters(mut self, n: u16) -> Self {
45        self.masters = n;
46        self
47    }
48
49    /// Set the number of replicas per master (default: `0`).
50    pub fn replicas_per_master(mut self, n: u16) -> Self {
51        self.replicas_per_master = n;
52        self
53    }
54
55    /// Set the base port for cluster nodes (default: `7000`).
56    ///
57    /// Nodes are assigned consecutive ports starting at this value.
58    pub fn base_port(mut self, port: u16) -> Self {
59        self.base_port = port;
60        self
61    }
62
63    /// Set the bind address for all cluster nodes (default: `"127.0.0.1"`).
64    pub fn bind(mut self, bind: impl Into<String>) -> Self {
65        self.bind = bind.into();
66        self
67    }
68
69    /// Set a `requirepass` password for all cluster nodes.
70    pub fn password(mut self, password: impl Into<String>) -> Self {
71        self.password = Some(password.into());
72        self
73    }
74
75    /// Set the log file path for all cluster nodes.
76    pub fn logfile(mut self, path: impl Into<String>) -> Self {
77        self.logfile = Some(path.into());
78        self
79    }
80
81    /// Set an arbitrary config directive for all cluster nodes.
82    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83        self.extra.insert(key.into(), value.into());
84        self
85    }
86
87    /// Set a custom `redis-server` binary path.
88    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
89        self.redis_server_bin = bin.into();
90        self
91    }
92
93    /// Set a custom `redis-cli` binary path.
94    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
95        self.redis_cli_bin = bin.into();
96        self
97    }
98
99    fn total_nodes(&self) -> u16 {
100        self.masters * (1 + self.replicas_per_master)
101    }
102
103    fn ports(&self) -> impl Iterator<Item = u16> {
104        let base = self.base_port;
105        let total = self.total_nodes();
106        (0..total).map(move |i| base + i)
107    }
108
109    /// Start all nodes and form the cluster.
110    pub async fn start(self) -> Result<RedisClusterHandle> {
111        // Stop any leftover nodes from previous runs.
112        for port in self.ports() {
113            let mut cli = RedisCli::new()
114                .bin(&self.redis_cli_bin)
115                .host(&self.bind)
116                .port(port);
117            if let Some(ref password) = self.password {
118                cli = cli.password(password);
119            }
120            cli.shutdown();
121        }
122        tokio::time::sleep(Duration::from_millis(500)).await;
123
124        // Start each node.
125        let mut nodes = Vec::new();
126        for port in self.ports() {
127            let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
128            let _ = std::fs::remove_dir_all(&node_dir);
129            let mut server = RedisServer::new()
130                .port(port)
131                .bind(&self.bind)
132                .dir(node_dir)
133                .cluster_enabled(true)
134                .cluster_node_timeout(5000)
135                .redis_server_bin(&self.redis_server_bin)
136                .redis_cli_bin(&self.redis_cli_bin);
137            if let Some(ref password) = self.password {
138                server = server.password(password).masterauth(password);
139            }
140            if let Some(ref logfile) = self.logfile {
141                server = server.logfile(logfile.clone());
142            }
143            for (key, value) in &self.extra {
144                server = server.extra(key.clone(), value.clone());
145            }
146            let handle = server.start().await?;
147            nodes.push(handle);
148        }
149
150        // Form the cluster.
151        let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
152        let mut cli = RedisCli::new()
153            .bin(&self.redis_cli_bin)
154            .host(&self.bind)
155            .port(self.base_port);
156        if let Some(ref password) = self.password {
157            cli = cli.password(password);
158        }
159        cli.cluster_create(&node_addrs, self.replicas_per_master)
160            .await?;
161
162        // Wait for convergence.
163        tokio::time::sleep(Duration::from_secs(2)).await;
164
165        Ok(RedisClusterHandle {
166            nodes,
167            bind: self.bind,
168            base_port: self.base_port,
169            password: self.password,
170            redis_cli_bin: self.redis_cli_bin,
171        })
172    }
173}
174
/// A running Redis Cluster. Stops all nodes on Drop.
///
/// Returned by [`RedisClusterBuilder::start`]. The handle owns one
/// `RedisServerHandle` per node; shutdown relies on those per-node handles
/// being dropped together with this struct (presumably each one stops its
/// own process — confirm against `RedisServerHandle`).
pub struct RedisClusterHandle {
    /// Handles of all node processes, in the order they were started.
    nodes: Vec<RedisServerHandle>,
    /// Address the nodes were bound to; used to build the seed address.
    bind: String,
    /// Port of the first (seed) node.
    base_port: u16,
    /// Password to pass to `redis-cli`, if the cluster was started with one.
    password: Option<String>,
    /// Path or name of the `redis-cli` executable used by [`RedisClusterHandle::cli`].
    redis_cli_bin: String,
}
183
/// Entry point for building a Redis Cluster topology.
///
/// Call [`RedisCluster::builder`] to obtain a [`RedisClusterBuilder`], then
/// configure it and call [`RedisClusterBuilder::start`] to launch the cluster.
///
/// This is a unit struct: it carries no state and serves only as a namespace
/// for the `builder` constructor.
pub struct RedisCluster;
189
190impl RedisCluster {
191    /// Create a new cluster builder with defaults (3 masters, 0 replicas, port 7000).
192    pub fn builder() -> RedisClusterBuilder {
193        RedisClusterBuilder {
194            masters: 3,
195            replicas_per_master: 0,
196            base_port: 7000,
197            bind: "127.0.0.1".into(),
198            password: None,
199            logfile: None,
200            extra: HashMap::new(),
201            redis_server_bin: "redis-server".into(),
202            redis_cli_bin: "redis-cli".into(),
203        }
204    }
205}
206
207impl RedisClusterHandle {
208    /// The seed address (first node).
209    pub fn addr(&self) -> String {
210        format!("{}:{}", self.bind, self.base_port)
211    }
212
213    /// All node addresses.
214    pub fn node_addrs(&self) -> Vec<String> {
215        self.nodes.iter().map(|n| n.addr()).collect()
216    }
217
218    /// The PIDs of all `redis-server` processes in the cluster.
219    pub fn pids(&self) -> Vec<u32> {
220        self.nodes.iter().map(|n| n.pid()).collect()
221    }
222
223    /// Check if all nodes are alive.
224    pub async fn all_alive(&self) -> bool {
225        for node in &self.nodes {
226            if !node.is_alive().await {
227                return false;
228            }
229        }
230        true
231    }
232
233    /// Check CLUSTER INFO for state=ok and all slots assigned.
234    pub async fn is_healthy(&self) -> bool {
235        for node in &self.nodes {
236            if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
237                if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
238                    return true;
239                }
240            }
241        }
242        false
243    }
244
245    /// Wait until the cluster is healthy or timeout.
246    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
247        let start = std::time::Instant::now();
248        loop {
249            if self.is_healthy().await {
250                return Ok(());
251            }
252            if start.elapsed() > timeout {
253                return Err(Error::Timeout {
254                    message: "cluster did not become healthy in time".into(),
255                });
256            }
257            tokio::time::sleep(Duration::from_millis(500)).await;
258        }
259    }
260
261    /// Get a `RedisCli` for the seed node.
262    pub fn cli(&self) -> RedisCli {
263        let mut cli = RedisCli::new()
264            .bin(&self.redis_cli_bin)
265            .host(&self.bind)
266            .port(self.base_port);
267        if let Some(ref password) = self.password {
268            cli = cli.password(password);
269        }
270        cli
271    }
272}
273
274impl Drop for RedisClusterHandle {
275    fn drop(&mut self) {
276        // RedisServerHandle::drop() handles each node.
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the documented defaults.
    #[test]
    fn builder_defaults() {
        let builder = RedisCluster::builder();
        assert_eq!(builder.total_nodes(), 3);
        assert_eq!((builder.masters, builder.replicas_per_master), (3, 0));
        assert_eq!(builder.base_port, 7000);
        assert!(builder.password.is_none());
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
    }

    /// Replicas multiply the node count and extend the consecutive port range.
    #[test]
    fn builder_with_replicas() {
        let builder = RedisCluster::builder().masters(3).replicas_per_master(1);
        assert_eq!(builder.total_nodes(), 6);
        let expected: Vec<u16> = (7000..=7005).collect();
        let actual: Vec<u16> = builder.ports().collect();
        assert_eq!(actual, expected);
    }

    /// The password setter stores the given value.
    #[test]
    fn builder_password() {
        let builder = RedisCluster::builder().password("secret");
        assert_eq!(builder.password, Some(String::from("secret")));
    }

    /// The log file and extra directives are stored as given.
    #[test]
    fn builder_logfile_and_extra() {
        let builder = RedisCluster::builder()
            .logfile("/tmp/cluster.log")
            .extra("maxmemory", "10mb");
        assert_eq!(builder.logfile, Some(String::from("/tmp/cluster.log")));
        let maxmemory = builder.extra.get("maxmemory").map(|value| value.as_str());
        assert_eq!(maxmemory, Some("10mb"));
    }
}