redis_server_wrapper/
cluster.rs1use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::cli::RedisCli;
7use crate::error::{Error, Result};
8use crate::server::{RedisServer, RedisServerHandle};
9
/// Configuration for a local Redis cluster, obtained from [`RedisCluster::builder`].
///
/// Each setter consumes the builder and returns it, so calls can be chained.
/// [`RedisClusterBuilder::start`] consumes the builder as well; clone it first
/// when the same configuration should be reused.
///
/// `Debug` is deliberately not derived because the builder may hold a password.
#[derive(Clone)]
pub struct RedisClusterBuilder {
    /// Number of master nodes.
    masters: u16,
    /// Number of replicas created for each master.
    replicas_per_master: u16,
    /// Port of the first node; the remaining nodes use the consecutive ports after it.
    base_port: u16,
    /// Address passed to every node as its bind address and to the CLI as its host.
    bind: String,
    /// Optional password, applied to every node and to CLI invocations.
    password: Option<String>,
    /// Optional log file path handed to every node.
    logfile: Option<String>,
    /// Additional key/value settings forwarded to every node.
    extra: HashMap<String, String>,
    /// Name or path of the `redis-server` executable.
    redis_server_bin: String,
    /// Name or path of the `redis-cli` executable.
    redis_cli_bin: String,
}
41
42impl RedisClusterBuilder {
43 pub fn masters(mut self, n: u16) -> Self {
45 self.masters = n;
46 self
47 }
48
49 pub fn replicas_per_master(mut self, n: u16) -> Self {
51 self.replicas_per_master = n;
52 self
53 }
54
55 pub fn base_port(mut self, port: u16) -> Self {
59 self.base_port = port;
60 self
61 }
62
63 pub fn bind(mut self, bind: impl Into<String>) -> Self {
65 self.bind = bind.into();
66 self
67 }
68
69 pub fn password(mut self, password: impl Into<String>) -> Self {
71 self.password = Some(password.into());
72 self
73 }
74
75 pub fn logfile(mut self, path: impl Into<String>) -> Self {
77 self.logfile = Some(path.into());
78 self
79 }
80
81 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83 self.extra.insert(key.into(), value.into());
84 self
85 }
86
87 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
89 self.redis_server_bin = bin.into();
90 self
91 }
92
93 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
95 self.redis_cli_bin = bin.into();
96 self
97 }
98
99 fn total_nodes(&self) -> u16 {
100 self.masters * (1 + self.replicas_per_master)
101 }
102
103 fn ports(&self) -> impl Iterator<Item = u16> {
104 let base = self.base_port;
105 let total = self.total_nodes();
106 (0..total).map(move |i| base + i)
107 }
108
109 pub async fn start(self) -> Result<RedisClusterHandle> {
111 for port in self.ports() {
113 let mut cli = RedisCli::new()
114 .bin(&self.redis_cli_bin)
115 .host(&self.bind)
116 .port(port);
117 if let Some(ref password) = self.password {
118 cli = cli.password(password);
119 }
120 cli.shutdown();
121 }
122 tokio::time::sleep(Duration::from_millis(500)).await;
123
124 let mut nodes = Vec::new();
126 for port in self.ports() {
127 let node_dir = std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}"));
128 let _ = std::fs::remove_dir_all(&node_dir);
129 let mut server = RedisServer::new()
130 .port(port)
131 .bind(&self.bind)
132 .dir(node_dir)
133 .cluster_enabled(true)
134 .cluster_node_timeout(5000)
135 .redis_server_bin(&self.redis_server_bin)
136 .redis_cli_bin(&self.redis_cli_bin);
137 if let Some(ref password) = self.password {
138 server = server.password(password).masterauth(password);
139 }
140 if let Some(ref logfile) = self.logfile {
141 server = server.logfile(logfile.clone());
142 }
143 for (key, value) in &self.extra {
144 server = server.extra(key.clone(), value.clone());
145 }
146 let handle = server.start().await?;
147 nodes.push(handle);
148 }
149
150 let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
152 let mut cli = RedisCli::new()
153 .bin(&self.redis_cli_bin)
154 .host(&self.bind)
155 .port(self.base_port);
156 if let Some(ref password) = self.password {
157 cli = cli.password(password);
158 }
159 cli.cluster_create(&node_addrs, self.replicas_per_master)
160 .await?;
161
162 tokio::time::sleep(Duration::from_secs(2)).await;
164
165 Ok(RedisClusterHandle {
166 nodes,
167 bind: self.bind,
168 base_port: self.base_port,
169 password: self.password,
170 redis_cli_bin: self.redis_cli_bin,
171 })
172 }
173}
174
175pub struct RedisClusterHandle {
177 nodes: Vec<RedisServerHandle>,
178 bind: String,
179 base_port: u16,
180 password: Option<String>,
181 redis_cli_bin: String,
182}
183
/// Entry point for configuring a cluster; it carries no data and only exposes
/// an associated function that returns a builder.
pub struct RedisCluster;
189
190impl RedisCluster {
191 pub fn builder() -> RedisClusterBuilder {
193 RedisClusterBuilder {
194 masters: 3,
195 replicas_per_master: 0,
196 base_port: 7000,
197 bind: "127.0.0.1".into(),
198 password: None,
199 logfile: None,
200 extra: HashMap::new(),
201 redis_server_bin: "redis-server".into(),
202 redis_cli_bin: "redis-cli".into(),
203 }
204 }
205}
206
207impl RedisClusterHandle {
208 pub fn addr(&self) -> String {
210 format!("{}:{}", self.bind, self.base_port)
211 }
212
213 pub fn node_addrs(&self) -> Vec<String> {
215 self.nodes.iter().map(|n| n.addr()).collect()
216 }
217
218 pub fn pids(&self) -> Vec<u32> {
220 self.nodes.iter().map(|n| n.pid()).collect()
221 }
222
223 pub async fn all_alive(&self) -> bool {
225 for node in &self.nodes {
226 if !node.is_alive().await {
227 return false;
228 }
229 }
230 true
231 }
232
233 pub async fn is_healthy(&self) -> bool {
235 for node in &self.nodes {
236 if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
237 if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
238 return true;
239 }
240 }
241 }
242 false
243 }
244
245 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
247 let start = std::time::Instant::now();
248 loop {
249 if self.is_healthy().await {
250 return Ok(());
251 }
252 if start.elapsed() > timeout {
253 return Err(Error::Timeout {
254 message: "cluster did not become healthy in time".into(),
255 });
256 }
257 tokio::time::sleep(Duration::from_millis(500)).await;
258 }
259 }
260
261 pub fn cli(&self) -> RedisCli {
263 let mut cli = RedisCli::new()
264 .bin(&self.redis_cli_bin)
265 .host(&self.bind)
266 .port(self.base_port);
267 if let Some(ref password) = self.password {
268 cli = cli.password(password);
269 }
270 cli
271 }
272}
273
274impl Drop for RedisClusterHandle {
275 fn drop(&mut self) {
276 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder has three masters, no replicas, port 7000, and nothing optional set.
    #[test]
    fn builder_defaults() {
        let builder = RedisCluster::builder();
        assert_eq!(3, builder.masters);
        assert_eq!(0, builder.replicas_per_master);
        assert_eq!(7000, builder.base_port);
        assert!(builder.password.is_none());
        assert_eq!(builder.logfile, None);
        assert_eq!(builder.extra.len(), 0);
        assert_eq!(3, builder.total_nodes());
    }

    /// Replicas multiply the node count and extend the consecutive port range.
    #[test]
    fn builder_with_replicas() {
        let builder = RedisCluster::builder().masters(3).replicas_per_master(1);
        assert_eq!(6, builder.total_nodes());

        let expected: Vec<u16> = (7000..=7005).collect();
        let actual: Vec<u16> = builder.ports().collect();
        assert_eq!(actual, expected);
    }

    /// The password setter stores the supplied value.
    #[test]
    fn builder_password() {
        let builder = RedisCluster::builder().password("secret");
        assert_eq!(builder.password, Some(String::from("secret")));
    }

    /// The log file and extra settings are stored as given.
    #[test]
    fn builder_logfile_and_extra() {
        let builder = RedisCluster::builder()
            .logfile("/tmp/cluster.log")
            .extra("maxmemory", "10mb");
        assert_eq!(builder.logfile, Some(String::from("/tmp/cluster.log")));
        assert_eq!(builder.extra.get("maxmemory"), Some(&String::from("10mb")));
    }
}