mod precomputed_replicas;
mod replicas;
mod replication_info;
pub(crate) mod tablets;
#[cfg(test)]
pub(crate) mod test;
mod token_ring;
use rand::{Rng, seq::IteratorRandom};
use scylla_cql::frame::response::result::TableSpec;
pub use token_ring::TokenRing;
use self::tablets::TabletsInfo;
use crate::cluster::metadata::Strategy;
use crate::cluster::{Node, NodeRef};
use crate::routing::{Shard, Token};
use itertools::Itertools;
use precomputed_replicas::PrecomputedReplicas;
use replicas::{EMPTY_REPLICAS, ReplicasArray};
use replication_info::ReplicationInfo;
use std::{
cmp,
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::debug;
/// Resolves which nodes (and shards) are replicas for a token.
///
/// Tablet information is consulted first; when the table has no tablets, replicas are
/// derived from the token ring according to the keyspace replication strategy.
#[derive(Debug, Clone)]
pub struct ReplicaLocator {
/// Ring-derived replication data, built in `new` from the ring iterator.
replication_data: ReplicationInfo,
/// Replica sets computed up front in `new` for the strategies passed there.
precomputed_replicas: PrecomputedReplicas,
/// Unique datacenter names of the nodes in the global ring, collected in `new`.
datacenters: Vec<String>,
/// Tablet information, used by `replicas_for_token` before any ring-based lookup.
pub(crate) tablets: TabletsInfo,
}
impl ReplicaLocator {
    /// Builds a locator.
    ///
    /// * `ring_iter` - `(token, owner)` pairs used to build the replication data.
    /// * `precompute_replica_sets_for` - strategies handed to the replica precomputation.
    /// * `tablets` - tablet information stored as-is in the locator.
    ///
    /// The list of datacenter names is derived from the nodes of the global ring,
    /// with duplicates removed.
    pub(crate) fn new<'a>(
        ring_iter: impl Iterator<Item = (Token, Arc<Node>)>,
        precompute_replica_sets_for: impl Iterator<Item = &'a Strategy>,
        tablets: TabletsInfo,
    ) -> Self {
        let replication_data = ReplicationInfo::new(ring_iter);
        let precomputed_replicas =
            PrecomputedReplicas::compute(&replication_data, precompute_replica_sets_for);

        // Nodes without a datacenter contribute nothing to the list.
        let datacenters: Vec<String> = replication_data
            .get_global_ring()
            .iter()
            .filter_map(|(_token, owner)| owner.datacenter.as_deref())
            .unique()
            .map(|dc_name| dc_name.to_owned())
            .collect();

        Self {
            replication_data,
            precomputed_replicas,
            datacenters,
            tablets,
        }
    }

    /// Returns the set of replicas owning `token`.
    ///
    /// * If tablet information exists for `table_spec`, it is used exclusively (optionally
    ///   limited to `datacenter`); a token without tablet data yields an empty set.
    /// * Otherwise the result depends on `strategy`:
    ///   - `SimpleStrategy`: ring replicas, filtered by `datacenter` when one is given.
    ///   - `NetworkTopologyStrategy`: replicas of the given datacenter (empty if that
    ///     datacenter has no configured replication factor), or replicas of all
    ///     datacenters chained together when no datacenter is given.
    ///   - anything else: falls back to `SimpleStrategy` with `replication_factor = 1`.
    pub fn replicas_for_token<'a>(
        &'a self,
        token: Token,
        strategy: &'a Strategy,
        datacenter: Option<&'a str>,
        table_spec: &TableSpec,
    ) -> ReplicaSet<'a> {
        // Tablet tables bypass the ring-based strategies entirely.
        if let Some(tablets) = self.tablets.tablets_for_table(table_spec) {
            let tablet_replicas: Option<&[(Arc<Node>, Shard)]> = match datacenter {
                Some(dc) => tablets.dc_replicas_for_token(token, dc),
                None => tablets.replicas_for_token(token),
            };
            // No tablet information for this token: report an empty replica set.
            return ReplicaSet {
                inner: ReplicaSetInner::PlainSharded(tablet_replicas.unwrap_or(&[])),
                token,
            };
        }

        let inner = match strategy {
            Strategy::SimpleStrategy { replication_factor } => {
                let replicas = self.get_simple_strategy_replicas(token, *replication_factor);
                match datacenter {
                    Some(datacenter) => ReplicaSetInner::FilteredSimple {
                        replicas,
                        datacenter,
                    },
                    None => ReplicaSetInner::Plain(replicas),
                }
            }
            Strategy::NetworkTopologyStrategy {
                datacenter_repfactors,
            } => match datacenter {
                None => ReplicaSetInner::ChainedNTS {
                    datacenter_repfactors,
                    locator: self,
                    token,
                },
                Some(dc) => match datacenter_repfactors.get(dc) {
                    Some(repfactor) => ReplicaSetInner::Plain(
                        self.get_network_strategy_replicas(token, dc, *repfactor),
                    ),
                    None => {
                        debug!("Datacenter ({}) does not exist!", dc);
                        ReplicaSetInner::Plain(EMPTY_REPLICAS)
                    }
                },
            },
            unsupported => {
                if let Strategy::Other { name, .. } = unsupported {
                    debug!(
                        "Unknown strategy ({}), falling back to SimpleStrategy with replication_factor = 1",
                        name
                    );
                }
                // Every strategy not handled above is treated as SimpleStrategy with RF = 1.
                return self.replicas_for_token(
                    token,
                    &Strategy::SimpleStrategy {
                        replication_factor: 1,
                    },
                    datacenter,
                    table_spec,
                );
            }
        };

        ReplicaSet { inner, token }
    }

    /// Gives access to the global token ring.
    pub fn ring(&self) -> &TokenRing<Arc<Node>> {
        self.replication_data.get_global_ring()
    }

    /// Gives the unique nodes of the global ring, as stored in the replication data.
    pub fn unique_nodes_in_global_ring(&self) -> &[Arc<Node>] {
        self.replication_data.unique_nodes_in_global_ring()
    }

    /// Gives the datacenter names collected when the locator was built.
    pub fn datacenter_names(&self) -> &[String] {
        &self.datacenters
    }

    /// Gives the unique nodes of the ring of `datacenter_name`, or `None` when the
    /// replication data has no entry for that datacenter.
    pub fn unique_nodes_in_datacenter_ring<'a>(
        &'a self,
        datacenter_name: &str,
    ) -> Option<&'a [Arc<Node>]> {
        let replication = &self.replication_data;
        replication.unique_nodes_in_datacenter_ring(datacenter_name)
    }

    /// SimpleStrategy replicas for `token`: empty for a zero replication factor,
    /// precomputed when available, computed on the fly otherwise.
    fn get_simple_strategy_replicas(
        &self,
        token: Token,
        replication_factor: usize,
    ) -> ReplicasArray<'_> {
        if replication_factor == 0 {
            return EMPTY_REPLICAS;
        }

        match self
            .precomputed_replicas
            .get_precomputed_simple_strategy_replicas(token, replication_factor)
        {
            Some(precomputed) => precomputed.into(),
            None => ReplicasArray::from_iter(
                self.replication_data
                    .simple_strategy_replicas(token, replication_factor),
            ),
        }
    }

    /// NetworkTopologyStrategy replicas for `token` within `datacenter`: empty for a zero
    /// replication factor, precomputed when available, computed on the fly otherwise.
    fn get_network_strategy_replicas<'a>(
        &'a self,
        token: Token,
        datacenter: &str,
        datacenter_replication_factor: usize,
    ) -> ReplicasArray<'a> {
        if datacenter_replication_factor == 0 {
            return EMPTY_REPLICAS;
        }

        match self
            .precomputed_replicas
            .get_precomputed_network_strategy_replicas(
                token,
                datacenter,
                datacenter_replication_factor,
            ) {
            Some(precomputed) => ReplicasArray::from(precomputed),
            None => ReplicasArray::from_iter(self.replication_data.nts_replicas_in_datacenter(
                token,
                datacenter,
                datacenter_replication_factor,
            )),
        }
    }
}
/// Pairs `node` with the shard its sharder assigns to `token`.
///
/// Nodes that expose no sharder are paired with shard `0`.
fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) {
    let shard = match node.sharder() {
        Some(sharder) => sharder.shard_of(token),
        None => 0,
    };
    (node, shard)
}
/// Internal representation of a `ReplicaSet`; each variant corresponds to one way
/// `ReplicaLocator::replicas_for_token` produces replicas.
#[derive(Debug)]
enum ReplicaSetInner<'a> {
/// Replica nodes without shard information; the shard is computed from the token
/// whenever a replica is handed out.
Plain(ReplicasArray<'a>),
/// Replicas already paired with their shards (produced from tablet information).
PlainSharded(&'a [(Arc<Node>, Shard)]),
/// SimpleStrategy replicas of which only the nodes located in `datacenter` count.
FilteredSimple {
replicas: ReplicasArray<'a>,
datacenter: &'a str,
},
/// NetworkTopologyStrategy replicas across all datacenters; per-datacenter replicas
/// are looked up through `locator` on demand and chained together.
ChainedNTS {
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
},
}
/// Set of replicas for a token, as returned by `ReplicaLocator::replicas_for_token`.
///
/// It can be iterated (`IntoIterator`), sampled (`choose_filtered`) or turned into a
/// ring-ordered sequence (`into_replicas_ordered`).
#[derive(Debug)]
pub struct ReplicaSet<'a> {
/// How the replicas are represented / obtained.
inner: ReplicaSetInner<'a>,
/// Token the set was requested for; used to compute shards for yielded nodes.
token: Token,
}
impl<'a> ReplicaSet<'a> {
    /// Picks a random replica that satisfies `predicate`.
    ///
    /// A single random replica is tried first; only if it is rejected is the whole set
    /// iterated, filtered by `predicate`, and a random element of the survivors chosen.
    /// Returns `None` when the set is empty or no replica satisfies `predicate`.
    pub fn choose_filtered<R>(
        self,
        rng: &mut R,
        predicate: impl Fn(&(NodeRef<'a>, Shard)) -> bool,
    ) -> Option<(NodeRef<'a>, Shard)>
    where
        R: Rng + ?Sized,
    {
        // Optimistic attempt: avoids walking the whole set in the common case.
        let first_pick = self.choose(rng)?;
        if predicate(&first_pick) {
            Some(first_pick)
        } else {
            self.into_iter().filter(predicate).choose(rng)
        }
    }

    /// Number of replicas in the set.
    ///
    /// For the datacenter-filtered variant this counts matching nodes; for the chained
    /// NetworkTopologyStrategy variant it sums, per configured datacenter, the smaller of
    /// the replication factor and the number of unique nodes in that datacenter.
    pub fn len(&self) -> usize {
        match &self.inner {
            ReplicaSetInner::Plain(replicas) => replicas.len(),
            ReplicaSetInner::PlainSharded(replicas) => replicas.len(),
            ReplicaSetInner::FilteredSimple {
                replicas,
                datacenter,
            } => {
                let wanted_dc = Some(*datacenter);
                replicas
                    .iter()
                    .filter(|node| node.datacenter.as_deref() == wanted_dc)
                    .count()
            }
            ReplicaSetInner::ChainedNTS {
                datacenter_repfactors,
                locator,
                token: _,
            } => {
                let mut total = 0;
                for (dc, rf) in datacenter_repfactors.iter() {
                    let nodes_in_dc = locator
                        .unique_nodes_in_datacenter_ring(dc)
                        .map_or(0, |nodes| nodes.len());
                    total += cmp::min(*rf, nodes_in_dc);
                }
                total
            }
        }
    }

    /// Tells whether the set contains no replicas.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Picks one replica uniformly by drawing an index in `0..len()`.
    ///
    /// The random generator is only consulted when the set is non-empty.
    fn choose<R>(&self, rng: &mut R) -> Option<(NodeRef<'a>, Shard)>
    where
        R: Rng + ?Sized,
    {
        let len = self.len();
        if len == 0 {
            return None;
        }
        let index = rng.random_range(0..len);

        let chosen_node = match &self.inner {
            // Tablet replicas already carry their shard, so they are returned directly.
            ReplicaSetInner::PlainSharded(replicas) => {
                return replicas.get(index).map(|(node, shard)| (node, *shard));
            }
            ReplicaSetInner::Plain(replicas) => replicas.get(index),
            ReplicaSetInner::FilteredSimple {
                replicas,
                datacenter,
            } => {
                let wanted_dc = Some(*datacenter);
                replicas
                    .iter()
                    .filter(|node| node.datacenter.as_deref() == wanted_dc)
                    .nth(index)
            }
            ReplicaSetInner::ChainedNTS {
                datacenter_repfactors,
                locator,
                token,
            } => {
                // Walk the datacenters, consuming `index` by each datacenter's effective
                // replica count until it falls inside one of them.
                let mut remaining = index;
                let mut found = None;
                for dc in locator.datacenters.iter() {
                    let requested_rf = datacenter_repfactors.get(dc).copied().unwrap_or(0);
                    let nodes_in_dc = locator
                        .unique_nodes_in_datacenter_ring(dc)
                        .map_or(0, |nodes| nodes.len());
                    let effective_rf = cmp::min(requested_rf, nodes_in_dc);

                    if remaining < effective_rf {
                        found = locator
                            .get_network_strategy_replicas(*token, dc, effective_rf)
                            .get(remaining);
                        break;
                    }
                    remaining -= effective_rf;
                }
                found
            }
        };

        chosen_node.map(|node| with_computed_shard(node, self.token))
    }
}
impl<'a> IntoIterator for ReplicaSet<'a> {
type Item = (NodeRef<'a>, Shard);
type IntoIter = ReplicaSetIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
let inner = match self.inner {
ReplicaSetInner::Plain(replicas) => ReplicaSetIteratorInner::Plain { replicas, idx: 0 },
ReplicaSetInner::PlainSharded(replicas) => {
ReplicaSetIteratorInner::PlainSharded { replicas, idx: 0 }
}
ReplicaSetInner::FilteredSimple {
replicas,
datacenter,
} => ReplicaSetIteratorInner::FilteredSimple {
replicas,
datacenter,
idx: 0,
},
ReplicaSetInner::ChainedNTS {
datacenter_repfactors,
locator,
token,
} => {
if let Some(datacenter) = locator.datacenters.first() {
let repfactor = *datacenter_repfactors.get(datacenter.as_str()).unwrap_or(&0);
ReplicaSetIteratorInner::ChainedNTS {
replicas: locator
.get_network_strategy_replicas(token, datacenter, repfactor),
replicas_idx: 0,
locator,
token,
datacenter_idx: 0,
datacenter_repfactors,
}
} else {
ReplicaSetIteratorInner::Plain {
replicas: EMPTY_REPLICAS,
idx: 0,
}
}
}
};
ReplicaSetIterator {
inner,
token: self.token,
}
}
}
/// Iteration state of `ReplicaSetIterator`, mirroring the variants of `ReplicaSetInner`.
#[derive(Clone)]
enum ReplicaSetIteratorInner<'a> {
/// Walks `replicas` by index; shards are computed from the token on yield.
Plain {
replicas: ReplicasArray<'a>,
// Position of the next replica to yield.
idx: usize,
},
/// Walks a slice of replicas that already carry their shards.
PlainSharded {
replicas: &'a [(Arc<Node>, Shard)],
// Position of the next replica to yield.
idx: usize,
},
/// Walks `replicas`, yielding only nodes whose datacenter equals `datacenter`.
FilteredSimple {
replicas: ReplicasArray<'a>,
datacenter: &'a str,
// Position of the next replica to examine (not all examined replicas are yielded).
idx: usize,
},
/// Walks the replicas of one datacenter at a time, moving through the locator's
/// datacenter list as each datacenter is exhausted.
ChainedNTS {
// Replicas of the datacenter currently being iterated.
replicas: ReplicasArray<'a>,
// Position of the next replica to yield within `replicas`.
replicas_idx: usize,
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
// Index of the current datacenter in the locator's datacenter list.
datacenter_idx: usize,
},
}
/// Iterator over the `(node, shard)` pairs of a `ReplicaSet`.
#[derive(Clone)]
pub struct ReplicaSetIterator<'a> {
/// Variant-specific iteration state.
inner: ReplicaSetIteratorInner<'a>,
/// Token used to compute the shard of nodes that do not carry one.
token: Token,
}
impl<'a> Iterator for ReplicaSetIterator<'a> {
    type Item = (NodeRef<'a>, Shard);

    /// Yields the next replica together with its shard, or `None` once exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        match &mut self.inner {
            ReplicaSetIteratorInner::Plain { replicas, idx } => {
                if let Some(replica) = replicas.get(*idx) {
                    *idx += 1;
                    return Some(with_computed_shard(replica, self.token));
                }
                None
            }
            ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
                if let Some((replica, shard)) = replicas.get(*idx) {
                    *idx += 1;
                    return Some((replica, *shard));
                }
                None
            }
            ReplicaSetIteratorInner::FilteredSimple {
                replicas,
                datacenter,
                idx,
            } => {
                // Skip replicas located outside the requested datacenter.
                while let Some(replica) = replicas.get(*idx) {
                    *idx += 1;
                    if replica.datacenter.as_deref() == Some(*datacenter) {
                        return Some(with_computed_shard(replica, self.token));
                    }
                }
                None
            }
            ReplicaSetIteratorInner::ChainedNTS {
                replicas,
                replicas_idx,
                locator,
                token,
                datacenter_idx,
                datacenter_repfactors,
            } => {
                if let Some(replica) = replicas.get(*replicas_idx) {
                    *replicas_idx += 1;
                    Some(with_computed_shard(replica, self.token))
                } else if *datacenter_idx + 1 < locator.datacenters.len() {
                    // Current datacenter is exhausted: load the replicas of the next one
                    // and retry. A datacenter without a configured factor yields nothing.
                    *datacenter_idx += 1;
                    *replicas_idx = 0;
                    let datacenter = &locator.datacenters[*datacenter_idx];
                    let repfactor = *datacenter_repfactors.get(datacenter).unwrap_or(&0);
                    *replicas =
                        locator.get_network_strategy_replicas(*token, datacenter, repfactor);
                    self.next()
                } else {
                    None
                }
            }
        }
    }

    /// Bounds on the number of remaining replicas.
    ///
    /// Exact for the plain variants; only an upper bound for the filtered and chained
    /// variants. Subtractions saturate so that the hint stays valid (zero) even if the
    /// cursor is at or beyond the end of the underlying replicas.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match &self.inner {
            ReplicaSetIteratorInner::Plain { replicas, idx } => {
                let size = replicas.len().saturating_sub(*idx);
                (size, Some(size))
            }
            ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
                let size = replicas.len().saturating_sub(*idx);
                (size, Some(size))
            }
            ReplicaSetIteratorInner::FilteredSimple {
                replicas,
                datacenter: _,
                idx,
            } => (0, Some(replicas.len().saturating_sub(*idx))),
            ReplicaSetIteratorInner::ChainedNTS {
                replicas: _,
                replicas_idx: _,
                datacenter_repfactors,
                locator,
                token: _,
                datacenter_idx,
            } => {
                // Replication factors of the datacenters that were already fully visited.
                let yielded: usize = locator.datacenter_names()[0..*datacenter_idx]
                    .iter()
                    .filter_map(|name| datacenter_repfactors.get(name))
                    .sum();
                let configured_total = datacenter_repfactors.values().sum::<usize>();
                (0, Some(configured_total.saturating_sub(yielded)))
            }
        }
    }

    /// Skips `n` replicas and yields the one after them.
    ///
    /// The plain variants jump directly by moving the cursor; the cursor is clamped to
    /// the number of replicas so that an oversized `n` can neither overflow it nor leave
    /// it past the end (which would break `size_hint`). Other variants step one by one.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        match &mut self.inner {
            ReplicaSetIteratorInner::Plain { replicas, idx } => {
                *idx = cmp::min(idx.saturating_add(n), replicas.len());
                self.next()
            }
            ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
                *idx = cmp::min(idx.saturating_add(n), replicas.len());
                self.next()
            }
            ReplicaSetIteratorInner::FilteredSimple { .. }
            | ReplicaSetIteratorInner::ChainedNTS { .. } => {
                for _ in 0..n {
                    self.next()?;
                }
                self.next()
            }
        }
    }
}
impl<'a> ReplicaSet<'a> {
    /// Wraps this set in a `ReplicasOrdered`, consuming the set.
    pub fn into_replicas_ordered(self) -> ReplicasOrdered<'a> {
        let replica_set = self;
        ReplicasOrdered { replica_set }
    }
}
/// Wrapper around a `ReplicaSet`, created by `ReplicaSet::into_replicas_ordered`,
/// whose `IntoIterator` implementation yields a `ReplicasOrderedIterator`.
pub struct ReplicasOrdered<'a> {
/// The wrapped replica set.
replica_set: ReplicaSet<'a>,
}
/// Iterator produced by `ReplicasOrdered::into_iter`.
pub struct ReplicasOrderedIterator<'a> {
/// Which underlying iterator is being driven.
inner: ReplicasOrderedIteratorInner<'a>,
}
/// The two ways a `ReplicasOrderedIterator` obtains its items.
enum ReplicasOrderedIteratorInner<'a> {
/// The plain `ReplicaSetIterator` is used directly (as the variant name says, its
/// order is taken to be ring order already).
AlreadyRingOrdered {
replica_set_iter: ReplicaSetIterator<'a>,
},
/// Multi-datacenter NetworkTopologyStrategy set, which needs the dedicated
/// `ReplicasOrderedNTSIterator` to produce ring order.
PolyDatacenterNTS {
replicas_ordered_iter: ReplicasOrderedNTSIterator<'a>,
},
}
/// Ring-order iterator over the replicas of a NetworkTopologyStrategy replica set
/// spanning all datacenters.
struct ReplicasOrderedNTSIterator<'a> {
/// Token used to compute the shard of each yielded node.
token: Token,
/// Current stage of the iteration.
inner: ReplicasOrderedNTSIteratorInner<'a>,
}
/// Stages of `ReplicasOrderedNTSIterator`; the iterator moves through them in
/// declaration order.
enum ReplicasOrderedNTSIteratorInner<'a> {
/// Nothing yielded yet: the first replica still has to be picked from the ring.
FreshForPick {
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
},
/// The first replica (`picked`) has been yielded; the remaining replicas have not
/// been computed yet.
Picked {
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
picked: NodeRef<'a>,
},
/// The remaining replicas have been computed and are yielded one by one.
ComputedFallback {
replicas: ReplicasArray<'a>,
// Position of the next replica to yield.
idx: usize,
},
}
impl<'a> Iterator for ReplicasOrderedNTSIterator<'a> {
    type Item = (NodeRef<'a>, Shard);

    /// Yields replicas in the order they appear on the global ring, starting at the token.
    ///
    /// The first call only scans the ring for the first replica, which is cheap. The full
    /// replica list is computed lazily on the second call, and later calls read from it.
    ///
    /// # Panics
    ///
    /// Panics if a computed replica cannot be found while walking the global ring.
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner {
            ReplicasOrderedNTSIteratorInner::FreshForPick {
                datacenter_repfactors,
                locator,
                token,
            } => {
                let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
                for node in nodes_on_ring {
                    // The first node whose datacenter actually holds replicas is taken as
                    // the first replica. A datacenter configured with a replication
                    // factor of 0 holds none, so its nodes must be skipped.
                    if let Some(dc) = &node.datacenter
                        && datacenter_repfactors.get(dc).is_some_and(|rf| *rf > 0)
                    {
                        self.inner = ReplicasOrderedNTSIteratorInner::Picked {
                            datacenter_repfactors,
                            locator,
                            token,
                            picked: node,
                        };
                        return Some(with_computed_shard(node, self.token));
                    }
                }
                None
            }
            ReplicasOrderedNTSIteratorInner::Picked {
                datacenter_repfactors,
                locator,
                token,
                picked,
            } => {
                // Gather the replicas of every configured datacenter...
                #[expect(clippy::mutable_key_type)]
                let mut all_replicas: HashSet<&'a Arc<Node>> = HashSet::new();
                for (datacenter, repfactor) in datacenter_repfactors.iter() {
                    all_replicas.extend(
                        locator
                            .get_network_strategy_replicas(token, datacenter, *repfactor)
                            .iter(),
                    );
                }
                // ...except the one that was already yielded.
                all_replicas.remove(picked);

                // Order the remaining replicas by walking the ring from the token and
                // stopping as soon as every replica has been placed.
                let mut replicas_ordered = vec![];
                let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
                for node in nodes_on_ring {
                    if all_replicas.is_empty() {
                        break;
                    }
                    if all_replicas.remove(node) {
                        replicas_ordered.push(node);
                    }
                }
                assert!(
                    all_replicas.is_empty(),
                    "all_replicas somehow contained a node that wasn't present in the global ring!"
                );

                self.inner = ReplicasOrderedNTSIteratorInner::ComputedFallback {
                    replicas: ReplicasArray::Owned(replicas_ordered),
                    idx: 0,
                };
                self.next()
            }
            ReplicasOrderedNTSIteratorInner::ComputedFallback {
                ref replicas,
                ref mut idx,
            } => {
                if let Some(replica) = replicas.get(*idx) {
                    *idx += 1;
                    Some(with_computed_shard(replica, self.token))
                } else {
                    None
                }
            }
        }
    }
}
impl<'a> Iterator for ReplicasOrderedIterator<'a> {
type Item = (NodeRef<'a>, Shard);
fn next(&mut self) -> Option<Self::Item> {
match &mut self.inner {
ReplicasOrderedIteratorInner::AlreadyRingOrdered { replica_set_iter } => {
replica_set_iter.next()
}
ReplicasOrderedIteratorInner::PolyDatacenterNTS {
replicas_ordered_iter,
} => replicas_ordered_iter.next(),
}
}
}
impl<'a> IntoIterator for ReplicasOrdered<'a> {
type Item = (NodeRef<'a>, Shard);
type IntoIter = ReplicasOrderedIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
let Self { replica_set } = self;
Self::IntoIter {
inner: match replica_set.inner {
ReplicaSetInner::Plain(_) | ReplicaSetInner::FilteredSimple { .. } => {
ReplicasOrderedIteratorInner::AlreadyRingOrdered {
replica_set_iter: replica_set.into_iter(),
}
}
ReplicaSetInner::PlainSharded(_) => {
ReplicasOrderedIteratorInner::AlreadyRingOrdered {
replica_set_iter: replica_set.into_iter(),
}
}
ReplicaSetInner::ChainedNTS {
datacenter_repfactors,
locator,
token,
} => ReplicasOrderedIteratorInner::PolyDatacenterNTS {
replicas_ordered_iter: ReplicasOrderedNTSIterator {
token: replica_set.token,
inner: ReplicasOrderedNTSIteratorInner::FreshForPick {
datacenter_repfactors,
locator,
token,
},
},
},
},
}
}
}
#[cfg(test)]
mod tests {
use crate::{routing::Token, routing::locator::test::*, test_utils::setup_tracing};
/// Checks the order in which `into_replicas_ordered()` yields replicas for several
/// strategy / datacenter combinations, using the mock metadata from the shared test
/// module. Nodes are identified by the port of their address.
#[tokio::test]
async fn test_replicas_ordered() {
setup_tracing();
let metadata = mock_metadata_for_token_aware_tests();
let locator = create_locator(&metadata);
// Resolves the replicas for `token` and asserts that the ordered iteration yields
// exactly the `expected` node ids, in that order.
let check = |token, limit_to_dc, strategy, table, expected| {
let replica_set =
locator.replicas_for_token(Token::new(token), strategy, limit_to_dc, table);
let replicas_ordered = replica_set.into_replicas_ordered();
let ids: Vec<_> = replicas_ordered
.into_iter()
.map(|(node, _shard)| node.address.port())
.collect();
assert_eq!(expected, ids);
};
// KEYSPACE_NTS_RF_3 strategy, no datacenter restriction.
check(
160,
None,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, A, C, D, G, E],
);
// KEYSPACE_NTS_RF_2 strategy, no datacenter restriction.
check(
160,
None,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_2,
vec![F, A, D, G],
);
// KEYSPACE_SS_RF_2 strategy, no datacenter restriction.
check(
160,
None,
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![F, A],
);
// KEYSPACE_NTS_RF_3 strategy, limited to the "eu" datacenter.
check(
160,
Some("eu"),
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![A, C, G],
);
// KEYSPACE_NTS_RF_3 strategy, limited to the "us" datacenter.
check(
160,
Some("us"),
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, D, E],
);
// KEYSPACE_SS_RF_2 strategy, limited to the "eu" datacenter.
check(
160,
Some("eu"),
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![A],
);
}
}