use std::net::ToSocketAddrs;
use std::str;
use std::vec::Vec;
use crate::cluster::version_parser::{Version, VersionParser};
use crate::cluster::Cluster;
use crate::commands::Message;
use crate::errors::{Error, Result};
use crate::net::{Connection, Host};
use crate::policy::{AdminPolicy, ClientPolicy};
use crate::ToHosts;
/// Collects what is learned about a server node while it is being validated.
///
/// A validator starts out empty (see `new`) and is filled in by
/// `validate_node`, which resolves the seed host and queries the first alias
/// that answers.
// NOTE(review): only one `bool` field is visible here, so the
// `struct_excessive_bools` allowance may be a leftover — confirm before removing.
#[allow(clippy::struct_excessive_bools)]
#[derive(Clone)]
pub struct NodeValidator {
/// Node name taken from the `node` info value; empty until an alias answers.
pub name: String,
/// Addresses the seed host resolved to; set by `resolve_aliases`.
pub aliases: Vec<Host>,
/// Peer hosts parsed from the service info value; appended by `set_services`.
pub services: Vec<Host>,
/// `Host::address()` of the alias that passed the node/cluster-name checks.
pub address: String,
/// Policy used for opening the connection, the info timeout and the optional
/// IP mapping applied to peer hosts.
pub client_policy: ClientPolicy,
/// Initialised to `true` by `new`; not read by any method in this impl.
pub use_new_info: bool,
/// Server version parsed from the `build` info value; `Version::default()`
/// until a node reports one.
pub version: Version,
}
impl NodeValidator {
pub fn new(client_policy: ClientPolicy) -> Self {
NodeValidator {
name: String::new(),
services: vec![],
aliases: vec![],
address: String::new(),
client_policy,
use_new_info: true,
version: Version::default(),
}
}
/// Resolves `host` and validates its aliases one by one until one succeeds.
///
/// The first alias that validates ends the search with `Ok(())`. Failures
/// are logged at debug level and the search moves on to the next alias.
///
/// # Errors
///
/// Returns the resolution error (chained with context) when `host` cannot
/// be resolved, or the error of the last alias tried when none validates.
#[allow(clippy::option_if_let_else)]
pub async fn validate_node(&mut self, cluster: &Cluster, host: &Host) -> Result<()> {
    let resolution = self.resolve_aliases(host);
    resolution.map_err(|e| e.chain_error("Failed to resolve host aliases"))?;

    // Work on a copy of the alias list: `validate_alias` needs `&mut self`.
    let candidates = self.aliases();
    let mut last_err = None;
    for alias in &candidates {
        let outcome = self.validate_alias(cluster, alias).await;
        if let Err(err) = outcome {
            debug!("Alias {alias} failed: {err:?}");
            last_err = Some(err);
        } else {
            return Ok(());
        }
    }

    // `resolve_aliases` fails on an empty alias list, so the loop above ran
    // at least once and recorded an error before falling through.
    match last_err {
        Some(err) => Err(err),
        None => unreachable!(),
    }
}
/// Returns an owned copy of the aliases resolved for the seed host.
pub fn aliases(&self) -> Vec<Host> {
    Vec::clone(&self.aliases)
}
/// Returns an owned copy of the peer hosts collected so far.
pub fn services(&self) -> Vec<Host> {
    Vec::clone(&self.services)
}
fn resolve_aliases(&mut self, host: &Host) -> Result<()> {
self.aliases = (host.name.as_ref(), host.port)
.to_socket_addrs()?
.map(|addr| {
Host::new_tls(
&addr.ip().to_string(),
&host.tls_name.clone().unwrap_or_default(),
addr.port(),
)
})
.collect();
debug!("Resolved aliases for host {}: {:?}", host, self.aliases);
if self.aliases.is_empty() {
Err(Error::Connection(format!(
"Failed to find addresses for {host}"
)))
} else {
Ok(())
}
}
/// Connects to `alias`, requests its info values and records what it reports.
///
/// One info request asks for `node`, `cluster-name`, `build` and the
/// service key named by the cluster's client policy. On success the
/// validator's `name`, `address`, `version` (when `build` is present) and
/// `services` (when the peer list is non-blank) are updated.
///
/// Note that `name` is stored as soon as the node name is known, i.e. before
/// the cluster-name check, so it may be set even when this method fails.
///
/// # Errors
///
/// Fails when the connection or the info request fails, when the node name
/// is missing, when the cluster expects a name and the node reports none or
/// a different one, or when the build string cannot be parsed.
async fn validate_alias(&mut self, cluster: &Cluster, alias: &Host) -> Result<()> {
    let mut conn =
        Connection::new(alias, &self.client_policy, cluster.hashed_pass().as_ref()).await?;
    let service_name = cluster.client_policy.load().service_string();
    let admin_policy = AdminPolicy {
        timeout: self.client_policy.timeout,
    };
    let info_map = Message::info(
        &admin_policy,
        &mut conn,
        &["node", "cluster-name", "build", service_name],
    )
    .await?;

    match info_map.get("node") {
        None => return Err(Error::InvalidNode(String::from("Missing node name"))),
        Some(node_name) => self.name.clone_from(node_name),
    }

    // The cluster name is only enforced when the cluster was given one.
    if let Some(ref cluster_name) = cluster.cluster_name() {
        match info_map.get("cluster-name") {
            None => return Err(Error::InvalidNode(String::from("Missing cluster name"))),
            Some(info_name) if info_name == cluster_name => {}
            Some(info_name) => {
                // Keep this literal on one line: a string that wraps without a
                // trailing `\` embeds the line break in the message.
                return Err(Error::InvalidNode(format!(
                    "Cluster name mismatch: expected={cluster_name}, got={info_name}"
                )));
            }
        }
    }

    self.address = alias.address();

    if let Some(build) = info_map.get("build") {
        self.version = VersionParser::new(build).parse()?;
    }

    if let Some(peers) = info_map.get(service_name) {
        // A blank peer list means there is nothing to record.
        if !peers.trim().is_empty() {
            self.set_services(alias, peers);
        }
    }
    Ok(())
}
fn set_services(&mut self, alias: &Host, peers: &str) {
let peers = peers.split(';');
for peer in peers {
match peer.to_hosts() {
Err(e) => error!("Invalid host: {peer}, {e}"),
Ok(host) => {
let mut host: Vec<Host> = host
.into_iter()
.map(|mut h| {
h.tls_name.clone_from(&alias.tls_name);
if let Some(ref ip_map) = self.client_policy.ip_map {
if let Some(mapped) = ip_map.get(&h.name) {
h.name = mapped.clone();
}
}
h
})
.collect();
self.services.append(&mut host);
}
}
}
}
}