// kovan_map/hopscotch.rs
//! Lock-Free Growing/Shrinking Hopscotch Hash Map
//!
//! # Features
//!
//! - **Robust Concurrency**: Uses Copy-on-Move displacement to prevent Use-After-Free.
//! - **Safe Resizing**: Uses a lightweight resize lock to prevent lost updates during migration.
//! - **Memory reclamation**: Uses Kovan.
//! - **Clone Support**: Supports `V: Clone` (e.g., `Arc<T>`) instead of just `Copy`.

extern crate alloc;

use alloc::boxed::Box;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};

use foldhash::fast::FixedState;
use kovan::{Atomic, RetiredNode, Shared, pin, retire};
19
/// Neighborhood size (the H parameter in hopscotch hashing): every entry must
/// live within this many slots of its home bucket. Tracked by a 32-bit hop
/// bitmap (`AtomicU32`), so this must not exceed 32.
const NEIGHBORHOOD_SIZE: usize = 32;

/// Initial capacity used by `new()` / `with_hasher()`.
const INITIAL_CAPACITY: usize = 64;

/// Load factor threshold for growing (75%).
const GROW_THRESHOLD: f64 = 0.75;

/// Load factor threshold for shrinking (25%).
const SHRINK_THRESHOLD: f64 = 0.25;

/// Minimum capacity to prevent excessive shrinking.
const MIN_CAPACITY: usize = 64;

/// Maximum linear-probe distance when hunting for an empty slot to hop
/// backwards; beyond this the insert gives up and the table resizes.
const MAX_PROBE_DISTANCE: usize = 512;
37
/// A bucket in the hopscotch hash table.
struct Bucket<K, V> {
    /// Bitmap indicating which of the next H slots contain items that hash to
    /// this bucket: bit `i` set means slot `home + i` holds such an entry.
    hop_info: AtomicU32,
    /// The actual key-value slot at this position (null when empty; retired
    /// entries are reclaimed through kovan).
    slot: Atomic<Entry<K, V>>,
}
45
/// An entry in the hash table.
///
/// NOTE(review): `#[repr(C)]` with `retired` as the first field presumably
/// guarantees the `RetiredNode` header sits at offset 0 so kovan's `retire`
/// can treat a `*mut Entry` as a retirable node — confirm against kovan's
/// documented contract.
#[repr(C)]
struct Entry<K, V> {
    retired: RetiredNode,
    // Cached full hash; lets scans reject mismatches without comparing keys.
    hash: u64,
    key: K,
    value: V,
}
54
55impl<K: Clone, V: Clone> Clone for Entry<K, V> {
56    fn clone(&self) -> Self {
57        Self {
58            retired: RetiredNode::new(),
59            hash: self.hash,
60            key: self.key.clone(),
61            value: self.value.clone(),
62        }
63    }
64}
65
/// The hash table structure.
///
/// `buckets` holds `capacity + NEIGHBORHOOD_SIZE` slots; the extra padding
/// buckets let neighborhood scans run without per-access bounds checks.
/// NOTE(review): `#[repr(C)]` presumably keeps `retired` at offset 0 for
/// kovan's retire mechanism — confirm, same as for `Entry`.
#[repr(C)]
struct Table<K, V> {
    retired: RetiredNode,
    buckets: Box<[Bucket<K, V>]>,
    // Usable (power-of-two) capacity, excluding the padding slots.
    capacity: usize,
    // `capacity - 1`; masks a hash into `0..capacity`.
    mask: usize,
}
74
75impl<K, V> Table<K, V> {
76    fn new(capacity: usize) -> Self {
77        let capacity = capacity.next_power_of_two().max(MIN_CAPACITY);
78        // We add padding to the array so we don't have to check bounds constantly
79        // during neighborhood scans.
80        let mut buckets = Vec::with_capacity(capacity + NEIGHBORHOOD_SIZE);
81
82        for _ in 0..(capacity + NEIGHBORHOOD_SIZE) {
83            buckets.push(Bucket {
84                hop_info: AtomicU32::new(0),
85                slot: Atomic::null(),
86            });
87        }
88
89        Self {
90            retired: RetiredNode::new(),
91            buckets: buckets.into_boxed_slice(),
92            capacity,
93            mask: capacity - 1,
94        }
95    }
96
97    #[inline(always)]
98    fn bucket_index(&self, hash: u64) -> usize {
99        (hash as usize) & self.mask
100    }
101
102    #[inline(always)]
103    fn get_bucket(&self, idx: usize) -> &Bucket<K, V> {
104        // SAFETY: Internal indices are calculated via mask or bounded offset loops.
105        // The buckets array has padding to handle overflow up to NEIGHBORHOOD_SIZE.
106        unsafe { self.buckets.get_unchecked(idx) }
107    }
108}
109
/// A concurrent, lock-free hash map based on Hopscotch Hashing.
pub struct HopscotchMap<K: 'static, V: 'static, S = FixedState> {
    // Current table; swapped wholesale on resize and the old one retired.
    table: Atomic<Table<K, V>>,
    // Approximate entry count (relaxed updates); drives grow/shrink decisions.
    count: AtomicUsize,
    /// Prevents concurrent writes during resize migration to avoid lost updates
    resizing: AtomicBool,
    hasher: S,
}
118
#[cfg(feature = "std")]
impl<K, V> HopscotchMap<K, V, FixedState>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
{
    /// Creates an empty map with the default capacity and hasher.
    pub fn new() -> Self {
        Self::with_capacity(INITIAL_CAPACITY)
    }

    /// Creates an empty map with the given capacity and the default hasher.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity_and_hasher(capacity, FixedState::default())
    }
}
135
136impl<K, V, S> HopscotchMap<K, V, S>
137where
138    K: Hash + Eq + Clone + 'static,
139    V: Clone + 'static,
140    S: BuildHasher,
141{
142    /// Creates a new `HopscotchMap` with the specified hasher and default capacity.
143    pub fn with_hasher(hasher: S) -> Self {
144        Self::with_capacity_and_hasher(INITIAL_CAPACITY, hasher)
145    }
146
147    /// Creates a new `HopscotchMap` with the specified capacity and hasher.
148    pub fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
149        let table = Table::new(capacity);
150        Self {
151            table: Atomic::new(Box::into_raw(Box::new(table))),
152            count: AtomicUsize::new(0),
153            resizing: AtomicBool::new(false),
154            hasher,
155        }
156    }
157
158    /// Returns the number of elements in the map.
159    pub fn len(&self) -> usize {
160        self.count.load(Ordering::Relaxed)
161    }
162
163    /// Returns `true` if the map contains no elements.
164    pub fn is_empty(&self) -> bool {
165        self.len() == 0
166    }
167
168    /// Returns the current capacity of the map.
169    pub fn capacity(&self) -> usize {
170        let guard = pin();
171        let table_ptr = self.table.load(Ordering::Acquire, &guard);
172        unsafe { (*table_ptr.as_raw()).capacity }
173    }
174
175    #[inline]
176    fn wait_for_resize(&self) {
177        while self.resizing.load(Ordering::Acquire) {
178            core::hint::spin_loop();
179        }
180    }
181
182    /// Returns the value corresponding to the key.
183    pub fn get<Q>(&self, key: &Q) -> Option<V>
184    where
185        K: Borrow<Q>,
186        Q: Hash + Eq + ?Sized,
187    {
188        let hash = self.hasher.hash_one(key);
189        let guard = pin();
190        let table_ptr = self.table.load(Ordering::Acquire, &guard);
191        let table = unsafe { &*table_ptr.as_raw() };
192
193        let bucket_idx = table.bucket_index(hash);
194        let bucket = table.get_bucket(bucket_idx);
195
196        let hop_info = bucket.hop_info.load(Ordering::Acquire);
197
198        if hop_info == 0 {
199            return None;
200        }
201
202        for offset in 0..NEIGHBORHOOD_SIZE {
203            if hop_info & (1 << offset) != 0 {
204                let slot_idx = bucket_idx + offset;
205                let slot_bucket = table.get_bucket(slot_idx);
206                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
207
208                if !entry_ptr.is_null() {
209                    let entry = unsafe { &*entry_ptr.as_raw() };
210                    if entry.hash == hash && entry.key.borrow() == key {
211                        return Some(entry.value.clone());
212                    }
213                }
214            }
215        }
216
217        None
218    }
219
220    /// Inserts a key-value pair into the map.
221    pub fn insert(&self, key: K, value: V) -> Option<V> {
222        self.insert_impl(key, value, false)
223    }
224
    /// Shared core for `insert`, `insert_if_absent`, and `get_or_insert`.
    ///
    /// Retries until the write lands in a table that was not concurrently
    /// replaced by a resize. Returns the previous value (`None` for a fresh
    /// insert; with `only_if_absent` an existing value is returned unchanged).
    fn insert_impl(&self, key: K, value: V, only_if_absent: bool) -> Option<V> {
        let hash = self.hasher.hash_one(&key);
        // Track whether we already incremented count for a new insert across
        // retry iterations.  Prevents both under-count (which causes cascading
        // resizes on Windows) and double-count.
        let mut counted = false;

        loop {
            self.wait_for_resize();

            let guard = pin();
            let table_ptr = self.table.load(Ordering::Acquire, &guard);
            // SAFETY: the table pointer stays valid while `guard` is pinned.
            let table = unsafe { &*table_ptr.as_raw() };

            // Re-check after loading the table: a resize that began in
            // between would migrate entries out from under this write.
            if self.resizing.load(Ordering::Acquire) {
                continue;
            }

            // Note: We pass clones to try_insert if we loop here, but try_insert consumes them.
            // Since `insert_impl` owns `key` and `value`, we must clone them for the call
            // because `try_insert` might return `Retry` (looping again).
            match self.try_insert(
                table,
                hash,
                key.clone(),
                value.clone(),
                only_if_absent,
                &guard,
            ) {
                InsertResult::Success(old_val) => {
                    // Count new inserts immediately so that concurrent removes
                    // cannot decrement count below the true entry count.
                    if old_val.is_none() && !counted {
                        self.count.fetch_add(1, Ordering::Relaxed);
                        counted = true;
                    }

                    // If a resize started while we were inserting, our update
                    // might have been missed by the migration.
                    // We must retry to ensure we write to the new table.
                    if self.resizing.load(Ordering::SeqCst)
                        || self.table.load(Ordering::SeqCst, &guard) != table_ptr
                    {
                        continue;
                    }

                    // Only a fresh insert can push the load factor over the
                    // grow threshold; updates leave the count unchanged.
                    if counted {
                        let new_count = self.count.load(Ordering::Relaxed);
                        let current_capacity = table.capacity;
                        let load_factor = new_count as f64 / current_capacity as f64;

                        if load_factor > GROW_THRESHOLD {
                            // Unpin before the (potentially long) migration.
                            drop(guard);
                            self.try_resize(current_capacity * 2);
                        }
                    }
                    return old_val;
                }
                InsertResult::Exists(existing_val) => {
                    return Some(existing_val);
                }
                InsertResult::NeedResize => {
                    let current_capacity = table.capacity;
                    drop(guard);
                    self.try_resize(current_capacity * 2);
                    continue;
                }
                InsertResult::Retry => {
                    continue;
                }
            }
        }
    }
299
300    /// Returns the value corresponding to the key, or inserts the given value if the key is not present.
301    ///
302    /// When multiple threads call this concurrently for the same key (without
303    /// concurrent removes), all callers receive the same value.
304    pub fn get_or_insert(&self, key: K, value: V) -> V {
305        // Fast path: key already exists — no clone, no insert.
306        if let Some(v) = self.get(&key) {
307            return v;
308        }
309        // Slow path: insert_if_absent and use the return value directly.
310        // We must NOT do insert-then-get because a concurrent remove between
311        // the two operations would cause get to return None.
312        let key2 = key.clone();
313        match self.insert_impl(key, value.clone(), true) {
314            None => {
315                // We inserted, but concurrent inserts may have also placed
316                // the same key at a different offset (the CAS-then-hop-bit
317                // window allows duplicates). Re-get returns the canonical
318                // (lowest-offset) entry so every caller agrees on one value.
319                self.get(&key2).unwrap_or(value)
320            }
321            Some(existing) => existing, // Key already existed
322        }
323    }
324
325    /// Insert a key-value pair only if the key does not exist.
326    /// Returns `None` if inserted, `Some(existing_value)` if the key already exists.
327    pub fn insert_if_absent(&self, key: K, value: V) -> Option<V> {
328        self.insert_impl(key, value, true)
329    }
330
    /// Removes a key from the map, returning the value at the key if the key was previously in the map.
    pub fn remove<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let hash = self.hasher.hash_one(key);

        loop {
            self.wait_for_resize();

            let guard = pin();
            let table_ptr = self.table.load(Ordering::Acquire, &guard);
            // SAFETY: the table pointer stays valid while `guard` is pinned.
            let table = unsafe { &*table_ptr.as_raw() };

            // Re-check after loading the table so a freshly started resize
            // cannot migrate the entry out from under this remove.
            if self.resizing.load(Ordering::Acquire) {
                continue;
            }

            let bucket_idx = table.bucket_index(hash);
            let bucket = table.get_bucket(bucket_idx);

            let hop_info = bucket.hop_info.load(Ordering::Acquire);
            if hop_info == 0 {
                return None;
            }

            // Visit only the neighborhood slots flagged in the hop bitmap.
            for offset in 0..NEIGHBORHOOD_SIZE {
                if hop_info & (1 << offset) != 0 {
                    let slot_idx = bucket_idx + offset;
                    let slot_bucket = table.get_bucket(slot_idx);
                    let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);

                    if !entry_ptr.is_null() {
                        // SAFETY: non-null and guard-protected.
                        let entry = unsafe { &*entry_ptr.as_raw() };
                        if entry.hash == hash && entry.key.borrow() == key {
                            let old_value = entry.value.clone();

                            // Unlink by CAS-ing the slot to null; only the
                            // thread that wins the CAS may retire the entry.
                            match slot_bucket.slot.compare_exchange(
                                entry_ptr,
                                unsafe { Shared::from_raw(core::ptr::null_mut()) },
                                Ordering::Release,
                                Ordering::Relaxed,
                                &guard,
                            ) {
                                Ok(_) => {
                                    // Clear the hop bit only after the slot is null.
                                    let mask = !(1u32 << offset);
                                    bucket.hop_info.fetch_and(mask, Ordering::Release);

                                    // SAFETY: we unlinked entry_ptr above, so we
                                    // are its sole retirer.
                                    unsafe { retire(entry_ptr.as_raw()) };

                                    // Saturating decrement: prevent count from wrapping
                                    // to usize::MAX which would trigger catastrophic
                                    // cascading resizes.
                                    if let Ok(prev) = self.count.fetch_update(
                                        Ordering::Relaxed,
                                        Ordering::Relaxed,
                                        |c| c.checked_sub(1),
                                    ) {
                                        let new_count = prev - 1;
                                        let current_capacity = table.capacity;
                                        let load_factor =
                                            new_count as f64 / current_capacity as f64;

                                        if load_factor < SHRINK_THRESHOLD
                                            && current_capacity > MIN_CAPACITY
                                        {
                                            drop(guard);
                                            self.try_resize(current_capacity / 2);
                                        }
                                    }

                                    return Some(old_value);
                                }
                                Err(_) => {
                                    // NOTE(review): this break falls through to
                                    // `return None` below instead of retrying the
                                    // outer loop. If the CAS lost to a concurrent
                                    // *update* (not a remove), the key still exists
                                    // yet this call reports None without removing —
                                    // confirm this linearization is intended.
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            return None;
        }
    }
416
    /// Clears the map, removing all key-value pairs.
    ///
    /// Takes the `resizing` flag for the whole sweep, so concurrent writers
    /// stall in `wait_for_resize` until the clear finishes.
    pub fn clear(&self) {
        // Spin until we own the resize lock.
        while self
            .resizing
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            core::hint::spin_loop();
        }

        let guard = pin();
        let table_ptr = self.table.load(Ordering::Acquire, &guard);
        // SAFETY: the table pointer stays valid while `guard` is pinned.
        let table = unsafe { &*table_ptr.as_raw() };

        // Sweep every slot, padding buckets included.
        for i in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
            let bucket = table.get_bucket(i);
            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);

            // CAS rather than a blind store: only the thread that actually
            // nulls the slot may retire the entry, avoiding double-retire.
            if !entry_ptr.is_null()
                && bucket
                    .slot
                    .compare_exchange(
                        entry_ptr,
                        unsafe { Shared::from_raw(core::ptr::null_mut()) },
                        Ordering::Release,
                        Ordering::Relaxed,
                        &guard,
                    )
                    .is_ok()
            {
                // SAFETY: we unlinked entry_ptr, so we are its sole retirer.
                unsafe { retire(entry_ptr.as_raw()) };
            }

            // Hop bitmaps are only meaningful for home buckets (< capacity).
            if i < table.capacity {
                let b = table.get_bucket(i);
                b.hop_info.store(0, Ordering::Release);
            }
        }

        self.count.store(0, Ordering::Release);
        // Release the resize lock.
        self.resizing.store(false, Ordering::Release);
    }
459
    /// One insert attempt against a specific table snapshot.
    ///
    /// Three phases:
    /// 1. Update in place if the key already exists in the home neighborhood.
    /// 2. Claim the first empty slot inside the neighborhood.
    /// 3. Hopscotch displacement: free a slot inside the neighborhood by
    ///    hopping later entries forward, then claim it.
    fn try_insert(
        &self,
        table: &Table<K, V>,
        hash: u64,
        key: K,
        value: V,
        only_if_absent: bool,
        guard: &kovan::Guard,
    ) -> InsertResult<V> {
        let bucket_idx = table.bucket_index(hash);
        let bucket = table.get_bucket(bucket_idx);

        // 1. Check if key exists (Update)
        let hop_info = bucket.hop_info.load(Ordering::Acquire);
        for offset in 0..NEIGHBORHOOD_SIZE {
            if hop_info & (1 << offset) != 0 {
                let slot_idx = bucket_idx + offset;
                let slot_bucket = table.get_bucket(slot_idx);
                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);

                if !entry_ptr.is_null() {
                    // SAFETY: non-null and guard-protected.
                    let entry = unsafe { &*entry_ptr.as_raw() };
                    if entry.hash == hash && entry.key == key {
                        if only_if_absent {
                            return InsertResult::Exists(entry.value.clone());
                        }

                        let old_value = entry.value.clone();
                        // Clone key and value because if CAS fails, we retry, and we are inside a loop.
                        // We cannot move out of `key` or `value` inside a loop.
                        let new_entry = Box::into_raw(Box::new(Entry {
                            retired: RetiredNode::new(),
                            hash,
                            key: key.clone(),
                            value: value.clone(),
                        }));

                        // Swap in the replacement; the old entry is retired
                        // only after the CAS proves we unlinked it.
                        match slot_bucket.slot.compare_exchange(
                            entry_ptr,
                            unsafe { Shared::from_raw(new_entry) },
                            Ordering::Release,
                            Ordering::Relaxed,
                            guard,
                        ) {
                            Ok(_) => {
                                // SAFETY: we unlinked entry_ptr; sole retirer.
                                unsafe { retire(entry_ptr.as_raw()) };
                                return InsertResult::Success(Some(old_value));
                            }
                            Err(_) => {
                                // Our entry was never published: plain drop is safe.
                                drop(unsafe { Box::from_raw(new_entry) });
                                return InsertResult::Retry;
                            }
                        }
                    }
                }
            }
        }

        // 2. Find empty slot
        for offset in 0..NEIGHBORHOOD_SIZE {
            let slot_idx = bucket_idx + offset;
            if slot_idx >= table.capacity + NEIGHBORHOOD_SIZE {
                return InsertResult::NeedResize;
            }

            let slot_bucket = table.get_bucket(slot_idx);
            let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);

            if entry_ptr.is_null() {
                // Clone key and value. If CAS fails, we continue the loop, so we need the originals
                // for the next iteration.
                let new_entry = Box::into_raw(Box::new(Entry {
                    retired: RetiredNode::new(),
                    hash,
                    key: key.clone(),
                    value: value.clone(),
                }));

                match slot_bucket.slot.compare_exchange(
                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
                    unsafe { Shared::from_raw(new_entry) },
                    Ordering::Release,
                    Ordering::Relaxed,
                    guard,
                ) {
                    Ok(_) => {
                        // Publish the hop bit only after the slot is visible.
                        bucket.hop_info.fetch_or(1u32 << offset, Ordering::Release);
                        return InsertResult::Success(None);
                    }
                    Err(_) => {
                        // Slot was claimed first; free ours and keep probing.
                        drop(unsafe { Box::from_raw(new_entry) });
                        continue;
                    }
                }
            }
        }

        // 3. Try displacement
        match self.try_find_closer_slot(table, bucket_idx, guard) {
            Some(final_offset) if final_offset < NEIGHBORHOOD_SIZE => {
                let slot_idx = bucket_idx + final_offset;
                let slot_bucket = table.get_bucket(slot_idx);

                // The freed slot may have been stolen between the hop and now.
                let curr = slot_bucket.slot.load(Ordering::Relaxed, guard);
                if !curr.is_null() {
                    return InsertResult::Retry;
                }

                // This is the final attempt in this function. We can move key/value here
                // because previous usages were clones.
                let new_entry = Box::into_raw(Box::new(Entry {
                    retired: RetiredNode::new(),
                    hash,
                    key,
                    value,
                }));

                match slot_bucket.slot.compare_exchange(
                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
                    unsafe { Shared::from_raw(new_entry) },
                    Ordering::Release,
                    Ordering::Relaxed,
                    guard,
                ) {
                    Ok(_) => {
                        bucket
                            .hop_info
                            .fetch_or(1u32 << final_offset, Ordering::Release);
                        InsertResult::Success(None)
                    }
                    Err(_) => {
                        drop(unsafe { Box::from_raw(new_entry) });
                        InsertResult::Retry
                    }
                }
            }
            _ => InsertResult::NeedResize,
        }
    }
599
600    fn try_find_closer_slot(
601        &self,
602        table: &Table<K, V>,
603        bucket_idx: usize,
604        guard: &kovan::Guard,
605    ) -> Option<usize> {
606        for probe_offset in NEIGHBORHOOD_SIZE..MAX_PROBE_DISTANCE {
607            let probe_idx = bucket_idx + probe_offset;
608            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
609                return None;
610            }
611
612            let probe_bucket = table.get_bucket(probe_idx);
613            let entry_ptr = probe_bucket.slot.load(Ordering::Acquire, guard);
614
615            if entry_ptr.is_null() {
616                return self.try_move_closer(table, bucket_idx, probe_idx, guard);
617            }
618        }
619        None
620    }
621
    /// Hopscotch displacement: walks the empty slot at `empty_idx` backwards
    /// toward `target_idx` by repeatedly moving a nearby entry into it,
    /// until the hole lies inside `target_idx`'s neighborhood.
    ///
    /// Moves are Copy-on-Move: the entry is cloned into the hole first, and
    /// the original slot is nulled and retired only afterwards, so concurrent
    /// readers never dereference freed memory.
    ///
    /// Returns `Some(offset)` of the freed slot relative to `target_idx`, or
    /// `None` when no legal move exists (the caller falls back to resizing).
    fn try_move_closer(
        &self,
        table: &Table<K, V>,
        target_idx: usize,
        empty_idx: usize,
        guard: &kovan::Guard,
    ) -> Option<usize> {
        let mut current_empty = empty_idx;

        // Keep hopping until the hole is within the target's neighborhood.
        while current_empty > target_idx + NEIGHBORHOOD_SIZE - 1 {
            let mut moved = false;

            // Candidates sit just below the hole; a move is legal only if the
            // entry remains inside its own home neighborhood afterwards.
            for offset in 1..NEIGHBORHOOD_SIZE.min(current_empty - target_idx) {
                let candidate_idx = current_empty - offset;
                let candidate_bucket = table.get_bucket(candidate_idx);
                let entry_ptr = candidate_bucket.slot.load(Ordering::Acquire, guard);

                if !entry_ptr.is_null() {
                    // SAFETY: non-null and guard-protected.
                    let entry = unsafe { &*entry_ptr.as_raw() };
                    let entry_home = table.bucket_index(entry.hash);

                    if entry_home <= candidate_idx && current_empty < entry_home + NEIGHBORHOOD_SIZE
                    {
                        // Copy-on-Move for safety
                        let new_entry = Box::into_raw(Box::new(entry.clone()));
                        let empty_bucket = table.get_bucket(current_empty);

                        // Step 1: publish the clone into the hole.
                        match empty_bucket.slot.compare_exchange(
                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
                            unsafe { Shared::from_raw(new_entry) },
                            Ordering::Release,
                            Ordering::Relaxed,
                            guard,
                        ) {
                            Ok(_) => {
                                // Step 2: unlink the original from its old slot.
                                match candidate_bucket.slot.compare_exchange(
                                    entry_ptr,
                                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
                                    Ordering::Release,
                                    Ordering::Relaxed,
                                    guard,
                                ) {
                                    Ok(_) => {
                                        // Step 3: repoint the home bucket's hop
                                        // bits from the old offset to the new.
                                        let old_offset = candidate_idx - entry_home;
                                        let new_offset = current_empty - entry_home;

                                        let home_bucket = table.get_bucket(entry_home);
                                        home_bucket
                                            .hop_info
                                            .fetch_and(!(1u32 << old_offset), Ordering::Release);
                                        home_bucket
                                            .hop_info
                                            .fetch_or(1u32 << new_offset, Ordering::Release);

                                        // SAFETY: we unlinked entry_ptr; sole retirer.
                                        unsafe { retire(entry_ptr.as_raw()) };
                                        // The candidate's old slot is the new hole.
                                        current_empty = candidate_idx;
                                        moved = true;
                                        break;
                                    }
                                    Err(_) => {
                                        // Attempt to revert the insertion of new_entry.
                                        match empty_bucket.slot.compare_exchange(
                                            unsafe { Shared::from_raw(new_entry) },
                                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
                                            Ordering::Release,
                                            Ordering::Relaxed,
                                            guard,
                                        ) {
                                            Ok(_) => {
                                                // Revert succeeded: no other thread touched it.
                                                // Safe to drop immediately.
                                                unsafe { drop(Box::from_raw(new_entry)) };
                                            }
                                            Err(_) => {
                                                // Displacement already happened here...
                                                // Too late, so we just drop it. Another thread found new_entry,
                                                // displaced it, and replaced it with something else.
                                                // This means new_entry is now part of the blocks
                                                // and was already "retired" by the other thread,
                                                // OR it was just moved. In either case, we must
                                                // let the reclamation system handle it to avoid a
                                                // double free.
                                                unsafe { retire(new_entry) };
                                            }
                                        }
                                        continue;
                                    }
                                }
                            }
                            Err(_) => {
                                // Hole was filled before we could use it; the
                                // clone was never published, plain drop is safe.
                                unsafe { drop(Box::from_raw(new_entry)) };
                                continue;
                            }
                        }
                    }
                }
            }

            // No candidate could be hopped: give up and let the caller resize.
            if !moved {
                return None;
            }
        }

        if current_empty >= target_idx && current_empty < target_idx + NEIGHBORHOOD_SIZE {
            Some(current_empty - target_idx)
        } else {
            None
        }
    }
731
    /// Inserts an entry into a freshly allocated table during migration.
    ///
    /// NOTE(review): called from `try_resize` while the `resizing` flag is
    /// held, which presumably makes this thread the only writer to `table` —
    /// that would justify the plain `store` instead of a CAS; confirm no
    /// other code path writes to a not-yet-published table.
    ///
    /// Returns `false` when the first free slot lies outside the key's
    /// neighborhood (displacement is not attempted here), signalling the
    /// caller to abort this migration attempt.
    fn insert_into_new_table(
        &self,
        table: &Table<K, V>,
        hash: u64,
        key: K,
        value: V,
        guard: &kovan::Guard,
    ) -> bool {
        let bucket_idx = table.bucket_index(hash);

        // Linear probe from the home bucket for the first empty slot.
        for probe_offset in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
            let probe_idx = bucket_idx + probe_offset;
            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
                break;
            }

            let probe_bucket = table.get_bucket(probe_idx);
            let slot_ptr = probe_bucket.slot.load(Ordering::Relaxed, guard);

            if slot_ptr.is_null() {
                let offset_from_home = probe_idx - bucket_idx;

                if offset_from_home < NEIGHBORHOOD_SIZE {
                    let new_entry = Box::into_raw(Box::new(Entry {
                        retired: RetiredNode::new(),
                        hash,
                        key,
                        value,
                    }));
                    // SAFETY: new_entry is a fresh, valid heap allocation.
                    probe_bucket
                        .slot
                        .store(unsafe { Shared::from_raw(new_entry) }, Ordering::Release);

                    let bucket = table.get_bucket(bucket_idx);
                    bucket
                        .hop_info
                        .fetch_or(1u32 << offset_from_home, Ordering::Relaxed);
                    return true;
                } else {
                    // First free slot is out of hop range — migration cannot
                    // place this key; caller must pick another capacity.
                    return false;
                }
            }
        }
        false
    }
777
    /// Attempts to replace the current table with one of `new_capacity`
    /// (rounded up to a power of two, clamped to `MIN_CAPACITY`).
    ///
    /// Acts as a single-resizer critical section guarded by the `resizing`
    /// flag: losers of the CAS return immediately and let the winner do the
    /// work. Entries are *cloned* into the fresh table (Copy-on-Move), so
    /// readers pinned on the old table stay valid until reclamation.
    fn try_resize(&self, new_capacity: usize) {
        // Claim the resize lock; another in-flight resize means nothing to do.
        if self
            .resizing
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        let new_capacity = new_capacity.next_power_of_two().max(MIN_CAPACITY);
        let guard = pin();
        let old_table_ptr = self.table.load(Ordering::Acquire, &guard);
        let old_table = unsafe { &*old_table_ptr.as_raw() };

        // Normalization may collapse the request to the current size
        // (e.g. a shrink clamped back up to MIN_CAPACITY): nothing to do.
        if old_table.capacity == new_capacity {
            self.resizing.store(false, Ordering::Release);
            return;
        }

        let new_table = Box::into_raw(Box::new(Table::new(new_capacity)));
        let new_table_ref = unsafe { &*new_table };

        let mut success = true;

        // Migrate every live entry by cloning it into the unpublished table.
        // NOTE(review): entries inserted/removed in the old table *after*
        // this pass copies their slot could be lost — presumably insert and
        // remove observe the `resizing` flag and retry; confirm against
        // those paths, which are outside this view.
        for i in 0..(old_table.capacity + NEIGHBORHOOD_SIZE) {
            let bucket = old_table.get_bucket(i);
            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);

            if !entry_ptr.is_null() {
                let entry = unsafe { &*entry_ptr.as_raw() };
                if !self.insert_into_new_table(
                    new_table_ref,
                    entry.hash,
                    entry.key.clone(),
                    entry.value.clone(),
                    &guard,
                ) {
                    // New table cannot host this entry within its
                    // neighborhood: abort and roll back below.
                    success = false;
                    break;
                }
            }
        }

        if success {
            match self.table.compare_exchange(
                old_table_ptr,
                unsafe { Shared::from_raw(new_table) },
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ) {
                Ok(_) => {
                    // Defer freeing the old table until all pinned readers
                    // have departed.
                    // NOTE(review): only the Table allocation is retired
                    // here, not the old Entry allocations it still points
                    // at — presumably Table's reclamation path frees them;
                    // confirm, otherwise every resize leaks the old entries.
                    unsafe { retire(old_table_ptr.as_raw()) };
                }
                Err(_) => {
                    // Table pointer changed under us; with the resize lock
                    // held this should not happen, but fail safe and roll
                    // back rather than clobber the newer table.
                    success = false;
                }
            }
        }

        if !success {
            // Rollback: the new table was never published, so its cloned
            // entries can be freed immediately without `retire`.
            for i in 0..(new_table_ref.capacity + NEIGHBORHOOD_SIZE) {
                let bucket = new_table_ref.get_bucket(i);
                let entry_ptr = bucket.slot.load(Ordering::Relaxed, &guard);
                if !entry_ptr.is_null() {
                    unsafe {
                        drop(Box::from_raw(entry_ptr.as_raw()));
                    }
                }
            }
            unsafe {
                drop(Box::from_raw(new_table));
            }
        }

        // Release the resize lock whether or not the swap happened.
        self.resizing.store(false, Ordering::Release);
    }
855    /// Returns an iterator over the map entries.
856    pub fn iter(&self) -> HopscotchIter<'_, K, V, S> {
857        let guard = pin();
858        let table_ptr = self.table.load(Ordering::Acquire, &guard);
859        let _table = unsafe { &*table_ptr.as_raw() };
860        HopscotchIter {
861            map: self,
862            bucket_idx: 0,
863            guard,
864        }
865    }
866
867    /// Returns an iterator over the map keys.
868    pub fn keys(&self) -> HopscotchKeys<'_, K, V, S> {
869        HopscotchKeys { iter: self.iter() }
870    }
871
    /// Get the underlying hasher.
    ///
    /// Returns a reference to the `BuildHasher` this map was constructed with.
    pub fn hasher(&self) -> &S {
        &self.hasher
    }
876}
877
/// Iterator over HopscotchMap entries.
pub struct HopscotchIter<'a, K: 'static, V: 'static, S> {
    /// The map being iterated.
    map: &'a HopscotchMap<K, V, S>,
    /// Index of the next bucket to examine; advances monotonically.
    bucket_idx: usize,
    /// Guard pinned for the iterator's lifetime; keeps observed tables and
    /// entries from being reclaimed while iteration is in progress.
    guard: kovan::Guard,
}
884
885impl<'a, K, V, S> Iterator for HopscotchIter<'a, K, V, S>
886where
887    K: Clone,
888    V: Clone,
889{
890    type Item = (K, V);
891
892    fn next(&mut self) -> Option<Self::Item> {
893        let table_ptr = self.map.table.load(Ordering::Acquire, &self.guard);
894        let table = unsafe { &*table_ptr.as_raw() };
895
896        while self.bucket_idx < table.buckets.len() {
897            let bucket = table.get_bucket(self.bucket_idx);
898            self.bucket_idx += 1;
899
900            let entry_ptr = bucket.slot.load(Ordering::Acquire, &self.guard);
901            if !entry_ptr.is_null() {
902                let entry = unsafe { &*entry_ptr.as_raw() };
903                return Some((entry.key.clone(), entry.value.clone()));
904            }
905        }
906        None
907    }
908}
909
/// Iterator over HopscotchMap keys.
pub struct HopscotchKeys<'a, K: 'static, V: 'static, S> {
    /// Underlying entry iterator; keys are projected out of its items.
    iter: HopscotchIter<'a, K, V, S>,
}
914
915impl<'a, K, V, S> Iterator for HopscotchKeys<'a, K, V, S>
916where
917    K: Clone,
918    V: Clone,
919{
920    type Item = K;
921
922    fn next(&mut self) -> Option<Self::Item> {
923        self.iter.next().map(|(k, _)| k)
924    }
925}
926
/// Enables `for (k, v) in &map` by delegating to [`HopscotchMap::iter`];
/// items are cloned `(K, V)` pairs, not references.
impl<'a, K, V, S> IntoIterator for &'a HopscotchMap<K, V, S>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
    S: BuildHasher,
{
    type Item = (K, V);
    type IntoIter = HopscotchIter<'a, K, V, S>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
940
/// Outcome of a single low-level insert attempt.
enum InsertResult<V> {
    /// Entry was inserted; carries the previous value when an existing key
    /// was replaced (presumably `None` for a fresh key — confirm at call sites).
    Success(Option<V>),
    /// Key already present; carries the existing value
    /// (presumably used by get-or-insert paths — confirm at call sites).
    Exists(V),
    /// No usable slot within the neighborhood — the table must resize first.
    NeedResize,
    /// Transient failure (e.g. lost a race); caller should retry.
    Retry,
}
947
#[cfg(feature = "std")]
impl<K, V> Default for HopscotchMap<K, V, FixedState>
where
    K: Hash + Eq + Clone + 'static,
    V: Clone + 'static,
{
    /// Delegates to [`HopscotchMap::new`] with the `foldhash` `FixedState`
    /// hasher. Gated on `std` (presumably because `new` requires it —
    /// confirm against the constructor, which is outside this view).
    fn default() -> Self {
        Self::new()
    }
}
958
// SAFETY: sending the map transfers ownership of its entries and hasher to
// the receiving thread, so `Send` requires K, V, and S to be `Send`.
unsafe impl<K: Send, V: Send, S: Send> Send for HopscotchMap<K, V, S> {}
// SAFETY: Shared references allow moving K and V across threads (via insert/remove),
// so K and V must be Send in addition to Sync.
unsafe impl<K: Send + Sync, V: Send + Sync, S: Send + Sync> Sync for HopscotchMap<K, V, S> {}
963
impl<K, V, S> Drop for HopscotchMap<K, V, S> {
    /// Tears the map down under exclusive access: frees every live entry,
    /// then the table allocation, then flushes kovan's deferred queue.
    fn drop(&mut self) {
        // SAFETY: `drop(&mut self)` guarantees exclusive ownership — no concurrent
        // readers can exist. Free nodes immediately instead of deferring via `retire()`.
        let guard = pin();
        let table_ptr = self.table.load(Ordering::Acquire, &guard);
        let table = unsafe { &*table_ptr.as_raw() };

        // Walk every physical slot — the main array plus the neighborhood
        // overflow region — and free occupied entries in place.
        for i in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
            let bucket = table.get_bucket(i);
            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);

            if !entry_ptr.is_null() {
                unsafe {
                    drop(Box::from_raw(entry_ptr.as_raw()));
                }
            }
        }

        // Free the table itself only after its entries are gone.
        unsafe {
            drop(Box::from_raw(table_ptr.as_raw()));
        }

        // Flush nodes previously retired by concurrent operations (insert/remove/resize)
        // to prevent use-after-free during process teardown.
        drop(guard);
        kovan::flush();
    }
}
993
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-threaded insert/lookup smoke test.
    #[test]
    fn test_insert_and_get() {
        let map = HopscotchMap::new();
        assert_eq!(map.insert(1, 100), None);
        assert_eq!(map.get(&1), Some(100));
        assert_eq!(map.get(&2), None);
    }

    /// Inserting well past the requested capacity must trigger growth
    /// without losing any entries.
    #[test]
    fn test_growing() {
        let map = HopscotchMap::with_capacity(32);
        for i in 0..100 {
            map.insert(i, i * 2);
        }
        for i in 0..100 {
            assert_eq!(map.get(&i), Some(i * 2));
        }
    }

    /// Four threads insert disjoint key ranges concurrently; every entry
    /// must be present with the correct value afterwards.
    #[test]
    fn test_concurrent() {
        use alloc::sync::Arc;
        extern crate std;
        use std::thread;

        let map = Arc::new(HopscotchMap::with_capacity(64));
        let mut handles = alloc::vec::Vec::new();

        for thread_id in 0..4 {
            let map_clone = Arc::clone(&map);
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let key = thread_id * 1000 + i;
                    map_clone.insert(key, key * 2);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        for thread_id in 0..4 {
            for i in 0..1000 {
                let key = thread_id * 1000 + i;
                assert_eq!(map.get(&key), Some(key * 2));
            }
        }
    }

    /// Races inserters against removers over the same key ranges; any value
    /// observed (during or after the race) must be the correct one.
    #[test]
    fn test_concurrent_insert_and_remove() {
        use alloc::sync::Arc;
        extern crate std;
        use std::thread;

        let map = Arc::new(HopscotchMap::with_capacity(64));

        // Phase 1: Pre-populate so removers have something to work with
        for thread_id in 0..4u64 {
            for i in 0..500u64 {
                let key = thread_id * 1000 + i;
                map.insert(key, key * 3);
            }
        }

        let mut insert_handles = alloc::vec::Vec::new();
        let mut remove_handles = alloc::vec::Vec::new();

        // Spawn inserter threads: each inserts keys in its own range
        for thread_id in 0..4u64 {
            let map_clone = Arc::clone(&map);
            insert_handles.push(thread::spawn(move || {
                for i in 0..500u64 {
                    let key = thread_id * 1000 + i;
                    map_clone.insert(key, key * 3);
                }
            }));
        }

        // Spawn remover threads: each removes keys from the same ranges,
        // racing with inserters
        for thread_id in 0..4u64 {
            let map_clone = Arc::clone(&map);
            remove_handles.push(thread::spawn(move || {
                for i in 0..500u64 {
                    let key = thread_id * 1000 + i;
                    if let Some(val) = map_clone.remove(&key) {
                        // Value must be correct if present
                        assert_eq!(val, key * 3);
                    }
                }
            }));
        }

        for handle in insert_handles {
            handle.join().unwrap();
        }
        for handle in remove_handles {
            handle.join().unwrap();
        }

        // Verify: every remaining key has the correct value
        for thread_id in 0..4u64 {
            for i in 0..500u64 {
                let key = thread_id * 1000 + i;
                if let Some(val) = map.get(&key) {
                    assert_eq!(val, key * 3);
                }
            }
        }
    }

    /// Regression: get_or_insert must not panic when a concurrent remove
    /// deletes the key between the internal insert and the return.
    #[test]
    fn test_hopscotch_get_or_insert_concurrent_remove() {
        use alloc::sync::Arc;
        extern crate std;
        use std::sync::Barrier;
        use std::thread;

        let map = Arc::new(HopscotchMap::<u64, u64>::with_capacity(64));
        let barrier = Arc::new(Barrier::new(8));

        let handles: Vec<_> = (0..8u64)
            .map(|tid| {
                let map = map.clone();
                let barrier = barrier.clone();
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..5000u64 {
                        let key = i % 32; // Small key space forces heavy contention
                        if tid % 2 == 0 {
                            // Half the threads do get_or_insert
                            let _ = map.get_or_insert(key, tid * 1000 + i);
                        } else {
                            // Other half remove
                            let _ = map.remove(&key);
                        }
                    }
                })
            })
            .collect();

        for h in handles {
            h.join()
                .expect("Thread panicked during get_or_insert/remove race");
        }
    }
}