consistent_hashing_aa/
consistent_hashing.rs

1use std::{collections::{BTreeMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}};
2
3use crate::transaction::Transaction;
4
/// A consistent-hashing ring that places every physical node on the ring
/// several times ("virtual nodes").
///
/// Each virtual node is identified by the label `"{node}-{i}"`; the hash of
/// that label is the key in `ring` and the value is the physical node's name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsistentHashing {
    /// Ring positions in ascending hash order: virtual-node hash -> physical node name.
    pub ring: BTreeMap<u64, String>,
    /// Names of the physical nodes currently registered on the ring.
    pub nodes: HashSet<String>,
    /// Number of ring entries (virtual nodes) created for each physical node.
    pub virtual_nodes_count: u32,
}
10
/// Errors reported by the ring-mutating operations of `ConsistentHashing`.
///
/// Every variant carries a human-readable message describing the failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsistentHashingError {
    /// The node being added is already registered.
    NodeAlreadyExists(String),
    /// The node being removed is not registered.
    NodeDoesNotExist(String),
    /// An operation required a non-empty ring.
    RingIsEmpty(String),
    /// A virtual-node count of zero was supplied or is configured.
    ZeroVirtualNodes(String),
    /// The requested virtual-node count equals the current one.
    UnchangedVirtualNodeCount(String),
}

impl std::fmt::Display for ConsistentHashingError {
    /// Writes the message carried by the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Listing every variant (no wildcard) makes adding a variant a compile error here.
        let (Self::NodeAlreadyExists(message)
        | Self::NodeDoesNotExist(message)
        | Self::RingIsEmpty(message)
        | Self::ZeroVirtualNodes(message)
        | Self::UnchangedVirtualNodeCount(message)) = self;
        f.write_str(message)
    }
}

impl std::error::Error for ConsistentHashingError {}
19
20impl ConsistentHashing {
21    
22    pub fn new(virtual_nodes_count: u32) -> Self {
23        return ConsistentHashing {
24            ring: BTreeMap::new(),
25            nodes: HashSet::new(),
26            virtual_nodes_count,
27        };
28    }
29
30    pub fn new_with_nodes(virtual_nodes_count: u32, nodes: Vec<String>) -> Self {
31        let mut consistent_hashing = ConsistentHashing::new(virtual_nodes_count);
32        for node in nodes {
33            match consistent_hashing.add_node(&node) {
34                Ok(_) => (),
35                Err(_) => panic!("Node already exists")
36            };
37        }
38        return consistent_hashing;
39    }
40
41    pub fn get_virtual_node_form(&self, node: &str, i: u32) -> String {
42        return format!("{}-{}", node, i);
43    }
44
45    pub fn get_current_state(&self) -> Vec<(u64, String)> {
46        let mut x: Vec<(u64, String)> = self.ring.iter().map(|(k, v)| (*k, v.clone())).collect();
47        x.sort_by(|a, b| a.0.cmp(&b.0));
48        return x;
49    }
50
51    pub fn hash<U: Hash>(&self, item: &U) -> u64 {
52        // let begin = Instant::now();
53        let mut hasher = DefaultHasher::default();
54        // println!("Hashing took {:?}", begin.elapsed());
55        item.hash(&mut hasher);
56        return hasher.finish();
57    }
58
59    pub fn get_previous_node(&self, node: &str) -> Option<(&u64, &String)> {
60        
61        let hashed_value = self.hash(&node.to_string());
62        if let Some(prev) = self.ring.range(..hashed_value).next_back() {
63            return Some(prev);
64        }
65        return self.ring.iter().next_back().clone();
66    }
67
68    pub fn get_previous_node_by_hash(&self, hash: u64) -> Option<(&u64, &String)> {
69        if let Some(prev) = self.ring.range(..hash).next_back() {
70            return Some(prev);
71        }
72        return self.ring.iter().next_back().clone();
73    }
74
75    pub fn get_next_node(&self, node: &str) -> Option<(&u64, &String)> {
76        let hashed_value = self.hash(&node.to_string());
77        if let Some(prev) = self.ring.range(hashed_value..).skip(1).next() {
78            return Some(prev);
79        }
80        return self.ring.iter().next().clone();
81    }
82
83    pub fn get_next_node_by_hash(&self, hash: u64) -> Option<(&u64, &String)> {
84        if let Some(prev) = self.ring.range(hash..).skip(1).next() {
85            return Some(prev);
86        }
87        return self.ring.iter().next().clone();
88    }
89
90    /// hashes nodex-i ...
91    pub fn add_node(&mut self, node: &str) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
92        if self.nodes.contains(node) {
93            return Err(ConsistentHashingError::NodeAlreadyExists("This node already exist".to_string()));
94        }
95
96        if self.virtual_nodes_count == 0 {
97            return Err(ConsistentHashingError::ZeroVirtualNodes("Cannot add node with zero virtual nodes".to_string()));
98        }    
99        self.nodes.insert(node.to_string());
100        let mut transactions = Vec::with_capacity(self.virtual_nodes_count as usize);
101        for i in 0..self.virtual_nodes_count {
102            
103            let v_node = self.get_virtual_node_form(node, i);
104            let hash = self.hash(&v_node);
105            self.ring.insert(hash, node.to_string());
106
107            let next = self.get_next_node_by_hash(hash).unwrap();
108            if next.1 != node {
109                let prev = self.get_previous_node_by_hash(hash).unwrap();
110                let new_transaction = Transaction::new(
111                    next.1.to_string(),
112                    node.to_string(),
113                    *prev.0,
114                    hash
115                );
116                transactions.push(new_transaction);
117            }
118        }
119        return Ok(transactions);
120    }
121
122    pub fn remove_node(&mut self, node: &str) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
123        if !self.nodes.contains(node) {
124            return Err(ConsistentHashingError::NodeDoesNotExist("This node doesn't exist".to_string()));
125        }
126
127        let mut transactions = vec![];
128        self.nodes.remove(node);
129
130        println!("removing: {}", node);
131
132        for i in 0..self.virtual_nodes_count {
133            let v_node = self.get_virtual_node_form(node, i);
134            let hash = self.hash(&v_node);
135
136            let next = self.get_next_node_by_hash(hash).unwrap();
137            if next.1 != node {
138                let prev = self.get_previous_node_by_hash(hash).unwrap();
139                let new_transaction = Transaction::new(
140                    node.to_string(),
141                    next.1.to_string(),
142                    *prev.0,
143                    hash
144                );
145                transactions.push(new_transaction);
146            }
147            self.ring.remove(&hash);
148        }
149        return Ok(transactions);
150    }
151
152    pub fn set_virtual_nodes_count(&mut self, count: u32) -> Result<Vec<Transaction<String, u64>>, ConsistentHashingError> {
153        
154        if count == 0 {
155            return Err(ConsistentHashingError::ZeroVirtualNodes("Cannot set virtual nodes count to 0".to_string()));
156        }
157
158        if count == self.virtual_nodes_count {
159            return Err(ConsistentHashingError::UnchangedVirtualNodeCount("New virtual nodes count is same as current".to_string()));
160        }
161
162        let mut transactions = vec![];
163        let diff: i32 = count as i32 - self.virtual_nodes_count as i32;
164
165        if diff > 0 {
166            // add nodes
167            for node in &self.nodes {
168                for i in self.virtual_nodes_count..count {
169
170                    let v_node = self.get_virtual_node_form(node, i);
171                    println!("adding v_node: {}", v_node);
172                    let hash = self.hash(&v_node);
173
174                    self.ring.insert(hash, node.to_string());
175                    let prev = self.get_previous_node_by_hash(hash).unwrap();
176                    let next = self.get_next_node_by_hash(hash).unwrap();
177
178                    if next.1 != node {
179                        let transaction = Transaction::new(
180                            next.1.to_string(),
181                            node.to_string(),
182                            *prev.0,
183                            hash
184                        );
185                        println!("{} with hash: {}", v_node, hash);
186                        println!("trans {:?}", transaction);
187                        transactions.push(transaction);
188                    }
189
190
191                    let state = self.get_current_state();
192                    for (h, n) in state {
193                        println!("{}: {}", n, h);
194                    }
195                }
196            }
197        }
198        else {
199            // remove nodes
200            for node in &self.nodes {
201                for i in (count..self.virtual_nodes_count).rev() {
202                    
203                    let state = self.get_current_state();
204                    for (h, n) in state {
205                        println!("{}: {}", n, h);
206                    }
207                    
208                    let v_node = self.get_virtual_node_form(node, i);
209                    let hash = self.hash(&v_node);
210
211                    let prev = self.get_previous_node_by_hash(hash).unwrap();
212                    let next = self.get_next_node_by_hash(hash).unwrap();
213
214                    if next.1 != node {
215                        let transaction = Transaction::new(
216                            node.to_string(), 
217                            next.1.to_string(),
218                            *prev.0,
219                            hash
220                        );
221                        println!("{} with hash: {}", v_node, hash);
222                        println!("trans {:?}", transaction);
223                        transactions.push(transaction);
224                    }
225
226                    self.ring.remove(&hash);
227                }
228            }
229        }
230
231        self.virtual_nodes_count = count;
232        return Ok(transactions);
233    }
234
235    pub fn get_node<U: Hash>(&self, key: &U) -> (Option<&String>, Option<u64>) {
236        if self.ring.is_empty() {
237            return (None, None);
238        }
239        let hash = self.hash(key);
240        println!("key hash: {}", hash);
241        let node = self.ring
242            .range(hash..)
243            .next()
244            .or_else(|| self.ring.iter().next());
245        return (Some(node.unwrap().1), Some(hash));
246            
247    }
248
249}