pingora_load_balancing/selection/
consistent.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Consistent Hashing
16
17use super::*;
18use pingora_core::protocols::l4::socket::SocketAddr;
19use pingora_ketama::{Bucket, Continuum};
20use std::collections::HashMap;
21
22/// Weighted Ketama consistent hashing
23pub struct KetamaHashing {
24    ring: Continuum,
25    // TODO: update Ketama to just store this
26    backends: HashMap<SocketAddr, Backend>,
27}
28
29impl BackendSelection for KetamaHashing {
30    type Iter = OwnedNodeIterator;
31
32    fn build(backends: &BTreeSet<Backend>) -> Self {
33        let buckets: Vec<_> = backends
34            .iter()
35            .filter_map(|b| {
36                // FIXME: ketama only supports Inet addr, UDS addrs are ignored here
37                if let SocketAddr::Inet(addr) = b.addr {
38                    Some(Bucket::new(addr, b.weight as u32))
39                } else {
40                    None
41                }
42            })
43            .collect();
44        let new_backends = backends
45            .iter()
46            .map(|b| (b.addr.clone(), b.clone()))
47            .collect();
48        KetamaHashing {
49            ring: Continuum::new(&buckets),
50            backends: new_backends,
51        }
52    }
53
54    fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
55        OwnedNodeIterator {
56            idx: self.ring.node_idx(key),
57            ring: self.clone(),
58        }
59    }
60}
61
62/// Iterator over a Continuum
63pub struct OwnedNodeIterator {
64    idx: usize,
65    ring: Arc<KetamaHashing>,
66}
67
68impl BackendIter for OwnedNodeIterator {
69    fn next(&mut self) -> Option<&Backend> {
70        self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
71            let addr = SocketAddr::Inet(*addr);
72            self.ring.backends.get(&addr)
73        })
74    }
75}
76
#[cfg(test)]
mod test {
    use super::*;

    /// Asserts that, for every position `n` in `expected`, the first backend
    /// selected for the key `test<n>` is `expected[n]`.
    fn assert_first_picks(hash: &Arc<KetamaHashing>, expected: &[&Backend]) {
        for (n, want) in expected.iter().enumerate() {
            let key = format!("test{}", n);
            let mut picks = hash.iter(key.as_bytes());
            assert_eq!(picks.next(), Some(*want));
        }
    }

    #[test]
    fn test_ketama() {
        let b1 = Backend::new("1.1.1.1:80").unwrap();
        let b2 = Backend::new("1.0.0.1:80").unwrap();
        let b3 = Backend::new("1.0.0.255:80").unwrap();

        // Full set: expected first pick for keys test0..=test9.
        let all = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
        let hash = Arc::new(KetamaHashing::build(&all));
        assert_first_picks(
            &hash,
            &[&b2, &b1, &b1, &b1, &b1, &b3, &b1, &b3, &b1, &b2],
        );

        // remove b3: only the keys that previously picked b3 (test5 and
        // test7) are expected to change.
        let remaining = BTreeSet::from_iter([b1.clone(), b2.clone()]);
        let hash = Arc::new(KetamaHashing::build(&remaining));
        assert_first_picks(
            &hash,
            &[&b2, &b1, &b1, &b1, &b1, &b2, &b1, &b1, &b1, &b2],
        );
    }
}