use std::mem;
use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering;
use allocative::Allocative;
use allocative::Key;
use allocative::Visitor;
use parking_lot::RwLock;
use crate::atomic_value::AtomicValue;
use crate::fixed_cap;
use crate::fixed_cap::FixedCapTable;
pub use crate::fixed_cap::Iter;
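/// The live table together with the chain of tables it replaced.
///
/// Superseded tables are kept alive (never freed) so that entry references
/// already handed out to readers remain valid across resizes.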
struct CurrentTable<T: AtomicValue> {
_prev: Option<Box<CurrentTable<T>>>,
table: FixedCapTable<T>,
}
impl<T: AtomicValue + Allocative> CurrentTable<T> {
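    /// Reports this table and its predecessor chain to an `allocative`
    /// visitor. `current` is true only for the newest table, so entry
    /// payloads are attributed once.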
fn visit<'a, 'b: 'a>(&self, visitor: &'a mut Visitor<'b>, current: bool) {
let mut visitor = visitor.enter_self_sized::<Self>();
{
let mut visitor =
visitor.enter_unique(Key::new("_prev"), mem::size_of_val(&self._prev));
if let Some(prev) = &self._prev {
prev.visit(&mut visitor, false);
}
visitor.exit();
}
{
let mut visitor =
visitor.enter_unique(Key::new("table"), mem::size_of_val(&self.table));
self.table.visit(&mut visitor, current);
visitor.exit();
}
visitor.exit();
}
}
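/// A raw hash table with lock-free reads.
///
/// `lookup`, `iter`, and `len` take no lock. Inserts acquire `write_lock`
/// in read mode (so they proceed concurrently with each other), while
/// resizing acquires it in write mode and publishes a larger table.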
pub struct LockFreeRawTable<T: AtomicValue> {
write_lock: RwLock<()>,
current: AtomicPtr<CurrentTable<T>>,
}
impl<T: AtomicValue> Drop for LockFreeRawTable<T> {
fn drop(&mut self) {
let current = self.current.get_mut();
if !current.is_null() {
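            // SAFETY: a non-null pointer was produced by `Box::into_raw`,
            // and `&mut self` guarantees no concurrent access.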
unsafe {
let mut current = Box::from_raw(*current);
current.table.drop_entries();
            }
}
}
}
impl<T: AtomicValue> Default for LockFreeRawTable<T> {
#[inline]
fn default() -> LockFreeRawTable<T> {
LockFreeRawTable::new()
}
}
impl<T: AtomicValue> LockFreeRawTable<T> {
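    /// Creates an empty table. The first insert allocates the initial
    /// backing storage.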
#[inline]
pub const fn new() -> LockFreeRawTable<T> {
LockFreeRawTable {
write_lock: RwLock::new(()),
current: AtomicPtr::new(ptr::null_mut()),
}
}
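    /// Finds an entry by `hash`, using `eq` to test candidates with that
    /// hash. Lock-free: a single `Acquire` load of the table pointer.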
#[inline]
pub fn lookup(&self, hash: u64, eq: impl Fn(T::Ref<'_>) -> bool) -> Option<T::Ref<'_>> {
let current = self.current.load(Ordering::Acquire);
if current.is_null() {
return None;
}
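        // SAFETY: a non-null `current` was published with `Release` by
        // `resize_if_needed` and is never freed while `&self` is borrowed:
        // superseded tables are kept alive through the `_prev` chain.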
let current = unsafe { &*current };
current.table.lookup(hash, eq)
}
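    /// Inserts `value`, returning a reference to the entry in the table.
    /// If `eq` matches an existing entry, that entry is returned instead
    /// and the rejected `value` is handed back as `Some`. Inserts take
    /// `write_lock` in read mode, so they run in parallel with each other;
    /// `hash_fn` is needed to rehash entries when the table grows.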
pub fn insert(
&self,
hash: u64,
mut value: T,
eq: impl Fn(T::Ref<'_>, T::Ref<'_>) -> bool,
hash_fn: impl Fn(T::Ref<'_>) -> u64,
) -> (T::Ref<'_>, Option<T>) {
loop {
let guard = self.write_lock.read();
let current = self.current.load(Ordering::Relaxed);
if current.is_null() {
drop(guard);
self.resize_if_needed(|v| hash_fn(v));
continue;
}
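            // SAFETY: non-null and valid for `&self`, as in `lookup`.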
let current = unsafe { &*current };
match current.table.insert(hash, value, |a, b| eq(a, b)) {
Ok((reference, value)) => {
drop(guard);
if current.table.need_resize() {
self.resize_if_needed(|v| hash_fn(v));
}
return (reference, value);
}
Err(ret_value) => {
drop(guard);
self.resize_if_needed(|v| hash_fn(v));
value = ret_value;
}
}
}
}
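    /// Allocates the initial table (capacity 16) or, when the current table
    /// reports `need_resize`, replaces it with one of twice the capacity,
    /// re-linking every entry. The old table is retained via `_prev` so
    /// outstanding references remain valid.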
#[cold]
fn resize_if_needed(&self, hash: impl Fn(T::Ref<'_>) -> u64) {
let _guard = self.write_lock.write();
let current_ptr = self.current.load(Ordering::Relaxed);
if current_ptr.is_null() {
let new_table: FixedCapTable<T> = FixedCapTable::with_capacity(16);
let new_current = Box::new(CurrentTable {
_prev: None,
table: new_table,
});
self.current
.store(Box::into_raw(new_current), Ordering::Release);
return;
}
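        // SAFETY: non-null, and we hold the write lock, so no other thread
        // can swap out this table underneath us.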
let current = unsafe { &*current_ptr };
if !current.table.need_resize() {
return;
}
        let new_cap = current.table.capacity().checked_mul(2).expect("capacity overflow");
let mut new_table: FixedCapTable<T> = FixedCapTable::with_capacity(new_cap);
for entry_ptr in current.table.iter_ptrs() {
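            // SAFETY: `iter_ptrs` yields pointers to initialized entries of
            // the old table, which stays alive via the `_prev` chain.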
let entry = unsafe { T::deref(entry_ptr) };
let hash = hash(entry);
new_table.insert_unique_unchecked(hash, entry_ptr);
}
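        // SAFETY: `current_ptr` came from `Box::into_raw`; re-boxing it here
        // transfers ownership into `_prev`, keeping the old table (and the
        // references readers may still hold into it) alive.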
let new_current = Box::new(CurrentTable {
_prev: Some(unsafe { Box::from_raw(current_ptr) }),
table: new_table,
});
self.current
.store(Box::into_raw(new_current), Ordering::Release);
}
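    /// Iterates over the entries of the current table without locking.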
pub fn iter(&self) -> Iter<'_, T> {
let current = self.current.load(Ordering::Acquire);
if current.is_null() {
return Iter::empty();
}
let current = unsafe { &*current };
current.table.iter()
}
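    /// Number of entries, read without locking.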
#[inline]
pub fn len(&self) -> usize {
let current = self.current.load(Ordering::Acquire);
if current.is_null() {
return 0;
}
let current = unsafe { &*current };
current.table.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
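/// Owning iterator over the entries, created by `into_iter`. The chain of
/// superseded tables is held alive until iteration finishes.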
pub struct IntoIter<T: AtomicValue> {
_prev: Option<Box<CurrentTable<T>>>,
iter: fixed_cap::IntoIter<T>,
}
impl<T: AtomicValue> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}
impl<T: AtomicValue> IntoIterator for LockFreeRawTable<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(mut self) -> IntoIter<T> {
let current_ptr = mem::replace(self.current.get_mut(), ptr::null_mut());
if current_ptr.is_null() {
return IntoIter {
_prev: None,
iter: fixed_cap::IntoIter::empty(),
};
}
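        // SAFETY: the pointer was created by `Box::into_raw` and we just took
        // unique ownership of it via `mem::replace` on `&mut self.current`.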
let current = unsafe { Box::from_raw(current_ptr) };
let CurrentTable { _prev, table } = *current;
IntoIter {
_prev,
iter: table.into_iter(),
}
}
}
impl<T: AtomicValue + Allocative> Allocative for LockFreeRawTable<T> {
fn visit<'a, 'b: 'a>(&self, visitor: &'a mut Visitor<'b>) {
let mut visitor = visitor.enter_self_sized::<Self>();
{
            let mut visitor =
                visitor.enter_unique(Key::new("current"), mem::size_of_val(&self.current));
let current = self.current.load(Ordering::Acquire);
if !current.is_null() {
let current = unsafe { &*current };
current.visit(&mut visitor, true);
}
visitor.exit();
}
visitor.exit();
}
}
#[cfg(test)]
mod tests {
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::num::NonZeroU32;
use std::ptr;
use std::ptr::NonNull;
use crate::atomic_value::RawPtr;
use crate::raw::LockFreeRawTable;
fn hash<T: Hash>(key: T) -> u64 {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish()
}
#[allow(clippy::trivially_copy_pass_by_ref)]
fn hash_fn(key: &u32) -> u64 {
hash(*key)
}
#[test]
fn test_simple() {
let t = LockFreeRawTable::new();
let v0 = t.insert(hash(1), Box::new(1), |a, b| a == b, hash_fn).0;
assert_eq!(&1, v0);
let v1 = t.insert(hash(1), Box::new(1), |a, b| a == b, hash_fn).0;
assert_eq!(&1, v1);
assert!(ptr::eq(v0, v1));
let v2 = t.lookup(hash(1), |a| a == &1).unwrap();
assert_eq!(&1, v2);
assert!(ptr::eq(v0, v2));
assert_eq!(vec![1], t.iter().copied().collect::<Vec<_>>());
}
#[test]
fn test_million() {
let t = LockFreeRawTable::new();
        for i in 0..1_000_000 {
            t.insert(hash(i), Box::new(i), |a, b| a == b, hash_fn);
        }
        for i in 0..1_000_000 {
            let v = t.lookup(hash(i), |a| a == &i).unwrap();
            assert_eq!(&i, v);
        }
        assert_eq!(1_000_000, t.iter().count());
}
#[test]
fn test_u32() {
let t = LockFreeRawTable::new();
for i in 1..10000 {
t.insert(hash(i), NonZeroU32::new(i).unwrap(), |a, b| a == b, hash);
}
for i in 1..10000 {
let v = t.lookup(hash(i), |a| a.get() == i).unwrap();
assert_eq!(NonZeroU32::new(i).unwrap(), v);
}
}
#[test]
fn test_raw_pointers() {
let t = LockFreeRawTable::new();
let p = NonNull::<u32>::dangling();
let p = RawPtr(p);
let r = t.insert(hash(p.0), p, |a, b| a == b, hash).0;
assert_eq!(p.0, r);
}
#[test]
fn test_into_iter() {
let t = LockFreeRawTable::new();
for i in 0..10 {
t.insert(hash(i), Box::new(i), |a, b| a == b, hash_fn);
}
let mut collect: Vec<u32> = t.into_iter().map(|b| *b).collect();
collect.sort_unstable();
assert_eq!((0..10).collect::<Vec<_>>(), collect);
}
#[test]
fn test_into_iter_empty() {
let t = LockFreeRawTable::<Box<u32>>::new();
let collect: Vec<Box<u32>> = t.into_iter().collect();
assert!(collect.is_empty());
}
#[test]
fn test_allocative() {
let table = LockFreeRawTable::<Box<u16>>::new();
for i in 0..100 {
let (_r, inserted) = table.insert(hash(i), Box::new(i), |a, b| a == b, |v| hash(v));
assert!(inserted.is_none());
}
let mut builder = allocative::FlameGraphBuilder::default();
builder.visit_root(&table);
let _flame_graph = builder.finish_and_write_flame_graph();
}
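    #[test]
    fn test_concurrent_insert() {
        // A minimal concurrency sketch, not part of the original suite. It
        // assumes the table can be shared across threads, which holds here
        // because `LockFreeRawTable` only contains an `AtomicPtr` and a
        // `parking_lot::RwLock<()>`, both of which are `Sync`.
        let t = LockFreeRawTable::new();
        std::thread::scope(|s| {
            for chunk in 0u32..4 {
                let t = &t;
                s.spawn(move || {
                    for i in chunk * 1000..(chunk + 1) * 1000 {
                        t.insert(hash(i), Box::new(i), |a, b| a == b, hash_fn);
                    }
                });
            }
        });
        assert_eq!(4000, t.len());
        for i in 0u32..4000 {
            assert_eq!(&i, t.lookup(hash(i), |a| a == &i).unwrap());
        }
    }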
}