docker_wrapper/template/redis/
cluster.rs

1//! Redis Cluster template for multi-node Redis setup with sharding and replication
2
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::return_self_not_must_use)]
6#![allow(clippy::uninlined_format_args)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::missing_errors_doc)]
9
10use crate::template::{Template, TemplateConfig, TemplateError};
11use crate::{DockerCommand, ExecCommand, NetworkCreateCommand, RunCommand};
12use async_trait::async_trait;
13
14/// Redis Cluster template for automatic multi-node cluster setup
15pub struct RedisClusterTemplate {
16    /// Base name for the cluster
17    name: String,
18    /// Number of master nodes (minimum 3)
19    num_masters: usize,
20    /// Number of replicas per master
21    num_replicas: usize,
22    /// Base port for Redis nodes
23    port_base: u16,
24    /// Network name for cluster communication
25    network_name: String,
26    /// Password for cluster authentication
27    password: Option<String>,
28    /// IP to announce to other nodes
29    announce_ip: Option<String>,
30    /// Volume prefix for persistence
31    volume_prefix: Option<String>,
32    /// Memory limit per node
33    memory_limit: Option<String>,
34    /// Cluster node timeout in milliseconds
35    node_timeout: u32,
36    /// Whether to remove containers on stop
37    auto_remove: bool,
38    /// Whether to use Redis Stack instead of standard Redis
39    use_redis_stack: bool,
40    /// Whether to include RedisInsight GUI
41    with_redis_insight: bool,
42    /// Port for RedisInsight UI
43    redis_insight_port: u16,
44    /// Custom Redis image
45    redis_image: Option<String>,
46    /// Custom Redis tag
47    redis_tag: Option<String>,
48    /// Platform for containers
49    platform: Option<String>,
50}
51
52impl RedisClusterTemplate {
53    /// Create a new Redis Cluster template with default settings
54    pub fn new(name: impl Into<String>) -> Self {
55        let name = name.into();
56        let network_name = format!("{}-network", name);
57
58        Self {
59            name,
60            num_masters: 3,
61            num_replicas: 0,
62            port_base: 7000,
63            network_name,
64            password: None,
65            announce_ip: None,
66            volume_prefix: None,
67            memory_limit: None,
68            node_timeout: 5000,
69            auto_remove: false,
70            use_redis_stack: false,
71            with_redis_insight: false,
72            redis_insight_port: 8001,
73            redis_image: None,
74            redis_tag: None,
75            platform: None,
76        }
77    }
78
79    /// Set the number of master nodes (minimum 3)
80    pub fn num_masters(mut self, masters: usize) -> Self {
81        self.num_masters = masters.max(3);
82        self
83    }
84
85    /// Set the number of replicas per master
86    pub fn num_replicas(mut self, replicas: usize) -> Self {
87        self.num_replicas = replicas;
88        self
89    }
90
91    /// Set the base port for Redis nodes
92    pub fn port_base(mut self, port: u16) -> Self {
93        self.port_base = port;
94        self
95    }
96
97    /// Set cluster password
98    pub fn password(mut self, password: impl Into<String>) -> Self {
99        self.password = Some(password.into());
100        self
101    }
102
103    /// Set the IP to announce to other cluster nodes
104    pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
105        self.announce_ip = Some(ip.into());
106        self
107    }
108
109    /// Enable persistence with volume prefix
110    pub fn with_persistence(mut self, volume_prefix: impl Into<String>) -> Self {
111        self.volume_prefix = Some(volume_prefix.into());
112        self
113    }
114
115    /// Set memory limit per node
116    pub fn memory_limit(mut self, limit: impl Into<String>) -> Self {
117        self.memory_limit = Some(limit.into());
118        self
119    }
120
121    /// Set cluster node timeout in milliseconds
122    pub fn cluster_node_timeout(mut self, timeout: u32) -> Self {
123        self.node_timeout = timeout;
124        self
125    }
126
127    /// Enable auto-remove when stopped
128    pub fn auto_remove(mut self) -> Self {
129        self.auto_remove = true;
130        self
131    }
132
133    /// Use Redis Stack instead of standard Redis (includes modules like JSON, Search, Graph, TimeSeries, Bloom)
134    pub fn with_redis_stack(mut self) -> Self {
135        self.use_redis_stack = true;
136        self
137    }
138
139    /// Enable RedisInsight GUI for cluster visualization and management
140    pub fn with_redis_insight(mut self) -> Self {
141        self.with_redis_insight = true;
142        self
143    }
144
145    /// Set the port for RedisInsight UI (default: 8001)
146    pub fn redis_insight_port(mut self, port: u16) -> Self {
147        self.redis_insight_port = port;
148        self
149    }
150
151    /// Use a custom Redis image and tag
152    pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
153        self.redis_image = Some(image.into());
154        self.redis_tag = Some(tag.into());
155        self
156    }
157
158    /// Set the platform for the containers (e.g., "linux/arm64", "linux/amd64")
159    pub fn platform(mut self, platform: impl Into<String>) -> Self {
160        self.platform = Some(platform.into());
161        self
162    }
163
164    /// Get the total number of nodes
165    fn total_nodes(&self) -> usize {
166        self.num_masters + (self.num_masters * self.num_replicas)
167    }
168
169    /// Create the cluster network
170    async fn create_network(&self) -> Result<String, TemplateError> {
171        let output = NetworkCreateCommand::new(&self.network_name)
172            .driver("bridge")
173            .execute()
174            .await?;
175
176        // Network ID is in stdout
177        Ok(output.stdout.trim().to_string())
178    }
179
180    /// Start a single Redis node
181    async fn start_node(&self, node_index: usize) -> Result<String, TemplateError> {
182        let node_name = format!("{}-node-{}", self.name, node_index);
183        let port = self.port_base + node_index as u16;
184        let cluster_port = port + 10000;
185
186        // Choose image based on custom image or Redis Stack preference
187        let image = if let Some(ref custom_image) = self.redis_image {
188            if let Some(ref tag) = self.redis_tag {
189                format!("{}:{}", custom_image, tag)
190            } else {
191                custom_image.clone()
192            }
193        } else if self.use_redis_stack {
194            "redis/redis-stack-server:latest".to_string()
195        } else {
196            "redis:7-alpine".to_string()
197        };
198
199        let mut cmd = RunCommand::new(image)
200            .name(&node_name)
201            .network(&self.network_name)
202            .port(port, 6379)
203            .port(cluster_port, 16379)
204            .detach();
205
206        // Add memory limit if specified
207        if let Some(ref limit) = self.memory_limit {
208            cmd = cmd.memory(limit);
209        }
210
211        // Add volume for persistence
212        if let Some(ref prefix) = self.volume_prefix {
213            let volume_name = format!("{}-{}", prefix, node_index);
214            cmd = cmd.volume(&volume_name, "/data");
215        }
216
217        // Add platform if specified
218        if let Some(ref platform) = self.platform {
219            cmd = cmd.platform(platform);
220        }
221
222        // Auto-remove
223        if self.auto_remove {
224            cmd = cmd.remove();
225        }
226
227        // Build Redis command with cluster configuration
228        let mut redis_args = vec![
229            "redis-server".to_string(),
230            "--cluster-enabled".to_string(),
231            "yes".to_string(),
232            "--cluster-config-file".to_string(),
233            "nodes.conf".to_string(),
234            "--cluster-node-timeout".to_string(),
235            self.node_timeout.to_string(),
236            "--appendonly".to_string(),
237            "yes".to_string(),
238            "--port".to_string(),
239            "6379".to_string(),
240        ];
241
242        // Add password if configured
243        if let Some(ref password) = self.password {
244            redis_args.push("--requirepass".to_string());
245            redis_args.push(password.clone());
246            redis_args.push("--masterauth".to_string());
247            redis_args.push(password.clone());
248        }
249
250        // Add announce IP if configured
251        if let Some(ref ip) = self.announce_ip {
252            redis_args.push("--cluster-announce-ip".to_string());
253            redis_args.push(ip.clone());
254            redis_args.push("--cluster-announce-port".to_string());
255            redis_args.push(port.to_string());
256            redis_args.push("--cluster-announce-bus-port".to_string());
257            redis_args.push(cluster_port.to_string());
258        }
259
260        cmd = cmd.cmd(redis_args);
261
262        let output = cmd.execute().await?;
263        Ok(output.0)
264    }
265
266    /// Start RedisInsight container
267    async fn start_redis_insight(&self) -> Result<String, TemplateError> {
268        let insight_name = format!("{}-insight", self.name);
269
270        let mut cmd = RunCommand::new("redislabs/redisinsight:latest")
271            .name(&insight_name)
272            .network(&self.network_name)
273            .port(self.redis_insight_port, 8001)
274            .detach();
275
276        // Add volume for RedisInsight data persistence
277        if let Some(ref prefix) = self.volume_prefix {
278            let volume_name = format!("{}-insight", prefix);
279            cmd = cmd.volume(&volume_name, "/db");
280        }
281
282        // Auto-remove
283        if self.auto_remove {
284            cmd = cmd.remove();
285        }
286
287        // Environment variables for RedisInsight
288        cmd = cmd.env("RITRUSTEDORIGINS", "http://localhost");
289
290        let output = cmd.execute().await?;
291        Ok(output.0)
292    }
293
294    /// Initialize the cluster after all nodes are started
295    async fn initialize_cluster(&self, container_ids: &[String]) -> Result<(), TemplateError> {
296        if container_ids.is_empty() {
297            return Err(TemplateError::InvalidConfig(
298                "No containers to initialize cluster".to_string(),
299            ));
300        }
301
302        // Wait a bit for all nodes to be ready
303        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
304
305        // Build the cluster create command
306        let mut create_args = vec![
307            "redis-cli".to_string(),
308            "--cluster".to_string(),
309            "create".to_string(),
310        ];
311
312        // Add all node addresses
313        let host = self.announce_ip.as_deref().unwrap_or("127.0.0.1");
314        for i in 0..self.total_nodes() {
315            let port = self.port_base + i as u16;
316            create_args.push(format!("{}:{}", host, port));
317        }
318
319        // Add replicas configuration
320        if self.num_replicas > 0 {
321            create_args.push("--cluster-replicas".to_string());
322            create_args.push(self.num_replicas.to_string());
323        }
324
325        // Add password if configured
326        if let Some(ref password) = self.password {
327            create_args.push("-a".to_string());
328            create_args.push(password.clone());
329        }
330
331        // Auto-accept the configuration
332        create_args.push("--cluster-yes".to_string());
333
334        // Execute cluster create in the first container
335        let first_node_name = format!("{}-node-0", self.name);
336
337        ExecCommand::new(&first_node_name, create_args)
338            .execute()
339            .await?;
340
341        Ok(())
342    }
343
344    /// Check cluster status
345    pub async fn cluster_info(&self) -> Result<ClusterInfo, TemplateError> {
346        let node_name = format!("{}-node-0", self.name);
347
348        let mut info_args = vec![
349            "redis-cli".to_string(),
350            "--cluster".to_string(),
351            "info".to_string(),
352            format!(
353                "{}:{}",
354                self.announce_ip.as_deref().unwrap_or("127.0.0.1"),
355                self.port_base
356            ),
357        ];
358
359        if let Some(ref password) = self.password {
360            info_args.push("-a".to_string());
361            info_args.push(password.clone());
362        }
363
364        let output = ExecCommand::new(&node_name, info_args).execute().await?;
365
366        // Parse the cluster info output
367        ClusterInfo::from_output(&output.stdout)
368    }
369}
370
371#[async_trait]
372impl Template for RedisClusterTemplate {
373    fn name(&self) -> &str {
374        &self.name
375    }
376
377    fn config(&self) -> &TemplateConfig {
378        // Return a dummy config as cluster doesn't map to single container
379        unimplemented!("RedisClusterTemplate manages multiple containers")
380    }
381
382    fn config_mut(&mut self) -> &mut TemplateConfig {
383        unimplemented!("RedisClusterTemplate manages multiple containers")
384    }
385
386    async fn start(&self) -> Result<String, TemplateError> {
387        // Create network first
388        let _network_id = self.create_network().await?;
389
390        // Start all nodes
391        let mut container_ids = Vec::new();
392        for i in 0..self.total_nodes() {
393            let id = self.start_node(i).await?;
394            container_ids.push(id);
395        }
396
397        // Initialize the cluster
398        self.initialize_cluster(&container_ids).await?;
399
400        // Start RedisInsight if enabled
401        let insight_info = if self.with_redis_insight {
402            let _insight_id = self.start_redis_insight().await?;
403            format!(
404                ", RedisInsight UI at http://localhost:{}",
405                self.redis_insight_port
406            )
407        } else {
408            String::new()
409        };
410
411        // Return a summary
412        Ok(format!(
413            "Redis Cluster '{}' started with {} nodes ({} masters, {} replicas){}",
414            self.name,
415            self.total_nodes(),
416            self.num_masters,
417            self.num_masters * self.num_replicas,
418            insight_info
419        ))
420    }
421
422    async fn stop(&self) -> Result<(), TemplateError> {
423        use crate::StopCommand;
424
425        // Stop all nodes
426        for i in 0..self.total_nodes() {
427            let node_name = format!("{}-node-{}", self.name, i);
428            let _ = StopCommand::new(&node_name).execute().await;
429        }
430
431        // Stop RedisInsight if it was started
432        if self.with_redis_insight {
433            let insight_name = format!("{}-insight", self.name);
434            let _ = StopCommand::new(&insight_name).execute().await;
435        }
436
437        Ok(())
438    }
439
440    async fn remove(&self) -> Result<(), TemplateError> {
441        use crate::{NetworkRmCommand, RmCommand};
442
443        // Remove all containers
444        for i in 0..self.total_nodes() {
445            let node_name = format!("{}-node-{}", self.name, i);
446            let _ = RmCommand::new(&node_name).force().volumes().execute().await;
447        }
448
449        // Remove RedisInsight if it was started
450        if self.with_redis_insight {
451            let insight_name = format!("{}-insight", self.name);
452            let _ = RmCommand::new(&insight_name)
453                .force()
454                .volumes()
455                .execute()
456                .await;
457        }
458
459        // Remove the network
460        let _ = NetworkRmCommand::new(&self.network_name).execute().await;
461
462        Ok(())
463    }
464}
465
466/// Cluster information
467#[derive(Debug, Clone)]
468pub struct ClusterInfo {
469    /// Current state of the cluster (ok/fail)
470    pub cluster_state: String,
471    /// Total number of hash slots (always 16384 for Redis)
472    pub total_slots: u16,
473    /// List of nodes in the cluster
474    pub nodes: Vec<NodeInfo>,
475}
476
477impl ClusterInfo {
478    #[allow(clippy::unnecessary_wraps)]
479    fn from_output(_output: &str) -> Result<Self, TemplateError> {
480        // Basic parsing - would need more sophisticated parsing in production
481        Ok(ClusterInfo {
482            cluster_state: "ok".to_string(),
483            total_slots: 16384,
484            nodes: Vec::new(),
485        })
486    }
487}
488
489/// Information about a cluster node
490#[derive(Debug, Clone)]
491pub struct NodeInfo {
492    /// Node ID in the cluster
493    pub id: String,
494    /// Hostname or IP address
495    pub host: String,
496    /// Port number
497    pub port: u16,
498    /// Role of the node (Master/Replica)
499    pub role: NodeRole,
500    /// Slot ranges assigned to this node (start, end)
501    pub slots: Vec<(u16, u16)>,
502}
503
504/// Node role in the cluster
505#[derive(Debug, Clone, PartialEq)]
506pub enum NodeRole {
507    /// Master node that owns hash slots
508    Master,
509    /// Replica node that replicates a master
510    Replica,
511}
512
513/// Connection helper for Redis Cluster
514pub struct RedisClusterConnection {
515    nodes: Vec<String>,
516    password: Option<String>,
517}
518
519impl RedisClusterConnection {
520    /// Create from a RedisClusterTemplate
521    pub fn from_template(template: &RedisClusterTemplate) -> Self {
522        let host = template.announce_ip.as_deref().unwrap_or("localhost");
523        let mut nodes = Vec::new();
524
525        for i in 0..template.total_nodes() {
526            let port = template.port_base + i as u16;
527            nodes.push(format!("{}:{}", host, port));
528        }
529
530        Self {
531            nodes,
532            password: template.password.clone(),
533        }
534    }
535
536    /// Get cluster nodes as comma-separated string
537    pub fn nodes_string(&self) -> String {
538        self.nodes.join(",")
539    }
540
541    /// Get connection URL for cluster-aware clients
542    pub fn cluster_url(&self) -> String {
543        let auth = self
544            .password
545            .as_ref()
546            .map(|p| format!(":{}@", p))
547            .unwrap_or_default();
548
549        format!("redis-cluster://{}{}", auth, self.nodes.join(","))
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556
557    #[test]
558    fn test_redis_cluster_template_basic() {
559        let template = RedisClusterTemplate::new("test-cluster");
560        assert_eq!(template.name, "test-cluster");
561        assert_eq!(template.num_masters, 3);
562        assert_eq!(template.num_replicas, 0);
563        assert_eq!(template.port_base, 7000);
564    }
565
566    #[test]
567    fn test_redis_cluster_template_with_replicas() {
568        let template = RedisClusterTemplate::new("test-cluster")
569            .num_masters(3)
570            .num_replicas(1);
571
572        assert_eq!(template.total_nodes(), 6);
573    }
574
575    #[test]
576    fn test_redis_cluster_template_minimum_masters() {
577        let template = RedisClusterTemplate::new("test-cluster").num_masters(2); // Should be forced to 3
578
579        assert_eq!(template.num_masters, 3);
580    }
581
582    #[test]
583    fn test_redis_cluster_connection() {
584        let template = RedisClusterTemplate::new("test-cluster")
585            .num_masters(3)
586            .port_base(7000)
587            .password("secret");
588
589        let conn = RedisClusterConnection::from_template(&template);
590        assert_eq!(conn.nodes.len(), 3);
591        assert_eq!(conn.nodes[0], "localhost:7000");
592        assert_eq!(
593            conn.cluster_url(),
594            "redis-cluster://:secret@localhost:7000,localhost:7001,localhost:7002"
595        );
596    }
597
598    #[test]
599    fn test_redis_cluster_with_stack_and_insight() {
600        let template = RedisClusterTemplate::new("test-cluster")
601            .num_masters(3)
602            .with_redis_stack()
603            .with_redis_insight()
604            .redis_insight_port(8080);
605
606        assert!(template.use_redis_stack);
607        assert!(template.with_redis_insight);
608        assert_eq!(template.redis_insight_port, 8080);
609    }
610}