consistent_hashing_aa/
consistent_hashing.rs1use std::{collections::{BTreeMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}};
2
3use crate::transaction::Transaction;
4
5pub struct ConsistentHashing {
6 pub ring: BTreeMap<u64, String>,
7 pub nodes: HashSet<String>,
8 pub virtual_nodes_count: u32,
9}
10
11#[derive(Debug)]
12pub enum ConsistentHashingError {
13 NodeAlreadyExists(String),
14 NodeDoesNotExist(String),
15 RingIsEmpty(String),
16 ZeroVirtualNodes(String),
17 UnchangedVirtualNodeCount(String)
18}
19
20impl ConsistentHashing {
21
22 pub fn new(virtual_nodes_count: u32) -> Self {
23 return ConsistentHashing {
24 ring: BTreeMap::new(),
25 nodes: HashSet::new(),
26 virtual_nodes_count,
27 };
28 }
29
30 pub fn new_with_nodes(virtual_nodes_count: u32, nodes: Vec<String>) -> Self {
31 let mut consistent_hashing = ConsistentHashing::new(virtual_nodes_count);
32 for node in nodes {
33 match consistent_hashing.add_node(&node) {
34 Ok(_) => (),
35 Err(_) => panic!("Node already exists")
36 };
37 }
38 return consistent_hashing;
39 }
40
41 pub fn get_virtual_node_form(&self, node: &str, i: u32) -> String {
42 return format!("{}-{}", node, i);
43 }
44
45 pub fn get_current_state(&self) -> Vec<(u64, String)> {
46 let mut x: Vec<(u64, String)> = self.ring.iter().map(|(k, v)| (*k, v.clone())).collect();
47 x.sort_by(|a, b| a.0.cmp(&b.0));
48 return x;
49 }
50
51 pub fn hash<U: Hash>(&self, item: &U) -> u64 {
52 let mut hasher = DefaultHasher::default();
54 item.hash(&mut hasher);
56 return hasher.finish();
57 }
58
59 pub fn get_previous_node(&self, node: &str) -> Option<(&u64, &String)> {
60
61 let hashed_value = self.hash(&node.to_string());
62 if let Some(prev) = self.ring.range(..hashed_value).next_back() {
63 return Some(prev);
64 }
65 return self.ring.iter().next_back().clone();
66 }
67
68 pub fn get_previous_node_by_hash(&self, hash: u64) -> Option<(&u64, &String)> {
69 if let Some(prev) = self.ring.range(..hash).next_back() {
70 return Some(prev);
71 }
72 return self.ring.iter().next_back().clone();
73 }
74
75 pub fn get_next_node(&self, node: &str) -> Option<(&u64, &String)> {
76 let hashed_value = self.hash(&node.to_string());
77 if let Some(prev) = self.ring.range(hashed_value..).skip(1).next() {
78 return Some(prev);
79 }
80 return self.ring.iter().next().clone();
81 }
82
83 pub fn get_next_node_by_hash(&self, hash: u64) -> Option<(&u64, &String)> {
84 if let Some(prev) = self.ring.range(hash..).skip(1).next() {
85 return Some(prev);
86 }
87 return self.ring.iter().next().clone();
88 }
89
90 pub fn add_node(&mut self, node: &str) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
92 if self.nodes.contains(node) {
93 return Err(ConsistentHashingError::NodeAlreadyExists("This node already exist".to_string()));
94 }
95
96 if self.virtual_nodes_count == 0 {
97 return Err(ConsistentHashingError::ZeroVirtualNodes("Cannot add node with zero virtual nodes".to_string()));
98 }
99 self.nodes.insert(node.to_string());
100 let mut transactions = Vec::with_capacity(self.virtual_nodes_count as usize);
101 for i in 0..self.virtual_nodes_count {
102
103 let v_node = self.get_virtual_node_form(node, i);
104 let hash = self.hash(&v_node);
105 self.ring.insert(hash, node.to_string());
106
107 let next = self.get_next_node_by_hash(hash).unwrap();
108 if next.1 != node {
109 let prev = self.get_previous_node_by_hash(hash).unwrap();
110 let new_transaction = Transaction::new(
111 next.1.to_string(),
112 node.to_string(),
113 *prev.0,
114 hash
115 );
116 transactions.push(new_transaction);
117 }
118 }
119 return Ok(transactions);
120 }
121
122 pub fn remove_node(&mut self, node: &str) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
123 if !self.nodes.contains(node) {
124 return Err(ConsistentHashingError::NodeDoesNotExist("This node doesn't exist".to_string()));
125 }
126
127 let mut transactions = vec![];
128 self.nodes.remove(node);
129
130 println!("removing: {}", node);
131
132 for i in 0..self.virtual_nodes_count {
133 let v_node = self.get_virtual_node_form(node, i);
134 let hash = self.hash(&v_node);
135
136 let next = self.get_next_node_by_hash(hash).unwrap();
137 if next.1 != node {
138 let prev = self.get_previous_node_by_hash(hash).unwrap();
139 let new_transaction = Transaction::new(
140 node.to_string(),
141 next.1.to_string(),
142 *prev.0,
143 hash
144 );
145 transactions.push(new_transaction);
146 }
147 self.ring.remove(&hash);
148 }
149 return Ok(transactions);
150 }
151
152 pub fn set_virtual_nodes_count(&mut self, count: u32) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
153
154 if count == 0 {
155 return Err(ConsistentHashingError::ZeroVirtualNodes("Cannot set virtual nodes count to 0".to_string()));
156 }
157
158 if count == self.virtual_nodes_count {
159 return Err(ConsistentHashingError::UnchangedVirtualNodeCount("New virtual nodes count is same as current".to_string()));
160 }
161
162 let mut transactions = vec![];
163 let diff: i32 = count as i32 - self.virtual_nodes_count as i32;
164
165 if diff > 0 {
166 for node in &self.nodes {
168 for i in self.virtual_nodes_count..count {
169
170 let v_node = self.get_virtual_node_form(node, i);
171 println!("adding v_node: {}", v_node);
172 let hash = self.hash(&v_node);
173
174 self.ring.insert(hash, node.to_string());
175 let prev = self.get_previous_node_by_hash(hash).unwrap();
176 let next = self.get_next_node_by_hash(hash).unwrap();
177
178 if next.1 != node {
179 let transaction = Transaction::new(
180 next.1.to_string(),
181 node.to_string(),
182 *prev.0,
183 hash
184 );
185 println!("{} with hash: {}", v_node, hash);
186 println!("trans {:?}", transaction);
187 transactions.push(transaction);
188 }
189
190
191 let state = self.get_current_state();
192 for (h, n) in state {
193 println!("{}: {}", n, h);
194 }
195 }
196 }
197 }
198 else {
199 for node in &self.nodes {
201 for i in (count..self.virtual_nodes_count).rev() {
202
203 let state = self.get_current_state();
204 for (h, n) in state {
205 println!("{}: {}", n, h);
206 }
207
208 let v_node = self.get_virtual_node_form(node, i);
209 let hash = self.hash(&v_node);
210
211 let prev = self.get_previous_node_by_hash(hash).unwrap();
212 let next = self.get_next_node_by_hash(hash).unwrap();
213
214 if next.1 != node {
215 let transaction = Transaction::new(
216 node.to_string(),
217 next.1.to_string(),
218 *prev.0,
219 hash
220 );
221 println!("{} with hash: {}", v_node, hash);
222 println!("trans {:?}", transaction);
223 transactions.push(transaction);
224 }
225
226 self.ring.remove(&hash);
227 }
228 }
229 }
230
231 self.virtual_nodes_count = count;
232 return Ok(transactions);
233 }
234
235 pub fn get_node<U: Hash>(&self, key: &U) -> (Option<&String>, Option<u64>) {
236 if self.ring.is_empty() {
237 return (None, None);
238 }
239 let hash = self.hash(key);
240 println!("key hash: {}", hash);
241 let node = self.ring
242 .range(hash..)
243 .next()
244 .or_else(|| self.ring.iter().next());
245 return (Some(node.unwrap().1), Some(hash));
246
247 }
248
249}