1#![warn(clippy::pedantic, missing_docs)]
71
72use std::hash::{Hash, Hasher};
73
74use fixedbitset::FixedBitSet;
75
76enum NodeSet {
81 Inline(u128),
83 Heap(FixedBitSet),
85}
86
87impl NodeSet {
88 fn with_capacity(n: usize) -> Self {
90 if n <= 128 {
91 Self::Inline(0)
92 } else {
93 Self::Heap(FixedBitSet::with_capacity(n))
94 }
95 }
96
97 fn put(&mut self, bit: usize) -> bool {
99 match self {
100 Self::Inline(bits) => {
101 let mask = 1u128 << bit;
102 let was_set = *bits & mask != 0;
103 *bits |= mask;
104 was_set
105 }
106 Self::Heap(set) => set.put(bit),
107 }
108 }
109}
110
/// Consistent-hashing lookup table populated with the Maglev scheme.
///
/// A key is hashed to a slot of the table, and the slot names the node that
/// owns the key. Walking the table from that slot yields the key's ordered
/// list of fallback nodes.
#[derive(Debug, Clone)]
pub struct MaglevTable<N> {
    /// The nodes, in the order they were supplied at construction.
    nodes: Vec<N>,
    /// For every slot, the index into `nodes` of the node owning that slot.
    /// Left empty when there are no nodes.
    table: Vec<usize>,
    /// Number of slots the table was sized for; always a prime.
    capacity: usize,
}
121
122impl<N: Hash + Clone + Eq> MaglevTable<N> {
123 #[must_use]
126 pub fn new(nodes: Vec<N>) -> Self {
127 let raw = if nodes.is_empty() {
128 1
129 } else {
130 nodes.len() * 100
131 };
132 Self::with_capacity(nodes, raw)
133 }
134
135 #[must_use]
142 #[allow(clippy::cast_possible_truncation)]
144 pub fn with_capacity(nodes: Vec<N>, min_capacity: usize) -> Self {
145 let m = next_prime(min_capacity.max(1));
146
147 if nodes.is_empty() {
148 return Self {
149 nodes,
150 table: Vec::new(),
151 capacity: m,
152 };
153 }
154
155 let n = nodes.len();
156 let mut offsets = Vec::with_capacity(n);
157 let mut skips = Vec::with_capacity(n);
158
159 for node in &nodes {
160 let offset = hash_with_seed(node, 0) % (m as u64);
161 let skip = hash_with_seed(node, 1) % ((m - 1) as u64) + 1;
162 offsets.push(offset);
163 skips.push(skip);
164 }
165
166 let mut table = vec![usize::MAX; m];
167 let mut next = vec![0u64; n];
168
169 let mut filled = 0usize;
170 while filled < m {
171 for i in 0..n {
172 let mut c = (offsets[i] + next[i] * skips[i]) % (m as u64);
173 while table[c as usize] != usize::MAX {
174 next[i] += 1;
175 c = (offsets[i] + next[i] * skips[i]) % (m as u64);
176 }
177 table[c as usize] = i;
178 next[i] += 1;
179 filled += 1;
180 if filled == m {
181 break;
182 }
183 }
184 }
185
186 Self {
187 nodes,
188 table,
189 capacity: m,
190 }
191 }
192
193 #[must_use]
195 #[allow(clippy::cast_possible_truncation)]
197 pub fn get<K: Hash>(&self, key: &K) -> Option<&N> {
198 if self.nodes.is_empty() {
199 return None;
200 }
201 let slot = (hash_with_seed(key, 2) % (self.capacity as u64)) as usize;
202 Some(&self.nodes[self.table[slot]])
203 }
204
205 #[must_use]
212 #[allow(clippy::cast_possible_truncation)]
214 pub fn get_top_k<K: Hash>(&self, key: &K, k: usize) -> Vec<&N> {
215 if self.nodes.is_empty() {
216 return Vec::new();
217 }
218 let k = k.min(self.nodes.len());
219 let m = self.capacity;
220 let start = (hash_with_seed(key, 2) % (m as u64)) as usize;
221
222 let mut result = Vec::with_capacity(k);
223 let mut seen = NodeSet::with_capacity(self.nodes.len());
224
225 for slot in (start..m).chain(0..start) {
226 let node_idx = self.table[slot];
227 if !seen.put(node_idx) {
228 result.push(&self.nodes[node_idx]);
229 if result.len() == k {
230 return result;
231 }
232 }
233 }
234
235 result
236 }
237
238 #[must_use]
246 #[allow(clippy::cast_possible_truncation)]
248 pub fn is_match<K: Hash>(&self, key: &K, node: &N, top_k: usize) -> bool {
249 if self.nodes.is_empty() {
250 return false;
251 }
252 let top_k = top_k.min(self.nodes.len());
253 let m = self.capacity;
254 let start = (hash_with_seed(key, 2) % (m as u64)) as usize;
255
256 let mut seen = NodeSet::with_capacity(self.nodes.len());
257 let mut count = 0usize;
258
259 for slot in (start..m).chain(0..start) {
260 let node_idx = self.table[slot];
261 if !seen.put(node_idx) {
262 if self.nodes[node_idx] == *node {
263 return true;
264 }
265 count += 1;
266 if count == top_k {
267 return false;
268 }
269 }
270 }
271
272 false
273 }
274
275 #[must_use]
277 pub fn nodes(&self) -> &[N] {
278 &self.nodes
279 }
280
281 #[must_use]
283 pub fn capacity(&self) -> usize {
284 self.capacity
285 }
286
287 #[must_use]
289 pub fn len(&self) -> usize {
290 self.nodes.len()
291 }
292
293 #[must_use]
295 pub fn is_empty(&self) -> bool {
296 self.nodes.is_empty()
297 }
298}
299
/// Hashes `val` under the given `seed`.
///
/// A fresh `DefaultHasher` is fed the seed first and the value second, so
/// the same value hashed under different seeds gives unrelated results.
///
/// NOTE(review): the standard library does not guarantee the default
/// hasher's algorithm across Rust releases — confirm this is acceptable if
/// tables built by different toolchains must agree.
fn hash_with_seed<T: Hash>(val: &T, seed: u64) -> u64 {
    let mut state = std::hash::DefaultHasher::new();
    Hash::hash(&seed, &mut state);
    Hash::hash(val, &mut state);
    Hasher::finish(&state)
}
307
308fn next_prime(n: usize) -> usize {
310 if n <= 2 {
311 return 2;
312 }
313 primal::Primes::all()
314 .find(|&p| p >= n)
315 .expect("primal yields unbounded primes")
316}
317
#[cfg(test)]
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_lossless,
    clippy::similar_names
)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::collections::HashSet;

    /// Builds `count` strings of the form `{prefix}-{index}`, starting at 0.
    fn numbered(prefix: &str, count: usize) -> Vec<String> {
        (0..count).map(|i| format!("{prefix}-{i}")).collect()
    }

    /// Turns string literals into owned node names.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_string()).collect()
    }

    // A table without nodes answers every query with "nothing".
    #[test]
    fn test_empty_table() {
        let table: MaglevTable<String> = MaglevTable::new(Vec::new());
        assert!(table.is_empty());
        assert_eq!(table.len(), 0);
        assert!(table.get(&"any-key").is_none());
        assert!(table.get_top_k(&"any-key", 5).is_empty());
        assert!(!table.is_match(&"any-key", &String::from("node"), 3));
    }

    // With one node, every key maps to it.
    #[test]
    fn test_single_node() {
        let table = MaglevTable::new(vec![String::from("only-node")]);
        assert_eq!(table.len(), 1);
        for key in numbered("key", 100) {
            assert_eq!(table.get(&key).unwrap(), "only-node");
        }
    }

    // Two tables built from the same nodes give the same answers.
    #[test]
    fn test_deterministic() {
        let nodes = numbered("node", 5);
        let t1 = MaglevTable::new(nodes.clone());
        let t2 = MaglevTable::new(nodes);

        for key in numbered("key", 200) {
            assert_eq!(t1.get(&key), t2.get(&key));
        }
    }

    // Keys spread roughly evenly over three nodes.
    #[test]
    fn test_uniform_distribution() {
        let nodes = numbered("node", 3);
        let table = MaglevTable::new(nodes.clone());

        let num_keys = 10_000;
        let mut counts: HashMap<&str, usize> = HashMap::new();
        for key in numbered("key", num_keys) {
            let owner = table.get(&key).unwrap();
            *counts.entry(owner.as_str()).or_default() += 1;
        }

        for node in &nodes {
            let count = counts.get(node.as_str()).copied().unwrap_or(0);
            let ratio = count as f64 / num_keys as f64;
            assert!(
                ratio > 0.20 && ratio < 0.40,
                "node {node} has ratio {ratio:.3}, expected between 0.20 and 0.40"
            );
        }
    }

    // Dropping one of ten nodes moves only a small share of the keys.
    #[test]
    fn test_minimal_disruption() {
        let nodes_full = numbered("node", 10);
        let nodes_minus_one = nodes_full[..9].to_vec();

        let cap = nodes_full.len() * 100;
        let t_full = MaglevTable::with_capacity(nodes_full, cap);
        let t_minus = MaglevTable::with_capacity(nodes_minus_one, cap);

        let num_keys = 10_000;
        let changed = numbered("key", num_keys)
            .iter()
            .filter(|key| t_full.get(*key).unwrap() != t_minus.get(*key).unwrap())
            .count();

        let disruption = changed as f64 / num_keys as f64;
        assert!(
            disruption < 0.20,
            "disruption {disruption:.3} exceeds 0.20 (expected ~0.10 for 10→9 nodes)"
        );
    }

    // Asking for as many entries as there are nodes returns all of them.
    #[test]
    fn test_preference_list_length() {
        let table = MaglevTable::new(numbered("node", 5));
        assert_eq!(table.get_top_k(&"some-key", 5).len(), 5);
    }

    // The preference list for a key is stable across calls.
    #[test]
    fn test_preference_list_deterministic() {
        let table = MaglevTable::new(numbered("node", 5));

        let first = table.get_top_k(&"stable-key", 5);
        let second = table.get_top_k(&"stable-key", 5);
        assert_eq!(first, second);
    }

    // A preference list never names the same node twice.
    #[test]
    fn test_preference_list_unique() {
        let table = MaglevTable::new(numbered("node", 7));

        for key in numbered("key", 100) {
            let top = table.get_top_k(&key, 7);
            let distinct: HashSet<&String> = top.iter().copied().collect();
            assert_eq!(
                distinct.len(),
                top.len(),
                "duplicate in preference list for {key}"
            );
        }
    }

    // The head of the preference list is the node `get` returns.
    #[test]
    fn test_preference_list_primary_matches_get() {
        let table = MaglevTable::new(numbered("node", 5));

        for key in numbered("key", 200) {
            let primary = table.get(&key).unwrap();
            let top = table.get_top_k(&key, 1);
            assert_eq!(top[0], primary, "mismatch for {key}");
        }
    }

    // `is_match` accepts exactly the nodes inside the requested prefix.
    #[test]
    fn test_is_match() {
        let table = MaglevTable::new(numbered("node", 5));
        let key = "test-key";

        for node in table.get_top_k(&key, 3) {
            assert!(table.is_match(&key, node, 3));
        }

        let top5 = table.get_top_k(&key, 5);
        for node in &top5[3..] {
            assert!(!table.is_match(&key, node, 3));
        }
    }

    // With a prefix of one, only the primary node matches.
    #[test]
    fn test_is_match_single_replica() {
        let nodes = numbered("node", 5);
        let table = MaglevTable::new(nodes.clone());

        let key = "single-replica-key";
        let primary = table.get(&key).unwrap().clone();
        assert!(table.is_match(&key, &primary, 1));

        for node in nodes.iter().filter(|candidate| **candidate != primary) {
            assert!(
                !table.is_match(&key, node, 1),
                "non-primary node {node} matched with replicas=1"
            );
        }
    }

    // A hundred nodes still resolve every key to a known node.
    #[test]
    fn test_large_table() {
        let nodes = numbered("worker", 100);
        let table = MaglevTable::new(nodes.clone());
        assert_eq!(table.len(), 100);

        for key in numbered("request", 500) {
            assert!(nodes.contains(table.get(&key).unwrap()));
        }
    }

    // The automatically chosen capacity is always prime.
    #[test]
    fn test_prime_capacity() {
        for n in [1, 2, 3, 5, 10, 50, 100] {
            let table = MaglevTable::new((0..n).collect::<Vec<u32>>());
            let cap = table.capacity();
            assert!(
                primal::is_prime(cap as u64),
                "capacity {cap} is not prime for {n} nodes"
            );
        }
    }

    // A full-length preference list contains every node.
    #[test]
    fn test_preference_list_covers_all_nodes() {
        let nodes = numbered("node", 7);
        let table = MaglevTable::new(nodes.clone());

        for key in numbered("key", 100) {
            let set: HashSet<&String> = table.get_top_k(&key, 7).into_iter().collect();
            assert_eq!(
                set.len(),
                7,
                "preference list for {key} did not cover all nodes"
            );
            for node in &nodes {
                assert!(set.contains(node), "node {node} missing for {key}");
            }
        }
    }

    // Removing a node mostly keeps the relative order of the survivors.
    #[test]
    fn test_node_removal_minimal_disruption_preference_list() {
        let nodes_full = numbered("node", 6);
        let nodes_minus = nodes_full[..5].to_vec();
        let removed = nodes_full[5].clone();

        let cap = nodes_full.len() * 100;
        let t_full = MaglevTable::with_capacity(nodes_full, cap);
        let t_minus = MaglevTable::with_capacity(nodes_minus, cap);

        let mut preserved = 0usize;
        let mut total_pairs = 0usize;

        for key in numbered("key", 1_000) {
            let full_list: Vec<&String> = t_full
                .get_top_k(&key, 6)
                .into_iter()
                .filter(|n| **n != removed)
                .collect();
            let minus_list = t_minus.get_top_k(&key, 5);
            let rank_in_minus =
                |wanted: &String| minus_list.iter().position(|n| *n == wanted);

            for (a, earlier) in full_list.iter().enumerate() {
                for later in &full_list[a + 1..] {
                    total_pairs += 1;
                    if let (Some(pa), Some(pb)) =
                        (rank_in_minus(*earlier), rank_in_minus(*later))
                    {
                        if pa < pb {
                            preserved += 1;
                        }
                    }
                }
            }
        }

        let ratio = preserved as f64 / total_pairs as f64;
        assert!(
            ratio > 0.5,
            "pairwise ordering preservation {ratio:.3} is too low"
        );
    }

    // Ten nodes get at least a thousand slots, and a prime number of them.
    #[test]
    fn test_capacity_auto_sizing() {
        let table = MaglevTable::new((0..10).collect::<Vec<u32>>());
        let cap = table.capacity();
        assert!(cap >= 1000);
        assert!(primal::is_prime(cap as u64));
    }

    // Realistic node names and keys work end to end.
    #[test]
    fn test_string_nodes() {
        let nodes = owned(&["us-east-1a", "us-west-2b", "eu-west-1c"]);
        let table = MaglevTable::new(nodes.clone());

        let node = table.get(&"user-session-123").unwrap();
        assert!(nodes.contains(node));

        assert_eq!(table.get_top_k(&"user-session-123", 3).len(), 3);
    }

    // Asking for more entries than nodes returns one entry per node.
    #[test]
    fn test_get_top_k_with_k_greater_than_nodes() {
        let table = MaglevTable::new(numbered("node", 3));
        assert_eq!(table.get_top_k(&"key", 10).len(), 3);
    }

    // The table can be shared across threads.
    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }
        assert_send_sync::<MaglevTable<String>>();
    }

    // Adding a node must not reshuffle keys among the existing nodes.
    #[test]
    fn test_node_hashes_are_independent() {
        let cap = 10007;
        let t_a = MaglevTable::with_capacity(owned(&["alpha", "beta", "gamma"]), cap);
        let t_b = MaglevTable::with_capacity(owned(&["alpha", "beta", "gamma", "delta"]), cap);

        let mut same = 0usize;
        let mut comparable = 0usize;
        for key in numbered("key", 10_000) {
            let a = t_a.get(&key).unwrap();
            let b = t_b.get(&key).unwrap();
            // Keys that moved to the new node cannot be compared.
            if matches!(b.as_str(), "alpha" | "beta" | "gamma") {
                comparable += 1;
                if a == b {
                    same += 1;
                }
            }
        }

        let ratio = same as f64 / comparable as f64;
        assert!(
            ratio > 0.95,
            "only {ratio:.3} of comparable keys matched — \
             node hashes may be dependent on input order"
        );
    }

    // Adding a node disturbs each rank of the top-3 list only moderately.
    #[test]
    fn test_top_k_minimal_disruption_on_node_addition() {
        let cap = 10007;
        let t3 = MaglevTable::with_capacity(owned(&["alpha", "beta", "gamma"]), cap);
        let t4 = MaglevTable::with_capacity(owned(&["alpha", "beta", "gamma", "delta"]), cap);

        let num_keys = 10_000;
        let keys = numbered("key", num_keys);

        let mut changed_at_rank = [0usize; 3];
        for key in &keys {
            let top3_old = t3.get_top_k(key, 3);
            let top3_new = t4.get_top_k(key, 3);

            for (rank, tally) in changed_at_rank.iter_mut().enumerate() {
                if top3_old[rank] != top3_new[rank] {
                    *tally += 1;
                }
            }
        }

        let max_disruption = [0.35, 0.40, 0.55];
        for (rank, &changed) in changed_at_rank.iter().enumerate() {
            let ratio = changed as f64 / num_keys as f64;
            assert!(
                ratio < max_disruption[rank],
                "rank {rank} disruption {ratio:.3} exceeds {:.2} — \
                 preference list may be fully scrambled on node addition",
                max_disruption[rank]
            );
        }

        let total_surviving: usize = keys
            .iter()
            .map(|key| {
                let old_set: HashSet<&String> = t3.get_top_k(key, 3).into_iter().collect();
                let new_top3 = t4.get_top_k(key, 3);
                new_top3.iter().filter(|n| old_set.contains(*n)).count()
            })
            .sum();
        let avg_surviving = total_surviving as f64 / num_keys as f64;
        assert!(
            avg_surviving > 2.2,
            "average surviving nodes in top-3 is {avg_surviving:.2}, \
             expected > 2.2 (only ~1 slot should be taken by new node)"
        );
    }

    // Independently built tables agree on every lookup.
    #[test]
    fn test_cross_process_agreement() {
        fn build() -> MaglevTable<String> {
            let nodes = owned(&[
                "us-east-1a",
                "us-west-2b",
                "eu-west-1c",
                "ap-south-1a",
                "sa-east-1a",
            ]);
            MaglevTable::with_capacity(nodes, 5003)
        }

        let (t1, t2) = (build(), build());

        for key in numbered("request", 10_000) {
            assert_eq!(
                t1.get(&key),
                t2.get(&key),
                "tables disagree on key {key} — seeds may be non-deterministic"
            );
            assert_eq!(
                t1.get_top_k(&key, 5),
                t2.get_top_k(&key, 5),
                "preference lists disagree on key {key}"
            );
        }
    }
}