use std::cell::UnsafeCell;
use std::mem::{forget, needs_drop};
use std::ops::Deref;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicUsize};
use saa::Lock;
use sdd::{AtomicRaw, Epoch, Owned, RawPtr};
use crate::data_block::DataBlock;
use crate::utils::{AsyncGuard, deref_unchecked, fake_ref, get_owned, into_non_null, likely};
use crate::{Equivalent, Guard};
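/// [`Bucket`] stores up to [`BUCKET_LEN`] entries in an associated [`DataBlock`],
/// spilling overflow entries into a chain of [`LinkedBucket`] instances.
///
/// The 64-byte alignment is intended to match a cache line and avoid false sharing
/// between adjacent buckets.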
#[repr(align(64))]
pub struct Bucket<K, V, L: LruList, const TYPE: char> {
len: AtomicUsize,
rw_lock: Lock,
metadata: Metadata<K, V, BUCKET_LEN>,
lru_list: L,
}
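/// [`Bucket`] type for hash maps: entries are only read or written while the bucket
/// lock is held.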
pub const MAP: char = 'S';
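/// [`Bucket`] type for hash indexes: readers traverse entries without taking the
/// lock, so removals are deferred until no reader can observe the entry.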
pub const INDEX: char = 'O';
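/// [`Bucket`] type for caches: the bucket additionally maintains an LRU list for
/// eviction.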
pub const CACHE: char = 'C';
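/// The number of entry slots in a [`Bucket`]: one slot per bit of a `u32` bitmap.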
pub const BUCKET_LEN: usize = u32::BITS as usize;
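/// [`Writer`] holds the bucket lock exclusively and releases it on drop.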
pub struct Writer<K, V, L: LruList, const TYPE: char> {
bucket_ptr: NonNull<Bucket<K, V, L, TYPE>>,
}
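/// [`Reader`] holds the bucket lock in shared mode and releases it on drop.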
pub struct Reader<K, V, L: LruList, const TYPE: char> {
bucket_ptr: NonNull<Bucket<K, V, L, TYPE>>,
}
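/// [`EntryPtr`] addresses a single entry slot: in the main bucket when `link_ptr` is
/// null, otherwise in the [`LinkedBucket`] it points to.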
pub struct EntryPtr<K, V, const TYPE: char> {
link_ptr: *const LinkedBucket<K, V>,
pos: u8,
}
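/// [`LruList`] abstracts the LRU bookkeeping used by cache buckets; the no-op default
/// methods let other bucket types opt out at zero cost.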
pub trait LruList: Default {
#[inline]
fn evict(&self, _tail: u32) -> Option<(u8, u32)> {
None
}
#[inline]
fn remove(&self, _tail: u32, _entry: u8) -> Option<u32> {
None
}
#[inline]
fn promote(&self, _tail: u32, _entry: u8) -> Option<u32> {
None
}
}
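/// A circular, intrusive doubly-linked list over the `BUCKET_LEN` entry slots; each
/// cell stores the `(prev, next)` slot indices of the corresponding entry.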
#[derive(Default)]
pub struct DoublyLinkedList([UnsafeCell<(u8, u8)>; BUCKET_LEN]);
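/// Bucket bookkeeping: occupancy and removal bitmaps, the 8-bit partial hashes of the
/// stored keys, and the head of the linked-bucket chain.
///
/// Cache buckets repurpose `removed_bitmap` as the LRU-list tail, and index buckets
/// repurpose the partial-hash slot of a removed entry to store its removal epoch.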
struct Metadata<K, V, const LEN: usize> {
occupied_bitmap: AtomicU32,
removed_bitmap: AtomicU32,
partial_hash_array: UnsafeCell<[u8; LEN]>,
link: AtomicRaw<LinkedBucket<K, V>>,
}
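/// The number of entry slots in a [`LinkedBucket`].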
const LINKED_BUCKET_LEN: usize = BUCKET_LEN / 4;
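/// Sentinel `pos` value for an [`EntryPtr`] that does not point at a valid entry.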
const INVALID: u8 = 32;
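/// A smaller, heap-allocated bucket chained off a full [`Bucket`] to hold overflow
/// entries.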
struct LinkedBucket<K, V> {
metadata: Metadata<K, V, LINKED_BUCKET_LEN>,
prev_link: AtomicPtr<LinkedBucket<K, V>>,
data_block: DataBlock<K, V, LINKED_BUCKET_LEN>,
}
impl<K, V, L: LruList, const TYPE: char> Bucket<K, V, L, TYPE> {
#[cfg(any(test, feature = "loom"))]
pub fn new() -> Self {
Self {
len: AtomicUsize::new(0),
rw_lock: Lock::default(),
metadata: Metadata {
occupied_bitmap: AtomicU32::default(),
removed_bitmap: AtomicU32::default(),
partial_hash_array: UnsafeCell::new(Default::default()),
link: AtomicRaw::null(),
},
lru_list: L::default(),
}
}
#[inline]
pub(crate) fn len(&self) -> usize {
self.len.load(Relaxed)
}
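/// Inserts an entry into the bucket, spilling into a [`LinkedBucket`] when all
/// `BUCKET_LEN` slots are taken; index buckets occasionally (for one partial hash in
/// eight) try to reclaim slots from expired removals before overflowing.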
#[inline]
pub(crate) fn insert(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
hash: u64,
entry: (K, V),
) -> EntryPtr<K, V, TYPE> {
let partial_hash = partial_hash(hash);
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
let occupied_bitmap = if TYPE == INDEX
&& partial_hash % 8 == 0
&& occupied_bitmap == u32::MAX
&& (self.metadata.removed_bitmap.load(Relaxed) != 0
|| !self.metadata.link.is_null(Relaxed))
{
self.clear_unreachable_entries(data_block_ref(data_block))
} else {
occupied_bitmap
};
let free_pos = free_slot(occupied_bitmap);
if free_pos as usize == BUCKET_LEN {
self.insert_overflow(partial_hash, entry)
} else {
self.insert_entry(
&self.metadata,
data_block_ref(data_block),
free_pos,
partial_hash,
occupied_bitmap,
entry,
);
EntryPtr {
link_ptr: ptr::null(),
pos: free_pos,
}
}
}
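/// Removes the entry pointed to by `entry_ptr` and returns it; not applicable to
/// index buckets, which use [`Self::mark_removed`] instead.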
#[inline]
pub(crate) fn remove(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
entry_ptr: &mut EntryPtr<K, V, TYPE>,
) -> (K, V) {
debug_assert_ne!(TYPE, INDEX);
debug_assert_ne!(entry_ptr.pos, INVALID);
debug_assert_ne!(entry_ptr.pos as usize, BUCKET_LEN);
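// The writer lock is held exclusively, so this load-store pair on `len` cannot race
// with another writer.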
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
if let Some(link) = link_ref(entry_ptr.link_ptr) {
let mut occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << entry_ptr.pos), 0);
occupied_bitmap &= !(1_u32 << entry_ptr.pos);
link.metadata
.occupied_bitmap
.store(occupied_bitmap, Relaxed);
let removed = link.data_block.read(entry_ptr.pos as usize);
if occupied_bitmap == 0 && (TYPE != INDEX || !needs_drop::<(K, V)>()) {
entry_ptr.unlink(&self.metadata.link);
}
removed
} else {
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << entry_ptr.pos), 0);
if TYPE == CACHE {
self.remove_from_lru_list(entry_ptr);
}
self.metadata
.occupied_bitmap
.store(occupied_bitmap & !(1_u32 << entry_ptr.pos), Relaxed);
data_block_ref(data_block).read(entry_ptr.pos as usize)
}
}
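/// Logically removes an index-bucket entry: the removal bit is set and the current
/// epoch is recorded in the partial-hash slot so the entry can be physically dropped
/// once no reader from the same epoch generation can reach it.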
#[inline]
pub(crate) fn mark_removed(&self, entry_ptr: &mut EntryPtr<K, V, TYPE>, guard: &Guard) {
debug_assert_eq!(TYPE, INDEX);
debug_assert_ne!(entry_ptr.pos, INVALID);
debug_assert_ne!(entry_ptr.pos as usize, BUCKET_LEN);
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
if let Some(link) = link_ref(entry_ptr.link_ptr) {
link.metadata
.update_partial_hash(entry_ptr.pos, u8::from(guard.epoch()));
let mut removed_bitmap = link.metadata.removed_bitmap.load(Relaxed);
debug_assert_eq!(removed_bitmap & (1_u32 << entry_ptr.pos), 0);
removed_bitmap |= 1_u32 << entry_ptr.pos;
link.metadata.removed_bitmap.store(removed_bitmap, Release);
} else {
self.metadata
.update_partial_hash(entry_ptr.pos, u8::from(guard.epoch()));
let mut removed_bitmap = self.metadata.removed_bitmap.load(Relaxed);
debug_assert_eq!(removed_bitmap & (1_u32 << entry_ptr.pos), 0);
removed_bitmap |= 1_u32 << entry_ptr.pos;
self.metadata.removed_bitmap.store(removed_bitmap, Release);
}
}
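/// Evicts and returns the least-recently-used entry if the bucket is full, falling
/// back to slot `0` when the LRU list has no tracked entries.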
#[inline]
pub(crate) fn evict_lru_head(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
) -> Option<(K, V)> {
debug_assert_eq!(TYPE, CACHE);
let occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
if occupied_bitmap == u32::MAX {
self.len.store(self.len.load(Relaxed) - 1, Relaxed);
let tail = self.metadata.removed_bitmap.load(Relaxed);
let evicted = if let Some((evicted, new_tail)) = self.lru_list.evict(tail) {
self.metadata.removed_bitmap.store(new_tail, Relaxed);
u32::from(evicted)
} else {
0
};
debug_assert_ne!(occupied_bitmap & (1_u32 << evicted), 0);
self.metadata
.occupied_bitmap
.store(occupied_bitmap & !(1_u32 << evicted), Relaxed);
return Some(data_block_ref(data_block).read(evicted as usize));
}
None
}
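/// Promotes the entry to the most-recently-used position; entries residing in linked
/// buckets are not tracked by the LRU list.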
#[inline]
pub(crate) fn update_lru_tail(&self, entry_ptr: &EntryPtr<K, V, TYPE>) {
debug_assert_eq!(TYPE, CACHE);
debug_assert_ne!(entry_ptr.pos, INVALID);
debug_assert_ne!(entry_ptr.pos as usize, BUCKET_LEN);
if entry_ptr.link_ptr.is_null() {
let entry = entry_ptr.pos;
let tail = self.metadata.removed_bitmap.load(Relaxed);
if let Some(new_tail) = self.lru_list.promote(tail, entry) {
self.metadata.removed_bitmap.store(new_tail, Relaxed);
}
}
}
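/// Appends linked buckets until the total slot capacity covers the current length
/// plus `additional`.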
#[inline]
pub(crate) fn reserve_slots(&self, additional: usize) {
debug_assert!(self.rw_lock.is_locked(Relaxed));
let required = additional + self.len();
let mut capacity = BUCKET_LEN;
if capacity >= required {
return;
}
let mut link_ptr = self.metadata.load_link();
while let Some(link) = link_ref(link_ptr) {
capacity += LINKED_BUCKET_LEN;
if capacity >= required {
return;
}
let new_link_ptr = link.metadata.load_link();
if new_link_ptr.is_null() {
break;
}
link_ptr = new_link_ptr;
}
for _ in 0..(required - capacity).div_ceil(LINKED_BUCKET_LEN) {
let new_link = LinkedBucket::new();
let new_link_ptr = new_link.as_ptr();
if let Some(link) = link_ref(link_ptr) {
new_link.prev_link.store(link_ptr.cast_mut(), Relaxed);
link.metadata.link.store(new_link.into_raw(), Release);
} else {
self.metadata.link.store(new_link.into_raw(), Release);
}
link_ptr = new_link_ptr;
}
}
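/// Moves the entry pointed to by `from_entry_ptr` out of `from_writer`'s bucket and
/// inserts it into this bucket; both buckets must be exclusively locked.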
#[inline]
pub(crate) fn extract_from(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
hash: u64,
from_writer: &Writer<K, V, L, TYPE>,
from_data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
from_entry_ptr: &mut EntryPtr<K, V, TYPE>,
) {
debug_assert!(self.rw_lock.is_locked(Relaxed));
let entry = if let Some(link) = link_ref(from_entry_ptr.link_ptr) {
link.data_block.read(from_entry_ptr.pos as usize)
} else {
data_block_ref(from_data_block).read(from_entry_ptr.pos as usize)
};
self.insert(data_block, hash, entry);
let mo = if TYPE == INDEX { Release } else { Relaxed };
if let Some(link) = link_ref(from_entry_ptr.link_ptr) {
let occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << from_entry_ptr.pos), 0);
link.metadata
.occupied_bitmap
.store(occupied_bitmap & !(1_u32 << from_entry_ptr.pos), mo);
} else {
let occupied_bitmap = from_writer.metadata.occupied_bitmap.load(Relaxed);
debug_assert_ne!(occupied_bitmap & (1_u32 << from_entry_ptr.pos), 0);
from_writer
.metadata
.occupied_bitmap
.store(occupied_bitmap & !(1_u32 << from_entry_ptr.pos), mo);
}
let from_len = from_writer.len.load(Relaxed);
from_writer.len.store(from_len - 1, Relaxed);
}
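/// Disposes of the linked-bucket chain and, if `(K, V)` needs dropping, drops every
/// entry remaining in `data_block`.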
pub(super) fn drop_entries(&self, data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>) {
if !self.metadata.link.is_null(Relaxed) {
let mut link_ptr = self.metadata.link.load(Acquire, fake_ref(self));
while let Some(current) = deref_unchecked(link_ptr) {
let next_link_ptr = current.metadata.link.load(Acquire, fake_ref(self));
if let Some(link) = get_owned(link_ptr) {
unsafe {
link.drop_in_place();
}
}
link_ptr = next_link_ptr;
}
}
if needs_drop::<(K, V)>() {
let mut occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
while occupied_bitmap != 0 {
let pos = first_slot(occupied_bitmap);
data_block_ref(data_block).drop_in_place(pos as usize);
occupied_bitmap -= 1_u32 << pos;
}
}
}
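/// Inserts the entry into the first linked bucket with a free slot, pushing a freshly
/// allocated [`LinkedBucket`] onto the head of the chain if none has room.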
fn insert_overflow(&self, partial_hash: u8, entry: (K, V)) -> EntryPtr<K, V, TYPE> {
let mut link_ptr = self.metadata.load_link();
while let Some(link) = link_ref(link_ptr) {
let occupied_bitmap = link.metadata.occupied_bitmap.load(Relaxed);
let free_pos = free_slot(occupied_bitmap);
if free_pos as usize != LINKED_BUCKET_LEN {
debug_assert!((free_pos as usize) < LINKED_BUCKET_LEN);
self.insert_entry(
&link.metadata,
&link.data_block,
free_pos,
partial_hash,
occupied_bitmap,
entry,
);
return EntryPtr {
link_ptr,
pos: free_pos,
};
}
link_ptr = link.metadata.load_link();
}
let link = LinkedBucket::new();
let head_ptr = self.metadata.link.load(Relaxed, fake_ref(self));
link.metadata.link.store(head_ptr, Relaxed);
self.insert_entry(&link.metadata, &link.data_block, 0, partial_hash, 1, entry);
if let Some(head) = deref_unchecked(head_ptr) {
head.prev_link.store(link.as_ptr().cast_mut(), Relaxed);
}
link_ptr = link.as_ptr();
self.metadata.link.store(link.into_raw(), Release);
EntryPtr { link_ptr, pos: 0 }
}
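/// Writes the entry into slot `pos`, records its partial hash, and publishes the slot
/// by setting its occupancy bit; the `Release` store for index buckets ensures that
/// lock-free readers observe fully initialized data.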
#[inline]
fn insert_entry<const LEN: usize>(
&self,
metadata: &Metadata<K, V, LEN>,
data_block: &DataBlock<K, V, LEN>,
pos: u8,
partial_hash: u8,
occupied_bitmap: u32,
entry: (K, V),
) {
debug_assert!((pos as usize) < LEN);
debug_assert_eq!(metadata.occupied_bitmap.load(Relaxed) & (1_u32 << pos), 0);
data_block.write(pos as usize, entry.0, entry.1);
metadata.update_partial_hash(pos, partial_hash);
metadata.occupied_bitmap.store(
occupied_bitmap | (1_u32 << pos),
if TYPE == INDEX { Release } else { Relaxed },
);
self.len.store(self.len() + 1, Relaxed);
}
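/// Reclaims index-bucket slots whose removed entries can no longer be reached by any
/// reader, unlinking trailing linked buckets that become empty; returns the updated
/// occupancy bitmap of the main bucket.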
fn clear_unreachable_entries(&self, data_block: &DataBlock<K, V, BUCKET_LEN>) -> u32 {
debug_assert_eq!(TYPE, INDEX);
let guard = Guard::new();
let mut link_ptr = self.metadata.load_link();
while let Some(link) = link_ref(link_ptr) {
let mut next_link_ptr = link.metadata.load_link();
if next_link_ptr.is_null() {
while let Some(link) = link_ref(link_ptr) {
let prev_link_ptr = link.prev_link.load(Acquire);
if Self::drop_unreachable_entries(&link.metadata, &link.data_block, &guard) == 0
&& next_link_ptr.is_null()
{
debug_assert!(link.metadata.link.is_null(Relaxed));
let unlinked = if let Some(prev) = link_ref(prev_link_ptr) {
let unlinked = prev.metadata.link.load(Acquire, fake_ref(self));
prev.metadata.link.store(RawPtr::null(), Release);
unlinked
} else {
let unlinked = self.metadata.link.load(Acquire, fake_ref(self));
self.metadata.link.store(RawPtr::null(), Release);
unlinked
};
drop(get_owned(unlinked));
} else {
next_link_ptr = link_ptr;
}
link_ptr = prev_link_ptr;
}
break;
}
link_ptr = next_link_ptr;
}
Self::drop_unreachable_entries(&self.metadata, data_block, &guard)
}
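/// Drops removed entries whose recorded epoch belongs to an older generation, meaning
/// no active reader can still observe them, and returns the new occupancy bitmap.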
fn drop_unreachable_entries<const LEN: usize>(
metadata: &Metadata<K, V, LEN>,
data_block: &DataBlock<K, V, LEN>,
guard: &Guard,
) -> u32 {
debug_assert_eq!(TYPE, INDEX);
let mut dropped_bitmap = metadata.removed_bitmap.load(Relaxed);
let current_epoch = guard.epoch();
#[allow(clippy::cast_possible_truncation)]
for pos in 0..LEN as u8 {
if Epoch::try_from(metadata.read_partial_hash(pos))
.is_ok_and(|e| e.in_same_generation(current_epoch))
{
dropped_bitmap &= !(1_u32 << pos);
}
}
let occupied_bitmap = metadata.occupied_bitmap.load(Relaxed) & !dropped_bitmap;
metadata.occupied_bitmap.store(occupied_bitmap, Release);
let removed_bitmap = metadata.removed_bitmap.load(Relaxed) & !dropped_bitmap;
metadata.removed_bitmap.store(removed_bitmap, Release);
if removed_bitmap != 0 {
guard.set_has_garbage();
}
if needs_drop::<(K, V)>() {
while dropped_bitmap != 0 {
let pos = first_slot(dropped_bitmap);
data_block.drop_in_place(pos as usize);
dropped_bitmap -= 1_u32 << pos;
}
}
occupied_bitmap
}
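/// Detaches the entry from the LRU list if it is currently tracked there.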
#[inline]
fn remove_from_lru_list(&self, entry_ptr: &EntryPtr<K, V, TYPE>) {
debug_assert_eq!(TYPE, CACHE);
debug_assert_ne!(entry_ptr.pos, INVALID);
debug_assert_ne!(entry_ptr.pos as usize, BUCKET_LEN);
if entry_ptr.link_ptr.is_null() {
let entry = entry_ptr.pos;
let tail = self.metadata.removed_bitmap.load(Relaxed);
if let Some(new_tail) = self.lru_list.remove(tail, entry) {
self.metadata.removed_bitmap.store(new_tail, Relaxed);
}
}
}
}
impl<K: Eq, V, L: LruList, const TYPE: char> Bucket<K, V, L, TYPE> {
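/// Searches the bucket and its linked buckets for an entry matching `key`, comparing
/// partial hashes before keys.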
#[inline]
pub(super) fn search_entry<'g, Q>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
key: &Q,
hash: u64,
) -> Option<(&'g K, &'g V)>
where
Q: Equivalent<K> + ?Sized,
{
if self.len() != 0 {
let partial_hash = partial_hash(hash);
if let Some((k, pos)) = Self::search_data_block(
&self.metadata,
data_block_ref(data_block),
key,
partial_hash,
) {
let v = unsafe { &*data_block_ref(data_block).val_ptr(pos as usize) };
return Some((k, v));
}
let mut link_ptr = self.metadata.load_link();
while let Some(link) = link_ref(link_ptr) {
if let Some((k, pos)) =
Self::search_data_block(&link.metadata, &link.data_block, key, partial_hash)
{
let v = unsafe { &*link.data_block.val_ptr(pos as usize) };
return Some((k, v));
}
link_ptr = link.metadata.load_link();
}
}
None
}
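/// Returns an [`EntryPtr`] to the entry matching `key`, or [`EntryPtr::null`] if the
/// key is absent.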
#[inline]
pub(crate) fn get_entry_ptr<Q>(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
key: &Q,
hash: u64,
) -> EntryPtr<K, V, TYPE>
where
Q: Equivalent<K> + ?Sized,
{
if self.len() != 0 {
let partial_hash = partial_hash(hash);
if let Some((_, pos)) = Self::search_data_block(
&self.metadata,
data_block_ref(data_block),
key,
partial_hash,
) {
return EntryPtr {
link_ptr: ptr::null(),
pos,
};
}
let mut current_link_ptr = self.metadata.load_link();
while let Some(link) = link_ref(current_link_ptr) {
if let Some((_, pos)) =
Self::search_data_block(&link.metadata, &link.data_block, key, partial_hash)
{
return EntryPtr {
link_ptr: current_link_ptr,
pos,
};
}
current_link_ptr = link.metadata.load_link();
}
}
EntryPtr::null()
}
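/// Masks the occupancy bitmap by partial-hash equality and then compares candidate
/// keys; for index buckets, slots with their removal bit set are excluded up front.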
#[allow(clippy::inline_always)]
#[inline(always)]
fn search_data_block<'g, Q, const LEN: usize>(
metadata: &Metadata<K, V, LEN>,
data_block: &'g DataBlock<K, V, LEN>,
key: &Q,
partial_hash: u8,
) -> Option<(&'g K, u8)>
where
Q: Equivalent<K> + ?Sized,
{
let mut bitmap = metadata.bitmap::<TYPE>();
let partial_hash_array = unsafe { &*metadata.partial_hash_array.get() };
(0..LEN).for_each(|i| {
if partial_hash_array[i] != partial_hash {
bitmap &= !(1_u32 << i);
}
});
let mut pos = first_slot(bitmap);
while u32::from(pos) != u32::BITS {
let k = unsafe { &*data_block.key_ptr(pos as usize) };
if key.equivalent(k) {
return Some((k, pos));
}
bitmap -= 1_u32 << pos;
pos = first_slot(bitmap);
}
None
}
}
impl<K, V, L: LruList, const TYPE: char> Writer<K, V, L, TYPE> {
#[inline]
pub(crate) const fn from_bucket(bucket: &Bucket<K, V, L, TYPE>) -> Writer<K, V, L, TYPE> {
Writer {
bucket_ptr: into_non_null(bucket),
}
}
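/// Acquires the writer lock asynchronously, returning `None` if the lock was poisoned
/// by a previous [`Writer::kill`].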
#[inline]
pub(crate) async fn lock_async<'g>(
bucket: &'g Bucket<K, V, L, TYPE>,
async_guard: &'g AsyncGuard,
) -> Option<Writer<K, V, L, TYPE>> {
if likely(bucket.rw_lock.lock_async_with(|| async_guard.reset()).await) {
Some(Self::from_bucket(bucket))
} else {
None
}
}
#[inline]
pub(crate) fn lock_sync(bucket: &Bucket<K, V, L, TYPE>) -> Option<Writer<K, V, L, TYPE>> {
if likely(bucket.rw_lock.lock_sync()) {
Some(Self::from_bucket(bucket))
} else {
None
}
}
#[inline]
pub(crate) fn try_lock(
bucket: &Bucket<K, V, L, TYPE>,
) -> Result<Option<Writer<K, V, L, TYPE>>, ()> {
if likely(bucket.rw_lock.try_lock()) {
Ok(Some(Self::from_bucket(bucket)))
} else if bucket.rw_lock.is_poisoned(Relaxed) {
Ok(None)
} else {
Err(())
}
}
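/// Poisons the lock so the bucket can never be locked again and disposes of the
/// linked-bucket chain; index buckets with droppable entries leave the chain in
/// place, presumably deferring reclamation to the epoch-based collector.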
#[inline]
pub(super) fn kill(self) {
debug_assert_eq!(self.len(), 0);
debug_assert!(self.rw_lock.is_locked(Relaxed));
debug_assert!(
TYPE != INDEX
|| self.metadata.removed_bitmap.load(Relaxed)
== self.metadata.occupied_bitmap.load(Relaxed)
);
let poisoned = self.rw_lock.poison_lock();
debug_assert!(poisoned);
if (TYPE != INDEX || !needs_drop::<(K, V)>()) && !self.metadata.link.is_null(Relaxed) {
let mut link_ptr = self.metadata.link.load(Acquire, fake_ref(&self));
self.metadata.link.store(RawPtr::null(), Release);
while let Some(link) = deref_unchecked(link_ptr) {
let next_link_ptr = link.metadata.link.load(Acquire, fake_ref(&self));
if let Some(link) = get_owned(link_ptr) {
if TYPE != INDEX {
unsafe { link.drop_in_place() }
}
}
link_ptr = next_link_ptr;
}
}
forget(self);
}
}
impl<K, V, L: LruList, const TYPE: char> Deref for Writer<K, V, L, TYPE> {
type Target = Bucket<K, V, L, TYPE>;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { self.bucket_ptr.as_ref() }
}
}
impl<K, V, L: LruList, const TYPE: char> Drop for Writer<K, V, L, TYPE> {
#[inline]
fn drop(&mut self) {
self.rw_lock.release_lock();
}
}
unsafe impl<K: Send, V: Send, L: LruList, const TYPE: char> Send for Writer<K, V, L, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for Writer<K, V, L, TYPE>
{
}
impl<'g, K, V, L: LruList, const TYPE: char> Reader<K, V, L, TYPE> {
#[inline]
pub(crate) async fn lock_async(
bucket: &'g Bucket<K, V, L, TYPE>,
async_guard: &AsyncGuard,
) -> Option<Reader<K, V, L, TYPE>> {
if likely(
bucket
.rw_lock
.share_async_with(|| async_guard.reset())
.await,
) {
Some(Reader {
bucket_ptr: into_non_null(bucket),
})
} else {
None
}
}
#[inline]
pub(crate) fn lock_sync(bucket: &Bucket<K, V, L, TYPE>) -> Option<Reader<K, V, L, TYPE>> {
if likely(bucket.rw_lock.share_sync()) {
Some(Reader {
bucket_ptr: into_non_null(bucket),
})
} else {
None
}
}
}
impl<K, V, L: LruList, const TYPE: char> Deref for Reader<K, V, L, TYPE> {
type Target = Bucket<K, V, L, TYPE>;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { self.bucket_ptr.as_ref() }
}
}
impl<K, V, L: LruList, const TYPE: char> Drop for Reader<K, V, L, TYPE> {
#[inline]
fn drop(&mut self) {
self.rw_lock.release_share();
}
}
unsafe impl<K: Send, V: Send, L: LruList, const TYPE: char> Send for Reader<K, V, L, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, L: LruList, const TYPE: char> Sync
for Reader<K, V, L, TYPE>
{
}
impl<K, V, const TYPE: char> EntryPtr<K, V, TYPE> {
#[inline]
pub(crate) const fn null() -> Self {
Self {
link_ptr: ptr::null(),
pos: INVALID,
}
}
#[inline]
pub(crate) const fn clone(&self) -> Self {
Self {
link_ptr: self.link_ptr,
pos: self.pos,
}
}
#[inline]
pub(crate) const fn is_valid(&self) -> bool {
self.pos != INVALID
}
#[inline]
pub(crate) const fn pos(&self) -> u8 {
self.pos
}
#[inline]
pub(crate) const fn partial_hash<L: LruList>(&self, bucket: &Bucket<K, V, L, TYPE>) -> u8 {
if let Some(link) = link_ref(self.link_ptr) {
link.metadata.read_partial_hash(self.pos)
} else {
bucket.metadata.read_partial_hash(self.pos)
}
}
#[inline]
pub(crate) const fn key<'k>(&self, data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>) -> &'k K {
unsafe { self.key_ptr(data_block).as_ref() }
}
#[inline]
pub(crate) const fn key_ptr(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
) -> NonNull<K> {
let key_ptr = if let Some(link) = link_ref(self.link_ptr) {
link.data_block.key_ptr(self.pos as usize)
} else {
data_block_ref(data_block).key_ptr(self.pos as usize)
};
unsafe { NonNull::new_unchecked(key_ptr.cast_mut()) }
}
#[inline]
pub(crate) const fn val<'v>(&self, data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>) -> &'v V {
unsafe { self.val_ptr(data_block).as_ref() }
}
#[inline]
pub(crate) const fn val_ptr(
&self,
data_block: NonNull<DataBlock<K, V, BUCKET_LEN>>,
) -> NonNull<V> {
let val_ptr = if let Some(link) = link_ref(self.link_ptr) {
link.data_block.val_ptr(self.pos as usize)
} else {
data_block_ref(data_block).val_ptr(self.pos as usize)
};
unsafe { NonNull::new_unchecked(val_ptr.cast_mut()) }
}
#[inline]
pub(crate) fn find_next<L: LruList>(&mut self, bucket: &Bucket<K, V, L, TYPE>) -> bool {
if self.link_ptr.is_null() && self.next_entry::<L, BUCKET_LEN>(&bucket.metadata) {
return true;
}
while let Some(link) = link_ref(self.link_ptr) {
if self.next_entry::<L, LINKED_BUCKET_LEN>(&link.metadata) {
return true;
}
}
false
}
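/// Detaches the now-empty linked bucket from the chain and repositions `self`: if a
/// successor exists, iteration resumes at its first slot, otherwise `pos` is set to
/// `INVALID - 1` so the next `next_entry` call skips the predecessor and follows its
/// updated link pointer.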
fn unlink(&mut self, link_head: &AtomicRaw<LinkedBucket<K, V>>) {
let prev_link_ptr =
link_ref(self.link_ptr).map_or(ptr::null_mut(), |link| link.prev_link.load(Acquire));
let next_link = link_ref(self.link_ptr)
.and_then(|link| get_owned(link.metadata.link.load(Acquire, fake_ref(self))));
let next_link_ptr = if let Some(next) = next_link {
next.prev_link.store(prev_link_ptr, Relaxed);
self.link_ptr = next.as_ptr();
self.pos = INVALID;
next.into_raw()
} else {
self.link_ptr = prev_link_ptr;
self.pos = INVALID - 1;
RawPtr::null()
};
let unlinked = if let Some(prev) = link_ref(prev_link_ptr) {
let unlinked = prev.metadata.link.load(Acquire, fake_ref(self));
prev.metadata.link.store(next_link_ptr, Release);
unlinked
} else {
let unlinked = link_head.load(Acquire, fake_ref(self));
link_head.store(next_link_ptr, Release);
unlinked
};
drop(get_owned(unlinked));
}
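/// Advances `pos` to the next occupied slot described by `metadata`; on failure,
/// loads the next link into `link_ptr` and invalidates `pos`.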
#[inline]
fn next_entry<L: LruList, const LEN: usize>(&mut self, metadata: &Metadata<K, V, LEN>) -> bool {
let current_pos = if likely(self.pos != INVALID) {
self.pos + 1
} else {
0
};
if (current_pos as usize) < LEN {
let bitmap = metadata.bitmap::<TYPE>() & (!((1_u32 << current_pos) - 1));
let next_pos = first_slot(bitmap);
if (next_pos as usize) < LEN {
self.pos = next_pos;
return true;
}
}
self.link_ptr = metadata.load_link();
self.pos = INVALID;
false
}
}
unsafe impl<K: Send, V: Send, const TYPE: char> Send for EntryPtr<K, V, TYPE> {}
unsafe impl<K: Send + Sync, V: Send + Sync, const TYPE: char> Sync for EntryPtr<K, V, TYPE> {}
impl LruList for () {}
impl DoublyLinkedList {
#[inline]
fn read(&self, pos: u32) -> (u8, u8) {
unsafe { *self.0.get_unchecked(pos as usize).get() }
}
#[inline]
fn write<R, F: FnOnce(&mut (u8, u8)) -> R>(&self, pos: u32, f: F) -> R {
unsafe { f(&mut *self.0.get_unchecked(pos as usize).get()) }
}
}
impl LruList for DoublyLinkedList {
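// `tail` is the cache bucket's `removed_bitmap` reinterpreted as a list handle: `0`
// means the list is empty, otherwise `tail - 1` is the most-recently-used slot, and
// since the list is circular, that slot's `prev` cell names the least-recently-used
// slot.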
#[inline]
fn evict(&self, tail: u32) -> Option<(u8, u32)> {
if tail == 0 {
None
} else {
let lru = self.read(tail - 1).0;
let new_tail = if tail - 1 == u32::from(lru) {
0
} else {
let new_lru = self.read(u32::from(lru)).0;
{
#![allow(clippy::cast_possible_truncation)]
self.write(u32::from(new_lru), |v| {
v.1 = tail as u8 - 1;
});
}
self.write(tail - 1, |v| {
v.0 = new_lru;
});
tail
};
self.write(u32::from(lru), |v| {
*v = (0, 0);
});
Some((lru, new_tail))
}
}
#[inline]
fn remove(&self, tail: u32, entry: u8) -> Option<u32> {
if tail == 0
|| (self.read(u32::from(entry)) == (0, 0)
&& (self.read(0) != (entry, entry) || (tail != 1 && tail != u32::from(entry) + 1)))
{
return None;
}
if self.read(u32::from(entry)).0 == entry {
debug_assert_eq!(tail, u32::from(entry) + 1);
self.write(u32::from(entry), |v| {
*v = (0, 0);
});
return Some(0);
}
let (prev, next) = self.read(u32::from(entry));
debug_assert_eq!(self.read(u32::from(prev)).1, entry);
self.write(u32::from(prev), |v| {
v.1 = next;
});
debug_assert_eq!(self.read(u32::from(next)).0, entry);
self.write(u32::from(next), |v| {
v.0 = prev;
});
let new_tail = if tail == u32::from(entry) + 1 {
Some(u32::from(next) + 1)
} else {
None
};
self.write(u32::from(entry), |v| {
*v = (0, 0);
});
new_tail
}
#[inline]
fn promote(&self, tail: u32, entry: u8) -> Option<u32> {
if tail == u32::from(entry) + 1 {
return None;
} else if tail == 0 {
self.write(u32::from(entry), |v| {
*v = (entry, entry);
});
return Some(u32::from(entry) + 1);
}
if self.read(u32::from(entry)) != (0, 0) || (self.read(0) == (entry, entry) && tail == 1) {
let (prev, next) = self.read(u32::from(entry));
debug_assert_eq!(self.read(u32::from(prev)).1, entry);
self.write(u32::from(prev), |v| {
v.1 = next;
});
debug_assert_eq!(self.read(u32::from(next)).0, entry);
self.write(u32::from(next), |v| {
v.0 = prev;
});
}
let oldest = self.read(tail - 1).0;
debug_assert_eq!(u32::from(self.read(u32::from(oldest)).1) + 1, tail);
self.write(u32::from(oldest), |v| {
v.1 = entry;
});
self.write(u32::from(entry), |v| {
v.0 = oldest;
});
self.write(tail - 1, |v| {
v.0 = entry;
});
{
#![allow(clippy::cast_possible_truncation)]
self.write(u32::from(entry), |v| {
v.1 = tail as u8 - 1;
});
}
Some(u32::from(entry) + 1)
}
}
unsafe impl Send for DoublyLinkedList {}
unsafe impl Sync for DoublyLinkedList {}
impl<K, V, const LEN: usize> Metadata<K, V, LEN> {
#[inline]
const fn read_partial_hash(&self, pos: u8) -> u8 {
unsafe {
self.partial_hash_array
.get()
.cast::<u8>()
.add(pos as usize)
.read()
}
}
#[inline]
const fn update_partial_hash(&self, pos: u8, partial_hash: u8) {
unsafe {
self.partial_hash_array
.get()
.cast::<u8>()
.add(pos as usize)
.write(partial_hash);
}
}
#[inline]
fn bitmap<const TYPE: char>(&self) -> u32 {
if TYPE == INDEX {
!self.removed_bitmap.load(Acquire) & self.occupied_bitmap.load(Acquire)
} else {
self.occupied_bitmap.load(Relaxed)
}
}
#[inline]
fn load_link(&self) -> *const LinkedBucket<K, V> {
unsafe {
self.link
.load(Acquire, fake_ref(&self))
.into_ptr()
.as_ptr_unchecked()
}
}
}
unsafe impl<K: Send, V: Send, const LEN: usize> Send for Metadata<K, V, LEN> {}
unsafe impl<K: Send + Sync, V: Send + Sync, const LEN: usize> Sync for Metadata<K, V, LEN> {}
impl<K, V> LinkedBucket<K, V> {
#[inline]
fn new() -> Owned<Self> {
unsafe {
Owned::new_with_unchecked(|| Self {
metadata: Metadata {
occupied_bitmap: AtomicU32::default(),
removed_bitmap: AtomicU32::default(),
partial_hash_array: UnsafeCell::new(Default::default()),
link: AtomicRaw::default(),
},
prev_link: AtomicPtr::default(),
data_block: DataBlock::new(),
})
}
}
}
impl<K, V> Drop for LinkedBucket<K, V> {
#[inline]
fn drop(&mut self) {
if needs_drop::<(K, V)>() {
let mut occupied_bitmap = self.metadata.occupied_bitmap.load(Relaxed);
while occupied_bitmap != 0 {
let pos = first_slot(occupied_bitmap);
self.data_block.drop_in_place(pos as usize);
occupied_bitmap -= 1_u32 << pos;
}
}
}
}
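/// The per-slot partial hash is simply the low byte of the full 64-bit hash.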
#[allow(clippy::cast_possible_truncation)]
#[inline]
const fn partial_hash(hash: u64) -> u8 {
hash as u8
}
#[inline]
const fn data_block_ref<'l, K, V, const LEN: usize>(
data_block_ptr: NonNull<DataBlock<K, V, LEN>>,
) -> &'l DataBlock<K, V, LEN> {
unsafe { data_block_ptr.as_ref() }
}
#[allow(clippy::cast_possible_truncation)]
#[inline]
const fn free_slot(bitmap: u32) -> u8 {
bitmap.trailing_ones() as u8
}
#[allow(clippy::cast_possible_truncation)]
#[inline]
const fn first_slot(bitmap: u32) -> u8 {
bitmap.trailing_zeros() as u8
}
#[inline]
const fn link_ref<'l, K, V>(ptr: *const LinkedBucket<K, V>) -> Option<&'l LinkedBucket<K, V>> {
unsafe { ptr.as_ref() }
}
#[cfg(not(feature = "loom"))]
#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::Relaxed;
use proptest::prelude::*;
use tokio::sync::Barrier;
#[cfg(not(miri))]
static_assertions::assert_eq_size!(Bucket<String, String, (), MAP>, [u8; BUCKET_LEN * 2]);
#[cfg(not(miri))]
static_assertions::assert_eq_size!(Bucket<String, String, DoublyLinkedList, CACHE>, [u8; BUCKET_LEN * 4]);
proptest! {
#[cfg_attr(miri, ignore)]
#[test]
fn evict_untracked(xs in 0..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> = DataBlock::new();
let data_block_ptr = into_non_null(&data_block);
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let writer = Writer::lock_sync(&bucket).unwrap();
let evicted = writer.evict_lru_head(data_block_ptr);
assert_eq!(v >= BUCKET_LEN, evicted.is_some());
writer.insert(data_block_ptr, 0, (v, v));
assert_eq!(writer.metadata.removed_bitmap.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn evict_overflowed(xs in 1..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> = DataBlock::new();
let data_block_ptr = into_non_null(&data_block);
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
let writer = Writer::lock_sync(&bucket).unwrap();
for _ in 0..3 {
for v in 0..xs {
let entry_ptr = writer.insert(data_block_ptr, 0, (v, v));
writer.update_lru_tail(&entry_ptr);
if v < BUCKET_LEN {
assert_eq!(
writer.metadata.removed_bitmap.load(Relaxed) as usize,
v + 1
);
}
assert_eq!(
writer
.lru_list
.read(writer.metadata.removed_bitmap.load(Relaxed) - 1)
.0,
0
);
}
let mut evicted_key = None;
if xs >= BUCKET_LEN {
let evicted = writer.evict_lru_head(data_block_ptr);
assert!(evicted.is_some());
evicted_key = evicted.map(|(k, _)| k);
}
assert_ne!(writer.metadata.removed_bitmap.load(Relaxed), 0);
for v in 0..xs {
let mut entry_ptr = writer.get_entry_ptr(data_block_ptr, &v, 0);
if entry_ptr.is_valid() {
let _erased = writer.remove(data_block_ptr, &mut entry_ptr);
} else {
assert_eq!(v, evicted_key.unwrap());
}
}
assert_eq!(writer.metadata.removed_bitmap.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn evict_tracked(xs in 0..BUCKET_LEN * 2) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> = DataBlock::new();
let data_block_ptr = into_non_null(&data_block);
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let writer = Writer::lock_sync(&bucket).unwrap();
let evicted = writer.evict_lru_head(data_block_ptr);
assert_eq!(v >= BUCKET_LEN, evicted.is_some());
let mut entry_ptr = writer.insert(data_block_ptr, 0, (v, v));
writer.update_lru_tail(&entry_ptr);
assert_eq!(
writer.metadata.removed_bitmap.load(Relaxed),
u32::from(entry_ptr.pos) + 1
);
if v >= BUCKET_LEN {
entry_ptr.pos = u8::try_from(xs % BUCKET_LEN).unwrap_or(0);
writer.update_lru_tail(&entry_ptr);
assert_eq!(
writer.metadata.removed_bitmap.load(Relaxed),
u32::from(entry_ptr.pos) + 1
);
let mut iterated = 1;
let mut i = u32::from(writer.lru_list.read(u32::from(entry_ptr.pos)).1);
while i != u32::from(entry_ptr.pos) {
iterated += 1;
i = u32::from(writer.lru_list.read(i).1);
}
assert_eq!(iterated, BUCKET_LEN);
iterated = 1;
i = u32::from(writer.lru_list.read(u32::from(entry_ptr.pos)).0);
while i != u32::from(entry_ptr.pos) {
iterated += 1;
i = u32::from(writer.lru_list.read(i).0);
}
assert_eq!(iterated, BUCKET_LEN);
}
}
}
#[cfg_attr(miri, ignore)]
#[test]
fn removed(xs in 0..BUCKET_LEN) {
let data_block: DataBlock<usize, usize, BUCKET_LEN> = DataBlock::new();
let data_block_ptr = into_non_null(&data_block);
let bucket: Bucket<usize, usize, DoublyLinkedList, CACHE> = Bucket::new();
for v in 0..xs {
let writer = Writer::lock_sync(&bucket).unwrap();
let entry_ptr = writer.insert(data_block_ptr, 0, (v, v));
writer.update_lru_tail(&entry_ptr);
let mut iterated = 1;
let mut i = u32::from(writer.lru_list.read(u32::from(entry_ptr.pos)).1);
while i != u32::from(entry_ptr.pos) {
iterated += 1;
i = u32::from(writer.lru_list.read(i).1);
}
assert_eq!(iterated, v + 1);
}
for v in 0..xs {
let writer = Writer::lock_sync(&bucket).unwrap();
let data_block_ptr = into_non_null(&data_block);
let entry_ptr = writer.get_entry_ptr(data_block_ptr, &v, 0);
let mut iterated = 1;
let mut i = u32::from(writer.lru_list.read(u32::from(entry_ptr.pos)).1);
while i != u32::from(entry_ptr.pos) {
iterated += 1;
i = u32::from(writer.lru_list.read(i).1);
}
assert_eq!(iterated, xs - v);
writer.remove_from_lru_list(&entry_ptr);
}
assert_eq!(bucket.metadata.removed_bitmap.load(Relaxed), 0);
}
}
#[cfg_attr(miri, ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn bucket_lock_sync() {
let num_tasks = BUCKET_LEN + 2;
let barrier = Arc::new(Barrier::new(num_tasks));
let data_block: Arc<DataBlock<usize, usize, BUCKET_LEN>> = Arc::new(DataBlock::new());
let bucket: Arc<Bucket<usize, usize, (), MAP>> = Arc::new(Bucket::new());
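// Shared scratch memory: each locked writer rewrites all 128 words, so the sum can
// only remain a multiple of 256 if writers are mutually exclusive.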
let mut data: [u64; 128] = [0; 128];
let mut task_handles = Vec::with_capacity(num_tasks);
for task_id in 0..num_tasks {
let barrier_clone = barrier.clone();
let data_block_clone = data_block.clone();
let bucket_clone = bucket.clone();
let data_ptr = AtomicPtr::new(&raw mut data);
task_handles.push(tokio::spawn(async move {
barrier_clone.wait().await;
let partial_hash = (task_id % BUCKET_LEN).try_into().unwrap();
for i in 0..2048 {
let writer = Writer::lock_sync(&bucket_clone).unwrap();
let mut sum: u64 = 0;
for j in 0..128 {
unsafe {
sum += (*data_ptr.load(Relaxed))[j];
(*data_ptr.load(Relaxed))[j] = if i % 4 == 0 { 2 } else { 4 }
};
}
assert_eq!(sum % 256, 0);
let data_block_ptr = unsafe {
NonNull::new_unchecked(Arc::as_ptr(&data_block_clone).cast_mut())
};
if i == 0 {
assert!(
writer
.insert(data_block_ptr, partial_hash, (task_id, 0))
.is_valid()
);
} else {
assert_eq!(
writer
.search_entry(data_block_ptr, &task_id, partial_hash)
.unwrap(),
(&task_id, &0_usize)
);
}
drop(writer);
let reader = Reader::lock_sync(&*bucket_clone).unwrap();
assert_eq!(
reader
.search_entry(data_block_ptr, &task_id, partial_hash)
.unwrap(),
(&task_id, &0_usize)
);
}
}));
}
for r in futures::future::join_all(task_handles).await {
assert!(r.is_ok());
}
let sum: u64 = data.iter().sum();
assert_eq!(sum % 256, 0);
assert_eq!(bucket.len(), num_tasks);
let data_block_ptr = unsafe { NonNull::new_unchecked(Arc::as_ptr(&data_block).cast_mut()) };
for task_id in 0..num_tasks {
assert_eq!(
bucket.search_entry(
data_block_ptr,
&task_id,
(task_id % BUCKET_LEN).try_into().unwrap(),
),
Some((&task_id, &0))
);
}
let mut count = 0;
let mut entry_ptr = EntryPtr::null();
while entry_ptr.find_next(&bucket) {
count += 1;
}
assert_eq!(bucket.len(), count);
entry_ptr = EntryPtr::null();
let writer = Writer::lock_sync(&bucket).unwrap();
while entry_ptr.find_next(&writer) {
writer.remove(
unsafe { NonNull::new_unchecked(Arc::as_ptr(&data_block).cast_mut()) },
&mut entry_ptr,
);
}
assert_eq!(writer.len(), 0);
writer.kill();
assert_eq!(bucket.len(), 0);
assert!(bucket.rw_lock.is_poisoned(Acquire));
}
}