// lfchring/state.rs
// This file is part of lfchring-rs.
//
// Copyright 2021 Christos Katsakioris
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17use std::borrow::Borrow;
18use std::collections::BTreeSet;
19use std::fmt::{Display, Formatter};
20use std::mem;
21use std::sync::Arc;
22
23use log::trace;
24
25use crate::{
26 types::{Adjacency, HashRingError, Hasher, Node, Result, Vnid},
27 vnode::VirtualNode,
28};
29
30/// This is the internal data structure that is pointed to by [`HashRing<N, H>`].
31///
32/// The pointer to it is read, cloned and updated atomically, on an optimistic concurrency control
33/// basis, to avoid synchronization issues.
34/// The data themselves are freed through the epoch-based memory reclamation technique implemented
35/// by [`crossbeam_epoch`].
36#[derive(Debug)]
37pub(crate) struct HashRingState<N, H>
38where
39 N: Node + ?Sized,
40 H: Hasher,
41{
42 hasher: H,
43 vnodes_per_node: Vnid,
44 replication_factor: u8,
45 pub(crate) vnodes: Vec<VirtualNode<N>>,
46 // `crate::iter::Iter` requires access to this field, hence the `pub(crate)`.
47}
48
49impl<N, H> Clone for HashRingState<N, H>
50where
51 N: Node + ?Sized,
52 H: Hasher,
53{
54 fn clone(&self) -> Self {
55 Self {
56 hasher: H::default(),
57 vnodes_per_node: self.vnodes_per_node,
58 replication_factor: self.replication_factor,
59 vnodes: self.vnodes.clone(),
60 }
61 }
62}
63
64impl<N, H> HashRingState<N, H>
65where
66 N: Node + ?Sized,
67 H: Hasher,
68{
69 #[inline]
70 pub(crate) fn with_capacity(
71 capacity: usize,
72 hasher: H,
73 vnodes_per_node: Vnid,
74 replication_factor: u8,
75 ) -> Self {
76 Self {
77 hasher,
78 vnodes_per_node,
79 replication_factor,
80 vnodes: Vec::with_capacity(capacity),
81 }
82 }
83
84 // First, initialize all vnodes for the given nodes into a new `BTreeSet`. Then, check whether
85 // any of them is already present in the current vnodes map to make sure no collision occurs.
86 // Finally, merge the new vnodes into the old ones.
87 //
88 // NOTE: If any of the newly created `VirtualNode`s collides with an already existing one,
89 // none of the new `nodes` is inserted in the ring.
90 pub(crate) fn insert(&mut self, nodes: &[Arc<N>]) -> Result<()> {
91 let mut new = BTreeSet::new();
92 for node in nodes {
93 for vnid in 0..self.vnodes_per_node {
94 let vn = VirtualNode::new(&mut self.hasher, Arc::clone(&node), vnid);
95 // We need to not only check whether vn is already in the ring, but also whether
96 // it is present among the vnodes we are about to extend the ring by.
97 if self.vnodes.binary_search(&vn).is_ok() || !new.insert(vn.clone()) {
98 // FIXME: How to avoid cloning the VirtualNode ^ but also be able to use it in:
99 return Err(HashRingError::VirtualNodeAlreadyExists(format!("{}", vn)));
100 }
101 //trace!("vnode '{}' has been included in the ring extension", vn);
102 }
103 }
104 // TODO: What happens with the reallocation here? It is completely uncontrolled for now.
105 self.vnodes.extend(new);
106 self.vnodes.sort_unstable();
107 self.fix_replica_owners();
108 Ok(())
109 }
110
111 pub(crate) fn remove(&mut self, nodes: &[Arc<N>]) -> Result<()> {
112 let mut removed_indices = BTreeSet::new();
113 let node_names = nodes
114 .iter()
115 .map(|node| node.hashring_node_id())
116 .collect::<Vec<_>>();
117 let max_name_len = node_names.iter().map(|name| name.len()).max().unwrap();
118
119 let mut name = Vec::with_capacity(max_name_len + mem::size_of::<Vnid>());
120 for node_name in node_names {
121 for vnid in 0..self.vnodes_per_node {
122 name.clear();
123 name.extend(&*node_name);
124 name.extend(&vnid.to_ne_bytes());
125 let vn = self.hasher.digest(&name);
126 if let Ok(index) = self.vnodes.binary_search_by(|e| e.name.cmp(&vn)) {
127 //trace!("Removing vnode '{:x?}' at index {}.", vn, index);
128 removed_indices.insert(index);
129 } else {
130 return Err(HashRingError::VirtualNodeDoesNotExist(format!("{:x?}", vn)));
131 }
132 }
133 }
134
135 // TODO: Return the removed vnodes or not? I guess it would be best if the output of
136 // `HashRing::remove` is consistent with the output of `HashRing::insert`.
137 let mut removed_vnodes = Vec::with_capacity(removed_indices.len());
138 // Indices must be visited in reverse (descending) order for the removal; otherwise, the
139 // indices of the virtual nodes to be removed in `self.vnodes` become invalid as they are
140 // all shifted towards the beginning of the vector on every removal.
141 for &index in removed_indices.iter().rev() {
142 let vn = self.vnodes.remove(index);
143 removed_vnodes.push(vn);
144 }
145 //assert!(self.vnodes.is_sorted());
146 self.fix_replica_owners();
147 //Ok(removed_vnodes) TODO
148 Ok(())
149 }
150
151 fn fix_replica_owners(&mut self) {
152 for i in 0..self.vnodes.len() {
153 // SAFETY: `i` is always in range `0..self.vnodes.len()`
154 let curr_vn = unsafe { self.vnodes.get_unchecked(i) };
155
156 let mut replica_owners = Vec::with_capacity(self.replication_factor as usize);
157 // Some capacity might be wasted here ^^ but we prefer it over reallocation.
158 let original_owner = &curr_vn.node;
159 replica_owners.push(Arc::clone(original_owner));
160
161 // Number of subsequent replica-owning nodes remaining to be found
162 let mut k = self.replication_factor - 1;
163
164 for (j, vn) in self
165 .vnodes
166 .iter()
167 .enumerate()
168 .cycle()
169 .skip((i + 1) % self.vnodes.len())
170 {
171 // If all replica owners for this vnode have been determined, break.
172 // Similarly, if we wrapped around the ring back to ourselves, break, even if k > 0
173 // (which would mean that replication_factor > # of distinct ring nodes).
174 if k == 0 || j == i {
175 break;
176 }
177 // Since we want distinct nodes only in `replica_owners`, make sure `vn.node` is
178 // not already in.
179 let mut node_already_in = false;
180 for node in &replica_owners {
181 if vn.node.hashring_node_id() == node.hashring_node_id() {
182 node_already_in = true;
183 break;
184 }
185 }
186 // If `vn.node` is not already in, get it in, and decrease the number of distinct
187 // nodes remaining to be found.
188 if !node_already_in {
189 replica_owners.push(Arc::clone(&vn.node));
190 k -= 1;
191 }
192 }
193
194 // Store the replica owners we just found for the current vnode, in the current vnode.
195 // SAFETY: `i` is always in range `0..self.vnodes.len()`
196 let mut curr_vn = unsafe { self.vnodes.get_unchecked_mut(i) };
197 curr_vn.replica_owners = Some(replica_owners);
198 }
199 }
200
201 #[inline]
202 pub(crate) fn len_nodes(&self) -> usize {
203 self.vnodes.len() / self.vnodes_per_node as usize
204 }
205
206 #[inline]
207 pub(crate) fn len_virtual_nodes(&self) -> usize {
208 self.vnodes.len()
209 }
210
211 pub(crate) fn has_virtual_node<K>(&self, key: &K) -> bool
212 where
213 K: Borrow<[u8]>,
214 {
215 self.vnodes
216 .binary_search_by(|vn| {
217 let name: &[u8] = &vn.name;
218 name.cmp(key.borrow())
219 })
220 .is_ok()
221 }
222
223 // returns a reference to the actual `VirtualNode` in `HashRingState.vnodes`
224 pub(crate) fn virtual_node_for_key<K>(&self, key: &K) -> Result<&VirtualNode<N>>
225 where
226 K: Borrow<[u8]>,
227 {
228 // Return an error if the ring is empty...
229 if self.vnodes.is_empty() {
230 return Err(HashRingError::EmptyRing);
231 }
232 // ...otherwise find the correct index and return the associated vnode.
233 let index = self
234 .vnodes
235 .binary_search_by(|vn| {
236 let name: &[u8] = &vn.name;
237 name.cmp(key.borrow())
238 })
239 .unwrap_or_else(|index| index)
240 % self.vnodes.len();
241 // SAFETY: The remainder of the above integer division is always a usize between `0` and
242 // `self.vnodes.len() - 1`, hence can be used as an index in `self.vnodes`.
243 Ok(unsafe { self.vnodes.get_unchecked(index) })
244 }
245
246 pub(crate) fn adjacent<K>(&self, adjacency: Adjacency, key: &K) -> Result<&VirtualNode<N>>
247 where
248 K: Borrow<[u8]>,
249 {
250 // Return an error if the ring is empty...
251 if self.vnodes.is_empty() {
252 return Err(HashRingError::EmptyRing);
253 }
254 // ...otherwise find the current index...
255 let index = self
256 .vnodes
257 .binary_search_by(|vn| {
258 let name: &[u8] = &vn.name;
259 name.cmp(key.borrow())
260 })
261 .unwrap_or_else(|index| index)
262 % self.vnodes.len();
263 // ...and return the adjacent one.
264 let index = match adjacency {
265 Adjacency::Predecessor => {
266 if 0 == index {
267 self.vnodes.len() - 1
268 } else {
269 index - 1
270 }
271 }
272 Adjacency::Successor => (index + 1) % self.vnodes.len(),
273 };
274 // SAFETY: The value of the index always stays within the range `0` to
275 // `self.vnodes.len() - 1`, hence can be used as an index in `self.vnodes`.
276 Ok(unsafe { self.vnodes.get_unchecked(index as usize) })
277 }
278
279 pub(crate) fn adjacent_node<K>(&self, adjacency: Adjacency, key: &K) -> Result<&VirtualNode<N>>
280 where
281 K: Borrow<[u8]>,
282 {
283 // Return an error if the ring is empty or has only one distinct node...
284 match self.vnodes.len() / self.vnodes_per_node as usize {
285 0 => {
286 return Err(HashRingError::EmptyRing);
287 }
288 1 => {
289 return Err(HashRingError::SingleDistinctNodeRing);
290 }
291 _ => (),
292 };
293
294 // ...otherwise find the current index...
295 let index = self
296 .vnodes
297 .binary_search_by(|vn| {
298 let name: &[u8] = &vn.name;
299 name.cmp(key.borrow())
300 })
301 .unwrap_or_else(|index| index)
302 % self.vnodes.len();
303 // ...and linearly search the vnode from there.
304 match adjacency {
305 Adjacency::Predecessor => {
306 let mut iter = self
307 .vnodes
308 .iter()
309 .rev()
310 .cycle()
311 .skip(self.vnodes.len() - index)
312 .skip_while(|&vn| {
313 trace!("checking {} ...", vn);
314 vn.node.hashring_node_id()
315 == unsafe { self.vnodes.get_unchecked(index) }
316 .node
317 .hashring_node_id()
318 });
319 iter.next()
320 }
321 Adjacency::Successor => {
322 let mut iter = self
323 .vnodes
324 .iter()
325 .cycle()
326 .skip((index + 1) % self.vnodes.len())
327 .skip_while(|&vn| {
328 trace!("checking {} ...", vn);
329 vn.node.hashring_node_id()
330 == unsafe { self.vnodes.get_unchecked(index) }
331 .node
332 .hashring_node_id()
333 });
334 iter.next()
335 }
336 }
337 .ok_or_else(|| unreachable!())
338 }
339}
340
341impl<N, H> Display for HashRingState<N, H>
342where
343 N: Node + ?Sized,
344 H: Hasher,
345{
346 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
347 writeln!(
348 f,
349 "HashRingState ({} nodes X {} virtual, replication factor = {}) {{",
350 self.len_nodes(),
351 self.vnodes_per_node,
352 self.replication_factor
353 )?;
354 for (i, vn) in self.vnodes.iter().enumerate() {
355 writeln!(f, "\t- ({:0>6}) {}", i, vn)?
356 }
357 writeln!(f, "}}")
358 }
359}