use crate::sharding::router::{ShardNode, fnv1a_hash};
use std::collections::BTreeMap;
/// Hash ring that maps virtual-node positions to the ids of the nodes owning them.
///
/// Each node added through `add_node` occupies several positions ("virtual nodes") on the
/// ring; `get_node` resolves a hash to the owner of the first position at or after it,
/// wrapping around to the smallest position when none follows.
///
/// Note that the derived `Default` yields `vnodes_per_node == 0`, in which case `add_node`
/// inserts no positions at all; use `NodeRing::new` to get a usable ring.
#[derive(Debug, Default)]
pub struct NodeRing {
/// Ring positions (hashes of virtual-node keys) mapped to the id of the owning node.
/// Kept in a `BTreeMap` so positions can be range-scanned in ascending order.
ring: BTreeMap<u64, usize>,
/// Base number of virtual nodes per node; `add_node` scales it by the node's weight.
vnodes_per_node: usize,
}
impl NodeRing {
    /// Creates an empty ring.
    ///
    /// `vnodes_per_node` is the number of virtual nodes placed for a node of weight `1.0`;
    /// `add_node` scales it by each node's weight.
    pub fn new(vnodes_per_node: usize) -> Self {
        Self {
            ring: BTreeMap::new(),
            vnodes_per_node,
        }
    }

    /// Places `node` on the ring.
    ///
    /// The node receives `vnodes_per_node * max(weight, 0.1)` virtual nodes, truncated toward
    /// zero, so a small base count combined with a small weight can yield no virtual nodes.
    /// Each virtual node sits at the hash of `"node:{id}-vnode:{index}"`. If that position is
    /// already occupied, the previous owner of the position is replaced.
    pub fn add_node(&mut self, node: &ShardNode) {
        // Weights below 0.1 are clamped up; the float-to-usize cast drops the fraction.
        let vnode_count = ((self.vnodes_per_node as f64) * node.weight.max(0.1)) as usize;
        for i in 0..vnode_count {
            let vnode_key = format!("node:{}-vnode:{}", node.id, i);
            self.ring.insert(fnv1a_hash(vnode_key.as_bytes()), node.id);
        }
    }

    /// Removes every ring position currently owned by `node_id`.
    ///
    /// Positions are selected by their stored owner rather than by re-deriving the virtual
    /// node hashes. This removes all of the node's positions regardless of the weight it was
    /// added with, and never touches a position that belongs to a different node.
    /// Removing an id that is not on the ring is a no-op.
    pub fn remove_node(&mut self, node_id: usize) {
        self.ring.retain(|_, owner| *owner != node_id);
    }

    /// Returns the id of the node responsible for `hash_val`, or `None` if the ring is empty.
    ///
    /// The owner is the node at the first ring position that is `>= hash_val`; when no such
    /// position exists the lookup wraps around to the smallest position on the ring.
    pub fn get_node(&self, hash_val: u64) -> Option<usize> {
        self.ring
            .range(hash_val..)
            .next()
            // Nothing at or after `hash_val`: wrap around to the start of the ring.
            .or_else(|| self.ring.iter().next())
            .map(|(_, &node_id)| node_id)
    }

    /// Returns `true` when the ring holds no positions.
    pub fn is_empty(&self) -> bool {
        self.ring.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node of weight `1.0` with the given id and address.
    fn unit_node(id: usize, address: &'static str) -> ShardNode {
        ShardNode {
            id,
            address: address.into(),
            weight: 1.0,
        }
    }

    /// Three equally weighted nodes should each receive a share of the keys that stays
    /// within 70% of the ideal one-third split.
    #[test]
    fn test_node_ring_distribution() {
        let mut ring = NodeRing::new(300);
        for (id, address) in [(0, "1"), (1, "2"), (2, "3")] {
            ring.add_node(&unit_node(id, address));
        }

        let total_keys = 30000;
        let mut counts = vec![0; 3];
        for i in 0..total_keys {
            let key = format!("data-{}", i);
            let owner = ring.get_node(fnv1a_hash(key.as_bytes())).unwrap();
            counts[owner] += 1;
        }

        let expected = total_keys as f64 / 3.0;
        let diff = expected * 0.7;
        for (i, &raw) in counts.iter().enumerate() {
            let count = raw as f64;
            assert!(
                (count - expected).abs() <= diff,
                "Node {} load {} is outside expected {} +- {}",
                i,
                count,
                expected,
                diff
            );
        }
    }

    /// Removing a node hands all lookups to the remaining node; removing the last node
    /// leaves the ring empty.
    #[test]
    fn test_node_ring_add_remove() {
        let mut ring = NodeRing::new(10);
        ring.add_node(&unit_node(100, "A"));
        ring.add_node(&unit_node(200, "B"));

        let owner = ring.get_node(12345).unwrap();
        assert!(matches!(owner, 100 | 200));

        ring.remove_node(100);
        for probe in 0..10_000 {
            assert_eq!(ring.get_node(probe), Some(200));
        }

        ring.remove_node(200);
        assert!(ring.is_empty());
        assert_eq!(ring.get_node(12345), None);
    }
}