docker_wrapper/template/redis/
cluster.rs1#![allow(clippy::doc_markdown)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::return_self_not_must_use)]
6#![allow(clippy::uninlined_format_args)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::missing_errors_doc)]
9
10use crate::template::{Template, TemplateConfig, TemplateError};
11use crate::{DockerCommand, ExecCommand, NetworkCreateCommand, RunCommand};
12use async_trait::async_trait;
13
/// Builder-style description of a multi-node Redis Cluster run as Docker containers.
///
/// Values are filled in through the chained setter methods and read when the template
/// is started, stopped or removed.
///
/// `Clone` is derived so a configured template can be reused as a base for variations.
/// `Debug` is deliberately not derived here because the struct stores the cluster
/// password in plain text.
#[derive(Clone)]
pub struct RedisClusterTemplate {
    /// Base name; node containers are named `{name}-node-{index}`.
    name: String,
    /// Number of master nodes (the setter raises values below 3 to 3).
    num_masters: usize,
    /// Number of replicas per master.
    num_replicas: usize,
    /// Host port of node 0; node `i` is published on `port_base + i`.
    port_base: u16,
    /// Name of the Docker network the containers are attached to.
    network_name: String,
    /// Optional password, passed to each node as `--requirepass` and `--masterauth`.
    password: Option<String>,
    /// Optional address passed as `--cluster-announce-ip` and used when building
    /// node addresses.
    announce_ip: Option<String>,
    /// Optional volume name prefix; node `i` mounts `{prefix}-{i}` at `/data`.
    volume_prefix: Option<String>,
    /// Optional memory limit applied to each node container.
    memory_limit: Option<String>,
    /// Value passed as `--cluster-node-timeout` (milliseconds, per the Redis
    /// configuration documentation).
    node_timeout: u32,
    /// Whether containers are started with the run command's `remove()` option.
    auto_remove: bool,
    /// Whether the Redis Stack server image is used when no custom image is set.
    use_redis_stack: bool,
    /// Whether a RedisInsight container is started alongside the cluster.
    with_redis_insight: bool,
    /// Host port on which RedisInsight is published.
    redis_insight_port: u16,
    /// Custom image name overriding the default images.
    redis_image: Option<String>,
    /// Tag appended to the custom image name.
    redis_tag: Option<String>,
    /// Optional platform string passed to the run command of each node.
    platform: Option<String>,
}
51
52impl RedisClusterTemplate {
53 pub fn new(name: impl Into<String>) -> Self {
55 let name = name.into();
56 let network_name = format!("{}-network", name);
57
58 Self {
59 name,
60 num_masters: 3,
61 num_replicas: 0,
62 port_base: 7000,
63 network_name,
64 password: None,
65 announce_ip: None,
66 volume_prefix: None,
67 memory_limit: None,
68 node_timeout: 5000,
69 auto_remove: false,
70 use_redis_stack: false,
71 with_redis_insight: false,
72 redis_insight_port: 8001,
73 redis_image: None,
74 redis_tag: None,
75 platform: None,
76 }
77 }
78
79 pub fn num_masters(mut self, masters: usize) -> Self {
81 self.num_masters = masters.max(3);
82 self
83 }
84
85 pub fn num_replicas(mut self, replicas: usize) -> Self {
87 self.num_replicas = replicas;
88 self
89 }
90
91 pub fn port_base(mut self, port: u16) -> Self {
93 self.port_base = port;
94 self
95 }
96
97 pub fn password(mut self, password: impl Into<String>) -> Self {
99 self.password = Some(password.into());
100 self
101 }
102
103 pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
105 self.announce_ip = Some(ip.into());
106 self
107 }
108
109 pub fn with_persistence(mut self, volume_prefix: impl Into<String>) -> Self {
111 self.volume_prefix = Some(volume_prefix.into());
112 self
113 }
114
115 pub fn memory_limit(mut self, limit: impl Into<String>) -> Self {
117 self.memory_limit = Some(limit.into());
118 self
119 }
120
121 pub fn cluster_node_timeout(mut self, timeout: u32) -> Self {
123 self.node_timeout = timeout;
124 self
125 }
126
127 pub fn auto_remove(mut self) -> Self {
129 self.auto_remove = true;
130 self
131 }
132
133 pub fn with_redis_stack(mut self) -> Self {
135 self.use_redis_stack = true;
136 self
137 }
138
139 pub fn with_redis_insight(mut self) -> Self {
141 self.with_redis_insight = true;
142 self
143 }
144
145 pub fn redis_insight_port(mut self, port: u16) -> Self {
147 self.redis_insight_port = port;
148 self
149 }
150
151 pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
153 self.redis_image = Some(image.into());
154 self.redis_tag = Some(tag.into());
155 self
156 }
157
158 pub fn platform(mut self, platform: impl Into<String>) -> Self {
160 self.platform = Some(platform.into());
161 self
162 }
163
164 fn total_nodes(&self) -> usize {
166 self.num_masters + (self.num_masters * self.num_replicas)
167 }
168
169 async fn create_network(&self) -> Result<String, TemplateError> {
171 let output = NetworkCreateCommand::new(&self.network_name)
172 .driver("bridge")
173 .execute()
174 .await?;
175
176 Ok(output.stdout.trim().to_string())
178 }
179
180 async fn start_node(&self, node_index: usize) -> Result<String, TemplateError> {
182 let node_name = format!("{}-node-{}", self.name, node_index);
183 let port = self.port_base + node_index as u16;
184 let cluster_port = port + 10000;
185
186 let image = if let Some(ref custom_image) = self.redis_image {
188 if let Some(ref tag) = self.redis_tag {
189 format!("{}:{}", custom_image, tag)
190 } else {
191 custom_image.clone()
192 }
193 } else if self.use_redis_stack {
194 "redis/redis-stack-server:latest".to_string()
195 } else {
196 "redis:7-alpine".to_string()
197 };
198
199 let mut cmd = RunCommand::new(image)
200 .name(&node_name)
201 .network(&self.network_name)
202 .port(port, 6379)
203 .port(cluster_port, 16379)
204 .detach();
205
206 if let Some(ref limit) = self.memory_limit {
208 cmd = cmd.memory(limit);
209 }
210
211 if let Some(ref prefix) = self.volume_prefix {
213 let volume_name = format!("{}-{}", prefix, node_index);
214 cmd = cmd.volume(&volume_name, "/data");
215 }
216
217 if let Some(ref platform) = self.platform {
219 cmd = cmd.platform(platform);
220 }
221
222 if self.auto_remove {
224 cmd = cmd.remove();
225 }
226
227 let mut redis_args = vec![
229 "redis-server".to_string(),
230 "--cluster-enabled".to_string(),
231 "yes".to_string(),
232 "--cluster-config-file".to_string(),
233 "nodes.conf".to_string(),
234 "--cluster-node-timeout".to_string(),
235 self.node_timeout.to_string(),
236 "--appendonly".to_string(),
237 "yes".to_string(),
238 "--port".to_string(),
239 "6379".to_string(),
240 ];
241
242 if let Some(ref password) = self.password {
244 redis_args.push("--requirepass".to_string());
245 redis_args.push(password.clone());
246 redis_args.push("--masterauth".to_string());
247 redis_args.push(password.clone());
248 }
249
250 if let Some(ref ip) = self.announce_ip {
252 redis_args.push("--cluster-announce-ip".to_string());
253 redis_args.push(ip.clone());
254 redis_args.push("--cluster-announce-port".to_string());
255 redis_args.push(port.to_string());
256 redis_args.push("--cluster-announce-bus-port".to_string());
257 redis_args.push(cluster_port.to_string());
258 }
259
260 cmd = cmd.cmd(redis_args);
261
262 let output = cmd.execute().await?;
263 Ok(output.0)
264 }
265
266 async fn start_redis_insight(&self) -> Result<String, TemplateError> {
268 let insight_name = format!("{}-insight", self.name);
269
270 let mut cmd = RunCommand::new("redislabs/redisinsight:latest")
271 .name(&insight_name)
272 .network(&self.network_name)
273 .port(self.redis_insight_port, 8001)
274 .detach();
275
276 if let Some(ref prefix) = self.volume_prefix {
278 let volume_name = format!("{}-insight", prefix);
279 cmd = cmd.volume(&volume_name, "/db");
280 }
281
282 if self.auto_remove {
284 cmd = cmd.remove();
285 }
286
287 cmd = cmd.env("RITRUSTEDORIGINS", "http://localhost");
289
290 let output = cmd.execute().await?;
291 Ok(output.0)
292 }
293
294 async fn initialize_cluster(&self, container_ids: &[String]) -> Result<(), TemplateError> {
296 if container_ids.is_empty() {
297 return Err(TemplateError::InvalidConfig(
298 "No containers to initialize cluster".to_string(),
299 ));
300 }
301
302 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
304
305 let mut create_args = vec![
307 "redis-cli".to_string(),
308 "--cluster".to_string(),
309 "create".to_string(),
310 ];
311
312 let host = self.announce_ip.as_deref().unwrap_or("127.0.0.1");
314 for i in 0..self.total_nodes() {
315 let port = self.port_base + i as u16;
316 create_args.push(format!("{}:{}", host, port));
317 }
318
319 if self.num_replicas > 0 {
321 create_args.push("--cluster-replicas".to_string());
322 create_args.push(self.num_replicas.to_string());
323 }
324
325 if let Some(ref password) = self.password {
327 create_args.push("-a".to_string());
328 create_args.push(password.clone());
329 }
330
331 create_args.push("--cluster-yes".to_string());
333
334 let first_node_name = format!("{}-node-0", self.name);
336
337 ExecCommand::new(&first_node_name, create_args)
338 .execute()
339 .await?;
340
341 Ok(())
342 }
343
344 pub async fn cluster_info(&self) -> Result<ClusterInfo, TemplateError> {
346 let node_name = format!("{}-node-0", self.name);
347
348 let mut info_args = vec![
349 "redis-cli".to_string(),
350 "--cluster".to_string(),
351 "info".to_string(),
352 format!(
353 "{}:{}",
354 self.announce_ip.as_deref().unwrap_or("127.0.0.1"),
355 self.port_base
356 ),
357 ];
358
359 if let Some(ref password) = self.password {
360 info_args.push("-a".to_string());
361 info_args.push(password.clone());
362 }
363
364 let output = ExecCommand::new(&node_name, info_args).execute().await?;
365
366 ClusterInfo::from_output(&output.stdout)
368 }
369}
370
371#[async_trait]
372impl Template for RedisClusterTemplate {
373 fn name(&self) -> &str {
374 &self.name
375 }
376
377 fn config(&self) -> &TemplateConfig {
378 unimplemented!("RedisClusterTemplate manages multiple containers")
380 }
381
382 fn config_mut(&mut self) -> &mut TemplateConfig {
383 unimplemented!("RedisClusterTemplate manages multiple containers")
384 }
385
386 async fn start(&self) -> Result<String, TemplateError> {
387 let _network_id = self.create_network().await?;
389
390 let mut container_ids = Vec::new();
392 for i in 0..self.total_nodes() {
393 let id = self.start_node(i).await?;
394 container_ids.push(id);
395 }
396
397 self.initialize_cluster(&container_ids).await?;
399
400 let insight_info = if self.with_redis_insight {
402 let _insight_id = self.start_redis_insight().await?;
403 format!(
404 ", RedisInsight UI at http://localhost:{}",
405 self.redis_insight_port
406 )
407 } else {
408 String::new()
409 };
410
411 Ok(format!(
413 "Redis Cluster '{}' started with {} nodes ({} masters, {} replicas){}",
414 self.name,
415 self.total_nodes(),
416 self.num_masters,
417 self.num_masters * self.num_replicas,
418 insight_info
419 ))
420 }
421
422 async fn stop(&self) -> Result<(), TemplateError> {
423 use crate::StopCommand;
424
425 for i in 0..self.total_nodes() {
427 let node_name = format!("{}-node-{}", self.name, i);
428 let _ = StopCommand::new(&node_name).execute().await;
429 }
430
431 if self.with_redis_insight {
433 let insight_name = format!("{}-insight", self.name);
434 let _ = StopCommand::new(&insight_name).execute().await;
435 }
436
437 Ok(())
438 }
439
440 async fn remove(&self) -> Result<(), TemplateError> {
441 use crate::{NetworkRmCommand, RmCommand};
442
443 for i in 0..self.total_nodes() {
445 let node_name = format!("{}-node-{}", self.name, i);
446 let _ = RmCommand::new(&node_name).force().volumes().execute().await;
447 }
448
449 if self.with_redis_insight {
451 let insight_name = format!("{}-insight", self.name);
452 let _ = RmCommand::new(&insight_name)
453 .force()
454 .volumes()
455 .execute()
456 .await;
457 }
458
459 let _ = NetworkRmCommand::new(&self.network_name).execute().await;
461
462 Ok(())
463 }
464}
465
466#[derive(Debug, Clone)]
468pub struct ClusterInfo {
469 pub cluster_state: String,
471 pub total_slots: u16,
473 pub nodes: Vec<NodeInfo>,
475}
476
477impl ClusterInfo {
478 #[allow(clippy::unnecessary_wraps)]
479 fn from_output(_output: &str) -> Result<Self, TemplateError> {
480 Ok(ClusterInfo {
482 cluster_state: "ok".to_string(),
483 total_slots: 16384,
484 nodes: Vec::new(),
485 })
486 }
487}
488
489#[derive(Debug, Clone)]
491pub struct NodeInfo {
492 pub id: String,
494 pub host: String,
496 pub port: u16,
498 pub role: NodeRole,
500 pub slots: Vec<(u16, u16)>,
502}
503
/// Role a node plays within the cluster.
///
/// The enum has no payload, so it is `Copy` and `Eq` in addition to the derives it
/// already had; values can be passed around and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// The node is a master.
    Master,
    /// The node is a replica.
    Replica,
}
512
513pub struct RedisClusterConnection {
515 nodes: Vec<String>,
516 password: Option<String>,
517}
518
519impl RedisClusterConnection {
520 pub fn from_template(template: &RedisClusterTemplate) -> Self {
522 let host = template.announce_ip.as_deref().unwrap_or("localhost");
523 let mut nodes = Vec::new();
524
525 for i in 0..template.total_nodes() {
526 let port = template.port_base + i as u16;
527 nodes.push(format!("{}:{}", host, port));
528 }
529
530 Self {
531 nodes,
532 password: template.password.clone(),
533 }
534 }
535
536 pub fn nodes_string(&self) -> String {
538 self.nodes.join(",")
539 }
540
541 pub fn cluster_url(&self) -> String {
543 let auth = self
544 .password
545 .as_ref()
546 .map(|p| format!(":{}@", p))
547 .unwrap_or_default();
548
549 format!("redis-cluster://{}{}", auth, self.nodes.join(","))
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// Cluster name shared by every template built in these tests.
    const CLUSTER: &str = "test-cluster";

    /// A fresh template carries the documented defaults.
    #[test]
    fn test_redis_cluster_template_basic() {
        let cluster = RedisClusterTemplate::new(CLUSTER);

        assert_eq!(cluster.name, "test-cluster");
        assert_eq!(cluster.num_masters, 3);
        assert_eq!(cluster.num_replicas, 0);
        assert_eq!(cluster.port_base, 7000);
    }

    /// Three masters with one replica each make six nodes.
    #[test]
    fn test_redis_cluster_template_with_replicas() {
        let cluster = RedisClusterTemplate::new(CLUSTER)
            .num_masters(3)
            .num_replicas(1);

        assert_eq!(cluster.total_nodes(), 6);
    }

    /// Asking for fewer than three masters still yields three.
    #[test]
    fn test_redis_cluster_template_minimum_masters() {
        let cluster = RedisClusterTemplate::new(CLUSTER).num_masters(2);

        assert_eq!(cluster.num_masters, 3);
    }

    /// Connection details list every node and embed the password in the URL.
    #[test]
    fn test_redis_cluster_connection() {
        let cluster = RedisClusterTemplate::new(CLUSTER)
            .num_masters(3)
            .port_base(7000)
            .password("secret");

        let connection = RedisClusterConnection::from_template(&cluster);

        assert_eq!(connection.nodes.len(), 3);
        assert_eq!(connection.nodes[0], "localhost:7000");
        assert_eq!(
            connection.cluster_url(),
            "redis-cluster://:secret@localhost:7000,localhost:7001,localhost:7002"
        );
    }

    /// The Redis Stack and RedisInsight options are recorded on the template.
    #[test]
    fn test_redis_cluster_with_stack_and_insight() {
        let cluster = RedisClusterTemplate::new(CLUSTER)
            .num_masters(3)
            .with_redis_stack()
            .with_redis_insight()
            .redis_insight_port(8080);

        assert!(cluster.use_redis_stack);
        assert!(cluster.with_redis_insight);
        assert_eq!(cluster.redis_insight_port, 8080);
    }
}