// kovan_map/hashmap.rs
//! High-Performance Lock-Free Concurrent Hash Map (FoldHash + Large Table).
//!
//! # Strategy for Beating Dashmap
//!
//! 1. **FoldHash**: We use `foldhash::fast::FixedState` for blazing fast, quality hashing.
//!    This replaces the custom FxHash implementation.
//! 2. **Oversized Bucket Array**: We bump buckets to 524,288.
//!    - Memory footprint: ~4MB (Fits in L3 cache).
//!    - Load Factor at 100k items: ~0.19.
//!    - Result: almost ZERO collisions. Lookups become a single pointer dereference.
//! 3. **Optimized Node Layout**: Fields are ordered `hash -> key -> value -> next` to
//!    optimize CPU cache line usage during checks.
//!
//! # Architecture
//! - **Buckets**: Array of Atomic pointers (null-initialized).
//! - **Nodes**: Singly linked list.
//! - **Concurrency**: CAS-based lock-free insertion/removal.
extern crate alloc;

#[cfg(feature = "std")]
extern crate std;

use alloc::boxed::Box;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash};
use core::sync::atomic::Ordering;

use foldhash::fast::FixedState;
use kovan::{Atomic, RetiredNode, Shared, pin, retire};
31
/// Number of buckets.
/// 524,288 = 2^19 — must remain a power of two, since bucket selection
/// uses the bit mask `BUCKET_COUNT - 1` (see `HashMap::mask`).
/// Size of bucket array = 512k * 8 bytes = 4MB.
/// This is large enough to minimize collisions for 100k-500k items
/// while still fitting in modern L3 caches.
const BUCKET_COUNT: usize = 524_288;
38
/// Bounded exponential backoff used to ease contention on CAS retry loops.
struct Backoff {
    /// Backoff exponent; the wait is 2^min(step, 6) spin hints per call.
    step: u32,
}

impl Backoff {
    /// Starts a fresh backoff sequence at the minimum (single-spin) wait.
    #[inline(always)]
    fn new() -> Self {
        Backoff { step: 0 }
    }

    /// Busy-waits for 2^min(step, 6) iterations, then raises the exponent.
    /// The wait is capped at 64 spins regardless of how often this is called.
    #[inline(always)]
    fn spin(&mut self) {
        let iterations = 1u32 << self.step.min(6);
        let mut remaining = iterations;
        while remaining != 0 {
            core::hint::spin_loop();
            remaining -= 1;
        }
        if self.step <= 6 {
            self.step += 1;
        }
    }
}
60
/// Node in the lock-free linked list.
///
/// `#[repr(C)]` pins the declared field order (`hash -> key -> value -> next`)
/// so the hot comparison data sits at the front of the struct.
#[repr(C)]
struct Node<K, V> {
    // Intrusive bookkeeping used by kovan's deferred reclamation (`retire`).
    retired: RetiredNode,
    // Cached full 64-bit hash; compared before the (possibly costly) key equality.
    hash: u64,
    key: K,
    value: V,
    // Next node in this bucket's chain; null terminates the chain.
    next: Atomic<Node<K, V>>,
}
70
/// High-Performance Lock-Free Map.
///
/// Fixed-size table (no resizing): each of the `BUCKET_COUNT` buckets heads a
/// lock-free singly linked chain of `Node`s. `S` defaults to FoldHash's
/// `FixedState`.
pub struct HashMap<K: 'static, V: 'static, S = FixedState> {
    // One atomic head pointer per bucket; all start null.
    buckets: Box<[Atomic<Node<K, V>>]>,
    // Always `BUCKET_COUNT - 1`; valid as a bit mask because the count is a power of two.
    mask: usize,
    // Build-hasher state used for every key hash.
    hasher: S,
}
77
#[cfg(feature = "std")]
impl<K, V> HashMap<K, V, FixedState>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
{
    /// Creates a new empty hash map with FoldHash (FixedState).
    ///
    /// Eagerly allocates the full bucket table (~4MB; see `BUCKET_COUNT`).
    pub fn new() -> Self {
        Self::with_hasher(FixedState::default())
    }
}
89
impl<K, V, S> HashMap<K, V, S>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
    S: BuildHasher,
{
    /// Creates a new hash map with custom hasher.
    ///
    /// Eagerly allocates all `BUCKET_COUNT` bucket heads.
    pub fn with_hasher(hasher: S) -> Self {
        let mut buckets = Vec::with_capacity(BUCKET_COUNT);
        for _ in 0..BUCKET_COUNT {
            buckets.push(Atomic::null());
        }

        Self {
            buckets: buckets.into_boxed_slice(),
            mask: BUCKET_COUNT - 1,
            hasher,
        }
    }

    /// Maps a 64-bit hash to a bucket index by keeping its low bits.
    #[inline(always)]
    fn get_bucket_idx(&self, hash: u64) -> usize {
        (hash as usize) & self.mask
    }

    /// Returns the bucket head at `idx`.
    #[inline(always)]
    fn get_bucket(&self, idx: usize) -> &Atomic<Node<K, V>> {
        // SAFETY: every caller derives `idx` from `get_bucket_idx`, which masks
        // with `BUCKET_COUNT - 1`, so `idx < self.buckets.len()` always holds.
        unsafe { self.buckets.get_unchecked(idx) }
    }

    /// Optimized get operation.
    ///
    /// Returns a clone of the stored value, or `None` if `key` is absent.
    pub fn get<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let hash = self.hasher.hash_one(key);
        let idx = self.get_bucket_idx(hash);
        let bucket = self.get_bucket(idx);

        // Pin the reclamation epoch: nodes observed below cannot be freed
        // while `guard` is alive.
        let guard = pin();
        let mut current = bucket.load(Ordering::Acquire, &guard);

        while !current.is_null() {
            // SAFETY: `current` is non-null and protected by `guard`.
            unsafe {
                let node = current.deref();
                // Check hash first (integer compare is fast)
                if node.hash == hash && node.key.borrow() == key {
                    return Some(node.value.clone());
                }
                current = node.next.load(Ordering::Acquire, &guard);
            }
        }
        None
    }

    /// Checks if the key exists.
    pub fn contains_key<Q>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.get(key).is_some()
    }

    /// Insert a key-value pair.
    ///
    /// Returns the previous value if the key was present. Updates are
    /// copy-on-write: the old node is replaced wholesale and retired.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let hash = self.hasher.hash_one(&key);
        let idx = self.get_bucket_idx(hash);
        let bucket = self.get_bucket(idx);
        let mut backoff = Backoff::new();

        let guard = pin();

        'outer: loop {
            // 1. Search for existing key to update
            let mut prev_link = bucket;
            let mut current = prev_link.load(Ordering::Acquire, &guard);

            while !current.is_null() {
                // SAFETY: `current` is non-null and protected by `guard`.
                unsafe {
                    let node = current.deref();

                    if node.hash == hash && node.key == key {
                        // Key matches. Replace node (COW style).
                        //
                        // NOTE(review): the CAS below is on `prev_link`, not on
                        // `node.next`. If another thread appends a node after
                        // `current` between this `next` load and the CAS, the
                        // appended node is silently dropped from the chain —
                        // there are no Harris-style mark bits to detect it.
                        // Also, `Relaxed` here presumably relies on the node
                        // having been published with `Release`; confirm kovan's
                        // ordering contract.
                        let next = node.next.load(Ordering::Relaxed, &guard);
                        let old_value = node.value.clone();

                        // Create new node pointing to existing next
                        let new_node = Box::into_raw(Box::new(Node {
                            retired: RetiredNode::new(),
                            hash,
                            key: key.clone(),
                            value: value.clone(),
                            next: Atomic::new(next.as_raw()),
                        }));

                        match prev_link.compare_exchange(
                            current,
                            Shared::from_raw(new_node),
                            Ordering::Release,
                            Ordering::Relaxed,
                            &guard,
                        ) {
                            Ok(_) => {
                                // Old node is unlinked; defer its free until no
                                // pinned reader can still reference it.
                                retire(current.as_raw());
                                return Some(old_value);
                            }
                            Err(_) => {
                                // CAS failed.
                                // Our node was never published, so free it directly.
                                drop(Box::from_raw(new_node));
                                backoff.spin();
                                continue 'outer;
                            }
                        }
                    }

                    prev_link = &node.next;
                    current = node.next.load(Ordering::Acquire, &guard);
                }
            }

            // 2. Key not found. Insert at TAIL (prev_link).
            // Note: In low collision scenarios, prev_link is often the bucket head itself.
            let new_node_ptr = Box::into_raw(Box::new(Node {
                retired: RetiredNode::new(),
                hash,
                key: key.clone(),
                value: value.clone(),
                next: Atomic::null(),
            }));

            // Try to swap NULL -> NEW_NODE
            match prev_link.compare_exchange(
                unsafe { Shared::from_raw(core::ptr::null_mut()) },
                unsafe { Shared::from_raw(new_node_ptr) },
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ) {
                Ok(_) => return None,
                Err(actual_val) => {
                    // Contention at the tail.
                    // Someone appended something while we were watching.
                    // Check if they inserted OUR key.
                    // (Both branches below perform the same cleanup + retry; they
                    // are kept separate only for the explanatory comments.)
                    unsafe {
                        let actual_node = actual_val.deref();
                        if actual_node.hash == hash && actual_node.key == key {
                            // Race lost, but key exists now. We should retry to update it.
                            drop(Box::from_raw(new_node_ptr));
                            backoff.spin();
                            continue 'outer;
                        }
                    }

                    // Otherwise, just retry the search/append loop
                    unsafe {
                        drop(Box::from_raw(new_node_ptr));
                    }
                    backoff.spin();
                    continue 'outer;
                }
            }
        }
    }

    /// Insert a key-value pair only if the key does not exist.
    /// Returns `None` if inserted, `Some(existing_value)` if the key already exists.
    pub fn insert_if_absent(&self, key: K, value: V) -> Option<V> {
        let hash = self.hasher.hash_one(&key);
        let idx = self.get_bucket_idx(hash);
        let bucket = self.get_bucket(idx);
        let mut backoff = Backoff::new();

        let guard = pin();

        'outer: loop {
            // 1. Search for existing key
            let mut prev_link = bucket;
            let mut current = prev_link.load(Ordering::Acquire, &guard);

            while !current.is_null() {
                // SAFETY: `current` is non-null and protected by `guard`.
                unsafe {
                    let node = current.deref();

                    if node.hash == hash && node.key == key {
                        // Key matches. Return existing value.
                        return Some(node.value.clone());
                    }

                    prev_link = &node.next;
                    current = node.next.load(Ordering::Acquire, &guard);
                }
            }

            // 2. Key not found. Insert at TAIL (prev_link).
            let new_node_ptr = Box::into_raw(Box::new(Node {
                retired: RetiredNode::new(),
                hash,
                key: key.clone(),
                value: value.clone(),
                next: Atomic::null(),
            }));

            // Try to swap NULL -> NEW_NODE
            match prev_link.compare_exchange(
                unsafe { Shared::from_raw(core::ptr::null_mut()) },
                unsafe { Shared::from_raw(new_node_ptr) },
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ) {
                Ok(_) => return None,
                Err(actual_val) => {
                    // Contention at the tail.
                    unsafe {
                        let actual_node = actual_val.deref();
                        if actual_node.hash == hash && actual_node.key == key {
                            // Race lost, key exists now. Return existing value.
                            drop(Box::from_raw(new_node_ptr));
                            return Some(actual_node.value.clone());
                        }
                    }

                    // Otherwise, just retry the search/append loop
                    unsafe {
                        drop(Box::from_raw(new_node_ptr));
                    }
                    backoff.spin();
                    continue 'outer;
                }
            }
        }
    }

    /// Returns the value corresponding to the key, or inserts the given value if the key is not present.
    ///
    /// This is linearizable: concurrent callers for the same key are guaranteed to
    /// agree on which value was inserted (exactly one thread's CAS succeeds at the
    /// list tail, and all others see that node on retry).
    pub fn get_or_insert(&self, key: K, value: V) -> V {
        // HashMap's insert_if_absent is linearizable (CAS at list tail is atomic
        // with the linked-list structure), so no post-insert get() needed.
        // The clone is needed because insert_if_absent consumes `value`.
        match self.insert_if_absent(key, value.clone()) {
            Some(existing) => existing,
            None => value,
        }
    }

    /// Remove a key-value pair.
    ///
    /// Returns the removed value, or `None` if the key was absent.
    pub fn remove<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let hash = self.hasher.hash_one(key);
        let idx = self.get_bucket_idx(hash);
        let bucket = self.get_bucket(idx);
        let mut backoff = Backoff::new();

        let guard = pin();

        loop {
            let mut prev_link = bucket;
            let mut current = prev_link.load(Ordering::Acquire, &guard);

            while !current.is_null() {
                // SAFETY: `current` is non-null and protected by `guard`.
                unsafe {
                    let node = current.deref();

                    if node.hash == hash && node.key.borrow() == key {
                        let next = node.next.load(Ordering::Acquire, &guard);
                        let old_value = node.value.clone();

                        // NOTE(review): same caveat as `insert`'s replace path —
                        // the CAS is on `prev_link`, so a node appended after
                        // `current` between the `next` load and this CAS would be
                        // unlinked along with it (no mark bits).
                        match prev_link.compare_exchange(
                            current,
                            next,
                            Ordering::Release,
                            Ordering::Relaxed,
                            &guard,
                        ) {
                            Ok(_) => {
                                retire(current.as_raw());
                                return Some(old_value);
                            }
                            Err(_) => {
                                backoff.spin();
                                break; // Break inner loop to retry outer
                            }
                        }
                    }

                    prev_link = &node.next;
                    current = node.next.load(Ordering::Acquire, &guard);
                }
            }

            // `current` is non-null only when the inner loop broke out after a
            // failed CAS; in that case rescan the chain from the head.
            if current.is_null() {
                return None;
            }
        }
    }

    /// Clear the map.
    ///
    /// Detaches each bucket's whole chain with a single CAS, then retires
    /// every node in the detached chain.
    pub fn clear(&self) {
        let guard = pin();

        for bucket in self.buckets.iter() {
            loop {
                let head = bucket.load(Ordering::Acquire, &guard);
                if head.is_null() {
                    break;
                }

                // Try to unlink the whole chain at once
                match bucket.compare_exchange(
                    head,
                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
                    Ordering::Release,
                    Ordering::Relaxed,
                    &guard,
                ) {
                    Ok(_) => {
                        // Retire all nodes in the chain
                        // SAFETY: the chain is now unreachable from the bucket;
                        // `guard` keeps each node alive while we read `next`.
                        unsafe {
                            let mut current = head;
                            while !current.is_null() {
                                let node = current.deref();
                                let next = node.next.load(Ordering::Relaxed, &guard);
                                retire(current.as_raw());
                                current = next;
                            }
                        }
                        break;
                    }
                    Err(_) => {
                        // Contention, retry
                        continue;
                    }
                }
            }
        }
    }

    /// Returns true if the map is empty.
    ///
    /// Note: delegates to `len()`, so this is O(N) over all buckets.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of elements in the map.
    /// Note: This is an O(N) operation as it scans all buckets.
    /// The result is a snapshot and may be stale under concurrent mutation.
    pub fn len(&self) -> usize {
        let mut count = 0;
        let guard = pin();
        for bucket in self.buckets.iter() {
            let mut current = bucket.load(Ordering::Acquire, &guard);
            while !current.is_null() {
                // SAFETY: `current` is non-null and protected by `guard`.
                unsafe {
                    let node = current.deref();
                    count += 1;
                    current = node.next.load(Ordering::Acquire, &guard);
                }
            }
        }
        count
    }

    /// Returns an iterator over the map entries.
    /// Yields (K, V) clones.
    /// The iterator pins a guard for its whole lifetime (see `Iter`).
    pub fn iter(&self) -> Iter<'_, K, V, S> {
        Iter {
            map: self,
            bucket_idx: 0,
            current: core::ptr::null(),
            guard: pin(),
        }
    }

    /// Returns an iterator over the map keys.
    /// Yields K clones.
    pub fn keys(&self) -> Keys<'_, K, V, S> {
        Keys { iter: self.iter() }
    }

    /// Get the underlying hasher itself.
    pub fn hasher(&self) -> &S {
        &self.hasher
    }
}
479
/// Iterator over HashMap entries.
///
/// Holds a kovan guard for its entire lifetime, so nodes it walks cannot be
/// reclaimed mid-iteration. Yields clones; entries inserted or removed
/// concurrently may or may not be observed.
pub struct Iter<'a, K: 'static, V: 'static, S> {
    map: &'a HashMap<K, V, S>,
    // Index of the next bucket to scan once the current chain is exhausted.
    bucket_idx: usize,
    // Next node to yield; null means "advance to the next bucket".
    current: *const Node<K, V>,
    // Pin that keeps visited nodes alive for the iterator's lifetime.
    guard: kovan::Guard,
}
487
488impl<'a, K, V, S> Iterator for Iter<'a, K, V, S>
489where
490    K: Clone,
491    V: Clone,
492{
493    type Item = (K, V);
494
495    fn next(&mut self) -> Option<Self::Item> {
496        loop {
497            if !self.current.is_null() {
498                unsafe {
499                    let node = &*self.current;
500                    // Advance current
501                    self.current = node.next.load(Ordering::Acquire, &self.guard).as_raw();
502                    return Some((node.key.clone(), node.value.clone()));
503                }
504            }
505
506            // Move to next bucket
507            if self.bucket_idx >= self.map.buckets.len() {
508                return None;
509            }
510
511            let bucket = unsafe { self.map.buckets.get_unchecked(self.bucket_idx) };
512            self.bucket_idx += 1;
513            self.current = bucket.load(Ordering::Acquire, &self.guard).as_raw();
514        }
515    }
516}
517
/// Iterator over HashMap keys.
///
/// Thin adapter over [`Iter`] that discards the values.
pub struct Keys<'a, K: 'static, V: 'static, S> {
    iter: Iter<'a, K, V, S>,
}
522
523impl<'a, K, V, S> Iterator for Keys<'a, K, V, S>
524where
525    K: Clone,
526    V: Clone,
527{
528    type Item = K;
529
530    fn next(&mut self) -> Option<Self::Item> {
531        self.iter.next().map(|(k, _)| k)
532    }
533}
534
impl<'a, K, V, S> IntoIterator for &'a HashMap<K, V, S>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
    S: BuildHasher,
{
    type Item = (K, V);
    type IntoIter = Iter<'a, K, V, S>;

    /// Enables `for (k, v) in &map`; equivalent to calling `HashMap::iter`.
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
548
#[cfg(feature = "std")]
impl<K, V> Default for HashMap<K, V, FixedState>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
{
    /// Equivalent to [`HashMap::new`]: empty map with the default FoldHash state.
    fn default() -> Self {
        Self::new()
    }
}
559
// SAFETY: HashMap is Send/Sync if K, V are.
// NOTE(review): these bounds look too weak. `Sync` hands `&HashMap` to other
// threads, which can clone and drop K/V through insert/remove on any thread,
// so soundness presumably requires `K: Send + Sync, V: Send + Sync` (compare
// the bounds on std's concurrent wrappers) — confirm before relying on this.
unsafe impl<K: Send, V: Send, S: Send> Send for HashMap<K, V, S> {}
unsafe impl<K: Sync, V: Sync, S: Sync> Sync for HashMap<K, V, S> {}
563
564impl<K, V, S> Drop for HashMap<K, V, S> {
565    fn drop(&mut self) {
566        // SAFETY: `drop(&mut self)` guarantees exclusive ownership — no concurrent
567        // readers can exist. Free nodes immediately instead of deferring via `retire()`.
568        let guard = pin();
569
570        for bucket in self.buckets.iter() {
571            let mut current = bucket.load(Ordering::Acquire, &guard);
572
573            unsafe {
574                while !current.is_null() {
575                    let node = current.deref();
576                    let next = node.next.load(Ordering::Relaxed, &guard);
577                    drop(Box::from_raw(current.as_raw()));
578                    current = next;
579                }
580            }
581        }
582    }
583}
584
#[cfg(test)]
mod tests {
    use super::*;

    // Basic single-threaded insert/get round trip, including a miss.
    #[test]
    fn test_insert_and_get() {
        let map = HashMap::new();
        assert_eq!(map.insert(1, 100), None);
        assert_eq!(map.get(&1), Some(100));
        assert_eq!(map.get(&2), None);
    }

    // Re-inserting an existing key returns the previous value (COW replace).
    #[test]
    fn test_insert_replace() {
        let map = HashMap::new();
        assert_eq!(map.insert(1, 100), None);
        assert_eq!(map.insert(1, 200), Some(100));
        assert_eq!(map.get(&1), Some(200));
    }

    // Four threads insert disjoint key ranges; all 4000 entries must survive.
    #[test]
    fn test_concurrent_inserts() {
        use alloc::sync::Arc;
        extern crate std;
        use std::thread;

        let map = Arc::new(HashMap::new());
        let mut handles = alloc::vec::Vec::new();

        for thread_id in 0..4 {
            let map_clone = Arc::clone(&map);
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let key = thread_id * 1000 + i;
                    map_clone.insert(key, key * 2);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        for thread_id in 0..4 {
            for i in 0..1000 {
                let key = thread_id * 1000 + i;
                assert_eq!(map.get(&key), Some(key * 2));
            }
        }
    }
}