docker_wrapper/template/redis/
cluster.rs

1//! Redis Cluster template for multi-node Redis setup with sharding and replication
2
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::return_self_not_must_use)]
6#![allow(clippy::uninlined_format_args)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::missing_errors_doc)]
9
10use crate::template::{Template, TemplateConfig, TemplateError};
11use crate::{DockerCommand, ExecCommand, NetworkCreateCommand, RunCommand};
12use async_trait::async_trait;
13
14/// Redis Cluster template for automatic multi-node cluster setup
15pub struct RedisClusterTemplate {
16    /// Base name for the cluster
17    name: String,
18    /// Number of master nodes (minimum 3)
19    num_masters: usize,
20    /// Number of replicas per master
21    num_replicas: usize,
22    /// Base port for Redis nodes
23    port_base: u16,
24    /// Network name for cluster communication
25    network_name: String,
26    /// Password for cluster authentication
27    password: Option<String>,
28    /// IP to announce to other nodes
29    announce_ip: Option<String>,
30    /// Volume prefix for persistence
31    volume_prefix: Option<String>,
32    /// Memory limit per node
33    memory_limit: Option<String>,
34    /// Cluster node timeout in milliseconds
35    node_timeout: u32,
36    /// Whether to remove containers on stop
37    auto_remove: bool,
38    /// Whether to use Redis Stack instead of standard Redis
39    use_redis_stack: bool,
40    /// Whether to include RedisInsight GUI
41    with_redis_insight: bool,
42    /// Port for RedisInsight UI
43    redis_insight_port: u16,
44    /// Custom Redis image
45    redis_image: Option<String>,
46    /// Custom Redis tag
47    redis_tag: Option<String>,
48    /// Platform for containers
49    platform: Option<String>,
50}
51
52impl RedisClusterTemplate {
53    /// Create a new Redis Cluster template with default settings
54    pub fn new(name: impl Into<String>) -> Self {
55        let name = name.into();
56        let network_name = format!("{}-network", name);
57
58        Self {
59            name,
60            num_masters: 3,
61            num_replicas: 0,
62            port_base: 7000,
63            network_name,
64            password: None,
65            announce_ip: None,
66            volume_prefix: None,
67            memory_limit: None,
68            node_timeout: 5000,
69            auto_remove: false,
70            use_redis_stack: false,
71            with_redis_insight: false,
72            redis_insight_port: 8001,
73            redis_image: None,
74            redis_tag: None,
75            platform: None,
76        }
77    }
78
79    /// Set the number of master nodes (minimum 3)
80    pub fn num_masters(mut self, masters: usize) -> Self {
81        self.num_masters = masters.max(3);
82        self
83    }
84
85    /// Set the number of replicas per master
86    pub fn num_replicas(mut self, replicas: usize) -> Self {
87        self.num_replicas = replicas;
88        self
89    }
90
91    /// Set the base port for Redis nodes
92    pub fn port_base(mut self, port: u16) -> Self {
93        self.port_base = port;
94        self
95    }
96
97    /// Set cluster password
98    pub fn password(mut self, password: impl Into<String>) -> Self {
99        self.password = Some(password.into());
100        self
101    }
102
103    /// Set the IP to announce to other cluster nodes
104    pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
105        self.announce_ip = Some(ip.into());
106        self
107    }
108
109    /// Enable persistence with volume prefix
110    pub fn with_persistence(mut self, volume_prefix: impl Into<String>) -> Self {
111        self.volume_prefix = Some(volume_prefix.into());
112        self
113    }
114
115    /// Set memory limit per node
116    pub fn memory_limit(mut self, limit: impl Into<String>) -> Self {
117        self.memory_limit = Some(limit.into());
118        self
119    }
120
121    /// Set cluster node timeout in milliseconds
122    pub fn cluster_node_timeout(mut self, timeout: u32) -> Self {
123        self.node_timeout = timeout;
124        self
125    }
126
127    /// Enable auto-remove when stopped
128    pub fn auto_remove(mut self) -> Self {
129        self.auto_remove = true;
130        self
131    }
132
133    /// Use Redis Stack instead of standard Redis (includes modules like JSON, Search, Graph, TimeSeries, Bloom)
134    pub fn with_redis_stack(mut self) -> Self {
135        self.use_redis_stack = true;
136        self
137    }
138
139    /// Enable RedisInsight GUI for cluster visualization and management
140    pub fn with_redis_insight(mut self) -> Self {
141        self.with_redis_insight = true;
142        self
143    }
144
145    /// Set the port for RedisInsight UI (default: 8001)
146    pub fn redis_insight_port(mut self, port: u16) -> Self {
147        self.redis_insight_port = port;
148        self
149    }
150
151    /// Use a custom Redis image and tag
152    pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
153        self.redis_image = Some(image.into());
154        self.redis_tag = Some(tag.into());
155        self
156    }
157
158    /// Set the platform for the containers (e.g., "linux/arm64", "linux/amd64")
159    pub fn platform(mut self, platform: impl Into<String>) -> Self {
160        self.platform = Some(platform.into());
161        self
162    }
163
164    /// Get the total number of nodes
165    fn total_nodes(&self) -> usize {
166        self.num_masters + (self.num_masters * self.num_replicas)
167    }
168
169    /// Create the cluster network
170    async fn create_network(&self) -> Result<String, TemplateError> {
171        let output = NetworkCreateCommand::new(&self.network_name)
172            .driver("bridge")
173            .execute()
174            .await?;
175
176        // Network ID is in stdout
177        Ok(output.stdout.trim().to_string())
178    }
179
180    /// Start a single Redis node
181    async fn start_node(&self, node_index: usize) -> Result<String, TemplateError> {
182        let node_name = format!("{}-node-{}", self.name, node_index);
183        let port = self.port_base + node_index as u16;
184        let cluster_port = port + 10000;
185
186        // Choose image based on custom image or Redis Stack preference
187        let image = if let Some(ref custom_image) = self.redis_image {
188            if let Some(ref tag) = self.redis_tag {
189                format!("{}:{}", custom_image, tag)
190            } else {
191                custom_image.clone()
192            }
193        } else if self.use_redis_stack {
194            "redis/redis-stack-server:latest".to_string()
195        } else {
196            "redis:7-alpine".to_string()
197        };
198
199        let mut cmd = RunCommand::new(image)
200            .name(&node_name)
201            .network(&self.network_name)
202            .port(port, 6379)
203            .port(cluster_port, 16379)
204            .detach();
205
206        // Add memory limit if specified
207        if let Some(ref limit) = self.memory_limit {
208            cmd = cmd.memory(limit);
209        }
210
211        // Add volume for persistence
212        if let Some(ref prefix) = self.volume_prefix {
213            let volume_name = format!("{}-{}", prefix, node_index);
214            cmd = cmd.volume(&volume_name, "/data");
215        }
216
217        // Add platform if specified
218        if let Some(ref platform) = self.platform {
219            cmd = cmd.platform(platform);
220        }
221
222        // Auto-remove
223        if self.auto_remove {
224            cmd = cmd.remove();
225        }
226
227        // Build Redis command with cluster configuration
228        let mut redis_args = vec![
229            "redis-server".to_string(),
230            "--cluster-enabled".to_string(),
231            "yes".to_string(),
232            "--cluster-config-file".to_string(),
233            "nodes.conf".to_string(),
234            "--cluster-node-timeout".to_string(),
235            self.node_timeout.to_string(),
236            "--appendonly".to_string(),
237            "yes".to_string(),
238            "--port".to_string(),
239            "6379".to_string(),
240        ];
241
242        // Add password if configured
243        if let Some(ref password) = self.password {
244            redis_args.push("--requirepass".to_string());
245            redis_args.push(password.clone());
246            redis_args.push("--masterauth".to_string());
247            redis_args.push(password.clone());
248        }
249
250        // Add announce IP if configured
251        if let Some(ref ip) = self.announce_ip {
252            redis_args.push("--cluster-announce-ip".to_string());
253            redis_args.push(ip.clone());
254            redis_args.push("--cluster-announce-port".to_string());
255            redis_args.push(port.to_string());
256            redis_args.push("--cluster-announce-bus-port".to_string());
257            redis_args.push(cluster_port.to_string());
258        }
259
260        cmd = cmd.cmd(redis_args);
261
262        let output = cmd.execute().await?;
263        Ok(output.0)
264    }
265
266    /// Start RedisInsight container
267    async fn start_redis_insight(&self) -> Result<String, TemplateError> {
268        let insight_name = format!("{}-insight", self.name);
269
270        let mut cmd = RunCommand::new("redislabs/redisinsight:latest")
271            .name(&insight_name)
272            .network(&self.network_name)
273            .port(self.redis_insight_port, 8001)
274            .detach();
275
276        // Add volume for RedisInsight data persistence
277        if let Some(ref prefix) = self.volume_prefix {
278            let volume_name = format!("{}-insight", prefix);
279            cmd = cmd.volume(&volume_name, "/db");
280        }
281
282        // Auto-remove
283        if self.auto_remove {
284            cmd = cmd.remove();
285        }
286
287        // Environment variables for RedisInsight
288        cmd = cmd.env("RITRUSTEDORIGINS", "http://localhost");
289
290        let output = cmd.execute().await?;
291        Ok(output.0)
292    }
293
294    /// Initialize the cluster after all nodes are started
295    async fn initialize_cluster(&self, container_ids: &[String]) -> Result<(), TemplateError> {
296        if container_ids.is_empty() {
297            return Err(TemplateError::InvalidConfig(
298                "No containers to initialize cluster".to_string(),
299            ));
300        }
301
302        // Wait a bit for all nodes to be ready
303        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
304
305        // Build the cluster create command
306        let mut create_args = vec![
307            "redis-cli".to_string(),
308            "--cluster".to_string(),
309            "create".to_string(),
310        ];
311
312        // Add all node addresses using container hostnames (internal port is always 6379)
313        for i in 0..self.total_nodes() {
314            let host = format!("{}-node-{}", self.name, i);
315            let port = 6379;
316            create_args.push(format!("{}:{}", host, port));
317        }
318
319        // Add replicas configuration
320        if self.num_replicas > 0 {
321            create_args.push("--cluster-replicas".to_string());
322            create_args.push(self.num_replicas.to_string());
323        }
324
325        // Add password if configured
326        if let Some(ref password) = self.password {
327            create_args.push("-a".to_string());
328            create_args.push(password.clone());
329        }
330
331        // Auto-accept the configuration
332        create_args.push("--cluster-yes".to_string());
333
334        // Execute cluster create in the first container
335        let first_node_name = format!("{}-node-0", self.name);
336
337        ExecCommand::new(&first_node_name, create_args)
338            .execute()
339            .await?;
340
341        Ok(())
342    }
343
344    /// Check cluster status
345    pub async fn cluster_info(&self) -> Result<ClusterInfo, TemplateError> {
346        let node_name = format!("{}-node-0", self.name);
347
348        let mut info_args = vec![
349            "redis-cli".to_string(),
350            "--cluster".to_string(),
351            "info".to_string(),
352            format!("{}-node-0:6379", self.name),
353        ];
354
355        if let Some(ref password) = self.password {
356            info_args.push("-a".to_string());
357            info_args.push(password.clone());
358        }
359
360        let output = ExecCommand::new(&node_name, info_args).execute().await?;
361
362        // Parse the cluster info output
363        ClusterInfo::from_output(&output.stdout)
364    }
365}
366
367#[async_trait]
368impl Template for RedisClusterTemplate {
369    fn name(&self) -> &str {
370        &self.name
371    }
372
373    fn config(&self) -> &TemplateConfig {
374        // Return a dummy config as cluster doesn't map to single container
375        unimplemented!("RedisClusterTemplate manages multiple containers")
376    }
377
378    fn config_mut(&mut self) -> &mut TemplateConfig {
379        unimplemented!("RedisClusterTemplate manages multiple containers")
380    }
381
382    async fn start(&self) -> Result<String, TemplateError> {
383        // Create network first
384        let _network_id = self.create_network().await?;
385
386        // Start all nodes
387        let mut container_ids = Vec::new();
388        for i in 0..self.total_nodes() {
389            let id = self.start_node(i).await?;
390            container_ids.push(id);
391        }
392
393        // Initialize the cluster
394        self.initialize_cluster(&container_ids).await?;
395
396        // Start RedisInsight if enabled
397        let insight_info = if self.with_redis_insight {
398            let _insight_id = self.start_redis_insight().await?;
399            format!(
400                ", RedisInsight UI at http://localhost:{}",
401                self.redis_insight_port
402            )
403        } else {
404            String::new()
405        };
406
407        // Return a summary
408        Ok(format!(
409            "Redis Cluster '{}' started with {} nodes ({} masters, {} replicas){}",
410            self.name,
411            self.total_nodes(),
412            self.num_masters,
413            self.num_masters * self.num_replicas,
414            insight_info
415        ))
416    }
417
418    async fn stop(&self) -> Result<(), TemplateError> {
419        use crate::StopCommand;
420
421        // Stop all nodes
422        for i in 0..self.total_nodes() {
423            let node_name = format!("{}-node-{}", self.name, i);
424            let _ = StopCommand::new(&node_name).execute().await;
425        }
426
427        // Stop RedisInsight if it was started
428        if self.with_redis_insight {
429            let insight_name = format!("{}-insight", self.name);
430            let _ = StopCommand::new(&insight_name).execute().await;
431        }
432
433        Ok(())
434    }
435
436    async fn remove(&self) -> Result<(), TemplateError> {
437        use crate::{NetworkRmCommand, RmCommand};
438
439        // Remove all containers
440        for i in 0..self.total_nodes() {
441            let node_name = format!("{}-node-{}", self.name, i);
442            let _ = RmCommand::new(&node_name).force().volumes().execute().await;
443        }
444
445        // Remove RedisInsight if it was started
446        if self.with_redis_insight {
447            let insight_name = format!("{}-insight", self.name);
448            let _ = RmCommand::new(&insight_name)
449                .force()
450                .volumes()
451                .execute()
452                .await;
453        }
454
455        // Remove the network
456        let _ = NetworkRmCommand::new(&self.network_name).execute().await;
457
458        Ok(())
459    }
460}
461
462/// Cluster information
463#[derive(Debug, Clone)]
464pub struct ClusterInfo {
465    /// Current state of the cluster (ok/fail)
466    pub cluster_state: String,
467    /// Total number of hash slots (always 16384 for Redis)
468    pub total_slots: u16,
469    /// List of nodes in the cluster
470    pub nodes: Vec<NodeInfo>,
471}
472
473impl ClusterInfo {
474    #[allow(clippy::unnecessary_wraps)]
475    fn from_output(_output: &str) -> Result<Self, TemplateError> {
476        // Basic parsing - would need more sophisticated parsing in production
477        Ok(ClusterInfo {
478            cluster_state: "ok".to_string(),
479            total_slots: 16384,
480            nodes: Vec::new(),
481        })
482    }
483}
484
485/// Information about a cluster node
486#[derive(Debug, Clone)]
487pub struct NodeInfo {
488    /// Node ID in the cluster
489    pub id: String,
490    /// Hostname or IP address
491    pub host: String,
492    /// Port number
493    pub port: u16,
494    /// Role of the node (Master/Replica)
495    pub role: NodeRole,
496    /// Slot ranges assigned to this node (start, end)
497    pub slots: Vec<(u16, u16)>,
498}
499
500/// Node role in the cluster
501#[derive(Debug, Clone, PartialEq)]
502pub enum NodeRole {
503    /// Master node that owns hash slots
504    Master,
505    /// Replica node that replicates a master
506    Replica,
507}
508
509/// Connection helper for Redis Cluster
510pub struct RedisClusterConnection {
511    nodes: Vec<String>,
512    password: Option<String>,
513}
514
515impl RedisClusterConnection {
516    /// Create from a RedisClusterTemplate
517    pub fn from_template(template: &RedisClusterTemplate) -> Self {
518        let host = template.announce_ip.as_deref().unwrap_or("localhost");
519        let mut nodes = Vec::new();
520
521        for i in 0..template.total_nodes() {
522            let port = template.port_base + i as u16;
523            nodes.push(format!("{}:{}", host, port));
524        }
525
526        Self {
527            nodes,
528            password: template.password.clone(),
529        }
530    }
531
532    /// Get cluster nodes as comma-separated string
533    pub fn nodes_string(&self) -> String {
534        self.nodes.join(",")
535    }
536
537    /// Get connection URL for cluster-aware clients
538    pub fn cluster_url(&self) -> String {
539        let auth = self
540            .password
541            .as_ref()
542            .map(|p| format!(":{}@", p))
543            .unwrap_or_default();
544
545        format!("redis-cluster://{}{}", auth, self.nodes.join(","))
546    }
547}
548
549#[cfg(test)]
550mod tests {
551    use super::*;
552
553    #[test]
554    fn test_redis_cluster_template_basic() {
555        let template = RedisClusterTemplate::new("test-cluster");
556        assert_eq!(template.name, "test-cluster");
557        assert_eq!(template.num_masters, 3);
558        assert_eq!(template.num_replicas, 0);
559        assert_eq!(template.port_base, 7000);
560    }
561
562    #[test]
563    fn test_redis_cluster_template_with_replicas() {
564        let template = RedisClusterTemplate::new("test-cluster")
565            .num_masters(3)
566            .num_replicas(1);
567
568        assert_eq!(template.total_nodes(), 6);
569    }
570
571    #[test]
572    fn test_redis_cluster_template_minimum_masters() {
573        let template = RedisClusterTemplate::new("test-cluster").num_masters(2); // Should be forced to 3
574
575        assert_eq!(template.num_masters, 3);
576    }
577
578    #[test]
579    fn test_redis_cluster_connection() {
580        let template = RedisClusterTemplate::new("test-cluster")
581            .num_masters(3)
582            .port_base(7000)
583            .password("secret");
584
585        let conn = RedisClusterConnection::from_template(&template);
586        assert_eq!(conn.nodes.len(), 3);
587        assert_eq!(conn.nodes[0], "localhost:7000");
588        assert_eq!(
589            conn.cluster_url(),
590            "redis-cluster://:secret@localhost:7000,localhost:7001,localhost:7002"
591        );
592    }
593
594    #[test]
595    fn test_redis_cluster_with_stack_and_insight() {
596        let template = RedisClusterTemplate::new("test-cluster")
597            .num_masters(3)
598            .with_redis_stack()
599            .with_redis_insight()
600            .redis_insight_port(8080);
601
602        assert!(template.use_redis_stack);
603        assert!(template.with_redis_insight);
604        assert_eq!(template.redis_insight_port, 8080);
605    }
606}