use super::*;
use std::collections::BTreeSet;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::panic;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use hex_literal::hex;
use log::{debug, error, trace, warn};
use rand::prelude::*;
use crate::types::{DefaultStdHasher, Update};
/// Sets up `env_logger` in test mode for the tests in this file.
///
/// The result of `try_init()` is deliberately ignored (presumably because
/// several tests call this and only one initialization can succeed — TODO confirm).
fn init() {
    let mut builder = env_logger::builder();
    builder.is_test(true);
    let _ = builder.try_init();
}
/// Builds three virtual nodes (ids 1, 2 and 3) for a single `Arc<String>` node
/// and prints them to stderr. There are no assertions; the test only checks
/// that construction and `Debug` formatting work for this node type.
#[test]
fn node_string() {
    let node = Arc::new(String::from("Node1"));
    let mut hasher = DefaultStdHasher::default();
    // All virtual nodes share the same hasher and the same underlying node.
    let mut vnode_of = |vnid: Vnid| VirtualNode::new(&mut hasher, Arc::clone(&node), vnid);
    let (vn1, vn2, vn3) = (vnode_of(1), vnode_of(2), vnode_of(3));
    eprintln!("vn1 = {:?},\nvn2 = {:?},\nvn3 = {:?}", vn1, vn2, vn3);
}
/// Builds three virtual nodes (ids 1, 2 and 3) for a single `Arc<str>` node
/// and prints them to stderr. There are no assertions; the test only checks
/// that construction and `Debug` formatting work for this node type.
#[test]
fn node_str() {
    let node: Arc<str> = Arc::from("Node1");
    let mut hasher = DefaultStdHasher::default();
    let first = VirtualNode::new(&mut hasher, Arc::clone(&node), 1);
    let second = VirtualNode::new(&mut hasher, Arc::clone(&node), 2);
    let third = VirtualNode::new(&mut hasher, Arc::clone(&node), 3);
    eprintln!(
        "vn1 = {:?},\nvn2 = {:?},\nvn3 = {:?}",
        first, second, third
    );
}
/// Checks that virtual nodes and hash rings can be built for several node
/// representations: `Arc<String>`, `Arc<str>`, `Arc<Vec<u8>>`, `Arc<&[u8]>`
/// and `Arc<[u8]>`.
///
/// For each representation, two virtual nodes are created directly and then a
/// ring is built from the two nodes and traced. Any error from ring
/// construction is propagated and fails the test.
#[test]
fn test_node_impls() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 3;
    init();
    let mut hasher = DefaultStdHasher::default();

    // Owned strings.
    let string_a: Arc<String> = Arc::from(String::from("String0"));
    let string_b: Arc<String> = Arc::from(String::from("String1"));
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&string_a), 0);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&string_b), 1);
    let string_ring =
        HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[string_a, string_b])?;
    trace!("{}", string_ring);

    // Unsized string slices.
    let str_a: Arc<str> = Arc::from("str0");
    let str_b: Arc<str> = Arc::from("str1");
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&str_a), 0);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&str_b), 1);
    let str_ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[str_a, str_b])?;
    trace!("{}", str_ring);

    // Owned byte vectors.
    let vec_a: Arc<Vec<u8>> = Arc::new(42u64.to_ne_bytes().to_vec());
    let vec_b: Arc<Vec<u8>> = Arc::new(u64::MAX.to_ne_bytes().to_vec());
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&vec_a), 0);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&vec_b), 1);
    let vec_ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[vec_a, vec_b])?;
    trace!("{}", vec_ring);

    // Borrowed byte slices; the backing arrays must outlive the ring built from them.
    let bytes_a = 42u64.to_ne_bytes();
    let bytes_b = u64::MAX.to_ne_bytes();
    let slice_ref_a: Arc<&[u8]> = Arc::new(&bytes_a);
    let slice_ref_b: Arc<&[u8]> = Arc::new(&bytes_b);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&slice_ref_a), 0);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&slice_ref_b), 1);
    let slice_ref_ring = HashRing::with_nodes(
        VNODES_PER_NODE,
        REPLICATION_FACTOR,
        &[slice_ref_a, slice_ref_b],
    )?;
    trace!("{}", slice_ref_ring);

    // Unsized byte slices.
    let slice_a: Arc<[u8]> = Arc::from(42u64.to_ne_bytes());
    let slice_b: Arc<[u8]> = Arc::from(u64::MAX.to_ne_bytes());
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&slice_a), 0);
    let _ = VirtualNode::new(&mut hasher, Arc::clone(&slice_b), 1);
    let slice_ring =
        HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[slice_a, slice_b])?;
    trace!("{}", slice_ring);

    Ok(())
}
/// Builds a ring from three distinct `Arc<str>` nodes, asserts that
/// construction succeeds, and prints the ring together with its node and
/// virtual-node counts.
#[test]
fn new_ring() {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 2;
    init();
    let nodes: Vec<Arc<str>> = ["Node1", "Node2", "Node3"]
        .iter()
        .map(|name| Arc::<str>::from(*name))
        .collect();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &nodes);
    assert!(ring.is_ok());
    let ring = ring.unwrap();
    let (node_count, vnode_count) = (ring.len_nodes(), ring.len_virtual_nodes());
    eprintln!("ring = {:#?}", ring);
    eprintln!("ring.len_nodes() = {:#?}", node_count);
    eprintln!("ring.len_virtual_nodes() = {:#?}", vnode_count);
}
/// Attempts to build a ring from three nodes that all carry the same name and
/// asserts that construction is rejected with an error.
#[test]
fn new_ring_already_in() {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 2;
    init();
    // Three separately allocated nodes with identical contents.
    let nodes: Vec<Arc<str>> = (0..3).map(|_| Arc::<str>::from("Node1")).collect();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &nodes);
    eprintln!("ring = {:#?}", ring);
    assert!(ring.is_err());
}
/// Builds a ring with three nodes, inserts two more from a single thread, and
/// prints the ring and its node / virtual-node counts before and after the
/// insertion. Errors from construction or insertion fail the test.
#[test]
fn test_insert_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 2;
    init();
    let nodes: Vec<Arc<str>> = vec![Arc::from("Node1"), Arc::from("Node2"), Arc::from("Node3")];
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &nodes)?;
    // Dumps the current state of the ring to stderr.
    let report = || {
        eprintln!("ring = {:#?}", ring);
        eprintln!("ring.len_nodes() = {:#?}", ring.len_nodes());
        eprintln!("ring.len_virtual_nodes() = {:#?}", ring.len_virtual_nodes());
    };
    report();
    ring.insert(&[Arc::from("Node11"), Arc::from("Node12")])?;
    report();
    Ok(())
}
/// Multi-threaded insertion scenario, parameterized over the hasher `h`.
///
/// Two threads each perform `ITERS` insertion attempts of randomly named
/// nodes (`Node-0` .. `Node-2999`), sleeping 50–99 ms before every attempt,
/// and record the ids they inserted successfully. Failed insertions are only
/// logged. Afterwards the test asserts that the two threads' id sets are
/// disjoint and that the ring's node and virtual-node counts match the
/// number of successfully inserted nodes.
fn test_insert_multithr_01_with_hasher<H: Hasher + Send + Sync + 'static>(h: H) -> Result<()> {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 2;
    const ITERS: usize = 1000;
    init();
    let ring = Arc::new(HashRing::with_hasher_and_nodes(
        h,
        VNODES_PER_NODE,
        REPLICATION_FACTOR,
        &[],
    )?);

    // Worker body: returns the set of node ids this thread managed to insert.
    let rand_insertions = |ring: Arc<HashRing<String, H>>| {
        let mut rng = rand::thread_rng();
        let mut inserted_nodes = HashSet::new();
        for _ in 0..ITERS {
            thread::sleep(Duration::from_millis(rng.gen_range(50..100)));
            let node_id: usize = rng.gen_range(0..3000);
            let node = Arc::new(format!("Node-{}", node_id));
            trace!("adding {:?}...", node);
            // Every kind of failure is handled the same way: log it and move on.
            if let Err(err) = ring.insert(&[node]) {
                debug!("{:?}", err);
            } else {
                let _ = inserted_nodes.insert(node_id);
            }
        }
        inserted_nodes
    };

    let first_ring = Arc::clone(&ring);
    let second_ring = Arc::clone(&ring);
    let first = thread::spawn(move || rand_insertions(first_ring));
    let second = thread::spawn(move || rand_insertions(second_ring));
    let s1 = first.join().unwrap();
    let s2 = second.join().unwrap();

    // A node inserted by one thread must not also be reported by the other.
    assert!(s1.is_disjoint(&s2));
    let union: BTreeSet<_> = s1.union(&s2).collect();
    let total_inserted = union.len();
    assert_eq!(total_inserted, s1.len() + s2.len());
    debug!("Thr1 successfully inserted {} distinct nodes.", s1.len());
    debug!("Thr2 successfully inserted {} distinct nodes.", s2.len());
    debug!("A total of {} distinct nodes were inserted.", total_inserted);
    debug!("ring.len_nodes() = {}", ring.len_nodes());
    debug!("ring.len_virtual_nodes() = {}", ring.len_virtual_nodes());
    assert_eq!(total_inserted, ring.len_nodes());
    assert_eq!(
        total_inserted * VNODES_PER_NODE as usize,
        ring.len_virtual_nodes()
    );
    debug!("Hash Ring String Representation:\n{}", ring);
    Ok(())
}
/// Runs the multi-threaded insertion scenario with `DefaultStdHasher`.
#[test]
fn test_insert_multithr_01_stdhash() -> Result<()> {
    let hasher = DefaultStdHasher::default();
    test_insert_multithr_01_with_hasher(hasher)
}
/// Runs the multi-threaded insertion scenario with `Blake3Hasher`
/// (only compiled when the `blake3-hash` feature is enabled).
#[cfg(feature = "blake3-hash")]
#[test]
fn test_insert_multithr_01_blake3() -> Result<()> {
    let hasher = Blake3Hasher::default();
    test_insert_multithr_01_with_hasher(hasher)
}
/// Runs the multi-threaded insertion scenario with `Blake2bHasher`
/// (only compiled when the `blake2b-hash` feature is enabled).
#[cfg(feature = "blake2b-hash")]
#[test]
fn test_insert_multithr_01_blake2b() -> Result<()> {
    let hasher = Blake2bHasher::default();
    test_insert_multithr_01_with_hasher(hasher)
}
/// Exercises a ring whose replication factor (4) is greater than the number
/// of nodes inserted (3), and asserts that the node and virtual-node counts
/// are still as expected.
#[test]
fn test_replf_gt_nodes() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 2;
    const REPLICATION_FACTOR: u8 = 4;
    const NUM_NODES: usize = 3;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for name in (0..NUM_NODES).map(|id| format!("Node-{}", id)) {
        ring.insert(&[Arc::new(name)])?;
    }
    debug!("ring.len_nodes() = {}", ring.len_nodes());
    debug!("ring.len_virtual_nodes() = {}", ring.len_virtual_nodes());
    debug!("Hash Ring String Representation:\n{}", ring);
    let expected_vnodes = NUM_NODES * VNODES_PER_NODE as usize;
    assert_eq!(ring.len_nodes(), NUM_NODES);
    assert_eq!(ring.len_virtual_nodes(), expected_vnodes);
    Ok(())
}
/// Inserts four nodes, removes them one by one from a single thread, and
/// checks the node / virtual-node counts before and after every removal.
/// Finally verifies that removing a node from the now-empty ring fails.
#[test]
fn test_remove_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 6;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for name in (0..NUM_NODES).map(|id| format!("Node-{}", id)) {
        ring.insert(&[Arc::new(name)])?;
    }
    debug!("Hash Ring String Representation:\n{}", ring);
    assert_eq!(ring.len_nodes(), NUM_NODES);
    assert_eq!(
        ring.len_virtual_nodes(),
        NUM_NODES * VNODES_PER_NODE as usize
    );

    for node_id in 0..NUM_NODES {
        // Nodes 0..node_id have already been removed at this point.
        let before = NUM_NODES - node_id;
        assert_eq!(ring.len_nodes(), before);
        assert_eq!(ring.len_virtual_nodes(), before * VNODES_PER_NODE as usize);

        ring.remove(&[Arc::new(format!("Node-{}", node_id))])?;
        debug!("Hash Ring String Representation:\n{}", ring);

        let after = NUM_NODES - (node_id + 1);
        assert_eq!(ring.len_nodes(), after);
        assert_eq!(ring.len_virtual_nodes(), after * VNODES_PER_NODE as usize);
    }
    assert_eq!(0, ring.len_nodes());

    // Removal from an empty ring must be reported as an error.
    let bogus = Arc::new(String::from("Node-42"));
    if ring.remove(&[bogus]).is_ok() {
        panic!("Unexpectedly removed 'Node-42' successfully from an empty ring!");
    }
    Ok(())
}
/// Checks `has_virtual_node()` on a ring of four nodes with three virtual
/// nodes each.
///
/// For every node, each virtual-node id in `0..VNODES_PER_NODE` must be found,
/// both when queried with a `VirtualNode` and when queried with the digest of
/// the node id bytes followed by the native-endian bytes of the virtual-node
/// id. The id `VNODES_PER_NODE` itself must not be found in either form, and
/// neither must two arbitrary byte vectors.
#[test]
fn test_has_virtual_node_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    assert_eq!(ring.len_nodes(), NUM_NODES);
    assert_eq!(
        ring.len_virtual_nodes(),
        NUM_NODES * VNODES_PER_NODE as usize
    );

    for id in 0..NUM_NODES {
        let node = Arc::new(format!("Node-{}", id));

        // Every virtual node that was created for this node must be present.
        for vnid in 0..VNODES_PER_NODE {
            let vn = VirtualNode::new(&mut DefaultStdHasher::default(), Arc::clone(&node), vnid);
            assert!(ring.has_virtual_node(&vn));

            let node_id = node.hashring_node_id();
            let mut raw = Vec::with_capacity(node_id.len() + mem::size_of::<Vnid>());
            raw.extend(&*node_id);
            raw.extend(&vnid.to_ne_bytes());
            let name = DefaultStdHasher::default().digest(&raw);
            assert!(ring.has_virtual_node(&name));
        }

        // The first virtual-node id past the configured range must be absent.
        let vn = VirtualNode::new(
            &mut DefaultStdHasher::default(),
            Arc::clone(&node),
            VNODES_PER_NODE,
        );
        assert!(!ring.has_virtual_node(&vn));

        let node_id = node.hashring_node_id();
        let mut raw = Vec::with_capacity(node_id.len() + mem::size_of::<Vnid>());
        raw.extend(&*node_id);
        raw.extend(&VNODES_PER_NODE.to_ne_bytes());
        let name = DefaultStdHasher::default().digest(&raw);
        assert!(!ring.has_virtual_node(&name));
    }

    // Arbitrary byte strings are not virtual-node names of this ring.
    let unrelated = [vec![0u8, 1, 2, 3, 4, 5], vec![42u8; 142]];
    for v in unrelated.iter() {
        assert!(!ring.has_virtual_node(v));
    }
    Ok(())
}
/// Looks up the virtual node for a fixed set of 8-byte keys while the ring
/// shrinks from four nodes down to one.
///
/// Before each removal the test asserts the current node count, resolves
/// every key (any lookup error fails the test) and prints the assignment.
#[test]
fn test_virtual_node_for_key_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    assert_eq!(ring.len_nodes(), NUM_NODES);
    assert_eq!(
        ring.len_virtual_nodes(),
        NUM_NODES * VNODES_PER_NODE as usize
    );
    let keys = [
        hex!("232a8a941ee901c1"),
        hex!("324317a375aa4201"),
        hex!("4ff59699a3bacc04"),
        hex!("62338d102fd1edce"),
        hex!("6aad47fd1f3fc789"),
        hex!("728254115d9da0a8"),
        hex!("7cf6c43df9ff4b72"),
        hex!("a416af15b94f0122"),
        hex!("ab1c5045e605c275"),
        hex!("acec6c33d08ac530"),
        hex!("cbdaa742e68b020d"),
        hex!("ed59d86868c13210"),
    ];
    for removed_so_far in 0..NUM_NODES {
        assert_eq!(ring.len_nodes(), NUM_NODES - removed_so_far);
        debug!("Hash Ring String Representation:\n{}", ring);
        for key in keys.iter() {
            let owner = ring.virtual_node_for_key(key)?;
            eprintln!("Key {:x?} is assigned to vnode {}", key, owner);
        }
        ring.remove(&[Arc::new(format!("Node-{}", removed_so_far))])?;
    }
    Ok(())
}
/// Asserts that, for a fixed set of keys, `nodes_for_key()` returns the same
/// nodes as the replica owners of the virtual node returned by
/// `virtual_node_for_key()`.
#[test]
fn test_nodes_for_key_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    let keys = [
        hex!("232a8a941ee901c1"),
        hex!("324317a375aa4201"),
        hex!("4ff59699a3bacc04"),
        hex!("62338d102fd1edce"),
        hex!("6aad47fd1f3fc789"),
        hex!("728254115d9da0a8"),
        hex!("7cf6c43df9ff4b72"),
        hex!("a416af15b94f0122"),
        hex!("ab1c5045e605c275"),
        hex!("acec6c33d08ac530"),
        hex!("cbdaa742e68b020d"),
        hex!("ed59d86868c13210"),
    ];
    for key in keys.iter() {
        let vnode = ring.virtual_node_for_key(key)?;
        let owners = ring.nodes_for_key(key)?;
        assert_eq!(vnode.replica_owners(), owners);
    }
    Ok(())
}
/// Checks `predecessor()` and `successor()` for a fixed list of keys.
///
/// For the key at position `i` in the list, the predecessor must equal the
/// virtual node that owns the previous key and the successor must equal the
/// virtual node that owns the next key. The list is treated as circular: the
/// first key's predecessor is checked against the last key and the last key's
/// successor against the first.
#[test]
fn test_adjacent_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    let keys = [
        hex!("232a8a941ee901c1"),
        hex!("324317a375aa4201"),
        hex!("4ff59699a3bacc04"),
        hex!("62338d102fd1edce"),
        hex!("6aad47fd1f3fc789"),
        hex!("728254115d9da0a8"),
        hex!("7cf6c43df9ff4b72"),
        hex!("a416af15b94f0122"),
        hex!("ab1c5045e605c275"),
        hex!("acec6c33d08ac530"),
        hex!("cbdaa742e68b020d"),
        hex!("ed59d86868c13210"),
    ];
    let total = keys.len();
    for i in 0..total {
        // Wrap-around indices of the neighbouring keys.
        let before = (i + total - 1) % total;
        let after = (i + 1) % total;

        let key = &keys[i];
        // The key itself must be resolvable; the result is not needed.
        let _ = ring.virtual_node_for_key(key)?;

        let expected_pred = ring.virtual_node_for_key(&keys[before])?;
        let pred = ring.predecessor(key)?;
        assert_eq!(pred, expected_pred);

        let expected_succ = ring.virtual_node_for_key(&keys[after])?;
        let succ = ring.successor(key)?;
        assert_eq!(succ, expected_succ);
    }
    Ok(())
}
/// Checks `successor_node()` and `predecessor_node()` for a fixed set of keys.
///
/// * The node of `successor_node(key)` must have the same node id as the
///   second entry of `nodes_for_key(key)`.
/// * The second replica owner of `predecessor_node(key)` must be the node of
///   the virtual node that owns `key`.
#[test]
fn test_adjacent_node_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    let keys = [
        hex!("232a8a941ee901c1"),
        hex!("324317a375aa4201"),
        hex!("4ff59699a3bacc04"),
        hex!("62338d102fd1edce"),
        hex!("6aad47fd1f3fc789"),
        hex!("728254115d9da0a8"),
        hex!("7cf6c43df9ff4b72"),
        hex!("a416af15b94f0122"),
        hex!("ab1c5045e605c275"),
        hex!("acec6c33d08ac530"),
        hex!("cbdaa742e68b020d"),
        hex!("ed59d86868c13210"),
    ];
    trace!("ring = {}", ring);

    debug!("Check successor_node()...");
    for key in keys.iter() {
        trace!("\n--> key = {:x?}", key);
        let owners = ring.nodes_for_key(key)?;
        trace!("owners = {:?}", owners);
        let succ_vn = ring.successor_node(key)?;
        trace!("succ_vn = {}", succ_vn);
        let second_owner = owners.get(1).unwrap();
        assert_eq!(
            succ_vn.node.hashring_node_id(),
            second_owner.hashring_node_id()
        );
    }

    debug!("Check predecessor_node()...");
    for key in keys.iter() {
        trace!("\n--> key = {:x?}", key);
        let vn = ring.virtual_node_for_key(key)?;
        trace!("vn = {}", vn);
        let pred_vn = ring.predecessor_node(key)?;
        trace!("pred_vn = {}", pred_vn);
        assert_eq!(pred_vn.replica_owners()[1], vn.node);
    }
    Ok(())
}
/// Iterates over all virtual nodes of a four-node ring from a single thread
/// and traces each one (name, node, replica owners). There are no assertions;
/// the test passes when the iteration completes.
#[test]
fn test_iter_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    debug!("ring: {}", ring);
    // The iterator borrows the guard, so it has to live for the whole traversal.
    let guard = pin();
    ring.iter(&guard).for_each(|vn| {
        trace!("vn = {}", vn);
        trace!("vn.name = {:x?}", vn.name);
        trace!("vn.node = {}", vn.node);
        trace!("vn.replica_owners = {:?}", vn.replica_owners());
    });
    Ok(())
}
fn test_iter_multithr_01_with_hasher<H: Hasher + Send + Sync + 'static>(h: H) -> Result<()> {
const VNODES_PER_NODE: Vnid = 3;
const REPLICATION_FACTOR: u8 = 3;
init();
let ring = HashRing::with_hasher(h, VNODES_PER_NODE, REPLICATION_FACTOR)?;
const NUM_NODES: usize = 4;
for node_id in 0..NUM_NODES {
let n = Arc::new(format!("Node-{}", node_id));
ring.insert(&[n])?;
}
debug!("ring: {}", ring);
let ring = Arc::new(ring);
let r1 = Arc::clone(&ring);
let r2 = Arc::clone(&ring);
let t1 = thread::spawn(move || {
trace!("START: r1.len_virtual_nodes() = {}", r1.len_virtual_nodes());
thread::sleep(Duration::from_millis(100));
const TOTAL_NODES: usize = 20;
for node_id in NUM_NODES..TOTAL_NODES {
let n = Arc::new(format!("Node-{}", node_id));
if let Err(err) = r1.insert(&[n]) {
match err {
HashRingError::ConcurrentModification => {
warn!("{:?}", err);
}
_ => {
error!("{:?}", err);
}
};
};
}
trace!("END: r1.len_virtual_nodes() = {}", r1.len_virtual_nodes());
});
let t2 = thread::spawn(move || {
debug!("START: r2.len_vnodes() = {}", r2.len_virtual_nodes());
let guard = &pin();
let hashring_iter = r2.iter(guard).enumerate();
thread::sleep(Duration::from_millis(1000));
let mut count = 0;
for (i, vn) in hashring_iter {
debug!("ITERATION {}: {}", i, vn);
count += 1;
}
assert_eq!(count, NUM_NODES * VNODES_PER_NODE as usize);
trace!("END: r2.len_virtual_nodes() = {}", r2.len_virtual_nodes());
trace!("END: r2 = {}", r2);
});
t1.join().unwrap();
t2.join().unwrap();
Ok(())
}
/// Runs the concurrent iteration scenario with `DefaultStdHasher`.
#[test]
fn test_iter_multithr_01_stdhash() -> Result<()> {
    let hasher = DefaultStdHasher::default();
    test_iter_multithr_01_with_hasher(hasher)
}
/// Runs the concurrent iteration scenario with `Blake3Hasher`
/// (only compiled when the `blake3-hash` feature is enabled).
#[cfg(feature = "blake3-hash")]
#[test]
fn test_iter_multithr_01_blake3() -> Result<()> {
    let hasher = Blake3Hasher::default();
    test_iter_multithr_01_with_hasher(hasher)
}
/// Runs the concurrent iteration scenario with `Blake2bHasher`
/// (only compiled when the `blake2b-hash` feature is enabled).
#[cfg(feature = "blake2b-hash")]
#[test]
fn test_iter_multithr_01_blake2b() -> Result<()> {
    let hasher = Blake2bHasher::default();
    test_iter_multithr_01_with_hasher(hasher)
}
/// Verifies that reverse iteration yields the same virtual nodes as forward
/// iteration, in opposite order.
///
/// The ring is first traversed with `.rev()` and the items are collected;
/// then a forward traversal compares item `i` with the collected item at
/// index `TOTAL - i - 1`.
#[test]
fn test_iter_singlethr_02() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    const TOTAL: usize = NUM_NODES * VNODES_PER_NODE as usize;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    debug!("ring: {}", ring);
    let guard = &pin();

    let reversed: Vec<_> = ring
        .iter(guard)
        .rev()
        .inspect(|vn| {
            trace!("vn = {}", vn);
            trace!("vn.name = {:x?}", vn.name);
            trace!("vn.node = {}", vn.node);
            trace!("vn.replica_owners = {:?}", vn.replica_owners());
        })
        .collect();

    for (i, vn) in ring.iter(guard).enumerate() {
        let mirrored = TOTAL - i - 1;
        trace!("comparing vn-{} to vns[{}]", i, mirrored);
        assert_eq!(&vn, reversed.get(mirrored).expect("OOPS"));
    }
    Ok(())
}
/// Consumes an iterator alternately from the front and from the back after
/// the ring has been modified.
///
/// The iterator is created while the ring holds four nodes; four more nodes
/// are inserted afterwards. Every item the iterator yields must have been
/// part of the original traversal (the position lookup is unwrapped), and the
/// total number of items must equal the original virtual-node count.
#[test]
fn test_iter_singlethr_03() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    trace!("ring: {}", ring);
    let guard = &pin();

    // Remember the forward position of every virtual node currently in the ring.
    let positions: HashMap<_, _> = ring
        .iter(guard)
        .enumerate()
        .map(|(i, vn)| (vn, i))
        .collect();

    // Create the iterator under test before the ring is modified.
    let mut iter = ring.iter(guard);
    for id in 100..100 + NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }

    let mut times = 0;
    let mut front = true;
    while let Some(vn) = if front { iter.next() } else { iter.next_back() } {
        trace!("(vnode {:2}) : {}", positions.get(vn).unwrap(), vn);
        front = !front;
        times += 1;
    }
    assert_eq!(times, NUM_NODES * VNODES_PER_NODE as usize);
    Ok(())
}
/// Checks the iterator's size reporting and its behaviour after exhaustion.
///
/// * `count()` must agree with `len()`.
/// * `size_hint()` must be exact for the plain iterator, and must combine as
///   expected through `filter` and `chain`.
/// * Once the iterator has been drained, 1000 further calls to `next()` must
///   all return `None`.
#[test]
fn test_iter_singlethr_04() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 3;
    const REPLICATION_FACTOR: u8 = 3;
    const NUM_NODES: usize = 4;
    const TOTAL_VNODES: usize = NUM_NODES * VNODES_PER_NODE as usize;
    const ITERS: usize = 1000;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;
    for id in 0..NUM_NODES {
        ring.insert(&[Arc::new(format!("Node-{}", id))])?;
    }
    trace!("ring: {}", ring);
    let guard = &pin();
    assert_eq!(ring.iter(guard).count(), ring.iter(guard).len());

    // Plain iterator: both bounds are exact.
    let plain = ring.iter(guard);
    trace!("iter.size_hint() = {:?}", plain.size_hint());
    assert_eq!(plain.size_hint(), (TOTAL_VNODES, Some(TOTAL_VNODES)));

    // Filtering drops the lower bound to zero but keeps the upper bound.
    let filtered = plain.filter(|&vn| vn.name.last() == Some(&42));
    trace!("iter.size_hint() = {:?}", filtered.size_hint());
    assert_eq!(filtered.size_hint(), (0, Some(TOTAL_VNODES)));

    // Chaining a second full traversal adds its bounds to the filtered ones.
    let chained = filtered.chain(ring.iter(guard));
    trace!("iter.size_hint() = {:?}", chained.size_hint());
    assert_eq!(chained.size_hint(), (TOTAL_VNODES, Some(2 * TOTAL_VNODES)));

    // Drain an iterator, then keep polling it.
    let mut iter = ring.iter(guard);
    while iter.next().is_some() {}
    let nonez: Vec<_> = (0..ITERS).map(|_| iter.next()).collect();
    assert!(nonez.iter().all(|none| none.is_none()));
    trace!("all {} entries are None!", nonez.len());
    Ok(())
}
/// Exercises `extend()` on a ring.
///
/// Extending a three-node ring with two new nodes must raise the node and
/// virtual-node counts accordingly. Extending again with the same two nodes
/// is expected to panic, which is observed through `catch_unwind`.
#[test]
fn test_extend_singlethr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 2;
    init();
    let nodes: Vec<Arc<str>> = vec![Arc::from("Node1"), Arc::from("Node2"), Arc::from("Node3")];
    let initial = nodes.len();
    let mut ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &nodes)?;
    trace!("ring = {}", ring);
    assert_eq!(ring.len_nodes(), initial);
    assert_eq!(ring.len_virtual_nodes(), initial * VNODES_PER_NODE as usize);

    ring.extend(vec![Arc::from("Node11"), Arc::from("Node12")]);
    trace!("ring = {}", ring);
    let extended = initial + 2;
    assert_eq!(ring.len_nodes(), extended);
    assert_eq!(ring.len_virtual_nodes(), extended * VNODES_PER_NODE as usize);

    // The ring is moved into the closure; a second extend with the same nodes must panic.
    let res = std::panic::catch_unwind(move || {
        ring.extend(vec![Arc::from("Node11"), Arc::from("Node12")])
    });
    assert!(res.is_err());
    Ok(())
}
/// Stress test with `NUM_THREADS` threads contending on one ring.
///
/// Phases:
/// 1. Every thread inserts its own disjoint chunk of `ITERS` nodes, retrying
///    each insertion until it succeeds.
/// 2. Reader threads iterate the ring and assert that the number of virtual
///    nodes matches the expected total, while the main thread checks the
///    per-thread result sets and the ring's counters.
/// 3. Every thread removes its own chunk again, after which the ring must be
///    empty.
///
/// A panic in any spawned thread (for example a failed assertion in a reader)
/// is logged and then re-raised on the main thread, so it fails the test
/// instead of being silently dropped.
#[test]
fn test_contention_multithr_01() -> Result<()> {
    const VNODES_PER_NODE: Vnid = 4;
    const REPLICATION_FACTOR: u8 = 3;
    const ITERS: usize = 50;
    const NUM_THREADS: usize = 10;
    init();
    let ring = HashRing::with_nodes(VNODES_PER_NODE, REPLICATION_FACTOR, &[])?;

    // Applies `op` to the nodes `Node-{tid * ITERS}` .. `Node-{(tid + 1) * ITERS - 1}`
    // and returns the ids that were completed.
    let chunk_operation =
        |op: Update, tid: usize, ring: Arc<HashRing<String, DefaultStdHasher>>| {
            let verb = match op {
                Update::Insert => "adding",
                Update::Remove => "removing",
            };
            let mut completed_nodes = HashSet::new();
            for node_id in tid * ITERS..(tid + 1) * ITERS {
                let n = Arc::new(format!("Node-{}", node_id));
                trace!("[{}] {} {:?}...", tid, verb, n);
                // Retry until the operation goes through.
                // NOTE(review): errors other than `ConcurrentModification` are also
                // retried; a persistent error would make this loop spin forever.
                while let Err(err) = match op {
                    Update::Insert => ring.insert(&[Arc::clone(&n)]),
                    Update::Remove => ring.remove(&[Arc::clone(&n)]),
                } {
                    match err {
                        HashRingError::ConcurrentModification => {
                            trace!("[{}] failed on Node-{}: {:?}", tid, node_id, err);
                        }
                        _ => {
                            warn!("[{}] failed on Node-{}: {:?}", tid, node_id, err);
                        }
                    }
                }
                trace!("[{}] progressed on Node-{}", tid, node_id);
                let _ = completed_nodes.insert(node_id);
            }
            completed_nodes
        };

    let ring = Arc::new(ring);

    // Phase 1: concurrent insertions.
    let mut handles = Vec::with_capacity(NUM_THREADS);
    let mut sets = Vec::with_capacity(NUM_THREADS);
    for tid in 0..NUM_THREADS {
        let r = Arc::clone(&ring);
        handles.push(thread::spawn(move || {
            chunk_operation(Update::Insert, tid, r)
        }));
    }
    for (tid, handle) in handles.into_iter().enumerate() {
        match handle.join() {
            Ok(s) => {
                trace!("[main] thread {} was successfully joined", tid);
                assert_eq!(s.len(), ITERS);
                sets.push(s);
            }
            Err(payload) => {
                error!("[main] error joining thread {}: {:?}", tid, payload);
                std::panic::resume_unwind(payload);
            }
        };
    }

    // Phase 2: concurrent readers; all writers have been joined, so the ring is stable.
    let mut handles = Vec::with_capacity(NUM_THREADS);
    for _ in 0..NUM_THREADS {
        let r = Arc::clone(&ring);
        handles.push(thread::spawn(move || {
            let guard = &pin();
            assert_eq!(
                r.iter(guard).count(),
                NUM_THREADS * ITERS * VNODES_PER_NODE as usize
            );
            assert_eq!(r.iter(guard).count(), r.len_virtual_nodes(),);
        }));
    }

    // The per-thread result sets must be pairwise disjoint.
    sets.iter().enumerate().for_each(|(i, si)| {
        sets.iter().enumerate().for_each(|(j, sj)| {
            if i == j {
                assert!(si.is_subset(sj) && si.is_superset(sj));
            } else {
                assert!(si.is_disjoint(sj));
            }
        });
    });
    // Together they must cover exactly the ids 0..NUM_THREADS * ITERS.
    let union: BTreeSet<_> = sets.iter().flatten().collect();
    assert_eq!(union.len(), NUM_THREADS * ITERS);
    union
        .iter()
        .zip(0..NUM_THREADS * ITERS)
        .for_each(|(&&id, i)| {
            assert_eq!(id, i);
        });
    trace!("[main] {} distinct nodes have been inserted", union.len());
    trace!("[main] len_nodes() = {}", ring.len_nodes());
    trace!("[main] len_virtual_nodes() = {}", ring.len_virtual_nodes());
    assert_eq!(union.len(), ring.len_nodes());
    assert_eq!(
        union.len() * VNODES_PER_NODE as usize,
        ring.len_virtual_nodes()
    );
    trace!("[main] ring = {}", ring);
    for (tid, handle) in handles.into_iter().enumerate() {
        if let Err(payload) = handle.join() {
            error!("[main] error joining thread {}: {:?}", tid, payload);
            // Surface assertion failures raised inside the reader thread.
            std::panic::resume_unwind(payload);
        }
    }

    // Phase 3: concurrent removals.
    let mut handles = Vec::with_capacity(NUM_THREADS);
    for tid in 0..NUM_THREADS {
        let r = Arc::clone(&ring);
        handles.push(thread::spawn(move || {
            chunk_operation(Update::Remove, tid, r)
        }));
    }
    for (tid, handle) in handles.into_iter().enumerate() {
        match handle.join() {
            Ok(s) => {
                trace!("[main] thread {} was successfully joined", tid);
                assert_eq!(s.len(), ITERS);
            }
            Err(payload) => {
                error!("[main] error joining thread {}: {:?}", tid, payload);
                std::panic::resume_unwind(payload);
            }
        };
    }
    assert_eq!(0, ring.len_virtual_nodes());
    assert_eq!(0, ring.len_nodes());
    trace!("[main] ring = {}", ring);
    Ok(())
}