Skip to main content

pingora_load_balancing/selection/
consistent.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Consistent Hashing
16
17use super::*;
18use pingora_core::protocols::l4::socket::SocketAddr;
19use pingora_ketama::{Bucket, Continuum, Version};
20use std::collections::HashMap;
21
22/// Weighted Ketama consistent hashing
23pub struct KetamaHashing {
24    ring: Continuum,
25    // TODO: update Ketama to just store this
26    backends: HashMap<SocketAddr, Backend>,
27}
28
29#[derive(Clone, Debug, Copy, Default)]
30pub struct KetamaConfig {
31    pub point_multiple: Option<u32>,
32}
33
34impl BackendSelection for KetamaHashing {
35    type Iter = OwnedNodeIterator;
36
37    type Config = KetamaConfig;
38
39    fn build_with_config(backends: &BTreeSet<Backend>, config: &Self::Config) -> Self {
40        let KetamaConfig { point_multiple } = *config;
41
42        let buckets: Vec<_> = backends
43            .iter()
44            .filter_map(|b| {
45                // FIXME: ketama only supports Inet addr, UDS addrs are ignored here
46                if let SocketAddr::Inet(addr) = b.addr {
47                    Some(Bucket::new(addr, b.weight as u32))
48                } else {
49                    None
50                }
51            })
52            .collect();
53        let new_backends = backends
54            .iter()
55            .map(|b| (b.addr.clone(), b.clone()))
56            .collect();
57
58        #[allow(unused)]
59        let version = if let Some(point_multiple) = point_multiple {
60            match () {
61                #[cfg(feature = "v2")]
62                () => Version::V2 { point_multiple },
63                #[cfg(not(feature = "v2"))]
64                () => Version::V1,
65            }
66        } else {
67            Version::V1
68        };
69
70        KetamaHashing {
71            ring: Continuum::new_with_version(&buckets, version),
72            backends: new_backends,
73        }
74    }
75
76    fn build(backends: &BTreeSet<Backend>) -> Self {
77        Self::build_with_config(backends, &KetamaConfig::default())
78    }
79
80    fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
81        OwnedNodeIterator {
82            idx: self.ring.node_idx(key),
83            ring: self.clone(),
84        }
85    }
86}
87
88/// Iterator over a Continuum
89pub struct OwnedNodeIterator {
90    idx: usize,
91    ring: Arc<KetamaHashing>,
92}
93
94impl BackendIter for OwnedNodeIterator {
95    fn next(&mut self) -> Option<&Backend> {
96        self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
97            let addr = SocketAddr::Inet(*addr);
98            self.ring.backends.get(&addr)
99        })
100    }
101}
102
103#[cfg(test)]
104mod test {
105    use super::*;
106
107    #[test]
108    fn test_ketama() {
109        let b1 = Backend::new("1.1.1.1:80").unwrap();
110        let b2 = Backend::new("1.0.0.1:80").unwrap();
111        let b3 = Backend::new("1.0.0.255:80").unwrap();
112        let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
113        let hash = Arc::new(KetamaHashing::build(&backends));
114
115        let mut iter = hash.iter(b"test0");
116        assert_eq!(iter.next(), Some(&b2));
117        let mut iter = hash.iter(b"test1");
118        assert_eq!(iter.next(), Some(&b1));
119        let mut iter = hash.iter(b"test2");
120        assert_eq!(iter.next(), Some(&b1));
121        let mut iter = hash.iter(b"test3");
122        assert_eq!(iter.next(), Some(&b1));
123        let mut iter = hash.iter(b"test4");
124        assert_eq!(iter.next(), Some(&b1));
125        let mut iter = hash.iter(b"test5");
126        assert_eq!(iter.next(), Some(&b3));
127        let mut iter = hash.iter(b"test6");
128        assert_eq!(iter.next(), Some(&b1));
129        let mut iter = hash.iter(b"test7");
130        assert_eq!(iter.next(), Some(&b3));
131        let mut iter = hash.iter(b"test8");
132        assert_eq!(iter.next(), Some(&b1));
133        let mut iter = hash.iter(b"test9");
134        assert_eq!(iter.next(), Some(&b2));
135
136        // remove b3
137        let backends = BTreeSet::from_iter([b1.clone(), b2.clone()]);
138        let hash = Arc::new(KetamaHashing::build(&backends));
139        let mut iter = hash.iter(b"test0");
140        assert_eq!(iter.next(), Some(&b2));
141        let mut iter = hash.iter(b"test1");
142        assert_eq!(iter.next(), Some(&b1));
143        let mut iter = hash.iter(b"test2");
144        assert_eq!(iter.next(), Some(&b1));
145        let mut iter = hash.iter(b"test3");
146        assert_eq!(iter.next(), Some(&b1));
147        let mut iter = hash.iter(b"test4");
148        assert_eq!(iter.next(), Some(&b1));
149        let mut iter = hash.iter(b"test5");
150        assert_eq!(iter.next(), Some(&b2)); // changed
151        let mut iter = hash.iter(b"test6");
152        assert_eq!(iter.next(), Some(&b1));
153        let mut iter = hash.iter(b"test7");
154        assert_eq!(iter.next(), Some(&b1)); // changed
155        let mut iter = hash.iter(b"test8");
156        assert_eq!(iter.next(), Some(&b1));
157        let mut iter = hash.iter(b"test9");
158        assert_eq!(iter.next(), Some(&b2));
159    }
160}