use std::collections::{HashMap, HashSet};
#[cfg(feature = "consistent-hash")]
use std::hash::{Hash, Hasher};
use crate::hash::hash64;
use crate::runner::Runner;
use crate::types::{RunnerAddress, ShardId};
#[cfg(feature = "consistent-hash")]
use hashring::HashRing;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
/// Algorithm used by [`ShardAssigner::compute_assignments`] to map shards
/// onto runners.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ShardAssignmentStrategy {
/// Rendezvous (highest-score-wins) hashing, evaluated sequentially.
/// This is the default strategy.
#[default]
Rendezvous,
/// Rendezvous hashing with the per-shard selection run through rayon's
/// parallel iterators. Only available with the `parallel` feature.
#[cfg(feature = "parallel")]
RendezvousParallel,
/// Hash-ring based assignment. Only available with the `consistent-hash`
/// feature.
#[cfg(feature = "consistent-hash")]
ConsistentHash {
/// Number of virtual ring nodes added per unit of runner weight; a runner
/// with weight `w` contributes `w * vnodes_per_weight` ring entries.
vnodes_per_weight: u32,
},
}
/// Stateless namespace for computing shard-to-runner assignments and the
/// acquire/release diff a single runner must apply to reach them.
pub struct ShardAssigner;

impl ShardAssigner {
    /// Computes the desired owner of every shard using `strategy`.
    ///
    /// One shard is considered for each `(group, id)` pair with `group` taken
    /// from `shard_groups` and `id` in `0..shards_per_group`; a non-positive
    /// `shards_per_group` therefore yields no shards. Runners that are
    /// unhealthy or whose weight is not positive never receive shards, and
    /// the returned map is empty when no runner is eligible.
    #[tracing::instrument(level = "debug", skip(runners, shard_groups), fields(num_runners = runners.len(), num_groups = shard_groups.len(), shards_per_group))]
    pub fn compute_assignments(
        runners: &[Runner],
        shard_groups: &[String],
        shards_per_group: i32,
        strategy: &ShardAssignmentStrategy,
    ) -> HashMap<ShardId, RunnerAddress> {
        match strategy {
            ShardAssignmentStrategy::Rendezvous => {
                Self::compute_rendezvous(runners, shard_groups, shards_per_group)
            }
            #[cfg(feature = "parallel")]
            ShardAssignmentStrategy::RendezvousParallel => {
                Self::compute_rendezvous_parallel(runners, shard_groups, shards_per_group)
            }
            #[cfg(feature = "consistent-hash")]
            ShardAssignmentStrategy::ConsistentHash { vnodes_per_weight } => {
                Self::compute_consistent_hash(
                    runners,
                    shard_groups,
                    shards_per_group,
                    *vnodes_per_weight,
                )
            }
        }
    }

    /// Returns the runners that may own shards: healthy ones with a positive
    /// weight. Every excluded runner is reported at debug level.
    ///
    /// Shared by all strategies so the eligibility rule cannot drift between
    /// them.
    fn eligible_runners(runners: &[Runner]) -> Vec<&Runner> {
        runners
            .iter()
            .filter(|r| {
                let eligible = r.healthy && r.weight > 0;
                if !eligible {
                    tracing::debug!(
                        runner = %r.address,
                        healthy = r.healthy,
                        weight = r.weight,
                        "excluding runner from shard assignment"
                    );
                }
                eligible
            })
            .collect()
    }

    /// Assigns each shard to the eligible runner with the highest rendezvous
    /// score for the key `"{group}:{id}"`, one shard at a time.
    fn compute_rendezvous(
        runners: &[Runner],
        shard_groups: &[String],
        shards_per_group: i32,
    ) -> HashMap<ShardId, RunnerAddress> {
        let mut assignments = HashMap::new();
        let healthy_runners = Self::eligible_runners(runners);
        if healthy_runners.is_empty() {
            return assignments;
        }
        for group in shard_groups {
            for id in 0..shards_per_group {
                let shard_key = format!("{group}:{id}");
                if let Some(runner) = select_runner_rendezvous(&shard_key, &healthy_runners) {
                    assignments.insert(ShardId::new(group, id), runner.address.clone());
                }
            }
        }
        assignments
    }

    /// Same selection as [`Self::compute_rendezvous`] (same shard keys, same
    /// scoring function), but the per-shard work is distributed with rayon.
    #[cfg(feature = "parallel")]
    fn compute_rendezvous_parallel(
        runners: &[Runner],
        shard_groups: &[String],
        shards_per_group: i32,
    ) -> HashMap<ShardId, RunnerAddress> {
        let healthy_runners = Self::eligible_runners(runners);
        if healthy_runners.is_empty() {
            return HashMap::new();
        }
        // Materialise the (group, id) pairs first so rayon can split the
        // work over a slice.
        let shards: Vec<(&String, i32)> = shard_groups
            .iter()
            .flat_map(|group| (0..shards_per_group).map(move |id| (group, id)))
            .collect();
        shards
            .par_iter()
            .filter_map(|(group, id)| {
                let shard_key = format!("{group}:{id}");
                select_runner_rendezvous(&shard_key, &healthy_runners)
                    .map(|runner| (ShardId::new(*group, *id), runner.address.clone()))
            })
            .collect()
    }

    /// Assigns shards by looking up the key `"{group}:{id}"` on a hash ring
    /// that holds `weight * vnodes_per_weight` virtual nodes per eligible
    /// runner.
    ///
    /// If the ring ends up empty (for example `vnodes_per_weight == 0`), no
    /// shard is assigned.
    #[cfg(feature = "consistent-hash")]
    fn compute_consistent_hash(
        runners: &[Runner],
        shard_groups: &[String],
        shards_per_group: i32,
        vnodes_per_weight: u32,
    ) -> HashMap<ShardId, RunnerAddress> {
        let mut assignments = HashMap::new();
        let healthy_runners = Self::eligible_runners(runners);
        if healthy_runners.is_empty() {
            return assignments;
        }
        let mut ring: HashRing<RunnerNode> = HashRing::new();
        for runner in &healthy_runners {
            // The eligibility filter guarantees `weight > 0`, so the cast
            // never sees a negative value. Saturate the product: a plain `*`
            // panics in debug builds and silently wraps in release builds,
            // which could leave a heavily weighted runner with few or zero
            // virtual nodes.
            let total_vnodes = (runner.weight as u32).saturating_mul(vnodes_per_weight);
            for vnode_id in 0..total_vnodes {
                ring.add(RunnerNode {
                    address: runner.address.clone(),
                    vnode_id,
                });
            }
        }
        for group in shard_groups {
            for id in 0..shards_per_group {
                let shard_key = format!("{group}:{id}");
                if let Some(node) = ring.get(&shard_key) {
                    assignments.insert(ShardId::new(group, id), node.address.clone());
                }
            }
        }
        assignments
    }

    /// Computes what the runner at `my_address` must change to match
    /// `desired`.
    ///
    /// Returns `(to_acquire, to_release)`:
    /// * `to_acquire` — shards that `desired` maps to `my_address` and that
    ///   are not in `current_owned`;
    /// * `to_release` — shards in `current_owned` that `desired` maps to a
    ///   different runner or does not mention at all.
    pub fn compute_diff(
        desired: &HashMap<ShardId, RunnerAddress>,
        current_owned: &HashSet<ShardId>,
        my_address: &RunnerAddress,
    ) -> (HashSet<ShardId>, HashSet<ShardId>) {
        // Only the shards that actually end up in a result set are cloned.
        let to_acquire: HashSet<ShardId> = desired
            .iter()
            .filter(|(shard, owner)| *owner == my_address && !current_owned.contains(*shard))
            .map(|(shard, _)| shard.clone())
            .collect();
        let to_release: HashSet<ShardId> = current_owned
            .iter()
            .filter(|shard| desired.get(*shard) != Some(my_address))
            .cloned()
            .collect();
        (to_acquire, to_release)
    }
}
/// Picks the runner with the highest rendezvous score for `shard_key`.
///
/// Returns `None` when `runners` is empty. When several runners share the
/// top score, the one appearing last in `runners` wins, which is how
/// `Iterator::max_by_key` resolves ties.
fn select_runner_rendezvous<'a>(shard_key: &str, runners: &[&'a Runner]) -> Option<&'a Runner> {
    runners
        .iter()
        .copied()
        .max_by_key(|candidate| compute_runner_score(shard_key, candidate))
}
/// Scores `runner` for `shard_key`.
///
/// One hash is taken per unit of weight, over the bytes of
/// `"{shard_key}\0{host}:{port}\0{w}"` for `w` in `0..weight`, and the largest
/// of those hashes is the score. A runner whose weight range is empty scores
/// `0`.
fn compute_runner_score(shard_key: &str, runner: &Runner) -> u64 {
    let address = &runner.address;
    let mut best: u64 = 0;
    for replica in 0..runner.weight {
        let material = format!(
            "{}\0{}:{}\0{}",
            shard_key, address.host, address.port, replica
        );
        // Starting from 0 matches the "no hashes" fallback, since 0 is the
        // smallest possible u64.
        best = best.max(hash64(material.as_bytes()));
    }
    best
}
/// One virtual node on the consistent-hash ring: a runner address paired with
/// the index of the virtual node it represents.
#[cfg(feature = "consistent-hash")]
#[derive(Clone)]
struct RunnerNode {
    address: RunnerAddress,
    vnode_id: u32,
}

/// Feeds host, then port, then virtual-node index into the hasher, so each
/// virtual node of the same runner hashes differently.
#[cfg(feature = "consistent-hash")]
impl Hash for RunnerNode {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self { address, vnode_id } = self;
        address.host.hash(state);
        address.port.hash(state);
        vnode_id.hash(state);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Strategy used by the tests that do not target a specific algorithm.
    fn default_strategy() -> ShardAssignmentStrategy {
        ShardAssignmentStrategy::default()
    }

    #[test]
    fn single_runner_gets_all_shards() {
        let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1)];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 10, &default_strategy());
        assert_eq!(assignments.len(), 10);
        for addr in assignments.values() {
            assert_eq!(addr, &RunnerAddress::new("host1", 9000));
        }
    }

    #[test]
    fn two_runners_distribute_shards() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 300, &default_strategy());
        assert_eq!(assignments.len(), 300);
        let host1_count = assignments.values().filter(|a| a.host == "host1").count();
        let host2_count = assignments.values().filter(|a| a.host == "host2").count();
        assert!(host1_count > 0, "host1 should have some shards");
        assert!(host2_count > 0, "host2 should have some shards");
        assert_eq!(host1_count + host2_count, 300);
    }

    #[test]
    fn unhealthy_runners_excluded() {
        let mut r2 = Runner::new(RunnerAddress::new("host2", 9000), 1);
        r2.healthy = false;
        let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1), r2];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 10, &default_strategy());
        assert_eq!(assignments.len(), 10);
        for addr in assignments.values() {
            assert_eq!(addr, &RunnerAddress::new("host1", 9000));
        }
    }

    #[test]
    fn no_healthy_runners_empty() {
        let mut r = Runner::new(RunnerAddress::new("host1", 9000), 1);
        r.healthy = false;
        let runners = vec![r];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 10, &default_strategy());
        assert!(assignments.is_empty());
    }

    #[test]
    fn weighted_runner_gets_more_shards() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 3),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 300, &default_strategy());
        let host1_count = assignments.values().filter(|a| a.host == "host1").count();
        assert!(
            host1_count > 150,
            "host1 (weight=3) should have more than half the shards, got {host1_count}"
        );
    }

    #[test]
    fn multiple_groups() {
        let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1)];
        let groups = vec!["default".to_string(), "premium".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 10, &default_strategy());
        // Two groups with ten shards each.
        assert_eq!(assignments.len(), 20);
    }

    #[test]
    fn compute_diff_works() {
        let my_addr = RunnerAddress::new("host1", 9000);
        let other_addr = RunnerAddress::new("host2", 9000);
        let mut desired = HashMap::new();
        desired.insert(ShardId::new("default", 0), my_addr.clone());
        desired.insert(ShardId::new("default", 1), my_addr.clone());
        desired.insert(ShardId::new("default", 2), other_addr.clone());
        let mut current = HashSet::new();
        // Shard 0 is already owned and still desired; shard 3 is owned but
        // absent from `desired`.
        current.insert(ShardId::new("default", 0));
        current.insert(ShardId::new("default", 3));
        let (to_acquire, to_release) = ShardAssigner::compute_diff(&desired, &current, &my_addr);
        assert!(to_acquire.contains(&ShardId::new("default", 1)));
        assert!(!to_acquire.contains(&ShardId::new("default", 0)));
        assert!(to_release.contains(&ShardId::new("default", 3)));
        assert!(!to_release.contains(&ShardId::new("default", 0)));
    }

    #[test]
    fn distribution_uniformity_with_equal_weight_runners() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
            Runner::new(RunnerAddress::new("host3", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 300, &default_strategy());
        let count = |host: &str| assignments.values().filter(|a| a.host == host).count();
        let h1 = count("host1");
        let h2 = count("host2");
        let h3 = count("host3");
        assert_eq!(h1 + h2 + h3, 300);
        let expected = 100;
        let tolerance = 25;
        assert!(
            h1.abs_diff(expected) <= tolerance,
            "host1 got {h1} shards, expected ~{expected} (±{tolerance})"
        );
        assert!(
            h2.abs_diff(expected) <= tolerance,
            "host2 got {h2} shards, expected ~{expected} (±{tolerance})"
        );
        assert!(
            h3.abs_diff(expected) <= tolerance,
            "host3 got {h3} shards, expected ~{expected} (±{tolerance})"
        );
    }

    #[test]
    fn weight_zero_runners_excluded() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 0),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 10, &default_strategy());
        assert_eq!(assignments.len(), 10);
        for addr in assignments.values() {
            assert_eq!(addr, &RunnerAddress::new("host1", 9000));
        }
    }

    #[test]
    fn deterministic_assignments() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let a1 = ShardAssigner::compute_assignments(&runners, &groups, 300, &default_strategy());
        let a2 = ShardAssigner::compute_assignments(&runners, &groups, 300, &default_strategy());
        assert_eq!(a1, a2);
    }

    #[test]
    fn rendezvous_distribution_uniformity() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
            Runner::new(RunnerAddress::new("host3", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 2048, &default_strategy());
        let count = |host: &str| assignments.values().filter(|a| a.host == host).count();
        let h1 = count("host1");
        let h2 = count("host2");
        let h3 = count("host3");
        let expected = 2048 / 3;
        let tolerance = 35;
        assert!(
            h1.abs_diff(expected) <= tolerance,
            "host1: {h1}, expected ~{expected}"
        );
        assert!(
            h2.abs_diff(expected) <= tolerance,
            "host2: {h2}, expected ~{expected}"
        );
        assert!(
            h3.abs_diff(expected) <= tolerance,
            "host3: {h3}, expected ~{expected}"
        );
    }

    #[test]
    fn minimal_movement_on_node_removal() {
        let runners_3 = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
            Runner::new(RunnerAddress::new("host3", 9000), 1),
        ];
        let runners_2 = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let before =
            ShardAssigner::compute_assignments(&runners_3, &groups, 2048, &default_strategy());
        let after =
            ShardAssigner::compute_assignments(&runners_2, &groups, 2048, &default_strategy());
        let moved: usize = before
            .iter()
            .filter(|(shard, addr)| after.get(*shard) != Some(*addr))
            .count();
        let host3_shards = before.values().filter(|a| a.host == "host3").count();
        assert_eq!(moved, host3_shards, "only host3 shards should move");
        assert!(
            moved > 600 && moved < 750,
            "expected ~683 moves, got {moved}"
        );
    }

    #[test]
    fn minimal_movement_on_node_addition() {
        let runners_3 = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
            Runner::new(RunnerAddress::new("host3", 9000), 1),
        ];
        let runners_4 = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 1),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
            Runner::new(RunnerAddress::new("host3", 9000), 1),
            Runner::new(RunnerAddress::new("host4", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let before =
            ShardAssigner::compute_assignments(&runners_3, &groups, 2048, &default_strategy());
        let after =
            ShardAssigner::compute_assignments(&runners_4, &groups, 2048, &default_strategy());
        let moved: usize = before
            .iter()
            .filter(|(shard, addr)| after.get(*shard) != Some(*addr))
            .count();
        let host4_shards = after.values().filter(|a| a.host == "host4").count();
        assert_eq!(moved, host4_shards, "moves should equal host4's new shards");
        assert!(
            moved > 450 && moved < 560,
            "expected ~512 moves, got {moved}"
        );
    }

    #[test]
    fn weighted_distribution_at_scale() {
        let runners = vec![
            Runner::new(RunnerAddress::new("host1", 9000), 3),
            Runner::new(RunnerAddress::new("host2", 9000), 1),
        ];
        let groups = vec!["default".to_string()];
        let assignments =
            ShardAssigner::compute_assignments(&runners, &groups, 2048, &default_strategy());
        let h1 = assignments.values().filter(|a| a.host == "host1").count();
        let h2 = assignments.values().filter(|a| a.host == "host2").count();
        assert!(
            h1 > 1450 && h1 < 1620,
            "host1 (w=3): expected ~1536, got {h1}"
        );
        assert!(h2 > 430 && h2 < 600, "host2 (w=1): expected ~512, got {h2}");
    }

    #[test]
    fn strategy_default_is_rendezvous() {
        assert_eq!(
            ShardAssignmentStrategy::default(),
            ShardAssignmentStrategy::Rendezvous
        );
    }

    /// Tests for the rayon-backed rendezvous strategy.
    #[cfg(feature = "parallel")]
    mod parallel_tests {
        use super::*;

        fn parallel_strategy() -> ShardAssignmentStrategy {
            ShardAssignmentStrategy::RendezvousParallel
        }

        #[test]
        fn parallel_single_runner_gets_all_shards() {
            let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1)];
            let groups = vec!["default".to_string()];
            let assignments =
                ShardAssigner::compute_assignments(&runners, &groups, 10, &parallel_strategy());
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }

        #[test]
        fn parallel_produces_same_results_as_sequential() {
            let runners: Vec<Runner> = (0..10)
                .map(|i| Runner::new(RunnerAddress::new(format!("host{i}"), 9000), 1))
                .collect();
            let groups = vec!["default".to_string(), "premium".to_string()];
            let sequential = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                2048,
                &ShardAssignmentStrategy::Rendezvous,
            );
            let parallel =
                ShardAssigner::compute_assignments(&runners, &groups, 2048, &parallel_strategy());
            assert_eq!(
                sequential, parallel,
                "parallel and sequential must produce identical results"
            );
        }

        #[test]
        fn parallel_produces_same_results_with_weights() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 3),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
                Runner::new(RunnerAddress::new("host3", 9000), 2),
            ];
            let groups = vec!["default".to_string()];
            let sequential = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                2048,
                &ShardAssignmentStrategy::Rendezvous,
            );
            let parallel =
                ShardAssigner::compute_assignments(&runners, &groups, 2048, &parallel_strategy());
            assert_eq!(
                sequential, parallel,
                "parallel and sequential must produce identical results with weights"
            );
        }

        #[test]
        fn parallel_distribution_uniformity() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
                Runner::new(RunnerAddress::new("host3", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let assignments =
                ShardAssigner::compute_assignments(&runners, &groups, 2048, &parallel_strategy());
            let count = |host: &str| assignments.values().filter(|a| a.host == host).count();
            let h1 = count("host1");
            let h2 = count("host2");
            let h3 = count("host3");
            let expected = 2048 / 3;
            let tolerance = 35;
            assert!(
                h1.abs_diff(expected) <= tolerance,
                "host1: {h1}, expected ~{expected}"
            );
            assert!(
                h2.abs_diff(expected) <= tolerance,
                "host2: {h2}, expected ~{expected}"
            );
            assert!(
                h3.abs_diff(expected) <= tolerance,
                "host3: {h3}, expected ~{expected}"
            );
        }

        #[test]
        fn parallel_excludes_unhealthy_runners() {
            let mut r2 = Runner::new(RunnerAddress::new("host2", 9000), 1);
            r2.healthy = false;
            let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1), r2];
            let groups = vec!["default".to_string()];
            let assignments =
                ShardAssigner::compute_assignments(&runners, &groups, 10, &parallel_strategy());
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }

        #[test]
        fn parallel_excludes_weight_zero_runners() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 0),
            ];
            let groups = vec!["default".to_string()];
            let assignments =
                ShardAssigner::compute_assignments(&runners, &groups, 10, &parallel_strategy());
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }

        #[test]
        fn parallel_deterministic() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let a1 =
                ShardAssigner::compute_assignments(&runners, &groups, 300, &parallel_strategy());
            let a2 =
                ShardAssigner::compute_assignments(&runners, &groups, 300, &parallel_strategy());
            assert_eq!(a1, a2);
        }
    }

    /// Tests for the hash-ring strategy.
    #[cfg(feature = "consistent-hash")]
    mod consistent_hash_tests {
        use super::*;

        fn consistent_hash_strategy() -> ShardAssignmentStrategy {
            ShardAssignmentStrategy::ConsistentHash {
                vnodes_per_weight: 150,
            }
        }

        #[test]
        fn consistent_hash_single_runner_gets_all_shards() {
            let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1)];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                10,
                &consistent_hash_strategy(),
            );
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }

        #[test]
        fn consistent_hash_distributes_shards() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                300,
                &consistent_hash_strategy(),
            );
            assert_eq!(assignments.len(), 300);
            let host1_count = assignments.values().filter(|a| a.host == "host1").count();
            let host2_count = assignments.values().filter(|a| a.host == "host2").count();
            assert!(host1_count > 0, "host1 should have some shards");
            assert!(host2_count > 0, "host2 should have some shards");
            assert_eq!(host1_count + host2_count, 300);
        }

        #[test]
        fn consistent_hash_distribution_uniformity() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
                Runner::new(RunnerAddress::new("host3", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                2048,
                &consistent_hash_strategy(),
            );
            let count = |host: &str| assignments.values().filter(|a| a.host == host).count();
            let h1 = count("host1");
            let h2 = count("host2");
            let h3 = count("host3");
            let expected = 2048 / 3;
            let tolerance = 102;
            assert!(
                h1.abs_diff(expected) <= tolerance,
                "host1: {h1}, expected ~{expected}"
            );
            assert!(
                h2.abs_diff(expected) <= tolerance,
                "host2: {h2}, expected ~{expected}"
            );
            assert!(
                h3.abs_diff(expected) <= tolerance,
                "host3: {h3}, expected ~{expected}"
            );
        }

        #[test]
        fn consistent_hash_weighted_distribution() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 3),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                2048,
                &consistent_hash_strategy(),
            );
            let h1 = assignments.values().filter(|a| a.host == "host1").count();
            let h2 = assignments.values().filter(|a| a.host == "host2").count();
            assert!(
                h1 > 1350 && h1 < 1700,
                "host1 (w=3): expected ~1536, got {h1}"
            );
            assert!(h2 > 350 && h2 < 700, "host2 (w=1): expected ~512, got {h2}");
        }

        #[test]
        fn consistent_hash_deterministic() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let strategy = consistent_hash_strategy();
            let a1 = ShardAssigner::compute_assignments(&runners, &groups, 300, &strategy);
            let a2 = ShardAssigner::compute_assignments(&runners, &groups, 300, &strategy);
            assert_eq!(a1, a2);
        }

        #[test]
        fn consistent_hash_minimal_movement_on_node_removal() {
            let runners_3 = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
                Runner::new(RunnerAddress::new("host3", 9000), 1),
            ];
            let runners_2 = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 1),
            ];
            let groups = vec!["default".to_string()];
            let strategy = consistent_hash_strategy();
            let before = ShardAssigner::compute_assignments(&runners_3, &groups, 2048, &strategy);
            let after = ShardAssigner::compute_assignments(&runners_2, &groups, 2048, &strategy);
            let moved: usize = before
                .iter()
                .filter(|(shard, addr)| after.get(*shard) != Some(*addr))
                .count();
            let host3_shards = before.values().filter(|a| a.host == "host3").count();
            assert_eq!(moved, host3_shards, "only host3 shards should move");
            assert!(
                moved > 500 && moved < 850,
                "expected ~683 moves, got {moved}"
            );
        }

        #[test]
        fn consistent_hash_excludes_unhealthy_runners() {
            let mut r2 = Runner::new(RunnerAddress::new("host2", 9000), 1);
            r2.healthy = false;
            let runners = vec![Runner::new(RunnerAddress::new("host1", 9000), 1), r2];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                10,
                &consistent_hash_strategy(),
            );
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }

        #[test]
        fn consistent_hash_excludes_weight_zero_runners() {
            let runners = vec![
                Runner::new(RunnerAddress::new("host1", 9000), 1),
                Runner::new(RunnerAddress::new("host2", 9000), 0),
            ];
            let groups = vec!["default".to_string()];
            let assignments = ShardAssigner::compute_assignments(
                &runners,
                &groups,
                10,
                &consistent_hash_strategy(),
            );
            assert_eq!(assignments.len(), 10);
            for addr in assignments.values() {
                assert_eq!(addr, &RunnerAddress::new("host1", 9000));
            }
        }
    }
}