use super::*;
use pingora_core::protocols::l4::socket::SocketAddr;
use pingora_ketama::{Bucket, Continuum, Version};
use std::collections::HashMap;
pub struct KetamaHashing {
ring: Continuum,
backends: HashMap<SocketAddr, Backend>,
}
#[derive(Clone, Debug, Copy, Default)]
pub struct KetamaConfig {
pub point_multiple: Option<u32>,
}
impl BackendSelection for KetamaHashing {
type Iter = OwnedNodeIterator;
type Config = KetamaConfig;
fn build_with_config(backends: &BTreeSet<Backend>, config: &Self::Config) -> Self {
let KetamaConfig { point_multiple } = *config;
let buckets: Vec<_> = backends
.iter()
.filter_map(|b| {
if let SocketAddr::Inet(addr) = b.addr {
Some(Bucket::new(addr, b.weight as u32))
} else {
None
}
})
.collect();
let new_backends = backends
.iter()
.map(|b| (b.addr.clone(), b.clone()))
.collect();
#[allow(unused)]
let version = if let Some(point_multiple) = point_multiple {
match () {
#[cfg(feature = "v2")]
() => Version::V2 { point_multiple },
#[cfg(not(feature = "v2"))]
() => Version::V1,
}
} else {
Version::V1
};
KetamaHashing {
ring: Continuum::new_with_version(&buckets, version),
backends: new_backends,
}
}
fn build(backends: &BTreeSet<Backend>) -> Self {
Self::build_with_config(backends, &KetamaConfig::default())
}
fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
OwnedNodeIterator {
idx: self.ring.node_idx(key),
ring: self.clone(),
}
}
}
pub struct OwnedNodeIterator {
idx: usize,
ring: Arc<KetamaHashing>,
}
impl BackendIter for OwnedNodeIterator {
fn next(&mut self) -> Option<&Backend> {
self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
let addr = SocketAddr::Inet(*addr);
self.ring.backends.get(&addr)
})
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_ketama() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let b2 = Backend::new("1.0.0.1:80").unwrap();
let b3 = Backend::new("1.0.0.255:80").unwrap();
let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
let hash = Arc::new(KetamaHashing::build(&backends));
let mut iter = hash.iter(b"test0");
assert_eq!(iter.next(), Some(&b2));
let mut iter = hash.iter(b"test1");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test2");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test3");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test4");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test5");
assert_eq!(iter.next(), Some(&b3));
let mut iter = hash.iter(b"test6");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test7");
assert_eq!(iter.next(), Some(&b3));
let mut iter = hash.iter(b"test8");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test9");
assert_eq!(iter.next(), Some(&b2));
let backends = BTreeSet::from_iter([b1.clone(), b2.clone()]);
let hash = Arc::new(KetamaHashing::build(&backends));
let mut iter = hash.iter(b"test0");
assert_eq!(iter.next(), Some(&b2));
let mut iter = hash.iter(b"test1");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test2");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test3");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test4");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test5");
assert_eq!(iter.next(), Some(&b2)); let mut iter = hash.iter(b"test6");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test7");
assert_eq!(iter.next(), Some(&b1)); let mut iter = hash.iter(b"test8");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test9");
assert_eq!(iter.next(), Some(&b2));
}
}