Skip to main content

dynomite/hashkit/
ketama.rs

//! Ketama consistent-hashing continuum.
//!
//! Every server receives continuum points in proportion to its share of the
//! total weight; a server of average weight gets `POINTS_PER_SERVER` (160).
//! Points come in groups of 4, each group computed from a single MD5 digest
//! of `"<name>-<idx>"` by reading little-endian 32-bit slices of the digest
//! at offsets 0, 4, 8, and 12. Lookups binary-search the sorted continuum
//! and pick the first point whose token is greater than or equal to the
//! requested hash, wrapping back to the beginning when the hash sorts past
//! the last point.
10
// The continuum builder casts the u64 total weight and the u64 server count
// to f64 for its weighted-fraction arithmetic. Those casts are only inexact
// above 2^53, which realistic weights and server counts never approach.
// See docs/journal/allowances.md.
14#![allow(clippy::cast_precision_loss)]
15
16use std::cmp::Ordering;
17
18use crate::core::types::DynError;
19use crate::hashkit::md5_signature;
20use crate::hashkit::token::DynToken;
21
/// Number of continuum points allotted to a server of average weight (160).
///
/// The builder scales this by each server's share of the total weight times
/// the number of servers. A server with twice the average weight therefore
/// receives roughly twice as many points. Each server's count is rounded down
/// to a multiple of [`POINTS_PER_HASH`].
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::POINTS_PER_SERVER;
/// assert_eq!(POINTS_PER_SERVER, 160);
/// ```
pub const POINTS_PER_SERVER: u32 = 160;
/// Number of continuum points carved out of each MD5 digest: one 32-bit
/// value per 4-byte slice of the digest.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::POINTS_PER_HASH;
/// assert_eq!(POINTS_PER_HASH, 4);
/// ```
pub const POINTS_PER_HASH: u32 = 4;
/// Length limit for the `"<name>-<idx>"` string used to seed each digest.
///
/// A seed string must be strictly shorter than this value (at most
/// `MAX_HOSTLEN - 1` bytes); otherwise [`Continuum::build`] returns an error.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::MAX_HOSTLEN;
/// assert_eq!(MAX_HOSTLEN, 86);
/// ```
pub const MAX_HOSTLEN: usize = 86;
49
/// Specification for one server in the continuum.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::ServerSpec;
/// let s = ServerSpec { name: "redis-a".into(), weight: 2 };
/// assert_eq!(s.weight, 2);
/// assert_eq!(s.clone(), s);
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ServerSpec {
    /// Stable, unique identifier. Every continuum point of this server is
    /// seeded from `"<name>-<idx>"`, so renaming a server moves all of its
    /// points.
    pub name: String,
    /// Relative weight. A server's point count is proportional to its share
    /// of the total weight, and a weight of `0` contributes no points.
    pub weight: u32,
}
66
67/// One entry on the continuum: a token and the index of the server that
68/// owns it.
69///
70/// # Examples
71///
72/// ```
73/// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
74/// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 1 }]).unwrap();
75/// let p = c.points().first().unwrap();
76/// assert_eq!(p.server, 0);
77/// ```
78#[derive(Clone, Debug)]
79pub struct ContinuumPoint {
80    /// Sorted-by-token coordinate.
81    pub token: DynToken,
82    /// Index back into the original server list.
83    pub server: usize,
84}
85
86/// Sorted continuum, ready for `dispatch`.
87///
88/// # Examples
89///
90/// ```
91/// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
92/// use dynomite::hashkit::DynToken;
93/// let c = Continuum::build(&[
94///     ServerSpec { name: "a".into(), weight: 1 },
95///     ServerSpec { name: "b".into(), weight: 1 },
96/// ]).unwrap();
97/// assert!(!c.is_empty());
98/// let _ = c.dispatch(&DynToken::from_u32(123)).unwrap();
99/// ```
100#[derive(Clone, Debug, Default)]
101pub struct Continuum {
102    points: Vec<ContinuumPoint>,
103}
104
105impl Continuum {
106    /// Build the continuum for the supplied servers.
107    ///
108    /// # Errors
109    ///
110    /// Returns `DynError::Generic` when a server's `name + index` would
111    /// overflow the [`MAX_HOSTLEN`] buffer.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use dynomite::hashkit::ketama::{Continuum, ServerSpec, POINTS_PER_SERVER};
117    /// let c = Continuum::build(&[ServerSpec { name: "s0".into(), weight: 1 }]).unwrap();
118    /// assert_eq!(c.len(), POINTS_PER_SERVER as usize);
119    /// ```
120    pub fn build(servers: &[ServerSpec]) -> Result<Self, DynError> {
121        if servers.is_empty() {
122            return Ok(Self::default());
123        }
124        let total_weight: u64 = servers.iter().map(|s| u64::from(s.weight)).sum();
125        if total_weight == 0 {
126            return Ok(Self::default());
127        }
128        let nlive = servers.len() as u64;
129        let mut points: Vec<ContinuumPoint> = Vec::new();
130
131        for (server_idx, server) in servers.iter().enumerate() {
132            let pct = f64::from(server.weight) / total_weight as f64;
133            let raw = pct * f64::from(POINTS_PER_SERVER) / f64::from(POINTS_PER_HASH)
134                * (nlive as f64)
135                + 0.000_000_000_1;
136            let pointer_per_server = raw.floor() as u32 * POINTS_PER_HASH;
137            let groups = pointer_per_server / POINTS_PER_HASH;
138
139            for pointer_index in 1..=groups {
140                let host = format!("{}-{}", server.name, pointer_index - 1);
141                if host.len() >= MAX_HOSTLEN {
142                    return Err(DynError::Generic(format!(
143                        "ketama host string {host:?} exceeds {MAX_HOSTLEN}"
144                    )));
145                }
146                let digest = md5_signature(host.as_bytes());
147                for x in 0..POINTS_PER_HASH {
148                    let off = (x as usize) * 4;
149                    let value = (u32::from(digest[3 + off]) << 24)
150                        | (u32::from(digest[2 + off]) << 16)
151                        | (u32::from(digest[1 + off]) << 8)
152                        | u32::from(digest[off]);
153                    points.push(ContinuumPoint {
154                        token: DynToken::from_u32(value),
155                        server: server_idx,
156                    });
157                }
158            }
159        }
160
161        points.sort_by(|a, b| a.token.cmp(&b.token));
162        Ok(Self { points })
163    }
164
165    /// Number of continuum points.
166    ///
167    /// # Examples
168    ///
169    /// ```
170    /// use dynomite::hashkit::ketama::Continuum;
171    /// assert_eq!(Continuum::default().len(), 0);
172    /// ```
173    #[must_use]
174    pub fn len(&self) -> usize {
175        self.points.len()
176    }
177
178    /// Whether the continuum is empty.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// use dynomite::hashkit::ketama::Continuum;
184    /// assert!(Continuum::default().is_empty());
185    /// ```
186    #[must_use]
187    pub fn is_empty(&self) -> bool {
188        self.points.is_empty()
189    }
190
191    /// Read-only view of the continuum points, in sorted order.
192    ///
193    /// # Examples
194    ///
195    /// ```
196    /// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
197    /// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 1 }]).unwrap();
198    /// let pts = c.points();
199    /// assert!(pts.windows(2).all(|w| w[0].token <= w[1].token));
200    /// ```
201    #[must_use]
202    pub fn points(&self) -> &[ContinuumPoint] {
203        &self.points
204    }
205
206    /// Map a hash value to the owning server index.
207    ///
208    /// Walks the continuum with a binary search and wraps around when
209    /// the requested token sorts after the last point.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the continuum is empty.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
219    /// use dynomite::hashkit::DynToken;
220    /// let c = Continuum::build(&[
221    ///     ServerSpec { name: "a".into(), weight: 1 },
222    ///     ServerSpec { name: "b".into(), weight: 1 },
223    /// ]).unwrap();
224    /// let s = c.dispatch(&DynToken::from_u32(0xabcd)).unwrap();
225    /// assert!(s < 2);
226    /// assert!(Continuum::default().dispatch(&DynToken::from_u32(0)).is_err());
227    /// ```
228    pub fn dispatch(&self, hash: &DynToken) -> Result<usize, DynError> {
229        if self.points.is_empty() {
230            return Err(DynError::Generic("empty ketama continuum".into()));
231        }
232        // Lower bound: first point with token >= hash.
233        let mut left = 0usize;
234        let mut right = self.points.len();
235        while left < right {
236            let mid = left + (right - left) / 2;
237            match self.points[mid].token.cmp(hash) {
238                Ordering::Less => left = mid + 1,
239                _ => right = mid,
240            }
241        }
242        let pos = if right == self.points.len() { 0 } else { right };
243        Ok(self.points[pos].server)
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// `n` servers named `server-<i>`, all with weight 1.
    fn equal_servers(n: usize) -> Vec<ServerSpec> {
        (0..n)
            .map(|i| ServerSpec {
                name: format!("server-{i}"),
                weight: 1,
            })
            .collect()
    }

    #[test]
    fn empty_input_yields_empty_continuum() {
        let c = Continuum::build(&[]).unwrap();
        assert!(c.is_empty());
        assert!(c.dispatch(&DynToken::from_u32(123)).is_err());
    }

    #[test]
    fn zero_total_weight_yields_empty_continuum() {
        let servers = [ServerSpec {
            name: "a".into(),
            weight: 0,
        }];
        let c = Continuum::build(&servers).unwrap();
        assert!(c.is_empty());
        assert!(c.dispatch(&DynToken::from_u32(0)).is_err());
    }

    #[test]
    fn zero_weight_server_gets_no_points() {
        let servers = [
            ServerSpec {
                name: "a".into(),
                weight: 0,
            },
            ServerSpec {
                name: "b".into(),
                weight: 1,
            },
        ];
        let c = Continuum::build(&servers).unwrap();
        assert!(!c.is_empty());
        assert!(c.points().iter().all(|p| p.server == 1));
    }

    #[test]
    fn equal_weight_balanced() {
        let c = Continuum::build(&equal_servers(4)).unwrap();
        let mut counts = [0usize; 4];
        for p in c.points() {
            counts[p.server] += 1;
        }
        // A server of exactly average weight receives POINTS_PER_SERVER points.
        for n in &counts {
            assert_eq!(*n, POINTS_PER_SERVER as usize);
        }
    }

    #[test]
    fn points_are_sorted() {
        let c = Continuum::build(&equal_servers(3)).unwrap();
        assert!(c.points().windows(2).all(|w| w[0].token <= w[1].token));
    }

    #[test]
    fn dispatch_is_deterministic() {
        let c = Continuum::build(&equal_servers(3)).unwrap();
        for k in [123u32, 1, 0xdead_beef, 0x8000_0000, u32::MAX] {
            let a = c.dispatch(&DynToken::from_u32(k)).unwrap();
            let b = c.dispatch(&DynToken::from_u32(k)).unwrap();
            assert_eq!(a, b);
        }
    }

    #[test]
    fn dispatch_on_exact_token_is_inclusive() {
        // Lookup is a lower bound: a hash equal to a point's token maps to
        // that point (the first such point if several share the token).
        let c = Continuum::build(&equal_servers(3)).unwrap();
        for p in c.points() {
            let first_equal = c.points().iter().find(|q| q.token == p.token).unwrap();
            assert_eq!(c.dispatch(&p.token).unwrap(), first_equal.server);
        }
    }

    #[test]
    fn dispatch_wraps_past_last_point() {
        let c = Continuum::build(&equal_servers(2)).unwrap();
        let last = c.points().last().unwrap().token.clone();
        let beyond = DynToken::from_u32(last.get_int().wrapping_add(1));
        let s = c.dispatch(&beyond).unwrap();
        assert_eq!(s, c.points()[0].server);
    }

    #[test]
    fn weight_changes_share() {
        let servers = vec![
            ServerSpec {
                name: "s0".into(),
                weight: 1,
            },
            ServerSpec {
                name: "s1".into(),
                weight: 2,
            },
        ];
        let c = Continuum::build(&servers).unwrap();
        let mut counts = [0usize; 2];
        for p in c.points() {
            counts[p.server] += 1;
        }
        assert!(counts[1] > counts[0]);
    }

    #[test]
    fn overlong_host_is_rejected_at_boundary() {
        // A lone weight-1 server gets 40 groups (indices 0..=39), so the
        // longest seed is "<name>-39". A name of 82 bytes gives 85 < 86
        // (accepted); 83 bytes makes "<name>-10" exactly 86 (rejected).
        let fits = [ServerSpec {
            name: "x".repeat(MAX_HOSTLEN - 4),
            weight: 1,
        }];
        assert!(Continuum::build(&fits).is_ok());
        let too_long = [ServerSpec {
            name: "x".repeat(MAX_HOSTLEN - 3),
            weight: 1,
        }];
        assert!(Continuum::build(&too_long).is_err());
    }
}