use super::{
Connect,
container::{ClusterNode, ConnectionDetails},
};
use crate::cluster::client::ClusterParams;
use crate::cluster::compat::get_connection_info;
use crate::cluster::slotmap::ReadFromReplicaStrategy;
use crate::connection::factory::FerrisKeyConnectionOptions;
use crate::connection::{ConnectionLike, DisconnectNotifier};
use crate::value::{ErrorKind, Error, Result};
use std::net::{Ipv6Addr, SocketAddr};
use futures::prelude::*;
use futures_util::{future::BoxFuture, join};
use tracing::warn;
/// Future type under which connections are stored: a boxed `'static` future
/// yielding `C`, wrapped in `futures::future::Shared`.
pub(crate) type ConnectionFuture<C> = futures::future::Shared<BoxFuture<'static, C>>;
/// A `ClusterNode` whose connections are held as `ConnectionFuture<C>` values.
#[doc(hidden)]
pub type AsyncClusterNode<C> = ClusterNode<ConnectionFuture<C>>;
/// Selects which of a node's connections should be checked or refreshed.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RefreshConnectionType {
/// Only the user connection.
OnlyUserConnection,
/// Only the management connection.
OnlyManagementConnection,
/// Both the user connection and the management connection.
AllConnections,
}
/// Reports a management-connection failure for `addr` and wraps the surviving
/// user connection in a node.
///
/// A warning containing `addr` and `err` is logged, then a
/// `ConnectAndCheckResult::ManagementConnectionFailed` is returned whose node
/// holds `user_conn` and no management connection.
fn failed_management_connection<C>(
    addr: &str,
    user_conn: ConnectionDetails<ConnectionFuture<C>>,
    err: Error,
) -> ConnectAndCheckResult<C>
where
    C: ConnectionLike + Send + Clone + Sync + Connect + 'static,
{
    warn!(
        "Failed to create management connection for node `{:?}`. Error: `{:?}`",
        addr, err
    );
    // The node is still usable: it keeps the user connection only.
    let node = AsyncClusterNode::new(user_conn, None);
    ConnectAndCheckResult::ManagementConnectionFailed { node }
}
/// Returns a node for `addr`, reusing `node` when its connections are healthy.
///
/// * With an existing `node`, the connections selected by `conn_type` are
///   health-checked via `check_node_connections`. If nothing is reported as
///   unhealthy the node is returned unchanged; otherwise only the connection
///   type reported as unhealthy is refreshed, keeping the node as the base.
/// * Without a node, new connections of `conn_type` are created.
///
/// # Errors
/// Returns the error carried by `ConnectAndCheckResult::Failed` when
/// (re)connecting fails. A failed management connection alone is not an error.
pub(crate) async fn get_or_create_conn<C>(
    addr: &str,
    node: Option<AsyncClusterNode<C>>,
    params: &ClusterParams,
    conn_type: RefreshConnectionType,
    ferriskey_connection_options: FerrisKeyConnectionOptions,
) -> Result<AsyncClusterNode<C>>
where
    C: ConnectionLike + Send + Clone + Sync + Connect + 'static,
{
    // Decide what has to be (re)connected and which node, if any, to build upon.
    let (refresh_type, base_node) = match node {
        Some(existing) => {
            match check_node_connections(&existing, params, conn_type, addr).await {
                // Everything that was checked is healthy: nothing to do.
                None => return Ok(existing),
                Some(unhealthy) => (unhealthy, Some(existing)),
            }
        }
        None => (conn_type, None),
    };

    connect_and_check(
        addr,
        params.clone(),
        None,
        refresh_type,
        base_node,
        ferriskey_connection_options,
    )
    .await
    .get_node()
}
/// Builds an `AsyncClusterNode` from already-established connections by
/// converting each `ConnectionDetails<C>` with its `into_future()` method.
///
/// `management_conn` is optional; `None` yields a node without a management
/// connection.
fn create_async_node<C>(
    user_conn: ConnectionDetails<C>,
    management_conn: Option<ConnectionDetails<C>>,
) -> AsyncClusterNode<C>
where
    C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
    let user_future = user_conn.into_future();
    let management_future = management_conn.map(|details| details.into_future());
    AsyncClusterNode::new(user_future, management_future)
}
pub(crate) async fn connect_and_check_all_connections<C>(
addr: &str,
params: ClusterParams,
socket_addr: Option<SocketAddr>,
ferriskey_connection_options: FerrisKeyConnectionOptions,
) -> ConnectAndCheckResult<C>
where
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
match future::join(
create_connection(
addr,
params.clone(),
socket_addr,
false,
ferriskey_connection_options.clone(),
),
create_connection(
addr,
params.clone(),
socket_addr,
true,
ferriskey_connection_options,
),
)
.await
{
(Ok(conn_1), Ok(conn_2)) => {
let mut user_conn: ConnectionDetails<C> = conn_1;
let mut management_conn: ConnectionDetails<C> = conn_2;
if let Err(err) = setup_user_connection(&mut user_conn, params).await {
return err.into();
}
match setup_management_connection(&mut management_conn.conn).await {
Ok(_) => ConnectAndCheckResult::Success(create_async_node(
user_conn,
Some(management_conn),
)),
Err(err) => failed_management_connection(addr, user_conn.into_future(), err),
}
}
(Ok(mut user_conn), Err(err)) => {
match setup_user_connection(&mut user_conn, params).await {
Ok(_) => failed_management_connection(addr, user_conn.into_future(), err),
Err(err) => err.into(),
}
}
(Err(err_user), Ok(_mgmt_conn)) => {
Error::from((
ErrorKind::IoError,
"Failed to create user connection",
format!("Node: {addr:?}, user connection error: `{err_user:?}`"),
))
.into()
}
(Err(err_1), Err(err_2)) => {
Error::from((
ErrorKind::IoError,
"Failed to refresh both connections",
format!("Node: {addr:?} received errors: `{err_1:?}`, `{err_2:?}`"),
))
.into()
}
}
}
/// Re-creates only the management connection of `prev_node`, keeping its user
/// connection untouched.
///
/// A fresh set of connection options is built for the management connection:
/// AZ discovery is enabled when `params.read_from_replicas` is one of the
/// AZ-affinity strategies, and the timeout and `tcp_nodelay` are taken from
/// `params`.
///
/// Returns `Success` with the previous user connection plus the new management
/// connection, or `ManagementConnectionFailed` (previous user connection only)
/// when creating or setting up the management connection fails.
// NOTE(review): `create_connection` clears `disconnect_notifier` for management
// connections, so the notifier passed in here appears to be dropped unused — confirm.
async fn connect_and_check_only_management_conn<C>(
    addr: &str,
    params: ClusterParams,
    socket_addr: Option<SocketAddr>,
    prev_node: AsyncClusterNode<C>,
    disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
) -> ConnectAndCheckResult<C>
where
    C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
    let discover_az = matches!(
        params.read_from_replicas,
        crate::cluster::slotmap::ReadFromReplicaStrategy::AZAffinity(_)
            | crate::cluster::slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
    );

    // Clone first, then read the individual fields for the options.
    let connection_params = params.clone();
    let management_options = FerrisKeyConnectionOptions {
        push_sender: None,
        disconnect_notifier,
        discover_az,
        connection_timeout: Some(params.connection_timeout),
        connection_retry_strategy: None,
        tcp_nodelay: params.tcp_nodelay,
        pubsub_synchronizer: None,
        iam_token_provider: None,
    };

    let created = create_connection::<C>(
        addr,
        connection_params,
        socket_addr,
        true,
        management_options,
    )
    .await;

    let mut connection = match created {
        Ok(connection) => connection,
        Err(conn_err) => {
            return failed_management_connection(addr, prev_node.user_connection, conn_err);
        }
    };

    match setup_management_connection(&mut connection.conn).await {
        Ok(_) => ConnectAndCheckResult::Success(ClusterNode {
            user_connection: prev_node.user_connection,
            management_connection: Some(connection.into_future()),
        }),
        Err(err) => failed_management_connection(addr, prev_node.user_connection, err),
    }
}
/// Outcome of an attempt to connect to a node and set up its connections.
#[doc(hidden)]
#[must_use]
pub enum ConnectAndCheckResult<C> {
/// The attempt succeeded and produced this node.
Success(AsyncClusterNode<C>),
/// The management connection could not be created or set up; `node` is
/// still returned, built without a management connection.
ManagementConnectionFailed { node: AsyncClusterNode<C> },
/// The attempt failed with this error and produced no node.
Failed(Error),
}
impl<C> ConnectAndCheckResult<C> {
    /// Converts the outcome into a `Result`.
    ///
    /// Both `Success` and `ManagementConnectionFailed` yield `Ok` with their
    /// node; only `Failed` yields `Err`.
    pub fn get_node(self) -> Result<AsyncClusterNode<C>> {
        match self {
            Self::Failed(err) => Err(err),
            Self::Success(node) | Self::ManagementConnectionFailed { node } => Ok(node),
        }
    }
}
impl<C> From<Error> for ConnectAndCheckResult<C> {
fn from(value: Error) -> Self {
ConnectAndCheckResult::Failed(value)
}
}
impl<C> From<AsyncClusterNode<C>> for ConnectAndCheckResult<C> {
fn from(value: AsyncClusterNode<C>) -> Self {
ConnectAndCheckResult::Success(value)
}
}
impl<C> From<Result<AsyncClusterNode<C>>> for ConnectAndCheckResult<C> {
    /// Maps `Ok(node)` to `Success(node)` and `Err(err)` to `Failed(err)`.
    fn from(value: Result<AsyncClusterNode<C>>) -> Self {
        // First closure handles `Err`, second handles `Ok`.
        value.map_or_else(Self::Failed, Self::Success)
    }
}
/// Creates the connection(s) selected by `conn_type` for the node at `addr`.
///
/// * `OnlyUserConnection`: a new user connection is created and set up; the
///   management connection of `node` (if any) is carried over unchanged.
/// * `OnlyManagementConnection` with an existing `node`: only the management
///   connection is re-created and the node's user connection is kept.
/// * `OnlyManagementConnection` without a node, or `AllConnections`: both
///   connections are created from scratch.
#[doc(hidden)]
pub async fn connect_and_check<C>(
    addr: &str,
    params: ClusterParams,
    socket_addr: Option<SocketAddr>,
    conn_type: RefreshConnectionType,
    node: Option<AsyncClusterNode<C>>,
    ferriskey_connection_options: FerrisKeyConnectionOptions,
) -> ConnectAndCheckResult<C>
where
    C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
    match (conn_type, node) {
        (RefreshConnectionType::OnlyUserConnection, prev_node) => {
            let created = create_and_setup_user_connection::<C>(
                addr,
                params.clone(),
                socket_addr,
                ferriskey_connection_options,
            )
            .await;
            match created {
                Ok(user_conn) => {
                    // Reuse whatever management connection the old node had.
                    let management_conn = prev_node.and_then(|prev| prev.management_connection);
                    AsyncClusterNode::new(user_conn.into_future(), management_conn).into()
                }
                Err(err) => err.into(),
            }
        }
        (RefreshConnectionType::OnlyManagementConnection, Some(prev_node)) => {
            connect_and_check_only_management_conn(
                addr,
                params,
                socket_addr,
                prev_node,
                ferriskey_connection_options.disconnect_notifier,
            )
            .await
        }
        // No node to take a user connection from, or a full refresh was requested.
        (RefreshConnectionType::OnlyManagementConnection, None)
        | (RefreshConnectionType::AllConnections, _) => {
            connect_and_check_all_connections(
                addr,
                params,
                socket_addr,
                ferriskey_connection_options,
            )
            .await
        }
    }
}
/// Opens a user (non-management) connection to `node` and runs
/// `setup_user_connection` on it.
///
/// # Errors
/// Returns the first error from either creating or setting up the connection.
async fn create_and_setup_user_connection<C>(
    node: &str,
    params: ClusterParams,
    socket_addr: Option<SocketAddr>,
    ferriskey_connection_options: FerrisKeyConnectionOptions,
) -> Result<ConnectionDetails<C>>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    // `false`: this is not a management connection.
    let creation = create_connection::<C>(
        node,
        params.clone(),
        socket_addr,
        false,
        ferriskey_connection_options,
    );
    let mut user_conn = creation.await?;

    let setup_outcome = setup_user_connection(&mut user_conn, params).await;
    setup_outcome.map(|()| user_conn)
}
/// Prepares a freshly created user connection.
///
/// The connection is first verified with `check_connection`, bounded by
/// `params.connection_timeout`. Unless the read strategy is
/// `AlwaysFromPrimary`, a `READONLY` command is then sent on it.
///
/// # Errors
/// Returns the error from the health check or from the `READONLY` command.
async fn setup_user_connection<C>(
    conn_details: &mut ConnectionDetails<C>,
    params: ClusterParams,
) -> Result<()>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    check_connection(&mut conn_details.conn, params.connection_timeout).await?;

    // Any strategy other than "always from primary" may read from replicas.
    if params.read_from_replicas != ReadFromReplicaStrategy::AlwaysFromPrimary {
        crate::cmd::cmd("READONLY")
            .query_async::<_, ()>(&mut conn_details.conn)
            .await?;
    }
    Ok(())
}
/// Client name assigned to management connections via `CLIENT SETNAME`.
#[doc(hidden)]
pub const MANAGEMENT_CONN_NAME: &str = "ferriskey_management_connection";
/// Tags `conn` as a management connection by sending
/// `CLIENT SETNAME <MANAGEMENT_CONN_NAME>` on it.
///
/// # Errors
/// Returns the error produced by the command, if any.
async fn setup_management_connection<C>(conn: &mut C) -> Result<()>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    let set_name_args = ["SETNAME", MANAGEMENT_CONN_NAME];
    crate::cmd::cmd("CLIENT")
        .arg(&set_name_args)
        .query_async::<_, ()>(conn)
        .await?;
    Ok(())
}
/// Opens a single connection to `node` and packages it as `ConnectionDetails`.
///
/// For management connections (`is_management == true`) the disconnect
/// notifier and the pubsub synchronizer are removed from the options before
/// connecting. After connecting, the value returned by the connection's
/// `get_az()` is stored alongside the two values returned by `C::connect`.
///
/// # Errors
/// Returns an error if the connection info cannot be derived from `node` and
/// `params`, or if `C::connect` fails.
async fn create_connection<C>(
    node: &str,
    params: ClusterParams,
    socket_addr: Option<SocketAddr>,
    is_management: bool,
    mut ferriskey_connection_options: FerrisKeyConnectionOptions,
) -> Result<ConnectionDetails<C>>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    // Read the timeouts before `params` is consumed below.
    let (response_timeout, connection_timeout) =
        (params.response_timeout, params.connection_timeout);
    let info = get_connection_info(node, params)?;

    if is_management {
        ferriskey_connection_options.disconnect_notifier = None;
        ferriskey_connection_options.pubsub_synchronizer = None;
    }

    let connected = C::connect(
        info,
        response_timeout,
        connection_timeout,
        socket_addr,
        ferriskey_connection_options,
    )
    .await?;

    let az = connected.0.get_az();
    Ok((connected.0, connected.1, az).into())
}
/// Health-checks the connections of `node` selected by `conn_type`.
///
/// Each selected connection is awaited and checked with `check_connection`,
/// using `params.connection_timeout`; both checks run concurrently. A missing
/// management connection counts as unhealthy when it is selected. Every
/// unhealthy connection is logged with a warning mentioning `address`.
///
/// Returns `None` when every checked connection is healthy, otherwise the
/// `RefreshConnectionType` describing which connection(s) need a refresh.
#[doc(hidden)]
pub async fn check_node_connections<C>(
    node: &AsyncClusterNode<C>,
    params: &ClusterParams,
    conn_type: RefreshConnectionType,
    address: &str,
) -> Option<RefreshConnectionType>
where
    C: ConnectionLike + Send + 'static + Clone,
{
    let timeout = params.connection_timeout;
    // A connection is checked unless the request is exclusively about the other one.
    let check_mgmt_connection = conn_type != RefreshConnectionType::OnlyUserConnection;
    let check_user_connection = conn_type != RefreshConnectionType::OnlyManagementConnection;

    // Resolves the connection future, checks it, and reports `true` on failure.
    let check = |pending_conn, limit, label| async move {
        if let Err(err) = check_connection(&mut pending_conn.await, limit).await {
            warn!(
                "The {} connection for node {} is unhealthy. Error: {:?}",
                label, address, err
            );
            true
        } else {
            false
        }
    };

    let management_probe = async {
        if check_mgmt_connection {
            match node.management_connection.clone() {
                Some(connection) => check(connection.conn, timeout, "management").await,
                None => {
                    warn!("The management connection for node {} isn't set", address);
                    true
                }
            }
        } else {
            false
        }
    };
    let user_probe = async {
        if check_user_connection {
            check(node.user_connection.conn.clone(), timeout, "user").await
        } else {
            false
        }
    };

    let (mgmt_failed, user_failed) = join!(management_probe, user_probe);

    if mgmt_failed && user_failed {
        Some(RefreshConnectionType::AllConnections)
    } else if mgmt_failed {
        Some(RefreshConnectionType::OnlyManagementConnection)
    } else if user_failed {
        Some(RefreshConnectionType::OnlyUserConnection)
    } else {
        None
    }
}
/// Sends `PING` on `conn` and waits at most `timeout` for a `String` reply.
///
/// # Errors
/// Returns an error if the timeout elapses or if the command itself fails.
async fn check_connection<C>(conn: &mut C, timeout: std::time::Duration) -> Result<()>
where
    C: ConnectionLike + Send + 'static,
{
    let ping = crate::cmd::cmd("PING");
    let reply = tokio::time::timeout(timeout, ping.query_async::<_, String>(conn)).await;
    // Outer `?`: timeout elapsed; inner `?`: the command failed.
    reply??;
    Ok(())
}
/// Splits an address of the form `<host>:<port>` into its host and port.
///
/// The split happens at the last `:`. When the host part is enclosed in
/// square brackets (`[...]`), the brackets are removed and the inner text
/// must parse as an IPv6 address. Any other host part is returned as-is.
///
/// Returns `None` when there is no `:`, when the port is not a valid `u16`,
/// or when a bracketed host is not a valid IPv6 address.
pub fn get_host_and_port_from_addr(addr: &str) -> Option<(&str, u16)> {
    let (raw_host, raw_port) = addr.rsplit_once(':')?;
    let port: u16 = raw_port.parse().ok()?;

    // `Some(inner)` only when the host both starts with `[` and ends with `]`.
    let bracketed = raw_host
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'));

    match bracketed {
        Some(inner) => inner.parse::<Ipv6Addr>().ok().map(|_| (inner, port)),
        None => Some((raw_host, port)),
    }
}