Skip to main content

redis_test/
cluster.rs

1use std::{env, process, thread::sleep, time::Duration};
2
3use tempfile::TempDir;
4
5use crate::{
6    server::{Module, RedisServer},
7    utils::{TlsFilePaths, build_keys_and_certs_for_tls_ext, get_random_available_port},
8};
9
/// Configuration for creating a Redis Cluster.
///
/// Consumed by [`RedisCluster::new`]. Start from [`Default::default`] or
/// [`RedisClusterConfiguration::single_replica_config`] and override fields as needed.
pub struct RedisClusterConfiguration {
    /// Total number of server processes spawned for the cluster.
    pub num_nodes: u16,
    /// Value passed to `redis-cli --cluster-replicas`; when `0` the flag is omitted.
    pub num_replicas: u16,
    /// Modules forwarded to every spawned server.
    pub modules: Vec<Module>,
    /// Only consulted in TLS mode without mTLS: when `false`, `redis-cli` is given the
    /// generated CA certificate via `--cacert`; when `true`, it is run with `--insecure`.
    pub tls_insecure: bool,
    /// Forwarded to every spawned server; in TLS mode it also makes `redis-cli` present
    /// the generated certificate and key (`--cert` / `--key` / `--cacert`).
    pub mtls_enabled: bool,
    /// Explicit ports, one per node. When empty, a random available port is picked for
    /// each node. A non-empty list whose length differs from `num_nodes` fails an assertion.
    pub ports: Vec<u16>,
    /// Forwarded to `build_keys_and_certs_for_tls_ext` when the TLS key material is
    /// generated. NOTE(review): presumably toggles IP subject-alternative-names in the
    /// certificates — confirm against that helper.
    pub certs_with_ip_alts: bool,
}
20
21impl RedisClusterConfiguration {
22    pub fn single_replica_config() -> Self {
23        Self {
24            num_nodes: 6,
25            num_replicas: 1,
26            ..Default::default()
27        }
28    }
29}
30
31impl Default for RedisClusterConfiguration {
32    fn default() -> Self {
33        Self {
34            num_nodes: 3,
35            num_replicas: 0,
36            modules: vec![],
37            tls_insecure: true,
38            mtls_enabled: false,
39            ports: vec![],
40            certs_with_ip_alts: true,
41        }
42    }
43}
44
/// Indicates the connection type for the cluster (TCP or TCP with TLS).
///
/// The active type is selected through the `REDISRS_SERVER_TYPE` environment
/// variable; see [`ClusterType::get_intended`].
///
/// The enum is a plain, field-less tag, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`, which lets it be used as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ClusterType {
    /// Plain TCP connections.
    Tcp,
    /// TCP connections secured with TLS.
    TcpTls,
}
52
53impl ClusterType {
54    pub fn get_intended() -> ClusterType {
55        match env::var("REDISRS_SERVER_TYPE")
56            .ok()
57            .as_ref()
58            .map(|x| &x[..])
59        {
60            Some("tcp") => ClusterType::Tcp,
61            Some("tcp+tls") => ClusterType::TcpTls,
62            Some(val) => {
63                panic!("Unknown server type {val:?}");
64            }
65            None => ClusterType::Tcp,
66        }
67    }
68
69    fn build_addr(port: u16) -> redis::ConnectionAddr {
70        match ClusterType::get_intended() {
71            ClusterType::Tcp => redis::ConnectionAddr::Tcp("127.0.0.1".into(), port),
72            ClusterType::TcpTls => redis::ConnectionAddr::TcpTls {
73                host: "127.0.0.1".into(),
74                port,
75                insecure: true,
76                tls_params: None,
77            },
78        }
79    }
80}
81
/// Reports whether something is accepting TCP connections at `addr`.
///
/// `addr` must be a literal socket address such as `127.0.0.1:6379`; host names are not
/// resolved. The check performs a blocking connect with the standard library's
/// [`std::net::TcpStream`] and immediately drops the connection again.
///
/// Returns `true` when the connection attempt succeeds and `false` on any connect error.
///
/// # Panics
///
/// Panics when `addr` cannot be parsed as a [`std::net::SocketAddr`]; the message names
/// the offending address.
fn port_in_use(addr: &str) -> bool {
    let socket_addr: std::net::SocketAddr = addr
        .parse()
        .unwrap_or_else(|err| panic!("Invalid address {addr:?}: {err}"));

    std::net::TcpStream::connect(socket_addr).is_ok()
}
93
/// A local Redis Cluster for testing.
///
/// `RedisCluster` spawns multiple `RedisServer` instances and configures them as a cluster.
/// It provides methods to start, stop, and interact with the servers. All servers are
/// stopped when the value is dropped.
///
/// # Example
/// ```rust,no_run
/// use redis_test::cluster::{RedisCluster, RedisClusterConfiguration};
///
/// let config = RedisClusterConfiguration::default();
/// let cluster = RedisCluster::new(config);
///
/// // Get the connection details of the nodes and connect
/// let addresses: Vec<_> = cluster.servers.iter().map(|s| s.connection_info()).collect();
/// let client = redis::cluster::ClusterClient::new(addresses).unwrap();
/// let mut connection = client.get_connection().unwrap();
/// ```
pub struct RedisCluster {
    /// The servers that make up the cluster, in the order they were started.
    pub servers: Vec<RedisServer>,
    /// Temporary directories created while building the cluster: the shared TLS key
    /// directory (TLS mode only) and one working directory per spawned server process.
    /// They are stored here so they stay alive as long as the cluster does.
    pub folders: Vec<TempDir>,
    /// Paths of the generated TLS key material; `None` unless the cluster runs in TLS mode.
    pub tls_paths: Option<TlsFilePaths>,
}
116
117impl RedisCluster {
118    pub fn username() -> &'static str {
119        "hello"
120    }
121
122    pub fn password() -> &'static str {
123        "world"
124    }
125
126    pub fn new(configuration: RedisClusterConfiguration) -> RedisCluster {
127        let RedisClusterConfiguration {
128            num_nodes: nodes,
129            num_replicas: replicas,
130            modules,
131            tls_insecure,
132            mtls_enabled,
133            ports,
134            certs_with_ip_alts,
135        } = configuration;
136
137        let optional_ports = if ports.is_empty() {
138            vec![None; nodes as usize]
139        } else {
140            assert!(ports.len() == nodes as usize);
141            ports.into_iter().map(Some).collect()
142        };
143        let mut chosen_ports = std::collections::HashSet::new();
144
145        let mut folders = vec![];
146        let mut addrs = vec![];
147        let mut tls_paths = None;
148
149        let mut is_tls = false;
150
151        if let ClusterType::TcpTls = ClusterType::get_intended() {
152            // Create a shared set of keys in cluster mode
153            let tempdir = tempfile::Builder::new()
154                .prefix("redis")
155                .tempdir()
156                .expect("failed to create tempdir");
157            let files = build_keys_and_certs_for_tls_ext(&tempdir, certs_with_ip_alts);
158            folders.push(tempdir);
159            tls_paths = Some(files);
160            is_tls = true;
161        }
162
163        let max_attempts = 5;
164
165        let mut make_server = |port| {
166            RedisServer::new_with_addr_tls_modules_and_spawner(
167                ClusterType::build_addr(port),
168                None,
169                tls_paths.clone(),
170                mtls_enabled,
171                None, // cert_auth_field - not used in cluster tests
172                &modules,
173                |cmd| {
174                    let tempdir = tempfile::Builder::new()
175                        .prefix("redis")
176                        .tempdir()
177                        .expect("failed to create tempdir");
178                    let acl_path = tempdir.path().join("users.acl");
179                    let acl_content = format!(
180                        "user {} on allcommands allkeys >{}",
181                        Self::username(),
182                        Self::password()
183                    );
184                    std::fs::write(&acl_path, acl_content).expect("failed to write acl file");
185                    cmd.arg("--cluster-enabled")
186                        .arg("yes")
187                        .arg("--cluster-config-file")
188                        .arg(tempdir.path().join("nodes.conf"))
189                        .arg("--cluster-node-timeout")
190                        .arg("5000")
191                        .arg("--aclfile")
192                        .arg(&acl_path);
193                    if is_tls {
194                        cmd.arg("--tls-cluster").arg("yes");
195                        if replicas > 0 {
196                            cmd.arg("--tls-replication").arg("yes");
197                        }
198                    }
199                    cmd.current_dir(tempdir.path());
200                    folders.push(tempdir);
201                    cmd.spawn().unwrap()
202                },
203            )
204        };
205
206        let verify_server = |server: &mut RedisServer| {
207            let process = &mut server.process;
208            match process.try_wait() {
209                Ok(Some(status)) => {
210                    let log_file_contents = server.log_file_contents();
211                    let err = format!(
212                        "redis server creation failed with status {status:?}.\nlog file: {log_file_contents:?}"
213                    );
214                    Err(err)
215                }
216                Ok(None) => {
217                    // wait for 10 seconds for the server to be available.
218                    let max_attempts = 200;
219                    let mut cur_attempts = 0;
220                    loop {
221                        if cur_attempts == max_attempts {
222                            let log_file_contents = server.log_file_contents();
223                            break Err(format!(
224                                "redis server creation failed: Address {} closed. {log_file_contents:?}",
225                                server.addr
226                            ));
227                        } else if port_in_use(&server.addr.to_string()) {
228                            break Ok(());
229                        }
230                        eprintln!("Waiting for redis process to initialize");
231                        sleep(Duration::from_millis(50));
232                        cur_attempts += 1;
233                    }
234                }
235                Err(e) => {
236                    panic!("Unexpected error in redis server creation {e}");
237                }
238            }
239        };
240
241        let servers = optional_ports
242            .into_iter()
243            .map(|port_option| {
244                for _ in 0..5 {
245                    let port = match port_option {
246                        Some(port) => port,
247                        None => loop {
248                            let port = get_random_available_port();
249                            if chosen_ports.contains(&port) {
250                                continue;
251                            }
252                            chosen_ports.insert(port);
253                            break port;
254                        },
255                    };
256                    let mut server = make_server(port);
257                    sleep(Duration::from_millis(50));
258
259                    match verify_server(&mut server) {
260                        Ok(_) => {
261                            let addr = format!("127.0.0.1:{port}");
262                            addrs.push(addr.clone());
263                            return server;
264                        }
265                        Err(err) => eprintln!("{err}"),
266                    }
267                }
268                panic!("Exhausted retries");
269            })
270            .collect();
271
272        let mut cmd = process::Command::new("redis-cli");
273        cmd.stdout(process::Stdio::piped())
274            .arg("--cluster")
275            .arg("create")
276            .args(&addrs);
277        if replicas > 0 {
278            cmd.arg("--cluster-replicas").arg(replicas.to_string());
279        }
280        cmd.arg("--cluster-yes");
281
282        if is_tls {
283            if mtls_enabled {
284                if let Some(TlsFilePaths {
285                    redis_crt,
286                    redis_key,
287                    ca_crt,
288                }) = &tls_paths
289                {
290                    cmd.arg("--cert");
291                    cmd.arg(redis_crt);
292                    cmd.arg("--key");
293                    cmd.arg(redis_key);
294                    cmd.arg("--cacert");
295                    cmd.arg(ca_crt);
296                    cmd.arg("--tls");
297                }
298            } else if !tls_insecure && tls_paths.is_some() {
299                let ca_crt = &tls_paths.as_ref().unwrap().ca_crt;
300                cmd.arg("--tls").arg("--cacert").arg(ca_crt);
301            } else {
302                cmd.arg("--tls").arg("--insecure");
303            }
304        }
305
306        let mut cur_attempts = 0;
307        loop {
308            let output = cmd.output().unwrap();
309            if output.status.success() {
310                break;
311            } else {
312                let err = format!("Cluster creation failed: {output:?}");
313                if cur_attempts == max_attempts {
314                    panic!("{err}");
315                }
316                eprintln!("Retrying: {err}");
317                sleep(Duration::from_millis(50));
318                cur_attempts += 1;
319            }
320        }
321
322        let cluster = RedisCluster {
323            servers,
324            folders,
325            tls_paths,
326        };
327        if replicas > 0 {
328            cluster.wait_for_replicas(replicas);
329        }
330
331        wait_for_status_ok(&cluster);
332        cluster
333    }
334
335    fn wait_for_replicas(&self, replicas: u16) {
336        'server: for server in &self.servers {
337            let conn_info = server.connection_info();
338            eprintln!(
339                "waiting until {:?} knows required number of replicas",
340                conn_info.addr()
341            );
342
343            let client = redis::Client::open(server.connection_info()).unwrap();
344
345            let mut con = client.get_connection().unwrap();
346
347            // retry 500 times
348            for _ in 1..500 {
349                let value = redis::cmd("CLUSTER").arg("SLOTS").query(&mut con).unwrap();
350                let slots: Vec<Vec<redis::Value>> = redis::from_redis_value(value).unwrap();
351
352                // all slots should have following items:
353                // [start slot range, end slot range, master's IP, replica1's IP, replica2's IP,... ]
354                if slots.iter().all(|slot| slot.len() >= 3 + replicas as usize) {
355                    continue 'server;
356                }
357
358                sleep(Duration::from_millis(100));
359            }
360
361            panic!("failed to create enough replicas");
362        }
363    }
364
365    pub fn stop(&mut self) {
366        for server in &mut self.servers {
367            server.stop();
368        }
369    }
370
371    pub fn iter_servers(&self) -> impl Iterator<Item = &RedisServer> {
372        self.servers.iter()
373    }
374}
375
376fn wait_for_status_ok(cluster: &RedisCluster) {
377    'server: for server in &cluster.servers {
378        let log_file = RedisServer::log_file(&server.tempdir);
379
380        for _ in 1..500 {
381            let contents =
382                std::fs::read_to_string(&log_file).expect("Should have been able to read the file");
383
384            if contents.contains("Cluster state changed: ok") {
385                continue 'server;
386            }
387            sleep(Duration::from_millis(20));
388        }
389        panic!("failed to reach state change: OK");
390    }
391}
392
393impl Drop for RedisCluster {
394    fn drop(&mut self) {
395        self.stop()
396    }
397}