extern crate hashring_coordinator;
use hashring_coordinator::{HashRing, Replicas};
use rand::{Rng, distr::Alphanumeric};
use std::collections::HashMap;
use std::hash::Hash;
use std::net::IpAddr;
use std::ops::RangeInclusive;
use std::str::FromStr;
fn main() {
let replicas = 2;
let num_vnodes = 20;
let mut coordinator = Coordinator::new(
replicas,
num_vnodes,
vec![
VNode::new("127.0.0.1"),
VNode::new("127.0.0.2"),
VNode::new("127.0.0.3"),
VNode::new("127.0.0.4"),
VNode::new("127.0.0.5"),
],
);
let mut known_keys = vec![];
for _ in 0..10000 {
let value = random_string();
let key = format!("key_{value}"); known_keys.push(key.clone());
coordinator.post(key, value);
}
for key in &known_keys {
match coordinator.test_get(key) {
Ok(_) => (),
Err(mismatches) => println!("error: {key} not found on {mismatches} nodes"),
}
}
match coordinator.test_get(&"foo".to_string()) {
Ok(_) => println!("error: foo should not be found "),
Err(mismatches) if mismatches == 1 + replicas => (),
Err(mismatches) => println!(
"error: foo should not be found on exactly {} nodes got {mismatches} instead",
1 + replicas
),
}
let hashring_previous = coordinator.hashring();
coordinator.add_node(VNode::new("127.0.0.6"));
println!("\n# distribution of keys across cluster, after new node joined");
coordinator.print_utilization();
let available_nodes = hashring_previous.nodes();
coordinator.rebalance(&hashring_previous, &available_nodes);
println!("\n# distribution of keys across cluster, after new node was synchronized");
coordinator.print_utilization();
for key in &known_keys {
match coordinator.test_get(key) {
Ok(_) => (),
Err(mismatches) => println!("error: {key} not found on {mismatches} nodes"),
}
}
let hashring_previous = coordinator.hashring();
coordinator.drop_node(VNode::new("127.0.0.3"));
let available_nodes = coordinator.hashring.nodes();
coordinator.rebalance(&hashring_previous, &available_nodes);
for key in &known_keys {
match coordinator.test_get(key) {
Ok(_) => (),
Err(mismatches) => println!("error: {key} not found on {mismatches} nodes"),
}
}
println!("\n# distribution of keys across cluster, after one node left");
coordinator.print_utilization();
let mut coordinator_replacement = Coordinator::new(
replicas,
num_vnodes,
vec![
VNode::new("127.0.0.11"),
VNode::new("127.0.0.12"),
VNode::new("127.0.0.13"),
VNode::new("127.0.0.14"),
],
);
coordinator_replacement.synchronize(&coordinator);
println!("\n# distribution of keys across the new cluster");
coordinator_replacement.print_utilization();
for key in &known_keys {
match coordinator_replacement.test_get(key) {
Ok(_) => (),
Err(mismatches) => println!("error: {key} not found on {mismatches} nodes"),
}
}
}
/// A cluster member, identified solely by its IP address.
///
/// This is the element type stored on the `HashRing`. It derives `Eq` alongside
/// `Hash`/`PartialEq` so it can also serve as a `HashSet`/`HashMap` key.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct VNode {
    ip: IpAddr,
}

impl VNode {
    /// Creates a node from a textual IPv4 or IPv6 address such as `"127.0.0.1"`.
    ///
    /// # Panics
    ///
    /// Panics if `ip` is not a valid IP address literal. The message includes
    /// the offending input.
    fn new(ip: &str) -> Self {
        let addr = IpAddr::from_str(ip)
            .unwrap_or_else(|err| panic!("invalid IP address {:?}: {}", ip, err));
        VNode { ip: addr }
    }
}
/// In-memory key/value store held by a single cluster member.
struct Node {
    store: HashMap<String, String>,
}

impl Node {
    /// Creates a member with an empty store.
    fn new() -> Self {
        Node {
            store: HashMap::new(),
        }
    }

    /// Stores `value` under `key`, replacing any previous value for that key.
    fn post(&mut self, key: String, value: String) {
        self.store.insert(key, value);
    }

    /// Returns the value stored under `key`, if any.
    ///
    /// Takes `&str` so callers holding either a `&String` or a `&str` can look
    /// up a key without allocating.
    fn get(&self, key: &str) -> Option<&String> {
        self.store.get(key)
    }

    /// Returns owned copies of every entry whose key hash lies in `hash_range`.
    ///
    /// Each key is hashed with `hashring.get_hash`. `hash_range` is therefore
    /// presumably expressed in that ring's hash space; confirm against the
    /// `hashring_coordinator` API.
    fn fetch_range(
        &self,
        hash_range: RangeInclusive<u64>,
        hashring: &HashRing<VNode>,
    ) -> Vec<(String, String)> {
        self.store
            .iter()
            .filter(|(key, _)| hash_range.contains(&hashring.get_hash(key)))
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Number of entries currently held by this member.
    fn size(&self) -> usize {
        self.store.len()
    }
}
/// Routes keys to cluster members through a consistent-hash ring and owns every
/// member's store.
///
/// `hashring` decides which members are responsible for a key. `nodes` holds the
/// data for each member, keyed by IP address. `add_node` and `drop_node` update
/// both together.
struct Coordinator {
    hashring: HashRing<VNode>,
    nodes: HashMap<IpAddr, Node>,
}

impl Coordinator {
    /// Builds a coordinator for `vnodes`, giving each member an empty store.
    ///
    /// `replicas` and `num_vnodes` are forwarded unchanged to `HashRing::new`.
    fn new(replicas: usize, num_vnodes: usize, vnodes: Vec<VNode>) -> Self {
        let mut hashring = HashRing::new(replicas, num_vnodes);
        let nodes: HashMap<IpAddr, Node> = vnodes
            .iter()
            .map(|vnode| (vnode.ip, Node::new()))
            .collect();
        hashring.batch_add(vnodes);
        Coordinator { hashring, nodes }
    }

    /// Adds `vnode` to the ring, creating an empty store for it if none exists.
    ///
    /// An already-present store is kept, so re-adding a member does not discard
    /// the data it holds.
    fn add_node(&mut self, vnode: VNode) {
        self.nodes.entry(vnode.ip).or_insert_with(Node::new);
        self.hashring.add(vnode);
    }

    /// Removes `vnode` from the ring and discards its store.
    fn drop_node(&mut self, vnode: VNode) {
        self.nodes.remove(&vnode.ip);
        self.hashring.remove(&vnode);
    }

    /// Writes `key`/`value` to every member the ring assigns to `key`.
    ///
    /// A ring member without a local store is skipped; it is not created.
    fn post(&mut self, key: String, value: String) {
        for vnode in self.hashring.get(&key) {
            if let Some(node) = self.nodes.get_mut(&vnode.ip) {
                node.post(key.clone(), value.clone());
            }
        }
    }

    /// Checks that `key` is present, with one consistent value, on every member
    /// the ring assigns it to.
    ///
    /// Returns `Ok(())` when all of them agree. Otherwise returns `Err(n)`, where
    /// `n` counts the members that either lack the key or hold a value different
    /// from the first value found.
    ///
    /// # Panics
    ///
    /// Panics if the ring names a member with no local store (see `get_node`).
    fn test_get(&self, key: &String) -> Result<(), usize> {
        let mut expected: Option<&String> = None;
        let mut mismatches = 0;
        for vnode in &self.hashring.get(&key) {
            match (self.get_node(vnode).get(key), expected) {
                (Some(found), None) => expected = Some(found),
                (Some(found), Some(want)) if found == want => {}
                _ => mismatches += 1,
            }
        }
        if mismatches == 0 {
            Ok(())
        } else {
            Err(mismatches)
        }
    }

    /// Returns the store belonging to `vnode`.
    ///
    /// # Panics
    ///
    /// Panics if no store is registered for `vnode.ip`.
    fn get_node(&self, vnode: &VNode) -> &Node {
        self.nodes
            .get(&vnode.ip)
            .unwrap_or_else(|| panic!("expected to find node {}", vnode.ip))
    }

    /// Prints how many entries each member holds, in ascending IP order, so
    /// output is stable between runs.
    fn print_utilization(&self) {
        let mut members: Vec<(&IpAddr, &Node)> = self.nodes.iter().collect();
        members.sort_unstable_by_key(|(ip, _)| **ip);
        for (ip, node) in members {
            println!("{ip} contains {} values", node.size());
        }
    }

    /// Returns a snapshot (clone) of the current ring, e.g. to rebalance against
    /// after a membership change.
    fn hashring(&self) -> HashRing<VNode> {
        self.hashring.clone()
    }

    /// Copies data into each member of the current ring from the members that
    /// owned it under the ring `from`.
    ///
    /// `available_nodes` is forwarded to `HashRing::find_sources`. Presumably it
    /// restricts which members may act as sources; confirm against the crate
    /// docs. For each range, the first listed source is read.
    ///
    /// Entries are only copied, never deleted. Members that lost ownership of a
    /// range keep their old copies.
    fn rebalance(&mut self, from: &HashRing<VNode>, available_nodes: &[VNode]) {
        for target_vnode in &self.hashring {
            let instructions = self
                .hashring
                .find_sources(target_vnode, from, available_nodes);
            for Replicas { hash_range, nodes } in instructions {
                if let Some(source_vnode) = nodes.first() {
                    let values = self
                        .get_node(source_vnode)
                        .fetch_range(hash_range, &self.hashring);
                    if let Some(target_node) = self.nodes.get_mut(&target_vnode.ip) {
                        for (key, value) in values {
                            target_node.post(key, value)
                        }
                    }
                }
            }
        }
    }

    /// Fills this coordinator's members with the data they own, copying it from
    /// the members of another coordinator `from`.
    ///
    /// All of `from`'s members are offered as sources. For each range, the first
    /// listed source is read.
    fn synchronize(&mut self, from: &Coordinator) {
        // `from` is immutable, so its member list is loop-invariant.
        let source_nodes = from.hashring.nodes();
        for target_vnode in &self.hashring.nodes() {
            let instructions =
                self.hashring
                    .find_sources(target_vnode, &from.hashring, &source_nodes);
            for Replicas { hash_range, nodes } in instructions {
                if let Some(source_vnode) = nodes.first() {
                    let values = from
                        .get_node(source_vnode)
                        .fetch_range(hash_range, &self.hashring);
                    if let Some(target_node) = self.nodes.get_mut(&target_vnode.ip) {
                        // `values` is owned, so entries move in without cloning.
                        for (key, value) in values {
                            target_node.post(key, value)
                        }
                    }
                }
            }
        }
    }
}
/// Produces a fresh 7-character string whose characters are drawn from rand's
/// `Alphanumeric` distribution using the thread-local RNG.
fn random_string() -> String {
    const LEN: usize = 7;
    let mut rng = rand::rng();
    (0..LEN)
        .map(|_| {
            let byte: u8 = rng.sample(Alphanumeric);
            char::from(byte)
        })
        .collect()
}