redis_server_wrapper/
cluster.rs1use std::time::Duration;
4
5use crate::cli::RedisCli;
6use crate::error::{Error, Result};
7use crate::server::{RedisServer, RedisServerHandle};
8
9pub struct RedisClusterBuilder {
30 masters: u16,
31 replicas_per_master: u16,
32 base_port: u16,
33 bind: String,
34 redis_server_bin: String,
35 redis_cli_bin: String,
36}
37
38impl RedisClusterBuilder {
39 pub fn masters(mut self, n: u16) -> Self {
40 self.masters = n;
41 self
42 }
43
44 pub fn replicas_per_master(mut self, n: u16) -> Self {
45 self.replicas_per_master = n;
46 self
47 }
48
49 pub fn base_port(mut self, port: u16) -> Self {
50 self.base_port = port;
51 self
52 }
53
54 pub fn bind(mut self, bind: impl Into<String>) -> Self {
55 self.bind = bind.into();
56 self
57 }
58
59 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
60 self.redis_server_bin = bin.into();
61 self
62 }
63
64 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
65 self.redis_cli_bin = bin.into();
66 self
67 }
68
69 fn total_nodes(&self) -> u16 {
70 self.masters * (1 + self.replicas_per_master)
71 }
72
73 fn ports(&self) -> impl Iterator<Item = u16> {
74 let base = self.base_port;
75 let total = self.total_nodes();
76 (0..total).map(move |i| base + i)
77 }
78
79 pub async fn start(self) -> Result<RedisClusterHandle> {
81 for port in self.ports() {
83 RedisCli::new()
84 .bin(&self.redis_cli_bin)
85 .host(&self.bind)
86 .port(port)
87 .shutdown();
88 }
89 tokio::time::sleep(Duration::from_millis(500)).await;
90
91 let mut nodes = Vec::new();
93 for port in self.ports() {
94 let handle = RedisServer::new()
95 .port(port)
96 .bind(&self.bind)
97 .dir(std::env::temp_dir().join(format!("redis-cluster-wrapper/node-{port}")))
98 .cluster_enabled(true)
99 .cluster_node_timeout(5000)
100 .redis_server_bin(&self.redis_server_bin)
101 .redis_cli_bin(&self.redis_cli_bin)
102 .start()
103 .await?;
104 nodes.push(handle);
105 }
106
107 let node_addrs: Vec<String> = nodes.iter().map(|n| n.addr()).collect();
109 let cli = RedisCli::new()
110 .bin(&self.redis_cli_bin)
111 .host(&self.bind)
112 .port(self.base_port);
113 cli.cluster_create(&node_addrs, self.replicas_per_master)
114 .await?;
115
116 tokio::time::sleep(Duration::from_secs(2)).await;
118
119 Ok(RedisClusterHandle {
120 nodes,
121 bind: self.bind,
122 base_port: self.base_port,
123 redis_cli_bin: self.redis_cli_bin,
124 })
125 }
126}
127
128pub struct RedisClusterHandle {
130 nodes: Vec<RedisServerHandle>,
131 bind: String,
132 base_port: u16,
133 redis_cli_bin: String,
134}
135
136pub struct RedisCluster;
138
139impl RedisCluster {
140 pub fn builder() -> RedisClusterBuilder {
142 RedisClusterBuilder {
143 masters: 3,
144 replicas_per_master: 0,
145 base_port: 7000,
146 bind: "127.0.0.1".into(),
147 redis_server_bin: "redis-server".into(),
148 redis_cli_bin: "redis-cli".into(),
149 }
150 }
151}
152
153impl RedisClusterHandle {
154 pub fn addr(&self) -> String {
156 format!("{}:{}", self.bind, self.base_port)
157 }
158
159 pub fn node_addrs(&self) -> Vec<String> {
161 self.nodes.iter().map(|n| n.addr()).collect()
162 }
163
164 pub async fn all_alive(&self) -> bool {
166 for node in &self.nodes {
167 if !node.is_alive().await {
168 return false;
169 }
170 }
171 true
172 }
173
174 pub async fn is_healthy(&self) -> bool {
176 for node in &self.nodes {
177 if let Ok(info) = node.run(&["CLUSTER", "INFO"]).await {
178 if info.contains("cluster_state:ok") && info.contains("cluster_slots_ok:16384") {
179 return true;
180 }
181 }
182 }
183 false
184 }
185
186 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
188 let start = std::time::Instant::now();
189 loop {
190 if self.is_healthy().await {
191 return Ok(());
192 }
193 if start.elapsed() > timeout {
194 return Err(Error::Timeout {
195 message: "cluster did not become healthy in time".into(),
196 });
197 }
198 tokio::time::sleep(Duration::from_millis(500)).await;
199 }
200 }
201
202 pub fn cli(&self) -> RedisCli {
204 RedisCli::new()
205 .bin(&self.redis_cli_bin)
206 .host(&self.bind)
207 .port(self.base_port)
208 }
209}
210
211impl Drop for RedisClusterHandle {
212 fn drop(&mut self) {
213 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[test]
222 fn builder_defaults() {
223 let b = RedisCluster::builder();
224 assert_eq!(b.masters, 3);
225 assert_eq!(b.replicas_per_master, 0);
226 assert_eq!(b.base_port, 7000);
227 assert_eq!(b.total_nodes(), 3);
228 }
229
230 #[test]
231 fn builder_with_replicas() {
232 let b = RedisCluster::builder().masters(3).replicas_per_master(1);
233 assert_eq!(b.total_nodes(), 6);
234 let ports: Vec<u16> = b.ports().collect();
235 assert_eq!(ports, vec![7000, 7001, 7002, 7003, 7004, 7005]);
236 }
237}