1//! Redis Sentinel topology management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::process::Command;
9
10use crate::cli::RedisCli;
11use crate::error::{Error, Result};
12use crate::server::{RedisServer, RedisServerHandle};
13
/// Builder for a Redis Sentinel topology.
///
/// Configures and launches one master, a set of replicas, and a set of
/// sentinel processes on consecutive ports. Obtain one via
/// [`RedisSentinel::builder`]; every setting has a working default.
///
/// # Example
///
/// ```no_run
/// use redis_server_wrapper::RedisSentinel;
///
/// # async fn example() {
/// let sentinel = RedisSentinel::builder()
///     .master_name("mymaster")
///     .master_port(6390)
///     .replicas(2)
///     .sentinels(3)
///     .start()
///     .await
///     .unwrap();
///
/// assert!(sentinel.is_healthy().await);
/// # }
/// ```
pub struct RedisSentinelBuilder {
    /// Name the sentinels use for the primary monitored master.
    master_name: String,
    /// TCP port for the builder-managed master process.
    master_port: u16,
    /// Number of replica processes to launch.
    num_replicas: u16,
    /// First replica port; replicas get consecutive ports from here.
    replica_base_port: u16,
    /// Number of sentinel processes to launch.
    num_sentinels: u16,
    /// First sentinel port; sentinels get consecutive ports from here.
    sentinel_base_port: u16,
    /// How many sentinels must agree before a failover is triggered.
    quorum: u16,
    /// Bind address shared by every process in the topology.
    bind: String,
    /// Optional log file path shared by every process.
    logfile: Option<String>,
    /// `down-after-milliseconds` applied to each monitored master.
    down_after_ms: u64,
    /// `failover-timeout` (milliseconds) applied to each monitored master.
    failover_timeout_ms: u64,
    /// Extra config directives applied to every process.
    extra: HashMap<String, String>,
    /// Path to the `redis-server` binary.
    redis_server_bin: String,
    /// Path to the `redis-cli` binary.
    redis_cli_bin: String,
    /// Additional, externally-managed masters for the sentinels to monitor.
    monitored_masters: Vec<MonitoredMaster>,
}
51
/// One master entry emitted into each sentinel's `sentinel monitor` config.
#[derive(Clone, Debug, PartialEq, Eq)]
struct MonitoredMaster {
    /// Master name as known to the sentinels.
    name: String,
    /// Host the master listens on.
    host: String,
    /// Port the master listens on.
    port: u16,
    /// Minimum replica count required for `is_healthy` to report healthy.
    expected_replicas: u16,
}
59
impl RedisSentinelBuilder {
    /// Name of the primary monitored master (default: `"mymaster"`).
    pub fn master_name(self, name: impl Into<String>) -> Self {
        Self {
            master_name: name.into(),
            ..self
        }
    }

    /// Port for the master process (default: `6390`).
    pub fn master_port(self, port: u16) -> Self {
        Self {
            master_port: port,
            ..self
        }
    }

    /// Number of replicas to launch (default: `2`).
    pub fn replicas(self, n: u16) -> Self {
        Self {
            num_replicas: n,
            ..self
        }
    }

    /// First port used for replica nodes (default: `6391`).
    ///
    /// Replicas get consecutive ports counting up from this value.
    pub fn replica_base_port(self, port: u16) -> Self {
        Self {
            replica_base_port: port,
            ..self
        }
    }

    /// Number of sentinel processes to launch (default: `3`).
    pub fn sentinels(self, n: u16) -> Self {
        Self {
            num_sentinels: n,
            ..self
        }
    }

    /// First port used for sentinel processes (default: `26389`).
    ///
    /// Sentinels get consecutive ports counting up from this value.
    pub fn sentinel_base_port(self, port: u16) -> Self {
        Self {
            sentinel_base_port: port,
            ..self
        }
    }

    /// How many sentinels must agree before a failover is triggered (default: `2`).
    pub fn quorum(self, q: u16) -> Self {
        Self { quorum: q, ..self }
    }

    /// Bind address shared by every process in the topology (default: `"127.0.0.1"`).
    pub fn bind(self, bind: impl Into<String>) -> Self {
        Self {
            bind: bind.into(),
            ..self
        }
    }

    /// Log file path shared by every process in the topology.
    pub fn logfile(self, path: impl Into<String>) -> Self {
        Self {
            logfile: Some(path.into()),
            ..self
        }
    }

    /// `down-after-milliseconds` threshold for every monitored master (default: `5000`).
    ///
    /// A master is considered down after failing to respond for this long.
    pub fn down_after_ms(self, ms: u64) -> Self {
        Self {
            down_after_ms: ms,
            ..self
        }
    }

    /// `failover-timeout` in milliseconds for every monitored master (default: `10000`).
    pub fn failover_timeout_ms(self, ms: u64) -> Self {
        Self {
            failover_timeout_ms: ms,
            ..self
        }
    }

    /// Apply an arbitrary config directive to every process in the topology.
    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra.insert(key.into(), value.into());
        self
    }

    /// Use a custom `redis-server` binary.
    pub fn redis_server_bin(self, bin: impl Into<String>) -> Self {
        Self {
            redis_server_bin: bin.into(),
            ..self
        }
    }

    /// Use a custom `redis-cli` binary.
    pub fn redis_cli_bin(self, bin: impl Into<String>) -> Self {
        Self {
            redis_cli_bin: bin.into(),
            ..self
        }
    }

    /// Register an additional master for the sentinels to monitor.
    ///
    /// The builder still creates its own primary master from
    /// [`Self::master_name`] and [`Self::master_port`]; masters registered
    /// here are expected to already be running.
    pub fn monitor(self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
        self.monitor_with_replicas(name, host, port, 0)
    }

    /// Register an additional master along with the minimum number of replicas
    /// expected for it.
    pub fn monitor_with_replicas(
        mut self,
        name: impl Into<String>,
        host: impl Into<String>,
        port: u16,
        expected_replicas: u16,
    ) -> Self {
        self.monitored_masters.push(MonitoredMaster {
            name: name.into(),
            host: host.into(),
            port,
            expected_replicas,
        });
        self
    }

    /// Ports assigned to replica nodes, consecutive from `replica_base_port`.
    fn replica_ports(&self) -> impl Iterator<Item = u16> {
        let first = self.replica_base_port;
        let count = self.num_replicas;
        (0..count).map(move |offset| first + offset)
    }

    /// Ports assigned to sentinel processes, consecutive from `sentinel_base_port`.
    fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
        let first = self.sentinel_base_port;
        let count = self.num_sentinels;
        (0..count).map(move |offset| first + offset)
    }
194
    /// Start the full topology: master, replicas, sentinels.
    ///
    /// Steps, in order:
    /// 1. best-effort `SHUTDOWN` of anything already on the configured ports,
    /// 2. launch the master (append-only) in a fresh temp directory,
    /// 3. launch each replica as `replicaof` the master,
    /// 4. write a `sentinel.conf` per sentinel and launch it daemonized.
    ///
    /// # Errors
    ///
    /// Returns an error if a temp directory or config file cannot be created,
    /// if any redis process fails to start or become ready within its grace
    /// period, or if a sentinel's pid file cannot be read or parsed.
    pub async fn start(self) -> Result<RedisSentinelHandle> {
        // The builder-managed master always comes first, followed by any
        // externally-managed masters registered via `monitor*`.
        let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
        monitored_masters.push(MonitoredMaster {
            name: self.master_name.clone(),
            host: self.bind.clone(),
            port: self.master_port,
            expected_replicas: self.num_replicas,
        });
        monitored_masters.extend(self.monitored_masters.iter().cloned());

        // Kill leftover processes (best effort — the result is ignored).
        let cli_for_shutdown = |port: u16| {
            RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port)
                .shutdown();
        };
        cli_for_shutdown(self.master_port);
        for port in self.replica_ports() {
            cli_for_shutdown(port);
        }
        for port in self.sentinel_ports() {
            cli_for_shutdown(port);
        }
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Unique working directory per (pid, nanosecond timestamp) so
        // concurrent runs cannot collide on disk.
        let unique = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|duration| duration.as_nanos())
            .unwrap_or(0);
        let base_dir = std::env::temp_dir().join(format!(
            "redis-sentinel-wrapper-{}-{}",
            std::process::id(),
            unique
        ));
        fs::create_dir_all(&base_dir)?;

        // 1. Start master.
        let mut master = RedisServer::new()
            .port(self.master_port)
            .bind(&self.bind)
            .dir(base_dir.join("master"))
            .appendonly(true)
            .redis_server_bin(&self.redis_server_bin)
            .redis_cli_bin(&self.redis_cli_bin);
        if let Some(ref logfile) = self.logfile {
            master = master.logfile(logfile.clone());
        }
        for (key, value) in &self.extra {
            master = master.extra(key.clone(), value.clone());
        }
        let master = master.start().await?;

        // 2. Start replicas, each pointing at the master via `replicaof`.
        let mut replicas = Vec::new();
        for port in self.replica_ports() {
            let mut replica = RedisServer::new()
                .port(port)
                .bind(&self.bind)
                .dir(base_dir.join(format!("replica-{port}")))
                .appendonly(true)
                .replicaof(self.bind.clone(), self.master_port)
                .redis_server_bin(&self.redis_server_bin)
                .redis_cli_bin(&self.redis_cli_bin);
            if let Some(ref logfile) = self.logfile {
                replica = replica.logfile(logfile.clone());
            }
            for (key, value) in &self.extra {
                replica = replica.extra(key.clone(), value.clone());
            }
            let replica = replica.start().await?;
            replicas.push(replica);
        }

        // Let replication link up.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // 3. Start sentinels, each from its own generated config file.
        let mut sentinel_handles = Vec::new();
        for port in self.sentinel_ports() {
            let dir = base_dir.join(format!("sentinel-{port}"));
            fs::create_dir_all(&dir)?;
            let conf_path = dir.join("sentinel.conf");
            // Default to a per-sentinel log inside its own directory.
            let logfile = self
                .logfile
                .as_deref()
                .map(str::to_owned)
                .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
            let mut conf = format!(
                "port {port}\n\
                 bind {bind}\n\
                 daemonize yes\n\
                 pidfile {dir}/sentinel.pid\n\
                 logfile {logfile}\n\
                 dir {dir}\n",
                port = port,
                bind = self.bind,
                dir = dir.display(),
                logfile = logfile,
            );
            // One monitor stanza per master; every sentinel gets the same view.
            for master in &monitored_masters {
                conf.push_str(&format!(
                    "sentinel monitor {name} {host} {master_port} {quorum}\n\
                     sentinel down-after-milliseconds {name} {down_after}\n\
                     sentinel failover-timeout {name} {failover_timeout}\n\
                     sentinel parallel-syncs {name} 1\n",
                    name = master.name,
                    host = master.host,
                    master_port = master.port,
                    quorum = self.quorum,
                    down_after = self.down_after_ms,
                    failover_timeout = self.failover_timeout_ms,
                ));
            }
            for (key, value) in &self.extra {
                conf.push_str(&format!("{key} {value}\n"));
            }
            fs::write(&conf_path, conf)?;

            // The config sets `daemonize yes`, so a success status only means
            // the launcher exited cleanly; readiness is checked separately below.
            let status = Command::new(&self.redis_server_bin)
                .arg(&conf_path)
                .arg("--sentinel")
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .status()
                .await?;

            if !status.success() {
                return Err(Error::SentinelStart { port });
            }

            let cli = RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port);
            cli.wait_for_ready(Duration::from_secs(10)).await?;

            // Read the pid the daemonized process wrote to its own pid file,
            // so the handle can report it later.
            let pid_path = dir.join("sentinel.pid");
            let pid: u32 = fs::read_to_string(&pid_path)?
                .trim()
                .parse()
                .map_err(|_| Error::SentinelStart { port })?;

            sentinel_handles.push((port, pid, cli));
        }

        // Wait for sentinels to discover each other.
        tokio::time::sleep(Duration::from_secs(2)).await;

        Ok(RedisSentinelHandle {
            master,
            replicas,
            sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
            sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
            master_name: self.master_name,
            bind: self.bind,
            redis_cli_bin: self.redis_cli_bin,
            num_sentinels: self.num_sentinels,
            monitored_masters,
        })
    }
}
359
/// A running Redis Sentinel topology. Stops everything on Drop.
pub struct RedisSentinelHandle {
    /// Handle for the builder-managed master; cleaned up by its own Drop.
    master: RedisServerHandle,
    #[allow(dead_code)] // Kept alive for Drop cleanup
    replicas: Vec<RedisServerHandle>,
    /// Ports the sentinel processes listen on.
    sentinel_ports: Vec<u16>,
    /// Pids read from each sentinel's pid file at startup.
    sentinel_pids: Vec<u32>,
    /// Name of the primary monitored master.
    master_name: String,
    /// Bind address shared by the whole topology.
    bind: String,
    /// Path to the `redis-cli` binary used for queries and shutdown.
    redis_cli_bin: String,
    /// Sentinel count that `is_healthy` expects to see.
    num_sentinels: u16,
    /// All monitored masters, builder-managed one first.
    monitored_masters: Vec<MonitoredMaster>,
}
373
374/// Entry point for building a Redis Sentinel topology.
375///
376/// Call [`RedisSentinel::builder`] to obtain a [`RedisSentinelBuilder`], then
377/// configure it and call [`RedisSentinelBuilder::start`] to launch the topology.
378pub struct RedisSentinel;
379
380impl RedisSentinel {
381    /// Create a new sentinel builder with defaults.
382    pub fn builder() -> RedisSentinelBuilder {
383        RedisSentinelBuilder {
384            master_name: "mymaster".into(),
385            master_port: 6390,
386            num_replicas: 2,
387            replica_base_port: 6391,
388            num_sentinels: 3,
389            sentinel_base_port: 26389,
390            quorum: 2,
391            bind: "127.0.0.1".into(),
392            logfile: None,
393            down_after_ms: 5000,
394            failover_timeout_ms: 10000,
395            extra: HashMap::new(),
396            redis_server_bin: "redis-server".into(),
397            redis_cli_bin: "redis-cli".into(),
398            monitored_masters: Vec::new(),
399        }
400    }
401}
402
impl RedisSentinelHandle {
    /// Address of the builder-managed master.
    pub fn master_addr(&self) -> String {
        self.master.addr()
    }

    /// Names of every monitored master, primary first.
    pub fn monitored_master_names(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.monitored_masters.len());
        for entry in &self.monitored_masters {
            names.push(entry.name.as_str());
        }
        names
    }

    /// `host:port` of every monitored master, primary first.
    pub fn monitored_master_addrs(&self) -> Vec<String> {
        let mut addrs = Vec::with_capacity(self.monitored_masters.len());
        for entry in &self.monitored_masters {
            addrs.push(format!("{}:{}", entry.host, entry.port));
        }
        addrs
    }

    /// PIDs of every process in the topology: master, then replicas, then sentinels.
    pub fn pids(&self) -> Vec<u32> {
        std::iter::once(self.master.pid())
            .chain(self.replicas.iter().map(|replica| replica.pid()))
            .chain(self.sentinel_pids.iter().copied())
            .collect()
    }

    /// `host:port` of every sentinel process.
    pub fn sentinel_addrs(&self) -> Vec<String> {
        let mut addrs = Vec::with_capacity(self.sentinel_ports.len());
        for port in &self.sentinel_ports {
            addrs.push(format!("{}:{}", self.bind, port));
        }
        addrs
    }

    /// Name of the primary monitored master.
    pub fn master_name(&self) -> &str {
        &self.master_name
    }
448
449    /// Query a sentinel for the primary monitored master's status.
450    ///
451    /// Iterates over the sentinel processes until one responds, then runs
452    /// `SENTINEL MASTER <name>` and returns the result as a flat key/value map.
453    ///
454    /// Common keys in the returned map include `"ip"`, `"port"`, `"flags"`,
455    /// `"num-slaves"`, and `"num-other-sentinels"`.
456    ///
457    /// Returns [`Error::NoReachableSentinel`] if no sentinel responds.
458    pub async fn poke(&self) -> Result<HashMap<String, String>> {
459        self.poke_master(&self.master_name).await
460    }
461
462    /// Query a sentinel for a specific monitored master's status.
463    ///
464    /// Like [`poke`](Self::poke) but targets `master_name` instead of the
465    /// primary master configured for this topology.
466    pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
467        for port in &self.sentinel_ports {
468            let cli = RedisCli::new()
469                .bin(&self.redis_cli_bin)
470                .host(&self.bind)
471                .port(*port);
472            if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
473                return Ok(parse_flat_kv(&raw));
474            }
475        }
476        Err(Error::NoReachableSentinel)
477    }
478
479    /// Check if the topology is healthy.
480    pub async fn is_healthy(&self) -> bool {
481        for master in &self.monitored_masters {
482            let Ok(info) = self.poke_master(&master.name).await else {
483                return false;
484            };
485            let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
486            let num_slaves: u64 = info
487                .get("num-slaves")
488                .and_then(|v| v.parse().ok())
489                .unwrap_or(0);
490            let num_sentinels: u64 = info
491                .get("num-other-sentinels")
492                .and_then(|v| v.parse().ok())
493                .unwrap_or(0)
494                + 1;
495            if flags != "master"
496                || num_slaves < master.expected_replicas as u64
497                || num_sentinels < self.num_sentinels as u64
498            {
499                return false;
500            }
501        }
502        true
503    }
504
505    /// Wait until the topology is healthy or timeout.
506    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
507        let start = std::time::Instant::now();
508        loop {
509            if self.is_healthy().await {
510                return Ok(());
511            }
512            if start.elapsed() > timeout {
513                return Err(Error::Timeout {
514                    message: "sentinel topology did not become healthy in time".into(),
515                });
516            }
517            tokio::time::sleep(Duration::from_millis(500)).await;
518        }
519    }
520
521    /// Stop everything.
522    pub fn stop(&self) {
523        // Sentinels first.
524        for port in &self.sentinel_ports {
525            RedisCli::new()
526                .bin(&self.redis_cli_bin)
527                .host(&self.bind)
528                .port(*port)
529                .shutdown();
530        }
531        // Replicas and master stopped by their handles' Drop.
532    }
533}
534
/// Stopping on drop keeps callers from leaking redis processes.
impl Drop for RedisSentinelHandle {
    fn drop(&mut self) {
        // Best-effort sentinel shutdown; master/replica handles clean
        // themselves up via their own Drop impls.
        self.stop();
    }
}
540
/// Parse sentinel output where keys and values alternate line by line.
///
/// Each pair of consecutive (trimmed) lines becomes one map entry; a trailing
/// unpaired line is ignored.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let fields: Vec<&str> = raw.lines().map(str::trim).collect();
    fields
        .chunks_exact(2)
        .map(|pair| (pair[0].to_owned(), pair[1].to_owned()))
        .collect()
}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the documented defaults.
    #[test]
    fn builder_defaults() {
        let builder = RedisSentinel::builder();
        assert_eq!(builder.master_port, 6390);
        assert_eq!(builder.num_replicas, 2);
        assert_eq!(builder.num_sentinels, 3);
        assert_eq!(builder.quorum, 2);
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
        assert!(builder.monitored_masters.is_empty());
    }

    /// Chained setters all land in the right fields.
    #[test]
    fn builder_chain() {
        let built = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);
        assert_eq!(built.master_name, "custom");
        assert_eq!(built.master_port, 6500);
        assert_eq!(built.num_replicas, 1);
        assert_eq!(built.num_sentinels, 5);
        assert_eq!(built.quorum, 3);
        assert_eq!(built.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(built.extra.get("maxmemory").map(String::as_str), Some("10mb"));
        let expected = MonitoredMaster {
            name: "backup".into(),
            host: "127.0.0.1".into(),
            port: 6501,
            expected_replicas: 0,
        };
        assert_eq!(built.monitored_masters, vec![expected]);
    }

    /// Alternating key/value lines round-trip into a map.
    #[test]
    fn parse_sentinel_output() {
        let raw = "name\nmymaster\nip\n127.0.0.1\nport\n6380\n";
        let parsed = parse_flat_kv(raw);
        assert_eq!(parsed.get("name").unwrap(), "mymaster");
        assert_eq!(parsed.get("ip").unwrap(), "127.0.0.1");
        assert_eq!(parsed.get("port").unwrap(), "6380");
    }
}