dbx_core/sharding/
node_ring.rs1use crate::sharding::router::{ShardNode, fnv1a_hash};
4use std::collections::BTreeMap;
5
/// Ring of virtual nodes used to map a hash value to a node id.
#[derive(Default, Debug)]
pub struct NodeRing {
    /// Ring positions: virtual-node hash mapped to the id of the owning node.
    ring: BTreeMap<u64, usize>,
    /// Base number of virtual nodes per node, scaled by the node's weight
    /// when the node is added.
    vnodes_per_node: usize,
}
14
15impl NodeRing {
16 pub fn new(vnodes_per_node: usize) -> Self {
18 Self {
19 ring: BTreeMap::new(),
20 vnodes_per_node,
21 }
22 }
23
24 pub fn add_node(&mut self, node: &ShardNode) {
28 let vnode_count = ((self.vnodes_per_node as f64) * node.weight.max(0.1)) as usize;
29 for i in 0..vnode_count {
30 let vnode_key = format!("node:{}-vnode:{}", node.id, i);
31 let hash = fnv1a_hash(vnode_key.as_bytes());
32 self.ring.insert(hash, node.id);
33 }
34 }
35
36 pub fn remove_node(&mut self, node_id: usize) {
40 let max_vnodes = (self.vnodes_per_node as f64 * 10.0) as usize;
41 let mut to_remove = Vec::new();
42 for i in 0..max_vnodes {
43 let vnode_key = format!("node:{}-vnode:{}", node_id, i);
44 let hash = fnv1a_hash(vnode_key.as_bytes());
45 to_remove.push(hash);
46 }
47 for hash in to_remove {
48 self.ring.remove(&hash);
49 }
50 }
51
52 pub fn get_node(&self, hash_val: u64) -> Option<usize> {
54 if self.ring.is_empty() {
55 return None;
56 }
57
58 if let Some((_, &node_id)) = self.ring.range(hash_val..).next() {
60 Some(node_id)
61 } else {
62 let (_, &node_id) = self.ring.iter().next().unwrap();
64 Some(node_id)
65 }
66 }
67
68 pub fn is_empty(&self) -> bool {
70 self.ring.is_empty()
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ShardNode` from its three fields.
    fn make_node(id: usize, address: &'static str, weight: f64) -> ShardNode {
        ShardNode {
            id,
            address: address.into(),
            weight,
        }
    }

    /// Three equally weighted nodes should each receive a share of the keys
    /// that is within the allowed tolerance of an even split.
    #[test]
    fn test_node_ring_distribution() {
        let mut ring = NodeRing::new(300);
        let nodes = [
            make_node(0, "1", 1.0),
            make_node(1, "2", 1.0),
            make_node(2, "3", 1.0),
        ];
        for node in &nodes {
            ring.add_node(node);
        }

        let total_keys = 30000;
        let mut load_per_node = vec![0; 3];
        for key_index in 0..total_keys {
            let hash = fnv1a_hash(format!("data-{}", key_index).as_bytes());
            let owner = ring.get_node(hash).unwrap();
            load_per_node[owner] += 1;
        }

        // Each node's load may deviate from the even share by at most 70%.
        let expected = total_keys as f64 / 3.0;
        let tolerance = expected * 0.7;
        for (node, &load) in load_per_node.iter().enumerate() {
            let load = load as f64;
            assert!(
                (load - expected).abs() <= tolerance,
                "Node {} load {} is outside expected {} +- {}",
                node,
                load,
                expected,
                tolerance
            );
        }
    }

    /// Adding and removing nodes changes which node lookups resolve to, and
    /// removing the last node leaves the ring empty.
    #[test]
    fn test_node_ring_add_remove() {
        let mut ring = NodeRing::new(10);
        let node_a = make_node(100, "A", 1.0);
        let node_b = make_node(200, "B", 1.0);

        ring.add_node(&node_a);
        ring.add_node(&node_b);

        // With both nodes present a lookup must land on one of them.
        let owner = ring.get_node(12345).unwrap();
        assert!([100, 200].contains(&owner));

        ring.remove_node(100);

        // Only node 200 remains, so every lookup must resolve to it.
        for hash in 0..10_000 {
            assert_eq!(ring.get_node(hash).unwrap(), 200);
        }

        ring.remove_node(200);
        assert!(ring.is_empty());
        assert_eq!(ring.get_node(12345), None);
    }
}