use super::KeyspaceChanRx;
use super::connection::CassandraConnection;
use super::node::{CassandraNode, ConnectionFactory};
use super::routing_key::calculate_routing_key;
use super::token_ring::TokenRing;
use anyhow::{Context, Error, Result, anyhow};
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::types::CBytesShort;
use metrics::{Counter, counter};
use rand::prelude::*;
use std::sync::Arc;
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::{RwLock, watch};
/// Metadata retained for a prepared statement so that later execute requests can be routed.
///
/// Instances are stored by `NodePool::add_prepared_result`, keyed by the prepared
/// statement id, and read back by `NodePool::get_replica_node_in_dc`.
#[derive(Debug, Clone)]
pub struct PreparedMetadata {
/// Passed to `calculate_routing_key` together with the query values of the execute request.
/// NOTE(review): presumably the positions of the partition key columns among the bound values — confirm.
pub pk_indexes: Vec<i16>,
/// Name used to look up the matching `KeyspaceMetadata`.
/// When this is `None` the replica lookup fails with `GetReplicaErr::NoKeyspaceMetadata`.
pub keyspace: Option<String>,
}
/// Reasons why looking up a replica (or a connection to one) for an execute request can fail.
#[derive(Debug)]
pub enum GetReplicaErr {
/// No `PreparedMetadata` is stored for the id of the execute request.
NoPreparedMetadata,
/// The prepared metadata names no keyspace, or no `KeyspaceMetadata` is known for that keyspace.
NoKeyspaceMetadata,
/// A connection could not be opened to any of the candidate replica nodes.
/// Carries the underlying error.
NoNodeAvailable(anyhow::Error),
/// Any other failure, e.g. the execute body carrying no query parameter values.
Other(Error),
/// `calculate_routing_key` did not produce a routing key.
NoRoutingKey,
}
/// The replication strategy configured for a keyspace.
///
/// NOTE(review): the variant names look like Cassandra's replication strategy class names;
/// where they are parsed and how each one affects replica selection is not visible in this
/// file — confirm against the code that builds `KeyspaceMetadata` and against `TokenRing`.
#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationStrategy {
SimpleStrategy,
NetworkTopologyStrategy,
LocalStrategy,
// Presumably used for any strategy that is not one of the above — TODO confirm.
Unknown,
}
/// Replication settings of a single keyspace.
///
/// `NodePool` keeps one of these per keyspace name and hands it to
/// `TokenRing::iter_replica_nodes` when resolving the replicas of a routing key.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyspaceMetadata {
/// NOTE(review): presumably the number of replicas holding each piece of data — confirm.
pub replication_factor: usize,
/// The replication strategy of the keyspace.
pub replication_strategy: ReplicationStrategy,
}
/// Factory for `NodePool` instances.
///
/// Every pool produced by `build` receives a clone of the same `Arc` of prepared
/// statement metadata and a clone of the same out-of-rack counter handle.
#[derive(Clone)]
pub struct NodePoolBuilder {
// Prepared statement metadata keyed by prepared statement id, shared with every built pool.
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, Arc<PreparedMetadata>>>>,
// Counter handle cloned into every built pool.
out_of_rack_requests: Counter,
}
impl NodePoolBuilder {
    /// Creates a builder with an empty prepared statement map.
    ///
    /// `chain_name` becomes the `chain` label of the
    /// `shotover_out_of_rack_requests_count` counter.
    pub fn new(chain_name: String) -> Self {
        let prepared_metadata = Arc::new(RwLock::new(HashMap::new()));
        let out_of_rack_requests = counter!("shotover_out_of_rack_requests_count", "chain" => chain_name, "transform" => "CassandraSinkCluster");
        Self {
            prepared_metadata,
            out_of_rack_requests,
        }
    }

    /// Produces a new `NodePool` with no nodes, no keyspace metadata and a token ring
    /// built from an empty node list.
    ///
    /// The prepared statement map and the counter handle are shared with this builder
    /// (and therefore with every other pool it builds).
    pub fn build(&self) -> NodePool {
        let Self {
            prepared_metadata,
            out_of_rack_requests,
        } = self;
        NodePool {
            prepared_metadata: Arc::clone(prepared_metadata),
            keyspace_metadata: HashMap::new(),
            token_map: TokenRing::new(&[]),
            nodes: Vec::new(),
            out_of_rack_requests: out_of_rack_requests.clone(),
        }
    }
}
/// The set of known Cassandra nodes together with the metadata needed to pick
/// a node (and a connection to it) for a request.
pub struct NodePool {
// Prepared statement metadata keyed by prepared statement id; shared via `Arc`.
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, Arc<PreparedMetadata>>>>,
// Keyspace metadata keyed by keyspace name; replaced wholesale by `update_keyspaces`.
keyspace_metadata: HashMap<String, KeyspaceMetadata>,
// Token ring rebuilt from `nodes` on every `update_nodes`.
token_map: TokenRing,
// Current node list; replaced by `update_nodes`.
nodes: Vec<CassandraNode>,
// Incremented when a replica lookup finds no candidate node in the requested rack.
out_of_rack_requests: Counter,
}
impl NodePool {
/// Mutable view of the nodes currently held by the pool.
pub fn nodes_mut(&mut self) -> &mut [CassandraNode] {
    self.nodes.as_mut_slice()
}
/// Read-only view of the nodes currently held by the pool.
pub fn nodes(&self) -> &[CassandraNode] {
    self.nodes.as_slice()
}
/// Replaces the node list with the latest value from `nodes_rx` and rebuilds the token ring.
///
/// An outbound connection held by a previous node is moved onto the first new node that
/// has the same `host_id` and is up. Connections with no such match are dropped.
pub fn update_nodes(&mut self, nodes_rx: &mut watch::Receiver<Vec<CassandraNode>>) {
    let mut latest_nodes = nodes_rx.borrow_and_update().clone();

    for previous in self.nodes.drain(..) {
        // Nodes without an open connection have nothing to carry over.
        let Some(connection) = previous.outbound else {
            continue;
        };
        let host_id = previous.host_id;
        let successor = latest_nodes
            .iter_mut()
            .find(|candidate| candidate.host_id == host_id && candidate.is_up);
        if let Some(successor) = successor {
            successor.outbound = Some(connection);
        }
    }

    self.nodes = latest_nodes;
    self.token_map = TokenRing::new(self.nodes.as_slice());
    tracing::debug!(
        "nodes updated, nodes={:#?}\ntokens={:#?}",
        self.nodes,
        self.token_map
    );
}
/// Replaces the keyspace metadata with a clone of the latest value from `keyspaces_rx`.
pub fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
    // Clone inside a scope so the borrow of the channel value ends before the
    // previous metadata is replaced.
    let latest = {
        let shared = keyspaces_rx.borrow_and_update();
        shared.clone()
    };
    self.keyspace_metadata = latest;
}
/// Stores `metadata` for the prepared statement `id`, replacing any existing entry.
///
/// Waits for the write lock on the prepared statement map.
pub async fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) {
    self.prepared_metadata
        .write()
        .await
        .insert(id, Arc::new(metadata));
}
/// Picks, in random order, a node that is up and located in `rack`, and for which
/// `get_accessible_node` succeeds.
///
/// # Errors
/// Returns the error from `get_accessible_node`, with added context naming the rack,
/// when no candidate node could be used.
pub async fn get_random_node_in_dc_rack(
    &mut self,
    rack: &str,
    rng: &mut SmallRng,
    connection_factory: &ConnectionFactory,
) -> Result<&mut CassandraNode> {
    let mut candidates: Vec<&mut CassandraNode> = Vec::new();
    for node in self.nodes.iter_mut() {
        if node.is_up && node.rack == *rack {
            candidates.push(node);
        }
    }
    // Randomise the order in which the candidates are tried.
    candidates.shuffle(rng);

    let outcome = get_accessible_node(connection_factory, candidates).await;
    outcome.with_context(|| {
        format!("Failed to open a connection to any nodes in the rack {rack:?}")
    })
}
/// Like `get_random_node_in_dc_rack`, but returns a mutable reference to the
/// outbound connection of the chosen node.
///
/// # Errors
/// Propagates the error of `get_random_node_in_dc_rack`.
///
/// # Panics
/// Panics if the chosen node has no outbound connection.
pub async fn get_random_connection_in_dc_rack(
    &mut self,
    rack: &str,
    rng: &mut SmallRng,
    connection_factory: &ConnectionFactory,
) -> Result<&mut CassandraConnection> {
    let node = self
        .get_random_node_in_dc_rack(rack, rng, connection_factory)
        .await?;
    let connection = node
        .outbound
        .as_mut()
        .expect("it is set to Some by get_random_node_in_dc_rack");
    Ok(connection)
}
/// Like `get_random_node_in_dc_rack`, but takes the outbound connection out of the
/// chosen node (leaving `None` behind) and returns it together with the node's address.
///
/// # Errors
/// Propagates the error of `get_random_node_in_dc_rack`.
///
/// # Panics
/// Panics if the chosen node has no outbound connection.
pub async fn get_random_owned_connection_in_dc_rack(
    &mut self,
    rack: &str,
    rng: &mut SmallRng,
    connection_factory: &ConnectionFactory,
) -> Result<(CassandraConnection, SocketAddr)> {
    let node = self
        .get_random_node_in_dc_rack(rack, rng, connection_factory)
        .await?;
    let connection = node
        .outbound
        .take()
        .expect("it is set to Some by get_random_node_in_dc_rack");
    Ok((connection, node.address))
}
/// Resolves the nodes that are replicas for the routing key of `execute` and are up.
///
/// The returned nodes are in random order, except that nodes whose rack equals `rack`
/// are moved to the front. When no returned node is in `rack` (including when no node
/// is returned at all) the out-of-rack counter is incremented and, in debug builds,
/// a warning is logged.
///
/// # Errors
/// * `GetReplicaErr::NoPreparedMetadata` — nothing is stored for `execute.id`.
/// * `GetReplicaErr::NoKeyspaceMetadata` — the prepared metadata names no keyspace, or
///   the keyspace is unknown to this pool.
/// * `GetReplicaErr::Other` — the execute body has no query parameter values.
/// * `GetReplicaErr::NoRoutingKey` — `calculate_routing_key` returned `None`.
pub async fn get_replica_node_in_dc(
    &mut self,
    execute: &BodyReqExecuteOwned,
    rack: &str,
    rng: &mut SmallRng,
) -> Result<Vec<&mut CassandraNode>, GetReplicaErr> {
    // Clone the `Arc` out of the map so the read lock is released at the end of this statement.
    let metadata = self
        .prepared_metadata
        .read()
        .await
        .get(&execute.id)
        .cloned()
        .ok_or(GetReplicaErr::NoPreparedMetadata)?;

    let keyspace_name = metadata
        .keyspace
        .as_ref()
        .ok_or(GetReplicaErr::NoKeyspaceMetadata)?;
    let keyspace = self
        .keyspace_metadata
        .get(keyspace_name)
        .ok_or(GetReplicaErr::NoKeyspaceMetadata)?;

    let query_values = execute.query_parameters.values.as_ref().ok_or_else(|| {
        GetReplicaErr::Other(anyhow!("Execute body does not have query parameters"))
    })?;
    let routing_key = calculate_routing_key(&metadata.pk_indexes, query_values)
        .ok_or(GetReplicaErr::NoRoutingKey)?;

    // Collect the ids first: the iterator borrows `self` immutably, which must end
    // before the nodes can be borrowed mutably below.
    let replica_host_ids: Vec<uuid::Uuid> = self
        .token_map
        .iter_replica_nodes(self.nodes(), routing_key, keyspace)
        .collect();

    let mut nodes: Vec<&mut CassandraNode> = Vec::new();
    for node in self.nodes.iter_mut() {
        if node.is_up && replica_host_ids.contains(&node.host_id) {
            nodes.push(node);
        }
    }
    nodes.shuffle(rng);

    // Move every in-rack node to the front by swapping it into the next free front slot.
    let mut in_rack_count = 0;
    for index in 0..nodes.len() {
        if nodes[index].rack != rack {
            continue;
        }
        nodes.swap(index, in_rack_count);
        in_rack_count += 1;
    }

    if in_rack_count == 0 {
        #[cfg(debug_assertions)]
        tracing::warn!(
            "No suitable nodes to route to found within rack. This error only occurs in debug builds as it should never occur in an ideal integration test situation."
        );
        self.out_of_rack_requests.increment(1);
    }

    tracing::debug!(
        "Shotover with designated rack {rack:?} found replica nodes {replica_host_ids:?}"
    );
    Ok(nodes)
}
/// Resolves the replica nodes for `execute` via `get_replica_node_in_dc` and returns
/// the outbound connection of the first one for which `get_accessible_node` succeeds.
///
/// # Errors
/// Propagates any error of `get_replica_node_in_dc`. Returns
/// `GetReplicaErr::NoNodeAvailable` when `get_accessible_node` fails.
///
/// # Panics
/// Panics if the chosen node has no outbound connection.
pub async fn get_replica_connection_in_dc(
    &mut self,
    execute: &BodyReqExecuteOwned,
    rack: &str,
    rng: &mut SmallRng,
    connection_factory: &ConnectionFactory,
) -> Result<&mut CassandraConnection, GetReplicaErr> {
    let candidates = self.get_replica_node_in_dc(execute, rack, rng).await?;
    let node = get_accessible_node(connection_factory, candidates)
        .await
        .context("Failed to open a connection to any replicas of a specific token")
        .map_err(GetReplicaErr::NoNodeAvailable)?;
    let connection = node
        .outbound
        .as_mut()
        .expect("it is set to Some by get_accessible_node");
    Ok(connection)
}
}
/// An error paired with the address of the node it relates to.
///
/// `get_accessible_node` records one of these for every node it failed to connect to,
/// and `bullet_list_of_node_failures` renders a sequence of them as text.
pub struct AddressError {
/// Address of the node the error relates to.
pub address: SocketAddr,
/// The error itself; its whole cause chain is printed by `bullet_list_of_node_failures`.
pub error: anyhow::Error,
}
pub fn bullet_list_of_node_failures<'a, I: Iterator<Item = &'a AddressError>>(errors: I) -> String {
let mut node_errors = String::new();
for AddressError { error, address } in errors {
node_errors.push_str(&format!("\n* {address:?}:"));
for sub_error in error.chain() {
node_errors.push_str(&format!("\n - {sub_error}"));
}
}
node_errors
}
/// Tries the given nodes in order and returns the first one for which
/// `CassandraNode::get_connection` succeeds.
///
/// For every node that fails, `report_issue` is called on it and the failure is
/// recorded. If a later node succeeds, the recorded failures are logged as a warning.
///
/// # Errors
/// Returns an error listing every recorded failure when no node succeeds. This
/// includes the case where `nodes` is empty, in which the list is empty.
pub async fn get_accessible_node<'a>(
    connection_factory: &ConnectionFactory,
    nodes: Vec<&'a mut CassandraNode>,
) -> Result<&'a mut CassandraNode> {
    let mut errors = Vec::new();

    for node in nodes {
        // Only the failure is kept here; on success the borrowed connection is discarded
        // so the node itself can be returned.
        if let Some(error) = node.get_connection(connection_factory).await.err() {
            node.report_issue();
            errors.push(AddressError {
                address: node.address,
                error,
            });
            continue;
        }

        if !errors.is_empty() {
            let node_errors = bullet_list_of_node_failures(errors.iter());
            tracing::warn!(
                "A successful connection to a node was made but attempts to connect to these nodes failed first:{node_errors}"
            );
        }
        return Ok(node);
    }

    let node_errors = bullet_list_of_node_failures(errors.iter());
    Err(anyhow!(
        "Attempted to open a connection to one of multiple nodes but all attempts failed:{node_errors}"
    ))
}
/// Picks the first accessible node via `get_accessible_node`, takes its outbound
/// connection out of the node (leaving `None` behind) and returns it together with the
/// node's address.
///
/// # Errors
/// Propagates the error of `get_accessible_node`.
///
/// # Panics
/// Panics if the chosen node has no outbound connection.
pub async fn get_accessible_owned_connection(
    connection_factory: &ConnectionFactory,
    nodes: Vec<&'_ mut CassandraNode>,
) -> Result<(CassandraConnection, SocketAddr)> {
    let node = get_accessible_node(connection_factory, nodes).await?;
    // The message names `get_accessible_node` because that is the call this function
    // relies on for the connection being present.
    let connection = node
        .outbound
        .take()
        .expect("it is set to Some by get_accessible_node");
    Ok((connection, node.address))
}