docker_wrapper/template/redis/
cluster.rs1#![allow(clippy::doc_markdown)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::return_self_not_must_use)]
6#![allow(clippy::uninlined_format_args)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::missing_errors_doc)]
9
10use crate::template::{Template, TemplateConfig, TemplateError};
11use crate::{DockerCommand, ExecCommand, NetworkCreateCommand, RunCommand};
12use async_trait::async_trait;
13
/// Builder-style template describing a multi-container Redis Cluster.
///
/// Every node runs in its own container named `{name}-node-{index}`; all
/// containers are attached to one Docker network. Values are configured through
/// the chained builder methods on this type.
pub struct RedisClusterTemplate {
    /// Base name of the cluster; used as the prefix of every container name.
    name: String,
    /// Number of master nodes. The builder never lets this drop below 3.
    num_masters: usize,
    /// Number of replicas per master (also passed to `--cluster-replicas`).
    num_replicas: usize,
    /// Host port published for node 0; node `i` is published on `port_base + i`.
    port_base: u16,
    /// Name of the Docker network the containers join (`{name}-network` by default).
    network_name: String,
    /// Optional password, passed as `--requirepass` / `--masterauth` and to `redis-cli -a`.
    password: Option<String>,
    /// Optional address passed as `--cluster-announce-ip`; also the host used by
    /// `RedisClusterConnection::from_template`.
    announce_ip: Option<String>,
    /// Optional volume name prefix; node `i` mounts `{prefix}-{i}` at `/data`.
    volume_prefix: Option<String>,
    /// Optional memory limit forwarded verbatim to `RunCommand::memory`.
    memory_limit: Option<String>,
    /// Value passed verbatim to `--cluster-node-timeout`.
    node_timeout: u32,
    /// When set, `RunCommand::remove()` is applied to every started container.
    auto_remove: bool,
    /// Selects the `redis/redis-stack-server:latest` image (unless a custom image is set).
    use_redis_stack: bool,
    /// When set, a `{name}-insight` RedisInsight container is started alongside the cluster.
    with_redis_insight: bool,
    /// Host port mapped to the RedisInsight container's port 8001.
    redis_insight_port: u16,
    /// Optional custom image name; takes precedence over `use_redis_stack`.
    redis_image: Option<String>,
    /// Optional tag appended to `redis_image` as `{image}:{tag}`.
    redis_tag: Option<String>,
    /// Optional platform string forwarded verbatim to `RunCommand::platform`.
    platform: Option<String>,
}
51
52impl RedisClusterTemplate {
53 pub fn new(name: impl Into<String>) -> Self {
55 let name = name.into();
56 let network_name = format!("{}-network", name);
57
58 Self {
59 name,
60 num_masters: 3,
61 num_replicas: 0,
62 port_base: 7000,
63 network_name,
64 password: None,
65 announce_ip: None,
66 volume_prefix: None,
67 memory_limit: None,
68 node_timeout: 5000,
69 auto_remove: false,
70 use_redis_stack: false,
71 with_redis_insight: false,
72 redis_insight_port: 8001,
73 redis_image: None,
74 redis_tag: None,
75 platform: None,
76 }
77 }
78
79 pub fn num_masters(mut self, masters: usize) -> Self {
81 self.num_masters = masters.max(3);
82 self
83 }
84
85 pub fn num_replicas(mut self, replicas: usize) -> Self {
87 self.num_replicas = replicas;
88 self
89 }
90
91 pub fn port_base(mut self, port: u16) -> Self {
93 self.port_base = port;
94 self
95 }
96
97 pub fn password(mut self, password: impl Into<String>) -> Self {
99 self.password = Some(password.into());
100 self
101 }
102
103 pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
105 self.announce_ip = Some(ip.into());
106 self
107 }
108
109 pub fn with_persistence(mut self, volume_prefix: impl Into<String>) -> Self {
111 self.volume_prefix = Some(volume_prefix.into());
112 self
113 }
114
115 pub fn memory_limit(mut self, limit: impl Into<String>) -> Self {
117 self.memory_limit = Some(limit.into());
118 self
119 }
120
121 pub fn cluster_node_timeout(mut self, timeout: u32) -> Self {
123 self.node_timeout = timeout;
124 self
125 }
126
127 pub fn auto_remove(mut self) -> Self {
129 self.auto_remove = true;
130 self
131 }
132
133 pub fn with_redis_stack(mut self) -> Self {
135 self.use_redis_stack = true;
136 self
137 }
138
139 pub fn with_redis_insight(mut self) -> Self {
141 self.with_redis_insight = true;
142 self
143 }
144
145 pub fn redis_insight_port(mut self, port: u16) -> Self {
147 self.redis_insight_port = port;
148 self
149 }
150
151 pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
153 self.redis_image = Some(image.into());
154 self.redis_tag = Some(tag.into());
155 self
156 }
157
158 pub fn platform(mut self, platform: impl Into<String>) -> Self {
160 self.platform = Some(platform.into());
161 self
162 }
163
164 fn total_nodes(&self) -> usize {
166 self.num_masters + (self.num_masters * self.num_replicas)
167 }
168
169 async fn create_network(&self) -> Result<String, TemplateError> {
171 let output = NetworkCreateCommand::new(&self.network_name)
172 .driver("bridge")
173 .execute()
174 .await?;
175
176 Ok(output.stdout.trim().to_string())
178 }
179
180 async fn start_node(&self, node_index: usize) -> Result<String, TemplateError> {
182 let node_name = format!("{}-node-{}", self.name, node_index);
183 let port = self.port_base + node_index as u16;
184 let cluster_port = port + 10000;
185
186 let image = if let Some(ref custom_image) = self.redis_image {
188 if let Some(ref tag) = self.redis_tag {
189 format!("{}:{}", custom_image, tag)
190 } else {
191 custom_image.clone()
192 }
193 } else if self.use_redis_stack {
194 "redis/redis-stack-server:latest".to_string()
195 } else {
196 "redis:7-alpine".to_string()
197 };
198
199 let mut cmd = RunCommand::new(image)
200 .name(&node_name)
201 .network(&self.network_name)
202 .port(port, 6379)
203 .port(cluster_port, 16379)
204 .detach();
205
206 if let Some(ref limit) = self.memory_limit {
208 cmd = cmd.memory(limit);
209 }
210
211 if let Some(ref prefix) = self.volume_prefix {
213 let volume_name = format!("{}-{}", prefix, node_index);
214 cmd = cmd.volume(&volume_name, "/data");
215 }
216
217 if let Some(ref platform) = self.platform {
219 cmd = cmd.platform(platform);
220 }
221
222 if self.auto_remove {
224 cmd = cmd.remove();
225 }
226
227 let mut redis_args = vec![
229 "redis-server".to_string(),
230 "--cluster-enabled".to_string(),
231 "yes".to_string(),
232 "--cluster-config-file".to_string(),
233 "nodes.conf".to_string(),
234 "--cluster-node-timeout".to_string(),
235 self.node_timeout.to_string(),
236 "--appendonly".to_string(),
237 "yes".to_string(),
238 "--port".to_string(),
239 "6379".to_string(),
240 ];
241
242 if let Some(ref password) = self.password {
244 redis_args.push("--requirepass".to_string());
245 redis_args.push(password.clone());
246 redis_args.push("--masterauth".to_string());
247 redis_args.push(password.clone());
248 }
249
250 if let Some(ref ip) = self.announce_ip {
252 redis_args.push("--cluster-announce-ip".to_string());
253 redis_args.push(ip.clone());
254 redis_args.push("--cluster-announce-port".to_string());
255 redis_args.push(port.to_string());
256 redis_args.push("--cluster-announce-bus-port".to_string());
257 redis_args.push(cluster_port.to_string());
258 }
259
260 cmd = cmd.cmd(redis_args);
261
262 let output = cmd.execute().await?;
263 Ok(output.0)
264 }
265
266 async fn start_redis_insight(&self) -> Result<String, TemplateError> {
268 let insight_name = format!("{}-insight", self.name);
269
270 let mut cmd = RunCommand::new("redislabs/redisinsight:latest")
271 .name(&insight_name)
272 .network(&self.network_name)
273 .port(self.redis_insight_port, 8001)
274 .detach();
275
276 if let Some(ref prefix) = self.volume_prefix {
278 let volume_name = format!("{}-insight", prefix);
279 cmd = cmd.volume(&volume_name, "/db");
280 }
281
282 if self.auto_remove {
284 cmd = cmd.remove();
285 }
286
287 cmd = cmd.env("RITRUSTEDORIGINS", "http://localhost");
289
290 let output = cmd.execute().await?;
291 Ok(output.0)
292 }
293
294 async fn initialize_cluster(&self, container_ids: &[String]) -> Result<(), TemplateError> {
296 if container_ids.is_empty() {
297 return Err(TemplateError::InvalidConfig(
298 "No containers to initialize cluster".to_string(),
299 ));
300 }
301
302 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
304
305 let mut create_args = vec![
307 "redis-cli".to_string(),
308 "--cluster".to_string(),
309 "create".to_string(),
310 ];
311
312 for i in 0..self.total_nodes() {
314 let host = format!("{}-node-{}", self.name, i);
315 let port = 6379;
316 create_args.push(format!("{}:{}", host, port));
317 }
318
319 if self.num_replicas > 0 {
321 create_args.push("--cluster-replicas".to_string());
322 create_args.push(self.num_replicas.to_string());
323 }
324
325 if let Some(ref password) = self.password {
327 create_args.push("-a".to_string());
328 create_args.push(password.clone());
329 }
330
331 create_args.push("--cluster-yes".to_string());
333
334 let first_node_name = format!("{}-node-0", self.name);
336
337 ExecCommand::new(&first_node_name, create_args)
338 .execute()
339 .await?;
340
341 Ok(())
342 }
343
344 pub async fn cluster_info(&self) -> Result<ClusterInfo, TemplateError> {
346 let node_name = format!("{}-node-0", self.name);
347
348 let mut info_args = vec![
349 "redis-cli".to_string(),
350 "--cluster".to_string(),
351 "info".to_string(),
352 format!("{}-node-0:6379", self.name),
353 ];
354
355 if let Some(ref password) = self.password {
356 info_args.push("-a".to_string());
357 info_args.push(password.clone());
358 }
359
360 let output = ExecCommand::new(&node_name, info_args).execute().await?;
361
362 ClusterInfo::from_output(&output.stdout)
364 }
365}
366
367#[async_trait]
368impl Template for RedisClusterTemplate {
369 fn name(&self) -> &str {
370 &self.name
371 }
372
/// Not supported for this template.
///
/// # Panics
///
/// Always panics via `unimplemented!`: this template manages several
/// containers, so no single `TemplateConfig` is returned.
fn config(&self) -> &TemplateConfig {
    unimplemented!("RedisClusterTemplate manages multiple containers")
}
377
/// Not supported for this template.
///
/// # Panics
///
/// Always panics via `unimplemented!`: this template manages several
/// containers, so no single `TemplateConfig` is returned.
fn config_mut(&mut self) -> &mut TemplateConfig {
    unimplemented!("RedisClusterTemplate manages multiple containers")
}
381
382 async fn start(&self) -> Result<String, TemplateError> {
383 let _network_id = self.create_network().await?;
385
386 let mut container_ids = Vec::new();
388 for i in 0..self.total_nodes() {
389 let id = self.start_node(i).await?;
390 container_ids.push(id);
391 }
392
393 self.initialize_cluster(&container_ids).await?;
395
396 let insight_info = if self.with_redis_insight {
398 let _insight_id = self.start_redis_insight().await?;
399 format!(
400 ", RedisInsight UI at http://localhost:{}",
401 self.redis_insight_port
402 )
403 } else {
404 String::new()
405 };
406
407 Ok(format!(
409 "Redis Cluster '{}' started with {} nodes ({} masters, {} replicas){}",
410 self.name,
411 self.total_nodes(),
412 self.num_masters,
413 self.num_masters * self.num_replicas,
414 insight_info
415 ))
416 }
417
418 async fn stop(&self) -> Result<(), TemplateError> {
419 use crate::StopCommand;
420
421 for i in 0..self.total_nodes() {
423 let node_name = format!("{}-node-{}", self.name, i);
424 let _ = StopCommand::new(&node_name).execute().await;
425 }
426
427 if self.with_redis_insight {
429 let insight_name = format!("{}-insight", self.name);
430 let _ = StopCommand::new(&insight_name).execute().await;
431 }
432
433 Ok(())
434 }
435
436 async fn remove(&self) -> Result<(), TemplateError> {
437 use crate::{NetworkRmCommand, RmCommand};
438
439 for i in 0..self.total_nodes() {
441 let node_name = format!("{}-node-{}", self.name, i);
442 let _ = RmCommand::new(&node_name).force().volumes().execute().await;
443 }
444
445 if self.with_redis_insight {
447 let insight_name = format!("{}-insight", self.name);
448 let _ = RmCommand::new(&insight_name)
449 .force()
450 .volumes()
451 .execute()
452 .await;
453 }
454
455 let _ = NetworkRmCommand::new(&self.network_name).execute().await;
457
458 Ok(())
459 }
460}
461
/// Summary of a cluster as returned by `RedisClusterTemplate::cluster_info`.
///
/// NOTE(review): `ClusterInfo::from_output` currently fills these fields with
/// fixed values and does not parse the command output.
#[derive(Debug, Clone)]
pub struct ClusterInfo {
    /// Cluster state string (currently always `"ok"`).
    pub cluster_state: String,
    /// Number of hash slots (currently always 16384).
    pub total_slots: u16,
    /// Per-node details (currently always empty).
    pub nodes: Vec<NodeInfo>,
}
472
473impl ClusterInfo {
474 #[allow(clippy::unnecessary_wraps)]
475 fn from_output(_output: &str) -> Result<Self, TemplateError> {
476 Ok(ClusterInfo {
478 cluster_state: "ok".to_string(),
479 total_slots: 16384,
480 nodes: Vec::new(),
481 })
482 }
483}
484
/// Details about a single cluster node.
///
/// NOTE(review): nothing in this file constructs a `NodeInfo` yet, so the
/// field descriptions below are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Node identifier.
    pub id: String,
    /// Host of the node.
    pub host: String,
    /// Port of the node.
    pub port: u16,
    /// Whether the node is a master or a replica.
    pub role: NodeRole,
    /// Slot ranges held by the node; presumably `(start, end)` pairs — TODO confirm bounds.
    pub slots: Vec<(u16, u16)>,
}
499
/// Role of a node within the cluster.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeRole {
    /// The node is a master.
    Master,
    /// The node is a replica.
    Replica,
}
508
/// Client-side connection details for a cluster described by a
/// [`RedisClusterTemplate`].
pub struct RedisClusterConnection {
    /// `host:port` address of every node, in node-index order.
    nodes: Vec<String>,
    /// Cluster password, if one was configured on the template.
    password: Option<String>,
}
514
515impl RedisClusterConnection {
516 pub fn from_template(template: &RedisClusterTemplate) -> Self {
518 let host = template.announce_ip.as_deref().unwrap_or("localhost");
519 let mut nodes = Vec::new();
520
521 for i in 0..template.total_nodes() {
522 let port = template.port_base + i as u16;
523 nodes.push(format!("{}:{}", host, port));
524 }
525
526 Self {
527 nodes,
528 password: template.password.clone(),
529 }
530 }
531
532 pub fn nodes_string(&self) -> String {
534 self.nodes.join(",")
535 }
536
537 pub fn cluster_url(&self) -> String {
539 let auth = self
540 .password
541 .as_ref()
542 .map(|p| format!(":{}@", p))
543 .unwrap_or_default();
544
545 format!("redis-cluster://{}{}", auth, self.nodes.join(","))
546 }
547}
548
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh template carries the documented defaults.
    #[test]
    fn test_redis_cluster_template_basic() {
        let cluster = RedisClusterTemplate::new("test-cluster");

        assert_eq!(cluster.name, "test-cluster");
        assert_eq!(cluster.num_masters, 3);
        assert_eq!(cluster.num_replicas, 0);
        assert_eq!(cluster.port_base, 7000);
    }

    /// One replica per master doubles the node count.
    #[test]
    fn test_redis_cluster_template_with_replicas() {
        let cluster = RedisClusterTemplate::new("test-cluster")
            .num_masters(3)
            .num_replicas(1);

        assert_eq!(cluster.total_nodes(), 6);
    }

    /// Requests for fewer than three masters are raised to three.
    #[test]
    fn test_redis_cluster_template_minimum_masters() {
        let cluster = RedisClusterTemplate::new("test-cluster").num_masters(2);

        assert_eq!(cluster.num_masters, 3);
    }

    /// Connection details list every node and embed the password in the URL.
    #[test]
    fn test_redis_cluster_connection() {
        let cluster = RedisClusterTemplate::new("test-cluster")
            .num_masters(3)
            .port_base(7000)
            .password("secret");

        let connection = RedisClusterConnection::from_template(&cluster);

        assert_eq!(connection.nodes.len(), 3);
        assert_eq!(
            connection.nodes.first().map(String::as_str),
            Some("localhost:7000")
        );
        assert_eq!(
            connection.cluster_url(),
            "redis-cluster://:secret@localhost:7000,localhost:7001,localhost:7002"
        );
    }

    /// The Redis Stack and RedisInsight switches are recorded on the template.
    #[test]
    fn test_redis_cluster_with_stack_and_insight() {
        let cluster = RedisClusterTemplate::new("test-cluster")
            .num_masters(3)
            .with_redis_stack()
            .with_redis_insight()
            .redis_insight_port(8080);

        assert!(cluster.use_redis_stack);
        assert!(cluster.with_redis_insight);
        assert_eq!(cluster.redis_insight_port, 8080);
    }
}