redis_oxide/
sentinel.rs

1//! Redis Sentinel support for high availability
2//!
3//! Redis Sentinel provides high availability for Redis. It monitors Redis master
4//! and slave instances, performs automatic failover when a master is not working
5//! as expected, and acts as a configuration provider for clients.
6//!
7//! # Examples
8//!
9//! ## Basic Sentinel Configuration
10//!
11//! ```no_run
12//! use redis_oxide::{Client, ConnectionConfig, SentinelConfig};
13//!
14//! # #[tokio::main]
15//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
16//! let sentinel_config = SentinelConfig::new("mymaster")
17//!     .add_sentinel("127.0.0.1:26379")
18//!     .add_sentinel("127.0.0.1:26380")
19//!     .add_sentinel("127.0.0.1:26381")
20//!     .with_password("sentinel_password");
21//!
22//! let config = ConnectionConfig::new_with_sentinel(sentinel_config);
23//! let client = Client::connect(config).await?;
24//!
25//! // Client automatically connects to current master
26//! client.set("key", "value").await?;
27//! # Ok(())
28//! # }
29//! ```
30//!
31//! ## Handling Failover
32//!
33//! ```no_run
34//! use redis_oxide::{Client, ConnectionConfig, SentinelConfig};
35//! use std::time::Duration;
36//!
37//! # #[tokio::main]
38//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//! let sentinel_config = SentinelConfig::new("mymaster")
40//!     .add_sentinel("127.0.0.1:26379")
41//!     .with_failover_timeout(Duration::from_secs(30));
42//!
43//! let config = ConnectionConfig::new_with_sentinel(sentinel_config);
44//! let client = Client::connect(config).await?;
45//!
46//! // Client automatically handles master failover
47//! loop {
48//!     match client.get("key").await {
49//!         Ok(_) => {
50//!             println!("Connected to master");
51//!             break;
52//!         }
53//!         Err(e) => {
54//!             println!("Connection failed: {}, retrying...", e);
55//!             tokio::time::sleep(Duration::from_secs(1)).await;
56//!         }
57//!     }
58//! }
59//! # Ok(())
60//! # }
61//! ```
62
63use crate::connection::RedisConnection;
64use crate::core::{
65    config::ConnectionConfig,
66    error::{RedisError, RedisResult},
67    value::RespValue,
68};
69use std::collections::HashMap;
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tokio::sync::{Mutex, RwLock};
73use tracing::{debug, info, warn};
74
75/// Configuration for Redis Sentinel
76#[derive(Debug, Clone)]
77pub struct SentinelConfig {
78    /// Master name to monitor
79    pub master_name: String,
80    /// List of sentinel endpoints
81    pub sentinels: Vec<SentinelEndpoint>,
82    /// Password for sentinel authentication
83    pub password: Option<String>,
84    /// Timeout for failover operations
85    pub failover_timeout: Duration,
86    /// Interval for checking master status
87    pub check_interval: Duration,
88    /// Maximum number of failover retries
89    pub max_retries: usize,
90}
91
92/// Sentinel endpoint configuration
93#[derive(Debug, Clone)]
94pub struct SentinelEndpoint {
95    /// Sentinel host
96    pub host: String,
97    /// Sentinel port
98    pub port: u16,
99}
100
101impl SentinelEndpoint {
102    /// Create a new sentinel endpoint
103    #[must_use]
104    pub fn new(host: impl Into<String>, port: u16) -> Self {
105        Self {
106            host: host.into(),
107            port,
108        }
109    }
110
111    /// Parse from address string (host:port)
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the address format is invalid.
116    pub fn from_address(addr: &str) -> RedisResult<Self> {
117        let parts: Vec<&str> = addr.split(':').collect();
118        if parts.len() != 2 {
119            return Err(RedisError::Config(format!(
120                "Invalid sentinel address: {}",
121                addr
122            )));
123        }
124
125        let host = parts[0].to_string();
126        let port = parts[1].parse::<u16>().map_err(|_| {
127            RedisError::Config(format!("Invalid port in sentinel address: {}", addr))
128        })?;
129
130        Ok(Self::new(host, port))
131    }
132
133    /// Get the address string
134    #[must_use]
135    pub fn address(&self) -> String {
136        format!("{}:{}", self.host, self.port)
137    }
138}
139
140impl SentinelConfig {
141    /// Create a new sentinel configuration
142    #[must_use]
143    pub fn new(master_name: impl Into<String>) -> Self {
144        Self {
145            master_name: master_name.into(),
146            sentinels: Vec::new(),
147            password: None,
148            failover_timeout: Duration::from_secs(30),
149            check_interval: Duration::from_secs(5),
150            max_retries: 3,
151        }
152    }
153
154    /// Add a sentinel endpoint
155    #[must_use]
156    pub fn add_sentinel(mut self, addr: impl AsRef<str>) -> Self {
157        if let Ok(endpoint) = SentinelEndpoint::from_address(addr.as_ref()) {
158            self.sentinels.push(endpoint);
159        }
160        self
161    }
162
163    /// Set sentinel password
164    #[must_use]
165    pub fn with_password(mut self, password: impl Into<String>) -> Self {
166        self.password = Some(password.into());
167        self
168    }
169
170    /// Set failover timeout
171    #[must_use]
172    pub const fn with_failover_timeout(mut self, timeout: Duration) -> Self {
173        self.failover_timeout = timeout;
174        self
175    }
176
177    /// Set check interval
178    #[must_use]
179    pub const fn with_check_interval(mut self, interval: Duration) -> Self {
180        self.check_interval = interval;
181        self
182    }
183
184    /// Set maximum retries
185    #[must_use]
186    pub const fn with_max_retries(mut self, retries: usize) -> Self {
187        self.max_retries = retries;
188        self
189    }
190}
191
/// Information about a Redis master
///
/// `SentinelClient` builds this from the key/value entries returned by the
/// `SENTINEL masters` command.
#[derive(Debug, Clone)]
pub struct MasterInfo {
    /// Master name
    pub name: String,
    /// Master host
    pub host: String,
    /// Master port
    pub port: u16,
    /// Master status flags
    pub flags: Vec<String>,
    /// Number of connected slaves
    pub num_slaves: u32,
    /// Number of other sentinels
    pub num_other_sentinels: u32,
    /// Quorum for failover
    pub quorum: u32,
    /// Failover timeout
    pub failover_timeout: Duration,
    /// Parallel syncs
    pub parallel_syncs: u32,
}

impl MasterInfo {
    /// Returns `true` if `flag` is present (exact match) in [`flags`](Self::flags).
    fn has_flag(&self, flag: &str) -> bool {
        // Compare in place; avoids allocating a String per lookup.
        self.flags.iter().any(|f| f == flag)
    }

    /// Check if master is down (flagged `s_down` or `o_down`)
    #[must_use]
    pub fn is_down(&self) -> bool {
        self.has_flag("s_down") || self.has_flag("o_down")
    }

    /// Check if failover is in progress (flagged `failover_in_progress`)
    #[must_use]
    pub fn is_failover_in_progress(&self) -> bool {
        self.has_flag("failover_in_progress")
    }

    /// Get master address as `host:port`, bracketing hosts that contain ':'
    /// (IPv6 literals) so the port remains unambiguous.
    #[must_use]
    pub fn address(&self) -> String {
        if self.host.contains(':') {
            format!("[{}]:{}", self.host, self.port)
        } else {
            format!("{}:{}", self.host, self.port)
        }
    }
}
234
235/// Redis Sentinel client for high availability
236pub struct SentinelClient {
237    config: SentinelConfig,
238    sentinels: Arc<RwLock<Vec<Arc<Mutex<RedisConnection>>>>>,
239    current_master: Arc<RwLock<Option<MasterInfo>>>,
240    last_check: Arc<Mutex<Instant>>,
241}
242
243impl SentinelClient {
244    /// Create a new sentinel client
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if no sentinels are configured.
249    pub async fn new(config: SentinelConfig) -> RedisResult<Self> {
250        if config.sentinels.is_empty() {
251            return Err(RedisError::Config("No sentinels configured".to_string()));
252        }
253
254        let client = Self {
255            config,
256            sentinels: Arc::new(RwLock::new(Vec::new())),
257            current_master: Arc::new(RwLock::new(None)),
258            last_check: Arc::new(Mutex::new(Instant::now())),
259        };
260
261        // Initialize sentinel connections
262        client.initialize_sentinels().await?;
263
264        // Discover current master
265        client.discover_master().await?;
266
267        Ok(client)
268    }
269
270    /// Get current master information
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if no master is available.
275    pub async fn get_master(&self) -> RedisResult<MasterInfo> {
276        // Check if we need to refresh master info
277        {
278            let last_check = self.last_check.lock().await;
279            if last_check.elapsed() < self.config.check_interval {
280                if let Some(master) = self.current_master.read().await.clone() {
281                    return Ok(master);
282                }
283            }
284        }
285
286        // Refresh master info
287        self.discover_master().await?;
288
289        self.current_master
290            .read()
291            .await
292            .clone()
293            .ok_or_else(|| RedisError::Sentinel("No master available".to_string()))
294    }
295
296    /// Create a connection to the current master
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if master connection fails.
301    pub async fn connect_to_master(&self) -> RedisResult<RedisConnection> {
302        let master = self.get_master().await?;
303
304        let master_config =
305            ConnectionConfig::new(&format!("redis://{}:{}", master.host, master.port));
306
307        RedisConnection::connect(&master.host, master.port, master_config).await
308    }
309
310    /// Monitor for master changes and failovers
311    pub async fn monitor(&self) -> RedisResult<()> {
312        let mut interval = tokio::time::interval(self.config.check_interval);
313
314        loop {
315            interval.tick().await;
316
317            if let Err(e) = self.check_master_status().await {
318                warn!("Failed to check master status: {}", e);
319            }
320        }
321    }
322
323    async fn initialize_sentinels(&self) -> RedisResult<()> {
324        let mut sentinels = self.sentinels.write().await;
325
326        for endpoint in &self.config.sentinels {
327            match self.connect_to_sentinel(endpoint).await {
328                Ok(conn) => {
329                    sentinels.push(Arc::new(Mutex::new(conn)));
330                    info!("Connected to sentinel: {}", endpoint.address());
331                }
332                Err(e) => {
333                    warn!(
334                        "Failed to connect to sentinel {}: {}",
335                        endpoint.address(),
336                        e
337                    );
338                }
339            }
340        }
341
342        if sentinels.is_empty() {
343            return Err(RedisError::Sentinel("No sentinels available".to_string()));
344        }
345
346        Ok(())
347    }
348
349    async fn connect_to_sentinel(
350        &self,
351        endpoint: &SentinelEndpoint,
352    ) -> RedisResult<RedisConnection> {
353        let sentinel_config =
354            ConnectionConfig::new(&format!("redis://{}:{}", endpoint.host, endpoint.port));
355
356        let mut conn =
357            RedisConnection::connect(&endpoint.host, endpoint.port, sentinel_config).await?;
358
359        // Authenticate if password is provided
360        if let Some(password) = &self.config.password {
361            let auth_cmd = RespValue::Array(vec![
362                RespValue::BulkString(bytes::Bytes::from("AUTH")),
363                RespValue::BulkString(bytes::Bytes::from(password.clone())),
364            ]);
365
366            conn.send_command(&auth_cmd).await?;
367            let _response = conn.read_response().await?;
368        }
369
370        Ok(conn)
371    }
372
373    async fn discover_master(&self) -> RedisResult<()> {
374        let sentinels = self.sentinels.read().await;
375
376        for sentinel in sentinels.iter() {
377            match self.query_master_info(sentinel).await {
378                Ok(master_info) => {
379                    info!("Discovered master: {}", master_info.address());
380                    *self.current_master.write().await = Some(master_info);
381                    *self.last_check.lock().await = Instant::now();
382                    return Ok(());
383                }
384                Err(e) => {
385                    debug!("Failed to query master from sentinel: {}", e);
386                }
387            }
388        }
389
390        Err(RedisError::Sentinel(
391            "Failed to discover master from any sentinel".to_string(),
392        ))
393    }
394
395    async fn query_master_info(
396        &self,
397        sentinel: &Arc<Mutex<RedisConnection>>,
398    ) -> RedisResult<MasterInfo> {
399        let mut conn = sentinel.lock().await;
400
401        // Send SENTINEL masters command
402        let cmd = RespValue::Array(vec![
403            RespValue::BulkString(bytes::Bytes::from("SENTINEL")),
404            RespValue::BulkString(bytes::Bytes::from("masters")),
405        ]);
406
407        conn.send_command(&cmd).await?;
408        let response = conn.read_response().await?;
409
410        self.parse_master_info(response)
411    }
412
413    fn parse_master_info(&self, response: RespValue) -> RedisResult<MasterInfo> {
414        match response {
415            RespValue::Array(masters) => {
416                for master in masters {
417                    if let RespValue::Array(master_data) = master {
418                        let master_info = self.parse_single_master(master_data)?;
419                        if master_info.name == self.config.master_name {
420                            return Ok(master_info);
421                        }
422                    }
423                }
424                Err(RedisError::Sentinel(format!(
425                    "Master '{}' not found",
426                    self.config.master_name
427                )))
428            }
429            _ => Err(RedisError::Sentinel("Invalid masters response".to_string())),
430        }
431    }
432
433    fn parse_single_master(&self, master_data: Vec<RespValue>) -> RedisResult<MasterInfo> {
434        let mut info = HashMap::new();
435
436        // Parse key-value pairs
437        for chunk in master_data.chunks(2) {
438            if chunk.len() == 2 {
439                let key = chunk[0].as_string()?;
440                let value = chunk[1].as_string()?;
441                info.insert(key, value);
442            }
443        }
444
445        let name = info
446            .get("name")
447            .ok_or_else(|| RedisError::Sentinel("Missing master name".to_string()))?
448            .clone();
449
450        let host = info
451            .get("ip")
452            .ok_or_else(|| RedisError::Sentinel("Missing master IP".to_string()))?
453            .clone();
454
455        let port = info
456            .get("port")
457            .ok_or_else(|| RedisError::Sentinel("Missing master port".to_string()))?
458            .parse::<u16>()
459            .map_err(|_| RedisError::Sentinel("Invalid master port".to_string()))?;
460
461        let flags = info
462            .get("flags")
463            .map(|f| f.split(',').map(String::from).collect())
464            .unwrap_or_default();
465
466        let num_slaves = info
467            .get("num-slaves")
468            .and_then(|s| s.parse().ok())
469            .unwrap_or(0);
470
471        let num_other_sentinels = info
472            .get("num-other-sentinels")
473            .and_then(|s| s.parse().ok())
474            .unwrap_or(0);
475
476        let quorum = info.get("quorum").and_then(|s| s.parse().ok()).unwrap_or(1);
477
478        let failover_timeout = info
479            .get("failover-timeout")
480            .and_then(|s| s.parse().ok())
481            .map(Duration::from_millis)
482            .unwrap_or(Duration::from_secs(60));
483
484        let parallel_syncs = info
485            .get("parallel-syncs")
486            .and_then(|s| s.parse().ok())
487            .unwrap_or(1);
488
489        Ok(MasterInfo {
490            name,
491            host,
492            port,
493            flags,
494            num_slaves,
495            num_other_sentinels,
496            quorum,
497            failover_timeout,
498            parallel_syncs,
499        })
500    }
501
502    async fn check_master_status(&self) -> RedisResult<()> {
503        let current_master = self.current_master.read().await.clone();
504
505        if let Some(master) = current_master {
506            // Try to connect to current master
507            match self.test_master_connection(&master).await {
508                Ok(_) => {
509                    debug!("Master {} is healthy", master.address());
510                }
511                Err(_) => {
512                    warn!(
513                        "Master {} is not responding, discovering new master",
514                        master.address()
515                    );
516                    self.discover_master().await?;
517                }
518            }
519        } else {
520            // No current master, try to discover
521            self.discover_master().await?;
522        }
523
524        Ok(())
525    }
526
527    async fn test_master_connection(&self, master: &MasterInfo) -> RedisResult<()> {
528        let master_config =
529            ConnectionConfig::new(&format!("redis://{}:{}", master.host, master.port));
530        let mut conn = RedisConnection::connect(&master.host, master.port, master_config).await?;
531
532        // Send PING command
533        let ping_cmd = RespValue::Array(vec![RespValue::BulkString(bytes::Bytes::from("PING"))]);
534
535        conn.send_command(&ping_cmd).await?;
536        let response = conn.read_response().await?;
537
538        match response {
539            RespValue::SimpleString(s) if s == "PONG" => Ok(()),
540            _ => Err(RedisError::Connection(
541                "Master did not respond to PING".to_string(),
542            )),
543        }
544    }
545}
546
547/// Extension trait for ConnectionConfig to support Sentinel
548impl ConnectionConfig {
549    /// Create a new connection config with Sentinel
550    #[must_use]
551    pub fn new_with_sentinel(sentinel_config: SentinelConfig) -> Self {
552        let mut config = Self::new("");
553        config.sentinel = Some(sentinel_config);
554        config
555    }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `MasterInfo` fixture with the given flags.
    fn sample_master(flags: &[&str]) -> MasterInfo {
        MasterInfo {
            name: "mymaster".to_string(),
            host: "127.0.0.1".to_string(),
            port: 6379,
            flags: flags.iter().map(|f| (*f).to_string()).collect(),
            num_slaves: 2,
            num_other_sentinels: 2,
            quorum: 2,
            failover_timeout: Duration::from_secs(60),
            parallel_syncs: 1,
        }
    }

    #[test]
    fn test_sentinel_endpoint_creation() {
        let endpoint = SentinelEndpoint::new("localhost", 26379);
        assert_eq!(endpoint.host, "localhost");
        assert_eq!(endpoint.port, 26379);
        assert_eq!(endpoint.address(), "localhost:26379");
    }

    #[test]
    fn test_sentinel_endpoint_from_address() {
        let endpoint = SentinelEndpoint::from_address("127.0.0.1:26379").unwrap();
        assert_eq!(endpoint.host, "127.0.0.1");
        assert_eq!(endpoint.port, 26379);

        assert!(SentinelEndpoint::from_address("invalid").is_err());
        assert!(SentinelEndpoint::from_address("127.0.0.1:notaport").is_err());
        assert!(SentinelEndpoint::from_address("127.0.0.1:70000").is_err());
        assert!(SentinelEndpoint::from_address("a:b:c").is_err());
    }

    #[test]
    fn test_sentinel_config_defaults() {
        let config = SentinelConfig::new("mymaster");
        assert!(config.sentinels.is_empty());
        assert_eq!(config.password, None);
        assert_eq!(config.failover_timeout, Duration::from_secs(30));
        assert_eq!(config.check_interval, Duration::from_secs(5));
        assert_eq!(config.max_retries, 3);
    }

    #[test]
    fn test_sentinel_config_builder() {
        let config = SentinelConfig::new("mymaster")
            .add_sentinel("127.0.0.1:26379")
            .add_sentinel("127.0.0.1:26380")
            .with_password("secret")
            .with_failover_timeout(Duration::from_secs(60))
            .with_check_interval(Duration::from_secs(1))
            .with_max_retries(5);

        assert_eq!(config.master_name, "mymaster");
        assert_eq!(config.sentinels.len(), 2);
        assert_eq!(config.password, Some("secret".to_string()));
        assert_eq!(config.failover_timeout, Duration::from_secs(60));
        assert_eq!(config.check_interval, Duration::from_secs(1));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_add_sentinel_skips_invalid_address() {
        let config = SentinelConfig::new("mymaster")
            .add_sentinel("not-an-address")
            .add_sentinel("127.0.0.1:26379");

        assert_eq!(config.sentinels.len(), 1);
        assert_eq!(config.sentinels[0].port, 26379);
    }

    #[test]
    fn test_master_info_status() {
        let master = sample_master(&["master"]);
        assert!(!master.is_down());
        assert!(!master.is_failover_in_progress());
        assert_eq!(master.address(), "127.0.0.1:6379");

        assert!(sample_master(&["master", "s_down"]).is_down());
        assert!(sample_master(&["master", "o_down"]).is_down());

        let failing_over = sample_master(&["failover_in_progress"]);
        assert!(failing_over.is_failover_in_progress());
        assert!(!failing_over.is_down());
    }

    #[test]
    fn test_connection_config_with_sentinel() {
        let sentinel_config = SentinelConfig::new("mymaster").add_sentinel("127.0.0.1:26379");

        let config = ConnectionConfig::new_with_sentinel(sentinel_config);
        assert!(config.sentinel.is_some());
        assert_eq!(config.sentinel.as_ref().unwrap().master_name, "mymaster");
    }
}