1use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::process::Command;
9
10use crate::cli::RedisCli;
11use crate::error::{Error, Result};
12use crate::server::{RedisServer, RedisServerHandle};
13
14pub struct RedisSentinelBuilder {
35 master_name: String,
36 master_port: u16,
37 num_replicas: u16,
38 replica_base_port: u16,
39 num_sentinels: u16,
40 sentinel_base_port: u16,
41 quorum: u16,
42 bind: String,
43 logfile: Option<String>,
44 down_after_ms: u64,
45 failover_timeout_ms: u64,
46 extra: HashMap<String, String>,
47 redis_server_bin: String,
48 redis_cli_bin: String,
49 monitored_masters: Vec<MonitoredMaster>,
50}
51
/// One master entry that every sentinel is configured to monitor.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MonitoredMaster {
    /// Name used in the `sentinel ...` directives for this master.
    name: String,
    /// Host written into the `sentinel monitor` directive.
    host: String,
    /// Port written into the `sentinel monitor` directive.
    port: u16,
    /// Minimum number of replicas the health check requires for this master.
    expected_replicas: u16,
}
59
60impl RedisSentinelBuilder {
61 pub fn master_name(mut self, name: impl Into<String>) -> Self {
63 self.master_name = name.into();
64 self
65 }
66
67 pub fn master_port(mut self, port: u16) -> Self {
69 self.master_port = port;
70 self
71 }
72
73 pub fn replicas(mut self, n: u16) -> Self {
75 self.num_replicas = n;
76 self
77 }
78
79 pub fn replica_base_port(mut self, port: u16) -> Self {
83 self.replica_base_port = port;
84 self
85 }
86
87 pub fn sentinels(mut self, n: u16) -> Self {
89 self.num_sentinels = n;
90 self
91 }
92
93 pub fn sentinel_base_port(mut self, port: u16) -> Self {
97 self.sentinel_base_port = port;
98 self
99 }
100
101 pub fn quorum(mut self, q: u16) -> Self {
103 self.quorum = q;
104 self
105 }
106
107 pub fn bind(mut self, bind: impl Into<String>) -> Self {
109 self.bind = bind.into();
110 self
111 }
112
113 pub fn logfile(mut self, path: impl Into<String>) -> Self {
115 self.logfile = Some(path.into());
116 self
117 }
118
119 pub fn down_after_ms(mut self, ms: u64) -> Self {
123 self.down_after_ms = ms;
124 self
125 }
126
127 pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
129 self.failover_timeout_ms = ms;
130 self
131 }
132
133 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135 self.extra.insert(key.into(), value.into());
136 self
137 }
138
139 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
141 self.redis_server_bin = bin.into();
142 self
143 }
144
145 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
147 self.redis_cli_bin = bin.into();
148 self
149 }
150
151 pub fn monitor(mut self, name: impl Into<String>, host: impl Into<String>, port: u16) -> Self {
157 self.monitored_masters.push(MonitoredMaster {
158 name: name.into(),
159 host: host.into(),
160 port,
161 expected_replicas: 0,
162 });
163 self
164 }
165
166 pub fn monitor_with_replicas(
168 mut self,
169 name: impl Into<String>,
170 host: impl Into<String>,
171 port: u16,
172 expected_replicas: u16,
173 ) -> Self {
174 self.monitored_masters.push(MonitoredMaster {
175 name: name.into(),
176 host: host.into(),
177 port,
178 expected_replicas,
179 });
180 self
181 }
182
183 fn replica_ports(&self) -> impl Iterator<Item = u16> {
184 let base = self.replica_base_port;
185 let n = self.num_replicas;
186 (0..n).map(move |i| base + i)
187 }
188
189 fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
190 let base = self.sentinel_base_port;
191 let n = self.num_sentinels;
192 (0..n).map(move |i| base + i)
193 }
194
195 pub async fn start(self) -> Result<RedisSentinelHandle> {
197 let mut monitored_masters = Vec::with_capacity(1 + self.monitored_masters.len());
198 monitored_masters.push(MonitoredMaster {
199 name: self.master_name.clone(),
200 host: self.bind.clone(),
201 port: self.master_port,
202 expected_replicas: self.num_replicas,
203 });
204 monitored_masters.extend(self.monitored_masters.iter().cloned());
205
206 let cli_for_shutdown = |port: u16| {
208 RedisCli::new()
209 .bin(&self.redis_cli_bin)
210 .host(&self.bind)
211 .port(port)
212 .shutdown();
213 };
214 cli_for_shutdown(self.master_port);
215 for port in self.replica_ports() {
216 cli_for_shutdown(port);
217 }
218 for port in self.sentinel_ports() {
219 cli_for_shutdown(port);
220 }
221 tokio::time::sleep(Duration::from_millis(500)).await;
222
223 let unique = SystemTime::now()
224 .duration_since(UNIX_EPOCH)
225 .map(|duration| duration.as_nanos())
226 .unwrap_or(0);
227 let base_dir = std::env::temp_dir().join(format!(
228 "redis-sentinel-wrapper-{}-{}",
229 std::process::id(),
230 unique
231 ));
232 fs::create_dir_all(&base_dir)?;
233
234 let mut master = RedisServer::new()
236 .port(self.master_port)
237 .bind(&self.bind)
238 .dir(base_dir.join("master"))
239 .appendonly(true)
240 .redis_server_bin(&self.redis_server_bin)
241 .redis_cli_bin(&self.redis_cli_bin);
242 if let Some(ref logfile) = self.logfile {
243 master = master.logfile(logfile.clone());
244 }
245 for (key, value) in &self.extra {
246 master = master.extra(key.clone(), value.clone());
247 }
248 let master = master.start().await?;
249
250 let mut replicas = Vec::new();
252 for port in self.replica_ports() {
253 let mut replica = RedisServer::new()
254 .port(port)
255 .bind(&self.bind)
256 .dir(base_dir.join(format!("replica-{port}")))
257 .appendonly(true)
258 .replicaof(self.bind.clone(), self.master_port)
259 .redis_server_bin(&self.redis_server_bin)
260 .redis_cli_bin(&self.redis_cli_bin);
261 if let Some(ref logfile) = self.logfile {
262 replica = replica.logfile(logfile.clone());
263 }
264 for (key, value) in &self.extra {
265 replica = replica.extra(key.clone(), value.clone());
266 }
267 let replica = replica.start().await?;
268 replicas.push(replica);
269 }
270
271 tokio::time::sleep(Duration::from_secs(1)).await;
273
274 let mut sentinel_handles = Vec::new();
276 for port in self.sentinel_ports() {
277 let dir = base_dir.join(format!("sentinel-{port}"));
278 fs::create_dir_all(&dir)?;
279 let conf_path = dir.join("sentinel.conf");
280 let logfile = self
281 .logfile
282 .as_deref()
283 .map(str::to_owned)
284 .unwrap_or_else(|| format!("{}/sentinel.log", dir.display()));
285 let mut conf = format!(
286 "port {port}\n\
287 bind {bind}\n\
288 daemonize yes\n\
289 pidfile {dir}/sentinel.pid\n\
290 logfile {logfile}\n\
291 dir {dir}\n",
292 port = port,
293 bind = self.bind,
294 dir = dir.display(),
295 logfile = logfile,
296 );
297 for master in &monitored_masters {
298 conf.push_str(&format!(
299 "sentinel monitor {name} {host} {master_port} {quorum}\n\
300 sentinel down-after-milliseconds {name} {down_after}\n\
301 sentinel failover-timeout {name} {failover_timeout}\n\
302 sentinel parallel-syncs {name} 1\n",
303 name = master.name,
304 host = master.host,
305 master_port = master.port,
306 quorum = self.quorum,
307 down_after = self.down_after_ms,
308 failover_timeout = self.failover_timeout_ms,
309 ));
310 }
311 for (key, value) in &self.extra {
312 conf.push_str(&format!("{key} {value}\n"));
313 }
314 fs::write(&conf_path, conf)?;
315
316 let status = Command::new(&self.redis_server_bin)
317 .arg(&conf_path)
318 .arg("--sentinel")
319 .stdout(std::process::Stdio::null())
320 .stderr(std::process::Stdio::null())
321 .status()
322 .await?;
323
324 if !status.success() {
325 return Err(Error::SentinelStart { port });
326 }
327
328 let cli = RedisCli::new()
329 .bin(&self.redis_cli_bin)
330 .host(&self.bind)
331 .port(port);
332 cli.wait_for_ready(Duration::from_secs(10)).await?;
333
334 let pid_path = dir.join("sentinel.pid");
335 let pid: u32 = fs::read_to_string(&pid_path)?
336 .trim()
337 .parse()
338 .map_err(|_| Error::SentinelStart { port })?;
339
340 sentinel_handles.push((port, pid, cli));
341 }
342
343 tokio::time::sleep(Duration::from_secs(2)).await;
345
346 Ok(RedisSentinelHandle {
347 master,
348 replicas,
349 sentinel_ports: sentinel_handles.iter().map(|(p, _, _)| *p).collect(),
350 sentinel_pids: sentinel_handles.iter().map(|(_, pid, _)| *pid).collect(),
351 master_name: self.master_name,
352 bind: self.bind,
353 redis_cli_bin: self.redis_cli_bin,
354 num_sentinels: self.num_sentinels,
355 monitored_masters,
356 })
357 }
358}
359
360pub struct RedisSentinelHandle {
362 master: RedisServerHandle,
363 #[allow(dead_code)] replicas: Vec<RedisServerHandle>,
365 sentinel_ports: Vec<u16>,
366 sentinel_pids: Vec<u32>,
367 master_name: String,
368 bind: String,
369 redis_cli_bin: String,
370 num_sentinels: u16,
371 monitored_masters: Vec<MonitoredMaster>,
372}
373
374pub struct RedisSentinel;
379
380impl RedisSentinel {
381 pub fn builder() -> RedisSentinelBuilder {
383 RedisSentinelBuilder {
384 master_name: "mymaster".into(),
385 master_port: 6390,
386 num_replicas: 2,
387 replica_base_port: 6391,
388 num_sentinels: 3,
389 sentinel_base_port: 26389,
390 quorum: 2,
391 bind: "127.0.0.1".into(),
392 logfile: None,
393 down_after_ms: 5000,
394 failover_timeout_ms: 10000,
395 extra: HashMap::new(),
396 redis_server_bin: "redis-server".into(),
397 redis_cli_bin: "redis-cli".into(),
398 monitored_masters: Vec::new(),
399 }
400 }
401}
402
403impl RedisSentinelHandle {
404 pub fn master_addr(&self) -> String {
406 self.master.addr()
407 }
408
409 pub fn monitored_master_names(&self) -> Vec<&str> {
411 self.monitored_masters
412 .iter()
413 .map(|master| master.name.as_str())
414 .collect()
415 }
416
417 pub fn monitored_master_addrs(&self) -> Vec<String> {
419 self.monitored_masters
420 .iter()
421 .map(|master| format!("{}:{}", master.host, master.port))
422 .collect()
423 }
424
425 pub fn pids(&self) -> Vec<u32> {
427 let mut pids = Vec::with_capacity(1 + self.replicas.len() + self.sentinel_pids.len());
428 pids.push(self.master.pid());
429 for replica in &self.replicas {
430 pids.push(replica.pid());
431 }
432 pids.extend_from_slice(&self.sentinel_pids);
433 pids
434 }
435
436 pub fn sentinel_addrs(&self) -> Vec<String> {
438 self.sentinel_ports
439 .iter()
440 .map(|p| format!("{}:{}", self.bind, p))
441 .collect()
442 }
443
444 pub fn master_name(&self) -> &str {
446 &self.master_name
447 }
448
449 pub async fn poke(&self) -> Result<HashMap<String, String>> {
459 self.poke_master(&self.master_name).await
460 }
461
462 pub async fn poke_master(&self, master_name: &str) -> Result<HashMap<String, String>> {
467 for port in &self.sentinel_ports {
468 let cli = RedisCli::new()
469 .bin(&self.redis_cli_bin)
470 .host(&self.bind)
471 .port(*port);
472 if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", master_name]).await {
473 return Ok(parse_flat_kv(&raw));
474 }
475 }
476 Err(Error::NoReachableSentinel)
477 }
478
479 pub async fn is_healthy(&self) -> bool {
481 for master in &self.monitored_masters {
482 let Ok(info) = self.poke_master(&master.name).await else {
483 return false;
484 };
485 let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
486 let num_slaves: u64 = info
487 .get("num-slaves")
488 .and_then(|v| v.parse().ok())
489 .unwrap_or(0);
490 let num_sentinels: u64 = info
491 .get("num-other-sentinels")
492 .and_then(|v| v.parse().ok())
493 .unwrap_or(0)
494 + 1;
495 if flags != "master"
496 || num_slaves < master.expected_replicas as u64
497 || num_sentinels < self.num_sentinels as u64
498 {
499 return false;
500 }
501 }
502 true
503 }
504
505 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
507 let start = std::time::Instant::now();
508 loop {
509 if self.is_healthy().await {
510 return Ok(());
511 }
512 if start.elapsed() > timeout {
513 return Err(Error::Timeout {
514 message: "sentinel topology did not become healthy in time".into(),
515 });
516 }
517 tokio::time::sleep(Duration::from_millis(500)).await;
518 }
519 }
520
521 pub fn stop(&self) {
523 for port in &self.sentinel_ports {
525 RedisCli::new()
526 .bin(&self.redis_cli_bin)
527 .host(&self.bind)
528 .port(*port)
529 .shutdown();
530 }
531 }
533}
534
535impl Drop for RedisSentinelHandle {
536 fn drop(&mut self) {
537 self.stop();
538 }
539}
540
/// Parses a flat reply in which keys and values alternate line by line.
///
/// Every line is trimmed. Lines are consumed in pairs (key, then value); a
/// trailing key without a value is ignored, and a repeated key keeps its last
/// value.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let mut remaining = raw.lines().map(str::trim);
    let mut fields = HashMap::new();
    while let (Some(key), Some(value)) = (remaining.next(), remaining.next()) {
        fields.insert(key.to_owned(), value.to_owned());
    }
    fields
}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the documented defaults.
    #[test]
    fn builder_defaults() {
        let builder = RedisSentinel::builder();

        assert_eq!(builder.master_port, 6390);
        assert_eq!(builder.num_replicas, 2);
        assert_eq!(builder.num_sentinels, 3);
        assert_eq!(builder.quorum, 2);
        assert!(builder.logfile.is_none());
        assert!(builder.extra.is_empty());
        assert!(builder.monitored_masters.is_empty());
    }

    /// Every chained setter stores its argument in the matching field.
    #[test]
    fn builder_chain() {
        let builder = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3)
            .logfile("/tmp/sentinel.log")
            .extra("maxmemory", "10mb")
            .monitor("backup", "127.0.0.1", 6501);

        assert_eq!(builder.master_name, "custom");
        assert_eq!(builder.master_port, 6500);
        assert_eq!(builder.num_replicas, 1);
        assert_eq!(builder.num_sentinels, 5);
        assert_eq!(builder.quorum, 3);
        assert_eq!(builder.logfile.as_deref(), Some("/tmp/sentinel.log"));
        assert_eq!(
            builder.extra.get("maxmemory").map(String::as_str),
            Some("10mb")
        );

        let expected = MonitoredMaster {
            name: "backup".into(),
            host: "127.0.0.1".into(),
            port: 6501,
            expected_replicas: 0,
        };
        assert_eq!(builder.monitored_masters.len(), 1);
        assert_eq!(builder.monitored_masters[0], expected);
    }

    /// Alternating key/value lines end up as map entries.
    #[test]
    fn parse_sentinel_output() {
        let parsed = parse_flat_kv("name\nmymaster\nip\n127.0.0.1\nport\n6380\n");

        for (key, value) in [("name", "mymaster"), ("ip", "127.0.0.1"), ("port", "6380")] {
            assert_eq!(parsed.get(key).unwrap(), value);
        }
    }
}