dynomite/hashkit/ketama.rs
//! Ketama consistent-hashing continuum.
//!
//! Each server contributes continuum points in proportion to its share of
//! the total weight: about `POINTS_PER_SERVER` (160) points per server on
//! average, rounded down to a multiple of `POINTS_PER_HASH` (4). Each set of
//! 4 points is computed from a single MD5 digest of `"<name>-<idx>"`, reading
//! little-endian 32-bit slices out of the digest at offsets 0, 4, 8, and 12.
//! Lookups binary-search the sorted continuum for the first point whose
//! token is greater than or equal to the requested hash, wrapping back to
//! the beginning when the hash sorts after the last point.
10
// The continuum builder converts the u64 total weight and the server count
// to f64 to compute each server's share of ring points; both stay far below
// 2^53 for realistic rings, so the precision loss is negligible for point
// counts. See docs/journal/allowances.md.
14#![allow(clippy::cast_precision_loss)]
15
16use std::cmp::Ordering;
17
18use crate::core::types::DynError;
19use crate::hashkit::md5_signature;
20use crate::hashkit::token::DynToken;
21
/// Average number of continuum points contributed by each server.
///
/// A server's actual count is scaled by its share of the total weight and
/// rounded down to a multiple of [`POINTS_PER_HASH`]. Servers weighted above
/// the mean weight therefore receive more points than this, and servers
/// weighted below it receive fewer.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::POINTS_PER_SERVER;
/// assert_eq!(POINTS_PER_SERVER, 160);
/// ```
pub const POINTS_PER_SERVER: u32 = 160;
/// Number of continuum points cut from a single MD5 digest.
///
/// An MD5 digest is 128 bits wide and every point token is a 32-bit slice
/// of it, so one digest yields `128 / 32` points.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::POINTS_PER_HASH;
/// // Four 4-byte tokens exactly cover a 16-byte digest.
/// assert_eq!(POINTS_PER_HASH as usize * 4, 16);
/// ```
pub const POINTS_PER_HASH: u32 = 128 / 32;
/// Exclusive upper bound, in bytes, on the `"<name>-<idx>"` string used to
/// seed each digest.
///
/// `Continuum::build` rejects any seed string whose length is `MAX_HOSTLEN`
/// or more, so the longest accepted seed is `MAX_HOSTLEN - 1` (85) bytes.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::MAX_HOSTLEN;
/// assert_eq!(MAX_HOSTLEN, 86);
/// ```
pub const MAX_HOSTLEN: usize = 86;
49
/// One server to be placed on the continuum.
///
/// `name` seeds the MD5 digests that position the server's points, so it
/// should be unique within a ring and stable across rebuilds. `weight` is
/// relative: the server's share of points follows `weight / sum(weights)`,
/// and a weight of zero contributes no points at all.
///
/// # Examples
///
/// ```
/// use dynomite::hashkit::ketama::ServerSpec;
/// let spec = ServerSpec { name: String::from("redis-a"), weight: 2 };
/// assert_eq!(spec.name, "redis-a");
/// assert_eq!(spec.weight, 2);
/// ```
#[derive(Debug, Clone)]
pub struct ServerSpec {
    /// Unique, stable identifier; hashed as `"<name>-<idx>"` to place points.
    pub name: String,
    /// Relative weight; the server's point count grows with it.
    pub weight: u32,
}
66
67/// One entry on the continuum: a token and the index of the server that
68/// owns it.
69///
70/// # Examples
71///
72/// ```
73/// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
74/// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 1 }]).unwrap();
75/// let p = c.points().first().unwrap();
76/// assert_eq!(p.server, 0);
77/// ```
78#[derive(Clone, Debug)]
79pub struct ContinuumPoint {
80 /// Sorted-by-token coordinate.
81 pub token: DynToken,
82 /// Index back into the original server list.
83 pub server: usize,
84}
85
86/// Sorted continuum, ready for `dispatch`.
87///
88/// # Examples
89///
90/// ```
91/// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
92/// use dynomite::hashkit::DynToken;
93/// let c = Continuum::build(&[
94/// ServerSpec { name: "a".into(), weight: 1 },
95/// ServerSpec { name: "b".into(), weight: 1 },
96/// ]).unwrap();
97/// assert!(!c.is_empty());
98/// let _ = c.dispatch(&DynToken::from_u32(123)).unwrap();
99/// ```
100#[derive(Clone, Debug, Default)]
101pub struct Continuum {
102 points: Vec<ContinuumPoint>,
103}
104
105impl Continuum {
106 /// Build the continuum for the supplied servers.
107 ///
108 /// # Errors
109 ///
110 /// Returns `DynError::Generic` when a server's `name + index` would
111 /// overflow the [`MAX_HOSTLEN`] buffer.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use dynomite::hashkit::ketama::{Continuum, ServerSpec, POINTS_PER_SERVER};
117 /// let c = Continuum::build(&[ServerSpec { name: "s0".into(), weight: 1 }]).unwrap();
118 /// assert_eq!(c.len(), POINTS_PER_SERVER as usize);
119 /// ```
120 pub fn build(servers: &[ServerSpec]) -> Result<Self, DynError> {
121 if servers.is_empty() {
122 return Ok(Self::default());
123 }
124 let total_weight: u64 = servers.iter().map(|s| u64::from(s.weight)).sum();
125 if total_weight == 0 {
126 return Ok(Self::default());
127 }
128 let nlive = servers.len() as u64;
129 let mut points: Vec<ContinuumPoint> = Vec::new();
130
131 for (server_idx, server) in servers.iter().enumerate() {
132 let pct = f64::from(server.weight) / total_weight as f64;
133 let raw = pct * f64::from(POINTS_PER_SERVER) / f64::from(POINTS_PER_HASH)
134 * (nlive as f64)
135 + 0.000_000_000_1;
136 let pointer_per_server = raw.floor() as u32 * POINTS_PER_HASH;
137 let groups = pointer_per_server / POINTS_PER_HASH;
138
139 for pointer_index in 1..=groups {
140 let host = format!("{}-{}", server.name, pointer_index - 1);
141 if host.len() >= MAX_HOSTLEN {
142 return Err(DynError::Generic(format!(
143 "ketama host string {host:?} exceeds {MAX_HOSTLEN}"
144 )));
145 }
146 let digest = md5_signature(host.as_bytes());
147 for x in 0..POINTS_PER_HASH {
148 let off = (x as usize) * 4;
149 let value = (u32::from(digest[3 + off]) << 24)
150 | (u32::from(digest[2 + off]) << 16)
151 | (u32::from(digest[1 + off]) << 8)
152 | u32::from(digest[off]);
153 points.push(ContinuumPoint {
154 token: DynToken::from_u32(value),
155 server: server_idx,
156 });
157 }
158 }
159 }
160
161 points.sort_by(|a, b| a.token.cmp(&b.token));
162 Ok(Self { points })
163 }
164
165 /// Number of continuum points.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// use dynomite::hashkit::ketama::Continuum;
171 /// assert_eq!(Continuum::default().len(), 0);
172 /// ```
173 #[must_use]
174 pub fn len(&self) -> usize {
175 self.points.len()
176 }
177
178 /// Whether the continuum is empty.
179 ///
180 /// # Examples
181 ///
182 /// ```
183 /// use dynomite::hashkit::ketama::Continuum;
184 /// assert!(Continuum::default().is_empty());
185 /// ```
186 #[must_use]
187 pub fn is_empty(&self) -> bool {
188 self.points.is_empty()
189 }
190
191 /// Read-only view of the continuum points, in sorted order.
192 ///
193 /// # Examples
194 ///
195 /// ```
196 /// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
197 /// let c = Continuum::build(&[ServerSpec { name: "a".into(), weight: 1 }]).unwrap();
198 /// let pts = c.points();
199 /// assert!(pts.windows(2).all(|w| w[0].token <= w[1].token));
200 /// ```
201 #[must_use]
202 pub fn points(&self) -> &[ContinuumPoint] {
203 &self.points
204 }
205
206 /// Map a hash value to the owning server index.
207 ///
208 /// Walks the continuum with a binary search and wraps around when
209 /// the requested token sorts after the last point.
210 ///
211 /// # Errors
212 ///
213 /// Returns an error if the continuum is empty.
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// use dynomite::hashkit::ketama::{Continuum, ServerSpec};
219 /// use dynomite::hashkit::DynToken;
220 /// let c = Continuum::build(&[
221 /// ServerSpec { name: "a".into(), weight: 1 },
222 /// ServerSpec { name: "b".into(), weight: 1 },
223 /// ]).unwrap();
224 /// let s = c.dispatch(&DynToken::from_u32(0xabcd)).unwrap();
225 /// assert!(s < 2);
226 /// assert!(Continuum::default().dispatch(&DynToken::from_u32(0)).is_err());
227 /// ```
228 pub fn dispatch(&self, hash: &DynToken) -> Result<usize, DynError> {
229 if self.points.is_empty() {
230 return Err(DynError::Generic("empty ketama continuum".into()));
231 }
232 // Lower bound: first point with token >= hash.
233 let mut left = 0usize;
234 let mut right = self.points.len();
235 while left < right {
236 let mid = left + (right - left) / 2;
237 match self.points[mid].token.cmp(hash) {
238 Ordering::Less => left = mid + 1,
239 _ => right = mid,
240 }
241 }
242 let pos = if right == self.points.len() { 0 } else { right };
243 Ok(self.points[pos].server)
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// `n` servers named `server-<i>`, all with weight 1.
    fn equal_servers(n: usize) -> Vec<ServerSpec> {
        (0..n)
            .map(|i| ServerSpec {
                name: format!("server-{i}"),
                weight: 1,
            })
            .collect()
    }

    /// Number of continuum points owned by each of the first `n` servers.
    fn counts_per_server(c: &Continuum, n: usize) -> Vec<usize> {
        let mut counts = vec![0usize; n];
        for p in c.points() {
            counts[p.server] += 1;
        }
        counts
    }

    #[test]
    fn empty_input_yields_empty_continuum() {
        let c = Continuum::build(&[]).unwrap();
        assert!(c.is_empty());
        assert!(c.dispatch(&DynToken::from_u32(123)).is_err());
    }

    #[test]
    fn zero_total_weight_yields_empty_continuum() {
        let servers = [ServerSpec {
            name: "a".into(),
            weight: 0,
        }];
        let c = Continuum::build(&servers).unwrap();
        assert!(c.is_empty());
        assert!(c.dispatch(&DynToken::from_u32(1)).is_err());
    }

    #[test]
    fn zero_weight_server_gets_no_points() {
        let servers = [
            ServerSpec {
                name: "a".into(),
                weight: 0,
            },
            ServerSpec {
                name: "b".into(),
                weight: 1,
            },
        ];
        let c = Continuum::build(&servers).unwrap();
        // "b" owns the whole ring: 2 servers * 40 groups * 4 points.
        assert!(c.points().iter().all(|p| p.server == 1));
        assert_eq!(c.len(), 2 * POINTS_PER_SERVER as usize);
    }

    #[test]
    fn equal_weight_balanced() {
        let c = Continuum::build(&equal_servers(4)).unwrap();
        // Equal weights give every server exactly POINTS_PER_SERVER points.
        let counts = counts_per_server(&c, 4);
        assert!(counts.iter().all(|&n| n == POINTS_PER_SERVER as usize));
        assert_eq!(c.len(), 4 * POINTS_PER_SERVER as usize);
    }

    #[test]
    fn points_are_sorted_by_token() {
        let c = Continuum::build(&equal_servers(3)).unwrap();
        assert!(c.points().windows(2).all(|w| w[0].token <= w[1].token));
    }

    #[test]
    fn dispatch_is_deterministic() {
        let c = Continuum::build(&equal_servers(3)).unwrap();
        for k in [123u32, 1, 0xdead_beef, 0x8000_0000, u32::MAX] {
            let a = c.dispatch(&DynToken::from_u32(k)).unwrap();
            let b = c.dispatch(&DynToken::from_u32(k)).unwrap();
            assert_eq!(a, b);
        }
    }

    #[test]
    fn dispatch_exact_token_selects_first_matching_point() {
        let c = Continuum::build(&equal_servers(3)).unwrap();
        for p in c.points() {
            // Lower-bound semantics: a hash equal to a point's token maps to
            // the first point carrying that token, not to its successor.
            let owner = c
                .points()
                .iter()
                .find(|q| q.token == p.token)
                .unwrap()
                .server;
            assert_eq!(c.dispatch(&p.token).unwrap(), owner);
        }
    }

    #[test]
    fn dispatch_below_first_point_selects_first_point() {
        let c = Continuum::build(&equal_servers(2)).unwrap();
        let s = c.dispatch(&DynToken::from_u32(0)).unwrap();
        assert_eq!(s, c.points()[0].server);
    }

    #[test]
    fn dispatch_wraps_past_last_point() {
        let c = Continuum::build(&equal_servers(2)).unwrap();
        let last = c.points().last().unwrap().token.clone();
        let beyond = DynToken::from_u32(last.get_int().wrapping_add(1));
        let s = c.dispatch(&beyond).unwrap();
        assert_eq!(s, c.points()[0].server);
    }

    #[test]
    fn weight_changes_share() {
        let servers = vec![
            ServerSpec {
                name: "s0".into(),
                weight: 1,
            },
            ServerSpec {
                name: "s1".into(),
                weight: 2,
            },
        ];
        let c = Continuum::build(&servers).unwrap();
        let counts = counts_per_server(&c, 2);
        assert!(counts[1] > counts[0]);
        // floor(1/3 * 40 * 2) = 26 groups and floor(2/3 * 40 * 2) = 53 groups.
        assert_eq!(counts, vec![26 * 4, 53 * 4]);
    }

    #[test]
    fn host_length_boundary() {
        // One weight-1 server gets 40 groups, so the longest seed is "<name>-39".
        let fits = [ServerSpec {
            name: "x".repeat(MAX_HOSTLEN - 4),
            weight: 1,
        }];
        assert!(Continuum::build(&fits).is_ok());
        // One byte longer: "<name>-10" reaches MAX_HOSTLEN and must be rejected.
        let too_long = [ServerSpec {
            name: "x".repeat(MAX_HOSTLEN - 3),
            weight: 1,
        }];
        assert!(Continuum::build(&too_long).is_err());
    }
}