// redis_server_wrapper/sentinel.rs
1//! Redis Sentinel topology management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::process::Command;
9
10use crate::cli::RedisCli;
11use crate::error::{Error, Result};
12use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
13
/// Builder for a Redis Sentinel topology.
///
/// # Example
///
/// ```no_run
/// use redis_server_wrapper::RedisSentinel;
///
/// # async fn example() {
/// let sentinel = RedisSentinel::builder()
///     .master_name("mymaster")
///     .master_port(6390)
///     .replicas(2)
///     .sentinels(3)
///     .start()
///     .await
///     .unwrap();
///
/// assert!(sentinel.is_healthy().await);
/// # }
/// ```
pub struct RedisSentinelBuilder {
    // Name of the builder-managed master, used in `sentinel monitor <name> ...`.
    master_name: String,
    // Port the builder-managed master listens on.
    master_port: u16,
    // Number of replica processes to launch.
    num_replicas: u16,
    // First replica port; replicas get consecutive ports from here.
    replica_base_port: u16,
    // Number of sentinel processes to launch.
    num_sentinels: u16,
    // First sentinel port; sentinels get consecutive ports from here.
    sentinel_base_port: u16,
    // How many sentinels must agree before a failover is triggered.
    quorum: u16,
    // Bind address shared by master, replicas, and sentinels.
    bind: String,
    // Shared log file path; when `None`, each sentinel logs into its own
    // scratch directory (master/replica default is up to `RedisServer`).
    logfile: Option<String>,
    // RDB save policy applied to master and replicas; `None` = server default.
    save: Option<SavePolicy>,
    // AOF toggle for master and replicas; `None` means `appendonly yes`.
    appendonly: Option<bool>,
    // `sentinel down-after-milliseconds` applied to every monitored master.
    down_after_ms: u64,
    // `sentinel failover-timeout` applied to every monitored master.
    failover_timeout_ms: u64,
    // Arbitrary extra config directives emitted for every process.
    extra: HashMap<String, String>,
    // Path to the `redis-server` binary.
    redis_server_bin: String,
    // Path to the `redis-cli` binary.
    redis_cli_bin: String,
    // Additional, externally running masters the sentinels should monitor.
    monitored_masters: Vec<MonitoredMaster>,
}
53
/// A master that the sentinel processes are configured to monitor.
#[derive(Clone, Debug, PartialEq, Eq)]
struct MonitoredMaster {
    // Name used in `sentinel monitor <name> ...` and in `SENTINEL MASTER` queries.
    name: String,
    // Host/IP the master listens on.
    host: String,
    // Port the master listens on.
    port: u16,
    // Minimum replica count required for `is_healthy` (0 = none required).
    expected_replicas: u16,
}
61
impl RedisSentinelBuilder {
    /// Set the name of the monitored master (default: `"mymaster"`).
    pub fn master_name(mut self, name: impl Into<String>) -> Self {
        self.master_name = name.into();
        self
    }

    /// Set the master's port (default: `6390`).
    pub fn master_port(mut self, port: u16) -> Self {
        self.master_port = port;
        self
    }

    /// Set the number of replicas to start (default: `2`).
    pub fn replicas(mut self, n: u16) -> Self {
        self.num_replicas = n;
        self
    }

    /// Set the base port for replica nodes (default: `6391`).
    ///
    /// Replicas are assigned consecutive ports starting at this value.
    pub fn replica_base_port(mut self, port: u16) -> Self {
        self.replica_base_port = port;
        self
    }

    /// Set the number of sentinel processes to start (default: `3`).
    pub fn sentinels(mut self, n: u16) -> Self {
        self.num_sentinels = n;
        self
    }

    /// Set the base port for sentinel processes (default: `26389`).
    ///
    /// Sentinels are assigned consecutive ports starting at this value.
    pub fn sentinel_base_port(mut self, port: u16) -> Self {
        self.sentinel_base_port = port;
        self
    }

    /// Set the quorum count — how many sentinels must agree before a failover is triggered (default: `2`).
    pub fn quorum(mut self, q: u16) -> Self {
        self.quorum = q;
        self
    }

    /// Set the bind address for all processes in the topology (default: `"127.0.0.1"`).
    pub fn bind(mut self, bind: impl Into<String>) -> Self {
        self.bind = bind.into();
        self
    }

    /// Set the log file path for all processes in the topology.
    pub fn logfile(mut self, path: impl Into<String>) -> Self {
        self.logfile = Some(path.into());
        self
    }

    /// Set the `down-after-milliseconds` threshold for all monitored masters (default: `5000`).
    ///
    /// A master is considered down after it fails to respond within this many milliseconds.
    pub fn down_after_ms(mut self, ms: u64) -> Self {
        self.down_after_ms = ms;
        self
    }

    /// Set the `failover-timeout` for all monitored masters in milliseconds (default: `10000`).
    pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
        self.failover_timeout_ms = ms;
        self
    }

    /// Set the RDB save policy for all data-bearing processes in the topology.
    ///
    /// `true` omits the `save` directive (Redis defaults apply).
    /// `false` emits `save ""` to disable RDB entirely.
    pub fn save(mut self, save: bool) -> Self {
        self.save = Some(if save {
            SavePolicy::Default
        } else {
            SavePolicy::Disabled
        });
        self
    }

    /// Set a custom RDB save schedule for all data-bearing processes in the topology.
    pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
        self.save = Some(SavePolicy::Custom(schedule));
        self
    }

    /// Enable or disable AOF persistence for all data-bearing processes in the topology.
    ///
    /// When not set, the builder defaults to `appendonly yes` for the master
    /// and replicas.
    pub fn appendonly(mut self, appendonly: bool) -> Self {
        self.appendonly = Some(appendonly);
        self
    }

    /// Set an arbitrary config directive for all processes in the topology.
    ///
    /// NOTE(review): the directive is written into master, replica, AND
    /// sentinel configs alike — a data-server-only directive (e.g. `maxmemory`)
    /// may be rejected by sentinel; confirm this is intended.
    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra.insert(key.into(), value.into());
        self
    }

    /// Set a custom `redis-server` binary path.
    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_server_bin = bin.into();
        self
    }

    /// Set a custom `redis-cli` binary path.
    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_cli_bin = bin.into();
        self
    }

    /// Add an additional master for the sentinels to monitor.
    ///
    /// The builder-managed topology still creates the primary master configured by
    /// [`Self::master_name`] and [`Self::master_port`]. Additional monitored
    /// masters are expected to already be running.
    pub fn monitor(mut self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
        self.monitored_masters.push(MonitoredMaster {
            name: name.into(),
            host: host.into(),
            port,
            // No replica requirement for externally managed masters by default.
            expected_replicas: 0,
        });
        self
    }

    /// Add an additional master and the minimum number of replicas expected for it.
    pub fn monitor_with_replicas(
        mut self,
        name: impl Into<String>,
        host: impl Into<String>,
        port: u16,
        expected_replicas: u16,
    ) -> Self {
        self.monitored_masters.push(MonitoredMaster {
            name: name.into(),
            host: host.into(),
            port,
            expected_replicas,
        });
        self
    }

    /// Consecutive ports assigned to replicas, starting at `replica_base_port`.
    // NOTE(review): `base + i` can overflow u16 (panics in debug builds) for
    // large base/count combinations — confirm port ranges are validated upstream.
    fn replica_ports(&self) -> impl Iterator<Item = u16> {
        let base = self.replica_base_port;
        let n = self.num_replicas;
        (0..n).map(move |i| base + i)
    }

    /// Consecutive ports assigned to sentinels, starting at `sentinel_base_port`.
    fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
        let base = self.sentinel_base_port;
        let n = self.num_sentinels;
        (0..n).map(move |i| base + i)
    }

    /// Start the full topology: master, replicas, sentinels.
    pub async fn start(self) -> Result<RedisSentinelHandle> {
        // Build the monitored-master list: the builder-managed primary first,
        // then any externally running masters registered via `monitor*`.
        let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
        monitored_masters.push(MonitoredMaster {
            name: self.master_name.clone(),
            host: self.bind.clone(),
            port: self.master_port,
            expected_replicas: self.num_replicas,
        });
        monitored_masters.extend(self.monitored_masters.iter().cloned());

        // Kill leftover processes.
        // Best-effort: whatever `shutdown()` returns is deliberately discarded —
        // the port may simply have nothing listening on it.
        let cli_for_shutdown = |port: u16| {
            RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port)
                .shutdown();
        };
        cli_for_shutdown(self.master_port);
        for port in self.replica_ports() {
            cli_for_shutdown(port);
        }
        for port in self.sentinel_ports() {
            cli_for_shutdown(port);
        }
        // Give any just-killed processes a moment to release their ports.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Unique scratch directory per pid + timestamp so concurrent runs on
        // the same host don't collide (falls back to 0 if the clock is before
        // the Unix epoch).
        let unique = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|duration| duration.as_nanos())
            .unwrap_or(0);
        let base_dir = std::env::temp_dir().join(format!(
            "redis-sentinel-wrapper-{}-{}",
            std::process::id(),
            unique
        ));
        fs::create_dir_all(&base_dir)?;

        // 1. Start master.
        let appendonly = self.appendonly.unwrap_or(true); // AOF on unless disabled
        let mut master = RedisServer::new()
            .port(self.master_port)
            .bind(&self.bind)
            .dir(base_dir.join("master"))
            .appendonly(appendonly)
            .redis_server_bin(&self.redis_server_bin)
            .redis_cli_bin(&self.redis_cli_bin);
        if let Some(ref logfile) = self.logfile {
            master = master.logfile(logfile.clone());
        }
        // Translate the topology-level save policy into the server builder's API.
        if let Some(ref save) = self.save {
            match save {
                SavePolicy::Disabled => master = master.save(false),
                SavePolicy::Default => master = master.save(true),
                SavePolicy::Custom(pairs) => master = master.save_schedule(pairs.clone()),
            }
        }
        for (key, value) in &self.extra {
            master = master.extra(key.clone(), value.clone());
        }
        let master = master.start().await?;

        // 2. Start replicas.
        // Each replica gets its own scratch dir and replicates the builder-managed master.
        let mut replicas = Vec::new();
        for port in self.replica_ports() {
            let mut replica = RedisServer::new()
                .port(port)
                .bind(&self.bind)
                .dir(base_dir.join(format!("replica-{port}")))
                .appendonly(appendonly)
                .replicaof(self.bind.clone(), self.master_port)
                .redis_server_bin(&self.redis_server_bin)
                .redis_cli_bin(&self.redis_cli_bin);
            if let Some(ref logfile) = self.logfile {
                replica = replica.logfile(logfile.clone());
            }
            if let Some(ref save) = self.save {
                match save {
                    SavePolicy::Disabled => replica = replica.save(false),
                    SavePolicy::Default => replica = replica.save(true),
                    SavePolicy::Custom(pairs) => {
                        replica = replica.save_schedule(pairs.clone());
                    }
                }
            }
            for (key, value) in &self.extra {
                replica = replica.extra(key.clone(), value.clone());
            }
            let replica = replica.start().await?;
            replicas.push(replica);
        }

        // Let replication link up.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // 3. Start sentinels.
        // Each sentinel runs daemonized from a generated config file in its own
        // scratch dir; the conf lists every monitored master.
        let mut sentinel_handles = Vec::new();
        for port in self.sentinel_ports() {
            let dir = base_dir.join(format!("sentinel-{port}"));
            fs::create_dir_all(&dir)?;
            let conf_path = dir.join("sentinel.conf");
            // Shared logfile if configured, otherwise a per-sentinel log in its dir.
            let logfile = self
                .logfile
                .as_deref()
                .map(str::to_owned)
                .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
            let mut conf = format!(
                "port {port}\n\
                 bind {bind}\n\
                 daemonize yes\n\
                 pidfile {dir}/sentinel.pid\n\
                 logfile {logfile}\n\
                 dir {dir}\n",
                port = port,
                bind = self.bind,
                dir = dir.display(),
                logfile = logfile,
            );
            // One monitor stanza per master; down-after/failover-timeout are
            // shared topology-wide settings.
            for master in &monitored_masters {
                conf.push_str(&format!(
                    "sentinel monitor {name} {host} {master_port} {quorum}\n\
                     sentinel down-after-milliseconds {name} {down_after}\n\
                     sentinel failover-timeout {name} {failover_timeout}\n\
                     sentinel parallel-syncs {name} 1\n",
                    name = master.name,
                    host = master.host,
                    master_port = master.port,
                    quorum = self.quorum,
                    down_after = self.down_after_ms,
                    failover_timeout = self.failover_timeout_ms,
                ));
            }
            for (key, value) in &self.extra {
                conf.push_str(&format!("{key} {value}\n"));
            }
            fs::write(&conf_path, conf)?;

            // `daemonize yes` makes the launcher process exit immediately, so a
            // non-zero status here means the sentinel failed to launch at all.
            let status = Command::new(&self.redis_server_bin)
                .arg(&conf_path)
                .arg("--sentinel")
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .status()
                .await?;

            if !status.success() {
                return Err(Error::SentinelStart { port });
            }

            // Wait until the daemonized sentinel answers on its port.
            let cli = RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(port);
            cli.wait_for_ready(Duration::from_secs(10)).await?;

            // The real PID comes from the pidfile the daemonized sentinel
            // writes (the `Command` child above already exited).
            let pid_path = dir.join("sentinel.pid");
            let pid: u32 = fs::read_to_string(&pid_path)?
                .trim()
                .parse()
                .map_err(|_| Error::SentinelStart { port })?;

            sentinel_handles.push((port, pid, cli));
        }

        // Wait for sentinels to discover each other.
        tokio::time::sleep(Duration::from_secs(2)).await;

        Ok(RedisSentinelHandle {
            master,
            replicas,
            sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
            sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
            master_name: self.master_name,
            bind: self.bind,
            redis_cli_bin: self.redis_cli_bin,
            num_sentinels: self.num_sentinels,
            monitored_masters,
        })
    }
}
406
/// A running Redis Sentinel topology. Stops everything on Drop.
pub struct RedisSentinelHandle {
    // Handle to the builder-managed master; stopped by its own Drop.
    master: RedisServerHandle,
    #[allow(dead_code)] // Kept alive for Drop cleanup
    replicas: Vec<RedisServerHandle>,
    // Ports of the daemonized sentinel processes.
    sentinel_ports: Vec<u16>,
    // PIDs read from each sentinel's pidfile at startup.
    sentinel_pids: Vec<u32>,
    // Name of the primary monitored master.
    master_name: String,
    // Bind address shared by every process in the topology.
    bind: String,
    // `redis-cli` binary used for health checks and shutdown.
    redis_cli_bin: String,
    // Sentinel count expected by `is_healthy`.
    num_sentinels: u16,
    // All monitored masters (primary first, then external ones).
    monitored_masters: Vec<MonitoredMaster>,
}
420
/// Entry point for building a Redis Sentinel topology.
///
/// Call [`RedisSentinel::builder`] to obtain a [`RedisSentinelBuilder`], then
/// configure it and call [`RedisSentinelBuilder::start`] to launch the topology.
// Standard derives on a stateless public marker type: cheap, and `Debug` in
// particular is expected on all public API types.
#[derive(Debug, Clone, Copy, Default)]
pub struct RedisSentinel;
426
427impl RedisSentinel {
428    /// Create a new sentinel builder with defaults.
429    pub fn builder() -> RedisSentinelBuilder {
430        RedisSentinelBuilder {
431            master_name: "mymaster".into(),
432            master_port: 6390,
433            num_replicas: 2,
434            replica_base_port: 6391,
435            num_sentinels: 3,
436            sentinel_base_port: 26389,
437            quorum: 2,
438            bind: "127.0.0.1".into(),
439            logfile: None,
440            save: None,
441            appendonly: None,
442            down_after_ms: 5000,
443            failover_timeout_ms: 10000,
444            extra: HashMap::new(),
445            redis_server_bin: "redis-server".into(),
446            redis_cli_bin: "redis-cli".into(),
447            monitored_masters: Vec::new(),
448        }
449    }
450}
451
impl RedisSentinelHandle {
    /// The master's address.
    pub fn master_addr(&self) -> String {
        self.master.addr()
    }

    /// All monitored master names.
    pub fn monitored_master_names(&self) -> Vec<&str> {
        self.monitored_masters
            .iter()
            .map(|master| master.name.as_str())
            .collect()
    }

    /// All monitored master addresses.
    pub fn monitored_master_addrs(&self) -> Vec<String> {
        self.monitored_masters
            .iter()
            .map(|master| format!("{}:{}", master.host, master.port))
            .collect()
    }

    /// The PIDs of all processes in the topology (master, replicas, sentinels).
    pub fn pids(&self) -> Vec<u32> {
        let mut pids = Vec::with_capacity(1 + self.replicas.len() + self.sentinel_pids.len());
        pids.push(self.master.pid());
        for replica in &self.replicas {
            pids.push(replica.pid());
        }
        pids.extend_from_slice(&self.sentinel_pids);
        pids
    }

    /// All sentinel addresses.
    pub fn sentinel_addrs(&self) -> Vec<String> {
        self.sentinel_ports
            .iter()
            .map(|p| format!("{}:{}", self.bind, p))
            .collect()
    }

    /// The monitored master name.
    pub fn master_name(&self) -> &str {
        &self.master_name
    }

    /// Query a sentinel for the primary monitored master's status.
    ///
    /// Iterates over the sentinel processes until one responds, then runs
    /// `SENTINEL MASTER <name>` and returns the result as a flat key/value map.
    ///
    /// Common keys in the returned map include `"ip"`, `"port"`, `"flags"`,
    /// `"num-slaves"`, and `"num-other-sentinels"`.
    ///
    /// Returns [`Error::NoReachableSentinel`] if no sentinel responds.
    pub async fn poke(&self) -> Result<HashMap<String, String>> {
        self.poke_master(&self.master_name).await
    }

    /// Query a sentinel for a specific monitored master's status.
    ///
    /// Like [`poke`](Self::poke) but targets `master_name` instead of the
    /// primary master configured for this topology.
    pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
        // First sentinel that answers wins; unreachable ones are skipped.
        for port in &self.sentinel_ports {
            let cli = RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(*port);
            if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
                return Ok(parse_flat_kv(&raw));
            }
        }
        Err(Error::NoReachableSentinel)
    }

    /// Check if the topology is healthy.
    ///
    /// For every monitored master, a responding sentinel must report:
    /// flags exactly `"master"` (any `s_down`/`o_down` suffix fails the check),
    /// at least the expected replica count, and the full sentinel quorum size.
    pub async fn is_healthy(&self) -> bool {
        for master in &self.monitored_masters {
            let Ok(info) = self.poke_master(&master.name).await else {
                return false;
            };
            let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
            // Missing or unparsable counters are treated as zero (unhealthy).
            let num_slaves: u64 = info
                .get("num-slaves")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0);
            // "num-other-sentinels" excludes the responder itself, hence +1.
            let num_sentinels: u64 = info
                .get("num-other-sentinels")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0)
                + 1;
            // NOTE(review): the full `self.num_sentinels` count is required even
            // for externally added masters — confirm that is intended.
            if flags != "master"
                || num_slaves < master.expected_replicas as u64
                || num_sentinels < self.num_sentinels as u64
            {
                return false;
            }
        }
        true
    }

    /// Wait until the topology is healthy or timeout.
    ///
    /// Polls [`Self::is_healthy`] every 500 ms; returns [`Error::Timeout`]
    /// once `timeout` has elapsed without a healthy poll.
    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
        let start = std::time::Instant::now();
        loop {
            if self.is_healthy().await {
                return Ok(());
            }
            if start.elapsed() > timeout {
                return Err(Error::Timeout {
                    message: "sentinel topology did not become healthy in time".into(),
                });
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }

    /// Stop everything.
    pub fn stop(&self) {
        // Sentinels first.
        // Best-effort: the shutdown result is discarded; a sentinel that is
        // already gone is not an error here.
        for port in &self.sentinel_ports {
            RedisCli::new()
                .bin(&self.redis_cli_bin)
                .host(&self.bind)
                .port(*port)
                .shutdown();
        }
        // Replicas and master stopped by their handles' Drop.
    }
}
583
impl Drop for RedisSentinelHandle {
    fn drop(&mut self) {
        // Best-effort teardown: shut the sentinels down here; the master and
        // replica handles stop their own processes in their Drop impls.
        self.stop();
    }
}
589
/// Parse alternating key/value lines from sentinel output.
///
/// Lines are trimmed before pairing; a trailing line with no partner is
/// silently dropped.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let mut map = HashMap::new();
    let mut fields = raw.lines().map(str::trim);
    while let (Some(key), Some(value)) = (fields.next(), fields.next()) {
        map.insert(key.to_owned(), value.to_owned());
    }
    map
}
601
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder must carry the documented defaults.
    #[test]
    fn builder_defaults() {
        let builder = RedisSentinel::builder();
        assert_eq!(builder.master_port, 6390);
        assert_eq!(builder.num_replicas, 2);
        assert_eq!(builder.num_sentinels, 3);
        assert_eq!(builder.quorum, 2);
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
        assert!(builder.monitored_masters.is_empty());
    }

    /// Every chained setter must land in the corresponding field.
    #[test]
    fn builder_chain() {
        let built = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);

        assert_eq!(built.master_name, "custom");
        assert_eq!(built.master_port, 6500);
        assert_eq!(built.num_replicas, 1);
        assert_eq!(built.num_sentinels, 5);
        assert_eq!(built.quorum, 3);
        assert_eq!(built.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(built.extra.get("maxmemory").map(String::as_str), Some("10mb"));

        let expected = MonitoredMaster {
            name: "backup".into(),
            host: "127.0.0.1".into(),
            port: 6501,
            expected_replicas: 0,
        };
        assert_eq!(built.monitored_masters, vec![expected]);
    }

    /// Alternating key/value lines parse into a flat map.
    #[test]
    fn parse_sentinel_output() {
        let parsed = parse_flat_kv("name\nmymaster\nip\n127.0.0.1\nport\n6380\n");
        assert_eq!(parsed["name"], "mymaster");
        assert_eq!(parsed["ip"], "127.0.0.1");
        assert_eq!(parsed["port"], "6380");
    }
}