use crate::ebr::{Arc, AtomicArc, Barrier, Ptr, Tag};
use crate::wait_queue::{AsyncWait, WaitQueue};
use std::borrow::Borrow;
use std::mem::{needs_drop, MaybeUninit};
use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{fence, AtomicU32};
/// Fixed-capacity entry storage for a bucket; each slot may be uninitialized,
/// so occupancy is tracked separately via `Metadata::occupied_bitmap`.
pub type DataBlock<K, V, const LEN: usize> = [MaybeUninit<(K, V)>; LEN];

/// Number of entry slots in a main bucket.
pub const BUCKET_LEN: usize = 32;

/// Number of entry slots in an overflow `LinkedBucket`.
const LINKED_LEN: usize = BUCKET_LEN / 4;

/// State bit: the bucket has been killed and must no longer be used.
const KILLED: u32 = 1_u32 << 31;
/// State bit: at least one thread/task is waiting on the bucket's wait queue.
const WAITING: u32 = 1_u32 << 30;
/// State bit: the bucket is exclusively locked.
const LOCK: u32 = 1_u32 << 29;
/// Maximum value of the shared (reader) lock counter.
const SLOCK_MAX: u32 = LOCK - 1;
/// Mask covering the exclusive-lock bit and the shared-lock counter.
const LOCK_MASK: u32 = LOCK | SLOCK_MAX;
/// A fixed-capacity hash bucket that spills into a chain of `LinkedBucket`s
/// when full. `LOCK_FREE` selects whether readers run without taking the
/// bucket lock (removed entries are then only logically deleted via
/// `removed_bitmap`).
pub(crate) struct Bucket<K: Eq, V, const LOCK_FREE: bool> {
    /// Lock state: `KILLED` | `WAITING` | `LOCK` bits plus the shared-lock counter.
    state: AtomicU32,
    /// Number of entries, including those in linked overflow buckets.
    num_entries: u32,
    /// Occupancy/removal bitmaps, partial hashes, and the overflow link.
    metadata: Metadata<K, V, BUCKET_LEN>,
    /// Queue of threads/tasks waiting for the bucket lock.
    wait_queue: WaitQueue,
}
impl<K: Eq, V, const LOCK_FREE: bool> Default for Bucket<K, V, LOCK_FREE> {
    /// Creates an empty, unlocked bucket with no waiters and no entries.
    fn default() -> Self {
        Self {
            wait_queue: WaitQueue::default(),
            metadata: Metadata::default(),
            num_entries: 0,
            state: AtomicU32::new(0),
        }
    }
}
impl<K: Eq, V, const LOCK_FREE: bool> Bucket<K, V, LOCK_FREE> {
#[inline]
pub(crate) fn killed(&self) -> bool {
(self.state.load(Relaxed) & KILLED) == KILLED
}
#[inline]
pub(crate) fn num_entries(&self) -> usize {
self.num_entries as usize
}
#[inline]
pub(crate) fn need_rebuild(&self) -> bool {
self.metadata.removed_bitmap == (u32::MAX >> (32 - BUCKET_LEN))
}
#[inline]
pub(crate) fn search<'b, Q>(
&'b self,
data_block: &'b DataBlock<K, V, BUCKET_LEN>,
key: &Q,
partial_hash: u8,
barrier: &'b Barrier,
) -> Option<&'b (K, V)>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
if self.num_entries == 0 {
return None;
}
if let Some((_, entry_ref)) =
Self::search_entry(&self.metadata, data_block, key, partial_hash)
{
return Some(entry_ref);
}
let mut link_ptr = self.metadata.link.load(Acquire, barrier);
while let Some(link) = link_ptr.as_ref() {
if let Some((_, entry_ref)) =
Self::search_entry(&link.metadata, &link.data_block, key, partial_hash)
{
return Some(entry_ref);
}
link_ptr = link.metadata.link.load(Acquire, barrier);
}
None
}
#[inline]
pub(crate) unsafe fn drop_entries(
&mut self,
data_block: &DataBlock<K, V, BUCKET_LEN>,
barrier: &Barrier,
) {
if !self.metadata.link.load(Relaxed, barrier).is_null() {
self.clear_links(barrier);
}
if needs_drop::<(K, V)>() && self.metadata.occupied_bitmap != 0 {
let mut index = self.metadata.occupied_bitmap.trailing_zeros();
while index != 32 {
ptr::drop_in_place(data_block[index as usize].as_ptr() as *mut (K, V));
self.metadata.occupied_bitmap -= 1_u32 << index;
index = self.metadata.occupied_bitmap.trailing_zeros();
}
}
}
#[inline]
fn get<'b, Q>(
&self,
data_block: &'b DataBlock<K, V, BUCKET_LEN>,
key: &Q,
partial_hash: u8,
barrier: &'b Barrier,
) -> EntryPtr<'b, K, V, LOCK_FREE>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
if self.num_entries == 0 {
return EntryPtr::new(barrier);
}
if let Some((index, _)) = Self::search_entry(&self.metadata, data_block, key, partial_hash)
{
return EntryPtr {
current_link_ptr: Ptr::null(),
current_index: index,
};
}
let mut current_link_ptr = self.metadata.link.load(Acquire, barrier);
while let Some(link) = current_link_ptr.as_ref() {
if let Some((index, _)) =
Self::search_entry(&link.metadata, &link.data_block, key, partial_hash)
{
return EntryPtr {
current_link_ptr,
current_index: index,
};
}
current_link_ptr = link.metadata.link.load(Acquire, barrier);
}
EntryPtr::new(barrier)
}
#[allow(clippy::cast_possible_truncation)]
fn search_entry<'b, Q, const LEN: usize>(
metadata: &'b Metadata<K, V, LEN>,
data_block: &'b DataBlock<K, V, LEN>,
key: &Q,
partial_hash: u8,
) -> Option<(usize, &'b (K, V))>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
let mut bitmap = if LOCK_FREE {
metadata.occupied_bitmap & (!metadata.removed_bitmap)
} else {
metadata.occupied_bitmap
};
if LOCK_FREE {
fence(Acquire);
}
let preferred_index = usize::from(partial_hash) % LEN;
if (bitmap & (1_u32 << preferred_index)) != 0
&& metadata.partial_hash_array[preferred_index] == partial_hash
{
let entry_ref = unsafe { &(*data_block[preferred_index].as_ptr()) };
if entry_ref.0.borrow() == key {
return Some((preferred_index, entry_ref));
}
}
bitmap = if LEN == BUCKET_LEN {
debug_assert_eq!(LEN, 32);
bitmap.rotate_right(preferred_index as u32) & (!1_u32)
} else {
debug_assert_eq!(LEN, 8);
u32::from((bitmap as u8).rotate_right(preferred_index as u32) & (!1_u8))
};
let mut offset = bitmap.trailing_zeros();
while offset != 32 {
let index = (preferred_index + offset as usize) % LEN;
if metadata.partial_hash_array[index] == partial_hash {
let entry_ref = unsafe { &(*data_block[index].as_ptr()) };
if entry_ref.0.borrow() == key {
return Some((index, entry_ref));
}
}
bitmap -= 1_u32 << offset;
offset = bitmap.trailing_zeros();
}
None
}
fn next_entry<Q, const LEN: usize>(
metadata: &Metadata<K, V, LEN>,
current_index: usize,
) -> Option<usize>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
if current_index >= LEN {
return None;
}
let bitmap = if LOCK_FREE {
(metadata.occupied_bitmap & (!metadata.removed_bitmap))
& (!((1_u32 << current_index) - 1))
} else {
metadata.occupied_bitmap & (!((1_u32 << current_index) - 1))
};
if LOCK_FREE {
fence(Acquire);
}
let next_index = bitmap.trailing_zeros() as usize;
if next_index < LEN {
return Some(next_index);
}
None
}
fn clear_links(&mut self, barrier: &Barrier) {
if let (Some(mut next), _) = self.metadata.link.swap((None, Tag::None), Acquire) {
loop {
let next_next = next.metadata.link.swap((None, Tag::None), Acquire);
let released = if LOCK_FREE {
next.release(barrier)
} else {
unsafe { next.release_drop_in_place() }
};
debug_assert!(released);
if let (Some(next_next), _) = next_next {
next = next_next;
} else {
break;
}
}
}
}
}
/// A cursor over the entries of a `Bucket` and its linked overflow buckets.
pub struct EntryPtr<'b, K: Eq, V, const LOCK_FREE: bool> {
    /// Null while iterating the main bucket; otherwise points at the linked
    /// bucket currently being iterated.
    current_link_ptr: Ptr<'b, LinkedBucket<K, V, LINKED_LEN>>,
    /// Slot index in the current bucket; `BUCKET_LEN` before iteration
    /// starts, `usize::MAX` once iteration is exhausted.
    current_index: usize,
}
impl<'b, K: Eq, V, const LOCK_FREE: bool> EntryPtr<'b, K, V, LOCK_FREE> {
    /// Creates a new `EntryPtr` positioned before the first entry.
    #[inline]
    pub(crate) fn new(_barrier: &'b Barrier) -> EntryPtr<'b, K, V, LOCK_FREE> {
        EntryPtr {
            current_link_ptr: Ptr::null(),
            // `BUCKET_LEN` is the "not started" sentinel.
            current_index: BUCKET_LEN,
        }
    }

    /// Returns `true` if the pointer is not at the initial sentinel position.
    /// NOTE(review): an exhausted pointer has `current_index == usize::MAX`,
    /// which also reports `true` here — confirm callers only use this before
    /// iterating.
    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        self.current_index != BUCKET_LEN
    }

    /// Advances to the next entry, moving from the main bucket into the
    /// linked-bucket chain as needed; returns `false` when exhausted.
    #[inline]
    pub(crate) fn next(&mut self, bucket: &Bucket<K, V, LOCK_FREE>, barrier: &'b Barrier) -> bool {
        if self.current_index != usize::MAX {
            if self.current_link_ptr.is_null() && self.next_entry(&bucket.metadata, barrier) {
                return true;
            }
            // `next_entry` advances `current_link_ptr` on failure, so this
            // loop walks the linked-bucket chain.
            while let Some(link) = self.current_link_ptr.as_ref() {
                if self.next_entry(&link.metadata, barrier) {
                    return true;
                }
            }
            // Exhausted: mark the pointer unusable.
            self.current_index = usize::MAX;
        }
        false
    }

    /// Returns a shared reference to the entry currently pointed at.
    #[inline]
    pub(crate) fn get(&self, data_block: &'b DataBlock<K, V, BUCKET_LEN>) -> &'b (K, V) {
        debug_assert_ne!(self.current_index, usize::MAX);
        let entry_ptr = if let Some(link) = self.current_link_ptr.as_ref() {
            link.data_block[self.current_index].as_ptr()
        } else {
            data_block[self.current_index].as_ptr()
        };
        unsafe { &(*entry_ptr) }
    }

    /// Returns a mutable reference to the entry currently pointed at.
    /// Mutable access is justified by `_locker`, which witnesses that the
    /// caller holds the bucket's exclusive lock.
    #[allow(clippy::mut_from_ref)]
    #[inline]
    pub(crate) fn get_mut(
        &mut self,
        data_block: &'b DataBlock<K, V, BUCKET_LEN>,
        _locker: &mut Locker<K, V, LOCK_FREE>,
    ) -> &'b mut (K, V) {
        debug_assert_ne!(self.current_index, usize::MAX);
        let entry_ptr = if let Some(link) = self.current_link_ptr.as_ref() {
            link.data_block[self.current_index].as_ptr() as *mut (K, V)
        } else {
            data_block[self.current_index].as_ptr() as *mut (K, V)
        };
        unsafe { &mut (*entry_ptr) }
    }

    /// Returns the partial hash recorded for the entry currently pointed at.
    #[inline]
    pub(crate) fn partial_hash(&self, bucket: &Bucket<K, V, LOCK_FREE>) -> u8 {
        debug_assert_ne!(self.current_index, usize::MAX);
        if let Some(link) = self.current_link_ptr.as_ref() {
            link.metadata.partial_hash_array[self.current_index]
        } else {
            bucket.metadata.partial_hash_array[self.current_index]
        }
    }

    /// Removes `link` from the linked-bucket chain, stitching its neighbours
    /// together, and repositions the pointer at the following link (or marks
    /// it exhausted if there is none).
    fn unlink(
        &mut self,
        locker: &Locker<K, V, LOCK_FREE>,
        link: &LinkedBucket<K, V, LINKED_LEN>,
        barrier: &'b Barrier,
    ) {
        let prev_link_ptr = link.prev_link.load(Relaxed);
        // In lock-free mode the successor must stay alive for concurrent
        // readers, so take an extra reference instead of detaching it.
        let next_link = if LOCK_FREE {
            link.metadata.link.get_arc(Relaxed, barrier)
        } else {
            link.metadata.link.swap((None, Tag::None), Relaxed).0
        };
        if let Some(next_link) = next_link.as_ref() {
            next_link.prev_link.store(prev_link_ptr, Relaxed);
        }
        self.current_link_ptr = next_link
            .as_ref()
            .map_or_else(Ptr::null, |n| n.ptr(barrier));
        // Point the predecessor (or the bucket head) past the removed link.
        let old_link = if let Some(prev_link) = unsafe { prev_link_ptr.as_ref() } {
            prev_link
                .metadata
                .link
                .swap((next_link, Tag::None), Relaxed)
                .0
        } else {
            locker
                .bucket
                .metadata
                .link
                .swap((next_link, Tag::None), Relaxed)
                .0
        };
        let released = old_link.map_or(true, |l| {
            if LOCK_FREE {
                l.release(barrier)
            } else {
                unsafe { l.release_drop_in_place() }
            }
        });
        debug_assert!(released);
        if self.current_link_ptr.is_null() {
            // No more links: the pointer is exhausted.
            self.current_index = usize::MAX;
        } else {
            // `LINKED_LEN` is the "not started" sentinel for a linked bucket.
            self.current_index = LINKED_LEN;
        }
    }

    /// Advances within the bucket described by `metadata`; on failure, moves
    /// `current_link_ptr` to the next linked bucket and returns `false`.
    fn next_entry<const LEN: usize>(
        &mut self,
        metadata: &Metadata<K, V, LEN>,
        barrier: &'b Barrier,
    ) -> bool {
        // `LEN` doubles as the "not started" sentinel, mapping to index 0.
        let current_index = if self.current_index == LEN {
            0
        } else {
            self.current_index + 1
        };
        if let Some(index) = Bucket::<K, V, LOCK_FREE>::next_entry(metadata, current_index) {
            self.current_index = index;
            return true;
        }
        self.current_link_ptr = metadata.link.load(Acquire, barrier);
        self.current_index = LINKED_LEN;
        false
    }
}
/// An RAII guard holding the exclusive lock on a `Bucket`; the lock is
/// released (and waiters signalled) on drop.
pub struct Locker<'b, K: Eq, V, const LOCK_FREE: bool> {
    bucket: &'b mut Bucket<K, V, LOCK_FREE>,
}
impl<'b, K: Eq, V, const LOCK_FREE: bool> Locker<'b, K, V, LOCK_FREE> {
    /// Acquires the exclusive lock, blocking on the wait queue until it
    /// succeeds; returns `None` if the bucket has been killed.
    #[inline]
    pub(crate) fn lock(
        bucket: &'b mut Bucket<K, V, LOCK_FREE>,
        barrier: &'b Barrier,
    ) -> Option<Locker<'b, K, V, LOCK_FREE>> {
        // The raw pointer lets `try_lock` take `&mut` repeatedly inside the
        // retry loop without the borrow checker rejecting the reborrows.
        let bucket_ptr = bucket as *mut Bucket<K, V, LOCK_FREE>;
        loop {
            if let Ok(locker) = Self::try_lock(unsafe { &mut *bucket_ptr }, barrier) {
                return locker;
            }
            if let Ok(locker) = unsafe { &*bucket_ptr }.wait_queue.wait_sync(|| {
                // Announce the waiter before retrying so the unlocking thread
                // knows to signal the wait queue.
                bucket.state.fetch_or(WAITING, Release);
                Self::try_lock(unsafe { &mut *bucket_ptr }, barrier)
            }) {
                return locker;
            }
        }
    }

    /// Attempts to acquire the exclusive lock without waiting.
    /// Returns `Ok(None)` if the bucket is killed, `Err(())` if contended.
    #[inline]
    pub(crate) fn try_lock(
        bucket: &'b mut Bucket<K, V, LOCK_FREE>,
        _barrier: &'b Barrier,
    ) -> Result<Option<Locker<'b, K, V, LOCK_FREE>>, ()> {
        // Snapshot the state with all lock bits cleared: the CAS below only
        // succeeds if no reader or writer currently holds the bucket.
        let current = bucket.state.load(Relaxed) & (!LOCK_MASK);
        if (current & KILLED) == KILLED {
            return Ok(None);
        }
        if bucket
            .state
            .compare_exchange(current, current | LOCK, Acquire, Relaxed)
            .is_ok()
        {
            Ok(Some(Locker { bucket }))
        } else {
            Err(())
        }
    }

    /// Attempts to acquire the exclusive lock; on contention, registers
    /// `async_wait` on the wait queue instead of blocking.
    #[inline]
    pub(crate) fn try_lock_or_wait(
        bucket: &'b mut Bucket<K, V, LOCK_FREE>,
        async_wait: &mut AsyncWait,
        barrier: &'b Barrier,
    ) -> Result<Option<Locker<'b, K, V, LOCK_FREE>>, ()> {
        let bucket_ptr = bucket as *mut Bucket<K, V, LOCK_FREE>;
        if let Ok(locker) = Self::try_lock(unsafe { &mut *bucket_ptr }, barrier) {
            return Ok(locker);
        }
        unsafe { &*bucket_ptr }
            .wait_queue
            .push_async_entry(async_wait, || {
                bucket.state.fetch_or(WAITING, Release);
                Self::try_lock(bucket, barrier)
            })
    }

    /// Returns a shared reference to the locked bucket.
    #[inline]
    pub(crate) fn bucket(&self) -> &Bucket<K, V, LOCK_FREE> {
        self.bucket
    }

    /// Returns an [`EntryPtr`] to the entry matching `key`, if any.
    #[inline]
    pub(crate) fn get<Q>(
        &self,
        data_block: &'b DataBlock<K, V, BUCKET_LEN>,
        key: &Q,
        partial_hash: u8,
        barrier: &'b Barrier,
    ) -> EntryPtr<'b, K, V, LOCK_FREE>
    where
        K: Borrow<Q>,
        Q: Eq + ?Sized,
    {
        self.bucket.get(data_block, key, partial_hash, barrier)
    }

    /// Inserts the entry produced by `constructor` and returns an
    /// [`EntryPtr`] to it, spilling into (or creating) a linked overflow
    /// bucket if the main bucket is full.
    #[inline]
    pub(crate) fn insert_with<C: FnOnce() -> (K, V)>(
        &mut self,
        data_block: &'b DataBlock<K, V, BUCKET_LEN>,
        partial_hash: u8,
        constructor: C,
        barrier: &'b Barrier,
    ) -> EntryPtr<'b, K, V, LOCK_FREE> {
        assert!(self.bucket.num_entries != u32::MAX, "array overflow");
        let free_index = Self::get_free_index::<BUCKET_LEN>(
            self.bucket.metadata.occupied_bitmap,
            partial_hash as usize % BUCKET_LEN,
        );
        if free_index == BUCKET_LEN {
            // Main bucket full: look for space in the linked-bucket chain.
            let preferred_index = partial_hash as usize % LINKED_LEN;
            let mut link_ptr = self.bucket.metadata.link.load(Acquire, barrier);
            while let Some(link_mut) =
                unsafe { (link_ptr.as_raw() as *mut LinkedBucket<K, V, LINKED_LEN>).as_mut() }
            {
                let free_index = Self::get_free_index::<LINKED_LEN>(
                    link_mut.metadata.occupied_bitmap,
                    preferred_index,
                );
                if free_index != LINKED_LEN {
                    Self::insert_entry_with(
                        &mut link_mut.metadata,
                        &link_mut.data_block,
                        free_index,
                        partial_hash,
                        constructor,
                    );
                    self.bucket.num_entries += 1;
                    return EntryPtr {
                        current_link_ptr: link_ptr,
                        current_index: free_index,
                    };
                }
                link_ptr = link_mut.metadata.link.load(Acquire, barrier);
            }
            // No space anywhere: prepend a fresh linked bucket to the chain
            // and place the new entry in its preferred slot.
            let head = self.bucket.metadata.link.get_arc(Relaxed, barrier);
            let link = unsafe { Arc::new_unchecked(LinkedBucket::new(head)) };
            let link_ptr = link.ptr(barrier);
            unsafe {
                let link_mut = &mut *(link_ptr.as_raw() as *mut LinkedBucket<K, V, LINKED_LEN>);
                link_mut.data_block[preferred_index]
                    .as_mut_ptr()
                    .write(constructor());
                link_mut.metadata.partial_hash_array[preferred_index] = partial_hash;
                link_mut.metadata.occupied_bitmap |= 1_u32 << preferred_index;
            }
            // Back-link the old head to the new one before publishing it.
            if let Some(head) = link.metadata.link.load(Relaxed, barrier).as_ref() {
                head.prev_link.store(
                    link.as_ptr() as *mut LinkedBucket<K, V, LINKED_LEN>,
                    Relaxed,
                );
            }
            self.bucket
                .metadata
                .link
                .swap((Some(link), Tag::None), Release);
            self.bucket.num_entries += 1;
            EntryPtr {
                current_link_ptr: link_ptr,
                current_index: preferred_index,
            }
        } else {
            Self::insert_entry_with(
                &mut self.bucket.metadata,
                data_block,
                free_index,
                partial_hash,
                constructor,
            );
            self.bucket.num_entries += 1;
            EntryPtr {
                current_link_ptr: Ptr::null(),
                current_index: free_index,
            }
        }
    }

    /// Removes the entry pointed at by `entry_ptr`.
    ///
    /// In lock-free mode the entry is only marked removed (readers may still
    /// reference it) and `None` is returned; otherwise the slot is freed and
    /// the entry is moved out.
    #[inline]
    pub(crate) fn erase(
        &mut self,
        data_block: &DataBlock<K, V, BUCKET_LEN>,
        entry_ptr: &mut EntryPtr<K, V, LOCK_FREE>,
    ) -> Option<(K, V)> {
        debug_assert_ne!(entry_ptr.current_index, usize::MAX);
        self.bucket.num_entries -= 1;
        let link_ptr = entry_ptr.current_link_ptr.as_raw() as *mut LinkedBucket<K, V, LINKED_LEN>;
        if let Some(link_mut) = unsafe { link_ptr.as_mut() } {
            if LOCK_FREE {
                debug_assert_eq!(
                    link_mut.metadata.removed_bitmap & (1_u32 << entry_ptr.current_index),
                    0
                );
                link_mut.metadata.removed_bitmap |= 1_u32 << entry_ptr.current_index;
            } else {
                debug_assert_ne!(
                    link_mut.metadata.occupied_bitmap & (1_u32 << entry_ptr.current_index),
                    0
                );
                link_mut.metadata.occupied_bitmap &= !(1_u32 << entry_ptr.current_index);
                return Some(unsafe {
                    link_mut.data_block[entry_ptr.current_index].as_ptr().read()
                });
            }
        } else if LOCK_FREE {
            debug_assert_eq!(
                self.bucket.metadata.removed_bitmap & (1_u32 << entry_ptr.current_index),
                0
            );
            self.bucket.metadata.removed_bitmap |= 1_u32 << entry_ptr.current_index;
        } else {
            debug_assert_ne!(
                self.bucket.metadata.occupied_bitmap & (1_u32 << entry_ptr.current_index),
                0
            );
            self.bucket.metadata.occupied_bitmap &= !(1_u32 << entry_ptr.current_index);
            return Some(unsafe { data_block[entry_ptr.current_index].as_ptr().read() });
        }
        None
    }

    /// Moves the entry pointed at by `entry_ptr` out of the bucket,
    /// unlinking the containing linked bucket if it becomes empty.
    /// Only valid in non-lock-free mode.
    #[inline]
    pub(crate) fn extract<'e>(
        &mut self,
        data_block: &DataBlock<K, V, BUCKET_LEN>,
        entry_ptr: &mut EntryPtr<'e, K, V, LOCK_FREE>,
        barrier: &'e Barrier,
    ) -> (K, V) {
        debug_assert!(!LOCK_FREE);
        let link_ptr = entry_ptr.current_link_ptr.as_raw() as *mut LinkedBucket<K, V, LINKED_LEN>;
        if let Some(link_mut) = unsafe { link_ptr.as_mut() } {
            let extracted = Self::extract_entry(
                &mut link_mut.metadata,
                &link_mut.data_block,
                entry_ptr.current_index,
                &mut self.bucket.num_entries,
            );
            if link_mut.metadata.occupied_bitmap == 0 {
                // The linked bucket is now empty: remove it from the chain.
                entry_ptr.unlink(self, link_mut, barrier);
            }
            extracted
        } else {
            Self::extract_entry(
                &mut self.bucket.metadata,
                data_block,
                entry_ptr.current_index,
                &mut self.bucket.num_entries,
            )
        }
    }

    /// Kills the bucket: marks it `KILLED`, zeroes the entry count, and
    /// releases the linked-bucket chain. In lock-free mode, all occupied
    /// entries are first marked removed so readers stop returning them.
    #[inline]
    pub(crate) fn purge(&mut self, barrier: &Barrier) {
        if LOCK_FREE {
            self.bucket.metadata.removed_bitmap = self.bucket.metadata.occupied_bitmap;
        }
        self.bucket.state.fetch_or(KILLED, Release);
        self.bucket.num_entries = 0;
        if !self.bucket.metadata.link.load(Relaxed, barrier).is_null() {
            self.bucket.clear_links(barrier);
        }
    }

    /// Returns the first free slot at or after `preferred_index`, wrapping to
    /// the first free slot overall if none; returns `LEN` when full.
    fn get_free_index<const LEN: usize>(bitmap: u32, preferred_index: usize) -> usize {
        let mut free_index = (bitmap | ((1_u32 << preferred_index) - 1)).trailing_ones() as usize;
        if free_index == LEN {
            free_index = bitmap.trailing_ones() as usize;
        }
        free_index
    }

    /// Writes the constructed entry into slot `index` and records its partial
    /// hash, publishing the occupancy bit last.
    fn insert_entry_with<C: FnOnce() -> (K, V), const LEN: usize>(
        metadata: &mut Metadata<K, V, LEN>,
        data_block: &DataBlock<K, V, LEN>,
        index: usize,
        partial_hash: u8,
        constructor: C,
    ) {
        debug_assert!(index < LEN);
        unsafe {
            (data_block[index].as_ptr() as *mut (K, V)).write(constructor());
            metadata.partial_hash_array[index] = partial_hash;
            if LOCK_FREE {
                // Pairs with the `Acquire` fences in the lock-free readers:
                // the payload must be visible before the occupancy bit.
                fence(Release);
            }
            metadata.occupied_bitmap |= 1_u32 << index;
        }
    }

    /// Clears slot `index`, decrements the entry counter, and moves the entry
    /// out of the data block.
    fn extract_entry<const LEN: usize>(
        metadata: &mut Metadata<K, V, LEN>,
        data_block: &DataBlock<K, V, LEN>,
        index: usize,
        num_entries_field: &mut u32,
    ) -> (K, V) {
        debug_assert!(index < LEN);
        *num_entries_field -= 1;
        metadata.occupied_bitmap &= !(1_u32 << index);
        let entry_ptr = data_block[index].as_ptr() as *mut (K, V);
        unsafe { ptr::read(entry_ptr) }
    }
}
impl<'b, K: Eq, V, const LOCK_FREE: bool> Drop for Locker<'b, K, V, LOCK_FREE> {
    /// Releases the exclusive lock, clearing `WAITING` and signalling the
    /// wait queue if anyone was waiting.
    #[inline]
    fn drop(&mut self) {
        let mut current = self.bucket.state.load(Relaxed);
        loop {
            let wakeup = (current & WAITING) == WAITING;
            match self.bucket.state.compare_exchange(
                current,
                // Clear both the lock bit and the waiting flag atomically.
                current & (!(WAITING | LOCK)),
                Release,
                Relaxed,
            ) {
                Ok(_) => {
                    if wakeup {
                        self.bucket.wait_queue.signal();
                    }
                    break;
                }
                // CAS failed: retry with the freshly observed state.
                Err(result) => current = result,
            }
        }
    }
}
/// An RAII guard holding a shared (read) lock on a `Bucket`; the shared-lock
/// counter is decremented on drop.
pub struct Reader<'b, K: Eq, V, const LOCK_FREE: bool> {
    bucket: &'b Bucket<K, V, LOCK_FREE>,
}
impl<'b, K: Eq, V, const LOCK_FREE: bool> Reader<'b, K, V, LOCK_FREE> {
    /// Acquires a shared lock, blocking on the wait queue until it succeeds;
    /// returns `None` if the bucket has been killed.
    #[inline]
    pub(crate) fn lock(
        bucket: &'b Bucket<K, V, LOCK_FREE>,
        barrier: &'b Barrier,
    ) -> Option<Reader<'b, K, V, LOCK_FREE>> {
        loop {
            if let Ok(reader) = Self::try_lock(bucket, barrier) {
                return reader;
            }
            if let Ok(reader) = bucket.wait_queue.wait_sync(|| {
                // Announce the waiter before retrying so the unlocking side
                // knows to signal the wait queue.
                bucket.state.fetch_or(WAITING, Release);
                Self::try_lock(bucket, barrier)
            }) {
                return reader;
            }
        }
    }

    /// Attempts to acquire a shared lock; on contention, registers
    /// `async_wait` on the wait queue instead of blocking.
    #[inline]
    pub(crate) fn try_lock_or_wait(
        bucket: &'b Bucket<K, V, LOCK_FREE>,
        async_wait: &mut AsyncWait,
        barrier: &'b Barrier,
    ) -> Result<Option<Reader<'b, K, V, LOCK_FREE>>, ()> {
        if let Ok(reader) = Self::try_lock(bucket, barrier) {
            return Ok(reader);
        }
        bucket.wait_queue.push_async_entry(async_wait, || {
            bucket.state.fetch_or(WAITING, Release);
            Self::try_lock(bucket, barrier)
        })
    }

    /// Returns a shared reference to the bucket being read.
    #[inline]
    pub(crate) fn bucket(&self) -> &'b Bucket<K, V, LOCK_FREE> {
        self.bucket
    }

    /// Attempts to increment the shared-lock counter without waiting.
    /// Returns `Err(())` if the bucket is exclusively locked or the counter
    /// is saturated, and `Ok(None)` if the bucket has been killed.
    fn try_lock(
        bucket: &'b Bucket<K, V, LOCK_FREE>,
        _barrier: &'b Barrier,
    ) -> Result<Option<Reader<'b, K, V, LOCK_FREE>>, ()> {
        let current = bucket.state.load(Relaxed);
        // `LOCK` set, or the shared counter at its maximum, both make the
        // masked value reach `SLOCK_MAX` or above.
        if (current & LOCK_MASK) >= SLOCK_MAX {
            return Err(());
        }
        if (current & KILLED) >= KILLED {
            return Ok(None);
        }
        if bucket
            .state
            .compare_exchange(current, current + 1, Acquire, Relaxed)
            .is_ok()
        {
            Ok(Some(Reader { bucket }))
        } else {
            Err(())
        }
    }
}
impl<'b, K: Eq, V, const LOCK_FREE: bool> Drop for Reader<'b, K, V, LOCK_FREE> {
    /// Releases the shared lock: decrements the counter, clears `WAITING`,
    /// and signals the wait queue if anyone was waiting.
    #[inline]
    fn drop(&mut self) {
        let mut current = self.bucket.state.load(Relaxed);
        loop {
            let wakeup = (current & WAITING) == WAITING;
            let next = (current - 1) & !(WAITING);
            // NOTE(review): the success ordering here is `Relaxed`, unlike
            // `Locker::drop` which releases — presumably acceptable because
            // readers publish no writes; confirm against the memory model.
            match self
                .bucket
                .state
                .compare_exchange(current, next, Relaxed, Relaxed)
            {
                Ok(_) => {
                    if wakeup {
                        self.bucket.wait_queue.signal();
                    }
                    break;
                }
                Err(result) => current = result,
            }
        }
    }
}
/// Per-bucket bookkeeping: slot occupancy, logical removals, partial hashes,
/// and the link to the overflow chain.
pub(crate) struct Metadata<K: Eq, V, const LEN: usize> {
    /// Head of the linked overflow-bucket chain; null when there is none.
    link: AtomicArc<LinkedBucket<K, V, LINKED_LEN>>,
    /// Bit `i` set means slot `i` of the data block is initialized.
    occupied_bitmap: u32,
    /// Bit `i` set means slot `i` has been logically removed (lock-free mode).
    removed_bitmap: u32,
    /// Partial hash of the entry stored in each slot.
    partial_hash_array: [u8; LEN],
}
impl<K: Eq, V, const LEN: usize> Default for Metadata<K, V, LEN> {
fn default() -> Self {
Self {
link: AtomicArc::default(),
occupied_bitmap: 0,
removed_bitmap: 0,
partial_hash_array: [0; LEN],
}
}
}
/// An overflow bucket holding entries that did not fit in the main bucket;
/// overflow buckets form a doubly linked chain.
pub(crate) struct LinkedBucket<K: Eq, V, const LEN: usize> {
    /// Bookkeeping plus the forward link to the next overflow bucket.
    metadata: Metadata<K, V, LEN>,
    /// Entry storage for this overflow bucket.
    data_block: DataBlock<K, V, LEN>,
    /// Raw back-pointer to the previous bucket in the chain (null at the head).
    prev_link: AtomicPtr<LinkedBucket<K, V, LEN>>,
}
impl<K: Eq, V, const LEN: usize> LinkedBucket<K, V, LEN> {
    /// Creates an empty overflow bucket whose forward link points at `next`.
    fn new(next: Option<Arc<LinkedBucket<K, V, LINKED_LEN>>>) -> LinkedBucket<K, V, LEN> {
        LinkedBucket {
            metadata: Metadata {
                link: next.map_or_else(AtomicArc::null, AtomicArc::from),
                occupied_bitmap: 0,
                removed_bitmap: 0,
                partial_hash_array: [0; LEN],
            },
            // SAFETY: an array of `MaybeUninit` is valid without
            // initialization; occupancy is tracked by `occupied_bitmap`.
            data_block: unsafe { MaybeUninit::uninit().assume_init() },
            prev_link: AtomicPtr::default(),
        }
    }
}
impl<K: Eq, V, const LEN: usize> Drop for LinkedBucket<K, V, LEN> {
    /// Drops every still-occupied entry in the data block.
    fn drop(&mut self) {
        if needs_drop::<(K, V)>() {
            // Clear each occupied bit as its slot is dropped; the loop ends
            // when the bitmap is empty (trailing_zeros == 32).
            let mut index = self.metadata.occupied_bitmap.trailing_zeros();
            while index != 32 {
                unsafe {
                    ptr::drop_in_place(self.data_block[index as usize].as_mut_ptr());
                }
                self.metadata.occupied_bitmap -= 1_u32 << index;
                index = self.metadata.occupied_bitmap.trailing_zeros();
            }
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::convert::TryInto;
    use std::sync::atomic::AtomicPtr;
    use tokio::sync;

    /// Stress test: many tasks contend on one bucket through exclusive and
    /// shared locks, verifying mutual exclusion via a shared data array and
    /// checking insert/search/iterate/purge behavior.
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn queue() {
        let num_tasks = BUCKET_LEN + 2;
        let barrier = Arc::new(sync::Barrier::new(num_tasks));
        let data_block: Arc<DataBlock<usize, usize, BUCKET_LEN>> =
            Arc::new(unsafe { MaybeUninit::uninit().assume_init() });
        let mut bucket: Arc<Bucket<usize, usize, true>> = Arc::new(Bucket::default());
        let mut data: [u64; 128] = [0; 128];
        let mut task_handles = Vec::with_capacity(num_tasks);
        for task_id in 0..num_tasks {
            let barrier_copied = barrier.clone();
            let data_block_copied = data_block.clone();
            let bucket_copied = bucket.clone();
            let data_ptr = AtomicPtr::new(&mut data);
            task_handles.push(tokio::spawn(async move {
                barrier_copied.wait().await;
                // Tasks deliberately alias the bucket mutably; the bucket
                // lock is what serializes the accesses under test.
                let bucket_mut =
                    unsafe { &mut *(bucket_copied.as_ptr() as *mut Bucket<usize, usize, true>) };
                let barrier = Barrier::new();
                for i in 0..2048 {
                    let mut exclusive_locker = Locker::lock(bucket_mut, &barrier).unwrap();
                    // If mutual exclusion holds, the array is uniform and the
                    // sum stays a multiple of 256 (128 entries of 2 or 4).
                    let mut sum: u64 = 0;
                    for j in 0..128 {
                        unsafe {
                            sum += (*data_ptr.load(Relaxed))[j];
                            (*data_ptr.load(Relaxed))[j] = if i % 4 == 0 { 2 } else { 4 }
                        };
                    }
                    assert_eq!(sum % 256, 0);
                    if i == 0 {
                        exclusive_locker.insert_with(
                            &data_block_copied,
                            (task_id % BUCKET_LEN).try_into().unwrap(),
                            || (task_id, 0),
                            &barrier,
                        );
                    } else {
                        assert_eq!(
                            exclusive_locker
                                .bucket()
                                .search(
                                    &data_block_copied,
                                    &task_id,
                                    (task_id % BUCKET_LEN).try_into().unwrap(),
                                    &barrier
                                )
                                .unwrap(),
                            &(task_id, 0_usize)
                        );
                    }
                    drop(exclusive_locker);
                    // The entry must remain visible under a shared lock too.
                    let read_locker = Reader::lock(&*bucket_copied, &barrier).unwrap();
                    assert_eq!(
                        read_locker
                            .bucket()
                            .search(
                                &data_block_copied,
                                &task_id,
                                (task_id % BUCKET_LEN).try_into().unwrap(),
                                &barrier
                            )
                            .unwrap(),
                        &(task_id, 0_usize)
                    );
                }
            }));
        }
        for r in futures::future::join_all(task_handles).await {
            assert!(r.is_ok());
        }
        let sum: u64 = data.iter().sum();
        assert_eq!(sum % 256, 0);
        assert_eq!(bucket.num_entries(), num_tasks);
        let epoch_barrier = Barrier::new();
        for task_id in 0..num_tasks {
            assert_eq!(
                bucket.search(
                    &data_block,
                    &task_id,
                    (task_id % BUCKET_LEN).try_into().unwrap(),
                    &epoch_barrier
                ),
                Some(&(task_id, 0))
            );
        }
        // Iteration must visit exactly the inserted entries.
        let mut count = 0;
        let mut entry_ptr = EntryPtr::new(&epoch_barrier);
        while entry_ptr.next(&bucket, &epoch_barrier) {
            count += 1;
        }
        assert_eq!(bucket.num_entries(), count);
        // Purging kills the bucket and empties it; further locking fails.
        let mut xlocker =
            Locker::lock(unsafe { bucket.get_mut().unwrap() }, &epoch_barrier).unwrap();
        xlocker.purge(&epoch_barrier);
        drop(xlocker);
        assert!(bucket.killed());
        assert_eq!(bucket.num_entries(), 0);
        assert!(Locker::lock(unsafe { bucket.get_mut().unwrap() }, &epoch_barrier).is_none());
    }
}