1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::io::Cursor;
4use std::rc::Rc;
5
6use blake2::{Blake2b, Digest};
7use byteorder::{LittleEndian, ReadBytesExt};
8
/// A single host registered on the ring, together with its current load.
#[derive(Debug)]
pub struct Host {
    /// Name the host was registered under.
    name: String,
    /// Load currently attributed to this host.
    load: u64,
}
14
/// Tunable parameters for a `Ring`.
///
/// The type is plain data (`u64` + `f64`), so it is cheap to copy and can be
/// compared directly.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Config {
    /// Number of virtual nodes placed on the ring for every host.
    pub replication_factor: u64,
    /// Multiplier applied to the average per-host load to obtain the load
    /// bound used when picking the least-loaded host.
    pub load: f64,
}

impl Default for Config {
    /// Ten virtual nodes per host and a load multiplier of `1.25`.
    fn default() -> Self {
        Self {
            replication_factor: 10,
            load: 1.25,
        }
    }
}
29
30pub struct Ring {
31 config: Config,
32
33 hashes: Vec<u64>, host_by_hash: HashMap<u64, Rc<RefCell<Host>>>, host_by_name: HashMap<String, Rc<RefCell<Host>>>, load: u64, }
38
39unsafe impl Send for Ring {}
40unsafe impl Sync for Ring {}
41
42impl Ring {
43 pub fn new(config: Config) -> Self {
44 Self {
45 config: config,
46 hashes: Default::default(),
47 host_by_hash: Default::default(),
48 host_by_name: Default::default(),
49 load: 0,
50 }
51 }
52
53 pub fn replication_factor(&self) -> u64 {
54 self.config.replication_factor
55 }
56
57 pub fn add(&mut self, hostname: &str) {
60 if self.host_by_name.contains_key(hostname) {
61 return;
62 }
63
64 let host = Rc::new(RefCell::new(Host {
65 name: hostname.to_owned(),
66 load: 0,
67 }));
68
69 self.host_by_name.insert(hostname.to_owned(), host.clone());
70
71 for i in 0..self.replication_factor() {
72 let hash = Self::hash(&format!("{}{}", hostname, i));
73 self.host_by_hash.insert(hash, host.clone());
74 self.hashes.push(hash);
75 }
76
77 self.hashes.sort();
78 }
79
80 pub fn remove(&mut self, hostname: &str) {
82 for i in 0..self.replication_factor() {
83 let hash = Self::hash(&format!("{}{}", hostname, i));
84 self.host_by_hash.remove(&hash);
85 let idx = self.hashes.iter().position(|x| *x == hash).unwrap();
86 self.hashes.remove(idx);
87 }
88
89 self.host_by_name.remove(hostname);
90 }
91
92 pub fn get(&mut self, key: &str) -> Option<String> {
94 if self.host_by_hash.is_empty() {
95 return None;
96 }
97
98 let hash = Self::hash(key);
99 let idx = self.search(hash);
100 if let Some(host) = self.host_by_hash.get(&self.hashes[idx]) {
101 Some(host.borrow().name.clone())
102 } else {
103 None
104 }
105 }
106
107 pub fn get_least(&mut self, key: &str) -> Option<String> {
109 if self.host_by_hash.is_empty() {
110 return None;
111 }
112
113 let hash = Self::hash(key);
114 let avg_load = self.avg_load();
115
116 let mut idx = self.search(hash);
117 loop {
118 let host = self.host_by_hash.get(&self.hashes[idx]).unwrap();
119 if (host.borrow().load + 1) as f64 <= avg_load {
120 return Some(host.borrow().name.clone());
121 }
122 idx += 1;
123 if idx >= self.host_by_hash.len() {
124 idx = 0;
125 }
126 }
127 }
128
129 pub fn hosts(&mut self) -> Vec<String> {
131 self.host_by_name.keys().cloned().into_iter().collect()
132 }
133
134 pub fn set_load(&mut self, hostname: &str, load: u64) {
136 if let Some(host) = self.host_by_name.get(hostname) {
137 let mut host = host.borrow_mut();
138 self.load -= host.load;
139 host.load = load;
140 self.load += load;
141 }
142 }
143
144 pub fn inc_load(&mut self, hostname: &str) {
146 if let Some(host) = self.host_by_name.get(hostname) {
147 self.load += 1;
148 host.borrow_mut().load += 1;
149 }
150 }
151
152 pub fn decr_load(&mut self, hostname: &str) {
154 if let Some(host) = self.host_by_name.get(hostname) {
155 self.load -= 1;
156 host.borrow_mut().load -= 1;
157 }
158 }
159
160 pub fn avg_load(&self) -> f64 {
162 let mut load = (self.load + 1) as f64 / self.host_by_name.len() as f64;
163 if load == 0.0 {
164 load = 1.0;
165 }
166 (load * self.config.load).ceil()
167 }
168
169 fn search(&self, key: u64) -> usize {
170 for i in 0..self.hashes.len() {
171 let idx = self.hashes[i];
172 if idx >= key {
173 return i as usize;
174 }
175 }
176
177 0
178 }
179
180 fn hash(key: &str) -> u64 {
183 let hash = Blake2b::new().chain(key.as_bytes()).result();
184
185 let mut rdr = Cursor::new(hash);
186
187 rdr.read_u64::<LittleEndian>().unwrap()
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::Ring;

    /// Adding one host creates `replication_factor` ring positions.
    #[test]
    fn ring_add() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");

        assert_eq!(r.replication_factor(), r.hashes.len() as u64);
    }

    /// Adding the same host twice leaves the ring unchanged.
    #[test]
    fn ring_add_is_idempotent() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");
        r.add("1.1.1.1");

        assert_eq!(r.replication_factor(), r.hashes.len() as u64);
        assert_eq!(vec!["1.1.1.1".to_owned()], r.hosts());
    }

    /// With a single host on the ring, every key resolves to it.
    #[test]
    fn ring_get() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");

        assert_eq!(Some("1.1.1.1".to_owned()), r.get("1.1.1.1"));
    }

    /// Lookups on an empty ring yield nothing.
    #[test]
    fn ring_get_on_empty_ring() {
        let mut r = Ring::new(Default::default());

        assert_eq!(None, r.get("some-key"));
        assert_eq!(None, r.get_least("some-key"));
    }

    /// An idle single host is always under the load bound.
    #[test]
    fn ring_get_least() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");

        assert_eq!(Some("1.1.1.1".to_owned()), r.get_least("some-key"));
    }

    /// The bound is `ceil((total + 1) / hosts * 1.25)` with the default config.
    #[test]
    fn ring_avg_load() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");
        r.set_load("1.1.1.1", 4);

        assert_eq!(7.0, r.avg_load());
    }

    /// Removing the only host empties the ring.
    #[test]
    fn ring_remove() {
        let mut r = Ring::new(Default::default());
        r.add("1.1.1.1");
        r.remove("1.1.1.1");

        assert!(r.hashes.is_empty());
        assert!(r.hosts().is_empty());
    }
}