conhash_ring/
lib.rs

1#![doc = include_str!("../README.md")]
2
3#[macro_use]
4extern crate nestify;
5
6use ordermap::OrderSet;
7#[cfg(test)]
8use serde::{Deserialize, Serialize};
9
10use anyhow::{anyhow, bail, Result};
11use std::{
12    cell::RefCell,
13    collections::{BTreeMap, HashMap, HashSet},
14    fmt::Debug,
15    hash::{DefaultHasher, Hash, Hasher},
16    rc::Rc,
17};
18
19use bon::Builder;
20use digest::Digest;
21#[cfg(feature = "md5")]
22use md5::Md5;
23
/// A trait that defines the behavior of a hashing function used in the consistent hashing ring.
///
/// Implementors map a string to a 64-bit position on the ring. In this file the same
/// `digest` is applied both to virtual node IDs (`PhysicalNode::init_vnodes`) and to item
/// keys (`ConsistentHashingRing::insert`), so one hasher decides the placement of both.
pub trait RingHasherTrait {
    /// Computes a 64-bit hash value for the given input string.
    ///
    /// # Arguments
    /// - `data`: The input string to hash.
    ///
    /// # Returns
    /// - `Result<u64>`: The computed hash value, or an error if the hashing fails.
    fn digest(&self, data: &str) -> Result<u64>;
}
39
/// Built-in hashers that implement [`RingHasherTrait`].
#[derive(Debug, Default)]
pub enum RingHasher {
    /// Hashes the input bytes with the standard library's `DefaultHasher`.
    /// This is the variant returned by `RingHasher::default()`.
    #[default]
    Default_,
    /// Hashes the input with MD5 and uses the first 8 bytes of the digest, read as a
    /// big-endian `u64`. Only compiled in when the `md5` feature is enabled.
    #[cfg(feature = "md5")]
    Md5,
}
47
48impl RingHasherTrait for RingHasher {
49    fn digest(&self, data: &str) -> Result<u64> {
50        match self {
51            RingHasher::Default_ => {
52                let mut hasher = DefaultHasher::new();
53                hasher.write(data.as_bytes());
54
55                Ok(hasher.finish())
56            }
57            #[cfg(feature = "md5")]
58            RingHasher::Md5 => {
59                let hash = Md5::digest(data.as_bytes());
60                let slice = &hash[0..8];
61
62                Ok(u64::from_be_bytes(slice.try_into().map_err(|err| {
63                    anyhow!("Failed to convert MD5 hash to u64: {}", err)
64                })?))
65            }
66        }
67    }
68}
69
/// Identifier of a virtual node; used as the key type of `PhysicalNode::vnodes`.
type VirtualId = String;
/// A 64-bit position on the ring; used as the key type of `PhysicalNode::data` and
/// `ConsistentHashingRing::hash_to_vid`.
type HashValue = u64;
72
nest! {
    /// Represents a physical node in the consistent hashing ring.
    ///
    /// A physical node owns its virtual nodes (its positions on the ring) together with
    /// the items stored on it, keyed by item hash.
    // NOTE(review): the trailing `*` presumably makes nestify apply the derive to the nested
    // structs as well — `VirtualNode::builder()` and `Item::builder()` are both used in this
    // file. Confirm against the nestify documentation.
    #[derive(Debug, Clone, PartialEq, Eq, Hash, Builder)]*
    pub struct PhysicalNode {
        /// A unique identifier for the physical node.
        pub id: String,
        /// The number of virtual nodes associated with this physical node.
        pub num_vnodes: usize,
        /// A map of virtual node IDs to their corresponding `VirtualNode` structs.
        /// Filled by `init_vnodes`; defaults to empty in the builder.
        #[builder(default)]
        pub vnodes: BTreeMap<VirtualId, pub struct VirtualNode {
            /// The unique identifier for the virtual node.
            pub id: String,
            /// The hash value of the virtual node, i.e. its position on the ring.
            pub hash: u64,
            /// Hashes of the item keys recorded against this virtual node
            /// (`ConsistentHashingRing::insert` adds to it, `remove` deletes from it).
            #[builder(default)]
            pub hashes: OrderSet<u64>,
        }>,
        /// A map of hash values to `Item` structs, representing the data stored on this node.
        /// Unlike `vnodes`, this field carries no `#[builder(default)]`.
        pub data: BTreeMap<
            HashValue,
            #[cfg_attr(test, derive(Serialize, Deserialize))]
            pub struct Item {
                /// The key of the item.
                pub key: String,
                /// The value of the item.
                pub value: String,
            }
            >,
    }
}
105
106impl Item {
107    #[cfg(test)]
108    pub fn to_json(&self) -> serde_json::Value {
109        serde_json::to_value(self).unwrap()
110    }
111}
112
113impl PhysicalNode {
114    /// Initializes the virtual nodes for this physical node.
115    ///
116    /// This method generates virtual node IDs based on the physical node's ID and the number
117    /// of virtual nodes (`num_vnodes`). It computes a hash for each virtual node ID using
118    /// the provided hasher and creates a `VirtualNode` instance for each ID. The virtual
119    /// nodes are then added to the `vnodes` map of the physical node.
120    ///
121    /// # Arguments
122    /// - `hasher`: A reference to an object implementing the `RingHasherTrait` to compute hashes.
123    ///
124    /// # Returns
125    /// - `Result<()>`: Returns `Ok(())` if the virtual nodes are successfully initialized,
126    ///   or an error if hashing fails.
127    pub fn init_vnodes<T: RingHasherTrait>(&mut self, hasher: &T) -> Result<()> {
128        let ids = (0..self.num_vnodes).map(|i| format!("{}-{}", self.id, i));
129
130        for id in ids {
131            let hash = hasher.digest(&id)?;
132            let vnode = VirtualNode::builder().id(id.clone()).hash(hash).build();
133            self.vnodes.insert(id, vnode);
134        }
135
136        Ok(())
137    }
138}
139
/// Represents a consistent hashing ring.
///
/// This struct manages the consistent hashing ring, which is used to distribute data across multiple
/// physical nodes. It supports features such as replication, virtual nodes, and dynamic addition/removal
/// of nodes.
///
/// # Why `Rc<RefCell<>>` is used:
/// - `Rc` (reference counted) allows shared ownership of `PhysicalNode` instances: handles are
///   cloned out of `physicals` (see `vid_to_physical`) while the map keeps its own.
/// - `RefCell` provides interior mutability, so a `PhysicalNode` can be mutated through a shared
///   `Rc`. Its borrow rules are checked at runtime: calling `borrow_mut()` while another borrow
///   of the same node is still alive panics.
#[derive(Debug, Builder)]
#[builder(builder_type(doc {}))]
pub struct ConsistentHashingRing<T: RingHasherTrait> {
    /// The hashing function used to compute hash values for keys and nodes.
    pub hasher: T,
    /// The number of distinct physical nodes each key is replicated to for fault tolerance.
    pub replication_factor: usize,
    /// A mapping of virtual node hash values (ring positions) to virtual node IDs.
    pub hash_to_vid: BTreeMap<HashValue, String>,
    /// A mapping of virtual node IDs to physical node IDs.
    pub vid_to_pid: HashMap<String, String>,
    /// A mapping of physical node IDs to their corresponding `PhysicalNode` instances.
    pub physicals: HashMap<String, Rc<RefCell<PhysicalNode>>>,
}
165
nest! {
    /// Represents the result of adding a physical node to the consistent hashing ring.
    ///
    /// Returned by `ConsistentHashingRing::add_physical_node`. It holds one entry per
    /// virtual node created for the added physical node, including the number of keys
    /// that were reassigned to that virtual node.
    #[derive(Debug, Default, Builder)]*
    pub struct AddNodeResult {
        /// One entry per virtual node created for the added physical node.
        pub values: Vec<pub struct AddNodeResultValue {
            /// The unique identifier of the virtual node.
            pub id: String,
            /// The hash value (ring position) of the virtual node.
            pub hash: u64,
            /// The number of keys that were reassigned to this virtual node.
            pub keys_added: usize,
        }>,
    }
}
184
nest! {
    /// Represents the result of adding a key-value pair to the consistent hashing ring.
    ///
    /// Returned by `ConsistentHashingRing::insert`. It contains the key, its hash value,
    /// and the locations (virtual nodes and their physical nodes) where the key was stored.
    #[derive(Debug, Builder)]*
    pub struct AddKeyResult {
        /// The key that was added to the ring.
        pub key: String,
        /// The hash value of the key.
        pub hash: u64,
        /// The locations where the key was added, one per replica, in the order they were
        /// written. Empty if nothing was stored.
        #[builder(default)]
        pub locations: Vec<pub struct AddKeyResultLocation {
            /// The unique identifier of the virtual node.
            pub id: String,
            /// The unique identifier of the physical node that owns the virtual node.
            pub pid: String,
        }>,
    }
}
206
nest! {
    /// Represents information about hash ranges in the consistent hashing ring.
    ///
    /// This struct contains details about the ranges of hash values managed by the ring,
    /// including the number of keys and the items stored within each range.
    // NOTE(review): the code producing `RangeInfo` is not visible in this part of the file;
    // whether `hash_start`/`hash_end` are inclusive bounds should be confirmed there.
    #[derive(Debug, Builder)]*
    #[cfg_attr(test, derive(Serialize, Deserialize))]*
    pub struct RangeInfo {
        /// A list of hash ranges managed by the ring.
        pub ranges: Vec<pub struct Range {
            /// The starting hash value of the range.
            pub hash_start: u64,
            /// The ending hash value of the range.
            pub hash_end: u64,
            /// The number of keys within the range; summed over all ranges by
            /// `RangeInfo::key_count`.
            pub count: usize,
            /// The items stored within the range.
            pub items: Vec<pub struct HashItem {
                /// The hash value of the item.
                pub hash: u64,
                /// The actual item stored in the range.
                pub inner: Item,
            }>,
        }>,
    }
}
233
234impl RangeInfo {
235    pub fn key_count(&self) -> usize {
236        self.ranges.iter().map(|r| r.count).sum()
237    }
238
239    #[cfg(test)]
240    pub fn to_json(&self) -> serde_json::Value {
241        serde_json::to_value(self).unwrap()
242    }
243
244    #[cfg(test)]
245    pub fn to_json_string_pretty(&self) -> String {
246        serde_json::to_string_pretty(self).unwrap()
247    }
248}
249
nest! {
    /// A list of items grouped per virtual node ID.
    // NOTE(review): the code that builds this type is not visible in this part of the file;
    // confirm which items are attributed to a virtual node (e.g. primaries only vs. replicas).
    #[derive(Debug, Builder)]*
    #[cfg_attr(test, derive(Serialize, Deserialize))]*
    pub struct ItemsByVNode {
        /// One entry per virtual node; defaults to empty in the builder.
        #[builder(default)]
        pub values: Vec<pub struct ItemsByVNodeValue {
            /// The identifier of the virtual node this entry refers to.
            pub vid: String,
            /// The items listed for that virtual node.
            pub items: Vec<Item>,
        }>,
    }
}
261
262impl ItemsByVNode {
263    #[cfg(test)]
264    pub fn to_json(&self) -> serde_json::Value {
265        serde_json::to_value(self).unwrap()
266    }
267
268    #[cfg(test)]
269    pub fn to_json_string_pretty(&self) -> String {
270        serde_json::to_string_pretty(self).unwrap()
271    }
272}
273
274impl<T: RingHasherTrait> ConsistentHashingRing<T> {
275    pub fn vid_to_physical(&self, vid: &str) -> Result<Rc<RefCell<PhysicalNode>>> {
276        let pid = self
277            .vid_to_pid
278            .get(vid)
279            .ok_or_else(|| anyhow!("Physical node not found, vid: {}", vid))?;
280
281        self.physicals
282            .get(pid)
283            .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))
284            .cloned()
285    }
286
287    /// Adds a physical node to the consistent hashing ring.
288    ///
289    /// This method initializes the virtual nodes for the given physical node, calculates their hash values,
290    /// and inserts them into the ring. It also redistributes data from the previous virtual nodes to the new
291    /// virtual nodes based on their hash ranges.
292    ///
293    /// # Steps
294    /// 1. **Initialize Virtual Nodes**: Generate virtual node IDs for the physical node and compute their hash values.
295    /// 2. **Insert Physical Node**: Add the physical node to the `physicals` map, wrapped in `Rc<RefCell>` for shared ownership.
296    /// 3. **Redistribute Data**:
297    ///    - For each virtual node, find the previous virtual node in the ring.
298    ///    - Move data from the hash range `[prev_hash, curr_hash]` of the previous virtual node to the new virtual node.
299    /// 4. **Update Mappings**:
300    ///    - Update `hash_to_vid` to map the hash of the new virtual node to its ID.
301    ///    - Update `vid_to_pid` to map the virtual node ID to the physical node ID.
302    /// 5. **Track Results**: Record the number of keys reassigned to each virtual node in the `AddNodeResult`.
303    ///
304    /// # Arguments
305    /// - `node`: The physical node to be added to the ring.
306    ///
307    /// # Returns
308    /// - `Result<AddNodeResult>`: A result containing information about the virtual nodes created and the
309    ///   number of keys reassigned to each virtual node, or an error if the operation fails.
310    pub fn add_physical_node(&mut self, mut node: PhysicalNode) -> Result<AddNodeResult> {
311        // Clone the physical node's ID for later use.
312        let pid = node.id.clone();
313
314        // Initialize virtual nodes for the physical node using the hasher.
315        node.init_vnodes(&self.hasher)?;
316
317        // Wrap the physical node in an Rc<RefCell> for shared ownership and mutability.
318        let pnode = Rc::new(RefCell::new(node));
319        self.physicals.insert(pid.clone(), pnode.clone());
320
321        // Prepare the result object to track the operation's outcome.
322        let mut result = AddNodeResult::default();
323
324        // Iterate over the virtual nodes of the physical node.
325        for vnode in pnode.borrow().vnodes.values() {
326            let hash = vnode.hash;
327
328            // Find the previous hash in the ring that is less than the current hash.
329            let prev_hash = self.hash_to_vid.range(..hash).last();
330            let mut keys_added = 0;
331
332            // If a previous virtual node exists, move data from its range to the new virtual node.
333            if let Some((prev_hash, prev_vid)) = prev_hash {
334                let prev_node = self.vid_to_physical(prev_vid)?;
335
336                // Collect keys within the range [prev_hash, hash].
337                let keys_to_move = prev_node
338                    .borrow()
339                    .data
340                    .range(prev_hash..=&hash)
341                    .map(|(k, _)| *k)
342                    .collect::<Vec<_>>();
343                keys_added += keys_to_move.len();
344
345                // Move the keys and their associated values to the new virtual node.
346                for key in keys_to_move {
347                    if let Some(value) = prev_node.borrow_mut().data.remove(&key) {
348                        pnode.borrow_mut().data.insert(key, value);
349                    }
350                }
351            }
352
353            // Update the hash-to-virtual-node mapping.
354            self.hash_to_vid.insert(hash, vnode.id.clone());
355
356            // Update the virtual-node-to-physical-node mapping.
357            self.vid_to_pid
358                .insert(vnode.id.clone(), pnode.borrow().id.clone());
359
360            // Add the virtual node's result to the operation's result object.
361            result.values.push(
362                AddNodeResultValue::builder()
363                    .id(vnode.id.clone())
364                    .hash(hash)
365                    .keys_added(keys_added)
366                    .build(),
367            );
368        }
369
370        // Return the result of the operation.
371        Ok(result)
372    }
373
374    /// Removes a physical node from the consistent hashing ring.
375    ///
376    /// This method removes all virtual nodes associated with the given physical node ID (`pid`).
377    /// It redistributes the data stored in the virtual nodes of the removed physical node to the
378    /// next virtual nodes in the ring that belong to different physical nodes.
379    ///
380    /// # Steps
381    /// 1. Check if the `physicals` map is empty. If so, clear all mappings and return early.
382    /// 2. Retrieve the virtual node IDs (`vids`) associated with the physical node.
383    /// 3. Remove each virtual node using the `remove_vnode` method.
384    /// 4. Remove the physical node from the `physicals` map.
385    ///
386    /// # Arguments
387    /// - `pid`: The unique identifier of the physical node to be removed.
388    ///
389    /// # Returns
390    /// - `Result<()>`: Returns `Ok(())` if the physical node is successfully removed, or an error
391    ///   if the physical node does not exist or if any operation fails.
392    pub fn remove_physical_node(&mut self, pid: &str) -> Result<()> {
393        // If there are no physical nodes, clear all mappings and return early.
394        if self.physicals.is_empty() {
395            self.hash_to_vid.clear(); // Clear the hash-to-virtual-node mapping.
396            self.vid_to_pid.clear(); // Clear the virtual-node-to-physical-node mapping.
397            return Ok(());
398        }
399
400        // Retrieve the virtual node IDs (`vids`) associated with the physical node.
401        let vids = {
402            let pnode = self
403                .physicals
404                .get(pid)
405                .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))? // Error if the physical node does not exist.
406                .clone();
407            let keys = pnode.borrow().vnodes.keys().cloned().collect::<Vec<_>>(); // Collect virtual node IDs.
408            keys
409        };
410
411        // Remove each virtual node using the `remove_vnode` method.
412        vids.iter().try_for_each(|vid| self.remove_vnode(vid))?;
413
414        // Remove the physical node from the `physicals` map.
415        self.physicals.remove(pid);
416
417        Ok(())
418    }
419
420    /// Inserts a key-value pair into the consistent hashing ring.
421    ///
422    /// This method computes the hash of the given key, determines the virtual nodes
423    /// where the key should be stored based on the replication factor, and inserts
424    /// the key-value pair into the appropriate physical nodes.
425    ///
426    /// # Steps
427    /// 1. Compute the hash of the key using the hasher.
428    /// 2. Determine the virtual nodes (`vids`) where the key should be stored.
429    /// 3. For each virtual node:
430    ///    - Check if the physical node has already been used for replication.
431    ///    - Insert the key-value pair into the virtual node and its corresponding physical node.
432    ///    - Track the virtual node and physical node in the result.
433    /// 4. Stop once the replication factor is satisfied.
434    ///
435    /// # Arguments
436    /// - `key`: The key to be inserted into the ring.
437    /// - `value`: The value associated with the key.
438    ///
439    /// # Returns
440    /// - `Result<AddKeyResult>`: A result containing information about the virtual nodes
441    ///   where the key was stored, or an error if the operation fails.
442    pub fn insert(&mut self, key: &str, value: &str) -> Result<AddKeyResult> {
443        // Compute the hash of the key.
444        let hash = self.hasher.digest(key)?;
445
446        // Prepare the result object to track the operation's outcome.
447        let mut result = AddKeyResult::builder()
448            .key(key.to_string())
449            .hash(hash)
450            .build();
451
452        // Track physical nodes to ensure no duplicate replication.
453        let mut pids = HashSet::new();
454
455        // Determine the virtual nodes (`vids`) where the key should be stored.
456        let vids: Vec<String> = self
457            .hash_to_vid
458            .range(hash..) // Start from the hash and wrap around the ring.
459            .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
460            .map(|(_, vid)| vid.clone())
461            .collect();
462
463        // Iterate over the virtual nodes to insert the key-value pair.
464        for vid in vids {
465            // Get the physical node corresponding to the virtual node.
466            let pnode = self.vid_to_physical(&vid)?;
467            let pid = pnode.borrow().id.clone();
468
469            // Get a mutable reference to the physical node.
470            let mut pnode_refmut = pnode.borrow_mut();
471
472            // Get the virtual node from the physical node.
473            let vnode = pnode_refmut
474                .vnodes
475                .get_mut(&vid)
476                .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?;
477
478            // Skip if this physical node has already been used for replication.
479            if pids.contains(&pid) {
480                continue;
481            }
482
483            // Insert the hash into the virtual node's hash set.
484            vnode.hashes.insert(hash);
485
486            // Insert the key-value pair into the physical node's data map.
487            pnode_refmut.data.insert(
488                hash,
489                Item::builder()
490                    .key(key.to_string())
491                    .value(value.to_string())
492                    .build(),
493            );
494
495            // Track the physical node to avoid duplicate replication.
496            pids.insert(pid.clone());
497
498            // Add the virtual node and physical node to the result.
499            result.locations.push(
500                AddKeyResultLocation::builder()
501                    .id(vid.clone())
502                    .pid(pid.clone())
503                    .build(),
504            );
505
506            // Stop once the replication factor is satisfied.
507            if pids.len() == self.replication_factor {
508                break;
509            }
510        }
511
512        // Return the result of the operation.
513        Ok(result)
514    }
515
516    /// Inserts multiple key-value pairs into the consistent hashing ring.
517    ///
518    /// This method iterates over a list of items, computes their hashes, and inserts
519    /// each key-value pair into the appropriate virtual nodes in the ring.
520    ///
521    /// # Steps
522    /// 1. Iterate over the list of items.
523    /// 2. For each item, call the `insert` method to add the key-value pair to the ring.
524    /// 3. Collect the results of each insertion into a vector.
525    ///
526    /// # Arguments
527    /// - `items`: A slice of `Item` structs, each containing a key and value to be inserted.
528    ///
529    /// # Returns
530    /// - `Result<Vec<AddKeyResult>>`: A vector of results for each inserted item, or an error
531    ///   if any insertion fails.
532    pub fn insert_many(&mut self, items: &[Item]) -> Result<Vec<AddKeyResult>> {
533        // Initialize a vector to store the results of each insertion.
534        let mut results = Vec::new();
535
536        // Iterate over each item and insert it into the ring.
537        for item in items {
538            // Insert the key-value pair into the ring and collect the result.
539            let result = self.insert(&item.key, &item.value)?;
540            results.push(result);
541        }
542
543        // Return the results of all insertions.
544        Ok(results)
545    }
546
547    /// Retrieves an item from the consistent hashing ring by its key.
548    ///
549    /// This method computes the hash of the given key, determines the virtual nodes
550    /// where the key might be stored, and retrieves the associated value if it exists.
551    ///
552    /// # Steps
553    /// 1. Compute the hash of the key using the hasher.
554    /// 2. Iterate over the virtual nodes (`vids`) where the key might be stored.
555    /// 3. Check each physical node's data map for the key.
556    /// 4. Return the item if found, or an error if the key does not exist.
557    ///
558    /// # Arguments
559    /// - `key`: The key to retrieve from the ring.
560    ///
561    /// # Returns
562    /// - `Result<Item>`: The item associated with the key, or an error if the key is not found.
563    pub fn get_item(&self, key: &str) -> Result<Item> {
564        // Compute the hash of the key.
565        let hash = self.hasher.digest(key)?;
566
567        // Determine the virtual nodes (`vids`) where the key might be stored.
568        let vids: Vec<String> = self
569            .hash_to_vid
570            .range(hash..) // Start from the hash and wrap around the ring.
571            .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
572            .map(|(_, vid)| vid.clone())
573            .collect();
574
575        // Iterate over the virtual nodes to find the key.
576        for vid in vids {
577            let pnode = self.vid_to_physical(&vid)?;
578            let pnode_ref = pnode.borrow();
579
580            // Check if the key exists in the physical node's data map.
581            if let Some(item) = pnode_ref.data.get(&hash) {
582                return Ok(item.clone());
583            }
584        }
585
586        // Return an error if the key is not found.
587        bail!("Item not found for key: {}", key)
588    }
589
590    /// Removes a key-value pair from the consistent hashing ring.
591    ///
592    /// This method computes the hash of the given key, iterates over the virtual nodes
593    /// in both clockwise and anti-clockwise directions, and removes the key-value pair
594    /// from the appropriate physical nodes. The removal stops once the replication factor
595    /// is satisfied.
596    ///
597    /// # Steps
598    /// 1. Compute the hash of the key using the hasher.
599    /// 2. Iterate over the virtual nodes in both clockwise and anti-clockwise directions.
600    /// 3. For each virtual node:
601    ///    - Check if the key exists in the virtual node's hash set.
602    ///    - Remove the key-value pair from the physical node's data map.
603    ///    - Track the number of successful removals.
604    /// 4. Stop once the replication factor is satisfied.
605    /// 5. Return an error if the key is not found or if the replication factor is not met.
606    ///
607    /// # Arguments
608    /// - `key`: The key to be removed from the ring.
609    ///
610    /// # Returns
611    /// - `Result<()>`: Returns `Ok(())` if the key is successfully removed, or an error
612    ///   if the key does not exist or if the replication factor is not met.
613    pub fn remove(&mut self, key: &str) -> Result<()> {
614        // Compute the hash of the key.
615        let hash = self.hasher.digest(key)?;
616        let mut remove_count = 0;
617
618        // Define iterators for clockwise and anti-clockwise traversal of the ring.
619        let ring_iters = [
620            self.hash_to_vid
621                .range(hash..) // Clockwise: Start from the hash and wrap around.
622                .chain(self.hash_to_vid.range(..)),
623            self.hash_to_vid
624                .range(..hash) // Anti-clockwise: Start before the hash and wrap around.
625                .chain(self.hash_to_vid.range(hash..)),
626        ];
627
628        // Iterate over both clockwise and anti-clockwise directions.
629        for it in ring_iters {
630            for (_, vid) in it {
631                // Retrieve the physical node associated with the virtual node.
632                let pnode = self.vid_to_physical(vid)?;
633                let mut pnode_refmut = pnode.borrow_mut();
634
635                // Retrieve the virtual node from the physical node.
636                let vnode = pnode_refmut
637                    .vnodes
638                    .get_mut(vid)
639                    .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?;
640
641                // Check if the key exists in the virtual node's hash set.
642                let removed = vnode.hashes.remove(&hash);
643
644                if removed {
645                    // Remove the key-value pair from the physical node's data map.
646                    if pnode_refmut.data.remove(&hash).is_some() {
647                        remove_count += 1;
648                    }
649
650                    // Stop once the replication factor is satisfied.
651                    if remove_count == self.replication_factor {
652                        return Ok(());
653                    }
654                } else {
655                    // If a replica wasn't found, stop checking further in this direction.
656                    break;
657                }
658            }
659        }
660
661        // If no replicas were removed, return an error indicating the key was not found.
662        if remove_count == 0 {
663            bail!("Item not found for key: {}", key);
664        }
665
666        // If the replication factor is not met, return an error.
667        bail!(
668            "Remove count {} is less than replication factor {}",
669            remove_count,
670            self.replication_factor
671        )
672    }
673
674    /// Retrieves the physical node IDs (`pids`) that contain a specific key.
675    ///
676    /// This method computes the hash of the given key, determines the virtual nodes
677    /// where the key might be stored, and collects the physical node IDs that store the key.
678    ///
679    /// # Steps
680    /// 1. Compute the hash of the key using the hasher.
681    /// 2. Iterate over the virtual nodes (`vids`) where the key might be stored.
682    /// 3. Check each physical node's data map for the key.
683    /// 4. Collect the physical node IDs that contain the key.
684    ///
685    /// # Arguments
686    /// - `key`: The key to locate in the ring.
687    ///
688    /// # Returns
689    /// - `Result<Vec<String>>`: A vector of physical node IDs that contain the key, or an error
690    ///   if the operation fails.
691    pub fn get_pids_containing_key(&self, key: &str) -> Result<Vec<String>> {
692        // Compute the hash of the key.
693        let hash = self.hasher.digest(key)?;
694
695        // Determine the virtual nodes (`vids`) where the key might be stored.
696        let vids: Vec<String> = self
697            .hash_to_vid
698            .range(hash..) // Start from the hash and wrap around the ring.
699            .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
700            .map(|(_, vid)| vid.clone())
701            .collect();
702
703        // Use a set to track unique physical node IDs.
704        let mut pids = HashSet::new();
705
706        // Iterate over the virtual nodes to find the key.
707        for vid in vids {
708            let pnode = self.vid_to_physical(&vid)?;
709            let pid = pnode.borrow().id.clone();
710
711            // Skip if the physical node has already been checked.
712            if pids.contains(&pid) {
713                continue;
714            }
715
716            // Check if the key exists in the physical node's data map.
717            if pnode.borrow().data.contains_key(&hash) {
718                pids.insert(pid);
719            }
720        }
721
722        // Return the collected physical node IDs as a vector.
723        Ok(pids.into_iter().collect())
724    }
725
726    /// Removes a virtual node from the consistent hashing ring.
727    ///
728    /// This method removes the specified virtual node (`vid`) from the ring. It redistributes
729    /// the data stored in the virtual node to the next virtual node in the ring that belongs
730    /// to a different physical node and does not already contain the same data (due to replication).
731    ///
732    /// # Steps
733    /// 1. Retrieve the physical node associated with the virtual node.
734    /// 2. Remove the virtual node from the physical node's `vnodes` map.
735    /// 3. Redistribute the data stored in the virtual node to the next virtual node in the ring.
736    /// 4. Update the `hash_to_vid` and `vid_to_pid` mappings to reflect the removal.
737    ///
738    /// # Arguments
739    /// - `vid`: The unique identifier of the virtual node to be removed.
740    ///
741    /// # Returns
742    /// - `Result<()>`: Returns `Ok(())` if the virtual node is successfully removed, or an error
743    ///   if the virtual node does not exist or if any operation fails.
744    pub fn remove_vnode(&mut self, vid: &str) -> Result<()> {
745        // Retrieve the physical node associated with the virtual node.
746        let pnode = self.vid_to_physical(vid)?;
747        let pid = pnode.borrow().id.clone();
748
749        // Remove the virtual node from the physical node's `vnodes` map.
750        let vnode = {
751            let mut pnode_refmut = pnode.borrow_mut();
752            pnode_refmut
753                .vnodes
754                .remove(vid)
755                .ok_or_else(|| anyhow!("Virtual node not found, vid: {}", vid))?
756        };
757
758        // Redistribute the data stored in the virtual node.
759        for hash in vnode.hashes.iter() {
760            // Find the next virtual node in the ring that belongs to a different physical node
761            // and does not already contain the same hash (due to replication).
762            let (_, next_vid) = self
763                .hash_to_vid
764                .range(vnode.hash + 1..) // Search for the next hash in the ring.
765                .find(|(_, vid)| {
766                    let next_pnode = self.vid_to_physical(vid).unwrap();
767                    let next_pid = next_pnode.borrow().id.clone();
768
769                    // Ensure the next virtual node belongs to a different physical node
770                    // and does not already contain the same hash.
771                    next_pid != pid && !next_pnode.borrow().data.contains_key(hash)
772                })
773                .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", vid))?;
774
775            // Retrieve the next physical node and move the data to it.
776            let next_pnode = self.vid_to_physical(next_vid)?;
777            let mut next_pnode_refmut = next_pnode.borrow_mut();
778
779            if let Some(key) = pnode.borrow_mut().data.remove(hash) {
780                next_pnode_refmut.data.insert(*hash, key);
781            }
782
783            // Update the next virtual node's hash set to include the redistributed hashes.
784            next_pnode_refmut
785                .vnodes
786                .get_mut(next_vid)
787                .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?
788                .hashes
789                .extend(vnode.hashes.iter());
790        }
791
792        // Remove the virtual node from the mappings.
793        self.hash_to_vid.remove(&vnode.hash);
794        self.vid_to_pid.remove(vid);
795
796        Ok(())
797    }
798
799    /// Adds a single virtual node to a physical node in the consistent hashing ring.
800    ///
801    /// This method creates a new virtual node for the specified physical node (`pid`),
802    /// computes its hash, and redistributes data from the next virtual node in the ring
803    /// to the newly added virtual node.
804    ///
805    /// # Steps
806    /// 1. Retrieve the physical node associated with the given `pid`.
807    /// 2. Generate a unique virtual node ID (`vid`) and compute its hash.
808    /// 3. Ensure the hash is unique and does not collide with existing virtual nodes.
809    /// 4. Redistribute data from the next virtual node in the ring to the new virtual node.
810    /// 5. Update the `hash_to_vid` and `vid_to_pid` mappings.
811    /// 6. Add the new virtual node to the physical node's `vnodes` map.
812    ///
813    /// # Arguments
814    /// - `pid`: The unique identifier of the physical node to which the virtual node will be added.
815    ///
816    /// # Returns
817    /// - `Result<()>`: Returns `Ok(())` if the virtual node is successfully added, or an error
818    ///   if any operation fails.
819    pub fn add_one_vnode(&mut self, pid: &str) -> Result<()> {
820        // Retrieve the physical node associated with the given `pid`.
821        let pnode = self
822            .physicals
823            .get(pid)
824            .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))?
825            .clone();
826        let pid = pnode.borrow().id.clone();
827
828        // Generate a unique virtual node ID (`vid`) and compute its hash.
829        let vid = format!("{}-{}", pid, pnode.borrow().num_vnodes);
830        let hash = self.hasher.digest(&vid)?;
831
832        // Ensure the hash is unique and does not collide with existing virtual nodes.
833        if self.hash_to_vid.contains_key(&hash) {
834            bail!("Virtual node with hash {} already exists", hash);
835        }
836
837        // Create the new virtual node.
838        let mut vnode = VirtualNode::builder().id(vid.clone()).hash(hash).build();
839
840        // Find the next virtual node in the ring.
841        let (next_hash, next_vid) = self
842            .hash_to_vid
843            .range(hash + 1..) // Search for the next hash in the ring.
844            .chain(self.hash_to_vid.range(..)) // Handle wrap-around.
845            .next()
846            .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", vid))?;
847
848        // Retrieve the next physical node and its virtual node.
849        let next_pnode = self.vid_to_physical(next_vid)?;
850        let next_pid = next_pnode.borrow().id.clone();
851        let hashes_to_move = next_pnode
852            .borrow()
853            .vnodes
854            .get(next_vid)
855            .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?
856            .hashes
857            .iter()
858            .filter(|candidate| {
859                if &hash < next_hash {
860                    **candidate < hash
861                } else {
862                    // Handle wrap-around.
863                    **candidate < hash && *candidate > next_hash
864                }
865            })
866            .cloned()
867            .collect::<Vec<_>>();
868
869        // Add the hashes to the new virtual node.
870        vnode.hashes.extend(hashes_to_move.iter());
871
872        // Update the next virtual node's hash set to remove the redistributed hashes.
873        {
874            let mut next_pnode_refmut = next_pnode.borrow_mut();
875            let next_vnode = next_pnode_refmut
876                .vnodes
877                .get_mut(next_vid)
878                .ok_or_else(|| anyhow!("Next vnode not found for vid: {}", next_vid))?;
879            for hash in hashes_to_move.iter() {
880                next_vnode.hashes.remove(hash);
881            }
882        }
883
884        // Move data from the next virtual node to the new virtual node.
885        for hash in hashes_to_move.iter() {
886            if pid != next_pid {
887                if let Some(value) = next_pnode.borrow_mut().data.remove(hash) {
888                    pnode.borrow_mut().data.insert(*hash, value);
889                }
890            }
891        }
892
893        // Update the `hash_to_vid` and `vid_to_pid` mappings.
894        self.hash_to_vid.insert(hash, vid.clone());
895        self.vid_to_pid.insert(vid.clone(), pid.clone());
896
897        // Add the new virtual node to the physical node's `vnodes` map.
898        pnode.borrow_mut().vnodes.insert(vid.clone(), vnode);
899        pnode.borrow_mut().num_vnodes += 1;
900
901        Ok(())
902    }
903
904    /// Adjusts the number of virtual nodes for a physical node in the consistent hashing ring.
905    ///
906    /// This method increases or decreases the number of virtual nodes for the specified physical node (`pid`).
907    /// If the new number of virtual nodes is zero, the physical node is removed from the ring.
908    ///
909    /// # Steps
910    /// 1. Retrieve the physical node associated with the given `pid`.
911    /// 2. If `new_num_vnodes` is zero, remove the physical node.
912    /// 3. If `new_num_vnodes` is less than the current number, remove the extra virtual nodes.
913    /// 4. If `new_num_vnodes` is greater than the current number, add new virtual nodes.
914    ///
915    /// # Arguments
916    /// - `pid`: The unique identifier of the physical node.
917    /// - `new_num_vnodes`: The desired number of virtual nodes for the physical node.
918    ///
919    /// # Returns
920    /// - `Result<()>`: Returns `Ok(())` if the operation is successful, or an error if any operation fails.
921    pub fn set_num_vnodes(&mut self, pid: &str, new_num_vnodes: usize) -> Result<()> {
922        // Retrieve the physical node associated with the given `pid`.
923        let pnode = self
924            .physicals
925            .get(pid)
926            .ok_or_else(|| anyhow!("Physical node not found, pid: {}", pid))?
927            .clone();
928        let curr_num_vnodes = pnode.borrow().num_vnodes;
929
930        // If the number of virtual nodes is unchanged, return early.
931        if new_num_vnodes == curr_num_vnodes {
932            return Ok(());
933        }
934
935        // If the new number of virtual nodes is zero, remove the physical node.
936        if new_num_vnodes == 0 {
937            self.remove_physical_node(pid)?;
938            return Ok(());
939        }
940
941        // If the new number of virtual nodes is less, remove the extra virtual nodes.
942        if new_num_vnodes < curr_num_vnodes {
943            for i in new_num_vnodes..curr_num_vnodes {
944                let vid = format!("{}-{}", pid, i);
945                self.remove_vnode(&vid)?; // Remove the virtual node.
946            }
947        } else {
948            // If the new number of virtual nodes is greater, add new virtual nodes.
949            for _ in curr_num_vnodes..new_num_vnodes {
950                self.add_one_vnode(pid)?; // Add a new virtual node.
951            }
952        }
953
954        Ok(())
955    }
956
957    /// Retrieves information about the hash ranges in the consistent hashing ring.
958    ///
959    /// This method computes the hash ranges managed by the ring, including the number of keys
960    /// and the items stored within each range. It provides a detailed view of how data is distributed
961    /// across the virtual nodes in the ring.
962    ///
963    /// # Steps
964    /// 1. Collect all hash values from the `hash_to_vid` mapping.
965    /// 2. Handle the case where there are no hashes (empty ring).
966    /// 3. Handle the case where there is only one hash (single virtual node).
967    /// 4. Iterate over the hash values to compute the ranges between consecutive hashes.
968    /// 5. For each range, collect the items stored in the corresponding virtual node.
969    ///
970    /// # Returns
971    /// - `Result<RangeInfo>`: A `RangeInfo` struct containing details about the hash ranges,
972    ///   including the start and end hashes, the number of keys, and the items in each range.
973    pub fn range_info(&self) -> Result<RangeInfo> {
974        // Initialize an empty `RangeInfo` result.
975        let mut result = RangeInfo::builder().ranges(vec![]).build();
976
977        // Collect all hash values from the `hash_to_vid` mapping.
978        let hashes = self.hash_to_vid.keys().cloned().collect::<Vec<_>>();
979
980        // Handle the case where there are no hashes (empty ring).
981        if hashes.is_empty() {
982            return Ok(result);
983        }
984
985        // Handle the case where there is only one hash (single virtual node).
986        if hashes.len() == 1 {
987            let vid = self.hash_to_vid.get(&hashes[0]).unwrap();
988            let node = self.vid_to_physical(vid)?;
989            let items = node.borrow().data.values().cloned().collect::<Vec<_>>();
990
991            // Add the single range to the result.
992            result.ranges.push(
993                Range::builder()
994                    .hash_start(hashes[0])
995                    .hash_end(hashes[0])
996                    .count(items.len())
997                    .items(
998                        items
999                            .iter()
1000                            .map(|item| {
1001                                HashItem::builder()
1002                                    .hash(hashes[0])
1003                                    .inner(item.clone())
1004                                    .build()
1005                            })
1006                            .collect(),
1007                    )
1008                    .build(),
1009            );
1010
1011            return Ok(result);
1012        }
1013
1014        // Iterate over the hash values to compute the ranges between consecutive hashes.
1015        for i in 0..hashes.len() {
1016            let start_hash = hashes[i];
1017            let end_hash = hashes[(i + 1) % hashes.len()]; // Wrap around for the last range.
1018
1019            // Retrieve the virtual node corresponding to the end hash.
1020            let end_vid = self
1021                .hash_to_vid
1022                .get(&end_hash)
1023                .ok_or_else(|| anyhow!("Virtual ID not found for hash: {}", end_hash))?;
1024            let end_pnode = self.vid_to_physical(end_vid)?;
1025            let end_pnode_ref = end_pnode.borrow();
1026            let end_vnode = end_pnode_ref
1027                .vnodes
1028                .get(end_vid)
1029                .ok_or_else(|| anyhow!("Virtual node not found for vid: {}", end_vid))?;
1030
1031            // Collect the items stored in the virtual node for the current range.
1032            let items = end_vnode
1033                .hashes
1034                .iter()
1035                .map(|hash| {
1036                    let value = end_pnode_ref
1037                        .data
1038                        .get(hash)
1039                        .ok_or_else(|| anyhow!("Key not found for hash: {}", hash))?
1040                        .clone();
1041
1042                    Ok(HashItem::builder().hash(*hash).inner(value).build())
1043                })
1044                .collect::<Result<Vec<_>>>()?;
1045
1046            // Add the range to the result.
1047            result.ranges.push(
1048                Range::builder()
1049                    .hash_start(start_hash)
1050                    .hash_end(end_hash)
1051                    .count(end_vnode.hashes.len())
1052                    .items(items)
1053                    .build(),
1054            );
1055        }
1056
1057        // Return the computed `RangeInfo`.
1058        Ok(result)
1059    }
1060
1061    /// Retrieves the total number of keys stored in the consistent hashing ring.
1062    ///
1063    /// This method iterates over all physical nodes in the ring and sums up the number of keys
1064    /// stored in their respective data maps.
1065    ///
1066    /// # Returns
1067    /// - `Result<usize>`: The total number of keys in the ring, or an error if any operation fails.
1068    pub fn key_count(&self) -> Result<usize> {
1069        // Sum up the number of keys in the data maps of all physical nodes.
1070        Ok(self
1071            .physicals
1072            .values()
1073            .map(|node| node.borrow().data.len()) // Get the number of keys in each physical node.
1074            .sum()) // Sum up the counts.
1075    }
1076
1077    /// Retrieves the items stored in each virtual node of the consistent hashing ring.
1078    ///
1079    /// This method iterates over all virtual nodes in the ring and collects the items stored
1080    /// in each virtual node. It provides a detailed view of how data is distributed across
1081    /// the virtual nodes.
1082    ///
1083    /// # Returns
1084    /// - `Result<ItemsByVNode>`: An `ItemsByVNode` struct containing the items stored in each virtual node,
1085    ///   or an error if any operation fails.
1086    pub fn items_by_vnode(&self) -> Result<ItemsByVNode> {
1087        // Initialize an empty `ItemsByVNode` result.
1088        let mut result = ItemsByVNode::builder().build();
1089
1090        // Handle the case where the ring is empty.
1091        if self.hash_to_vid.is_empty() {
1092            return Ok(result);
1093        }
1094
1095        // Handle the case where there is only one virtual node.
1096        if self.hash_to_vid.len() == 1 {
1097            let (_, vid) = self.hash_to_vid.iter().next().unwrap(); // Get the single virtual node ID.
1098            let node = self.vid_to_physical(vid)?; // Retrieve the physical node associated with the virtual node.
1099            let items = node.borrow().data.values().cloned().collect::<Vec<_>>(); // Collect all items in the physical node.
1100
1101            // Add the items to the result.
1102            result.values.push(
1103                ItemsByVNodeValue::builder()
1104                    .vid(vid.clone())
1105                    .items(items)
1106                    .build(),
1107            );
1108
1109            return Ok(result);
1110        }
1111
1112        // Iterate over all virtual nodes in the ring.
1113        for (_, vid) in self.hash_to_vid.iter() {
1114            let pnode = self.vid_to_physical(vid)?; // Retrieve the physical node associated with the virtual node.
1115            let pnode_ref = pnode.borrow();
1116            let vnode = pnode_ref.vnodes.get(vid).unwrap(); // Get the virtual node.
1117
1118            // Collect the items stored in the virtual node.
1119            let items = vnode
1120                .hashes
1121                .iter()
1122                .map(|hash| {
1123                    pnode_ref
1124                        .data
1125                        .get(hash) // Retrieve the item associated with the hash.
1126                        .cloned()
1127                        .ok_or_else(|| anyhow!("Key not found for hash: {}", hash))
1128                })
1129                .collect::<Result<Vec<_>>>()?;
1130
1131            // Add the items to the result.
1132            result.values.push(
1133                ItemsByVNodeValue::builder()
1134                    .vid(vid.clone())
1135                    .items(items)
1136                    .build(),
1137            );
1138        }
1139
1140        // Return the result containing items grouped by virtual nodes.
1141        Ok(result)
1142    }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use serde_json::json;
1149    use sugars::{btmap, hmap};
1150
1151    struct TestHasher;
1152    impl RingHasherTrait for TestHasher {
1153        fn digest(&self, data: &str) -> Result<u64> {
1154            //     5   | key1
1155            //     6   | key2
1156            //    10   | node2-0
1157            //    15   | key3
1158            //    20   | node3-0
1159            //    25   | key4
1160            //    30   | node1-0
1161            //    40   | node3-1
1162            //    50   | node2-1
1163            //    60   | node3-2
1164            //    65   | key5
1165            //    70   | node1-1
1166            match data {
1167                "node1-0" => Ok(30),
1168                "node1-1" => Ok(70),
1169                "node1-2" => Ok(26),
1170                "node2-0" => Ok(10),
1171                "node2-1" => Ok(50),
1172                "node3-0" => Ok(20),
1173                "node3-1" => Ok(40),
1174                "node3-2" => Ok(60),
1175                "key1" => Ok(5),
1176                "key2" => Ok(6),
1177                "key3" => Ok(15),
1178                "key4" => Ok(25),
1179                "key5" => Ok(65),
1180                _ => {
1181                    bail!("Unimplemented test hasher for data: {}", data)
1182                }
1183            }
1184        }
1185    }
1186
1187    fn ring() -> ConsistentHashingRing<TestHasher> {
1188        ConsistentHashingRing::builder()
1189            .hasher(TestHasher)
1190            .replication_factor(2)
1191            .hash_to_vid(BTreeMap::new())
1192            .vid_to_pid(HashMap::new())
1193            .physicals(HashMap::new())
1194            .build()
1195    }
1196
1197    fn nodes() -> Vec<PhysicalNode> {
1198        vec![
1199            PhysicalNode::builder()
1200                .id("node1".to_string())
1201                .num_vnodes(1)
1202                .data(BTreeMap::new())
1203                .build(),
1204            PhysicalNode::builder()
1205                .id("node2".to_string())
1206                .num_vnodes(2)
1207                .data(BTreeMap::new())
1208                .build(),
1209            PhysicalNode::builder()
1210                .id("node3".to_string())
1211                .num_vnodes(3)
1212                .data(BTreeMap::new())
1213                .build(),
1214        ]
1215    }
1216
1217    fn items() -> Vec<Item> {
1218        serde_json::from_value(json!([
1219            { "key": "key1", "value": "value1" },
1220            { "key": "key2", "value": "value2" },
1221            { "key": "key3", "value": "value3" },
1222            { "key": "key4", "value": "value4" },
1223            { "key": "key5", "value": "value5" }
1224        ]))
1225        .unwrap()
1226    }
1227
1228    #[test]
1229    fn should_successfully_add_physical_nodes() {
1230        let mut ring = ring();
1231        let nodes = nodes();
1232
1233        for node in nodes {
1234            ring.add_physical_node(node).unwrap();
1235        }
1236
1237        assert_eq!(ring.physicals.len(), 3);
1238        assert_eq!(ring.hash_to_vid.len(), 6);
1239        assert_eq!(ring.vid_to_pid.len(), 6);
1240
1241        let expected_vnode_counts = vec![("node1", 1), ("node2", 2), ("node3", 3)];
1242        for (node_id, vnode_count) in expected_vnode_counts {
1243            let node = ring.physicals.get(node_id).unwrap().borrow();
1244            assert_eq!(node.vnodes.len(), vnode_count);
1245            assert_eq!(node.data.len(), 0);
1246
1247            for (i, (_, vnode)) in node.vnodes.iter().enumerate() {
1248                assert_eq!(vnode.id, format!("{}-{}", node_id, i));
1249            }
1250        }
1251
1252        assert_eq!(
1253            ring.hash_to_vid,
1254            btmap! {
1255                30 => "node1-0".to_string(),
1256                10 => "node2-0".to_string(),
1257                50 => "node2-1".to_string(),
1258                20 => "node3-0".to_string(),
1259                40 => "node3-1".to_string(),
1260                60 => "node3-2".to_string(),
1261            }
1262        );
1263
1264        assert_eq!(
1265            ring.vid_to_pid,
1266            hmap! {
1267                "node1-0".to_string() => "node1".to_string(),
1268                "node2-0".to_string() => "node2".to_string(),
1269                "node2-1".to_string() => "node2".to_string(),
1270                "node3-0".to_string() => "node3".to_string(),
1271                "node3-1".to_string() => "node3".to_string(),
1272                "node3-2".to_string() => "node3".to_string(),
1273            }
1274        );
1275    }
1276
1277    #[test]
1278    fn should_successfully_add_keys() {
1279        let mut ring = ring();
1280        let nodes = nodes();
1281
1282        for node in nodes {
1283            ring.add_physical_node(node).unwrap();
1284        }
1285
1286        let items = items();
1287        ring.insert_many(&items).unwrap();
1288
1289        assert!(ring.key_count().unwrap() == 10);
1290        let items_by_vnode = ring.items_by_vnode().unwrap();
1291
1292        assert_eq!(
1293            items_by_vnode.to_json(),
1294            json!({
1295              "values": [
1296                {
1297                  "vid": "node2-0",
1298                  "items": [
1299                    { "key": "key1", "value": "value1" },
1300                    { "key": "key2", "value": "value2" },
1301                    { "key": "key5", "value": "value5" }
1302                  ]
1303                },
1304                {
1305                  "vid": "node3-0",
1306                  "items": [
1307                    { "key": "key1", "value": "value1" },
1308                    { "key": "key2", "value": "value2" },
1309                    { "key": "key3", "value": "value3" },
1310                    { "key": "key5", "value": "value5" }
1311                  ]
1312                },
1313                {
1314                  "vid": "node1-0",
1315                  "items": [
1316                    { "key": "key3", "value": "value3" },
1317                    { "key": "key4", "value": "value4" }
1318                  ]
1319                },
1320                {
1321                  "vid": "node3-1",
1322                  "items": [
1323                    { "key": "key4", "value": "value4" }
1324                  ]
1325                },
1326                {
1327                  "vid": "node2-1",
1328                  "items": []
1329                },
1330                {
1331                  "vid": "node3-2",
1332                  "items": []
1333                }
1334              ]
1335            })
1336        );
1337    }
1338
1339    #[test]
1340    fn should_successfully_remove_key() {
1341        let mut ring = ring();
1342        let nodes = nodes();
1343
1344        for node in nodes {
1345            ring.add_physical_node(node).unwrap();
1346        }
1347
1348        let items = items();
1349        ring.insert_many(&items).unwrap();
1350
1351        ring.remove("key1").unwrap();
1352        assert_eq!(ring.key_count().unwrap(), 8);
1353
1354        let range_info = ring.range_info().unwrap();
1355        assert_eq!(range_info.key_count(), 8);
1356        assert_eq!(
1357            range_info.to_json(),
1358            json!({
1359                "ranges": [
1360                  {
1361                    "hash_start": 10,
1362                    "hash_end": 20,
1363                    "count": 3,
1364                    "items": [
1365                      {
1366                        "hash": 6,
1367                        "inner": {
1368                          "key": "key2",
1369                          "value": "value2"
1370                        }
1371                      },
1372                      {
1373                        "hash": 15,
1374                        "inner": {
1375                          "key": "key3",
1376                          "value": "value3"
1377                        }
1378                      },
1379                      {
1380                        "hash": 65,
1381                        "inner": {
1382                          "key": "key5",
1383                          "value": "value5"
1384                        }
1385                      }
1386                    ]
1387                  },
1388                  {
1389                    "hash_start": 20,
1390                    "hash_end": 30,
1391                    "count": 2,
1392                    "items": [
1393                      {
1394                        "hash": 15,
1395                        "inner": {
1396                          "key": "key3",
1397                          "value": "value3"
1398                        }
1399                      },
1400                      {
1401                        "hash": 25,
1402                        "inner": {
1403                          "key": "key4",
1404                          "value": "value4"
1405                        }
1406                      }
1407                    ]
1408                  },
1409                  {
1410                    "hash_start": 30,
1411                    "hash_end": 40,
1412                    "count": 1,
1413                    "items": [
1414                      {
1415                        "hash": 25,
1416                        "inner": {
1417                          "key": "key4",
1418                          "value": "value4"
1419                        }
1420                      }
1421                    ]
1422                  },
1423                  {
1424                    "hash_start": 40,
1425                    "hash_end": 50,
1426                    "count": 0,
1427                    "items": []
1428                  },
1429                  {
1430                    "hash_start": 50,
1431                    "hash_end": 60,
1432                    "count": 0,
1433                    "items": []
1434                  },
1435                  {
1436                    "hash_start": 60,
1437                    "hash_end": 10,
1438                    "count": 2,
1439                    "items": [
1440                      {
1441                        "hash": 6,
1442                        "inner": {
1443                          "key": "key2",
1444                          "value": "value2"
1445                        }
1446                      },
1447                      {
1448                        "hash": 65,
1449                        "inner": {
1450                          "key": "key5",
1451                          "value": "value5"
1452                        }
1453                      }
1454                    ]
1455                  }
1456                ]
1457              }
1458            )
1459        )
1460    }
1461
1462    #[test]
1463    fn should_successfully_remove_vnode() {
1464        let mut ring = ring();
1465        let nodes = nodes();
1466
1467        for node in nodes {
1468            ring.add_physical_node(node).unwrap();
1469        }
1470
1471        let items = items();
1472        ring.insert_many(&items).unwrap();
1473
1474        ring.remove_vnode("node2-0").unwrap();
1475        assert_eq!(ring.physicals.len(), 3);
1476        assert_eq!(ring.key_count().unwrap(), 10);
1477
1478        let range_info = ring.range_info().unwrap();
1479        assert_eq!(range_info.key_count(), 10);
1480
1481        assert_eq!(
1482            range_info.to_json(),
1483            json!({
1484              "ranges": [
1485                {
1486                  "hash_start": 20,
1487                  "hash_end": 30,
1488                  "count": 5,
1489                  "items": [
1490                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1491                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } },
1492                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1493                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1494                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1495                  ]
1496                },
1497                {
1498                  "hash_start": 30,
1499                  "hash_end": 40,
1500                  "count": 1,
1501                  "items": [
1502                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1503                  ]
1504                },
1505                {
1506                  "hash_start": 40,
1507                  "hash_end": 50,
1508                  "count": 0,
1509                  "items": []
1510                },
1511                {
1512                  "hash_start": 50,
1513                  "hash_end": 60,
1514                  "count": 0,
1515                  "items": []
1516                },
1517                {
1518                  "hash_start": 60,
1519                  "hash_end": 20,
1520                  "count": 4,
1521                  "items": [
1522                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1523                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1524                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1525                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1526                  ]
1527                }
1528              ]
1529            })
1530        )
1531    }
1532
1533    #[test]
1534    fn should_successfully_remove_vnode_of_physical_node_with_one_vnode() {
1535        let mut ring = ring();
1536        let nodes = nodes();
1537
1538        for node in nodes {
1539            ring.add_physical_node(node).unwrap();
1540        }
1541
1542        let items = items();
1543        ring.insert_many(&items).unwrap();
1544        ring.remove_vnode("node1-0").unwrap();
1545
1546        assert_eq!(ring.physicals.len(), 3);
1547        assert_eq!(ring.key_count().unwrap(), 10);
1548
1549        let count = ring
1550            .physicals
1551            .get("node1")
1552            .unwrap()
1553            .clone()
1554            .borrow()
1555            .vnodes
1556            .len();
1557        assert_eq!(count, 0)
1558    }
1559
1560    #[test]
1561    fn should_successfully_remove_physical_node_with_one_vnode() {
1562        let mut ring = ring();
1563        let nodes = nodes();
1564
1565        for node in nodes {
1566            ring.add_physical_node(node).unwrap();
1567        }
1568
1569        let items = items();
1570        ring.insert_many(&items).unwrap();
1571
1572        ring.remove_physical_node("node1").unwrap();
1573        assert_eq!(ring.physicals.len(), 2);
1574
1575        let range_info = ring.range_info().unwrap();
1576        assert_eq!(ring.key_count().unwrap(), 10);
1577        assert_eq!(range_info.key_count(), 10);
1578
1579        assert_eq!(
1580            range_info.to_json(),
1581            json!({
1582              "ranges": [
1583                {
1584                  "hash_start": 10,
1585                  "hash_end": 20,
1586                  "count": 4,
1587                  "items": [
1588                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1589                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1590                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1591                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1592                  ]
1593                },
1594                {
1595                  "hash_start": 20,
1596                  "hash_end": 40,
1597                  "count": 1,
1598                  "items": [
1599                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1600                  ]
1601                },
1602                {
1603                  "hash_start": 40,
1604                  "hash_end": 50,
1605                  "count": 2,
1606                  "items": [
1607                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1608                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1609                  ]
1610                },
1611                {
1612                  "hash_start": 50,
1613                  "hash_end": 60,
1614                  "count": 0,
1615                  "items": []
1616                },
1617                {
1618                  "hash_start": 60,
1619                  "hash_end": 10,
1620                  "count": 3,
1621                  "items": [
1622                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1623                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1624                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1625                  ]
1626                }
1627              ]
1628            })
1629        );
1630    }
1631
1632    #[test]
1633    fn should_successfully_remove_physical_node_with_multiple_vnodes() {
1634        let mut ring = ring();
1635        let nodes = nodes();
1636
1637        for node in nodes {
1638            ring.add_physical_node(node).unwrap();
1639        }
1640
1641        let items = items();
1642        ring.insert_many(&items).unwrap();
1643
1644        ring.remove_physical_node("node2").unwrap();
1645        assert_eq!(ring.physicals.len(), 2);
1646
1647        assert_eq!(ring.key_count().unwrap(), 10);
1648
1649        let range_info = ring.range_info().unwrap();
1650        assert_eq!(range_info.key_count(), 10);
1651
1652        assert_eq!(
1653            range_info.to_json(),
1654            json!({
1655              "ranges": [
1656                {
1657                  "hash_start": 20,
1658                  "hash_end": 30,
1659                  "count": 5,
1660                  "items": [
1661                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1662                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } },
1663                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1664                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1665                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1666                  ]
1667                },
1668                {
1669                  "hash_start": 30,
1670                  "hash_end": 40,
1671                  "count": 1,
1672                  "items": [
1673                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1674                  ]
1675                },
1676                {
1677                  "hash_start": 40,
1678                  "hash_end": 60,
1679                  "count": 0,
1680                  "items": []
1681                },
1682                {
1683                  "hash_start": 60,
1684                  "hash_end": 20,
1685                  "count": 4,
1686                  "items": [
1687                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1688                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1689                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1690                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1691                  ]
1692                }
1693              ]
1694            })
1695        );
1696    }
1697
1698    #[test]
1699    fn should_successfully_add_one_vnode() {
1700        let mut ring = ring();
1701        let nodes = nodes();
1702
1703        for node in nodes {
1704            ring.add_physical_node(node).unwrap();
1705        }
1706
1707        let items = items();
1708        ring.insert_many(&items).unwrap();
1709
1710        ring.add_one_vnode("node1").unwrap();
1711        assert_eq!(ring.physicals.len(), 3);
1712        assert_eq!(ring.key_count().unwrap(), 10);
1713
1714        let range_info = ring.range_info().unwrap();
1715        assert_eq!(range_info.key_count(), 10);
1716
1717        assert_eq!(
1718            range_info.to_json(),
1719            json!({
1720              "ranges": [
1721                {
1722                  "hash_start": 10,
1723                  "hash_end": 20,
1724                  "count": 4,
1725                  "items": [
1726                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1727                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1728                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1729                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1730                  ]
1731                },
1732                {
1733                  "hash_start": 20,
1734                  "hash_end": 30,
1735                  "count": 2,
1736                  "items": [
1737                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1738                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1739                  ]
1740                },
1741                {
1742                  "hash_start": 30,
1743                  "hash_end": 40,
1744                  "count": 1,
1745                  "items": [
1746                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1747                  ]
1748                },
1749                {
1750                  "hash_start": 40,
1751                  "hash_end": 50,
1752                  "count": 0,
1753                  "items": []
1754                },
1755                {
1756                  "hash_start": 50,
1757                  "hash_end": 60,
1758                  "count": 0,
1759                  "items": []
1760                },
1761                {
1762                  "hash_start": 60,
1763                  "hash_end": 70,
1764                  "count": 1,
1765                  "items": [
1766                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1767                  ]
1768                },
1769                {
1770                  "hash_start": 70,
1771                  "hash_end": 10,
1772                  "count": 2,
1773                  "items": [
1774                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1775                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } }
1776                  ]
1777                }
1778              ]
1779            })
1780        )
1781    }
1782
1783    #[test]
1784    fn should_successfully_decrease_num_vnodes_to_zero() {
1785        let mut ring = ring();
1786        let nodes = nodes();
1787
1788        for node in nodes {
1789            ring.add_physical_node(node).unwrap();
1790        }
1791
1792        let items = items();
1793        ring.insert_many(&items).unwrap();
1794
1795        ring.set_num_vnodes("node1", 0).unwrap();
1796        assert_eq!(ring.physicals.len(), 2);
1797        assert_eq!(ring.key_count().unwrap(), 10);
1798
1799        let range_info = ring.range_info().unwrap();
1800        assert_eq!(range_info.key_count(), 10);
1801    }
1802
1803    #[test]
1804    fn should_successfully_descrease_num_vnodes() {
1805        let mut ring = ring();
1806        let nodes = nodes();
1807
1808        for node in nodes {
1809            ring.add_physical_node(node).unwrap();
1810        }
1811
1812        let items = items();
1813        ring.insert_many(&items).unwrap();
1814
1815        ring.set_num_vnodes("node3", 1).unwrap();
1816        assert_eq!(ring.physicals.len(), 3);
1817        assert_eq!(ring.key_count().unwrap(), 10);
1818
1819        let range_info = ring.range_info().unwrap();
1820        assert_eq!(range_info.key_count(), 10);
1821
1822        assert_eq!(
1823            range_info.to_json(),
1824            json!({
1825              "ranges": [
1826                {
1827                  "hash_start": 10,
1828                  "hash_end": 20,
1829                  "count": 4,
1830                  "items": [
1831                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1832                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1833                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1834                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1835                  ]
1836                },
1837                {
1838                  "hash_start": 20,
1839                  "hash_end": 30,
1840                  "count": 2,
1841                  "items": [
1842                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1843                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1844                  ]
1845                },
1846                {
1847                  "hash_start": 30,
1848                  "hash_end": 50,
1849                  "count": 1,
1850                  "items": [
1851                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1852                  ]
1853                },
1854                {
1855                  "hash_start": 50,
1856                  "hash_end": 10,
1857                  "count": 3,
1858                  "items": [
1859                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1860                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1861                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1862                  ]
1863                }
1864              ]
1865            })
1866        )
1867    }
1868
1869    #[test]
1870    fn should_successfully_increase_num_vnodes() {
1871        let mut ring = ring();
1872        let nodes = nodes();
1873
1874        for node in nodes {
1875            ring.add_physical_node(node).unwrap();
1876        }
1877
1878        let items = items();
1879        ring.insert_many(&items).unwrap();
1880
1881        ring.set_num_vnodes("node1", 3).unwrap();
1882        assert_eq!(ring.physicals.len(), 3);
1883        assert_eq!(ring.key_count().unwrap(), 10);
1884
1885        let range_info = ring.range_info().unwrap();
1886        assert_eq!(range_info.key_count(), 10);
1887
1888        assert_eq!(
1889            range_info.to_json(),
1890            json!({
1891              "ranges": [
1892                {
1893                  "hash_start": 10,
1894                  "hash_end": 20,
1895                  "count": 4,
1896                  "items": [
1897                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1898                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } },
1899                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1900                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1901                  ]
1902                },
1903                {
1904                  "hash_start": 20,
1905                  "hash_end": 26,
1906                  "count": 2,
1907                  "items": [
1908                    { "hash": 15, "inner": { "key": "key3", "value": "value3" } },
1909                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1910                  ]
1911                },
1912                {
1913                  "hash_start": 26,
1914                  "hash_end": 30,
1915                  "count": 0,
1916                  "items": []
1917                },
1918                {
1919                  "hash_start": 30,
1920                  "hash_end": 40,
1921                  "count": 1,
1922                  "items": [
1923                    { "hash": 25, "inner": { "key": "key4", "value": "value4" } }
1924                  ]
1925                },
1926                {
1927                  "hash_start": 40,
1928                  "hash_end": 50,
1929                  "count": 0,
1930                  "items": []
1931                },
1932                {
1933                  "hash_start": 50,
1934                  "hash_end": 60,
1935                  "count": 0,
1936                  "items": []
1937                },
1938                {
1939                  "hash_start": 60,
1940                  "hash_end": 70,
1941                  "count": 1,
1942                  "items": [
1943                    { "hash": 65, "inner": { "key": "key5", "value": "value5" } }
1944                  ]
1945                },
1946                {
1947                  "hash_start": 70,
1948                  "hash_end": 10,
1949                  "count": 2,
1950                  "items": [
1951                    { "hash": 5, "inner": { "key": "key1", "value": "value1" } },
1952                    { "hash": 6, "inner": { "key": "key2", "value": "value2" } }
1953                  ]
1954                }
1955              ]
1956            })
1957        )
1958    }
1959
1960    #[test]
1961    fn items_should_exist() {
1962        let mut ring = ring();
1963        let nodes = nodes();
1964
1965        for node in nodes {
1966            ring.add_physical_node(node).unwrap();
1967        }
1968
1969        let items = items();
1970        ring.insert_many(&items).unwrap();
1971
1972        assert_eq!(
1973            ring.get_item("key1").unwrap().to_json(),
1974            json!({ "key": "key1", "value": "value1" })
1975        );
1976
1977        assert_eq!(
1978            ring.get_item("key2").unwrap().to_json(),
1979            json!({ "key": "key2", "value": "value2" })
1980        );
1981    }
1982
1983    #[test]
1984    fn items_should_not_exist() {
1985        let mut ring = ring();
1986        let nodes = nodes();
1987
1988        for node in nodes {
1989            ring.add_physical_node(node).unwrap();
1990        }
1991
1992        let items = items();
1993        ring.insert_many(&items).unwrap();
1994
1995        assert!(ring.get_item("key100").is_err());
1996    }
1997
1998    #[test]
1999    fn should_return_pids_containing_key() {
2000        let mut ring = ring();
2001        let nodes = nodes();
2002
2003        for node in nodes {
2004            ring.add_physical_node(node).unwrap();
2005        }
2006
2007        let items = items();
2008        ring.insert_many(&items).unwrap();
2009
2010        let mut actual = ring.get_pids_containing_key("key1").unwrap();
2011        actual.sort();
2012        assert_eq!(actual, vec!["node2".to_string(), "node3".to_string()]);
2013    }
2014}