Skip to main content

dynomite/hashkit/
random_slicing.rs

1//! Random-slicing distribution: a small, gap-free partition table
2//! over the 64-bit hash space.
3//!
4//! The hash space is split into a contiguous run of intervals.
5//! Each interval is owned by exactly one *claimant* (a peer or
6//! peer-name in our terminology). Lookups are an `O(log N)`
7//! binary search over the interval table, where `N` is the
8//! claimant count.
9//!
10//! Two construction-time invariants give the technique its name:
11//!
12//! 1. The intervals are laid out end-to-end so their union is
13//!    the entire ring. Whatever rounding remainder integer
14//!    division leaves over is absorbed by the last interval, so
15//!    no value of `hash(key)` can fail to map to a claimant.
16//! 2. Adding or removing a claimant only re-slices its share.
17//!    Keys outside the affected slice keep their assignment.
18//!
19//! The structure is built once per rack at config-load /
20//! `SIGHUP`-reload time and thereafter consulted read-only by
21//! the dispatcher; see [`crate::cluster::dispatch`].
22//!
23//! # Examples
24//!
25//! ```
26//! use dynomite::hashkit::random_slicing::RandomSlices;
27//!
28//! let slices = RandomSlices::from_uniform(&["peer-a", "peer-b"]).unwrap();
29//! // Either of the two claimants must own the lookup; the
30//! // partition is gap-free by construction.
31//! let owner = slices.claimant_for(0).unwrap();
32//! assert!(owner == "peer-a" || owner == "peer-b");
33//! ```
34
35use std::collections::BTreeSet;
36
37use thiserror::Error;
38
39/// Failure mode produced by the [`RandomSlices`] builders.
40#[derive(Debug, Error, PartialEq)]
41pub enum RandomSlicesError {
42    /// Caller supplied an empty claimant list. The reference
43    /// engine refuses to build an empty partition; we mirror
44    /// that here.
45    #[error("random_slicing: at least one claimant is required")]
46    EmptyClaimants,
47    /// Two claimants share the same name. Names must be unique
48    /// so the reverse lookup (peer name -> slice) is total.
49    #[error("random_slicing: duplicate claimant '{name}'")]
50    DuplicateClaimant {
51        /// The offending duplicated name.
52        name: String,
53    },
54    /// A weight is non-finite, negative, or so large the size
55    /// translation cannot proceed. The error is also raised when
56    /// every weight rounds to zero at the configured number of
57    /// decimal digits.
58    #[error("random_slicing: invalid weight for '{name}': {weight}")]
59    InvalidWeight {
60        /// The claimant whose weight failed validation.
61        name: String,
62        /// The offending weight.
63        weight: f64,
64    },
65    /// The summed [`u64`] sizes overflow the ring. The C
66    /// reference's `hash_partitions_create_with_sizes` accepts
67    /// only sums up to `u64::MAX`; anything beyond is a
68    /// configuration error.
69    #[error("random_slicing: claimant size sum {sum} exceeds u64::MAX")]
70    OversizedSizeSum {
71        /// The summed sizes (as a `u128` so the overflow is
72        /// representable in the diagnostic).
73        sum: u128,
74    },
75    /// Caller supplied a zero-sized interval. The reference
76    /// engine permits zero-sized entries silently and the
77    /// binary search then returns the next claimant; the Rust
78    /// port rejects this configuration up front so no claimant
79    /// can be silently masked.
80    #[error("random_slicing: zero-sized interval at index {index} for '{name}'")]
81    ZeroInterval {
82        /// Position of the offending entry in the input list.
83        index: usize,
84        /// The claimant whose size was zero.
85        name: String,
86    },
87}
88
/// Partition table mapping `u64` hash values to claimants.
///
/// The table is stored as three parallel vectors of equal
/// length: the ascending lower bound of each interval, the
/// width of each interval (retained for diagnostics), and the
/// name of the claimant owning it. The builders keep the three
/// vectors in lock-step; the fields are private so the struct
/// cannot be assembled by hand outside this module.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RandomSlices {
    /// Inclusive lower bound of every interval, strictly
    /// ascending.
    lower_bounds: Vec<u64>,
    /// Width of every interval: `interval_sizes[i]` is the
    /// number of `u64` values assigned to claimant `i`. The
    /// final entry also holds whatever remainder was left over,
    /// so that the intervals span the whole ring.
    interval_sizes: Vec<u64>,
    /// Owner of every interval, index-aligned with the two
    /// vectors above.
    claimants: Vec<String>,
}
108
109impl RandomSlices {
110    /// Build an equal-sized partition for `claimants`. Each
111    /// claimant gets an interval of `u64::MAX / N` values; the
112    /// last claimant absorbs the rounding remainder so coverage
113    /// is exact.
114    ///
115    /// # Errors
116    /// Returns [`RandomSlicesError::EmptyClaimants`] when the
117    /// list is empty and
118    /// [`RandomSlicesError::DuplicateClaimant`] when two entries
119    /// share a name.
120    ///
121    /// # Examples
122    ///
123    /// ```
124    /// use dynomite::hashkit::random_slicing::RandomSlices;
125    /// let s = RandomSlices::from_uniform(&["a", "b", "c"]).unwrap();
126    /// assert_eq!(s.len(), 3);
127    /// ```
128    pub fn from_uniform(claimants: &[&str]) -> Result<Self, RandomSlicesError> {
129        if claimants.is_empty() {
130            return Err(RandomSlicesError::EmptyClaimants);
131        }
132        check_unique(claimants.iter().copied())?;
133        let n = claimants.len() as u64;
134        // `u64::MAX / n` cannot overflow because `n >= 1`.
135        let base = u64::MAX / n;
136        let used = base.checked_mul(n).unwrap_or(0);
137        let remainder = u64::MAX - used;
138        let mut sizes: Vec<u64> = vec![base; claimants.len()];
139        if let Some(last) = sizes.last_mut() {
140            *last = last.saturating_add(remainder);
141        }
142        let lower_bounds = build_lower_bounds(&sizes);
143        Ok(Self {
144            lower_bounds,
145            interval_sizes: sizes,
146            claimants: claimants.iter().map(|s| (*s).to_string()).collect(),
147        })
148    }
149
150    /// Build a weighted partition. Each weight is rounded to
151    /// `decimal_digits` places before normalising; the sum is
152    /// then translated into per-claimant `u64` sizes by
153    /// `floor(u64::MAX * w / sum)`. The last claimant absorbs
154    /// the remainder so coverage is exact.
155    ///
156    /// # Errors
157    /// Returns [`RandomSlicesError::EmptyClaimants`],
158    /// [`RandomSlicesError::DuplicateClaimant`], or
159    /// [`RandomSlicesError::InvalidWeight`] (for non-finite,
160    /// negative, or all-zero rounded weights).
161    ///
162    /// # Examples
163    ///
164    /// ```
165    /// use dynomite::hashkit::random_slicing::RandomSlices;
166    /// let s = RandomSlices::from_weights(
167    ///     &[("a".to_string(), 1.0), ("b".to_string(), 2.0)],
168    ///     2,
169    /// ).unwrap();
170    /// assert_eq!(s.len(), 2);
171    /// ```
172    pub fn from_weights(
173        weights: &[(String, f64)],
174        decimal_digits: usize,
175    ) -> Result<Self, RandomSlicesError> {
176        if weights.is_empty() {
177            return Err(RandomSlicesError::EmptyClaimants);
178        }
179        check_unique(weights.iter().map(|(n, _)| n.as_str()))?;
180        let scale = 10f64.powi(i32::try_from(decimal_digits).unwrap_or(0));
181        let mut rounded: Vec<f64> = Vec::with_capacity(weights.len());
182        for (name, w) in weights {
183            if !w.is_finite() || *w < 0.0 {
184                return Err(RandomSlicesError::InvalidWeight {
185                    name: name.clone(),
186                    weight: *w,
187                });
188            }
189            // Round to `decimal_digits` places.
190            let r = (w * scale).round() / scale;
191            rounded.push(r);
192        }
193        let sum: f64 = rounded.iter().sum();
194        if !sum.is_finite() || sum <= 0.0 {
195            // Every weight rounded to zero (or sum is otherwise
196            // unusable). Pick the first claimant as the offender.
197            let first = weights[0].clone();
198            return Err(RandomSlicesError::InvalidWeight {
199                name: first.0,
200                weight: first.1,
201            });
202        }
203        // Convert each rounded weight (except the last) to a
204        // u64 size by `floor(u64::MAX * w / sum)`. The last
205        // claimant absorbs the rest so coverage is exact and
206        // free of `f64`-rounding overflow. The cast to `f64`
207        // discards bits below the mantissa's 52-bit width,
208        // which is harmless: the slice sizes only need to be
209        // approximate (the dispatcher binary-searches on the
210        // *lower bounds*; the size array is for diagnostics).
211        #[allow(clippy::cast_precision_loss)]
212        let max_f = u64::MAX as f64;
213        let mut sizes: Vec<u64> = Vec::with_capacity(weights.len());
214        let last_idx = weights.len() - 1;
215        let mut consumed: u128 = 0;
216        for (idx, _) in weights.iter().enumerate() {
217            if idx == last_idx {
218                // Tail absorbs whatever is left up to u64::MAX.
219                let mut last = u128::from(u64::MAX).saturating_sub(consumed);
220                if last == 0 {
221                    // The first `n-1` claimants saturated the
222                    // ring; carve out one byte for the tail.
223                    last = 1;
224                }
225                let last_u64 = u64::try_from(last.min(u128::from(u64::MAX))).unwrap_or(u64::MAX);
226                sizes.push(last_u64);
227                break;
228            }
229            let frac = rounded[idx] / sum;
230            let raw = max_f * frac;
231            let size: u64 = if raw <= 0.0 {
232                0
233            } else if raw >= max_f {
234                u64::MAX
235            } else {
236                raw as u64
237            };
238            consumed = consumed.saturating_add(u128::from(size));
239            sizes.push(size);
240        }
241        // If every translated size is zero, surface that as an
242        // invalid-weight error rather than silently producing
243        // an unbuildable partition.
244        if sizes.iter().all(|s| *s == 0) {
245            let first = weights[0].clone();
246            return Err(RandomSlicesError::InvalidWeight {
247                name: first.0,
248                weight: first.1,
249            });
250        }
251        // Promote each zero-size to one so no claimant is
252        // silently masked. The tail-bump below will absorb the
253        // change.
254        let mut promoted_zeros: u64 = 0;
255        for (idx, s) in sizes.iter_mut().enumerate() {
256            if *s == 0 && idx != last_idx {
257                *s = 1;
258                promoted_zeros = promoted_zeros.saturating_add(1);
259            }
260        }
261        if promoted_zeros > 0 {
262            // Steal those bytes from the tail so the total stays
263            // bounded by `u64::MAX`.
264            if let Some(last) = sizes.last_mut() {
265                *last = last.saturating_sub(promoted_zeros).max(1);
266            }
267        }
268        let pairs: Vec<(String, u64)> = weights
269            .iter()
270            .zip(sizes.iter())
271            .map(|((n, _), s)| (n.clone(), *s))
272            .collect();
273        Self::from_sizes(&pairs)
274    }
275
276    /// Build a partition from explicit `(name, size)` pairs.
277    ///
278    /// The supplied sizes are taken at face value; the only
279    /// transformation is bumping the tail interval up by the
280    /// remainder when the sum is below `u64::MAX`. Sums above
281    /// `u64::MAX` are rejected with
282    /// [`RandomSlicesError::OversizedSizeSum`].
283    ///
284    /// # Errors
285    /// Returns [`RandomSlicesError::EmptyClaimants`],
286    /// [`RandomSlicesError::DuplicateClaimant`],
287    /// [`RandomSlicesError::ZeroInterval`], or
288    /// [`RandomSlicesError::OversizedSizeSum`].
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use dynomite::hashkit::random_slicing::RandomSlices;
294    /// let s = RandomSlices::from_sizes(&[
295    ///     ("a".into(), 100),
296    ///     ("b".into(), 200),
297    /// ]).unwrap();
298    /// assert_eq!(s.len(), 2);
299    /// ```
300    pub fn from_sizes(sizes: &[(String, u64)]) -> Result<Self, RandomSlicesError> {
301        if sizes.is_empty() {
302            return Err(RandomSlicesError::EmptyClaimants);
303        }
304        check_unique(sizes.iter().map(|(n, _)| n.as_str()))?;
305        let mut sum: u128 = 0;
306        for (idx, (name, s)) in sizes.iter().enumerate() {
307            if *s == 0 {
308                return Err(RandomSlicesError::ZeroInterval {
309                    index: idx,
310                    name: name.clone(),
311                });
312            }
313            sum += u128::from(*s);
314            if sum > u128::from(u64::MAX) {
315                return Err(RandomSlicesError::OversizedSizeSum { sum });
316            }
317        }
318        let mut interval_sizes: Vec<u64> = sizes.iter().map(|(_, s)| *s).collect();
319        // Bump the tail to absorb any remainder so coverage is
320        // exactly the full ring.
321        let used: u128 = sum;
322        let remainder = u128::from(u64::MAX) - used;
323        if remainder > 0 {
324            if let Some(last) = interval_sizes.last_mut() {
325                let bumped = u128::from(*last) + remainder;
326                *last = u64::try_from(bumped).expect("invariant: remainder fits");
327            }
328        }
329        let lower_bounds = build_lower_bounds(&interval_sizes);
330        Ok(Self {
331            lower_bounds,
332            interval_sizes,
333            claimants: sizes.iter().map(|(n, _)| n.clone()).collect(),
334        })
335    }
336
337    /// Number of claimants / intervals in the table.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// use dynomite::hashkit::random_slicing::RandomSlices;
343    /// assert_eq!(RandomSlices::from_uniform(&["a", "b"]).unwrap().len(), 2);
344    /// ```
345    #[must_use]
346    pub fn len(&self) -> usize {
347        self.claimants.len()
348    }
349
350    /// True when the table holds no claimants. The builders
351    /// reject empty inputs, so this can only be observed via
352    /// the (private) raw constructor; included for completeness
353    /// behind clippy's `len` lint.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use dynomite::hashkit::random_slicing::RandomSlices;
359    /// assert!(!RandomSlices::from_uniform(&["a"]).unwrap().is_empty());
360    /// ```
361    #[must_use]
362    pub fn is_empty(&self) -> bool {
363        self.claimants.is_empty()
364    }
365
366    /// Look up the claimant that owns `hash`.
367    ///
368    /// Returns the borrowed claimant name. Returns `None` only
369    /// when the table is empty (which the builders prevent).
370    ///
371    /// # Examples
372    ///
373    /// ```
374    /// use dynomite::hashkit::random_slicing::RandomSlices;
375    /// let s = RandomSlices::from_uniform(&["a", "b"]).unwrap();
376    /// assert!(s.claimant_for(0).is_some());
377    /// ```
378    #[must_use]
379    pub fn claimant_for(&self, hash: u64) -> Option<&str> {
380        let idx = self.index_for(hash)?;
381        self.claimants.get(idx).map(String::as_str)
382    }
383
384    /// Look up the index of the claimant that owns `hash`.
385    ///
386    /// Returns `None` only when the table is empty.
387    #[must_use]
388    pub fn index_for(&self, hash: u64) -> Option<usize> {
389        let n = self.lower_bounds.len();
390        if n == 0 {
391            return None;
392        }
393        let last = n - 1;
394        if hash >= self.lower_bounds[last] {
395            return Some(last);
396        }
397        // Largest `i` with `lower_bounds[i] <= hash`.
398        let mut lo = 0usize;
399        let mut hi = last;
400        while lo < hi {
401            let mid = lo + (hi - lo).div_ceil(2);
402            if self.lower_bounds[mid] <= hash {
403                lo = mid;
404            } else {
405                hi = mid - 1;
406            }
407        }
408        Some(lo)
409    }
410
411    /// Borrow the claimant list (parallel to
412    /// [`Self::lower_bounds`] and [`Self::interval_sizes`]).
413    #[must_use]
414    pub fn claimants(&self) -> &[String] {
415        &self.claimants
416    }
417
418    /// Borrow the lower-bound array.
419    #[must_use]
420    pub fn lower_bounds(&self) -> &[u64] {
421        &self.lower_bounds
422    }
423
424    /// Borrow the per-interval size array.
425    #[must_use]
426    pub fn interval_sizes(&self) -> &[u64] {
427        &self.interval_sizes
428    }
429
430    /// Iterate `(claimant, lower_bound, size)` for diagnostics.
431    ///
432    /// # Examples
433    ///
434    /// ```
435    /// use dynomite::hashkit::random_slicing::RandomSlices;
436    /// let s = RandomSlices::from_uniform(&["a", "b"]).unwrap();
437    /// let n = s.intervals().count();
438    /// assert_eq!(n, 2);
439    /// ```
440    pub fn intervals(&self) -> impl Iterator<Item = (&str, u64, u64)> + '_ {
441        self.claimants
442            .iter()
443            .zip(self.lower_bounds.iter().copied())
444            .zip(self.interval_sizes.iter().copied())
445            .map(|((name, lb), sz)| (name.as_str(), lb, sz))
446    }
447}
448
/// Turn a list of interval widths into the matching list of
/// inclusive lower bounds: entry `i` is the (saturating) sum of
/// all widths before position `i`, so the first bound is 0.
fn build_lower_bounds(sizes: &[u64]) -> Vec<u64> {
    sizes
        .iter()
        .scan(0u64, |next_start, &width| {
            let start = *next_start;
            *next_start = next_start.saturating_add(width);
            Some(start)
        })
        .collect()
}
458
459fn check_unique<'a, I: IntoIterator<Item = &'a str>>(it: I) -> Result<(), RandomSlicesError> {
460    let mut seen: BTreeSet<&str> = BTreeSet::new();
461    for name in it {
462        if !seen.insert(name) {
463            return Err(RandomSlicesError::DuplicateClaimant {
464                name: name.to_string(),
465            });
466        }
467    }
468    Ok(())
469}
470
#[cfg(test)]
mod tests {
    use super::*;

    /// The four-claimant uniform table several tests share.
    fn four_way() -> RandomSlices {
        RandomSlices::from_uniform(&["a", "b", "c", "d"]).unwrap()
    }

    /// Run `from_weights` with two decimal digits and return the
    /// error it is expected to produce.
    fn weight_error(pairs: &[(String, f64)]) -> RandomSlicesError {
        RandomSlices::from_weights(pairs, 2).unwrap_err()
    }

    #[test]
    fn empty_input_rejected() {
        let no_sizes: Vec<(String, u64)> = Vec::new();
        let no_weights: Vec<(String, f64)> = Vec::new();
        assert_eq!(
            RandomSlices::from_uniform(&[]),
            Err(RandomSlicesError::EmptyClaimants)
        );
        assert_eq!(
            RandomSlices::from_sizes(&no_sizes),
            Err(RandomSlicesError::EmptyClaimants)
        );
        assert_eq!(
            RandomSlices::from_weights(&no_weights, 2),
            Err(RandomSlicesError::EmptyClaimants)
        );
    }

    #[test]
    fn duplicate_rejected() {
        let outcome = RandomSlices::from_uniform(&["a", "a"]);
        assert!(matches!(
            outcome.unwrap_err(),
            RandomSlicesError::DuplicateClaimant { .. }
        ));
    }

    #[test]
    fn uniform_partition_covers_full_ring() {
        let table = four_way();
        assert_eq!(table.len(), 4);
        // The widths add up to exactly u64::MAX.
        let covered = table
            .interval_sizes()
            .iter()
            .fold(0u128, |acc, &width| acc + u128::from(width));
        assert_eq!(covered, u128::from(u64::MAX));
        // The first bound is zero and every later bound is
        // strictly larger than its predecessor.
        let bounds = table.lower_bounds();
        assert_eq!(bounds[0], 0);
        for pair in bounds.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn lookup_at_extreme_values_works() {
        let table = four_way();
        assert_eq!(table.claimant_for(0).unwrap(), "a");
        assert_eq!(table.claimant_for(u64::MAX).unwrap(), "d");
    }

    #[test]
    fn lookup_at_boundaries_picks_upper_interval() {
        let table = RandomSlices::from_uniform(&["a", "b"]).unwrap();
        let boundary = table.lower_bounds()[1];
        // Lower bounds are inclusive: the boundary value itself
        // belongs to "b", the value just below it to "a".
        assert_eq!(table.claimant_for(boundary).unwrap(), "b");
        assert_eq!(table.claimant_for(boundary - 1).unwrap(), "a");
    }

    #[test]
    fn weights_translate_to_proportional_sizes() {
        let input = vec![("a".to_string(), 1.0), ("b".to_string(), 3.0)];
        let table = RandomSlices::from_weights(&input, 2).unwrap();
        assert_eq!(table.len(), 2);
        // "b" carries three times the weight of "a", so its
        // slice should be roughly three times as wide (the tail
        // bump makes the ratio slightly inexact).
        let widths = table.interval_sizes();
        let narrow = u128::from(widths[0]);
        let wide = u128::from(widths[1]);
        // Only an approximate ratio is needed, so the precision
        // lost converting to f64 is irrelevant.
        #[allow(clippy::cast_precision_loss)]
        let ratio = (wide as f64) / (narrow as f64);
        assert!(
            ratio > 2.9 && ratio < 3.1,
            "expected ~3x ratio, got {ratio}"
        );
    }

    #[test]
    fn weights_reject_negative() {
        let input = vec![("a".to_string(), -1.0)];
        assert!(matches!(
            weight_error(&input),
            RandomSlicesError::InvalidWeight { .. }
        ));
    }

    #[test]
    fn weights_reject_non_finite() {
        let input = vec![("a".to_string(), f64::NAN)];
        assert!(matches!(
            weight_error(&input),
            RandomSlicesError::InvalidWeight { .. }
        ));
    }

    #[test]
    fn weights_reject_all_zero_rounded() {
        // At two decimal digits both weights round to 0.0.
        let input = vec![("a".to_string(), 0.001), ("b".to_string(), 0.001)];
        assert!(matches!(
            weight_error(&input),
            RandomSlicesError::InvalidWeight { .. }
        ));
    }

    #[test]
    fn sizes_reject_zero_interval() {
        let input = vec![("a".to_string(), 0u64)];
        let outcome = RandomSlices::from_sizes(&input);
        assert!(matches!(
            outcome.unwrap_err(),
            RandomSlicesError::ZeroInterval { .. }
        ));
    }

    #[test]
    fn sizes_reject_overflow() {
        let input = vec![("a".to_string(), u64::MAX), ("b".to_string(), 1u64)];
        let outcome = RandomSlices::from_sizes(&input);
        assert!(matches!(
            outcome.unwrap_err(),
            RandomSlicesError::OversizedSizeSum { .. }
        ));
    }

    #[test]
    fn round_trip_1k_random_keys_under_5_percent() {
        // Hash 1024 synthetic keys with `Murmur3X64_64` and
        // count how many land on each of four uniform
        // claimants. Despite the test's name, the bound checked
        // here is a loose 25% (see below).
        use crate::hashkit::{hash64, HashType};
        let table = four_way();
        let sample = 1024usize;
        let mut hits = [0u32; 4];
        for n in 0..sample {
            let key = format!("key-{n:08x}");
            let digest = hash64(HashType::Murmur3X64_64, key.as_bytes());
            hits[table.index_for(digest).unwrap()] += 1;
        }
        // With only 1024 keys, per-bucket variance is too high
        // for a 5% bound, so up to 25% deviation is tolerated;
        // the hegeltest in `tests/random_slicing_property.rs`
        // covers the asymptotic 5% bound on larger samples.
        let expected = (sample / 4) as u32;
        let lo = expected - expected / 4;
        let hi = expected + expected / 4;
        for (i, &c) in hits.iter().enumerate() {
            assert!(
                c >= lo && c <= hi,
                "claimant {i}: count {c} outside [{lo}, {hi}]"
            );
        }
    }
}