1use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::process::Command;
9
10use crate::cli::RedisCli;
11use crate::error::{Error, Result};
12use crate::server::{RedisServer, RedisServerHandle, SavePolicy};
13
14pub struct RedisSentinelBuilder {
35 master_name: String,
36 master_port: u16,
37 num_replicas: u16,
38 replica_base_port: u16,
39 num_sentinels: u16,
40 sentinel_base_port: u16,
41 quorum: u16,
42 bind: String,
43 logfile: Option<String>,
44 save: Option<SavePolicy>,
45 appendonly: Option<bool>,
46 down_after_ms: u64,
47 failover_timeout_ms: u64,
48 extra: HashMap<String, String>,
49 redis_server_bin: String,
50 redis_cli_bin: String,
51 monitored_masters: Vec<MonitoredMaster>,
52}
53
/// One master that every sentinel is configured to watch via `sentinel monitor`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct MonitoredMaster {
    /// Name used in `sentinel monitor` and queried with `SENTINEL MASTER`.
    name: String,
    /// Host the sentinels use to reach this master.
    host: String,
    /// Port the master listens on.
    port: u16,
    /// Minimum `num-slaves` sentinel must report for the topology to count as healthy.
    expected_replicas: u16,
}
61
62impl RedisSentinelBuilder {
63 pub fn master_name(mut self, name: impl Into<String>) -> Self {
65 self.master_name = name.into();
66 self
67 }
68
69 pub fn master_port(mut self, port: u16) -> Self {
71 self.master_port = port;
72 self
73 }
74
75 pub fn replicas(mut self, n: u16) -> Self {
77 self.num_replicas = n;
78 self
79 }
80
81 pub fn replica_base_port(mut self, port: u16) -> Self {
85 self.replica_base_port = port;
86 self
87 }
88
89 pub fn sentinels(mut self, n: u16) -> Self {
91 self.num_sentinels = n;
92 self
93 }
94
95 pub fn sentinel_base_port(mut self, port: u16) -> Self {
99 self.sentinel_base_port = port;
100 self
101 }
102
103 pub fn quorum(mut self, q: u16) -> Self {
105 self.quorum = q;
106 self
107 }
108
109 pub fn bind(mut self, bind: impl Into<String>) -> Self {
111 self.bind = bind.into();
112 self
113 }
114
115 pub fn logfile(mut self, path: impl Into<String>) -> Self {
117 self.logfile = Some(path.into());
118 self
119 }
120
121 pub fn down_after_ms(mut self, ms: u64) -> Self {
125 self.down_after_ms = ms;
126 self
127 }
128
129 pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
131 self.failover_timeout_ms = ms;
132 self
133 }
134
135 pub fn save(mut self, save: bool) -> Self {
140 self.save = Some(if save {
141 SavePolicy::Default
142 } else {
143 SavePolicy::Disabled
144 });
145 self
146 }
147
148 pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
150 self.save = Some(SavePolicy::Custom(schedule));
151 self
152 }
153
154 pub fn appendonly(mut self, appendonly: bool) -> Self {
159 self.appendonly = Some(appendonly);
160 self
161 }
162
163 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
165 self.extra.insert(key.into(), value.into());
166 self
167 }
168
169 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
171 self.redis_server_bin = bin.into();
172 self
173 }
174
175 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
177 self.redis_cli_bin = bin.into();
178 self
179 }
180
181 pub fn monitor(mut self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
187 self.monitored_masters.push(MonitoredMaster {
188 name: name.into(),
189 host: host.into(),
190 port,
191 expected_replicas: 0,
192 });
193 self
194 }
195
196 pub fn monitor_with_replicas(
198 mut self,
199 name: impl Into<String>,
200 host: impl Into<String>,
201 port: u16,
202 expected_replicas: u16,
203 ) -> Self {
204 self.monitored_masters.push(MonitoredMaster {
205 name: name.into(),
206 host: host.into(),
207 port,
208 expected_replicas,
209 });
210 self
211 }
212
213 fn replica_ports(&self) -> impl Iterator<Item = u16> {
214 let base = self.replica_base_port;
215 let n = self.num_replicas;
216 (0..n).map(move |i| base + i)
217 }
218
219 fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
220 let base = self.sentinel_base_port;
221 let n = self.num_sentinels;
222 (0..n).map(move |i| base + i)
223 }
224
225 pub async fn start(self) -> Result<RedisSentinelHandle> {
227 let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
228 monitored_masters.push(MonitoredMaster {
229 name: self.master_name.clone(),
230 host: self.bind.clone(),
231 port: self.master_port,
232 expected_replicas: self.num_replicas,
233 });
234 monitored_masters.extend(self.monitored_masters.iter().cloned());
235
236 let cli_for_shutdown = |port: u16| {
238 RedisCli::new()
239 .bin(&self.redis_cli_bin)
240 .host(&self.bind)
241 .port(port)
242 .shutdown();
243 };
244 cli_for_shutdown(self.master_port);
245 for port in self.replica_ports() {
246 cli_for_shutdown(port);
247 }
248 for port in self.sentinel_ports() {
249 cli_for_shutdown(port);
250 }
251 tokio::time::sleep(Duration::from_millis(500)).await;
252
253 let unique = SystemTime::now()
254 .duration_since(UNIX_EPOCH)
255 .map(|duration| duration.as_nanos())
256 .unwrap_or(0);
257 let base_dir = std::env::temp_dir().join(format!(
258 "redis-sentinel-wrapper-{}-{}",
259 std::process::id(),
260 unique
261 ));
262 fs::create_dir_all(&base_dir)?;
263
264 let appendonly = self.appendonly.unwrap_or(true);
266 let mut master = RedisServer::new()
267 .port(self.master_port)
268 .bind(&self.bind)
269 .dir(base_dir.join("master"))
270 .appendonly(appendonly)
271 .redis_server_bin(&self.redis_server_bin)
272 .redis_cli_bin(&self.redis_cli_bin);
273 if let Some(ref logfile) = self.logfile {
274 master = master.logfile(logfile.clone());
275 }
276 if let Some(ref save) = self.save {
277 match save {
278 SavePolicy::Disabled => master = master.save(false),
279 SavePolicy::Default => master = master.save(true),
280 SavePolicy::Custom(pairs) => master = master.save_schedule(pairs.clone()),
281 }
282 }
283 for (key, value) in &self.extra {
284 master = master.extra(key.clone(), value.clone());
285 }
286 let master = master.start().await?;
287
288 let mut replicas = Vec::new();
290 for port in self.replica_ports() {
291 let mut replica = RedisServer::new()
292 .port(port)
293 .bind(&self.bind)
294 .dir(base_dir.join(format!("replica-{port}")))
295 .appendonly(appendonly)
296 .replicaof(self.bind.clone(), self.master_port)
297 .redis_server_bin(&self.redis_server_bin)
298 .redis_cli_bin(&self.redis_cli_bin);
299 if let Some(ref logfile) = self.logfile {
300 replica = replica.logfile(logfile.clone());
301 }
302 if let Some(ref save) = self.save {
303 match save {
304 SavePolicy::Disabled => replica = replica.save(false),
305 SavePolicy::Default => replica = replica.save(true),
306 SavePolicy::Custom(pairs) => {
307 replica = replica.save_schedule(pairs.clone());
308 }
309 }
310 }
311 for (key, value) in &self.extra {
312 replica = replica.extra(key.clone(), value.clone());
313 }
314 let replica = replica.start().await?;
315 replicas.push(replica);
316 }
317
318 tokio::time::sleep(Duration::from_secs(1)).await;
320
321 let mut sentinel_handles = Vec::new();
323 for port in self.sentinel_ports() {
324 let dir = base_dir.join(format!("sentinel-{port}"));
325 fs::create_dir_all(&dir)?;
326 let conf_path = dir.join("sentinel.conf");
327 let logfile = self
328 .logfile
329 .as_deref()
330 .map(str::to_owned)
331 .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
332 let mut conf = format!(
333 "port {port}\n\
334 bind {bind}\n\
335 daemonize yes\n\
336 pidfile {dir}/sentinel.pid\n\
337 logfile {logfile}\n\
338 dir {dir}\n",
339 port = port,
340 bind = self.bind,
341 dir = dir.display(),
342 logfile = logfile,
343 );
344 for master in &monitored_masters {
345 conf.push_str(&format!(
346 "sentinel monitor {name} {host} {master_port} {quorum}\n\
347 sentinel down-after-milliseconds {name} {down_after}\n\
348 sentinel failover-timeout {name} {failover_timeout}\n\
349 sentinel parallel-syncs {name} 1\n",
350 name = master.name,
351 host = master.host,
352 master_port = master.port,
353 quorum = self.quorum,
354 down_after = self.down_after_ms,
355 failover_timeout = self.failover_timeout_ms,
356 ));
357 }
358 for (key, value) in &self.extra {
359 conf.push_str(&format!("{key} {value}\n"));
360 }
361 fs::write(&conf_path, conf)?;
362
363 let status = Command::new(&self.redis_server_bin)
364 .arg(&conf_path)
365 .arg("--sentinel")
366 .stdout(std::process::Stdio::null())
367 .stderr(std::process::Stdio::null())
368 .status()
369 .await?;
370
371 if !status.success() {
372 return Err(Error::SentinelStart { port });
373 }
374
375 let cli = RedisCli::new()
376 .bin(&self.redis_cli_bin)
377 .host(&self.bind)
378 .port(port);
379 cli.wait_for_ready(Duration::from_secs(10)).await?;
380
381 let pid_path = dir.join("sentinel.pid");
382 let pid: u32 = fs::read_to_string(&pid_path)?
383 .trim()
384 .parse()
385 .map_err(|_| Error::SentinelStart { port })?;
386
387 sentinel_handles.push((port, pid, cli));
388 }
389
390 tokio::time::sleep(Duration::from_secs(2)).await;
392
393 Ok(RedisSentinelHandle {
394 master,
395 replicas,
396 sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
397 sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
398 master_name: self.master_name,
399 bind: self.bind,
400 redis_cli_bin: self.redis_cli_bin,
401 num_sentinels: self.num_sentinels,
402 monitored_masters,
403 })
404 }
405}
406
407pub struct RedisSentinelHandle {
409 master: RedisServerHandle,
410 #[allow(dead_code)] replicas: Vec<RedisServerHandle>,
412 sentinel_ports: Vec<u16>,
413 sentinel_pids: Vec<u32>,
414 master_name: String,
415 bind: String,
416 redis_cli_bin: String,
417 num_sentinels: u16,
418 monitored_masters: Vec<MonitoredMaster>,
419}
420
/// Entry point for building a local Redis Sentinel topology; see `RedisSentinel::builder`.
pub struct RedisSentinel;
426
427impl RedisSentinel {
428 pub fn builder() -> RedisSentinelBuilder {
430 RedisSentinelBuilder {
431 master_name: "mymaster".into(),
432 master_port: 6390,
433 num_replicas: 2,
434 replica_base_port: 6391,
435 num_sentinels: 3,
436 sentinel_base_port: 26389,
437 quorum: 2,
438 bind: "127.0.0.1".into(),
439 logfile: None,
440 save: None,
441 appendonly: None,
442 down_after_ms: 5000,
443 failover_timeout_ms: 10000,
444 extra: HashMap::new(),
445 redis_server_bin: "redis-server".into(),
446 redis_cli_bin: "redis-cli".into(),
447 monitored_masters: Vec::new(),
448 }
449 }
450}
451
452impl RedisSentinelHandle {
453 pub fn master_addr(&self) -> String {
455 self.master.addr()
456 }
457
458 pub fn monitored_master_names(&self) -> Vec<&str> {
460 self.monitored_masters
461 .iter()
462 .map(|master| master.name.as_str())
463 .collect()
464 }
465
466 pub fn monitored_master_addrs(&self) -> Vec<String> {
468 self.monitored_masters
469 .iter()
470 .map(|master| format!("{}:{}", master.host, master.port))
471 .collect()
472 }
473
474 pub fn pids(&self) -> Vec<u32> {
476 let mut pids = Vec::with_capacity(1 + self.replicas.len() + self.sentinel_pids.len());
477 pids.push(self.master.pid());
478 for replica in &self.replicas {
479 pids.push(replica.pid());
480 }
481 pids.extend_from_slice(&self.sentinel_pids);
482 pids
483 }
484
485 pub fn sentinel_addrs(&self) -> Vec<String> {
487 self.sentinel_ports
488 .iter()
489 .map(|p| format!("{}:{}", self.bind, p))
490 .collect()
491 }
492
493 pub fn master_name(&self) -> &str {
495 &self.master_name
496 }
497
498 pub async fn poke(&self) -> Result<HashMap<String, String>> {
508 self.poke_master(&self.master_name).await
509 }
510
511 pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
516 for port in &self.sentinel_ports {
517 let cli = RedisCli::new()
518 .bin(&self.redis_cli_bin)
519 .host(&self.bind)
520 .port(*port);
521 if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
522 return Ok(parse_flat_kv(&raw));
523 }
524 }
525 Err(Error::NoReachableSentinel)
526 }
527
528 pub async fn is_healthy(&self) -> bool {
530 for master in &self.monitored_masters {
531 let Ok(info) = self.poke_master(&master.name).await else {
532 return false;
533 };
534 let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
535 let num_slaves: u64 = info
536 .get("num-slaves")
537 .and_then(|v| v.parse().ok())
538 .unwrap_or(0);
539 let num_sentinels: u64 = info
540 .get("num-other-sentinels")
541 .and_then(|v| v.parse().ok())
542 .unwrap_or(0)
543 + 1;
544 if flags != "master"
545 || num_slaves < master.expected_replicas as u64
546 || num_sentinels < self.num_sentinels as u64
547 {
548 return false;
549 }
550 }
551 true
552 }
553
554 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
556 let start = std::time::Instant::now();
557 loop {
558 if self.is_healthy().await {
559 return Ok(());
560 }
561 if start.elapsed() > timeout {
562 return Err(Error::Timeout {
563 message: "sentinel topology did not become healthy in time".into(),
564 });
565 }
566 tokio::time::sleep(Duration::from_millis(500)).await;
567 }
568 }
569
570 pub fn stop(&self) {
572 for port in &self.sentinel_ports {
574 RedisCli::new()
575 .bin(&self.redis_cli_bin)
576 .host(&self.bind)
577 .port(*port)
578 .shutdown();
579 }
580 }
582}
583
584impl Drop for RedisSentinelHandle {
585 fn drop(&mut self) {
586 self.stop();
587 }
588}
589
/// Parses a flat `redis-cli` reply (`key\nvalue\nkey\nvalue…`) into a map.
///
/// Every line is trimmed, and lines are paired in order. A trailing unpaired line is
/// ignored, and a key that appears more than once keeps its last value.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let trimmed: Vec<&str> = raw.lines().map(str::trim).collect();
    trimmed
        .chunks_exact(2)
        .map(|pair| (pair[0].to_owned(), pair[1].to_owned()))
        .collect()
}
601
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn builder_defaults() {
        let b = RedisSentinel::builder();
        assert_eq!(b.master_name, "mymaster");
        assert_eq!(b.master_port, 6390);
        assert_eq!(b.num_replicas, 2);
        assert_eq!(b.replica_base_port, 6391);
        assert_eq!(b.num_sentinels, 3);
        assert_eq!(b.sentinel_base_port, 26389);
        assert_eq!(b.quorum, 2);
        assert_eq!(b.bind, "127.0.0.1");
        assert!(b.logfile.is_none());
        assert!(b.extra.is_empty());
        assert!(b.monitored_masters.is_empty());
    }

    #[test]
    fn builder_chain() {
        let b = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);
        assert_eq!(b.master_name, "custom");
        assert_eq!(b.master_port, 6500);
        assert_eq!(b.num_replicas, 1);
        assert_eq!(b.num_sentinels, 5);
        assert_eq!(b.quorum, 3);
        assert_eq!(b.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(b.extra.get("maxmemory").map(String::as_str), Some("10mb"));
        assert_eq!(b.monitored_masters.len(), 1);
        assert_eq!(
            b.monitored_masters[0],
            MonitoredMaster {
                name: "backup".into(),
                host: "127.0.0.1".into(),
                port: 6501,
                expected_replicas: 0,
            }
        );
    }

    #[test]
    fn monitor_with_replicas_records_expected_replicas() {
        let b = RedisSentinel::builder().monitor_with_replicas("other", "10.0.0.1", 7000, 2);
        assert_eq!(
            b.monitored_masters,
            vec![MonitoredMaster {
                name: "other".into(),
                host: "10.0.0.1".into(),
                port: 7000,
                expected_replicas: 2,
            }]
        );
    }

    #[test]
    fn port_ranges_are_contiguous() {
        let b = RedisSentinel::builder()
            .replica_base_port(7001)
            .replicas(3)
            .sentinel_base_port(27000)
            .sentinels(2);
        assert_eq!(b.replica_ports().collect::<Vec<_>>(), vec![7001, 7002, 7003]);
        assert_eq!(b.sentinel_ports().collect::<Vec<_>>(), vec![27000, 27001]);

        let none = RedisSentinel::builder().replicas(0).sentinels(0);
        assert_eq!(none.replica_ports().count(), 0);
        assert_eq!(none.sentinel_ports().count(), 0);
    }

    #[test]
    fn parse_sentinel_output() {
        let raw = "name\nmymaster\nip\n127.0.0.1\nport\n6380\n";
        let map = parse_flat_kv(raw);
        assert_eq!(map.get("name").unwrap(), "mymaster");
        assert_eq!(map.get("ip").unwrap(), "127.0.0.1");
        assert_eq!(map.get("port").unwrap(), "6380");
    }

    #[test]
    fn parse_trims_and_ignores_dangling_key() {
        let map = parse_flat_kv("  flags \r\nmaster\nnum-slaves\n");
        assert_eq!(map.len(), 1);
        assert_eq!(map.get("flags").map(String::as_str), Some("master"));
        assert!(parse_flat_kv("").is_empty());
    }
}
656}