use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use scylla_cql::frame::response::result::TableSpec;
use uuid::Uuid;
use super::tablets::TabletsInfo;
use super::{ReplicaLocator, ReplicaSet};
use crate::cluster::Node;
use crate::cluster::metadata::{Keyspace, Metadata, Peer, Strategy};
use crate::cluster::{NodeAddr, NodeRef};
use crate::network::PoolConfig;
use crate::routing::Token;
use crate::test_utils::setup_tracing;
use std::collections::HashSet;
use std::sync::Arc;
use std::{
collections::{BTreeSet, HashMap},
net::SocketAddr,
};
pub(crate) const KEYSPACE_NTS_RF_2: &str = "keyspace_with_nts_rf_2";
pub(crate) const KEYSPACE_NTS_RF_3: &str = "keyspace_with_nts_rf_3";
pub(crate) const KEYSPACE_SS_RF_2: &str = "keyspace_with_ss_rf_2";
pub(crate) const TABLE_NTS_RF_2: &TableSpec<'static> =
&TableSpec::borrowed(KEYSPACE_NTS_RF_2, "table");
pub(crate) const TABLE_NTS_RF_3: &TableSpec<'static> =
&TableSpec::borrowed(KEYSPACE_NTS_RF_3, "table");
pub(crate) const TABLE_SS_RF_2: &TableSpec<'static> =
&TableSpec::borrowed(KEYSPACE_SS_RF_2, "table");
pub(crate) const TABLE_INVALID: &TableSpec<'static> = &TableSpec::borrowed("invalid", "invalid");
pub(crate) const A: u16 = 1;
pub(crate) const B: u16 = 2;
pub(crate) const C: u16 = 3;
pub(crate) const D: u16 = 4;
pub(crate) const E: u16 = 5;
pub(crate) const F: u16 = 6;
pub(crate) const G: u16 = 7;
pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
let peers = [
Peer {
datacenter: Some("eu".into()),
rack: Some("r1".to_owned()),
address: id_to_invalid_addr(1),
tokens: vec![Token::new(50), Token::new(250), Token::new(400)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("eu".into()),
rack: Some("r1".to_owned()),
address: id_to_invalid_addr(2),
tokens: vec![Token::new(100), Token::new(600), Token::new(900)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("eu".into()),
rack: Some("r1".to_owned()),
address: id_to_invalid_addr(3),
tokens: vec![Token::new(300), Token::new(650), Token::new(700)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("us".into()),
rack: Some("r1".to_owned()),
address: id_to_invalid_addr(4),
tokens: vec![Token::new(350), Token::new(550)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("us".into()),
rack: Some("r1".to_owned()),
address: id_to_invalid_addr(5),
tokens: vec![Token::new(150), Token::new(750)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("us".into()),
rack: Some("r2".to_owned()),
address: id_to_invalid_addr(6),
tokens: vec![Token::new(200), Token::new(450)],
host_id: Uuid::new_v4(),
},
Peer {
datacenter: Some("eu".into()),
rack: Some("r2".to_owned()),
address: id_to_invalid_addr(7),
tokens: vec![Token::new(500), Token::new(800)],
host_id: Uuid::new_v4(),
},
];
let keyspaces = [
(
KEYSPACE_SS_RF_2.into(),
Ok(Keyspace {
strategy: Strategy::SimpleStrategy {
replication_factor: 2,
},
durable_writes: true,
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
}),
),
(
KEYSPACE_NTS_RF_2.into(),
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 2)]
.into_iter()
.collect(),
},
durable_writes: true,
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
}),
),
(
KEYSPACE_NTS_RF_3.into(),
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 3), ("us".to_owned(), 3)]
.into_iter()
.collect(),
},
durable_writes: true,
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
}),
),
]
.iter()
.cloned()
.collect();
Metadata {
peers: Vec::from(peers),
keyspaces,
}
}
pub(crate) fn id_to_invalid_addr(id: u16) -> NodeAddr {
NodeAddr::Translatable(SocketAddr::from(([255, 255, 255, 255], id)))
}
fn assert_same_node_ids<'a>(left: impl Iterator<Item = NodeRef<'a>>, ids: &[u16]) {
let left: BTreeSet<_> = left.map(|node| node.address.port()).collect();
assert_eq!(left, ids.iter().copied().collect())
}
fn assert_replica_set_equal_to(nodes: ReplicaSet<'_>, ids: &[u16]) {
assert_same_node_ids(nodes.into_iter().map(|(node, _shard)| node), ids)
}
pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator<Item = (Token, Arc<Node>)> + use<> {
let pool_config: PoolConfig = Default::default();
let mut ring: Vec<(Token, Arc<Node>)> = Vec::new();
let (connectivity_events_sender, _) = tokio::sync::mpsc::unbounded_channel();
for peer in &metadata.peers {
let node = Arc::new(Node::new(
peer.to_peer_endpoint(),
&pool_config,
connectivity_events_sender.clone(),
None,
true,
#[cfg(feature = "metrics")]
Default::default(),
));
for token in &peer.tokens {
ring.push((*token, node.clone()));
}
}
ring.into_iter()
}
pub(crate) fn create_locator(metadata: &Metadata) -> ReplicaLocator {
let ring = create_ring(metadata);
let strategies = metadata
.keyspaces
.values()
.map(|ks| &ks.as_ref().unwrap().strategy);
ReplicaLocator::new(ring, strategies, TabletsInfo::new())
}
#[tokio::test]
async fn test_locator() {
setup_tracing();
let locator = create_locator(&mock_metadata_for_token_aware_tests());
test_datacenter_info(&locator);
test_simple_strategy_replicas(&locator);
test_network_topology_strategy_replicas(&locator);
test_replica_set_len(&locator);
test_replica_set_choose(&locator);
test_replica_set_choose_filtered(&locator);
}
fn test_datacenter_info(locator: &ReplicaLocator) {
let names: BTreeSet<_> = locator
.datacenter_names()
.iter()
.map(|name| name.as_str())
.collect();
assert_eq!(names, ["eu", "us"].into_iter().collect());
assert_same_node_ids(
locator.unique_nodes_in_global_ring().iter(),
&[A, B, C, D, E, F, G],
);
assert_same_node_ids(
locator
.unique_nodes_in_datacenter_ring("eu")
.unwrap()
.iter(),
&[A, B, C, G],
);
assert_same_node_ids(
locator
.unique_nodes_in_datacenter_ring("us")
.unwrap()
.iter(),
&[D, E, F],
);
}
fn test_simple_strategy_replicas(locator: &ReplicaLocator) {
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(450),
&Strategy::SimpleStrategy {
replication_factor: 3,
},
None,
TABLE_INVALID,
),
&[F, G, D],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(450),
&Strategy::SimpleStrategy {
replication_factor: 4,
},
None,
TABLE_INVALID,
),
&[F, G, D, B],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(201),
&Strategy::SimpleStrategy {
replication_factor: 4,
},
None,
TABLE_INVALID,
),
&[A, C, D, F],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(201),
&Strategy::SimpleStrategy {
replication_factor: 0,
},
None,
TABLE_INVALID,
),
&[],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(50),
&Strategy::SimpleStrategy {
replication_factor: 1,
},
Some("us"),
TABLE_INVALID,
),
&[],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(50),
&Strategy::SimpleStrategy {
replication_factor: 3,
},
Some("us"),
TABLE_INVALID,
),
&[E],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(50),
&Strategy::SimpleStrategy {
replication_factor: 3,
},
Some("eu"),
TABLE_INVALID,
),
&[A, B],
);
}
fn test_network_topology_strategy_replicas(locator: &ReplicaLocator) {
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
Some("eu"),
TABLE_INVALID,
),
&[B],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
Some("us"),
TABLE_INVALID,
),
&[E],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
),
&[B, E],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
),
&[B, E, G],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("unknown".to_owned(), 2), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
),
&[E],
);
assert_replica_set_equal_to(
locator.replicas_for_token(
Token::new(800),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 1), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
),
&[G, E],
);
}
fn test_replica_set_len(locator: &ReplicaLocator) {
let merged_nts_len = locator
.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
)
.len();
assert_eq!(merged_nts_len, 3);
let capped_merged_nts_len = locator
.replicas_for_token(
Token::new(75),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 69), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
None,
TABLE_INVALID,
)
.len();
assert_eq!(capped_merged_nts_len, 5);
let filtered_nts_len = locator
.replicas_for_token(
Token::new(450),
&Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 1)]
.into_iter()
.collect(),
},
Some("eu"),
TABLE_INVALID,
)
.len();
assert_eq!(filtered_nts_len, 2);
let ss_len = locator
.replicas_for_token(
Token::new(75),
&Strategy::SimpleStrategy {
replication_factor: 3,
},
None,
TABLE_INVALID,
)
.len();
assert_eq!(ss_len, 3);
let filtered_ss_len = locator
.replicas_for_token(
Token::new(75),
&Strategy::SimpleStrategy {
replication_factor: 3,
},
Some("eu"),
TABLE_INVALID,
)
.len();
assert_eq!(filtered_ss_len, 1)
}
fn test_replica_set_choose(locator: &ReplicaLocator) {
let strategies = [
Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2137), ("us".to_owned(), 69)]
.into_iter()
.collect(),
},
Strategy::SimpleStrategy {
replication_factor: 42,
},
];
let mut rng = ChaCha8Rng::seed_from_u64(69);
for strategy in strategies {
let replica_set_generator =
|| locator.replicas_for_token(Token::new(75), &strategy, None, TABLE_INVALID);
let mut chosen_replicas = HashSet::new();
for _ in 0..32 {
let set = replica_set_generator();
let (node, _shard) = set
.choose(&mut rng)
.expect("choose from non-empty set must return some node");
chosen_replicas.insert(node.host_id);
}
assert_eq!(
chosen_replicas,
locator
.unique_nodes_in_global_ring()
.iter()
.map(|node| node.host_id)
.collect()
)
}
}
fn test_replica_set_choose_filtered(locator: &ReplicaLocator) {
let strategies = [
Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2137), ("us".to_owned(), 69)]
.into_iter()
.collect(),
},
Strategy::SimpleStrategy {
replication_factor: 42,
},
];
let mut rng = ChaCha8Rng::seed_from_u64(69);
for strategy in strategies {
let replica_set_generator =
|| locator.replicas_for_token(Token::new(75), &strategy, None, TABLE_INVALID);
let mut chosen_replicas = HashSet::new();
for _ in 0..32 {
let set = replica_set_generator();
let (node, _shard) = set
.choose_filtered(&mut rng, |(node, _shard)| {
node.datacenter == Some("eu".into())
})
.expect("choose from non-empty set must return some node");
chosen_replicas.insert(node.host_id);
}
assert_eq!(
chosen_replicas,
locator
.unique_nodes_in_datacenter_ring("eu")
.unwrap()
.iter()
.map(|node| node.host_id)
.collect()
)
}
let empty = locator
.replicas_for_token(
Token::new(75),
&Strategy::LocalStrategy,
Some("unknown_dc_name"),
TABLE_INVALID,
)
.choose_filtered(&mut rng, |_| true);
assert_eq!(empty, None);
}