use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::Arc;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use hazarc::AtomicArc;
use crate::cluster::node_validator::NodeValidator;
use crate::cluster::peers_parser::PeersParser;
use crate::cluster::CLIENT_VERSION;
use crate::commands::Message;
use crate::errors::{Error, Result};
use crate::net::{ConnectionPool, Host, PooledConnection};
use crate::policy::{AdminPolicy, ClientPolicy};
use crate::Version;
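/// Number of partitions in an Aerospike namespace.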
pub const PARTITIONS: usize = 4096;
pub const PARTITION_GENERATION: &str = "partition-generation";
pub const REBALANCE_GENERATION: &str = "rebalance-generation";
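/// A single server node in the cluster.
///
/// Tracks the node's connection pool, the partition and rebalance generations
/// last reported by the server, and the bookkeeping used during cluster
/// refresh (failure, refresh, and reference counters).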
#[derive(Debug)]
pub struct Node {
client_policy: ClientPolicy,
name: String,
host: Host,
aliases: AtomicArc<Vec<Host>>,
address: String,
connection_pool: ConnectionPool,
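    /// Failure counter; cleared after a successful refresh.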
failures: AtomicUsize,
partition_generation: AtomicIsize,
rebalance_generation: AtomicIsize,
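    /// Per-namespace rack ids, replaced wholesale by `parse_rack`.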
rack_ids: AtomicArc<HashMap<String, usize>>,
refresh_count: AtomicUsize,
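    /// How many peers referenced this node during the last refresh round.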
reference_count: AtomicUsize,
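    /// Whether the node answered its most recent refresh.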
responded: AtomicBool,
active: AtomicBool,
version: Version,
}
impl Drop for Node {
fn drop(&mut self) {
debug!("Node closed {self}");
self.close();
self.connection_pool.close();
}
}
impl Node {
#![allow(missing_docs)]
pub fn new(client_policy: ClientPolicy, nv: Arc<NodeValidator>) -> Self {
Node {
client_policy: client_policy.clone(),
name: nv.name.clone(),
aliases: AtomicArc::from(nv.aliases.clone()),
address: nv.address.clone(),
host: nv.aliases[0].clone(),
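            // Start at -1 so the first refresh always sees a new rebalance
            // generation when rack awareness is enabled.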
rebalance_generation: AtomicIsize::new(if client_policy.rack_ids.is_some() {
-1
} else {
0
}),
connection_pool: ConnectionPool::new(nv.aliases[0].clone(), client_policy),
failures: AtomicUsize::new(0),
partition_generation: AtomicIsize::new(-1),
refresh_count: AtomicUsize::new(0),
reference_count: AtomicUsize::new(0),
responded: AtomicBool::new(false),
active: AtomicBool::new(true),
version: nv.version.clone(),
rack_ids: AtomicArc::from(HashMap::new()),
}
}
pub fn address(&self) -> &str {
&self.address
}
pub fn name(&self) -> &str {
&self.name
}
pub const fn version(&self) -> &Version {
&self.version
}
pub const fn client_policy(&self) -> &ClientPolicy {
&self.client_policy
}
pub fn host(&self) -> Host {
self.host.clone()
}
pub fn reference_count(&self) -> usize {
self.reference_count.load(Ordering::Relaxed)
}
pub(crate) fn has_responded(&self) -> bool {
self.responded.load(Ordering::Relaxed)
}
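    /// Refreshes this node during a cluster refresh cycle: queries the node
    /// name, cluster name, partition generation, and peers list (plus the
    /// rebalance generation when rack awareness is enabled), validates the
    /// node's identity, updates the stored generations, and tops up the
    /// connection pool. Returns peer hosts not yet known to the cluster.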
pub async fn refresh(&self, current_aliases: HashMap<Host, Arc<Node>>) -> Result<Vec<Host>> {
self.reference_count.store(0, Ordering::Relaxed);
self.responded.store(false, Ordering::Relaxed);
self.refresh_count.fetch_add(1, Ordering::Relaxed);
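        // Info commands issued on every refresh; rack-aware clients also ask
        // for the rebalance generation below.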
let mut commands = vec![
"node",
"cluster-name",
PARTITION_GENERATION,
self.client_policy.peers_string(),
];
if self.client_policy.rack_ids.is_some() {
commands.push(REBALANCE_GENERATION);
}
let admin_policy = AdminPolicy {
timeout: self.client_policy.timeout,
};
let info_map = self
.info(&admin_policy, &commands)
.await
.map_err(|e| e.chain_error("Info command failed"))?;
self.validate_node(&info_map)
.map_err(|e| e.chain_error("Failed to validate node"))?;
self.responded.store(true, Ordering::Relaxed);
let friends = self
.add_friends(current_aliases, &info_map)
.map_err(|e| e.chain_error("Failed to add friends"))?;
self.update_partitions(&info_map)
.map_err(|e| e.chain_error("Failed to update partitions"))?;
self.update_rebalance_generation(&info_map)
.map_err(|e| e.chain_error("Failed to update rebalance generation"))?;
self.reset_failures();
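        // Best effort: failing to pre-fill the pool must not fail the refresh.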
let _ = self.fill_min_conns().await;
Ok(friends)
}
fn validate_node(&self, info_map: &HashMap<String, String>) -> Result<()> {
self.verify_node_name(info_map)?;
self.verify_cluster_name(info_map)?;
Ok(())
}
fn verify_node_name(&self, info_map: &HashMap<String, String>) -> Result<()> {
match info_map.get("node") {
None => Err(Error::InvalidNode("Missing node name".to_string())),
Some(info_name) if info_name == &self.name => Ok(()),
Some(info_name) => {
self.inactivate();
Err(Error::InvalidNode(format!(
"Node name has changed: '{}' => '{}'",
self.name, info_name
)))
}
}
}
#[allow(clippy::option_if_let_else)]
fn verify_cluster_name(&self, info_map: &HashMap<String, String>) -> Result<()> {
match self.client_policy.cluster_name {
None => Ok(()),
Some(ref expected) => match info_map.get("cluster-name") {
None => Err(Error::InvalidNode("Missing cluster name".to_string())),
Some(info_name) if info_name == expected => Ok(()),
Some(info_name) => {
self.inactivate();
Err(Error::InvalidNode(format!(
"Cluster name mismatch: expected={expected},
got={info_name}"
)))
}
},
}
}
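    /// Extracts peer hosts from the info response and returns those not yet in
    /// `current_aliases`; already-known hosts only bump this node's reference
    /// count.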
fn add_friends(
&self,
current_aliases: HashMap<Host, Arc<Node>>,
info_map: &HashMap<String, String>,
) -> Result<Vec<Host>> {
let mut friends: Vec<Host> = vec![];
let friend_string = match info_map.get(self.client_policy.peers_string()) {
None => return Err(Error::BadResponse("Missing services list".to_string())),
Some(friend_string) if friend_string.is_empty() => return Ok(friends),
Some(friend_string) => friend_string,
};
let (_, hosts) = PeersParser::new(friend_string).parse()?;
for mut alias in hosts {
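            // Apply the user-configured IP translation map, if any.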
if let Some(ref ip_map) = self.client_policy.ip_map {
if let Some(mapped) = ip_map.get(&alias.name) {
alias.name.clone_from(mapped);
}
}
if current_aliases.contains_key(&alias) {
self.reference_count.fetch_add(1, Ordering::Relaxed);
} else if !friends.contains(&alias) {
friends.push(alias);
}
}
Ok(friends)
}
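    /// Stores the partition generation from the info response.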
pub fn update_partitions(&self, info_map: &HashMap<String, String>) -> Result<()> {
        let generation = info_map
            .get(PARTITION_GENERATION)
            .ok_or_else(|| Error::BadResponse("Missing partition generation".to_string()))?
            .parse::<isize>()?;
        self.partition_generation.store(generation, Ordering::Relaxed);
        Ok(())
}
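    /// Stores the rebalance generation, when present in the info response.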
pub fn update_rebalance_generation(&self, info_map: &HashMap<String, String>) -> Result<()> {
if let Some(gen_string) = info_map.get(REBALANCE_GENERATION) {
            let generation = gen_string.parse::<isize>()?;
            self.rebalance_generation.store(generation, Ordering::Relaxed);
}
Ok(())
}
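    /// Returns `true` if this node's rack id for `namespace` is one of
    /// `rack_ids`.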
pub fn is_in_rack(&self, namespace: &str, rack_ids: &HashSet<usize>) -> bool {
self.rack_ids
.load()
.get(namespace)
.is_some_and(|r| rack_ids.contains(r))
}
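    /// Parses a per-namespace rack table from an info response of the form
    /// `ns1:rack_id;ns2:rack_id` and swaps it in atomically.
    ///
    /// A minimal sketch (namespace and rack id are illustrative):
    ///
    /// ```ignore
    /// node.parse_rack("test:1")?;
    /// assert!(node.is_in_rack("test", &HashSet::from([1])));
    /// ```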
pub fn parse_rack(&self, buf: &str) -> Result<()> {
let new_table = buf
.split(';')
.map(|entry| {
                let (key, val) = entry
                    .split_once(':')
                    .ok_or_else(|| Error::BadResponse("Invalid rack entry".into()))?;
Ok((key.to_string(), val.parse::<usize>()?))
})
.collect::<Result<HashMap<_, _>>>()?;
self.rack_ids.store(Arc::new(new_table));
Ok(())
}
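    /// Takes a connection from the pool, opening a new one if no idle
    /// connection is available. Fails if the node is inactive.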
pub async fn get_connection(&self, hint: u8) -> Result<PooledConnection> {
if !self.is_active() {
return Err(Error::InvalidNode(format!(
"Cannot get a connection for node. The node `{self}` is inactive"
)));
}
if let Ok(conn) = self.connection_pool.get(hint) {
return Ok(conn);
}
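        // Nothing idle in the pool; open a fresh connection.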
self.connection_pool.make_conn(0).await
}
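    /// Returns a connection to its pool queue, or invalidates it if the node
    /// is no longer active.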
pub fn put_connection(&self, mut pconn: PooledConnection) {
if self.is_active() {
if let Some(conn) = pconn.conn.take() {
pconn.queue.put_back(conn);
}
} else {
pconn.invalidate();
}
}
pub fn failures(&self) -> usize {
self.failures.load(Ordering::Relaxed)
}
fn reset_failures(&self) {
self.failures.store(0, Ordering::Relaxed);
}
pub fn increase_failures(&self) -> usize {
self.failures.fetch_add(1, Ordering::Relaxed)
}
fn inactivate(&self) {
self.active.store(false, Ordering::Relaxed);
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
pub fn aliases(&self) -> Vec<Host> {
self.aliases.load().to_vec()
}
pub fn add_alias(&self, alias: Host) {
let mut aliases = self.aliases();
aliases.push(alias);
self.aliases.store(Arc::new(aliases));
self.reference_count.fetch_add(1, Ordering::Relaxed);
}
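    /// Marks the node inactive; the connection pool itself is closed when the
    /// node is dropped.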
pub fn close(&self) {
self.inactivate();
}
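    /// Sends info commands to this node over a pooled connection. The
    /// connection is invalidated on error and returned to the pool on success.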
pub async fn info(
&self,
policy: &AdminPolicy,
commands: &[&str],
) -> Result<HashMap<String, String>> {
let mut conn = self.get_connection(0).await?;
let res = Message::info(policy, &mut conn, commands).await;
if let Err(e) = res {
conn.invalidate();
return Err(e);
}
self.put_connection(conn);
res
}
pub fn partition_generation(&self) -> isize {
self.partition_generation.load(Ordering::Relaxed)
}
pub fn rebalance_generation(&self) -> isize {
self.rebalance_generation.load(Ordering::Relaxed)
}
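    /// Announces this client to servers that support it, as the base64-encoded
    /// string `1,rust-<CLIENT_VERSION>,<app_id>`. Best effort: errors are
    /// deliberately ignored.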
pub(crate) async fn send_user_agent_id(&self) {
if !self.version().supports_app_id() {
return;
}
let app_id = self.client_policy().application_id();
let user_agent_id = format!("1,rust-{CLIENT_VERSION},{app_id}");
let user_agent_id = BASE64.encode(&user_agent_id);
let user_agent_command = format!("user-agent-set:value={user_agent_id}");
let policy = AdminPolicy {
timeout: self.client_policy().timeout,
};
let _ = self.info(&policy, &[&user_agent_command]).await;
}
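    /// Opens connections until the pool holds at least
    /// `min_conns_per_node`, returning how many were created.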
pub(crate) async fn fill_min_conns(&self) -> Result<usize> {
if self.is_active() {
let mut count = 0;
let client_policy = self.client_policy();
if client_policy.min_conns_per_node > 0 {
                // Saturating to avoid an underflow panic when the pool already
                // holds at least `min_conns_per_node` connections.
                let to_fill = client_policy
                    .min_conns_per_node
                    .saturating_sub(self.connection_pool.num_conns());
for _ in 0..to_fill {
self.connection_pool.make_conn(count).await?;
count += 1;
}
}
Ok(count)
} else {
Err(Error::InvalidNode(format!(
"Cannot fill the connection pool to 'policy.min_conns_per_node'. The node `{self}` is inactive"
)))
}
}
}
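// A node's identity is its name: `Hash`, `PartialEq`, and `Eq` all delegate to
// `name`, so two `Node` values with the same name compare equal even if their
// hosts differ.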
impl Hash for Node {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
}
}
impl PartialEq for Node {
fn eq(&self, other: &Node) -> bool {
self.name == other.name
}
}
impl Eq for Node {}
impl fmt::Display for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> StdResult<(), fmt::Error> {
format!("{}: {}", self.name, self.host).fmt(f)
}
}
#[cfg(test)]
mod node_tests {
use std::sync::Arc;
use crate::cluster::node_validator::NodeValidator;
use crate::errors::Error;
use crate::net::Host;
use crate::policy::ClientPolicy;
use crate::Version;
use super::Node;
fn test_node() -> Node {
let policy = ClientPolicy::default();
let nv = Arc::new(NodeValidator {
name: "test-node".to_string(),
aliases: vec![Host::new("127.0.0.1", 3000)],
services: vec![],
address: "127.0.0.1:3000".to_string(),
client_policy: policy.clone(),
use_new_info: true,
version: Version::default(),
});
Node::new(policy, nv)
}
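    // Builds a node whose pool already holds exactly one idle test connection.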
async fn create_node_with_connection() -> Node {
let node = test_node();
let pconn = node
.connection_pool
.make_conn(0)
.await
.expect("make_conn uses test Connection");
node.put_connection(pconn);
assert_eq!(node.connection_pool.num_conns(), 1);
node
}
#[aerospike_macro::test]
async fn get_connection_returns_invalid_node_when_inactive() {
let node = create_node_with_connection().await;
let before = node.connection_pool.num_conns();
node.close();
assert!(!node.is_active());
let err = node.get_connection(0).await.unwrap_err();
match err {
            Error::InvalidNode(msg) => assert!(msg.contains("inactive"), "unexpected: {msg}"),
            other => panic!("expected InvalidNode, got {other:?}"),
}
assert_eq!(
node.connection_pool.num_conns(),
before,
"inactive node must not open or hand out pool connections"
);
}
#[aerospike_macro::test]
async fn put_connection_does_not_return_conn_to_pool_when_inactive() {
let node = create_node_with_connection().await;
let pconn = node
.get_connection(0)
.await
.expect("active node with one mock conn in pool");
assert_eq!(node.connection_pool.num_conns(), 0);
node.close();
assert!(!node.is_active());
node.put_connection(pconn);
assert_eq!(
node.connection_pool.num_conns(),
0,
"inactive node must not return connections to the pool"
);
}
#[aerospike_macro::test]
async fn node_drop_inactivates_and_closes_pool_when_last_arc_dropped() {
let arc = Arc::new(create_node_with_connection().await);
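        // Clone a handle to the pool's internal queue so it can still be
        // observed after the Node is dropped.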
let queue_witness = {
let pconn = arc
.get_connection(0)
.await
.expect("pool should have one connection");
let q = pconn.queue.clone();
arc.put_connection(pconn);
q
};
let weak = Arc::downgrade(&arc);
assert_eq!(Arc::strong_count(&arc), 1);
assert!(arc.is_active());
assert_eq!(arc.connection_pool.num_conns(), 1);
drop(arc);
assert!(
weak.upgrade().is_none(),
"expected Node to be dropped after the last Arc was released"
);
assert_eq!(
queue_witness.num_conns(),
0,
"Node::drop should clear pooled connections"
);
}
}