lock_free/
hashmap_ultra.rs1use std::sync::atomic::{AtomicU64, AtomicU32, Ordering};
10use std::hash::{Hash, Hasher};
11use std::collections::hash_map::DefaultHasher;
12use std::marker::PhantomData;
13use std::mem::MaybeUninit;
14use std::ptr;
15use std::alloc::{alloc_zeroed, dealloc, Layout};
16
17const CACHE_LINE: usize = 64;
19
20const BUCKET_SIZE: usize = 8;
22
23const DEFAULT_CAPACITY: usize = 1024;
25
26const EMPTY: u8 = 0;
28const OCCUPIED: u8 = 1;
29const DELETED: u8 = 2;
30
31#[repr(C, align(64))]
33struct Bucket<K, V> {
34 states: AtomicU32,
36 hashes: [AtomicU32; BUCKET_SIZE],
38 keys: [MaybeUninit<K>; BUCKET_SIZE],
40 values: [MaybeUninit<V>; BUCKET_SIZE],
41}
42
43impl<K, V> Bucket<K, V> {
44 fn new() -> Self {
45 unsafe {
46 let mut bucket = MaybeUninit::<Self>::uninit();
47 let bucket_ptr = bucket.as_mut_ptr();
48
49 ptr::write(&mut (*bucket_ptr).states, AtomicU32::new(0));
51
52 for i in 0..BUCKET_SIZE {
54 ptr::write(&mut (*bucket_ptr).hashes[i], AtomicU32::new(0));
55 }
56
57 bucket.assume_init()
60 }
61 }
62
63 #[inline]
64 fn get_state(&self, idx: usize) -> u8 {
65 let states = self.states.load(Ordering::Acquire);
66 ((states >> (idx * 2)) & 0b11) as u8
67 }
68
69 #[inline]
70 fn set_state(&self, idx: usize, state: u8) -> bool {
71 let shift = idx * 2;
72 let mask = 0b11 << shift;
73 let new_bits = (state as u32) << shift;
74
75 loop {
76 let old = self.states.load(Ordering::Acquire);
77 let new = (old & !mask) | new_bits;
78
79 match self.states.compare_exchange_weak(
80 old, new,
81 Ordering::Release,
82 Ordering::Acquire
83 ) {
84 Ok(_) => return true,
85 Err(_) => continue,
86 }
87 }
88 }
89
90 #[inline]
92 fn find_entry(&self, hash: u32) -> Option<usize> {
93 let h1 = self.hashes[0].load(Ordering::Acquire);
95 let h2 = self.hashes[1].load(Ordering::Acquire);
96 let h3 = self.hashes[2].load(Ordering::Acquire);
97 let h4 = self.hashes[3].load(Ordering::Acquire);
98 let h5 = self.hashes[4].load(Ordering::Acquire);
99 let h6 = self.hashes[5].load(Ordering::Acquire);
100 let h7 = self.hashes[6].load(Ordering::Acquire);
101 let h8 = self.hashes[7].load(Ordering::Acquire);
102
103 let mask =
105 ((h1 == hash) as usize) << 0 |
106 ((h2 == hash) as usize) << 1 |
107 ((h3 == hash) as usize) << 2 |
108 ((h4 == hash) as usize) << 3 |
109 ((h5 == hash) as usize) << 4 |
110 ((h6 == hash) as usize) << 5 |
111 ((h7 == hash) as usize) << 6 |
112 ((h8 == hash) as usize) << 7;
113
114 if mask != 0 {
115 Some(mask.trailing_zeros() as usize)
116 } else {
117 None
118 }
119 }
120
121 #[inline]
123 fn find_empty(&self) -> Option<usize> {
124 let states = self.states.load(Ordering::Acquire);
125
126 for i in 0..BUCKET_SIZE {
128 if ((states >> (i * 2)) & 0b11) == EMPTY as u32 {
129 return Some(i);
130 }
131 }
132 None
133 }
134}
135
136pub struct UltraHashMap<K, V> {
138 buckets: *mut Bucket<K, V>,
140 bucket_count: usize,
142 mask: usize,
144 size: AtomicU64,
146 layout: Layout,
148 _phantom: PhantomData<(K, V)>,
150}
151
152impl<K: Hash + Eq + Clone, V: Clone> UltraHashMap<K, V> {
153 pub fn new() -> Self {
155 Self::with_capacity(DEFAULT_CAPACITY)
156 }
157
158 pub fn with_capacity(capacity: usize) -> Self {
160 let bucket_count = (capacity / BUCKET_SIZE).next_power_of_two().max(16);
161 let layout = Layout::array::<Bucket<K, V>>(bucket_count).unwrap();
162
163 let buckets = unsafe {
164 let ptr = alloc_zeroed(layout) as *mut Bucket<K, V>;
165
166 for i in 0..bucket_count {
168 ptr::write(ptr.add(i), Bucket::new());
169 }
170
171 ptr
172 };
173
174 Self {
175 buckets,
176 bucket_count,
177 mask: bucket_count - 1,
178 size: AtomicU64::new(0),
179 layout,
180 _phantom: PhantomData,
181 }
182 }
183
184 #[inline]
186 fn hash_key(key: &K) -> u32 {
187 let mut hasher = DefaultHasher::new();
188 key.hash(&mut hasher);
189 (hasher.finish() >> 32) as u32
190 }
191
192 pub fn insert(&self, key: K, value: V) -> Option<V> {
194 let hash = Self::hash_key(&key);
195 let mut bucket_idx = (hash as usize) & self.mask;
196 let mut distance = 0;
197
198 let mut curr_key = key;
200 let mut curr_value = value;
201 let mut curr_hash = hash;
202
203 loop {
204 let bucket = unsafe { &*self.buckets.add(bucket_idx) };
205
206 if let Some(idx) = bucket.find_entry(curr_hash) {
208 if bucket.get_state(idx) == OCCUPIED {
209 let stored_key = unsafe { &*bucket.keys[idx].as_ptr() };
210 if stored_key == &curr_key {
211 let old_value = unsafe {
213 ptr::read(bucket.values[idx].as_ptr())
214 };
215 unsafe {
216 let bucket_ptr = self.buckets.add(bucket_idx);
217 ptr::write((*bucket_ptr).values[idx].as_mut_ptr(), curr_value);
218 }
219 return Some(old_value);
220 }
221 }
222 }
223
224 if let Some(idx) = bucket.find_empty() {
226 bucket.hashes[idx].store(curr_hash, Ordering::Release);
228 unsafe {
229 let bucket_ptr = self.buckets.add(bucket_idx);
230 ptr::write((*bucket_ptr).keys[idx].as_mut_ptr(), curr_key);
231 ptr::write((*bucket_ptr).values[idx].as_mut_ptr(), curr_value);
232 }
233 bucket.set_state(idx, OCCUPIED);
234 self.size.fetch_add(1, Ordering::Relaxed);
235 return None;
236 }
237
238 let mut should_displace = false;
240 let mut displace_idx = 0;
241
242 for i in 0..BUCKET_SIZE {
243 if bucket.get_state(i) == OCCUPIED {
244 let other_hash = bucket.hashes[i].load(Ordering::Acquire);
245 let other_distance = bucket_idx.wrapping_sub((other_hash as usize) & self.mask);
246
247 if distance > other_distance {
248 should_displace = true;
249 displace_idx = i;
250 break;
251 }
252 }
253 }
254
255 if should_displace {
256 let old_hash = bucket.hashes[displace_idx].swap(curr_hash, Ordering::AcqRel);
258 let old_key = unsafe {
259 let bucket_ptr = self.buckets.add(bucket_idx);
260 ptr::replace((*bucket_ptr).keys[displace_idx].as_mut_ptr(), curr_key)
261 };
262 let old_value = unsafe {
263 let bucket_ptr = self.buckets.add(bucket_idx);
264 ptr::replace((*bucket_ptr).values[displace_idx].as_mut_ptr(), curr_value)
265 };
266
267 curr_hash = old_hash;
268 curr_key = old_key;
269 curr_value = old_value;
270 distance = bucket_idx.wrapping_sub((curr_hash as usize) & self.mask);
271 }
272
273 bucket_idx = (bucket_idx + 1) & self.mask;
275 distance += 1;
276
277 if distance > self.bucket_count {
279 return None; }
281 }
282 }
283
284 pub fn get(&self, key: &K) -> Option<V> {
286 let hash = Self::hash_key(key);
287 let mut bucket_idx = (hash as usize) & self.mask;
288 let mut distance = 0;
289
290 loop {
291 let bucket = unsafe { &*self.buckets.add(bucket_idx) };
292
293 if let Some(idx) = bucket.find_entry(hash) {
295 if bucket.get_state(idx) == OCCUPIED {
296 let stored_key = unsafe { &*bucket.keys[idx].as_ptr() };
297 if stored_key == key {
298 let value = unsafe { &*bucket.values[idx].as_ptr() };
299 return Some(value.clone());
300 }
301 }
302 }
303
304 let mut max_distance = 0;
306 for i in 0..BUCKET_SIZE {
307 if bucket.get_state(i) == OCCUPIED {
308 let other_hash = bucket.hashes[i].load(Ordering::Acquire);
309 let other_distance = bucket_idx.wrapping_sub((other_hash as usize) & self.mask);
310 max_distance = max_distance.max(other_distance);
311 }
312 }
313
314 if distance > max_distance {
315 return None; }
317
318 bucket_idx = (bucket_idx + 1) & self.mask;
319 distance += 1;
320
321 if distance > self.bucket_count {
322 return None;
323 }
324 }
325 }
326
327 pub fn remove(&self, key: &K) -> Option<V> {
329 let hash = Self::hash_key(key);
330 let mut bucket_idx = (hash as usize) & self.mask;
331 let mut distance = 0;
332
333 loop {
334 let bucket = unsafe { &*self.buckets.add(bucket_idx) };
335
336 if let Some(idx) = bucket.find_entry(hash) {
337 if bucket.get_state(idx) == OCCUPIED {
338 let stored_key = unsafe { &*bucket.keys[idx].as_ptr() };
339 if stored_key == key {
340 bucket.set_state(idx, DELETED);
342 bucket.hashes[idx].store(0, Ordering::Release);
343
344 let value = unsafe {
345 ptr::read(bucket.values[idx].as_ptr())
346 };
347
348 self.size.fetch_sub(1, Ordering::Relaxed);
349 return Some(value);
350 }
351 }
352 }
353
354 let mut max_distance = 0;
356 for i in 0..BUCKET_SIZE {
357 if bucket.get_state(i) == OCCUPIED {
358 let other_hash = bucket.hashes[i].load(Ordering::Acquire);
359 let other_distance = bucket_idx.wrapping_sub((other_hash as usize) & self.mask);
360 max_distance = max_distance.max(other_distance);
361 }
362 }
363
364 if distance > max_distance {
365 return None;
366 }
367
368 bucket_idx = (bucket_idx + 1) & self.mask;
369 distance += 1;
370
371 if distance > self.bucket_count {
372 return None;
373 }
374 }
375 }
376
377 #[inline]
379 pub fn contains_key(&self, key: &K) -> bool {
380 self.get(key).is_some()
381 }
382
383 #[inline]
385 pub fn len(&self) -> usize {
386 self.size.load(Ordering::Relaxed) as usize
387 }
388
389 #[inline]
391 pub fn is_empty(&self) -> bool {
392 self.len() == 0
393 }
394}
395
396unsafe impl<K: Send, V: Send> Send for UltraHashMap<K, V> {}
397unsafe impl<K: Send + Sync, V: Send + Sync> Sync for UltraHashMap<K, V> {}
398
399impl<K, V> Drop for UltraHashMap<K, V> {
400 fn drop(&mut self) {
401 unsafe {
402 for i in 0..self.bucket_count {
404 let bucket = self.buckets.add(i);
405 for j in 0..BUCKET_SIZE {
406 if (*bucket).get_state(j) == OCCUPIED {
407 ptr::drop_in_place((*bucket).keys[j].as_mut_ptr());
408 ptr::drop_in_place((*bucket).values[j].as_mut_ptr());
409 }
410 }
411 }
412
413 dealloc(self.buckets as *mut u8, self.layout);
415 }
416 }
417}