consistent_rs/
consistant.rs

1use std::default::Default;
2use std::rc::Rc;
3use std::iter::Iterator;
4use std::collections::hash_map::HashMap;
5use std::sync::RwLock;
6use crc::crc32::checksum_ieee;
7
/// Consistant holds the state of a consistent-hash ring: the points on the
/// circle, the elements that were added, and the ordered list of point keys.
#[derive(Debug)]
pub struct Consistant {
    /// How many points (replicas) are placed on the ring for each element.
    pub replicas_num: usize,

    /// Maps a ring point (a `u32` checksum) to the element that owns it.
    circle: HashMap<u32, Rc<String>>,
    /// The elements currently on the ring; used as a set, the `()` value
    /// carries no information.
    members: HashMap<Rc<String>, ()>,
    /// The ring point keys, kept in ascending order for lookups.
    sorted_keys: Vec<u32>,
    /// Lock acquired by the public methods before they touch the ring.
    lock: RwLock<()>,
}
18
19impl Default for Consistant {
20    fn default() -> Consistant {
21        Consistant {
22            replicas_num: 20,
23            circle: HashMap::new(),
24            members: HashMap::new(),
25            sorted_keys: Vec::new(),
26            lock: RwLock::new(()),
27        }
28    }
29}
30
31impl Consistant {
32    /// Crete a new instance of Consistant.
33    pub fn new(replicas_num: usize) -> Self {
34        Consistant {
35            replicas_num: replicas_num,
36            circle: HashMap::new(),
37            members: HashMap::new(),
38            sorted_keys: Vec::new(),
39            lock: RwLock::new(()),
40        }
41    }
42
43    /// Get the count of added elements.
44    pub fn count(&self) -> usize {
45        let _ = self.lock.read().expect("rLock");
46        self.members.len()
47    }
48
49    /// Add an elment to the hash ring.
50    pub fn add<S: Into<String>>(&mut self, element: S) {
51        let _ = self.lock.write().expect("wLock");
52        let s = &Rc::new(element.into());
53        if self.contains(s) {
54            return;
55        }
56
57        for i in 0..self.replicas_num {
58            let sum = checksum_ieee(Self::generate_element_name(s, i).as_bytes());
59            self.circle.insert(sum, s.clone());
60            self.sorted_keys.push(sum)
61        }
62
63        self.members.insert(s.clone(), ());
64        self.sorted_keys.sort();
65    }
66
67    /// Get the cloeset element's name to the given "name".
68    pub fn get<S: Into<String>>(&self, name: S) -> Option<String> {
69        let _ = self.lock.read().expect("rLock");
70        if self.circle.len() == 0 {
71            return None;
72        }
73        let key = self.sorted_keys[self.get_key_index(checksum_ieee(name.into().as_bytes()))];
74
75        Some(self.get_i_from_circle(key))
76    }
77
78    /// Get the N cloeset elements' names to the given "name".
79    pub fn get_n<S: Into<String>>(&self, name: S, n: usize) -> Option<Vec<String>> {
80        let _ = self.lock.read().expect("rLock");
81        if n == 0 || self.circle.len() == 0 {
82            return None;
83        }
84        let count = if self.count() > n { n } else { self.count() };
85        let mut start = self.get_key_index(checksum_ieee(name.into().as_bytes()));
86        let mut element = self.get_i_from_circle(self.sorted_keys[start]);
87
88        let mut res = Vec::with_capacity(count);
89        res.push(element);
90
91        loop {
92            start = start + 1;
93            if start >= self.sorted_keys.len() {
94                start = 0;
95            }
96            element = self.get_i_from_circle(self.sorted_keys[start]);
97            if !res.contains(&element) {
98                res.push(element)
99            }
100
101            if res.len() == count {
102                break;
103            }
104        }
105
106        Some(res)
107    }
108
109    /// Remove the given element.
110    pub fn remove<S: Into<String>>(&mut self, name: S) {
111        let _ = self.lock.write().expect("wLock");
112        let s = &Rc::new(name.into());
113        if !self.contains(s) {
114            return;
115        }
116
117        for i in 0..self.replicas_num {
118            let sum = &checksum_ieee(Self::generate_element_name(s, i).as_bytes());
119            self.circle.remove(sum);
120
121            match self.sorted_keys.iter().position(|key| key.eq(sum)) {
122                Some(index) => self.sorted_keys.remove(index),
123                None => unreachable!(),
124            };
125        }
126
127        self.members.remove(s);
128    }
129
130    #[inline]
131    fn get_i_from_circle(&self, i: u32) -> String {
132        match self.circle.get(&i) {
133            Some(rc) => (**rc).clone(),
134            None => unreachable!(),
135        }
136    }
137
138    #[inline]
139    fn contains(&self, name: &Rc<String>) -> bool {
140        self.members.contains_key(name)
141    }
142
143    #[inline]
144    fn get_key_index(&self, sum: u32) -> usize {
145        let iter = (&self.sorted_keys).into_iter();
146
147        for (i, key) in iter.enumerate() {
148            if sum < *key {
149                return i;
150            }
151        }
152
153        0
154    }
155
156    #[inline]
157    fn generate_element_name(element: &str, i: usize) -> String {
158        String::from(element) + &i.to_string()
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a default ring that already holds the given elements.
    fn ring_with(names: &[&str]) -> Consistant {
        let mut ring = Consistant::default();
        for name in names {
            ring.add(*name);
        }
        ring
    }

    /// The default ring uses 20 replicas and starts with no points.
    #[test]
    fn test_default() {
        let ring = Consistant::default();

        assert_eq!(ring.replicas_num, 20);
        assert_eq!(ring.circle.len(), 0);
        assert_eq!(ring.sorted_keys.len(), 0);
    }

    /// `new` stores the requested replica count and starts with no points.
    #[test]
    fn test_new() {
        let ring = Consistant::new(30);

        assert_eq!(ring.replicas_num, 30);
        assert_eq!(ring.circle.len(), 0);
        assert_eq!(ring.sorted_keys.len(), 0);
    }

    /// `count` reports the number of added elements.
    #[test]
    fn test_count() {
        let ring = ring_with(&["cacheA", "cacheB", "cacheC"]);

        assert_eq!(ring.count(), 3);
    }

    /// Every added element contributes `replicas_num` points.
    #[test]
    fn test_add() {
        let ring = ring_with(&["cacheA", "cacheB", "cacheC"]);
        let expected_points = 3 * ring.replicas_num;

        assert_eq!(ring.circle.len(), expected_points);
        assert_eq!(ring.sorted_keys.len(), expected_points);
    }

    /// Membership is exact and case-sensitive.
    #[test]
    fn test_contains() {
        let ring = ring_with(&["cacheA"]);
        let is_member = |name: &str| ring.contains(&Rc::new(String::from(name)));

        assert_eq!(is_member("cacheA"), true);
        assert_eq!(is_member("cacheB"), false);
        assert_eq!(is_member("CachEa"), false);
    }

    /// Looking the same name up twice yields the same element.
    #[test]
    fn test_get() {
        let ring = ring_with(&["cacheA", "cacheB", "cacheC"]);

        for name in &["david", "kally", "jason"] {
            let first = ring.get(*name).unwrap();
            let second = ring.get(*name).unwrap();
            assert_eq!(first, second);
        }
    }

    /// `get_n` never returns more elements than the ring holds.
    #[test]
    fn test_get_n() {
        let mut ring = ring_with(&["cacheA", "cacheB", "cacheC"]);

        let before = ring.get_n("david", 3).unwrap();
        assert_eq!(before.len(), 3);

        ring.remove("cacheA");

        let after = ring.get_n("david", 3).unwrap();
        assert_eq!(after.len(), 2);
    }

    /// A removed element is no longer counted or returned by lookups.
    #[test]
    fn test_remove() {
        let mut ring = ring_with(&["cacheA", "cacheB", "cacheC"]);

        ring.remove("cacheC");
        assert_eq!(ring.count(), 2);

        for name in &["david", "kally", "jason"] {
            assert!(ring.get(*name).unwrap() != String::from("cacheC"));
        }
    }
}
260