#![warn(clippy::pedantic, missing_docs)]
use std::hash::{Hash, Hasher};
use fixedbitset::FixedBitSet;
/// Set of node indices, used to remember which nodes have already been visited.
///
/// `NodeSet::with_capacity` selects `Inline` when at most 128 indices are needed and
/// `Heap` otherwise.
enum NodeSet {
/// Membership flags packed into one `u128`; bit `i` is set when index `i` is present.
Inline(u128),
/// Membership flags stored in a `FixedBitSet` sized by the requested capacity.
Heap(FixedBitSet),
}
impl NodeSet {
    /// Creates an empty set able to hold the indices `0..n`.
    ///
    /// Up to 128 indices fit in the inline `u128`; anything larger is backed by a
    /// `FixedBitSet` created with capacity `n`.
    fn with_capacity(n: usize) -> Self {
        if n > 128 {
            return Self::Heap(FixedBitSet::with_capacity(n));
        }
        Self::Inline(0)
    }

    /// Marks `bit` as present and returns whether it was already present beforehand.
    fn put(&mut self, bit: usize) -> bool {
        match self {
            Self::Heap(set) => set.put(bit),
            Self::Inline(bits) => {
                let mask = 1u128 << bit;
                // Snapshot the word before setting the flag so the prior state can be reported.
                let before = *bits;
                *bits = before | mask;
                (before & mask) != 0
            }
        }
    }
}
/// Lookup table that maps hashed keys onto a fixed list of nodes.
///
/// The table has `capacity` slots; each slot stores the index of the node that owns it.
/// A key is hashed to a slot and resolved to the node recorded there.
#[derive(Debug, Clone)]
pub struct MaglevTable<N> {
    /// The nodes keys can be assigned to, in the order supplied by the caller.
    nodes: Vec<N>,
    /// Slot-to-node mapping: `table[slot]` is an index into `nodes`.
    /// Left empty when there are no nodes.
    table: Vec<usize>,
    /// Number of slots the table is sized for.
    capacity: usize,
}
impl<N: Hash + Clone + Eq> MaglevTable<N> {
    /// Builds a table for `nodes` with an automatically chosen size.
    ///
    /// Requests 100 slots per node (or a single slot when `nodes` is empty) and defers to
    /// [`Self::with_capacity`], which rounds that request up to a prime.
    #[must_use]
    pub fn new(nodes: Vec<N>) -> Self {
        let raw = if nodes.is_empty() {
            1
        } else {
            nodes.len() * 100
        };
        Self::with_capacity(nodes, raw)
    }

    /// Builds a table for `nodes` with at least `min_capacity` slots.
    ///
    /// The slot count is `min_capacity` (at least 1) rounded up to a prime by `next_prime`.
    /// Every node derives an `offset` and a `skip` from two seeded hashes; nodes then take
    /// turns claiming the next still-free slot along `(offset + j * skip) % capacity`
    /// until every slot has an owner.
    ///
    /// When `nodes` is empty no slots are populated and every lookup reports "no node".
    #[must_use]
    // The only narrowing cast is applied to a value already reduced modulo the slot
    // count, which itself is a `usize`.
    #[allow(clippy::cast_possible_truncation)]
    pub fn with_capacity(nodes: Vec<N>, min_capacity: usize) -> Self {
        let m = next_prime(min_capacity.max(1));
        if nodes.is_empty() {
            return Self {
                nodes,
                table: Vec::new(),
                capacity: m,
            };
        }

        let modulus = m as u64;
        // Per-node (offset, skip). `skip` lies in `1..m`, so with a prime `m` the walk
        // visits every slot before repeating.
        let permutations: Vec<(u64, u64)> = nodes
            .iter()
            .map(|node| {
                let offset = hash_with_seed(node, 0) % modulus;
                let skip = hash_with_seed(node, 1) % (modulus - 1) + 1;
                (offset, skip)
            })
            .collect();

        // `usize::MAX` marks a slot nobody has claimed yet.
        let mut table = vec![usize::MAX; m];
        // How far each node has advanced along its own slot sequence.
        let mut next = vec![0u64; nodes.len()];
        let mut filled = 0usize;

        'fill: while filled < m {
            for (i, (&(offset, skip), cursor)) in
                permutations.iter().zip(next.iter_mut()).enumerate()
            {
                // Advance along this node's sequence until a free slot turns up.
                let slot = loop {
                    let candidate = ((offset + *cursor * skip) % modulus) as usize;
                    *cursor += 1;
                    if table[candidate] == usize::MAX {
                        break candidate;
                    }
                };
                table[slot] = i;
                filled += 1;
                if filled == m {
                    break 'fill;
                }
            }
        }

        Self {
            nodes,
            table,
            capacity: m,
        }
    }

    /// Returns the node that owns `key`, or `None` when the table has no nodes.
    #[must_use]
    pub fn get<K: Hash>(&self, key: &K) -> Option<&N> {
        if self.nodes.is_empty() {
            return None;
        }
        Some(&self.nodes[self.table[self.slot_for(key)]])
    }

    /// Returns up to `k` distinct nodes for `key`, most preferred first.
    ///
    /// The list starts with the node returned by [`Self::get`] and continues with the
    /// distinct nodes met while walking the table forward (wrapping around) from the
    /// key's slot. `k` is clamped to the number of nodes; `k == 0` and an empty table
    /// both produce an empty list.
    #[must_use]
    pub fn get_top_k<K: Hash>(&self, key: &K, k: usize) -> Vec<&N> {
        if self.nodes.is_empty() {
            return Vec::new();
        }
        let k = k.min(self.nodes.len());
        let mut result = Vec::with_capacity(k);
        result.extend(
            self.distinct_nodes_from(self.slot_for(key))
                .take(k)
                .map(|node_idx| &self.nodes[node_idx]),
        );
        result
    }

    /// Reports whether `node` is among the first `top_k` distinct nodes for `key`.
    ///
    /// Uses the same ordering as [`Self::get_top_k`] without building the list.
    /// Returns `false` when the table has no nodes or when `top_k` is 0.
    #[must_use]
    pub fn is_match<K: Hash>(&self, key: &K, node: &N, top_k: usize) -> bool {
        if self.nodes.is_empty() {
            return false;
        }
        let top_k = top_k.min(self.nodes.len());
        self.distinct_nodes_from(self.slot_for(key))
            .take(top_k)
            .any(|node_idx| self.nodes[node_idx] == *node)
    }

    /// Returns the nodes this table was built from, in their original order.
    #[must_use]
    pub fn nodes(&self) -> &[N] {
        &self.nodes
    }

    /// Returns the number of slots the table is sized for.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of nodes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.nodes.len()
    }

    /// Returns `true` when the table has no nodes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.nodes.is_empty()
    }

    /// Maps `key` to its slot index in `0..capacity`.
    // The hash is reduced modulo `capacity` (a `usize`) before it is narrowed.
    #[allow(clippy::cast_possible_truncation)]
    fn slot_for<K: Hash>(&self, key: &K) -> usize {
        (hash_with_seed(key, 2) % (self.capacity as u64)) as usize
    }

    /// Walks the table once from `start`, wrapping around, and yields each node index
    /// the first time it is encountered.
    ///
    /// Must only be called on a populated table (`nodes` non-empty), since it indexes
    /// `table` for every slot in `0..capacity`.
    fn distinct_nodes_from(&self, start: usize) -> impl Iterator<Item = usize> + '_ {
        let mut seen = NodeSet::with_capacity(self.nodes.len());
        (start..self.capacity)
            .chain(0..start)
            .map(move |slot| self.table[slot])
            .filter(move |&node_idx| !seen.put(node_idx))
    }
}
/// Hashes `val` with a `std::hash::DefaultHasher` that has been fed `seed` first.
///
/// Callers pass different seeds to obtain several hashes of the same value.
fn hash_with_seed<T: Hash>(val: &T, seed: u64) -> u64 {
    let mut state = std::hash::DefaultHasher::new();
    // Order matters: the seed is written before the value.
    Hash::hash(&seed, &mut state);
    Hash::hash(val, &mut state);
    state.finish()
}
/// Returns the first prime produced by `primal::Primes::all()` that is at least `n`.
///
/// Inputs of 2 or less map directly to 2.
fn next_prime(n: usize) -> usize {
    match n {
        0..=2 => 2,
        _ => primal::Primes::all()
            .find(|&candidate| candidate >= n)
            .expect("primal yields unbounded primes"),
    }
}
// Unit tests for `MaglevTable`: lookups, preference lists, `is_match`, table sizing, and
// how assignments change when the node set changes.
#[cfg(test)]
// Test-only lint relaxations: ratios are computed with `as f64` casts, and several
// bindings intentionally have similar names (e.g. `t_full` / `t_minus`).
#[allow(
clippy::cast_precision_loss,
clippy::cast_lossless,
clippy::similar_names
)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::collections::HashSet;
// A table built from no nodes is empty: lookups return nothing and nothing matches.
#[test]
fn test_empty_table() {
let table: MaglevTable<String> = MaglevTable::new(vec![]);
assert!(table.is_empty());
assert_eq!(table.len(), 0);
assert!(table.get(&"any-key").is_none());
assert!(table.get_top_k(&"any-key", 5).is_empty());
assert!(!table.is_match(&"any-key", &"node".to_string(), 3));
}
// With a single node, every key resolves to that node.
#[test]
fn test_single_node() {
let table = MaglevTable::new(vec!["only-node".to_string()]);
assert_eq!(table.len(), 1);
for i in 0..100 {
let key = format!("key-{i}");
assert_eq!(table.get(&key).unwrap(), "only-node");
}
}
// Two tables built from the same node list agree on every key.
#[test]
fn test_deterministic() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let t1 = MaglevTable::new(nodes.clone());
let t2 = MaglevTable::new(nodes);
for i in 0..200 {
let key = format!("key-{i}");
assert_eq!(t1.get(&key), t2.get(&key));
}
}
// With 3 nodes and 10,000 keys, each node receives between 20% and 40% of the keys.
#[test]
fn test_uniform_distribution() {
let nodes: Vec<String> = (0..3).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes.clone());
let mut counts: HashMap<&str, usize> = HashMap::new();
let num_keys = 10_000;
for i in 0..num_keys {
let key = format!("key-{i}");
let node = table.get(&key).unwrap();
*counts.entry(node.as_str()).or_default() += 1;
}
for node in &nodes {
let count = counts.get(node.as_str()).copied().unwrap_or(0);
let ratio = count as f64 / num_keys as f64;
assert!(
ratio > 0.20 && ratio < 0.40,
"node {node} has ratio {ratio:.3}, expected between 0.20 and 0.40"
);
}
}
// Dropping one of ten nodes (same requested capacity) reassigns fewer than 20% of keys.
#[test]
fn test_minimal_disruption() {
let nodes_full: Vec<String> = (0..10).map(|i| format!("node-{i}")).collect();
let nodes_minus_one: Vec<String> = nodes_full[..9].to_vec();
let cap = nodes_full.len() * 100;
let t_full = MaglevTable::with_capacity(nodes_full, cap);
let t_minus = MaglevTable::with_capacity(nodes_minus_one, cap);
let num_keys = 10_000;
let mut changed = 0usize;
for i in 0..num_keys {
let key = format!("key-{i}");
let a = t_full.get(&key).unwrap();
let b = t_minus.get(&key).unwrap();
if a != b {
changed += 1;
}
}
let disruption = changed as f64 / num_keys as f64;
assert!(
disruption < 0.20,
"disruption {disruption:.3} exceeds 0.20 (expected ~0.10 for 10→9 nodes)"
);
}
// Asking for as many entries as there are nodes returns exactly that many.
#[test]
fn test_preference_list_length() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes.clone());
let top = table.get_top_k(&"some-key", 5);
assert_eq!(top.len(), 5);
}
// Repeated preference-list queries for the same key return the same list.
#[test]
fn test_preference_list_deterministic() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes);
let a = table.get_top_k(&"stable-key", 5);
let b = table.get_top_k(&"stable-key", 5);
assert_eq!(a, b);
}
// A preference list never contains the same node twice.
#[test]
fn test_preference_list_unique() {
let nodes: Vec<String> = (0..7).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes);
for i in 0..100 {
let key = format!("key-{i}");
let top = table.get_top_k(&key, 7);
let set: HashSet<&String> = top.iter().copied().collect();
assert_eq!(
set.len(),
top.len(),
"duplicate in preference list for {key}"
);
}
}
// The first preference-list entry is the node `get` returns.
#[test]
fn test_preference_list_primary_matches_get() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes);
for i in 0..200 {
let key = format!("key-{i}");
let primary = table.get(&key).unwrap();
let top = table.get_top_k(&key, 1);
assert_eq!(top[0], primary, "mismatch for {key}");
}
}
// `is_match` accepts exactly the nodes in the top-3 list and rejects ranks 4 and 5.
#[test]
fn test_is_match() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes);
let key = "test-key";
let top3 = table.get_top_k(&key, 3);
for node in &top3 {
assert!(table.is_match(&key, node, 3));
}
let top5 = table.get_top_k(&key, 5);
for node in &top5[3..] {
assert!(!table.is_match(&key, node, 3));
}
}
// With `top_k == 1`, only the primary node matches.
#[test]
fn test_is_match_single_replica() {
let nodes: Vec<String> = (0..5).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes.clone());
let key = "single-replica-key";
let primary = table.get(&key).unwrap().clone();
assert!(table.is_match(&key, &primary, 1));
for node in &nodes {
if node != &primary {
assert!(
!table.is_match(&key, node, 1),
"non-primary node {node} matched with replicas=1"
);
}
}
}
// A 100-node table builds, and every lookup returns one of the supplied nodes.
#[test]
fn test_large_table() {
let nodes: Vec<String> = (0..100).map(|i| format!("worker-{i}")).collect();
let table = MaglevTable::new(nodes.clone());
assert_eq!(table.len(), 100);
for i in 0..500 {
let key = format!("request-{i}");
let node = table.get(&key).unwrap();
assert!(nodes.contains(node));
}
}
// The automatically chosen capacity is prime for a range of node counts.
#[test]
fn test_prime_capacity() {
for n in [1, 2, 3, 5, 10, 50, 100] {
let nodes: Vec<u32> = (0..n).collect();
let table = MaglevTable::new(nodes);
let cap = table.capacity();
assert!(
primal::is_prime(cap as u64),
"capacity {cap} is not prime for {n} nodes"
);
}
}
// A full-length preference list contains every node.
#[test]
fn test_preference_list_covers_all_nodes() {
let nodes: Vec<String> = (0..7).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes.clone());
for i in 0..100 {
let key = format!("key-{i}");
let top = table.get_top_k(&key, 7);
let set: HashSet<&String> = top.iter().copied().collect();
assert_eq!(
set.len(),
7,
"preference list for {key} did not cover all nodes"
);
for node in &nodes {
assert!(set.contains(node), "node {node} missing for {key}");
}
}
}
// After removing one of six nodes, more than half of the pairwise orderings among the
// surviving nodes are the same in the new preference lists.
#[test]
fn test_node_removal_minimal_disruption_preference_list() {
let nodes_full: Vec<String> = (0..6).map(|i| format!("node-{i}")).collect();
let nodes_minus: Vec<String> = nodes_full[..5].to_vec();
let removed = nodes_full[5].clone();
let cap = nodes_full.len() * 100;
let t_full = MaglevTable::with_capacity(nodes_full, cap);
let t_minus = MaglevTable::with_capacity(nodes_minus, cap);
let mut preserved = 0usize;
let mut total_pairs = 0usize;
let num_keys = 1_000;
for i in 0..num_keys {
let key = format!("key-{i}");
// Full-table preference list with the removed node filtered out.
let full_list: Vec<&String> = t_full
.get_top_k(&key, 6)
.into_iter()
.filter(|n| **n != removed)
.collect();
let minus_list = t_minus.get_top_k(&key, 5);
// Count ordered pairs (a before b) that keep their relative order in the smaller table.
for a in 0..full_list.len() {
for b in (a + 1)..full_list.len() {
total_pairs += 1;
let pos_a_minus = minus_list.iter().position(|n| *n == full_list[a]);
let pos_b_minus = minus_list.iter().position(|n| *n == full_list[b]);
if let (Some(pa), Some(pb)) = (pos_a_minus, pos_b_minus) {
if pa < pb {
preserved += 1;
}
}
}
}
}
let ratio = preserved as f64 / total_pairs as f64;
assert!(
ratio > 0.5,
"pairwise ordering preservation {ratio:.3} is too low"
);
}
// `new` with 10 nodes picks a prime capacity of at least 1000.
#[test]
fn test_capacity_auto_sizing() {
let nodes: Vec<u32> = (0..10).collect();
let table = MaglevTable::new(nodes);
assert!(table.capacity() >= 1000);
assert!(primal::is_prime(table.capacity() as u64));
}
// String nodes work with `&str` keys for both `get` and `get_top_k`.
#[test]
fn test_string_nodes() {
let nodes: Vec<String> = vec![
"us-east-1a".into(),
"us-west-2b".into(),
"eu-west-1c".into(),
];
let table = MaglevTable::new(nodes.clone());
let node = table.get(&"user-session-123").unwrap();
assert!(nodes.contains(node));
let top = table.get_top_k(&"user-session-123", 3);
assert_eq!(top.len(), 3);
}
// Requesting more entries than there are nodes returns one entry per node.
#[test]
fn test_get_top_k_with_k_greater_than_nodes() {
let nodes: Vec<String> = (0..3).map(|i| format!("node-{i}")).collect();
let table = MaglevTable::new(nodes);
let top = table.get_top_k(&"key", 10);
assert_eq!(top.len(), 3);
}
// Compile-time check that `MaglevTable<String>` is `Send + Sync`.
#[test]
fn test_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<MaglevTable<String>>();
}
// Adding a fourth node: of the keys still owned by one of the original three nodes in
// the larger table, more than 95% have the same owner in the smaller table.
#[test]
fn test_node_hashes_are_independent() {
let nodes_a: Vec<String> = vec!["alpha", "beta", "gamma"]
.into_iter()
.map(String::from)
.collect();
let nodes_b: Vec<String> = vec!["alpha", "beta", "gamma", "delta"]
.into_iter()
.map(String::from)
.collect();
let cap = 10007; let t_a = MaglevTable::with_capacity(nodes_a, cap);
let t_b = MaglevTable::with_capacity(nodes_b, cap);
let mut same = 0;
let mut comparable = 0;
for i in 0..10_000 {
let key = format!("key-{i}");
let a = t_a.get(&key).unwrap();
let b = t_b.get(&key).unwrap();
// Only keys not claimed by the new node are comparable between the two tables.
if b == "alpha" || b == "beta" || b == "gamma" {
comparable += 1;
if a == b {
same += 1;
}
}
}
let ratio = same as f64 / comparable as f64;
assert!(
ratio > 0.95,
"only {ratio:.3} of comparable keys matched — \
node hashes may be dependent on input order"
);
}
// Adding a fourth node keeps top-3 lists mostly intact: per-rank change rates stay
// under fixed bounds and, on average, more than 2.2 of the old top-3 nodes survive.
#[test]
fn test_top_k_minimal_disruption_on_node_addition() {
let nodes_3: Vec<String> = vec!["alpha", "beta", "gamma"]
.into_iter()
.map(String::from)
.collect();
let nodes_4: Vec<String> = vec!["alpha", "beta", "gamma", "delta"]
.into_iter()
.map(String::from)
.collect();
let cap = 10007;
let t3 = MaglevTable::with_capacity(nodes_3, cap);
let t4 = MaglevTable::with_capacity(nodes_4, cap);
let num_keys = 10_000;
// changed_at_rank[r] counts keys whose rank-r node differs between the two tables.
let mut changed_at_rank = [0usize; 3];
for i in 0..num_keys {
let key = format!("key-{i}");
let top3_old = t3.get_top_k(&key, 3);
let top3_new = t4.get_top_k(&key, 3);
for rank in 0..3 {
if top3_old[rank] != top3_new[rank] {
changed_at_rank[rank] += 1;
}
}
}
// Upper bound on the fraction of keys allowed to change at each rank.
let max_disruption = [0.35, 0.40, 0.55];
for (rank, &changed) in changed_at_rank.iter().enumerate() {
let ratio = changed as f64 / num_keys as f64;
assert!(
ratio < max_disruption[rank],
"rank {rank} disruption {ratio:.3} exceeds {:.2} — \
preference list may be fully scrambled on node addition",
max_disruption[rank]
);
}
// Second pass: count how many of the old top-3 nodes are still in the new top-3.
let mut total_surviving = 0usize;
for i in 0..num_keys {
let key = format!("key-{i}");
let old_set: HashSet<&String> = t3.get_top_k(&key, 3).into_iter().collect();
let new_top3 = t4.get_top_k(&key, 3);
total_surviving += new_top3.iter().filter(|n| old_set.contains(*n)).count();
}
let avg_surviving = total_surviving as f64 / num_keys as f64;
assert!(
avg_surviving > 2.2,
"average surviving nodes in top-3 is {avg_surviving:.2}, \
expected > 2.2 (only ~1 slot should be taken by new node)"
);
}
// Two independently built tables with identical inputs agree on every primary node and
// every full preference list.
#[test]
fn test_cross_process_agreement() {
let build = || {
let nodes: Vec<String> = vec![
"us-east-1a",
"us-west-2b",
"eu-west-1c",
"ap-south-1a",
"sa-east-1a",
]
.into_iter()
.map(String::from)
.collect();
MaglevTable::with_capacity(nodes, 5003)
};
let t1 = build();
let t2 = build();
for i in 0..10_000 {
let key = format!("request-{i}");
assert_eq!(
t1.get(&key),
t2.get(&key),
"tables disagree on key {key} — seeds may be non-deterministic"
);
assert_eq!(
t1.get_top_k(&key, 5),
t2.get_top_k(&key, 5),
"preference lists disagree on key {key}"
);
}
}
}