conhash_ring/lib.rs
1#![doc = include_str!("../README.md")]
2
3#[macro_use]
4extern crate nestify;
5
6use ordermap::OrderSet;
7#[cfg(test)]
8use serde::{Deserialize, Serialize};
9
10use anyhow::{anyhow, bail, Result};
11use std::{
12 cell::RefCell,
13 collections::{BTreeMap, HashMap, HashSet},
14 fmt::Debug,
15 hash::{DefaultHasher, Hash, Hasher},
16 rc::Rc,
17};
18
19use bon::Builder;
20use digest::Digest;
21#[cfg(feature = "md5")]
22use md5::Md5;
23
24/// A trait that defines the behavior of a hashing function used in the consistent hashing ring.
25///
26/// Implementors of this trait must provide a `digest` method that takes a string input
27/// and returns a 64-bit hash value. This hash value is used to determine the placement
28/// of keys and nodes in the consistent hashing ring.
29pub trait RingHasherTrait {
30 /// Computes a 64-bit hash value for the given input string.
31 ///
32 /// # Arguments
33 /// - `data`: The input string to hash.
34 ///
35 /// # Returns
36 /// - `Result<u64>`: The computed hash value or an error if the hashing fails.
37 fn digest(&self, data: &str) -> Result<u64>;
38}
39
/// Built-in hashing strategies that implement [`RingHasherTrait`].
#[derive(Debug, Default)]
pub enum RingHasher {
    /// Hashes with the standard library's `DefaultHasher`. This is the default variant.
    #[default]
    Default_,
    /// Hashes with MD5, keeping the first eight bytes of the digest.
    /// Only available when the `md5` feature is enabled.
    #[cfg(feature = "md5")]
    Md5,
}
47
48impl RingHasherTrait for RingHasher {
49 fn digest(&self, data: &str) -> Result<u64> {
50 match self {
51 RingHasher::Default_ => {
52 let mut hasher = DefaultHasher::new();
53 hasher.write(data.as_bytes());
54
55 Ok(hasher.finish())
56 }
57 #[cfg(feature = "md5")]
58 RingHasher::Md5 => {
59 let hash = Md5::digest(data.as_bytes());
60 let slice = &hash[0..8];
61
62 Ok(u64::from_be_bytes(slice.try_into().map_err(|err| {
63 anyhow!("Failed to convert MD5 hash to u64: {}", err)
64 })?))
65 }
66 }
67 }
68}
69
/// Identifier of a virtual node (generated as `"<physical id>-<index>"`).
type VirtualId = String;
/// Position on the ring, as produced by a `RingHasherTrait` implementation.
type HashValue = u64;
72
73nest! {
74 /// Represents a physical node in the consistent hashing ring.
75 #[derive(Debug, Clone, PartialEq, Eq, Hash, Builder)]*
76 pub struct PhysicalNode {
77 /// A unique identifier for the physical node.
78 pub id: String,
79 /// The number of virtual nodes associated with this physical node.
80 pub num_vnodes: usize,
81 /// A map of virtual node IDs to their corresponding `VirtualNode` structs.
82 #[builder(default)]
83 pub vnodes: BTreeMap<VirtualId, pub struct VirtualNode {
84 /// The unique identifier for the virtual node.
85 pub id: String,
86 /// The hash value of the virtual node.
87 pub hash: u64,
88 /// A set of hash values associated with this virtual node.
89 #[builder(default)]
90 pub hashes: OrderSet<u64>,
91 }>,
92 /// A map of hash values to `Item` structs, representing the data stored on this node.
93 pub data: BTreeMap<
94 HashValue,
95 #[cfg_attr(test, derive(Serialize, Deserialize))]
96 pub struct Item {
97 /// The key of the item.
98 pub key: String,
99 /// The value of the item.
100 pub value: String,
101 }
102 >,
103 }
104}
105
106impl Item {
107 #[cfg(test)]
108 pub fn to_json(&self) -> serde_json::Value {
109 serde_json::to_value(self).unwrap()
110 }
111}
112
113impl PhysicalNode {
114 /// Initializes the virtual nodes for this physical node.
115 ///
116 /// This method generates virtual node IDs based on the physical node's ID and the number
117 /// of virtual nodes (`num_vnodes`). It computes a hash for each virtual node ID using
118 /// the provided hasher and creates a `VirtualNode` instance for each ID. The virtual
119 /// nodes are then added to the `vnodes` map of the physical node.
120 ///
121 /// # Arguments
122 /// - `hasher`: A reference to an object implementing the `RingHasherTrait` to compute hashes.
123 ///
124 /// # Returns
125 /// - `Result<()>`: Returns `Ok(())` if the virtual nodes are successfully initialized,
126 /// or an error if hashing fails.
127 pub fn init_vnodes<T: RingHasherTrait>(&mut self, hasher: &T) -> Result<()> {
128 let ids = (0..self.num_vnodes).map(|i| format!("{}-{}", self.id, i));
129
130 for id in ids {
131 let hash = hasher.digest(&id)?;
132 let vnode = VirtualNode::builder().id(id.clone()).hash(hash).build();
133 self.vnodes.insert(id, vnode);
134 }
135
136 Ok(())
137 }
138}
139
140/// Represents a consistent hashing ring.
141///
142/// This struct manages the consistent hashing ring, which is used to distribute data across multiple
143/// physical nodes. It supports features such as replication, virtual nodes, and dynamic addition/removal
144/// of nodes.
145///
146/// # Why `Rc<RefCell<>>` is used:
147/// - `Rc` (Reference Counted) is used to allow multiple ownership of `PhysicalNode` instances. This is
148/// necessary because multiple parts of the code may need to access the same physical node.
149/// - `RefCell` is used to enable interior mutability, allowing the `PhysicalNode` to be mutated even
150/// when it is wrapped in an `Rc`. This is important because there are cases that we need to borrow immutably and mutably in the same lifetime.
151#[derive(Debug, Builder)]
152#[builder(builder_type(doc {}))]
153pub struct ConsistentHashingRing<T: RingHasherTrait> {
154 /// The hashing function used to compute hash values for keys and nodes.
155 pub hasher: T,
156 /// The number of physical nodes each key is replicated to for fault tolerance.
157 pub replication_factor: usize,
158 /// A mapping of hash values to virtual node IDs.
159 pub hash_to_vid: BTreeMap<HashValue, String>,
160 /// A mapping of virtual node IDs to physical node IDs.
161 pub vid_to_pid: HashMap<String, String>,
162 /// A mapping of physical node IDs to their corresponding `PhysicalNode` instances.
163 pub physicals: HashMap<String, Rc<RefCell<PhysicalNode>>>,
164}
165
166nest! {
167 /// Represents the result of adding a physical node to the consistent hashing ring.
168 ///
169 /// This struct contains information about the virtual nodes created for the added physical node
170 /// and the number of keys reassigned to each virtual node.
171 #[derive(Debug, Default, Builder)]*
172 pub struct AddNodeResult {
173 /// A list of results for each virtual node created for the added physical node.
174 pub values: Vec<pub struct AddNodeResultValue {
175 /// The unique identifier of the virtual node.
176 pub id: String,
177 /// The hash value of the virtual node.
178 pub hash: u64,
179 /// The number of keys that were reassigned to this virtual node.
180 pub keys_added: usize,
181 }>,
182 }
183}
184
185nest! {
186 /// Represents the result of adding a key-value pair to the consistent hashing ring.
187 ///
188 /// This struct contains information about the key, its hash value, and the locations
189 /// (virtual nodes and their corresponding physical nodes) where the key was added.
190 #[derive(Debug, Builder)]*
191 pub struct AddKeyResult {
192 /// The key that was added to the ring.
193 pub key: String,
194 /// The hash value of the key.
195 pub hash: u64,
196 /// The locations where the key was added, represented as a list of virtual node and physical node pairs.
197 #[builder(default)]
198 pub locations: Vec<pub struct AddKeyResultLocation {
199 /// The unique identifier of the virtual node.
200 pub id: String,
201 /// The unique identifier of the physical node.
202 pub pid: String,
203 }>,
204 }
205}
206
207nest! {
208 /// Represents information about hash ranges in the consistent hashing ring.
209 ///
210 /// This struct contains details about the ranges of hash values managed by the ring,
211 /// including the number of keys and the items stored within each range.
212 #[derive(Debug, Builder)]*
213 #[cfg_attr(test, derive(Serialize, Deserialize))]*
214 pub struct RangeInfo {
215 /// A list of hash ranges managed by the ring.
216 pub ranges: Vec<pub struct Range {
217 /// The starting hash value of the range.
218 pub hash_start: u64,
219 /// The ending hash value of the range.
220 pub hash_end: u64,
221 /// The number of keys within the range.
222 pub count: usize,
223 /// The items stored within the range.
224 pub items: Vec<pub struct HashItem {
225 /// The hash value of the item.
226 pub hash: u64,
227 /// The actual item stored in the range.
228 pub inner: Item,
229 }>,
230 }>,
231 }
232}
233
234impl RangeInfo {
235 pub fn key_count(&self) -> usize {
236 self.ranges.iter().map(|r| r.count).sum()
237 }
238
239 #[cfg(test)]
240 pub fn to_json(&self) -> serde_json::Value {
241 serde_json::to_value(self).unwrap()
242 }
243
244 #[cfg(test)]
245 pub fn to_json_string_pretty(&self) -> String {
246 serde_json::to_string_pretty(self).unwrap()
247 }
248}
249
250nest! {
251 #[derive(Debug, Builder)]*
252 #[cfg_attr(test, derive(Serialize, Deserialize))]*
253 pub struct ItemsByVNode {
254 #[builder(default)]
255 pub values: Vec<pub struct ItemsByVNodeValue {
256 pub vid: String,
257 pub items: Vec<Item>,
258 }>,
259 }
260}
261
262impl ItemsByVNode {
263 #[cfg(test)]
264 pub fn to_json(&self) -> serde_json::Value {
265 serde_json::to_value(self).unwrap()
266 }
267
268 #[cfg(test)]
269 pub fn to_json_string_pretty(&self) -> String {
270 serde_json::to_string_pretty(self).unwrap()
271 }
272}
273
274impl<T: RingHasherTrait> ConsistentHashingRing<T> {
275 pub fn vid_to_physical(&self, vid: &str) -> Result<Rc<RefCell<PhysicalNode>>> {
276 let pid = self
277 .vid_to_pid
278 .get(vid)
279 .ok_or_else(|| anyhow!("Physical node not found, vid: {}", vid))?;
280
281 self.physicals
282 .get(pid)
283 .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))
284 .cloned()
285 }
286
287 /// Adds a physical node to the consistent hashing ring.
288 ///
289 /// This method initializes the virtual nodes for the given physical node, calculates their hash values,
290 /// and inserts them into the ring. It also redistributes data from the previous virtual nodes to the new
291 /// virtual nodes based on their hash ranges.
292 ///
293 /// # Steps
294 /// 1. **Initialize Virtual Nodes**: Generate virtual node IDs for the physical node and compute their hash values.
295 /// 2. **Insert Physical Node**: Add the physical node to the `physicals` map, wrapped in `Rc<RefCell>` for shared ownership.
296 /// 3. **Redistribute Data**:
297 /// - For each virtual node, find the previous virtual node in the ring.
298 /// - Move data from the hash range `[prev_hash, curr_hash]` of the previous virtual node to the new virtual node.
299 /// 4. **Update Mappings**:
300 /// - Update `hash_to_vid` to map the hash of the new virtual node to its ID.
301 /// - Update `vid_to_pid` to map the virtual node ID to the physical node ID.
302 /// 5. **Track Results**: Record the number of keys reassigned to each virtual node in the `AddNodeResult`.
303 ///
304 /// # Arguments
305 /// - `node`: The physical node to be added to the ring.
306 ///
307 /// # Returns
308 /// - `Result<AddNodeResult>`: A result containing information about the virtual nodes created and the
309 /// number of keys reassigned to each virtual node, or an error if the operation fails.
310 pub fn add_physical_node(&mut self, mut node: PhysicalNode) -> Result<AddNodeResult> {
311 // Clone the physical node's ID for later use.
312 let pid = node.id.clone();
313
314 // Initialize virtual nodes for the physical node using the hasher.
315 node.init_vnodes(&self.hasher)?;
316
317 // Wrap the physical node in an Rc<RefCell> for shared ownership and mutability.
318 let pnode = Rc::new(RefCell::new(node));
319 self.physicals.insert(pid.clone(), pnode.clone());
320
321 // Prepare the result object to track the operation's outcome.
322 let mut result = AddNodeResult::default();
323
324 // Iterate over the virtual nodes of the physical node.
325 for vnode in pnode.borrow().vnodes.values() {
326 let hash = vnode.hash;
327
328 // Find the previous hash in the ring that is less than the current hash.
329 let prev_hash = self.hash_to_vid.range(..hash).last();
330 let mut keys_added = 0;
331
332 // If a previous virtual node exists, move data from its range to the new virtual node.
333 if let Some((prev_hash, prev_vid)) = prev_hash {
334 let prev_node = self.vid_to_physical(prev_vid)?;
335
336 // Collect keys within the range [prev_hash, hash].
337 let keys_to_move = prev_node
338 .borrow()
339 .data
340 .range(prev_hash..=&hash)
341 .map(|(k, _)| *k)
342 .collect::<Vec<_>>();
343 keys_added += keys_to_move.len();
344
345 // Move the keys and their associated values to the new virtual node.
346 for key in keys_to_move {
347 if let Some(value) = prev_node.borrow_mut().data.remove(&key) {
348 pnode.borrow_mut().data.insert(key, value);
349 }
350 }
351 }
352
353 // Update the hash-to-virtual-node mapping.
354 self.hash_to_vid.insert(hash, vnode.id.clone());
355
356 // Update the virtual-node-to-physical-node mapping.
357 self.vid_to_pid
358 .insert(vnode.id.clone(), pnode.borrow().id.clone());
359
360 // Add the virtual node's result to the operation's result object.
361 result.values.push(
362 AddNodeResultValue::builder()
363 .id(vnode.id.clone())
364 .hash(hash)
365 .keys_added(keys_added)
366 .build(),
367 );
368 }
369
370 // Return the result of the operation.
371 Ok(result)
372 }
373
374 /// Removes a physical node from the consistent hashing ring.
375 ///
376 /// This method removes all virtual nodes associated with the given physical node ID (`pid`).
377 /// It redistributes the data stored in the virtual nodes of the removed physical node to the
378 /// next virtual nodes in the ring that belong to different physical nodes.
379 ///
380 /// # Steps
381 /// 1. Check if the `physicals` map is empty. If so, clear all mappings and return early.
382 /// 2. Retrieve the virtual node IDs (`vids`) associated with the physical node.
383 /// 3. Remove each virtual node using the `remove_vnode` method.
384 /// 4. Remove the physical node from the `physicals` map.
385 ///
386 /// # Arguments
387 /// - `pid`: The unique identifier of the physical node to be removed.
388 ///
389 /// # Returns
390 /// - `Result<()>`: Returns `Ok(())` if the physical node is successfully removed, or an error
391 /// if the physical node does not exist or if any operation fails.
392 pub fn remove_physical_node(&mut self, pid: &str) -> Result<()> {
393 // If there are no physical nodes, clear all mappings and return early.
394 if self.physicals.is_empty() {
395 self.hash_to_vid.clear(); // Clear the hash-to-virtual-node mapping.
396 self.vid_to_pid.clear(); // Clear the virtual-node-to-physical-node mapping.
397 return Ok(());
398 }
399
400 // Retrieve the virtual node IDs (`vids`) associated with the physical node.
401 let vids = {
402 let pnode = self
403 .physicals
404 .get(pid)
405 .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))? // Error if the physical node does not exist.
406 .clone();
407 let keys = pnode.borrow().vnodes.keys().cloned().collect::<Vec<_>>(); // Collect virtual node IDs.
408 keys
409 };
410
411 // Remove each virtual node using the `remove_vnode` method.
412 vids.iter().try_for_each(|vid| self.remove_vnode(vid))?;
413
414 // Remove the physical node from the `physicals` map.
415 self.physicals.remove(pid);
416
417 Ok(())
418 }
419
420 /// Inserts a key-value pair into the consistent hashing ring.
421 ///
422 /// This method computes the hash of the given key, determines the virtual nodes
423 /// where the key should be stored based on the replication factor, and inserts
424 /// the key-value pair into the appropriate physical nodes.
425 ///
426 /// # Steps
427 /// 1. Compute the hash of the key using the hasher.
428 /// 2. Determine the virtual nodes (`vids`) where the key should be stored.
429 /// 3. For each virtual node:
430 /// - Check if the physical node has already been used for replication.
431 /// - Insert the key-value pair into the virtual node and its corresponding physical node.
432 /// - Track the virtual node and physical node in the result.
433 /// 4. Stop once the replication factor is satisfied.
434 ///
435 /// # Arguments
436 /// - `key`: The key to be inserted into the ring.
437 /// - `value`: The value associated with the key.
438 ///
439 /// # Returns
440 /// - `Result<AddKeyResult>`: A result containing information about the virtual nodes
441 /// where the key was stored, or an error if the operation fails.
442 pub fn insert(&mut self, key: &str, value: &str) -> Result<AddKeyResult> {
443 // Compute the hash of the key.
444 let hash = self.hasher.digest(key)?;
445
446 // Prepare the result object to track the operation's outcome.
447 let mut result = AddKeyResult::builder()
448 .key(key.to_string())
449 .hash(hash)
450 .build();
451
452 // Track physical nodes to ensure no duplicate replication.
453 let mut pids = HashSet::new();
454
455 // Determine the virtual nodes (`vids`) where the key should be stored.
456 let vids: Vec<String> = self
457 .hash_to_vid
458 .range(hash..) // Start from the hash and wrap around the ring.
459 .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
460 .map(|(_, vid)| vid.clone())
461 .collect();
462
463 // Iterate over the virtual nodes to insert the key-value pair.
464 for vid in vids {
465 // Get the physical node corresponding to the virtual node.
466 let pnode = self.vid_to_physical(&vid)?;
467 let pid = pnode.borrow().id.clone();
468
469 // Get a mutable reference to the physical node.
470 let mut pnode_refmut = pnode.borrow_mut();
471
472 // Get the virtual node from the physical node.
473 let vnode = pnode_refmut
474 .vnodes
475 .get_mut(&vid)
476 .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?;
477
478 // Skip if this physical node has already been used for replication.
479 if pids.contains(&pid) {
480 continue;
481 }
482
483 // Insert the hash into the virtual node's hash set.
484 vnode.hashes.insert(hash);
485
486 // Insert the key-value pair into the physical node's data map.
487 pnode_refmut.data.insert(
488 hash,
489 Item::builder()
490 .key(key.to_string())
491 .value(value.to_string())
492 .build(),
493 );
494
495 // Track the physical node to avoid duplicate replication.
496 pids.insert(pid.clone());
497
498 // Add the virtual node and physical node to the result.
499 result.locations.push(
500 AddKeyResultLocation::builder()
501 .id(vid.clone())
502 .pid(pid.clone())
503 .build(),
504 );
505
506 // Stop once the replication factor is satisfied.
507 if pids.len() == self.replication_factor {
508 break;
509 }
510 }
511
512 // Return the result of the operation.
513 Ok(result)
514 }
515
516 /// Inserts multiple key-value pairs into the consistent hashing ring.
517 ///
518 /// This method iterates over a list of items, computes their hashes, and inserts
519 /// each key-value pair into the appropriate virtual nodes in the ring.
520 ///
521 /// # Steps
522 /// 1. Iterate over the list of items.
523 /// 2. For each item, call the `insert` method to add the key-value pair to the ring.
524 /// 3. Collect the results of each insertion into a vector.
525 ///
526 /// # Arguments
527 /// - `items`: A slice of `Item` structs, each containing a key and value to be inserted.
528 ///
529 /// # Returns
530 /// - `Result<Vec<AddKeyResult>>`: A vector of results for each inserted item, or an error
531 /// if any insertion fails.
532 pub fn insert_many(&mut self, items: &[Item]) -> Result<Vec<AddKeyResult>> {
533 // Initialize a vector to store the results of each insertion.
534 let mut results = Vec::new();
535
536 // Iterate over each item and insert it into the ring.
537 for item in items {
538 // Insert the key-value pair into the ring and collect the result.
539 let result = self.insert(&item.key, &item.value)?;
540 results.push(result);
541 }
542
543 // Return the results of all insertions.
544 Ok(results)
545 }
546
547 /// Retrieves an item from the consistent hashing ring by its key.
548 ///
549 /// This method computes the hash of the given key, determines the virtual nodes
550 /// where the key might be stored, and retrieves the associated value if it exists.
551 ///
552 /// # Steps
553 /// 1. Compute the hash of the key using the hasher.
554 /// 2. Iterate over the virtual nodes (`vids`) where the key might be stored.
555 /// 3. Check each physical node's data map for the key.
556 /// 4. Return the item if found, or an error if the key does not exist.
557 ///
558 /// # Arguments
559 /// - `key`: The key to retrieve from the ring.
560 ///
561 /// # Returns
562 /// - `Result<Item>`: The item associated with the key, or an error if the key is not found.
563 pub fn get_item(&self, key: &str) -> Result<Item> {
564 // Compute the hash of the key.
565 let hash = self.hasher.digest(key)?;
566
567 // Determine the virtual nodes (`vids`) where the key might be stored.
568 let vids: Vec<String> = self
569 .hash_to_vid
570 .range(hash..) // Start from the hash and wrap around the ring.
571 .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
572 .map(|(_, vid)| vid.clone())
573 .collect();
574
575 // Iterate over the virtual nodes to find the key.
576 for vid in vids {
577 let pnode = self.vid_to_physical(&vid)?;
578 let pnode_ref = pnode.borrow();
579
580 // Check if the key exists in the physical node's data map.
581 if let Some(item) = pnode_ref.data.get(&hash) {
582 return Ok(item.clone());
583 }
584 }
585
586 // Return an error if the key is not found.
587 bail!("Item not found for key: {}", key)
588 }
589
590 /// Removes a key-value pair from the consistent hashing ring.
591 ///
592 /// This method computes the hash of the given key, iterates over the virtual nodes
593 /// in both clockwise and anti-clockwise directions, and removes the key-value pair
594 /// from the appropriate physical nodes. The removal stops once the replication factor
595 /// is satisfied.
596 ///
597 /// # Steps
598 /// 1. Compute the hash of the key using the hasher.
599 /// 2. Iterate over the virtual nodes in both clockwise and anti-clockwise directions.
600 /// 3. For each virtual node:
601 /// - Check if the key exists in the virtual node's hash set.
602 /// - Remove the key-value pair from the physical node's data map.
603 /// - Track the number of successful removals.
604 /// 4. Stop once the replication factor is satisfied.
605 /// 5. Return an error if the key is not found or if the replication factor is not met.
606 ///
607 /// # Arguments
608 /// - `key`: The key to be removed from the ring.
609 ///
610 /// # Returns
611 /// - `Result<()>`: Returns `Ok(())` if the key is successfully removed, or an error
612 /// if the key does not exist or if the replication factor is not met.
613 pub fn remove(&mut self, key: &str) -> Result<()> {
614 // Compute the hash of the key.
615 let hash = self.hasher.digest(key)?;
616 let mut remove_count = 0;
617
618 // Define iterators for clockwise and anti-clockwise traversal of the ring.
619 let ring_iters = [
620 self.hash_to_vid
621 .range(hash..) // Clockwise: Start from the hash and wrap around.
622 .chain(self.hash_to_vid.range(..)),
623 self.hash_to_vid
624 .range(..hash) // Anti-clockwise: Start before the hash and wrap around.
625 .chain(self.hash_to_vid.range(hash..)),
626 ];
627
628 // Iterate over both clockwise and anti-clockwise directions.
629 for it in ring_iters {
630 for (_, vid) in it {
631 // Retrieve the physical node associated with the virtual node.
632 let pnode = self.vid_to_physical(vid)?;
633 let mut pnode_refmut = pnode.borrow_mut();
634
635 // Retrieve the virtual node from the physical node.
636 let vnode = pnode_refmut
637 .vnodes
638 .get_mut(vid)
639 .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?;
640
641 // Check if the key exists in the virtual node's hash set.
642 let removed = vnode.hashes.remove(&hash);
643
644 if removed {
645 // Remove the key-value pair from the physical node's data map.
646 if pnode_refmut.data.remove(&hash).is_some() {
647 remove_count += 1;
648 }
649
650 // Stop once the replication factor is satisfied.
651 if remove_count == self.replication_factor {
652 return Ok(());
653 }
654 } else {
655 // If a replica wasn't found, stop checking further in this direction.
656 break;
657 }
658 }
659 }
660
661 // If no replicas were removed, return an error indicating the key was not found.
662 if remove_count == 0 {
663 bail!("Item not found for key: {}", key);
664 }
665
666 // If the replication factor is not met, return an error.
667 bail!(
668 "Remove count {} is less than replication factor {}",
669 remove_count,
670 self.replication_factor
671 )
672 }
673
674 /// Retrieves the physical node IDs (`pids`) that contain a specific key.
675 ///
676 /// This method computes the hash of the given key, determines the virtual nodes
677 /// where the key might be stored, and collects the physical node IDs that store the key.
678 ///
679 /// # Steps
680 /// 1. Compute the hash of the key using the hasher.
681 /// 2. Iterate over the virtual nodes (`vids`) where the key might be stored.
682 /// 3. Check each physical node's data map for the key.
683 /// 4. Collect the physical node IDs that contain the key.
684 ///
685 /// # Arguments
686 /// - `key`: The key to locate in the ring.
687 ///
688 /// # Returns
689 /// - `Result<Vec<String>>`: A vector of physical node IDs that contain the key, or an error
690 /// if the operation fails.
691 pub fn get_pids_containing_key(&self, key: &str) -> Result<Vec<String>> {
692 // Compute the hash of the key.
693 let hash = self.hasher.digest(key)?;
694
695 // Determine the virtual nodes (`vids`) where the key might be stored.
696 let vids: Vec<String> = self
697 .hash_to_vid
698 .range(hash..) // Start from the hash and wrap around the ring.
699 .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
700 .map(|(_, vid)| vid.clone())
701 .collect();
702
703 // Use a set to track unique physical node IDs.
704 let mut pids = HashSet::new();
705
706 // Iterate over the virtual nodes to find the key.
707 for vid in vids {
708 let pnode = self.vid_to_physical(&vid)?;
709 let pid = pnode.borrow().id.clone();
710
711 // Skip if the physical node has already been checked.
712 if pids.contains(&pid) {
713 continue;
714 }
715
716 // Check if the key exists in the physical node's data map.
717 if pnode.borrow().data.contains_key(&hash) {
718 pids.insert(pid);
719 }
720 }
721
722 // Return the collected physical node IDs as a vector.
723 Ok(pids.into_iter().collect())
724 }
725
726 /// Removes a virtual node from the consistent hashing ring.
727 ///
728 /// This method removes the specified virtual node (`vid`) from the ring. It redistributes
729 /// the data stored in the virtual node to the next virtual node in the ring that belongs
730 /// to a different physical node and does not already contain the same data (due to replication).
731 ///
732 /// # Steps
733 /// 1. Retrieve the physical node associated with the virtual node.
734 /// 2. Remove the virtual node from the physical node's `vnodes` map.
735 /// 3. Redistribute the data stored in the virtual node to the next virtual node in the ring.
736 /// 4. Update the `hash_to_vid` and `vid_to_pid` mappings to reflect the removal.
737 ///
738 /// # Arguments
739 /// - `vid`: The unique identifier of the virtual node to be removed.
740 ///
741 /// # Returns
742 /// - `Result<()>`: Returns `Ok(())` if the virtual node is successfully removed, or an error
743 /// if the virtual node does not exist or if any operation fails.
744 pub fn remove_vnode(&mut self, vid: &str) -> Result<()> {
745 // Retrieve the physical node associated with the virtual node.
746 let pnode = self.vid_to_physical(vid)?;
747 let pid = pnode.borrow().id.clone();
748
749 // Remove the virtual node from the physical node's `vnodes` map.
750 let vnode = {
751 let mut pnode_refmut = pnode.borrow_mut();
752 pnode_refmut
753 .vnodes
754 .remove(vid)
755 .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?
756 };
757
758 // Redistribute the data stored in the virtual node.
759 for hash in vnode.hashes.iter() {
760 // Find the next virtual node in the ring that belongs to a different physical node
761 // and does not already contain the same hash (due to replication).
762 let (_, next_vid) = self
763 .hash_to_vid
764 .range(vnode.hash + 1..) // Search for the next hash in the ring.
765 .find(|(_, vid)| {
766 let next_pnode = self.vid_to_physical(vid).unwrap();
767 let next_pid = next_pnode.borrow().id.clone();
768
769 // Ensure the next virtual node belongs to a different physical node
770 // and does not already contain the same hash.
771 next_pid != pid && !next_pnode.borrow().data.contains_key(hash)
772 })
773 .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", vid))?;
774
775 // Retrieve the next physical node and move the data to it.
776 let next_pnode = self.vid_to_physical(next_vid)?;
777 let mut next_pnode_refmut = next_pnode.borrow_mut();
778
779 if let Some(key) = pnode.borrow_mut().data.remove(hash) {
780 next_pnode_refmut.data.insert(*hash, key);
781 }
782
783 // Update the next virtual node's hash set to include the redistributed hashes.
784 next_pnode_refmut
785 .vnodes
786 .get_mut(next_vid)
787 .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?
788 .hashes
789 .extend(vnode.hashes.iter());
790 }
791
792 // Remove the virtual node from the mappings.
793 self.hash_to_vid.remove(&vnode.hash);
794 self.vid_to_pid.remove(vid);
795
796 Ok(())
797 }
798
799 /// Adds a single virtual node to a physical node in the consistent hashing ring.
800 ///
801 /// This method creates a new virtual node for the specified physical node (`pid`),
802 /// computes its hash, and redistributes data from the next virtual node in the ring
803 /// to the newly added virtual node.
804 ///
805 /// # Steps
806 /// 1. Retrieve the physical node associated with the given `pid`.
807 /// 2. Generate a unique virtual node ID (`vid`) and compute its hash.
808 /// 3. Ensure the hash is unique and does not collide with existing virtual nodes.
809 /// 4. Redistribute data from the next virtual node in the ring to the new virtual node.
810 /// 5. Update the `hash_to_vid` and `vid_to_pid` mappings.
811 /// 6. Add the new virtual node to the physical node's `vnodes` map.
812 ///
813 /// # Arguments
814 /// - `pid`: The unique identifier of the physical node to which the virtual node will be added.
815 ///
816 /// # Returns
817 /// - `Result<()>`: Returns `Ok(())` if the virtual node is successfully added, or an error
818 /// if any operation fails.
819 pub fn add_one_vnode(&mut self, pid: &str) -> Result<()> {
820 // Retrieve the physical node associated with the given `pid`.
821 let pnode = self
822 .physicals
823 .get(pid)
824 .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))?
825 .clone();
826 let pid = pnode.borrow().id.clone();
827
828 // Generate a unique virtual node ID (`vid`) and compute its hash.
829 let vid = format!("{}-{}", pid, pnode.borrow().num_vnodes);
830 let hash = self.hasher.digest(&vid)?;
831
832 // Ensure the hash is unique and does not collide with existing virtual nodes.
833 if self.hash_to_vid.contains_key(&hash) {
834 bail!("Virtual node with hash {} already exists", hash);
835 }
836
837 // Create the new virtual node.
838 let mut vnode = VirtualNode::builder().id(vid.clone()).hash(hash).build();
839
840 // Find the next virtual node in the ring.
841 let (next_hash, next_vid) = self
842 .hash_to_vid
843 .range(hash + 1..) // Search for the next hash in the ring.
844 .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
845 .next()
846 .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", vid))?;
847
848 // Retrieve the next physical node and its virtual node.
849 let next_pnode = self.vid_to_physical(next_vid)?;
850 let next_pid = next_pnode.borrow().id.clone();
851 let hashes_to_move = next_pnode
852 .borrow()
853 .vnodes
854 .get(next_vid)
855 .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?
856 .hashes
857 .iter()
858 .filter(|candidate| {
859 if &hash < next_hash {
860 **candidate < hash
861 } else {
862 // Handle wrap-around.
863 **candidate < hash && *candidate > next_hash
864 }
865 })
866 .cloned()
867 .collect::<Vec<_>>();
868
869 // Add the hashes to the new virtual node.
870 vnode.hashes.extend(hashes_to_move.iter());
871
872 // Update the next virtual node's hash set to remove the redistributed hashes.
873 {
874 let mut next_pnode_refmut = next_pnode.borrow_mut();
875 let next_vnode = next_pnode_refmut
876 .vnodes
877 .get_mut(next_vid)
878 .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?;
879 for hash in hashes_to_move.iter() {
880 next_vnode.hashes.remove(hash);
881 }
882 }
883
884 // Move data from the next virtual node to the new virtual node.
885 for hash in hashes_to_move.iter() {
886 if pid != next_pid {
887 if let Some(value) = next_pnode.borrow_mut().data.remove(hash) {
888 pnode.borrow_mut().data.insert(*hash, value);
889 }
890 }
891 }
892
893 // Update the `hash_to_vid` and `vid_to_pid` mappings.
894 self.hash_to_vid.insert(hash, vid.clone());
895 self.vid_to_pid.insert(vid.clone(), pid.clone());
896
897 // Add the new virtual node to the physical node's `vnodes` map.
898 pnode.borrow_mut().vnodes.insert(vid.clone(), vnode);
899 pnode.borrow_mut().num_vnodes += 1;
900
901 Ok(())
902 }
903
904 /// Adjusts the number of virtual nodes for a physical node in the consistent hashing ring.
905 ///
906 /// This method increases or decreases the number of virtual nodes for the specified physical node (`pid`).
907 /// If the new number of virtual nodes is zero, the physical node is removed from the ring.
908 ///
909 /// # Steps
910 /// 1. Retrieve the physical node associated with the given `pid`.
911 /// 2. If `new_num_vnodes` is zero, remove the physical node.
912 /// 3. If `new_num_vnodes` is less than the current number, remove the extra virtual nodes.
913 /// 4. If `new_num_vnodes` is greater than the current number, add new virtual nodes.
914 ///
915 /// # Arguments
916 /// - `pid`: The unique identifier of the physical node.
917 /// - `new_num_vnodes`: The desired number of virtual nodes for the physical node.
918 ///
919 /// # Returns
920 /// - `Result<()>`: Returns `Ok(())` if the operation is successful, or an error if any operation fails.
921 pub fn set_num_vnodes(&mut self, pid: &str, new_num_vnodes: usize) -> Result<()> {
922 // Retrieve the physical node associated with the given `pid`.
923 let pnode = self
924 .physicals
925 .get(pid)
926 .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))?
927 .clone();
928 let curr_num_vnodes = pnode.borrow().num_vnodes;
929
930 // If the number of virtual nodes is unchanged, return early.
931 if new_num_vnodes == curr_num_vnodes {
932 return Ok(());
933 }
934
935 // If the new number of virtual nodes is zero, remove the physical node.
936 if new_num_vnodes == 0 {
937 self.remove_physical_node(pid)?;
938 return Ok(());
939 }
940
941 // If the new number of virtual nodes is less, remove the extra virtual nodes.
942 if new_num_vnodes < curr_num_vnodes {
943 for i in new_num_vnodes..curr_num_vnodes {
944 let vid = format!("{}-{}", pid, i);
945 self.remove_vnode(&vid)?; // Remove the virtual node.
946 }
947 } else {
948 // If the new number of virtual nodes is greater, add new virtual nodes.
949 for _ in curr_num_vnodes..new_num_vnodes {
950 self.add_one_vnode(pid)?; // Add a new virtual node.
951 }
952 }
953
954 Ok(())
955 }
956
957 /// Retrieves information about the hash ranges in the consistent hashing ring.
958 ///
959 /// This method computes the hash ranges managed by the ring, including the number of keys
960 /// and the items stored within each range. It provides a detailed view of how data is distributed
961 /// across the virtual nodes in the ring.
962 ///
963 /// # Steps
964 /// 1. Collect all hash values from the `hash_to_vid` mapping.
965 /// 2. Handle the case where there are no hashes (empty ring).
966 /// 3. Handle the case where there is only one hash (single virtual node).
967 /// 4. Iterate over the hash values to compute the ranges between consecutive hashes.
968 /// 5. For each range, collect the items stored in the corresponding virtual node.
969 ///
970 /// # Returns
971 /// - `Result<RangeInfo>`: A `RangeInfo` struct containing details about the hash ranges,
972 /// including the start and end hashes, the number of keys, and the items in each range.
973 pub fn range_info(&self) -> Result<RangeInfo> {
974 // Initialize an empty `RangeInfo` result.
975 let mut result = RangeInfo::builder().ranges(vec![]).build();
976
977 // Collect all hash values from the `hash_to_vid` mapping.
978 let hashes = self.hash_to_vid.keys().cloned().collect::<Vec<_>>();
979
980 // Handle the case where there are no hashes (empty ring).
981 if hashes.is_empty() {
982 return Ok(result);
983 }
984
985 // Handle the case where there is only one hash (single virtual node).
986 if hashes.len() == 1 {
987 let vid = self.hash_to_vid.get(&hashes[0]).unwrap();
988 let node = self.vid_to_physical(vid)?;
989 let items = node.borrow().data.values().cloned().collect::<Vec<_>>();
990
991 // Add the single range to the result.
992 result.ranges.push(
993 Range::builder()
994 .hash_start(hashes[0])
995 .hash_end(hashes[0])
996 .count(items.len())
997 .items(
998 items
999 .iter()
1000 .map(|item| {
1001 HashItem::builder()
1002 .hash(hashes[0])
1003 .inner(item.clone())
1004 .build()
1005 })
1006 .collect(),
1007 )
1008 .build(),
1009 );
1010
1011 return Ok(result);
1012 }
1013
1014 // Iterate over the hash values to compute the ranges between consecutive hashes.
1015 for i in 0..hashes.len() {
1016 let start_hash = hashes[i];
1017 let end_hash = hashes[(i + 1) % hashes.len()]; // Wrap around for the last range.
1018
1019 // Retrieve the virtual node corresponding to the end hash.
1020 let end_vid = self
1021 .hash_to_vid
1022 .get(&end_hash)
1023 .ok_or_else(|| anyhow!("Virtual ID not found for hash: {}", end_hash))?;
1024 let end_pnode = self.vid_to_physical(end_vid)?;
1025 let end_pnode_ref = end_pnode.borrow();
1026 let end_vnode = end_pnode_ref
1027 .vnodes
1028 .get(end_vid)
1029 .ok_or_else(|| anyhow!("Virtual node not found for vid: {}", end_vid))?;
1030
1031 // Collect the items stored in the virtual node for the current range.
1032 let items = end_vnode
1033 .hashes
1034 .iter()
1035 .map(|hash| {
1036 let value = end_pnode_ref
1037 .data
1038 .get(hash)
1039 .ok_or_else(|| anyhow!("Key not found for hash: {}", hash))?
1040 .clone();
1041
1042 Ok(HashItem::builder().hash(*hash).inner(value).build())
1043 })
1044 .collect::<Result<Vec<_>>>()?;
1045
1046 // Add the range to the result.
1047 result.ranges.push(
1048 Range::builder()
1049 .hash_start(start_hash)
1050 .hash_end(end_hash)
1051 .count(end_vnode.hashes.len())
1052 .items(items)
1053 .build(),
1054 );
1055 }
1056
1057 // Return the computed `RangeInfo`.
1058 Ok(result)
1059 }
1060
1061 /// Retrieves the total number of keys stored in the consistent hashing ring.
1062 ///
1063 /// This method iterates over all physical nodes in the ring and sums up the number of keys
1064 /// stored in their respective data maps.
1065 ///
1066 /// # Returns
1067 /// - `Result<usize>`: The total number of keys in the ring, or an error if any operation fails.
1068 pub fn key_count(&self) -> Result<usize> {
1069 // Sum up the number of keys in the data maps of all physical nodes.
1070 Ok(self
1071 .physicals
1072 .values()
1073 .map(|node| node.borrow().data.len()) // Get the number of keys in each physical node.
1074 .sum()) // Sum up the counts.
1075 }
1076
1077 /// Retrieves the items stored in each virtual node of the consistent hashing ring.
1078 ///
1079 /// This method iterates over all virtual nodes in the ring and collects the items stored
1080 /// in each virtual node. It provides a detailed view of how data is distributed across
1081 /// the virtual nodes.
1082 ///
1083 /// # Returns
1084 /// - `Result<ItemsByVNode>`: An `ItemsByVNode` struct containing the items stored in each virtual node,
1085 /// or an error if any operation fails.
1086 pub fn items_by_vnode(&self) -> Result<ItemsByVNode> {
1087 // Initialize an empty `ItemsByVNode` result.
1088 let mut result = ItemsByVNode::builder().build();
1089
1090 // Handle the case where the ring is empty.
1091 if self.hash_to_vid.is_empty() {
1092 return Ok(result);
1093 }
1094
1095 // Handle the case where there is only one virtual node.
1096 if self.hash_to_vid.len() == 1 {
1097 let (_, vid) = self.hash_to_vid.iter().next().unwrap(); // Get the single virtual node ID.
1098 let node = self.vid_to_physical(vid)?; // Retrieve the physical node associated with the virtual node.
1099 let items = node.borrow().data.values().cloned().collect::<Vec<_>>(); // Collect all items in the physical node.
1100
1101 // Add the items to the result.
1102 result.values.push(
1103 ItemsByVNodeValue::builder()
1104 .vid(vid.clone())
1105 .items(items)
1106 .build(),
1107 );
1108
1109 return Ok(result);
1110 }
1111
1112 // Iterate over all virtual nodes in the ring.
1113 for (_, vid) in self.hash_to_vid.iter() {
1114 let pnode = self.vid_to_physical(vid)?; // Retrieve the physical node associated with the virtual node.
1115 let pnode_ref = pnode.borrow();
1116 let vnode = pnode_ref.vnodes.get(vid).unwrap(); // Get the virtual node.
1117
1118 // Collect the items stored in the virtual node.
1119 let items = vnode
1120 .hashes
1121 .iter()
1122 .map(|hash| {
1123 pnode_ref
1124 .data
1125 .get(hash) // Retrieve the item associated with the hash.
1126 .cloned()
1127 .ok_or_else(|| anyhow!("Key not found for hash: {}", hash))
1128 })
1129 .collect::<Result<Vec<_>>>()?;
1130
1131 // Add the items to the result.
1132 result.values.push(
1133 ItemsByVNodeValue::builder()
1134 .vid(vid.clone())
1135 .items(items)
1136 .build(),
1137 );
1138 }
1139
1140 // Return the result containing items grouped by virtual nodes.
1141 Ok(result)
1142 }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use serde_json::json;
1149 use sugars::{btmap, hmap};
1150
1151 struct TestHasher;
1152 impl RingHasherTrait for TestHasher {
1153 fn digest(&self, data: &str) -> Result<u64> {
1154 // 5 | key1
1155 // 6 | key2
1156 // 10 | node2-0
1157 // 15 | key3
1158 // 20 | node3-0
1159 // 25 | key4
1160 // 30 | node1-0
1161 // 40 | node3-1
1162 // 50 | node2-1
1163 // 60 | node3-2
1164 // 65 | key5
1165 // 70 | node1-1
1166 match data {
1167 "node1-0" => Ok(30),
1168 "node1-1" => Ok(70),
1169 "node1-2" => Ok(26),
1170 "node2-0" => Ok(10),
1171 "node2-1" => Ok(50),
1172 "node3-0" => Ok(20),
1173 "node3-1" => Ok(40),
1174 "node3-2" => Ok(60),
1175 "key1" => Ok(5),
1176 "key2" => Ok(6),
1177 "key3" => Ok(15),
1178 "key4" => Ok(25),
1179 "key5" => Ok(65),
1180 _ => {
1181 bail!("Unimplemented test hasher for data: {}", data)
1182 }
1183 }
1184 }
1185 }
1186
1187 fn ring() -> ConsistentHashingRing<TestHasher> {
1188 ConsistentHashingRing::builder()
1189 .hasher(TestHasher)
1190 .replication_factor(2)
1191 .hash_to_vid(BTreeMap::new())
1192 .vid_to_pid(HashMap::new())
1193 .physicals(HashMap::new())
1194 .build()
1195 }
1196
1197 fn nodes() -> Vec<PhysicalNode> {
1198 vec![
1199 PhysicalNode::builder()
1200 .id("node1".to_string())
1201 .num_vnodes(1)
1202 .data(BTreeMap::new())
1203 .build(),
1204 PhysicalNode::builder()
1205 .id("node2".to_string())
1206 .num_vnodes(2)
1207 .data(BTreeMap::new())
1208 .build(),
1209 PhysicalNode::builder()
1210 .id("node3".to_string())
1211 .num_vnodes(3)
1212 .data(BTreeMap::new())
1213 .build(),
1214 ]
1215 }
1216
1217 fn items() -> Vec<Item> {
1218 serde_json::from_value(json!([
1219 { "key": "key1", "value": "value1" },
1220 { "key": "key2", "value": "value2" },
1221 { "key": "key3", "value": "value3" },
1222 { "key": "key4", "value": "value4" },
1223 { "key": "key5", "value": "value5" }
1224 ]))
1225 .unwrap()
1226 }
1227
/// Adding the three test nodes registers six virtual nodes at the positions
/// dictated by `TestHasher`, and leaves every node without data.
#[test]
fn should_successfully_add_physical_nodes() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }

    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.hash_to_vid.len(), 6);
    assert_eq!(hash_ring.vid_to_pid.len(), 6);

    // Each node owns the configured number of vnodes, named `<pid>-<index>`.
    for (node_id, vnode_count) in [("node1", 1), ("node2", 2), ("node3", 3)] {
        let physical = hash_ring.physicals.get(node_id).unwrap().borrow();
        assert_eq!(physical.vnodes.len(), vnode_count);
        assert_eq!(physical.data.len(), 0);

        for (index, vnode) in physical.vnodes.values().enumerate() {
            assert_eq!(vnode.id, format!("{}-{}", node_id, index));
        }
    }

    // Ring position -> virtual node id.
    let expected_hash_to_vid = btmap! {
        10 => "node2-0".to_string(),
        20 => "node3-0".to_string(),
        30 => "node1-0".to_string(),
        40 => "node3-1".to_string(),
        50 => "node2-1".to_string(),
        60 => "node3-2".to_string(),
    };
    assert_eq!(hash_ring.hash_to_vid, expected_hash_to_vid);

    // Virtual node id -> owning physical node id.
    let expected_vid_to_pid = hmap! {
        "node1-0".to_string() => "node1".to_string(),
        "node2-0".to_string() => "node2".to_string(),
        "node2-1".to_string() => "node2".to_string(),
        "node3-0".to_string() => "node3".to_string(),
        "node3-1".to_string() => "node3".to_string(),
        "node3-2".to_string() => "node3".to_string(),
    };
    assert_eq!(hash_ring.vid_to_pid, expected_vid_to_pid);
}
1276
/// Inserting the five test items with a replication factor of 2 stores ten
/// entries in total and places them on the expected virtual nodes.
#[test]
fn should_successfully_add_keys() {
    let mut ring = ring();
    for node in nodes() {
        ring.add_physical_node(node).unwrap();
    }

    ring.insert_many(&items()).unwrap();

    // `assert_eq!` (rather than `assert!(a == b)`) prints the actual count on failure.
    assert_eq!(ring.key_count().unwrap(), 10);

    let items_by_vnode = ring.items_by_vnode().unwrap();
    let expected = json!({
        "values": [
            {
                "vid": "node2-0",
                "items": [
                    { "key": "key1", "value": "value1" },
                    { "key": "key2", "value": "value2" },
                    { "key": "key5", "value": "value5" }
                ]
            },
            {
                "vid": "node3-0",
                "items": [
                    { "key": "key1", "value": "value1" },
                    { "key": "key2", "value": "value2" },
                    { "key": "key3", "value": "value3" },
                    { "key": "key5", "value": "value5" }
                ]
            },
            {
                "vid": "node1-0",
                "items": [
                    { "key": "key3", "value": "value3" },
                    { "key": "key4", "value": "value4" }
                ]
            },
            {
                "vid": "node3-1",
                "items": [
                    { "key": "key4", "value": "value4" }
                ]
            },
            { "vid": "node2-1", "items": [] },
            { "vid": "node3-2", "items": [] }
        ]
    });

    assert_eq!(items_by_vnode.to_json(), expected);
}
1338
/// Removing `key1` drops both of its stored copies and leaves every other
/// range untouched.
#[test]
fn should_successfully_remove_key() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.remove("key1").unwrap();
    assert_eq!(hash_ring.key_count().unwrap(), 8);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 8);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 10,
                "hash_end": 20,
                "count": 3,
                "items": [
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 20,
                "hash_end": 30,
                "count": 2,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            {
                "hash_start": 30,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 40, "hash_end": 50, "count": 0, "items": [] },
            { "hash_start": 50, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 10,
                "count": 2,
                "items": [
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1461
/// Removing virtual node `node2-0` keeps all three physical nodes and all ten
/// stored entries; the vnode's items end up on the following vnode.
#[test]
fn should_successfully_remove_vnode() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.remove_vnode("node2-0").unwrap();
    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 20,
                "hash_end": 30,
                "count": 5,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } },
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 30,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 40, "hash_end": 50, "count": 0, "items": [] },
            { "hash_start": 50, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1532
/// Removing the only virtual node of `node1` keeps the physical node
/// registered (with zero vnodes) and preserves all ten stored entries.
#[test]
fn should_successfully_remove_vnode_of_physical_node_with_one_vnode() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.remove_vnode("node1-0").unwrap();

    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    // `node1` is still present but no longer owns any virtual node.
    let node1 = hash_ring.physicals.get("node1").unwrap();
    let remaining_vnodes = node1.borrow().vnodes.len();
    assert_eq!(remaining_vnodes, 0)
}
1559
/// Removing `node1` (one virtual node) leaves two physical nodes and keeps
/// all ten stored entries, redistributed over the remaining ranges.
#[test]
fn should_successfully_remove_physical_node_with_one_vnode() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.remove_physical_node("node1").unwrap();
    assert_eq!(hash_ring.physicals.len(), 2);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(hash_ring.key_count().unwrap(), 10);
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 10,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 20,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            {
                "hash_start": 40,
                "hash_end": 50,
                "count": 2,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 50, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 10,
                "count": 3,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1631
/// Removing `node2` (two virtual nodes) leaves two physical nodes and keeps
/// all ten stored entries, redistributed over the remaining ranges.
#[test]
fn should_successfully_remove_physical_node_with_multiple_vnodes() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.remove_physical_node("node2").unwrap();
    assert_eq!(hash_ring.physicals.len(), 2);

    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 20,
                "hash_end": 30,
                "count": 5,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } },
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 30,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 40, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1697
/// Adding one virtual node to `node1` creates `node1-1` at position 70,
/// which takes over `key5` (hash 65) from the vnode at position 10.
#[test]
fn should_successfully_add_one_vnode() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.add_one_vnode("node1").unwrap();
    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 10,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 20,
                "hash_end": 30,
                "count": 2,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            {
                "hash_start": 30,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 40, "hash_end": 50, "count": 0, "items": [] },
            { "hash_start": 50, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 70,
                "count": 1,
                "items": [
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 70,
                "hash_end": 10,
                "count": 2,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1782
/// Setting `node1` to zero virtual nodes removes the physical node while
/// keeping all ten stored entries in the ring.
#[test]
fn should_successfully_decrease_num_vnodes_to_zero() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.set_num_vnodes("node1", 0).unwrap();
    assert_eq!(hash_ring.physicals.len(), 2);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);
}
1802
/// Shrinking `node3` from three virtual nodes to one removes `node3-1` and
/// `node3-2` and keeps all ten stored entries.
#[test]
fn should_successfully_descrease_num_vnodes() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.set_num_vnodes("node3", 1).unwrap();
    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 10,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 20,
                "hash_end": 30,
                "count": 2,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            {
                "hash_start": 30,
                "hash_end": 50,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            {
                "hash_start": 50,
                "hash_end": 10,
                "count": 3,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1868
/// Growing `node1` from one virtual node to three adds `node1-1` (position 70)
/// and `node1-2` (position 26) and keeps all ten stored entries.
#[test]
fn should_successfully_increase_num_vnodes() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    hash_ring.set_num_vnodes("node1", 3).unwrap();
    assert_eq!(hash_ring.physicals.len(), 3);
    assert_eq!(hash_ring.key_count().unwrap(), 10);

    let ranges = hash_ring.range_info().unwrap();
    assert_eq!(ranges.key_count(), 10);

    let expected = json!({
        "ranges": [
            {
                "hash_start": 10,
                "hash_end": 20,
                "count": 4,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 20,
                "hash_end": 26,
                "count": 2,
                "items": [
                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 26, "hash_end": 30, "count": 0, "items": [] },
            {
                "hash_start": 30,
                "hash_end": 40,
                "count": 1,
                "items": [
                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
                ]
            },
            { "hash_start": 40, "hash_end": 50, "count": 0, "items": [] },
            { "hash_start": 50, "hash_end": 60, "count": 0, "items": [] },
            {
                "hash_start": 60,
                "hash_end": 70,
                "count": 1,
                "items": [
                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
                ]
            },
            {
                "hash_start": 70,
                "hash_end": 10,
                "count": 2,
                "items": [
                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } }
                ]
            }
        ]
    });

    assert_eq!(ranges.to_json(), expected);
}
1959
/// Items that were inserted can be fetched back by key.
#[test]
fn items_should_exist() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    // Checked in the same order as before: `key1` first, then `key2`.
    for (key, value) in [("key1", "value1"), ("key2", "value2")] {
        assert_eq!(
            hash_ring.get_item(key).unwrap().to_json(),
            json!({ "key": key, "value": value })
        );
    }
}
1982
/// Looking up a key that was never inserted yields an error.
#[test]
fn items_should_not_exist() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    let lookup = hash_ring.get_item("key100");
    assert!(lookup.is_err());
}
1997
/// `key1` is stored on exactly two physical nodes: `node2` and `node3`.
#[test]
fn should_return_pids_containing_key() {
    let mut hash_ring = ring();
    for physical in nodes() {
        hash_ring.add_physical_node(physical).unwrap();
    }
    hash_ring.insert_many(&items()).unwrap();

    // Sort before comparing so the check does not depend on the returned order.
    let mut pids = hash_ring.get_pids_containing_key("key1").unwrap();
    pids.sort();

    let expected = vec![String::from("node2"), String::from("node3")];
    assert_eq!(pids, expected);
}
2014}