1use std::collections::HashMap;
4use std::fs;
5use std::time::Duration;
6
7use tokio::process::Command;
8
9use crate::cli::RedisCli;
10use crate::error::{Error, Result};
11use crate::server::{RedisServer, RedisServerHandle};
12
/// Configuration for a Redis Sentinel test topology: one master, a run of
/// replicas on consecutive ports, and a run of sentinels on consecutive ports.
///
/// Values are set through the chained setter methods and consumed by `start`.
#[derive(Debug, Clone)]
pub struct RedisSentinelBuilder {
    /// Name used for the master in every `sentinel ...` directive.
    master_name: String,
    /// Port the master server listens on.
    master_port: u16,
    /// Number of replica servers to start.
    num_replicas: u16,
    /// Port of the first replica; replica `i` uses `replica_base_port + i`.
    replica_base_port: u16,
    /// Number of sentinel processes to start.
    num_sentinels: u16,
    /// Port of the first sentinel; sentinel `i` uses `sentinel_base_port + i`.
    sentinel_base_port: u16,
    /// Quorum written into the `sentinel monitor` directive.
    quorum: u16,
    /// Address every node binds to, and the host used to reach every node.
    bind: String,
    /// Value for `sentinel down-after-milliseconds`.
    down_after_ms: u64,
    /// Value for `sentinel failover-timeout`.
    failover_timeout_ms: u64,
    /// Executable used to launch servers and sentinels.
    redis_server_bin: String,
    /// Executable used for CLI commands (shutdown, readiness checks).
    redis_cli_bin: String,
}
47
48impl RedisSentinelBuilder {
49 pub fn master_name(mut self, name: impl Into<String>) -> Self {
50 self.master_name = name.into();
51 self
52 }
53
54 pub fn master_port(mut self, port: u16) -> Self {
55 self.master_port = port;
56 self
57 }
58
59 pub fn replicas(mut self, n: u16) -> Self {
60 self.num_replicas = n;
61 self
62 }
63
64 pub fn replica_base_port(mut self, port: u16) -> Self {
65 self.replica_base_port = port;
66 self
67 }
68
69 pub fn sentinels(mut self, n: u16) -> Self {
70 self.num_sentinels = n;
71 self
72 }
73
74 pub fn sentinel_base_port(mut self, port: u16) -> Self {
75 self.sentinel_base_port = port;
76 self
77 }
78
79 pub fn quorum(mut self, q: u16) -> Self {
80 self.quorum = q;
81 self
82 }
83
84 pub fn bind(mut self, bind: impl Into<String>) -> Self {
85 self.bind = bind.into();
86 self
87 }
88
89 pub fn down_after_ms(mut self, ms: u64) -> Self {
90 self.down_after_ms = ms;
91 self
92 }
93
94 pub fn failover_timeout_ms(mut self, ms: u64) -> Self {
95 self.failover_timeout_ms = ms;
96 self
97 }
98
99 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
100 self.redis_server_bin = bin.into();
101 self
102 }
103
104 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
105 self.redis_cli_bin = bin.into();
106 self
107 }
108
109 fn replica_ports(&self) -> impl Iterator<Item = u16> {
110 let base = self.replica_base_port;
111 let n = self.num_replicas;
112 (0..n).map(move |i| base + i)
113 }
114
115 fn sentinel_ports(&self) -> impl Iterator<Item = u16> {
116 let base = self.sentinel_base_port;
117 let n = self.num_sentinels;
118 (0..n).map(move |i| base + i)
119 }
120
121 pub async fn start(self) -> Result<RedisSentinelHandle> {
123 let cli_for_shutdown = |port: u16| {
125 RedisCli::new()
126 .bin(&self.redis_cli_bin)
127 .host(&self.bind)
128 .port(port)
129 .shutdown();
130 };
131 cli_for_shutdown(self.master_port);
132 for port in self.replica_ports() {
133 cli_for_shutdown(port);
134 }
135 for port in self.sentinel_ports() {
136 cli_for_shutdown(port);
137 }
138 tokio::time::sleep(Duration::from_millis(500)).await;
139
140 let base_dir = std::env::temp_dir().join("redis-sentinel-wrapper");
141 if base_dir.exists() {
142 let _ = fs::remove_dir_all(&base_dir);
143 }
144
145 let master = RedisServer::new()
147 .port(self.master_port)
148 .bind(&self.bind)
149 .dir(base_dir.join("master"))
150 .appendonly(true)
151 .redis_server_bin(&self.redis_server_bin)
152 .redis_cli_bin(&self.redis_cli_bin)
153 .start()
154 .await?;
155
156 let mut replicas = Vec::new();
158 for port in self.replica_ports() {
159 let replica = RedisServer::new()
160 .port(port)
161 .bind(&self.bind)
162 .dir(base_dir.join(format!("replica-{port}")))
163 .appendonly(true)
164 .extra("replicaof", format!("{} {}", self.bind, self.master_port))
165 .redis_server_bin(&self.redis_server_bin)
166 .redis_cli_bin(&self.redis_cli_bin)
167 .start()
168 .await?;
169 replicas.push(replica);
170 }
171
172 tokio::time::sleep(Duration::from_secs(1)).await;
174
175 let mut sentinel_handles = Vec::new();
177 for port in self.sentinel_ports() {
178 let dir = base_dir.join(format!("sentinel-{port}"));
179 fs::create_dir_all(&dir)?;
180 let conf_path = dir.join("sentinel.conf");
181 let conf = format!(
182 "port {port}\n\
183 bind {bind}\n\
184 daemonize yes\n\
185 pidfile {dir}/sentinel.pid\n\
186 logfile {dir}/sentinel.log\n\
187 dir {dir}\n\
188 sentinel monitor {name} {master_host} {master_port} {quorum}\n\
189 sentinel down-after-milliseconds {name} {down_after}\n\
190 sentinel failover-timeout {name} {failover_timeout}\n\
191 sentinel parallel-syncs {name} 1\n",
192 port = port,
193 bind = self.bind,
194 dir = dir.display(),
195 name = self.master_name,
196 master_host = self.bind,
197 master_port = self.master_port,
198 quorum = self.quorum,
199 down_after = self.down_after_ms,
200 failover_timeout = self.failover_timeout_ms,
201 );
202 fs::write(&conf_path, conf)?;
203
204 let status = Command::new(&self.redis_server_bin)
205 .arg(&conf_path)
206 .arg("--sentinel")
207 .stdout(std::process::Stdio::null())
208 .stderr(std::process::Stdio::null())
209 .status()
210 .await?;
211
212 if !status.success() {
213 return Err(Error::SentinelStart { port });
214 }
215
216 let cli = RedisCli::new()
217 .bin(&self.redis_cli_bin)
218 .host(&self.bind)
219 .port(port);
220 cli.wait_for_ready(Duration::from_secs(10)).await?;
221 sentinel_handles.push((port, cli));
222 }
223
224 tokio::time::sleep(Duration::from_secs(2)).await;
226
227 Ok(RedisSentinelHandle {
228 master,
229 replicas,
230 sentinel_ports: sentinel_handles.iter().map(|(p, _)| *p).collect(),
231 master_name: self.master_name,
232 bind: self.bind,
233 redis_cli_bin: self.redis_cli_bin,
234 num_sentinels: self.num_sentinels,
235 num_replicas: self.num_replicas,
236 })
237 }
238}
239
240pub struct RedisSentinelHandle {
242 master: RedisServerHandle,
243 #[allow(dead_code)] replicas: Vec<RedisServerHandle>,
245 sentinel_ports: Vec<u16>,
246 master_name: String,
247 bind: String,
248 redis_cli_bin: String,
249 num_sentinels: u16,
250 num_replicas: u16,
251}
252
253pub struct RedisSentinel;
255
256impl RedisSentinel {
257 pub fn builder() -> RedisSentinelBuilder {
259 RedisSentinelBuilder {
260 master_name: "mymaster".into(),
261 master_port: 6390,
262 num_replicas: 2,
263 replica_base_port: 6391,
264 num_sentinels: 3,
265 sentinel_base_port: 26389,
266 quorum: 2,
267 bind: "127.0.0.1".into(),
268 down_after_ms: 5000,
269 failover_timeout_ms: 10000,
270 redis_server_bin: "redis-server".into(),
271 redis_cli_bin: "redis-cli".into(),
272 }
273 }
274}
275
276impl RedisSentinelHandle {
277 pub fn master_addr(&self) -> String {
279 self.master.addr()
280 }
281
282 pub fn sentinel_addrs(&self) -> Vec<String> {
284 self.sentinel_ports
285 .iter()
286 .map(|p| format!("{}:{}", self.bind, p))
287 .collect()
288 }
289
290 pub fn master_name(&self) -> &str {
292 &self.master_name
293 }
294
295 pub async fn poke(&self) -> Result<HashMap<String, String>> {
297 for port in &self.sentinel_ports {
298 let cli = RedisCli::new()
299 .bin(&self.redis_cli_bin)
300 .host(&self.bind)
301 .port(*port);
302 if let Ok(raw) = cli.run(&["SENTINEL", "MASTER", &self.master_name]).await {
303 return Ok(parse_flat_kv(&raw));
304 }
305 }
306 Err(Error::NoReachableSentinel)
307 }
308
309 pub async fn is_healthy(&self) -> bool {
311 if let Ok(info) = self.poke().await {
312 let flags = info.get("flags").map(|s| s.as_str()).unwrap_or("");
313 let num_slaves: u64 = info
314 .get("num-slaves")
315 .and_then(|v| v.parse().ok())
316 .unwrap_or(0);
317 let num_sentinels: u64 = info
318 .get("num-other-sentinels")
319 .and_then(|v| v.parse().ok())
320 .unwrap_or(0)
321 + 1;
322 flags == "master"
323 && num_slaves >= self.num_replicas as u64
324 && num_sentinels >= self.num_sentinels as u64
325 } else {
326 false
327 }
328 }
329
330 pub async fn wait_for_healthy(&self, timeout: Duration) -> Result<()> {
332 let start = std::time::Instant::now();
333 loop {
334 if self.is_healthy().await {
335 return Ok(());
336 }
337 if start.elapsed() > timeout {
338 return Err(Error::Timeout {
339 message: "sentinel topology did not become healthy in time".into(),
340 });
341 }
342 tokio::time::sleep(Duration::from_millis(500)).await;
343 }
344 }
345
346 pub fn stop(&self) {
348 for port in &self.sentinel_ports {
350 RedisCli::new()
351 .bin(&self.redis_cli_bin)
352 .host(&self.bind)
353 .port(*port)
354 .shutdown();
355 }
356 }
358}
359
360impl Drop for RedisSentinelHandle {
361 fn drop(&mut self) {
362 self.stop();
363 }
364}
365
/// Parses output in which keys and values alternate line by line.
///
/// Every line is trimmed; lines are then taken two at a time as
/// `(key, value)`. A trailing unpaired line is ignored, and when a key
/// repeats, the last value wins.
fn parse_flat_kv(raw: &str) -> HashMap<String, String> {
    let trimmed: Vec<&str> = raw.lines().map(str::trim).collect();
    trimmed
        .chunks_exact(2)
        .map(|pair| (pair[0].to_owned(), pair[1].to_owned()))
        .collect()
}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the default topology.
    #[test]
    fn builder_defaults() {
        let defaults = RedisSentinel::builder();
        assert_eq!(
            (
                defaults.master_port,
                defaults.num_replicas,
                defaults.num_sentinels,
                defaults.quorum,
            ),
            (6390, 2, 3, 2)
        );
    }

    /// Chained setters overwrite exactly the fields they name.
    #[test]
    fn builder_chain() {
        let configured = RedisSentinel::builder()
            .master_name("custom")
            .master_port(6500)
            .replicas(1)
            .sentinels(5)
            .quorum(3);
        assert_eq!(configured.master_name, "custom");
        assert_eq!(
            (
                configured.master_port,
                configured.num_replicas,
                configured.num_sentinels,
                configured.quorum,
            ),
            (6500, 1, 5, 3)
        );
    }

    /// Alternating key/value lines become map entries.
    #[test]
    fn parse_sentinel_output() {
        let parsed = parse_flat_kv("name\nmymaster\nip\n127.0.0.1\nport\n6380\n");
        let expected = [("name", "mymaster"), ("ip", "127.0.0.1"), ("port", "6380")];
        for (key, value) in expected {
            assert_eq!(parsed.get(key).unwrap(), value);
        }
    }
}