#![allow(clippy::doc_markdown)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::return_self_not_must_use)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::unnecessary_get_then_check)]
use super::common::{DEFAULT_REDIS_IMAGE, DEFAULT_REDIS_TAG};
use crate::template::{Template, TemplateConfig, TemplateError};
use crate::{DockerCommand, NetworkCreateCommand, RunCommand};
use async_trait::async_trait;
pub struct RedisSentinelTemplate {
name: String,
master_name: String,
num_replicas: usize,
num_sentinels: usize,
quorum: usize,
master_port: u16,
replica_port_base: u16,
sentinel_port_base: u16,
password: Option<String>,
down_after_milliseconds: u32,
failover_timeout: u32,
parallel_syncs: u32,
persistence: bool,
network: Option<String>,
announce_ip: Option<String>,
redis_image: Option<String>,
redis_tag: Option<String>,
platform: Option<String>,
}
impl RedisSentinelTemplate {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
master_name: "mymaster".to_string(),
num_replicas: 2,
num_sentinels: 3,
quorum: 2,
master_port: 6379,
replica_port_base: 6380,
sentinel_port_base: 26379,
password: None,
down_after_milliseconds: 5000,
failover_timeout: 10000,
parallel_syncs: 1,
persistence: false,
network: None,
announce_ip: None,
redis_image: None,
redis_tag: None,
platform: None,
}
}
pub fn master_name(mut self, name: impl Into<String>) -> Self {
self.master_name = name.into();
self
}
pub fn num_replicas(mut self, num: usize) -> Self {
self.num_replicas = num;
self
}
pub fn num_sentinels(mut self, num: usize) -> Self {
self.num_sentinels = num;
self
}
pub fn quorum(mut self, quorum: usize) -> Self {
self.quorum = quorum;
self
}
pub fn master_port(mut self, port: u16) -> Self {
self.master_port = port;
self
}
pub fn replica_port_base(mut self, port: u16) -> Self {
self.replica_port_base = port;
self
}
pub fn sentinel_port_base(mut self, port: u16) -> Self {
self.sentinel_port_base = port;
self
}
pub fn password(mut self, password: impl Into<String>) -> Self {
self.password = Some(password.into());
self
}
pub fn down_after_milliseconds(mut self, ms: u32) -> Self {
self.down_after_milliseconds = ms;
self
}
pub fn failover_timeout(mut self, ms: u32) -> Self {
self.failover_timeout = ms;
self
}
pub fn parallel_syncs(mut self, num: u32) -> Self {
self.parallel_syncs = num;
self
}
pub fn with_persistence(mut self) -> Self {
self.persistence = true;
self
}
pub fn network(mut self, network: impl Into<String>) -> Self {
self.network = Some(network.into());
self
}
pub fn announce_ip(mut self, ip: impl Into<String>) -> Self {
self.announce_ip = Some(ip.into());
self
}
pub fn custom_redis_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
self.redis_image = Some(image.into());
self.redis_tag = Some(tag.into());
self
}
pub fn platform(mut self, platform: impl Into<String>) -> Self {
self.platform = Some(platform.into());
self
}
pub async fn start(self) -> Result<SentinelConnectionInfo, crate::Error> {
self.start_topology().await
}
fn resolved_host(&self) -> String {
self.announce_ip
.clone()
.unwrap_or_else(|| "localhost".to_string())
}
fn replica_port(&self, index: usize) -> u16 {
self.replica_port_base + u16::try_from(index).unwrap_or(0)
}
fn sentinel_port(&self, index: usize) -> u16 {
self.sentinel_port_base + u16::try_from(index).unwrap_or(0)
}
async fn start_topology(&self) -> Result<SentinelConnectionInfo, crate::Error> {
let network_name = self
.network
.clone()
.unwrap_or_else(|| format!("{}-network", self.name));
if self.network.is_none() {
NetworkCreateCommand::new(&network_name)
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to create network: {e}"),
})?;
}
let master_name = format!("{}-master", self.name);
let mut master_cmd = self.build_redis_command(&master_name, self.master_port, None, None);
master_cmd = master_cmd.network(&network_name);
master_cmd
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to start master: {e}"),
})?;
let mut replica_containers = Vec::new();
for i in 0..self.num_replicas {
let replica_name = format!("{}-replica-{}", self.name, i + 1);
let replica_port = self.replica_port(i);
let mut replica_cmd = self.build_redis_command(
&replica_name,
replica_port,
Some(&master_name),
Some(replica_port),
);
replica_cmd = replica_cmd.network(&network_name);
replica_cmd
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to start replica {}: {e}", i + 1),
})?;
replica_containers.push(replica_name);
}
let sentinel_config = self.build_sentinel_config(&master_name);
let mut sentinel_containers = Vec::new();
for i in 0..self.num_sentinels {
let sentinel_name = format!("{}-sentinel-{}", self.name, i + 1);
let sentinel_port = self.sentinel_port(i);
let sentinel_config = if self.announce_ip.is_some() {
format!("{sentinel_config}\nsentinel announce-port {sentinel_port}")
} else {
sentinel_config.clone()
};
let mut sentinel_cmd = Self::build_sentinel_command(
&sentinel_name,
sentinel_port,
&sentinel_config,
self.redis_image.as_deref(),
self.redis_tag.as_deref(),
self.platform.as_deref(),
);
sentinel_cmd = sentinel_cmd.network(&network_name);
sentinel_cmd
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to start sentinel {}: {e}", i + 1),
})?;
sentinel_containers.push((sentinel_name, sentinel_port));
}
let host = self.resolved_host();
Ok(SentinelConnectionInfo {
name: self.name.clone(),
master_name: self.master_name.clone(),
master_host: host.clone(),
master_port: self.master_port,
replica_ports: (0..self.num_replicas)
.map(|i| self.replica_port(i))
.collect(),
sentinels: sentinel_containers
.into_iter()
.map(|(_, port)| SentinelInfo {
host: host.clone(),
port,
})
.collect(),
password: self.password.clone(),
network: network_name,
containers: {
let mut containers = vec![master_name];
containers.extend(replica_containers);
containers.extend(
(0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)),
);
containers
},
})
}
fn build_redis_command(
&self,
name: &str,
port: u16,
master: Option<&str>,
announce_port: Option<u16>,
) -> RunCommand {
let image = if let Some(ref custom_image) = self.redis_image {
if let Some(ref tag) = self.redis_tag {
format!("{custom_image}:{tag}")
} else {
custom_image.clone()
}
} else {
format!("{DEFAULT_REDIS_IMAGE}:{DEFAULT_REDIS_TAG}")
};
let mut cmd = RunCommand::new(image).name(name).port(port, 6379).detach();
if let Some(ref platform) = self.platform {
cmd = cmd.platform(platform);
}
if self.persistence {
cmd = cmd.volume(format!("{name}-data"), "/data");
}
let mut args = Vec::new();
if let Some(master_name) = master {
args.push(format!("--replicaof {master_name} 6379"));
}
if let Some(ref password) = self.password {
args.push(format!("--requirepass {password}"));
if master.is_some() {
args.push(format!("--masterauth {password}"));
}
}
args.push("--protected-mode no".to_string());
if let Some(ref ip) = self.announce_ip {
args.push(format!("--replica-announce-ip {ip}"));
if let Some(announce_port) = announce_port {
args.push(format!("--replica-announce-port {announce_port}"));
}
}
if !args.is_empty() {
cmd = cmd.entrypoint("redis-server").cmd(args);
}
cmd
}
fn build_sentinel_command(
name: &str,
port: u16,
config: &str,
redis_image: Option<&str>,
redis_tag: Option<&str>,
platform: Option<&str>,
) -> RunCommand {
let image = if let Some(custom_image) = redis_image {
if let Some(tag) = redis_tag {
format!("{custom_image}:{tag}")
} else {
custom_image.to_string()
}
} else {
format!("{DEFAULT_REDIS_IMAGE}:{DEFAULT_REDIS_TAG}")
};
let mut cmd = RunCommand::new(image).name(name).port(port, 26379).detach();
if let Some(platform) = platform {
cmd = cmd.platform(platform);
}
let config_cmd = format!(
"echo '{}' > /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf",
config.replace('\'', "'\\''").replace('\n', "\\n")
);
cmd = cmd.entrypoint("sh").cmd(vec!["-c".to_string(), config_cmd]);
cmd
}
fn build_sentinel_config(&self, master_container: &str) -> String {
let mut config = Vec::new();
config.push("port 26379".to_string());
if let Some(ref ip) = self.announce_ip {
config.push(format!(
"sentinel monitor {} {} {} {}",
self.master_name, ip, self.master_port, self.quorum
));
config.push(format!("sentinel announce-ip {ip}"));
} else {
config.push(format!(
"sentinel monitor {} {} 6379 {}",
self.master_name, master_container, self.quorum
));
}
if let Some(ref password) = self.password {
config.push(format!(
"sentinel auth-pass {} {}",
self.master_name, password
));
}
config.push(format!(
"sentinel down-after-milliseconds {} {}",
self.master_name, self.down_after_milliseconds
));
config.push(format!(
"sentinel failover-timeout {} {}",
self.master_name, self.failover_timeout
));
config.push(format!(
"sentinel parallel-syncs {} {}",
self.master_name, self.parallel_syncs
));
config.join("\n")
}
fn container_names(&self) -> Vec<String> {
let mut names = vec![format!("{}-master", self.name)];
names.extend((0..self.num_replicas).map(|i| format!("{}-replica-{}", self.name, i + 1)));
names.extend((0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)));
names
}
fn build_ping_args(&self) -> Vec<String> {
let mut args = vec!["redis-cli".to_string()];
if let Some(ref password) = self.password {
args.push("-a".to_string());
args.push(password.clone());
}
args.push("ping".to_string());
args
}
async fn wait_for_topology_ready(
&self,
timeout: std::time::Duration,
) -> Result<(), TemplateError> {
use crate::ExecCommand;
let ping_args = self.build_ping_args();
let check_interval = std::time::Duration::from_millis(500);
let start = std::time::Instant::now();
let mut targets: Vec<String> = vec![format!("{}-master", self.name)];
targets
.extend((0..self.num_sentinels).map(|i| format!("{}-sentinel-{}", self.name, i + 1)));
let mut pending = targets;
loop {
let mut still_pending = Vec::new();
for name in &pending {
let ready = ExecCommand::new(name, ping_args.clone())
.execute()
.await
.is_ok_and(|output| output.stdout.trim().eq_ignore_ascii_case("PONG"));
if !ready {
still_pending.push(name.clone());
}
}
if still_pending.is_empty() {
return Ok(());
}
pending = still_pending;
if start.elapsed() >= timeout {
return Err(TemplateError::Timeout(format!(
"Sentinel topology '{}' containers [{}] did not respond to PING within {:?}",
self.name,
pending.join(", "),
timeout
)));
}
tokio::time::sleep(check_interval).await;
}
}
}
#[async_trait]
impl Template for RedisSentinelTemplate {
fn name(&self) -> &str {
&self.name
}
fn config(&self) -> &TemplateConfig {
unimplemented!("RedisSentinelTemplate manages multiple containers")
}
fn config_mut(&mut self) -> &mut TemplateConfig {
unimplemented!("RedisSentinelTemplate manages multiple containers")
}
async fn start(&self) -> Result<String, TemplateError> {
let info = self.start_topology().await?;
Ok(format!(
"Redis Sentinel '{}' started with master, {} replica(s) and {} sentinel(s) (master at {}:{})",
self.name,
self.num_replicas,
self.num_sentinels,
info.master_host,
info.master_port
))
}
async fn start_and_wait(&self) -> Result<String, TemplateError> {
let summary = self.start().await?;
self.wait_for_ready().await?;
Ok(summary)
}
async fn is_running(&self) -> Result<bool, TemplateError> {
use crate::PsCommand;
let master = format!("{}-master", self.name);
let output = PsCommand::new()
.filter(format!("name={master}"))
.quiet()
.execute()
.await?;
Ok(!output.stdout.trim().is_empty())
}
async fn wait_for_ready(&self) -> Result<(), TemplateError> {
self.wait_for_topology_ready(std::time::Duration::from_secs(60))
.await
}
async fn stop(&self) -> Result<(), TemplateError> {
use crate::StopCommand;
for name in self.container_names() {
let _ = StopCommand::new(&name).execute().await;
}
Ok(())
}
async fn remove(&self) -> Result<(), TemplateError> {
use crate::{NetworkRmCommand, RmCommand};
for name in self.container_names() {
let _ = RmCommand::new(&name).force().volumes().execute().await;
}
if self.network.is_none() {
let network_name = format!("{}-network", self.name);
let _ = NetworkRmCommand::new(&network_name).execute().await;
}
Ok(())
}
}
pub struct SentinelConnectionInfo {
pub name: String,
pub master_name: String,
pub master_host: String,
pub master_port: u16,
pub replica_ports: Vec<u16>,
pub sentinels: Vec<SentinelInfo>,
pub password: Option<String>,
pub network: String,
pub containers: Vec<String>,
}
pub struct SentinelInfo {
pub host: String,
pub port: u16,
}
impl SentinelConnectionInfo {
pub fn master_url(&self) -> String {
if let Some(ref password) = self.password {
format!(
"redis://default:{}@{}:{}",
password, self.master_host, self.master_port
)
} else {
format!("redis://{}:{}", self.master_host, self.master_port)
}
}
pub fn sentinel_urls(&self) -> Vec<String> {
self.sentinels
.iter()
.map(|s| format!("redis://{}:{}", s.host, s.port))
.collect()
}
pub async fn stop(self) -> Result<(), crate::Error> {
use crate::{NetworkRmCommand, RmCommand, StopCommand};
for container in &self.containers {
StopCommand::new(container)
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to stop {container}: {e}"),
})?;
RmCommand::new(container)
.force()
.volumes()
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to remove {container}: {e}"),
})?;
}
if self.network.starts_with(&self.name) {
NetworkRmCommand::new(&self.network)
.execute()
.await
.map_err(|e| crate::Error::Custom {
message: format!("Failed to remove network: {e}"),
})?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sentinel_template_defaults() {
let template = RedisSentinelTemplate::new("test-sentinel");
assert_eq!(template.name, "test-sentinel");
assert_eq!(template.master_name, "mymaster");
assert_eq!(template.num_replicas, 2);
assert_eq!(template.num_sentinels, 3);
assert_eq!(template.quorum, 2);
}
#[test]
fn test_sentinel_template_builder() {
let template = RedisSentinelTemplate::new("test-sentinel")
.master_name("primary")
.num_replicas(3)
.num_sentinels(5)
.quorum(3)
.password("secret")
.with_persistence();
assert_eq!(template.master_name, "primary");
assert_eq!(template.num_replicas, 3);
assert_eq!(template.num_sentinels, 5);
assert_eq!(template.quorum, 3);
assert_eq!(template.password, Some("secret".to_string()));
assert!(template.persistence);
}
#[test]
fn test_sentinel_config_generation() {
let template = RedisSentinelTemplate::new("test")
.master_name("mymaster")
.password("secret")
.quorum(2);
let config = template.build_sentinel_config("redis-master");
assert!(config.contains("sentinel monitor mymaster redis-master 6379 2"));
assert!(config.contains("sentinel auth-pass mymaster secret"));
assert!(config.contains("sentinel down-after-milliseconds mymaster 5000"));
}
#[test]
fn test_sentinel_config_without_announce_uses_container_host() {
let template = RedisSentinelTemplate::new("test").master_name("mymaster");
let config = template.build_sentinel_config("test-master");
assert!(config.contains("sentinel monitor mymaster test-master 6379 2"));
assert!(!config.contains("sentinel announce-ip"));
}
#[test]
fn test_sentinel_config_with_announce_uses_announced_master_address() {
let template = RedisSentinelTemplate::new("test")
.master_name("mymaster")
.master_port(6390)
.quorum(2)
.announce_ip("127.0.0.1");
let config = template.build_sentinel_config("test-master");
assert!(config.contains("sentinel monitor mymaster 127.0.0.1 6390 2"));
assert!(config.contains("sentinel announce-ip 127.0.0.1"));
assert!(!config.contains("sentinel monitor mymaster test-master"));
}
#[test]
fn test_resolved_host_defaults_to_localhost() {
let template = RedisSentinelTemplate::new("test");
assert_eq!(template.resolved_host(), "localhost");
}
#[test]
fn test_resolved_host_uses_announce_ip() {
let template = RedisSentinelTemplate::new("test").announce_ip("10.0.0.5");
assert_eq!(template.resolved_host(), "10.0.0.5");
}
#[test]
fn test_replica_command_includes_announce_args() {
let template = RedisSentinelTemplate::new("test").announce_ip("127.0.0.1");
let cmd =
template.build_redis_command("test-replica-1", 6381, Some("test-master"), Some(6381));
let args = cmd.build_command_args();
let joined = args.join(" ");
assert!(joined.contains("--replica-announce-ip 127.0.0.1"));
assert!(joined.contains("--replica-announce-port 6381"));
}
#[test]
fn test_replica_command_without_announce_has_no_announce_args() {
let template = RedisSentinelTemplate::new("test");
let cmd =
template.build_redis_command("test-replica-1", 6381, Some("test-master"), Some(6381));
let joined = cmd.build_command_args().join(" ");
assert!(!joined.contains("--replica-announce-ip"));
assert!(!joined.contains("--replica-announce-port"));
}
#[test]
fn test_build_ping_args_without_password() {
let template = RedisSentinelTemplate::new("test");
assert_eq!(template.build_ping_args(), vec!["redis-cli", "ping"]);
}
#[test]
fn test_build_ping_args_with_password() {
let template = RedisSentinelTemplate::new("test").password("secret");
assert_eq!(
template.build_ping_args(),
vec!["redis-cli", "-a", "secret", "ping"]
);
}
#[test]
fn test_container_names() {
let template = RedisSentinelTemplate::new("test")
.num_replicas(2)
.num_sentinels(3);
assert_eq!(
template.container_names(),
vec![
"test-master",
"test-replica-1",
"test-replica-2",
"test-sentinel-1",
"test-sentinel-2",
"test-sentinel-3",
]
);
}
#[test]
fn test_template_trait_name() {
let template = RedisSentinelTemplate::new("test-sentinel");
assert_eq!(Template::name(&template), "test-sentinel");
}
}