use crate::cluster::client::SlotsRefreshRateLimit;
use crate::cluster::compat::get_connection_addr;
use crate::cluster::routing::Slot;
use crate::cluster::slotmap::{ReadFromReplicaStrategy, SlotMap};
use crate::connection::info::TlsMode;
use crate::value::{ErrorKind, Error, Result, Value};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::info;
pub const DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES: usize = 3;
pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS: u64 = 500;
pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR: f64 = 1.5;
pub const DEFAULT_SLOTS_REFRESH_WAIT_DURATION: Duration = Duration::from_secs(15);
pub const DEFAULT_SLOTS_REFRESH_MAX_JITTER_MILLI: u64 = 15 * 1000;
pub(crate) const SLOT_SIZE: u16 = 16384;
pub(crate) type TopologyHash = u64;
pub(crate) struct SlotRefreshState {
pub(crate) in_progress: AtomicBool,
pub(crate) last_run: Arc<RwLock<Option<SystemTime>>>,
pub(crate) rate_limiter: SlotsRefreshRateLimit,
}
impl SlotRefreshState {
pub(crate) fn new(rate_limiter: SlotsRefreshRateLimit) -> Self {
Self {
in_progress: AtomicBool::new(false),
last_run: Arc::new(RwLock::new(None)),
rate_limiter,
}
}
}
#[derive(Debug)]
pub(crate) struct TopologyView {
pub(crate) hash_value: TopologyHash,
pub(crate) nodes_count: u16,
slots_and_count: (u16, Vec<Slot>),
address_to_ip_map: HashMap<String, IpAddr>,
}
impl PartialEq for TopologyView {
fn eq(&self, other: &Self) -> bool {
self.hash_value == other.hash_value
}
}
impl Eq for TopologyView {}
pub(crate) fn slot(key: &[u8]) -> u16 {
crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
}
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
let open = key.iter().position(|v| *v == b'{')?;
let close = key[open..].iter().position(|v| *v == b'}')?;
let rv = &key[open + 1..open + close];
(!rv.is_empty()).then_some(rv)
}
pub fn get_slot(key: &[u8]) -> u16 {
let key = match get_hashtag(key) {
Some(tag) => tag,
None => key,
};
slot(key)
}
pub(crate) struct ParsedSlotsResult {
pub(crate) slots_count: u16,
pub(crate) slots: Vec<Slot>,
pub(crate) address_to_ip_map: HashMap<String, IpAddr>,
}
pub(crate) fn parse_and_count_slots(
raw_slot_resp: &Value,
tls: Option<TlsMode>,
addr_of_answering_node: &str,
) -> Result<ParsedSlotsResult> {
let mut slots = Vec::with_capacity(2);
let mut slots_count = 0;
let mut address_to_ip_map = HashMap::new();
if let Value::Array(items) = raw_slot_resp {
for entry in items.iter() {
let item = match entry {
Ok(Value::Array(item)) if item.len() >= 3 => item,
_ => continue,
};
let start = if let Ok(Value::Int(start)) = item[0] {
match u16::try_from(start) {
Ok(v) => v,
Err(_) => continue, }
} else {
continue;
};
let end = if let Ok(Value::Int(end)) = item[1] {
match u16::try_from(end) {
Ok(v) => v,
Err(_) => continue, }
} else {
continue;
};
let mut nodes: Vec<String> = item
.iter()
.skip(2)
.filter_map(|node| {
if let Ok(Value::Array(node)) = node {
if node.len() < 2 {
return None;
}
let primary_identifier = if let Ok(Value::BulkString(ref bytes)) = node[0] {
let received_address = String::from_utf8_lossy(bytes);
if received_address.is_empty() {
addr_of_answering_node.into()
} else if received_address == "?" {
return None;
} else {
received_address
}
} else if let Ok(Value::Nil) = node[0] {
addr_of_answering_node.into()
} else {
return None;
};
if primary_identifier.is_empty() {
return None;
}
let port = if let Ok(Value::Int(port)) = node[1] {
match u16::try_from(port) {
Ok(v) => v,
Err(_) => return None, }
} else {
return None;
};
let mut metadata_ip: Option<IpAddr> = None;
let mut metadata_hostname: Option<String> = None;
if node.len() >= 4 {
let mut process_kv = |key_bytes: &[u8], value_bytes: &[u8]| {
let key_str = String::from_utf8_lossy(key_bytes);
if key_str == "ip" {
metadata_ip =
String::from_utf8_lossy(value_bytes).parse::<IpAddr>().ok();
} else if key_str == "hostname" {
let h = String::from_utf8_lossy(value_bytes);
if !h.is_empty() {
metadata_hostname = Some(h.into_owned());
}
}
};
match &node[3] {
Ok(Value::Array(metadata)) => {
if metadata.len() % 2 != 0 {
tracing::warn!("cluster_topology - Node metadata array has odd length, some entries may be skipped");
}
for chunk in metadata.chunks_exact(2) {
if let (Ok(Value::BulkString(key)), Ok(Value::BulkString(value))) =
(&chunk[0], &chunk[1])
{
process_kv(key, value);
}
}
}
Ok(Value::Map(metadata)) => {
for (key, value) in metadata {
if let (Value::BulkString(key_bytes), Value::BulkString(value_bytes)) =
(key, value)
{
process_kv(key_bytes, value_bytes);
}
}
}
other => {
tracing::warn!("cluster_topology - Unexpected node metadata format: {:?}", other);
}
}
}
let (canonical_hostname, resolved_ip) =
if let Ok(primary_as_ip) = primary_identifier.parse::<IpAddr>() {
(
metadata_hostname
.unwrap_or_else(|| primary_identifier.into_owned()),
Some(primary_as_ip),
)
} else {
(primary_identifier.into_owned(), metadata_ip)
};
let connection_addr =
get_connection_addr(canonical_hostname, port, tls, None).to_string();
if let Some(ip) = resolved_ip {
address_to_ip_map.insert(connection_addr.clone(), ip);
}
Some(connection_addr)
} else {
None
}
})
.collect();
if nodes.is_empty() {
continue;
}
slots_count += end - start + 1;
let mut replicas = nodes.split_off(1);
replicas.sort_unstable();
slots.push(Slot::new(start, end, nodes.pop().unwrap(), replicas));
}
}
if slots.is_empty() {
return Err(Error::from((
ErrorKind::ResponseError,
"Error parsing slots: No healthy node found",
format!("Raw slot map response: {raw_slot_resp:?}"),
)));
}
Ok(ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
})
}
fn calculate_hash<T: Hash>(t: &T) -> u64 {
struct FnvHasher(u64);
impl Hasher for FnvHasher {
fn finish(&self) -> u64 {
self.0
}
fn write(&mut self, bytes: &[u8]) {
for &byte in bytes {
self.0 ^= byte as u64;
self.0 = self.0.wrapping_mul(0x100000001b3);
}
}
}
let mut hasher = FnvHasher(0xcbf29ce484222325); t.hash(&mut hasher);
hasher.finish()
}
pub(crate) fn calculate_topology<'a>(
topology_views: impl Iterator<Item = (&'a str, &'a Value)>,
curr_retry: usize,
tls_mode: Option<TlsMode>,
num_of_queried_nodes: usize,
read_from_replica: ReadFromReplicaStrategy,
) -> Result<(SlotMap, TopologyHash)> {
let mut hash_view_map = HashMap::new();
for (host, view) in topology_views {
if let Ok(ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
}) = parse_and_count_slots(view, tls_mode, host)
{
let hash_value = calculate_hash(&(slots_count, &slots));
let topology_entry = hash_view_map.entry(hash_value).or_insert(TopologyView {
hash_value,
nodes_count: 0,
slots_and_count: (slots_count, slots),
address_to_ip_map,
});
topology_entry.nodes_count += 1;
}
}
let mut non_unique_max_node_count = false;
let mut vec_iter = hash_view_map.into_values();
let mut most_frequent_topology = match vec_iter.next() {
Some(view) => view,
None => {
return Err(Error::from((
ErrorKind::ResponseError,
"No topology views found",
)));
}
};
for curr_view in vec_iter {
match most_frequent_topology
.nodes_count
.cmp(&curr_view.nodes_count)
{
std::cmp::Ordering::Less => {
most_frequent_topology = curr_view;
non_unique_max_node_count = false;
}
std::cmp::Ordering::Greater => continue,
std::cmp::Ordering::Equal => {
non_unique_max_node_count = true;
let seen_slot_count = most_frequent_topology.slots_and_count.0;
if let std::cmp::Ordering::Less = seen_slot_count.cmp(&curr_view.slots_and_count.0)
{
most_frequent_topology = curr_view;
}
}
}
}
let parse_and_built_result = |most_frequent_topology: TopologyView| {
info!(
"calculate_topology found topology map:\n{:?}",
most_frequent_topology
);
let slots_data = most_frequent_topology.slots_and_count.1;
Ok((
SlotMap::new(
slots_data,
most_frequent_topology.address_to_ip_map,
read_from_replica,
),
most_frequent_topology.hash_value,
))
};
if non_unique_max_node_count {
if curr_retry >= DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES || num_of_queried_nodes < 3 {
return parse_and_built_result(most_frequent_topology);
}
return Err(Error::from((
ErrorKind::ResponseError,
"Slot refresh error: Failed to obtain a majority in topology views",
)));
}
let agreement_rate = most_frequent_topology.nodes_count as f32 / num_of_queried_nodes as f32;
const MIN_AGREEMENT_RATE_MAJORITY: f32 = 0.5;
const MIN_AGREEMENT_RATE_FALLBACK: f32 = 0.2;
let min_rate = if curr_retry >= DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES {
MIN_AGREEMENT_RATE_FALLBACK
} else {
MIN_AGREEMENT_RATE_MAJORITY
};
if agreement_rate >= min_rate {
parse_and_built_result(most_frequent_topology)
} else {
Err(Error::from((
ErrorKind::ResponseError,
"Slot refresh error: The accuracy of the topology view is too low",
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::routing::ShardAddrs;
#[test]
fn test_get_hashtag() {
assert_eq!(get_hashtag(&b"foo{bar}baz"[..]), Some(&b"bar"[..]));
assert_eq!(get_hashtag(&b"foo{}{baz}"[..]), None);
assert_eq!(get_hashtag(&b"foo{{bar}}zap"[..]), Some(&b"{bar"[..]));
}
fn slot_value_with_replicas(start: u16, end: u16, nodes: Vec<(&str, u16)>) -> Value {
let mut node_values: Vec<Result<Value>> = nodes
.iter()
.map(|(host, port)| {
Ok(Value::Array(vec![
Ok(Value::BulkString(host.as_bytes().to_vec().into())),
Ok(Value::Int(*port as i64)),
]))
})
.collect();
let mut slot_vec = vec![Ok(Value::Int(start as i64)), Ok(Value::Int(end as i64))];
slot_vec.append(&mut node_values);
Value::Array(slot_vec)
}
#[derive(Clone, Copy)]
enum MetadataFormat {
Array,
Map,
}
type NodeWithMetadata<'a> = (&'a str, u16, Option<Vec<(&'a str, &'a str)>>);
fn slot_value_with_metadata(
start: u16,
end: u16,
nodes: Vec<NodeWithMetadata<'_>>,
format: MetadataFormat,
) -> Value {
let node_values: Vec<Result<Value>> = nodes
.iter()
.map(|(host, port, metadata)| {
let mut node_vec: Vec<Result<Value>> = vec![
Ok(Value::BulkString(host.as_bytes().to_vec().into())),
Ok(Value::Int(*port as i64)),
Ok(Value::BulkString(b"node-id-placeholder".to_vec().into())), ];
if let Some(meta) = metadata {
let metadata_value = match format {
MetadataFormat::Array => {
let meta_values: Vec<Result<Value>> = meta
.iter()
.flat_map(|(k, v)| {
vec![
Ok(Value::BulkString(k.as_bytes().to_vec().into())),
Ok(Value::BulkString(v.as_bytes().to_vec().into())),
]
})
.collect();
Value::Array(meta_values)
}
MetadataFormat::Map => {
let meta_pairs: Vec<(Value, Value)> = meta
.iter()
.map(|(k, v)| {
(
Value::BulkString(k.as_bytes().to_vec().into()),
Value::BulkString(v.as_bytes().to_vec().into()),
)
})
.collect();
Value::Map(meta_pairs)
}
};
node_vec.push(Ok(metadata_value));
}
Ok(Value::Array(node_vec))
})
.collect();
let mut slot_vec: Vec<Result<Value>> = vec![Ok(Value::Int(start as i64)), Ok(Value::Int(end as i64))];
slot_vec.extend(node_values);
Value::Array(slot_vec)
}
fn slot_value(start: u16, end: u16, node: &str, port: u16) -> Value {
slot_value_with_replicas(start, end, vec![(node, port)])
}
fn run_with_both_formats<F>(test_fn: F)
where
F: Fn(MetadataFormat),
{
test_fn(MetadataFormat::Array);
test_fn(MetadataFormat::Map);
}
#[test]
fn parse_slots_with_different_replicas_order_returns_the_same_view() {
let view1 = Value::Array(vec![
Ok(slot_value_with_replicas(
0,
4000,
vec![
("primary1", 6379),
("replica1_1", 6379),
("replica1_2", 6379),
("replica1_3", 6379),
],
)),
Ok(slot_value_with_replicas(
4001,
8000,
vec![
("primary2", 6379),
("replica2_1", 6379),
("replica2_2", 6379),
("replica2_3", 6379),
],
)),
Ok(slot_value_with_replicas(
8001,
16383,
vec![
("primary3", 6379),
("replica3_1", 6379),
("replica3_2", 6379),
("replica3_3", 6379),
],
)),
]);
let view2 = Value::Array(vec![
Ok(slot_value_with_replicas(
0,
4000,
vec![
("primary1", 6379),
("replica1_1", 6379),
("replica1_3", 6379),
("replica1_2", 6379),
],
)),
Ok(slot_value_with_replicas(
4001,
8000,
vec![
("primary2", 6379),
("replica2_2", 6379),
("replica2_3", 6379),
("replica2_1", 6379),
],
)),
Ok(slot_value_with_replicas(
8001,
16383,
vec![
("primary3", 6379),
("replica3_3", 6379),
("replica3_1", 6379),
("replica3_2", 6379),
],
)),
]);
let res1 = parse_and_count_slots(&view1, None, "foo").unwrap();
let res2 = parse_and_count_slots(&view2, None, "foo").unwrap();
assert_eq!(
calculate_hash(&(res1.slots_count, &res1.slots)),
calculate_hash(&(res2.slots_count, &res2.slots))
);
assert_eq!(res1.slots_count, res2.slots_count);
assert_eq!(res1.slots.len(), res2.slots.len());
let check = res1
.slots
.into_iter()
.zip(res2.slots)
.all(|(first, second)| first.replicas() == second.replicas());
assert!(check);
}
#[test]
fn parse_slots_returns_slots_with_host_name_if_missing() {
let view = Value::Array(vec![Ok(slot_value(0, 4000, "", 6379))]);
let ParsedSlotsResult {
slots_count, slots, ..
} = parse_and_count_slots(&view, None, "node").unwrap();
assert_eq!(slots_count, 4001);
assert_eq!(slots[0].master(), "node:6379");
}
#[test]
fn should_parse_and_hash_regardless_of_missing_host_name_and_replicas_order() {
let view1 = Value::Array(vec![
Ok(slot_value(0, 4000, "", 6379)),
Ok(slot_value(4001, 8000, "node2", 6380)),
Ok(slot_value_with_replicas(
8001,
16383,
vec![
("node3", 6379),
("replica3_1", 6379),
("replica3_2", 6379),
("replica3_3", 6379),
],
)),
]);
let view2 = Value::Array(vec![
Ok(slot_value(0, 4000, "node1", 6379)),
Ok(slot_value(4001, 8000, "node2", 6380)),
Ok(slot_value_with_replicas(
8001,
16383,
vec![
("", 6379),
("replica3_3", 6379),
("replica3_2", 6379),
("replica3_1", 6379),
],
)),
]);
let res1 = parse_and_count_slots(&view1, None, "node1").unwrap();
let res2 = parse_and_count_slots(&view2, None, "node3").unwrap();
assert_eq!(
calculate_hash(&(res1.slots_count, &res1.slots)),
calculate_hash(&(res2.slots_count, &res2.slots))
);
assert_eq!(res1.slots_count, res2.slots_count);
assert_eq!(res1.slots.len(), res2.slots.len());
let equality_check = res1
.slots
.iter()
.zip(&res2.slots)
.all(|(first, second)| first.start == second.start && first.end == second.end);
assert!(equality_check);
let replicas_check = res1
.slots
.iter()
.zip(&res2.slots)
.all(|(first, second)| first.replicas() == second.replicas());
assert!(replicas_check);
}
#[test]
fn parse_slots_hostname_primary_format_extracts_ip_from_metadata() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![
(
"valkey-node-1.example.com",
6379,
Some(vec![("ip", "172.31.24.34")]),
),
(
"valkey-node-2.example.com",
6379,
Some(vec![("ip", "172.31.24.35")]),
),
],
format,
))]);
let ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots_count, 16384);
assert_eq!(slots.len(), 1);
assert_eq!(slots[0].master(), "valkey-node-1.example.com:6379");
assert_eq!(
slots[0].replicas(),
vec!["valkey-node-2.example.com:6379".to_string()]
);
assert_eq!(address_to_ip_map.len(), 2);
assert_eq!(
address_to_ip_map.get("valkey-node-1.example.com:6379"),
Some(&"172.31.24.34".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("valkey-node-2.example.com:6379"),
Some(&"172.31.24.35".parse().unwrap())
);
});
}
#[test]
fn parse_slots_ip_primary_format_extracts_hostname_from_metadata() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![
(
"127.0.0.1",
30001,
Some(vec![("hostname", "host-1.valkey.example.com")]),
),
(
"127.0.0.2",
30002,
Some(vec![("hostname", "host-2.valkey.example.com")]),
),
],
format,
))]);
let ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots_count, 16384);
assert_eq!(slots.len(), 1);
assert_eq!(slots[0].master(), "host-1.valkey.example.com:30001");
assert_eq!(
slots[0].replicas(),
vec!["host-2.valkey.example.com:30002".to_string()]
);
assert_eq!(address_to_ip_map.len(), 2);
assert_eq!(
address_to_ip_map.get("host-1.valkey.example.com:30001"),
Some(&"127.0.0.1".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("host-2.valkey.example.com:30002"),
Some(&"127.0.0.2".parse().unwrap())
);
});
}
#[test]
fn parse_slots_valkey_format_without_hostname_uses_ip_as_address() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![
("192.168.1.1", 6379, Some(vec![("somekey", "somevalue")])),
("192.168.1.2", 6379, None),
],
format,
))]);
let ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots_count, 16384);
assert_eq!(slots.len(), 1);
assert_eq!(slots[0].master(), "192.168.1.1:6379");
assert_eq!(slots[0].replicas(), vec!["192.168.1.2:6379".to_string()]);
assert_eq!(address_to_ip_map.len(), 2);
assert_eq!(
address_to_ip_map.get("192.168.1.1:6379"),
Some(&"192.168.1.1".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("192.168.1.2:6379"),
Some(&"192.168.1.2".parse().unwrap())
);
});
}
#[test]
fn parse_slots_no_metadata_no_ip_mapping() {
let view = Value::Array(vec![Ok(slot_value_with_replicas(
0,
16383,
vec![("node1", 6379), ("replica1", 6379)],
))]);
let ParsedSlotsResult {
slots,
address_to_ip_map,
..
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots[0].master(), "node1:6379");
assert!(address_to_ip_map.is_empty());
}
#[test]
fn parse_slots_mixed_nodes_with_and_without_ip() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![
("primary.example.com", 6379, Some(vec![("ip", "10.0.0.1")])),
("replica.example.com", 6379, None),
],
format,
))]);
let ParsedSlotsResult {
address_to_ip_map, ..
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(address_to_ip_map.len(), 1);
assert_eq!(
address_to_ip_map.get("primary.example.com:6379"),
Some(&"10.0.0.1".parse().unwrap())
);
assert!(!address_to_ip_map.contains_key("replica.example.com:6379"));
});
}
#[test]
fn parse_slots_invalid_ip_in_metadata_ignored() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![("node1.example.com", 6379, Some(vec![("ip", "not-an-ip")]))],
format,
))]);
let ParsedSlotsResult {
slots,
address_to_ip_map,
..
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots[0].master(), "node1.example.com:6379");
assert!(address_to_ip_map.is_empty());
});
}
#[test]
fn parse_slots_multiple_slot_ranges_with_ip_mapping() {
run_with_both_formats(|format| {
let view = Value::Array(vec![
Ok(slot_value_with_metadata(
0,
5461,
vec![
(
"shard1-primary.example.com",
6379,
Some(vec![("ip", "10.0.1.1")]),
),
(
"shard1-replica.example.com",
6379,
Some(vec![("ip", "10.0.1.2")]),
),
],
format,
)),
Ok(slot_value_with_metadata(
5462,
10922,
vec![(
"shard2-primary.example.com",
6379,
Some(vec![("ip", "10.0.2.1")]),
)],
format,
)),
Ok(slot_value_with_metadata(
10923,
16383,
vec![(
"shard3-primary.example.com",
6379,
Some(vec![("ip", "10.0.3.1")]),
)],
format,
)),
]);
let ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots_count, 16384);
assert_eq!(slots.len(), 3);
assert_eq!(address_to_ip_map.len(), 4);
assert_eq!(
address_to_ip_map.get("shard1-primary.example.com:6379"),
Some(&"10.0.1.1".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("shard2-primary.example.com:6379"),
Some(&"10.0.2.1".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("shard1-replica.example.com:6379"),
Some(&"10.0.1.2".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("shard3-primary.example.com:6379"),
Some(&"10.0.3.1".parse().unwrap())
);
});
}
#[test]
fn parse_slots_ipv6_address_in_metadata() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![("node1.example.com", 6379, Some(vec![("ip", "2001:db8::1")]))],
format,
))]);
let ParsedSlotsResult {
address_to_ip_map, ..
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(address_to_ip_map.len(), 1);
assert_eq!(
address_to_ip_map.get("node1.example.com:6379"),
Some(&"2001:db8::1".parse().unwrap())
);
});
}
#[test]
fn parse_slots_empty_hostname_in_metadata_falls_back_to_ip() {
run_with_both_formats(|format| {
let view = Value::Array(vec![Ok(slot_value_with_metadata(
0,
16383,
vec![
("172.20.43.71", 6379, Some(vec![("hostname", "")])),
("172.20.78.117", 6379, Some(vec![("hostname", "")])),
],
format,
))]);
let ParsedSlotsResult {
slots_count,
slots,
address_to_ip_map,
} = parse_and_count_slots(&view, None, "fallback").unwrap();
assert_eq!(slots_count, 16384);
assert_eq!(slots.len(), 1);
assert_eq!(slots[0].master(), "172.20.43.71:6379");
assert_eq!(slots[0].replicas(), vec!["172.20.78.117:6379".to_string()]);
assert_eq!(address_to_ip_map.len(), 2);
assert_eq!(
address_to_ip_map.get("172.20.43.71:6379"),
Some(&"172.20.43.71".parse().unwrap())
);
assert_eq!(
address_to_ip_map.get("172.20.78.117:6379"),
Some(&"172.20.78.117".parse().unwrap())
);
});
}
enum ViewType {
SingleNodeViewFullCoverage,
SingleNodeViewMissingSlots,
TwoNodesViewFullCoverage,
TwoNodesViewMissingSlots,
}
fn get_view(view_type: &ViewType) -> (&str, Value) {
match view_type {
ViewType::SingleNodeViewFullCoverage => (
"first",
Value::Array(vec![Ok(slot_value(0, 16383, "node1", 6379))]),
),
ViewType::SingleNodeViewMissingSlots => (
"second",
Value::Array(vec![Ok(slot_value(0, 4000, "node1", 6379))]),
),
ViewType::TwoNodesViewFullCoverage => (
"third",
Value::Array(vec![
Ok(slot_value(0, 4000, "node1", 6379)),
Ok(slot_value(4001, 16383, "node2", 6380)),
]),
),
ViewType::TwoNodesViewMissingSlots => (
"fourth",
Value::Array(vec![
Ok(slot_value(0, 3000, "node3", 6381)),
Ok(slot_value(4001, 16383, "node4", 6382)),
]),
),
}
}
fn get_node_addr(name: &str, port: u16) -> Arc<ShardAddrs> {
Arc::new(ShardAddrs::new(format!("{name}:{port}").into(), Vec::new()))
}
fn collect_shard_addrs(slot_map: &SlotMap) -> Vec<Arc<ShardAddrs>> {
let mut shard_addrs: Vec<Arc<ShardAddrs>> = slot_map
.nodes_map()
.iter()
.map(|map_item| {
let shard_addrs = map_item.value().1.clone();
shard_addrs.clone()
})
.collect();
shard_addrs.sort_unstable();
shard_addrs
}
#[test]
fn test_topology_calculator_4_nodes_queried_has_a_majority_success() {
let queried_nodes: usize = 4;
let topology_results = [get_view(&ViewType::SingleNodeViewFullCoverage),
get_view(&ViewType::SingleNodeViewFullCoverage),
get_view(&ViewType::TwoNodesViewFullCoverage)];
let (topology_view, _) = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
.unwrap();
let res = collect_shard_addrs(&topology_view);
let node_1 = get_node_addr("node1", 6379);
let expected = vec![node_1];
assert_eq!(res, expected);
}
#[test]
fn test_topology_calculator_3_nodes_queried_no_majority_has_more_retries_raise_error() {
let queried_nodes = 3;
let topology_results = [get_view(&ViewType::SingleNodeViewFullCoverage),
get_view(&ViewType::TwoNodesViewFullCoverage),
get_view(&ViewType::TwoNodesViewMissingSlots)];
let topology_view = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
assert!(topology_view.is_err());
}
#[test]
fn test_topology_calculator_3_nodes_queried_no_majority_last_retry_success() {
let queried_nodes = 3;
let topology_results = [get_view(&ViewType::SingleNodeViewMissingSlots),
get_view(&ViewType::TwoNodesViewFullCoverage),
get_view(&ViewType::TwoNodesViewMissingSlots)];
let (topology_view, _) = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
3,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
.unwrap();
let res = collect_shard_addrs(&topology_view);
let node_1 = get_node_addr("node1", 6379);
let node_2 = get_node_addr("node2", 6380);
let expected = vec![node_1, node_2];
assert_eq!(res, expected);
}
#[test]
fn test_topology_calculator_2_nodes_queried_no_majority_return_full_slot_coverage_view() {
let queried_nodes = 2;
let topology_results = [
get_view(&ViewType::TwoNodesViewFullCoverage),
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
.unwrap();
let res = collect_shard_addrs(&topology_view);
let node_1 = get_node_addr("node1", 6379);
let node_2 = get_node_addr("node2", 6380);
let expected = vec![node_1, node_2];
assert_eq!(res, expected);
}
#[test]
fn test_topology_calculator_2_nodes_queried_no_majority_no_full_coverage_prefer_fuller_coverage()
{
let queried_nodes = 2;
let topology_results = [
get_view(&ViewType::SingleNodeViewMissingSlots),
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
.unwrap();
let res = collect_shard_addrs(&topology_view);
let node_1 = get_node_addr("node3", 6381);
let node_2 = get_node_addr("node4", 6382);
let expected = vec![node_1, node_2];
assert_eq!(res, expected);
}
#[test]
fn test_topology_calculator_3_nodes_queried_no_full_coverage_prefer_majority() {
let queried_nodes = 2;
let topology_results = [get_view(&ViewType::SingleNodeViewMissingSlots),
get_view(&ViewType::TwoNodesViewMissingSlots),
get_view(&ViewType::SingleNodeViewMissingSlots)];
let (topology_view, _) = calculate_topology(
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
.unwrap();
let res = collect_shard_addrs(&topology_view);
let node_1 = get_node_addr("node1", 6379);
let expected = vec![node_1];
assert_eq!(res, expected);
}
}