// pingora_load_balancing/selection/consistent.rs
use super::*;
18use pingora_core::protocols::l4::socket::SocketAddr;
19use pingora_ketama::{Bucket, Continuum};
20use std::collections::HashMap;
21
22pub struct KetamaHashing {
24 ring: Continuum,
25 backends: HashMap<SocketAddr, Backend>,
27}
28
29impl BackendSelection for KetamaHashing {
30 type Iter = OwnedNodeIterator;
31
32 fn build(backends: &BTreeSet<Backend>) -> Self {
33 let buckets: Vec<_> = backends
34 .iter()
35 .filter_map(|b| {
36 if let SocketAddr::Inet(addr) = b.addr {
38 Some(Bucket::new(addr, b.weight as u32))
39 } else {
40 None
41 }
42 })
43 .collect();
44 let new_backends = backends
45 .iter()
46 .map(|b| (b.addr.clone(), b.clone()))
47 .collect();
48 KetamaHashing {
49 ring: Continuum::new(&buckets),
50 backends: new_backends,
51 }
52 }
53
54 fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
55 OwnedNodeIterator {
56 idx: self.ring.node_idx(key),
57 ring: self.clone(),
58 }
59 }
60}
61
62pub struct OwnedNodeIterator {
64 idx: usize,
65 ring: Arc<KetamaHashing>,
66}
67
68impl BackendIter for OwnedNodeIterator {
69 fn next(&mut self) -> Option<&Backend> {
70 self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
71 let addr = SocketAddr::Inet(*addr);
72 self.ring.backends.get(&addr)
73 })
74 }
75}
76
#[cfg(test)]
mod test {
    use super::*;

    /// Asserts that the first backend yielded for `key` is `expected`.
    ///
    /// `#[track_caller]` makes a failure point at the calling line rather
    /// than at this helper.
    #[track_caller]
    fn assert_first_pick(hash: &Arc<KetamaHashing>, key: &[u8], expected: &Backend) {
        let mut picks = hash.iter(key);
        assert_eq!(picks.next(), Some(expected));
    }

    #[test]
    fn test_ketama() {
        let b1 = Backend::new("1.1.1.1:80").unwrap();
        let b2 = Backend::new("1.0.0.1:80").unwrap();
        let b3 = Backend::new("1.0.0.255:80").unwrap();

        // Ring with all three backends.
        let all_three = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
        let hash = Arc::new(KetamaHashing::build(&all_three));

        assert_first_pick(&hash, b"test0", &b2);
        assert_first_pick(&hash, b"test1", &b1);
        assert_first_pick(&hash, b"test2", &b1);
        assert_first_pick(&hash, b"test3", &b1);
        assert_first_pick(&hash, b"test4", &b1);
        assert_first_pick(&hash, b"test5", &b3);
        assert_first_pick(&hash, b"test6", &b1);
        assert_first_pick(&hash, b"test7", &b3);
        assert_first_pick(&hash, b"test8", &b1);
        assert_first_pick(&hash, b"test9", &b2);

        // Ring rebuilt without b3: only the keys that previously landed on b3
        // ("test5" and "test7") are expected to move; every other key keeps
        // its backend.
        let without_b3 = BTreeSet::from_iter([b1.clone(), b2.clone()]);
        let hash = Arc::new(KetamaHashing::build(&without_b3));

        assert_first_pick(&hash, b"test0", &b2);
        assert_first_pick(&hash, b"test1", &b1);
        assert_first_pick(&hash, b"test2", &b1);
        assert_first_pick(&hash, b"test3", &b1);
        assert_first_pick(&hash, b"test4", &b1);
        // Previously b3.
        assert_first_pick(&hash, b"test5", &b2);
        assert_first_pick(&hash, b"test6", &b1);
        // Previously b3.
        assert_first_pick(&hash, b"test7", &b1);
        assert_first_pick(&hash, b"test8", &b1);
        assert_first_pick(&hash, b"test9", &b2);
    }
}