1#![allow(clippy::doc_markdown)]
9#![allow(clippy::must_use_candidate)]
10#![allow(clippy::return_self_not_must_use)]
11#![allow(clippy::needless_borrows_for_generic_args)]
12#![allow(clippy::unnecessary_get_then_check)]
13
14use super::common::{DEFAULT_REDIS_IMAGE, DEFAULT_REDIS_TAG};
15use crate::template::{Template, TemplateConfig, TemplateError};
16use crate::{DockerCommand, NetworkCreateCommand, RunCommand};
17use async_trait::async_trait;
18
/// Builder describing a Redis Sentinel topology made of one master, a number
/// of replicas and a number of Sentinel processes, each in its own container.
///
/// Containers are named `{name}-master`, `{name}-replica-N` and
/// `{name}-sentinel-N` (N starting at 1).
///
/// `Debug` output includes the configured password; avoid logging it verbatim
/// where that matters.
#[derive(Debug, Clone)]
pub struct RedisSentinelTemplate {
    /// Base name used to derive container names and the default network name.
    name: String,
    /// Name under which Sentinel tracks the master (`sentinel monitor <name> ...`).
    master_name: String,
    /// Number of replica containers to start.
    num_replicas: usize,
    /// Number of Sentinel containers to start.
    num_sentinels: usize,
    /// Quorum value written into the `sentinel monitor` directive.
    quorum: usize,
    /// Host port published for the master (mapped to container port 6379).
    master_port: u16,
    /// Host port of the first replica; replica `i` (0-based) uses `base + i`.
    replica_port_base: u16,
    /// Host port of the first Sentinel; Sentinel `i` (0-based) uses `base + i`.
    sentinel_port_base: u16,
    /// Optional password applied to the data nodes and handed to Sentinel
    /// through `sentinel auth-pass`.
    password: Option<String>,
    /// Value for `sentinel down-after-milliseconds`.
    down_after_milliseconds: u32,
    /// Value for `sentinel failover-timeout`.
    failover_timeout: u32,
    /// Value for `sentinel parallel-syncs`.
    parallel_syncs: u32,
    /// When `true`, each data node gets a `{container}-data` volume at `/data`.
    persistence: bool,
    /// Existing network to attach to; `None` means `{name}-network` is created.
    network: Option<String>,
    /// Address announced by replicas and Sentinels, and reported as the host
    /// in the returned connection info.
    announce_ip: Option<String>,
    /// Custom image name; `None` selects the crate's default Redis image.
    redis_image: Option<String>,
    /// Tag paired with `redis_image`.
    redis_tag: Option<String>,
    /// Optional platform string forwarded to every container run command.
    platform: Option<String>,
}
44
45impl RedisSentinelTemplate {
46 pub fn new(name: impl Into<String>) -> Self {
48 Self {
49 name: name.into(),
50 master_name: "mymaster".to_string(),
51 num_replicas: 2,
52 num_sentinels: 3,
53 quorum: 2,
54 master_port: 6379,
55 replica_port_base: 6380,
56 sentinel_port_base: 26379,
57 password: None,
58 down_after_milliseconds: 5000,
59 failover_timeout: 10000,
60 parallel_syncs: 1,
61 persistence: false,
62 network: None,
63 announce_ip: None,
64 redis_image: None,
65 redis_tag: None,
66 platform: None,
67 }
68 }
69
70 pub fn master_name(mut self, name: impl Into<String>) -> Self {
72 self.master_name = name.into();
73 self
74 }
75
76 pub fn num_replicas(mut self, num: usize) -> Self {
78 self.num_replicas = num;
79 self
80 }
81
82 pub fn num_sentinels(mut self, num: usize) -> Self {
84 self.num_sentinels = num;
85 self
86 }
87
88 pub fn quorum(mut self, quorum: usize) -> Self {
90 self.quorum = quorum;
91 self
92 }
93
94 pub fn master_port(mut self, port: u16) -> Self {
96 self.master_port = port;
97 self
98 }
99
100 pub fn replica_port_base(mut self, port: u16) -> Self {
102 self.replica_port_base = port;
103 self
104 }
105
106 pub fn sentinel_port_base(mut self, port: u16) -> Self {
108 self.sentinel_port_base = port;
109 self
110 }
111
112 pub fn password(mut self, password: impl Into<String>) -> Self {
114 self.password = Some(password.into());
115 self
116 }
117
118 pub fn down_after_milliseconds(mut self, ms: u32) -> Self {
120 self.down_after_milliseconds = ms;
121 self
122 }
123
124 pub fn failover_timeout(mut self, ms: u32) -> Self {
126 self.failover_timeout = ms;
127 self
128 }
129
130 pub fn parallel_syncs(mut self, num: u32) -> Self {
132 self.parallel_syncs = num;
133 self
134 }
135
136 pub fn with_persistence(mut self) -> Self {
138 self.persistence = true;
139 self
140 }
141
142 pub fn network(mut self, network: impl Into<String>) -> Self {
144 self.network = Some(network.into());
145 self
146 }
147
148 pub fn announce_ip(mut self, ip: impl Into<String>) -> Self {
163 self.announce_ip = Some(ip.into());
164 self
165 }
166
167 pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
169 self.redis_image = Some(image.into());
170 self.redis_tag = Some(tag.into());
171 self
172 }
173
174 pub fn platform(mut self, platform: impl Into<String>) -> Self {
176 self.platform = Some(platform.into());
177 self
178 }
179
180 pub async fn start(self) -> Result<SentinelConnectionInfo, crate::Error> {
188 self.start_topology().await
189 }
190
191 fn resolved_host(&self) -> String {
195 self.announce_ip
196 .clone()
197 .unwrap_or_else(|| "localhost".to_string())
198 }
199
200 fn replica_port(&self, index: usize) -> u16 {
202 self.replica_port_base + u16::try_from(index).unwrap_or(0)
203 }
204
205 fn sentinel_port(&self, index: usize) -> u16 {
207 self.sentinel_port_base + u16::try_from(index).unwrap_or(0)
208 }
209
210 async fn start_topology(&self) -> Result<SentinelConnectionInfo, crate::Error> {
215 let network_name = self
216 .network
217 .clone()
218 .unwrap_or_else(|| format!("{}-network", self.name));
219
220 if self.network.is_none() {
222 NetworkCreateCommand::new(&network_name)
223 .execute()
224 .await
225 .map_err(|e| crate::Error::Custom {
226 message: format!("Failed to create network: {e}"),
227 })?;
228 }
229
230 let master_name = format!("{}-master", self.name);
232 let mut master_cmd = self.build_redis_command(&master_name, self.master_port, None, None);
233 master_cmd = master_cmd.network(&network_name);
234
235 master_cmd
236 .execute()
237 .await
238 .map_err(|e| crate::Error::Custom {
239 message: format!("Failed to start master: {e}"),
240 })?;
241
242 let mut replica_containers = Vec::new();
244 for i in 0..self.num_replicas {
245 let replica_name = format!("{}-replica-{}", self.name, i + 1);
246 let replica_port = self.replica_port(i);
247
248 let mut replica_cmd = self.build_redis_command(
249 &replica_name,
250 replica_port,
251 Some(&master_name),
252 Some(replica_port),
253 );
254 replica_cmd = replica_cmd.network(&network_name);
255
256 replica_cmd
257 .execute()
258 .await
259 .map_err(|e| crate::Error::Custom {
260 message: format!("Failed to start replica {}: {e}", i + 1),
261 })?;
262
263 replica_containers.push(replica_name);
264 }
265
266 let sentinel_config = self.build_sentinel_config(&master_name);
268
269 let mut sentinel_containers = Vec::new();
271 for i in 0..self.num_sentinels {
272 let sentinel_name = format!("{}-sentinel-{}", self.name, i + 1);
273 let sentinel_port = self.sentinel_port(i);
274
275 let sentinel_config = if self.announce_ip.is_some() {
278 format!("{sentinel_config}\nsentinel announce-port {sentinel_port}")
279 } else {
280 sentinel_config.clone()
281 };
282
283 let mut sentinel_cmd = Self::build_sentinel_command(
284 &sentinel_name,
285 sentinel_port,
286 &sentinel_config,
287 self.redis_image.as_deref(),
288 self.redis_tag.as_deref(),
289 self.platform.as_deref(),
290 );
291 sentinel_cmd = sentinel_cmd.network(&network_name);
292
293 sentinel_cmd
294 .execute()
295 .await
296 .map_err(|e| crate::Error::Custom {
297 message: format!("Failed to start sentinel {}: {e}", i + 1),
298 })?;
299
300 sentinel_containers.push((sentinel_name, sentinel_port));
301 }
302
303 let host = self.resolved_host();
304
305 Ok(SentinelConnectionInfo {
306 name: self.name.clone(),
307 master_name: self.master_name.clone(),
308 master_host: host.clone(),
309 master_port: self.master_port,
310 replica_ports: (0..self.num_replicas)
311 .map(|i| self.replica_port(i))
312 .collect(),
313 sentinels: sentinel_containers
314 .into_iter()
315 .map(|(_, port)| SentinelInfo {
316 host: host.clone(),
317 port,
318 })
319 .collect(),
320 password: self.password.clone(),
321 network: network_name,
322 containers: {
323 let mut containers = vec![master_name];
324 containers.extend(replica_containers);
325 containers.extend(
326 (0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)),
327 );
328 containers
329 },
330 })
331 }
332
333 fn build_redis_command(
339 &self,
340 name: &str,
341 port: u16,
342 master: Option<&str>,
343 announce_port: Option<u16>,
344 ) -> RunCommand {
345 let image = if let Some(ref custom_image) = self.redis_image {
347 if let Some(ref tag) = self.redis_tag {
348 format!("{custom_image}:{tag}")
349 } else {
350 custom_image.clone()
351 }
352 } else {
353 format!("{DEFAULT_REDIS_IMAGE}:{DEFAULT_REDIS_TAG}")
354 };
355
356 let mut cmd = RunCommand::new(image).name(name).port(port, 6379).detach();
357
358 if let Some(ref platform) = self.platform {
360 cmd = cmd.platform(platform);
361 }
362
363 if self.persistence {
365 cmd = cmd.volume(format!("{name}-data"), "/data");
366 }
367
368 let mut args = Vec::new();
370
371 if let Some(master_name) = master {
373 args.push(format!("--replicaof {master_name} 6379"));
374 }
375
376 if let Some(ref password) = self.password {
378 args.push(format!("--requirepass {password}"));
379 if master.is_some() {
380 args.push(format!("--masterauth {password}"));
381 }
382 }
383
384 args.push("--protected-mode no".to_string());
386
387 if let Some(ref ip) = self.announce_ip {
390 args.push(format!("--replica-announce-ip {ip}"));
391 if let Some(announce_port) = announce_port {
392 args.push(format!("--replica-announce-port {announce_port}"));
393 }
394 }
395
396 if !args.is_empty() {
397 cmd = cmd.entrypoint("redis-server").cmd(args);
398 }
399
400 cmd
401 }
402
403 fn build_sentinel_command(
405 name: &str,
406 port: u16,
407 config: &str,
408 redis_image: Option<&str>,
409 redis_tag: Option<&str>,
410 platform: Option<&str>,
411 ) -> RunCommand {
412 let image = if let Some(custom_image) = redis_image {
414 if let Some(tag) = redis_tag {
415 format!("{custom_image}:{tag}")
416 } else {
417 custom_image.to_string()
418 }
419 } else {
420 format!("{DEFAULT_REDIS_IMAGE}:{DEFAULT_REDIS_TAG}")
421 };
422
423 let mut cmd = RunCommand::new(image).name(name).port(port, 26379).detach();
424
425 if let Some(platform) = platform {
427 cmd = cmd.platform(platform);
428 }
429
430 let config_cmd = format!(
432 "echo '{}' > /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf",
433 config.replace('\'', "'\\''").replace('\n', "\\n")
434 );
435
436 cmd = cmd.entrypoint("sh").cmd(vec!["-c".to_string(), config_cmd]);
437
438 cmd
439 }
440
441 fn build_sentinel_config(&self, master_container: &str) -> String {
443 let mut config = Vec::new();
444
445 config.push("port 26379".to_string());
446
447 if let Some(ref ip) = self.announce_ip {
454 config.push(format!(
455 "sentinel monitor {} {} {} {}",
456 self.master_name, ip, self.master_port, self.quorum
457 ));
458 config.push(format!("sentinel announce-ip {ip}"));
459 } else {
460 config.push(format!(
461 "sentinel monitor {} {} 6379 {}",
462 self.master_name, master_container, self.quorum
463 ));
464 }
465
466 if let Some(ref password) = self.password {
467 config.push(format!(
468 "sentinel auth-pass {} {}",
469 self.master_name, password
470 ));
471 }
472
473 config.push(format!(
474 "sentinel down-after-milliseconds {} {}",
475 self.master_name, self.down_after_milliseconds
476 ));
477 config.push(format!(
478 "sentinel failover-timeout {} {}",
479 self.master_name, self.failover_timeout
480 ));
481 config.push(format!(
482 "sentinel parallel-syncs {} {}",
483 self.master_name, self.parallel_syncs
484 ));
485
486 config.join("\n")
487 }
488
489 fn container_names(&self) -> Vec<String> {
494 let mut names = vec![format!("{}-master", self.name)];
495 names.extend((0..self.num_replicas).map(|i| format!("{}-replica-{}", self.name, i + 1)));
496 names.extend((0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)));
497 names
498 }
499
500 fn build_ping_args(&self) -> Vec<String> {
502 let mut args = vec!["redis-cli".to_string()];
503 if let Some(ref password) = self.password {
504 args.push("-a".to_string());
505 args.push(password.clone());
506 }
507 args.push("ping".to_string());
508 args
509 }
510
511 async fn wait_for_topology_ready(
517 &self,
518 timeout: std::time::Duration,
519 ) -> Result<(), TemplateError> {
520 use crate::ExecCommand;
521
522 let ping_args = self.build_ping_args();
523 let check_interval = std::time::Duration::from_millis(500);
524 let start = std::time::Instant::now();
525
526 let mut targets: Vec<String> = vec![format!("{}-master", self.name)];
528 targets
529 .extend((0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)));
530
531 let mut pending = targets;
532
533 loop {
534 let mut still_pending = Vec::new();
535 for name in &pending {
536 let ready = ExecCommand::new(name, ping_args.clone())
537 .execute()
538 .await
539 .is_ok_and(|output| output.stdout.trim().eq_ignore_ascii_case("PONG"));
540
541 if !ready {
542 still_pending.push(name.clone());
543 }
544 }
545
546 if still_pending.is_empty() {
547 return Ok(());
548 }
549 pending = still_pending;
550
551 if start.elapsed() >= timeout {
552 return Err(TemplateError::Timeout(format!(
553 "Sentinel topology '{}' containers [{}] did not respond to PING within {:?}",
554 self.name,
555 pending.join(", "),
556 timeout
557 )));
558 }
559
560 tokio::time::sleep(check_interval).await;
561 }
562 }
563}
564
565#[async_trait]
566impl Template for RedisSentinelTemplate {
567 fn name(&self) -> &str {
568 &self.name
569 }
570
571 fn config(&self) -> &TemplateConfig {
572 unimplemented!("RedisSentinelTemplate manages multiple containers")
574 }
575
576 fn config_mut(&mut self) -> &mut TemplateConfig {
577 unimplemented!("RedisSentinelTemplate manages multiple containers")
578 }
579
580 async fn start(&self) -> Result<String, TemplateError> {
581 let info = self.start_topology().await?;
582 Ok(format!(
583 "Redis Sentinel '{}' started with master, {} replica(s) and {} sentinel(s) (master at {}:{})",
584 self.name,
585 self.num_replicas,
586 self.num_sentinels,
587 info.master_host,
588 info.master_port
589 ))
590 }
591
592 async fn start_and_wait(&self) -> Result<String, TemplateError> {
593 let summary = self.start().await?;
596 self.wait_for_ready().await?;
597 Ok(summary)
598 }
599
600 async fn is_running(&self) -> Result<bool, TemplateError> {
601 use crate::PsCommand;
602
603 let master = format!("{}-master", self.name);
605 let output = PsCommand::new()
606 .filter(format!("name={master}"))
607 .quiet()
608 .execute()
609 .await?;
610
611 Ok(!output.stdout.trim().is_empty())
612 }
613
614 async fn wait_for_ready(&self) -> Result<(), TemplateError> {
615 self.wait_for_topology_ready(std::time::Duration::from_secs(60))
616 .await
617 }
618
619 async fn stop(&self) -> Result<(), TemplateError> {
620 use crate::StopCommand;
621
622 for name in self.container_names() {
623 let _ = StopCommand::new(&name).execute().await;
624 }
625
626 Ok(())
627 }
628
629 async fn remove(&self) -> Result<(), TemplateError> {
630 use crate::{NetworkRmCommand, RmCommand};
631
632 for name in self.container_names() {
633 let _ = RmCommand::new(&name).force().volumes().execute().await;
634 }
635
636 if self.network.is_none() {
638 let network_name = format!("{}-network", self.name);
639 let _ = NetworkRmCommand::new(&network_name).execute().await;
640 }
641
642 Ok(())
643 }
644}
645
/// Connection details of a running Redis Sentinel topology.
///
/// `Debug` output includes the password; avoid logging it verbatim where that
/// matters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SentinelConnectionInfo {
    /// Base name of the topology.
    pub name: String,
    /// Name under which Sentinel tracks the master.
    pub master_name: String,
    /// Host on which the master is reachable.
    pub master_host: String,
    /// Host port of the master.
    pub master_port: u16,
    /// Host ports of the replicas, in creation order.
    pub replica_ports: Vec<u16>,
    /// Addresses of the Sentinels, in creation order.
    pub sentinels: Vec<SentinelInfo>,
    /// Password of the data nodes, if one was configured.
    pub password: Option<String>,
    /// Name of the network the containers are attached to.
    pub network: String,
    /// Names of all containers belonging to the topology.
    pub containers: Vec<String>,
}

/// Address of a single Sentinel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SentinelInfo {
    /// Host on which the Sentinel is reachable.
    pub host: String,
    /// Host port of the Sentinel.
    pub port: u16,
}
675
676impl SentinelConnectionInfo {
677 pub fn master_url(&self) -> String {
679 if let Some(ref password) = self.password {
680 format!(
681 "redis://default:{}@{}:{}",
682 password, self.master_host, self.master_port
683 )
684 } else {
685 format!("redis://{}:{}", self.master_host, self.master_port)
686 }
687 }
688
689 pub fn sentinel_urls(&self) -> Vec<String> {
691 self.sentinels
692 .iter()
693 .map(|s| format!("redis://{}:{}", s.host, s.port))
694 .collect()
695 }
696
697 pub async fn stop(self) -> Result<(), crate::Error> {
705 use crate::{NetworkRmCommand, RmCommand, StopCommand};
706
707 for container in &self.containers {
709 StopCommand::new(container)
710 .execute()
711 .await
712 .map_err(|e| crate::Error::Custom {
713 message: format!("Failed to stop {container}: {e}"),
714 })?;
715
716 RmCommand::new(container)
717 .force()
718 .volumes()
719 .execute()
720 .await
721 .map_err(|e| crate::Error::Custom {
722 message: format!("Failed to remove {container}: {e}"),
723 })?;
724 }
725
726 if self.network.starts_with(&self.name) {
728 NetworkRmCommand::new(&self.network)
729 .execute()
730 .await
731 .map_err(|e| crate::Error::Custom {
732 message: format!("Failed to remove network: {e}"),
733 })?;
734 }
735
736 Ok(())
737 }
738}
739
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh template carries the default topology settings.
    #[test]
    fn test_sentinel_template_defaults() {
        let t = RedisSentinelTemplate::new("test-sentinel");

        assert_eq!(t.name, "test-sentinel");
        assert_eq!(t.master_name, "mymaster");
        assert_eq!(t.num_replicas, 2);
        assert_eq!(t.num_sentinels, 3);
        assert_eq!(t.quorum, 2);
    }

    /// Builder setters overwrite the corresponding fields.
    #[test]
    fn test_sentinel_template_builder() {
        let t = RedisSentinelTemplate::new("test-sentinel")
            .master_name("primary")
            .num_replicas(3)
            .num_sentinels(5)
            .quorum(3)
            .password("secret")
            .with_persistence();

        assert_eq!(t.master_name, "primary");
        assert_eq!(t.num_replicas, 3);
        assert_eq!(t.num_sentinels, 5);
        assert_eq!(t.quorum, 3);
        assert_eq!(t.password.as_deref(), Some("secret"));
        assert!(t.persistence);
    }

    /// The rendered config contains the monitor, auth and timing directives.
    #[test]
    fn test_sentinel_config_generation() {
        let rendered = RedisSentinelTemplate::new("test")
            .master_name("mymaster")
            .password("secret")
            .quorum(2)
            .build_sentinel_config("redis-master");

        for expected in [
            "sentinel monitor mymaster redis-master 6379 2",
            "sentinel auth-pass mymaster secret",
            "sentinel down-after-milliseconds mymaster 5000",
        ] {
            assert!(rendered.contains(expected));
        }
    }

    /// Without an announce IP the master is monitored by container name.
    #[test]
    fn test_sentinel_config_without_announce_uses_container_host() {
        let rendered = RedisSentinelTemplate::new("test")
            .master_name("mymaster")
            .build_sentinel_config("test-master");

        assert!(rendered.contains("sentinel monitor mymaster test-master 6379 2"));
        assert!(!rendered.contains("sentinel announce-ip"));
    }

    /// With an announce IP the master is monitored at the announced address
    /// and published port.
    #[test]
    fn test_sentinel_config_with_announce_uses_announced_master_address() {
        let rendered = RedisSentinelTemplate::new("test")
            .master_name("mymaster")
            .master_port(6390)
            .quorum(2)
            .announce_ip("127.0.0.1")
            .build_sentinel_config("test-master");

        assert!(rendered.contains("sentinel monitor mymaster 127.0.0.1 6390 2"));
        assert!(rendered.contains("sentinel announce-ip 127.0.0.1"));
        assert!(!rendered.contains("sentinel monitor mymaster test-master"));
    }

    /// The reported host falls back to `localhost`.
    #[test]
    fn test_resolved_host_defaults_to_localhost() {
        let host = RedisSentinelTemplate::new("test").resolved_host();
        assert_eq!(host, "localhost");
    }

    /// The reported host follows the announce IP when one is set.
    #[test]
    fn test_resolved_host_uses_announce_ip() {
        let host = RedisSentinelTemplate::new("test")
            .announce_ip("10.0.0.5")
            .resolved_host();
        assert_eq!(host, "10.0.0.5");
    }

    /// Replica commands carry the announce flags when an announce IP is set.
    #[test]
    fn test_replica_command_includes_announce_args() {
        let t = RedisSentinelTemplate::new("test").announce_ip("127.0.0.1");

        let rendered = t
            .build_redis_command("test-replica-1", 6381, Some("test-master"), Some(6381))
            .build_command_args()
            .join(" ");

        assert!(rendered.contains("--replica-announce-ip 127.0.0.1"));
        assert!(rendered.contains("--replica-announce-port 6381"));
    }

    /// Replica commands omit the announce flags when no announce IP is set.
    #[test]
    fn test_replica_command_without_announce_has_no_announce_args() {
        let t = RedisSentinelTemplate::new("test");

        let rendered = t
            .build_redis_command("test-replica-1", 6381, Some("test-master"), Some(6381))
            .build_command_args()
            .join(" ");

        assert!(!rendered.contains("--replica-announce-ip"));
        assert!(!rendered.contains("--replica-announce-port"));
    }

    /// Without a password the ping command is a bare `redis-cli ping`.
    #[test]
    fn test_build_ping_args_without_password() {
        let args = RedisSentinelTemplate::new("test").build_ping_args();
        assert_eq!(args, vec!["redis-cli", "ping"]);
    }

    /// With a password the ping command authenticates via `-a`.
    #[test]
    fn test_build_ping_args_with_password() {
        let args = RedisSentinelTemplate::new("test")
            .password("secret")
            .build_ping_args();
        assert_eq!(args, vec!["redis-cli", "-a", "secret", "ping"]);
    }

    /// Container names are listed master first, then replicas, then sentinels.
    #[test]
    fn test_container_names() {
        let names = RedisSentinelTemplate::new("test")
            .num_replicas(2)
            .num_sentinels(3)
            .container_names();

        let expected = vec![
            "test-master",
            "test-replica-1",
            "test-replica-2",
            "test-sentinel-1",
            "test-sentinel-2",
            "test-sentinel-3",
        ];
        assert_eq!(names, expected);
    }

    /// The `Template` trait exposes the base name.
    #[test]
    fn test_template_trait_name() {
        let t = RedisSentinelTemplate::new("test-sentinel");
        assert_eq!(Template::name(&t), "test-sentinel");
    }
}