use std::{cell::RefCell, fmt::Debug, hash::Hasher, net::SocketAddr, rc::Rc};
use rand::{
RngExt,
distr::{Distribution, weighted::WeightedIndex},
prelude::IndexedRandom,
rng,
};
use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
/// Weight used for a backend that carries no load-balancing parameters.
const DEFAULT_WEIGHT: i32 = 100;
/// Hash seed used by `Rendezvous::new` and `Maglev::new` when no seed is supplied.
pub const DEFAULT_HASH_SEED: u64 = 0x9E37_79B9_7F4A_7C15;
/// Returns the effective weight of `backend` as a strictly positive `u32`.
///
/// A backend without load-balancing parameters gets `DEFAULT_WEIGHT`; a
/// configured weight below 1 (zero or negative) is raised to 1.
fn backend_weight(backend: &Backend) -> u32 {
    let configured = match backend.load_balancing_parameters.as_ref() {
        Some(params) => params.weight,
        None => DEFAULT_WEIGHT,
    };
    // Clamp while still signed so the cast to `u32` can never wrap.
    let clamped = configured.max(1) as u32;
    debug_assert!(clamped >= 1, "backend weight must be clamped to at least 1");
    clamped
}
/// Hashes the pair (`key`, `addr`) under `seed` into a well-mixed 64-bit value.
///
/// The bytes fed to the hasher are, in order: the key, a family tag (`4` or
/// `6`), the IP octets and the port. The family tag keeps IPv4 and IPv6
/// inputs in separate domains. The raw hash is then passed through
/// `splitmix64_finalize`.
fn hash_backend(seed: u64, key: u64, addr: &SocketAddr) -> u64 {
    let mut hasher = FnvHasher::with_seed(seed);
    hasher.write_u64(key);
    // Each arm writes the family tag and the octets, then yields the port so
    // it can be written once after the match.
    let port = match addr {
        SocketAddr::V4(v4) => {
            hasher.write_u8(4);
            hasher.write(&v4.ip().octets());
            v4.port()
        }
        SocketAddr::V6(v6) => {
            hasher.write_u8(6);
            hasher.write(&v6.ip().octets());
            v6.port()
        }
    };
    hasher.write_u16(port);
    splitmix64_finalize(hasher.finish())
}
/// Applies the SplitMix64 finalizer (xor-shift / multiply rounds) to `z`.
///
/// All multiplications wrap modulo 2^64. Zero maps to zero.
fn splitmix64_finalize(mut z: u64) -> u64 {
    const FIRST_MULTIPLIER: u64 = 0xBF58_476D_1CE4_E5B9;
    const SECOND_MULTIPLIER: u64 = 0x94D0_49BB_1331_11EB;

    z ^= z >> 30;
    z = z.wrapping_mul(FIRST_MULTIPLIER);
    z ^= z >> 27;
    z = z.wrapping_mul(SECOND_MULTIPLIER);
    z ^ (z >> 31)
}
/// Returns the smallest prime that is greater than or equal to `n`.
///
/// Inputs below 2 yield 2. The search walks upward one integer at a time and
/// tests each candidate with `is_prime`.
fn next_prime(n: usize) -> usize {
    let mut probe = if n < 2 { 2 } else { n };
    loop {
        if is_prime(probe) {
            break;
        }
        probe += 1;
    }
    debug_assert!(is_prime(probe), "next_prime must return a prime");
    debug_assert!(probe >= 2, "next_prime must return at least 2");
    debug_assert!(
        probe >= n,
        "next_prime ({candidate}) must be >= the requested floor ({n})",
        candidate = probe
    );
    probe
}
/// Reports whether `x` is prime, using trial division by odd numbers.
///
/// Values below 2 are not prime; 2 is the only even prime.
fn is_prime(x: usize) -> bool {
    if x < 2 {
        return false;
    }
    if x % 2 == 0 {
        return x == 2;
    }
    let mut d: usize = 3;
    // `d <= x / d` is equivalent to `d * d <= x` for positive integers, but
    // cannot overflow `usize` when `x` is close to `usize::MAX`.
    while d <= x / d {
        if x % d == 0 {
            return false;
        }
        d += 2;
    }
    true
}
/// Seedable 64-bit FNV-1a hasher (xor each byte in, then multiply by the FNV prime).
struct FnvHasher {
    /// Running hash value.
    state: u64,
}

impl FnvHasher {
    /// 64-bit FNV prime.
    const PRIME: u64 = 0x0000_0100_0000_01B3;
    /// 64-bit FNV offset basis.
    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;

    /// Creates a hasher whose initial state is the offset basis xor-ed with `seed`.
    fn with_seed(seed: u64) -> Self {
        FnvHasher {
            state: seed ^ Self::OFFSET,
        }
    }
}

impl Hasher for FnvHasher {
    /// Returns the current state without any further mixing.
    fn finish(&self) -> u64 {
        self.state
    }

    /// Folds `bytes` into the state one byte at a time, in order.
    fn write(&mut self, bytes: &[u8]) {
        self.state = bytes.iter().fold(self.state, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }
}
/// Strategy that picks one backend out of a list of candidates.
pub trait LoadBalancingAlgorithm: Debug {
/// Selects a backend from `backends`.
///
/// `key` is an optional routing key. Among the implementations in this file,
/// `Rendezvous` and `Maglev` use it for key-based selection and fall back to
/// round-robin when it is `None`; the other implementations ignore it.
/// The implementations in this file return `None` when `backends` is empty.
fn next_available_backend(
&mut self,
key: Option<u64>,
backends: &mut Vec<Rc<RefCell<Backend>>>,
) -> Option<Rc<RefCell<Backend>>>;
/// Hook for refreshing any state derived from the backend set.
///
/// The default implementation does nothing.
fn rebuild(&mut self, _backends: &[Rc<RefCell<Backend>>]) {}
}
/// Round-robin selector: hands out backends in list order, one per call.
#[derive(Debug)]
pub struct RoundRobin {
/// Cursor for the next selection; it is reduced modulo the list length when used.
pub next_backend: u32,
}
impl LoadBalancingAlgorithm for RoundRobin {
    /// Returns the backend under the cursor and advances the cursor by one,
    /// wrapping at the end of the list.
    ///
    /// The key is ignored. Returns `None` for an empty list, leaving the
    /// cursor untouched. A cursor that points past the end (for example after
    /// backends were removed) is wrapped modulo the current length.
    fn next_available_backend(
        &mut self,
        _key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>> {
        let len = backends.len();
        if len == 0 {
            return None;
        }
        let index = self.next_backend as usize % len;
        debug_assert!(index < len, "round-robin index out of bounds");
        let res = backends.get(index).cloned();
        debug_assert!(
            res.is_some(),
            "round-robin must select a backend from a non-empty set"
        );
        // Advance from the wrapped index rather than from the raw cursor:
        // `next_backend` is a public field, and `next_backend + 1` would
        // overflow if a caller had set it to `u32::MAX`. The resulting value
        // is the same `(cursor + 1) mod len`.
        self.next_backend = ((index + 1) % len) as u32;
        debug_assert!(
            (self.next_backend as usize) < len,
            "round-robin cursor must stay within bounds"
        );
        res
    }
}
impl Default for RoundRobin {
fn default() -> Self {
Self::new()
}
}
impl RoundRobin {
pub fn new() -> Self {
Self { next_backend: 0 }
}
}
#[derive(Debug)]
pub struct Random;
impl LoadBalancingAlgorithm for Random {
fn next_available_backend(
&mut self,
_key: Option<u64>,
backends: &mut Vec<Rc<RefCell<Backend>>>,
) -> Option<Rc<RefCell<Backend>>> {
let mut rng = rng();
let len = backends.len();
let weights: Vec<i32> = backends
.iter()
.map(|b| {
b.borrow()
.load_balancing_parameters
.as_ref()
.map(|p| p.weight)
.unwrap_or(100)
})
.collect();
debug_assert_eq!(
weights.len(),
len,
"Random must derive exactly one weight per backend"
);
if let Ok(dist) = WeightedIndex::new(weights) {
let index = dist.sample(&mut rng);
debug_assert!(index < len, "Random sampled an out-of-range index");
backends.get(index).cloned()
} else {
let chosen = (*backends)
.choose(&mut rng)
.map(|backend| (*backend).clone());
debug_assert_eq!(
chosen.is_some(),
len > 0,
"Random fallback selects iff the set is non-empty"
);
chosen
}
}
}
/// Selector that returns the backend with the lowest load under `metric`.
#[derive(Debug)]
pub struct LeastLoaded {
    /// Which load measurement is minimized.
    pub metric: LoadMetric,
}

impl LoadBalancingAlgorithm for LeastLoaded {
    /// Returns the least-loaded backend, or `None` for an empty list.
    ///
    /// When several backends tie, the one that appears first in the list
    /// wins. The key is ignored.
    fn next_available_backend(
        &mut self,
        _key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>> {
        let was_empty = backends.is_empty();
        let selected: Option<&Rc<RefCell<Backend>>> = match self.metric {
            LoadMetric::Connections => backends
                .iter()
                .min_by_key(|candidate| candidate.borrow().active_connections),
            LoadMetric::Requests => backends
                .iter()
                .min_by_key(|candidate| candidate.borrow().active_requests),
            LoadMetric::ConnectionTime => {
                // The cost is a float, so the minimum is tracked by hand.
                // `peak_ewma_connection` is called once per backend, in list
                // order, through a mutable borrow.
                let mut best: Option<(f64, &Rc<RefCell<Backend>>)> = None;
                for candidate in backends.iter() {
                    let cost = candidate.borrow_mut().peak_ewma_connection();
                    let incumbent_wins =
                        matches!(best, Some((best_cost, _)) if best_cost <= cost);
                    if !incumbent_wins {
                        best = Some((cost, candidate));
                    }
                }
                best.map(|(_, candidate)| candidate)
            }
        };
        debug_assert_eq!(
            selected.is_some(),
            !was_empty,
            "LeastLoaded selects iff the candidate set is non-empty"
        );
        selected.cloned()
    }
}
/// Selector that finds the two least-loaded backends under `metric` and
/// returns one of them at random.
#[derive(Debug)]
pub struct PowerOfTwo {
    /// Which load measurement is compared.
    pub metric: LoadMetric,
}

impl LoadBalancingAlgorithm for PowerOfTwo {
    /// Scans `backends` once, tracking the lightest (`first`) and second
    /// lightest (`second`) backend, then picks between the two with equal
    /// probability.
    ///
    /// Returns `None` for an empty list and the only backend for a
    /// single-element list. The key is ignored.
    fn next_available_backend(
        &mut self,
        _key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>> {
        let len = backends.len();
        let mut first: Option<(f64, &Rc<RefCell<Backend>>)> = None;
        let mut second: Option<(f64, &Rc<RefCell<Backend>>)> = None;
        for backend in backends.iter() {
            let measure = match self.metric {
                LoadMetric::Connections => backend.borrow().active_connections as f64,
                LoadMetric::Requests => backend.borrow().active_requests as f64,
                LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
            };
            // The tuple is matched by copy, so the arms are free to reassign
            // `first` and `second`.
            match (first, second) {
                (None, _) => first = Some((measure, backend)),
                // Strictly lighter than the current best: the old best
                // becomes the runner-up.
                (Some((lightest, _)), _) if measure < lightest => {
                    second = first;
                    first = Some((measure, backend));
                }
                // Not lighter than the best and no runner-up recorded yet.
                (Some(_), None) => second = Some((measure, backend)),
                // Sits between the best and the current runner-up.
                (Some(_), Some((runner_up, _))) if measure < runner_up => {
                    second = Some((measure, backend));
                }
                // At least as heavy as both candidates: keep them. Promoting
                // such a backend to `first` would select a heavy backend and
                // break the ordering asserted below.
                (Some(_), Some(_)) => {}
            }
        }
        debug_assert!(
            match (&first, &second) {
                (Some((f, _)), Some((s, _))) => f <= s,
                _ => true,
            },
            "power-of-two: first candidate must be no heavier than second"
        );
        debug_assert_eq!(
            first.is_some(),
            len > 0,
            "power-of-two must hold a primary candidate iff the set is non-empty"
        );
        debug_assert_eq!(
            second.is_some(),
            len > 1,
            "power-of-two holds a second candidate iff the set has >= 2 backends"
        );
        match (first, second) {
            (None, None) => None,
            (Some((_, b)), None) => Some(b.clone()),
            (None, Some((_, b))) => Some(b.clone()),
            (Some((_, b1)), Some((_, b2))) => {
                if rng().random_bool(0.5) {
                    Some(b1.clone())
                } else {
                    Some(b2.clone())
                }
            }
        }
    }
}
/// Weighted rendezvous (highest-random-weight, "HRW") selector.
#[derive(Debug)]
pub struct Rendezvous {
    /// Seed mixed into every backend hash.
    seed: u64,
    /// Selector used when a request carries no key.
    round_robin: RoundRobin,
}

impl Default for Rendezvous {
    /// Same as [`Rendezvous::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Rendezvous {
    /// Creates a selector seeded with `DEFAULT_HASH_SEED`.
    pub fn new() -> Self {
        Self::with_seed(DEFAULT_HASH_SEED)
    }

    /// Creates a selector with an explicit hash seed.
    pub fn with_seed(seed: u64) -> Self {
        Self {
            seed,
            round_robin: RoundRobin::new(),
        }
    }

    /// Computes the score of `backend` for `key`; the highest score wins.
    ///
    /// The hash of (`key`, backend address) is mapped to a value `unit`
    /// strictly inside (0, 1), and the score is `-weight / ln(unit)`, which is
    /// finite and positive.
    fn score(&self, key: u64, backend: &Backend) -> f64 {
        // Largest `f64` strictly below 1.0 (1 - 2^-53).
        const MAX_UNIT: f64 = 1.0 - f64::EPSILON / 2.0;

        let weight = backend_weight(backend) as f64;
        let h = hash_backend(self.seed, key, &backend.address);
        // `h as f64` rounds hashes close to `u64::MAX` up to 2^64, which would
        // make the quotient exactly 1.0, `ln` exactly 0 and the score
        // infinite. Clamping keeps `unit` inside the open interval and leaves
        // every other hash value untouched.
        let unit = ((h as f64 + 0.5) / (u64::MAX as f64 + 1.0)).min(MAX_UNIT);
        debug_assert!(
            unit > 0.0 && unit < 1.0,
            "HRW unit must lie strictly inside (0, 1), got {unit}"
        );
        let score = -weight / unit.ln();
        debug_assert!(
            score.is_finite() && score > 0.0,
            "HRW score must be finite and positive, got {score}"
        );
        score
    }
}
impl LoadBalancingAlgorithm for Rendezvous {
    /// Returns the backend with the highest score for `key`.
    ///
    /// Without a key the call is delegated to the internal round-robin
    /// selector. With a key, an empty list yields `None`; on equal scores the
    /// backend that appears first in the list wins.
    fn next_available_backend(
        &mut self,
        key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>> {
        let key = match key {
            Some(key) => key,
            None => return self.round_robin.next_available_backend(None, backends),
        };
        if backends.is_empty() {
            return None;
        }
        let mut best: Option<(f64, &Rc<RefCell<Backend>>)> = None;
        for candidate in backends.iter() {
            let candidate_score = self.score(key, &candidate.borrow());
            // The incumbent is replaced only by a strictly higher score.
            let incumbent_holds = matches!(best, Some((top, _)) if top >= candidate_score);
            if !incumbent_holds {
                best = Some((candidate_score, candidate));
            }
        }
        debug_assert!(
            best.is_some(),
            "HRW must select a winner from a non-empty set"
        );
        // Debug builds re-score every backend to confirm the winner is a maximum.
        #[cfg(debug_assertions)]
        if let Some((best_score, _)) = best {
            for candidate in backends.iter() {
                debug_assert!(
                    best_score >= self.score(key, &candidate.borrow()),
                    "HRW winner does not maximize the score over the set"
                );
            }
        }
        best.map(|(_, winner)| Rc::clone(winner))
    }
}
/// Maglev-style consistent-hashing selector backed by a precomputed lookup table.
#[derive(Debug)]
pub struct Maglev {
    /// Seed mixed into the per-backend offset and skip hashes.
    seed: u64,
    /// Number of slots in the lookup table.
    size: usize,
    /// Lookup table; each slot holds an index into `backend_addrs`.
    table: Vec<usize>,
    /// Backend addresses captured by the most recent rebuild.
    backend_addrs: Vec<SocketAddr>,
    /// Selector used when a request carries no key or no table entry matches.
    round_robin: RoundRobin,
}

impl Default for Maglev {
    /// Same as `Maglev::new`.
    fn default() -> Self {
        Maglev::new()
    }
}
impl Maglev {
    /// Default number of lookup-table slots.
    pub const DEFAULT_TABLE_SIZE: usize = 65537;

    /// Creates a selector with `DEFAULT_HASH_SEED` and the default table size.
    pub fn new() -> Self {
        Self::with_seed(DEFAULT_HASH_SEED)
    }

    /// Creates a selector with an explicit seed and the default table size.
    pub fn with_seed(seed: u64) -> Self {
        Self::with_seed_and_size(seed, Self::DEFAULT_TABLE_SIZE)
    }

    /// Creates a selector with an explicit seed and table size.
    ///
    /// `size` is raised to the next prime that is at least `max(size, 2)`.
    /// The table itself stays empty until [`Maglev::rebuild`] runs.
    pub fn with_seed_and_size(seed: u64, size: usize) -> Self {
        Self {
            seed,
            size: next_prime(size.max(2)),
            table: Vec::new(),
            backend_addrs: Vec::new(),
            round_robin: RoundRobin::new(),
        }
    }

    /// Rebuilds the lookup table from `backends`.
    ///
    /// Each backend gets a share of the `size` slots proportional to its
    /// weight, and walks its own permutation of the table (`offset`, then
    /// steps of `skip`) to claim free slots. Backends take turns, one slot per
    /// turn, until every slot is filled. An empty `backends` list leaves both
    /// the table and the captured address list empty.
    pub fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
        let n = backends.len();
        self.backend_addrs.clear();
        self.table.clear();
        if n == 0 || self.size == 0 {
            return;
        }
        let m = self.size;

        // Per-backend permutation parameters and weights.
        let mut offsets = Vec::with_capacity(n);
        let mut skips = Vec::with_capacity(n);
        let mut weights = Vec::with_capacity(n);
        let mut total_weight: u64 = 0;
        for backend in backends {
            let b = backend.borrow();
            let addr = b.address;
            self.backend_addrs.push(addr);
            // The two keys are the ASCII bytes of "offset" and "skip__"; they
            // keep the two hashes of the same address independent.
            let h1 = hash_backend(self.seed, 0x6F66_6673_6574, &addr);
            let h2 = hash_backend(self.seed, 0x736B_6970_5F5F, &addr);
            let offset = (h1 % m as u64) as usize;
            let skip = (h2 % (m as u64 - 1)) as usize + 1;
            debug_assert!(offset < m, "Maglev offset must be a valid slot");
            debug_assert!(
                (1..m).contains(&skip),
                "Maglev skip must lie in [1, m-1] to stay coprime with the prime table"
            );
            offsets.push(offset);
            skips.push(skip);
            let w = u64::from(backend_weight(&b));
            weights.push(w);
            total_weight += w;
        }
        debug_assert!(
            total_weight > 0,
            "Maglev total weight must be positive for a non-empty backend set"
        );

        // Slot budget per backend: the floor of its proportional share, with
        // the slots lost to rounding handed out one each starting at index 0.
        let mut targets: Vec<usize> = weights
            .iter()
            .map(|&w| ((w as u128 * m as u128) / total_weight as u128) as usize)
            .collect();
        let assigned: usize = targets.iter().sum();
        for i in 0..(m - assigned) {
            targets[i % n] += 1;
        }
        debug_assert_eq!(
            targets.iter().sum::<usize>(),
            m,
            "Maglev per-backend targets must sum to the table size"
        );

        // Population. `cursors[b]` is the next slot of backend `b`'s
        // permutation. It is advanced additively, `(c + skip) % m`, which
        // visits the same slots as `(offset + k * skip) % m` without the
        // multiplication that can overflow `usize` on large tables.
        let mut table = vec![usize::MAX; m];
        let mut cursors = offsets;
        let mut filled = vec![0usize; n];
        let mut count = 0usize;
        'fill: while count < m {
            for b in 0..n {
                if filled[b] >= targets[b] {
                    continue;
                }
                let mut c = cursors[b];
                // `skip` is in [1, m-1] and `m` is prime, so the walk visits
                // every slot; a free slot exists because `count < m`.
                while table[c] != usize::MAX {
                    c = (c + skips[b]) % m;
                }
                table[c] = b;
                cursors[b] = (c + skips[b]) % m;
                filled[b] += 1;
                count += 1;
                if count >= m {
                    break 'fill;
                }
            }
        }
        debug_assert_eq!(count, m, "Maglev population loop must fill exactly m slots");
        debug_assert!(
            table.iter().all(|&slot| slot != usize::MAX),
            "Maglev population loop left an unfilled slot"
        );
        debug_assert!(
            filled == targets,
            "Maglev filled counts must match the per-backend targets"
        );
        self.table = table;

        // Postconditions on the stored state.
        debug_assert!(
            is_prime(self.size),
            "Maglev table size {} is not prime",
            self.size
        );
        debug_assert_eq!(
            self.table.len(),
            self.size,
            "Maglev table must have exactly `size` slots"
        );
        debug_assert!(
            !self.backend_addrs.is_empty(),
            "Maglev: non-empty table but empty backend_addrs"
        );
        debug_assert!(
            self.table.iter().all(|&idx| idx < self.backend_addrs.len()),
            "Maglev table holds an index out of backend_addrs range"
        );
    }
}
impl LoadBalancingAlgorithm for Maglev {
    /// Looks up the backend for `key` in the Maglev table.
    ///
    /// Without a key the call is delegated to round-robin. With a key, an
    /// empty `backends` list yields `None`, and an empty table is built from
    /// `backends` first. The lookup starts at slot `key % size` and probes
    /// forward, wrapping around, until it finds a table entry whose address is
    /// present in `backends`. If no slot matches, round-robin over `backends`
    /// is used instead.
    fn next_available_backend(
        &mut self,
        key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>> {
        let key = match key {
            Some(key) => key,
            None => return self.round_robin.next_available_backend(None, backends),
        };
        if backends.is_empty() {
            return None;
        }
        if self.table.is_empty() {
            self.rebuild(backends);
            if self.table.is_empty() {
                return None;
            }
        }
        let size = self.size;
        let start = (key % size as u64) as usize;
        debug_assert!(start < size, "Maglev start slot out of range");
        debug_assert_eq!(
            self.table.len(),
            size,
            "Maglev lookup on a table whose length != size"
        );
        let hit = (0..size)
            .map(|step| (start + step) % size)
            .find_map(|slot| {
                debug_assert!(slot < size, "Maglev probe slot out of range");
                let idx = self.table[slot];
                debug_assert!(
                    idx < self.backend_addrs.len(),
                    "Maglev table entry indexes outside the captured address set"
                );
                let addr = self.backend_addrs.get(idx)?;
                let backend = backends
                    .iter()
                    .find(|candidate| candidate.borrow().address == *addr)?;
                debug_assert!(
                    backends
                        .iter()
                        .any(|b| b.borrow().address == backend.borrow().address),
                    "Maglev must return a backend from the healthy subset"
                );
                Some(Rc::clone(backend))
            });
        match hit {
            Some(backend) => Some(backend),
            None => self.round_robin.next_available_backend(None, backends),
        }
    }

    /// Forwards to the inherent `Maglev::rebuild`.
    fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
        Maglev::rebuild(self, backends);
    }
}
#[cfg(test)]
mod test {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::*;
use crate::{
PeakEWMA,
backends::{BackendStatus, HealthState},
retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
sozu_command::proto::command::{LoadBalancingParams, LoadMetric},
};
/// Builds a test backend at `127.0.0.1:8080` with the given id and active
/// connection count (0 when `connections` is `None`).
fn create_backend(id: String, connections: Option<usize>) -> Backend {
    let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let retry_policy = RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1));
    let active_connections = match connections {
        Some(count) => count,
        None => 0,
    };
    Backend {
        sticky_id: None,
        backend_id: id,
        address,
        status: BackendStatus::Normal,
        retry_policy,
        active_connections,
        active_requests: 0,
        failures: 0,
        load_balancing_parameters: None,
        backup: false,
        connection_time: PeakEWMA::new(),
        health: HealthState::default(),
    }
}
/// `LeastLoaded` on the connection metric returns the backend with the fewest
/// active connections, wherever it sits in the list.
#[test]
fn it_should_find_the_backend_with_least_connections() {
    let expected = Rc::new(RefCell::new(create_backend("yolo".to_string(), Some(1))));
    let busier = [("nolo", 10), ("philo", 20)];
    let mut backends: Vec<_> = busier
        .iter()
        .map(|&(id, load)| Rc::new(RefCell::new(create_backend(id.to_string(), Some(load)))))
        .collect();
    backends.push(Rc::clone(&expected));

    let mut algorithm = LeastLoaded {
        metric: LoadMetric::Connections,
    };
    let picked = algorithm
        .next_available_backend(None, &mut backends)
        .unwrap();
    assert!(*picked.borrow() == *expected.borrow());
}
/// `LeastLoaded` has nothing to return when there are no backends.
#[test]
fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
    let mut no_backends: Vec<Rc<RefCell<Backend>>> = Vec::new();
    let mut algorithm = LeastLoaded {
        metric: LoadMetric::Connections,
    };
    let picked = algorithm.next_available_backend(None, &mut no_backends);
    assert!(picked.is_none());
}
/// Round-robin keeps working when the list shrinks under its cursor: the
/// cursor wraps back to the first backend.
#[test]
fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
    let mut backends: Vec<_> = ["toto", "voto", "yoto"]
        .iter()
        .map(|id| Rc::new(RefCell::new(create_backend(id.to_string(), None))))
        .collect();
    let mut roundrobin = RoundRobin { next_backend: 1 };

    let first_pick = roundrobin.next_available_backend(None, &mut backends);
    assert_eq!(first_pick.as_ref(), backends.get(1));

    backends.remove(1);
    let second_pick = roundrobin.next_available_backend(None, &mut backends);
    assert_eq!(second_pick.as_ref(), backends.first());
}
/// Builds a test backend at `127.0.0.<last_octet>:<port>` with an optional
/// explicit weight.
fn addr_backend(id: &str, last_octet: u8, port: u16, weight: Option<i32>) -> Backend {
    let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, last_octet));
    let mut backend = create_backend(String::from(id), None);
    backend.address = SocketAddr::new(ip, port);
    backend.load_balancing_parameters = weight.map(|w| LoadBalancingParams { weight: w });
    backend
}
/// Wraps a backend in the shared-ownership cell the algorithms operate on.
fn rc(b: Backend) -> Rc<RefCell<Backend>> {
    Rc::from(RefCell::new(b))
}

/// Builds `n` backends named `b0..`, at `127.0.0.(i+1):(8000+i)`, without
/// explicit weights.
fn make_backends(n: u8) -> Vec<Rc<RefCell<Backend>>> {
    let mut backends = Vec::with_capacity(usize::from(n));
    for i in 0..n {
        let id = format!("b{i}");
        backends.push(rc(addr_backend(&id, i + 1, 8000 + u16::from(i), None)));
    }
    backends
}

/// Reads the socket address of a shared backend.
fn chosen_addr(b: &Rc<RefCell<Backend>>) -> SocketAddr {
    let backend = b.borrow();
    backend.address
}
/// The same key must map to the same backend on every call.
#[test]
fn hrw_is_deterministic_for_a_fixed_key() {
    let mut backends = make_backends(5);
    let mut hrw = Rendezvous::new();
    let mut pick = || {
        hrw.next_available_backend(Some(42), &mut backends)
            .map(|b| chosen_addr(&b))
    };
    let first = pick();
    for _ in 0..50 {
        assert_eq!(first, pick(), "HRW must be deterministic for a fixed key");
    }
}
/// Without a key, `Rendezvous` walks the list in order and wraps around.
#[test]
fn hrw_none_key_falls_back_to_round_robin() {
    let mut backends = make_backends(3);
    let mut hrw = Rendezvous::new();
    let picks: Vec<SocketAddr> = (0..4)
        .map(|_| chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap()))
        .collect();
    // The first three picks follow list order.
    for (pick, backend) in picks.iter().zip(backends.iter()) {
        assert_eq!(*pick, chosen_addr(backend));
    }
    assert_eq!(picks[3], picks[0], "round-robin should wrap around");
}
/// Removing one backend may only move the keys that were mapped to it.
#[test]
fn hrw_minimal_disruption_when_removing_a_non_winner() {
    let mut backends = make_backends(6);
    let mut hrw = Rendezvous::new();
    let removed_addr = chosen_addr(&backends[3]);
    let keys = 0..2000u64;

    // Winner of every key with the full set, indexed by key.
    let before: Vec<SocketAddr> = keys
        .clone()
        .map(|key| {
            chosen_addr(
                &hrw.next_available_backend(Some(key), &mut backends)
                    .unwrap(),
            )
        })
        .collect();

    backends.remove(3);
    for (key, prev) in keys.zip(before) {
        let after = chosen_addr(
            &hrw.next_available_backend(Some(key), &mut backends)
                .unwrap(),
        );
        if prev == removed_addr {
            continue;
        }
        assert_eq!(
            prev, after,
            "removing a non-winner changed the choice for key {key}"
        );
    }
}
/// With equal weights, every backend receives within 35% of its fair share of
/// sequential keys.
#[test]
fn hrw_distribution_is_roughly_even() {
    use std::collections::HashMap;

    let n = 5u8;
    let mut backends = make_backends(n);
    let mut hrw = Rendezvous::new();
    let total = 20_000u64;

    let mut counts: HashMap<SocketAddr, u64> = HashMap::new();
    for key in 0..total {
        let winner = chosen_addr(
            &hrw.next_available_backend(Some(key), &mut backends)
                .unwrap(),
        );
        *counts.entry(winner).or_insert(0) += 1;
    }

    let expected = total / u64::from(n);
    let (low, high) = (expected * 65 / 100, expected * 135 / 100);
    for backend in &backends {
        let c = counts.get(&chosen_addr(backend)).copied().unwrap_or(0);
        assert!(
            low < c && c < high,
            "HRW distribution skewed: backend got {c}, expected ~{expected}"
        );
    }
}
/// The same key must map to the same backend on every lookup.
#[test]
fn maglev_is_deterministic_for_a_fixed_key() {
    let backends = make_backends(7);
    let mut mag = Maglev::new();
    mag.rebuild(&backends);
    let mut sel = backends.clone();
    let mut pick = || chosen_addr(&mag.next_available_backend(Some(12345), &mut sel).unwrap());
    let first = pick();
    for _ in 0..50 {
        assert_eq!(first, pick(), "Maglev must be deterministic for a fixed key");
    }
}
/// Without a key, `Maglev` walks the list in order and wraps around.
#[test]
fn maglev_none_key_falls_back_to_round_robin() {
    let mut backends = make_backends(3);
    let mut mag = Maglev::new();
    mag.rebuild(&backends);
    let picks: Vec<SocketAddr> = (0..4)
        .map(|_| chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap()))
        .collect();
    // The first three picks follow list order; the fourth wraps to the first.
    for (pick, backend) in picks.iter().zip(backends.iter()) {
        assert_eq!(*pick, chosen_addr(backend));
    }
    assert_eq!(picks[3], picks[0]);
}
/// With equal weights, every backend receives within 15% of its fair share of
/// sequential keys.
#[test]
fn maglev_distribution_is_roughly_even() {
    use std::collections::HashMap;

    let n = 5u8;
    let backends = make_backends(n);
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    mag.rebuild(&backends);
    let total = 50_000u64;

    let mut sel = backends.clone();
    let mut counts: HashMap<SocketAddr, u64> = HashMap::new();
    for key in 0..total {
        let winner = chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap());
        *counts.entry(winner).or_insert(0) += 1;
    }

    let expected = total / u64::from(n);
    let (low, high) = (expected * 85 / 100, expected * 115 / 100);
    for backend in &backends {
        let c = counts.get(&chosen_addr(backend)).copied().unwrap_or(0);
        assert!(
            low < c && c < high,
            "Maglev distribution skewed: backend got {c}, expected ~{expected}"
        );
    }
}
/// Adding a sixth backend and rebuilding must move fewer than half of the keys.
#[test]
fn maglev_table_rebuild_keeps_most_keys_stable() {
    let backends5 = make_backends(5);
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    mag.rebuild(&backends5);
    let total = 20_000u64;

    // Winner of every key with five backends, indexed by key.
    let mut sel = backends5.clone();
    let before: Vec<SocketAddr> = (0..total)
        .map(|key| chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()))
        .collect();

    let mut backends6 = backends5.clone();
    backends6.push(rc(addr_backend("b5", 6, 8005, None)));
    mag.rebuild(&backends6);

    let mut sel6 = backends6.clone();
    let moved = (0..total)
        .zip(&before)
        .filter(|&(key, prev)| {
            let after = chosen_addr(&mag.next_available_backend(Some(key), &mut sel6).unwrap());
            after != *prev
        })
        .count() as u64;
    assert!(
        moved < total / 2,
        "Maglev rebuild moved too many keys: {moved}/{total}"
    );
}
/// Dropping one backend from the candidate list must not rebuild the table,
/// must never select the dropped backend, and must leave every key that was
/// mapped to a remaining backend where it was.
#[test]
fn maglev_partial_outage_does_not_rebuild_and_stays_stable() {
    let full = make_backends(5);
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    mag.rebuild(&full);
    let table_before = mag.table.clone();
    let addrs_before = mag.backend_addrs.clone();
    assert_eq!(addrs_before.len(), 5, "table built from the full set");

    let total = 4000u64;
    // Winner of every key with the full set, indexed by key.
    let mut sel_full = full.clone();
    let before: Vec<SocketAddr> = (0..total)
        .map(|key| {
            chosen_addr(
                &mag.next_available_backend(Some(key), &mut sel_full)
                    .unwrap(),
            )
        })
        .collect();

    let unhealthy_addr = chosen_addr(&full[2]);
    let mut subset: Vec<_> = full
        .iter()
        .filter(|b| chosen_addr(b) != unhealthy_addr)
        .cloned()
        .collect();

    for (key, prev) in (0..total).zip(before.iter().copied()) {
        let after = chosen_addr(&mag.next_available_backend(Some(key), &mut subset).unwrap());
        assert_ne!(
            after, unhealthy_addr,
            "selection returned the unhealthy backend for key {key}"
        );
        if prev != unhealthy_addr {
            assert_eq!(
                prev, after,
                "a healthy key moved during a partial outage (key {key})"
            );
        }
    }

    assert_eq!(
        mag.table, table_before,
        "partial outage must not rebuild the Maglev table"
    );
    assert_eq!(
        mag.backend_addrs, addrs_before,
        "partial outage must not change the captured backend set"
    );
}
/// When none of the backends in the table are candidates, selection falls
/// back to the candidate list and leaves the table alone.
#[test]
fn maglev_all_table_backends_unhealthy_falls_back_to_round_robin() {
    let table_set = make_backends(3);
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    mag.rebuild(&table_set);
    let table_before = mag.table.clone();

    // Candidates that share no address with the table.
    let mut fresh: Vec<_> = [("n0", 50, 9000), ("n1", 51, 9001)]
        .iter()
        .map(|&(id, octet, port)| rc(addr_backend(id, octet, port, None)))
        .collect();
    let fresh_addrs: Vec<SocketAddr> = fresh.iter().map(chosen_addr).collect();

    let picked = chosen_addr(&mag.next_available_backend(Some(7), &mut fresh).unwrap());
    assert!(
        fresh_addrs.contains(&picked),
        "fallback must route to a backend in the healthy subset"
    );
    assert_eq!(
        mag.table, table_before,
        "fallback must not rebuild the table"
    );
}
/// The first keyed lookup builds the table; later lookups reuse it unchanged.
#[test]
fn maglev_cold_start_builds_table_once() {
    let mut backends = make_backends(4);
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    assert!(mag.table.is_empty(), "table starts empty (cold)");

    let first_lookup = mag.next_available_backend(Some(99), &mut backends);
    first_lookup.unwrap();
    assert_eq!(mag.table.len(), mag.size, "cold start populated the table");
    let table_after_cold = mag.table.clone();

    let second_lookup = mag.next_available_backend(Some(100), &mut backends);
    second_lookup.unwrap();
    assert_eq!(
        mag.table, table_after_cold,
        "selection after cold start must not rebuild"
    );
}
/// With no key and no backends, round-robin and both selectors that fall back
/// to it return `None` instead of panicking.
#[test]
fn round_robin_empty_set_returns_none_without_panic() {
    let mut empty: Vec<Rc<RefCell<Backend>>> = Vec::new();

    let from_round_robin = RoundRobin::new().next_available_backend(None, &mut empty);
    assert!(from_round_robin.is_none());

    let from_rendezvous = Rendezvous::new().next_available_backend(None, &mut empty);
    assert!(from_rendezvous.is_none());

    let from_maglev = Maglev::new().next_available_backend(None, &mut empty);
    assert!(from_maglev.is_none());
}
/// A requested table size of 1 is raised to a usable size, and the resulting
/// table can be built and queried.
#[test]
fn maglev_table_size_one_is_clamped_to_a_prime() {
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1);
    assert!(
        mag.size >= 2,
        "size must be clamped to >= 2, got {}",
        mag.size
    );

    let mut backends = make_backends(3);
    mag.rebuild(&backends);
    assert_eq!(mag.table.len(), mag.size);

    let lookup = mag.next_available_backend(Some(1), &mut backends);
    lookup.unwrap();
}
/// `next_prime` returns its argument when that is already prime, and the
/// following prime otherwise.
#[test]
fn next_prime_picks_the_smallest_prime_at_least_n() {
    // (requested floor, expected prime)
    let cases: [(usize, usize); 7] = [
        (0, 2),
        (1, 2),
        (2, 2),
        (3, 3),
        (4, 5),
        (1009, 1009),
        (65537, 65537),
    ];
    for &(floor, expected) in cases.iter() {
        assert_eq!(next_prime(floor), expected);
    }
}
/// A backend with four times the weight must receive more than 70% of the keys.
#[test]
fn maglev_honors_weight() {
    let backends = vec![
        rc(addr_backend("light", 1, 8001, Some(100))),
        rc(addr_backend("heavy", 2, 8002, Some(400))),
    ];
    let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
    mag.rebuild(&backends);
    let heavy_addr = chosen_addr(&backends[1]);
    let total = 20_000u64;

    let mut sel = backends.clone();
    let heavy = (0..total)
        .filter(|&key| {
            chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()) == heavy_addr
        })
        .count() as u64;
    assert!(
        heavy > total * 70 / 100,
        "weighted Maglev did not favor the heavy backend: {heavy}/{total}"
    );
}
}