Skip to main content

redis_server_wrapper/
cluster.rs

1//! Redis Cluster lifecycle management built on `RedisServer`.
2
3use std::time::Duration;
4
5use crate::cli::RedisCli;
6use crate::error::{Error, Result};
7use crate::server::{RedisServer, RedisServerHandle};
8
9/// Builder for a Redis Cluster.
10///
11/// # Example
12///
13/// ```no_run
14/// use redis_server_wrapper::RedisCluster;
15///
16/// # async fn example() {
17/// let cluster = RedisCluster::builder()
18///     .masters(3)
19///     .replicas_per_master(1)
20///     .base_port(7000)
21///     .start()
22///     .await
23///     .unwrap();
24///
25/// assert!(cluster.is_healthy().await);
26/// // Stopped automatically on Drop.
27/// # }
28/// ```
29pub struct RedisClusterBuilder {
30    masters: u16,
31    replicas_per_master: u16,
32    base_port: u16,
33    bind: String,
34    redis_server_bin: String,
35    redis_cli_bin: String,
36}
37
38impl RedisClusterBuilder {
39    pub fn masters(mut self, n: u16) -> Self {
40        self.masters = n;
41        self
42    }
43
44    pub fn replicas_per_master(mut self, n: u16) -> Self {
45        self.replicas_per_master = n;
46        self
47    }
48
49    pub fn base_port(mut self, port: u16) -> Self {
50        self.base_port = port;
51        self
52    }
53
54    pub fn bind(mut self, bind: impl Into<String>) -> Self {
55        self.bind = bind.into();
56        self
57    }
58
59    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
60        self.redis_server_bin = bin.into();
61        self
62    }
63
64    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
65        self.redis_cli_bin = bin.into();
66        self
67    }
68
69    fn total_nodes(&self) -> u16 {
70        self.masters * (1 + self.replicas_per_master)
71    }
72
73    fn ports(&self) -> impl Iterator<Item = u16> {
74        let base = self.base_port;
75        let total = self.total_nodes();
76        (0..total).map(move |i| base + i)
77    }
78
79    /// Start all nodes and form the cluster.
80    pub async fn start(self) -> Result<RedisClusterHandle> {
81        // Stop any leftover nodes from previous runs.
82        for port in self.ports() {
83            RedisCli::new()
84                .bin(&self.redis_cli_bin)
85                .host(&self.bind)
86                .port(port)
87                .shutdown();
88        }
89        tokio::time::sleep(Duration::from_millis(500)).await;
90
91        // Start each node.
92        let mut nodes = Vec::new();
93        for port in self.ports() {
94            let handle = RedisServer::new()
95                .port(port)
96                .bind(&self.bind)
97                .dir(std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}")))
98                .cluster_enabled(true)
99                .cluster_node_timeout(5000)
100                .redis_server_bin(&self.redis_server_bin)
101                .redis_cli_bin(&self.redis_cli_bin)
102                .start()
103                .await?;
104            nodes.push(handle);
105        }
106
107        // Form the cluster.
108        let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
109        let cli = RedisCli::new()
110            .bin(&self.redis_cli_bin)
111            .host(&self.bind)
112            .port(self.base_port);
113        cli.cluster_create(&node_addrs, self.replicas_per_master)
114            .await?;
115
116        // Wait for convergence.
117        tokio::time::sleep(Duration::from_secs(2)).await;
118
119        Ok(RedisClusterHandle {
120            nodes,
121            bind: self.bind,
122            base_port: self.base_port,
123            redis_cli_bin: self.redis_cli_bin,
124        })
125    }
126}
127
128/// A running Redis Cluster. Stops all nodes on Drop.
129pub struct RedisClusterHandle {
130    nodes: Vec<RedisServerHandle>,
131    bind: String,
132    base_port: u16,
133    redis_cli_bin: String,
134}
135
136/// Convenience constructor.
137pub struct RedisCluster;
138
139impl RedisCluster {
140    /// Create a new cluster builder with defaults (3 masters, 0 replicas, port 7000).
141    pub fn builder() -> RedisClusterBuilder {
142        RedisClusterBuilder {
143            masters: 3,
144            replicas_per_master: 0,
145            base_port: 7000,
146            bind: "127.0.0.1".into(),
147            redis_server_bin: "redis-server".into(),
148            redis_cli_bin: "redis-cli".into(),
149        }
150    }
151}
152
153impl RedisClusterHandle {
154    /// The seed address (first node).
155    pub fn addr(&self) -> String {
156        format!("{}:{}", self.bind, self.base_port)
157    }
158
159    /// All node addresses.
160    pub fn node_addrs(&self) -> Vec<String> {
161        self.nodes.iter().map(|n| n.addr()).collect()
162    }
163
164    /// Check if all nodes are alive.
165    pub async fn all_alive(&self) -> bool {
166        for node in &self.nodes {
167            if !node.is_alive().await {
168                return false;
169            }
170        }
171        true
172    }
173
174    /// Check CLUSTER INFO for state=ok and all slots assigned.
175    pub async fn is_healthy(&self) -> bool {
176        for node in &self.nodes {
177            if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
178                if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
179                    return true;
180                }
181            }
182        }
183        false
184    }
185
186    /// Wait until the cluster is healthy or timeout.
187    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
188        let start = std::time::Instant::now();
189        loop {
190            if self.is_healthy().await {
191                return Ok(());
192            }
193            if start.elapsed() > timeout {
194                return Err(Error::Timeout {
195                    message: "cluster did not become healthy in time".into(),
196                });
197            }
198            tokio::time::sleep(Duration::from_millis(500)).await;
199        }
200    }
201
202    /// Get a `RedisCli` for the seed node.
203    pub fn cli(&self) -> RedisCli {
204        RedisCli::new()
205            .bin(&self.redis_cli_bin)
206            .host(&self.bind)
207            .port(self.base_port)
208    }
209}
210
211impl Drop for RedisClusterHandle {
212    fn drop(&mut self) {
213        // RedisServerHandle::drop() handles each node.
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn builder_defaults() {
223        let b = RedisCluster::builder();
224        assert_eq!(b.masters, 3);
225        assert_eq!(b.replicas_per_master, 0);
226        assert_eq!(b.base_port, 7000);
227        assert_eq!(b.total_nodes(), 3);
228    }
229
230    #[test]
231    fn builder_with_replicas() {
232        let b = RedisCluster::builder().masters(3).replicas_per_master(1);
233        assert_eq!(b.total_nodes(), 6);
234        let ports: Vec<u16> = b.ports().collect();
235        assert_eq!(ports, vec![7000, 7001, 7002, 7003, 7004, 7005]);
236    }
237}