use {
keyspace::{
DefaultReplicationStrategy,
KeyRange,
KeyspaceBuilder,
KeyspaceError,
KeyspaceNode,
ReplicationStrategy,
},
std::{
collections::{HashMap, HashSet},
hash::{BuildHasher, Hash},
},
};
/// Minimal test node: a newtype around the node's string identifier.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Node(String);
impl KeyspaceNode for Node {
type Id = String;
fn id(&self) -> &Self::Id {
&self.0
}
}
impl Node {
pub fn new(id: &str) -> Self {
Node(id.to_string())
}
}
/// Exercises `KeyspaceBuilder::build` with and without an explicit replication factor and
/// replication strategy, checking both the success case and the `NotEnoughNodes` error.
#[test]
fn keyspace_builder() {
    // Replication factor larger than the three-node cluster used throughout.
    const RF: usize = 4;
    // Factor reported by `NotEnoughNodes` when the builder is not given one explicitly.
    const DEFAULT_RF: usize = 3;

    let three_nodes: Vec<Node> = (0..3).map(|i| Node::new(&format!("node{}", i))).collect();
    // The same cluster with its last node dropped.
    let two_nodes = three_nodes[..three_nodes.len() - 1].to_vec();

    // Plain builder: three nodes build, two nodes are rejected.
    let built = KeyspaceBuilder::new(three_nodes.clone()).build();
    assert!(built.is_ok());
    let built = KeyspaceBuilder::new(two_nodes.clone()).build();
    assert_eq!(built.err(), Some(KeyspaceError::NotEnoughNodes(DEFAULT_RF)));

    // Explicit replication factor: 3 fits the cluster, `RF` does not.
    let built = KeyspaceBuilder::new(three_nodes.clone())
        .with_replication_factor::<DEFAULT_RF>()
        .build();
    assert!(built.is_ok());
    let built = KeyspaceBuilder::new(three_nodes.clone())
        .with_replication_factor::<RF>()
        .build();
    assert_eq!(built.err(), Some(KeyspaceError::NotEnoughNodes(RF)));

    // Explicit replication strategy only: same outcome as the plain builder.
    let built = KeyspaceBuilder::new(three_nodes.clone())
        .with_replication_strategy(DefaultReplicationStrategy::new())
        .build();
    assert!(built.is_ok());
    let built = KeyspaceBuilder::new(two_nodes)
        .with_replication_strategy(DefaultReplicationStrategy::new())
        .build();
    assert_eq!(built.err(), Some(KeyspaceError::NotEnoughNodes(DEFAULT_RF)));

    // Factor and strategy combined: the error is the same in either call order.
    let built = KeyspaceBuilder::new(three_nodes.clone())
        .with_replication_factor::<RF>()
        .with_replication_strategy(DefaultReplicationStrategy::new())
        .build();
    assert_eq!(built.err(), Some(KeyspaceError::NotEnoughNodes(RF)));
    let built = KeyspaceBuilder::new(three_nodes)
        .with_replication_strategy(DefaultReplicationStrategy::new())
        .with_replication_factor::<RF>()
        .build();
    assert_eq!(built.err(), Some(KeyspaceError::NotEnoughNodes(RF)));
}
/// With a replication factor of 1 on ten nodes, checks a fixed table of key -> replica
/// assignments and then verifies that 65536 hashed keys are spread across nodes so that the
/// busiest and the least busy node differ by no more than the computed threshold.
#[test]
fn replica_set_fair_distribution() {
    let nodes: Vec<Node> = (0..10).map(|i| Node::new(&format!("node{}", i))).collect();
    let keyspace = KeyspaceBuilder::new(nodes)
        .with_replication_factor::<1>()
        .build()
        .expect("Failed to create keyspace");
    assert_eq!(keyspace.version(), 0);

    // Hard-coded expectations: each key must resolve to exactly this single replica.
    let expected_owners = [
        ("key1", "node7"),
        ("key2", "node0"),
        ("key3", "node6"),
        ("key4", "node0"),
        ("key5", "node6"),
        ("key6", "node5"),
        ("key7", "node8"),
        ("key8", "node1"),
        ("key9", "node4"),
        ("key10", "node9"),
    ];
    for (key, expected_replica) in expected_owners {
        let replicas: Vec<_> = keyspace.replicas(&key).collect();
        assert_eq!(replicas.len(), 1);
        assert_eq!(
            replicas[0].id(),
            expected_replica,
            "Replica for key '{key}' should be '{expected_replica}'"
        );
    }

    // Count how many of the hashed keys land on each node.
    let random_state = std::hash::RandomState::new();
    let mut hits_per_node: HashMap<_, usize> = HashMap::new();
    for i in 0..=u16::MAX {
        let key = random_state.hash_one(i);
        let replicas: Vec<_> = keyspace.replicas(&key).collect();
        *hits_per_node.entry(replicas[0].clone()).or_default() += 1;
    }

    let min = hits_per_node.values().copied().min().unwrap();
    let max = hits_per_node.values().copied().max().unwrap();
    let diff = max - min;
    // Allowed spread: whatever remains of `max` after subtracting 93% of it (truncated).
    let threshold = max - (max as f64 * 0.93) as usize;
    assert!(
        diff <= threshold,
        "Replica count difference is too high: {diff} > {threshold}"
    );
}
/// Adds a 65th node to a 64-node keyspace (replication factor 3) and checks the resulting
/// migration plan: the version bump, the plan's key count, the new replica set of a fixed key,
/// and the first interval the new node has to pull.
#[test]
fn add_node_migration_plan() {
    /// Builds an expected node list from bare identifiers.
    fn nodes(ids: &[&str]) -> Vec<Node> {
        ids.iter().copied().map(Node::new).collect()
    }

    const MAX_NODES: usize = 64;

    let init_nodes = (0..MAX_NODES)
        .map(|i| Node::new(&format!("node{}", i)))
        .collect::<Vec<_>>();
    let mut keyspace = KeyspaceBuilder::new(init_nodes)
        .with_replication_factor::<3>()
        .build()
        .expect("Failed to create keyspace");
    assert_eq!(keyspace.version(), 0);

    // Replica set of a fixed key before the topology change.
    let key = 1755092165295214000u64;
    let old_replicas = keyspace.replicas(&key).collect::<Vec<_>>();
    assert_eq!(old_replicas, nodes(&["node46", "node63", "node54"]));

    let new_node = Node::new(&format!("node{}", MAX_NODES));
    let migrations = keyspace
        .add_node(new_node.clone())
        .expect("Failed to add node");
    // The keyspace version advances and the plan carries the same version.
    assert_eq!(keyspace.version(), 1);
    assert_eq!(keyspace.version(), migrations.version());
    assert_eq!(migrations.keys().len(), 1);

    // Collect the iterator directly; the former identity `.map` was a no-op.
    let pull_intervals = migrations
        .pull_intervals(new_node.id())
        .collect::<Vec<_>>();

    // After the change the new node appears in the key's replica set.
    let new_replicas = keyspace.replicas(&key).collect::<Vec<_>>();
    assert_eq!(
        new_replicas,
        nodes(&["node46", new_node.id().as_str(), "node63"])
    );

    assert_eq!(pull_intervals.len(), 2978);
    let interval = pull_intervals.first().unwrap();
    assert_eq!(
        interval.key_range(),
        &KeyRange::new(2814749767106560, Some(3096224743817216))
    );
    assert_eq!(interval.nodes(), &nodes(&["node60", "node57", "node30"]));
}
/// Removes `node45` from a 64-node keyspace (replication factor 3) and checks the resulting
/// migration plan: the version bump, the plan's key count, that the removed node pulls nothing,
/// that a fixed key keeps its replica set, and the first interval `node35` has to pull.
#[test]
fn remove_node_migration_plan() {
    /// Builds an expected node list from bare identifiers.
    fn nodes(ids: &[&str]) -> Vec<Node> {
        ids.iter().copied().map(Node::new).collect()
    }

    const MAX_NODES: usize = 64;

    let init_nodes = (0..MAX_NODES)
        .map(|i| Node::new(&format!("node{}", i)))
        .collect::<Vec<_>>();
    let mut keyspace = KeyspaceBuilder::new(init_nodes)
        .with_replication_factor::<3>()
        .build()
        .expect("Failed to create keyspace");
    assert_eq!(keyspace.version(), 0);

    // This key's replica set is expected to be identical before and after the removal.
    let key = 3705152965598471701u64;
    let expected_replicas = nodes(&["node35", "node27", "node5"]);
    let old_replicas = keyspace.replicas(&key).collect::<Vec<_>>();
    assert_eq!(old_replicas, expected_replicas);

    let removed_node = Node::new("node45");
    let migrations = keyspace
        .remove_node(removed_node.id())
        .expect("Failed to remove node");
    // The keyspace version advances and the plan carries the same version.
    assert_eq!(keyspace.version(), 1);
    assert_eq!(keyspace.version(), migrations.version());
    assert_eq!(migrations.keys().len(), 63);

    // The plan contains no pull intervals for the removed node itself.
    assert_eq!(migrations.pull_intervals(removed_node.id()).count(), 0);

    let new_replicas = keyspace.replicas(&key).collect::<Vec<_>>();
    assert_eq!(new_replicas, expected_replicas);

    // Collect the iterator directly; the former identity `.map` was a no-op.
    let node35_intervals = migrations
        .pull_intervals(&"node35".to_string())
        .collect::<Vec<_>>();
    assert_eq!(node35_intervals.len(), 52);
    let interval = node35_intervals.first().unwrap();
    assert_eq!(interval.nodes(), &nodes(&["node45", "node9", "node55"]));
    assert_eq!(
        interval.key_range(),
        &KeyRange::new(569986827839078400, Some(570268302815789056))
    );
}
/// Plugs a user-defined `ReplicationStrategy` into the builder: a node is accepted only if its
/// availability zone has not been accepted before, and the resulting replica sets for two keys
/// are compared against fixed expectations.
#[test]
fn custom_replication_strategy() {
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    enum AvailabilityZone {
        Zone1,
        Zone2,
        Zone3,
    }

    /// Node that carries its availability zone alongside its identifier.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct MyNode {
        id: String,
        zone: AvailabilityZone,
    }

    impl MyNode {
        fn new(id: String, zone: AvailabilityZone) -> Self {
            Self { id, zone }
        }

        /// Returns an owned copy of the node's zone.
        fn zone(&self) -> AvailabilityZone {
            self.zone.clone()
        }
    }

    impl KeyspaceNode for MyNode {
        type Id = String;

        fn id(&self) -> &Self::Id {
            &self.id
        }
    }

    /// Strategy that remembers every zone it has already accepted a node from.
    struct DistinctZoneReplicationStrategy {
        used_zones: HashSet<AvailabilityZone>,
    }

    impl DistinctZoneReplicationStrategy {
        fn new() -> Self {
            Self {
                used_zones: HashSet::new(),
            }
        }
    }

    // A clone deliberately starts with an empty zone set instead of copying `used_zones`.
    impl Clone for DistinctZoneReplicationStrategy {
        fn clone(&self) -> Self {
            Self::new()
        }
    }

    impl ReplicationStrategy<MyNode> for DistinctZoneReplicationStrategy {
        /// Accepts `node` only the first time its zone is seen (`HashSet::insert` returns
        /// `true` for a newly inserted value).
        fn is_eligible_replica(&mut self, node: &MyNode) -> bool {
            let zone = node.zone();
            self.used_zones.insert(zone)
        }
    }

    // Four nodes in zone 1, six in zone 2 and a single node in zone 3.
    let init_nodes: Vec<MyNode> = [
        ("node0", AvailabilityZone::Zone1),
        ("node1", AvailabilityZone::Zone1),
        ("node2", AvailabilityZone::Zone1),
        ("node3", AvailabilityZone::Zone1),
        ("node4", AvailabilityZone::Zone2),
        ("node5", AvailabilityZone::Zone2),
        ("node6", AvailabilityZone::Zone2),
        ("node7", AvailabilityZone::Zone2),
        ("node8", AvailabilityZone::Zone2),
        ("node9", AvailabilityZone::Zone2),
        ("node10", AvailabilityZone::Zone3),
    ]
    .into_iter()
    .map(|(id, zone)| MyNode::new(id.to_owned(), zone))
    .collect();

    let ks = KeyspaceBuilder::new(init_nodes)
        .with_replication_factor::<3>()
        .with_replication_strategy(DistinctZoneReplicationStrategy::new())
        .build()
        .expect("Failed to create keyspace");

    // Expected replica identifiers, in order, for each probed key.
    let expectations = [
        ("key0", ["node6", "node2", "node10"]),
        ("key1", ["node8", "node10", "node1"]),
    ];
    for (key, expected) in expectations {
        let key_replicas: Vec<String> = ks
            .replicas(&key)
            .map(|replica| replica.id().clone())
            .collect();
        assert_eq!(key_replicas, expected.to_vec());
    }
}
/// Builds a three-node keyspace from plain string nodes, adds a fourth node, and checks the
/// replica sets of two keys as well as the first interval the new node has to pull.
#[test]
fn migrations_and_rebalancing() {
    let init_nodes = vec!["node1", "node2", "node3"];
    let mut keyspace = KeyspaceBuilder::new(init_nodes.clone())
        .build()
        .expect("Failed to create keyspace");

    // Before the change every replica of "key" is one of the initial nodes.
    let replicas_before = keyspace.replicas(&"key").collect::<Vec<_>>();
    assert_eq!(
        replicas_before.len(),
        3,
        "There should be 3 replicas for the key"
    );
    assert!(
        replicas_before
            .iter()
            .all(|replica| init_nodes.iter().any(|initial| initial.id() == replica.id())),
        "All replicas should be from initial nodes",
    );

    let plan = keyspace.add_node("node4").expect("Failed to add node");

    // "key" is still served by initial nodes only after the new node joined.
    let replicas_after = keyspace.replicas(&"key").collect::<Vec<_>>();
    assert_eq!(replicas_after.len(), 3);
    assert!(
        replicas_after
            .iter()
            .all(|replica| init_nodes.iter().any(|initial| initial.id() == replica.id())),
        "All replicas should be from initial nodes",
    );

    // "another_key" has the new node among its replicas.
    let other_replicas = keyspace.replicas(&"another_key").collect::<Vec<_>>();
    assert!(
        other_replicas.iter().any(|replica| replica == &"node4"),
        "New node should be in the replica set"
    );

    // First interval the new node has to pull, and the nodes it pulls from.
    let first_interval = plan
        .pull_intervals(&"node4")
        .next()
        .expect("No intervals found");
    assert_eq!(
        first_interval.key_range(),
        &KeyRange::new(844424930131968, Some(1125899906842624)),
    );
    let sources = first_interval.nodes();
    assert_eq!(
        sources.len(),
        3,
        "There should be 3 source nodes for the new node"
    );
    assert!(
        sources
            .iter()
            .all(|source| init_nodes.iter().any(|initial| initial.id() == source.id())),
        "Source nodes should be from initial nodes"
    );
}