1use std::{cell::RefCell, fmt::Debug, hash::Hasher, net::SocketAddr, rc::Rc};
2
3use rand::{
4 RngExt,
5 distr::{Distribution, weighted::WeightedIndex},
6 prelude::IndexedRandom,
7 rng,
8};
9
10use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
11
/// Weight assumed for a backend that carries no explicit load-balancing
/// parameters (see `backend_weight`).
const DEFAULT_WEIGHT: i32 = 100;

/// Hash seed used by `Rendezvous::new` and `Maglev::new` when the caller does
/// not supply one through `with_seed`.
// NOTE(review): the value equals the 64-bit golden-ratio increment commonly
// used by SplitMix64 — presumably chosen for that reason; confirm intent.
pub const DEFAULT_HASH_SEED: u64 = 0x9E37_79B9_7F4A_7C15;
22
23fn backend_weight(backend: &Backend) -> u32 {
27 let weight = backend
28 .load_balancing_parameters
29 .as_ref()
30 .map(|p| p.weight)
31 .unwrap_or(DEFAULT_WEIGHT)
32 .max(1) as u32;
33 debug_assert!(weight >= 1, "backend weight must be clamped to at least 1");
37 weight
38}
39
40fn hash_backend(seed: u64, key: u64, addr: &SocketAddr) -> u64 {
53 let mut h = FnvHasher::with_seed(seed);
54 h.write_u64(key);
55 match addr {
56 SocketAddr::V4(v4) => {
57 h.write_u8(4);
58 h.write(&v4.ip().octets());
59 h.write_u16(v4.port());
60 }
61 SocketAddr::V6(v6) => {
62 h.write_u8(6);
63 h.write(&v6.ip().octets());
64 h.write_u16(v6.port());
65 }
66 }
67 splitmix64_finalize(h.finish())
72}
73
/// Applies the SplitMix64 output mix (two xor-shift/multiply rounds followed
/// by a final xor-shift) to `z` and returns the mixed value.
///
/// Zero maps to zero; every step uses wrapping arithmetic.
fn splitmix64_finalize(mut z: u64) -> u64 {
    z ^= z >> 30;
    z = z.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z ^= z >> 27;
    z = z.wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}
82
83fn next_prime(n: usize) -> usize {
89 let mut candidate = n.max(2);
90 while !is_prime(candidate) {
91 candidate += 1;
92 }
93 debug_assert!(is_prime(candidate), "next_prime must return a prime");
96 debug_assert!(candidate >= 2, "next_prime must return at least 2");
97 debug_assert!(
98 candidate >= n,
99 "next_prime ({candidate}) must be >= the requested floor ({n})"
100 );
101 candidate
102}
103
/// Returns `true` when `x` is a prime number.
///
/// Uses trial division by 2 and then by every odd number up to `sqrt(x)`.
/// The loop bound is written as `d <= x / d` rather than `d * d <= x`: the
/// two are equivalent for positive integers, but the product overflows for
/// `x` close to `usize::MAX` (a panic in debug builds, a wrong bound in
/// release builds), while the quotient cannot.
fn is_prime(x: usize) -> bool {
    if x < 2 {
        return false;
    }
    if x % 2 == 0 {
        // 2 is the only even prime.
        return x == 2;
    }
    let mut d = 3;
    while d <= x / d {
        if x % d == 0 {
            return false;
        }
        d += 2;
    }
    true
}
123
/// Seedable 64-bit FNV-1a hasher.
///
/// Only `write` and `finish` are implemented; the other `Hasher` methods fall
/// back to the trait's default implementations.
struct FnvHasher {
    /// Running hash value.
    state: u64,
}

impl FnvHasher {
    /// 64-bit FNV prime.
    const PRIME: u64 = 0x0000_0100_0000_01B3;
    /// 64-bit FNV offset basis.
    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;

    /// Creates a hasher whose initial state is the offset basis xor `seed`.
    /// A seed of 0 yields plain FNV-1a.
    fn with_seed(seed: u64) -> Self {
        let state = Self::OFFSET ^ seed;
        Self { state }
    }
}

impl Hasher for FnvHasher {
    /// Returns the current hash state without consuming the hasher.
    fn finish(&self) -> u64 {
        self.state
    }

    /// Folds `bytes` into the state: xor each byte in, then multiply by the
    /// FNV prime (wrapping).
    fn write(&mut self, bytes: &[u8]) {
        self.state = bytes.iter().fold(self.state, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }
}
154
/// Strategy for picking one backend out of a candidate set.
///
/// Implemented in this file by `RoundRobin`, `Random`, `LeastLoaded`,
/// `PowerOfTwo`, `Rendezvous` and `Maglev`.
pub trait LoadBalancingAlgorithm: Debug {
    /// Selects a backend from `backends`.
    ///
    /// * `key` — optional routing key. `Rendezvous` and `Maglev` use it to
    ///   pick a backend deterministically and fall back to round-robin when
    ///   it is `None`; the other implementations in this file ignore it.
    /// * `backends` — the candidate set to choose from.
    ///
    /// Returns a clone of the chosen `Rc`, or `None` when no backend can be
    /// selected (every implementation in this file returns `None` for an
    /// empty set).
    fn next_available_backend(
        &mut self,
        key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>>;

    /// Recomputes any state derived from the backend set.
    ///
    /// The default implementation does nothing; in this file only `Maglev`
    /// overrides it (to rebuild its lookup table).
    fn rebuild(&mut self, _backends: &[Rc<RefCell<Backend>>]) {}
}
174
175#[derive(Debug)]
176pub struct RoundRobin {
177 pub next_backend: u32,
178}
179
180impl LoadBalancingAlgorithm for RoundRobin {
181 fn next_available_backend(
182 &mut self,
183 _key: Option<u64>,
184 backends: &mut Vec<Rc<RefCell<Backend>>>,
185 ) -> Option<Rc<RefCell<Backend>>> {
186 if backends.is_empty() {
190 return None;
191 }
192 debug_assert!(
193 !backends.is_empty(),
194 "round-robin index math runs only on a non-empty set"
195 );
196
197 let index = self.next_backend as usize % backends.len();
198 debug_assert!(index < backends.len(), "round-robin index out of bounds");
201 let res = backends.get(index).map(|backend| (*backend).clone());
202 debug_assert!(
203 res.is_some(),
204 "round-robin must select a backend from a non-empty set"
205 );
206
207 self.next_backend = (self.next_backend + 1) % backends.len() as u32;
208 debug_assert!(
211 (self.next_backend as usize) < backends.len(),
212 "round-robin cursor must stay within bounds"
213 );
214 res
215 }
216}
217
218impl Default for RoundRobin {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl RoundRobin {
225 pub fn new() -> Self {
226 Self { next_backend: 0 }
227 }
228}
229
230#[derive(Debug)]
231pub struct Random;
232
233impl LoadBalancingAlgorithm for Random {
234 fn next_available_backend(
235 &mut self,
236 _key: Option<u64>,
237 backends: &mut Vec<Rc<RefCell<Backend>>>,
238 ) -> Option<Rc<RefCell<Backend>>> {
239 let mut rng = rng();
240 let len = backends.len();
241 let weights: Vec<i32> = backends
242 .iter()
243 .map(|b| {
244 b.borrow()
245 .load_balancing_parameters
246 .as_ref()
247 .map(|p| p.weight)
248 .unwrap_or(100)
249 })
250 .collect();
251 debug_assert_eq!(
254 weights.len(),
255 len,
256 "Random must derive exactly one weight per backend"
257 );
258
259 if let Ok(dist) = WeightedIndex::new(weights) {
260 let index = dist.sample(&mut rng);
261 debug_assert!(index < len, "Random sampled an out-of-range index");
264 backends.get(index).cloned()
265 } else {
266 let chosen = (*backends)
269 .choose(&mut rng)
270 .map(|backend| (*backend).clone());
271 debug_assert_eq!(
272 chosen.is_some(),
273 len > 0,
274 "Random fallback selects iff the set is non-empty"
275 );
276 chosen
277 }
278 }
279}
280
281#[derive(Debug)]
282pub struct LeastLoaded {
283 pub metric: LoadMetric,
284}
285
286impl LoadBalancingAlgorithm for LeastLoaded {
287 fn next_available_backend(
288 &mut self,
289 _key: Option<u64>,
290 backends: &mut Vec<Rc<RefCell<Backend>>>,
291 ) -> Option<Rc<RefCell<Backend>>> {
292 let was_empty = backends.is_empty();
293 let opt_b = match self.metric {
294 LoadMetric::Connections => backends
295 .iter_mut()
296 .min_by_key(|backend| backend.borrow().active_connections),
297 LoadMetric::Requests => backends
298 .iter_mut()
299 .min_by_key(|backend| backend.borrow().active_requests),
300 LoadMetric::ConnectionTime => {
301 let mut b = None;
302 for backend in backends.iter_mut() {
303 let cost2 = backend.borrow_mut().peak_ewma_connection();
304
305 match b.take() {
306 None => b = Some((cost2, backend)),
307 Some((cost1, back1)) => {
308 if cost1 <= cost2 {
309 b = Some((cost1, back1));
310 } else {
311 b = Some((cost2, backend));
312 }
313 }
314 }
315 }
316
317 b.map(|(_cost, backend)| backend)
318 }
319 };
320 debug_assert_eq!(
324 opt_b.is_some(),
325 !was_empty,
326 "LeastLoaded selects iff the candidate set is non-empty"
327 );
328 opt_b.map(|backend| (*backend).clone())
329 }
330}
331
332#[derive(Debug)]
333pub struct PowerOfTwo {
334 pub metric: LoadMetric,
335}
336
337impl LoadBalancingAlgorithm for PowerOfTwo {
338 fn next_available_backend(
339 &mut self,
340 _key: Option<u64>,
341 backends: &mut Vec<Rc<RefCell<Backend>>>,
342 ) -> Option<Rc<RefCell<Backend>>> {
343 let len = backends.len();
344 let mut first = None;
345 let mut second = None;
346
347 for backend in backends.iter_mut() {
348 let measure = match self.metric {
349 LoadMetric::Connections => backend.borrow().active_connections as f64,
350 LoadMetric::Requests => backend.borrow().active_requests as f64,
351 LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
352 };
353
354 if first.is_none() {
355 first = Some((measure, backend));
356 } else if second.is_none() {
357 if first.as_ref().unwrap().0 <= measure {
358 second = Some((measure, backend));
359 } else {
360 second = first.take();
361 first = Some((measure, backend));
362 }
363 } else if first.as_ref().unwrap().0 <= measure && measure < second.as_ref().unwrap().0 {
364 second = Some((measure, backend));
365 } else {
367 second = first.take();
368 first = Some((measure, backend));
369 }
370 }
371
372 debug_assert!(
377 match (&first, &second) {
378 (Some((f, _)), Some((s, _))) => f <= s,
379 _ => true,
380 },
381 "power-of-two: first candidate must be no heavier than second"
382 );
383 debug_assert_eq!(
384 first.is_some(),
385 len > 0,
386 "power-of-two must hold a primary candidate iff the set is non-empty"
387 );
388 debug_assert_eq!(
389 second.is_some(),
390 len > 1,
391 "power-of-two holds a second candidate iff the set has >= 2 backends"
392 );
393
394 match (first, second) {
395 (None, None) => None,
396 (Some((_, b)), None) => Some(b.clone()),
397 (None, Some((_, b))) => Some(b.clone()),
399 (Some((_, b1)), Some((_, b2))) => {
400 if rng().random_bool(0.5) {
401 Some(b1.clone())
402 } else {
403 Some(b2.clone())
404 }
405 }
406 }
407 }
408}
409
410#[derive(Debug)]
431pub struct Rendezvous {
432 seed: u64,
434 round_robin: RoundRobin,
436}
437
438impl Default for Rendezvous {
439 fn default() -> Self {
440 Self::new()
441 }
442}
443
444impl Rendezvous {
445 pub fn new() -> Self {
446 Self::with_seed(DEFAULT_HASH_SEED)
447 }
448
449 pub fn with_seed(seed: u64) -> Self {
450 Self {
451 seed,
452 round_robin: RoundRobin::new(),
453 }
454 }
455
456 fn score(&self, key: u64, backend: &Backend) -> f64 {
459 let weight = backend_weight(backend) as f64;
460 let h = hash_backend(self.seed, key, &backend.address);
463 let unit = (h as f64 + 0.5) / (u64::MAX as f64 + 1.0);
465 debug_assert!(
469 unit > 0.0 && unit < 1.0,
470 "HRW unit must lie strictly inside (0, 1), got {unit}"
471 );
472 let score = -weight / unit.ln();
473 debug_assert!(
474 score.is_finite() && score > 0.0,
475 "HRW score must be finite and positive, got {score}"
476 );
477 score
478 }
479}
480
481impl LoadBalancingAlgorithm for Rendezvous {
482 fn next_available_backend(
483 &mut self,
484 key: Option<u64>,
485 backends: &mut Vec<Rc<RefCell<Backend>>>,
486 ) -> Option<Rc<RefCell<Backend>>> {
487 let Some(key) = key else {
488 return self.round_robin.next_available_backend(None, backends);
490 };
491
492 if backends.is_empty() {
493 return None;
494 }
495
496 let mut best: Option<(f64, &Rc<RefCell<Backend>>)> = None;
497 for backend in backends.iter() {
498 let score = self.score(key, &backend.borrow());
499 match best {
500 Some((best_score, _)) if best_score >= score => {}
501 _ => best = Some((score, backend)),
502 }
503 }
504 debug_assert!(
507 best.is_some(),
508 "HRW must select a winner from a non-empty set"
509 );
510 #[cfg(debug_assertions)]
511 if let Some((best_score, _)) = best {
512 for backend in backends.iter() {
513 debug_assert!(
514 best_score >= self.score(key, &backend.borrow()),
515 "HRW winner does not maximize the score over the set"
516 );
517 }
518 }
519 best.map(|(_, backend)| backend.clone())
520 }
521}
522
/// Maglev-style consistent hashing backed by a precomputed lookup table.
///
/// `rebuild` fills `table` with indices into `backend_addrs`; lookups map a
/// key to a table slot and resolve the slot to a backend by address.
#[derive(Debug)]
pub struct Maglev {
    /// Seed mixed into the per-backend offset and skip hashes.
    seed: u64,
    /// Number of table slots; always a prime >= 2 (see `with_seed_and_size`).
    size: usize,
    /// Lookup table: each slot holds an index into `backend_addrs`. Empty
    /// until the first `rebuild` with a non-empty backend set.
    table: Vec<usize>,
    /// Addresses of the backends captured by the last `rebuild`, in the order
    /// they were passed in.
    backend_addrs: Vec<SocketAddr>,
    /// Fallback selector for the cases the table cannot answer.
    round_robin: RoundRobin,
}

impl Default for Maglev {
    fn default() -> Self {
        Self::new()
    }
}

impl Maglev {
    /// Table size used by `new` and `with_seed`.
    pub const DEFAULT_TABLE_SIZE: usize = 65537;

    /// Creates a selector with `DEFAULT_HASH_SEED` and `DEFAULT_TABLE_SIZE`.
    pub fn new() -> Self {
        Self::with_seed(DEFAULT_HASH_SEED)
    }

    /// Creates a selector with an explicit seed and `DEFAULT_TABLE_SIZE`.
    pub fn with_seed(seed: u64) -> Self {
        Self::with_seed_and_size(seed, Self::DEFAULT_TABLE_SIZE)
    }

    /// Creates a selector with an explicit seed and table size.
    ///
    /// `size` is raised to at least 2 and then rounded up to the next prime.
    /// The table starts empty; nothing is built until `rebuild` runs.
    pub fn with_seed_and_size(seed: u64, size: usize) -> Self {
        Self {
            seed,
            size: next_prime(size.max(2)),
            table: Vec::new(),
            backend_addrs: Vec::new(),
            round_robin: RoundRobin::new(),
        }
    }

    /// Rebuilds the lookup table from `backends`.
    ///
    /// Clears the previous table and captured addresses first; with an empty
    /// `backends` slice both stay empty. Otherwise every backend receives a
    /// share of the `size` slots proportional to its weight
    /// (`backend_weight`), and slots are claimed by walking each backend's
    /// own permutation of the table.
    pub fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
        let n = backends.len();
        self.backend_addrs.clear();
        self.table.clear();
        if n == 0 || self.size == 0 {
            return;
        }

        let m = self.size;

        // Per-backend permutation parameters and weights, indexed like
        // `backends`.
        let mut offsets = Vec::with_capacity(n);
        let mut skips = Vec::with_capacity(n);
        let mut weights = Vec::with_capacity(n);
        let mut total_weight: u64 = 0;
        for backend in backends {
            let b = backend.borrow();
            let addr = b.address;
            self.backend_addrs.push(addr);
            // Two independent hashes per backend; the key constants are the
            // ASCII bytes of "offset" and "skip__".
            let h1 = hash_backend(self.seed, 0x6F66_6673_6574, &addr);
            let h2 = hash_backend(self.seed, 0x736B_6970_5F5F, &addr);
            let offset = (h1 % m as u64) as usize;
            // skip is in [1, m-1]; because m is prime, stepping by skip
            // visits every slot exactly once before repeating.
            let skip = (h2 % (m as u64 - 1)) as usize + 1;
            debug_assert!(offset < m, "Maglev offset must be a valid slot");
            debug_assert!(
                (1..m).contains(&skip),
                "Maglev skip must lie in [1, m-1] to stay coprime with the prime table"
            );
            offsets.push(offset);
            skips.push(skip);
            let w = backend_weight(&b) as u64;
            weights.push(w);
            total_weight += w;
        }
        debug_assert!(
            total_weight > 0,
            "Maglev total weight must be positive for a non-empty backend set"
        );

        // Slot budget per backend: floor(weight * m / total_weight).
        let mut targets = vec![0usize; n];
        let mut assigned = 0usize;
        for (i, &w) in weights.iter().enumerate() {
            let t = ((w as u128 * m as u128) / total_weight as u128) as usize;
            targets[i] = t;
            assigned += t;
        }
        // Hand the slots lost to rounding out one at a time, starting from
        // the first backend.
        let mut i = 0;
        while assigned < m {
            targets[i % n] += 1;
            assigned += 1;
            i += 1;
        }
        debug_assert_eq!(
            assigned, m,
            "Maglev target budget must sum to the table size"
        );
        debug_assert_eq!(
            targets.iter().sum::<usize>(),
            m,
            "Maglev per-backend targets must sum to the table size"
        );

        // `usize::MAX` marks a slot that has not been claimed yet.
        let mut table = vec![usize::MAX; m];
        // next[b]: how far backend b has advanced along its permutation.
        let mut next = vec![0usize; n];
        // filled[b]: how many slots backend b has claimed so far.
        let mut filled = vec![0usize; n];
        let mut count = 0usize;
        while count < m {
            // One round: every backend still under budget claims one slot.
            for b in 0..n {
                if filled[b] >= targets[b] {
                    continue;
                }
                let mut c = (offsets[b] + next[b] * skips[b]) % m;
                // Skip slots already taken by another backend.
                while table[c] != usize::MAX {
                    next[b] += 1;
                    c = (offsets[b] + next[b] * skips[b]) % m;
                }
                table[c] = b;
                next[b] += 1;
                filled[b] += 1;
                count += 1;
                if count >= m {
                    break;
                }
            }
        }
        debug_assert_eq!(count, m, "Maglev population loop must fill exactly m slots");
        debug_assert!(
            table.iter().all(|&slot| slot != usize::MAX),
            "Maglev population loop left an unfilled slot"
        );
        debug_assert!(
            filled == targets,
            "Maglev filled counts must match the per-backend targets"
        );

        self.table = table;

        // Debug-only consistency check of the struct after the rebuild.
        #[cfg(debug_assertions)]
        {
            debug_assert!(
                is_prime(self.size),
                "Maglev table size {} is not prime",
                self.size
            );
            if self.table.is_empty() {
                debug_assert!(
                    self.backend_addrs.is_empty(),
                    "Maglev: empty table but non-empty backend_addrs"
                );
            } else {
                debug_assert_eq!(
                    self.table.len(),
                    self.size,
                    "Maglev table must have exactly `size` slots"
                );
                debug_assert!(
                    !self.backend_addrs.is_empty(),
                    "Maglev: non-empty table but empty backend_addrs"
                );
                debug_assert!(
                    self.table.iter().all(|&idx| idx < self.backend_addrs.len()),
                    "Maglev table holds an index out of backend_addrs range"
                );
            }
        }
    }
}
760
761impl LoadBalancingAlgorithm for Maglev {
762 fn next_available_backend(
763 &mut self,
764 key: Option<u64>,
765 backends: &mut Vec<Rc<RefCell<Backend>>>,
766 ) -> Option<Rc<RefCell<Backend>>> {
767 let Some(key) = key else {
768 return self.round_robin.next_available_backend(None, backends);
769 };
770
771 if backends.is_empty() {
772 return None;
773 }
774
775 if self.table.is_empty() {
783 self.rebuild(backends);
784 }
785
786 if self.table.is_empty() {
787 return None;
789 }
790
791 let start = (key % self.size as u64) as usize;
798 debug_assert!(start < self.size, "Maglev start slot out of range");
801 debug_assert_eq!(
802 self.table.len(),
803 self.size,
804 "Maglev lookup on a table whose length != size"
805 );
806 for i in 0..self.size {
807 let slot = (start + i) % self.size;
808 debug_assert!(slot < self.size, "Maglev probe slot out of range");
812 let idx = self.table[slot];
813 debug_assert!(
814 idx < self.backend_addrs.len(),
815 "Maglev table entry indexes outside the captured address set"
816 );
817 if let Some(addr) = self.backend_addrs.get(idx) {
820 if let Some(backend) = backends.iter().find(|b| b.borrow().address == *addr) {
821 debug_assert!(
824 backends
825 .iter()
826 .any(|b| b.borrow().address == backend.borrow().address),
827 "Maglev must return a backend from the healthy subset"
828 );
829 return Some(backend.clone());
830 }
831 }
832 }
833
834 self.round_robin.next_available_backend(None, backends)
838 }
839
840 fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
841 Maglev::rebuild(self, backends);
842 }
843}
844
// Unit tests for the load-balancing algorithms defined in the parent module.
#[cfg(test)]
mod test {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;
    use crate::{
        PeakEWMA,
        backends::{BackendStatus, HealthState},
        retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
        sozu_command::proto::command::{LoadBalancingParams, LoadMetric},
    };

    /// Builds a backend at 127.0.0.1:8080 with the given id and connection
    /// count (0 when `connections` is `None`) and no load-balancing
    /// parameters.
    fn create_backend(id: String, connections: Option<usize>) -> Backend {
        Backend {
            sticky_id: None,
            backend_id: id,
            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
            status: BackendStatus::Normal,
            retry_policy: RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1)),
            active_connections: connections.unwrap_or(0),
            active_requests: 0,
            failures: 0,
            load_balancing_parameters: None,
            backup: false,
            connection_time: PeakEWMA::new(),
            health: HealthState::default(),
        }
    }

    /// LeastLoaded(Connections) returns the backend with the fewest active
    /// connections.
    #[test]
    fn it_should_find_the_backend_with_least_connections() {
        let backend_with_least_connection =
            Rc::new(RefCell::new(create_backend("yolo".to_string(), Some(1))));

        let mut backends = vec![
            Rc::new(RefCell::new(create_backend("nolo".to_string(), Some(10)))),
            Rc::new(RefCell::new(create_backend("philo".to_string(), Some(20)))),
            backend_with_least_connection.clone(),
        ];

        let mut least_connection_algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let backend_res = least_connection_algorithm
            .next_available_backend(None, &mut backends)
            .unwrap();
        let backend = backend_res.borrow();

        assert!(*backend == *backend_with_least_connection.borrow());
    }

    /// LeastLoaded returns `None` for an empty backend list.
    #[test]
    fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
        let mut backends = vec![];

        let mut least_connection_algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let backend = least_connection_algorithm.next_available_backend(None, &mut backends);
        assert!(backend.is_none());
    }

    /// Round-robin keeps working when the list shrinks between two calls:
    /// the cursor is reduced against the new length.
    #[test]
    fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
        let mut backends = vec![
            Rc::new(RefCell::new(create_backend("toto".to_string(), None))),
            Rc::new(RefCell::new(create_backend("voto".to_string(), None))),
            Rc::new(RefCell::new(create_backend("yoto".to_string(), None))),
        ];

        let mut roundrobin = RoundRobin { next_backend: 1 };
        let backend = roundrobin.next_available_backend(None, &mut backends);
        assert_eq!(backend.as_ref(), backends.get(1));

        backends.remove(1);

        let backend2 = roundrobin.next_available_backend(None, &mut backends);
        assert_eq!(backend2.as_ref(), backends.first());
    }

    /// Builds a backend at 127.0.0.`last_octet`:`port` with an optional
    /// explicit weight.
    fn addr_backend(id: &str, last_octet: u8, port: u16, weight: Option<i32>) -> Backend {
        let mut b = create_backend(id.to_string(), None);
        b.address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, last_octet)), port);
        b.load_balancing_parameters = weight.map(|weight| LoadBalancingParams { weight });
        b
    }

    /// Wraps a backend in the `Rc<RefCell<_>>` the algorithms operate on.
    fn rc(b: Backend) -> Rc<RefCell<Backend>> {
        Rc::new(RefCell::new(b))
    }

    /// Builds `n` unweighted backends with distinct addresses
    /// (127.0.0.(i+1):(8000+i)).
    fn make_backends(n: u8) -> Vec<Rc<RefCell<Backend>>> {
        (0..n)
            .map(|i| rc(addr_backend(&format!("b{i}"), i + 1, 8000 + i as u16, None)))
            .collect()
    }

    /// Returns the address of a backend; used as its identity in assertions.
    fn chosen_addr(b: &Rc<RefCell<Backend>>) -> SocketAddr {
        b.borrow().address
    }

    /// The same key must always resolve to the same backend.
    #[test]
    fn hrw_is_deterministic_for_a_fixed_key() {
        let mut backends = make_backends(5);
        let mut hrw = Rendezvous::new();

        let first = hrw
            .next_available_backend(Some(42), &mut backends)
            .map(|b| chosen_addr(&b));
        for _ in 0..50 {
            let again = hrw
                .next_available_backend(Some(42), &mut backends)
                .map(|b| chosen_addr(&b));
            assert_eq!(first, again, "HRW must be deterministic for a fixed key");
        }
    }

    /// Without a key, Rendezvous walks the backends in list order and wraps.
    #[test]
    fn hrw_none_key_falls_back_to_round_robin() {
        let mut backends = make_backends(3);
        let mut hrw = Rendezvous::new();

        let a = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
        let b = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
        let c = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
        let d = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
        assert_eq!(a, chosen_addr(&backends[0]));
        assert_eq!(b, chosen_addr(&backends[1]));
        assert_eq!(c, chosen_addr(&backends[2]));
        assert_eq!(d, a, "round-robin should wrap around");
    }

    /// Removing one backend must only move the keys that were routed to it.
    #[test]
    fn hrw_minimal_disruption_when_removing_a_non_winner() {
        let mut backends = make_backends(6);
        let mut hrw = Rendezvous::new();

        let removed_addr = chosen_addr(&backends[3]);

        // Record the winner of every key with the full set.
        let mut before = std::collections::HashMap::new();
        for key in 0..2000u64 {
            let w = chosen_addr(
                &hrw.next_available_backend(Some(key), &mut backends)
                    .unwrap(),
            );
            before.insert(key, w);
        }

        backends.remove(3);
        for key in 0..2000u64 {
            let after = chosen_addr(
                &hrw.next_available_backend(Some(key), &mut backends)
                    .unwrap(),
            );
            let prev = before[&key];
            // Keys that did not point at the removed backend must not move.
            if prev != removed_addr {
                assert_eq!(
                    prev, after,
                    "removing a non-winner changed the choice for key {key}"
                );
            }
        }
    }

    /// With equal weights every backend gets within ±35% of an even share.
    #[test]
    fn hrw_distribution_is_roughly_even() {
        let n = 5u8;
        let mut backends = make_backends(n);
        let mut hrw = Rendezvous::new();

        let total = 20_000u64;
        let mut counts: std::collections::HashMap<SocketAddr, u64> =
            std::collections::HashMap::new();
        for key in 0..total {
            let w = chosen_addr(
                &hrw.next_available_backend(Some(key), &mut backends)
                    .unwrap(),
            );
            *counts.entry(w).or_default() += 1;
        }

        let expected = total / n as u64;
        for b in &backends {
            let c = counts.get(&chosen_addr(b)).copied().unwrap_or(0);
            assert!(
                c > expected * 65 / 100 && c < expected * 135 / 100,
                "HRW distribution skewed: backend got {c}, expected ~{expected}"
            );
        }
    }

    /// The same key must always resolve to the same backend.
    #[test]
    fn maglev_is_deterministic_for_a_fixed_key() {
        let backends = make_backends(7);
        let mut mag = Maglev::new();
        mag.rebuild(&backends);

        let mut sel = backends.clone();
        let first = chosen_addr(&mag.next_available_backend(Some(12345), &mut sel).unwrap());
        for _ in 0..50 {
            let again = chosen_addr(&mag.next_available_backend(Some(12345), &mut sel).unwrap());
            assert_eq!(first, again, "Maglev must be deterministic for a fixed key");
        }
    }

    /// Without a key, Maglev walks the backends in list order and wraps.
    #[test]
    fn maglev_none_key_falls_back_to_round_robin() {
        let mut backends = make_backends(3);
        let mut mag = Maglev::new();
        mag.rebuild(&backends);

        let a = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
        let b = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
        let c = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
        let d = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
        assert_eq!(a, chosen_addr(&backends[0]));
        assert_eq!(b, chosen_addr(&backends[1]));
        assert_eq!(c, chosen_addr(&backends[2]));
        assert_eq!(d, a);
    }

    /// With equal weights every backend gets within ±15% of an even share.
    #[test]
    fn maglev_distribution_is_roughly_even() {
        let n = 5u8;
        let backends = make_backends(n);
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        mag.rebuild(&backends);

        let total = 50_000u64;
        let mut counts: std::collections::HashMap<SocketAddr, u64> =
            std::collections::HashMap::new();
        let mut sel = backends.clone();
        for key in 0..total {
            let w = chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap());
            *counts.entry(w).or_default() += 1;
        }

        let expected = total / n as u64;
        for b in &backends {
            let c = counts.get(&chosen_addr(b)).copied().unwrap_or(0);
            assert!(
                c > expected * 85 / 100 && c < expected * 115 / 100,
                "Maglev distribution skewed: backend got {c}, expected ~{expected}"
            );
        }
    }

    /// Adding a sixth backend and rebuilding must move fewer than half of
    /// the keys.
    #[test]
    fn maglev_table_rebuild_keeps_most_keys_stable() {
        let backends5 = make_backends(5);
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        mag.rebuild(&backends5);

        let total = 20_000u64;
        let mut before = std::collections::HashMap::new();
        let mut sel = backends5.clone();
        for key in 0..total {
            before.insert(
                key,
                chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()),
            );
        }

        let mut backends6 = backends5.clone();
        backends6.push(rc(addr_backend("b5", 6, 8005, None)));
        mag.rebuild(&backends6);

        let mut moved = 0u64;
        let mut sel6 = backends6.clone();
        for key in 0..total {
            let after = chosen_addr(&mag.next_available_backend(Some(key), &mut sel6).unwrap());
            if after != before[&key] {
                moved += 1;
            }
        }

        assert!(
            moved < total / 2,
            "Maglev rebuild moved too many keys: {moved}/{total}"
        );
    }

    /// Selecting from a subset of the table's backends (one missing) must
    /// not rebuild the table, must never return the missing backend, and
    /// must leave every key that pointed at a remaining backend in place.
    #[test]
    fn maglev_partial_outage_does_not_rebuild_and_stays_stable() {
        let full = make_backends(5);
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        mag.rebuild(&full);

        let table_before = mag.table.clone();
        let addrs_before = mag.backend_addrs.clone();
        assert_eq!(addrs_before.len(), 5, "table built from the full set");

        let total = 4000u64;
        let mut before = std::collections::HashMap::new();
        let mut sel_full = full.clone();
        for key in 0..total {
            before.insert(
                key,
                chosen_addr(
                    &mag.next_available_backend(Some(key), &mut sel_full)
                        .unwrap(),
                ),
            );
        }

        // Drop the third backend from the candidate set only; the table is
        // left as built.
        let unhealthy_addr = chosen_addr(&full[2]);
        let mut subset: Vec<_> = full
            .iter()
            .filter(|b| chosen_addr(b) != unhealthy_addr)
            .cloned()
            .collect();

        for key in 0..total {
            let after = chosen_addr(&mag.next_available_backend(Some(key), &mut subset).unwrap());
            assert_ne!(
                after, unhealthy_addr,
                "selection returned the unhealthy backend for key {key}"
            );
            if before[&key] != unhealthy_addr {
                assert_eq!(
                    before[&key], after,
                    "a healthy key moved during a partial outage (key {key})"
                );
            }
        }

        assert_eq!(
            mag.table, table_before,
            "partial outage must not rebuild the Maglev table"
        );
        assert_eq!(
            mag.backend_addrs, addrs_before,
            "partial outage must not change the captured backend set"
        );
    }

    /// When none of the table's backends is in the candidate set, the
    /// selection still returns one of the candidates and leaves the table
    /// untouched.
    #[test]
    fn maglev_all_table_backends_unhealthy_falls_back_to_round_robin() {
        let table_set = make_backends(3);
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        mag.rebuild(&table_set);
        let table_before = mag.table.clone();

        // Backends whose addresses do not appear in the table.
        let mut fresh = vec![
            rc(addr_backend("n0", 50, 9000, None)),
            rc(addr_backend("n1", 51, 9001, None)),
        ];
        let fresh_addrs: Vec<_> = fresh.iter().map(chosen_addr).collect();

        let picked = chosen_addr(&mag.next_available_backend(Some(7), &mut fresh).unwrap());
        assert!(
            fresh_addrs.contains(&picked),
            "fallback must route to a backend in the healthy subset"
        );
        assert_eq!(
            mag.table, table_before,
            "fallback must not rebuild the table"
        );
    }

    /// The first keyed lookup on a fresh Maglev builds the table; later
    /// lookups reuse it.
    #[test]
    fn maglev_cold_start_builds_table_once() {
        let mut backends = make_backends(4);
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        assert!(mag.table.is_empty(), "table starts empty (cold)");

        let _ = mag.next_available_backend(Some(99), &mut backends).unwrap();
        assert_eq!(mag.table.len(), mag.size, "cold start populated the table");
        let table_after_cold = mag.table.clone();

        let _ = mag
            .next_available_backend(Some(100), &mut backends)
            .unwrap();
        assert_eq!(
            mag.table, table_after_cold,
            "selection after cold start must not rebuild"
        );
    }

    /// An empty backend set yields `None` (no panic) for RoundRobin and for
    /// the keyless fallback of Rendezvous and Maglev.
    #[test]
    fn round_robin_empty_set_returns_none_without_panic() {
        let mut empty: Vec<Rc<RefCell<Backend>>> = vec![];
        let mut rr = RoundRobin::new();
        assert!(rr.next_available_backend(None, &mut empty).is_none());

        let mut hrw = Rendezvous::new();
        assert!(hrw.next_available_backend(None, &mut empty).is_none());
        let mut mag = Maglev::new();
        assert!(mag.next_available_backend(None, &mut empty).is_none());
    }

    /// A requested table size of 1 is raised to a usable size (>= 2) and the
    /// table still builds and serves lookups.
    #[test]
    fn maglev_table_size_one_is_clamped_to_a_prime() {
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1);
        assert!(
            mag.size >= 2,
            "size must be clamped to >= 2, got {}",
            mag.size
        );

        let mut backends = make_backends(3);
        mag.rebuild(&backends);
        assert_eq!(mag.table.len(), mag.size);
        let _ = mag.next_available_backend(Some(1), &mut backends).unwrap();
    }

    /// `next_prime` returns the smallest prime >= n, with a floor of 2.
    #[test]
    fn next_prime_picks_the_smallest_prime_at_least_n() {
        assert_eq!(next_prime(0), 2);
        assert_eq!(next_prime(1), 2);
        assert_eq!(next_prime(2), 2);
        assert_eq!(next_prime(3), 3);
        assert_eq!(next_prime(4), 5);
        assert_eq!(next_prime(1009), 1009);
        assert_eq!(next_prime(65537), 65537);
    }

    /// A backend with weight 400 next to one with weight 100 must receive
    /// more than 70% of the keys.
    #[test]
    fn maglev_honors_weight() {
        let backends = vec![
            rc(addr_backend("light", 1, 8001, Some(100))),
            rc(addr_backend("heavy", 2, 8002, Some(400))),
        ];
        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
        mag.rebuild(&backends);

        let heavy_addr = chosen_addr(&backends[1]);
        let total = 20_000u64;
        let mut heavy = 0u64;
        let mut sel = backends.clone();
        for key in 0..total {
            if chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()) == heavy_addr
            {
                heavy += 1;
            }
        }
        assert!(
            heavy > total * 70 / 100,
            "weighted Maglev did not favor the heavy backend: {heavy}/{total}"
        );
    }
}