Skip to main content

kovan_map/
hopscotch.rs

1//! Lock-Free Growing/Shrinking Hopscotch Hash Map
2//!
3//! # Features
4//!
5//! - **Robust Concurrency**: Uses Copy-on-Move displacement to prevent Use-After-Free.
6//! - **Safe Resizing**: Uses a lightweight resize lock to prevent lost updates during migration.
7//! - **Memory reclamation**: Uses Kovan.
8//! - **Clone Support**: Supports V: Clone (e.g., `Arc<T>`) instead of just Copy.
9
10extern crate alloc;
11
12use alloc::boxed::Box;
13use alloc::vec::Vec;
14use core::borrow::Borrow;
15use core::hash::{BuildHasher, Hash};
16use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
17use foldhash::fast::FixedState;
18use kovan::{Atomic, RetiredNode, Shared, pin, retire};
19
/// Neighborhood size (the `H` parameter of hopscotch hashing).
///
/// Matches the bit width of the per-bucket `hop_info` bitmap (`AtomicU32`),
/// so every neighborhood offset has exactly one bit.
const NEIGHBORHOOD_SIZE: usize = u32::BITS as usize;

/// Smallest capacity a table may have; prevents excessive shrinking.
const MIN_CAPACITY: usize = 64;

/// Capacity used when the caller does not ask for a specific one.
const INITIAL_CAPACITY: usize = 64;

/// Load factor below which the map shrinks (25%).
const SHRINK_THRESHOLD: f64 = 0.25;

/// Load factor above which the map grows (75%).
const GROW_THRESHOLD: f64 = 0.75;

/// Maximum probe distance searched for a free slot before giving up and resizing.
const MAX_PROBE_DISTANCE: usize = 512;
37
38/// A bucket in the hopscotch hash table
39struct Bucket<K, V> {
40    /// Bitmap indicating which of the next H slots contain items that hash to this bucket
41    hop_info: AtomicU32,
42    /// The actual key-value slot at this position
43    slot: Atomic<Entry<K, V>>,
44}
45
46/// An entry in the hash table
47#[repr(C)]
48struct Entry<K, V> {
49    retired: RetiredNode,
50    hash: u64,
51    key: K,
52    value: V,
53}
54
55impl<K: Clone, V: Clone> Clone for Entry<K, V> {
56    fn clone(&self) -> Self {
57        Self {
58            retired: RetiredNode::new(),
59            hash: self.hash,
60            key: self.key.clone(),
61            value: self.value.clone(),
62        }
63    }
64}
65
66// SAFETY (kovan retirement rule): a retired Entry's destructor may run on
67// any thread, and entries (with K and V inside) move between threads —
68// hence `K: Send, V: Send` for Send. Lookups DO produce `&K`/`&V` from a
69// shared `&Entry` (get() clones V through &V under concurrent readers),
70// so Sync additionally requires `K: Sync, V: Sync` — the same bounds the
71// map-level Sync impl has always required for sharing the map.
72unsafe impl<K: Send, V: Send> Send for Entry<K, V> {}
73unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Entry<K, V> {}
74
75/// The hash table structure
76#[repr(C)]
77struct Table<K, V> {
78    retired: RetiredNode,
79    buckets: Box<[Bucket<K, V>]>,
80    capacity: usize,
81    mask: usize,
82}
83
84// SAFETY (kovan retirement rule): same reasoning as Entry — a retired
85// Table's destructor may run on any thread (hence K, V: Send via the
86// contained entries); shared access to entries through a `&Table` carries
87// Entry's Sync requirements.
88unsafe impl<K: Send, V: Send> Send for Table<K, V> {}
89unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Table<K, V> {}
90
91impl<K, V> Table<K, V> {
92    fn new(capacity: usize) -> Self {
93        let capacity = capacity.next_power_of_two().max(MIN_CAPACITY);
94        // We add padding to the array so we don't have to check bounds constantly
95        // during neighborhood scans.
96        let mut buckets = Vec::with_capacity(capacity + NEIGHBORHOOD_SIZE);
97
98        for _ in 0..(capacity + NEIGHBORHOOD_SIZE) {
99            buckets.push(Bucket {
100                hop_info: AtomicU32::new(0),
101                slot: Atomic::null(),
102            });
103        }
104
105        Self {
106            retired: RetiredNode::new(),
107            buckets: buckets.into_boxed_slice(),
108            capacity,
109            mask: capacity - 1,
110        }
111    }
112
113    #[inline(always)]
114    fn bucket_index(&self, hash: u64) -> usize {
115        (hash as usize) & self.mask
116    }
117
118    #[inline(always)]
119    fn get_bucket(&self, idx: usize) -> &Bucket<K, V> {
120        // SAFETY: Internal indices are calculated via mask or bounded offset loops.
121        // The buckets array has padding to handle overflow up to NEIGHBORHOOD_SIZE.
122        unsafe { self.buckets.get_unchecked(idx) }
123    }
124}
125
126impl<K, V> Drop for Table<K, V> {
127    fn drop(&mut self) {
128        // Exclusive at this point: either the map itself is being dropped,
129        // or kovan reclaimed the table after every guard that could observe
130        // it has been released. Slots hold exactly the entries that were
131        // never individually unlinked+retired (remove/clear null the slot
132        // before retiring), so each entry is freed exactly once. Without
133        // this, every resize leaked the old table's entries.
134        let guard = pin();
135        for i in 0..(self.capacity + NEIGHBORHOOD_SIZE) {
136            let entry_ptr = self.buckets[i]
137                .slot
138                .load(Ordering::Relaxed, &guard)
139                .as_raw();
140            if !entry_ptr.is_null() {
141                unsafe {
142                    drop(Box::from_raw(entry_ptr));
143                }
144            }
145        }
146    }
147}
148
149/// A concurrent, lock-free hash map based on Hopscotch Hashing.
150pub struct HopscotchMap<K: 'static, V: 'static, S = FixedState> {
151    table: Atomic<Table<K, V>>,
152    count: AtomicUsize,
153    /// Prevents concurrent writes during resize migration to avoid lost updates
154    resizing: AtomicBool,
155    hasher: S,
156}
157
158#[cfg(feature = "std")]
159impl<K, V> HopscotchMap<K, V, FixedState>
160where
161    K: Hash + Eq + Clone + 'static,
162    V: Clone + 'static,
163{
164    /// Creates a new `HopscotchMap` with default capacity and hasher.
165    pub fn new() -> Self {
166        Self::with_hasher(FixedState::default())
167    }
168
169    /// Creates a new `HopscotchMap` with the specified capacity and default hasher.
170    pub fn with_capacity(capacity: usize) -> Self {
171        Self::with_capacity_and_hasher(capacity, FixedState::default())
172    }
173}
174
175impl<K, V, S> HopscotchMap<K, V, S>
176where
177    K: Hash + Eq + Clone + 'static,
178    V: Clone + 'static,
179    S: BuildHasher,
180{
181    /// Creates a new `HopscotchMap` with the specified hasher and default capacity.
182    pub fn with_hasher(hasher: S) -> Self {
183        Self::with_capacity_and_hasher(INITIAL_CAPACITY, hasher)
184    }
185
186    /// Creates a new `HopscotchMap` with the specified capacity and hasher.
187    pub fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
188        let table = Table::new(capacity);
189        Self {
190            table: Atomic::new(Box::into_raw(Box::new(table))),
191            count: AtomicUsize::new(0),
192            resizing: AtomicBool::new(false),
193            hasher,
194        }
195    }
196
197    /// Returns the number of elements in the map.
198    pub fn len(&self) -> usize {
199        self.count.load(Ordering::Relaxed)
200    }
201
202    /// Returns `true` if the map contains no elements.
203    pub fn is_empty(&self) -> bool {
204        self.len() == 0
205    }
206
207    /// Returns the current capacity of the map.
208    pub fn capacity(&self) -> usize {
209        let guard = pin();
210        let table_ptr = self.table.load(Ordering::Acquire, &guard);
211        unsafe { (*table_ptr.as_raw()).capacity }
212    }
213
214    #[inline]
215    fn wait_for_resize(&self) {
216        while self.resizing.load(Ordering::Acquire) {
217            core::hint::spin_loop();
218        }
219    }
220
221    /// Returns the value corresponding to the key.
222    pub fn get<Q>(&self, key: &Q) -> Option<V>
223    where
224        K: Borrow<Q>,
225        Q: Hash + Eq + ?Sized,
226    {
227        let hash = self.hasher.hash_one(key);
228        let guard = pin();
229        let table_ptr = self.table.load(Ordering::Acquire, &guard);
230        let table = unsafe { &*table_ptr.as_raw() };
231
232        let bucket_idx = table.bucket_index(hash);
233        let bucket = table.get_bucket(bucket_idx);
234
235        let hop_info = bucket.hop_info.load(Ordering::Acquire);
236
237        if hop_info == 0 {
238            return None;
239        }
240
241        for offset in 0..NEIGHBORHOOD_SIZE {
242            if hop_info & (1 << offset) != 0 {
243                let slot_idx = bucket_idx + offset;
244                let slot_bucket = table.get_bucket(slot_idx);
245                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
246
247                if !entry_ptr.is_null() {
248                    let entry = unsafe { &*entry_ptr.as_raw() };
249                    if entry.hash == hash && entry.key.borrow() == key {
250                        return Some(entry.value.clone());
251                    }
252                }
253            }
254        }
255
256        None
257    }
258
259    /// Inserts a key-value pair into the map.
260    pub fn insert(&self, key: K, value: V) -> Option<V> {
261        self.insert_impl(key, value, false)
262    }
263
264    /// Helper for get_or_insert logic.
265    fn insert_impl(&self, key: K, value: V, only_if_absent: bool) -> Option<V> {
266        let hash = self.hasher.hash_one(&key);
267        // Track whether we already incremented count for a new insert across
268        // retry iterations.  Prevents both under-count (which causes cascading
269        // resizes on Windows) and double-count.
270        let mut counted = false;
271
272        loop {
273            self.wait_for_resize();
274
275            let guard = pin();
276            let table_ptr = self.table.load(Ordering::Acquire, &guard);
277            let table = unsafe { &*table_ptr.as_raw() };
278
279            if self.resizing.load(Ordering::Acquire) {
280                continue;
281            }
282
283            // Note: We pass clones to try_insert if we loop here, but try_insert consumes them.
284            // Since `insert_impl` owns `key` and `value`, we must clone them for the call
285            // because `try_insert` might return `Retry` (looping again).
286            match self.try_insert(
287                table,
288                hash,
289                key.clone(),
290                value.clone(),
291                only_if_absent,
292                &guard,
293            ) {
294                InsertResult::Success(old_val) => {
295                    // Count new inserts immediately so that concurrent removes
296                    // cannot decrement count below the true entry count.
297                    if old_val.is_none() && !counted {
298                        self.count.fetch_add(1, Ordering::Relaxed);
299                        counted = true;
300                    }
301
302                    // If a resize started while we were inserting, our update
303                    // might have been missed by the migration.
304                    // We must retry to ensure we write to the new table.
305                    if self.resizing.load(Ordering::SeqCst)
306                        || self.table.load(Ordering::SeqCst, &guard) != table_ptr
307                    {
308                        continue;
309                    }
310
311                    if counted {
312                        let new_count = self.count.load(Ordering::Relaxed);
313                        let current_capacity = table.capacity;
314                        let load_factor = new_count as f64 / current_capacity as f64;
315
316                        if load_factor > GROW_THRESHOLD {
317                            drop(guard);
318                            self.try_resize(current_capacity * 2);
319                        }
320                    }
321                    return old_val;
322                }
323                InsertResult::Exists(existing_val) => {
324                    return Some(existing_val);
325                }
326                InsertResult::NeedResize => {
327                    let current_capacity = table.capacity;
328                    drop(guard);
329                    self.try_resize(current_capacity * 2);
330                    continue;
331                }
332                InsertResult::Retry => {
333                    continue;
334                }
335            }
336        }
337    }
338
339    /// Returns the value corresponding to the key, or inserts the given value if the key is not present.
340    ///
341    /// When multiple threads call this concurrently for the same key (without
342    /// concurrent removes), all callers receive the same value.
343    pub fn get_or_insert(&self, key: K, value: V) -> V {
344        // Fast path: key already exists — no clone, no insert.
345        if let Some(v) = self.get(&key) {
346            return v;
347        }
348        // Slow path: insert_if_absent and use the return value directly.
349        // We must NOT do insert-then-get because a concurrent remove between
350        // the two operations would cause get to return None.
351        let key2 = key.clone();
352        match self.insert_impl(key, value.clone(), true) {
353            None => {
354                // We inserted, but concurrent inserts may have also placed
355                // the same key at a different offset (the CAS-then-hop-bit
356                // window allows duplicates). Re-get returns the canonical
357                // (lowest-offset) entry so every caller agrees on one value.
358                self.get(&key2).unwrap_or(value)
359            }
360            Some(existing) => existing, // Key already existed
361        }
362    }
363
364    /// Insert a key-value pair only if the key does not exist.
365    /// Returns `None` if inserted, `Some(existing_value)` if the key already exists.
366    pub fn insert_if_absent(&self, key: K, value: V) -> Option<V> {
367        self.insert_impl(key, value, true)
368    }
369
370    /// Remove **all** nodes matching `key`, returning the most recent value
371    /// if the key was present.
372    ///
373    /// [`remove`](Self::remove) unlinks only the first matching entry.
374    /// Insert/remove races can transiently leave more than one entry for
375    /// the same key ("versions"); after a plain `remove()` an older version
376    /// would become visible again. This method keeps removing until a full
377    /// scan finds no match, so the key is guaranteed absent at the
378    /// linearization point of the final scan.
379    ///
380    /// Use `remove()` for single-version removal semantics and
381    /// `force_remove()` when the key must be fully evicted.
382    ///
383    /// Note: a concurrent `insert` of the same key can land after the final
384    /// scan, as with any removal under contention.
385    pub fn force_remove<Q>(&self, key: &Q) -> Option<V>
386    where
387        K: Borrow<Q>,
388        Q: Hash + Eq + ?Sized,
389    {
390        let mut newest = None;
391        loop {
392            match self.remove(key) {
393                Some(v) => {
394                    // The first removal unlinks the first match in scan
395                    // order — the live (most recent) version.
396                    if newest.is_none() {
397                        newest = Some(v);
398                    }
399                }
400                None => return newest,
401            }
402        }
403    }
404
405    /// Removes a key from the map, returning the value at the key if the key was previously in the map.
406    pub fn remove<Q>(&self, key: &Q) -> Option<V>
407    where
408        K: Borrow<Q>,
409        Q: Hash + Eq + ?Sized,
410    {
411        let hash = self.hasher.hash_one(key);
412        // First successful removal's value is the linearized result;
413        // re-validation retries only evict migrated clones.
414        let mut result: Option<V> = None;
415
416        'outer: loop {
417            self.wait_for_resize();
418
419            let guard = pin();
420            let table_ptr = self.table.load(Ordering::Acquire, &guard);
421            let table_raw = table_ptr.as_raw();
422            let table = unsafe { &*table_raw };
423
424            if self.resizing.load(Ordering::Acquire) {
425                continue;
426            }
427
428            let bucket_idx = table.bucket_index(hash);
429            let bucket = table.get_bucket(bucket_idx);
430
431            let hop_info = bucket.hop_info.load(Ordering::Acquire);
432            if hop_info == 0 {
433                return result;
434            }
435
436            for offset in 0..NEIGHBORHOOD_SIZE {
437                if hop_info & (1 << offset) != 0 {
438                    let slot_idx = bucket_idx + offset;
439                    let slot_bucket = table.get_bucket(slot_idx);
440                    let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
441
442                    if !entry_ptr.is_null() {
443                        let entry = unsafe { &*entry_ptr.as_raw() };
444                        if entry.hash == hash && entry.key.borrow() == key {
445                            let old_value = entry.value.clone();
446
447                            match slot_bucket.slot.compare_exchange(
448                                entry_ptr,
449                                unsafe { Shared::from_raw(core::ptr::null_mut()) },
450                                Ordering::Release,
451                                Ordering::Relaxed,
452                                &guard,
453                            ) {
454                                Ok(_) => {
455                                    let mask = !(1u32 << offset);
456                                    bucket.hop_info.fetch_and(mask, Ordering::Release);
457
458                                    unsafe { retire(entry_ptr.as_raw()) };
459                                    if result.is_none() {
460                                        result = Some(old_value);
461                                    }
462
463                                    // Saturating decrement: prevent count from wrapping
464                                    // to usize::MAX which would trigger catastrophic
465                                    // cascading resizes.
466                                    let shrink_to = if let Ok(prev) = self.count.fetch_update(
467                                        Ordering::Relaxed,
468                                        Ordering::Relaxed,
469                                        |c| c.checked_sub(1),
470                                    ) {
471                                        let new_count = prev - 1;
472                                        let current_capacity = table.capacity;
473                                        let load_factor =
474                                            new_count as f64 / current_capacity as f64;
475                                        (load_factor < SHRINK_THRESHOLD
476                                            && current_capacity > MIN_CAPACITY)
477                                            .then_some(current_capacity / 2)
478                                    } else {
479                                        None
480                                    };
481
482                                    // Re-validate: a concurrent migration may
483                                    // have cloned this entry into a new table
484                                    // before we unlinked it here — redo the
485                                    // removal on the current table so the key
486                                    // does not resurrect.
487                                    if self.resizing.load(Ordering::SeqCst)
488                                        || self.table.load(Ordering::SeqCst, &guard).as_raw()
489                                            != table_raw
490                                    {
491                                        continue 'outer;
492                                    }
493
494                                    if let Some(cap) = shrink_to {
495                                        drop(guard);
496                                        self.try_resize(cap);
497                                    }
498                                    return result;
499                                }
500                                Err(_) => {
501                                    break;
502                                }
503                            }
504                        }
505                    }
506                }
507            }
508            return result;
509        }
510    }
511
512    /// Clears the map, removing all key-value pairs.
513    pub fn clear(&self) {
514        while self
515            .resizing
516            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
517            .is_err()
518        {
519            core::hint::spin_loop();
520        }
521
522        let guard = pin();
523        let table_ptr = self.table.load(Ordering::Acquire, &guard);
524        let table = unsafe { &*table_ptr.as_raw() };
525
526        for i in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
527            let bucket = table.get_bucket(i);
528            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
529
530            if !entry_ptr.is_null()
531                && bucket
532                    .slot
533                    .compare_exchange(
534                        entry_ptr,
535                        unsafe { Shared::from_raw(core::ptr::null_mut()) },
536                        Ordering::Release,
537                        Ordering::Relaxed,
538                        &guard,
539                    )
540                    .is_ok()
541            {
542                unsafe { retire(entry_ptr.as_raw()) };
543            }
544
545            if i < table.capacity {
546                let b = table.get_bucket(i);
547                b.hop_info.store(0, Ordering::Release);
548            }
549        }
550
551        self.count.store(0, Ordering::Release);
552        self.resizing.store(false, Ordering::Release);
553    }
554
555    fn try_insert(
556        &self,
557        table: &Table<K, V>,
558        hash: u64,
559        key: K,
560        value: V,
561        only_if_absent: bool,
562        guard: &kovan::Guard,
563    ) -> InsertResult<V> {
564        let bucket_idx = table.bucket_index(hash);
565        let bucket = table.get_bucket(bucket_idx);
566
567        // 1. Check if key exists (Update)
568        let hop_info = bucket.hop_info.load(Ordering::Acquire);
569        for offset in 0..NEIGHBORHOOD_SIZE {
570            if hop_info & (1 << offset) != 0 {
571                let slot_idx = bucket_idx + offset;
572                let slot_bucket = table.get_bucket(slot_idx);
573                let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
574
575                if !entry_ptr.is_null() {
576                    let entry = unsafe { &*entry_ptr.as_raw() };
577                    if entry.hash == hash && entry.key == key {
578                        if only_if_absent {
579                            return InsertResult::Exists(entry.value.clone());
580                        }
581
582                        let old_value = entry.value.clone();
583                        // Clone key and value because if CAS fails, we retry, and we are inside a loop.
584                        // We cannot move out of `key` or `value` inside a loop.
585                        let new_entry = Box::into_raw(Box::new(Entry {
586                            retired: RetiredNode::new(),
587                            hash,
588                            key: key.clone(),
589                            value: value.clone(),
590                        }));
591
592                        match slot_bucket.slot.compare_exchange(
593                            entry_ptr,
594                            unsafe { Shared::from_raw(new_entry) },
595                            Ordering::Release,
596                            Ordering::Relaxed,
597                            guard,
598                        ) {
599                            Ok(_) => {
600                                unsafe { retire(entry_ptr.as_raw()) };
601                                return InsertResult::Success(Some(old_value));
602                            }
603                            Err(_) => {
604                                drop(unsafe { Box::from_raw(new_entry) });
605                                return InsertResult::Retry;
606                            }
607                        }
608                    }
609                }
610            }
611        }
612
613        // 2. Find empty slot
614        for offset in 0..NEIGHBORHOOD_SIZE {
615            let slot_idx = bucket_idx + offset;
616            if slot_idx >= table.capacity + NEIGHBORHOOD_SIZE {
617                return InsertResult::NeedResize;
618            }
619
620            let slot_bucket = table.get_bucket(slot_idx);
621            let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
622
623            if entry_ptr.is_null() {
624                // Clone key and value. If CAS fails, we continue the loop, so we need the originals
625                // for the next iteration.
626                let new_entry = Box::into_raw(Box::new(Entry {
627                    retired: RetiredNode::new(),
628                    hash,
629                    key: key.clone(),
630                    value: value.clone(),
631                }));
632
633                match slot_bucket.slot.compare_exchange(
634                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
635                    unsafe { Shared::from_raw(new_entry) },
636                    Ordering::Release,
637                    Ordering::Relaxed,
638                    guard,
639                ) {
640                    Ok(_) => {
641                        bucket.hop_info.fetch_or(1u32 << offset, Ordering::Release);
642                        return InsertResult::Success(None);
643                    }
644                    Err(_) => {
645                        drop(unsafe { Box::from_raw(new_entry) });
646                        continue;
647                    }
648                }
649            }
650        }
651
652        // 3. Try displacement
653        match self.try_find_closer_slot(table, bucket_idx, guard) {
654            Some(final_offset) if final_offset < NEIGHBORHOOD_SIZE => {
655                let slot_idx = bucket_idx + final_offset;
656                let slot_bucket = table.get_bucket(slot_idx);
657
658                let curr = slot_bucket.slot.load(Ordering::Relaxed, guard);
659                if !curr.is_null() {
660                    return InsertResult::Retry;
661                }
662
663                // This is the final attempt in this function. We can move key/value here
664                // because previous usages were clones.
665                let new_entry = Box::into_raw(Box::new(Entry {
666                    retired: RetiredNode::new(),
667                    hash,
668                    key,
669                    value,
670                }));
671
672                match slot_bucket.slot.compare_exchange(
673                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
674                    unsafe { Shared::from_raw(new_entry) },
675                    Ordering::Release,
676                    Ordering::Relaxed,
677                    guard,
678                ) {
679                    Ok(_) => {
680                        bucket
681                            .hop_info
682                            .fetch_or(1u32 << final_offset, Ordering::Release);
683                        InsertResult::Success(None)
684                    }
685                    Err(_) => {
686                        drop(unsafe { Box::from_raw(new_entry) });
687                        InsertResult::Retry
688                    }
689                }
690            }
691            _ => InsertResult::NeedResize,
692        }
693    }
694
695    fn try_find_closer_slot(
696        &self,
697        table: &Table<K, V>,
698        bucket_idx: usize,
699        guard: &kovan::Guard,
700    ) -> Option<usize> {
701        for probe_offset in NEIGHBORHOOD_SIZE..MAX_PROBE_DISTANCE {
702            let probe_idx = bucket_idx + probe_offset;
703            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
704                return None;
705            }
706
707            let probe_bucket = table.get_bucket(probe_idx);
708            let entry_ptr = probe_bucket.slot.load(Ordering::Acquire, guard);
709
710            if entry_ptr.is_null() {
711                return self.try_move_closer(table, bucket_idx, probe_idx, guard);
712            }
713        }
714        None
715    }
716
717    fn try_move_closer(
718        &self,
719        table: &Table<K, V>,
720        target_idx: usize,
721        empty_idx: usize,
722        guard: &kovan::Guard,
723    ) -> Option<usize> {
724        let mut current_empty = empty_idx;
725
726        while current_empty > target_idx + NEIGHBORHOOD_SIZE - 1 {
727            let mut moved = false;
728
729            for offset in 1..NEIGHBORHOOD_SIZE.min(current_empty - target_idx) {
730                let candidate_idx = current_empty - offset;
731                let candidate_bucket = table.get_bucket(candidate_idx);
732                let entry_ptr = candidate_bucket.slot.load(Ordering::Acquire, guard);
733
734                if !entry_ptr.is_null() {
735                    let entry = unsafe { &*entry_ptr.as_raw() };
736                    let entry_home = table.bucket_index(entry.hash);
737
738                    if entry_home <= candidate_idx && current_empty < entry_home + NEIGHBORHOOD_SIZE
739                    {
740                        // Copy-on-Move for safety
741                        let new_entry = Box::into_raw(Box::new(entry.clone()));
742                        let empty_bucket = table.get_bucket(current_empty);
743
744                        match empty_bucket.slot.compare_exchange(
745                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
746                            unsafe { Shared::from_raw(new_entry) },
747                            Ordering::Release,
748                            Ordering::Relaxed,
749                            guard,
750                        ) {
751                            Ok(_) => {
752                                match candidate_bucket.slot.compare_exchange(
753                                    entry_ptr,
754                                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
755                                    Ordering::Release,
756                                    Ordering::Relaxed,
757                                    guard,
758                                ) {
759                                    Ok(_) => {
760                                        let old_offset = candidate_idx - entry_home;
761                                        let new_offset = current_empty - entry_home;
762
763                                        let home_bucket = table.get_bucket(entry_home);
764                                        home_bucket
765                                            .hop_info
766                                            .fetch_and(!(1u32 << old_offset), Ordering::Release);
767                                        home_bucket
768                                            .hop_info
769                                            .fetch_or(1u32 << new_offset, Ordering::Release);
770
771                                        unsafe { retire(entry_ptr.as_raw()) };
772                                        current_empty = candidate_idx;
773                                        moved = true;
774                                        break;
775                                    }
776                                    Err(_) => {
777                                        // Attempt to revert the insertion of new_entry.
778                                        match empty_bucket.slot.compare_exchange(
779                                            unsafe { Shared::from_raw(new_entry) },
780                                            unsafe { Shared::from_raw(core::ptr::null_mut()) },
781                                            Ordering::Release,
782                                            Ordering::Relaxed,
783                                            guard,
784                                        ) {
785                                            Ok(_) => {
786                                                // Revert succeeded: no other thread touched it.
787                                                // Safe to drop immediately.
788                                                unsafe { drop(Box::from_raw(new_entry)) };
789                                            }
790                                            Err(_) => {
791                                                // Displacement already happened here...
792                                                // Too late, so we just drop it. Another thread found new_entry,
793                                                // displaced it, and replaced it with something else.
794                                                // This means new_entry is now part of the blocks
795                                                // and was already "retired" by the other thread,
796                                                // OR it was just moved. In either case, we must
797                                                // let the reclamation system handle it to avoid a
798                                                // double free.
799                                                unsafe { retire(new_entry) };
800                                            }
801                                        }
802                                        continue;
803                                    }
804                                }
805                            }
806                            Err(_) => {
807                                unsafe { drop(Box::from_raw(new_entry)) };
808                                continue;
809                            }
810                        }
811                    }
812                }
813            }
814
815            if !moved {
816                return None;
817            }
818        }
819
820        if current_empty >= target_idx && current_empty < target_idx + NEIGHBORHOOD_SIZE {
821            Some(current_empty - target_idx)
822        } else {
823            None
824        }
825    }
826
827    fn insert_into_new_table(
828        &self,
829        table: &Table<K, V>,
830        hash: u64,
831        key: K,
832        value: V,
833        guard: &kovan::Guard,
834    ) -> bool {
835        let bucket_idx = table.bucket_index(hash);
836
837        for probe_offset in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
838            let probe_idx = bucket_idx + probe_offset;
839            if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
840                break;
841            }
842
843            let probe_bucket = table.get_bucket(probe_idx);
844            let slot_ptr = probe_bucket.slot.load(Ordering::Relaxed, guard);
845
846            if slot_ptr.is_null() {
847                let offset_from_home = probe_idx - bucket_idx;
848
849                if offset_from_home < NEIGHBORHOOD_SIZE {
850                    let new_entry = Box::into_raw(Box::new(Entry {
851                        retired: RetiredNode::new(),
852                        hash,
853                        key,
854                        value,
855                    }));
856                    probe_bucket
857                        .slot
858                        .store(unsafe { Shared::from_raw(new_entry) }, Ordering::Release);
859
860                    let bucket = table.get_bucket(bucket_idx);
861                    bucket
862                        .hop_info
863                        .fetch_or(1u32 << offset_from_home, Ordering::Relaxed);
864                    return true;
865                } else {
866                    return false;
867                }
868            }
869        }
870        false
871    }
872
873    fn try_resize(&self, new_capacity: usize) {
874        if self
875            .resizing
876            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
877            .is_err()
878        {
879            return;
880        }
881
882        let new_capacity = new_capacity.next_power_of_two().max(MIN_CAPACITY);
883        let guard = pin();
884        let old_table_ptr = self.table.load(Ordering::Acquire, &guard);
885        let old_table = unsafe { &*old_table_ptr.as_raw() };
886
887        if old_table.capacity == new_capacity {
888            self.resizing.store(false, Ordering::Release);
889            return;
890        }
891
892        let new_table = Box::into_raw(Box::new(Table::new(new_capacity)));
893        let new_table_ref = unsafe { &*new_table };
894
895        let mut success = true;
896
897        for i in 0..(old_table.capacity + NEIGHBORHOOD_SIZE) {
898            let bucket = old_table.get_bucket(i);
899            let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
900
901            if !entry_ptr.is_null() {
902                let entry = unsafe { &*entry_ptr.as_raw() };
903                if !self.insert_into_new_table(
904                    new_table_ref,
905                    entry.hash,
906                    entry.key.clone(),
907                    entry.value.clone(),
908                    &guard,
909                ) {
910                    success = false;
911                    break;
912                }
913            }
914        }
915
916        if success {
917            match self.table.compare_exchange(
918                old_table_ptr,
919                unsafe { Shared::from_raw(new_table) },
920                Ordering::Release,
921                Ordering::Relaxed,
922                &guard,
923            ) {
924                Ok(_) => {
925                    unsafe { retire(old_table_ptr.as_raw()) };
926                }
927                Err(_) => {
928                    success = false;
929                }
930            }
931        }
932
933        if !success {
934            // The unpublished new table's destructor frees its cloned entries.
935            unsafe {
936                drop(Box::from_raw(new_table));
937            }
938        }
939
940        self.resizing.store(false, Ordering::Release);
941    }
942    /// Returns an iterator over the map entries.
943    pub fn iter(&self) -> HopscotchIter<'_, K, V, S> {
944        let guard = pin();
945        let table_ptr = self.table.load(Ordering::Acquire, &guard);
946        let _table = unsafe { &*table_ptr.as_raw() };
947        HopscotchIter {
948            map: self,
949            bucket_idx: 0,
950            guard,
951        }
952    }
953
954    /// Returns an iterator over the map keys.
955    pub fn keys(&self) -> HopscotchKeys<'_, K, V, S> {
956        HopscotchKeys { iter: self.iter() }
957    }
958
959    /// Returns an iterator over the map values (clones `V`).
960    pub fn values(&self) -> HopscotchValues<'_, K, V, S> {
961        HopscotchValues { iter: self.iter() }
962    }
963
964    /// Returns `true` if the key is present.
965    pub fn contains_key<Q>(&self, key: &Q) -> bool
966    where
967        K: Borrow<Q>,
968        Q: Hash + Eq + ?Sized,
969    {
970        self.get(key).is_some()
971    }
972
973    /// Insert all `(K, V)` pairs from `iter`. Takes `&self` (concurrent map).
974    pub fn extend<I: IntoIterator<Item = (K, V)>>(&self, iter: I) {
975        for (k, v) in iter {
976            self.insert(k, v);
977        }
978    }
979
980    /// Get the underlying hasher.
981    pub fn hasher(&self) -> &S {
982        &self.hasher
983    }
984}
985
986/// Iterator over HopscotchMap entries.
987pub struct HopscotchIter<'a, K: 'static, V: 'static, S> {
988    map: &'a HopscotchMap<K, V, S>,
989    bucket_idx: usize,
990    guard: kovan::Guard,
991}
992
993impl<'a, K, V, S> Iterator for HopscotchIter<'a, K, V, S>
994where
995    K: Clone,
996    V: Clone,
997{
998    type Item = (K, V);
999
1000    fn next(&mut self) -> Option<Self::Item> {
1001        let table_ptr = self.map.table.load(Ordering::Acquire, &self.guard);
1002        let table = unsafe { &*table_ptr.as_raw() };
1003
1004        while self.bucket_idx < table.buckets.len() {
1005            let bucket = table.get_bucket(self.bucket_idx);
1006            self.bucket_idx += 1;
1007
1008            let entry_ptr = bucket.slot.load(Ordering::Acquire, &self.guard);
1009            if !entry_ptr.is_null() {
1010                let entry = unsafe { &*entry_ptr.as_raw() };
1011                return Some((entry.key.clone(), entry.value.clone()));
1012            }
1013        }
1014        None
1015    }
1016}
1017
1018/// Iterator over HopscotchMap keys.
1019pub struct HopscotchKeys<'a, K: 'static, V: 'static, S> {
1020    iter: HopscotchIter<'a, K, V, S>,
1021}
1022
1023impl<'a, K, V, S> Iterator for HopscotchKeys<'a, K, V, S>
1024where
1025    K: Clone,
1026    V: Clone,
1027{
1028    type Item = K;
1029
1030    fn next(&mut self) -> Option<Self::Item> {
1031        self.iter.next().map(|(k, _)| k)
1032    }
1033}
1034
1035/// Iterator over HopscotchMap values (clones `V`).
1036pub struct HopscotchValues<'a, K: 'static, V: 'static, S> {
1037    iter: HopscotchIter<'a, K, V, S>,
1038}
1039
1040impl<'a, K, V, S> Iterator for HopscotchValues<'a, K, V, S>
1041where
1042    K: Clone,
1043    V: Clone,
1044{
1045    type Item = V;
1046
1047    #[inline]
1048    fn next(&mut self) -> Option<V> {
1049        self.iter.next().map(|(_, v)| v)
1050    }
1051}
1052
1053/// Owned iterator yielding `(K, V)` by value — moves out of the entries, no
1054/// clone. Each drained slot is nulled so the table destructor stays a no-op.
1055pub struct HopscotchIntoIter<K: 'static, V: 'static> {
1056    table: *mut Table<K, V>,
1057    bucket_idx: usize,
1058    guard: kovan::Guard,
1059}
1060
1061impl<K, V> Iterator for HopscotchIntoIter<K, V> {
1062    type Item = (K, V);
1063
1064    fn next(&mut self) -> Option<(K, V)> {
1065        let table = unsafe { &*self.table };
1066        while self.bucket_idx < table.buckets.len() {
1067            let bucket = table.get_bucket(self.bucket_idx);
1068            self.bucket_idx += 1;
1069            let entry = bucket.slot.load(Ordering::Acquire, &self.guard).as_raw();
1070            if !entry.is_null() {
1071                bucket.slot.store(
1072                    unsafe { Shared::from_raw(core::ptr::null_mut()) },
1073                    Ordering::Relaxed,
1074                );
1075                let k = unsafe { core::ptr::read(&(*entry).key) };
1076                let v = unsafe { core::ptr::read(&(*entry).value) };
1077                unsafe {
1078                    alloc::alloc::dealloc(
1079                        entry as *mut u8,
1080                        core::alloc::Layout::new::<Entry<K, V>>(),
1081                    );
1082                }
1083                return Some((k, v));
1084            }
1085        }
1086        None
1087    }
1088}
1089
1090impl<K, V> Drop for HopscotchIntoIter<K, V> {
1091    fn drop(&mut self) {
1092        while self.next().is_some() {}
1093        // All slots nulled above; Table::drop frees only the bucket array.
1094        unsafe { drop(Box::from_raw(self.table)) };
1095    }
1096}
1097
1098impl<K, V, S> IntoIterator for HopscotchMap<K, V, S>
1099where
1100    K: 'static,
1101    V: 'static,
1102{
1103    type Item = (K, V);
1104    type IntoIter = HopscotchIntoIter<K, V>;
1105
1106    fn into_iter(self) -> HopscotchIntoIter<K, V> {
1107        let mut me = core::mem::ManuallyDrop::new(self);
1108        let guard = pin();
1109        let table = me.table.load(Ordering::Relaxed, &guard).as_raw();
1110        unsafe { core::ptr::drop_in_place(&mut me.hasher) };
1111        HopscotchIntoIter {
1112            table,
1113            bucket_idx: 0,
1114            guard,
1115        }
1116    }
1117}
1118
1119impl<K, V, S> core::iter::FromIterator<(K, V)> for HopscotchMap<K, V, S>
1120where
1121    K: Hash + Eq + Clone + Send + 'static,
1122    V: Clone + Send + 'static,
1123    S: BuildHasher + Default,
1124{
1125    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
1126        let map = Self::with_hasher(S::default());
1127        for (k, v) in iter {
1128            map.insert(k, v);
1129        }
1130        map
1131    }
1132}
1133
1134impl<'a, K, V, S> IntoIterator for &'a HopscotchMap<K, V, S>
1135where
1136    K: Hash + Eq + Clone + 'static,
1137    V: Clone + 'static,
1138    S: BuildHasher,
1139{
1140    type Item = (K, V);
1141    type IntoIter = HopscotchIter<'a, K, V, S>;
1142
1143    fn into_iter(self) -> Self::IntoIter {
1144        self.iter()
1145    }
1146}
1147
/// Outcome of a single internal insertion attempt.
///
/// NOTE(review): the code that produces and consumes these variants is not part
/// of this section; the per-variant notes below are inferred from the names and
/// payload types and should be confirmed against the insert path.
enum InsertResult<V> {
    /// The attempt completed; presumably carries the previous value, if any.
    Success(Option<V>),
    /// The key was already present; presumably carries the existing value.
    Exists(V),
    /// Presumably: no slot could be found and the table must be resized first.
    NeedResize,
    /// Presumably: the attempt lost a race and should be repeated.
    Retry,
}
1154
#[cfg(feature = "std")]
impl<K: Hash + Eq + Clone + 'static, V: Clone + 'static> Default
    for HopscotchMap<K, V, FixedState>
{
    /// Equivalent to [`HopscotchMap::new`]. Only available with the `std` feature.
    fn default() -> Self {
        Self::new()
    }
}
1165
1166unsafe impl<K: Send, V: Send, S: Send> Send for HopscotchMap<K, V, S> {}
1167// SAFETY: Shared references allow moving K and V across threads (via insert/remove),
1168// so K and V must be Send in addition to Sync.
1169unsafe impl<K: Send + Sync, V: Send + Sync, S: Send + Sync> Sync for HopscotchMap<K, V, S> {}
1170
1171impl<K, V, S> Drop for HopscotchMap<K, V, S> {
1172    fn drop(&mut self) {
1173        // SAFETY: `drop(&mut self)` guarantees exclusive ownership — no concurrent
1174        // readers can exist. The Table's destructor frees the remaining entries.
1175        let guard = pin();
1176        let table_ptr = self.table.load(Ordering::Acquire, &guard);
1177
1178        unsafe {
1179            drop(Box::from_raw(table_ptr.as_raw()));
1180        }
1181
1182        // Flush nodes previously retired by concurrent operations (insert/remove/resize)
1183        // to prevent use-after-free during process teardown.
1184        drop(guard);
1185        kovan::flush();
1186    }
1187}
1188
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly inserted key is readable and an absent key reads as `None`.
    #[test]
    fn test_insert_and_get() {
        let map = HopscotchMap::new();

        let previous = map.insert(1, 100);
        assert_eq!(previous, None);

        assert_eq!(map.get(&1), Some(100));
        assert_eq!(map.get(&2), None);
    }

    /// Inserting 100 keys into a map created with capacity 32 keeps every key readable.
    #[test]
    fn test_growing() {
        let map = HopscotchMap::with_capacity(32);

        for key in 0..100 {
            map.insert(key, key * 2);
        }

        for key in 0..100 {
            let expected = Some(key * 2);
            assert_eq!(map.get(&key), expected);
        }
    }

    /// Four threads insert disjoint key ranges; afterwards every key must be present.
    #[test]
    fn test_concurrent() {
        extern crate std;
        use alloc::sync::Arc;
        use std::thread;

        let map = Arc::new(HopscotchMap::with_capacity(64));

        let workers: alloc::vec::Vec<_> = (0..4)
            .map(|thread_id| {
                let shared = Arc::clone(&map);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let key = thread_id * 1000 + i;
                        shared.insert(key, key * 2);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        for thread_id in 0..4 {
            for i in 0..1000 {
                let key = thread_id * 1000 + i;
                assert_eq!(map.get(&key), Some(key * 2));
            }
        }
    }

    /// Inserters and removers race on the same key ranges; any value that is
    /// observed (by a remover or by the final scan) must be the one written for
    /// that key.
    #[test]
    fn test_concurrent_insert_and_remove() {
        extern crate std;
        use alloc::sync::Arc;
        use std::thread;

        let map = Arc::new(HopscotchMap::with_capacity(64));

        // Phase 1: pre-populate so the removers have something to work with.
        for thread_id in 0..4u64 {
            for i in 0..500u64 {
                let key = thread_id * 1000 + i;
                map.insert(key, key * 3);
            }
        }

        // Inserter threads: each (re)inserts the keys of its own range.
        let inserters: alloc::vec::Vec<_> = (0..4u64)
            .map(|thread_id| {
                let shared = Arc::clone(&map);
                thread::spawn(move || {
                    for i in 0..500u64 {
                        let key = thread_id * 1000 + i;
                        shared.insert(key, key * 3);
                    }
                })
            })
            .collect();

        // Remover threads: each removes keys from the same ranges, racing with
        // the inserters.
        let removers: alloc::vec::Vec<_> = (0..4u64)
            .map(|thread_id| {
                let shared = Arc::clone(&map);
                thread::spawn(move || {
                    for i in 0..500u64 {
                        let key = thread_id * 1000 + i;
                        if let Some(val) = shared.remove(&key) {
                            // Value must be correct if present.
                            assert_eq!(val, key * 3);
                        }
                    }
                })
            })
            .collect();

        for worker in inserters {
            worker.join().unwrap();
        }
        for worker in removers {
            worker.join().unwrap();
        }

        // Every key that survived the race must still carry the correct value.
        for thread_id in 0..4u64 {
            for i in 0..500u64 {
                let key = thread_id * 1000 + i;
                if let Some(val) = map.get(&key) {
                    assert_eq!(val, key * 3);
                }
            }
        }
    }

    /// Regression: get_or_insert must not panic when a concurrent remove
    /// deletes the key between the internal insert and the return.
    #[test]
    fn test_hopscotch_get_or_insert_concurrent_remove() {
        extern crate std;
        use alloc::sync::Arc;
        use std::sync::Barrier;
        use std::thread;

        let map = Arc::new(HopscotchMap::<u64, u64>::with_capacity(64));
        let barrier = Arc::new(Barrier::new(8));

        let workers: Vec<_> = (0..8u64)
            .map(|tid| {
                let map = map.clone();
                let barrier = barrier.clone();
                thread::spawn(move || {
                    // Start all threads together to maximise overlap.
                    barrier.wait();
                    let inserts = tid % 2 == 0;
                    for i in 0..5000u64 {
                        let key = i % 32; // Small key space forces heavy contention
                        if inserts {
                            // Even-numbered threads do get_or_insert.
                            let _ = map.get_or_insert(key, tid * 1000 + i);
                        } else {
                            // Odd-numbered threads remove.
                            let _ = map.remove(&key);
                        }
                    }
                })
            })
            .collect();

        for worker in workers {
            worker
                .join()
                .expect("Thread panicked during get_or_insert/remove race");
        }
    }
}