1#![allow(clippy::doc_markdown)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::return_self_not_must_use)]
6#![allow(clippy::uninlined_format_args)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::missing_errors_doc)]
9
10use crate::template::{Template, TemplateConfig, TemplateError};
11use crate::{DockerCommand, ExecCommand, NetworkCreateCommand, RunCommand};
12use async_trait::async_trait;
13
14pub struct RedisClusterTemplate {
16 name: String,
18 num_masters: usize,
20 num_replicas: usize,
22 port_base: u16,
24 network_name: String,
26 password: Option<String>,
28 announce_ip: Option<String>,
30 volume_prefix: Option<String>,
32 memory_limit: Option<String>,
34 node_timeout: u32,
36 auto_remove: bool,
38 use_redis_stack: bool,
40 with_redis_insight: bool,
42 redis_insight_port: u16,
44 redis_image: Option<String>,
46 redis_tag: Option<String>,
48 platform: Option<String>,
50}
51
52impl RedisClusterTemplate {
53 pub fn new(name: impl Into<String>) -> Self {
55 let name = name.into();
56 let network_name = format!("{}-network", name);
57
58 Self {
59 name,
60 num_masters: 3,
61 num_replicas: 0,
62 port_base: 7000,
63 network_name,
64 password: None,
65 announce_ip: None,
66 volume_prefix: None,
67 memory_limit: None,
68 node_timeout: 5000,
69 auto_remove: false,
70 use_redis_stack: false,
71 with_redis_insight: false,
72 redis_insight_port: 8001,
73 redis_image: None,
74 redis_tag: None,
75 platform: None,
76 }
77 }
78
79 pub fn from_env(name: impl Into<String>) -> Self {
99 let mut template = Self::new(name);
100
101 if let Ok(port_base) = std::env::var("REDIS_CLUSTER_PORT_BASE") {
102 if let Ok(port) = port_base.parse::<u16>() {
103 template.port_base = port;
104 }
105 }
106
107 if let Ok(num_masters) = std::env::var("REDIS_CLUSTER_NUM_MASTERS") {
108 if let Ok(masters) = num_masters.parse::<usize>() {
109 template.num_masters = masters.max(3);
110 }
111 }
112
113 if let Ok(num_replicas) = std::env::var("REDIS_CLUSTER_NUM_REPLICAS") {
114 if let Ok(replicas) = num_replicas.parse::<usize>() {
115 template.num_replicas = replicas;
116 }
117 }
118
119 if let Ok(password) = std::env::var("REDIS_CLUSTER_PASSWORD") {
120 template.password = Some(password);
121 }
122
123 template
124 }
125
126 pub fn get_port_base(&self) -> u16 {
128 self.port_base
129 }
130
131 pub fn get_num_masters(&self) -> usize {
133 self.num_masters
134 }
135
136 pub fn get_num_replicas(&self) -> usize {
138 self.num_replicas
139 }
140
141 pub fn num_masters(mut self, masters: usize) -> Self {
143 self.num_masters = masters.max(3);
144 self
145 }
146
147 pub fn num_replicas(mut self, replicas: usize) -> Self {
149 self.num_replicas = replicas;
150 self
151 }
152
153 pub fn port_base(mut self, port: u16) -> Self {
155 self.port_base = port;
156 self
157 }
158
159 pub fn password(mut self, password: impl Into<String>) -> Self {
161 self.password = Some(password.into());
162 self
163 }
164
165 pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
167 self.announce_ip = Some(ip.into());
168 self
169 }
170
171 pub fn with_persistence(mut self, volume_prefix: impl Into<String>) -> Self {
173 self.volume_prefix = Some(volume_prefix.into());
174 self
175 }
176
177 pub fn memory_limit(mut self, limit: impl Into<String>) -> Self {
179 self.memory_limit = Some(limit.into());
180 self
181 }
182
183 pub fn cluster_node_timeout(mut self, timeout: u32) -> Self {
185 self.node_timeout = timeout;
186 self
187 }
188
189 pub fn auto_remove(mut self) -> Self {
191 self.auto_remove = true;
192 self
193 }
194
195 pub fn with_redis_stack(mut self) -> Self {
197 self.use_redis_stack = true;
198 self
199 }
200
201 pub fn with_redis_insight(mut self) -> Self {
203 self.with_redis_insight = true;
204 self
205 }
206
207 pub fn redis_insight_port(mut self, port: u16) -> Self {
209 self.redis_insight_port = port;
210 self
211 }
212
213 pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
215 self.redis_image = Some(image.into());
216 self.redis_tag = Some(tag.into());
217 self
218 }
219
220 pub fn platform(mut self, platform: impl Into<String>) -> Self {
222 self.platform = Some(platform.into());
223 self
224 }
225
226 fn total_nodes(&self) -> usize {
228 self.num_masters + (self.num_masters * self.num_replicas)
229 }
230
231 async fn create_network(&self) -> Result<String, TemplateError> {
233 let output = NetworkCreateCommand::new(&self.network_name)
234 .driver("bridge")
235 .execute()
236 .await?;
237
238 Ok(output.stdout.trim().to_string())
240 }
241
242 async fn start_node(&self, node_index: usize) -> Result<String, TemplateError> {
244 let node_name = format!("{}-node-{}", self.name, node_index);
245 let port = self.port_base + node_index as u16;
246 let cluster_port = port + 10000;
247
248 let image = if let Some(ref custom_image) = self.redis_image {
250 if let Some(ref tag) = self.redis_tag {
251 format!("{}:{}", custom_image, tag)
252 } else {
253 custom_image.clone()
254 }
255 } else if self.use_redis_stack {
256 "redis/redis-stack-server:latest".to_string()
257 } else {
258 "redis:7-alpine".to_string()
259 };
260
261 let mut cmd = RunCommand::new(image)
262 .name(&node_name)
263 .network(&self.network_name)
264 .port(port, 6379)
265 .port(cluster_port, 16379)
266 .detach();
267
268 if let Some(ref limit) = self.memory_limit {
270 cmd = cmd.memory(limit);
271 }
272
273 if let Some(ref prefix) = self.volume_prefix {
275 let volume_name = format!("{}-{}", prefix, node_index);
276 cmd = cmd.volume(&volume_name, "/data");
277 }
278
279 if let Some(ref platform) = self.platform {
281 cmd = cmd.platform(platform);
282 }
283
284 if self.auto_remove {
286 cmd = cmd.remove();
287 }
288
289 let mut redis_args = vec![
291 "redis-server".to_string(),
292 "--cluster-enabled".to_string(),
293 "yes".to_string(),
294 "--cluster-config-file".to_string(),
295 "nodes.conf".to_string(),
296 "--cluster-node-timeout".to_string(),
297 self.node_timeout.to_string(),
298 "--appendonly".to_string(),
299 "yes".to_string(),
300 "--port".to_string(),
301 "6379".to_string(),
302 ];
303
304 if let Some(ref password) = self.password {
306 redis_args.push("--requirepass".to_string());
307 redis_args.push(password.clone());
308 redis_args.push("--masterauth".to_string());
309 redis_args.push(password.clone());
310 }
311
312 if let Some(ref ip) = self.announce_ip {
314 redis_args.push("--cluster-announce-ip".to_string());
315 redis_args.push(ip.clone());
316 redis_args.push("--cluster-announce-port".to_string());
317 redis_args.push(port.to_string());
318 redis_args.push("--cluster-announce-bus-port".to_string());
319 redis_args.push(cluster_port.to_string());
320 }
321
322 cmd = cmd.cmd(redis_args);
323
324 let output = cmd.execute().await?;
325 Ok(output.0)
326 }
327
328 async fn start_redis_insight(&self) -> Result<String, TemplateError> {
330 let insight_name = format!("{}-insight", self.name);
331
332 let mut cmd = RunCommand::new("redislabs/redisinsight:latest")
333 .name(&insight_name)
334 .network(&self.network_name)
335 .port(self.redis_insight_port, 8001)
336 .detach();
337
338 if let Some(ref prefix) = self.volume_prefix {
340 let volume_name = format!("{}-insight", prefix);
341 cmd = cmd.volume(&volume_name, "/db");
342 }
343
344 if self.auto_remove {
346 cmd = cmd.remove();
347 }
348
349 cmd = cmd.env("RITRUSTEDORIGINS", "http://localhost");
351
352 let output = cmd.execute().await?;
353 Ok(output.0)
354 }
355
356 async fn initialize_cluster(&self, container_ids: &[String]) -> Result<(), TemplateError> {
358 if container_ids.is_empty() {
359 return Err(TemplateError::InvalidConfig(
360 "No containers to initialize cluster".to_string(),
361 ));
362 }
363
364 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
366
367 let mut create_args = vec![
369 "redis-cli".to_string(),
370 "--cluster".to_string(),
371 "create".to_string(),
372 ];
373
374 for i in 0..self.total_nodes() {
376 let host = format!("{}-node-{}", self.name, i);
377 let port = 6379;
378 create_args.push(format!("{}:{}", host, port));
379 }
380
381 if self.num_replicas > 0 {
383 create_args.push("--cluster-replicas".to_string());
384 create_args.push(self.num_replicas.to_string());
385 }
386
387 if let Some(ref password) = self.password {
389 create_args.push("-a".to_string());
390 create_args.push(password.clone());
391 }
392
393 create_args.push("--cluster-yes".to_string());
395
396 let first_node_name = format!("{}-node-0", self.name);
398
399 ExecCommand::new(&first_node_name, create_args)
400 .execute()
401 .await?;
402
403 Ok(())
404 }
405
406 pub async fn cluster_info(&self) -> Result<ClusterInfo, TemplateError> {
408 let node_name = format!("{}-node-0", self.name);
409
410 let mut info_args = vec![
411 "redis-cli".to_string(),
412 "--cluster".to_string(),
413 "info".to_string(),
414 format!("{}-node-0:6379", self.name),
415 ];
416
417 if let Some(ref password) = self.password {
418 info_args.push("-a".to_string());
419 info_args.push(password.clone());
420 }
421
422 let output = ExecCommand::new(&node_name, info_args).execute().await?;
423
424 ClusterInfo::from_output(&output.stdout)
426 }
427
428 pub async fn is_ready(&self) -> bool {
448 self.cluster_info()
449 .await
450 .map(|info| info.cluster_state == "ok")
451 .unwrap_or(false)
452 }
453
454 pub async fn wait_until_ready(
479 &self,
480 timeout: std::time::Duration,
481 ) -> Result<(), TemplateError> {
482 let start = std::time::Instant::now();
483
484 while start.elapsed() < timeout {
485 if self.is_ready().await {
486 return Ok(());
487 }
488 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
489 }
490
491 Err(TemplateError::Timeout(format!(
492 "Cluster '{}' did not become ready within {:?}",
493 self.name, timeout
494 )))
495 }
496
497 pub async fn detect_existing(&self) -> Option<RedisClusterConnection> {
521 let host = self.announce_ip.as_deref().unwrap_or("localhost");
522
523 let first_port = self.port_base;
525 let addr = format!("{}:{}", host, first_port);
526
527 let connect_result = tokio::time::timeout(
529 std::time::Duration::from_secs(2),
530 tokio::net::TcpStream::connect(&addr),
531 )
532 .await;
533
534 match connect_result {
535 Ok(Ok(_stream)) => {
536 Some(RedisClusterConnection::from_template(self))
539 }
540 _ => None,
541 }
542 }
543
544 pub async fn start_or_detect(
566 &self,
567 timeout: std::time::Duration,
568 ) -> Result<RedisClusterConnection, TemplateError> {
569 if let Some(conn) = self.detect_existing().await {
571 return Ok(conn);
572 }
573
574 self.start().await?;
576 self.wait_until_ready(timeout).await?;
577
578 Ok(RedisClusterConnection::from_template(self))
579 }
580}
581
582#[async_trait]
583impl Template for RedisClusterTemplate {
584 fn name(&self) -> &str {
585 &self.name
586 }
587
588 fn config(&self) -> &TemplateConfig {
589 unimplemented!("RedisClusterTemplate manages multiple containers")
591 }
592
593 fn config_mut(&mut self) -> &mut TemplateConfig {
594 unimplemented!("RedisClusterTemplate manages multiple containers")
595 }
596
597 async fn start(&self) -> Result<String, TemplateError> {
598 let _network_id = self.create_network().await?;
600
601 let mut container_ids = Vec::new();
603 for i in 0..self.total_nodes() {
604 let id = self.start_node(i).await?;
605 container_ids.push(id);
606 }
607
608 self.initialize_cluster(&container_ids).await?;
610
611 let insight_info = if self.with_redis_insight {
613 let _insight_id = self.start_redis_insight().await?;
614 format!(
615 ", RedisInsight UI at http://localhost:{}",
616 self.redis_insight_port
617 )
618 } else {
619 String::new()
620 };
621
622 Ok(format!(
624 "Redis Cluster '{}' started with {} nodes ({} masters, {} replicas){}",
625 self.name,
626 self.total_nodes(),
627 self.num_masters,
628 self.num_masters * self.num_replicas,
629 insight_info
630 ))
631 }
632
633 async fn stop(&self) -> Result<(), TemplateError> {
634 use crate::StopCommand;
635
636 for i in 0..self.total_nodes() {
638 let node_name = format!("{}-node-{}", self.name, i);
639 let _ = StopCommand::new(&node_name).execute().await;
640 }
641
642 if self.with_redis_insight {
644 let insight_name = format!("{}-insight", self.name);
645 let _ = StopCommand::new(&insight_name).execute().await;
646 }
647
648 Ok(())
649 }
650
651 async fn remove(&self) -> Result<(), TemplateError> {
652 use crate::{NetworkRmCommand, RmCommand};
653
654 for i in 0..self.total_nodes() {
656 let node_name = format!("{}-node-{}", self.name, i);
657 let _ = RmCommand::new(&node_name).force().volumes().execute().await;
658 }
659
660 if self.with_redis_insight {
662 let insight_name = format!("{}-insight", self.name);
663 let _ = RmCommand::new(&insight_name)
664 .force()
665 .volumes()
666 .execute()
667 .await;
668 }
669
670 let _ = NetworkRmCommand::new(&self.network_name).execute().await;
672
673 Ok(())
674 }
675}
676
677#[derive(Debug, Clone)]
679pub struct ClusterInfo {
680 pub cluster_state: String,
682 pub total_slots: u16,
684 pub nodes: Vec<NodeInfo>,
686}
687
688impl ClusterInfo {
689 #[allow(clippy::unnecessary_wraps)]
690 fn from_output(_output: &str) -> Result<Self, TemplateError> {
691 Ok(ClusterInfo {
693 cluster_state: "ok".to_string(),
694 total_slots: 16384,
695 nodes: Vec::new(),
696 })
697 }
698}
699
700#[derive(Debug, Clone)]
702pub struct NodeInfo {
703 pub id: String,
705 pub host: String,
707 pub port: u16,
709 pub role: NodeRole,
711 pub slots: Vec<(u16, u16)>,
713}
714
715#[derive(Debug, Clone, PartialEq)]
717pub enum NodeRole {
718 Master,
720 Replica,
722}
723
724#[derive(Debug, Clone)]
726pub struct RedisClusterConnection {
727 nodes: Vec<String>,
728 password: Option<String>,
729}
730
731impl RedisClusterConnection {
732 pub fn new(nodes: Vec<String>) -> Self {
749 Self {
750 nodes,
751 password: None,
752 }
753 }
754
755 pub fn with_password(nodes: Vec<String>, password: impl Into<String>) -> Self {
768 Self {
769 nodes,
770 password: Some(password.into()),
771 }
772 }
773
774 pub fn from_template(template: &RedisClusterTemplate) -> Self {
776 let host = template.announce_ip.as_deref().unwrap_or("localhost");
777 let mut nodes = Vec::new();
778
779 for i in 0..template.total_nodes() {
780 let port = template.port_base + i as u16;
781 nodes.push(format!("{}:{}", host, port));
782 }
783
784 Self {
785 nodes,
786 password: template.password.clone(),
787 }
788 }
789
790 pub fn nodes(&self) -> &[String] {
792 &self.nodes
793 }
794
795 pub fn nodes_string(&self) -> String {
797 self.nodes.join(",")
798 }
799
800 pub fn cluster_url(&self) -> String {
802 let auth = self
803 .password
804 .as_ref()
805 .map(|p| format!(":{}@", p))
806 .unwrap_or_default();
807
808 format!("redis-cluster://{}{}", auth, self.nodes.join(","))
809 }
810}
811
812#[cfg(test)]
813mod tests {
814 use super::*;
815 use serial_test::serial;
816
817 #[test]
818 fn test_redis_cluster_template_basic() {
819 let template = RedisClusterTemplate::new("test-cluster");
820 assert_eq!(template.name, "test-cluster");
821 assert_eq!(template.num_masters, 3);
822 assert_eq!(template.num_replicas, 0);
823 assert_eq!(template.port_base, 7000);
824 }
825
826 #[test]
827 fn test_redis_cluster_template_with_replicas() {
828 let template = RedisClusterTemplate::new("test-cluster")
829 .num_masters(3)
830 .num_replicas(1);
831
832 assert_eq!(template.total_nodes(), 6);
833 }
834
835 #[test]
836 fn test_redis_cluster_template_minimum_masters() {
837 let template = RedisClusterTemplate::new("test-cluster").num_masters(2); assert_eq!(template.num_masters, 3);
840 }
841
842 #[test]
843 fn test_redis_cluster_connection() {
844 let template = RedisClusterTemplate::new("test-cluster")
845 .num_masters(3)
846 .port_base(7000)
847 .password("secret");
848
849 let conn = RedisClusterConnection::from_template(&template);
850 assert_eq!(conn.nodes.len(), 3);
851 assert_eq!(conn.nodes[0], "localhost:7000");
852 assert_eq!(
853 conn.cluster_url(),
854 "redis-cluster://:secret@localhost:7000,localhost:7001,localhost:7002"
855 );
856 }
857
858 #[test]
859 fn test_redis_cluster_with_stack_and_insight() {
860 let template = RedisClusterTemplate::new("test-cluster")
861 .num_masters(3)
862 .with_redis_stack()
863 .with_redis_insight()
864 .redis_insight_port(8080);
865
866 assert!(template.use_redis_stack);
867 assert!(template.with_redis_insight);
868 assert_eq!(template.redis_insight_port, 8080);
869 }
870
871 #[test]
872 fn test_redis_cluster_connection_new() {
873 let nodes = vec![
874 "localhost:7000".to_string(),
875 "localhost:7001".to_string(),
876 "localhost:7002".to_string(),
877 ];
878 let conn = RedisClusterConnection::new(nodes.clone());
879
880 assert_eq!(conn.nodes(), &nodes);
881 assert_eq!(
882 conn.nodes_string(),
883 "localhost:7000,localhost:7001,localhost:7002"
884 );
885 assert_eq!(
886 conn.cluster_url(),
887 "redis-cluster://localhost:7000,localhost:7001,localhost:7002"
888 );
889 }
890
891 #[test]
892 fn test_redis_cluster_connection_with_password() {
893 let nodes = vec!["localhost:7000".to_string()];
894 let conn = RedisClusterConnection::with_password(nodes, "secret123");
895
896 assert_eq!(
897 conn.cluster_url(),
898 "redis-cluster://:secret123@localhost:7000"
899 );
900 }
901
902 #[test]
903 #[serial]
904 fn test_redis_cluster_from_env_defaults() {
905 std::env::remove_var("REDIS_CLUSTER_PORT_BASE");
907 std::env::remove_var("REDIS_CLUSTER_NUM_MASTERS");
908 std::env::remove_var("REDIS_CLUSTER_NUM_REPLICAS");
909 std::env::remove_var("REDIS_CLUSTER_PASSWORD");
910
911 let template = RedisClusterTemplate::from_env("test-cluster");
912
913 assert_eq!(template.get_port_base(), 7000);
914 assert_eq!(template.get_num_masters(), 3);
915 assert_eq!(template.get_num_replicas(), 0);
916 }
917
918 #[test]
919 #[serial]
920 fn test_redis_cluster_from_env_with_vars() {
921 std::env::set_var("REDIS_CLUSTER_PORT_BASE", "8000");
922 std::env::set_var("REDIS_CLUSTER_NUM_MASTERS", "6");
923 std::env::set_var("REDIS_CLUSTER_NUM_REPLICAS", "1");
924 std::env::set_var("REDIS_CLUSTER_PASSWORD", "testpass");
925
926 let template = RedisClusterTemplate::from_env("test-cluster");
927
928 assert_eq!(template.get_port_base(), 8000);
929 assert_eq!(template.get_num_masters(), 6);
930 assert_eq!(template.get_num_replicas(), 1);
931
932 std::env::remove_var("REDIS_CLUSTER_PORT_BASE");
934 std::env::remove_var("REDIS_CLUSTER_NUM_MASTERS");
935 std::env::remove_var("REDIS_CLUSTER_NUM_REPLICAS");
936 std::env::remove_var("REDIS_CLUSTER_PASSWORD");
937 }
938
939 #[test]
940 fn test_redis_cluster_getters() {
941 let template = RedisClusterTemplate::new("test-cluster")
942 .port_base(9000)
943 .num_masters(5)
944 .num_replicas(2);
945
946 assert_eq!(template.get_port_base(), 9000);
947 assert_eq!(template.get_num_masters(), 5);
948 assert_eq!(template.get_num_replicas(), 2);
949 }
950}