loadwise_core/strategy/
consistent_hash.rs1use std::collections::BTreeMap;
2use std::hash::{DefaultHasher, Hash, Hasher};
3use std::sync::Mutex;
4
5use super::{node_set_fingerprint, SelectionContext, Strategy};
6use crate::Node;
7
/// Consistent-hashing selection strategy.
///
/// Every candidate node is placed on a hash ring `replicas` times, and a
/// request is routed to the first non-excluded ring entry at or after its
/// hash key, wrapping around to the start of the ring. The ring built for the
/// most recent candidate set is cached and reused for as long as the
/// candidate-set fingerprint stays the same.
#[derive(Debug, bon::Builder)]
pub struct ConsistentHash {
    /// Number of ring positions (virtual nodes) created per candidate node.
    /// Defaults to 150 when not set on the builder.
    #[builder(default = 150)]
    replicas: usize,
    // Ring built for the most recently seen candidate set, together with that
    // set's fingerprint. Skipped by the builder; always starts out empty.
    #[builder(skip = Mutex::new(None))]
    cache: Mutex<Option<CachedRing>>,
}
51
/// A hash ring together with the fingerprint of the candidate set it was
/// built from.
#[derive(Debug)]
struct CachedRing {
    /// Fingerprint of the candidate set, as returned by `node_set_fingerprint`
    /// at the time the ring was built.
    fingerprint: u64,
    /// Ring positions mapped to the index of the owning node within the
    /// candidate slice the ring was built from.
    ring: BTreeMap<u64, usize>,
}
57
58impl Default for ConsistentHash {
59 fn default() -> Self {
60 Self::builder().build()
61 }
62}
63
64fn build_ring<N: Node>(candidates: &[N], replicas: usize) -> BTreeMap<u64, usize> {
65 let mut ring = BTreeMap::new();
66 for (i, node) in candidates.iter().enumerate() {
67 for replica in 0..replicas {
68 let mut hasher = DefaultHasher::new();
69 node.id().hash(&mut hasher);
70 replica.hash(&mut hasher);
71 ring.insert(hasher.finish(), i);
72 }
73 }
74 ring
75}
76
77impl<N: Node> Strategy<N> for ConsistentHash {
78 fn select(&self, candidates: &[N], ctx: &SelectionContext) -> Option<usize> {
79 if candidates.is_empty() {
80 return None;
81 }
82
83 let request_hash = ctx.hash_key.unwrap_or(0);
84 let fingerprint = node_set_fingerprint(candidates);
85
86 let mut cache = self.cache.lock().unwrap();
87
88 let ring = match cache.as_ref() {
90 Some(cached) if cached.fingerprint == fingerprint => &cached.ring,
91 _ => {
92 *cache = Some(CachedRing {
93 fingerprint,
94 ring: build_ring(candidates, self.replicas),
95 });
96 &cache.as_ref().unwrap().ring
97 }
98 };
99
100 ring.range(request_hash..)
102 .chain(ring.iter())
103 .find(|(_, idx)| !ctx.is_excluded(**idx))
104 .map(|(_, &idx)| idx)
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal node whose id is the wrapped string.
    struct N(String);

    impl crate::Node for N {
        type Id = String;

        fn id(&self) -> &String {
            &self.0
        }
    }

    /// Builds one node per name, preserving order.
    fn named<const K: usize>(names: [&str; K]) -> [N; K] {
        names.map(|name| N(name.into()))
    }

    #[test]
    fn consistent_for_same_key() {
        let strategy = ConsistentHash::default();
        let pool = named(["a", "b", "c"]);
        let ctx = SelectionContext::builder().hash_key(42).build();

        // Two lookups with the same key must land on the same node.
        let first = strategy.select(&pool, &ctx);
        let second = strategy.select(&pool, &ctx);
        assert_eq!(first, second);
    }

    #[test]
    fn different_keys_can_hit_different_nodes() {
        use std::hash::{DefaultHasher, Hash, Hasher};

        let strategy = ConsistentHash::default();
        let pool = named(["a", "b", "c"]);

        // Spread 1000 hashed keys over the ring and record which nodes get hit.
        let mut seen = std::collections::HashSet::new();
        for i in 0..1000 {
            let mut hasher = DefaultHasher::new();
            i.hash(&mut hasher);
            let ctx = SelectionContext::builder().hash_key(hasher.finish()).build();
            seen.insert(strategy.select(&pool, &ctx).unwrap());
        }

        assert!(seen.len() >= 2);
    }

    #[test]
    fn ring_is_cached() {
        let strategy = ConsistentHash::builder().replicas(10).build();
        let pair = named(["a", "b"]);
        let ctx = SelectionContext::builder().hash_key(100).build();

        // Repeated lookups against the same node set agree.
        let before = strategy.select(&pair, &ctx);
        let repeat = strategy.select(&pair, &ctx);
        assert_eq!(before, repeat);

        // A different node set replaces the cached ring.
        let trio = named(["a", "b", "c"]);
        let _ = strategy.select(&trio, &ctx);

        // Going back to the first node set yields the original answer.
        let after = strategy.select(&pair, &ctx);
        assert_eq!(before, after);
    }

    #[test]
    fn skips_excluded_in_ring() {
        let strategy = ConsistentHash::default();
        let pool = named(["a", "b", "c"]);

        let plain_ctx = SelectionContext::builder().hash_key(42).build();
        let normal = strategy.select(&pool, &plain_ctx).unwrap();

        // Excluding the usual owner must fall through to a different node.
        let excluded_ctx = SelectionContext::builder()
            .hash_key(42)
            .exclude(vec![normal])
            .build();
        let fallback = strategy.select(&pool, &excluded_ctx);
        assert!(fallback.is_some());
        assert_ne!(fallback.unwrap(), normal);
    }
}