1use std::collections::HashMap;
4use std::fs;
5use std::path::PathBuf;
6use std::time::Duration;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use tokio::process::Command;
10
11use crate::cli::RedisCli;
12use crate::error::{Error, Result};
13use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
14
15pub struct RedisSentinelBuilder {
36 master_name: String,
37 master_port: u16,
38 num_replicas: u16,
39 replica_base_port: u16,
40 num_sentinels: u16,
41 sentinel_base_port: u16,
42 quorum: u16,
43 bind: String,
44 logfile: Option<String>,
45 save: Option<SavePolicy>,
46 appendonly: Option<bool>,
47 down_after_ms: u64,
48 failover_timeout_ms: u64,
49 tls_port: Option<u16>,
50 tls_cert_file: Option<PathBuf>,
51 tls_key_file: Option<PathBuf>,
52 tls_ca_cert_file: Option<PathBuf>,
53 tls_ca_cert_dir: Option<PathBuf>,
54 tls_auth_clients: Option<bool>,
55 tls_replication: Option<bool>,
56 extra: HashMap<String, String>,
57 redis_server_bin: String,
58 redis_cli_bin: String,
59 monitored_masters: Vec<MonitoredMaster>,
60}
61
/// A master the sentinels are configured to watch via `sentinel monitor`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MonitoredMaster {
    /// Name used in the sentinel configuration and in `SENTINEL MASTER` queries.
    name: String,
    /// Host the master listens on.
    host: String,
    /// Port the master listens on.
    port: u16,
    /// Minimum number of replicas a health check expects to see for this master.
    expected_replicas: u16,
}
69
70impl RedisSentinelBuilder {
71 pub fn master_name(mut self, name: impl Into<String>) -> Self {
73 self.master_name = name.into();
74 self
75 }
76
77 pub fn master_port(mut self, port: u16) -> Self {
79 self.master_port = port;
80 self
81 }
82
83 pub fn replicas(mut self, n: u16) -> Self {
85 self.num_replicas = n;
86 self
87 }
88
89 pub fn replica_base_port(mut self, port: u16) -> Self {
93 self.replica_base_port = port;
94 self
95 }
96
97 pub fn sentinels(mut self, n: u16) -> Self {
99 self.num_sentinels = n;
100 self
101 }
102
103 pub fn sentinel_base_port(mut self, port: u16) -> Self {
107 self.sentinel_base_port = port;
108 self
109 }
110
111 pub fn quorum(mut self, q: u16) -> Self {
113 self.quorum = q;
114 self
115 }
116
117 pub fn bind(mut self, bind: impl Into<String>) -> Self {
119 self.bind = bind.into();
120 self
121 }
122
123 pub fn logfile(mut self, path: impl Into<String>) -> Self {
125 self.logfile = Some(path.into());
126 self
127 }
128
129 pub fn down_after_ms(mut self, ms: u64) -> Self {
133 self.down_after_ms = ms;
134 self
135 }
136
137 pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
139 self.failover_timeout_ms = ms;
140 self
141 }
142
143 pub fn save(mut self, save: bool) -> Self {
148 self.save = Some(if save {
149 SavePolicy::Default
150 } else {
151 SavePolicy::Disabled
152 });
153 self
154 }
155
156 pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
158 self.save = Some(SavePolicy::Custom(schedule));
159 self
160 }
161
162 pub fn appendonly(mut self, appendonly: bool) -> Self {
167 self.appendonly = Some(appendonly);
168 self
169 }
170
171 pub fn tls_port(mut self, port: u16) -> Self {
175 self.tls_port = Some(port);
176 self
177 }
178
179 pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
181 self.tls_cert_file = Some(path.into());
182 self
183 }
184
185 pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
187 self.tls_key_file = Some(path.into());
188 self
189 }
190
191 pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
193 self.tls_ca_cert_file = Some(path.into());
194 self
195 }
196
197 pub fn tls_ca_cert_dir(mut self, path: impl Into<PathBuf>) -> Self {
199 self.tls_ca_cert_dir = Some(path.into());
200 self
201 }
202
203 pub fn tls_auth_clients(mut self, auth: bool) -> Self {
205 self.tls_auth_clients = Some(auth);
206 self
207 }
208
209 pub fn tls_replication(mut self, enable: bool) -> Self {
211 self.tls_replication = Some(enable);
212 self
213 }
214
215 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
217 self.extra.insert(key.into(), value.into());
218 self
219 }
220
221 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
223 self.redis_server_bin = bin.into();
224 self
225 }
226
227 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
229 self.redis_cli_bin = bin.into();
230 self
231 }
232
233 pub fn monitor(mut self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
239 self.monitored_masters.push(MonitoredMaster {
240 name: name.into(),
241 host: host.into(),
242 port,
243 expected_replicas: 0,
244 });
245 self
246 }
247
248 pub fn monitor_with_replicas(
250 mut self,
251 name: impl Into<String>,
252 host: impl Into<String>,
253 port: u16,
254 expected_replicas: u16,
255 ) -> Self {
256 self.monitored_masters.push(MonitoredMaster {
257 name: name.into(),
258 host: host.into(),
259 port,
260 expected_replicas,
261 });
262 self
263 }
264
265 fn has_tls(&self) -> bool {
267 self.tls_cert_file.is_some() && self.tls_key_file.is_some()
268 }
269
270 fn apply_tls_to_cli(&self, mut cli: RedisCli) -> RedisCli {
272 if self.has_tls() {
273 cli = cli.tls(true);
274 if let Some(ref ca) = self.tls_ca_cert_file {
275 cli = cli.cacert(ca);
276 } else {
277 cli = cli.insecure(true);
278 }
279 if let Some(ref cert) = self.tls_cert_file {
280 cli = cli.cert(cert);
281 }
282 if let Some(ref key) = self.tls_key_file {
283 cli = cli.key(key);
284 }
285 }
286 cli
287 }
288
289 fn apply_tls_to_server(&self, mut server: RedisServer) -> RedisServer {
291 if let Some(port) = self.tls_port {
292 server = server.tls_port(port);
293 }
294 if let Some(ref path) = self.tls_cert_file {
295 server = server.tls_cert_file(path);
296 }
297 if let Some(ref path) = self.tls_key_file {
298 server = server.tls_key_file(path);
299 }
300 if let Some(ref path) = self.tls_ca_cert_file {
301 server = server.tls_ca_cert_file(path);
302 }
303 if let Some(ref path) = self.tls_ca_cert_dir {
304 server = server.tls_ca_cert_dir(path);
305 }
306 if let Some(v) = self.tls_auth_clients {
307 server = server.tls_auth_clients(v);
308 }
309 if let Some(v) = self.tls_replication {
310 server = server.tls_replication(v);
311 }
312 server
313 }
314
315 fn replica_ports(&self) -> impl Iterator<Item = u16> {
316 let base = self.replica_base_port;
317 let n = self.num_replicas;
318 (0..n).map(move |i| base + i)
319 }
320
321 fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
322 let base = self.sentinel_base_port;
323 let n = self.num_sentinels;
324 (0..n).map(move |i| base + i)
325 }
326
327 pub async fn start(self) -> Result<RedisSentinelHandle> {
329 let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
330 monitored_masters.push(MonitoredMaster {
331 name: self.master_name.clone(),
332 host: self.bind.clone(),
333 port: self.master_port,
334 expected_replicas: self.num_replicas,
335 });
336 monitored_masters.extend(self.monitored_masters.iter().cloned());
337
338 let cli_for_shutdown = |port: u16| {
340 let cli = self.apply_tls_to_cli(
341 RedisCli::new()
342 .bin(&self.redis_cli_bin)
343 .host(&self.bind)
344 .port(port),
345 );
346 cli.shutdown();
347 };
348 cli_for_shutdown(self.master_port);
349 for port in self.replica_ports() {
350 cli_for_shutdown(port);
351 }
352 for port in self.sentinel_ports() {
353 cli_for_shutdown(port);
354 }
355 tokio::time::sleep(Duration::from_millis(500)).await;
356
357 let unique = SystemTime::now()
358 .duration_since(UNIX_EPOCH)
359 .map(|duration| duration.as_nanos())
360 .unwrap_or(0);
361 let base_dir = std::env::temp_dir().join(format!(
362 "redis-sentinel-wrapper-{}-{}",
363 std::process::id(),
364 unique
365 ));
366 fs::create_dir_all(&base_dir)?;
367
368 let appendonly = self.appendonly.unwrap_or(true);
370 let mut master = RedisServer::new()
371 .port(self.master_port)
372 .bind(&self.bind)
373 .dir(base_dir.join("master"))
374 .appendonly(appendonly)
375 .redis_server_bin(&self.redis_server_bin)
376 .redis_cli_bin(&self.redis_cli_bin);
377 master = self.apply_tls_to_server(master);
378 if let Some(ref logfile) = self.logfile {
379 master = master.logfile(logfile.clone());
380 }
381 if let Some(ref save) = self.save {
382 match save {
383 SavePolicy::Disabled => master = master.save(false),
384 SavePolicy::Default => master = master.save(true),
385 SavePolicy::Custom(pairs) => master = master.save_schedule(pairs.clone()),
386 }
387 }
388 for (key, value) in &self.extra {
389 master = master.extra(key.clone(), value.clone());
390 }
391 let master = master.start().await?;
392
393 let mut replicas = Vec::new();
395 for port in self.replica_ports() {
396 let mut replica = RedisServer::new()
397 .port(port)
398 .bind(&self.bind)
399 .dir(base_dir.join(format!("replica-{port}")))
400 .appendonly(appendonly)
401 .replicaof(self.bind.clone(), self.master_port)
402 .redis_server_bin(&self.redis_server_bin)
403 .redis_cli_bin(&self.redis_cli_bin);
404 replica = self.apply_tls_to_server(replica);
405 if let Some(ref logfile) = self.logfile {
406 replica = replica.logfile(logfile.clone());
407 }
408 if let Some(ref save) = self.save {
409 match save {
410 SavePolicy::Disabled => replica = replica.save(false),
411 SavePolicy::Default => replica = replica.save(true),
412 SavePolicy::Custom(pairs) => {
413 replica = replica.save_schedule(pairs.clone());
414 }
415 }
416 }
417 for (key, value) in &self.extra {
418 replica = replica.extra(key.clone(), value.clone());
419 }
420 let replica = replica.start().await?;
421 replicas.push(replica);
422 }
423
424 tokio::time::sleep(Duration::from_secs(1)).await;
426
427 let mut sentinel_handles = Vec::new();
429 for port in self.sentinel_ports() {
430 let dir = base_dir.join(format!("sentinel-{port}"));
431 fs::create_dir_all(&dir)?;
432 let conf_path = dir.join("sentinel.conf");
433 let logfile = self
434 .logfile
435 .as_deref()
436 .map(str::to_owned)
437 .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
438 let mut conf = format!(
439 "port {port}\n\
440 bind {bind}\n\
441 daemonize yes\n\
442 pidfile {dir}/sentinel.pid\n\
443 logfile {logfile}\n\
444 dir {dir}\n",
445 port = port,
446 bind = self.bind,
447 dir = dir.display(),
448 logfile = logfile,
449 );
450 for master in &monitored_masters {
451 conf.push_str(&format!(
452 "sentinel monitor {name} {host} {master_port} {quorum}\n\
453 sentinel down-after-milliseconds {name} {down_after}\n\
454 sentinel failover-timeout {name} {failover_timeout}\n\
455 sentinel parallel-syncs {name} 1\n",
456 name = master.name,
457 host = master.host,
458 master_port = master.port,
459 quorum = self.quorum,
460 down_after = self.down_after_ms,
461 failover_timeout = self.failover_timeout_ms,
462 ));
463 }
464 if let Some(ref path) = self.tls_cert_file {
466 conf.push_str(&format!("tls-cert-file {}\n", path.display()));
467 }
468 if let Some(ref path) = self.tls_key_file {
469 conf.push_str(&format!("tls-key-file {}\n", path.display()));
470 }
471 if let Some(ref path) = self.tls_ca_cert_file {
472 conf.push_str(&format!("tls-ca-cert-file {}\n", path.display()));
473 }
474 if let Some(ref path) = self.tls_ca_cert_dir {
475 conf.push_str(&format!("tls-ca-cert-dir {}\n", path.display()));
476 }
477 if let Some(tls_port) = self.tls_port {
478 conf.push_str(&format!("tls-port {tls_port}\n"));
479 }
480 if let Some(v) = self.tls_auth_clients {
481 conf.push_str(&format!(
482 "tls-auth-clients {}\n",
483 if v { "yes" } else { "no" }
484 ));
485 }
486 if let Some(v) = self.tls_replication {
487 conf.push_str(&format!(
488 "tls-replication {}\n",
489 if v { "yes" } else { "no" }
490 ));
491 }
492 for (key, value) in &self.extra {
493 conf.push_str(&format!("{key} {value}\n"));
494 }
495 fs::write(&conf_path, conf)?;
496
497 let status = Command::new(&self.redis_server_bin)
498 .arg(&conf_path)
499 .arg("--sentinel")
500 .stdout(std::process::Stdio::null())
501 .stderr(std::process::Stdio::null())
502 .status()
503 .await?;
504
505 if !status.success() {
506 return Err(Error::SentinelStart { port });
507 }
508
509 let cli = self.apply_tls_to_cli(
510 RedisCli::new()
511 .bin(&self.redis_cli_bin)
512 .host(&self.bind)
513 .port(port),
514 );
515 cli.wait_for_ready(Duration::from_secs(10)).await?;
516
517 let pid_path = dir.join("sentinel.pid");
518 let pid: u32 = fs::read_to_string(&pid_path)?
519 .trim()
520 .parse()
521 .map_err(|_| Error::SentinelStart { port })?;
522
523 sentinel_handles.push((port, pid, cli));
524 }
525
526 tokio::time::sleep(Duration::from_secs(2)).await;
528
529 Ok(RedisSentinelHandle {
530 master,
531 replicas,
532 sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
533 sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
534 master_name: self.master_name,
535 bind: self.bind,
536 redis_cli_bin: self.redis_cli_bin,
537 num_sentinels: self.num_sentinels,
538 monitored_masters,
539 tls: TlsConfig {
540 cert_file: self.tls_cert_file,
541 key_file: self.tls_key_file,
542 ca_cert_file: self.tls_ca_cert_file,
543 },
544 })
545 }
546}
547
/// TLS client settings kept by `RedisSentinelHandle` so later `redis-cli`
/// calls (queries, shutdown) use the same certificates as at startup.
#[derive(Debug, Default, Clone)]
struct TlsConfig {
    /// Client certificate file.
    cert_file: Option<PathBuf>,
    /// Client private key file.
    key_file: Option<PathBuf>,
    /// CA certificate file; when absent, verification is skipped.
    ca_cert_file: Option<PathBuf>,
}
555
556impl TlsConfig {
557 fn has_tls(&self) -> bool {
558 self.cert_file.is_some() && self.key_file.is_some()
559 }
560
561 fn apply(&self, mut cli: RedisCli) -> RedisCli {
562 if self.has_tls() {
563 cli = cli.tls(true);
564 if let Some(ref ca) = self.ca_cert_file {
565 cli = cli.cacert(ca);
566 } else {
567 cli = cli.insecure(true);
568 }
569 if let Some(ref cert) = self.cert_file {
570 cli = cli.cert(cert);
571 }
572 if let Some(ref key) = self.key_file {
573 cli = cli.key(key);
574 }
575 }
576 cli
577 }
578}
579
580pub struct RedisSentinelHandle {
582 master: RedisServerHandle,
583 #[allow(dead_code)] replicas: Vec<RedisServerHandle>,
585 sentinel_ports: Vec<u16>,
586 sentinel_pids: Vec<u32>,
587 master_name: String,
588 bind: String,
589 redis_cli_bin: String,
590 num_sentinels: u16,
591 monitored_masters: Vec<MonitoredMaster>,
592 tls: TlsConfig,
593}
594
595pub struct RedisSentinel;
600
601impl RedisSentinel {
602 pub fn builder() -> RedisSentinelBuilder {
604 RedisSentinelBuilder {
605 master_name: "mymaster".into(),
606 master_port: 6390,
607 num_replicas: 2,
608 replica_base_port: 6391,
609 num_sentinels: 3,
610 sentinel_base_port: 26389,
611 quorum: 2,
612 bind: "127.0.0.1".into(),
613 logfile: None,
614 save: None,
615 appendonly: None,
616 down_after_ms: 5000,
617 failover_timeout_ms: 10000,
618 tls_port: None,
619 tls_cert_file: None,
620 tls_key_file: None,
621 tls_ca_cert_file: None,
622 tls_ca_cert_dir: None,
623 tls_auth_clients: None,
624 tls_replication: None,
625 extra: HashMap::new(),
626 redis_server_bin: "redis-server".into(),
627 redis_cli_bin: "redis-cli".into(),
628 monitored_masters: Vec::new(),
629 }
630 }
631}
632
633impl RedisSentinelHandle {
634 pub fn master_addr(&self) -> String {
636 self.master.addr()
637 }
638
639 pub fn monitored_master_names(&self) -> Vec<&str> {
641 self.monitored_masters
642 .iter()
643 .map(|master| master.name.as_str())
644 .collect()
645 }
646
647 pub fn monitored_master_addrs(&self) -> Vec<String> {
649 self.monitored_masters
650 .iter()
651 .map(|master| format!("{}:{}", master.host, master.port))
652 .collect()
653 }
654
655 pub fn pids(&self) -> Vec<u32> {
657 let mut pids = Vec::with_capacity(1 + self.replicas.len() + self.sentinel_pids.len());
658 pids.push(self.master.pid());
659 for replica in &self.replicas {
660 pids.push(replica.pid());
661 }
662 pids.extend_from_slice(&self.sentinel_pids);
663 pids
664 }
665
666 pub fn sentinel_addrs(&self) -> Vec<String> {
668 self.sentinel_ports
669 .iter()
670 .map(|p| format!("{}:{}", self.bind, p))
671 .collect()
672 }
673
674 pub fn master_name(&self) -> &str {
676 &self.master_name
677 }
678
679 pub async fn poke(&self) -> Result<HashMap<String, String>> {
689 self.poke_master(&self.master_name).await
690 }
691
692 pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
697 for port in &self.sentinel_ports {
698 let cli = self.tls.apply(
699 RedisCli::new()
700 .bin(&self.redis_cli_bin)
701 .host(&self.bind)
702 .port(*port),
703 );
704 if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
705 return Ok(parse_flat_kv(&raw));
706 }
707 }
708 Err(Error::NoReachableSentinel)
709 }
710
711 pub async fn is_healthy(&self) -> bool {
713 for master in &self.monitored_masters {
714 let Ok(info) = self.poke_master(&master.name).await else {
715 return false;
716 };
717 let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
718 let num_slaves: u64 = info
719 .get("num-slaves")
720 .and_then(|v| v.parse().ok())
721 .unwrap_or(0);
722 let num_sentinels: u64 = info
723 .get("num-other-sentinels")
724 .and_then(|v| v.parse().ok())
725 .unwrap_or(0)
726 + 1;
727 if flags != "master"
728 || num_slaves < master.expected_replicas as u64
729 || num_sentinels < self.num_sentinels as u64
730 {
731 return false;
732 }
733 }
734 true
735 }
736
737 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
739 let start = std::time::Instant::now();
740 loop {
741 if self.is_healthy().await {
742 return Ok(());
743 }
744 if start.elapsed() > timeout {
745 return Err(Error::Timeout {
746 message: "sentinel topology did not become healthy in time".into(),
747 });
748 }
749 tokio::time::sleep(Duration::from_millis(500)).await;
750 }
751 }
752
753 pub fn stop(&self) {
763 for port in &self.sentinel_ports {
765 self.tls
766 .apply(
767 RedisCli::new()
768 .bin(&self.redis_cli_bin)
769 .host(&self.bind)
770 .port(*port),
771 )
772 .shutdown();
773 }
774 std::thread::sleep(std::time::Duration::from_millis(500));
776 for pid in &self.sentinel_pids {
778 if crate::process::pid_alive(*pid) {
779 crate::process::force_kill(*pid);
780 }
781 }
782 for port in &self.sentinel_ports {
784 crate::process::kill_by_port(*port);
785 }
786 }
788}
789
790impl Drop for RedisSentinelHandle {
791 fn drop(&mut self) {
792 self.stop();
793 }
794}
795
/// Parses `redis-cli` output that alternates field lines and value lines (as
/// printed for `SENTINEL MASTER`) into a map.
///
/// Every line is trimmed. A trailing field without a value is ignored, and a
/// repeated field keeps its last value.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let tokens: Vec<&str> = raw.lines().map(str::trim).collect();
    tokens
        .chunks_exact(2)
        .map(|pair| (pair[0].to_owned(), pair[1].to_owned()))
        .collect()
}
807
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn builder_defaults() {
        let builder = RedisSentinel::builder();
        assert_eq!(builder.master_port, 6390);
        assert_eq!(builder.num_replicas, 2);
        assert_eq!(builder.num_sentinels, 3);
        assert_eq!(builder.quorum, 2);
        assert_eq!(builder.logfile, None);
        assert!(builder.extra.is_empty());
        assert!(builder.monitored_masters.is_empty());
    }

    #[test]
    fn builder_chain() {
        let builder = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);

        let expected_backup = MonitoredMaster {
            name: "backup".into(),
            host: "127.0.0.1".into(),
            port: 6501,
            expected_replicas: 0,
        };

        assert_eq!(builder.master_name, "custom");
        assert_eq!(builder.master_port, 6500);
        assert_eq!(builder.num_replicas, 1);
        assert_eq!(builder.num_sentinels, 5);
        assert_eq!(builder.quorum, 3);
        assert_eq!(builder.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(
            builder.extra.get("maxmemory").map(|v| v.as_str()),
            Some("10mb")
        );
        assert_eq!(builder.monitored_masters, vec![expected_backup]);
    }

    #[test]
    fn parse_sentinel_output() {
        let parsed = parse_flat_kv("name\nmymaster\nip\n127.0.0.1\nport\n6380\n");
        for (key, expected) in [("name", "mymaster"), ("ip", "127.0.0.1"), ("port", "6380")] {
            assert_eq!(parsed[key], expected, "field {key}");
        }
    }
}