use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use redis::aio::MultiplexedConnection;
use redis::sentinel::{SentinelClient, SentinelServerType};
use redis::{AsyncCommands, Value, cmd};
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::config::{ServerRole, SentinelPoolConfig};
use crate::error::{Result, SentinelPoolError};
pub struct PooledConnection {
pub(crate) inner: MultiplexedConnection,
epoch: u64,
}
impl AsRef<MultiplexedConnection> for PooledConnection {
fn as_ref(&self) -> &MultiplexedConnection {
&self.inner
}
}
impl AsMut<MultiplexedConnection> for PooledConnection {
fn as_mut(&mut self) -> &mut MultiplexedConnection {
&mut self.inner
}
}
impl std::ops::Deref for PooledConnection {
type Target = MultiplexedConnection;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl std::ops::DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[derive(Clone)]
pub struct SentinelConnectionManager {
client: Arc<Mutex<SentinelClient>>,
pub(crate) config: SentinelPoolConfig,
pub(crate) epoch: Arc<AtomicU64>,
}
impl SentinelConnectionManager {
pub fn new(config: SentinelPoolConfig) -> Result<Self> {
if config.sentinels.is_empty() {
return Err(SentinelPoolError::Config(
"at least one sentinel address is required".into(),
));
}
if config.service_name.is_empty() {
return Err(SentinelPoolError::Config(
"service_name must not be empty".into(),
));
}
let server_type = match config.role {
ServerRole::Master => SentinelServerType::Master,
ServerRole::Replica => SentinelServerType::Replica,
};
let node_info = config.build_node_connection_info();
let client = SentinelClient::build(
config.sentinels.clone(),
config.service_name.clone(),
Some(node_info),
server_type,
)?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
config,
epoch: Arc::new(AtomicU64::new(0)),
})
}
pub fn bump_epoch(&self) -> u64 {
let new_epoch = self.epoch.fetch_add(1, Ordering::AcqRel) + 1;
debug!(new_epoch, "sentinel manager epoch bumped");
new_epoch
}
pub fn current_epoch(&self) -> u64 {
self.epoch.load(Ordering::Acquire)
}
pub(crate) fn shared_client(&self) -> Arc<Mutex<SentinelClient>> {
Arc::clone(&self.client)
}
}
#[allow(clippy::manual_async_fn)]
impl bb8::ManageConnection for SentinelConnectionManager {
type Connection = PooledConnection;
type Error = SentinelPoolError;
fn connect(
&self,
) -> impl std::future::Future<Output = Result<Self::Connection>> + Send {
async move {
let conn = {
let mut client = self.client.lock().await;
client.get_async_connection().await?
};
let epoch = self.current_epoch();
debug!(epoch, "created new sentinel-aware connection");
Ok(PooledConnection { inner: conn, epoch })
}
}
fn is_valid(
&self,
conn: &mut Self::Connection,
) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
let current = self.current_epoch();
if conn.epoch < current {
return Err(SentinelPoolError::Pool(format!(
"connection epoch {} is older than current {} (master changed)",
conn.epoch, current
)));
}
let pong: String = cmd("PING").query_async(&mut conn.inner).await?;
if pong != "PONG" {
return Err(SentinelPoolError::Pool(format!(
"unexpected PING reply: {pong}"
)));
}
if self.config.verify_role_on_checkout && self.config.role == ServerRole::Master {
let role: Value = cmd("ROLE").query_async(&mut conn.inner).await?;
if !is_master_role(&role) {
warn!("connection no longer points to a master, dropping");
self.bump_epoch();
return Err(SentinelPoolError::Pool(
"connection is no longer master".into(),
));
}
}
Ok(())
}
}
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.epoch < self.current_epoch()
}
}
#[allow(dead_code)]
fn _assert_async_commands_impl<T: AsyncCommands>(_: &T) {}
fn is_master_role(value: &Value) -> bool {
if let Value::Array(items) = value {
if let Some(first) = items.first() {
return matches!(
first,
Value::BulkString(s) if s.eq_ignore_ascii_case(b"master")
) || matches!(
first,
Value::SimpleString(s) if s.eq_ignore_ascii_case("master")
);
}
}
false
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detects_master_role() {
let value = Value::Array(vec![
Value::BulkString(b"master".to_vec()),
Value::Int(0),
Value::Array(vec![]),
]);
assert!(is_master_role(&value));
}
#[test]
fn rejects_replica_role() {
let value = Value::Array(vec![
Value::BulkString(b"slave".to_vec()),
Value::BulkString(b"127.0.0.1".to_vec()),
Value::Int(6379),
]);
assert!(!is_master_role(&value));
}
}