use crate::{
leapiter::{Iter, IterMut, OwnedIter},
leapref::{Ref, RefMut},
make_hash,
util::{allocate, deallocate, round_to_pow2, AllocationKind},
Value,
};
use atomic::Atomic;
use core::{
borrow::Borrow,
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering},
};
#[cfg(feature = "stable_alloc")]
use allocator_api2::alloc::{Allocator, Global};
#[cfg(not(feature = "stable_alloc"))]
use core::alloc::Allocator;
#[cfg(not(feature = "stable_alloc"))]
use std::alloc::Global;
pub(crate) type DefaultHash = std::collections::hash_map::DefaultHasher;
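/// A concurrent, lock-free hash map based on leapfrog probing. Cells are grouped
/// into buckets of four, and each bucket additionally stores, per cell, two small
/// delta links that chain together the cells on the same probe path. When probing
/// overflows, the table is migrated to a larger one, and every thread that touches
/// the map helps drive the migration.
///
/// Keys must be `Eq + Hash + Copy`; values must implement [`Value`], which reserves
/// a null and a redirect sentinel for internal use.
///
/// A minimal usage sketch (marked `ignore` because the crate path and the `Value`
/// impl for the chosen value type are assumed rather than shown here):
///
/// ```ignore
/// let map = LeapMap::new();
/// assert_eq!(map.insert(12usize, 17usize), None);
/// assert!(map.contains_key(&12));
/// assert_eq!(map.remove(&12), Some(17));
/// assert!(map.is_empty());
/// ```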
pub struct LeapMap<K, V, H = BuildHasherDefault<DefaultHash>, A: Allocator = Global> {
table: AtomicPtr<Table<K, V>>,
hash_builder: H,
allocator: A,
migrator: AtomicPtr<Migrator<K, V>>,
}
impl<'a, K, V, H, A: Allocator> LeapMap<K, V, H, A> {
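    /// Returns the number of cells in the current table (the total capacity, always
    /// a power of two). This is not the number of stored elements; see `len`.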
pub fn capacity(&self) -> usize {
self.get_table(Ordering::Relaxed).size()
}
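    // Dereference the raw table pointer. The returned reference borrows for the
    // impl's lifetime rather than from `&self`, so callers must not hold it past a
    // point where the table could be replaced and freed.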
fn get_table(&self, ordering: Ordering) -> &'a Table<K, V> {
unsafe { &*self.table.load(ordering) }
}
fn get_table_mut(&self, ordering: Ordering) -> &'a mut Table<K, V> {
unsafe { &mut *self.table.load(ordering) }
}
fn is_allocated(&self) -> bool {
!self.table.load(Ordering::Relaxed).is_null()
}
}
impl<K, V> Default for LeapMap<K, V, BuildHasherDefault<DefaultHash>, Global>
where
K: Eq + Hash + Copy,
V: Value,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> LeapMap<K, V, BuildHasherDefault<DefaultHash>, Global>
where
K: Eq + Hash + Copy,
V: Value,
{
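    /// Creates an empty map with the default initial capacity, using the default
    /// hasher and the global allocator.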
pub fn new() -> LeapMap<K, V, BuildHasherDefault<DefaultHash>, Global> {
Self::new_in(Global)
}
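    /// Creates an empty map with room for at least `capacity` cells (rounded up to
    /// a power of two), using the default hasher and the global allocator.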
pub fn with_capacity(
capacity: usize,
) -> LeapMap<K, V, BuildHasherDefault<DefaultHash>, Global> {
Self::with_capacity_and_hasher_in(
capacity,
BuildHasherDefault::<DefaultHash>::default(),
Global,
)
}
}
impl<K, V, H> LeapMap<K, V, H, Global>
where
K: Eq + Hash + Copy,
V: Value,
H: BuildHasher + Default,
{
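    /// Creates an empty map with room for at least `capacity` cells, hashing keys
    /// with `builder`, in the global allocator.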
pub fn with_capacity_and_hasher(capacity: usize, builder: H) -> LeapMap<K, V, H, Global> {
Self::with_capacity_and_hasher_in(capacity, builder, Global)
}
}
impl<'a, K, V, H, A> LeapMap<K, V, H, A>
where
K: Eq + Hash + Copy + 'a,
V: Value + 'a,
H: BuildHasher + Default,
A: Allocator,
{
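    // INITIAL_SIZE: cells in a freshly created table. LINEAR_SEARCH_LIMIT: how far
    // an insert probes before declaring overflow. CELLS_IN_USE: how many cells are
    // sampled to estimate occupancy when sizing the replacement table.
    // MIGRATION_UNIT_SIZE: how many cells one unit of migration work covers.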
const INITIAL_SIZE: usize = 8;
const LINEAR_SEARCH_LIMIT: usize = 128;
const CELLS_IN_USE: usize = Self::LINEAR_SEARCH_LIMIT >> 1;
const MIGRATION_UNIT_SIZE: usize = 128;
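    /// Creates an empty map with the default initial capacity and hasher in the
    /// given allocator.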
pub fn new_in(allocator: A) -> LeapMap<K, V, H, A> {
Self::with_capacity_and_hasher_in(Self::INITIAL_SIZE, H::default(), allocator)
}
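    /// Creates an empty map with room for at least `capacity` cells (rounded up to
    /// a power of two), hashing keys with `builder`, in the given allocator. All
    /// other constructors forward to this one.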
pub fn with_capacity_and_hasher_in(
capacity: usize,
builder: H,
allocator: A,
) -> LeapMap<K, V, H, A> {
let migrator_ptr = allocate::<Migrator<K, V>, A>(&allocator, 1, AllocationKind::Zeroed);
let migrator = unsafe { &mut *migrator_ptr };
migrator.initialize();
let capacity = round_to_pow2(capacity.max(Self::INITIAL_SIZE));
let table_ptr = Self::allocate_and_init_table(&allocator, capacity);
LeapMap {
table: AtomicPtr::new(table_ptr),
hash_builder: builder,
allocator,
migrator: AtomicPtr::new(migrator_ptr),
}
}
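    /// Hashes `key` with this map's hash builder and returns the result as a `usize`.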
pub fn hash_usize<Q>(&self, key: &Q) -> usize
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
make_hash::<Q, H>(&self.hash_builder, key) as usize
}
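    /// Returns a [`Ref`] to the cell holding `key`, or `None` if the key is not
    /// present (or its value has been erased).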
pub fn get<Q>(&'a self, key: &Q) -> Option<Ref<'a, K, V, H, A>>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
let hash = make_hash::<Q, H>(&self.hash_builder, key);
self.find(key, hash).map(|cell| Ref::new(self, cell, hash))
}
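    /// Returns a [`RefMut`] to the cell holding `key`, allowing the value to be
    /// modified in place, or `None` if the key is not present.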
pub fn get_mut<Q>(&'a self, key: &Q) -> Option<RefMut<'a, K, V, H, A>>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
let hash = make_hash::<Q, H>(&self.hash_builder, key);
self.find(key, hash)
.map(|cell| RefMut::new(self, cell, hash))
}
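    /// Returns `true` if the map currently holds a value for `key`.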
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
self.get(key).is_some()
}
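    /// Removes `key` from the map, returning its value, or `None` if the key was
    /// not present.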
pub fn remove<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
let hash = make_hash::<Q, H>(&self.hash_builder, key);
self.find(key, hash).and_then(|cell| self.erase_value(cell))
}
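    /// Updates the value for an existing `key`, returning the value it replaced, or
    /// `None` if the key is not present. Unlike `insert`, this never inserts a
    /// missing key. `value` must not be the null sentinel.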
pub fn update<Q>(&self, key: &Q, value: V) -> Option<V>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
let hash = make_hash(&self.hash_builder, key);
debug_assert!(!value.is_null());
loop {
match self.find(key, hash) {
Some(cell) => {
let old = cell.value.load(Ordering::Relaxed);
match Self::exchange_value(cell, value, old) {
ConcurrentInsertResult::NewInsert => {
return Some(V::null());
}
ConcurrentInsertResult::Replaced(old_value) => {
return Some(old_value);
}
ConcurrentInsertResult::Overflow(_) => {
panic!("LeapMap update overflowed");
}
ConcurrentInsertResult::Migration(_) => {
self.participate_in_migration();
}
}
}
None => {
return None;
}
}
}
}
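    /// Inserts `value` for `key`, returning the previous value if the key was
    /// already present and `None` if this was a new insertion. `value` must not be
    /// the null sentinel. If the table is too full, the calling thread helps
    /// migrate to a larger table and then retries.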
pub fn insert(&self, key: K, mut value: V) -> Option<V> {
let hash = make_hash(&self.hash_builder, &key);
debug_assert!(!value.is_null());
loop {
let table = self.get_table_mut(Ordering::Acquire);
let size_mask = table.size_mask;
let buckets = table.bucket_slice_mut();
match Self::insert_or_find(hash, &key, value, buckets, size_mask, true) {
ConcurrentInsertResult::Overflow(overflow_index) => {
let migrator = unsafe { &mut *self.migrator.load(Ordering::Relaxed) };
if migrator.set_initialization_begin_flag() {
Self::initialize_migrator(
&self.allocator,
migrator,
&self.table,
overflow_index,
);
let _self_called = migrator.set_initialization_complete_flag();
}
if migrator.initialization_complete() {
Self::perform_migration(&self.allocator, migrator, &self.table);
}
}
ConcurrentInsertResult::Replaced(old_value) => {
return Some(old_value);
}
ConcurrentInsertResult::NewInsert => {
return None;
}
ConcurrentInsertResult::Migration(retry_value) => {
value = retry_value;
self.participate_in_migration();
}
}
}
}
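    /// Like `insert`, but never overwrites: if `key` already has a value, that
    /// value is returned unchanged; otherwise `value` is inserted and `None` is
    /// returned.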
pub fn try_insert(&self, key: K, mut value: V) -> Option<V> {
let hash = make_hash(&self.hash_builder, &key);
debug_assert!(!value.is_null());
loop {
let table = self.get_table_mut(Ordering::Acquire);
let size_mask = table.size_mask;
let buckets = table.bucket_slice_mut();
match Self::insert_or_find(hash, &key, value, buckets, size_mask, false) {
ConcurrentInsertResult::Overflow(overflow_index) => {
let migrator = unsafe { &mut *self.migrator.load(Ordering::Relaxed) };
if migrator.set_initialization_begin_flag() {
Self::initialize_migrator(
&self.allocator,
migrator,
&self.table,
overflow_index,
);
let _self_called = migrator.set_initialization_complete_flag();
}
if migrator.initialization_complete() {
Self::perform_migration(&self.allocator, migrator, &self.table);
}
}
ConcurrentInsertResult::Replaced(old_value) => {
return Some(old_value);
}
ConcurrentInsertResult::NewInsert => {
return None;
}
ConcurrentInsertResult::Migration(retry_value) => {
value = retry_value;
self.participate_in_migration();
}
}
}
}
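    /// Returns a borrowing iterator over the map's cells.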
pub fn iter(&'a self) -> Iter<'a, K, V, H, A> {
Iter::new(self)
}
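    /// Returns a borrowing iterator over the map's cells that can modify values in
    /// place.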
pub fn iter_mut(&'a self) -> IterMut<'a, K, V, H, A> {
IterMut::new(self)
}
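    /// Returns the number of non-empty cells by scanning the whole table, so this
    /// is O(capacity) and only a best-effort snapshot under concurrent writes.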
pub fn len(&self) -> usize {
let table = self.get_table(Ordering::Acquire);
let buckets = table.bucket_slice();
let size = table.size();
let mut index: usize = 0;
let mut elements: usize = 0;
for bucket in buckets {
for cell in &bucket.cells {
if index >= size {
break;
}
if !cell.value.load(Ordering::Relaxed).is_null() {
elements += 1;
}
index += 1;
}
}
elements
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
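    /// Finds the live cell for `key`. If the cell holds a redirect marker, the
    /// calling thread helps the in-progress migration and retries on the new table.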
pub(crate) fn find<Q>(&self, key: &Q, hash: HashedKey) -> Option<&'a AtomicCell<K, V>>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
debug_assert!(hash != null_hash());
loop {
let table = self.get_table(Ordering::Acquire);
let size_mask = table.size_mask;
let buckets = table.bucket_slice();
if let Some(cell) = self.find_inner(key, hash, buckets, size_mask) {
let value = cell.value.load(Ordering::Acquire);
if !value.is_redirect() {
if value.is_null() {
return None;
}
return Some(cell);
}
self.participate_in_migration();
} else {
return None;
}
}
}
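    /// Probes for `key` starting at its home cell and then following the bucket's
    /// delta links; does not check for redirects or help migrations.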
fn find_inner<Q>(
&self,
key: &Q,
hash: HashedKey,
buckets: &'a [Bucket<K, V>],
size_mask: usize,
) -> Option<&'a AtomicCell<K, V>>
where
K: Borrow<Q>,
Q: ?Sized + Hash + Eq,
{
let mut index = hash as usize & size_mask;
let cell = get_cell(buckets, index, size_mask);
let probe_hash = cell.hash.load(Ordering::Relaxed);
let probe_key = cell.key.load(Ordering::Relaxed);
if hash == probe_hash && key.eq(probe_key.borrow()) {
return Some(cell);
} else if probe_hash == null_hash() {
return None;
}
let mut delta = get_first_delta(buckets, index, size_mask).load(Ordering::Relaxed);
while delta != 0 {
index = (index + delta as usize) & size_mask;
let cell = get_cell(buckets, index, size_mask);
let probe_hash = cell.hash.load(Ordering::Relaxed);
let probe_key = cell.key.load(Ordering::Relaxed);
if probe_hash == hash && key.eq(probe_key.borrow()) {
return Some(cell);
}
delta = get_second_delta(buckets, index, size_mask).load(Ordering::Relaxed);
}
None
}
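    /// Erases the value in `cell` by swapping in the null sentinel. If the CAS loses
    /// to an ordinary concurrent write, the removal is still treated as having
    /// happened first and the old value is returned; if it loses to a redirect
    /// marker, the thread helps the migration and retries on the cell's new
    /// location.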
fn erase_value(&self, mut cell: &'a AtomicCell<K, V>) -> Option<V> {
loop {
let value = cell.value.load(Ordering::Relaxed);
if value.is_null() {
return None;
}
let key = cell.key.load(Ordering::Relaxed);
match cell.value.compare_exchange(
value,
V::null(),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
return Some(value);
}
Err(updated) => {
if !updated.is_redirect() {
return Some(value);
}
let hash = cell.hash.load(Ordering::Relaxed);
loop {
self.participate_in_migration();
match self.find(&key, hash) {
Some(new_cell) => {
cell = new_cell;
if !cell.value.load(Ordering::Relaxed).is_redirect() {
break;
}
}
None => return None,
}
}
}
}
}
}
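    /// Sets up a migration after an insert overflowed: samples the cells before the
    /// overflow point to estimate occupancy, sizes and allocates the destination
    /// table, parks any previous source tables as stale, and registers the current
    /// table as the single migration source.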
fn initialize_migrator(
allocator: &A,
migrator: &mut Migrator<K, V>,
table: &AtomicPtr<Table<K, V>>,
overflow_index: usize,
) {
let src_table_ptr = table.load(Ordering::Relaxed);
let src_table = unsafe { &*src_table_ptr };
let src_buckets = src_table.bucket_slice();
let size_mask = src_table.size_mask;
let mut index = overflow_index - Self::CELLS_IN_USE;
let mut cells_in_use = 0;
for _ in 0..Self::CELLS_IN_USE {
let cell = get_cell(src_buckets, index, size_mask);
if !cell.value.load(Ordering::Relaxed).is_null() {
cells_in_use += 1;
}
index += 1;
}
let ratio = cells_in_use as f32 / Self::CELLS_IN_USE as f32;
let in_use_estimated = (size_mask + 1) as f32 * ratio;
let estimated = round_to_pow2((in_use_estimated * 2.0).max(1.0) as usize);
let new_table_size = estimated.max(Self::INITIAL_SIZE);
let dst_table_ptr = Self::allocate_and_init_table(allocator, new_table_size);
let dst_table = unsafe { &*dst_table_ptr };
debug_assert!(dst_table.size_mask != 0);
migrator.dst_table.store(dst_table_ptr, Ordering::Relaxed);
let _old = migrator
.status
.fetch_and(Migrator::<K, V>::STATUS_MASK, Ordering::Relaxed);
migrator
.remaining_units
.store(Self::remaining_units(size_mask), Ordering::Relaxed);
migrator.overflowed.store(false, Ordering::Relaxed);
let last_source = migrator.last_stale_source.load(Ordering::Relaxed);
let num_sources = migrator.num_source_tables.load(Ordering::Relaxed);
migrator.num_source_tables.store(0, Ordering::Relaxed);
for i in 0..num_sources {
let source = migrator.sources.pop().unwrap();
let index = migrator.stale_source_index((last_source + i) as usize);
debug_assert!(migrator.stale_sources[index]
.load(Ordering::Relaxed)
.is_null());
migrator.stale_sources[index]
.store(source.table.load(Ordering::Relaxed), Ordering::Relaxed);
let _old = migrator.last_stale_source.fetch_add(1, Ordering::Relaxed);
}
let source = MigrationSource::<K, V> {
table: AtomicPtr::new(src_table_ptr),
index: AtomicUsize::new(0),
};
if migrator.sources.is_empty() {
migrator.sources.push(source);
} else {
migrator.sources[0] = source;
}
migrator.num_source_tables.store(1, Ordering::Relaxed);
}
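    /// Helps drive an in-progress migration. Threads that arrive before
    /// initialization has finished, or while the migration is wrapping up, clean up
    /// stale tables while they wait; otherwise they join in migrating cells.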
pub(crate) fn participate_in_migration(&self) {
let migrator = unsafe { &mut *self.migrator.load(Ordering::Relaxed) };
if !migrator.in_process() {
return;
}
if migrator.finishing() {
while migrator.finishing() {
migrator.cleanup_stale_table(&self.allocator);
}
return;
}
if migrator.in_process() && !migrator.initialization_complete() {
while !migrator.initialization_complete() {
migrator.cleanup_stale_table(&self.allocator);
}
}
Self::perform_migration(&self.allocator, migrator, &self.table);
}
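    /// The migrator's status word packs three flags into its low bits (migration
    /// complete, initialization started, initialization finished) and counts active
    /// workers in multiples of 8 above them. Each worker registers itself, migrates
    /// ranges of cells from the source tables, and the last worker to leave either
    /// publishes the destination table or, if it overflowed, grows it and restarts
    /// the migration.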
fn perform_migration(
allocator: &A,
migrator: &mut Migrator<K, V>,
dst_table: &AtomicPtr<Table<K, V>>,
) {
let mut status = migrator.status.load(Ordering::Relaxed);
loop {
if status & 1 == 1 {
return;
}
match migrator.status.compare_exchange(
status,
status + 8,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(updated) => status = updated,
}
}
let num_sources = migrator.num_source_tables.load(Ordering::Relaxed);
for i in 0..num_sources {
let mut finished = false;
loop {
if migrator.finishing() {
finished = true;
break;
}
if i as usize >= migrator.sources.len() {
break;
}
let source = &migrator.sources[i as usize];
let start_index = source
.index
.fetch_add(Self::MIGRATION_UNIT_SIZE, Ordering::Relaxed);
if start_index >= source.size() {
break;
}
let end_index = start_index + Self::MIGRATION_UNIT_SIZE;
if !migrator.migrate_range::<H, A>(i as usize, start_index, end_index) {
migrator.overflowed.store(true, Ordering::Relaxed);
let _old = migrator.status.fetch_or(1, Ordering::Relaxed);
finished = true;
break;
}
if migrator.remaining_units.fetch_sub(1, Ordering::Relaxed) == 1 {
let _old = migrator.status.fetch_or(1, Ordering::Relaxed);
finished = true;
break;
}
}
if finished {
break;
}
}
        // Each active worker adds 8 to the status word, so a previous value of 16 or
        // more means other workers are still migrating: wait for the status to be
        // reset before returning.
        let prev_status = migrator.status.fetch_sub(8, Ordering::AcqRel);
        if prev_status >= 16 {
            while migrator.status.load(Ordering::Relaxed) >= 1 {
                core::hint::spin_loop();
            }
            return;
        }
        // Last worker out: the completion flag and both initialization flags are set.
        debug_assert!(prev_status == 15);
if migrator.overflowed.load(Ordering::Relaxed) {
let table_ptr = migrator.dst_table.load(Ordering::Relaxed);
let table = unsafe { &*table_ptr };
let mut remaining_units = Self::remaining_units(table.size_mask);
for source in &migrator.sources {
remaining_units += Self::remaining_units(source.size());
source.index.store(0, Ordering::Relaxed);
}
migrator.sources.push(MigrationSource {
table: AtomicPtr::new(table_ptr),
index: AtomicUsize::new(0),
});
migrator
.remaining_units
.store(remaining_units, Ordering::Relaxed);
let new_dst_table_ptr =
Self::allocate_and_init_table(allocator, (table.size_mask + 1) * 2);
migrator
.dst_table
.store(new_dst_table_ptr, Ordering::Relaxed);
migrator.status.store(0, Ordering::Relaxed);
} else {
let new_table_ptr = migrator.dst_table.load(Ordering::Relaxed);
dst_table.store(new_table_ptr, Ordering::Release);
migrator.overflowed.store(false, Ordering::Relaxed);
migrator.status.store(0, Ordering::Release);
}
}
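    /// Core leapfrog insertion: try to claim the key's home cell, then walk the
    /// delta-link chain, and finally linear-probe for a free cell and link it into
    /// the chain. Returns `Overflow` when nothing is found within the search limit,
    /// which triggers a table migration.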
pub(super) fn insert_or_find(
hash: HashedKey,
key: &K,
value: V,
buckets: &[Bucket<K, V>],
size_mask: usize,
must_update: bool,
) -> ConcurrentInsertResult<V> {
let mut index = hash as usize;
let cell = get_cell(buckets, index, size_mask);
let mut probe_hash = cell.hash.load(Ordering::Relaxed);
if probe_hash == null_hash() {
match cell
.hash
.compare_exchange(probe_hash, hash, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => {
cell.key.store(*key, Ordering::Relaxed);
return Self::exchange_value(cell, value, V::null());
}
Err(new_hash) => {
probe_hash = new_hash;
}
}
}
let probe_key = cell.key.load(Ordering::Relaxed);
if probe_hash == hash && probe_key.eq(key.borrow()) {
let old_value = cell.value.load(Ordering::Acquire);
if old_value.is_redirect() {
return ConcurrentInsertResult::Migration(value);
}
if must_update || old_value.is_null() {
return Self::exchange_value(cell, value, old_value);
} else {
return ConcurrentInsertResult::Replaced(old_value);
}
}
let max_index = index + size_mask;
debug_assert!(max_index as i64 - index as i64 >= 0);
let mut first = true;
loop {
let mut follow_link = false;
let prev_link = get_delta(buckets, index, size_mask, first);
let probe_delta = prev_link.load(Ordering::Relaxed);
first = false;
if probe_delta != 0 {
index += probe_delta as usize;
let cell = get_cell(buckets, index, size_mask);
let mut probe_hash = cell.hash.load(Ordering::Relaxed);
if probe_hash == null_hash() {
loop {
probe_hash = cell.hash.load(Ordering::Acquire);
if probe_hash != null_hash() {
break;
}
}
}
debug_assert!((probe_hash ^ hash) as usize & size_mask == 0);
let probe_key = cell.key.load(Ordering::Relaxed);
if probe_hash == hash && probe_key.eq(key.borrow()) {
let old_value = cell.value.load(Ordering::Acquire);
if old_value.is_redirect() {
return ConcurrentInsertResult::Migration(value);
}
if must_update || old_value.is_null() {
return Self::exchange_value(cell, value, old_value);
} else {
return ConcurrentInsertResult::Replaced(old_value);
}
}
} else {
let prev_link_index = index;
debug_assert!(max_index as i64 - index as i64 >= 0);
let mut probes_remaining = Self::LINEAR_SEARCH_LIMIT.min(max_index - index);
while probes_remaining > 0 {
index += 1;
let cell = get_cell(buckets, index, size_mask);
let mut probe_hash = cell.hash.load(Ordering::Relaxed);
if probe_hash == null_hash() {
match cell.hash.compare_exchange(
probe_hash,
hash,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
let offset = (index - prev_link_index) as u8;
prev_link.store(offset, Ordering::Relaxed);
cell.key.store(*key, Ordering::Relaxed);
return Self::exchange_value(cell, value, V::null());
}
Err(updated) => probe_hash = updated,
}
}
let x = hash ^ probe_hash;
let probe_key = cell.key.load(Ordering::Relaxed);
if x == 0 && probe_key == *key {
let old_value = cell.value.load(Ordering::Acquire);
if must_update || old_value.is_null() {
return Self::exchange_value(cell, value, old_value);
} else {
return ConcurrentInsertResult::Replaced(old_value);
}
}
if x & size_mask as u64 == 0 {
let offset = (index - prev_link_index) as u8;
prev_link.store(offset, Ordering::Relaxed);
follow_link = true;
break;
}
probes_remaining -= 1;
}
if !follow_link {
return ConcurrentInsertResult::Overflow(index + 1);
}
}
}
}
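    /// Publishes `desired` into `cell` with a CAS against `old_value`, reporting
    /// whether this was a fresh insert, a replacement, or was interrupted by a
    /// migration redirect.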
fn exchange_value(
cell: &AtomicCell<K, V>,
desired: V,
old_value: V,
) -> ConcurrentInsertResult<V> {
let exchange_result =
cell.value
.compare_exchange(old_value, desired, Ordering::AcqRel, Ordering::Relaxed);
if exchange_result.is_ok() {
if old_value.is_null() {
return ConcurrentInsertResult::NewInsert;
} else {
return ConcurrentInsertResult::Replaced(old_value);
}
}
let value = exchange_result.unwrap_err();
if !value.is_redirect() {
return ConcurrentInsertResult::Replaced(desired);
}
ConcurrentInsertResult::Migration(desired)
}
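    /// Allocates a table with `cells` cells (one bucket per four cells), zeroing
    /// all delta links and hashes and setting every value to the null sentinel.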
fn allocate_and_init_table(allocator: &A, cells: usize) -> *mut Table<K, V> {
        // The cell count must be a power of two so that `size_mask` works as a mask.
        assert!(cells >= 4 && cells.is_power_of_two());
let bucket_count = cells >> 2;
let bucket_ptr =
allocate::<Bucket<K, V>, A>(allocator, bucket_count, AllocationKind::Uninitialized);
let buckets = unsafe { std::slice::from_raw_parts_mut(bucket_ptr, bucket_count) };
for bucket in buckets.iter_mut().take(bucket_count) {
unsafe {
                // Zero all eight delta bytes for this bucket.
                let bucket_deltas = bucket.deltas.as_mut_ptr();
std::ptr::write_bytes(bucket_deltas, 0, 8);
};
for cell in 0..4 {
unsafe {
let cell_hash: *mut AtomicHashedKey = &mut bucket.cells[cell].hash;
std::ptr::write_bytes(cell_hash, 0, 1);
};
let cell_value = &mut bucket.cells[cell].value;
*cell_value = Atomic::new(V::null());
}
}
let table_ptr = allocate::<Table<K, V>, A>(allocator, 1, AllocationKind::Uninitialized);
let table = unsafe { &mut *table_ptr };
table.buckets = bucket_ptr;
table.size_mask = cells - 1;
table_ptr
}
pub(crate) fn get_cell_at_index_mut(&'a self, index: usize) -> Option<RefMut<'a, K, V, H, A>> {
let table = self.get_table(Ordering::Acquire);
if index >= table.size() {
return None;
}
let buckets = table.bucket_slice();
let size_mask = table.size_mask;
let cell = get_cell(buckets, index, size_mask);
let cell_hash = cell.hash.load(Ordering::Relaxed);
Some(RefMut::new(self, cell, cell_hash))
}
pub(crate) fn get_cell_at_index(&'a self, index: usize) -> Option<Ref<'a, K, V, H, A>> {
let table = self.get_table(Ordering::Acquire);
if index >= table.size() {
return None;
}
let buckets = table.bucket_slice();
let size_mask = table.size_mask;
let cell = get_cell(buckets, index, size_mask);
let cell_hash = cell.hash.load(Ordering::Relaxed);
Some(Ref::new(self, cell, cell_hash))
}
#[inline]
fn remaining_units(size_mask: usize) -> usize {
size_mask / Self::MIGRATION_UNIT_SIZE + 1
}
}
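// Dropping the map frees the live table, any source tables still registered with
// the migrator, any stale tables awaiting cleanup, and finally the migrator itself.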
impl<K, V, H, A: Allocator> Drop for LeapMap<K, V, H, A> {
fn drop(&mut self) {
if self.is_allocated() {
let table = self.get_table_mut(Ordering::SeqCst);
let bucket_ptr = table.buckets;
let bucket_count = table.size() >> 2;
deallocate::<Bucket<K, V>, A>(&self.allocator, bucket_ptr, bucket_count);
let table_ptr = self.table.load(Ordering::Relaxed);
deallocate::<Table<K, V>, A>(&self.allocator, table_ptr, 1);
}
let migrator_ptr = self.migrator.load(Ordering::SeqCst);
if !migrator_ptr.is_null() {
let migrator = unsafe { &mut *migrator_ptr };
for source in &migrator.sources {
let source_table_ptr = source.table.load(Ordering::Relaxed);
if !source_table_ptr.is_null() {
let source_table = unsafe { &mut *source_table_ptr };
let bucket_ptr = source_table.buckets;
let bucket_count = source_table.size() >> 2;
deallocate::<Bucket<K, V>, A>(&self.allocator, bucket_ptr, bucket_count);
deallocate::<Table<K, V>, A>(&self.allocator, source_table_ptr, 1);
}
}
while migrator.stale_tables_remaining() > 0 {
migrator.cleanup_stale_table(&self.allocator);
}
unsafe { std::ptr::drop_in_place(migrator_ptr) };
deallocate::<Migrator<K, V>, A>(&self.allocator, migrator_ptr, 1);
}
}
}
impl<K, V, H, A> IntoIterator for LeapMap<K, V, H, A>
where
K: Eq + Hash + Copy,
V: Value,
H: BuildHasher + Default,
A: Allocator,
{
type Item = (K, V);
type IntoIter = OwnedIter<K, V, H, A>;
fn into_iter(self) -> Self::IntoIter {
OwnedIter::new(self)
}
}
#[inline]
pub(crate) const fn null_hash() -> u64 {
0u64
}
#[inline]
fn get_cell<K, V: Value>(
buckets: &[Bucket<K, V>],
index: usize,
size_mask: usize,
) -> &AtomicCell<K, V> {
let bucket_index = get_bucket_index(index, size_mask);
let cell_index = get_cell_index(index);
&buckets[bucket_index].cells[cell_index]
}
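// Each bucket stores eight delta bytes: indices 0..4 hold the "first" delta for
// each cell (from the cell's home position to the start of its chain) and 4..8
// hold the "second" delta (from one chain cell to the next).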
#[inline]
fn get_delta<K, V>(
buckets: &[Bucket<K, V>],
index: usize,
size_mask: usize,
first: bool,
) -> &AtomicU8 {
let offset = if first { 0 } else { 4 };
let bucket_index = get_bucket_index(index, size_mask);
let cell_index = get_cell_index(index);
&buckets[bucket_index].deltas[cell_index + offset]
}
#[inline]
fn get_first_delta<K, V>(buckets: &[Bucket<K, V>], index: usize, size_mask: usize) -> &AtomicU8 {
let bucket_index = get_bucket_index(index, size_mask);
let cell_index = get_cell_index(index);
&buckets[bucket_index].deltas[cell_index]
}
#[inline]
fn get_second_delta<K, V>(buckets: &[Bucket<K, V>], index: usize, size_mask: usize) -> &AtomicU8 {
let bucket_index = get_bucket_index(index, size_mask);
let cell_index = get_cell_index(index);
&buckets[bucket_index].deltas[cell_index + 4]
}
#[inline]
const fn get_bucket_index(index: usize, size_mask: usize) -> usize {
(index & size_mask) >> 2
}
#[inline]
const fn get_cell_index(index: usize) -> usize {
index & 3
}
type HashedKey = u64;
type AtomicHashedKey = AtomicU64;
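/// Outcome of a low-level insert: an existing value was replaced, a new value was
/// inserted, the caller must retry after helping a migration (the value is handed
/// back), or probing overflowed at the given cell index and the table must grow.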
pub(super) enum ConcurrentInsertResult<V> {
Replaced(V),
NewInsert,
Migration(V),
Overflow(usize),
}
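/// A fixed-size array of buckets; `size_mask` is the cell count minus one, and the
/// cell count is always a power of two.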
struct Table<K, V> {
buckets: *mut Bucket<K, V>,
size_mask: usize,
}
impl<K, V> Table<K, V> {
pub(super) fn bucket_slice_mut(&mut self) -> &mut [Bucket<K, V>] {
unsafe { std::slice::from_raw_parts_mut(self.buckets, self.size()) }
}
pub(super) fn bucket_slice(&self) -> &[Bucket<K, V>] {
unsafe { std::slice::from_raw_parts(self.buckets, self.size()) }
}
pub(super) fn size(&self) -> usize {
self.size_mask + 1
}
}
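/// A single entry: the key's hash, the key, and the value, each stored atomically.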
pub struct AtomicCell<K, V> {
pub(crate) hash: AtomicHashedKey,
pub(crate) key: Atomic<K>,
pub(crate) value: Atomic<V>,
}
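/// Four cells plus their eight delta-link bytes (see `get_first_delta` and
/// `get_second_delta`).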
pub(super) struct Bucket<K, V> {
deltas: [AtomicU8; 8],
cells: [AtomicCell<K, V>; 4],
}
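/// A source table being drained by a migration, with the next cell index to hand
/// out to worker threads.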
struct MigrationSource<K, V> {
table: AtomicPtr<Table<K, V>>,
index: AtomicUsize,
}
impl<K, V> MigrationSource<K, V> {
pub(super) fn size(&self) -> usize {
let table = unsafe { &*self.table.load(Ordering::Relaxed) };
table.size_mask + 1
}
}
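/// Coordinates growing the map: holds the destination table, the source tables
/// still being drained, a packed status word (flags in the low bits, active-worker
/// count above), and a small ring of stale tables waiting to be freed.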
struct Migrator<K, V> {
dst_table: AtomicPtr<Table<K, V>>,
sources: Vec<MigrationSource<K, V>>,
status: AtomicUsize,
remaining_units: AtomicUsize,
overflowed: AtomicBool,
num_source_tables: AtomicU32,
stale_sources: Vec<AtomicPtr<Table<K, V>>>,
last_stale_source: AtomicU32,
current_stale_source: AtomicU32,
}
impl<K, V> Migrator<K, V> {
pub(super) const RESET_FLAG: usize = 0x00;
pub(super) const MIGRATION_COMPLETE_FLAG: usize = 0x01;
pub(super) const INITIALIZATION_START_FLAG: usize = 0x02;
pub(super) const INITIALIZATION_END_FLAG: usize = 0x04;
pub(super) const INITIALIZATION_MASK: usize =
Self::INITIALIZATION_START_FLAG | Self::INITIALIZATION_END_FLAG;
pub(super) const STATUS_MASK: usize = Self::MIGRATION_COMPLETE_FLAG | Self::INITIALIZATION_MASK;
const STALE_SOURCES: usize = 32;
const STALE_CAPACITY_MASK: usize = Self::STALE_SOURCES - 1;
fn initialize(&mut self) {
self.stale_sources = Vec::with_capacity(Self::STALE_SOURCES);
for _ in 0..Self::STALE_SOURCES {
self.stale_sources
.push(AtomicPtr::<Table<K, V>>::new(std::ptr::null_mut()));
}
self.dst_table
.store(std::ptr::null_mut(), Ordering::Relaxed);
self.sources = vec![];
self.status.store(Self::RESET_FLAG, Ordering::Relaxed);
self.remaining_units.store(0, Ordering::Relaxed);
self.overflowed.store(false, Ordering::Relaxed);
self.num_source_tables.store(0, Ordering::Relaxed);
self.last_stale_source.store(0, Ordering::Relaxed);
self.current_stale_source.store(0, Ordering::Relaxed);
}
fn set_initialization_begin_flag(&self) -> bool {
let old = self
.status
.fetch_or(Self::INITIALIZATION_START_FLAG, Ordering::Relaxed);
old & Self::INITIALIZATION_START_FLAG == 0
}
fn set_initialization_complete_flag(&self) -> bool {
let old = self
.status
.fetch_or(Self::INITIALIZATION_END_FLAG, Ordering::Relaxed);
old & Self::INITIALIZATION_END_FLAG == 0
}
fn finishing(&self) -> bool {
let status = self.status.load(Ordering::Relaxed);
status & Self::MIGRATION_COMPLETE_FLAG == Self::MIGRATION_COMPLETE_FLAG || status == 0
}
fn initialization_complete(&self) -> bool {
self.status.load(Ordering::Relaxed) & Self::INITIALIZATION_MASK == Self::INITIALIZATION_MASK
}
fn in_process(&self) -> bool {
self.status.load(Ordering::Relaxed) & Self::INITIALIZATION_START_FLAG
== Self::INITIALIZATION_START_FLAG
}
fn stale_source_index(&self, index: usize) -> usize {
index & Self::STALE_CAPACITY_MASK
}
fn stale_tables_remaining(&self) -> u32 {
self.last_stale_source.load(Ordering::Relaxed)
- self.current_stale_source.load(Ordering::Relaxed)
}
fn cleanup_stale_table<A: Allocator>(&self, allocator: &A) {
let current = self.current_stale_source.load(Ordering::Relaxed);
let last_visible = self.last_stale_source.load(Ordering::Relaxed);
if current >= last_visible {
return;
}
if self
.current_stale_source
.compare_exchange(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let index = self.stale_source_index(current as usize);
let table_ptr = self.stale_sources[index].load(Ordering::Relaxed);
if table_ptr.is_null() {
return;
}
let table = unsafe { &mut *table_ptr };
let bucket_ptr = table.buckets;
let bucket_count = table.size() >> 2;
deallocate::<Bucket<K, V>, A>(allocator, bucket_ptr, bucket_count);
deallocate::<Table<K, V>, A>(allocator, table_ptr, 1);
self.stale_sources[index].store(std::ptr::null_mut(), Ordering::Relaxed);
}
}
fn migrate_range<H, A>(&self, src_index: usize, start_index: usize, end_index: usize) -> bool
where
K: Hash + Eq + Copy,
H: BuildHasher + Default,
A: Allocator,
V: Value,
{
let source = &self.sources[src_index];
let src_table = unsafe { &*source.table.load(Ordering::Relaxed) };
let src_size_mask = src_table.size_mask;
let src_buckets = src_table.bucket_slice();
let dst_table = unsafe { &*self.dst_table.load(Ordering::Relaxed) };
let dst_size_mask = dst_table.size_mask;
let dst_buckets = dst_table.bucket_slice();
for index in start_index..end_index {
let cell = get_cell(src_buckets, index, src_size_mask);
loop {
let src_hash = cell.hash.load(Ordering::Relaxed);
if src_hash == null_hash() {
match cell.value.compare_exchange(
V::null(),
V::redirect(),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(old) => {
if old.is_redirect() {
break;
}
}
}
} else {
let mut src_value = cell.value.load(Ordering::Relaxed);
if src_value.is_null() {
match cell.value.compare_exchange(
src_value,
V::redirect(),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(old) => {
if old.is_redirect() {
break;
}
}
}
} else if src_value.is_redirect() {
break;
}
let src_key = cell.key.load(Ordering::Relaxed);
loop {
match LeapMap::<K, V, H, A>::insert_or_find(
src_hash,
&src_key,
src_value,
dst_buckets,
dst_size_mask,
true,
) {
ConcurrentInsertResult::Overflow(_) => {
return false;
}
ConcurrentInsertResult::Migration(_) => {
return false;
}
_ => {
match cell.value.compare_exchange(
src_value,
V::redirect(),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_old) => {
break;
}
Err(updated) => {
src_value = updated;
}
}
}
}
}
break;
}
}
}
true
}
}
#[cfg(test)]
mod tests {
use super::*;
    #[cfg(feature = "stable_alloc")]
    use allocator_api2::alloc::{AllocError, Allocator};
    #[cfg(not(feature = "stable_alloc"))]
    use core::alloc::{AllocError, Allocator};
use std::alloc::Layout;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
static MIGRATOR_DROPS: AtomicUsize = AtomicUsize::new(0);
impl<K, V> Drop for Migrator<K, V> {
fn drop(&mut self) {
MIGRATOR_DROPS.fetch_add(1, Ordering::Relaxed);
}
}
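    // Allocator that tracks the net number of bytes currently allocated, so the
    // tests below can detect leaks.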
struct CountingAllocator {
allocated: AtomicUsize,
}
unsafe impl Allocator for &CountingAllocator {
fn allocate(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
let ptr = unsafe { std::alloc::alloc(layout) };
if ptr.is_null() {
return Err(AllocError);
}
self.allocated.fetch_add(layout.size(), Ordering::Relaxed);
Ok(NonNull::slice_from_raw_parts(
unsafe { NonNull::new_unchecked(ptr) },
layout.size(),
))
}
unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
unsafe { std::alloc::dealloc(ptr.as_ptr(), layout) };
self.allocated.fetch_sub(layout.size(), Ordering::Relaxed);
}
}
#[test]
fn no_memory_leak_after_resize() {
let allocator = CountingAllocator {
allocated: AtomicUsize::new(0),
};
{
let map: LeapMap<usize, usize, BuildHasherDefault<DefaultHash>, &CountingAllocator> =
LeapMap::new_in(&allocator);
for i in 1..10_000usize {
map.insert(i, i);
}
}
let leaked = allocator.allocated.load(Ordering::Relaxed);
assert_eq!(
leaked, 0,
"Memory leak: {leaked} bytes not freed after dropping LeapMap (issue #16)"
);
}
#[test]
fn migrator_drop_runs_when_leapmap_is_dropped() {
        // Other tests may drop maps (and therefore migrators) concurrently, so check
        // that dropping this map bumps the counter rather than pinning it to exactly 1.
        let before = MIGRATOR_DROPS.load(Ordering::Relaxed);
        {
            let map: LeapMap<usize, usize> = LeapMap::new();
            drop(map);
        }
        assert!(MIGRATOR_DROPS.load(Ordering::Relaxed) >= before + 1);
}
}