Skip to main content

redis_server_wrapper/
sentinel.rs

1//! Redis Sentinel topology management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6
7use tokio::process::Command;
8
9use crate::cli::RedisCli;
10use crate::error::{Error, Result};
11use crate::server::{RedisServer, RedisServerHandle};
12
/// Builder for a Redis Sentinel topology: one master, a set of replicas that
/// replicate from it, and a set of sentinels that monitor it.
///
/// Obtain one with [`RedisSentinel::builder`], adjust it with the setter
/// methods, then call `start` to launch every process.
///
/// # Example
///
/// ```no_run
/// use std::time::Duration;
/// use redis_server_wrapper::RedisSentinel;
///
/// # async fn example() {
/// let sentinel = RedisSentinel::builder()
///     .master_name("mymaster")
///     .master_port(6390)
///     .replicas(2)
///     .sentinels(3)
///     .start()
///     .await
///     .unwrap();
///
/// // Sentinels may still be discovering each other (and the replicas) right
/// // after `start` returns, so poll rather than asserting immediately.
/// sentinel
///     .wait_for_healthy(Duration::from_secs(10))
///     .await
///     .unwrap();
/// assert!(sentinel.is_healthy().await);
/// # }
/// ```
#[derive(Debug, Clone)]
#[must_use = "a sentinel builder does nothing until `start` is called"]
pub struct RedisSentinelBuilder {
    /// Name under which the sentinels monitor the master.
    master_name: String,
    /// Port the master listens on.
    master_port: u16,
    /// Number of replicas to start.
    num_replicas: u16,
    /// Port of the first replica; replica `i` listens on `replica_base_port + i`.
    replica_base_port: u16,
    /// Number of sentinels to start.
    num_sentinels: u16,
    /// Port of the first sentinel; sentinel `i` listens on `sentinel_base_port + i`.
    sentinel_base_port: u16,
    /// Quorum passed to `sentinel monitor`.
    quorum: u16,
    /// Address every process binds to; also used as the master host.
    bind: String,
    /// Value of `sentinel down-after-milliseconds`.
    down_after_ms: u64,
    /// Value of `sentinel failover-timeout` (milliseconds).
    failover_timeout_ms: u64,
    /// `redis-server` binary used for master, replicas and sentinels.
    redis_server_bin: String,
    /// `redis-cli` binary used for shutdown and readiness probes.
    redis_cli_bin: String,
}
47
48impl RedisSentinelBuilder {
49    pub fn master_name(mut self, name: impl Into<String>) -> Self {
50        self.master_name = name.into();
51        self
52    }
53
54    pub fn master_port(mut self, port: u16) -> Self {
55        self.master_port = port;
56        self
57    }
58
59    pub fn replicas(mut self, n: u16) -> Self {
60        self.num_replicas = n;
61        self
62    }
63
64    pub fn replica_base_port(mut self, port: u16) -> Self {
65        self.replica_base_port = port;
66        self
67    }
68
69    pub fn sentinels(mut self, n: u16) -> Self {
70        self.num_sentinels = n;
71        self
72    }
73
74    pub fn sentinel_base_port(mut self, port: u16) -> Self {
75        self.sentinel_base_port = port;
76        self
77    }
78
79    pub fn quorum(mut self, q: u16) -> Self {
80        self.quorum = q;
81        self
82    }
83
84    pub fn bind(mut self, bind: impl Into<String>) -> Self {
85        self.bind = bind.into();
86        self
87    }
88
89    pub fn down_after_ms(mut self, ms: u64) -> Self {
90        self.down_after_ms = ms;
91        self
92    }
93
94    pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
95        self.failover_timeout_ms = ms;
96        self
97    }
98
99    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
100        self.redis_server_bin = bin.into();
101        self
102    }
103
104    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
105        self.redis_cli_bin = bin.into();
106        self
107    }
108
109    fn replica_ports(&self) -> impl Iterator<Item = u16> {
110        let base = self.replica_base_port;
111        let n = self.num_replicas;
112        (0..n).map(move |i| base + i)
113    }
114
115    fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
116        let base = self.sentinel_base_port;
117        let n = self.num_sentinels;
118        (0..n).map(move |i| base + i)
119    }
120
121    /// Start the full topology: master, replicas, sentinels.
122    pub async fn start(self) -> Result<RedisSentinelHandle> {
123        // Kill leftover processes.
124        let cli_for_shutdown = |port: u16| {
125            RedisCli::new()
126                .bin(&self.redis_cli_bin)
127                .host(&self.bind)
128                .port(port)
129                .shutdown();
130        };
131        cli_for_shutdown(self.master_port);
132        for port in self.replica_ports() {
133            cli_for_shutdown(port);
134        }
135        for port in self.sentinel_ports() {
136            cli_for_shutdown(port);
137        }
138        tokio::time::sleep(Duration::from_millis(500)).await;
139
140        let base_dir = std::env::temp_dir().join("redis-sentinel-wrapper");
141        if base_dir.exists() {
142            let _ = fs::remove_dir_all(&base_dir);
143        }
144
145        // 1. Start master.
146        let master = RedisServer::new()
147            .port(self.master_port)
148            .bind(&self.bind)
149            .dir(base_dir.join("master"))
150            .appendonly(true)
151            .redis_server_bin(&self.redis_server_bin)
152            .redis_cli_bin(&self.redis_cli_bin)
153            .start()
154            .await?;
155
156        // 2. Start replicas.
157        let mut replicas = Vec::new();
158        for port in self.replica_ports() {
159            let replica = RedisServer::new()
160                .port(port)
161                .bind(&self.bind)
162                .dir(base_dir.join(format!("replica-{port}")))
163                .appendonly(true)
164                .extra("replicaof", format!("{} {}", self.bind, self.master_port))
165                .redis_server_bin(&self.redis_server_bin)
166                .redis_cli_bin(&self.redis_cli_bin)
167                .start()
168                .await?;
169            replicas.push(replica);
170        }
171
172        // Let replication link up.
173        tokio::time::sleep(Duration::from_secs(1)).await;
174
175        // 3. Start sentinels.
176        let mut sentinel_handles = Vec::new();
177        for port in self.sentinel_ports() {
178            let dir = base_dir.join(format!("sentinel-{port}"));
179            fs::create_dir_all(&dir)?;
180            let conf_path = dir.join("sentinel.conf");
181            let conf = format!(
182                "port {port}\n\
183                 bind {bind}\n\
184                 daemonize yes\n\
185                 pidfile {dir}/sentinel.pid\n\
186                 logfile {dir}/sentinel.log\n\
187                 dir {dir}\n\
188                 sentinel monitor {name} {master_host} {master_port} {quorum}\n\
189                 sentinel down-after-milliseconds {name} {down_after}\n\
190                 sentinel failover-timeout {name} {failover_timeout}\n\
191                 sentinel parallel-syncs {name} 1\n",
192                port = port,
193                bind = self.bind,
194                dir = dir.display(),
195                name = self.master_name,
196                master_host = self.bind,
197                master_port = self.master_port,
198                quorum = self.quorum,
199                down_after = self.down_after_ms,
200                failover_timeout = self.failover_timeout_ms,
201            );
202            fs::write(&conf_path, conf)?;
203
204            let status = Command::new(&self.redis_server_bin)
205                .arg(&conf_path)
206                .arg("--sentinel")
207                .stdout(std::process::Stdio::null())
208                .stderr(std::process::Stdio::null())
209                .status()
210                .await?;
211
212            if !status.success() {
213                return Err(Error::SentinelStart { port });
214            }
215
216            let cli = RedisCli::new()
217                .bin(&self.redis_cli_bin)
218                .host(&self.bind)
219                .port(port);
220            cli.wait_for_ready(Duration::from_secs(10)).await?;
221            sentinel_handles.push((port, cli));
222        }
223
224        // Wait for sentinels to discover each other.
225        tokio::time::sleep(Duration::from_secs(2)).await;
226
227        Ok(RedisSentinelHandle {
228            master,
229            replicas,
230            sentinel_ports: sentinel_handles.iter().map(|(p, _)| *p).collect(),
231            master_name: self.master_name,
232            bind: self.bind,
233            redis_cli_bin: self.redis_cli_bin,
234            num_sentinels: self.num_sentinels,
235            num_replicas: self.num_replicas,
236        })
237    }
238}
239
240/// A running Redis Sentinel topology. Stops everything on Drop.
241pub struct RedisSentinelHandle {
242    master: RedisServerHandle,
243    #[allow(dead_code)] // Kept alive for Drop cleanup
244    replicas: Vec<RedisServerHandle>,
245    sentinel_ports: Vec<u16>,
246    master_name: String,
247    bind: String,
248    redis_cli_bin: String,
249    num_sentinels: u16,
250    num_replicas: u16,
251}
252
253/// Convenience constructor.
254pub struct RedisSentinel;
255
256impl RedisSentinel {
257    /// Create a new sentinel builder with defaults.
258    pub fn builder() -> RedisSentinelBuilder {
259        RedisSentinelBuilder {
260            master_name: "mymaster".into(),
261            master_port: 6390,
262            num_replicas: 2,
263            replica_base_port: 6391,
264            num_sentinels: 3,
265            sentinel_base_port: 26389,
266            quorum: 2,
267            bind: "127.0.0.1".into(),
268            down_after_ms: 5000,
269            failover_timeout_ms: 10000,
270            redis_server_bin: "redis-server".into(),
271            redis_cli_bin: "redis-cli".into(),
272        }
273    }
274}
275
276impl RedisSentinelHandle {
277    /// The master's address.
278    pub fn master_addr(&self) -> String {
279        self.master.addr()
280    }
281
282    /// All sentinel addresses.
283    pub fn sentinel_addrs(&self) -> Vec<String> {
284        self.sentinel_ports
285            .iter()
286            .map(|p| format!("{}:{}", self.bind, p))
287            .collect()
288    }
289
290    /// The monitored master name.
291    pub fn master_name(&self) -> &str {
292        &self.master_name
293    }
294
295    /// Query a sentinel for the current master status.
296    pub async fn poke(&self) -> Result<HashMap<String, String>> {
297        for port in &self.sentinel_ports {
298            let cli = RedisCli::new()
299                .bin(&self.redis_cli_bin)
300                .host(&self.bind)
301                .port(*port);
302            if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", &self.master_name]).await {
303                return Ok(parse_flat_kv(&raw));
304            }
305        }
306        Err(Error::NoReachableSentinel)
307    }
308
309    /// Check if the topology is healthy.
310    pub async fn is_healthy(&self) -> bool {
311        if let Ok(info) = self.poke().await {
312            let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
313            let num_slaves: u64 = info
314                .get("num-slaves")
315                .and_then(|v| v.parse().ok())
316                .unwrap_or(0);
317            let num_sentinels: u64 = info
318                .get("num-other-sentinels")
319                .and_then(|v| v.parse().ok())
320                .unwrap_or(0)
321                + 1;
322            flags == "master"
323                && num_slaves >= self.num_replicas as u64
324                && num_sentinels >= self.num_sentinels as u64
325        } else {
326            false
327        }
328    }
329
330    /// Wait until the topology is healthy or timeout.
331    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
332        let start = std::time::Instant::now();
333        loop {
334            if self.is_healthy().await {
335                return Ok(());
336            }
337            if start.elapsed() > timeout {
338                return Err(Error::Timeout {
339                    message: "sentinel topology did not become healthy in time".into(),
340                });
341            }
342            tokio::time::sleep(Duration::from_millis(500)).await;
343        }
344    }
345
346    /// Stop everything.
347    pub fn stop(&self) {
348        // Sentinels first.
349        for port in &self.sentinel_ports {
350            RedisCli::new()
351                .bin(&self.redis_cli_bin)
352                .host(&self.bind)
353                .port(*port)
354                .shutdown();
355        }
356        // Replicas and master stopped by their handles' Drop.
357    }
358}
359
360impl Drop for RedisSentinelHandle {
361    fn drop(&mut self) {
362        self.stop();
363    }
364}
365
/// Turn `redis-cli` output of the form `key\nvalue\nkey\nvalue...` into a map.
///
/// Each line is trimmed of surrounding whitespace. A trailing key without a
/// value is ignored, and a repeated key keeps its last value.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let mut tokens = raw.lines().map(str::trim);
    let mut pairs = HashMap::new();
    while let (Some(key), Some(value)) = (tokens.next(), tokens.next()) {
        pairs.insert(key.to_owned(), value.to_owned());
    }
    pairs
}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default set by `RedisSentinel::builder`.
    #[test]
    fn builder_defaults() {
        let b = RedisSentinel::builder();
        assert_eq!(b.master_name, "mymaster");
        assert_eq!(b.master_port, 6390);
        assert_eq!(b.num_replicas, 2);
        assert_eq!(b.replica_base_port, 6391);
        assert_eq!(b.num_sentinels, 3);
        assert_eq!(b.sentinel_base_port, 26389);
        assert_eq!(b.quorum, 2);
        assert_eq!(b.bind, "127.0.0.1");
        assert_eq!(b.down_after_ms, 5000);
        assert_eq!(b.failover_timeout_ms, 10000);
        assert_eq!(b.redis_server_bin, "redis-server");
        assert_eq!(b.redis_cli_bin, "redis-cli");
    }

    /// Every setter stores its argument.
    #[test]
    fn builder_chain() {
        let b = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .replica_base_port(6501)
            .sentinels(5)
            .sentinel_base_port(26500)
            .quorum(3)
            .bind("0.0.0.0")
            .down_after_ms(1000)
            .failover_timeout_ms(2000)
            .redis_server_bin("/opt/redis/bin/redis-server")
            .redis_cli_bin("/opt/redis/bin/redis-cli");
        assert_eq!(b.master_name, "custom");
        assert_eq!(b.master_port, 6500);
        assert_eq!(b.num_replicas, 1);
        assert_eq!(b.replica_base_port, 6501);
        assert_eq!(b.num_sentinels, 5);
        assert_eq!(b.sentinel_base_port, 26500);
        assert_eq!(b.quorum, 3);
        assert_eq!(b.bind, "0.0.0.0");
        assert_eq!(b.down_after_ms, 1000);
        assert_eq!(b.failover_timeout_ms, 2000);
        assert_eq!(b.redis_server_bin, "/opt/redis/bin/redis-server");
        assert_eq!(b.redis_cli_bin, "/opt/redis/bin/redis-cli");
    }

    /// Replica and sentinel ports are consecutive from their base ports.
    #[test]
    fn port_ranges_are_consecutive() {
        let b = RedisSentinel::builder()
            .replica_base_port(7001)
            .replicas(3)
            .sentinel_base_port(27000)
            .sentinels(2);
        assert_eq!(b.replica_ports().collect::<Vec<_>>(), vec![7001, 7002, 7003]);
        assert_eq!(b.sentinel_ports().collect::<Vec<_>>(), vec![27000, 27001]);

        let empty = RedisSentinel::builder().replicas(0).sentinels(0);
        assert_eq!(empty.replica_ports().count(), 0);
        assert_eq!(empty.sentinel_ports().count(), 0);
    }

    #[test]
    fn parse_sentinel_output() {
        let raw = "name\nmymaster\nip\n127.0.0.1\nport\n6380\n";
        let map = parse_flat_kv(raw);
        assert_eq!(map.get("name").unwrap(), "mymaster");
        assert_eq!(map.get("ip").unwrap(), "127.0.0.1");
        assert_eq!(map.get("port").unwrap(), "6380");
    }

    /// Lines are trimmed (including CRLF endings) and a dangling key is dropped.
    #[test]
    fn parse_trims_and_ignores_dangling_key() {
        let map = parse_flat_kv("  flags \r\nmaster\nnum-slaves\n2\ndangling\n");
        assert_eq!(map.len(), 2);
        assert_eq!(map.get("flags").map(String::as_str), Some("master"));
        assert_eq!(map.get("num-slaves").map(String::as_str), Some("2"));
        assert!(!map.contains_key("dangling"));
        assert!(parse_flat_kv("").is_empty());
    }
}