Skip to main content

sozu_lib/
load_balancing.rs

1use std::{cell::RefCell, fmt::Debug, hash::Hasher, net::SocketAddr, rc::Rc};
2
3use rand::{
4    RngExt,
5    distr::{Distribution, weighted::WeightedIndex},
6    prelude::IndexedRandom,
7    rng,
8};
9
10use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
11
/// Default weight applied when a backend declares no explicit
/// `load_balancing_parameters.weight`. Mirrors the value used by the
/// `Random` algorithm so weighting stays consistent across policies.
///
/// Typed `i32` like the configured weight it stands in for; `backend_weight`
/// clamps the value to at least `1` and converts it to `u32`.
const DEFAULT_WEIGHT: i32 = 100;
16
/// Fixed seed used by the affinity hashers (HRW / Maglev). It must NOT be a
/// `RandomState`: the selection has to be reproducible across workers and
/// across process restarts so that every Sōzu instance routing the same flow
/// key lands on the same backend. The constant is arbitrary but stable.
///
/// The seed feeds every `hash_backend` digest, so changing this value changes
/// every hash and therefore reshuffles the key → backend placement.
pub const DEFAULT_HASH_SEED: u64 = 0x9E37_79B9_7F4A_7C15;
22
23/// Read a backend's weight, defaulting to [`DEFAULT_WEIGHT`] when unset. Weight
24/// is clamped to at least `1` so a `0`/negative configured weight never zeroes
25/// out a backend's share (which would make weighted hashing degenerate).
26fn backend_weight(backend: &Backend) -> u32 {
27    let weight = backend
28        .load_balancing_parameters
29        .as_ref()
30        .map(|p| p.weight)
31        .unwrap_or(DEFAULT_WEIGHT)
32        .max(1) as u32;
33    // The clamp guarantees every backend carries a strictly positive weight,
34    // otherwise weighted hashing (HRW score, Maglev slot share) would zero out
35    // the backend's share and skew or divide-by-zero the distribution.
36    debug_assert!(weight >= 1, "backend weight must be clamped to at least 1");
37    weight
38}
39
40/// Deterministic, seedable 64-bit hash over the backend's STABLE identifier.
41///
42/// We hash the backend **socket address** (`SocketAddr`) rather than the
43/// `backend_id` string: the address is the routing-stable identity (it is the
44/// key used by `BackendList::remove_backend` / `has_backend` and survives a
45/// reconfiguration that merely re-emits the same backend), whereas `backend_id`
46/// is a human label that the control plane may rename without changing where
47/// traffic actually goes. Hashing the address keeps HRW/Maglev placement stable
48/// across such cosmetic reconfigurations.
49///
50/// Uses `std::hash::SipHasher13` indirectly via a tiny FNV-1a construction with
51/// an injected seed — fully reproducible, never `RandomState`.
52fn hash_backend(seed: u64, key: u64, addr: &SocketAddr) -> u64 {
53    let mut h = FnvHasher::with_seed(seed);
54    h.write_u64(key);
55    match addr {
56        SocketAddr::V4(v4) => {
57            h.write_u8(4);
58            h.write(&v4.ip().octets());
59            h.write_u16(v4.port());
60        }
61        SocketAddr::V6(v6) => {
62            h.write_u8(6);
63            h.write(&v6.ip().octets());
64            h.write_u16(v6.port());
65        }
66    }
67    // FNV-1a alone has weak avalanche on structured input (the low bits barely
68    // mix), which skews rendezvous/Maglev distribution. Apply a splitmix64
69    // finalizer to scatter the bits before the consumers reduce mod M or feed
70    // it into the score function. Deterministic and seed-independent.
71    splitmix64_finalize(h.finish())
72}
73
/// Output-mixing stage of splitmix64: two xor-shift/multiply rounds followed
/// by a final xor-shift.
///
/// Each step is invertible, so distinct inputs map to distinct outputs. It is
/// applied on top of the FNV digest so that later `% M` / `ln(h)` reductions
/// operate on well-scattered bits.
fn splitmix64_finalize(z: u64) -> u64 {
    const ROUND_ONE: u64 = 0xBF58_476D_1CE4_E5B9;
    const ROUND_TWO: u64 = 0x94D0_49BB_1331_11EB;

    let first = (z ^ (z >> 30)).wrapping_mul(ROUND_ONE);
    let second = (first ^ (first >> 27)).wrapping_mul(ROUND_TWO);
    second ^ (second >> 31)
}
82
83/// Smallest prime `>= n`. Maglev requires a prime table size `M`: it keeps the
84/// permutation stride coprime with `M` so the population loop visits every
85/// slot, and `M >= 2` so `skip = h2 % (M - 1) + 1` never divides by zero.
86/// Runs once at construction (off the datapath); a trial-division check is more
87/// than fast enough for the small sizes Sōzu uses (default 65537).
88fn next_prime(n: usize) -> usize {
89    let mut candidate = n.max(2);
90    while !is_prime(candidate) {
91        candidate += 1;
92    }
93    // The result is the smallest prime not below `max(n, 2)`: it is prime, it
94    // is at least 2, and it never went below the requested floor.
95    debug_assert!(is_prime(candidate), "next_prime must return a prime");
96    debug_assert!(candidate >= 2, "next_prime must return at least 2");
97    debug_assert!(
98        candidate >= n,
99        "next_prime ({candidate}) must be >= the requested floor ({n})"
100    );
101    candidate
102}
103
/// Trial-division primality test.
///
/// Returns `true` iff `x` is prime. Module-level (not nested in `next_prime`)
/// so the Maglev `rebuild` post-condition can re-assert that the table size
/// stayed the prime it was constructed with.
///
/// Odd divisors are tried while `d <= x / d`. This is equivalent to
/// `d * d <= x` but cannot overflow `usize`, so the test stays correct (and
/// panic-free in debug builds) for `x` all the way up to `usize::MAX`.
fn is_prime(x: usize) -> bool {
    match x {
        0 | 1 => false,
        2 => true,
        _ if x % 2 == 0 => false,
        _ => (3usize..)
            .step_by(2)
            .take_while(|&d| d <= x / d)
            .all(|d| x % d != 0),
    }
}
123
/// Tiny 64-bit FNV-1a hasher whose offset basis is perturbed by a caller
/// supplied seed.
///
/// It has no external dependency and no per-process randomness, so — unlike
/// `std::collections::hash_map::RandomState` — the same bytes and seed always
/// produce the same digest. The affinity algorithms rely on that.
struct FnvHasher {
    /// Running FNV-1a state; also the value returned by `finish`.
    state: u64,
}

impl FnvHasher {
    /// 64-bit FNV prime.
    const PRIME: u64 = 0x0000_0100_0000_01B3;
    /// 64-bit FNV offset basis.
    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;

    /// Starts a hasher whose initial state is the offset basis XOR `seed`
    /// (a seed of `0` yields plain FNV-1a).
    fn with_seed(seed: u64) -> Self {
        let state = Self::OFFSET ^ seed;
        Self { state }
    }
}

impl Hasher for FnvHasher {
    /// Absorbs `bytes` one at a time: XOR the byte in, then multiply by the
    /// FNV prime (the "1a" ordering).
    fn write(&mut self, bytes: &[u8]) {
        self.state = bytes.iter().fold(self.state, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }

    /// Current digest; does not reset or otherwise alter the hasher.
    fn finish(&self) -> u64 {
        self.state
    }
}
154
/// Common interface of every backend-selection policy in this module.
///
/// `Debug` is a supertrait, so any policy can be formatted for diagnostics.
pub trait LoadBalancingAlgorithm: Debug {
    /// Select the next backend.
    ///
    /// `key` carries an optional affinity hash (e.g. a UDP flow key). The
    /// stateless/round-robin policies ignore it; the consistent-hashing
    /// policies ([`Rendezvous`], [`Maglev`]) use it to pin a key to a backend.
    /// Passing `None` preserves the historical, key-agnostic behavior.
    ///
    /// `backends` is the candidate set to choose from. The policies defined
    /// alongside this trait return `None` when it is empty, and otherwise a
    /// clone of the chosen `Rc` handle.
    fn next_available_backend(
        &mut self,
        key: Option<u64>,
        backends: &mut Vec<Rc<RefCell<Backend>>>,
    ) -> Option<Rc<RefCell<Backend>>>;

    /// Called by the control plane when the live backend set for a cluster
    /// changes, so table-based policies (Maglev) can recompute their lookup
    /// table off the datapath. The default is a no-op; only [`Maglev`]
    /// overrides it.
    fn rebuild(&mut self, _backends: &[Rc<RefCell<Backend>>]) {}
}
174
175#[derive(Debug)]
176pub struct RoundRobin {
177    pub next_backend: u32,
178}
179
180impl LoadBalancingAlgorithm for RoundRobin {
181    fn next_available_backend(
182        &mut self,
183        _key: Option<u64>,
184        backends: &mut Vec<Rc<RefCell<Backend>>>,
185    ) -> Option<Rc<RefCell<Backend>>> {
186        // Guard against an empty set: `% backends.len()` would panic with a
187        // divide-by-zero. This also covers the `Rendezvous`/`Maglev` policies,
188        // which delegate here on `key == None`.
189        if backends.is_empty() {
190            return None;
191        }
192        debug_assert!(
193            !backends.is_empty(),
194            "round-robin index math runs only on a non-empty set"
195        );
196
197        let index = self.next_backend as usize % backends.len();
198        // The reduced index always addresses a real slot, so the lookup yields
199        // a backend (never the `None` arm of `get`).
200        debug_assert!(index < backends.len(), "round-robin index out of bounds");
201        let res = backends.get(index).map(|backend| (*backend).clone());
202        debug_assert!(
203            res.is_some(),
204            "round-robin must select a backend from a non-empty set"
205        );
206
207        self.next_backend = (self.next_backend + 1) % backends.len() as u32;
208        // The cursor stays a valid index into the current set after advancing,
209        // so the next call never starts out of range.
210        debug_assert!(
211            (self.next_backend as usize) < backends.len(),
212            "round-robin cursor must stay within bounds"
213        );
214        res
215    }
216}
217
218impl Default for RoundRobin {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224impl RoundRobin {
225    pub fn new() -> Self {
226        Self { next_backend: 0 }
227    }
228}
229
230#[derive(Debug)]
231pub struct Random;
232
233impl LoadBalancingAlgorithm for Random {
234    fn next_available_backend(
235        &mut self,
236        _key: Option<u64>,
237        backends: &mut Vec<Rc<RefCell<Backend>>>,
238    ) -> Option<Rc<RefCell<Backend>>> {
239        let mut rng = rng();
240        let len = backends.len();
241        let weights: Vec<i32> = backends
242            .iter()
243            .map(|b| {
244                b.borrow()
245                    .load_balancing_parameters
246                    .as_ref()
247                    .map(|p| p.weight)
248                    .unwrap_or(100)
249            })
250            .collect();
251        // One weight per backend feeds the weighted distribution; a mismatch
252        // would make `dist.sample` index a backend that does not exist.
253        debug_assert_eq!(
254            weights.len(),
255            len,
256            "Random must derive exactly one weight per backend"
257        );
258
259        if let Ok(dist) = WeightedIndex::new(weights) {
260            let index = dist.sample(&mut rng);
261            // `WeightedIndex` only samples valid indices into the weight vector,
262            // which is the same length as `backends`, so the lookup hits.
263            debug_assert!(index < len, "Random sampled an out-of-range index");
264            backends.get(index).cloned()
265        } else {
266            // `WeightedIndex::new` fails only when the set is empty or every
267            // weight is zero; the uniform `choose` then selects iff non-empty.
268            let chosen = (*backends)
269                .choose(&mut rng)
270                .map(|backend| (*backend).clone());
271            debug_assert_eq!(
272                chosen.is_some(),
273                len > 0,
274                "Random fallback selects iff the set is non-empty"
275            );
276            chosen
277        }
278    }
279}
280
281#[derive(Debug)]
282pub struct LeastLoaded {
283    pub metric: LoadMetric,
284}
285
286impl LoadBalancingAlgorithm for LeastLoaded {
287    fn next_available_backend(
288        &mut self,
289        _key: Option<u64>,
290        backends: &mut Vec<Rc<RefCell<Backend>>>,
291    ) -> Option<Rc<RefCell<Backend>>> {
292        let was_empty = backends.is_empty();
293        let opt_b = match self.metric {
294            LoadMetric::Connections => backends
295                .iter_mut()
296                .min_by_key(|backend| backend.borrow().active_connections),
297            LoadMetric::Requests => backends
298                .iter_mut()
299                .min_by_key(|backend| backend.borrow().active_requests),
300            LoadMetric::ConnectionTime => {
301                let mut b = None;
302                for backend in backends.iter_mut() {
303                    let cost2 = backend.borrow_mut().peak_ewma_connection();
304
305                    match b.take() {
306                        None => b = Some((cost2, backend)),
307                        Some((cost1, back1)) => {
308                            if cost1 <= cost2 {
309                                b = Some((cost1, back1));
310                            } else {
311                                b = Some((cost2, backend));
312                            }
313                        }
314                    }
315                }
316
317                b.map(|(_cost, backend)| backend)
318            }
319        };
320        // Least-loaded over a non-empty set always finds a minimum; an empty
321        // set yields nothing. (`min_by_key` / the manual fold both have this
322        // shape.)
323        debug_assert_eq!(
324            opt_b.is_some(),
325            !was_empty,
326            "LeastLoaded selects iff the candidate set is non-empty"
327        );
328        opt_b.map(|backend| (*backend).clone())
329    }
330}
331
332#[derive(Debug)]
333pub struct PowerOfTwo {
334    pub metric: LoadMetric,
335}
336
337impl LoadBalancingAlgorithm for PowerOfTwo {
338    fn next_available_backend(
339        &mut self,
340        _key: Option<u64>,
341        backends: &mut Vec<Rc<RefCell<Backend>>>,
342    ) -> Option<Rc<RefCell<Backend>>> {
343        let len = backends.len();
344        let mut first = None;
345        let mut second = None;
346
347        for backend in backends.iter_mut() {
348            let measure = match self.metric {
349                LoadMetric::Connections => backend.borrow().active_connections as f64,
350                LoadMetric::Requests => backend.borrow().active_requests as f64,
351                LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
352            };
353
354            if first.is_none() {
355                first = Some((measure, backend));
356            } else if second.is_none() {
357                if first.as_ref().unwrap().0 <= measure {
358                    second = Some((measure, backend));
359                } else {
360                    second = first.take();
361                    first = Some((measure, backend));
362                }
363            } else if first.as_ref().unwrap().0 <= measure && measure < second.as_ref().unwrap().0 {
364                second = Some((measure, backend));
365                // other case: we don't change anything
366            } else {
367                second = first.take();
368                first = Some((measure, backend));
369            }
370        }
371
372        // `first` holds the lighter of the two tracked candidates and `second`
373        // the heavier — the running fold keeps `first.measure <= second.measure`
374        // — and the candidates populate in step with the set size (none for an
375        // empty set, only `first` for a singleton, both for >= 2 backends).
376        debug_assert!(
377            match (&first, &second) {
378                (Some((f, _)), Some((s, _))) => f <= s,
379                _ => true,
380            },
381            "power-of-two: first candidate must be no heavier than second"
382        );
383        debug_assert_eq!(
384            first.is_some(),
385            len > 0,
386            "power-of-two must hold a primary candidate iff the set is non-empty"
387        );
388        debug_assert_eq!(
389            second.is_some(),
390            len > 1,
391            "power-of-two holds a second candidate iff the set has >= 2 backends"
392        );
393
394        match (first, second) {
395            (None, None) => None,
396            (Some((_, b)), None) => Some(b.clone()),
397            // should not happen, but let's be exhaustive
398            (None, Some((_, b))) => Some(b.clone()),
399            (Some((_, b1)), Some((_, b2))) => {
400                if rng().random_bool(0.5) {
401                    Some(b1.clone())
402                } else {
403                    Some(b2.clone())
404                }
405            }
406        }
407    }
408}
409
410/// Weighted Rendezvous (Highest Random Weight) hashing.
411///
412/// For an affinity `key`, the chosen backend is the one maximizing a stable,
413/// per-(key, backend) score. With no key it degrades to plain round-robin.
414///
415/// # Weighting
416///
417/// We use the standard continuous weighted-rendezvous score
418/// (Schindelhauer–Schomaker / "logarithmic method"):
419///
420/// ```text
421///     score(key, backend) = -weight / ln(h)
422/// ```
423///
424/// where `h = hash64(seed, key, backend_addr) / 2^64 ∈ (0, 1)`. Because
425/// `ln(h) < 0`, the score is positive and grows with `weight`, so heavier
426/// backends win proportionally more keys while the choice stays deterministic
427/// and minimally disruptive: removing a non-winning backend cannot change the
428/// winner, and adding one only steals the keys for which it now scores highest.
429/// Selection is `O(N)` per key.
430#[derive(Debug)]
431pub struct Rendezvous {
432    /// Reproducible hash seed (NOT a `RandomState`).
433    seed: u64,
434    /// Round-robin cursor used for the `key == None` fallback.
435    round_robin: RoundRobin,
436}
437
438impl Default for Rendezvous {
439    fn default() -> Self {
440        Self::new()
441    }
442}
443
444impl Rendezvous {
445    pub fn new() -> Self {
446        Self::with_seed(DEFAULT_HASH_SEED)
447    }
448
449    pub fn with_seed(seed: u64) -> Self {
450        Self {
451            seed,
452            round_robin: RoundRobin::new(),
453        }
454    }
455
456    /// Continuous weighted-rendezvous score for a (key, backend) pair.
457    /// Larger is better. See the struct docs for the formula.
458    fn score(&self, key: u64, backend: &Backend) -> f64 {
459        let weight = backend_weight(backend) as f64;
460        // Map the 64-bit hash into the open interval (0, 1). Guard the
461        // endpoints so `ln` stays finite: 0 would give -inf, 1 would give 0.
462        let h = hash_backend(self.seed, key, &backend.address);
463        // (h + 0.5) / 2^64 keeps the value strictly inside (0, 1).
464        let unit = (h as f64 + 0.5) / (u64::MAX as f64 + 1.0);
465        // `unit` must land strictly inside (0, 1) so `ln(unit) < 0` and the
466        // score stays a positive, finite number — the whole HRW ordering relies
467        // on a well-defined comparable score for every backend.
468        debug_assert!(
469            unit > 0.0 && unit < 1.0,
470            "HRW unit must lie strictly inside (0, 1), got {unit}"
471        );
472        let score = -weight / unit.ln();
473        debug_assert!(
474            score.is_finite() && score > 0.0,
475            "HRW score must be finite and positive, got {score}"
476        );
477        score
478    }
479}
480
481impl LoadBalancingAlgorithm for Rendezvous {
482    fn next_available_backend(
483        &mut self,
484        key: Option<u64>,
485        backends: &mut Vec<Rc<RefCell<Backend>>>,
486    ) -> Option<Rc<RefCell<Backend>>> {
487        let Some(key) = key else {
488            // No affinity key: behave exactly like RoundRobin.
489            return self.round_robin.next_available_backend(None, backends);
490        };
491
492        if backends.is_empty() {
493            return None;
494        }
495
496        let mut best: Option<(f64, &Rc<RefCell<Backend>>)> = None;
497        for backend in backends.iter() {
498            let score = self.score(key, &backend.borrow());
499            match best {
500                Some((best_score, _)) if best_score >= score => {}
501                _ => best = Some((score, backend)),
502            }
503        }
504        // A non-empty set always yields a winner, and that winner's score is the
505        // maximum over the whole set (HRW = highest random weight).
506        debug_assert!(
507            best.is_some(),
508            "HRW must select a winner from a non-empty set"
509        );
510        #[cfg(debug_assertions)]
511        if let Some((best_score, _)) = best {
512            for backend in backends.iter() {
513                debug_assert!(
514                    best_score >= self.score(key, &backend.borrow()),
515                    "HRW winner does not maximize the score over the set"
516                );
517            }
518        }
519        best.map(|(_, backend)| backend.clone())
520    }
521}
522
523/// Maglev consistent hashing (Google, NSDI'16).
524///
525/// Builds a precomputed lookup table of `M` (a prime, default 65537) slots from
526/// the **stable, full** backend set. Each backend derives a `(offset, skip)`
527/// permutation from two seeded hashes and claims slots in permutation order
528/// until the table is full; a backend's share of slots is proportional to its
529/// weight.
530///
531/// # Rebuild discipline (never on the hot path)
532///
533/// The table is rebuilt **only when the full backend set changes**
534/// ([`Maglev::rebuild`], wired into `BackendList::add_backend` /
535/// `remove_backend`), never per packet. Building the table is `O(M)` = 65537
536/// slot writes; doing it per datagram would be a DoS amplifier (one unhealthy
537/// backend would trigger a full rebuild on every selection) and would also
538/// destroy Maglev's stability, since the table would track a shifting subset.
539///
540/// # Selection (handles a shrunk healthy subset without rebuilding)
541///
542/// At selection time the caller passes the **healthy subset** of backends.
543/// Lookup is near-`O(1)`: compute `slot = key % M`, then probe forward through
544/// the table (`table[(slot + i) % M]`) and return the first entry whose address
545/// is present in the healthy subset. The table maps to the full set it was
546/// built from; membership is checked against the subset, so an unhealthy
547/// backend is simply skipped — no rebuild, and healthy keys stay pinned. If no
548/// table entry resolves to a healthy backend, fall back to round-robin over the
549/// subset. With no key it falls back to round-robin.
550#[derive(Debug)]
551pub struct Maglev {
552    seed: u64,
553    /// Prime table size `M`.
554    size: usize,
555    /// `table[i]` is an index into `backends` (the set captured at the last
556    /// [`rebuild`]). Empty when there are no backends.
557    table: Vec<usize>,
558    /// Backend addresses captured at the last rebuild, in the order used to
559    /// build `table`. The lookup maps a table entry back to the live backend
560    /// by address so it survives `Vec` reordering between rebuilds.
561    backend_addrs: Vec<SocketAddr>,
562    /// Round-robin cursor used for the `key == None` fallback.
563    round_robin: RoundRobin,
564}
565
566impl Default for Maglev {
567    fn default() -> Self {
568        Self::new()
569    }
570}
571
572impl Maglev {
573    /// Default prime table size. 65537 is the smallest prime above 2^16; large
574    /// enough to keep per-key disruption small on backend churn, small enough
575    /// to rebuild cheaply off the datapath.
576    pub const DEFAULT_TABLE_SIZE: usize = 65537;
577
578    pub fn new() -> Self {
579        Self::with_seed(DEFAULT_HASH_SEED)
580    }
581
582    pub fn with_seed(seed: u64) -> Self {
583        Self::with_seed_and_size(seed, Self::DEFAULT_TABLE_SIZE)
584    }
585
586    pub fn with_seed_and_size(seed: u64, size: usize) -> Self {
587        Self {
588            seed,
589            // The Maglev permutation uses `skip = h2 % (m - 1) + 1`, which
590            // panics (divide-by-zero) when `m == 1`, and a single-slot table
591            // is degenerate anyway. Clamp to the next prime `>= max(size, 2)`
592            // so the table is always usable and the permutation stride is
593            // coprime with `m` (a prime), guaranteeing the population loop
594            // visits every slot.
595            size: next_prime(size.max(2)),
596            table: Vec::new(),
597            backend_addrs: Vec::new(),
598            round_robin: RoundRobin::new(),
599        }
600    }
601
602    /// Rebuild the lookup table from `backends`. Called on backend-set change,
603    /// NOT per packet. Honors backend weight via proportional slot share.
604    pub fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
605        let n = backends.len();
606        self.backend_addrs.clear();
607        self.table.clear();
608        if n == 0 || self.size == 0 {
609            return;
610        }
611
612        let m = self.size;
613
614        // Per-backend (offset, skip) permutation parameters and weights.
615        let mut offsets = Vec::with_capacity(n);
616        let mut skips = Vec::with_capacity(n);
617        let mut weights = Vec::with_capacity(n);
618        let mut total_weight: u64 = 0;
619        for backend in backends {
620            let b = backend.borrow();
621            let addr = b.address;
622            self.backend_addrs.push(addr);
623            // Two independent seeded hashes give the permutation seeds. We mix
624            // distinct domain separators into the `key` slot of `hash_backend`
625            // so `offset` and `skip` are uncorrelated.
626            let h1 = hash_backend(self.seed, 0x6F66_6673_6574, &addr); // "offset"
627            let h2 = hash_backend(self.seed, 0x736B_6970_5F5F, &addr); // "skip__"
628            let offset = (h1 % m as u64) as usize;
629            let skip = (h2 % (m as u64 - 1)) as usize + 1;
630            // The permutation parameters must address valid slots and keep a
631            // non-zero stride; `skip ∈ [1, m-1]` is coprime with the prime `m`,
632            // which is what guarantees the population loop visits every slot.
633            debug_assert!(offset < m, "Maglev offset must be a valid slot");
634            debug_assert!(
635                (1..m).contains(&skip),
636                "Maglev skip must lie in [1, m-1] to stay coprime with the prime table"
637            );
638            offsets.push(offset);
639            skips.push(skip);
640            let w = backend_weight(&b) as u64;
641            weights.push(w);
642            total_weight += w;
643        }
644        // Every backend contributes weight >= 1, so a non-empty set has a
645        // strictly positive total — the proportional target math divides by it.
646        debug_assert!(
647            total_weight > 0,
648            "Maglev total weight must be positive for a non-empty backend set"
649        );
650
651        // Target slot count per backend, proportional to weight. The sum of
652        // targets equals `m` (remainder handed to the heaviest/first backends).
653        let mut targets = vec![0usize; n];
654        let mut assigned = 0usize;
655        for (i, &w) in weights.iter().enumerate() {
656            let t = ((w as u128 * m as u128) / total_weight as u128) as usize;
657            targets[i] = t;
658            assigned += t;
659        }
660        // Distribute the rounding remainder so the table fills exactly.
661        let mut i = 0;
662        while assigned < m {
663            targets[i % n] += 1;
664            assigned += 1;
665            i += 1;
666        }
667        // The targets must sum to exactly `m`: this is the termination
668        // guarantee for the population loop below — it writes one slot per unit
669        // of target budget and stops at `count == m`, so an under/over sum would
670        // either leave holes (`usize::MAX` entries) or loop forever.
671        debug_assert_eq!(
672            assigned, m,
673            "Maglev target budget must sum to the table size"
674        );
675        debug_assert_eq!(
676            targets.iter().sum::<usize>(),
677            m,
678            "Maglev per-backend targets must sum to the table size"
679        );
680
681        // Standard Maglev population loop, capped per backend by `targets`.
682        let mut table = vec![usize::MAX; m];
683        let mut next = vec![0usize; n];
684        let mut filled = vec![0usize; n];
685        let mut count = 0usize;
686        while count < m {
687            for b in 0..n {
688                if filled[b] >= targets[b] {
689                    continue;
690                }
691                // Find this backend's next free preferred slot.
692                let mut c = (offsets[b] + next[b] * skips[b]) % m;
693                while table[c] != usize::MAX {
694                    next[b] += 1;
695                    c = (offsets[b] + next[b] * skips[b]) % m;
696                }
697                table[c] = b;
698                next[b] += 1;
699                filled[b] += 1;
700                count += 1;
701                if count >= m {
702                    break;
703                }
704            }
705        }
706        // The loop terminates with every slot claimed exactly once: `count`
707        // reached `m`, no slot still holds the `usize::MAX` sentinel, and each
708        // backend filled precisely its target budget.
709        debug_assert_eq!(count, m, "Maglev population loop must fill exactly m slots");
710        debug_assert!(
711            table.iter().all(|&slot| slot != usize::MAX),
712            "Maglev population loop left an unfilled slot"
713        );
714        debug_assert!(
715            filled == targets,
716            "Maglev filled counts must match the per-backend targets"
717        );
718
719        self.table = table;
720
721        // Post-conditions (TigerStyle). The table is either fully built or empty:
722        //   * `size` is the prime chosen at construction — rebuild never changes
723        //     it (the permutation math depends on a coprime stride over a prime).
724        //   * a populated table is exactly `size` slots, every entry a valid
725        //     index into `backend_addrs` (`< backend_addrs.len() <= size`), so a
726        //     later `table[slot]` lookup can never index out of `backend_addrs`.
727        //   * `backend_addrs` is non-empty whenever the table is non-empty (the
728        //     table maps slots to addresses; an empty address vector would make
729        //     every lookup resolve to nothing).
730        #[cfg(debug_assertions)]
731        {
732            debug_assert!(
733                is_prime(self.size),
734                "Maglev table size {} is not prime",
735                self.size
736            );
737            if self.table.is_empty() {
738                debug_assert!(
739                    self.backend_addrs.is_empty(),
740                    "Maglev: empty table but non-empty backend_addrs"
741                );
742            } else {
743                debug_assert_eq!(
744                    self.table.len(),
745                    self.size,
746                    "Maglev table must have exactly `size` slots"
747                );
748                debug_assert!(
749                    !self.backend_addrs.is_empty(),
750                    "Maglev: non-empty table but empty backend_addrs"
751                );
752                debug_assert!(
753                    self.table.iter().all(|&idx| idx < self.backend_addrs.len()),
754                    "Maglev table holds an index out of backend_addrs range"
755                );
756            }
757        }
758    }
759}
760
761impl LoadBalancingAlgorithm for Maglev {
762    fn next_available_backend(
763        &mut self,
764        key: Option<u64>,
765        backends: &mut Vec<Rc<RefCell<Backend>>>,
766    ) -> Option<Rc<RefCell<Backend>>> {
767        let Some(key) = key else {
768            return self.round_robin.next_available_backend(None, backends);
769        };
770
771        if backends.is_empty() {
772            return None;
773        }
774
775        // Cold start ONLY: the table is empty because no `rebuild` has fired
776        // yet (the control plane wires `rebuild` into backend-set mutations,
777        // but a freshly constructed policy or a never-mutated cluster may not
778        // have one). Build once from the current set. This is NOT a per-packet
779        // rebuild: once populated, the table is only refreshed by `rebuild` on
780        // an actual set change — a partial outage (shrunk healthy subset) never
781        // rebuilds, it is handled by the probe-forward below.
782        if self.table.is_empty() {
783            self.rebuild(backends);
784        }
785
786        if self.table.is_empty() {
787            // Still empty (e.g. zero backends captured): nothing to select.
788            return None;
789        }
790
791        // Probe forward from `key % M` through the table and return the first
792        // entry whose backend address is present in the passed healthy subset.
793        // The table was built from the full set; checking membership against
794        // the subset lets us skip unhealthy backends WITHOUT rebuilding, which
795        // is what keeps a partial outage off the hot path and keeps healthy
796        // keys pinned to the same backend.
797        let start = (key % self.size as u64) as usize;
798        // `key % size` is always a valid table slot, and the table is exactly
799        // `size` long here (non-empty checked above, rebuild post-condition).
800        debug_assert!(start < self.size, "Maglev start slot out of range");
801        debug_assert_eq!(
802            self.table.len(),
803            self.size,
804            "Maglev lookup on a table whose length != size"
805        );
806        for i in 0..self.size {
807            let slot = (start + i) % self.size;
808            // Both the running slot and the index it stores are in range: the
809            // table is `size` long and every entry is a valid `backend_addrs`
810            // index (rebuild post-condition), so the resolution below is total.
811            debug_assert!(slot < self.size, "Maglev probe slot out of range");
812            let idx = self.table[slot];
813            debug_assert!(
814                idx < self.backend_addrs.len(),
815                "Maglev table entry indexes outside the captured address set"
816            );
817            // Resolve the table's backend index back to an address captured at
818            // the last rebuild, then look it up in the live healthy subset.
819            if let Some(addr) = self.backend_addrs.get(idx) {
820                if let Some(backend) = backends.iter().find(|b| b.borrow().address == *addr) {
821                    // The chosen backend is, by construction, a member of the
822                    // healthy subset we were handed.
823                    debug_assert!(
824                        backends
825                            .iter()
826                            .any(|b| b.borrow().address == backend.borrow().address),
827                        "Maglev must return a backend from the healthy subset"
828                    );
829                    return Some(backend.clone());
830                }
831            }
832        }
833
834        // None of the table entries resolved to a healthy backend (every
835        // backend the table knows about is currently unhealthy/absent). Fall
836        // back to round-robin over the healthy subset so we still route.
837        self.round_robin.next_available_backend(None, backends)
838    }
839
840    fn rebuild(&mut self, backends: &[Rc<RefCell<Backend>>]) {
841        Maglev::rebuild(self, backends);
842    }
843}
844
845#[cfg(test)]
846mod test {
847    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
848
849    use super::*;
850    use crate::{
851        PeakEWMA,
852        backends::{BackendStatus, HealthState},
853        retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
854        sozu_command::proto::command::{LoadBalancingParams, LoadMetric},
855    };
856
857    fn create_backend(id: String, connections: Option<usize>) -> Backend {
858        Backend {
859            sticky_id: None,
860            backend_id: id,
861            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
862            status: BackendStatus::Normal,
863            retry_policy: RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1)),
864            active_connections: connections.unwrap_or(0),
865            active_requests: 0,
866            failures: 0,
867            load_balancing_parameters: None,
868            backup: false,
869            connection_time: PeakEWMA::new(),
870            health: HealthState::default(),
871        }
872    }
873
874    #[test]
875    fn it_should_find_the_backend_with_least_connections() {
876        let backend_with_least_connection =
877            Rc::new(RefCell::new(create_backend("yolo".to_string(), Some(1))));
878
879        let mut backends = vec![
880            Rc::new(RefCell::new(create_backend("nolo".to_string(), Some(10)))),
881            Rc::new(RefCell::new(create_backend("philo".to_string(), Some(20)))),
882            backend_with_least_connection.clone(),
883        ];
884
885        let mut least_connection_algorithm = LeastLoaded {
886            metric: LoadMetric::Connections,
887        };
888
889        let backend_res = least_connection_algorithm
890            .next_available_backend(None, &mut backends)
891            .unwrap();
892        let backend = backend_res.borrow();
893
894        assert!(*backend == *backend_with_least_connection.borrow());
895    }
896
897    #[test]
898    fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
899        let mut backends = vec![];
900
901        let mut least_connection_algorithm = LeastLoaded {
902            metric: LoadMetric::Connections,
903        };
904
905        let backend = least_connection_algorithm.next_available_backend(None, &mut backends);
906        assert!(backend.is_none());
907    }
908
909    #[test]
910    fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
911        let mut backends = vec![
912            Rc::new(RefCell::new(create_backend("toto".to_string(), None))),
913            Rc::new(RefCell::new(create_backend("voto".to_string(), None))),
914            Rc::new(RefCell::new(create_backend("yoto".to_string(), None))),
915        ];
916
917        let mut roundrobin = RoundRobin { next_backend: 1 };
918        let backend = roundrobin.next_available_backend(None, &mut backends);
919        assert_eq!(backend.as_ref(), backends.get(1));
920
921        backends.remove(1);
922
923        let backend2 = roundrobin.next_available_backend(None, &mut backends);
924        assert_eq!(backend2.as_ref(), backends.first());
925    }
926
927    // ----- HRW (Rendezvous) and Maglev affinity tests -----
928
929    /// Build a backend with a distinct address so the address-based affinity
930    /// hashers (HRW / Maglev) see distinct identities, and an optional weight.
931    fn addr_backend(id: &str, last_octet: u8, port: u16, weight: Option<i32>) -> Backend {
932        let mut b = create_backend(id.to_string(), None);
933        b.address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, last_octet)), port);
934        b.load_balancing_parameters = weight.map(|weight| LoadBalancingParams { weight });
935        b
936    }
937
938    fn rc(b: Backend) -> Rc<RefCell<Backend>> {
939        Rc::new(RefCell::new(b))
940    }
941
942    fn make_backends(n: u8) -> Vec<Rc<RefCell<Backend>>> {
943        (0..n)
944            .map(|i| rc(addr_backend(&format!("b{i}"), i + 1, 8000 + i as u16, None)))
945            .collect()
946    }
947
948    fn chosen_addr(b: &Rc<RefCell<Backend>>) -> SocketAddr {
949        b.borrow().address
950    }
951
952    #[test]
953    fn hrw_is_deterministic_for_a_fixed_key() {
954        let mut backends = make_backends(5);
955        let mut hrw = Rendezvous::new();
956
957        let first = hrw
958            .next_available_backend(Some(42), &mut backends)
959            .map(|b| chosen_addr(&b));
960        for _ in 0..50 {
961            let again = hrw
962                .next_available_backend(Some(42), &mut backends)
963                .map(|b| chosen_addr(&b));
964            assert_eq!(first, again, "HRW must be deterministic for a fixed key");
965        }
966    }
967
968    #[test]
969    fn hrw_none_key_falls_back_to_round_robin() {
970        let mut backends = make_backends(3);
971        let mut hrw = Rendezvous::new();
972
973        // With None it should cycle round-robin: addresses in order then wrap.
974        let a = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
975        let b = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
976        let c = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
977        let d = chosen_addr(&hrw.next_available_backend(None, &mut backends).unwrap());
978        assert_eq!(a, chosen_addr(&backends[0]));
979        assert_eq!(b, chosen_addr(&backends[1]));
980        assert_eq!(c, chosen_addr(&backends[2]));
981        assert_eq!(d, a, "round-robin should wrap around");
982    }
983
984    #[test]
985    fn hrw_minimal_disruption_when_removing_a_non_winner() {
986        // For every key whose winner is NOT the removed backend, the choice
987        // must be unchanged after removal (HRW minimal-disruption property).
988        let mut backends = make_backends(6);
989        let mut hrw = Rendezvous::new();
990
991        // Pick a backend to remove (index 3).
992        let removed_addr = chosen_addr(&backends[3]);
993
994        // Record winners for many keys on the full set.
995        let mut before = std::collections::HashMap::new();
996        for key in 0..2000u64 {
997            let w = chosen_addr(
998                &hrw.next_available_backend(Some(key), &mut backends)
999                    .unwrap(),
1000            );
1001            before.insert(key, w);
1002        }
1003
1004        // Remove the backend and re-evaluate.
1005        backends.remove(3);
1006        for key in 0..2000u64 {
1007            let after = chosen_addr(
1008                &hrw.next_available_backend(Some(key), &mut backends)
1009                    .unwrap(),
1010            );
1011            let prev = before[&key];
1012            if prev != removed_addr {
1013                assert_eq!(
1014                    prev, after,
1015                    "removing a non-winner changed the choice for key {key}"
1016                );
1017            }
1018        }
1019    }
1020
1021    #[test]
1022    fn hrw_distribution_is_roughly_even() {
1023        let n = 5u8;
1024        let mut backends = make_backends(n);
1025        let mut hrw = Rendezvous::new();
1026
1027        let total = 20_000u64;
1028        let mut counts: std::collections::HashMap<SocketAddr, u64> =
1029            std::collections::HashMap::new();
1030        for key in 0..total {
1031            let w = chosen_addr(
1032                &hrw.next_available_backend(Some(key), &mut backends)
1033                    .unwrap(),
1034            );
1035            *counts.entry(w).or_default() += 1;
1036        }
1037
1038        let expected = total / n as u64;
1039        for b in &backends {
1040            let c = counts.get(&chosen_addr(b)).copied().unwrap_or(0);
1041            // Allow generous +/-35% slack: this is a statistical property.
1042            assert!(
1043                c > expected * 65 / 100 && c < expected * 135 / 100,
1044                "HRW distribution skewed: backend got {c}, expected ~{expected}"
1045            );
1046        }
1047    }
1048
1049    #[test]
1050    fn maglev_is_deterministic_for_a_fixed_key() {
1051        let backends = make_backends(7);
1052        let mut mag = Maglev::new();
1053        mag.rebuild(&backends);
1054
1055        let mut sel = backends.clone();
1056        let first = chosen_addr(&mag.next_available_backend(Some(12345), &mut sel).unwrap());
1057        for _ in 0..50 {
1058            let again = chosen_addr(&mag.next_available_backend(Some(12345), &mut sel).unwrap());
1059            assert_eq!(first, again, "Maglev must be deterministic for a fixed key");
1060        }
1061    }
1062
1063    #[test]
1064    fn maglev_none_key_falls_back_to_round_robin() {
1065        let mut backends = make_backends(3);
1066        let mut mag = Maglev::new();
1067        mag.rebuild(&backends);
1068
1069        let a = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
1070        let b = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
1071        let c = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
1072        let d = chosen_addr(&mag.next_available_backend(None, &mut backends).unwrap());
1073        assert_eq!(a, chosen_addr(&backends[0]));
1074        assert_eq!(b, chosen_addr(&backends[1]));
1075        assert_eq!(c, chosen_addr(&backends[2]));
1076        assert_eq!(d, a);
1077    }
1078
1079    #[test]
1080    fn maglev_distribution_is_roughly_even() {
1081        let n = 5u8;
1082        let backends = make_backends(n);
1083        // Use a small prime table so the test is fast but still representative.
1084        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1085        mag.rebuild(&backends);
1086
1087        let total = 50_000u64;
1088        let mut counts: std::collections::HashMap<SocketAddr, u64> =
1089            std::collections::HashMap::new();
1090        let mut sel = backends.clone();
1091        for key in 0..total {
1092            let w = chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap());
1093            *counts.entry(w).or_default() += 1;
1094        }
1095
1096        let expected = total / n as u64;
1097        for b in &backends {
1098            let c = counts.get(&chosen_addr(b)).copied().unwrap_or(0);
1099            // Maglev is more even than HRW; +/-15% is comfortable.
1100            assert!(
1101                c > expected * 85 / 100 && c < expected * 115 / 100,
1102                "Maglev distribution skewed: backend got {c}, expected ~{expected}"
1103            );
1104        }
1105    }
1106
1107    #[test]
1108    fn maglev_table_rebuild_keeps_most_keys_stable() {
1109        // Adding a backend should move only a bounded fraction of keys
1110        // (Maglev disruption bound ~ 1/N of the table, well under 50%).
1111        let backends5 = make_backends(5);
1112        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1113        mag.rebuild(&backends5);
1114
1115        let total = 20_000u64;
1116        let mut before = std::collections::HashMap::new();
1117        let mut sel = backends5.clone();
1118        for key in 0..total {
1119            before.insert(
1120                key,
1121                chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()),
1122            );
1123        }
1124
1125        // Add a sixth backend and rebuild.
1126        let mut backends6 = backends5.clone();
1127        backends6.push(rc(addr_backend("b5", 6, 8005, None)));
1128        mag.rebuild(&backends6);
1129
1130        let mut moved = 0u64;
1131        let mut sel6 = backends6.clone();
1132        for key in 0..total {
1133            let after = chosen_addr(&mag.next_available_backend(Some(key), &mut sel6).unwrap());
1134            if after != before[&key] {
1135                moved += 1;
1136            }
1137        }
1138
1139        // Disruption should be well under half: most keys stay on their backend.
1140        assert!(
1141            moved < total / 2,
1142            "Maglev rebuild moved too many keys: {moved}/{total}"
1143        );
1144    }
1145
1146    #[test]
1147    fn maglev_partial_outage_does_not_rebuild_and_stays_stable() {
1148        // FIX 1 guard: a partial outage (one backend missing from the healthy
1149        // subset passed to `next_available_backend`) must NOT rebuild the
1150        // table — the table tracks the full set captured at `rebuild`, and the
1151        // unhealthy backend is simply skipped via probe-forward. Selection for
1152        // keys that did not land on the unhealthy backend stays identical.
1153        let full = make_backends(5);
1154        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1155        mag.rebuild(&full);
1156
1157        // Snapshot the table state built from the FULL set.
1158        let table_before = mag.table.clone();
1159        let addrs_before = mag.backend_addrs.clone();
1160        assert_eq!(addrs_before.len(), 5, "table built from the full set");
1161
1162        // Record winners on the full healthy set.
1163        let total = 4000u64;
1164        let mut before = std::collections::HashMap::new();
1165        let mut sel_full = full.clone();
1166        for key in 0..total {
1167            before.insert(
1168                key,
1169                chosen_addr(
1170                    &mag.next_available_backend(Some(key), &mut sel_full)
1171                        .unwrap(),
1172                ),
1173            );
1174        }
1175
1176        // Now mark backend index 2 unhealthy by passing a REDUCED subset
1177        // (everything except index 2). This is exactly what
1178        // `BackendList::next_available_backend_with_key` does when a backend
1179        // fails its health check / enters retry backoff.
1180        let unhealthy_addr = chosen_addr(&full[2]);
1181        let mut subset: Vec<_> = full
1182            .iter()
1183            .filter(|b| chosen_addr(b) != unhealthy_addr)
1184            .cloned()
1185            .collect();
1186
1187        for key in 0..total {
1188            let after = chosen_addr(&mag.next_available_backend(Some(key), &mut subset).unwrap());
1189            // The unhealthy backend must never be selected.
1190            assert_ne!(
1191                after, unhealthy_addr,
1192                "selection returned the unhealthy backend for key {key}"
1193            );
1194            // Keys that did NOT previously land on the unhealthy backend must
1195            // keep their original backend — probe-forward only reroutes the
1196            // keys that were pinned to the now-missing backend.
1197            if before[&key] != unhealthy_addr {
1198                assert_eq!(
1199                    before[&key], after,
1200                    "a healthy key moved during a partial outage (key {key})"
1201                );
1202            }
1203        }
1204
1205        // The crux of FIX 1: the table was NOT rebuilt by the partial outage.
1206        // Neither the table contents nor the captured address set changed.
1207        assert_eq!(
1208            mag.table, table_before,
1209            "partial outage must not rebuild the Maglev table"
1210        );
1211        assert_eq!(
1212            mag.backend_addrs, addrs_before,
1213            "partial outage must not change the captured backend set"
1214        );
1215    }
1216
1217    #[test]
1218    fn maglev_all_table_backends_unhealthy_falls_back_to_round_robin() {
1219        // When the healthy subset shares NO address with the table (every
1220        // table backend is down), probe-forward finds nothing and we fall back
1221        // to round-robin over the subset rather than returning None.
1222        let table_set = make_backends(3);
1223        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1224        mag.rebuild(&table_set);
1225        let table_before = mag.table.clone();
1226
1227        // A disjoint healthy subset (different addresses than the table).
1228        let mut fresh = vec![
1229            rc(addr_backend("n0", 50, 9000, None)),
1230            rc(addr_backend("n1", 51, 9001, None)),
1231        ];
1232        let fresh_addrs: Vec<_> = fresh.iter().map(chosen_addr).collect();
1233
1234        let picked = chosen_addr(&mag.next_available_backend(Some(7), &mut fresh).unwrap());
1235        assert!(
1236            fresh_addrs.contains(&picked),
1237            "fallback must route to a backend in the healthy subset"
1238        );
1239        // And still no rebuild happened.
1240        assert_eq!(
1241            mag.table, table_before,
1242            "fallback must not rebuild the table"
1243        );
1244    }
1245
1246    #[test]
1247    fn maglev_cold_start_builds_table_once() {
1248        // A freshly constructed Maglev with no prior `rebuild` must still
1249        // select (one-time cold-start build), then keep the table populated.
1250        let mut backends = make_backends(4);
1251        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1252        assert!(mag.table.is_empty(), "table starts empty (cold)");
1253
1254        let _ = mag.next_available_backend(Some(99), &mut backends).unwrap();
1255        assert_eq!(mag.table.len(), mag.size, "cold start populated the table");
1256        let table_after_cold = mag.table.clone();
1257
1258        // A subsequent selection (still the full set) must NOT rebuild.
1259        let _ = mag
1260            .next_available_backend(Some(100), &mut backends)
1261            .unwrap();
1262        assert_eq!(
1263            mag.table, table_after_cold,
1264            "selection after cold start must not rebuild"
1265        );
1266    }
1267
1268    #[test]
1269    fn round_robin_empty_set_returns_none_without_panic() {
1270        // FIX 2 guard: `% backends.len()` would panic on an empty Vec.
1271        let mut empty: Vec<Rc<RefCell<Backend>>> = vec![];
1272        let mut rr = RoundRobin::new();
1273        assert!(rr.next_available_backend(None, &mut empty).is_none());
1274
1275        // Delegators (Rendezvous / Maglev with key == None) route through
1276        // RoundRobin and must be safe on an empty set too.
1277        let mut hrw = Rendezvous::new();
1278        assert!(hrw.next_available_backend(None, &mut empty).is_none());
1279        let mut mag = Maglev::new();
1280        assert!(mag.next_available_backend(None, &mut empty).is_none());
1281    }
1282
1283    #[test]
1284    fn maglev_table_size_one_is_clamped_to_a_prime() {
1285        // FIX 3 guard: requesting size 1 would make `skip = h2 % (m - 1)`
1286        // divide by zero. The constructor must clamp to a prime >= 2.
1287        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1);
1288        assert!(
1289            mag.size >= 2,
1290            "size must be clamped to >= 2, got {}",
1291            mag.size
1292        );
1293
1294        let mut backends = make_backends(3);
1295        // Building and selecting must not panic.
1296        mag.rebuild(&backends);
1297        assert_eq!(mag.table.len(), mag.size);
1298        let _ = mag.next_available_backend(Some(1), &mut backends).unwrap();
1299    }
1300
1301    #[test]
1302    fn next_prime_picks_the_smallest_prime_at_least_n() {
1303        assert_eq!(next_prime(0), 2);
1304        assert_eq!(next_prime(1), 2);
1305        assert_eq!(next_prime(2), 2);
1306        assert_eq!(next_prime(3), 3);
1307        assert_eq!(next_prime(4), 5);
1308        assert_eq!(next_prime(1009), 1009); // already prime — preserved
1309        assert_eq!(next_prime(65537), 65537); // default size is prime
1310    }
1311
1312    #[test]
1313    fn maglev_honors_weight() {
1314        // A backend with 4x weight should win clearly more slots.
1315        let backends = vec![
1316            rc(addr_backend("light", 1, 8001, Some(100))),
1317            rc(addr_backend("heavy", 2, 8002, Some(400))),
1318        ];
1319        let mut mag = Maglev::with_seed_and_size(DEFAULT_HASH_SEED, 1009);
1320        mag.rebuild(&backends);
1321
1322        let heavy_addr = chosen_addr(&backends[1]);
1323        let total = 20_000u64;
1324        let mut heavy = 0u64;
1325        let mut sel = backends.clone();
1326        for key in 0..total {
1327            if chosen_addr(&mag.next_available_backend(Some(key), &mut sel).unwrap()) == heavy_addr
1328            {
1329                heavy += 1;
1330            }
1331        }
1332        // Expected ~80% (400 / 500); allow a margin.
1333        assert!(
1334            heavy > total * 70 / 100,
1335            "weighted Maglev did not favor the heavy backend: {heavy}/{total}"
1336        );
1337    }
1338}