1use crate::connection::RedisConnection;
64use crate::core::{
65 config::ConnectionConfig,
66 error::{RedisError, RedisResult},
67 value::RespValue,
68};
69use std::collections::HashMap;
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tokio::sync::{Mutex, RwLock};
73use tracing::{debug, info, warn};
74
75#[derive(Debug, Clone)]
77pub struct SentinelConfig {
78 pub master_name: String,
80 pub sentinels: Vec<SentinelEndpoint>,
82 pub password: Option<String>,
84 pub failover_timeout: Duration,
86 pub check_interval: Duration,
88 pub max_retries: usize,
90}
91
92#[derive(Debug, Clone)]
94pub struct SentinelEndpoint {
95 pub host: String,
97 pub port: u16,
99}
100
101impl SentinelEndpoint {
102 #[must_use]
104 pub fn new(host: impl Into<String>, port: u16) -> Self {
105 Self {
106 host: host.into(),
107 port,
108 }
109 }
110
111 pub fn from_address(addr: &str) -> RedisResult<Self> {
117 let parts: Vec<&str> = addr.split(':').collect();
118 if parts.len() != 2 {
119 return Err(RedisError::Config(format!(
120 "Invalid sentinel address: {}",
121 addr
122 )));
123 }
124
125 let host = parts[0].to_string();
126 let port = parts[1].parse::<u16>().map_err(|_| {
127 RedisError::Config(format!("Invalid port in sentinel address: {}", addr))
128 })?;
129
130 Ok(Self::new(host, port))
131 }
132
133 #[must_use]
135 pub fn address(&self) -> String {
136 format!("{}:{}", self.host, self.port)
137 }
138}
139
140impl SentinelConfig {
141 #[must_use]
143 pub fn new(master_name: impl Into<String>) -> Self {
144 Self {
145 master_name: master_name.into(),
146 sentinels: Vec::new(),
147 password: None,
148 failover_timeout: Duration::from_secs(30),
149 check_interval: Duration::from_secs(5),
150 max_retries: 3,
151 }
152 }
153
154 #[must_use]
156 pub fn add_sentinel(mut self, addr: impl AsRef<str>) -> Self {
157 if let Ok(endpoint) = SentinelEndpoint::from_address(addr.as_ref()) {
158 self.sentinels.push(endpoint);
159 }
160 self
161 }
162
163 #[must_use]
165 pub fn with_password(mut self, password: impl Into<String>) -> Self {
166 self.password = Some(password.into());
167 self
168 }
169
170 #[must_use]
172 pub const fn with_failover_timeout(mut self, timeout: Duration) -> Self {
173 self.failover_timeout = timeout;
174 self
175 }
176
177 #[must_use]
179 pub const fn with_check_interval(mut self, interval: Duration) -> Self {
180 self.check_interval = interval;
181 self
182 }
183
184 #[must_use]
186 pub const fn with_max_retries(mut self, retries: usize) -> Self {
187 self.max_retries = retries;
188 self
189 }
190}
191
/// Description of a monitored master as reported by a sentinel.
#[derive(Debug, Clone)]
pub struct MasterInfo {
    /// Name under which the sentinels monitor this master.
    pub name: String,
    /// Host (IP address) of the master.
    pub host: String,
    /// TCP port of the master.
    pub port: u16,
    /// Status flags, one entry per flag (for example `master`, `s_down`).
    pub flags: Vec<String>,
    /// Number of replicas reported for this master.
    pub num_slaves: u32,
    /// Number of other sentinels reported as monitoring this master.
    pub num_other_sentinels: u32,
    /// Quorum reported for this master.
    pub quorum: u32,
    /// Failover timeout reported for this master.
    pub failover_timeout: Duration,
    /// `parallel-syncs` value reported for this master.
    pub parallel_syncs: u32,
}

impl MasterInfo {
    /// Returns `true` when the flags contain `s_down` or `o_down`.
    #[must_use]
    pub fn is_down(&self) -> bool {
        self.has_flag("s_down") || self.has_flag("o_down")
    }

    /// Returns `true` when the flags contain `failover_in_progress`.
    #[must_use]
    pub fn is_failover_in_progress(&self) -> bool {
        self.has_flag("failover_in_progress")
    }

    /// Returns the master location formatted as `host:port`.
    #[must_use]
    pub fn address(&self) -> String {
        format!("{}:{}", self.host, self.port)
    }

    /// Exact-match lookup of `flag` in `self.flags`, comparing by reference
    /// so no temporary `String` is allocated per check.
    fn has_flag(&self, flag: &str) -> bool {
        self.flags.iter().any(|candidate| candidate == flag)
    }
}
234
235pub struct SentinelClient {
237 config: SentinelConfig,
238 sentinels: Arc<RwLock<Vec<Arc<Mutex<RedisConnection>>>>>,
239 current_master: Arc<RwLock<Option<MasterInfo>>>,
240 last_check: Arc<Mutex<Instant>>,
241}
242
243impl SentinelClient {
244 pub async fn new(config: SentinelConfig) -> RedisResult<Self> {
250 if config.sentinels.is_empty() {
251 return Err(RedisError::Config("No sentinels configured".to_string()));
252 }
253
254 let client = Self {
255 config,
256 sentinels: Arc::new(RwLock::new(Vec::new())),
257 current_master: Arc::new(RwLock::new(None)),
258 last_check: Arc::new(Mutex::new(Instant::now())),
259 };
260
261 client.initialize_sentinels().await?;
263
264 client.discover_master().await?;
266
267 Ok(client)
268 }
269
270 pub async fn get_master(&self) -> RedisResult<MasterInfo> {
276 {
278 let last_check = self.last_check.lock().await;
279 if last_check.elapsed() < self.config.check_interval {
280 if let Some(master) = self.current_master.read().await.clone() {
281 return Ok(master);
282 }
283 }
284 }
285
286 self.discover_master().await?;
288
289 self.current_master
290 .read()
291 .await
292 .clone()
293 .ok_or_else(|| RedisError::Sentinel("No master available".to_string()))
294 }
295
296 pub async fn connect_to_master(&self) -> RedisResult<RedisConnection> {
302 let master = self.get_master().await?;
303
304 let master_config =
305 ConnectionConfig::new(&format!("redis://{}:{}", master.host, master.port));
306
307 RedisConnection::connect(&master.host, master.port, master_config).await
308 }
309
310 pub async fn monitor(&self) -> RedisResult<()> {
312 let mut interval = tokio::time::interval(self.config.check_interval);
313
314 loop {
315 interval.tick().await;
316
317 if let Err(e) = self.check_master_status().await {
318 warn!("Failed to check master status: {}", e);
319 }
320 }
321 }
322
323 async fn initialize_sentinels(&self) -> RedisResult<()> {
324 let mut sentinels = self.sentinels.write().await;
325
326 for endpoint in &self.config.sentinels {
327 match self.connect_to_sentinel(endpoint).await {
328 Ok(conn) => {
329 sentinels.push(Arc::new(Mutex::new(conn)));
330 info!("Connected to sentinel: {}", endpoint.address());
331 }
332 Err(e) => {
333 warn!(
334 "Failed to connect to sentinel {}: {}",
335 endpoint.address(),
336 e
337 );
338 }
339 }
340 }
341
342 if sentinels.is_empty() {
343 return Err(RedisError::Sentinel("No sentinels available".to_string()));
344 }
345
346 Ok(())
347 }
348
349 async fn connect_to_sentinel(
350 &self,
351 endpoint: &SentinelEndpoint,
352 ) -> RedisResult<RedisConnection> {
353 let sentinel_config =
354 ConnectionConfig::new(&format!("redis://{}:{}", endpoint.host, endpoint.port));
355
356 let mut conn =
357 RedisConnection::connect(&endpoint.host, endpoint.port, sentinel_config).await?;
358
359 if let Some(password) = &self.config.password {
361 let auth_cmd = RespValue::Array(vec![
362 RespValue::BulkString(bytes::Bytes::from("AUTH")),
363 RespValue::BulkString(bytes::Bytes::from(password.clone())),
364 ]);
365
366 conn.send_command(&auth_cmd).await?;
367 let _response = conn.read_response().await?;
368 }
369
370 Ok(conn)
371 }
372
373 async fn discover_master(&self) -> RedisResult<()> {
374 let sentinels = self.sentinels.read().await;
375
376 for sentinel in sentinels.iter() {
377 match self.query_master_info(sentinel).await {
378 Ok(master_info) => {
379 info!("Discovered master: {}", master_info.address());
380 *self.current_master.write().await = Some(master_info);
381 *self.last_check.lock().await = Instant::now();
382 return Ok(());
383 }
384 Err(e) => {
385 debug!("Failed to query master from sentinel: {}", e);
386 }
387 }
388 }
389
390 Err(RedisError::Sentinel(
391 "Failed to discover master from any sentinel".to_string(),
392 ))
393 }
394
395 async fn query_master_info(
396 &self,
397 sentinel: &Arc<Mutex<RedisConnection>>,
398 ) -> RedisResult<MasterInfo> {
399 let mut conn = sentinel.lock().await;
400
401 let cmd = RespValue::Array(vec![
403 RespValue::BulkString(bytes::Bytes::from("SENTINEL")),
404 RespValue::BulkString(bytes::Bytes::from("masters")),
405 ]);
406
407 conn.send_command(&cmd).await?;
408 let response = conn.read_response().await?;
409
410 self.parse_master_info(response)
411 }
412
413 fn parse_master_info(&self, response: RespValue) -> RedisResult<MasterInfo> {
414 match response {
415 RespValue::Array(masters) => {
416 for master in masters {
417 if let RespValue::Array(master_data) = master {
418 let master_info = self.parse_single_master(master_data)?;
419 if master_info.name == self.config.master_name {
420 return Ok(master_info);
421 }
422 }
423 }
424 Err(RedisError::Sentinel(format!(
425 "Master '{}' not found",
426 self.config.master_name
427 )))
428 }
429 _ => Err(RedisError::Sentinel("Invalid masters response".to_string())),
430 }
431 }
432
433 fn parse_single_master(&self, master_data: Vec<RespValue>) -> RedisResult<MasterInfo> {
434 let mut info = HashMap::new();
435
436 for chunk in master_data.chunks(2) {
438 if chunk.len() == 2 {
439 let key = chunk[0].as_string()?;
440 let value = chunk[1].as_string()?;
441 info.insert(key, value);
442 }
443 }
444
445 let name = info
446 .get("name")
447 .ok_or_else(|| RedisError::Sentinel("Missing master name".to_string()))?
448 .clone();
449
450 let host = info
451 .get("ip")
452 .ok_or_else(|| RedisError::Sentinel("Missing master IP".to_string()))?
453 .clone();
454
455 let port = info
456 .get("port")
457 .ok_or_else(|| RedisError::Sentinel("Missing master port".to_string()))?
458 .parse::<u16>()
459 .map_err(|_| RedisError::Sentinel("Invalid master port".to_string()))?;
460
461 let flags = info
462 .get("flags")
463 .map(|f| f.split(',').map(String::from).collect())
464 .unwrap_or_default();
465
466 let num_slaves = info
467 .get("num-slaves")
468 .and_then(|s| s.parse().ok())
469 .unwrap_or(0);
470
471 let num_other_sentinels = info
472 .get("num-other-sentinels")
473 .and_then(|s| s.parse().ok())
474 .unwrap_or(0);
475
476 let quorum = info.get("quorum").and_then(|s| s.parse().ok()).unwrap_or(1);
477
478 let failover_timeout = info
479 .get("failover-timeout")
480 .and_then(|s| s.parse().ok())
481 .map(Duration::from_millis)
482 .unwrap_or(Duration::from_secs(60));
483
484 let parallel_syncs = info
485 .get("parallel-syncs")
486 .and_then(|s| s.parse().ok())
487 .unwrap_or(1);
488
489 Ok(MasterInfo {
490 name,
491 host,
492 port,
493 flags,
494 num_slaves,
495 num_other_sentinels,
496 quorum,
497 failover_timeout,
498 parallel_syncs,
499 })
500 }
501
502 async fn check_master_status(&self) -> RedisResult<()> {
503 let current_master = self.current_master.read().await.clone();
504
505 if let Some(master) = current_master {
506 match self.test_master_connection(&master).await {
508 Ok(_) => {
509 debug!("Master {} is healthy", master.address());
510 }
511 Err(_) => {
512 warn!(
513 "Master {} is not responding, discovering new master",
514 master.address()
515 );
516 self.discover_master().await?;
517 }
518 }
519 } else {
520 self.discover_master().await?;
522 }
523
524 Ok(())
525 }
526
527 async fn test_master_connection(&self, master: &MasterInfo) -> RedisResult<()> {
528 let master_config =
529 ConnectionConfig::new(&format!("redis://{}:{}", master.host, master.port));
530 let mut conn = RedisConnection::connect(&master.host, master.port, master_config).await?;
531
532 let ping_cmd = RespValue::Array(vec![RespValue::BulkString(bytes::Bytes::from("PING"))]);
534
535 conn.send_command(&ping_cmd).await?;
536 let response = conn.read_response().await?;
537
538 match response {
539 RespValue::SimpleString(s) if s == "PONG" => Ok(()),
540 _ => Err(RedisError::Connection(
541 "Master did not respond to PING".to_string(),
542 )),
543 }
544 }
545}
546
547impl ConnectionConfig {
549 #[must_use]
551 pub fn new_with_sentinel(sentinel_config: SentinelConfig) -> Self {
552 let mut config = Self::new("");
553 config.sentinel = Some(sentinel_config);
554 config
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores host and port verbatim and `address` joins them with `:`.
    #[test]
    fn test_sentinel_endpoint_creation() {
        let endpoint = SentinelEndpoint::new("localhost", 26379);

        assert_eq!(endpoint.host, "localhost");
        assert_eq!(endpoint.port, 26379);
        assert_eq!(endpoint.address(), "localhost:26379");
    }

    /// `from_address` accepts `host:port` and rejects a string without a port.
    #[test]
    fn test_sentinel_endpoint_from_address() {
        let parsed = SentinelEndpoint::from_address("127.0.0.1:26379").unwrap();
        assert_eq!(parsed.host, "127.0.0.1");
        assert_eq!(parsed.port, 26379);

        assert!(SentinelEndpoint::from_address("invalid").is_err());
    }

    /// Every builder method lands in the matching field.
    #[test]
    fn test_sentinel_config_builder() {
        let one_minute = Duration::from_secs(60);

        let config = SentinelConfig::new("mymaster")
            .add_sentinel("127.0.0.1:26379")
            .add_sentinel("127.0.0.1:26380")
            .with_password("secret")
            .with_failover_timeout(one_minute)
            .with_max_retries(5);

        assert_eq!(config.master_name, "mymaster");
        assert_eq!(config.sentinels.len(), 2);
        assert_eq!(config.password, Some("secret".to_string()));
        assert_eq!(config.failover_timeout, one_minute);
        assert_eq!(config.max_retries, 5);
    }

    /// The flag helpers react to `s_down` and `failover_in_progress`.
    #[test]
    fn test_master_info_status() {
        let mut master = MasterInfo {
            name: String::from("mymaster"),
            host: String::from("127.0.0.1"),
            port: 6379,
            flags: vec![String::from("master")],
            num_slaves: 2,
            num_other_sentinels: 2,
            quorum: 2,
            failover_timeout: Duration::from_secs(60),
            parallel_syncs: 1,
        };

        // A healthy master reports neither condition.
        assert!(!master.is_down());
        assert!(!master.is_failover_in_progress());

        master.flags.push(String::from("s_down"));
        assert!(master.is_down());

        master.flags.clear();
        master.flags.push(String::from("failover_in_progress"));
        assert!(master.is_failover_in_progress());
    }

    /// `new_with_sentinel` keeps the sentinel settings it was given.
    #[test]
    fn test_connection_config_with_sentinel() {
        let sentinel_config = SentinelConfig::new("mymaster").add_sentinel("127.0.0.1:26379");

        let config = ConnectionConfig::new_with_sentinel(sentinel_config);

        assert!(config.sentinel.is_some());
        assert_eq!(config.sentinel.as_ref().unwrap().master_name, "mymaster");
    }
}