use crate::cluster::connections::ConnectionFuture;
use crate::cluster::routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster::slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster::topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tracing::debug;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
/// Counts the connections held by every node yielded by iterating `$conn_map`.
///
/// Each node contributes one connection (its user connection), plus one more
/// when its `management_connection` is set. The total saturates rather than
/// overflowing.
macro_rules! count_connections {
    ($conn_map:expr) => {{
        IntoIterator::into_iter($conn_map)
            .map(|node| {
                if node.management_connection.is_some() {
                    2usize
                } else {
                    1usize
                }
            })
            .fold(0usize, usize::saturating_add)
    }};
}
/// A connection bundled with the network metadata recorded for it.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ConnectionDetails<Connection> {
/// The connection handle itself.
pub conn: Connection,
/// IP address recorded for this connection, if any.
pub ip: Option<IpAddr>,
/// Availability zone recorded for this connection, if any.
pub az: Option<String>,
}
impl<Connection> ConnectionDetails<Connection>
where
    Connection: Clone + Send + 'static,
{
    /// Wraps the connection in a boxed, shared future that resolves to it,
    /// carrying the `ip` and `az` metadata over unchanged.
    #[doc(hidden)]
    pub fn into_future(self) -> ConnectionDetails<ConnectionFuture<Connection>> {
        let Self { conn, ip, az } = self;
        let conn = async move { conn }.boxed().shared();
        ConnectionDetails { conn, ip, az }
    }
}
impl<Connection> From<(Connection, Option<IpAddr>, Option<String>)>
for ConnectionDetails<Connection>
{
fn from(val: (Connection, Option<IpAddr>, Option<String>)) -> Self {
ConnectionDetails {
conn: val.0,
ip: val.1,
az: val.2,
}
}
}
impl<Connection> From<ConnectionDetails<Connection>>
for (Connection, Option<IpAddr>, Option<String>)
{
fn from(val: ConnectionDetails<Connection>) -> Self {
(val.conn, val.ip, val.az)
}
}
/// The connections held for a single cluster node.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ClusterNode<Connection> {
/// Connection returned for `ConnectionType::User`, and used as the fallback
/// when no management connection is present.
pub user_connection: ConnectionDetails<Connection>,
/// Optional second connection, preferred for `ConnectionType::PreferManagement`.
pub management_connection: Option<ConnectionDetails<Connection>>,
}
impl<Connection> ClusterNode<Connection>
where
    Connection: Clone,
{
    /// Creates a node from its user connection and an optional management connection.
    pub fn new(
        user_connection: ConnectionDetails<Connection>,
        management_connection: Option<ConnectionDetails<Connection>>,
    ) -> Self {
        Self {
            user_connection,
            management_connection,
        }
    }

    /// Number of connections this node holds: 1, or 2 when a management
    /// connection is present.
    pub fn connections_count(&self) -> usize {
        match self.management_connection {
            Some(_) => 2,
            None => 1,
        }
    }

    /// Returns a clone of the connection selected by `conn_type`.
    ///
    /// `PreferManagement` falls back to the user connection when the node has
    /// no management connection.
    pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
        let selected = match conn_type {
            ConnectionType::User => &self.user_connection,
            ConnectionType::PreferManagement => self
                .management_connection
                .as_ref()
                .unwrap_or(&self.user_connection),
        };
        selected.conn.clone()
    }
}
/// Selects which of a node's connections `ClusterNode::get_connection` returns.
#[derive(Clone, Eq, PartialEq, Debug)]
pub(crate) enum ConnectionType {
/// Always the user connection.
User,
/// The management connection when the node has one, otherwise the user connection.
PreferManagement,
}
/// Newtype around the address-keyed map of cluster nodes; the wrapped map is
/// what `ConnectionsContainer::new` and `extend_connection_map` consume.
pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<Arc<str>, ClusterNode<Connection>>);
/// Writes one line per node: `address - ip` when the user connection has a
/// recorded IP, otherwise just `address`.
impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.iter().try_for_each(|entry| {
            let address = entry.key();
            match entry.value().user_connection.ip {
                Some(ip) => writeln!(f, "{address} - {ip}"),
                None => writeln!(f, "{address}"),
            }
        })
    }
}
/// Cloneable handle around a shared `Notify`; used by `RefreshTaskStatus` to
/// wake tasks that are waiting on a reconnect.
#[derive(Clone, Debug)]
pub(crate) struct RefreshTaskNotifier {
// Shared so that clones of this handle and `get_notifier` callers observe the same `Notify`.
notify: Arc<Notify>,
}
impl RefreshTaskNotifier {
    /// Creates a notifier backed by a fresh `Notify`.
    pub fn new() -> Self {
        let notify = Arc::new(Notify::new());
        Self { notify }
    }

    /// Returns another handle to the underlying `Notify`.
    pub fn get_notifier(&self) -> Arc<Notify> {
        Arc::clone(&self.notify)
    }

    /// Wakes every task currently waiting on the underlying `Notify`.
    pub fn notify(&self) {
        self.notify.notify_waiters();
    }
}
/// Status of an in-progress connection refresh for one address.
#[derive(Clone, Debug)]
pub(crate) enum RefreshTaskStatus {
/// Reconnect in progress; the notifier is exposed to callers through
/// `notifier_for_route` and is triggered when this status is dropped or flipped.
Reconnecting(RefreshTaskNotifier),
/// Reconnect considered to be taking too long; `notifier_for_route` returns
/// no notifier in this state.
#[allow(dead_code)]
ReconnectingTooLong,
}
/// Wakes waiting tasks when a status is dropped while still `Reconnecting`.
impl Drop for RefreshTaskStatus {
    fn drop(&mut self) {
        match self {
            RefreshTaskStatus::Reconnecting(notifier) => {
                debug!("RefreshTaskStatus: Dropped while in Reconnecting status. Notifying tasks.");
                notifier.notify();
            }
            RefreshTaskStatus::ReconnectingTooLong => {}
        }
    }
}
impl RefreshTaskStatus {
    /// Creates a status in the `Reconnecting` state using the given notifier.
    pub fn with_notifier(notifier: RefreshTaskNotifier) -> Self {
        debug!("RefreshTaskStatus: Initialized in Reconnecting status with a provided notifier.");
        Self::Reconnecting(notifier)
    }

    /// Moves a `Reconnecting` status to `ReconnectingTooLong`, waking waiters
    /// first. Does nothing but log when already in `ReconnectingTooLong`.
    #[allow(dead_code)]
    pub fn flip_status_to_too_long(&mut self) {
        let RefreshTaskStatus::Reconnecting(notifier) = self else {
            debug!("RefreshTaskStatus: Already in ReconnectingTooLong status.");
            return;
        };
        debug!(
            "RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong."
        );
        notifier.notify();
        // The assignment drops the previous `Reconnecting` value, so this
        // type's `Drop` impl runs and notifies once more.
        *self = RefreshTaskStatus::ReconnectingTooLong;
    }
}
/// A running refresh task together with its reconnect status.
#[derive(Debug)]
pub(crate) struct RefreshTaskState {
/// Handle of the spawned refresh task; aborted on drop if it has not finished.
pub handle: JoinHandle<()>,
/// Current reconnect status of the task.
pub status: RefreshTaskStatus,
}
impl RefreshTaskState {
    /// Wraps a task handle, starting in the `Reconnecting` status with `notifier`.
    pub fn new(handle: JoinHandle<()>, notifier: RefreshTaskNotifier) -> Self {
        debug!("RefreshTaskState: Creating a new instance with a Reconnecting state.");
        let status = RefreshTaskStatus::with_notifier(notifier);
        Self { handle, status }
    }
}
/// On drop: wakes waiters if still `Reconnecting`, then aborts the task unless
/// it has already finished.
impl Drop for RefreshTaskState {
    fn drop(&mut self) {
        match &self.status {
            RefreshTaskStatus::Reconnecting(notifier) => {
                debug!("RefreshTaskState: Dropped while in Reconnecting status. Notifying tasks.");
                notifier.notify();
            }
            RefreshTaskStatus::ReconnectingTooLong => {
                debug!("RefreshTaskState: Dropped while in ReconnectingTooLong status.");
            }
        }

        if self.handle.is_finished() {
            debug!("RefreshTaskState: Task already finished, no abort necessary.");
        } else {
            debug!("RefreshTaskState: Aborting unfinished task.");
            self.handle.abort();
        }
    }
}
/// Bookkeeping for connection refresh tasks that are currently running.
#[derive(Default)]
pub(crate) struct RefreshConnectionStates {
/// In-progress refresh task state, keyed by node address.
pub(crate) refresh_address_in_progress: HashMap<Arc<str>, RefreshTaskState>,
}
impl RefreshConnectionStates {
    /// Drops every tracked refresh task state after logging the affected addresses.
    pub(crate) fn clear_refresh_state(&mut self) {
        let in_progress = &mut self.refresh_address_in_progress;
        debug!(
            "clear_refresh_state: removing all in-progress refresh connection tasks for addresses: {:?}",
            in_progress.keys()
        );
        in_progress.clear();
    }
}
/// Holds the cluster's node connections together with the slot map used to
/// route requests to them.
pub(crate) struct ConnectionsContainer<Connection> {
// Node connections keyed by address.
connection_map: DashMap<Arc<str>, ClusterNode<Connection>>,
/// Slot-to-shard mapping consulted when resolving a route.
pub(crate) slot_map: SlotMap,
// Strategy `lookup_route` applies to replica-capable routes.
read_from_replica_strategy: ReadFromReplicaStrategy,
// Value returned by `get_current_topology_hash`.
topology_hash: TopologyHash,
/// Refresh tasks currently in progress, keyed by address.
pub(crate) refresh_conn_state: RefreshConnectionStates,
}
/// Emits a `connections_removed` debug event covering every connection still
/// held when the container is dropped.
impl<Connection> Drop for ConnectionsContainer<Connection> {
    fn drop(&mut self) {
        let remaining = count_connections!(&self.connection_map);
        tracing::debug!(
            target: "ferriskey",
            event = "connections_removed",
            count = remaining as u64,
            "ferriskey: cluster connections removed"
        );
    }
}
impl<Connection> Default for ConnectionsContainer<Connection> {
fn default() -> Self {
Self {
connection_map: Default::default(),
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
topology_hash: 0,
refresh_conn_state: Default::default(),
}
}
}
/// A node address paired with a value for that node, in `(address, value)` order.
pub(crate) type ConnectionAndAddress<Connection> = (Arc<str>, Connection);
impl<Connection> ConnectionsContainer<Connection>
where
Connection: Clone,
{
pub(crate) fn new(
slot_map: SlotMap,
connection_map: ConnectionsMap<Connection>,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
) -> Self {
let connection_map = connection_map.0;
let count = count_connections!(&connection_map);
tracing::debug!(target: "ferriskey", event = "connections_added", count = count as u64, "ferriskey: cluster connections added");
Self {
connection_map,
slot_map,
read_from_replica_strategy,
topology_hash,
refresh_conn_state: Default::default(),
}
}
/// Iterates over the slot map's nodes, yielding a cloned
/// `(address, (ip, shard addresses))` pair for each entry.
pub(crate) fn slot_map_nodes(
    &self,
) -> impl Iterator<Item = (Arc<String>, (Option<IpAddr>, Arc<ShardAddrs>))> + '_ {
    self.slot_map.nodes_map().iter().map(|entry| {
        let address = entry.key().clone();
        let node_info = entry.value().clone();
        (address, node_info)
    })
}
pub(crate) fn extend_connection_map(
&mut self,
other_connection_map: ConnectionsMap<Connection>,
) {
let conn_count_before = count_connections!(&self.connection_map);
self.connection_map.extend(other_connection_map.0);
let conn_count_after = count_connections!(&self.connection_map);
let added = conn_count_after.saturating_sub(conn_count_before);
tracing::debug!(
target: "ferriskey",
event = "connections_added",
count = added as u64,
"ferriskey: cluster connections added"
);
}
/// Returns the availability zone recorded on the user connection of the node
/// at `address`, or `None` when the node is unknown or has no AZ.
pub(crate) fn az_for_address(&self, address: &str) -> Option<String> {
    let node = self.connection_map.get(address)?;
    node.value().user_connection.az.clone()
}
/// Returns `true` only when a connection exists for `address` and the slot map
/// reports that address as a primary.
pub(crate) fn is_primary(&self, address: &str) -> bool {
    if self.connection_for_address(address).is_none() {
        return false;
    }
    let owned_address = address.to_string();
    self.slot_map.is_primary(&owned_address)
}
/// Picks the next replica with a live connection, starting just after the
/// last replica index used for this shard.
///
/// Falls back to the primary's connection when no replica has a connection
/// (including when the shard has no replicas at all).
fn round_robin_read_from_replica(
    &self,
    slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
    let addrs = &slot_map_value.addrs;
    let replica_count = addrs.replicas().len();
    let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);

    for offset in 1..=replica_count {
        let index = (initial_index + offset) % replica_count;
        let candidate = self.connection_for_address(addrs.replicas()[index].as_str());
        if let Some(connection) = candidate {
            // Best effort: losing the exchange only means another caller
            // already advanced the index, so the result is ignored.
            let _ = slot_map_value.last_used_replica.compare_exchange_weak(
                initial_index,
                index,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
            return Some(connection);
        }
    }

    self.connection_for_address(addrs.primary().as_str())
}
/// Picks a read connection for the shard, preferring replicas whose AZ equals
/// `client_az`. Delegates to `get_connection_by_az_affinity_strategy` with
/// `check_primary = false`, so the primary's AZ is not considered.
pub(crate) fn round_robin_read_from_replica_with_az_awareness(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, false)
}
/// Picks a read connection for the shard, preferring replicas whose AZ equals
/// `client_az` and then the primary if it is in that AZ. Delegates to
/// `get_connection_by_az_affinity_strategy` with `check_primary = true`.
pub(crate) fn round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, true)
}
/// Picks a read connection for the shard with a preference for `client_az`.
///
/// Selection order:
/// 1. Replicas are scanned round-robin, starting just after the last replica
///    index used; the first one that has a connection whose AZ equals
///    `client_az` is returned and the round-robin index is advanced.
/// 2. When `check_primary` is `true`, the primary is returned if it has a
///    connection in `client_az`.
/// 3. Otherwise falls back to plain round-robin over the replicas (which in
///    turn falls back to the primary).
///
/// The AZ is read from the connection details that were just fetched, so the
/// AZ check and the returned connection always come from the same map entry
/// and no second map lookup or per-iteration `String` clone is needed.
fn get_connection_by_az_affinity_strategy(
    &self,
    slot_map_value: &SlotMapValue,
    client_az: String,
    check_primary: bool,
) -> Option<ConnectionAndAddress<Connection>> {
    let addrs = &slot_map_value.addrs;
    let replica_count = addrs.replicas().len();
    let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
    let wanted_az = client_az.as_str();

    for offset in 1..=replica_count {
        let index = (initial_index + offset) % replica_count;
        let replica = &addrs.replicas()[index];
        if let Some((address, connection_details)) =
            self.connection_details_for_address(replica.as_str())
            && connection_details.az.as_deref() == Some(wanted_az)
        {
            // Best effort: a failed exchange means another caller already
            // moved the index, which is fine for round-robin purposes.
            let _ = slot_map_value.last_used_replica.compare_exchange_weak(
                initial_index,
                index,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
            return Some((address, connection_details.conn));
        }
    }

    if check_primary
        && let Some((address, connection_details)) =
            self.connection_details_for_address(addrs.primary().as_str())
        && connection_details.az.as_deref() == Some(wanted_az)
    {
        return Some((address, connection_details.conn));
    }

    self.round_robin_read_from_replica(slot_map_value)
}
/// Resolves `route` to a connection according to the route's slot address
/// kind and the configured read-from-replica strategy.
///
/// Returns `None` when no shard covers the route's slot or when the selected
/// node has no connection. Shards without replicas always resolve to the
/// primary.
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
    let slot_map_value = self.slot_map.slot_value_for_route(route)?;
    let addrs = &slot_map_value.addrs;
    let primary = || self.connection_for_address(addrs.primary().as_str());

    if addrs.replicas().is_empty() {
        return primary();
    }

    match (route.slot_addr(), &self.read_from_replica_strategy) {
        (SlotAddr::Master, _)
        | (SlotAddr::ReplicaOptional, ReadFromReplicaStrategy::AlwaysFromPrimary) => primary(),
        (
            SlotAddr::ReplicaOptional | SlotAddr::ReplicaRequired,
            ReadFromReplicaStrategy::AZAffinity(az),
        ) => self.round_robin_read_from_replica_with_az_awareness(slot_map_value, az.to_string()),
        (
            SlotAddr::ReplicaOptional | SlotAddr::ReplicaRequired,
            ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az),
        ) => self.round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
            slot_map_value,
            az.to_string(),
        ),
        // A required replica is served round-robin for every non-AZ strategy,
        // including `AlwaysFromPrimary`.
        (SlotAddr::ReplicaOptional, ReadFromReplicaStrategy::RoundRobin)
        | (SlotAddr::ReplicaRequired, _) => self.round_robin_read_from_replica(slot_map_value),
    }
}
/// Resolves `route` to a connection; when a non-primary route yields nothing,
/// retries the same slot as a primary route.
pub(crate) fn connection_for_route(
    &self,
    route: &Route,
) -> Option<ConnectionAndAddress<Connection>> {
    if let Some(found) = self.lookup_route(route) {
        return Some(found);
    }
    if route.slot_addr() == SlotAddr::Master {
        return None;
    }
    self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
}
/// Returns the primary address of the shard that owns the route's slot, or
/// `None` when the slot map has no entry for the route.
pub(crate) fn address_for_route(&self, route: &Route) -> Option<Arc<str>> {
    self.slot_map
        .slot_value_for_route(route)
        .map(|slot_map_value| Arc::from(slot_map_value.addrs.primary().as_str()))
}
/// Returns the reconnect notifier for the primary address of `route`, if a
/// refresh task for that address is tracked and still in the `Reconnecting`
/// status. Returns `None` when the route has no address, no refresh task
/// exists, or the task is in `ReconnectingTooLong`.
pub(crate) fn notifier_for_route(&self, route: &Route) -> Option<Arc<Notify>> {
    let address = self.address_for_route(route)?;
    let Some(task_state) = self
        .refresh_conn_state
        .refresh_address_in_progress
        .get(&address)
    else {
        debug!(
            "notifier_for_route: No refresh task exists for address: {}. No notifier will be returned.",
            address
        );
        return None;
    };

    match &task_state.status {
        RefreshTaskStatus::ReconnectingTooLong => {
            debug!(
                "notifier_for_route: Address {} is in ReconnectingTooLong state. No notifier will be returned.",
                address
            );
            None
        }
        RefreshTaskStatus::Reconnecting(notifier) => {
            debug!(
                "notifier_for_route: Found reconnect notifier for address: {}",
                address
            );
            Some(notifier.get_notifier())
        }
    }
}
/// Iterates over every node, yielding its address and a clone of its user
/// connection.
pub(crate) fn all_node_connections(
    &self,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
    self.connection_map.iter().map(move |entry| {
        let address = entry.key().clone();
        let connection = entry.value().user_connection.conn.clone();
        (address, connection)
    })
}
/// Iterates over the user connections of all primaries listed in the slot
/// map, skipping primaries that currently have no connection.
pub(crate) fn all_primary_connections(
    &self,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
    let primaries = self.slot_map.addresses_for_all_primaries();
    primaries
        .into_iter()
        .filter_map(|addr| self.connection_for_address(&addr))
}
/// Returns a clone of the node stored for `address`, if any.
pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {
    let entry = self.connection_map.get(address)?;
    Some(entry.value().clone())
}
/// Returns the stored address key and a clone of the user connection for
/// `address`, or `None` when the address is unknown.
pub(crate) fn connection_for_address(
    &self,
    address: &str,
) -> Option<ConnectionAndAddress<Connection>> {
    let entry = self.connection_map.get(address)?;
    let connection = entry.value().user_connection.conn.clone();
    Some((entry.key().clone(), connection))
}
/// Returns the stored address key and the node's management connection for
/// `address`, falling back to the user connection when the node has no
/// management connection. `None` when the address is unknown.
pub(crate) fn management_connection_for_address(
    &self,
    address: &str,
) -> Option<ConnectionAndAddress<Connection>> {
    let entry = self.connection_map.get(address)?;
    let connection = entry
        .value()
        .get_connection(&ConnectionType::PreferManagement);
    Some((entry.key().clone(), connection))
}
/// Returns the stored address key and a clone of the full user connection
/// details (connection, IP and AZ) for `address`, if the address is known.
pub(crate) fn connection_details_for_address(
    &self,
    address: &str,
) -> Option<ConnectionAndAddress<ConnectionDetails<Connection>>> {
    let entry = self.connection_map.get(address)?;
    let details = entry.value().user_connection.clone();
    Some((entry.key().clone(), details))
}
/// Returns up to `amount` distinct, randomly chosen nodes as
/// `(address, connection)` pairs, where the connection is selected by
/// `conn_type`.
///
/// Returns `None` when the container holds no connections. The result may
/// contain fewer than `amount` entries when fewer nodes exist.
pub(crate) fn random_connections(
    &self,
    amount: usize,
    conn_type: ConnectionType,
) -> Option<Vec<ConnectionAndAddress<Connection>>> {
    // Bail out before touching the RNG or walking the map; the previous
    // `then_some(..)` form evaluated the whole sampling pipeline eagerly
    // even when the result was going to be discarded.
    if self.connection_map.is_empty() {
        return None;
    }

    let sampled = self
        .connection_map
        .iter()
        .choose_multiple(&mut rand::rng(), amount)
        .into_iter()
        .map(|entry| {
            let connection = entry.value().get_connection(&conn_type);
            (entry.key().clone(), connection)
        })
        .collect::<Vec<_>>();
    Some(sampled)
}
/// Stores `node` under `address`, replacing any node already stored there,
/// and returns the address key.
///
/// Emits a `connections_added` debug event for the new node and, when a node
/// was replaced, a `connections_removed` event for the old one.
pub(crate) fn replace_or_add_connection_for_address(
    &self,
    address: impl Into<Arc<str>>,
    node: ClusterNode<Connection>,
) -> Arc<str> {
    let address: Arc<str> = address.into();
    let added = node.connections_count() as u64;
    tracing::debug!(
        target: "ferriskey",
        event = "connections_added",
        count = added,
        "ferriskey: cluster connections added"
    );

    let replaced = self.connection_map.insert(Arc::clone(&address), node);
    if let Some(previous) = replaced {
        let removed = previous.connections_count() as u64;
        tracing::debug!(
            target: "ferriskey",
            event = "connections_removed",
            count = removed,
            "ferriskey: cluster connections removed"
        );
    }
    address
}
/// Removes and returns the node stored for `address`, emitting a
/// `connections_removed` debug event. Returns `None` when the address is
/// unknown.
pub(crate) fn remove_node(&self, address: &str) -> Option<ClusterNode<Connection>> {
    let (_, removed_node) = self.connection_map.remove(address)?;
    let removed = removed_node.connections_count() as u64;
    tracing::debug!(
        target: "ferriskey",
        event = "connections_removed",
        count = removed,
        "ferriskey: cluster connections removed"
    );
    Some(removed_node)
}
/// Number of nodes (addresses) in the connection map. A node counts once even
/// when it also has a management connection.
pub(crate) fn len(&self) -> usize {
self.connection_map.len()
}
/// Returns the topology hash this container was created with.
pub(crate) fn get_current_topology_hash(&self) -> TopologyHash {
self.topology_hash
}
/// Returns `true` when the connection map holds no nodes.
pub(crate) fn is_empty(&self) -> bool {
self.connection_map.is_empty()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use crate::cluster::routing::Slot;
use super::*;
impl<Connection> ClusterNode<Connection>
where
Connection: Clone,
{
pub(crate) fn new_only_with_user_conn(user_connection: Connection) -> Self {
let ip = None;
let az = None;
Self {
user_connection: (user_connection, ip, az).into(),
management_connection: None,
}
}
}
/// Removes every listed address from the container, ignoring unknown ones.
fn remove_nodes(container: &ConnectionsContainer<usize>, addresses: &[&str]) {
    addresses.iter().for_each(|address| {
        container.remove_node(address);
    });
}
/// Removes the six nodes that `create_container` populates.
fn remove_all_connections(container: &ConnectionsContainer<usize>) {
    let every_address = [
        "primary1",
        "primary2",
        "primary3",
        "replica2-1",
        "replica3-1",
        "replica3-2",
    ];
    remove_nodes(container, &every_address);
}
/// Returns whether the found connection is among `expected_connections`.
/// Panics when `connection` is `None`.
fn one_of(
    connection: Option<ConnectionAndAddress<usize>>,
    expected_connections: &[usize],
) -> bool {
    let (_, found) = connection.unwrap();
    expected_connections
        .iter()
        .any(|expected| *expected == found)
}
fn create_cluster_node(
connection: usize,
use_management_connections: bool,
node_az: Option<String>,
) -> ClusterNode<usize> {
let ip = None;
ClusterNode::new(
(connection, ip, node_az.clone()).into(),
if use_management_connections {
Some((connection * 10, ip, node_az).into())
} else {
None
},
)
}
/// Builds a three-shard test container in which the third shard has three
/// replicas: `replica3-1` and `replica3-3` in AZ `use-1a`, `replica3-2` in
/// `use-1b`. All other nodes have no AZ. When `strategy` is `None`, the
/// container uses `AZAffinity("use-1a")`.
fn create_container_with_az_strategy(
    use_management_connections: bool,
    strategy: Option<ReadFromReplicaStrategy>,
) -> ConnectionsContainer<usize> {
    let shards = vec![
        Slot::new(1, 1000, String::from("primary1"), Vec::new()),
        Slot::new(
            1002,
            2000,
            String::from("primary2"),
            vec![String::from("replica2-1")],
        ),
        Slot::new(
            2001,
            3000,
            String::from("primary3"),
            vec![
                String::from("replica3-1"),
                String::from("replica3-2"),
                String::from("replica3-3"),
            ],
        ),
    ];
    let slot_map = SlotMap::new(
        shards,
        HashMap::new(),
        ReadFromReplicaStrategy::AlwaysFromPrimary,
    );

    // (address, user connection id, availability zone)
    let nodes = [
        ("primary1", 1, None),
        ("primary2", 2, None),
        ("primary3", 3, None),
        ("replica2-1", 21, None),
        ("replica3-1", 31, Some("use-1a")),
        ("replica3-2", 32, Some("use-1b")),
        ("replica3-3", 33, Some("use-1a")),
    ];
    let connection_map = DashMap::new();
    for (address, connection, az) in nodes {
        connection_map.insert(
            address.into(),
            create_cluster_node(connection, use_management_connections, az.map(String::from)),
        );
    }

    let read_from_replica_strategy =
        strategy.unwrap_or_else(|| ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()));
    ConnectionsContainer {
        slot_map,
        connection_map,
        read_from_replica_strategy,
        topology_hash: 0,
        refresh_conn_state: Default::default(),
    }
}
/// Builds a three-shard test container (no replicas, one replica, two
/// replicas) using the given read strategy. No node has an AZ.
fn create_container_with_strategy(
    strategy: ReadFromReplicaStrategy,
    use_management_connections: bool,
) -> ConnectionsContainer<usize> {
    let shards = vec![
        Slot::new(1, 1000, String::from("primary1"), Vec::new()),
        Slot::new(
            1002,
            2000,
            String::from("primary2"),
            vec![String::from("replica2-1")],
        ),
        Slot::new(
            2001,
            3000,
            String::from("primary3"),
            vec![String::from("replica3-1"), String::from("replica3-2")],
        ),
    ];
    let slot_map = SlotMap::new(
        shards,
        HashMap::new(),
        ReadFromReplicaStrategy::AlwaysFromPrimary,
    );

    // (address, user connection id)
    let nodes = [
        ("primary1", 1),
        ("primary2", 2),
        ("primary3", 3),
        ("replica2-1", 21),
        ("replica3-1", 31),
        ("replica3-2", 32),
    ];
    let connection_map = DashMap::new();
    for (address, connection) in nodes {
        connection_map.insert(
            address.into(),
            create_cluster_node(connection, use_management_connections, None),
        );
    }

    ConnectionsContainer {
        slot_map,
        connection_map,
        read_from_replica_strategy: strategy,
        topology_hash: 0,
        refresh_conn_state: Default::default(),
    }
}
/// Default test container: round-robin read strategy, no management connections.
fn create_container() -> ConnectionsContainer<usize> {
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, false)
}
/// Primary routes resolve to the shard's primary; slots 0 and 1001 are not
/// covered by any shard and resolve to nothing.
#[test]
fn get_connection_for_primary_route() {
    let container = create_container();
    let expectations = [
        (0, None),
        (500, Some(1)),
        (1000, Some(1)),
        (1001, None),
        (1002, Some(2)),
        (1500, Some(2)),
        (2001, Some(3)),
    ];
    for (slot, expected) in expectations {
        let found = container
            .connection_for_route(&Route::new(slot, SlotAddr::Master))
            .map(|(_, connection)| connection);
        assert_eq!(expected, found, "unexpected primary for slot {slot}");
    }
}
/// Replica-optional routes resolve to one of the shard's replicas under the
/// round-robin strategy.
#[test]
fn get_connection_for_replica_route() {
    let container = create_container();
    let route_to =
        |slot| container.connection_for_route(&Route::new(slot, SlotAddr::ReplicaOptional));

    assert!(route_to(1001).is_none());
    assert_eq!(21, route_to(1002).unwrap().1);
    assert_eq!(21, route_to(1500).unwrap().1);
    assert!(one_of(route_to(2001), &[31, 32]));
}
/// A shard without replicas serves replica-optional routes from its primary.
#[test]
fn get_primary_connection_for_replica_route_if_no_replicas_were_added() {
    let container = create_container();
    for (slot, expected) in [(0, None), (500, Some(1)), (1000, Some(1))] {
        let found = container
            .connection_for_route(&Route::new(slot, SlotAddr::ReplicaOptional))
            .map(|(_, connection)| connection);
        assert_eq!(expected, found, "unexpected connection for slot {slot}");
    }
}
/// With one of two replicas removed, the remaining replica is chosen.
#[test]
fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() {
    let container = create_container();
    container.remove_node("replica3-2");

    let route = Route::new(2001, SlotAddr::ReplicaRequired);
    let (_, connection) = container.connection_for_route(&route).unwrap();
    assert_eq!(31, connection);
}
/// `ReplicaRequired` routes go to a replica even when the strategy is
/// `AlwaysFromPrimary`.
#[test]
fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary()
 {
    let container =
        create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary, false);
    let route = Route::new(2001, SlotAddr::ReplicaRequired);
    let found = container.connection_for_route(&route);
    assert!(one_of(found, &[31, 32]));
}
/// Once every replica connection is gone, replica-optional routes fall back
/// to the shard's primary.
#[test]
fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() {
    let container = create_container();
    remove_nodes(&container, &["replica2-1", "replica3-1", "replica3-2"]);

    for (slot, expected_primary) in [(1002, 2), (1500, 2), (2001, 3)] {
        let (_, connection) = container
            .connection_for_route(&Route::new(slot, SlotAddr::ReplicaOptional))
            .unwrap();
        assert_eq!(expected_primary, connection);
    }
}
/// AZ affinity prefers replicas in the client's AZ, then any replica, then
/// the primary, as replicas are removed one by one.
#[test]
fn get_connection_for_az_affinity_route() {
    let container = create_container_with_az_strategy(
        false,
        Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
    );
    let route_to =
        |slot, slot_addr| container.connection_for_route(&Route::new(slot, slot_addr));

    assert!(route_to(1001, SlotAddr::ReplicaOptional).is_none());
    assert_eq!(21, route_to(1002, SlotAddr::ReplicaOptional).unwrap().1);
    assert_eq!(2, route_to(1500, SlotAddr::Master).unwrap().1);
    assert!(one_of(route_to(2001, SlotAddr::ReplicaRequired), &[31, 33]));

    // Only one replica left in the client's AZ.
    remove_nodes(&container, &["replica3-3"]);
    assert_eq!(31, route_to(2001, SlotAddr::ReplicaOptional).unwrap().1);

    // No replica left in the client's AZ: fall back to the other replica.
    remove_nodes(&container, &["replica3-1"]);
    assert_eq!(32, route_to(2001, SlotAddr::ReplicaOptional).unwrap().1);

    // No replica left at all: fall back to the primary.
    remove_nodes(&container, &["replica3-2"]);
    assert_eq!(3, route_to(2001, SlotAddr::ReplicaOptional).unwrap().1);
}
/// Four consecutive reads alternate between the two replicas in the client's AZ.
#[test]
fn get_connection_for_az_affinity_route_round_robin() {
    let container = create_container_with_az_strategy(
        false,
        Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
    );

    let mut addresses: Vec<usize> = (0..4)
        .map(|_| {
            container
                .connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
                .unwrap()
                .1
        })
        .collect();
    addresses.sort();
    assert_eq!(addresses, vec![31, 31, 33, 33]);
}
/// Exercises `AZAffinityReplicasAndPrimary("use-1a")`: replicas in the client's
/// AZ are preferred, and the test then removes replicas and changes primary
/// AZs step by step to walk through the fallbacks.
#[test]
fn get_connection_for_az_affinity_replicas_and_primary_route() {
let container: ConnectionsContainer<usize> = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
"use-1a".to_string(),
)),
);
// Place every primary and replica2-1 outside the client's AZ ("use-1a").
container
.connection_map
.get_mut("primary1")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());
container
.connection_map
.get_mut("primary2")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());
container
.connection_map
.get_mut("replica2-1")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());
// Slot 1001 is not covered by any shard.
assert!(
container
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none()
);
// Both replicas in "use-1a" are eligible.
assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 33],
));
// Only replica3-1 remains in the client's AZ.
remove_nodes(&container, &["replica3-3"]);
assert_eq!(
31,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);
// No node of the shard is in the client's AZ: plain round-robin picks replica3-2.
remove_nodes(&container, &["replica3-1"]);
assert_eq!(
32,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);
// Move the primary into the client's AZ and drop the last replica.
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1a".to_string());
remove_nodes(&container, &["replica3-2"]);
assert_eq!(
3,
container
.connection_for_route(&Route::new(2001, SlotAddr::Master))
.unwrap()
.1
);
// Move the primary back out of the client's AZ.
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());
// Shard 2 has nothing in the client's AZ, so its only replica is used.
assert_eq!(
21,
container
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired))
.unwrap()
.1
);
remove_nodes(&container, &["replica2-1"]);
// Primary routes are unaffected by AZ placement.
assert!(one_of(
container.connection_for_route(&Route::new(1002, SlotAddr::Master)),
&[2],
));
assert!(one_of(
container.connection_for_route(&Route::new(500, SlotAddr::Master)),
&[1],
));
}
/// Every populated address resolves to its user connection; an unknown
/// address resolves to nothing.
#[test]
fn get_connection_by_address() {
    let container = create_container();
    assert!(container.connection_for_address("foobar").is_none());

    let expected_by_address = [
        ("primary1", 1),
        ("primary2", 2),
        ("primary3", 3),
        ("replica2-1", 21),
        ("replica3-1", 31),
        ("replica3-2", 32),
    ];
    for (address, expected) in expected_by_address {
        let (_, connection) = container.connection_for_address(address).unwrap();
        assert_eq!(expected, connection);
    }
}
/// A removed node can no longer be looked up by address.
#[test]
fn get_connection_by_address_returns_none_if_connection_was_removed() {
let container = create_container();
container.remove_node("primary1");
assert!(container.connection_for_address("primary1").is_none());
}
/// A newly added node is returned by an address lookup.
#[test]
fn get_connection_by_address_returns_added_connection() {
    let container = create_container();
    let node = ClusterNode::new_only_with_user_conn(4);
    let address = container.replace_or_add_connection_for_address("foobar", node);

    let found = container.connection_for_address("foobar").unwrap();
    assert_eq!((address, 4), found);
}
/// Sampling three connections yields three distinct, known connections.
#[test]
fn get_random_connections_without_repetitions() {
    let container = create_container();
    let sampled = container
        .random_connections(3, ConnectionType::User)
        .expect("No connections found");
    let distinct: HashSet<usize> = sampled
        .into_iter()
        .map(|(_, connection)| connection)
        .collect();

    assert_eq!(distinct.len(), 3);
    let known = [1, 2, 3, 21, 31, 32];
    assert!(distinct.iter().all(|connection| known.contains(connection)));
}
/// Sampling from an emptied container yields `None`.
#[test]
fn get_random_connections_returns_none_if_all_connections_were_removed() {
    let container = create_container();
    remove_all_connections(&container);

    let sampled = container.random_connections(1, ConnectionType::User);
    assert!(sampled.is_none());
}
/// When the only node is one added after clearing, sampling returns exactly it.
#[test]
fn get_random_connections_returns_added_connection() {
    let container = create_container();
    remove_all_connections(&container);
    let node = ClusterNode::new_only_with_user_conn(4);
    let address = container.replace_or_add_connection_for_address("foobar", node);

    let sampled = container
        .random_connections(1, ConnectionType::User)
        .expect("No connections found");
    assert_eq!(vec![(address, 4)], sampled);
}
/// Asking for more connections than exist returns each node exactly once.
#[test]
fn get_random_connections_is_bound_by_the_number_of_connections_in_the_map() {
    let container = create_container();
    let sampled = container
        .random_connections(1000, ConnectionType::User)
        .expect("No connections found");

    let mut connections: Vec<usize> = sampled
        .into_iter()
        .map(|(_, connection)| connection)
        .collect();
    connections.sort();
    assert_eq!(connections, vec![1, 2, 3, 21, 31, 32]);
}
/// With management connections enabled, `PreferManagement` sampling returns
/// the management connections (user id times ten).
#[test]
fn get_random_management_connections() {
    let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
    let sampled = container
        .random_connections(1000, ConnectionType::PreferManagement)
        .expect("No connections found");

    let mut connections: Vec<usize> = sampled
        .into_iter()
        .map(|(_, connection)| connection)
        .collect();
    connections.sort();
    assert_eq!(connections, vec![10, 20, 30, 210, 310, 320]);
}
/// `all_node_connections` yields the user connection of every node.
#[test]
fn get_all_user_connections() {
    let container = create_container();
    let mut connections: Vec<usize> = container
        .all_node_connections()
        .map(|(_, connection)| connection)
        .collect();
    connections.sort();
    assert_eq!(vec![1, 2, 3, 21, 31, 32], connections);
}
/// A node added after construction shows up in `all_node_connections`.
#[test]
fn get_all_user_connections_returns_added_connection() {
    let container = create_container();
    let node = ClusterNode::new_only_with_user_conn(4);
    container.replace_or_add_connection_for_address("foobar", node);

    let mut connections: Vec<usize> = container
        .all_node_connections()
        .map(|(_, connection)| connection)
        .collect();
    connections.sort();
    assert_eq!(vec![1, 2, 3, 4, 21, 31, 32], connections);
}
/// A removed node no longer shows up in `all_node_connections`.
#[test]
fn get_all_user_connections_does_not_return_removed_connection() {
    let container = create_container();
    container.remove_node("primary1");

    let mut connections: Vec<usize> = container
        .all_node_connections()
        .map(|(_, connection)| connection)
        .collect();
    connections.sort();
    assert_eq!(vec![2, 3, 21, 31, 32], connections);
}
/// `all_primary_connections` yields exactly the three primaries.
#[test]
fn get_all_primaries() {
    let container = create_container();
    let mut primaries: Vec<usize> = container
        .all_primary_connections()
        .map(|(_, connection)| connection)
        .collect();
    primaries.sort();
    assert_eq!(vec![1, 2, 3], primaries);
}
/// A primary whose connection was removed is skipped by `all_primary_connections`.
#[test]
fn get_all_primaries_does_not_return_removed_connection() {
    let container = create_container();
    container.remove_node("primary1");

    let mut primaries: Vec<usize> = container
        .all_primary_connections()
        .map(|(_, connection)| connection)
        .collect();
    primaries.sort();
    assert_eq!(vec![2, 3], primaries);
}
/// `len` tracks node removals and additions of new addresses.
#[test]
fn len_is_adjusted_on_removals_and_additions() {
let container = create_container();
assert_eq!(container.len(), 6);
container.remove_node("primary1");
assert_eq!(container.len(), 5);
// Adding a node under a new address grows the map again.
container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
assert_eq!(container.len(), 6);
}
/// `len` is unchanged when removing an unknown address or replacing the node
/// stored under an existing address.
#[test]
fn len_is_not_adjusted_on_removals_of_nonexisting_connections_or_additions_of_existing_connections()
{
let container = create_container();
assert_eq!(container.len(), 6);
// "foobar" was never added, so this removal is a no-op.
container.remove_node("foobar");
assert_eq!(container.len(), 6);
// "primary1" already exists, so this replaces rather than adds.
container.replace_or_add_connection_for_address(
"primary1",
ClusterNode::new_only_with_user_conn(4),
);
assert_eq!(container.len(), 6);
}
/// `remove_node` hands back the removed node, or `None` for an unknown address.
#[test]
fn remove_node_returns_connection_if_it_exists() {
    let container = create_container();

    let removed = container.remove_node("primary1");
    assert_eq!(removed, Some(ClusterNode::new_only_with_user_conn(1)));

    let missing = container.remove_node("foobar");
    assert_eq!(missing, None);
}
/// `is_empty` only becomes true once the last node has been removed.
#[test]
fn test_is_empty() {
let container = create_container();
assert!(!container.is_empty());
container.remove_node("primary1");
assert!(!container.is_empty());
container.remove_node("primary2");
container.remove_node("primary3");
assert!(!container.is_empty());
container.remove_node("replica2-1");
container.remove_node("replica3-1");
assert!(!container.is_empty());
// replica3-2 is the last remaining node.
container.remove_node("replica3-2");
assert!(container.is_empty());
}
/// A connected address that the slot map lists as a primary is a primary.
#[test]
fn is_primary_returns_true_for_known_primary() {
let container = create_container();
assert!(container.is_primary("primary1"));
}
/// A connected replica address is not reported as a primary.
#[test]
fn is_primary_returns_false_for_known_replica() {
let container = create_container();
assert!(!container.is_primary("replica2-1"));
}
/// A primary whose connection was removed is no longer reported as a primary.
#[test]
fn is_primary_returns_false_for_removed_node() {
let container = create_container();
let address = "primary1";
container.remove_node(address);
assert!(!container.is_primary(address));
}
/// Extending the container with a map holding one new address adds exactly
/// that address to the set of known nodes.
#[test]
fn test_extend_connection_map() {
    fn sorted_addresses(container: &ConnectionsContainer<usize>) -> Vec<Arc<str>> {
        let mut addresses: Vec<Arc<str>> = container
            .all_node_connections()
            .map(|(address, _)| address)
            .collect();
        addresses.sort();
        addresses
    }

    let mut container = create_container();
    let mut expected_addresses = sorted_addresses(&container);

    let new_node: Arc<str> = Arc::from("new_primary1");
    assert!(container.connection_for_address(&new_node).is_none());

    let extra_nodes = DashMap::new();
    extra_nodes.insert(Arc::clone(&new_node), create_cluster_node(1, false, None));
    container.extend_connection_map(ConnectionsMap(extra_nodes));

    expected_addresses.push(new_node);
    expected_addresses.sort();
    assert_eq!(expected_addresses, sorted_addresses(&container));
}
}