Skip to main content

ring_rs/
lib.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::io::Cursor;
4use std::rc::Rc;
5
6use blake2::{Blake2b, Digest};
7use byteorder::{LittleEndian, ReadBytesExt};
8
9#[derive(Debug)]
10pub struct Host {
11    name: String,
12    load: u64,
13}
14
15#[derive(Debug)]
16pub struct Config {
17    pub replication_factor: u64,
18    pub load: f64,
19}
20
21impl Default for Config {
22    fn default() -> Self {
23        Self {
24            replication_factor: 10,
25            load: 1.25,
26        }
27    }
28}
29
30pub struct Ring {
31    config: Config,
32
33    hashes: Vec<u64>,                                 // hashes sorted ascendingly
34    host_by_hash: HashMap<u64, Rc<RefCell<Host>>>,    // index host by hash
35    host_by_name: HashMap<String, Rc<RefCell<Host>>>, // index host by name
36    load: u64,                                        // the total load of ring
37}
38
39unsafe impl Send for Ring {}
40unsafe impl Sync for Ring {}
41
42impl Ring {
43    pub fn new(config: Config) -> Self {
44        Self {
45            config: config,
46            hashes: Default::default(),
47            host_by_hash: Default::default(),
48            host_by_name: Default::default(),
49            load: 0,
50        }
51    }
52
53    pub fn replication_factor(&self) -> u64 {
54        self.config.replication_factor
55    }
56
57    /// Adds a new host to the ring.
58    /// If the host already added, ignore.
59    pub fn add(&mut self, hostname: &str) {
60        if self.host_by_name.contains_key(hostname) {
61            return;
62        }
63
64        let host = Rc::new(RefCell::new(Host {
65            name: hostname.to_owned(),
66            load: 0,
67        }));
68
69        self.host_by_name.insert(hostname.to_owned(), host.clone());
70
71        for i in 0..self.replication_factor() {
72            let hash = Self::hash(&format!("{}{}", hostname, i));
73            self.host_by_hash.insert(hash, host.clone());
74            self.hashes.push(hash);
75        }
76
77        self.hashes.sort();
78    }
79
80    /// Removes host from the ring.
81    pub fn remove(&mut self, hostname: &str) {
82        for i in 0..self.replication_factor() {
83            let hash = Self::hash(&format!("{}{}", hostname, i));
84            self.host_by_hash.remove(&hash);
85            let idx = self.hashes.iter().position(|x| *x == hash).unwrap();
86            self.hashes.remove(idx);
87        }
88
89        self.host_by_name.remove(hostname);
90    }
91
92    /// Locates a host for the key.
93    pub fn get(&mut self, key: &str) -> Option<String> {
94        if self.host_by_hash.is_empty() {
95            return None;
96        }
97
98        let hash = Self::hash(key);
99        let idx = self.search(hash);
100        if let Some(host) = self.host_by_hash.get(&self.hashes[idx]) {
101            Some(host.borrow().name.clone())
102        } else {
103            None
104        }
105    }
106
107    /// Picks the least load host for the key.
108    pub fn get_least(&mut self, key: &str) -> Option<String> {
109        if self.host_by_hash.is_empty() {
110            return None;
111        }
112
113        let hash = Self::hash(key);
114        let avg_load = self.avg_load();
115
116        let mut idx = self.search(hash);
117        loop {
118            let host = self.host_by_hash.get(&self.hashes[idx]).unwrap();
119            if (host.borrow().load + 1) as f64 <= avg_load {
120                return Some(host.borrow().name.clone());
121            }
122            idx += 1;
123            if idx >= self.host_by_hash.len() {
124                idx = 0;
125            }
126        }
127    }
128
129    /// Lists all hosts in the ring.
130    pub fn hosts(&mut self) -> Vec<String> {
131        self.host_by_name.keys().cloned().into_iter().collect()
132    }
133
134    /// Sets the load of host to the given value.
135    pub fn set_load(&mut self, hostname: &str, load: u64) {
136        if let Some(host) = self.host_by_name.get(hostname) {
137            let mut host = host.borrow_mut();
138            self.load -= host.load;
139            host.load = load;
140            self.load += load;
141        }
142    }
143
144    /// Increments the load of host by 1.
145    pub fn inc_load(&mut self, hostname: &str) {
146        if let Some(host) = self.host_by_name.get(hostname) {
147            self.load += 1;
148            host.borrow_mut().load += 1;
149        }
150    }
151
152    /// Decrements the load of host by 1.
153    pub fn decr_load(&mut self, hostname: &str) {
154        if let Some(host) = self.host_by_name.get(hostname) {
155            self.load -= 1;
156            host.borrow_mut().load -= 1;
157        }
158    }
159
160    /// Gets the average load of ring.
161    pub fn avg_load(&self) -> f64 {
162        let mut load = (self.load + 1) as f64 / self.host_by_name.len() as f64;
163        if load == 0.0 {
164            load = 1.0;
165        }
166        (load * self.config.load).ceil()
167    }
168
169    fn search(&self, key: u64) -> usize {
170        for i in 0..self.hashes.len() {
171            let idx = self.hashes[i];
172            if idx >= key {
173                return i as usize;
174            }
175        }
176
177        0
178    }
179
180    /// Hashes key.
181    /// TODO(luncj): supports custom hasher.
182    fn hash(key: &str) -> u64 {
183        let hash = Blake2b::new().chain(key.as_bytes()).result();
184
185        let mut rdr = Cursor::new(hash);
186
187        rdr.read_u64::<LittleEndian>().unwrap()
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::Ring;
194
195    #[test]
196    fn ring_add() {
197        let mut r = Ring::new(Default::default());
198        r.add("1.1.1.1");
199
200        assert_eq!(r.replication_factor(), r.hashes.len() as u64);
201    }
202
203    #[test]
204    fn ring_get() {
205        let mut r = Ring::new(Default::default());
206        r.add("1.1.1.1");
207        let host = r.get("1.1.1.1");
208
209        assert!(host.is_some());
210        assert_eq!("1.1.1.1", host.unwrap());
211    }
212
213    #[test]
214    fn ring_remove() {
215        let mut r = Ring::new(Default::default());
216        r.add("1.1.1.1");
217        r.remove("1.1.1.1");
218
219        assert!(r.hashes.is_empty());
220        assert!(r.hosts().is_empty());
221    }
222}