aerospike-core 2.1.0

Aerospike Client for Rust
// Copyright 2015-2018 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::ToSocketAddrs;
use std::str;
use std::vec::Vec;

use crate::cluster::version_parser::{Version, VersionParser};
use crate::cluster::Cluster;
use crate::commands::Message;
use crate::errors::{Error, Result};
use crate::net::{Connection, Host};
use crate::policy::{AdminPolicy, ClientPolicy};
use crate::ToHosts;

// Validates a Database server node
#[allow(clippy::struct_excessive_bools)]
#[derive(Clone)]
pub struct NodeValidator {
    pub name: String,
    pub aliases: Vec<Host>,
    pub services: Vec<Host>,
    pub address: String,
    pub client_policy: ClientPolicy,
    pub use_new_info: bool,
    pub version: Version,
}

// Generates a node validator
impl NodeValidator {
    pub fn new(client_policy: ClientPolicy) -> Self {
        NodeValidator {
            name: String::new(),
            services: vec![],
            aliases: vec![],
            address: String::new(),
            client_policy,
            use_new_info: true,
            version: Version::default(),
        }
    }

    #[allow(clippy::option_if_let_else)]
    pub async fn validate_node(&mut self, cluster: &Cluster, host: &Host) -> Result<()> {
        self.resolve_aliases(host)
            .map_err(|e| e.chain_error("Failed to resolve host aliases"))?;

        let mut last_err = None;
        for alias in &self.aliases() {
            match self.validate_alias(cluster, alias).await {
                Ok(()) => return Ok(()),
                Err(err) => {
                    debug!("Alias {alias} failed: {err:?}");
                    last_err = Some(err);
                }
            }
        }

        match last_err {
            Some(err) => Err(err),
            None => unreachable!(),
        }
    }

    pub fn aliases(&self) -> Vec<Host> {
        self.aliases.clone()
    }

    pub fn services(&self) -> Vec<Host> {
        self.services.clone()
    }

    fn resolve_aliases(&mut self, host: &Host) -> Result<()> {
        self.aliases = (host.name.as_ref(), host.port)
            .to_socket_addrs()?
            .map(|addr| {
                Host::new_tls(
                    &addr.ip().to_string(),
                    &host.tls_name.clone().unwrap_or_default(),
                    addr.port(),
                )
            })
            .collect();

        debug!("Resolved aliases for host {}: {:?}", host, self.aliases);
        if self.aliases.is_empty() {
            Err(Error::Connection(format!(
                "Failed to find addresses for {host}"
            )))
        } else {
            Ok(())
        }
    }

    async fn validate_alias(&mut self, cluster: &Cluster, alias: &Host) -> Result<()> {
        let mut conn =
            Connection::new(alias, &self.client_policy, cluster.hashed_pass().as_ref()).await?;
        let service_name = cluster.client_policy.load().service_string();
        let admin_policy = AdminPolicy {
            timeout: self.client_policy.timeout,
        };
        let info_map = Message::info(
            &admin_policy,
            &mut conn,
            &["node", "cluster-name", "build", service_name],
        )
        .await?;

        match info_map.get("node") {
            None => return Err(Error::InvalidNode(String::from("Missing node name"))),
            Some(node_name) => self.name.clone_from(node_name),
        }

        if let Some(ref cluster_name) = cluster.cluster_name() {
            match info_map.get("cluster-name") {
                None => return Err(Error::InvalidNode(String::from("Missing cluster name"))),
                Some(info_name) if info_name == cluster_name => {}
                Some(info_name) => {
                    return Err(Error::InvalidNode(format!(
                        "Cluster name mismatch: expected={cluster_name},
                                                         got={info_name}"
                    )))
                }
            }
        }

        self.address = alias.address();

        if let Some(build) = info_map.get("build") {
            let version = VersionParser::new(build).parse()?;
            self.version = version;
        }

        if let Some(peers) = info_map.get(service_name) {
            if !peers.trim().is_empty() {
                self.set_services(alias, peers);
            }
        }

        Ok(())
    }

    fn set_services(&mut self, alias: &Host, peers: &str) {
        let peers = peers.split(';');
        for peer in peers {
            match peer.to_hosts() {
                Err(e) => error!("Invalid host: {peer}, {e}"),
                Ok(host) => {
                    let mut host: Vec<Host> = host
                        .into_iter()
                        .map(|mut h| {
                            h.tls_name.clone_from(&alias.tls_name);
                            if let Some(ref ip_map) = self.client_policy.ip_map {
                                if let Some(mapped) = ip_map.get(&h.name) {
                                    h.name = mapped.clone();
                                }
                            }
                            h
                        })
                        .collect();
                    self.services.append(&mut host);
                }
            }
        }
    }
}