1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7 server::{Module, RedisServer},
8 utils::{build_keys_and_certs_for_tls, get_random_available_port, TlsFilePaths},
9};
10
11pub struct RedisSentinelCluster {
12 pub servers: Vec<RedisServer>,
13 pub sentinel_servers: Vec<RedisServer>,
14 pub folders: Vec<TempDir>,
15}
16
/// Flag handed to every `RedisServer::new_with_addr_tls_modules_and_spawner` call in this
/// file. NOTE(review): presumably the mutual-TLS toggle, which these servers never enable;
/// confirm against the `RedisServer` constructor.
const MTLS_NOT_ENABLED: bool = false;
18
19fn get_addr(port: u16) -> ConnectionAddr {
20 let addr = RedisServer::get_addr(port);
21 if let ConnectionAddr::Unix(_) = addr {
22 ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
23 } else {
24 addr
25 }
26}
27
28fn spawn_master_server(
29 port: u16,
30 dir: &TempDir,
31 tlspaths: &TlsFilePaths,
32 modules: &[Module],
33) -> RedisServer {
34 RedisServer::new_with_addr_tls_modules_and_spawner(
35 get_addr(port),
36 None,
37 Some(tlspaths.clone()),
38 MTLS_NOT_ENABLED,
39 modules,
40 |cmd| {
41 cmd.arg("--repl-diskless-sync-delay").arg("0");
43 cmd.arg("--appendonly").arg("yes");
44 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
45 cmd.arg("--tls-replication").arg("yes");
46 }
47 cmd.current_dir(dir.path());
48 cmd.spawn().unwrap()
49 },
50 )
51}
52
53fn spawn_replica_server(
54 port: u16,
55 master_port: u16,
56 dir: &TempDir,
57 tlspaths: &TlsFilePaths,
58 modules: &[Module],
59) -> RedisServer {
60 let config_file_path = dir.path().join("redis_config.conf");
61 File::create(&config_file_path).unwrap();
62
63 RedisServer::new_with_addr_tls_modules_and_spawner(
64 get_addr(port),
65 Some(&config_file_path),
66 Some(tlspaths.clone()),
67 MTLS_NOT_ENABLED,
68 modules,
69 |cmd| {
70 cmd.arg("--replicaof")
71 .arg("127.0.0.1")
72 .arg(master_port.to_string());
73 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
74 cmd.arg("--tls-replication").arg("yes");
75 }
76 cmd.arg("--appendonly").arg("yes");
77 cmd.current_dir(dir.path());
78 cmd.spawn().unwrap()
79 },
80 )
81}
82
83fn spawn_sentinel_server(
84 port: u16,
85 master_ports: &[u16],
86 dir: &TempDir,
87 tlspaths: &TlsFilePaths,
88 modules: &[Module],
89) -> RedisServer {
90 let config_file_path = dir.path().join("redis_config.conf");
91 let mut file = File::create(&config_file_path).unwrap();
92 for (i, master_port) in master_ports.iter().enumerate() {
93 file.write_all(
94 format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
95 )
96 .unwrap();
97 }
98 file.flush().unwrap();
99
100 RedisServer::new_with_addr_tls_modules_and_spawner(
101 get_addr(port),
102 Some(&config_file_path),
103 Some(tlspaths.clone()),
104 MTLS_NOT_ENABLED,
105 modules,
106 |cmd| {
107 cmd.arg("--sentinel");
108 cmd.arg("--appendonly").arg("yes");
109 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
110 cmd.arg("--tls-replication").arg("yes");
111 }
112 cmd.current_dir(dir.path());
113 cmd.spawn().unwrap()
114 },
115 )
116}
117
/// Error returned by the `wait_for_*` helpers when the awaited server state was not observed
/// within their retry budget.
///
/// The type carries no data. It implements `Debug`, `Display` and `std::error::Error` so
/// that results holding it can be unwrapped, printed, and boxed like any other error.
#[derive(Debug)]
pub struct SentinelError;

impl std::fmt::Display for SentinelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("timed out waiting for the redis server to reach the expected state")
    }
}

impl std::error::Error for SentinelError {}
119
120pub fn wait_for_master_server(
121 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
122) -> Result<(), SentinelError> {
123 let rolecmd = redis::cmd("ROLE");
124 for _ in 0..100 {
125 let master_client = get_client_fn();
126 match master_client {
127 Ok(client) => match client.get_connection() {
128 Ok(mut conn) => {
129 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
130 let role = String::from_redis_value(r.first().unwrap()).unwrap();
131 if role.starts_with("master") {
132 return Ok(());
133 } else {
134 println!("failed check for master role - current role: {r:?}")
135 }
136 }
137 Err(err) => {
138 println!("failed to get master connection: {err:?}",)
139 }
140 },
141 Err(err) => {
142 println!("failed to get master client: {err:?}",)
143 }
144 }
145
146 sleep(Duration::from_millis(25));
147 }
148
149 Err(SentinelError)
150}
151
152pub fn wait_for_replica(
153 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
154) -> Result<(), SentinelError> {
155 let rolecmd = redis::cmd("ROLE");
156 for _ in 0..200 {
157 let replica_client = get_client_fn();
158 match replica_client {
159 Ok(client) => match client.get_connection() {
160 Ok(mut conn) => {
161 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
162 let role = String::from_redis_value(r.first().unwrap()).unwrap();
163 let state = String::from_redis_value(r.get(3).unwrap()).unwrap();
164 if role.starts_with("slave") && state == "connected" {
165 return Ok(());
166 } else {
167 println!("failed check for replica role - current role: {r:?}")
168 }
169 }
170 Err(err) => {
171 println!("failed to get replica connection: {err:?}")
172 }
173 },
174 Err(err) => {
175 println!("failed to get replica client: {err:?}")
176 }
177 }
178
179 sleep(Duration::from_millis(25));
180 }
181
182 Err(SentinelError)
183}
184
185fn wait_for_replicas_to_sync(servers: &[RedisServer], masters: u16) {
186 let cluster_size = servers.len() / (masters as usize);
187 let clusters = servers.len() / cluster_size;
188 let replicas = cluster_size - 1;
189
190 for cluster_index in 0..clusters {
191 let master_addr = servers[cluster_index * cluster_size].connection_info();
192 let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()));
193 if r.is_err() {
194 panic!("failed waiting for master to be ready");
195 }
196
197 for replica_index in 0..replicas {
198 let replica_addr =
199 servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
200 let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()));
201 if r.is_err() {
202 panic!("failed waiting for replica to be ready and in sync");
203 }
204 }
205 }
206}
207
208impl RedisSentinelCluster {
209 pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
210 RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
211 }
212
213 pub fn with_modules(
214 masters: u16,
215 replicas_per_master: u16,
216 sentinels: u16,
217 modules: &[Module],
218 ) -> RedisSentinelCluster {
219 let mut servers = vec![];
220 let mut folders = vec![];
221 let mut master_ports = vec![];
222
223 let tempdir = tempfile::Builder::new()
224 .prefix("redistls")
225 .tempdir()
226 .expect("failed to create tempdir");
227 let tlspaths = build_keys_and_certs_for_tls(&tempdir);
228 folders.push(tempdir);
229
230 let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
231 let mut available_ports = std::collections::HashSet::new();
232 while available_ports.len() < required_number_of_sockets as usize {
233 available_ports.insert(get_random_available_port());
234 }
235 let mut available_ports: Vec<_> = available_ports.into_iter().collect();
236
237 for _ in 0..masters {
238 let port = available_ports.pop().unwrap();
239 let tempdir = tempfile::Builder::new()
240 .prefix("redis")
241 .tempdir()
242 .expect("failed to create tempdir");
243 servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
244 folders.push(tempdir);
245 master_ports.push(port);
246
247 for _ in 0..replicas_per_master {
248 let replica_port = available_ports.pop().unwrap();
249 let tempdir = tempfile::Builder::new()
250 .prefix("redis")
251 .tempdir()
252 .expect("failed to create tempdir");
253 servers.push(spawn_replica_server(
254 replica_port,
255 port,
256 &tempdir,
257 &tlspaths,
258 modules,
259 ));
260 folders.push(tempdir);
261 }
262 }
263
264 wait_for_replicas_to_sync(&servers, masters);
266
267 let mut sentinel_servers = vec![];
268 for _ in 0..sentinels {
269 let port = available_ports.pop().unwrap();
270 let tempdir = tempfile::Builder::new()
271 .prefix("redis")
272 .tempdir()
273 .expect("failed to create tempdir");
274
275 sentinel_servers.push(spawn_sentinel_server(
276 port,
277 &master_ports,
278 &tempdir,
279 &tlspaths,
280 modules,
281 ));
282 folders.push(tempdir);
283 }
284
285 RedisSentinelCluster {
286 servers,
287 sentinel_servers,
288 folders,
289 }
290 }
291
292 pub fn stop(&mut self) {
293 for server in &mut self.servers {
294 server.stop();
295 }
296 for server in &mut self.sentinel_servers {
297 server.stop();
298 }
299 }
300
301 pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
302 self.sentinel_servers.iter()
303 }
304}
305
306impl Drop for RedisSentinelCluster {
307 fn drop(&mut self) {
308 self.stop()
309 }
310}