Skip to main content

pingora_ketama/
lib.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # pingora-ketama
16//! A Rust port of the nginx consistent hashing algorithm.
17//!
18//! This crate provides a consistent hashing algorithm which is identical in
19//! behavior to [nginx consistent hashing](https://www.nginx.com/resources/wiki/modules/consistent_hash/).
20//!
21//! Using a consistent hash strategy like this is useful when one wants to
//! minimize the number of requests that need to be rehashed to different nodes
23//! when a node is added or removed.
24//!
25//! Here's a simple example of how one might use it:
26//!
27//! ```
28//! use pingora_ketama::{Bucket, Continuum};
29//!
30//! # #[allow(clippy::needless_doctest_main)]
31//! fn main() {
32//!     // Set up a continuum with a few nodes of various weight.
33//!     let mut buckets = vec![];
34//!     buckets.push(Bucket::new("127.0.0.1:12345".parse().unwrap(), 1));
35//!     buckets.push(Bucket::new("127.0.0.2:12345".parse().unwrap(), 2));
36//!     buckets.push(Bucket::new("127.0.0.3:12345".parse().unwrap(), 3));
37//!     let ring = Continuum::new(&buckets);
38//!
39//!     // Let's see what the result is for a few keys:
40//!     for key in &["some_key", "another_key", "last_key"] {
41//!         let node = ring.node(key.as_bytes()).unwrap();
42//!         println!("{}: {}:{}", key, node.ip(), node.port());
43//!     }
44//! }
45//! ```
46//!
//! ```text
48//! # Output:
49//! some_key: 127.0.0.3:12345
50//! another_key: 127.0.0.3:12345
51//! last_key: 127.0.0.2:12345
52//! ```
53//!
54//! We've provided a health-aware example in
55//! `pingora-ketama/examples/health_aware_selector.rs`.
56//!
57//! For a carefully crafted real-world example, see the [`pingora-load-balancing`](https://docs.rs/pingora-load-balancing)
58//! crate.
59
60use std::cmp::Ordering;
61use std::io::Write;
62use std::net::SocketAddr;
63
64use crc32fast::Hasher;
65#[cfg(feature = "v2")]
66use i_key_sort::sort::one_key_cmp::OneKeyAndCmpSort;
67
/// Number of ring points generated per unit of weight; the value is taken from nginx.
///
/// A bucket of weight 2, for instance, contributes 2 * 160 = 320 points to the ring.
pub const DEFAULT_POINT_MULTIPLE: u32 = 160;
71
/// A server taking part in consistent hashing, together with its weight.
///
/// Each [Bucket] couples the [SocketAddr] of a server with a weight that controls how many
/// points the server receives on the ring.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Bucket {
    /// Address of the server.
    // TODO: UDS
    node: SocketAddr,

    /// Relative weight of the server; a larger value attracts more requests.
    weight: u32,
}

impl Bucket {
    /// Build a [Bucket] from a server address and a weight.
    ///
    /// A [Bucket] is selected with a chance proportional to its weight relative to the weights
    /// of all the [Bucket]s in the ring.
    ///
    /// # Panics
    ///
    /// Panics when `weight` is zero.
    pub fn new(node: SocketAddr, weight: u32) -> Self {
        if weight == 0 {
            panic!("weight must be at least one");
        }
        Self { node, weight }
    }
}
100
// One point on the continuum: a hash plus the index of the address that owns it.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PointV1 {
    // Index into the continuum's address table.
    node: u32,
    hash: u32,
}

impl PointV1 {
    fn new(node: u32, hash: u32) -> Self {
        Self { node, hash }
    }
}

impl PartialOrd for PointV1 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Ordering looks at the hash alone, which is all that sorting the ring needs.
// NOTE(review): this disagrees with the derived `Eq` (same hash, different node compares
// `Equal` but is not `==`); the sort/dedup code only relies on `cmp` and the hash field.
impl Ord for PointV1 {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.hash, &other.hash)
    }
}
127
/// A point on the continuum, packed into 6 bytes to save memory.
///
/// This struct is equivalent to the definition below, but it does not rely on the
/// "untrustworthy" packed repr. The memory layout is therefore managed by hand; the
/// benchmarks show there is no performance hit for it.
///
/// ```text
/// #[repr(Rust, packed)]
/// struct Point {
///     node: u16,
///     hash: u32,
/// }
/// ```
///
/// Layout: bytes `0..4` hold the hash and bytes `4..6` hold the node index, both in native
/// byte order.
#[cfg(feature = "v2")]
#[derive(Copy, Clone, Eq, PartialEq)]
#[repr(transparent)]
struct PointV2([u8; 6]);

#[cfg(feature = "v2")]
impl PointV2 {
    /// Pack a node index and a hash into a point.
    fn new(node: u16, hash: u32) -> Self {
        let mut bytes = [0; 6];

        bytes[0..4].copy_from_slice(&hash.to_ne_bytes());
        bytes[4..6].copy_from_slice(&node.to_ne_bytes());

        Self(bytes)
    }

    /// Return the hash of the point, stored in the first 4 bytes (native endian).
    fn hash(&self) -> u32 {
        let [a, b, c, d, _, _] = self.0;
        u32::from_ne_bytes([a, b, c, d])
    }

    /// Return the node index of the point, stored in the last 2 bytes (native endian).
    fn node(&self) -> u16 {
        let [_, _, _, _, e, f] = self.0;
        u16::from_ne_bytes([e, f])
    }
}
166
167#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
168pub enum Version {
169    #[default]
170    V1,
171    #[cfg(feature = "v2")]
172    V2 { point_multiple: u32 },
173}
174
175impl Version {
176    fn point_multiple(&self) -> u32 {
177        match self {
178            Version::V1 => DEFAULT_POINT_MULTIPLE,
179            #[cfg(feature = "v2")]
180            Version::V2 { point_multiple } => *point_multiple,
181        }
182    }
183}
184
185enum RingBuilder {
186    V1(Vec<PointV1>),
187    #[cfg(feature = "v2")]
188    V2(Vec<PointV2>),
189}
190
191impl RingBuilder {
192    fn new(version: Version, total_weight: u32) -> Self {
193        match version {
194            Version::V1 => RingBuilder::V1(Vec::with_capacity(
195                (total_weight * DEFAULT_POINT_MULTIPLE) as usize,
196            )),
197            #[cfg(feature = "v2")]
198            Version::V2 { point_multiple } => {
199                RingBuilder::V2(Vec::with_capacity((total_weight * point_multiple) as usize))
200            }
201        }
202    }
203
204    fn push(&mut self, node: u16, hash: u32) {
205        match self {
206            RingBuilder::V1(ring) => {
207                ring.push(PointV1::new(node as u32, hash));
208            }
209            #[cfg(feature = "v2")]
210            RingBuilder::V2(ring) => {
211                ring.push(PointV2::new(node, hash));
212            }
213        }
214    }
215
216    #[allow(unused)]
217    fn sort(&mut self, addresses: &[SocketAddr]) {
218        match self {
219            RingBuilder::V1(ring) => {
220                // Sort and remove any duplicates.
221                ring.sort_unstable();
222                ring.dedup_by(|a, b| a.hash == b.hash);
223            }
224            #[cfg(feature = "v2")]
225            RingBuilder::V2(ring) => {
226                ring.sort_by_one_key_then_by(
227                    true,
228                    |p| p.hash(),
229                    |p1, p2| addresses[p1.node() as usize].cmp(&addresses[p2.node() as usize]),
230                );
231
232                //secondary_radix_sort(ring, |p| p.hash(), |p| addresses[p.node() as usize]);
233                ring.dedup_by(|a, b| a.0[0..4] == b.0[0..4]);
234            }
235        }
236    }
237}
238
239impl From<RingBuilder> for VersionedRing {
240    fn from(ring: RingBuilder) -> Self {
241        match ring {
242            RingBuilder::V1(ring) => VersionedRing::V1(ring.into_boxed_slice()),
243            #[cfg(feature = "v2")]
244            RingBuilder::V2(ring) => VersionedRing::V2(ring.into_boxed_slice()),
245        }
246    }
247}
248
249enum VersionedRing {
250    V1(Box<[PointV1]>),
251    #[cfg(feature = "v2")]
252    V2(Box<[PointV2]>),
253}
254
255impl VersionedRing {
256    /// Find the associated index for the given input.
257    pub fn node_idx(&self, hash: u32) -> usize {
258        // The `Result` returned here is either a match or the error variant
259        // returns where the value would be inserted.
260        let search_result = match self {
261            VersionedRing::V1(ring) => ring.binary_search_by(|p| p.hash.cmp(&hash)),
262            #[cfg(feature = "v2")]
263            VersionedRing::V2(ring) => ring.binary_search_by(|p| p.hash().cmp(&hash)),
264        };
265
266        match search_result {
267            Ok(i) => i,
268            Err(i) => {
269                // We wrap around to the front if this value would be
270                // inserted at the end.
271                if i == self.len() {
272                    0
273                } else {
274                    i
275                }
276            }
277        }
278    }
279
280    pub fn get(&self, index: usize) -> Option<usize> {
281        match self {
282            VersionedRing::V1(ring) => ring.get(index).map(|p| p.node as usize),
283            #[cfg(feature = "v2")]
284            VersionedRing::V2(ring) => ring.get(index).map(|p| p.node() as usize),
285        }
286    }
287
288    pub fn len(&self) -> usize {
289        match self {
290            VersionedRing::V1(ring) => ring.len(),
291            #[cfg(feature = "v2")]
292            VersionedRing::V2(ring) => ring.len(),
293        }
294    }
295}
296
297/// The consistent hashing ring
298///
299/// A [Continuum] represents a ring of buckets where a node is associated with various points on
300/// the ring.
301pub struct Continuum {
302    ring: VersionedRing,
303    addrs: Box<[SocketAddr]>,
304}
305
306impl Continuum {
307    pub fn new(buckets: &[Bucket]) -> Self {
308        Self::new_with_version(buckets, Version::default())
309    }
310
311    /// Create a new [Continuum] with the given list of buckets.
312    pub fn new_with_version(buckets: &[Bucket], version: Version) -> Self {
313        if buckets.is_empty() {
314            return Continuum {
315                ring: VersionedRing::V1(Box::new([])),
316                addrs: Box::new([]),
317            };
318        }
319
320        // The total weight is multiplied by the factor of points to create many points per node.
321        let total_weight: u32 = buckets.iter().fold(0, |sum, b| sum + b.weight);
322        let mut ring = RingBuilder::new(version, total_weight);
323        let mut addrs = Vec::with_capacity(buckets.len());
324
325        for bucket in buckets {
326            let mut hasher = Hasher::new();
327
328            // We only do the following for backwards compatibility with nginx/memcache:
329            // - Convert SocketAddr to string
330            // - The hash input is as follows "HOST EMPTY PORT PREVIOUS_HASH". Spaces are only added
331            //   for readability.
332            // TODO: remove this logic and hash the literal SocketAddr once we no longer
333            // need backwards compatibility
334
335            // with_capacity = max_len(ipv6)(39) + len(null)(1) + max_len(port)(5)
336            let mut hash_bytes = Vec::with_capacity(39 + 1 + 5);
337            write!(&mut hash_bytes, "{}", bucket.node.ip()).unwrap();
338            write!(&mut hash_bytes, "\0").unwrap();
339            write!(&mut hash_bytes, "{}", bucket.node.port()).unwrap();
340            hasher.update(hash_bytes.as_ref());
341
342            // A higher weight will add more points for this node.
343            let num_points = bucket.weight * version.point_multiple();
344
345            // This is appended to the crc32 hash for each point.
346            let mut prev_hash: u32 = 0;
347            addrs.push(bucket.node);
348            let node = addrs.len() - 1;
349            for _ in 0..num_points {
350                let mut hasher = hasher.clone();
351                hasher.update(&prev_hash.to_le_bytes());
352
353                let hash = hasher.finalize();
354                ring.push(node as u16, hash);
355                prev_hash = hash;
356            }
357        }
358
359        let addrs = addrs.into_boxed_slice();
360
361        // Sort and remove any duplicates.
362        ring.sort(&addrs);
363
364        Continuum {
365            ring: ring.into(),
366            addrs,
367        }
368    }
369
370    /// Find the associated index for the given input.
371    pub fn node_idx(&self, input: &[u8]) -> usize {
372        let hash = crc32fast::hash(input);
373        self.ring.node_idx(hash)
374    }
375
376    /// Hash the given `hash_key` to the server address.
377    pub fn node(&self, hash_key: &[u8]) -> Option<SocketAddr> {
378        self.ring
379            .get(self.node_idx(hash_key)) // should we unwrap here?
380            .map(|n| self.addrs[n])
381    }
382
383    /// Get an iterator of nodes starting at the original hashed node of the `hash_key`.
384    ///
385    /// This function is useful to find failover servers if the original ones are offline, which is
386    /// cheaper than rebuilding the entire hash ring.
387    pub fn node_iter(&self, hash_key: &[u8]) -> NodeIterator<'_> {
388        NodeIterator {
389            idx: self.node_idx(hash_key),
390            continuum: self,
391        }
392    }
393
394    pub fn get_addr(&self, idx: &mut usize) -> Option<&SocketAddr> {
395        let point = self.ring.get(*idx);
396        if point.is_some() {
397            // only update idx for non-empty ring otherwise we will panic on modulo 0
398            *idx = (*idx + 1) % self.ring.len();
399        }
400        point.map(|n| &self.addrs[n])
401    }
402}
403
404/// Iterator over a Continuum
405pub struct NodeIterator<'a> {
406    idx: usize,
407    continuum: &'a Continuum,
408}
409
410impl<'a> Iterator for NodeIterator<'a> {
411    type Item = &'a SocketAddr;
412
413    fn next(&mut self) -> Option<Self::Item> {
414        self.continuum.get_addr(&mut self.idx)
415    }
416}
417
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::path::Path;

    use super::{Bucket, Continuum, Version};

    fn get_sockaddr(ip: &str) -> SocketAddr {
        ip.parse().unwrap()
    }

    // One bucket of `weight` per host, in the order given.
    fn make_buckets(hosts: &[&str], weight: u32) -> Vec<Bucket> {
        hosts
            .iter()
            .map(|host| Bucket::new(get_sockaddr(host), weight))
            .collect()
    }

    #[test]
    fn consistency_after_adding_host() {
        fn assert_hosts(c: &Continuum) {
            assert_eq!(c.node(b"a"), Some(get_sockaddr("127.0.0.10:6443")));
            assert_eq!(c.node(b"b"), Some(get_sockaddr("127.0.0.5:6443")));
        }

        let buckets: Vec<_> = (1..11)
            .map(|u| Bucket::new(get_sockaddr(&format!("127.0.0.{u}:6443")), 1))
            .collect();
        let c = Continuum::new(&buckets);
        assert_hosts(&c);

        // Now add a new host and ensure that the hosts don't get shuffled.
        let buckets: Vec<_> = (1..12)
            .map(|u| Bucket::new(get_sockaddr(&format!("127.0.0.{u}:6443")), 1))
            .collect();

        let c = Continuum::new(&buckets);
        assert_hosts(&c);
    }

    #[test]
    fn matches_nginx_sample() {
        let c = Continuum::new(&make_buckets(&["127.0.0.1:7777", "127.0.0.1:7778"], 1));

        assert_eq!(c.node(b"/some/path"), Some(get_sockaddr("127.0.0.1:7778")));
        assert_eq!(
            c.node(b"/some/longer/path"),
            Some(get_sockaddr("127.0.0.1:7777"))
        );
        assert_eq!(
            c.node(b"/sad/zaidoon"),
            Some(get_sockaddr("127.0.0.1:7778"))
        );
        assert_eq!(c.node(b"/g"), Some(get_sockaddr("127.0.0.1:7777")));
        assert_eq!(
            c.node(b"/pingora/team/is/cool/and/this/is/a/long/uri"),
            Some(get_sockaddr("127.0.0.1:7778"))
        );
        assert_eq!(
            c.node(b"/i/am/not/confident/in/this/code"),
            Some(get_sockaddr("127.0.0.1:7777"))
        );
    }

    #[test]
    fn matches_nginx_sample_data() {
        let hosts = [
            "10.0.0.1:443",
            "10.0.0.2:443",
            "10.0.0.3:443",
            "10.0.0.4:443",
            "10.0.0.5:443",
            "10.0.0.6:443",
            "10.0.0.7:443",
            "10.0.0.8:443",
            "10.0.0.9:443",
        ];
        let c = Continuum::new(&make_buckets(&hosts, 100));

        let path = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("test-data")
            .join("sample-nginx-upstream.csv");

        let mut rdr = csv::ReaderBuilder::new()
            .has_headers(false)
            .from_path(path)
            .unwrap();

        for pair in rdr.records() {
            let pair = pair.unwrap();
            let uri = pair.get(0).unwrap();
            let upstream = pair.get(1).unwrap();

            let got = c.node(uri.as_bytes()).unwrap();
            assert_eq!(got, get_sockaddr(upstream));
        }
    }

    #[test]
    fn node_iter() {
        let c = Continuum::new(&make_buckets(
            &["127.0.0.1:7777", "127.0.0.1:7778", "127.0.0.1:7779"],
            1,
        ));
        let mut iter = c.node_iter(b"doghash");
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));

        // Drop 127.0.0.1:7778; the points of the remaining hosts are unaffected.
        let c = Continuum::new(&make_buckets(&["127.0.0.1:7777", "127.0.0.1:7779"], 1));
        let mut iter = c.node_iter(b"doghash");
        // The leading 127.0.0.1:7778 point from the first ring is gone.
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        // The two 127.0.0.1:7778 points that followed in the first ring are gone as well.
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));

        // assert infinite cycle
        let c = Continuum::new(&[Bucket::new(get_sockaddr("127.0.0.1:7777"), 1)]);
        let mut iter = c.node_iter(b"doghash");

        let start_idx = iter.idx;
        for _ in 0..c.ring.len() {
            assert!(iter.next().is_some());
        }
        // assert wrap around
        assert_eq!(start_idx, iter.idx);
    }

    #[test]
    fn test_empty() {
        let c = Continuum::new(&[]);
        assert!(c.node(b"doghash").is_none());

        let mut iter = c.node_iter(b"doghash");
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
    }

    #[test]
    fn explicit_v1_matches_default() {
        let hosts = ["127.0.0.1:7777", "127.0.0.1:7778", "127.0.0.1:7779"];
        let buckets = make_buckets(&hosts, 2);
        let default_ring = Continuum::new(&buckets);
        let v1_ring = Continuum::new_with_version(&buckets, Version::V1);
        for key in ["a", "doghash", "/some/path"].iter() {
            assert_eq!(
                default_ring.node(key.as_bytes()),
                v1_ring.node(key.as_bytes())
            );
        }
    }

    #[test]
    fn get_addr_out_of_range_leaves_index() {
        let c = Continuum::new(&make_buckets(&["127.0.0.1:7777"], 1));
        let mut idx = c.ring.len();
        assert!(c.get_addr(&mut idx).is_none());
        assert_eq!(idx, c.ring.len());
    }

    #[test]
    fn test_ipv6_ring() {
        let c = Continuum::new(&make_buckets(&["[::1]:7777", "[::1]:7778", "[::1]:7779"], 1));
        let mut iter = c.node_iter(b"doghash");
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7779")));
    }
}