lfchring/
state.rs

1// This file is part of lfchring-rs.
2//
3// Copyright 2021 Christos Katsakioris
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::borrow::Borrow;
18use std::collections::BTreeSet;
19use std::fmt::{Display, Formatter};
20use std::mem;
21use std::sync::Arc;
22
23use log::trace;
24
25use crate::{
26    types::{Adjacency, HashRingError, Hasher, Node, Result, Vnid},
27    vnode::VirtualNode,
28};
29
/// This is the internal data structure that is pointed to by [`HashRing<N, H>`].
///
/// The pointer to it is read, cloned and updated atomically, on an optimistic concurrency control
/// basis, to avoid synchronization issues.
/// The data themselves are freed through the epoch-based memory reclamation technique implemented
/// by [`crossbeam_epoch`].
#[derive(Debug)]
pub(crate) struct HashRingState<N, H>
where
    N: Node + ?Sized,
    H: Hasher,
{
    // Hasher handed to `VirtualNode::new` on insertion and used to digest virtual node names on
    // removal.
    hasher: H,
    // Number of virtual nodes created for every node that is inserted in the ring.
    vnodes_per_node: Vnid,
    // Upper bound on the number of distinct nodes (the owner included) that are recorded as
    // replica owners of each virtual node.
    replication_factor: u8,
    // All virtual nodes of the ring; re-sorted after every insertion so that lookups can rely on
    // binary search.
    pub(crate) vnodes: Vec<VirtualNode<N>>,
    // `crate::iter::Iter` requires access to this field, hence the `pub(crate)`.
}
48
49impl<N, H> Clone for HashRingState<N, H>
50where
51    N: Node + ?Sized,
52    H: Hasher,
53{
54    fn clone(&self) -> Self {
55        Self {
56            hasher: H::default(),
57            vnodes_per_node: self.vnodes_per_node,
58            replication_factor: self.replication_factor,
59            vnodes: self.vnodes.clone(),
60        }
61    }
62}
63
64impl<N, H> HashRingState<N, H>
65where
66    N: Node + ?Sized,
67    H: Hasher,
68{
69    #[inline]
70    pub(crate) fn with_capacity(
71        capacity: usize,
72        hasher: H,
73        vnodes_per_node: Vnid,
74        replication_factor: u8,
75    ) -> Self {
76        Self {
77            hasher,
78            vnodes_per_node,
79            replication_factor,
80            vnodes: Vec::with_capacity(capacity),
81        }
82    }
83
84    // First, initialize all vnodes for the given nodes into a new `BTreeSet`. Then, check whether
85    // any of them is already present in the current vnodes map to make sure no collision occurs.
86    // Finally, merge the new vnodes into the old ones.
87    //
88    // NOTE: If any of the newly created `VirtualNode`s collides with an already existing one,
89    // none of the new `nodes` is inserted in the ring.
90    pub(crate) fn insert(&mut self, nodes: &[Arc<N>]) -> Result<()> {
91        let mut new = BTreeSet::new();
92        for node in nodes {
93            for vnid in 0..self.vnodes_per_node {
94                let vn = VirtualNode::new(&mut self.hasher, Arc::clone(&node), vnid);
95                // We need to not only check whether vn is already in the ring, but also whether
96                // it is present among the vnodes we are about to extend the ring by.
97                if self.vnodes.binary_search(&vn).is_ok() || !new.insert(vn.clone()) {
98                    // FIXME: How to avoid cloning the VirtualNode ^ but also be able to use it in:
99                    return Err(HashRingError::VirtualNodeAlreadyExists(format!("{}", vn)));
100                }
101                //trace!("vnode '{}' has been included in the ring extension", vn);
102            }
103        }
104        // TODO: What happens with the reallocation here? It is completely uncontrolled for now.
105        self.vnodes.extend(new);
106        self.vnodes.sort_unstable();
107        self.fix_replica_owners();
108        Ok(())
109    }
110
111    pub(crate) fn remove(&mut self, nodes: &[Arc<N>]) -> Result<()> {
112        let mut removed_indices = BTreeSet::new();
113        let node_names = nodes
114            .iter()
115            .map(|node| node.hashring_node_id())
116            .collect::<Vec<_>>();
117        let max_name_len = node_names.iter().map(|name| name.len()).max().unwrap();
118
119        let mut name = Vec::with_capacity(max_name_len + mem::size_of::<Vnid>());
120        for node_name in node_names {
121            for vnid in 0..self.vnodes_per_node {
122                name.clear();
123                name.extend(&*node_name);
124                name.extend(&vnid.to_ne_bytes());
125                let vn = self.hasher.digest(&name);
126                if let Ok(index) = self.vnodes.binary_search_by(|e| e.name.cmp(&vn)) {
127                    //trace!("Removing vnode '{:x?}' at index {}.", vn, index);
128                    removed_indices.insert(index);
129                } else {
130                    return Err(HashRingError::VirtualNodeDoesNotExist(format!("{:x?}", vn)));
131                }
132            }
133        }
134
135        // TODO: Return the removed vnodes or not? I guess it would be best if the output of
136        //       `HashRing::remove` is consistent with the output of `HashRing::insert`.
137        let mut removed_vnodes = Vec::with_capacity(removed_indices.len());
138        // Indices must be visited in reverse (descending) order for the removal; otherwise, the
139        // indices of the virtual nodes to be removed in `self.vnodes` become invalid as they are
140        // all shifted towards the beginning of the vector on every removal.
141        for &index in removed_indices.iter().rev() {
142            let vn = self.vnodes.remove(index);
143            removed_vnodes.push(vn);
144        }
145        //assert!(self.vnodes.is_sorted());
146        self.fix_replica_owners();
147        //Ok(removed_vnodes) TODO
148        Ok(())
149    }
150
151    fn fix_replica_owners(&mut self) {
152        for i in 0..self.vnodes.len() {
153            // SAFETY: `i` is always in range `0..self.vnodes.len()`
154            let curr_vn = unsafe { self.vnodes.get_unchecked(i) };
155
156            let mut replica_owners = Vec::with_capacity(self.replication_factor as usize);
157            // Some capacity might be wasted here  ^^  but we prefer it over reallocation.
158            let original_owner = &curr_vn.node;
159            replica_owners.push(Arc::clone(original_owner));
160
161            // Number of subsequent replica-owning nodes remaining to be found
162            let mut k = self.replication_factor - 1;
163
164            for (j, vn) in self
165                .vnodes
166                .iter()
167                .enumerate()
168                .cycle()
169                .skip((i + 1) % self.vnodes.len())
170            {
171                // If all replica owners for this vnode have been determined, break.
172                // Similarly, if we wrapped around the ring back to ourselves, break, even if k > 0
173                // (which would mean that replication_factor > # of distinct ring nodes).
174                if k == 0 || j == i {
175                    break;
176                }
177                // Since we want distinct nodes only in `replica_owners`, make sure `vn.node` is
178                // not already in.
179                let mut node_already_in = false;
180                for node in &replica_owners {
181                    if vn.node.hashring_node_id() == node.hashring_node_id() {
182                        node_already_in = true;
183                        break;
184                    }
185                }
186                // If `vn.node` is not already in, get it in, and decrease the number of distinct
187                // nodes remaining to be found.
188                if !node_already_in {
189                    replica_owners.push(Arc::clone(&vn.node));
190                    k -= 1;
191                }
192            }
193
194            // Store the replica owners we just found for the current vnode, in the current vnode.
195            // SAFETY: `i` is always in range `0..self.vnodes.len()`
196            let mut curr_vn = unsafe { self.vnodes.get_unchecked_mut(i) };
197            curr_vn.replica_owners = Some(replica_owners);
198        }
199    }
200
201    #[inline]
202    pub(crate) fn len_nodes(&self) -> usize {
203        self.vnodes.len() / self.vnodes_per_node as usize
204    }
205
206    #[inline]
207    pub(crate) fn len_virtual_nodes(&self) -> usize {
208        self.vnodes.len()
209    }
210
211    pub(crate) fn has_virtual_node<K>(&self, key: &K) -> bool
212    where
213        K: Borrow<[u8]>,
214    {
215        self.vnodes
216            .binary_search_by(|vn| {
217                let name: &[u8] = &vn.name;
218                name.cmp(key.borrow())
219            })
220            .is_ok()
221    }
222
223    // returns a reference to the actual `VirtualNode` in `HashRingState.vnodes`
224    pub(crate) fn virtual_node_for_key<K>(&self, key: &K) -> Result<&VirtualNode<N>>
225    where
226        K: Borrow<[u8]>,
227    {
228        // Return an error if the ring is empty...
229        if self.vnodes.is_empty() {
230            return Err(HashRingError::EmptyRing);
231        }
232        // ...otherwise find the correct index and return the associated vnode.
233        let index = self
234            .vnodes
235            .binary_search_by(|vn| {
236                let name: &[u8] = &vn.name;
237                name.cmp(key.borrow())
238            })
239            .unwrap_or_else(|index| index)
240            % self.vnodes.len();
241        // SAFETY: The remainder of the above integer division is always a usize between `0` and
242        //         `self.vnodes.len() - 1`, hence can be used as an index in `self.vnodes`.
243        Ok(unsafe { self.vnodes.get_unchecked(index) })
244    }
245
246    pub(crate) fn adjacent<K>(&self, adjacency: Adjacency, key: &K) -> Result<&VirtualNode<N>>
247    where
248        K: Borrow<[u8]>,
249    {
250        // Return an error if the ring is empty...
251        if self.vnodes.is_empty() {
252            return Err(HashRingError::EmptyRing);
253        }
254        // ...otherwise find the current index...
255        let index = self
256            .vnodes
257            .binary_search_by(|vn| {
258                let name: &[u8] = &vn.name;
259                name.cmp(key.borrow())
260            })
261            .unwrap_or_else(|index| index)
262            % self.vnodes.len();
263        // ...and return the adjacent one.
264        let index = match adjacency {
265            Adjacency::Predecessor => {
266                if 0 == index {
267                    self.vnodes.len() - 1
268                } else {
269                    index - 1
270                }
271            }
272            Adjacency::Successor => (index + 1) % self.vnodes.len(),
273        };
274        // SAFETY: The value of the index always stays within the range `0` to
275        //         `self.vnodes.len() - 1`, hence can be used as an index in `self.vnodes`.
276        Ok(unsafe { self.vnodes.get_unchecked(index as usize) })
277    }
278
279    pub(crate) fn adjacent_node<K>(&self, adjacency: Adjacency, key: &K) -> Result<&VirtualNode<N>>
280    where
281        K: Borrow<[u8]>,
282    {
283        // Return an error if the ring is empty or has only one distinct node...
284        match self.vnodes.len() / self.vnodes_per_node as usize {
285            0 => {
286                return Err(HashRingError::EmptyRing);
287            }
288            1 => {
289                return Err(HashRingError::SingleDistinctNodeRing);
290            }
291            _ => (),
292        };
293
294        // ...otherwise find the current index...
295        let index = self
296            .vnodes
297            .binary_search_by(|vn| {
298                let name: &[u8] = &vn.name;
299                name.cmp(key.borrow())
300            })
301            .unwrap_or_else(|index| index)
302            % self.vnodes.len();
303        // ...and linearly search the vnode from there.
304        match adjacency {
305            Adjacency::Predecessor => {
306                let mut iter = self
307                    .vnodes
308                    .iter()
309                    .rev()
310                    .cycle()
311                    .skip(self.vnodes.len() - index)
312                    .skip_while(|&vn| {
313                        trace!("checking {} ...", vn);
314                        vn.node.hashring_node_id()
315                            == unsafe { self.vnodes.get_unchecked(index) }
316                                .node
317                                .hashring_node_id()
318                    });
319                iter.next()
320            }
321            Adjacency::Successor => {
322                let mut iter = self
323                    .vnodes
324                    .iter()
325                    .cycle()
326                    .skip((index + 1) % self.vnodes.len())
327                    .skip_while(|&vn| {
328                        trace!("checking {} ...", vn);
329                        vn.node.hashring_node_id()
330                            == unsafe { self.vnodes.get_unchecked(index) }
331                                .node
332                                .hashring_node_id()
333                    });
334                iter.next()
335            }
336        }
337        .ok_or_else(|| unreachable!())
338    }
339}
340
341impl<N, H> Display for HashRingState<N, H>
342where
343    N: Node + ?Sized,
344    H: Hasher,
345{
346    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
347        writeln!(
348            f,
349            "HashRingState ({} nodes X {} virtual, replication factor = {}) {{",
350            self.len_nodes(),
351            self.vnodes_per_node,
352            self.replication_factor
353        )?;
354        for (i, vn) in self.vnodes.iter().enumerate() {
355            writeln!(f, "\t- ({:0>6})  {}", i, vn)?
356        }
357        writeln!(f, "}}")
358    }
359}