Skip to main content

kovan_map/
hopscotch.rs

1//! Lock-Free Growing/Shrinking Hopscotch Hash Map
2//!
3//! # Features
4//!
5//! - **Robust Concurrency**: Uses Copy-on-Move displacement to prevent Use-After-Free.
6//! - **Safe Resizing**: Uses a lightweight resize lock to prevent lost updates during migration.
7//! - **Memory reclamation**: Uses Kovan.
8//! - **Clone Support**: Supports V: Clone (e.g., `Arc<T>`) instead of just Copy.
9
10extern crate alloc;
11
12use alloc::boxed::Box;
13use alloc::vec::Vec;
14use core::borrow::Borrow;
15use core::hash::{BuildHasher, Hash};
16use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
17use foldhash::fast::FixedState;
18use kovan::{Atomic, RetiredNode, Shared, pin, retire};
19
20/// Neighborhood size (H parameter in hopscotch hashing)
21const NEIGHBORHOOD_SIZE: usize = 32;
22
23/// Initial capacity
24const INITIAL_CAPACITY: usize = 64;
25
26/// Load factor threshold for growing (75%)
27const GROW_THRESHOLD: f64 = 0.75;
28
29/// Load factor threshold for shrinking (25%)
30const SHRINK_THRESHOLD: f64 = 0.25;
31
32/// Minimum capacity to prevent excessive shrinking
33const MIN_CAPACITY: usize = 64;
34
35/// Maximum probe distance before giving up and resizing
36const MAX_PROBE_DISTANCE: usize = 512;
37
38/// A bucket in the hopscotch hash table
39struct Bucket<K, V> {
40    /// Bitmap indicating which of the next H slots contain items that hash to this bucket
41    hop_info: AtomicU32,
42    /// The actual key-value slot at this position
43    slot: Atomic<Entry<K, V>>,
44    /// Per-home-bucket writer guard (Herlihy-style hopscotch): held only
45    /// across `try_insert`'s existence-scan + slot-claim so two same-key
46    /// inserters can never both claim a slot. Readers never touch it -
47    /// `get`/iteration stay lock-free. Without it, phase-1 (scan hop bits
48    /// for the key) and phase-2 (CAS any empty neighborhood slot, THEN set
49    /// the hop bit) are not atomic: two `insert_if_absent` callers could
50    /// both be told "absent", leaving two live versions of one key with
51    /// `get` only ever returning one of them.
52    write_guard: AtomicBool,
53}
54
55/// Releases a bucket's writer guard on drop, so every exit path out of the
56/// guarded section (success, retry, resize, early return) unlocks exactly
57/// once.
58struct WriteGuardRelease<'t, K: 'static, V: 'static> {
59    table: &'t Table<K, V>,
60    idx: usize,
61}
62
63impl<K, V> Drop for WriteGuardRelease<'_, K, V> {
64    fn drop(&mut self) {
65        self.table
66            .get_bucket(self.idx)
67            .write_guard
68            .store(false, Ordering::Release);
69    }
70}
71
72/// An entry in the hash table
73#[repr(C)]
74struct Entry<K, V> {
75    retired: RetiredNode,
76    hash: u64,
77    key: K,
78    value: V,
79}
80
81impl<K: Clone, V: Clone> Clone for Entry<K, V> {
82    fn clone(&self) -> Self {
83        Self {
84            retired: RetiredNode::new(),
85            hash: self.hash,
86            key: self.key.clone(),
87            value: self.value.clone(),
88        }
89    }
90}
91
92// SAFETY (kovan retirement rule): a retired Entry's destructor may run on
93// any thread, and entries (with K and V inside) move between threads —
94// hence `K: Send, V: Send` for Send. Lookups DO produce `&K`/`&V` from a
95// shared `&Entry` (get() clones V through &V under concurrent readers),
96// so Sync additionally requires `K: Sync, V: Sync` — the same bounds the
97// map-level Sync impl has always required for sharing the map.
98unsafe impl<K: Send, V: Send> Send for Entry<K, V> {}
99unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Entry<K, V> {}
100
101/// The hash table structure
102#[repr(C)]
103struct Table<K, V> {
104    retired: RetiredNode,
105    buckets: Box<[Bucket<K, V>]>,
106    capacity: usize,
107    mask: usize,
108}
109
110// SAFETY (kovan retirement rule): same reasoning as Entry — a retired
111// Table's destructor may run on any thread (hence K, V: Send via the
112// contained entries); shared access to entries through a `&Table` carries
113// Entry's Sync requirements.
114unsafe impl<K: Send, V: Send> Send for Table<K, V> {}
115unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Table<K, V> {}
116
117impl<K, V> Table<K, V> {
118    fn new(capacity: usize) -> Self {
119        let capacity = capacity.next_power_of_two().max(MIN_CAPACITY);
120        // We add padding to the array so we don't have to check bounds constantly
121        // during neighborhood scans.
122        let mut buckets = Vec::with_capacity(capacity + NEIGHBORHOOD_SIZE);
123
124        for _ in 0..(capacity + NEIGHBORHOOD_SIZE) {
125            buckets.push(Bucket {
126                hop_info: AtomicU32::new(0),
127                slot: Atomic::null(),
128                write_guard: AtomicBool::new(false),
129            });
130        }
131
132        Self {
133            retired: RetiredNode::new(),
134            buckets: buckets.into_boxed_slice(),
135            capacity,
136            mask: capacity - 1,
137        }
138    }
139
140    #[inline(always)]
141    fn bucket_index(&self, hash: u64) -> usize {
142        (hash as usize) & self.mask
143    }
144
145    #[inline(always)]
146    fn get_bucket(&self, idx: usize) -> &Bucket<K, V> {
147        // SAFETY: Internal indices are calculated via mask or bounded offset loops.
148        // The buckets array has padding to handle overflow up to NEIGHBORHOOD_SIZE.
149        unsafe { self.buckets.get_unchecked(idx) }
150    }
151}
152
153impl<K, V> Drop for Table<K, V> {
154    fn drop(&mut self) {
155        // Exclusive at this point: either the map itself is being dropped,
156        // or kovan reclaimed the table after every guard that could observe
157        // it has been released. Slots hold exactly the entries that were
158        // never individually unlinked+retired (remove/clear null the slot
159        // before retiring), so each entry is freed exactly once. Without
160        // this, every resize leaked the old table's entries.
161        let guard = pin();
162        for i in 0..(self.capacity + NEIGHBORHOOD_SIZE) {
163            let entry_ptr = self.buckets[i]
164                .slot
165                .load(Ordering::Relaxed, &guard)
166                .as_raw();
167            if !entry_ptr.is_null() {
168                unsafe {
169                    drop(Box::from_raw(entry_ptr));
170                }
171            }
172        }
173    }
174}
175
176/// A concurrent, lock-free hash map based on Hopscotch Hashing.
177pub struct HopscotchMap<K: 'static, V: 'static, S = FixedState> {
178    table: Atomic<Table<K, V>>,
179    count: AtomicUsize,
180    /// Prevents concurrent writes during resize migration to avoid lost updates
181    resizing: AtomicBool,
182    hasher: S,
183}
184
185#[cfg(feature = "std")]
186impl<K, V> HopscotchMap<K, V, FixedState>
187where
188    K: Hash + Eq + Clone + 'static,
189    V: Clone + 'static,
190{
191    /// Creates a new `HopscotchMap` with default capacity and hasher.
192    pub fn new() -> Self {
193        Self::with_hasher(FixedState::default())
194    }
195
196    /// Creates a new `HopscotchMap` with the specified capacity and default hasher.
197    pub fn with_capacity(capacity: usize) -> Self {
198        Self::with_capacity_and_hasher(capacity, FixedState::default())
199    }
200}
201
202impl<K, V, S> HopscotchMap<K, V, S>
203where
204    K: Hash + Eq + Clone + 'static,
205    V: Clone + 'static,
206    S: BuildHasher,
207{
208    /// Creates a new `HopscotchMap` with the specified hasher and default capacity.
209    pub fn with_hasher(hasher: S) -> Self {
210        Self::with_capacity_and_hasher(INITIAL_CAPACITY, hasher)
211    }
212
213    /// Creates a new `HopscotchMap` with the specified capacity and hasher.
214    pub fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
215        let table = Table::new(capacity);
216        Self {
217            table: Atomic::new(Box::into_raw(Box::new(table))),
218            count: AtomicUsize::new(0),
219            resizing: AtomicBool::new(false),
220            hasher,
221        }
222    }
223
224    /// Returns the number of elements in the map.
225    pub fn len(&self) -> usize {
226        self.count.load(Ordering::Relaxed)
227    }
228
229    /// Returns `true` if the map contains no elements.
230    pub fn is_empty(&self) -> bool {
231        self.len() == 0
232    }
233
234    /// Returns the current capacity of the map.
235    pub fn capacity(&self) -> usize {
236        let guard = pin();
237        let table_ptr = self.table.load(Ordering::Acquire, &guard);
238        unsafe { (*table_ptr.as_raw()).capacity }
239    }
240
241    #[inline]
242    fn wait_for_resize(&self) {
243        while self.resizing.load(Ordering::Acquire) {
244            core::hint::spin_loop();
245        }
246    }
247
248    /// Returns the value corresponding to the key.
249    pub fn get<Q>(&self, key: &Q) -> Option<V>
250    where
251        K: Borrow<Q>,
252        Q: Hash + Eq + ?Sized,
253    {
254        let hash = self.hasher.hash_one(key);
255        let guard = pin();
256        let table_ptr = self.table.load(Ordering::Acquire, &guard);
257        let table = unsafe { &*table_ptr.as_raw() };
258
259        let bucket_idx = table.bucket_index(hash);
260        let bucket = table.get_bucket(bucket_idx);
261
262        let hop_info = bucket.hop_info.load(Ordering::Acquire);
263
264        if hop_info == 0 {
265            return None;
266        }
267
268        for offset in 0..NEIGHBORHOOD_SIZE {
269            if hop_info & (1 << offset) != 0 {
270                let slot_idx = bucket_idx + offset;
271                let slot_bucket = table.get_bucket(slot_idx);
272                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
273
274                if !entry_ptr.is_null() {
275                    let entry = unsafe { &*entry_ptr.as_raw() };
276                    if entry.hash == hash && entry.key.borrow() == key {
277                        return Some(entry.value.clone());
278                    }
279                }
280            }
281        }
282
283        None
284    }
285
286    /// Inserts a key-value pair into the map.
287    pub fn insert(&self, key: K, value: V) -> Option<V> {
288        self.insert_impl(key, value, false)
289    }
290
291    /// Helper for get_or_insert logic.
292    fn insert_impl(&self, key: K, value: V, only_if_absent: bool) -> Option<V> {
293        let hash = self.hasher.hash_one(&key);
294        // Track whether we already incremented count for a new insert across
295        // retry iterations.  Prevents both under-count (which causes cascading
296        // resizes on Windows) and double-count.
297        let mut counted = false;
298
299        loop {
300            self.wait_for_resize();
301
302            let guard = pin();
303            let table_ptr = self.table.load(Ordering::Acquire, &guard);
304            let table = unsafe { &*table_ptr.as_raw() };
305
306            if self.resizing.load(Ordering::Acquire) {
307                continue;
308            }
309
310            // Home-bucket writer guard: serialize same-bucket inserts so the
311            // existence scan and the slot claim inside try_insert are one
312            // atomic step. Contended -> spin via the outer loop, which keeps
313            // re-checking `resizing` (the resizer never takes this guard, so
314            // there is no lock-order deadlock; an insert that lands in the
315            // old table during a swap is caught by the re-validation below).
316            if self.acquire_write_guard(table, hash).is_none() {
317                core::hint::spin_loop();
318                continue;
319            }
320
321            // Note: We pass clones to try_insert if we loop here, but try_insert consumes them.
322            // Since `insert_impl` owns `key` and `value`, we must clone them for the call
323            // because `try_insert` might return `Retry` (looping again).
324            //
325            // The writer guard is scoped to the try_insert call alone: it
326            // must release BEFORE the resize arms below drop the pin and
327            // migrate (the release touches this table's bucket, which is
328            // only safe while the pin keeps the table alive).
329            let insert_result = {
330                let _wg = WriteGuardRelease {
331                    table,
332                    idx: table.bucket_index(hash),
333                };
334                self.try_insert(
335                    table,
336                    hash,
337                    key.clone(),
338                    value.clone(),
339                    only_if_absent,
340                    &guard,
341                )
342            };
343            match insert_result {
344                InsertResult::Success(old_val) => {
345                    // Count new inserts immediately so that concurrent removes
346                    // cannot decrement count below the true entry count.
347                    if old_val.is_none() && !counted {
348                        self.count.fetch_add(1, Ordering::Relaxed);
349                        counted = true;
350                    }
351
352                    // If a resize started while we were inserting, our update
353                    // might have been missed by the migration.
354                    // We must retry to ensure we write to the new table.
355                    if self.resizing.load(Ordering::SeqCst)
356                        || self.table.load(Ordering::SeqCst, &guard) != table_ptr
357                    {
358                        continue;
359                    }
360
361                    if counted {
362                        let new_count = self.count.load(Ordering::Relaxed);
363                        let current_capacity = table.capacity;
364                        let load_factor = new_count as f64 / current_capacity as f64;
365
366                        if load_factor > GROW_THRESHOLD {
367                            drop(guard);
368                            self.try_resize(current_capacity * 2);
369                        }
370                    }
371                    return old_val;
372                }
373                InsertResult::Exists(existing_val) => {
374                    return Some(existing_val);
375                }
376                InsertResult::NeedResize => {
377                    let current_capacity = table.capacity;
378                    drop(guard);
379                    self.try_resize(current_capacity * 2);
380                    continue;
381                }
382                InsertResult::Retry => {
383                    continue;
384                }
385            }
386        }
387    }
388
389    /// Returns the value corresponding to the key, or inserts the given value if the key is not present.
390    ///
391    /// When multiple threads call this concurrently for the same key (without
392    /// concurrent removes), all callers receive the same value.
393    pub fn get_or_insert(&self, key: K, value: V) -> V {
394        // Fast path: key already exists — no clone, no insert.
395        if let Some(v) = self.get(&key) {
396            return v;
397        }
398        // Slow path: insert_if_absent and use the return value directly.
399        // We must NOT do insert-then-get because a concurrent remove between
400        // the two operations would cause get to return None.
401        let key2 = key.clone();
402        match self.insert_impl(key, value.clone(), true) {
403            None => {
404                // We inserted, but concurrent inserts may have also placed
405                // the same key at a different offset (the CAS-then-hop-bit
406                // window allows duplicates). Re-get returns the canonical
407                // (lowest-offset) entry so every caller agrees on one value.
408                self.get(&key2).unwrap_or(value)
409            }
410            Some(existing) => existing, // Key already existed
411        }
412    }
413
414    /// Insert a key-value pair only if the key does not exist.
415    /// Returns `None` if inserted, `Some(existing_value)` if the key already exists.
416    pub fn insert_if_absent(&self, key: K, value: V) -> Option<V> {
417        self.insert_impl(key, value, true)
418    }
419
420    /// Remove **all** nodes matching `key`, returning the most recent value
421    /// if the key was present.
422    ///
423    /// [`remove`](Self::remove) unlinks only the first matching entry.
424    /// Insert/remove races can transiently leave more than one entry for
425    /// the same key ("versions"); after a plain `remove()` an older version
426    /// would become visible again. This method keeps removing until a full
427    /// scan finds no match, so the key is guaranteed absent at the
428    /// linearization point of the final scan.
429    ///
430    /// Use `remove()` for single-version removal semantics and
431    /// `force_remove()` when the key must be fully evicted.
432    ///
433    /// Note: a concurrent `insert` of the same key can land after the final
434    /// scan, as with any removal under contention.
435    pub fn force_remove<Q>(&self, key: &Q) -> Option<V>
436    where
437        K: Borrow<Q>,
438        Q: Hash + Eq + ?Sized,
439    {
440        let mut newest = None;
441        loop {
442            match self.remove(key) {
443                Some(v) => {
444                    // The first removal unlinks the first match in scan
445                    // order — the live (most recent) version.
446                    if newest.is_none() {
447                        newest = Some(v);
448                    }
449                }
450                None => return newest,
451            }
452        }
453    }
454
455    /// Removes a key from the map, returning the value at the key if the key was previously in the map.
456    pub fn remove<Q>(&self, key: &Q) -> Option<V>
457    where
458        K: Borrow<Q>,
459        Q: Hash + Eq + ?Sized,
460    {
461        let hash = self.hasher.hash_one(key);
462        // First successful removal's value is the linearized result;
463        // re-validation retries only evict migrated clones.
464        let mut result: Option<V> = None;
465
466        'outer: loop {
467            self.wait_for_resize();
468
469            let guard = pin();
470            let table_ptr = self.table.load(Ordering::Acquire, &guard);
471            let table_raw = table_ptr.as_raw();
472            let table = unsafe { &*table_raw };
473
474            if self.resizing.load(Ordering::Acquire) {
475                continue;
476            }
477
478            let bucket_idx = table.bucket_index(hash);
479            let bucket = table.get_bucket(bucket_idx);
480
481            let hop_info = bucket.hop_info.load(Ordering::Acquire);
482            if hop_info == 0 {
483                return result;
484            }
485
486            for offset in 0..NEIGHBORHOOD_SIZE {
487                if hop_info & (1 << offset) != 0 {
488                    let slot_idx = bucket_idx + offset;
489                    let slot_bucket = table.get_bucket(slot_idx);
490                    let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
491
492                    if !entry_ptr.is_null() {
493                        let entry = unsafe { &*entry_ptr.as_raw() };
494                        if entry.hash == hash && entry.key.borrow() == key {
495                            let old_value = entry.value.clone();
496
497                            match slot_bucket.slot.compare_exchange(
498                                entry_ptr,
499                                unsafe { Shared::from_raw(core::ptr::null_mut()) },
500                                Ordering::Release,
501                                Ordering::Relaxed,
502                                &guard,
503                            ) {
504                                Ok(_) => {
505                                    let mask = !(1u32 << offset);
506                                    bucket.hop_info.fetch_and(mask, Ordering::Release);
507
508                                    unsafe { retire(entry_ptr.as_raw()) };
509                                    if result.is_none() {
510                                        result = Some(old_value);
511                                    }
512
513                                    // Saturating decrement: prevent count from wrapping
514                                    // to usize::MAX which would trigger catastrophic
515                                    // cascading resizes.
516                                    let shrink_to = if let Ok(prev) = self.count.fetch_update(
517                                        Ordering::Relaxed,
518                                        Ordering::Relaxed,
519                                        |c| c.checked_sub(1),
520                                    ) {
521                                        let new_count = prev - 1;
522                                        let current_capacity = table.capacity;
523                                        let load_factor =
524                                            new_count as f64 / current_capacity as f64;
525                                        (load_factor < SHRINK_THRESHOLD
526                                            && current_capacity > MIN_CAPACITY)
527                                            .then_some(current_capacity / 2)
528                                    } else {
529                                        None
530                                    };
531
532                                    // Re-validate: a concurrent migration may
533                                    // have cloned this entry into a new table
534                                    // before we unlinked it here — redo the
535                                    // removal on the current table so the key
536                                    // does not resurrect.
537                                    if self.resizing.load(Ordering::SeqCst)
538                                        || self.table.load(Ordering::SeqCst, &guard).as_raw()
539                                            != table_raw
540                                    {
541                                        continue 'outer;
542                                    }
543
544                                    if let Some(cap) = shrink_to {
545                                        drop(guard);
546                                        self.try_resize(cap);
547                                    }
548                                    return result;
549                                }
550                                Err(_) => {
551                                    break;
552                                }
553                            }
554                        }
555                    }
556                }
557            }
558            return result;
559        }
560    }
561
562    /// Clears the map, removing all key-value pairs.
563    pub fn clear(&self) {
564        while self
565            .resizing
566            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
567            .is_err()
568        {
569            core::hint::spin_loop();
570        }
571
572        let guard = pin();
573        let table_ptr = self.table.load(Ordering::Acquire, &guard);
574        let table = unsafe { &*table_ptr.as_raw() };
575
576        for i in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
577            let bucket = table.get_bucket(i);
578            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
579
580            if !entry_ptr.is_null()
581                && bucket
582                    .slot
583                    .compare_exchange(
584                        entry_ptr,
585                        unsafe { Shared::from_raw(core::ptr::null_mut()) },
586                        Ordering::Release,
587                        Ordering::Relaxed,
588                        &guard,
589                    )
590                    .is_ok()
591            {
592                unsafe { retire(entry_ptr.as_raw()) };
593            }
594
595            if i < table.capacity {
596                let b = table.get_bucket(i);
597                b.hop_info.store(0, Ordering::Release);
598            }
599        }
600
601        self.count.store(0, Ordering::Release);
602        self.resizing.store(false, Ordering::Release);
603    }
604
605    /// Try to take the home bucket's writer guard. `Some(())` on success;
606    /// `None` when another writer holds it (caller re-loops, staying
607    /// responsive to resize).
608    fn acquire_write_guard(&self, table: &Table<K, V>, hash: u64) -> Option<()> {
609        let bucket = table.get_bucket(table.bucket_index(hash));
610        if bucket.write_guard.swap(true, Ordering::Acquire) {
611            None
612        } else {
613            Some(())
614        }
615    }
616
617    fn try_insert(
618        &self,
619        table: &Table<K, V>,
620        hash: u64,
621        key: K,
622        value: V,
623        only_if_absent: bool,
624        guard: &kovan::Guard,
625    ) -> InsertResult<V> {
626        let bucket_idx = table.bucket_index(hash);
627        let bucket = table.get_bucket(bucket_idx);
628
629        // 1. Check if key exists (Update)
630        let hop_info = bucket.hop_info.load(Ordering::Acquire);
631        for offset in 0..NEIGHBORHOOD_SIZE {
632            if hop_info & (1 << offset) != 0 {
633                let slot_idx = bucket_idx + offset;
634                let slot_bucket = table.get_bucket(slot_idx);
635                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
636
637                if !entry_ptr.is_null() {
638                    let entry = unsafe { &*entry_ptr.as_raw() };
639                    if entry.hash == hash && entry.key == key {
640                        if only_if_absent {
641                            return InsertResult::Exists(entry.value.clone());
642                        }
643
644                        let old_value = entry.value.clone();
645                        // Clone key and value because if CAS fails, we retry, and we are inside a loop.
646                        // We cannot move out of `key` or `value` inside a loop.
647                        let new_entry = Box::into_raw(Box::new(Entry {
648                            retired: RetiredNode::new(),
649                            hash,
650                            key: key.clone(),
651                            value: value.clone(),
652                        }));
653
654                        match slot_bucket.slot.compare_exchange(
655                            entry_ptr,
656                            unsafe { Shared::from_raw(new_entry) },
657                            Ordering::Release,
658                            Ordering::Relaxed,
659                            guard,
660                        ) {
661                            Ok(_) => {
662                                unsafe { retire(entry_ptr.as_raw()) };
663                                return InsertResult::Success(Some(old_value));
664                            }
665                            Err(_) => {
666                                drop(unsafe { Box::from_raw(new_entry) });
667                                return InsertResult::Retry;
668                            }
669                        }
670                    }
671                }
672            }
673        }
674
675        // 2. Find empty slot
676        for offset in 0..NEIGHBORHOOD_SIZE {
677            let slot_idx = bucket_idx + offset;
678            if slot_idx >= table.capacity + NEIGHBORHOOD_SIZE {
679                return InsertResult::NeedResize;
680            }
681
682            let slot_bucket = table.get_bucket(slot_idx);
683            let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
684
685            if entry_ptr.is_null() {
686                // Clone key and value. If CAS fails, we continue the loop, so we need the originals
687                // for the next iteration.
688                let new_entry = Box::into_raw(Box::new(Entry {
689                    retired: RetiredNode::new(),
690                    hash,
691                    key: key.clone(),
692                    value: value.clone(),
693                }));
694
695                match slot_bucket.slot.compare_exchange(
696                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
697                    unsafe { Shared::from_raw(new_entry) },
698                    Ordering::Release,
699                    Ordering::Relaxed,
700                    guard,
701                ) {
702                    Ok(_) => {
703                        bucket.hop_info.fetch_or(1u32 << offset, Ordering::Release);
704                        return InsertResult::Success(None);
705                    }
706                    Err(_) => {
707                        drop(unsafe { Box::from_raw(new_entry) });
708                        continue;
709                    }
710                }
711            }
712        }
713
714        // 3. Try displacement
715        match self.try_find_closer_slot(table, bucket_idx, guard) {
716            Some(final_offset) if final_offset < NEIGHBORHOOD_SIZE => {
717                let slot_idx = bucket_idx + final_offset;
718                let slot_bucket = table.get_bucket(slot_idx);
719
720                let curr = slot_bucket.slot.load(Ordering::Relaxed, guard);
721                if !curr.is_null() {
722                    return InsertResult::Retry;
723                }
724
725                // This is the final attempt in this function. We can move key/value here
726                // because previous usages were clones.
727                let new_entry = Box::into_raw(Box::new(Entry {
728                    retired: RetiredNode::new(),
729                    hash,
730                    key,
731                    value,
732                }));
733
734                match slot_bucket.slot.compare_exchange(
735                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
736                    unsafe { Shared::from_raw(new_entry) },
737                    Ordering::Release,
738                    Ordering::Relaxed,
739                    guard,
740                ) {
741                    Ok(_) => {
742                        bucket
743                            .hop_info
744                            .fetch_or(1u32 << final_offset, Ordering::Release);
745                        InsertResult::Success(None)
746                    }
747                    Err(_) => {
748                        drop(unsafe { Box::from_raw(new_entry) });
749                        InsertResult::Retry
750                    }
751                }
752            }
753            _ => InsertResult::NeedResize,
754        }
755    }
756
757    fn try_find_closer_slot(
758        &self,
759        table: &Table<K, V>,
760        bucket_idx: usize,
761        guard: &kovan::Guard,
762    ) -> Option<usize> {
763        for probe_offset in NEIGHBORHOOD_SIZE..MAX_PROBE_DISTANCE {
764            let probe_idx = bucket_idx + probe_offset;
765            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
766                return None;
767            }
768
769            let probe_bucket = table.get_bucket(probe_idx);
770            let entry_ptr = probe_bucket.slot.load(Ordering::Acquire, guard);
771
772            if entry_ptr.is_null() {
773                return self.try_move_closer(table, bucket_idx, probe_idx, guard);
774            }
775        }
776        None
777    }
778
779    fn try_move_closer(
780        &self,
781        table: &Table<K, V>,
782        target_idx: usize,
783        empty_idx: usize,
784        guard: &kovan::Guard,
785    ) -> Option<usize> {
786        let mut current_empty = empty_idx;
787
788        while current_empty > target_idx + NEIGHBORHOOD_SIZE - 1 {
789            let mut moved = false;
790
791            for offset in 1..NEIGHBORHOOD_SIZE.min(current_empty - target_idx) {
792                let candidate_idx = current_empty - offset;
793                let candidate_bucket = table.get_bucket(candidate_idx);
794                let entry_ptr = candidate_bucket.slot.load(Ordering::Acquire, guard);
795
796                if !entry_ptr.is_null() {
797                    let entry = unsafe { &*entry_ptr.as_raw() };
798                    let entry_home = table.bucket_index(entry.hash);
799
800                    if entry_home <= candidate_idx && current_empty < entry_home + NEIGHBORHOOD_SIZE
801                    {
802                        // Copy-on-Move for safety
803                        let new_entry = Box::into_raw(Box::new(entry.clone()));
804                        let empty_bucket = table.get_bucket(current_empty);
805
806                        match empty_bucket.slot.compare_exchange(
807                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
808                            unsafe { Shared::from_raw(new_entry) },
809                            Ordering::Release,
810                            Ordering::Relaxed,
811                            guard,
812                        ) {
813                            Ok(_) => {
814                                match candidate_bucket.slot.compare_exchange(
815                                    entry_ptr,
816                                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
817                                    Ordering::Release,
818                                    Ordering::Relaxed,
819                                    guard,
820                                ) {
821                                    Ok(_) => {
822                                        let old_offset = candidate_idx - entry_home;
823                                        let new_offset = current_empty - entry_home;
824
825                                        let home_bucket = table.get_bucket(entry_home);
826                                        home_bucket
827                                            .hop_info
828                                            .fetch_and(!(1u32 << old_offset), Ordering::Release);
829                                        home_bucket
830                                            .hop_info
831                                            .fetch_or(1u32 << new_offset, Ordering::Release);
832
833                                        unsafe { retire(entry_ptr.as_raw()) };
834                                        current_empty = candidate_idx;
835                                        moved = true;
836                                        break;
837                                    }
838                                    Err(_) => {
839                                        // Attempt to revert the insertion of new_entry.
840                                        match empty_bucket.slot.compare_exchange(
841                                            unsafe { Shared::from_raw(new_entry) },
842                                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
843                                            Ordering::Release,
844                                            Ordering::Relaxed,
845                                            guard,
846                                        ) {
847                                            Ok(_) => {
848                                                // Revert succeeded: no other thread touched it.
849                                                // Safe to drop immediately.
850                                                unsafe { drop(Box::from_raw(new_entry)) };
851                                            }
852                                            Err(_) => {
853                                                // Displacement already happened here...
854                                                // Too late, so we just drop it. Another thread found new_entry,
855                                                // displaced it, and replaced it with something else.
856                                                // This means new_entry is now part of the blocks
857                                                // and was already "retired" by the other thread,
858                                                // OR it was just moved. In either case, we must
859                                                // let the reclamation system handle it to avoid a
860                                                // double free.
861                                                unsafe { retire(new_entry) };
862                                            }
863                                        }
864                                        continue;
865                                    }
866                                }
867                            }
868                            Err(_) => {
869                                unsafe { drop(Box::from_raw(new_entry)) };
870                                continue;
871                            }
872                        }
873                    }
874                }
875            }
876
877            if !moved {
878                return None;
879            }
880        }
881
882        if current_empty >= target_idx && current_empty < target_idx + NEIGHBORHOOD_SIZE {
883            Some(current_empty - target_idx)
884        } else {
885            None
886        }
887    }
888
889    fn insert_into_new_table(
890        &self,
891        table: &Table<K, V>,
892        hash: u64,
893        key: K,
894        value: V,
895        guard: &kovan::Guard,
896    ) -> bool {
897        let bucket_idx = table.bucket_index(hash);
898
899        for probe_offset in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
900            let probe_idx = bucket_idx + probe_offset;
901            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
902                break;
903            }
904
905            let probe_bucket = table.get_bucket(probe_idx);
906            let slot_ptr = probe_bucket.slot.load(Ordering::Relaxed, guard);
907
908            if slot_ptr.is_null() {
909                let offset_from_home = probe_idx - bucket_idx;
910
911                if offset_from_home < NEIGHBORHOOD_SIZE {
912                    let new_entry = Box::into_raw(Box::new(Entry {
913                        retired: RetiredNode::new(),
914                        hash,
915                        key,
916                        value,
917                    }));
918                    probe_bucket
919                        .slot
920                        .store(unsafe { Shared::from_raw(new_entry) }, Ordering::Release);
921
922                    let bucket = table.get_bucket(bucket_idx);
923                    bucket
924                        .hop_info
925                        .fetch_or(1u32 << offset_from_home, Ordering::Relaxed);
926                    return true;
927                } else {
928                    return false;
929                }
930            }
931        }
932        false
933    }
934
935    fn try_resize(&self, new_capacity: usize) {
936        if self
937            .resizing
938            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
939            .is_err()
940        {
941            return;
942        }
943
944        let new_capacity = new_capacity.next_power_of_two().max(MIN_CAPACITY);
945        let guard = pin();
946        let old_table_ptr = self.table.load(Ordering::Acquire, &guard);
947        let old_table = unsafe { &*old_table_ptr.as_raw() };
948
949        if old_table.capacity == new_capacity {
950            self.resizing.store(false, Ordering::Release);
951            return;
952        }
953
954        let new_table = Box::into_raw(Box::new(Table::new(new_capacity)));
955        let new_table_ref = unsafe { &*new_table };
956
957        let mut success = true;
958
959        for i in 0..(old_table.capacity + NEIGHBORHOOD_SIZE) {
960            let bucket = old_table.get_bucket(i);
961            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
962
963            if !entry_ptr.is_null() {
964                let entry = unsafe { &*entry_ptr.as_raw() };
965                if !self.insert_into_new_table(
966                    new_table_ref,
967                    entry.hash,
968                    entry.key.clone(),
969                    entry.value.clone(),
970                    &guard,
971                ) {
972                    success = false;
973                    break;
974                }
975            }
976        }
977
978        if success {
979            match self.table.compare_exchange(
980                old_table_ptr,
981                unsafe { Shared::from_raw(new_table) },
982                Ordering::Release,
983                Ordering::Relaxed,
984                &guard,
985            ) {
986                Ok(_) => {
987                    unsafe { retire(old_table_ptr.as_raw()) };
988                }
989                Err(_) => {
990                    success = false;
991                }
992            }
993        }
994
995        if !success {
996            // The unpublished new table's destructor frees its cloned entries.
997            unsafe {
998                drop(Box::from_raw(new_table));
999            }
1000        }
1001
1002        self.resizing.store(false, Ordering::Release);
1003    }
1004    /// Returns an iterator over the map entries.
1005    pub fn iter(&self) -> HopscotchIter<'_, K, V, S> {
1006        let guard = pin();
1007        let table_ptr = self.table.load(Ordering::Acquire, &guard);
1008        let _table = unsafe { &*table_ptr.as_raw() };
1009        HopscotchIter {
1010            map: self,
1011            bucket_idx: 0,
1012            guard,
1013        }
1014    }
1015
1016    /// Returns an iterator over the map keys.
1017    pub fn keys(&self) -> HopscotchKeys<'_, K, V, S> {
1018        HopscotchKeys { iter: self.iter() }
1019    }
1020
1021    /// Returns an iterator over the map values (clones `V`).
1022    pub fn values(&self) -> HopscotchValues<'_, K, V, S> {
1023        HopscotchValues { iter: self.iter() }
1024    }
1025
1026    /// Returns `true` if the key is present.
1027    pub fn contains_key<Q>(&self, key: &Q) -> bool
1028    where
1029        K: Borrow<Q>,
1030        Q: Hash + Eq + ?Sized,
1031    {
1032        self.get(key).is_some()
1033    }
1034
1035    /// Insert all `(K, V)` pairs from `iter`. Takes `&self` (concurrent map).
1036    pub fn extend<I: IntoIterator<Item = (K, V)>>(&self, iter: I) {
1037        for (k, v) in iter {
1038            self.insert(k, v);
1039        }
1040    }
1041
1042    /// Get the underlying hasher.
1043    pub fn hasher(&self) -> &S {
1044        &self.hasher
1045    }
1046}
1047
1048/// Iterator over HopscotchMap entries.
1049pub struct HopscotchIter<'a, K: 'static, V: 'static, S> {
1050    map: &'a HopscotchMap<K, V, S>,
1051    bucket_idx: usize,
1052    guard: kovan::Guard,
1053}
1054
1055impl<'a, K, V, S> Iterator for HopscotchIter<'a, K, V, S>
1056where
1057    K: Clone,
1058    V: Clone,
1059{
1060    type Item = (K, V);
1061
1062    fn next(&mut self) -> Option<Self::Item> {
1063        let table_ptr = self.map.table.load(Ordering::Acquire, &self.guard);
1064        let table = unsafe { &*table_ptr.as_raw() };
1065
1066        while self.bucket_idx < table.buckets.len() {
1067            let bucket = table.get_bucket(self.bucket_idx);
1068            self.bucket_idx += 1;
1069
1070            let entry_ptr = bucket.slot.load(Ordering::Acquire, &self.guard);
1071            if !entry_ptr.is_null() {
1072                let entry = unsafe { &*entry_ptr.as_raw() };
1073                return Some((entry.key.clone(), entry.value.clone()));
1074            }
1075        }
1076        None
1077    }
1078}
1079
1080/// Iterator over HopscotchMap keys.
1081pub struct HopscotchKeys<'a, K: 'static, V: 'static, S> {
1082    iter: HopscotchIter<'a, K, V, S>,
1083}
1084
1085impl<'a, K, V, S> Iterator for HopscotchKeys<'a, K, V, S>
1086where
1087    K: Clone,
1088    V: Clone,
1089{
1090    type Item = K;
1091
1092    fn next(&mut self) -> Option<Self::Item> {
1093        self.iter.next().map(|(k, _)| k)
1094    }
1095}
1096
1097/// Iterator over HopscotchMap values (clones `V`).
1098pub struct HopscotchValues<'a, K: 'static, V: 'static, S> {
1099    iter: HopscotchIter<'a, K, V, S>,
1100}
1101
1102impl<'a, K, V, S> Iterator for HopscotchValues<'a, K, V, S>
1103where
1104    K: Clone,
1105    V: Clone,
1106{
1107    type Item = V;
1108
1109    #[inline]
1110    fn next(&mut self) -> Option<V> {
1111        self.iter.next().map(|(_, v)| v)
1112    }
1113}
1114
1115/// Owned iterator yielding `(K, V)` by value — moves out of the entries, no
1116/// clone. Each drained slot is nulled so the table destructor stays a no-op.
1117pub struct HopscotchIntoIter<K: 'static, V: 'static> {
1118    table: *mut Table<K, V>,
1119    bucket_idx: usize,
1120    guard: kovan::Guard,
1121}
1122
1123impl<K, V> Iterator for HopscotchIntoIter<K, V> {
1124    type Item = (K, V);
1125
1126    fn next(&mut self) -> Option<(K, V)> {
1127        let table = unsafe { &*self.table };
1128        while self.bucket_idx < table.buckets.len() {
1129            let bucket = table.get_bucket(self.bucket_idx);
1130            self.bucket_idx += 1;
1131            let entry = bucket.slot.load(Ordering::Acquire, &self.guard).as_raw();
1132            if !entry.is_null() {
1133                bucket.slot.store(
1134                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
1135                    Ordering::Relaxed,
1136                );
1137                let k = unsafe { core::ptr::read(&(*entry).key) };
1138                let v = unsafe { core::ptr::read(&(*entry).value) };
1139                unsafe {
1140                    alloc::alloc::dealloc(
1141                        entry as *mut u8,
1142                        core::alloc::Layout::new::<Entry<K, V>>(),
1143                    );
1144                }
1145                return Some((k, v));
1146            }
1147        }
1148        None
1149    }
1150}
1151
1152impl<K, V> Drop for HopscotchIntoIter<K, V> {
1153    fn drop(&mut self) {
1154        while self.next().is_some() {}
1155        // All slots nulled above; Table::drop frees only the bucket array.
1156        unsafe { drop(Box::from_raw(self.table)) };
1157    }
1158}
1159
1160impl<K, V, S> IntoIterator for HopscotchMap<K, V, S>
1161where
1162    K: 'static,
1163    V: 'static,
1164{
1165    type Item = (K, V);
1166    type IntoIter = HopscotchIntoIter<K, V>;
1167
1168    fn into_iter(self) -> HopscotchIntoIter<K, V> {
1169        let mut me = core::mem::ManuallyDrop::new(self);
1170        let guard = pin();
1171        let table = me.table.load(Ordering::Relaxed, &guard).as_raw();
1172        unsafe { core::ptr::drop_in_place(&mut me.hasher) };
1173        HopscotchIntoIter {
1174            table,
1175            bucket_idx: 0,
1176            guard,
1177        }
1178    }
1179}
1180
1181impl<K, V, S> core::iter::FromIterator<(K, V)> for HopscotchMap<K, V, S>
1182where
1183    K: Hash + Eq + Clone + Send + 'static,
1184    V: Clone + Send + 'static,
1185    S: BuildHasher + Default,
1186{
1187    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
1188        let map = Self::with_hasher(S::default());
1189        for (k, v) in iter {
1190            map.insert(k, v);
1191        }
1192        map
1193    }
1194}
1195
1196impl<'a, K, V, S> IntoIterator for &'a HopscotchMap<K, V, S>
1197where
1198    K: Hash + Eq + Clone + 'static,
1199    V: Clone + 'static,
1200    S: BuildHasher,
1201{
1202    type Item = (K, V);
1203    type IntoIter = HopscotchIter<'a, K, V, S>;
1204
1205    fn into_iter(self) -> Self::IntoIter {
1206        self.iter()
1207    }
1208}
1209
1210enum InsertResult<V> {
1211    Success(Option<V>),
1212    Exists(V),
1213    NeedResize,
1214    Retry,
1215}
1216
1217#[cfg(feature = "std")]
1218impl<K, V> Default for HopscotchMap<K, V, FixedState>
1219where
1220    K: Hash + Eq + Clone + 'static,
1221    V: Clone + 'static,
1222{
1223    fn default() -> Self {
1224        Self::new()
1225    }
1226}
1227
1228unsafe impl<K: Send, V: Send, S: Send> Send for HopscotchMap<K, V, S> {}
1229// SAFETY: Shared references allow moving K and V across threads (via insert/remove),
1230// so K and V must be Send in addition to Sync.
1231unsafe impl<K: Send + Sync, V: Send + Sync, S: Send + Sync> Sync for HopscotchMap<K, V, S> {}
1232
1233impl<K, V, S> Drop for HopscotchMap<K, V, S> {
1234    fn drop(&mut self) {
1235        // SAFETY: `drop(&mut self)` guarantees exclusive ownership — no concurrent
1236        // readers can exist. The Table's destructor frees the remaining entries.
1237        let guard = pin();
1238        let table_ptr = self.table.load(Ordering::Acquire, &guard);
1239
1240        unsafe {
1241            drop(Box::from_raw(table_ptr.as_raw()));
1242        }
1243
1244        // Flush nodes previously retired by concurrent operations (insert/remove/resize)
1245        // to prevent use-after-free during process teardown.
1246        drop(guard);
1247        kovan::flush();
1248    }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use super::*;
1254
1255    #[test]
1256    fn test_insert_and_get() {
1257        let map = HopscotchMap::new();
1258        assert_eq!(map.insert(1, 100), None);
1259        assert_eq!(map.get(&1), Some(100));
1260        assert_eq!(map.get(&2), None);
1261    }
1262
1263    #[test]
1264    fn test_growing() {
1265        let map = HopscotchMap::with_capacity(32);
1266        for i in 0..100 {
1267            map.insert(i, i * 2);
1268        }
1269        for i in 0..100 {
1270            assert_eq!(map.get(&i), Some(i * 2));
1271        }
1272    }
1273
1274    #[test]
1275    fn test_concurrent() {
1276        use alloc::sync::Arc;
1277        extern crate std;
1278        use std::thread;
1279
1280        let map = Arc::new(HopscotchMap::with_capacity(64));
1281        let mut handles = alloc::vec::Vec::new();
1282
1283        for thread_id in 0..4 {
1284            let map_clone = Arc::clone(&map);
1285            let handle = thread::spawn(move || {
1286                for i in 0..1000 {
1287                    let key = thread_id * 1000 + i;
1288                    map_clone.insert(key, key * 2);
1289                }
1290            });
1291            handles.push(handle);
1292        }
1293
1294        for handle in handles {
1295            handle.join().unwrap();
1296        }
1297
1298        for thread_id in 0..4 {
1299            for i in 0..1000 {
1300                let key = thread_id * 1000 + i;
1301                assert_eq!(map.get(&key), Some(key * 2));
1302            }
1303        }
1304    }
1305
1306    #[test]
1307    fn test_concurrent_insert_and_remove() {
1308        use alloc::sync::Arc;
1309        extern crate std;
1310        use std::thread;
1311
1312        let map = Arc::new(HopscotchMap::with_capacity(64));
1313
1314        // Phase 1: Pre-populate so removers have something to work with
1315        for thread_id in 0..4u64 {
1316            for i in 0..500u64 {
1317                let key = thread_id * 1000 + i;
1318                map.insert(key, key * 3);
1319            }
1320        }
1321
1322        let mut insert_handles = alloc::vec::Vec::new();
1323        let mut remove_handles = alloc::vec::Vec::new();
1324
1325        // Spawn inserter threads: each inserts keys in its own range
1326        for thread_id in 0..4u64 {
1327            let map_clone = Arc::clone(&map);
1328            insert_handles.push(thread::spawn(move || {
1329                for i in 0..500u64 {
1330                    let key = thread_id * 1000 + i;
1331                    map_clone.insert(key, key * 3);
1332                }
1333            }));
1334        }
1335
1336        // Spawn remover threads: each removes keys from the same ranges,
1337        // racing with inserters
1338        for thread_id in 0..4u64 {
1339            let map_clone = Arc::clone(&map);
1340            remove_handles.push(thread::spawn(move || {
1341                for i in 0..500u64 {
1342                    let key = thread_id * 1000 + i;
1343                    if let Some(val) = map_clone.remove(&key) {
1344                        // Value must be correct if present
1345                        assert_eq!(val, key * 3);
1346                    }
1347                }
1348            }));
1349        }
1350
1351        for handle in insert_handles {
1352            handle.join().unwrap();
1353        }
1354        for handle in remove_handles {
1355            handle.join().unwrap();
1356        }
1357
1358        // Verify: every remaining key has the correct value
1359        for thread_id in 0..4u64 {
1360            for i in 0..500u64 {
1361                let key = thread_id * 1000 + i;
1362                if let Some(val) = map.get(&key) {
1363                    assert_eq!(val, key * 3);
1364                }
1365            }
1366        }
1367    }
1368
1369    /// Regression: get_or_insert must not panic when a concurrent remove
1370    /// deletes the key between the internal insert and the return.
1371    #[test]
1372    fn test_hopscotch_get_or_insert_concurrent_remove() {
1373        use alloc::sync::Arc;
1374        extern crate std;
1375        use std::sync::Barrier;
1376        use std::thread;
1377
1378        let map = Arc::new(HopscotchMap::<u64, u64>::with_capacity(64));
1379        let barrier = Arc::new(Barrier::new(8));
1380
1381        let handles: Vec<_> = (0..8u64)
1382            .map(|tid| {
1383                let map = map.clone();
1384                let barrier = barrier.clone();
1385                thread::spawn(move || {
1386                    barrier.wait();
1387                    for i in 0..5000u64 {
1388                        let key = i % 32; // Small key space forces heavy contention
1389                        if tid % 2 == 0 {
1390                            // Half the threads do get_or_insert
1391                            let _ = map.get_or_insert(key, tid * 1000 + i);
1392                        } else {
1393                            // Other half remove
1394                            let _ = map.remove(&key);
1395                        }
1396                    }
1397                })
1398            })
1399            .collect();
1400
1401        for h in handles {
1402            h.join()
1403                .expect("Thread panicked during get_or_insert/remove race");
1404        }
1405    }
1406}