1use std::{fs::File, io::Write, thread::sleep, time::Duration};
2
3use redis::{Client, ConnectionAddr, FromRedisValue, RedisResult};
4use tempfile::TempDir;
5
6use crate::{
7 server::{Module, RedisServer},
8 utils::{TlsFilePaths, build_keys_and_certs_for_tls, get_random_available_port},
9};
10
11pub struct RedisSentinelCluster {
12 pub servers: Vec<RedisServer>,
13 pub sentinel_servers: Vec<RedisServer>,
14 pub folders: Vec<TempDir>,
15}
16
/// Flag handed to `RedisServer::new_with_addr_tls_modules_and_spawner` by every
/// spawn helper in this file; named so the call sites do not carry a bare `false`.
// NOTE(review): presumably this is the "mutual TLS enabled" switch — confirm
// against the `RedisServer` constructor.
const MTLS_NOT_ENABLED: bool = false;
18
19fn get_addr(port: u16) -> ConnectionAddr {
20 let addr = RedisServer::get_addr(port);
21 if let ConnectionAddr::Unix(_) = addr {
22 ConnectionAddr::Tcp(String::from("127.0.0.1"), port)
23 } else {
24 addr
25 }
26}
27
28fn spawn_master_server(
29 port: u16,
30 dir: &TempDir,
31 tlspaths: &TlsFilePaths,
32 modules: &[Module],
33) -> RedisServer {
34 RedisServer::new_with_addr_tls_modules_and_spawner(
35 get_addr(port),
36 None,
37 Some(tlspaths.clone()),
38 MTLS_NOT_ENABLED,
39 modules,
40 |cmd| {
41 cmd.arg("--repl-diskless-sync-delay").arg("0");
43 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
44 cmd.arg("--tls-replication").arg("yes");
45 }
46 cmd.current_dir(dir.path());
47 cmd.spawn().unwrap()
48 },
49 )
50}
51
52fn spawn_replica_server(
53 port: u16,
54 master_port: u16,
55 dir: &TempDir,
56 tlspaths: &TlsFilePaths,
57 modules: &[Module],
58) -> RedisServer {
59 let config_file_path = dir.path().join("redis_config.conf");
60 File::create(&config_file_path).unwrap();
61
62 RedisServer::new_with_addr_tls_modules_and_spawner(
63 get_addr(port),
64 Some(&config_file_path),
65 Some(tlspaths.clone()),
66 MTLS_NOT_ENABLED,
67 modules,
68 |cmd| {
69 cmd.arg("--replicaof")
70 .arg("127.0.0.1")
71 .arg(master_port.to_string());
72 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
73 cmd.arg("--tls-replication").arg("yes");
74 }
75 cmd.current_dir(dir.path());
76 cmd.spawn().unwrap()
77 },
78 )
79}
80
81fn spawn_sentinel_server(
82 port: u16,
83 master_ports: &[u16],
84 dir: &TempDir,
85 tlspaths: &TlsFilePaths,
86 modules: &[Module],
87) -> RedisServer {
88 let config_file_path = dir.path().join("redis_config.conf");
89 let mut file = File::create(&config_file_path).unwrap();
90 for (i, master_port) in master_ports.iter().enumerate() {
91 file.write_all(
92 format!("sentinel monitor master{i} 127.0.0.1 {master_port} 1\n",).as_bytes(),
93 )
94 .unwrap();
95 }
96 file.flush().unwrap();
97
98 RedisServer::new_with_addr_tls_modules_and_spawner(
99 get_addr(port),
100 Some(&config_file_path),
101 Some(tlspaths.clone()),
102 MTLS_NOT_ENABLED,
103 modules,
104 |cmd| {
105 cmd.arg("--sentinel");
106 if let ConnectionAddr::TcpTls { .. } = get_addr(port) {
107 cmd.arg("--tls-replication").arg("yes");
108 }
109 cmd.current_dir(dir.path());
110 cmd.spawn().unwrap()
111 },
112 )
113}
114
/// Error returned by `wait_for_master_server` and `wait_for_replica` when the
/// polled server never reported the expected role within the allotted attempts.
///
/// Implements `Debug`, `Display` and `std::error::Error` so results carrying it
/// work with `unwrap`/`expect` and can be boxed as `dyn Error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SentinelError;

impl std::fmt::Display for SentinelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("timed out waiting for the Redis server to reach the expected role")
    }
}

impl std::error::Error for SentinelError {}
116
117pub fn wait_for_master_server(
118 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
119) -> Result<(), SentinelError> {
120 let rolecmd = redis::cmd("ROLE");
121 for _ in 0..100 {
122 let master_client = get_client_fn();
123 match master_client {
124 Ok(client) => match client.get_connection() {
125 Ok(mut conn) => {
126 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
127 let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
128 if role.starts_with("master") {
129 println!("found master");
130 return Ok(());
131 } else {
132 println!("failed check for master role - current role: {r:?}")
133 }
134 }
135 Err(err) => {
136 println!("failed to get master connection: {err:?}",)
137 }
138 },
139 Err(err) => {
140 println!("failed to get master client: {err:?}",)
141 }
142 }
143
144 sleep(Duration::from_millis(25));
145 }
146
147 Err(SentinelError)
148}
149
150pub fn wait_for_replica(
151 mut get_client_fn: impl FnMut() -> RedisResult<Client>,
152) -> Result<(), SentinelError> {
153 let rolecmd = redis::cmd("ROLE");
154 for i in 0..300 {
155 let replica_client = get_client_fn();
156 match replica_client {
157 Ok(client) => match client.get_connection() {
158 Ok(mut conn) => {
159 let r: Vec<redis::Value> = rolecmd.query(&mut conn).unwrap();
160 let role = String::from_redis_value_ref(r.first().unwrap()).unwrap();
161 let state = String::from_redis_value_ref(r.get(3).unwrap()).unwrap();
162 if role.starts_with("slave") && state == "connected" {
163 println!("found replica");
164 return Ok(());
165 } else {
166 println!("failed check for replica role - current role: {r:?}")
167 }
168 }
169 Err(err) => {
170 println!("failed to get replica connection: {err:?}")
171 }
172 },
173 Err(err) => {
174 println!("failed to get replica client: {err:?}")
175 }
176 }
177
178 let delay = if i < 100 { 25 } else { 50 };
179 sleep(Duration::from_millis(delay));
180 }
181
182 Err(SentinelError)
183}
184
185fn wait_for_replicas_to_sync(servers: &[RedisServer], masters: u16) {
186 let cluster_size = servers.len() / (masters as usize);
187 let clusters = servers.len() / cluster_size;
188 let replicas = cluster_size - 1;
189
190 for cluster_index in 0..clusters {
191 let master_addr = servers[cluster_index * cluster_size].connection_info();
192 let r = wait_for_master_server(|| redis::Client::open(master_addr.clone()));
193 if r.is_err() {
194 panic!("failed waiting for master to be ready");
195 }
196
197 for replica_index in 0..replicas {
198 let replica_addr =
199 servers[(cluster_index * cluster_size) + 1 + replica_index].connection_info();
200 let r = wait_for_replica(|| redis::Client::open(replica_addr.clone()));
201 if r.is_err() {
202 panic!("failed waiting for replica to be ready and in sync");
203 }
204 }
205 }
206}
207
208impl RedisSentinelCluster {
209 pub fn new(masters: u16, replicas_per_master: u16, sentinels: u16) -> RedisSentinelCluster {
210 RedisSentinelCluster::with_modules(masters, replicas_per_master, sentinels, &[])
211 }
212
213 pub fn with_modules(
214 masters: u16,
215 replicas_per_master: u16,
216 sentinels: u16,
217 modules: &[Module],
218 ) -> RedisSentinelCluster {
219 let mut servers = vec![];
220 let mut folders = vec![];
221 let mut master_ports = vec![];
222
223 let tempdir = tempfile::Builder::new()
224 .prefix("redistls")
225 .tempdir()
226 .expect("failed to create tempdir");
227 let tlspaths = build_keys_and_certs_for_tls(&tempdir);
228 folders.push(tempdir);
229
230 let required_number_of_sockets = masters * (replicas_per_master + 1) + sentinels;
231 let mut available_ports = std::collections::HashSet::new();
232 while available_ports.len() < required_number_of_sockets as usize {
233 available_ports.insert(get_random_available_port());
234 }
235 let mut available_ports: Vec<_> = available_ports.into_iter().collect();
236
237 for _ in 0..masters {
238 let port = available_ports.pop().unwrap();
239 let tempdir = tempfile::Builder::new()
240 .prefix("redis")
241 .tempdir()
242 .expect("failed to create tempdir");
243 servers.push(spawn_master_server(port, &tempdir, &tlspaths, modules));
244 folders.push(tempdir);
245 master_ports.push(port);
246
247 for _ in 0..replicas_per_master {
248 let replica_port = available_ports.pop().unwrap();
249 let tempdir = tempfile::Builder::new()
250 .prefix("redis")
251 .tempdir()
252 .expect("failed to create tempdir");
253 servers.push(spawn_replica_server(
254 replica_port,
255 port,
256 &tempdir,
257 &tlspaths,
258 modules,
259 ));
260 folders.push(tempdir);
261 }
262 }
263
264 wait_for_replicas_to_sync(&servers, masters);
266
267 let mut sentinel_servers = vec![];
268 for _ in 0..sentinels {
269 let port = available_ports.pop().unwrap();
270 let tempdir = tempfile::Builder::new()
271 .prefix("redis")
272 .tempdir()
273 .expect("failed to create tempdir");
274
275 sentinel_servers.push(spawn_sentinel_server(
276 port,
277 &master_ports,
278 &tempdir,
279 &tlspaths,
280 modules,
281 ));
282 folders.push(tempdir);
283 }
284
285 RedisSentinelCluster {
286 servers,
287 sentinel_servers,
288 folders,
289 }
290 }
291
292 pub fn stop(&mut self) {
293 for server in &mut self.servers {
294 server.stop();
295 }
296 for server in &mut self.sentinel_servers {
297 server.stop();
298 }
299 }
300
301 pub fn iter_sentinel_servers(&self) -> impl Iterator<Item = &RedisServer> {
302 self.sentinel_servers.iter()
303 }
304}
305
306impl Drop for RedisSentinelCluster {
307 fn drop(&mut self) {
308 self.stop()
309 }
310}