#![feature(test, result_map_or_else)]
extern crate fnv;
pub mod collections;
use collections::{first, Map, Set};
use fnv::FnvBuildHasher;
use std::{
cmp,
hash::{BuildHasher, Hash, Hasher},
ops::Index,
};
/// Step-by-step configuration for a [`Ring`].
///
/// `T` is the node type placed on the ring; `S` is the [`BuildHasher`] used to
/// hash both nodes and lookup keys (an FNV hasher unless specified otherwise).
#[derive(Clone)]
pub struct RingBuilder<T: Hash + Eq + Clone, S: BuildHasher = FnvBuildHasher> {
    /// Hasher factory that is moved into the ring by `build`.
    hasher: S,
    /// Vnode count applied to nodes added without an explicit weight;
    /// `build` falls back to 10 when this is unset.
    vnodes: Option<usize>,
    /// Replica count copied into the ring; `build` falls back to 1 when unset.
    replicas: Option<usize>,
    /// Nodes that will receive the default vnode count.
    nodes: Vec<T>,
    /// Nodes paired with their own vnode count.
    weighted_nodes: Vec<(T, usize)>,
}
impl<T: Hash + Eq + Clone> Default for RingBuilder<T> {
fn default() -> Self {
RingBuilder::new(Default::default())
}
}
impl<T: Hash + Eq + Clone, S: BuildHasher> RingBuilder<T, S> {
    /// Creates an empty builder whose ring will hash with `hasher`.
    ///
    /// No nodes are registered and neither the vnode count nor the replica
    /// count is set; `build` substitutes its defaults for unset values.
    pub fn new(hasher: S) -> Self {
        RingBuilder {
            hasher,
            vnodes: None,
            replicas: None,
            nodes: Vec::new(),
            weighted_nodes: Vec::new(),
        }
    }

    /// Sets the number of vnodes given to every node added without an
    /// explicit weight.
    pub fn vnodes(mut self, vnodes: usize) -> Self {
        self.vnodes = Some(vnodes);
        self
    }

    /// Sets the replica count stored in the built ring.
    pub fn replicas(mut self, replicas: usize) -> Self {
        self.replicas = Some(replicas);
        self
    }

    /// Registers `node` with its own vnode count.
    pub fn weighted_node(mut self, node: T, vnodes: usize) -> Self {
        self.weighted_nodes.push((node, vnodes));
        self
    }

    /// Registers every `(node, vnode count)` pair in the slice, cloning each.
    pub fn weighted_nodes(mut self, weighted_nodes: &[(T, usize)]) -> Self {
        self.weighted_nodes.extend_from_slice(weighted_nodes);
        self
    }

    /// Registers every `(node, vnode count)` pair produced by
    /// `weighted_nodes`.
    ///
    /// Accepts anything that is `IntoIterator`, so both iterators and
    /// collections can be passed directly.
    pub fn weighted_nodes_iter<I>(mut self, weighted_nodes: I) -> Self
    where
        I: IntoIterator<Item = (T, usize)>,
    {
        // `extend` can reserve once from the iterator's size hint.
        self.weighted_nodes.extend(weighted_nodes);
        self
    }

    /// Registers `node` with the default vnode count.
    pub fn node(mut self, node: T) -> Self {
        self.nodes.push(node);
        self
    }

    /// Registers every node in the slice with the default vnode count,
    /// cloning each.
    pub fn nodes(mut self, nodes: &[T]) -> Self {
        self.nodes.extend_from_slice(nodes);
        self
    }

    /// Registers every node produced by `nodes` with the default vnode count.
    ///
    /// Accepts anything that is `IntoIterator`, so both iterators and
    /// collections can be passed directly.
    pub fn nodes_iter<I>(mut self, nodes: I) -> Self
    where
        I: IntoIterator<Item = T>,
    {
        self.nodes.extend(nodes);
        self
    }

    /// Consumes the builder and produces the configured [`Ring`].
    ///
    /// Unset options fall back to 10 vnodes per node and 1 replica. Plain
    /// nodes are inserted first with the default vnode count, followed by the
    /// weighted nodes with their own counts.
    pub fn build(self) -> Ring<T, S> {
        let RingBuilder {
            hasher,
            vnodes,
            replicas,
            nodes,
            weighted_nodes,
        } = self;
        let vnodes = vnodes.unwrap_or(10);

        // Size the vnode buffer for plain AND weighted nodes. Saturating
        // arithmetic keeps an absurd configuration from overflowing here.
        let weighted_total = weighted_nodes
            .iter()
            .fold(0usize, |sum, entry| sum.saturating_add(entry.1));
        let vnode_capacity = vnodes
            .saturating_mul(nodes.len())
            .saturating_add(weighted_total);

        let mut ring = Ring {
            n_vnodes: vnodes,
            replicas: replicas.unwrap_or(1),
            hasher,
            vnodes: Vec::with_capacity(vnode_capacity),
            unique: Vec::with_capacity(nodes.len() + weighted_nodes.len()),
        };
        nodes
            .into_iter()
            .map(|n| (n, vnodes))
            .chain(weighted_nodes)
            .for_each(|(n, v)| ring.insert_weight(n, v));
        ring
    }
}
/// A hash ring that maps hashed keys onto nodes of type `T`.
///
/// `S` supplies the hashers used for nodes and keys and defaults to
/// `FnvBuildHasher`.
#[derive(Clone)]
pub struct Ring<T, S = FnvBuildHasher>
where
    T: Hash + Eq + Clone,
{
    /// Vnode count used by `insert` for nodes without an explicit weight.
    n_vnodes: usize,
    /// Configured upper bound on the candidates yielded by `replicas`.
    replicas: usize,
    /// Factory for the hashers applied to nodes and keys.
    hasher: S,
    /// `(vnode hash, (owning node, hash of the owning node))` entries.
    /// NOTE(review): binary-searched by vnode hash, so presumably kept sorted
    /// by the `Map` helpers — confirm in `collections`.
    vnodes: Vec<(u64, (T, u64))>,
    /// `(node hash, vnode count requested for that node)` entries.
    unique: Vec<(u64, usize)>,
}
impl<T: Hash + Eq + Clone> Default for Ring<T> {
fn default() -> Self {
RingBuilder::default().build()
}
}
impl<K: Hash, T: Hash + Eq + Clone, S: BuildHasher> Index<K> for Ring<T, S> {
    type Output = T;

    /// Returns the node responsible for `index`; shorthand for `Ring::get`.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as `Ring::get`, i.e. when no node can
    /// be found for the key (for example on an empty ring).
    fn index(&self, index: K) -> &Self::Output {
        Self::get(self, index)
    }
}
impl<T: Hash + Eq + Clone, S: BuildHasher> Ring<T, S> {
pub fn len(&self) -> usize {
self.unique.len()
}
pub fn is_empty(&self) -> bool {
self.vnodes.is_empty()
}
pub fn vnodes(&self) -> usize {
self.vnodes.len()
}
pub fn weight(&self, node: &T) -> Option<usize> {
self.unique.map_lookup(&self.hash(node)).map(|w| *w)
}
pub fn insert(&mut self, node: T) {
self.insert_weight(node, self.n_vnodes)
}
pub fn insert_weight(&mut self, node: T, vnodes: usize) {
let node_hash = self.hash(&node);
let mut hash = node_hash;
for _ in 0..vnodes.saturating_sub(1) {
self.vnodes.map_insert(hash, (node.clone(), node_hash));
hash = self.hash(hash);
}
if vnodes > 0 {
self.vnodes.map_insert(hash, (node, node_hash));
hash = self.hash(hash);
}
while self.vnodes.map_remove(&hash).is_some() {
hash = self.hash(hash);
}
self.unique.map_insert(node_hash, vnodes);
}
fn hash<K: Hash>(&self, key: K) -> u64 {
let mut digest = self.hasher.build_hasher();
key.hash(&mut digest);
digest.finish()
}
pub fn remove(&mut self, node: &T) {
self.vnodes.retain(|(_, (_node, _))| node != _node);
self.unique.map_remove(&self.hash(node));
}
pub fn try_get<K: Hash>(&self, key: K) -> Option<&T> {
self.vnodes.find_gte(&self.hash(key)).map(first)
}
pub fn get<K: Hash>(&self, key: K) -> &T {
self.try_get(key).unwrap()
}
pub fn replicas<'a, K: Hash>(&'a self, key: K) -> Candidates<'a, T, S> {
Candidates {
limit: cmp::min(self.replicas, self.len()),
inner: self,
seen: Vec::with_capacity(self.replicas),
hash: self.hash(&key),
}
}
unsafe fn get_root_hash(&self, vnode_idx: usize) -> u64 {
(self.vnodes.get_unchecked(vnode_idx).1).1
}
unsafe fn get_node_ref(&self, vnode_idx: usize) -> &T {
&(self.vnodes.get_unchecked(vnode_idx).1).0
}
}
/// Iterator over the distinct nodes selected for one key, created by
/// `Ring::replicas`.
pub struct Candidates<'a, T, S = FnvBuildHasher>
where
    T: Hash + Eq + Clone,
{
    /// Maximum number of nodes this iterator will yield.
    limit: usize,
    /// The ring being searched.
    inner: &'a Ring<T, S>,
    /// Node hashes of the candidates yielded so far.
    seen: Vec<u64>,
    /// Hash at which the next search starts; re-hashed after each candidate.
    hash: u64,
}
impl<'a, T: Hash + Eq + Clone, S: BuildHasher> Iterator for Candidates<'a, T, S> {
    type Item = &'a T;

    /// Yields the next node for the key that has not been yielded before.
    ///
    /// The search starts at the vnode located by the current hash and walks
    /// forward, wrapping at the end, until it meets a vnode whose owner is
    /// new. Once a candidate is found and more are expected, the hash is
    /// re-hashed so the next search starts elsewhere on the ring.
    ///
    /// Returns `None` once `limit` candidates were produced, when the ring has
    /// no vnodes, or when every vnode belongs to an already yielded node.
    fn next(&mut self) -> Option<Self::Item> {
        if self.seen.len() >= self.limit {
            return None;
        }

        let total = self.inner.vnodes.len();
        // `Ok` is an exact hit; `Err` is the insertion point, which must wrap
        // to the front when it lies past the last vnode.
        let mut idx = match (self.inner.vnodes).binary_search_by_key(&&self.hash, first) {
            Ok(hit) => hit,
            Err(_) if total == 0 => return None,
            Err(slot) if slot == total => 0,
            Err(slot) => slot,
        };

        // Probe at most one full lap. Without this bound the loop would never
        // end if `limit` exceeded the number of distinct owners on the ring
        // (e.g. because nodes registered with weight zero were counted).
        let mut unvisited = total;
        // SAFETY: `idx` starts below `total` (see the match above) and is only
        // ever advanced to `idx + 1 < total` or reset to 0, with `total >= 1`.
        while !self
            .seen
            .set_insert(unsafe { self.inner.get_root_hash(idx) })
        {
            unvisited -= 1;
            if unvisited == 0 {
                // Every vnode belongs to a node that was already yielded:
                // nothing more can be produced, so end the iteration for good.
                self.limit = self.seen.len();
                return None;
            }
            idx = if idx + 1 < total { idx + 1 } else { 0 };
        }

        if self.seen.len() < self.limit {
            self.hash = self.inner.hash(self.hash);
        }
        // SAFETY: `idx < total == self.inner.vnodes.len()`, as argued above.
        Some(unsafe { self.inner.get_node_ref(idx) })
    }

    /// Reports how many candidates are still expected: `limit` minus the
    /// number already yielded.
    ///
    /// NOTE(review): the lower bound assumes every node counted in `limit`
    /// owns at least one vnode.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let n = self.limit - self.seen.len();
        (n, Some(n))
    }
}
#[cfg(test)]
mod consistent_hash_ring_tests {
    extern crate test;
    use super::*;
    use test::Bencher;

    /// Largest per-node vnode count exercised by the parameterised tests.
    const TEST_VNODES: usize = 4;

    /// Removing a node and inserting it again must leave a key's owner
    /// unchanged.
    #[test]
    fn remove_insert_is_idempotent() {
        for vnodes in 1..=TEST_VNODES {
            println!("vnodes: {}", vnodes);
            let builder = RingBuilder::default().vnodes(vnodes).nodes_iter(0..16);
            let mut ring = builder.build();
            let before = ring["hello_worldo"];
            ring.remove(&before);
            ring.insert(before);
            let after = ring["hello_worldo"];
            assert_eq!(before, after);
        }
    }

    /// Two rings built from the same nodes in a different order must agree on
    /// every key.
    #[test]
    fn is_consistent() {
        for vnodes in 1..=TEST_VNODES {
            println!("vnodes: {}", vnodes);
            let forward = RingBuilder::default()
                .vnodes(vnodes)
                .nodes_iter(vec![0, 1, 2].into_iter())
                .build();
            let rotated = RingBuilder::default()
                .vnodes(vnodes)
                .nodes_iter(vec![1, 2, 0].into_iter())
                .build();
            for key in 0..32 {
                assert_eq!(forward[key], rotated[key]);
            }
        }
    }

    /// `try_get` on an empty ring returns `None` instead of panicking.
    #[test]
    fn try_get_does_not_panic() {
        let empty: Ring<usize> = Default::default();
        assert_eq!(None, empty.try_get("helloworldo"));
    }

    /// Keys whose replica set did not contain the removed node keep exactly
    /// the same replicas after the removal.
    #[test]
    fn removing_nodes_does_not_redistribute_all_replicas() {
        const REMOVED: usize = 2;
        for vnodes in 1..=TEST_VNODES {
            println!("vnodes: {}", vnodes);
            let control = RingBuilder::default()
                .vnodes(vnodes)
                .replicas(3)
                .nodes_iter(0..32)
                .build();
            let mut ring = control.clone();
            ring.remove(&REMOVED);
            for key in 0..64 {
                let expected: Vec<_> = control.replicas(key).collect();
                assert_eq!(*expected[0], control[key]);
                let actual: Vec<_> = ring.replicas(key).collect();
                assert_eq!(*actual[0], ring[key]);
                if !expected.contains(&&REMOVED) {
                    assert_eq!(expected, actual);
                }
            }
        }
    }

    /// Keys whose replica sets contain neither newly inserted node see the
    /// same replicas in both rings.
    #[test]
    fn inserting_nodes_does_not_redistribute_all_replicas() {
        const X: usize = 42;
        const Y: usize = 24;
        for vnodes in 1..=TEST_VNODES {
            println!("vnodes: {}", vnodes);
            let base = RingBuilder::default()
                .vnodes(vnodes)
                .replicas(3)
                .nodes_iter(0..4)
                .build();
            let mut with_x = base.clone();
            let mut with_y = base;
            with_x.insert(X);
            with_y.insert(Y);
            for key in 0..64 {
                let xs: Vec<_> = with_x.replicas(key).collect();
                assert_eq!(*xs[0], with_x[key]);
                let ys: Vec<_> = with_y.replicas(key).collect();
                assert_eq!(*ys[0], with_y[key]);
                if !xs.contains(&&X) && !ys.contains(&&Y) {
                    assert_eq!(xs, ys);
                }
            }
        }
    }

    /// Benchmarks draining `Ring::replicas` on a 32-shard ring with 50 vnodes
    /// per shard and the given replica count.
    fn bench_replicas32(b: &mut Bencher, replicas: usize) {
        let mut ring = RingBuilder::default().vnodes(50).replicas(replicas).build();
        let mut buckets: Vec<String> = Vec::new();
        for shard in 0..32 {
            let name = format!("shard-{}", shard);
            ring.insert(name.clone());
            buckets.push(name);
        }
        let mut i = 0;
        b.iter(|| {
            i += 1;
            // 32 buckets, so masking with 31 cycles through all of them.
            ring.replicas(&buckets[i & 31]).for_each(|_| ());
        });
    }

    #[bench]
    fn bench_replicas32_1(b: &mut Bencher) {
        bench_replicas32(b, 1);
    }

    #[bench]
    fn bench_replicas32_2(b: &mut Bencher) {
        bench_replicas32(b, 2);
    }

    #[bench]
    fn bench_replicas32_3(b: &mut Bencher) {
        bench_replicas32(b, 3);
    }

    #[bench]
    fn bench_replicas32_4(b: &mut Bencher) {
        bench_replicas32(b, 4);
    }

    /// Benchmarks `Ring::get` on a ring of `shards` nodes with 50 vnodes each.
    ///
    /// The index mask `shards - 1` only cycles through every bucket when
    /// `shards` is a power of two, which holds for all callers below.
    fn bench_get(b: &mut Bencher, shards: usize) {
        let mut ring = RingBuilder::default().vnodes(50).build();
        let mut buckets: Vec<String> = Vec::new();
        for shard in 0..shards {
            let name = format!("shard-{}", shard);
            ring.insert(name.clone());
            buckets.push(name);
        }
        let mut i = 0;
        b.iter(|| {
            i += 1;
            ring.get(&buckets[i & (shards - 1)]);
        });
    }

    #[bench]
    fn bench_get_a_8(b: &mut Bencher) {
        bench_get(b, 8);
    }

    #[bench]
    fn bench_get_b_32(b: &mut Bencher) {
        bench_get(b, 32);
    }

    #[bench]
    fn bench_get_c_128(b: &mut Bencher) {
        bench_get(b, 128);
    }

    #[bench]
    fn bench_get_d_512(b: &mut Bencher) {
        bench_get(b, 512);
    }
}