redis_test/
sentinel.rs

1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7    server::{Module, RedisServer},
8    utils::{build_keys_and_certs_for_tls, get_random_available_port, TlsFilePaths},
9};
10
11pub struct RedisSentinelCluster {
12    pub servers: Vec<RedisServer>,
13    pub sentinel_servers: Vec<RedisServer>,
14    pub folders: Vec<TempDir>,
15}
16
17const MTLS_NOT_ENABLED: bool = false;
18
19fn get_addr(port: u16) -> ConnectionAddr {
20    let addr = RedisServer::get_addr(port);
21    if let ConnectionAddr::Unix(_) = addr {
22        ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
23    } else {
24        addr
25    }
26}
27
28fn spawn_master_server(
29    port: u16,
30    dir: &TempDir,
31    tlspaths: &TlsFilePaths,
32    modules: &[Module],
33) -> RedisServer {
34    RedisServer::new_with_addr_tls_modules_and_spawner(
35        get_addr(port),
36        None,
37        Some(tlspaths.clone()),
38        MTLS_NOT_ENABLED,
39        modules,
40        |cmd| {
41            // Minimize startup delay
42            cmd.arg("--repl-diskless-sync-delay").arg("0");
43            cmd.arg("--appendonly").arg("yes");
44            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
45                cmd.arg("--tls-replication").arg("yes");
46            }
47            cmd.current_dir(dir.path());
48            cmd.spawn().unwrap()
49        },
50    )
51}
52
53fn spawn_replica_server(
54    port: u16,
55    master_port: u16,
56    dir: &TempDir,
57    tlspaths: &TlsFilePaths,
58    modules: &[Module],
59) -> RedisServer {
60    let config_file_path = dir.path().join("redis_config.conf");
61    File::create(&config_file_path).unwrap();
62
63    RedisServer::new_with_addr_tls_modules_and_spawner(
64        get_addr(port),
65        Some(&config_file_path),
66        Some(tlspaths.clone()),
67        MTLS_NOT_ENABLED,
68        modules,
69        |cmd| {
70            cmd.arg("--replicaof")
71                .arg("127.0.0.1")
72                .arg(master_port.to_string());
73            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
74                cmd.arg("--tls-replication").arg("yes");
75            }
76            cmd.arg("--appendonly").arg("yes");
77            cmd.current_dir(dir.path());
78            cmd.spawn().unwrap()
79        },
80    )
81}
82
83fn spawn_sentinel_server(
84    port: u16,
85    master_ports: &[u16],
86    dir: &TempDir,
87    tlspaths: &TlsFilePaths,
88    modules: &[Module],
89) -> RedisServer {
90    let config_file_path = dir.path().join("redis_config.conf");
91    let mut file = File::create(&config_file_path).unwrap();
92    for (i, master_port) in master_ports.iter().enumerate() {
93        file.write_all(
94            format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
95        )
96        .unwrap();
97    }
98    file.flush().unwrap();
99
100    RedisServer::new_with_addr_tls_modules_and_spawner(
101        get_addr(port),
102        Some(&config_file_path),
103        Some(tlspaths.clone()),
104        MTLS_NOT_ENABLED,
105        modules,
106        |cmd| {
107            cmd.arg("--sentinel");
108            cmd.arg("--appendonly").arg("yes");
109            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
110                cmd.arg("--tls-replication").arg("yes");
111            }
112            cmd.current_dir(dir.path());
113            cmd.spawn().unwrap()
114        },
115    )
116}
117
118pub struct SentinelError;
119
120pub fn wait_for_master_server(
121    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
122) -> Result<(), SentinelError> {
123    let rolecmd = redis::cmd("ROLE");
124    for _ in 0..100 {
125        let master_client = get_client_fn();
126        match master_client {
127            Ok(client) => match client.get_connection() {
128                Ok(mut conn) => {
129                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
130                    let role = String::from_redis_value(r.first().unwrap()).unwrap();
131                    if role.starts_with("master") {
132                        return Ok(());
133                    } else {
134                        println!("failed check for master role - current role: {r:?}")
135                    }
136                }
137                Err(err) => {
138                    println!("failed to get master connection: {err:?}",)
139                }
140            },
141            Err(err) => {
142                println!("failed to get master client: {err:?}",)
143            }
144        }
145
146        sleep(Duration::from_millis(25));
147    }
148
149    Err(SentinelError)
150}
151
152pub fn wait_for_replica(
153    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
154) -> Result<(), SentinelError> {
155    let rolecmd = redis::cmd("ROLE");
156    for _ in 0..200 {
157        let replica_client = get_client_fn();
158        match replica_client {
159            Ok(client) => match client.get_connection() {
160                Ok(mut conn) => {
161                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
162                    let role = String::from_redis_value(r.first().unwrap()).unwrap();
163                    let state = String::from_redis_value(r.get(3).unwrap()).unwrap();
164                    if role.starts_with("slave") && state == "connected" {
165                        return Ok(());
166                    } else {
167                        println!("failed check for replica role - current role: {r:?}")
168                    }
169                }
170                Err(err) => {
171                    println!("failed to get replica connection: {err:?}")
172                }
173            },
174            Err(err) => {
175                println!("failed to get replica client: {err:?}")
176            }
177        }
178
179        sleep(Duration::from_millis(25));
180    }
181
182    Err(SentinelError)
183}
184
185fn wait_for_replicas_to_sync(servers: &[RedisServer], masters: u16) {
186    let cluster_size = servers.len() / (masters as usize);
187    let clusters = servers.len() / cluster_size;
188    let replicas = cluster_size - 1;
189
190    for cluster_index in 0..clusters {
191        let master_addr = servers[cluster_index * cluster_size].connection_info();
192        let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()));
193        if r.is_err() {
194            panic!("failed waiting for master to be ready");
195        }
196
197        for replica_index in 0..replicas {
198            let replica_addr =
199                servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
200            let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()));
201            if r.is_err() {
202                panic!("failed waiting for replica to be ready and in sync");
203            }
204        }
205    }
206}
207
208impl RedisSentinelCluster {
209    pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
210        RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
211    }
212
213    pub fn with_modules(
214        masters: u16,
215        replicas_per_master: u16,
216        sentinels: u16,
217        modules: &[Module],
218    ) -> RedisSentinelCluster {
219        let mut servers = vec![];
220        let mut folders = vec![];
221        let mut master_ports = vec![];
222
223        let tempdir = tempfile::Builder::new()
224            .prefix("redistls")
225            .tempdir()
226            .expect("failed to create tempdir");
227        let tlspaths = build_keys_and_certs_for_tls(&tempdir);
228        folders.push(tempdir);
229
230        let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
231        let mut available_ports = std::collections::HashSet::new();
232        while available_ports.len() < required_number_of_sockets as usize {
233            available_ports.insert(get_random_available_port());
234        }
235        let mut available_ports: Vec<_> = available_ports.into_iter().collect();
236
237        for _ in 0..masters {
238            let port = available_ports.pop().unwrap();
239            let tempdir = tempfile::Builder::new()
240                .prefix("redis")
241                .tempdir()
242                .expect("failed to create tempdir");
243            servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
244            folders.push(tempdir);
245            master_ports.push(port);
246
247            for _ in 0..replicas_per_master {
248                let replica_port = available_ports.pop().unwrap();
249                let tempdir = tempfile::Builder::new()
250                    .prefix("redis")
251                    .tempdir()
252                    .expect("failed to create tempdir");
253                servers.push(spawn_replica_server(
254                    replica_port,
255                    port,
256                    &tempdir,
257                    &tlspaths,
258                    modules,
259                ));
260                folders.push(tempdir);
261            }
262        }
263
264        // Wait for replicas to sync so that the sentinels discover them on the first try
265        wait_for_replicas_to_sync(&servers, masters);
266
267        let mut sentinel_servers = vec![];
268        for _ in 0..sentinels {
269            let port = available_ports.pop().unwrap();
270            let tempdir = tempfile::Builder::new()
271                .prefix("redis")
272                .tempdir()
273                .expect("failed to create tempdir");
274
275            sentinel_servers.push(spawn_sentinel_server(
276                port,
277                &master_ports,
278                &tempdir,
279                &tlspaths,
280                modules,
281            ));
282            folders.push(tempdir);
283        }
284
285        RedisSentinelCluster {
286            servers,
287            sentinel_servers,
288            folders,
289        }
290    }
291
292    pub fn stop(&mut self) {
293        for server in &mut self.servers {
294            server.stop();
295        }
296        for server in &mut self.sentinel_servers {
297            server.stop();
298        }
299    }
300
301    pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
302        self.sentinel_servers.iter()
303    }
304}
305
306impl Drop for RedisSentinelCluster {
307    fn drop(&mut self) {
308        self.stop()
309    }
310}