1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7 server::{Module, RedisServer},
8 utils::{TlsFilePaths, build_keys_and_certs_for_tls, get_random_available_port},
9};
10
/// A group of locally spawned Redis servers plus the sentinels that monitor them.
///
/// All servers are stopped when the value is dropped.
pub struct RedisSentinelCluster {
    /// Master and replica servers. `with_modules` pushes each master followed
    /// immediately by its replicas, so the list is grouped per master.
    pub servers: Vec<RedisServer>,
    /// The sentinel processes, each configured to monitor every master.
    pub sentinel_servers: Vec<RedisServer>,
    /// Temporary directories (TLS material and one working directory per
    /// server) that are kept alive for as long as the cluster exists.
    pub folders: Vec<TempDir>,
}
32
33impl RedisSentinelCluster {
34 pub fn log_sentinel_state_via_cli(&self, master_name: &str) {
35 use std::process::Command;
36
37 if let Some(sentinel) = self.sentinel_servers.first() {
38 if let Some((_, port)) = sentinel.host_and_port() {
39 println!("\n=== Querying sentinel state via redis-cli ===");
40
41 let output = Command::new("redis-cli")
42 .args(["-p", &port.to_string(), "SENTINEL", "MASTERS"])
43 .output();
44
45 match output {
46 Ok(result) => {
47 println!("SENTINEL MASTERS output:");
48 println!("{}", String::from_utf8_lossy(&result.stdout));
49 if !result.stderr.is_empty() {
50 println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
51 }
52 }
53 Err(e) => println!("Failed to execute redis-cli SENTINEL MASTERS: {}", e),
54 }
55
56 let output = Command::new("redis-cli")
57 .args(["-p", &port.to_string(), "SENTINEL", "SLAVES", master_name])
58 .output();
59
60 match output {
61 Ok(result) => {
62 println!("\nSENTINEL SLAVES {} output:", master_name);
63 println!("{}", String::from_utf8_lossy(&result.stdout));
64 if !result.stderr.is_empty() {
65 println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
66 }
67 }
68 Err(e) => println!("Failed to execute redis-cli SENTINEL SLAVES: {}", e),
69 }
70
71 let output = Command::new("redis-cli")
72 .args([
73 "-p",
74 &port.to_string(),
75 "SENTINEL",
76 "GET-MASTER-ADDR-BY-NAME",
77 master_name,
78 ])
79 .output();
80
81 match output {
82 Ok(result) => {
83 println!("\nSENTINEL GET-MASTER-ADDR-BY-NAME {} output:", master_name);
84 println!("{}", String::from_utf8_lossy(&result.stdout));
85 if !result.stderr.is_empty() {
86 println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
87 }
88 }
89 Err(e) => println!(
90 "Failed to execute redis-cli SENTINEL GET-MASTER-ADDR-BY-NAME: {}",
91 e
92 ),
93 }
94
95 println!("=== End sentinel state ===\n");
96 }
97 }
98 }
99
100 pub fn log_redis_state_via_cli(&self, port: u16) {
101 use std::process::Command;
102
103 let output = Command::new("redis-cli")
104 .args(["-p", &port.to_string(), "ROLE"])
105 .output();
106
107 match output {
108 Ok(result) => {
109 println!(
110 "Redis 127.0.0.1:{port} ROLE output: {}",
111 String::from_utf8_lossy(&result.stdout)
112 );
113 if !result.stderr.is_empty() {
114 println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
115 }
116 }
117 Err(e) => println!("Failed to execute redis-cli ROLE on port {}: {}", port, e),
118 }
119 }
120}
121
/// Named `false` passed as the fourth argument of
/// `RedisServer::new_with_addr_tls_modules_and_spawner` by every spawn helper
/// in this file, so the call sites read as "mTLS not enabled".
/// NOTE(review): presumably this turns off client-certificate verification —
/// confirm against `RedisServer`.
const MTLS_NOT_ENABLED: bool = false;
123
124fn get_addr(port: u16) -> ConnectionAddr {
125 let addr = RedisServer::get_addr(port);
126 if let ConnectionAddr::Unix(_) = addr {
127 ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
128 } else {
129 addr
130 }
131}
132
133fn spawn_master_server(
134 port: u16,
135 dir: &TempDir,
136 tlspaths: &TlsFilePaths,
137 modules: &[Module],
138) -> RedisServer {
139 RedisServer::new_with_addr_tls_modules_and_spawner(
140 get_addr(port),
141 None,
142 Some(tlspaths.clone()),
143 MTLS_NOT_ENABLED,
144 None, modules,
146 |cmd| {
147 cmd.arg("--repl-diskless-sync-delay").arg("0");
149 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
150 cmd.arg("--tls-replication").arg("yes");
151 }
152 cmd.current_dir(dir.path());
153 cmd.spawn().unwrap()
154 },
155 )
156}
157
158fn spawn_replica_server(
159 port: u16,
160 master_port: u16,
161 dir: &TempDir,
162 tlspaths: &TlsFilePaths,
163 modules: &[Module],
164) -> RedisServer {
165 let config_file_path = dir.path().join("redis_config.conf");
166 File::create(&config_file_path).unwrap();
167
168 RedisServer::new_with_addr_tls_modules_and_spawner(
169 get_addr(port),
170 Some(&config_file_path),
171 Some(tlspaths.clone()),
172 MTLS_NOT_ENABLED,
173 None, modules,
175 |cmd| {
176 cmd.arg("--replicaof")
177 .arg("127.0.0.1")
178 .arg(master_port.to_string());
179 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
180 cmd.arg("--tls-replication").arg("yes");
181 }
182 cmd.current_dir(dir.path());
183 cmd.spawn().unwrap()
184 },
185 )
186}
187
188fn spawn_sentinel_server(
189 port: u16,
190 master_ports: &[u16],
191 dir: &TempDir,
192 tlspaths: &TlsFilePaths,
193 modules: &[Module],
194) -> RedisServer {
195 let config_file_path = dir.path().join("redis_config.conf");
196 let mut file = File::create(&config_file_path).unwrap();
197 for (i, master_port) in master_ports.iter().enumerate() {
198 file.write_all(
199 format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
200 )
201 .unwrap();
202 }
203 file.flush().unwrap();
204
205 RedisServer::new_with_addr_tls_modules_and_spawner(
206 get_addr(port),
207 Some(&config_file_path),
208 Some(tlspaths.clone()),
209 MTLS_NOT_ENABLED,
210 None, modules,
212 |cmd| {
213 cmd.arg("--sentinel");
214 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
215 cmd.arg("--tls-replication").arg("yes");
216 }
217 cmd.current_dir(dir.path());
218 cmd.spawn().unwrap()
219 },
220 )
221}
222
/// Error returned by [`wait_for_master_server`] and [`wait_for_replica`] when
/// the server did not report the expected role within the allotted attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SentinelError;

impl std::fmt::Display for SentinelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("gave up waiting for the Redis server to report the expected role")
    }
}

impl std::error::Error for SentinelError {}
224
225pub fn wait_for_master_server(
226 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
227 cluster: Option<&RedisSentinelCluster>,
228) -> Result<(), SentinelError> {
229 let rolecmd = redis::cmd("ROLE");
230 for _ in 0..100 {
231 let master_client = get_client_fn();
232 match &master_client {
233 Ok(client) => match client.get_connection() {
234 Ok(mut conn) => {
235 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
236 let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
237 if role.starts_with("master") {
238 println!("found master");
239 return Ok(());
240 } else {
241 println!("failed check for master role - current role: {r:?}")
242 }
243 }
244 Err(err) => {
245 println!("failed to get master connection: {err:?}");
246 if let Some(cluster) = cluster {
247 if let ConnectionAddr::Tcp(_, port) = client.get_connection_info().addr() {
248 cluster.log_redis_state_via_cli(*port);
249 }
250 }
251 }
252 },
253 Err(err) => {
254 println!("failed to get master client: {err:?}",)
255 }
256 }
257
258 sleep(Duration::from_millis(25));
259 }
260
261 Err(SentinelError)
262}
263
264pub fn wait_for_replica(
265 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
266 cluster: Option<&RedisSentinelCluster>,
267) -> Result<(), SentinelError> {
268 let rolecmd = redis::cmd("ROLE");
269 for i in 0..300 {
270 let replica_client = get_client_fn();
271 match &replica_client {
272 Ok(client) => match client.get_connection() {
273 Ok(mut conn) => {
274 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
275 let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
276 let state = String::from_redis_value_ref(r.get(3).unwrap()).unwrap();
277 if role.starts_with("slave") && state == "connected" {
278 println!("found replica");
279 return Ok(());
280 } else {
281 println!("failed check for replica role - current role: {r:?}")
282 }
283 }
284 Err(err) => {
285 println!("failed to get replica connection: {err:?}");
286 if let Some(cluster) = cluster {
287 if let ConnectionAddr::Tcp(_, port) = client.get_connection_info().addr() {
288 cluster.log_redis_state_via_cli(*port);
289 }
290 }
291 }
292 },
293 Err(err) => {
294 println!("failed to get replica client: {err:?}")
295 }
296 }
297
298 let delay = if i < 100 { 25 } else { 50 };
299 sleep(Duration::from_millis(delay));
300 }
301
302 Err(SentinelError)
303}
304
305fn wait_for_replicas_to_sync(cluster: &RedisSentinelCluster, masters: u16) {
306 let servers = &cluster.servers;
307 let cluster_size = servers.len() / (masters as usize);
308 let clusters = servers.len() / cluster_size;
309 let replicas = cluster_size - 1;
310
311 for cluster_index in 0..clusters {
312 let master_addr = servers[cluster_index * cluster_size].connection_info();
313 let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()), Some(cluster));
314 if r.is_err() {
315 cluster.log_sentinel_state_via_cli(&format!("master{}", cluster_index));
316 panic!("failed waiting for master to be ready");
317 }
318
319 for replica_index in 0..replicas {
320 let replica_addr =
321 servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
322 let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()), Some(cluster));
323 if r.is_err() {
324 cluster.log_sentinel_state_via_cli(&format!("master{}", cluster_index));
325 panic!("failed waiting for replica to be ready and in sync");
326 }
327 }
328 }
329}
330
331impl RedisSentinelCluster {
332 pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
333 RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
334 }
335
336 pub fn with_modules(
337 masters: u16,
338 replicas_per_master: u16,
339 sentinels: u16,
340 modules: &[Module],
341 ) -> RedisSentinelCluster {
342 let mut servers = vec![];
343 let mut folders = vec![];
344 let mut master_ports = vec![];
345
346 let tempdir = tempfile::Builder::new()
347 .prefix("redistls")
348 .tempdir()
349 .expect("failed to create tempdir");
350 let tlspaths = build_keys_and_certs_for_tls(&tempdir);
351 folders.push(tempdir);
352
353 let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
354 let mut available_ports = std::collections::HashSet::new();
355 while available_ports.len() < required_number_of_sockets as usize {
356 available_ports.insert(get_random_available_port());
357 }
358 let mut available_ports: Vec<_> = available_ports.into_iter().collect();
359
360 for _ in 0..masters {
361 let port = available_ports.pop().unwrap();
362 let tempdir = tempfile::Builder::new()
363 .prefix("redis")
364 .tempdir()
365 .expect("failed to create tempdir");
366 servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
367 folders.push(tempdir);
368 master_ports.push(port);
369
370 for _ in 0..replicas_per_master {
371 let replica_port = available_ports.pop().unwrap();
372 let tempdir = tempfile::Builder::new()
373 .prefix("redis")
374 .tempdir()
375 .expect("failed to create tempdir");
376 servers.push(spawn_replica_server(
377 replica_port,
378 port,
379 &tempdir,
380 &tlspaths,
381 modules,
382 ));
383 folders.push(tempdir);
384 }
385 }
386
387 let mut sentinel_servers = vec![];
388 for _ in 0..sentinels {
389 let port = available_ports.pop().unwrap();
390 let tempdir = tempfile::Builder::new()
391 .prefix("redis")
392 .tempdir()
393 .expect("failed to create tempdir");
394
395 sentinel_servers.push(spawn_sentinel_server(
396 port,
397 &master_ports,
398 &tempdir,
399 &tlspaths,
400 modules,
401 ));
402 folders.push(tempdir);
403 }
404
405 let cluster = RedisSentinelCluster {
406 servers,
407 sentinel_servers,
408 folders,
409 };
410
411 wait_for_replicas_to_sync(&cluster, masters);
413
414 cluster
415 }
416
417 pub fn stop(&mut self) {
418 for server in &mut self.servers {
419 server.stop();
420 }
421 for server in &mut self.sentinel_servers {
422 server.stop();
423 }
424 }
425
426 pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
427 self.sentinel_servers.iter()
428 }
429}
430
431impl Drop for RedisSentinelCluster {
432 fn drop(&mut self) {
433 self.stop()
434 }
435}