use itertools::Itertools;
use tokio::net::{ToSocketAddrs, lookup_host};
use tracing::warn;
use uuid::Uuid;
use crate::errors::{ConnectionPoolError, DnsLookupError, UseKeyspaceError};
use crate::network::VerifiedKeyspaceName;
use crate::network::{Connection, ConnectivityChangeEvent};
use crate::network::{NodeConnectionPool, PoolConfig};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::routing::{Shard, Sharder};
use std::fmt::Display;
use std::net::IpAddr;
#[cfg(test)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{
hash::{Hash, Hasher},
net::SocketAddr,
sync::Arc,
};
use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeAddr {
Translatable(SocketAddr),
Untranslatable(SocketAddr),
}
impl NodeAddr {
pub(crate) fn into_inner(self) -> SocketAddr {
match self {
NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
}
}
pub(crate) fn inner_mut(&mut self) -> &mut SocketAddr {
match self {
NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
}
}
pub fn ip(&self) -> IpAddr {
self.into_inner().ip()
}
pub fn port(&self) -> u16 {
self.into_inner().port()
}
}
impl Display for NodeAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.into_inner())
}
}
#[derive(Debug)]
pub struct Node {
pub host_id: Uuid,
pub address: NodeAddr,
pub datacenter: Option<String>,
pub rack: Option<String>,
pool: Option<NodeConnectionPool>,
#[cfg(test)]
enabled_as_connected: AtomicBool,
}
pub type NodeRef<'a> = &'a Arc<Node>;
impl Node {
pub(crate) fn new(
peer: PeerEndpoint,
pool_config: &PoolConfig,
connectivity_events_sender: tokio::sync::mpsc::UnboundedSender<ConnectivityChangeEvent>,
keyspace_name: Option<VerifiedKeyspaceName>,
enabled: bool,
#[cfg(feature = "metrics")] metrics: Arc<Metrics>,
) -> Self {
let host_id = peer.host_id;
let address = peer.address;
let datacenter = peer.datacenter.clone();
let rack = peer.rack.clone();
let (pool_empty_notifier, _) = tokio::sync::mpsc::channel(1);
let pool = enabled.then(|| {
NodeConnectionPool::new(
UntranslatedEndpoint::Peer(peer),
pool_config,
Some((host_id, connectivity_events_sender)),
keyspace_name,
pool_empty_notifier,
#[cfg(feature = "metrics")]
metrics,
)
});
Node {
host_id,
address,
datacenter,
rack,
pool,
#[cfg(test)]
enabled_as_connected: AtomicBool::new(false),
}
}
pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self {
let address = endpoint.address;
if let Some(ref pool) = node.pool {
pool.update_endpoint(endpoint);
}
Self {
address,
datacenter: node.datacenter.clone(),
rack: node.rack.clone(),
host_id: node.host_id,
pool: node.pool.clone(),
#[cfg(test)]
enabled_as_connected: AtomicBool::new(node.enabled_as_connected.load(Ordering::SeqCst)),
}
}
pub fn sharder(&self) -> Option<Sharder> {
self.pool.as_ref()?.sharder()
}
pub(crate) async fn connection_for_shard(
&self,
shard: Shard,
) -> Result<Arc<Connection>, ConnectionPoolError> {
self.get_pool()?.connection_for_shard(shard)
}
pub fn is_connected(&self) -> bool {
#[cfg(test)]
if self.enabled_as_connected.load(Ordering::SeqCst) {
return self.is_enabled();
}
let Ok(pool) = self.get_pool() else {
return false;
};
pool.is_connected()
}
pub fn is_enabled(&self) -> bool {
self.pool.is_some()
}
pub(crate) async fn use_keyspace(
&self,
keyspace_name: VerifiedKeyspaceName,
) -> Result<(), UseKeyspaceError> {
if let Some(pool) = &self.pool {
pool.use_keyspace(keyspace_name).await?;
}
Ok(())
}
pub(crate) fn get_working_connections(
&self,
) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
self.get_pool()?.get_working_connections()
}
pub(crate) fn get_random_connection(&self) -> Result<Arc<Connection>, ConnectionPoolError> {
self.get_pool()?.random_connection()
}
pub(crate) async fn wait_until_pool_initialized(&self) {
if let Some(pool) = &self.pool {
pool.wait_until_initialized().await;
}
}
fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> {
self.pool
.as_ref()
.ok_or(ConnectionPoolError::NodeDisabledByHostFilter)
}
}
impl PartialEq for Node {
fn eq(&self, other: &Self) -> bool {
self.host_id == other.host_id
}
}
impl Eq for Node {}
impl Hash for Node {
fn hash<H: Hasher>(&self, state: &mut H) {
self.host_id.hash(state);
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[non_exhaustive]
pub enum KnownNode {
Hostname(String),
Address(SocketAddr),
}
#[derive(Debug, Clone)]
pub(crate) struct ResolvedContactPoint {
pub(crate) address: SocketAddr,
}
async fn lookup_host_with_timeout(
host: impl ToSocketAddrs,
hostname_resolution_timeout: Option<Duration>,
) -> Result<impl Iterator<Item = SocketAddr>, DnsLookupError> {
if let Some(timeout) = hostname_resolution_timeout {
match tokio::time::timeout(timeout, lookup_host(host)).await {
Ok(res) => res.map_err(|io_err| DnsLookupError::IoError(Arc::new(io_err))),
Err(_) => Err(DnsLookupError::Timeout(timeout.as_millis())),
}
} else {
lookup_host(host)
.await
.map_err(|io_err| DnsLookupError::IoError(Arc::new(io_err)))
}
}
pub(crate) async fn resolve_hostname(
hostname: &str,
hostname_resolution_timeout: Option<Duration>,
) -> Result<SocketAddr, DnsLookupError> {
let addrs = match lookup_host_with_timeout(hostname, hostname_resolution_timeout).await {
Ok(addrs) => itertools::Either::Left(addrs),
Err(DnsLookupError::Timeout(t)) => return Err(DnsLookupError::Timeout(t)),
Err(e) => {
let addrs = lookup_host_with_timeout((hostname, 9042), hostname_resolution_timeout)
.await
.or(Err(e))?;
itertools::Either::Right(addrs)
}
};
addrs
.find_or_last(|addr| matches!(addr, SocketAddr::V4(_)))
.ok_or_else(|| DnsLookupError::EmptyAddressListForHost(hostname.into()))
}
pub(crate) async fn resolve_contact_points(
known_nodes: &[KnownNode],
hostname_resolution_timeout: Option<Duration>,
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
let mut to_resolve: Vec<&str> = Vec::new();
let mut hostnames: Vec<String> = Vec::new();
for node in known_nodes.iter() {
match node {
KnownNode::Hostname(hostname) => {
to_resolve.push(hostname.as_str());
hostnames.push(hostname.clone());
}
KnownNode::Address(address) => {
initial_peers.push(ResolvedContactPoint { address: *address })
}
};
}
let resolve_futures = to_resolve.into_iter().map(|hostname| async move {
match resolve_hostname(hostname, hostname_resolution_timeout).await {
Ok(address) => Some(ResolvedContactPoint { address }),
Err(e) => {
warn!("Hostname resolution failed for {}: {}", hostname, &e);
None
}
}
});
let resolved: Vec<_> = futures::future::join_all(resolve_futures).await;
initial_peers.extend(resolved.into_iter().flatten());
(initial_peers, hostnames)
}
#[cfg(test)]
mod tests {
use super::*;
impl Node {
pub(crate) fn new_for_test(
id: Option<Uuid>,
address: Option<NodeAddr>,
datacenter: Option<String>,
rack: Option<String>,
) -> Self {
Self {
host_id: id.unwrap_or(Uuid::new_v4()),
address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from((
[255, 255, 255, 255],
0,
)))),
datacenter,
rack,
pool: None,
enabled_as_connected: AtomicBool::new(false),
}
}
pub(crate) fn use_enabled_as_connected(&self) {
self.enabled_as_connected.store(true, Ordering::SeqCst);
}
}
}