1use std::{env, process, thread::sleep, time::Duration};
2
3use tempfile::TempDir;
4
5use crate::{
6 server::{Module, RedisServer},
7 utils::{TlsFilePaths, build_keys_and_certs_for_tls_ext, get_random_available_port},
8};
9
10pub struct RedisClusterConfiguration {
12 pub num_nodes: u16,
13 pub num_replicas: u16,
14 pub modules: Vec<Module>,
15 pub tls_insecure: bool,
16 pub mtls_enabled: bool,
17 pub ports: Vec<u16>,
18 pub certs_with_ip_alts: bool,
19}
20
21impl RedisClusterConfiguration {
22 pub fn single_replica_config() -> Self {
23 Self {
24 num_nodes: 6,
25 num_replicas: 1,
26 ..Default::default()
27 }
28 }
29}
30
31impl Default for RedisClusterConfiguration {
32 fn default() -> Self {
33 Self {
34 num_nodes: 3,
35 num_replicas: 0,
36 modules: vec![],
37 tls_insecure: true,
38 mtls_enabled: false,
39 ports: vec![],
40 certs_with_ip_alts: true,
41 }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq)]
47#[non_exhaustive]
48pub enum ClusterType {
49 Tcp,
50 TcpTls,
51}
52
53impl ClusterType {
54 pub fn get_intended() -> ClusterType {
55 match env::var("REDISRS_SERVER_TYPE")
56 .ok()
57 .as_ref()
58 .map(|x| &x[..])
59 {
60 Some("tcp") => ClusterType::Tcp,
61 Some("tcp+tls") => ClusterType::TcpTls,
62 Some(val) => {
63 panic!("Unknown server type {val:?}");
64 }
65 None => ClusterType::Tcp,
66 }
67 }
68
69 fn build_addr(port: u16) -> redis::ConnectionAddr {
70 match ClusterType::get_intended() {
71 ClusterType::Tcp => redis::ConnectionAddr::Tcp("127.0.0.1".into(), port),
72 ClusterType::TcpTls => redis::ConnectionAddr::TcpTls {
73 host: "127.0.0.1".into(),
74 port,
75 insecure: true,
76 tls_params: None,
77 },
78 }
79 }
80}
81
82fn port_in_use(addr: &str) -> bool {
83 let socket_addr: std::net::SocketAddr = addr.parse().expect("Invalid address");
84 let socket = socket2::Socket::new(
85 socket2::Domain::for_address(socket_addr),
86 socket2::Type::STREAM,
87 None,
88 )
89 .expect("Failed to create socket");
90
91 socket.connect(&socket_addr.into()).is_ok()
92}
93
94pub struct RedisCluster {
112 pub servers: Vec<RedisServer>,
113 pub folders: Vec<TempDir>,
114 pub tls_paths: Option<TlsFilePaths>,
115}
116
117impl RedisCluster {
118 pub fn username() -> &'static str {
119 "hello"
120 }
121
122 pub fn password() -> &'static str {
123 "world"
124 }
125
126 pub fn new(configuration: RedisClusterConfiguration) -> RedisCluster {
127 let RedisClusterConfiguration {
128 num_nodes: nodes,
129 num_replicas: replicas,
130 modules,
131 tls_insecure,
132 mtls_enabled,
133 ports,
134 certs_with_ip_alts,
135 } = configuration;
136
137 let optional_ports = if ports.is_empty() {
138 vec![None; nodes as usize]
139 } else {
140 assert!(ports.len() == nodes as usize);
141 ports.into_iter().map(Some).collect()
142 };
143 let mut chosen_ports = std::collections::HashSet::new();
144
145 let mut folders = vec![];
146 let mut addrs = vec![];
147 let mut tls_paths = None;
148
149 let mut is_tls = false;
150
151 if let ClusterType::TcpTls = ClusterType::get_intended() {
152 let tempdir = tempfile::Builder::new()
154 .prefix("redis")
155 .tempdir()
156 .expect("failed to create tempdir");
157 let files = build_keys_and_certs_for_tls_ext(&tempdir, certs_with_ip_alts);
158 folders.push(tempdir);
159 tls_paths = Some(files);
160 is_tls = true;
161 }
162
163 let max_attempts = 5;
164
165 let mut make_server = |port| {
166 RedisServer::new_with_addr_tls_modules_and_spawner(
167 ClusterType::build_addr(port),
168 None,
169 tls_paths.clone(),
170 mtls_enabled,
171 None, &modules,
173 |cmd| {
174 let tempdir = tempfile::Builder::new()
175 .prefix("redis")
176 .tempdir()
177 .expect("failed to create tempdir");
178 let acl_path = tempdir.path().join("users.acl");
179 let acl_content = format!(
180 "user {} on allcommands allkeys >{}",
181 Self::username(),
182 Self::password()
183 );
184 std::fs::write(&acl_path, acl_content).expect("failed to write acl file");
185 cmd.arg("--cluster-enabled")
186 .arg("yes")
187 .arg("--cluster-config-file")
188 .arg(tempdir.path().join("nodes.conf"))
189 .arg("--cluster-node-timeout")
190 .arg("5000")
191 .arg("--aclfile")
192 .arg(&acl_path);
193 if is_tls {
194 cmd.arg("--tls-cluster").arg("yes");
195 if replicas > 0 {
196 cmd.arg("--tls-replication").arg("yes");
197 }
198 }
199 cmd.current_dir(tempdir.path());
200 folders.push(tempdir);
201 cmd.spawn().unwrap()
202 },
203 )
204 };
205
206 let verify_server = |server: &mut RedisServer| {
207 let process = &mut server.process;
208 match process.try_wait() {
209 Ok(Some(status)) => {
210 let log_file_contents = server.log_file_contents();
211 let err = format!(
212 "redis server creation failed with status {status:?}.\nlog file: {log_file_contents:?}"
213 );
214 Err(err)
215 }
216 Ok(None) => {
217 let max_attempts = 200;
219 let mut cur_attempts = 0;
220 loop {
221 if cur_attempts == max_attempts {
222 let log_file_contents = server.log_file_contents();
223 break Err(format!(
224 "redis server creation failed: Address {} closed. {log_file_contents:?}",
225 server.addr
226 ));
227 } else if port_in_use(&server.addr.to_string()) {
228 break Ok(());
229 }
230 eprintln!("Waiting for redis process to initialize");
231 sleep(Duration::from_millis(50));
232 cur_attempts += 1;
233 }
234 }
235 Err(e) => {
236 panic!("Unexpected error in redis server creation {e}");
237 }
238 }
239 };
240
241 let servers = optional_ports
242 .into_iter()
243 .map(|port_option| {
244 for _ in 0..5 {
245 let port = match port_option {
246 Some(port) => port,
247 None => loop {
248 let port = get_random_available_port();
249 if chosen_ports.contains(&port) {
250 continue;
251 }
252 chosen_ports.insert(port);
253 break port;
254 },
255 };
256 let mut server = make_server(port);
257 sleep(Duration::from_millis(50));
258
259 match verify_server(&mut server) {
260 Ok(_) => {
261 let addr = format!("127.0.0.1:{port}");
262 addrs.push(addr.clone());
263 return server;
264 }
265 Err(err) => eprintln!("{err}"),
266 }
267 }
268 panic!("Exhausted retries");
269 })
270 .collect();
271
272 let mut cmd = process::Command::new("redis-cli");
273 cmd.stdout(process::Stdio::piped())
274 .arg("--cluster")
275 .arg("create")
276 .args(&addrs);
277 if replicas > 0 {
278 cmd.arg("--cluster-replicas").arg(replicas.to_string());
279 }
280 cmd.arg("--cluster-yes");
281
282 if is_tls {
283 if mtls_enabled {
284 if let Some(TlsFilePaths {
285 redis_crt,
286 redis_key,
287 ca_crt,
288 }) = &tls_paths
289 {
290 cmd.arg("--cert");
291 cmd.arg(redis_crt);
292 cmd.arg("--key");
293 cmd.arg(redis_key);
294 cmd.arg("--cacert");
295 cmd.arg(ca_crt);
296 cmd.arg("--tls");
297 }
298 } else if !tls_insecure && tls_paths.is_some() {
299 let ca_crt = &tls_paths.as_ref().unwrap().ca_crt;
300 cmd.arg("--tls").arg("--cacert").arg(ca_crt);
301 } else {
302 cmd.arg("--tls").arg("--insecure");
303 }
304 }
305
306 let mut cur_attempts = 0;
307 loop {
308 let output = cmd.output().unwrap();
309 if output.status.success() {
310 break;
311 } else {
312 let err = format!("Cluster creation failed: {output:?}");
313 if cur_attempts == max_attempts {
314 panic!("{err}");
315 }
316 eprintln!("Retrying: {err}");
317 sleep(Duration::from_millis(50));
318 cur_attempts += 1;
319 }
320 }
321
322 let cluster = RedisCluster {
323 servers,
324 folders,
325 tls_paths,
326 };
327 if replicas > 0 {
328 cluster.wait_for_replicas(replicas);
329 }
330
331 wait_for_status_ok(&cluster);
332 cluster
333 }
334
335 fn wait_for_replicas(&self, replicas: u16) {
336 'server: for server in &self.servers {
337 let conn_info = server.connection_info();
338 eprintln!(
339 "waiting until {:?} knows required number of replicas",
340 conn_info.addr()
341 );
342
343 let client = redis::Client::open(server.connection_info()).unwrap();
344
345 let mut con = client.get_connection().unwrap();
346
347 for _ in 1..500 {
349 let value = redis::cmd("CLUSTER").arg("SLOTS").query(&mut con).unwrap();
350 let slots: Vec<Vec<redis::Value>> = redis::from_redis_value(value).unwrap();
351
352 if slots.iter().all(|slot| slot.len() >= 3 + replicas as usize) {
355 continue 'server;
356 }
357
358 sleep(Duration::from_millis(100));
359 }
360
361 panic!("failed to create enough replicas");
362 }
363 }
364
365 pub fn stop(&mut self) {
366 for server in &mut self.servers {
367 server.stop();
368 }
369 }
370
371 pub fn iter_servers(&self) -> impl Iterator<Item = &RedisServer> {
372 self.servers.iter()
373 }
374}
375
376fn wait_for_status_ok(cluster: &RedisCluster) {
377 'server: for server in &cluster.servers {
378 let log_file = RedisServer::log_file(&server.tempdir);
379
380 for _ in 1..500 {
381 let contents =
382 std::fs::read_to_string(&log_file).expect("Should have been able to read the file");
383
384 if contents.contains("Cluster state changed: ok") {
385 continue 'server;
386 }
387 sleep(Duration::from_millis(20));
388 }
389 panic!("failed to reach state change: OK");
390 }
391}
392
393impl Drop for RedisCluster {
394 fn drop(&mut self) {
395 self.stop()
396 }
397}