Skip to main content

redis_server_wrapper/
sentinel.rs

1//! Redis Sentinel topology management built on `RedisServer`.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::PathBuf;
6use std::time::Duration;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use tokio::process::Command;
10
11use crate::cli::RedisCli;
12use crate::error::{Error, Result};
13use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
14
/// Builder for a Redis Sentinel topology.
///
/// # Example
///
/// ```no_run
/// use redis_server_wrapper::RedisSentinel;
///
/// # async fn example() {
/// let sentinel = RedisSentinel::builder()
///     .master_name("mymaster")
///     .master_port(6390)
///     .replicas(2)
///     .sentinels(3)
///     .start()
///     .await
///     .unwrap();
///
/// assert!(sentinel.is_healthy().await);
/// # }
/// ```
pub struct RedisSentinelBuilder {
    // Name of the builder-managed primary master (default "mymaster").
    master_name: String,
    // TCP port for the builder-managed master (default 6390).
    master_port: u16,
    // Number of replicas to start (default 2).
    num_replicas: u16,
    // First replica port; replicas take consecutive ports from here (default 6391).
    replica_base_port: u16,
    // Number of sentinel processes to start (default 3).
    num_sentinels: u16,
    // First sentinel port; sentinels take consecutive ports from here (default 26389).
    sentinel_base_port: u16,
    // How many sentinels must agree before a failover is triggered (default 2).
    quorum: u16,
    // Bind address shared by every process in the topology (default "127.0.0.1").
    bind: String,
    // Optional shared log file path; sentinels otherwise log to a per-instance file.
    logfile: Option<String>,
    // RDB save policy for master/replicas; `None` leaves the server default.
    save: Option<SavePolicy>,
    // AOF toggle for master/replicas; `None` means `appendonly yes` at start().
    appendonly: Option<bool>,
    // `sentinel down-after-milliseconds` applied to every monitored master (default 5000).
    down_after_ms: u64,
    // `sentinel failover-timeout` applied to every monitored master (default 10000).
    failover_timeout_ms: u64,
    // -- TLS settings: applied to master/replicas and emitted into sentinel.conf --
    tls_port: Option<u16>,
    tls_cert_file: Option<PathBuf>,
    tls_key_file: Option<PathBuf>,
    tls_ca_cert_file: Option<PathBuf>,
    tls_ca_cert_dir: Option<PathBuf>,
    tls_auth_clients: Option<bool>,
    tls_replication: Option<bool>,
    // Arbitrary extra config directives applied to every process in the topology.
    extra: HashMap<String, String>,
    // Binary paths (defaults resolve "redis-server" / "redis-cli" via PATH).
    redis_server_bin: String,
    redis_cli_bin: String,
    // Externally-managed masters monitored in addition to the primary.
    monitored_masters: Vec<MonitoredMaster>,
}
61
/// A master the sentinel processes are configured to monitor.
#[derive(Clone, Debug, PartialEq, Eq)]
struct MonitoredMaster {
    // Logical name used in `sentinel monitor <name> ...` directives.
    name: String,
    // Host/IP where the master listens.
    host: String,
    // Master's TCP port.
    port: u16,
    // Minimum replica count required by `is_healthy` (0 = no replicas required).
    expected_replicas: u16,
}
69
impl RedisSentinelBuilder {
    /// Set the name of the monitored master (default: `"mymaster"`).
    pub fn master_name(mut self, name: impl Into<String>) -> Self {
        self.master_name = name.into();
        self
    }

    /// Set the master's port (default: `6390`).
    pub fn master_port(mut self, port: u16) -> Self {
        self.master_port = port;
        self
    }

    /// Set the number of replicas to start (default: `2`).
    pub fn replicas(mut self, n: u16) -> Self {
        self.num_replicas = n;
        self
    }

    /// Set the base port for replica nodes (default: `6391`).
    ///
    /// Replicas are assigned consecutive ports starting at this value.
    pub fn replica_base_port(mut self, port: u16) -> Self {
        self.replica_base_port = port;
        self
    }

    /// Set the number of sentinel processes to start (default: `3`).
    pub fn sentinels(mut self, n: u16) -> Self {
        self.num_sentinels = n;
        self
    }

    /// Set the base port for sentinel processes (default: `26389`).
    ///
    /// Sentinels are assigned consecutive ports starting at this value.
    pub fn sentinel_base_port(mut self, port: u16) -> Self {
        self.sentinel_base_port = port;
        self
    }

    /// Set the quorum count — how many sentinels must agree before a failover is triggered (default: `2`).
    pub fn quorum(mut self, q: u16) -> Self {
        self.quorum = q;
        self
    }

    /// Set the bind address for all processes in the topology (default: `"127.0.0.1"`).
    pub fn bind(mut self, bind: impl Into<String>) -> Self {
        self.bind = bind.into();
        self
    }

    /// Set the log file path for all processes in the topology.
    ///
    /// Note: the same path is shared by the master, replicas, and every sentinel.
    pub fn logfile(mut self, path: impl Into<String>) -> Self {
        self.logfile = Some(path.into());
        self
    }

    /// Set the `down-after-milliseconds` threshold for all monitored masters (default: `5000`).
    ///
    /// A master is considered down after it fails to respond within this many milliseconds.
    pub fn down_after_ms(mut self, ms: u64) -> Self {
        self.down_after_ms = ms;
        self
    }

    /// Set the `failover-timeout` for all monitored masters in milliseconds (default: `10000`).
    pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
        self.failover_timeout_ms = ms;
        self
    }

    /// Set the RDB save policy for all data-bearing processes in the topology.
    ///
    /// `true` omits the `save` directive (Redis defaults apply).
    /// `false` emits `save ""` to disable RDB entirely.
    pub fn save(mut self, save: bool) -> Self {
        self.save = Some(if save {
            SavePolicy::Default
        } else {
            SavePolicy::Disabled
        });
        self
    }

    /// Set a custom RDB save schedule for all data-bearing processes in the topology.
    ///
    /// Each `(seconds, changes)` pair becomes one `save <seconds> <changes>` directive.
    pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
        self.save = Some(SavePolicy::Custom(schedule));
        self
    }

    /// Enable or disable AOF persistence for all data-bearing processes in the topology.
    ///
    /// When not set, the builder defaults to `appendonly yes` for the master
    /// and replicas.
    pub fn appendonly(mut self, appendonly: bool) -> Self {
        self.appendonly = Some(appendonly);
        self
    }

    // -- TLS directives --

    /// Set the TLS listening port for the master and replica nodes.
    pub fn tls_port(mut self, port: u16) -> Self {
        self.tls_port = Some(port);
        self
    }

    /// Set the TLS certificate file path for all nodes.
    pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_cert_file = Some(path.into());
        self
    }

    /// Set the TLS private key file path for all nodes.
    pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_key_file = Some(path.into());
        self
    }

    /// Set the TLS CA certificate file path for all nodes.
    pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_ca_cert_file = Some(path.into());
        self
    }

    /// Set the TLS CA certificate directory for all nodes.
    pub fn tls_ca_cert_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.tls_ca_cert_dir = Some(path.into());
        self
    }

    /// Require TLS client authentication for all nodes.
    pub fn tls_auth_clients(mut self, auth: bool) -> Self {
        self.tls_auth_clients = Some(auth);
        self
    }

    /// Use TLS for replication traffic between nodes.
    pub fn tls_replication(mut self, enable: bool) -> Self {
        self.tls_replication = Some(enable);
        self
    }

    /// Set an arbitrary config directive for all processes in the topology.
    pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.extra.insert(key.into(), value.into());
        self
    }

    /// Set a custom `redis-server` binary path.
    pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_server_bin = bin.into();
        self
    }

    /// Set a custom `redis-cli` binary path.
    pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
        self.redis_cli_bin = bin.into();
        self
    }

    /// Add an additional master for the sentinels to monitor.
    ///
    /// The builder-managed topology still creates the primary master configured by
    /// [`Self::master_name`] and [`Self::master_port`]. Additional monitored
    /// masters are expected to already be running.
    pub fn monitor(mut self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
        self.monitored_masters.push(MonitoredMaster {
            name: name.into(),
            host: host.into(),
            port,
            // No replica requirement: is_healthy only checks the master flag.
            expected_replicas: 0,
        });
        self
    }

    /// Add an additional master and the minimum number of replicas expected for it.
    pub fn monitor_with_replicas(
        mut self,
        name: impl Into<String>,
        host: impl Into<String>,
        port: u16,
        expected_replicas: u16,
    ) -> Self {
        self.monitored_masters.push(MonitoredMaster {
            name: name.into(),
            host: host.into(),
            port,
            expected_replicas,
        });
        self
    }

    /// Whether TLS is configured (cert + key present).
    fn has_tls(&self) -> bool {
        self.tls_cert_file.is_some() && self.tls_key_file.is_some()
    }

    /// Apply TLS flags to a CLI instance based on builder config.
    ///
    /// Without a CA file the CLI falls back to `--insecure` (no cert verification).
    fn apply_tls_to_cli(&self, mut cli: RedisCli) -> RedisCli {
        if self.has_tls() {
            cli = cli.tls(true);
            if let Some(ref ca) = self.tls_ca_cert_file {
                cli = cli.cacert(ca);
            } else {
                cli = cli.insecure(true);
            }
            if let Some(ref cert) = self.tls_cert_file {
                cli = cli.cert(cert);
            }
            if let Some(ref key) = self.tls_key_file {
                cli = cli.key(key);
            }
        }
        cli
    }

    /// Apply TLS config to a server builder.
    ///
    /// Each directive is forwarded only when it was set on this builder.
    fn apply_tls_to_server(&self, mut server: RedisServer) -> RedisServer {
        if let Some(port) = self.tls_port {
            server = server.tls_port(port);
        }
        if let Some(ref path) = self.tls_cert_file {
            server = server.tls_cert_file(path);
        }
        if let Some(ref path) = self.tls_key_file {
            server = server.tls_key_file(path);
        }
        if let Some(ref path) = self.tls_ca_cert_file {
            server = server.tls_ca_cert_file(path);
        }
        if let Some(ref path) = self.tls_ca_cert_dir {
            server = server.tls_ca_cert_dir(path);
        }
        if let Some(v) = self.tls_auth_clients {
            server = server.tls_auth_clients(v);
        }
        if let Some(v) = self.tls_replication {
            server = server.tls_replication(v);
        }
        server
    }

    /// Ports assigned to replicas: `replica_base_port`, `replica_base_port + 1`, ...
    fn replica_ports(&self) -> impl Iterator<Item = u16> {
        let base = self.replica_base_port;
        let n = self.num_replicas;
        (0..n).map(move |i| base + i)
    }

    /// Ports assigned to sentinels: `sentinel_base_port`, `sentinel_base_port + 1`, ...
    fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
        let base = self.sentinel_base_port;
        let n = self.num_sentinels;
        (0..n).map(move |i| base + i)
    }

    /// Start the full topology: master, replicas, sentinels.
    ///
    /// Sequence: kill leftovers on the configured ports, start the master,
    /// start replicas pointed at it, then write a `sentinel.conf` per sentinel
    /// and daemonize each sentinel process.
    ///
    /// # Errors
    ///
    /// Fails if directories/config files cannot be created, a server fails to
    /// start, a sentinel process exits with a non-zero status or never becomes
    /// ready, or its pid file cannot be parsed.
    pub async fn start(self) -> Result<RedisSentinelHandle> {
        // The builder-managed primary master is always monitored first;
        // user-supplied masters (from `monitor*`) follow.
        let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
        monitored_masters.push(MonitoredMaster {
            name: self.master_name.clone(),
            host: self.bind.clone(),
            port: self.master_port,
            expected_replicas: self.num_replicas,
        });
        monitored_masters.extend(self.monitored_masters.iter().cloned());

        // Kill leftover processes from a previous run (best-effort SHUTDOWN per port).
        let cli_for_shutdown = |port: u16| {
            let cli = self.apply_tls_to_cli(
                RedisCli::new()
                    .bin(&self.redis_cli_bin)
                    .host(&self.bind)
                    .port(port),
            );
            cli.shutdown();
        };
        cli_for_shutdown(self.master_port);
        for port in self.replica_ports() {
            cli_for_shutdown(port);
        }
        for port in self.sentinel_ports() {
            cli_for_shutdown(port);
        }
        // Grace period so old processes release the ports before we rebind them.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Unique working directory per process id + timestamp so concurrent
        // topologies don't collide in the temp dir.
        let unique = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|duration| duration.as_nanos())
            .unwrap_or(0);
        let base_dir = std::env::temp_dir().join(format!(
            "redis-sentinel-wrapper-{}-{}",
            std::process::id(),
            unique
        ));
        fs::create_dir_all(&base_dir)?;

        // 1. Start master.
        let appendonly = self.appendonly.unwrap_or(true);
        let mut master = RedisServer::new()
            .port(self.master_port)
            .bind(&self.bind)
            .dir(base_dir.join("master"))
            .appendonly(appendonly)
            .redis_server_bin(&self.redis_server_bin)
            .redis_cli_bin(&self.redis_cli_bin);
        master = self.apply_tls_to_server(master);
        if let Some(ref logfile) = self.logfile {
            master = master.logfile(logfile.clone());
        }
        if let Some(ref save) = self.save {
            match save {
                SavePolicy::Disabled => master = master.save(false),
                SavePolicy::Default => master = master.save(true),
                SavePolicy::Custom(pairs) => master = master.save_schedule(pairs.clone()),
            }
        }
        for (key, value) in &self.extra {
            master = master.extra(key.clone(), value.clone());
        }
        let master = master.start().await?;

        // 2. Start replicas.
        let mut replicas = Vec::new();
        for port in self.replica_ports() {
            let mut replica = RedisServer::new()
                .port(port)
                .bind(&self.bind)
                .dir(base_dir.join(format!("replica-{port}")))
                .appendonly(appendonly)
                .replicaof(self.bind.clone(), self.master_port)
                .redis_server_bin(&self.redis_server_bin)
                .redis_cli_bin(&self.redis_cli_bin);
            replica = self.apply_tls_to_server(replica);
            if let Some(ref logfile) = self.logfile {
                replica = replica.logfile(logfile.clone());
            }
            if let Some(ref save) = self.save {
                match save {
                    SavePolicy::Disabled => replica = replica.save(false),
                    SavePolicy::Default => replica = replica.save(true),
                    SavePolicy::Custom(pairs) => {
                        replica = replica.save_schedule(pairs.clone());
                    }
                }
            }
            for (key, value) in &self.extra {
                replica = replica.extra(key.clone(), value.clone());
            }
            let replica = replica.start().await?;
            replicas.push(replica);
        }

        // Let replication link up.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // 3. Start sentinels. Each gets its own dir and a freshly written
        // sentinel.conf (sentinels rewrite their config at runtime).
        let mut sentinel_handles = Vec::new();
        for port in self.sentinel_ports() {
            let dir = base_dir.join(format!("sentinel-{port}"));
            fs::create_dir_all(&dir)?;
            let conf_path = dir.join("sentinel.conf");
            // Shared logfile if configured, otherwise a per-sentinel log file.
            let logfile = self
                .logfile
                .as_deref()
                .map(str::to_owned)
                .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
            let mut conf = format!(
                "port {port}\n\
                 bind {bind}\n\
                 daemonize yes\n\
                 pidfile {dir}/sentinel.pid\n\
                 logfile {logfile}\n\
                 dir {dir}\n",
                port = port,
                bind = self.bind,
                dir = dir.display(),
                logfile = logfile,
            );
            // One monitor stanza per master; quorum/down-after/failover-timeout
            // come from this builder and apply uniformly.
            for master in &monitored_masters {
                conf.push_str(&format!(
                    "sentinel monitor {name} {host} {master_port} {quorum}\n\
                     sentinel down-after-milliseconds {name} {down_after}\n\
                     sentinel failover-timeout {name} {failover_timeout}\n\
                     sentinel parallel-syncs {name} 1\n",
                    name = master.name,
                    host = master.host,
                    master_port = master.port,
                    quorum = self.quorum,
                    down_after = self.down_after_ms,
                    failover_timeout = self.failover_timeout_ms,
                ));
            }
            // TLS directives for sentinels.
            if let Some(ref path) = self.tls_cert_file {
                conf.push_str(&format!("tls-cert-file {}\n", path.display()));
            }
            if let Some(ref path) = self.tls_key_file {
                conf.push_str(&format!("tls-key-file {}\n", path.display()));
            }
            if let Some(ref path) = self.tls_ca_cert_file {
                conf.push_str(&format!("tls-ca-cert-file {}\n", path.display()));
            }
            if let Some(ref path) = self.tls_ca_cert_dir {
                conf.push_str(&format!("tls-ca-cert-dir {}\n", path.display()));
            }
            if let Some(tls_port) = self.tls_port {
                conf.push_str(&format!("tls-port {tls_port}\n"));
            }
            if let Some(v) = self.tls_auth_clients {
                conf.push_str(&format!(
                    "tls-auth-clients {}\n",
                    if v { "yes" } else { "no" }
                ));
            }
            if let Some(v) = self.tls_replication {
                conf.push_str(&format!(
                    "tls-replication {}\n",
                    if v { "yes" } else { "no" }
                ));
            }
            for (key, value) in &self.extra {
                conf.push_str(&format!("{key} {value}\n"));
            }
            fs::write(&conf_path, conf)?;

            // `daemonize yes` makes the launcher process exit once the sentinel
            // has forked, so `status()` returning success means it was spawned.
            let status = Command::new(&self.redis_server_bin)
                .arg(&conf_path)
                .arg("--sentinel")
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .status()
                .await?;

            if !status.success() {
                return Err(Error::SentinelStart { port });
            }

            let cli = self.apply_tls_to_cli(
                RedisCli::new()
                    .bin(&self.redis_cli_bin)
                    .host(&self.bind)
                    .port(port),
            );
            cli.wait_for_ready(Duration::from_secs(10)).await?;

            // The daemonized sentinel wrote its pid to the path configured above.
            let pid_path = dir.join("sentinel.pid");
            let pid: u32 = fs::read_to_string(&pid_path)?
                .trim()
                .parse()
                .map_err(|_| Error::SentinelStart { port })?;

            sentinel_handles.push((port, pid, cli));
        }

        // Wait for sentinels to discover each other.
        tokio::time::sleep(Duration::from_secs(2)).await;

        Ok(RedisSentinelHandle {
            master,
            replicas,
            sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
            sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
            master_name: self.master_name,
            bind: self.bind,
            redis_cli_bin: self.redis_cli_bin,
            num_sentinels: self.num_sentinels,
            monitored_masters,
            tls: TlsConfig {
                cert_file: self.tls_cert_file,
                key_file: self.tls_key_file,
                ca_cert_file: self.tls_ca_cert_file,
            },
        })
    }
}
547
/// TLS configuration snapshot stored in the handle for building CLI instances.
#[derive(Clone, Debug, Default)]
struct TlsConfig {
    // Client certificate presented by redis-cli.
    cert_file: Option<PathBuf>,
    // Private key matching `cert_file`.
    key_file: Option<PathBuf>,
    // CA used to verify the server; when absent, CLI verification is skipped.
    ca_cert_file: Option<PathBuf>,
}
555
556impl TlsConfig {
557    fn has_tls(&self) -> bool {
558        self.cert_file.is_some() && self.key_file.is_some()
559    }
560
561    fn apply(&self, mut cli: RedisCli) -> RedisCli {
562        if self.has_tls() {
563            cli = cli.tls(true);
564            if let Some(ref ca) = self.ca_cert_file {
565                cli = cli.cacert(ca);
566            } else {
567                cli = cli.insecure(true);
568            }
569            if let Some(ref cert) = self.cert_file {
570                cli = cli.cert(cert);
571            }
572            if let Some(ref key) = self.key_file {
573                cli = cli.key(key);
574            }
575        }
576        cli
577    }
578}
579
/// A running Redis Sentinel topology. Stops everything on Drop.
pub struct RedisSentinelHandle {
    // Handle for the primary master; stopped via its own Drop.
    master: RedisServerHandle,
    #[allow(dead_code)] // Kept alive for Drop cleanup
    replicas: Vec<RedisServerHandle>,
    // Ports and pids of the daemonized sentinel processes (parallel vectors).
    sentinel_ports: Vec<u16>,
    sentinel_pids: Vec<u32>,
    // Name of the primary monitored master, used by `poke`.
    master_name: String,
    // Bind address shared by every process in the topology.
    bind: String,
    // redis-cli binary used for queries and graceful shutdown.
    redis_cli_bin: String,
    // Expected sentinel count, checked by `is_healthy`.
    num_sentinels: u16,
    // All monitored masters (primary first, then user-supplied ones).
    monitored_masters: Vec<MonitoredMaster>,
    // TLS snapshot used to build CLI instances after the builder is consumed.
    tls: TlsConfig,
}
594
/// Entry point for building a Redis Sentinel topology.
///
/// Zero-sized marker type: call [`RedisSentinel::builder`] to obtain a
/// [`RedisSentinelBuilder`], then configure it and call
/// [`RedisSentinelBuilder::start`] to launch the topology.
pub struct RedisSentinel;
600
601impl RedisSentinel {
602    /// Create a new sentinel builder with defaults.
603    pub fn builder() -> RedisSentinelBuilder {
604        RedisSentinelBuilder {
605            master_name: "mymaster".into(),
606            master_port: 6390,
607            num_replicas: 2,
608            replica_base_port: 6391,
609            num_sentinels: 3,
610            sentinel_base_port: 26389,
611            quorum: 2,
612            bind: "127.0.0.1".into(),
613            logfile: None,
614            save: None,
615            appendonly: None,
616            down_after_ms: 5000,
617            failover_timeout_ms: 10000,
618            tls_port: None,
619            tls_cert_file: None,
620            tls_key_file: None,
621            tls_ca_cert_file: None,
622            tls_ca_cert_dir: None,
623            tls_auth_clients: None,
624            tls_replication: None,
625            extra: HashMap::new(),
626            redis_server_bin: "redis-server".into(),
627            redis_cli_bin: "redis-cli".into(),
628            monitored_masters: Vec::new(),
629        }
630    }
631}
632
impl RedisSentinelHandle {
    /// The master's address.
    pub fn master_addr(&self) -> String {
        self.master.addr()
    }

    /// All monitored master names.
    pub fn monitored_master_names(&self) -> Vec<&str> {
        self.monitored_masters
            .iter()
            .map(|master| master.name.as_str())
            .collect()
    }

    /// All monitored master addresses, formatted as `host:port`.
    pub fn monitored_master_addrs(&self) -> Vec<String> {
        self.monitored_masters
            .iter()
            .map(|master| format!("{}:{}", master.host, master.port))
            .collect()
    }

    /// The PIDs of all processes in the topology (master, replicas, sentinels).
    pub fn pids(&self) -> Vec<u32> {
        let mut pids = Vec::with_capacity(1 + self.replicas.len() + self.sentinel_pids.len());
        pids.push(self.master.pid());
        for replica in &self.replicas {
            pids.push(replica.pid());
        }
        pids.extend_from_slice(&self.sentinel_pids);
        pids
    }

    /// All sentinel addresses, formatted as `bind:port`.
    pub fn sentinel_addrs(&self) -> Vec<String> {
        self.sentinel_ports
            .iter()
            .map(|p| format!("{}:{}", self.bind, p))
            .collect()
    }

    /// The monitored master name.
    pub fn master_name(&self) -> &str {
        &self.master_name
    }

    /// Query a sentinel for the primary monitored master's status.
    ///
    /// Iterates over the sentinel processes until one responds, then runs
    /// `SENTINEL MASTER <name>` and returns the result as a flat key/value map.
    ///
    /// Common keys in the returned map include `"ip"`, `"port"`, `"flags"`,
    /// `"num-slaves"`, and `"num-other-sentinels"`.
    ///
    /// Returns [`Error::NoReachableSentinel`] if no sentinel responds.
    pub async fn poke(&self) -> Result<HashMap<String, String>> {
        self.poke_master(&self.master_name).await
    }

    /// Query a sentinel for a specific monitored master's status.
    ///
    /// Like [`poke`](Self::poke) but targets `master_name` instead of the
    /// primary master configured for this topology.
    pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
        // Try each sentinel in port order; the first one that answers is used.
        for port in &self.sentinel_ports {
            let cli = self.tls.apply(
                RedisCli::new()
                    .bin(&self.redis_cli_bin)
                    .host(&self.bind)
                    .port(*port),
            );
            if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
                return Ok(parse_flat_kv(&raw));
            }
        }
        Err(Error::NoReachableSentinel)
    }

    /// Check if the topology is healthy.
    ///
    /// Healthy means that for every monitored master: a sentinel reports its
    /// flags as exactly `"master"`, at least the expected number of replicas
    /// are attached, and the full sentinel count is visible.
    pub async fn is_healthy(&self) -> bool {
        for master in &self.monitored_masters {
            let Ok(info) = self.poke_master(&master.name).await else {
                return false;
            };
            let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
            let num_slaves: u64 = info
                .get("num-slaves")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0);
            // A sentinel reports only its peers, so add one for the sentinel
            // that answered the query.
            let num_sentinels: u64 = info
                .get("num-other-sentinels")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0)
                + 1;
            if flags != "master"
                || num_slaves < master.expected_replicas as u64
                || num_sentinels < self.num_sentinels as u64
            {
                return false;
            }
        }
        true
    }

    /// Wait until the topology is healthy or timeout.
    ///
    /// Polls [`Self::is_healthy`] every 500ms; returns [`Error::Timeout`] if
    /// the topology is still unhealthy after `timeout` has elapsed.
    pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
        let start = std::time::Instant::now();
        loop {
            if self.is_healthy().await {
                return Ok(());
            }
            if start.elapsed() > timeout {
                return Err(Error::Timeout {
                    message: "sentinel topology did not become healthy in time".into(),
                });
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }

    /// Stop everything via an escalating shutdown strategy.
    ///
    /// 1. Sends `SHUTDOWN NOSAVE` to each sentinel process.
    /// 2. Waits 500ms for them to exit.
    /// 3. For each sentinel PID that is still alive, calls [`crate::process::force_kill`].
    /// 4. Calls [`crate::process::kill_by_port`] for each sentinel port as a safety net.
    ///
    /// Replicas and the master are stopped by their own handles' [`Drop`] impls,
    /// which also use the escalating strategy.
    pub fn stop(&self) {
        // Step 1: graceful shutdown for each sentinel.
        for port in &self.sentinel_ports {
            self.tls
                .apply(
                    RedisCli::new()
                        .bin(&self.redis_cli_bin)
                        .host(&self.bind)
                        .port(*port),
                )
                .shutdown();
        }
        // Step 2: grace period. Blocking sleep is deliberate: stop() is also
        // called from Drop, which cannot be async.
        std::thread::sleep(std::time::Duration::from_millis(500));
        // Step 3: force kill any sentinels still alive.
        for pid in &self.sentinel_pids {
            if crate::process::pid_alive(*pid) {
                crate::process::force_kill(*pid);
            }
        }
        // Step 4: port cleanup as safety net.
        for port in &self.sentinel_ports {
            crate::process::kill_by_port(*port);
        }
        // Replicas and master stopped by their handles' Drop.
    }
}
789
impl Drop for RedisSentinelHandle {
    fn drop(&mut self) {
        // Best-effort teardown: `stop` escalates from graceful SHUTDOWN to
        // force-kill; master/replica handles clean themselves up via Drop.
        self.stop();
    }
}
795
/// Parse alternating key/value lines from sentinel output.
///
/// Lines are trimmed and consumed in pairs (key line, value line); a trailing
/// unpaired line is ignored. A repeated key keeps the last value seen.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let fields: Vec<&str> = raw.lines().map(str::trim).collect();
    fields
        .chunks_exact(2)
        .map(|pair| (pair[0].to_owned(), pair[1].to_owned()))
        .collect()
}
807
#[cfg(test)]
mod tests {
    use super::*;

    // Pure builder-state and parser checks; no redis binaries are required.

    #[test]
    fn builder_defaults() {
        let b = RedisSentinel::builder();
        assert_eq!(b.master_port, 6390);
        assert_eq!(b.num_replicas, 2);
        assert_eq!(b.num_sentinels, 3);
        assert_eq!(b.quorum, 2);
        assert!(b.logfile.is_none());
        assert!(b.extra.is_empty());
        assert!(b.monitored_masters.is_empty());
    }

    #[test]
    fn builder_chain() {
        let b = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);
        assert_eq!(b.master_name, "custom");
        assert_eq!(b.master_port, 6500);
        assert_eq!(b.num_replicas, 1);
        assert_eq!(b.num_sentinels, 5);
        assert_eq!(b.quorum, 3);
        assert_eq!(b.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(b.extra.get("maxmemory").map(String::as_str), Some("10mb"));
        assert_eq!(b.monitored_masters.len(), 1);
        // `monitor` (without replicas) records an expectation of zero replicas.
        assert_eq!(
            b.monitored_masters[0],
            MonitoredMaster {
                name: "backup".into(),
                host: "127.0.0.1".into(),
                port: 6501,
                expected_replicas: 0,
            }
        );
    }

    #[test]
    fn parse_sentinel_output() {
        let raw = "name\nmymaster\nip\n127.0.0.1\nport\n6380\n";
        let map = parse_flat_kv(raw);
        assert_eq!(map.get("name").unwrap(), "mymaster");
        assert_eq!(map.get("ip").unwrap(), "127.0.0.1");
        assert_eq!(map.get("port").unwrap(), "6380");
    }
}