// pingora_load_balancing/selection/consistent.rs
use super::*;
18use pingora_core::protocols::l4::socket::SocketAddr;
19use pingora_ketama::{Bucket, Continuum, Version};
20use std::collections::HashMap;
21
22pub struct KetamaHashing {
24 ring: Continuum,
25 backends: HashMap<SocketAddr, Backend>,
27}
28
/// Optional tuning for the Ketama selector.
///
/// The default value leaves `point_multiple` unset, which builds the ring as `Version::V1`.
#[derive(Clone, Debug, Copy, Default)]
pub struct KetamaConfig {
    /// When `Some` and the crate is compiled with the `v2` feature, the value is forwarded as
    /// `Version::V2 { point_multiple }`. Without that feature the value is ignored and the
    /// ring is built as `Version::V1`.
    pub point_multiple: Option<u32>,
}
33
34impl BackendSelection for KetamaHashing {
35 type Iter = OwnedNodeIterator;
36
37 type Config = KetamaConfig;
38
39 fn build_with_config(backends: &BTreeSet<Backend>, config: &Self::Config) -> Self {
40 let KetamaConfig { point_multiple } = *config;
41
42 let buckets: Vec<_> = backends
43 .iter()
44 .filter_map(|b| {
45 if let SocketAddr::Inet(addr) = b.addr {
47 Some(Bucket::new(addr, b.weight as u32))
48 } else {
49 None
50 }
51 })
52 .collect();
53 let new_backends = backends
54 .iter()
55 .map(|b| (b.addr.clone(), b.clone()))
56 .collect();
57
58 #[allow(unused)]
59 let version = if let Some(point_multiple) = point_multiple {
60 match () {
61 #[cfg(feature = "v2")]
62 () => Version::V2 { point_multiple },
63 #[cfg(not(feature = "v2"))]
64 () => Version::V1,
65 }
66 } else {
67 Version::V1
68 };
69
70 KetamaHashing {
71 ring: Continuum::new_with_version(&buckets, version),
72 backends: new_backends,
73 }
74 }
75
76 fn build(backends: &BTreeSet<Backend>) -> Self {
77 Self::build_with_config(backends, &KetamaConfig::default())
78 }
79
80 fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
81 OwnedNodeIterator {
82 idx: self.ring.node_idx(key),
83 ring: self.clone(),
84 }
85 }
86}
87
88pub struct OwnedNodeIterator {
90 idx: usize,
91 ring: Arc<KetamaHashing>,
92}
93
94impl BackendIter for OwnedNodeIterator {
95 fn next(&mut self) -> Option<&Backend> {
96 self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
97 let addr = SocketAddr::Inet(*addr);
98 self.ring.backends.get(&addr)
99 })
100 }
101}
102
#[cfg(test)]
mod test {
    use super::*;

    /// Pins the first backend chosen for ten fixed keys, then checks that dropping one backend
    /// only remaps the keys that used to land on it.
    #[test]
    fn test_ketama() {
        let b1 = Backend::new("1.1.1.1:80").unwrap();
        let b2 = Backend::new("1.0.0.1:80").unwrap();
        let b3 = Backend::new("1.0.0.255:80").unwrap();

        let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
        let hash = Arc::new(KetamaHashing::build(&backends));

        let with_all_backends = [
            (b"test0", &b2),
            (b"test1", &b1),
            (b"test2", &b1),
            (b"test3", &b1),
            (b"test4", &b1),
            (b"test5", &b3),
            (b"test6", &b1),
            (b"test7", &b3),
            (b"test8", &b1),
            (b"test9", &b2),
        ];
        for (key, expected) in with_all_backends {
            let mut iter = hash.iter(key);
            assert_eq!(
                iter.next(),
                Some(expected),
                "unexpected first backend for key {}",
                String::from_utf8_lossy(key)
            );
        }

        // Remove b3: only the keys that previously mapped to it may move.
        let backends = BTreeSet::from_iter([b1.clone(), b2.clone()]);
        let hash = Arc::new(KetamaHashing::build(&backends));

        let without_b3 = [
            (b"test0", &b2),
            (b"test1", &b1),
            (b"test2", &b1),
            (b"test3", &b1),
            (b"test4", &b1),
            (b"test5", &b2), // changed: was b3
            (b"test6", &b1),
            (b"test7", &b1), // changed: was b3
            (b"test8", &b1),
            (b"test9", &b2),
        ];
        for (key, expected) in without_b3 {
            let mut iter = hash.iter(key);
            assert_eq!(
                iter.next(),
                Some(expected),
                "unexpected first backend for key {} after removing b3",
                String::from_utf8_lossy(key)
            );
        }
    }
}