Skip to main content

redis_test/
sentinel.rs

1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7    server::{Module, RedisServer},
8    utils::{TlsFilePaths, build_keys_and_certs_for_tls, get_random_available_port},
9};
10
11/// A mock Redis Sentinel cluster for testing.
12///
13/// `RedisSentinelCluster` spawns multiple Redis instances configured as masters
14/// and replicas, along with Sentinel instances to monitor them.
15///
16/// # Example
17/// ```rust,no_run
18/// use redis_test::sentinel::RedisSentinelCluster;
19///
20/// let cluster = RedisSentinelCluster::new(1, 1, 3);
21///
22/// // Get the connection addresses of the sentinels and connect to the primary node
23/// let sentinel_addresses: Vec<_> = cluster.sentinel_servers.iter().map(|s| s.connection_info()).collect();
24/// let mut sentinel_client = redis::sentinel::SentinelClient::build(sentinel_addresses, String::from("master0"), None, redis::sentinel::SentinelServerType::Master).unwrap();
25/// let mut connection = sentinel_client.get_connection().unwrap();
26/// ```
27pub struct RedisSentinelCluster {
28    pub servers: Vec<RedisServer>,
29    pub sentinel_servers: Vec<RedisServer>,
30    pub folders: Vec<TempDir>,
31}
32
33impl RedisSentinelCluster {
34    pub fn log_sentinel_state_via_cli(&self, master_name: &str) {
35        use std::process::Command;
36
37        if let Some(sentinel) = self.sentinel_servers.first() {
38            if let Some((_, port)) = sentinel.host_and_port() {
39                println!("\n=== Querying sentinel state via redis-cli ===");
40
41                let output = Command::new("redis-cli")
42                    .args(["-p", &port.to_string(), "SENTINEL", "MASTERS"])
43                    .output();
44
45                match output {
46                    Ok(result) => {
47                        println!("SENTINEL MASTERS output:");
48                        println!("{}", String::from_utf8_lossy(&result.stdout));
49                        if !result.stderr.is_empty() {
50                            println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
51                        }
52                    }
53                    Err(e) => println!("Failed to execute redis-cli SENTINEL MASTERS: {}", e),
54                }
55
56                let output = Command::new("redis-cli")
57                    .args(["-p", &port.to_string(), "SENTINEL", "SLAVES", master_name])
58                    .output();
59
60                match output {
61                    Ok(result) => {
62                        println!("\nSENTINEL SLAVES {} output:", master_name);
63                        println!("{}", String::from_utf8_lossy(&result.stdout));
64                        if !result.stderr.is_empty() {
65                            println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
66                        }
67                    }
68                    Err(e) => println!("Failed to execute redis-cli SENTINEL SLAVES: {}", e),
69                }
70
71                let output = Command::new("redis-cli")
72                    .args([
73                        "-p",
74                        &port.to_string(),
75                        "SENTINEL",
76                        "GET-MASTER-ADDR-BY-NAME",
77                        master_name,
78                    ])
79                    .output();
80
81                match output {
82                    Ok(result) => {
83                        println!("\nSENTINEL GET-MASTER-ADDR-BY-NAME {} output:", master_name);
84                        println!("{}", String::from_utf8_lossy(&result.stdout));
85                        if !result.stderr.is_empty() {
86                            println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
87                        }
88                    }
89                    Err(e) => println!(
90                        "Failed to execute redis-cli SENTINEL GET-MASTER-ADDR-BY-NAME: {}",
91                        e
92                    ),
93                }
94
95                println!("=== End sentinel state ===\n");
96            }
97        }
98    }
99
100    pub fn log_redis_state_via_cli(&self, port: u16) {
101        use std::process::Command;
102
103        let output = Command::new("redis-cli")
104            .args(["-p", &port.to_string(), "ROLE"])
105            .output();
106
107        match output {
108            Ok(result) => {
109                println!(
110                    "Redis 127.0.0.1:{port} ROLE output: {}",
111                    String::from_utf8_lossy(&result.stdout)
112                );
113                if !result.stderr.is_empty() {
114                    println!("stderr: {}", String::from_utf8_lossy(&result.stderr));
115                }
116            }
117            Err(e) => println!("Failed to execute redis-cli ROLE on port {}: {}", port, e),
118        }
119    }
120}
121
/// Flag handed to `RedisServer::new_with_addr_tls_modules_and_spawner` by every
/// spawner in this file; going by its name, it leaves mutual TLS switched off.
const MTLS_NOT_ENABLED: bool = false;
123
124fn get_addr(port: u16) -> ConnectionAddr {
125    let addr = RedisServer::get_addr(port);
126    if let ConnectionAddr::Unix(_) = addr {
127        ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
128    } else {
129        addr
130    }
131}
132
133fn spawn_master_server(
134    port: u16,
135    dir: &TempDir,
136    tlspaths: &TlsFilePaths,
137    modules: &[Module],
138) -> RedisServer {
139    RedisServer::new_with_addr_tls_modules_and_spawner(
140        get_addr(port),
141        None,
142        Some(tlspaths.clone()),
143        MTLS_NOT_ENABLED,
144        None, // cert_auth_field - not used in sentinel tests
145        modules,
146        |cmd| {
147            // Minimize startup delay
148            cmd.arg("--repl-diskless-sync-delay").arg("0");
149            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
150                cmd.arg("--tls-replication").arg("yes");
151            }
152            cmd.current_dir(dir.path());
153            cmd.spawn().unwrap()
154        },
155    )
156}
157
158fn spawn_replica_server(
159    port: u16,
160    master_port: u16,
161    dir: &TempDir,
162    tlspaths: &TlsFilePaths,
163    modules: &[Module],
164) -> RedisServer {
165    let config_file_path = dir.path().join("redis_config.conf");
166    File::create(&config_file_path).unwrap();
167
168    RedisServer::new_with_addr_tls_modules_and_spawner(
169        get_addr(port),
170        Some(&config_file_path),
171        Some(tlspaths.clone()),
172        MTLS_NOT_ENABLED,
173        None, // cert_auth_field - not used in sentinel tests
174        modules,
175        |cmd| {
176            cmd.arg("--replicaof")
177                .arg("127.0.0.1")
178                .arg(master_port.to_string());
179            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
180                cmd.arg("--tls-replication").arg("yes");
181            }
182            cmd.current_dir(dir.path());
183            cmd.spawn().unwrap()
184        },
185    )
186}
187
188fn spawn_sentinel_server(
189    port: u16,
190    master_ports: &[u16],
191    dir: &TempDir,
192    tlspaths: &TlsFilePaths,
193    modules: &[Module],
194) -> RedisServer {
195    let config_file_path = dir.path().join("redis_config.conf");
196    let mut file = File::create(&config_file_path).unwrap();
197    for (i, master_port) in master_ports.iter().enumerate() {
198        file.write_all(
199            format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
200        )
201        .unwrap();
202    }
203    file.flush().unwrap();
204
205    RedisServer::new_with_addr_tls_modules_and_spawner(
206        get_addr(port),
207        Some(&config_file_path),
208        Some(tlspaths.clone()),
209        MTLS_NOT_ENABLED,
210        None, // cert_auth_field - not used in sentinel tests
211        modules,
212        |cmd| {
213            cmd.arg("--sentinel");
214            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
215                cmd.arg("--tls-replication").arg("yes");
216            }
217            cmd.current_dir(dir.path());
218            cmd.spawn().unwrap()
219        },
220    )
221}
222
/// Error returned by `wait_for_master_server` and `wait_for_replica` when the
/// awaited server state was not observed within the allotted attempts.
///
/// Implements `Debug`, `Display` and `std::error::Error` so that callers can
/// `unwrap()`/`expect()` the result or propagate it with `?`.
#[derive(Debug)]
pub struct SentinelError;

impl std::fmt::Display for SentinelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("timed out waiting for the expected sentinel cluster state")
    }
}

impl std::error::Error for SentinelError {}
224
225pub fn wait_for_master_server(
226    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
227    cluster: Option<&RedisSentinelCluster>,
228) -> Result<(), SentinelError> {
229    let rolecmd = redis::cmd("ROLE");
230    for _ in 0..100 {
231        let master_client = get_client_fn();
232        match &master_client {
233            Ok(client) => match client.get_connection() {
234                Ok(mut conn) => {
235                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
236                    let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
237                    if role.starts_with("master") {
238                        println!("found master");
239                        return Ok(());
240                    } else {
241                        println!("failed check for master role - current role: {r:?}")
242                    }
243                }
244                Err(err) => {
245                    println!("failed to get master connection: {err:?}");
246                    if let Some(cluster) = cluster {
247                        if let ConnectionAddr::Tcp(_, port) = client.get_connection_info().addr() {
248                            cluster.log_redis_state_via_cli(*port);
249                        }
250                    }
251                }
252            },
253            Err(err) => {
254                println!("failed to get master client: {err:?}",)
255            }
256        }
257
258        sleep(Duration::from_millis(25));
259    }
260
261    Err(SentinelError)
262}
263
264pub fn wait_for_replica(
265    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
266    cluster: Option<&RedisSentinelCluster>,
267) -> Result<(), SentinelError> {
268    let rolecmd = redis::cmd("ROLE");
269    for i in 0..300 {
270        let replica_client = get_client_fn();
271        match &replica_client {
272            Ok(client) => match client.get_connection() {
273                Ok(mut conn) => {
274                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
275                    let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
276                    let state = String::from_redis_value_ref(r.get(3).unwrap()).unwrap();
277                    if role.starts_with("slave") && state == "connected" {
278                        println!("found replica");
279                        return Ok(());
280                    } else {
281                        println!("failed check for replica role - current role: {r:?}")
282                    }
283                }
284                Err(err) => {
285                    println!("failed to get replica connection: {err:?}");
286                    if let Some(cluster) = cluster {
287                        if let ConnectionAddr::Tcp(_, port) = client.get_connection_info().addr() {
288                            cluster.log_redis_state_via_cli(*port);
289                        }
290                    }
291                }
292            },
293            Err(err) => {
294                println!("failed to get replica client: {err:?}")
295            }
296        }
297
298        let delay = if i < 100 { 25 } else { 50 };
299        sleep(Duration::from_millis(delay));
300    }
301
302    Err(SentinelError)
303}
304
305fn wait_for_replicas_to_sync(cluster: &RedisSentinelCluster, masters: u16) {
306    let servers = &cluster.servers;
307    let cluster_size = servers.len() / (masters as usize);
308    let clusters = servers.len() / cluster_size;
309    let replicas = cluster_size - 1;
310
311    for cluster_index in 0..clusters {
312        let master_addr = servers[cluster_index * cluster_size].connection_info();
313        let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()), Some(cluster));
314        if r.is_err() {
315            cluster.log_sentinel_state_via_cli(&format!("master{}", cluster_index));
316            panic!("failed waiting for master to be ready");
317        }
318
319        for replica_index in 0..replicas {
320            let replica_addr =
321                servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
322            let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()), Some(cluster));
323            if r.is_err() {
324                cluster.log_sentinel_state_via_cli(&format!("master{}", cluster_index));
325                panic!("failed waiting for replica to be ready and in sync");
326            }
327        }
328    }
329}
330
331impl RedisSentinelCluster {
332    pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
333        RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
334    }
335
336    pub fn with_modules(
337        masters: u16,
338        replicas_per_master: u16,
339        sentinels: u16,
340        modules: &[Module],
341    ) -> RedisSentinelCluster {
342        let mut servers = vec![];
343        let mut folders = vec![];
344        let mut master_ports = vec![];
345
346        let tempdir = tempfile::Builder::new()
347            .prefix("redistls")
348            .tempdir()
349            .expect("failed to create tempdir");
350        let tlspaths = build_keys_and_certs_for_tls(&tempdir);
351        folders.push(tempdir);
352
353        let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
354        let mut available_ports = std::collections::HashSet::new();
355        while available_ports.len() < required_number_of_sockets as usize {
356            available_ports.insert(get_random_available_port());
357        }
358        let mut available_ports: Vec<_> = available_ports.into_iter().collect();
359
360        for _ in 0..masters {
361            let port = available_ports.pop().unwrap();
362            let tempdir = tempfile::Builder::new()
363                .prefix("redis")
364                .tempdir()
365                .expect("failed to create tempdir");
366            servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
367            folders.push(tempdir);
368            master_ports.push(port);
369
370            for _ in 0..replicas_per_master {
371                let replica_port = available_ports.pop().unwrap();
372                let tempdir = tempfile::Builder::new()
373                    .prefix("redis")
374                    .tempdir()
375                    .expect("failed to create tempdir");
376                servers.push(spawn_replica_server(
377                    replica_port,
378                    port,
379                    &tempdir,
380                    &tlspaths,
381                    modules,
382                ));
383                folders.push(tempdir);
384            }
385        }
386
387        let mut sentinel_servers = vec![];
388        for _ in 0..sentinels {
389            let port = available_ports.pop().unwrap();
390            let tempdir = tempfile::Builder::new()
391                .prefix("redis")
392                .tempdir()
393                .expect("failed to create tempdir");
394
395            sentinel_servers.push(spawn_sentinel_server(
396                port,
397                &master_ports,
398                &tempdir,
399                &tlspaths,
400                modules,
401            ));
402            folders.push(tempdir);
403        }
404
405        let cluster = RedisSentinelCluster {
406            servers,
407            sentinel_servers,
408            folders,
409        };
410
411        // Wait for replicas to sync so that the sentinels discover them on the first try
412        wait_for_replicas_to_sync(&cluster, masters);
413
414        cluster
415    }
416
417    pub fn stop(&mut self) {
418        for server in &mut self.servers {
419            server.stop();
420        }
421        for server in &mut self.sentinel_servers {
422            server.stop();
423        }
424    }
425
426    pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
427        self.sentinel_servers.iter()
428    }
429}
430
431impl Drop for RedisSentinelCluster {
432    fn drop(&mut self) {
433        self.stop()
434    }
435}