orlando_cluster/
hash_ring.rs1use std::collections::{BTreeMap, HashSet};
2
3#[derive(Clone, Debug, PartialEq, Eq, Hash)]
4pub struct SiloAddress {
5 pub host: String,
6 pub port: u16,
7 pub silo_id: String,
8}
9
10impl SiloAddress {
11 pub fn endpoint(&self) -> String {
12 format!("{}:{}", self.host, self.port)
13 }
14}
15
16#[derive(Clone)]
17pub struct HashRing {
18 ring: BTreeMap<u64, SiloAddress>,
19 virtual_nodes: u32,
20}
21
22impl HashRing {
23 pub fn new(virtual_nodes: u32) -> Self {
24 Self {
25 ring: BTreeMap::new(),
26 virtual_nodes,
27 }
28 }
29
30 pub fn add(&mut self, silo: SiloAddress) {
31 for i in 0..self.virtual_nodes {
32 let key = hash_key(&format!("{}:{}", silo.silo_id, i));
33 self.ring.insert(key, silo.clone());
34 }
35 }
36
37 pub fn remove(&mut self, silo: &SiloAddress) {
38 for i in 0..self.virtual_nodes {
39 let key = hash_key(&format!("{}:{}", silo.silo_id, i));
40 self.ring.remove(&key);
41 }
42 }
43
44 pub fn get(&self, grain_key: &str) -> Option<&SiloAddress> {
45 if self.ring.is_empty() {
46 return None;
47 }
48 let key = hash_key(grain_key);
49 self.ring
50 .range(key..)
51 .next()
52 .or_else(|| self.ring.iter().next())
53 .map(|(_, v)| v)
54 }
55
56 pub fn members(&self) -> Vec<SiloAddress> {
57 let mut seen_ids = HashSet::new();
58 let mut result = Vec::new();
59 for silo in self.ring.values() {
60 if seen_ids.insert(&silo.silo_id) {
61 result.push(silo.clone());
62 }
63 }
64 result
65 }
66
67 pub fn is_empty(&self) -> bool {
68 self.ring.is_empty()
69 }
70}
71
72fn hash_key(key: &str) -> u64 {
76 let mut hash: u64 = 0xcbf29ce484222325; for byte in key.as_bytes() {
78 hash ^= *byte as u64;
79 hash = hash.wrapping_mul(0x100000001b3); }
81 hash
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87
88 #[test]
89 fn consistent_placement() {
90 let mut ring = HashRing::new(150);
91 ring.add(SiloAddress {
92 host: "127.0.0.1".into(),
93 port: 5001,
94 silo_id: "silo-a".into(),
95 });
96 ring.add(SiloAddress {
97 host: "127.0.0.1".into(),
98 port: 5002,
99 silo_id: "silo-b".into(),
100 });
101
102 let first = ring.get("my-grain/key-1").unwrap().silo_id.clone();
104 let second = ring.get("my-grain/key-1").unwrap().silo_id.clone();
105 assert_eq!(first, second);
106 }
107
108 #[test]
109 fn distributes_across_silos() {
110 let mut ring = HashRing::new(150);
111 ring.add(SiloAddress {
112 host: "127.0.0.1".into(),
113 port: 5001,
114 silo_id: "silo-a".into(),
115 });
116 ring.add(SiloAddress {
117 host: "127.0.0.1".into(),
118 port: 5002,
119 silo_id: "silo-b".into(),
120 });
121
122 let mut a_count = 0;
123 let mut b_count = 0;
124 for i in 0..100 {
125 let target = ring.get(&format!("grain/{}", i)).unwrap();
126 if target.silo_id == "silo-a" {
127 a_count += 1;
128 } else {
129 b_count += 1;
130 }
131 }
132
133 assert!(a_count > 5, "silo-a got {a_count} grains, expected > 5");
135 assert!(b_count > 5, "silo-b got {b_count} grains, expected > 5");
136 }
137
138 #[test]
139 fn remove_silo() {
140 let mut ring = HashRing::new(150);
141 let silo_a = SiloAddress {
142 host: "127.0.0.1".into(),
143 port: 5001,
144 silo_id: "silo-a".into(),
145 };
146 let silo_b = SiloAddress {
147 host: "127.0.0.1".into(),
148 port: 5002,
149 silo_id: "silo-b".into(),
150 };
151 ring.add(silo_a.clone());
152 ring.add(silo_b);
153
154 ring.remove(&silo_a);
155
156 for i in 0..20 {
158 let target = ring.get(&format!("grain/{}", i)).unwrap();
159 assert_eq!(target.silo_id, "silo-b");
160 }
161 }
162}