//! Hash table internals shared by the concurrent hash table implementations: the [`HashTable`]
//! trait and the [`LockedEntry`] type.

pub mod bucket;
pub mod bucket_array;

use std::hash::{BuildHasher, Hash};
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use bucket::{BUCKET_LEN, CACHE, DataBlock, EntryPtr, INDEX, LruList, Reader, Writer};
use bucket_array::BucketArray;
use sdd::{AtomicShared, Guard, Ptr, Shared, Tag};
use super::Equivalent;
use super::exit_guard::ExitGuard;
use crate::async_helper::SendableGuard;
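
/// The maximum ratio between the lengths of two consecutive bucket arrays.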
const MAX_RESIZE_FACTOR: usize = (usize::BITS / 2) as usize;
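
/// Common internals shared by the concurrent hash table variants; the `TYPE` parameter selects
/// variant-specific behaviour such as `INDEX` or `CACHE`.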
pub(super) trait HashTable<K, V, H, L: LruList, const TYPE: char>
where
K: Eq + Hash,
H: BuildHasher,
{
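    /// Returns the hash value of the given key.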
#[inline]
fn hash<Q>(&self, key: &Q) -> u64
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.hasher().hash_one(key)
}
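
    /// Returns a reference to the [`BuildHasher`].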
fn hasher(&self) -> &H;
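
    /// Returns a reference to the current [`BucketArray`] holder.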
fn bucket_array(&self) -> &AtomicShared<BucketArray<K, V, L, TYPE>>;
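
    /// Calculates the index of the bucket that may contain the key, or `0` if no bucket array
    /// has been allocated.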
#[inline]
fn calculate_bucket_index<Q>(&self, key: &Q) -> usize
where
Q: Equivalent<K> + Hash + ?Sized,
{
self.bucket_array()
.load(Acquire, &Guard::new())
.as_ref()
.map_or(0, |a| a.calculate_bucket_index(self.hash(key)))
}
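
    /// Returns a reference to the minimum capacity of the hash table.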
fn minimum_capacity(&self) -> &AtomicUsize;
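
    /// Returns the maximum capacity of the hash table.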
fn maximum_capacity(&self) -> usize;
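
    /// Reserves the specified additional capacity by raising the minimum capacity.
    ///
    /// Returns the capacity that was actually reserved, or `0` if the addition would overflow.
    /// A resize may be triggered to satisfy the new minimum capacity.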
fn reserve_capacity(&self, additional_capacity: usize) -> usize {
let mut current_minimum_capacity = self.minimum_capacity().load(Relaxed);
loop {
let Some(new_minimum_capacity) =
current_minimum_capacity.checked_add(additional_capacity)
else {
return 0;
};
match self.minimum_capacity().compare_exchange_weak(
current_minimum_capacity,
new_minimum_capacity,
Relaxed,
Relaxed,
) {
Ok(_) => {
let guard = Guard::new();
if let Some(current_array) = self.bucket_array().load(Acquire, &guard).as_ref()
{
if !current_array.has_old_array() {
self.try_resize(current_array, 0, &guard);
}
}
return additional_capacity;
}
Err(actual) => current_minimum_capacity = actual,
}
}
}
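
    /// Returns the current bucket array, allocating a new one if none has been allocated yet.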
#[inline]
fn get_or_create_bucket_array<'g>(&self, guard: &'g Guard) -> &'g BucketArray<K, V, L, TYPE> {
if let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
current_array
} else {
unsafe {
match self.bucket_array().compare_exchange(
Ptr::null(),
(
Some(Shared::new_unchecked(BucketArray::<K, V, L, TYPE>::new(
self.minimum_capacity().load(Relaxed),
AtomicShared::null(),
))),
Tag::None,
),
AcqRel,
Acquire,
guard,
) {
Ok((_, ptr)) | Err((_, ptr)) => ptr.as_ref().unwrap_unchecked(),
}
}
}
}
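
    /// Returns the number of entry slots in the current bucket array, or `0` if none has been
    /// allocated.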
#[inline]
fn num_slots(&self, guard: &Guard) -> usize {
if let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
current_array.num_slots()
} else {
0
}
}
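
    /// Returns the number of entries by counting every bucket, including those in the old bucket
    /// array if rehashing is still in progress; an empty table with no reserved capacity may have
    /// its bucket array deallocated as a side effect.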
#[inline]
fn num_entries(&self, guard: &Guard) -> usize {
let mut num_entries = 0;
if let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
let old_array_ptr = current_array.old_array(guard);
if let Some(old_array) = old_array_ptr.as_ref() {
if !self.incremental_rehash_sync::<true>(current_array, guard) {
for i in 0..old_array.len() {
num_entries += old_array.bucket(i).len();
}
}
}
for i in 0..current_array.len() {
num_entries += current_array.bucket(i).len();
}
if num_entries == 0
&& self.minimum_capacity().load(Relaxed) == 0
&& !current_array.has_old_array()
{
self.try_resize(current_array, 0, guard);
}
}
num_entries
}
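
    /// Maps a bucket index in an array of length `from_len` to the corresponding `[start, end)`
    /// index range in an array of length `to_len`; both lengths must be powers of two.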
#[inline]
fn from_index_to_range(from_len: usize, to_len: usize, from_index: usize) -> (usize, usize) {
debug_assert!(from_len.is_power_of_two() && to_len.is_power_of_two());
if from_len < to_len {
let ratio = to_len / from_len;
let start_index = from_index * ratio;
            debug_assert!(
                start_index + ratio <= to_len,
                "enlarged range [{start_index}, {}) exceeds to_len {to_len} (from_len = {from_len}, from_index = {from_index})",
                start_index + ratio,
            );
(start_index, start_index + ratio)
} else {
let ratio = from_len / to_len;
let start_index = from_index / ratio;
            debug_assert!(
                start_index < to_len,
                "index {start_index} out of bounds for to_len {to_len} (from_len = {from_len}, from_index = {from_index})"
            );
(start_index, start_index + 1)
}
}
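
    /// Returns `true` if the hash table contains at least one entry.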
fn has_entry(&self, guard: &Guard) -> bool {
if let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
let old_array_ptr = current_array.old_array(guard);
if let Some(old_array) = old_array_ptr.as_ref() {
if !self.incremental_rehash_sync::<true>(current_array, guard) {
for i in 0..old_array.len() {
if old_array.bucket(i).len() != 0 {
return true;
}
}
}
}
for i in 0..current_array.len() {
if current_array.bucket(i).len() != 0 {
return true;
}
}
if self.minimum_capacity().load(Relaxed) == 0 && !current_array.has_old_array() {
self.try_resize(current_array, 0, guard);
}
}
false
}
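
    /// Estimates the total number of entries by sampling `sample_size` buckets starting at
    /// `sampling_index`.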
#[inline]
fn sample(
current_array: &BucketArray<K, V, L, TYPE>,
sampling_index: usize,
sample_size: usize,
) -> usize {
let mut num_entries = 0;
for i in sampling_index..(sampling_index + sample_size) {
num_entries += current_array.bucket(i % current_array.len()).len();
}
num_entries * (current_array.len() / sample_size)
}
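
    /// Returns `true` if at least half of the sampled buckets need to be rebuilt.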
#[inline]
fn check_rebuild(
current_array: &BucketArray<K, V, L, TYPE>,
sampling_index: usize,
sample_size: usize,
) -> bool {
let mut num_buckets_to_rebuild = 0;
for i in sampling_index..(sampling_index + sample_size) {
if current_array.bucket(i % current_array.len()).need_rebuild() {
num_buckets_to_rebuild += 1;
if num_buckets_to_rebuild >= sample_size / 2 {
return true;
}
}
}
false
}
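
    /// Searches for an entry associated with the key without acquiring any bucket lock; only
    /// usable when `TYPE == INDEX`.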
#[inline]
fn peek_entry<'g, Q>(&self, key: &Q, hash: u64, guard: &'g Guard) -> Option<&'g (K, V)>
where
Q: Equivalent<K> + Hash + ?Sized,
{
debug_assert_eq!(TYPE, INDEX);
let mut current_array_ptr = self.bucket_array().load(Acquire, guard);
while let Some(current_array) = current_array_ptr.as_ref() {
let index = current_array.calculate_bucket_index(hash);
if let Some(old_array) = current_array.old_array(guard).as_ref() {
if !self.dedup_bucket_sync::<true>(current_array, old_array, index, guard) {
let index = old_array.calculate_bucket_index(hash);
if let Some(entry) = old_array.bucket(index).search_entry(
old_array.data_block(index),
key,
hash,
guard,
) {
return Some(entry);
}
}
}
if let Some(entry) = current_array.bucket(index).search_entry(
current_array.data_block(index),
key,
hash,
guard,
) {
return Some(entry);
}
let new_current_array_ptr = self.bucket_array().load(Acquire, guard);
if current_array_ptr == new_current_array_ptr {
break;
}
current_array_ptr = new_current_array_ptr;
}
None
}
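
    /// Asynchronously acquires a read lock on the bucket corresponding to the hash and applies
    /// `f` to the entry associated with the key, if any.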
#[inline]
async fn reader_async<Q, R, F: FnOnce(&K, &V) -> R>(
&self,
key: &Q,
hash: u64,
f: F,
sendable_guard: &SendableGuard,
) -> Option<R>
where
Q: Equivalent<K> + Hash + ?Sized,
{
while let Some(current_array) = sendable_guard.load(self.bucket_array(), Acquire) {
let index = current_array.calculate_bucket_index(hash);
if current_array.has_old_array()
&& !self
.dedup_bucket_async(current_array, index, sendable_guard)
.await
{
continue;
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_async(bucket, sendable_guard).await {
if let Some(entry) = reader.search_entry(
current_array.data_block(index),
key,
hash,
sendable_guard.guard(),
) {
return Some(f(&entry.0, &entry.1));
}
break;
}
}
None
}
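
    /// Acquires a read lock on the bucket corresponding to the hash and applies `f` to the entry
    /// associated with the key, if any.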
#[inline]
fn reader_sync<Q, R, F: FnOnce(&K, &V) -> R>(
&self,
key: &Q,
hash: u64,
f: F,
guard: &Guard,
) -> Option<R>
where
Q: Equivalent<K> + Hash + ?Sized,
{
while let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
let index = current_array.calculate_bucket_index(hash);
if let Some(old_array) = current_array.old_array(guard).as_ref() {
self.dedup_bucket_sync::<false>(current_array, old_array, index, guard);
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_sync(bucket) {
if let Some(entry) =
reader.search_entry(current_array.data_block(index), key, hash, guard)
{
return Some(f(&entry.0, &entry.1));
}
break;
}
}
None
}
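
    /// Asynchronously acquires a write lock on the bucket corresponding to the hash and passes
    /// the [`Writer`] to `f`, possibly triggering an enlargement check when the bucket is nearly
    /// full.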
#[inline]
async fn writer_async<R, F>(&self, hash: u64, sendable_guard: &SendableGuard, f: F) -> R
where
F: FnOnce(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> R,
{
loop {
let current_array = self.get_or_create_bucket_array(sendable_guard.guard());
let index = current_array.calculate_bucket_index(hash);
if current_array.has_old_array()
&& !self
.dedup_bucket_async(current_array, index, sendable_guard)
.await
{
continue;
}
let bucket = current_array.bucket(index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& current_array.initiate_sampling(index)
&& bucket.len() >= BUCKET_LEN - 1
{
self.try_enlarge(current_array, index, bucket.len(), sendable_guard.guard());
}
if let Some(writer) = Writer::lock_async(bucket, sendable_guard).await {
return f(
writer,
current_array.data_block(index),
index,
current_array.len(),
);
}
}
}
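
    /// Acquires a write lock on the bucket corresponding to the hash and passes the [`Writer`] to
    /// `f`, possibly triggering an enlargement check when the bucket is nearly full.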
#[inline]
fn writer_sync<R, F>(&self, hash: u64, guard: &Guard, f: F) -> R
where
F: FnOnce(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> R,
{
loop {
let current_array = self.get_or_create_bucket_array(guard);
let index = current_array.calculate_bucket_index(hash);
if let Some(old_array) = current_array.old_array(guard).as_ref() {
self.dedup_bucket_sync::<false>(current_array, old_array, index, guard);
}
let bucket = current_array.bucket(index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& current_array.initiate_sampling(index)
&& bucket.len() >= BUCKET_LEN - 1
{
self.try_enlarge(current_array, index, bucket.len(), guard);
}
if let Some(writer) = Writer::lock_sync(bucket) {
return f(
writer,
current_array.data_block(index),
index,
current_array.len(),
);
}
}
}
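
    /// Asynchronously acquires a write lock on the bucket corresponding to the hash if a bucket
    /// array exists; `f` returns `(result, try_shrink)`, and the closure is handed back as `Err`
    /// when no bucket array is allocated.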
#[inline]
async fn optional_writer_async<R, F>(
&self,
hash: u64,
sendable_guard: &SendableGuard,
f: F,
) -> Result<R, F>
where
F: FnOnce(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> (R, bool),
{
while let Some(current_array) = sendable_guard.load(self.bucket_array(), Acquire) {
let index = current_array.calculate_bucket_index(hash);
if current_array.has_old_array()
&& !self
.dedup_bucket_async(current_array, index, sendable_guard)
.await
{
continue;
}
let bucket = current_array.bucket(index);
if let Some(writer) = Writer::lock_async(bucket, sendable_guard).await {
let (result, try_shrink) = f(
writer,
current_array.data_block(index),
index,
current_array.len(),
);
if try_shrink
&& current_array.initiate_sampling(index)
&& sendable_guard.check_ref(self.bucket_array(), current_array, Acquire)
{
self.try_shrink_or_rebuild(current_array, index, sendable_guard.guard());
}
return Ok(result);
}
}
Err(f)
}
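
    /// Acquires a write lock on the bucket corresponding to the hash if a bucket array exists;
    /// `f` returns `(result, try_shrink)`, and the closure is handed back as `Err` when no bucket
    /// array is allocated.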
#[inline]
fn optional_writer_sync<R, F>(&self, hash: u64, guard: &Guard, f: F) -> Result<R, F>
where
F: FnOnce(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> (R, bool),
{
while let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
let index = current_array.calculate_bucket_index(hash);
if let Some(old_array) = current_array.old_array(guard).as_ref() {
self.dedup_bucket_sync::<false>(current_array, old_array, index, guard);
}
let bucket = current_array.bucket(index);
if let Some(writer) = Writer::lock_sync(bucket) {
let (result, try_shrink) = f(
writer,
current_array.data_block(index),
index,
current_array.len(),
);
if try_shrink && current_array.initiate_sampling(index) {
self.try_shrink_or_rebuild(current_array, index, guard);
}
return Ok(result);
}
}
Err(f)
}
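
    /// Asynchronously iterates over the buckets, passing a [`Reader`] over each of them to `f`;
    /// iteration stops as soon as `f` returns `false`.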
#[inline]
async fn for_each_reader_async<F>(&self, sendable_guard: &SendableGuard, mut f: F)
where
F: FnMut(Reader<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>) -> bool,
{
let mut start_index = 0;
let mut prev_len = 0;
while let Some(current_array) = sendable_guard.load(self.bucket_array(), Acquire) {
start_index = if prev_len == 0 || prev_len == current_array.len() {
start_index
} else {
Self::from_index_to_range(prev_len, current_array.len(), start_index).0
};
prev_len = current_array.len();
while start_index < current_array.len() {
let index = start_index;
if current_array.has_old_array()
&& !self
.dedup_bucket_async(current_array, index, sendable_guard)
.await
{
break;
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_async(bucket, sendable_guard).await {
let data_block = current_array.data_block(index);
if !f(reader, data_block) {
return;
}
}
if !sendable_guard.check_ref(self.bucket_array(), current_array, Acquire) {
break;
}
start_index += 1;
}
if start_index == current_array.len() {
break;
}
}
}
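
    /// Iterates over the buckets, passing a [`Reader`] over each of them to `f`; iteration stops
    /// as soon as `f` returns `false`.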
#[inline]
fn for_each_reader_sync<F>(&self, guard: &Guard, mut f: F)
where
F: FnMut(Reader<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>) -> bool,
{
let mut start_index = 0;
let mut prev_len = 0;
while let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
start_index = if prev_len == 0 || prev_len == current_array.len() {
start_index
} else {
Self::from_index_to_range(prev_len, current_array.len(), start_index).0
};
prev_len = current_array.len();
while start_index < current_array.len() {
let index = start_index;
if let Some(old_array) = current_array.old_array(guard).as_ref() {
self.dedup_bucket_sync::<false>(current_array, old_array, index, guard);
}
let bucket = current_array.bucket(index);
if let Some(reader) = Reader::lock_sync(bucket) {
let data_block = current_array.data_block(index);
if !f(reader, data_block) {
return;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array.len() {
break;
}
}
}
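
    /// Asynchronously iterates over the buckets starting at `start_index`, passing a [`Writer`]
    /// over each of them to `f`; `f` returns `(stop, removed)`, where `removed` requests a shrink
    /// check once iteration has finished.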
#[inline]
async fn for_each_writer_async<F>(
&self,
mut start_index: usize,
expected_array_len: usize,
sendable_guard: &SendableGuard,
mut f: F,
) where
F: FnMut(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> (bool, bool),
{
let mut try_shrink = false;
let mut prev_len = expected_array_len;
while let Some(current_array) = sendable_guard.load(self.bucket_array(), Acquire) {
let current_array_len = current_array.len();
start_index = if prev_len == 0 || prev_len == current_array_len {
start_index
} else {
Self::from_index_to_range(prev_len, current_array_len, start_index).0
};
prev_len = current_array_len;
while start_index < current_array_len {
let index = start_index;
if current_array.has_old_array()
&& !self
.dedup_bucket_async(current_array, index, sendable_guard)
.await
{
break;
}
let bucket = current_array.bucket(index);
if let Some(writer) = Writer::lock_async(bucket, sendable_guard).await {
let data_block = current_array.data_block(index);
let (stop, removed) = f(writer, data_block, index, current_array_len);
try_shrink |= removed;
if stop {
start_index = current_array_len;
break;
}
}
if !sendable_guard.check_ref(self.bucket_array(), current_array, Acquire) {
break;
}
start_index += 1;
}
if start_index == current_array_len {
break;
}
}
if try_shrink {
if let Some(current_array) = sendable_guard.load(self.bucket_array(), Acquire) {
self.try_shrink_or_rebuild(current_array, 0, sendable_guard.guard());
}
}
}
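
    /// Iterates over the buckets starting at `start_index`, passing a [`Writer`] over each of
    /// them to `f`; `f` returns `(stop, removed)`, where `removed` requests a shrink check once
    /// iteration has finished.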
#[inline]
fn for_each_writer_sync<F>(
&self,
mut start_index: usize,
expected_array_len: usize,
guard: &Guard,
mut f: F,
) where
F: FnMut(Writer<K, V, L, TYPE>, &DataBlock<K, V, BUCKET_LEN>, usize, usize) -> (bool, bool),
{
let mut try_shrink = false;
let mut prev_len = expected_array_len;
while let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
let current_array_len = current_array.len();
start_index = if prev_len == 0 || prev_len == current_array_len {
start_index
} else {
Self::from_index_to_range(prev_len, current_array_len, start_index).0
};
prev_len = current_array_len;
while start_index < current_array_len {
let index = start_index;
if let Some(old_array) = current_array.old_array(guard).as_ref() {
self.dedup_bucket_sync::<false>(current_array, old_array, index, guard);
}
let bucket = current_array.bucket(index);
if let Some(writer) = Writer::lock_sync(bucket) {
let data_block = current_array.data_block(index);
let (stop, removed) = f(writer, data_block, index, current_array_len);
try_shrink |= removed;
if stop {
start_index = current_array_len;
break;
}
} else {
break;
}
start_index += 1;
}
if start_index == current_array_len {
if try_shrink {
self.try_shrink_or_rebuild(current_array, 0, guard);
}
break;
}
}
}
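
    /// Tries to lock the bucket corresponding to the key without blocking, returning a
    /// [`LockedEntry`] pointing at the entry if the key is present; `None` is returned whenever a
    /// lock cannot be acquired instantly.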
#[inline]
fn try_reserve_entry<'g, Q>(
&self,
key: &Q,
hash: u64,
guard: &'g Guard,
) -> Option<LockedEntry<'g, K, V, L, TYPE>>
where
Q: Equivalent<K> + Hash + ?Sized,
{
loop {
let current_array = self.get_or_create_bucket_array(guard);
let index = current_array.calculate_bucket_index(hash);
if let Some(old_array) = current_array.old_array(guard).as_ref() {
if !self.dedup_bucket_sync::<true>(current_array, old_array, index, guard) {
return None;
}
}
let mut bucket = current_array.bucket(index);
if (TYPE != CACHE || current_array.num_slots() < self.maximum_capacity())
&& current_array.initiate_sampling(index)
&& bucket.len() >= BUCKET_LEN - 1
{
self.try_enlarge(current_array, index, bucket.len(), guard);
bucket = current_array.bucket(index);
}
let Ok(writer) = Writer::try_lock(bucket) else {
return None;
};
if let Some(writer) = writer {
let data_block = current_array.data_block(index);
let entry_ptr = writer.get_entry_ptr(data_block, key, hash, guard);
return Some(LockedEntry::new(
writer,
data_block,
entry_ptr,
index,
current_array.len(),
guard,
));
}
}
}
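
    /// Asynchronously relocates the old-array buckets that map onto the specified bucket index,
    /// returning `false` if the operation could not be completed against the current bucket
    /// array.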
async fn dedup_bucket_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
index: usize,
sendable_guard: &'g SendableGuard,
) -> bool {
self.incremental_rehash_async(current_array, sendable_guard)
.await;
if !sendable_guard.check_ref(self.bucket_array(), current_array, Acquire) {
return false;
}
if let Some(old_array) = sendable_guard.load(current_array.old_array_ptr(), Acquire) {
let range = Self::from_index_to_range(current_array.len(), old_array.len(), index);
for old_index in range.0..range.1 {
let bucket = old_array.bucket(old_index);
let writer = Writer::lock_async(bucket, sendable_guard).await;
if let Some(writer) = writer {
if !self
.relocate_bucket_async(
current_array,
old_array,
old_index,
writer,
sendable_guard,
)
.await
{
return false;
}
} else if !sendable_guard.is_valid() {
return false;
}
if !current_array.has_old_array() {
break;
}
}
}
true
}
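
    /// Relocates the old-array buckets that map onto the specified bucket index, returning
    /// `false` if `TRY_LOCK` is `true` and a lock could not be acquired instantly.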
fn dedup_bucket_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
index: usize,
guard: &'g Guard,
) -> bool {
if self.incremental_rehash_sync::<TRY_LOCK>(current_array, guard) {
return true;
}
let range = Self::from_index_to_range(current_array.len(), old_array.len(), index);
for old_index in range.0..range.1 {
let bucket = old_array.bucket(old_index);
let writer = if TRY_LOCK {
let Ok(writer) = Writer::try_lock(bucket) else {
return false;
};
writer
} else {
Writer::lock_sync(bucket)
};
if let Some(writer) = writer {
if !self.relocate_bucket_sync::<TRY_LOCK>(
current_array,
old_array,
old_index,
writer,
guard,
) {
return false;
}
}
}
true
}
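
    /// Asynchronously moves every entry in the old bucket into the current bucket array and kills
    /// the old bucket, returning `false` if the bucket array was replaced in the meantime.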
async fn relocate_bucket_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
old_index: usize,
old_writer: Writer<'g, K, V, L, TYPE>,
sendable_guard: &'g SendableGuard,
) -> bool {
if old_writer.len() == 0 {
let validated = sendable_guard.check_ref(self.bucket_array(), current_array, Acquire);
old_writer.kill();
return validated;
}
let target_index =
Self::from_index_to_range(old_array.len(), current_array.len(), old_index).0;
let mut target_buckets: [Option<Writer<K, V, L, TYPE>>; MAX_RESIZE_FACTOR] =
Default::default();
let mut max_index = 0;
let mut entry_ptr = EntryPtr::new(sendable_guard.guard());
let old_data_block = old_array.data_block(old_index);
while entry_ptr.move_to_next(&old_writer, sendable_guard.guard()) {
let old_entry = entry_ptr.get(old_data_block);
let (new_index, hash) = if old_array.len() >= current_array.len() {
debug_assert_eq!(
current_array.calculate_bucket_index(self.hash(&old_entry.0)),
target_index
);
(
target_index,
u64::from(entry_ptr.partial_hash(&*old_writer)),
)
} else {
let hash = self.hash(&old_entry.0);
let new_index = current_array.calculate_bucket_index(hash);
debug_assert!(new_index - target_index < (current_array.len() / old_array.len()));
(new_index, hash)
};
while max_index <= new_index - target_index {
let target_bucket = current_array.bucket(max_index + target_index);
let writer = unsafe {
Writer::lock_async(target_bucket, sendable_guard)
.await
.unwrap_unchecked()
};
target_buckets[max_index].replace(writer);
max_index += 1;
}
let target_bucket = unsafe {
target_buckets[new_index - target_index]
.as_mut()
.unwrap_unchecked()
};
target_bucket.extract_from(
current_array.data_block(new_index),
hash,
&old_writer,
old_data_block,
&mut entry_ptr,
sendable_guard.guard(),
);
}
let validated = sendable_guard.check_ref(self.bucket_array(), current_array, Acquire);
old_writer.kill();
validated
}
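
    /// Moves every entry in the old bucket into the current bucket array and kills the old
    /// bucket, returning `false` if `TRY_LOCK` is `true` and a target bucket could not be locked
    /// instantly.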
fn relocate_bucket_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
old_array: &'g BucketArray<K, V, L, TYPE>,
old_index: usize,
old_writer: Writer<'g, K, V, L, TYPE>,
guard: &'g Guard,
) -> bool {
if old_writer.len() == 0 {
old_writer.kill();
return true;
}
let target_index =
Self::from_index_to_range(old_array.len(), current_array.len(), old_index).0;
let mut target_buckets: [Option<Writer<K, V, L, TYPE>>; MAX_RESIZE_FACTOR] =
Default::default();
let mut max_index = 0;
let mut entry_ptr = EntryPtr::new(guard);
let old_data_block = old_array.data_block(old_index);
while entry_ptr.move_to_next(&old_writer, guard) {
let old_entry = entry_ptr.get(old_data_block);
let (new_index, hash) = if old_array.len() >= current_array.len() {
debug_assert_eq!(
current_array.calculate_bucket_index(self.hash(&old_entry.0)),
target_index
);
(
target_index,
u64::from(entry_ptr.partial_hash(&*old_writer)),
)
} else {
let hash = self.hash(&old_entry.0);
let new_index = current_array.calculate_bucket_index(hash);
debug_assert!(new_index - target_index < (current_array.len() / old_array.len()));
(new_index, hash)
};
while max_index <= new_index - target_index {
let target_bucket = current_array.bucket(max_index + target_index);
let writer = if TRY_LOCK {
let Ok(writer) = Writer::try_lock(target_bucket) else {
return false;
};
writer
} else {
Writer::lock_sync(target_bucket)
};
target_buckets[max_index].replace(unsafe { writer.unwrap_unchecked() });
max_index += 1;
}
let target_bucket = unsafe {
target_buckets[new_index - target_index]
.as_mut()
.unwrap_unchecked()
};
target_bucket.extract_from(
current_array.data_block(new_index),
hash,
&old_writer,
old_data_block,
&mut entry_ptr,
guard,
);
}
old_writer.kill();
true
}
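
    /// Tries to claim the next chunk of `BUCKET_LEN` old-array buckets for rehashing.
    ///
    /// The rehashing metadata packs the index of the next chunk into its upper bits and the
    /// number of threads currently rehashing into its lower bits (masked by `BUCKET_LEN - 1`);
    /// `None` is returned if every chunk has already been claimed or the rehasher count is
    /// saturated.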
#[inline]
fn start_incremental_rehash(&self, old_array: &BucketArray<K, V, L, TYPE>) -> Option<usize> {
let rehashing_metadata = old_array.rehashing_metadata();
let mut current = rehashing_metadata.load(Relaxed);
loop {
if current >= old_array.len() || (current & (BUCKET_LEN - 1)) == BUCKET_LEN - 1 {
return None;
}
match rehashing_metadata.compare_exchange_weak(
current,
current + BUCKET_LEN + 1,
Acquire,
Relaxed,
) {
Ok(_) => {
current &= !(BUCKET_LEN - 1);
return Some(current);
}
Err(result) => current = result,
}
}
}
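
    /// Finishes rehashing the claimed chunk: on success the rehasher count is decremented and the
    /// old bucket array is dropped once every bucket has been rehashed; on failure the claimed
    /// range is rolled back so that another thread can retry it.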
#[inline]
fn end_incremental_rehash(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
old_array: &BucketArray<K, V, L, TYPE>,
prev: usize,
success: bool,
) {
let rehashing_metadata = old_array.rehashing_metadata();
if success {
let old_array_len = old_array.len();
let current = rehashing_metadata.fetch_sub(1, Release) - 1;
if (current & (BUCKET_LEN - 1) == 0) && current >= old_array_len {
current_array.drop_old_array();
}
} else {
let mut current = rehashing_metadata.load(Relaxed);
loop {
let new = if current <= prev {
current - 1
} else {
let ref_cnt = current & (BUCKET_LEN - 1);
prev | (ref_cnt - 1)
};
match rehashing_metadata.compare_exchange_weak(current, new, Release, Relaxed) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
}
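
    /// Asynchronously rehashes a chunk of `BUCKET_LEN` buckets from the old bucket array into the
    /// current one.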
async fn incremental_rehash_async<'g>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
sendable_guard: &'g SendableGuard,
) {
if let Some(old_array) = sendable_guard.load(current_array.old_array_ptr(), Acquire) {
let Some(current) = self.start_incremental_rehash(old_array) else {
return;
};
let mut rehashing_guard = ExitGuard::new((current, false), |(prev, success)| {
self.end_incremental_rehash(current_array, old_array, prev, success);
});
for index in current..(current + BUCKET_LEN).min(old_array.len()) {
let old_bucket = old_array.bucket(index);
let writer = Writer::lock_async(old_bucket, sendable_guard).await;
if let Some(writer) = writer {
self.relocate_bucket_async(
current_array,
old_array,
index,
writer,
sendable_guard,
)
.await;
}
debug_assert!(current_array.has_old_array());
}
rehashing_guard.1 = true;
}
}
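
    /// Rehashes a chunk of `BUCKET_LEN` buckets from the old bucket array into the current one,
    /// returning `true` if the old bucket array no longer exists.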
fn incremental_rehash_sync<'g, const TRY_LOCK: bool>(
&self,
current_array: &'g BucketArray<K, V, L, TYPE>,
guard: &'g Guard,
) -> bool {
if let Some(old_array) = current_array.old_array(guard).as_ref() {
let Some(current) = self.start_incremental_rehash(old_array) else {
return !current_array.has_old_array();
};
let mut rehashing_guard = ExitGuard::new((current, false), |(prev, success)| {
self.end_incremental_rehash(current_array, old_array, prev, success);
});
for index in current..(current + BUCKET_LEN).min(old_array.len()) {
let old_bucket = old_array.bucket(index);
let writer = if TRY_LOCK {
let Ok(writer) = Writer::try_lock(old_bucket) else {
return false;
};
writer
} else {
Writer::lock_sync(old_bucket)
};
if let Some(writer) = writer {
if !self.relocate_bucket_sync::<TRY_LOCK>(
current_array,
old_array,
index,
writer,
guard,
) {
return false;
}
}
}
rehashing_guard.1 = true;
}
!current_array.has_old_array()
}
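
    /// Samples the neighbouring buckets and triggers a resize if the sampled load factor exceeds
    /// roughly 7/8.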
fn try_enlarge(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
index: usize,
mut num_entries: usize,
guard: &Guard,
) {
if current_array.has_old_array() {
return;
}
let sample_size = current_array.sample_size();
let threshold = sample_size * (BUCKET_LEN / 8) * 7;
if num_entries > threshold
|| (1..sample_size).any(|i| {
num_entries += current_array
.bucket((index + i) % current_array.len())
.len();
num_entries > threshold
})
{
self.try_resize(current_array, index, guard);
}
}
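
    /// Samples the neighbouring buckets and triggers a resize if the hash table is sparse enough
    /// to be shrunk, or, when `TYPE == INDEX`, if enough buckets need to be rebuilt.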
fn try_shrink_or_rebuild(
&self,
current_array: &BucketArray<K, V, L, TYPE>,
index: usize,
guard: &Guard,
) {
if current_array.has_old_array() {
return;
}
if current_array.num_slots() > self.minimum_capacity().load(Relaxed).next_power_of_two()
|| TYPE == INDEX
{
let sample_size = current_array.sample_size();
let shrink_threshold = sample_size * BUCKET_LEN / 16;
let rebuild_threshold = sample_size / 2;
let mut num_entries = 0;
let mut num_buckets_to_rebuild = 0;
for i in 0..sample_size {
let bucket = current_array.bucket((index + i) % current_array.len());
num_entries += bucket.len();
if num_entries > shrink_threshold
&& (TYPE != INDEX
|| num_buckets_to_rebuild + (sample_size - i) < rebuild_threshold)
{
return;
}
if TYPE == INDEX && bucket.need_rebuild() {
if num_buckets_to_rebuild >= rebuild_threshold {
self.try_resize(current_array, index, guard);
return;
}
num_buckets_to_rebuild += 1;
}
}
if TYPE != INDEX || num_entries <= shrink_threshold {
self.try_resize(current_array, index, guard);
}
}
}
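
    /// Estimates the number of entries from a sample and enlarges, shrinks, rebuilds, or
    /// deallocates the bucket array accordingly; any new bucket array is populated incrementally
    /// by subsequent operations.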
fn try_resize(
&self,
sampled_array: &BucketArray<K, V, L, TYPE>,
sampling_index: usize,
guard: &Guard,
) {
let current_array_ptr = self.bucket_array().load(Acquire, guard);
if current_array_ptr.tag() != Tag::None {
return;
}
let Some(current_array) = current_array_ptr.as_ref() else {
return;
};
if !ptr::eq(current_array, sampled_array) {
return;
}
debug_assert!(!current_array.has_old_array());
let minimum_capacity = self.minimum_capacity().load(Relaxed);
let capacity = current_array.num_slots();
let sample_size = current_array.full_sample_size();
let estimated_num_entries = Self::sample(current_array, sampling_index, sample_size);
let new_capacity = if estimated_num_entries > (capacity / 16) * 13 {
if capacity == self.maximum_capacity() {
capacity
} else {
let mut new_capacity = capacity;
while new_capacity <= (estimated_num_entries / 8) * 15 {
if new_capacity == self.maximum_capacity() {
break;
}
if new_capacity / capacity == MAX_RESIZE_FACTOR {
break;
}
new_capacity *= 2;
}
new_capacity
}
} else if estimated_num_entries < capacity / 8 {
estimated_num_entries
.max(minimum_capacity)
.max(BucketArray::<K, V, L, TYPE>::minimum_capacity())
.next_power_of_two()
} else {
capacity
};
let try_resize = new_capacity != capacity;
let try_drop_table = estimated_num_entries == 0 && minimum_capacity == 0;
let try_rebuild = TYPE == INDEX
&& !try_resize
&& Self::check_rebuild(current_array, sampling_index, sample_size);
if !try_resize && !try_drop_table && !try_rebuild {
return;
}
if !self.bucket_array().update_tag_if(
Tag::First,
|ptr| ptr == current_array_ptr,
Relaxed,
Relaxed,
) {
return;
}
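
        // To drop the bucket array, try to lock every bucket and keep the locks alive by
        // forgetting the `Writer`s; the exit guard reconstructs them afterwards in order to
        // unlock the buckets, killing them if every one of them turned out to be empty.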
if try_drop_table {
let mut writer_guard = ExitGuard::new((0, false), |(len, success): (usize, bool)| {
for i in 0..len {
let writer = Writer::from_bucket(current_array.bucket(i));
if success {
writer.kill();
}
}
});
if !(0..current_array.len()).any(|i| {
if let Ok(Some(writer)) = Writer::try_lock(current_array.bucket(i)) {
if writer.len() == 0 {
writer_guard.0 = i + 1;
std::mem::forget(writer);
return false;
}
}
true
}) {
writer_guard.1 = true;
self.bucket_array().swap((None, Tag::None), Release);
return;
}
}
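
        // Publish a new bucket array when resizing or rebuilding; otherwise the exit guard merely
        // clears the resize tag. The current array is linked to the new one as its old array and
        // is rehashed into it incrementally.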
let allocated_array: Option<Shared<BucketArray<K, V, L, TYPE>>> = None;
let mut mutex_guard = ExitGuard::new(allocated_array, |allocated_array| {
if let Some(allocated_array) = allocated_array {
self.bucket_array()
.swap((Some(allocated_array), Tag::None), Release);
} else {
self.bucket_array()
.update_tag_if(Tag::None, |_| true, Release, Relaxed);
}
});
if try_resize || try_rebuild {
mutex_guard.replace(unsafe {
Shared::new_unchecked(BucketArray::<K, V, L, TYPE>::new(
new_capacity,
self.bucket_array().clone(Relaxed, guard),
))
});
}
}
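
    /// Checks whether the bucket from which an entry was just removed is now empty, and if so,
    /// whether the hash table should be shrunk.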
#[inline]
fn entry_removed(&self, index: usize, guard: &Guard) {
if let Some(current_array) = self.bucket_array().load(Acquire, guard).as_ref() {
if current_array.len() > index && current_array.bucket(index).len() == 0 {
self.try_shrink_or_rebuild(current_array, index, guard);
}
}
}
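
    /// Derives an initial capacity from an iterator size hint, adding 25% headroom.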
#[inline]
fn capacity_from_size_hint(size_hint: (usize, Option<usize>)) -> usize {
(size_hint
.1
.unwrap_or(size_hint.0)
.min(1_usize << (usize::BITS - 2))
/ 4)
* 5
}
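
    /// Prolongs the lifetime of the supplied [`Guard`] reference to that of `self`.
    ///
    /// The caller must make sure that no reference derived from the returned [`Guard`] outlives
    /// the original one.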
#[inline]
fn prolonged_guard_ref<'h>(&'h self, guard: &Guard) -> &'h Guard {
let _: &Self = self;
unsafe { std::mem::transmute::<&Guard, &'h Guard>(guard) }
}
}
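
/// [`LockedEntry`] holds an exclusively locked bucket together with an [`EntryPtr`] into its data
/// block.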
pub(super) struct LockedEntry<'h, K, V, L: LruList, const TYPE: char> {
pub(super) writer: Writer<'h, K, V, L, TYPE>,
pub(super) data_block: &'h DataBlock<K, V, BUCKET_LEN>,
pub(super) entry_ptr: EntryPtr<'h, K, V, TYPE>,
pub(super) index: usize,
pub(super) len: usize,
}
impl<'h, K: Eq + Hash + 'h, V: 'h, L: LruList, const TYPE: char> LockedEntry<'h, K, V, L, TYPE> {
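    /// Creates a new [`LockedEntry`], dropping removed yet unreachable entries first when
    /// `TYPE == INDEX`.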
#[inline]
pub(super) fn new(
writer: Writer<'h, K, V, L, TYPE>,
data_block: &'h DataBlock<K, V, BUCKET_LEN>,
entry_ptr: EntryPtr<'h, K, V, TYPE>,
index: usize,
len: usize,
guard: &Guard,
) -> LockedEntry<'h, K, V, L, TYPE> {
if TYPE == INDEX {
writer.drop_removed_unreachable_entries(data_block, guard);
}
LockedEntry {
writer,
data_block,
entry_ptr,
index,
len,
}
}
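
    /// Prolongs the lifetime of the [`LockedEntry`] to that of the supplied reference.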
#[inline]
    pub(super) fn prolong_lifetime<'e, T>(self, _ref: &'e T) -> LockedEntry<'e, K, V, L, TYPE> {
        // SAFETY: the two types only differ in their lifetime parameters, and the caller
        // guarantees that the hash table referenced by `_ref` outlives the returned entry.
        unsafe { std::mem::transmute::<Self, LockedEntry<'e, K, V, L, TYPE>>(self) }
    }
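
    /// Asynchronously moves on to the next entry, releasing the current lock and scanning the
    /// subsequent buckets once the current bucket is exhausted.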
#[inline]
pub(super) async fn next_async<H: BuildHasher, T: HashTable<K, V, H, L, TYPE>>(
mut self,
hash_table: &'h T,
) -> Option<LockedEntry<'h, K, V, L, TYPE>> {
if self
.entry_ptr
.move_to_next(&self.writer, hash_table.prolonged_guard_ref(&Guard::new()))
{
return Some(self);
}
let try_shrink = self.writer.len() == 0;
let sendable_guard = SendableGuard::default();
let next_index = self.index + 1;
let len = self.len;
drop(self);
if try_shrink {
hash_table.entry_removed(next_index - 1, sendable_guard.guard());
}
if next_index == len {
return None;
}
let mut next_entry = None;
hash_table
.for_each_writer_async(
next_index,
len,
&sendable_guard,
|writer, data_block, index, len| {
let guard = sendable_guard.guard();
let mut entry_ptr = EntryPtr::new(guard);
if entry_ptr.move_to_next(&writer, guard) {
let locked_entry =
LockedEntry::new(writer, data_block, entry_ptr, index, len, guard)
.prolong_lifetime(hash_table);
next_entry = Some(locked_entry);
return (true, false);
}
(false, false)
},
)
.await;
next_entry
}
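
    /// Moves on to the next entry, releasing the current lock and scanning the subsequent buckets
    /// once the current bucket is exhausted.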
#[inline]
pub(super) fn next_sync<H: BuildHasher, T: HashTable<K, V, H, L, TYPE>>(
mut self,
hash_table: &'h T,
) -> Option<Self> {
if self
.entry_ptr
.move_to_next(&self.writer, hash_table.prolonged_guard_ref(&Guard::new()))
{
return Some(self);
}
let try_shrink = self.writer.len() == 0;
let guard = Guard::new();
let next_index = self.index + 1;
let len = self.len;
drop(self);
if try_shrink {
hash_table.entry_removed(next_index - 1, &guard);
}
if next_index == len {
return None;
}
let mut next_entry = None;
hash_table.for_each_writer_sync(
next_index,
len,
&guard,
|writer, data_block, index, len| {
let mut entry_ptr = EntryPtr::new(&guard);
if entry_ptr.move_to_next(&writer, &guard) {
let locked_entry =
LockedEntry::new(writer, data_block, entry_ptr, index, len, &guard)
.prolong_lifetime(hash_table);
next_entry = Some(locked_entry);
return (true, false);
}
(false, false)
},
);
next_entry
}
}
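
// SAFETY: a `LockedEntry` only refers to keys and values stored in the hash table, hence it can
// be sent to another thread as long as `K` and `V` are `Send`.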
unsafe impl<K: Eq + Hash + Send, V: Send, L: LruList, const TYPE: char> Send
for LockedEntry<'_, K, V, L, TYPE>
{
}
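
// SAFETY: a `LockedEntry` only exposes references to keys and values, hence sharing it across
// threads is sound as long as `K` and `V` are `Send` and `Sync`.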
unsafe impl<K: Eq + Hash + Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for LockedEntry<'_, K, V, L, TYPE>
{
}