Skip to main content

redis_test/
sentinel.rs

1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7    server::{Module, RedisServer},
8    utils::{TlsFilePaths, build_keys_and_certs_for_tls, get_random_available_port},
9};
10
11pub struct RedisSentinelCluster {
12    pub servers: Vec<RedisServer>,
13    pub sentinel_servers: Vec<RedisServer>,
14    pub folders: Vec<TempDir>,
15}
16
17const MTLS_NOT_ENABLED: bool = false;
18
19fn get_addr(port: u16) -> ConnectionAddr {
20    let addr = RedisServer::get_addr(port);
21    if let ConnectionAddr::Unix(_) = addr {
22        ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
23    } else {
24        addr
25    }
26}
27
28fn spawn_master_server(
29    port: u16,
30    dir: &TempDir,
31    tlspaths: &TlsFilePaths,
32    modules: &[Module],
33) -> RedisServer {
34    RedisServer::new_with_addr_tls_modules_and_spawner(
35        get_addr(port),
36        None,
37        Some(tlspaths.clone()),
38        MTLS_NOT_ENABLED,
39        modules,
40        |cmd| {
41            // Minimize startup delay
42            cmd.arg("--repl-diskless-sync-delay").arg("0");
43            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
44                cmd.arg("--tls-replication").arg("yes");
45            }
46            cmd.current_dir(dir.path());
47            cmd.spawn().unwrap()
48        },
49    )
50}
51
52fn spawn_replica_server(
53    port: u16,
54    master_port: u16,
55    dir: &TempDir,
56    tlspaths: &TlsFilePaths,
57    modules: &[Module],
58) -> RedisServer {
59    let config_file_path = dir.path().join("redis_config.conf");
60    File::create(&config_file_path).unwrap();
61
62    RedisServer::new_with_addr_tls_modules_and_spawner(
63        get_addr(port),
64        Some(&config_file_path),
65        Some(tlspaths.clone()),
66        MTLS_NOT_ENABLED,
67        modules,
68        |cmd| {
69            cmd.arg("--replicaof")
70                .arg("127.0.0.1")
71                .arg(master_port.to_string());
72            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
73                cmd.arg("--tls-replication").arg("yes");
74            }
75            cmd.current_dir(dir.path());
76            cmd.spawn().unwrap()
77        },
78    )
79}
80
81fn spawn_sentinel_server(
82    port: u16,
83    master_ports: &[u16],
84    dir: &TempDir,
85    tlspaths: &TlsFilePaths,
86    modules: &[Module],
87) -> RedisServer {
88    let config_file_path = dir.path().join("redis_config.conf");
89    let mut file = File::create(&config_file_path).unwrap();
90    for (i, master_port) in master_ports.iter().enumerate() {
91        file.write_all(
92            format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
93        )
94        .unwrap();
95    }
96    file.flush().unwrap();
97
98    RedisServer::new_with_addr_tls_modules_and_spawner(
99        get_addr(port),
100        Some(&config_file_path),
101        Some(tlspaths.clone()),
102        MTLS_NOT_ENABLED,
103        modules,
104        |cmd| {
105            cmd.arg("--sentinel");
106            if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
107                cmd.arg("--tls-replication").arg("yes");
108            }
109            cmd.current_dir(dir.path());
110            cmd.spawn().unwrap()
111        },
112    )
113}
114
115pub struct SentinelError;
116
117pub fn wait_for_master_server(
118    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
119) -> Result<(), SentinelError> {
120    let rolecmd = redis::cmd("ROLE");
121    for _ in 0..100 {
122        let master_client = get_client_fn();
123        match master_client {
124            Ok(client) => match client.get_connection() {
125                Ok(mut conn) => {
126                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
127                    let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
128                    if role.starts_with("master") {
129                        println!("found master");
130                        return Ok(());
131                    } else {
132                        println!("failed check for master role - current role: {r:?}")
133                    }
134                }
135                Err(err) => {
136                    println!("failed to get master connection: {err:?}",)
137                }
138            },
139            Err(err) => {
140                println!("failed to get master client: {err:?}",)
141            }
142        }
143
144        sleep(Duration::from_millis(25));
145    }
146
147    Err(SentinelError)
148}
149
150pub fn wait_for_replica(
151    mut get_client_fn: impl FnMut() -> RedisResult<Client>,
152) -> Result<(), SentinelError> {
153    let rolecmd = redis::cmd("ROLE");
154    for i in 0..300 {
155        let replica_client = get_client_fn();
156        match replica_client {
157            Ok(client) => match client.get_connection() {
158                Ok(mut conn) => {
159                    let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
160                    let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
161                    let state = String::from_redis_value_ref(r.get(3).unwrap()).unwrap();
162                    if role.starts_with("slave") && state == "connected" {
163                        println!("found replica");
164                        return Ok(());
165                    } else {
166                        println!("failed check for replica role - current role: {r:?}")
167                    }
168                }
169                Err(err) => {
170                    println!("failed to get replica connection: {err:?}")
171                }
172            },
173            Err(err) => {
174                println!("failed to get replica client: {err:?}")
175            }
176        }
177
178        let delay = if i < 100 { 25 } else { 50 };
179        sleep(Duration::from_millis(delay));
180    }
181
182    Err(SentinelError)
183}
184
185fn wait_for_replicas_to_sync(servers: &[RedisServer], masters: u16) {
186    let cluster_size = servers.len() / (masters as usize);
187    let clusters = servers.len() / cluster_size;
188    let replicas = cluster_size - 1;
189
190    for cluster_index in 0..clusters {
191        let master_addr = servers[cluster_index * cluster_size].connection_info();
192        let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()));
193        if r.is_err() {
194            panic!("failed waiting for master to be ready");
195        }
196
197        for replica_index in 0..replicas {
198            let replica_addr =
199                servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
200            let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()));
201            if r.is_err() {
202                panic!("failed waiting for replica to be ready and in sync");
203            }
204        }
205    }
206}
207
208impl RedisSentinelCluster {
209    pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
210        RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
211    }
212
213    pub fn with_modules(
214        masters: u16,
215        replicas_per_master: u16,
216        sentinels: u16,
217        modules: &[Module],
218    ) -> RedisSentinelCluster {
219        let mut servers = vec![];
220        let mut folders = vec![];
221        let mut master_ports = vec![];
222
223        let tempdir = tempfile::Builder::new()
224            .prefix("redistls")
225            .tempdir()
226            .expect("failed to create tempdir");
227        let tlspaths = build_keys_and_certs_for_tls(&tempdir);
228        folders.push(tempdir);
229
230        let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
231        let mut available_ports = std::collections::HashSet::new();
232        while available_ports.len() < required_number_of_sockets as usize {
233            available_ports.insert(get_random_available_port());
234        }
235        let mut available_ports: Vec<_> = available_ports.into_iter().collect();
236
237        for _ in 0..masters {
238            let port = available_ports.pop().unwrap();
239            let tempdir = tempfile::Builder::new()
240                .prefix("redis")
241                .tempdir()
242                .expect("failed to create tempdir");
243            servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
244            folders.push(tempdir);
245            master_ports.push(port);
246
247            for _ in 0..replicas_per_master {
248                let replica_port = available_ports.pop().unwrap();
249                let tempdir = tempfile::Builder::new()
250                    .prefix("redis")
251                    .tempdir()
252                    .expect("failed to create tempdir");
253                servers.push(spawn_replica_server(
254                    replica_port,
255                    port,
256                    &tempdir,
257                    &tlspaths,
258                    modules,
259                ));
260                folders.push(tempdir);
261            }
262        }
263
264        // Wait for replicas to sync so that the sentinels discover them on the first try
265        wait_for_replicas_to_sync(&servers, masters);
266
267        let mut sentinel_servers = vec![];
268        for _ in 0..sentinels {
269            let port = available_ports.pop().unwrap();
270            let tempdir = tempfile::Builder::new()
271                .prefix("redis")
272                .tempdir()
273                .expect("failed to create tempdir");
274
275            sentinel_servers.push(spawn_sentinel_server(
276                port,
277                &master_ports,
278                &tempdir,
279                &tlspaths,
280                modules,
281            ));
282            folders.push(tempdir);
283        }
284
285        RedisSentinelCluster {
286            servers,
287            sentinel_servers,
288            folders,
289        }
290    }
291
292    pub fn stop(&mut self) {
293        for server in &mut self.servers {
294            server.stop();
295        }
296        for server in &mut self.sentinel_servers {
297            server.stop();
298        }
299    }
300
301    pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
302        self.sentinel_servers.iter()
303    }
304}
305
306impl Drop for RedisSentinelCluster {
307    fn drop(&mut self) {
308        self.stop()
309    }
310}