use crate::{
Error, Result, RetryReason, StandaloneConnection,
client::{Config, SentinelConfig},
commands::{RoleResult, SentinelCommands, ServerCommands},
resp::{Command, RespResponse},
sleep,
};
use log::debug;
use std::{sync::Arc, task::Poll};
/// Connection whose target node is found by querying the configured Sentinel instances.
///
/// Command traffic is delegated to `inner_connection`; both configurations are
/// stored so that `reconnect` can run the discovery again.
pub struct SentinelConnection {
/// Sentinel settings read during discovery: instance list, service name,
/// credentials for the Sentinel nodes and the wait applied between attempts.
sentinel_config: SentinelConfig,
/// Client configuration used to connect to the discovered node and, with the
/// Sentinel credentials substituted, to the Sentinel nodes themselves.
config: Config,
/// Connection to the node that answered `ROLE` as a master during discovery.
pub inner_connection: StandaloneConnection,
}
impl SentinelConnection {
#[inline]
pub async fn feed(&mut self, command: &Command, retry_reasons: &[RetryReason]) -> Result<()> {
self.inner_connection.feed(command, retry_reasons).await
}
#[inline]
pub async fn flush(&mut self) -> Result<()> {
self.inner_connection.flush().await
}
#[inline]
pub async fn read(&mut self) -> Option<Result<RespResponse>> {
self.inner_connection.read().await
}
#[inline]
pub fn try_read(&mut self) -> Poll<Option<Result<RespResponse>>> {
self.inner_connection.try_read()
}
#[inline]
pub async fn reconnect(&mut self) -> Result<()> {
self.inner_connection =
Self::connect_to_sentinel(&self.sentinel_config, &self.config).await?;
Ok(())
}
pub async fn connect(
sentinel_config: &SentinelConfig,
config: &Config,
) -> Result<SentinelConnection> {
let inner_connection = Self::connect_to_sentinel(sentinel_config, config).await?;
Ok(SentinelConnection {
sentinel_config: sentinel_config.clone(),
config: config.clone(),
inner_connection,
})
}
async fn connect_to_sentinel(
sentinel_config: &SentinelConfig,
config: &Config,
) -> Result<StandaloneConnection> {
let mut restart = false;
let mut unreachable_sentinel = true;
let mut sentinel_node_config = config.clone();
sentinel_node_config
.username
.clone_from(&sentinel_config.username);
sentinel_node_config
.password
.clone_from(&sentinel_config.password);
loop {
for sentinel_instance in &sentinel_config.instances {
let (host, port) = sentinel_instance;
let mut sentinel_connection =
match StandaloneConnection::connect(host, *port, &sentinel_node_config).await {
Ok(sentinel_connection) => sentinel_connection,
Err(e) => {
debug!("Cannot connect to Sentinel {}:{} : {}", *host, *port, e);
continue;
}
};
let (master_host, master_port) = match sentinel_connection
.sentinel_get_master_addr_by_name(sentinel_config.service_name.clone())
.await
{
Ok(Some((master_host, master_port))) => (master_host, master_port),
Ok(None) => {
debug!(
"Sentinel {}:{} does not know master `{}`",
*host, *port, sentinel_config.service_name
);
unreachable_sentinel = false;
continue;
}
Err(e) => {
debug!(
"Cannot execute command `SENTINEL get-master-addr-by-name` with Sentinel {}:{}: {}",
*host, *port, e
);
continue;
}
};
let mut master_connection =
StandaloneConnection::connect(&master_host, master_port, config).await?;
let role: RoleResult = master_connection.role().await?;
if let RoleResult::Master {
master_replication_offset: _,
replica_infos: _,
} = role
{
return Ok(master_connection);
} else {
sleep(sentinel_config.wait_between_failures).await;
restart = true;
break;
}
}
if !restart {
break;
} else {
restart = false;
}
}
if unreachable_sentinel {
Err(Error::Sentinel(
"All Sentinel instances are unreachable".to_owned(),
))
} else {
Err(Error::Sentinel(format!(
"master {} is unknown by all Sentinel instances",
sentinel_config.service_name
)))
}
}
pub(crate) fn tag(&self) -> Arc<str> {
self.inner_connection.tag()
}
}