use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Display,
net::IpAddr,
sync::{Arc, atomic::AtomicUsize},
};
use dashmap::DashMap;
use crate::cluster::routing::{Route, ShardAddrs, Slot, SlotAddr};
use crate::value::ErrorKind;
use crate::value::Error;
use crate::value::Result;
pub(crate) type NodesMap = DashMap<Arc<String>, (Option<IpAddr>, Arc<ShardAddrs>)>;
#[derive(Debug)]
pub struct SlotMapValue {
pub start: u16,
pub addrs: Arc<ShardAddrs>,
pub last_used_replica: Arc<AtomicUsize>,
}
#[derive(Debug, Default, Clone, PartialEq)]
pub enum ReadFromReplicaStrategy {
#[default]
AlwaysFromPrimary,
RoundRobin,
AZAffinity(String),
AZAffinityReplicasAndPrimary(String),
}
#[derive(Debug, Default)]
pub struct SlotMap {
slots: BTreeMap<u16, SlotMapValue>,
nodes_map: NodesMap,
read_from_replica: ReadFromReplicaStrategy,
}
fn get_address_from_slot(
slot: &SlotMapValue,
read_from_replica: ReadFromReplicaStrategy,
slot_addr: SlotAddr,
) -> Result<Arc<String>> {
let addrs = &slot.addrs;
if slot_addr == SlotAddr::Master || addrs.replicas().is_empty() {
return Ok(addrs.primary());
}
match read_from_replica {
ReadFromReplicaStrategy::AlwaysFromPrimary => Ok(addrs.primary()),
ReadFromReplicaStrategy::RoundRobin => {
let index = slot
.last_used_replica
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
% addrs.replicas().len();
Ok(addrs.replicas()[index].clone())
}
ReadFromReplicaStrategy::AZAffinity(_) | ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_) => {
Err(Error::from((
ErrorKind::InvalidClientConfig,
"AZAffinity and AZAffinityReplicasAndPrimary are not supported in the sync client",
)))
}
}
}
impl SlotMap {
pub(crate) fn new_with_read_strategy(read_from_replica: ReadFromReplicaStrategy) -> Self {
SlotMap {
slots: BTreeMap::new(),
nodes_map: DashMap::new(),
read_from_replica,
}
}
pub(crate) fn new(
slots: Vec<Slot>,
ip_mappings: HashMap<String, IpAddr>,
read_from_replica: ReadFromReplicaStrategy,
) -> Self {
let mut slot_map = SlotMap::new_with_read_strategy(read_from_replica);
for slot in slots {
let primary_addr = Arc::new(slot.master.clone());
let primary_ip = ip_mappings.get(slot.master.as_str()).copied();
let shard_addrs_arc = slot_map
.nodes_map
.entry(primary_addr.clone())
.or_insert_with(|| {
let replicas: Vec<Arc<String>> =
slot.replicas.into_iter().map(Arc::new).collect();
(
primary_ip,
Arc::new(ShardAddrs::new(primary_addr, replicas)),
)
})
.1
.clone();
shard_addrs_arc.replicas().iter().for_each(|replica_addr| {
let replica_ip = ip_mappings.get(replica_addr.as_str()).copied();
slot_map
.nodes_map
.entry(replica_addr.clone())
.or_insert((replica_ip, shard_addrs_arc.clone()));
});
slot_map.slots.insert(
slot.end,
SlotMapValue {
addrs: shard_addrs_arc,
start: slot.start,
last_used_replica: Arc::new(AtomicUsize::new(0)),
},
);
}
slot_map
}
pub(crate) fn nodes_map(&self) -> &NodesMap {
&self.nodes_map
}
pub fn is_primary(&self, address: &String) -> bool {
self.nodes_map.get(address).is_some_and(|entry| {
let (_ip, shard_addrs) = entry.value();
*shard_addrs.primary() == *address
})
}
pub fn slot_value_for_route(&self, route: &Route) -> Option<&SlotMapValue> {
let slot = route.slot();
self.slots
.range(slot..)
.next()
.and_then(|(end, slot_value)| {
if slot <= *end && slot_value.start <= slot {
Some(slot_value)
} else {
None
}
})
}
pub fn slot_addr_for_route(&self, route: &Route) -> Option<Arc<String>> {
self.slot_value_for_route(route).and_then(|slot_value| {
get_address_from_slot(
slot_value,
self.read_from_replica.clone(),
route.slot_addr(),
)
.ok()
})
}
pub fn shard_addrs_for_slot(&self, slot: u16) -> Option<Arc<ShardAddrs>> {
self.slots
.range(slot..)
.next()
.map(|(_, slot_value)| slot_value.addrs.clone())
}
pub(crate) fn node_address_for_ip(&self, ip: IpAddr) -> Option<Arc<String>> {
self.nodes_map.iter().find_map(|entry| {
let (node_ip, _shard_addrs) = entry.value();
(*node_ip == Some(ip)).then(|| entry.key().clone())
})
}
pub fn addresses_for_all_primaries(&self) -> HashSet<Arc<String>> {
self.slots
.values()
.map(|slot_value| slot_value.addrs.primary().clone())
.collect()
}
pub fn all_node_addresses(&self) -> HashSet<Arc<String>> {
self.nodes_map
.iter()
.map(|map_item| {
let node_addr = map_item.key();
node_addr.clone()
})
.collect()
}
pub fn addresses_for_multi_slot<'a, 'b>(
&'a self,
routes: &'b [(Route, Vec<usize>)],
) -> impl Iterator<Item = Option<Arc<String>>> + 'a
where
'b: 'a,
{
routes
.iter()
.map(|(route, _)| self.slot_addr_for_route(route))
}
pub(crate) fn get_slots_of_node(&self, node_address: Arc<String>) -> Vec<u16> {
self.slots
.iter()
.filter_map(|(end, slot_value)| {
let addrs = &slot_value.addrs;
if addrs.primary() == node_address || addrs.replicas().contains(&node_address) {
Some(slot_value.start..(*end + 1))
} else {
None
}
})
.flatten()
.collect()
}
pub fn node_address_for_slot(&self, slot: u16, slot_addr: SlotAddr) -> Option<Arc<String>> {
self.slots.range(slot..).next().and_then(|(_, slot_value)| {
if slot_value.start <= slot {
get_address_from_slot(
slot_value,
self.read_from_replica.clone(),
slot_addr,
)
.ok()
} else {
None
}
})
}
fn insert_single_slot(
&mut self,
slot: u16,
shard_addrs: Arc<ShardAddrs>,
) -> Option<SlotMapValue> {
self.slots.insert(
slot,
SlotMapValue {
start: slot,
addrs: shard_addrs,
last_used_replica: Arc::new(AtomicUsize::new(0)),
},
)
}
pub(crate) fn add_new_primary(
&mut self,
slot: u16,
node_addr: Arc<String>,
ip_addr: Option<IpAddr>,
) -> Result<()> {
let shard_addrs = Arc::new(ShardAddrs::new_with_primary(node_addr.clone()));
self.nodes_map
.insert(node_addr, (ip_addr, shard_addrs.clone()));
self.update_slot_range(slot, shard_addrs)
}
fn shard_addrs_equal(shard1: &Arc<ShardAddrs>, shard2: &Arc<ShardAddrs>) -> bool {
Arc::ptr_eq(shard1, shard2)
}
fn update_end_range(&mut self, curr_end: u16, new_end: u16) -> Result<()> {
if let Some(curr_slot_val) = self.slots.remove(&curr_end) {
self.slots.insert(new_end, curr_slot_val);
return Ok(());
}
Err(Error::from((
ErrorKind::ClientError,
"Couldn't find slot range with end: {curr_end:?} in the slot map",
)))
}
fn try_merge_to_next_range(&mut self, slot: u16, new_addrs: Arc<ShardAddrs>) -> bool {
if let Some((_next_end, next_slot_value)) = self.slots.range_mut((slot + 1)..).next()
&& next_slot_value.start == slot + 1
&& Self::shard_addrs_equal(&next_slot_value.addrs, &new_addrs)
{
next_slot_value.start = slot;
return true;
}
false
}
fn try_merge_to_prev_range(
&mut self,
slot: u16,
new_addrs: Arc<ShardAddrs>,
) -> Result<bool> {
if let Some((prev_end, prev_slot_value)) = self.slots.range_mut(..slot).next_back()
&& *prev_end == slot - 1
&& Self::shard_addrs_equal(&prev_slot_value.addrs, &new_addrs)
{
let prev_end = *prev_end;
self.update_end_range(prev_end, slot)?;
return Ok(true);
}
Ok(false)
}
pub(crate) fn update_slot_range(
&mut self,
slot: u16,
new_addrs: Arc<ShardAddrs>,
) -> Result<()> {
let curr_tree_node =
self.slots
.range_mut(slot..)
.next()
.and_then(|(&end, slot_map_value)| {
if slot >= slot_map_value.start && slot <= end {
Some((end, slot_map_value))
} else {
None
}
});
if let Some((curr_end, curr_slot_val)) = curr_tree_node {
if Self::shard_addrs_equal(&curr_slot_val.addrs, &new_addrs) {
return Ok(());
}
else if curr_slot_val.start == curr_end && curr_slot_val.start == slot {
curr_slot_val.addrs = new_addrs;
} else if slot == curr_end {
if self.try_merge_to_next_range(slot, new_addrs.clone()) {
self.update_end_range(curr_end, curr_end - 1)?;
} else {
let curr_slot_val = self.insert_single_slot(curr_end, new_addrs);
if let Some(curr_slot_val) = curr_slot_val {
self.slots.insert(curr_end - 1, curr_slot_val);
}
}
} else if slot == curr_slot_val.start {
curr_slot_val.start += 1;
if !self.try_merge_to_prev_range(slot, new_addrs.clone())? {
self.insert_single_slot(slot, new_addrs);
}
} else if slot > curr_slot_val.start && slot < curr_end {
let start: u16 = curr_slot_val.start;
let addrs = curr_slot_val.addrs.clone();
let last_used_replica = curr_slot_val.last_used_replica.clone();
curr_slot_val.start = slot + 1;
self.slots.insert(
slot - 1,
SlotMapValue {
start,
addrs,
last_used_replica,
},
);
self.insert_single_slot(slot, new_addrs);
}
} else {
if !self.try_merge_to_prev_range(slot, new_addrs.clone())?
&& !self.try_merge_to_next_range(slot, new_addrs.clone())
{
self.insert_single_slot(slot, new_addrs);
}
}
Ok(())
}
}
impl Display for SlotMap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Strategy: {:?}", self.read_from_replica)?;
writeln!(f, "\nSlot mapping:")?;
for (end, slot_map_value) in self.slots.iter() {
let addrs = &slot_map_value.addrs;
writeln!(
f,
" ({}-{}): primary: {}, replicas: {:?}",
slot_map_value.start,
end,
addrs.primary(),
addrs.replicas()
)?;
}
writeln!(f, "\nNode IP mappings:")?;
let mut nodes: Vec<_> = self.nodes_map.iter().collect();
nodes.sort_by(|a, b| a.key().cmp(b.key()));
for entry in nodes {
let node_addr = entry.key();
let (ip_opt, _shard_addrs) = entry.value();
match ip_opt {
Some(ip) => writeln!(f, " {} -> {}", node_addr, ip)?,
None => writeln!(f, " {} -> (no IP)", node_addr)?,
}
}
Ok(())
}
}
#[cfg(test)]
mod tests_cluster_slotmap {
use super::*;
fn process_expected(expected: Vec<&str>) -> HashSet<Arc<String>> {
<HashSet<&str> as IntoIterator>::into_iter(HashSet::from_iter(expected))
.map(|s| Arc::new(s.to_string()))
.collect()
}
fn process_expected_with_option(expected: Vec<Option<&str>>) -> Vec<Arc<String>> {
expected
.into_iter()
.filter_map(|opt| opt.map(|s| Arc::new(s.to_string())))
.collect()
}
#[test]
fn test_slot_map_retrieve_routes() {
let slot_map = SlotMap::new(
vec![
Slot::new(
1,
1000,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
),
Slot::new(
1002,
2000,
"node2:6379".to_owned(),
vec!["replica2:6379".to_owned()],
),
],
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
assert!(
slot_map
.slot_addr_for_route(&Route::new(0, SlotAddr::Master))
.is_none()
);
assert_eq!(
"node1:6379",
*slot_map
.slot_addr_for_route(&Route::new(1, SlotAddr::Master))
.unwrap()
);
assert_eq!(
"node1:6379",
*slot_map
.slot_addr_for_route(&Route::new(500, SlotAddr::Master))
.unwrap()
);
assert_eq!(
"node1:6379",
*slot_map
.slot_addr_for_route(&Route::new(1000, SlotAddr::Master))
.unwrap()
);
assert!(
slot_map
.slot_addr_for_route(&Route::new(1001, SlotAddr::Master))
.is_none()
);
assert_eq!(
"node2:6379",
*slot_map
.slot_addr_for_route(&Route::new(1002, SlotAddr::Master))
.unwrap()
);
assert_eq!(
"node2:6379",
*slot_map
.slot_addr_for_route(&Route::new(1500, SlotAddr::Master))
.unwrap()
);
assert_eq!(
"node2:6379",
*slot_map
.slot_addr_for_route(&Route::new(2000, SlotAddr::Master))
.unwrap()
);
assert!(
slot_map
.slot_addr_for_route(&Route::new(2001, SlotAddr::Master))
.is_none()
);
}
fn get_slot_map(read_from_replica: ReadFromReplicaStrategy) -> SlotMap {
SlotMap::new(
vec![
Slot::new(
1,
1000,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
),
Slot::new(
1002,
2000,
"node2:6379".to_owned(),
vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
),
Slot::new(
2001,
3000,
"node3:6379".to_owned(),
vec![
"replica4:6379".to_owned(),
"replica5:6379".to_owned(),
"replica6:6379".to_owned(),
],
),
Slot::new(
3001,
4000,
"node2:6379".to_owned(),
vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
),
],
HashMap::new(),
read_from_replica,
)
}
fn get_slot_map_with_ip_mappings() -> SlotMap {
let mut ip_mappings = HashMap::new();
ip_mappings.insert("node1:6379".to_string(), "10.0.0.1".parse().unwrap());
ip_mappings.insert("replica1:6379".to_string(), "10.0.0.2".parse().unwrap());
ip_mappings.insert("node2:6379".to_string(), "10.0.0.3".parse().unwrap());
ip_mappings.insert("replica2:6379".to_string(), "10.0.0.4".parse().unwrap());
SlotMap::new(
vec![
Slot::new(
0,
5461,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
),
Slot::new(
5462,
10922,
"node2:6379".to_owned(),
vec!["replica2:6379".to_owned()],
),
Slot::new(
10923,
16383,
"node3:6379".to_owned(),
vec!["replica3:6379".to_owned()],
),
],
ip_mappings,
ReadFromReplicaStrategy::AlwaysFromPrimary,
)
}
#[test]
fn test_slot_map_get_all_primaries() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary);
let addresses = slot_map.addresses_for_all_primaries();
assert_eq!(
addresses,
process_expected(vec!["node1:6379", "node2:6379", "node3:6379"])
);
}
#[test]
fn test_slot_map_get_all_nodes() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary);
let addresses = slot_map.all_node_addresses();
assert_eq!(
addresses,
process_expected(vec![
"node1:6379",
"node2:6379",
"node3:6379",
"replica1:6379",
"replica2:6379",
"replica3:6379",
"replica4:6379",
"replica5:6379",
"replica6:6379"
])
);
}
#[test]
fn test_slot_map_get_multi_node() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin);
let routes = vec![
(Route::new(1, SlotAddr::Master), vec![]),
(Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
];
let addresses = slot_map
.addresses_for_multi_slot(&routes)
.collect::<Vec<_>>();
assert!(addresses.contains(&Some(Arc::new("node1:6379".to_string()))));
assert!(
addresses.contains(&Some(Arc::new("replica4:6379".to_string())))
|| addresses.contains(&Some(Arc::new("replica5:6379".to_string())))
|| addresses.contains(&Some(Arc::new("replica6:6379".to_string())))
);
}
#[test]
fn test_slot_map_get_repeating_addresses_when_the_same_node_is_found_in_multi_slot() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin);
let routes = vec![
(Route::new(1, SlotAddr::ReplicaOptional), vec![]),
(Route::new(2001, SlotAddr::Master), vec![]),
(Route::new(2, SlotAddr::ReplicaOptional), vec![]),
(Route::new(2002, SlotAddr::Master), vec![]),
(Route::new(3, SlotAddr::ReplicaOptional), vec![]),
(Route::new(2003, SlotAddr::Master), vec![]),
];
let addresses: Vec<Arc<String>> = slot_map
.addresses_for_multi_slot(&routes)
.flatten()
.collect();
assert_eq!(
addresses,
process_expected_with_option(vec![
Some("replica1:6379"),
Some("node3:6379"),
Some("replica1:6379"),
Some("node3:6379"),
Some("replica1:6379"),
Some("node3:6379")
])
);
}
#[test]
fn test_slot_map_get_none_when_slot_is_missing_from_multi_slot() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin);
let routes = vec![
(Route::new(1, SlotAddr::ReplicaOptional), vec![]),
(Route::new(5000, SlotAddr::Master), vec![]),
(Route::new(6000, SlotAddr::ReplicaOptional), vec![]),
(Route::new(2002, SlotAddr::Master), vec![]),
];
let addresses: Vec<Arc<String>> = slot_map
.addresses_for_multi_slot(&routes)
.flatten()
.collect();
assert_eq!(
addresses,
process_expected_with_option(vec![
Some("replica1:6379"),
None,
None,
Some("node3:6379")
])
);
}
#[test]
fn test_slot_map_rotate_read_replicas() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin);
let route = Route::new(2001, SlotAddr::ReplicaOptional);
let mut addresses = vec![
slot_map.slot_addr_for_route(&route).unwrap(),
slot_map.slot_addr_for_route(&route).unwrap(),
slot_map.slot_addr_for_route(&route).unwrap(),
];
addresses.sort();
assert_eq!(
addresses,
vec!["replica4:6379", "replica5:6379", "replica6:6379"]
.into_iter()
.map(|s| Arc::new(s.to_string()))
.collect::<Vec<_>>()
);
}
#[test]
fn test_get_slots_of_node() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("node1:6379".to_string())),
(1..1001).collect::<Vec<u16>>()
);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("node2:6379".to_string())),
vec![1002..2001, 3001..4001]
.into_iter()
.flatten()
.collect::<Vec<u16>>()
);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("replica3:6379".to_string())),
vec![1002..2001, 3001..4001]
.into_iter()
.flatten()
.collect::<Vec<u16>>()
);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("replica4:6379".to_string())),
(2001..3001).collect::<Vec<u16>>()
);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("replica5:6379".to_string())),
(2001..3001).collect::<Vec<u16>>()
);
assert_eq!(
slot_map.get_slots_of_node(Arc::new("replica6:6379".to_string())),
(2001..3001).collect::<Vec<u16>>()
);
}
fn create_slot(start: u16, end: u16, master: &str, replicas: Vec<&str>) -> Slot {
Slot::new(
start,
end,
master.to_owned(),
replicas.into_iter().map(|r| r.to_owned()).collect(),
)
}
fn assert_equal_slot_maps(this: SlotMap, expected: Vec<Slot>) {
for ((end, slot_value), expected_slot) in this.slots.iter().zip(expected.iter()) {
assert_eq!(*end, expected_slot.end);
assert_eq!(slot_value.start, expected_slot.start);
let shard_addrs = &slot_value.addrs;
assert_eq!(*shard_addrs.primary(), expected_slot.master);
let _ = shard_addrs
.replicas()
.iter()
.zip(expected_slot.replicas.iter())
.map(|(curr, expected)| {
assert_eq!(**curr, *expected);
});
}
}
fn assert_slot_map_and_shard_addrs(
slot_map: SlotMap,
slot: u16,
new_shard_addrs: Arc<ShardAddrs>,
expected_slots: Vec<Slot>,
) {
assert!(SlotMap::shard_addrs_equal(
&slot_map.shard_addrs_for_slot(slot).unwrap(),
&new_shard_addrs
));
assert_equal_slot_maps(slot_map, expected_slots);
}
#[test]
fn test_update_slot_range_single_slot_range() {
let test_slot = 8000;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 8000, "node1:6379", vec!["replica1:6379"]),
create_slot(8001, 16383, "node3:6379", vec!["replica3:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8001)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]),
create_slot(test_slot + 1, 16383, "node3:6379", vec!["replica3:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_matches_end_range_merge_ranges() {
let test_slot = 7999;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_matches_end_range_cant_merge_ranges() {
let test_slot = 7999;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = Arc::new(ShardAddrs::new(
Arc::new("node3:6379".to_owned()),
vec![Arc::new("replica3:6379".to_owned())],
));
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]),
create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_matches_start_range_merge_ranges() {
let test_slot = 8000;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(7999)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_matches_start_range_cant_merge_ranges() {
let test_slot = 8000;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = Arc::new(ShardAddrs::new(
Arc::new("node3:6379".to_owned()),
vec![Arc::new("replica3:6379".to_owned())],
));
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]),
create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_is_within_a_range() {
let test_slot = 4000;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]),
create_slot(test_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_is_not_covered_cant_merge_ranges() {
let test_slot = 7998;
let before_slots = vec![
create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_is_not_covered_merge_with_next() {
let test_slot = 7999;
let before_slots = vec![
create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]),
create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_slot_is_not_covered_merge_with_prev() {
let test_slot = 7001;
let before_slots = vec![
create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots,
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(7000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_same_shard_owner_no_change_needed() {
let test_slot = 7000;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots.clone(),
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(7000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = before_slots;
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_max_slot_matches_end_range() {
let max_slot = 16383;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots.clone(),
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(7000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, max_slot - 1, "node2:6379", vec!["replica2:6379"]),
create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_max_slot_single_slot_range() {
let max_slot = 16383;
let before_slots = vec![
create_slot(0, 16382, "node1:6379", vec!["replica1:6379"]),
create_slot(16383, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots.clone(),
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(0)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(0, max_slot - 1, "node1:6379", vec!["replica1:6379"]),
create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_min_slot_matches_start_range() {
let min_slot = 0;
let before_slots = vec![
create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots.clone(),
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(8000)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]),
create_slot(min_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]),
create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_update_slot_range_min_slot_single_slot_range() {
let min_slot = 0;
let before_slots = vec![
create_slot(0, 0, "node1:6379", vec!["replica1:6379"]),
create_slot(1, 16383, "node2:6379", vec!["replica2:6379"]),
];
let mut slot_map = SlotMap::new(
before_slots.clone(),
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let new_shard_addrs = slot_map
.shard_addrs_for_slot(1)
.expect("Couldn't find shard address for slot");
let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone());
assert!(res.is_ok(), "{res:?}");
let after_slots = vec![
create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]),
create_slot(min_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]),
];
assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots);
}
#[test]
fn test_slot_map_stores_ip_mappings_in_nodes_map() {
let slot_map = get_slot_map_with_ip_mappings();
let node1_key = Arc::new("node1:6379".to_string());
let node1_entry = slot_map.nodes_map.get(&node1_key).unwrap();
assert_eq!(node1_entry.value().0, Some("10.0.0.1".parse().unwrap()));
let replica1_key = Arc::new("replica1:6379".to_string());
let replica1_entry = slot_map.nodes_map.get(&replica1_key).unwrap();
assert_eq!(replica1_entry.value().0, Some("10.0.0.2".parse().unwrap()));
let node2_key = Arc::new("node2:6379".to_string());
let node2_entry = slot_map.nodes_map.get(&node2_key).unwrap();
assert_eq!(node2_entry.value().0, Some("10.0.0.3".parse().unwrap()));
let replica2_key = Arc::new("replica2:6379".to_string());
let replica2_entry = slot_map.nodes_map.get(&replica2_key).unwrap();
assert_eq!(replica2_entry.value().0, Some("10.0.0.4".parse().unwrap()));
let node3_key = Arc::new("node3:6379".to_string());
let node3_entry = slot_map.nodes_map.get(&node3_key).unwrap();
assert_eq!(node3_entry.value().0, None);
let replica3_key = Arc::new("replica3:6379".to_string());
let replica3_entry = slot_map.nodes_map.get(&replica3_key).unwrap();
assert_eq!(replica3_entry.value().0, None);
}
#[test]
fn test_node_address_for_ip() {
let slot_map = get_slot_map_with_ip_mappings();
let result = slot_map.node_address_for_ip("10.0.0.1".parse().unwrap());
assert_eq!(result, Some(Arc::new("node1:6379".to_string())));
let result = slot_map.node_address_for_ip("10.0.0.3".parse().unwrap());
assert_eq!(result, Some(Arc::new("node2:6379".to_string())));
let result = slot_map.node_address_for_ip("10.0.0.2".parse().unwrap());
assert_eq!(result, Some(Arc::new("replica1:6379".to_string())));
let result = slot_map.node_address_for_ip("10.0.0.4".parse().unwrap());
assert_eq!(result, Some(Arc::new("replica2:6379".to_string())));
}
#[test]
fn test_node_address_for_ip_returns_none_for_unknown_ip() {
let slot_map = get_slot_map_with_ip_mappings();
let result = slot_map.node_address_for_ip("192.168.1.1".parse().unwrap());
assert!(result.is_none());
}
#[test]
fn test_node_address_for_ip_with_ipv6() {
let mut ip_mappings = HashMap::new();
ip_mappings.insert("node1:6379".to_string(), "2001:db8::1".parse().unwrap());
ip_mappings.insert("replica1:6379".to_string(), "2001:db8::2".parse().unwrap());
let slot_map = SlotMap::new(
vec![Slot::new(
0,
16383,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
)],
ip_mappings,
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let result = slot_map.node_address_for_ip("2001:db8::1".parse().unwrap());
assert_eq!(result, Some(Arc::new("node1:6379".to_string())));
let result = slot_map.node_address_for_ip("2001:db8::2".parse().unwrap());
assert_eq!(result, Some(Arc::new("replica1:6379".to_string())));
let result = slot_map.node_address_for_ip("2001:db8::99".parse().unwrap());
assert!(result.is_none());
}
#[test]
fn test_slot_map_new_with_empty_ip_mappings() {
let slot_map = SlotMap::new(
vec![
Slot::new(
0,
8191,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
),
Slot::new(
8192,
16383,
"node2:6379".to_owned(),
vec!["replica2:6379".to_owned()],
),
],
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
for entry in slot_map.nodes_map.iter() {
assert_eq!(entry.value().0, None);
}
assert!(
slot_map
.node_address_for_ip("10.0.0.1".parse().unwrap())
.is_none()
);
}
#[test]
fn test_slot_map_partial_ip_mappings() {
let mut ip_mappings = HashMap::new();
ip_mappings.insert("node1:6379".to_string(), "10.0.0.1".parse().unwrap());
let slot_map = SlotMap::new(
vec![Slot::new(
0,
16383,
"node1:6379".to_owned(),
vec!["replica1:6379".to_owned()],
)],
ip_mappings,
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let node1_entry = slot_map
.nodes_map
.get(&Arc::new("node1:6379".to_string()))
.unwrap();
assert_eq!(node1_entry.value().0, Some("10.0.0.1".parse().unwrap()));
let replica1_entry = slot_map
.nodes_map
.get(&Arc::new("replica1:6379".to_string()))
.unwrap();
assert_eq!(replica1_entry.value().0, None);
let result = slot_map.node_address_for_ip("10.0.0.1".parse().unwrap());
assert_eq!(result, Some(Arc::new("node1:6379".to_string())));
}
#[test]
fn test_nodes_map_preserves_shard_addrs_with_ip() {
let slot_map = get_slot_map_with_ip_mappings();
let node1_entry = slot_map
.nodes_map
.get(&Arc::new("node1:6379".to_string()))
.unwrap();
let (ip, shard_addrs) = node1_entry.value();
assert_eq!(*ip, Some("10.0.0.1".parse().unwrap()));
assert_eq!(*shard_addrs.primary(), "node1:6379");
assert_eq!(
*shard_addrs.replicas(),
vec![Arc::new("replica1:6379".to_string())]
);
let replica1_entry = slot_map
.nodes_map
.get(&Arc::new("replica1:6379".to_string()))
.unwrap();
let (replica_ip, replica_shard_addrs) = replica1_entry.value();
assert_eq!(*replica_ip, Some("10.0.0.2".parse().unwrap()));
assert_eq!(*replica_shard_addrs.primary(), "node1:6379");
}
#[test]
fn test_add_new_primary_without_ip() {
let mut slot_map = SlotMap::new(
vec![Slot::new(0, 16383, "node1:6379".to_owned(), vec![])],
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let result = slot_map.add_new_primary(100, Arc::new("new-node:6379".to_owned()), None);
assert!(result.is_ok());
let new_node_entry = slot_map
.nodes_map
.get(&Arc::new("new-node:6379".to_string()))
.unwrap();
assert_eq!(new_node_entry.value().0, None);
}
#[test]
fn test_add_new_primary_with_ip() {
let mut slot_map = SlotMap::new(
vec![Slot::new(0, 16383, "node1:6379".to_owned(), vec![])],
HashMap::new(),
ReadFromReplicaStrategy::AlwaysFromPrimary,
);
let ip: IpAddr = "10.0.0.100".parse().unwrap();
let result = slot_map.add_new_primary(100, Arc::new("new-node:6379".to_owned()), Some(ip));
assert!(result.is_ok());
let new_node_entry = slot_map
.nodes_map
.get(&Arc::new("new-node:6379".to_string()))
.unwrap();
assert_eq!(new_node_entry.value().0, Some(ip));
let found_addr = slot_map.node_address_for_ip(ip);
assert_eq!(found_addr, Some(Arc::new("new-node:6379".to_string())));
}
}