use crate::align_padding;
use alloc::vec::Vec;
use core::alloc::{GlobalAlloc, Layout};
use core::hash::Hasher;
use core::marker::PhantomData;
use core::ops::Deref;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use core::sync::atomic::{compiler_fence, fence, AtomicU64, AtomicUsize};
use core::{intrinsics, mem, ptr};
use crossbeam_epoch::*;
use std::alloc::System;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::ops::DerefMut;
use std::os::raw::c_void;
use std::time::{SystemTime, UNIX_EPOCH};
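// Each table entry is a pair of machine words laid out as EntryTemplate: a
// fast key followed by a fast value. Full key/value objects, when present,
// live in a side `Attachment` indexed by the entry slot. A few fast values
// are reserved below as markers (empty, sentinel, tombstone).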
pub struct EntryTemplate(usize, usize);
const EMPTY_KEY: usize = 0;
const EMPTY_VALUE: usize = 0;
const SENTINEL_VALUE: usize = 1;
const TOMBSTONE_VALUE: usize = 2;
const VAL_BIT_MASK: usize = !0 << 1 >> 1;
const INV_VAL_BIT_MASK: usize = !VAL_BIT_MASK;
const MUTEX_BIT_MASK: usize = !WORD_MUTEX_DATA_BIT_MASK & VAL_BIT_MASK;
const ENTRY_SIZE: usize = mem::size_of::<EntryTemplate>();
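// Decoded view of a fast value. The top bit (INV_VAL_BIT_MASK) marks a value
// that is "primed", i.e. still being written or migrated; a sentinel marks an
// entry that has been moved to the new chunk during a resize; a tombstone
// marks a deleted entry. Everything else is a plain value.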
struct Value {
raw: usize,
parsed: ParsedValue,
}
enum ParsedValue {
Val(usize),
Prime(usize),
Sentinel,
Empty,
}
#[derive(Debug)]
enum ModResult<V> {
Replaced(usize, V, usize),
Existed(usize, V),
Fail,
Sentinel,
NotFound,
Done(usize, Option<V>, usize),
TableFull,
Aborted,
}
enum ModOp<'a, V> {
Insert(usize, &'a V),
UpsertFastVal(usize),
AttemptInsert(usize, &'a V),
SwapFastVal(Box<dyn Fn(usize) -> Option<usize>>),
Sentinel,
Tombstone,
}
pub enum InsertOp {
Insert,
UpsertFast,
TryInsert,
}
enum ResizeResult {
NoNeed,
SwapFailed,
ChunkChanged,
Done,
}
enum SwapResult<'a, K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
Succeed(usize, usize, Shared<'a, ChunkPtr<K, V, A, ALLOC>>),
NotFound,
Failed,
Aborted,
}
pub struct Chunk<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
capacity: usize,
base: usize,
occu_limit: usize,
occupation: AtomicUsize,
empty_entries: AtomicUsize,
total_size: usize,
attachment: A,
shadow: PhantomData<(K, V, ALLOC)>,
}
pub struct ChunkPtr<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
ptr: *mut Chunk<K, V, A, ALLOC>,
}
pub struct Table<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> {
new_chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
count: AtomicUsize,
epoch: AtomicUsize,
timestamp: AtomicU64,
mark: PhantomData<H>,
}
impl<
K: Clone + Hash + Eq,
V: Clone,
A: Attachment<K, V>,
ALLOC: GlobalAlloc + Default,
H: Hasher + Default,
> Table<K, V, A, ALLOC, H>
{
pub fn with_capacity(cap: usize) -> Self {
if !is_power_of_2(cap) {
panic!("capacity is not power of 2");
}
let chunk = Chunk::alloc_chunk(cap);
Self {
chunk: Atomic::new(ChunkPtr::new(chunk)),
new_chunk: Atomic::null(),
count: AtomicUsize::new(0),
epoch: AtomicUsize::new(0),
timestamp: AtomicU64::new(timestamp()),
mark: PhantomData,
}
}
pub fn new() -> Self {
Self::with_capacity(64)
}
pub fn get(&self, key: &K, fkey: usize, read_attachment: bool) -> Option<(usize, Option<V>)> {
enum FromChunkRes<V> {
Value(usize, Option<V>),
None,
Sentinel,
}
let guard = crossbeam_epoch::pin();
let backoff = crossbeam_utils::Backoff::new();
let hash = hash::<H>(fkey);
loop {
let epoch = self.now_epoch();
let chunk_ptr = self.chunk.load(Acquire, &guard);
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
let copying = Self::is_copying(epoch);
debug_assert!(!chunk_ptr.is_null());
let get_from = |chunk_ptr: &Shared<ChunkPtr<K, V, A, ALLOC>>| {
let chunk = unsafe { chunk_ptr.deref() };
let (val, idx) = self.get_from_chunk(&*chunk, hash, key, fkey);
match val.parsed {
ParsedValue::Empty | ParsedValue::Val(0) => FromChunkRes::None,
ParsedValue::Prime(val) | ParsedValue::Val(val) => FromChunkRes::Value(
val,
if read_attachment {
Some(chunk.attachment.get(idx).1)
} else {
None
},
),
ParsedValue::Sentinel => FromChunkRes::Sentinel,
}
};
return match get_from(&chunk_ptr) {
FromChunkRes::Value(fval, val) => Some((fval, val)),
FromChunkRes::Sentinel => {
if copying && self.now_epoch() == epoch {
dfence();
match get_from(&new_chunk_ptr) {
FromChunkRes::Value(fval, val) => Some((fval, val)),
FromChunkRes::Sentinel => {
backoff.spin();
continue;
}
FromChunkRes::None => {
trace!(
"Got non from new chunk for {} at epoch {}",
fkey - 5,
epoch
);
None
}
}
} else {
warn!(
"Got sentinel on get but new chunk is null for {}, retry. Copying {}, epoch {}, now epoch {}",
fkey,
copying,
epoch,
self.epoch.load(Acquire)
);
backoff.spin();
continue;
}
}
FromChunkRes::None => {
if copying {
dfence();
if new_chunk_ptr.is_null() {
continue;
}
match get_from(&new_chunk_ptr) {
FromChunkRes::Value(fval, val) => Some((fval, val)),
FromChunkRes::Sentinel => {
backoff.spin();
continue;
}
FromChunkRes::None => {
trace!(
"Got non from new chunk for {} at epoch {}",
fkey - 5,
epoch
);
None
}
}
} else {
None
}
}
};
}
}
pub fn insert(
&self,
op: InsertOp,
key: &K,
value: Option<V>,
fkey: usize,
fvalue: usize,
) -> Option<(usize, V)> {
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let hash = hash::<H>(fkey);
loop {
let epoch = self.now_epoch();
let chunk_ptr = self.chunk.load(Acquire, &guard);
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
let copying = Self::is_copying(epoch);
if !copying {
match self.check_migration(chunk_ptr, &guard) {
ResizeResult::Done | ResizeResult::SwapFailed | ResizeResult::ChunkChanged => {
debug!("Retry insert due to resize");
backoff.spin();
continue;
}
ResizeResult::NoNeed => {}
}
} else if new_chunk_ptr.is_null() {
warn!("Chunk ptrs does not consist with epoch");
continue;
}
let chunk = unsafe { chunk_ptr.deref() };
// Only dereference the new chunk while copying; it may be null otherwise.
let modify_chunk = if copying {
unsafe { new_chunk_ptr.deref() }
} else {
chunk
};
let masked_value = fvalue & VAL_BIT_MASK;
let mod_op = match op {
InsertOp::Insert => ModOp::Insert(masked_value, value.as_ref().unwrap()),
InsertOp::UpsertFast => ModOp::UpsertFastVal(masked_value),
InsertOp::TryInsert => ModOp::AttemptInsert(masked_value, value.as_ref().unwrap()),
};
let value_insertion =
self.modify_entry(&*modify_chunk, hash, key, fkey, mod_op, &guard);
let mut result = None;
match value_insertion {
ModResult::Done(_, _, _) => {
modify_chunk.occupation.fetch_add(1, Relaxed);
self.count.fetch_add(1, AcqRel);
}
ModResult::Replaced(fv, v, _) | ModResult::Existed(fv, v) => result = Some((fv, v)),
ModResult::Fail => {
warn!(
"Insertion failed, do migration and retry. Copying {}, cap {}, count {}, old {:?}, new {:?}",
copying,
modify_chunk.capacity,
modify_chunk.occupation.load(Relaxed),
chunk_ptr,
new_chunk_ptr
);
backoff.spin();
continue;
}
ModResult::TableFull => {
trace!(
"Insertion is too fast, copying {}, cap {}, count {}, old {:?}, new {:?}.",
copying,
modify_chunk.capacity,
modify_chunk.occupation.load(Relaxed),
chunk_ptr,
new_chunk_ptr
);
self.do_migration(chunk_ptr, &guard);
backoff.spin();
continue;
}
ModResult::Sentinel => {
trace!("Discovered sentinel on insertion table upon probing, retry");
backoff.spin();
continue;
}
ModResult::NotFound => unreachable!("Not Found on insertion is impossible"),
ModResult::Aborted => unreachable!("Should not abort"),
}
if copying && self.now_epoch() == epoch {
dfence();
assert_ne!(
chunk_ptr, new_chunk_ptr,
"at epoch {}, inserting k:{}, v:{}",
epoch, fkey, fvalue
);
assert_ne!(
new_chunk_ptr,
Shared::null(),
"at epoch {}, inserting k:{}, v:{}",
epoch,
fkey,
fvalue
);
self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, &guard);
}
return result;
}
}
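// The epoch counter is even when the table is stable and odd while a
// migration is copying entries; the low bit therefore tells whether a copy
// is in flight.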
#[inline(always)]
fn is_copying(epoch: usize) -> bool {
epoch | 1 == epoch
}
#[inline(always)]
fn epoch_changed(&self, epoch: usize) -> bool {
self.now_epoch() != epoch
}
fn swap<'a, F: Fn(usize) -> Option<usize> + Copy + 'static>(
&self,
fkey: usize,
key: &K,
func: F,
guard: &'a Guard,
) -> SwapResult<'a, K, V, A, ALLOC> {
let backoff = crossbeam_utils::Backoff::new();
let hash = hash::<H>(fkey);
loop {
let epoch = self.now_epoch();
let chunk_ptr = self.chunk.load(Acquire, &guard);
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
let copying = Self::is_copying(epoch);
let chunk = unsafe { chunk_ptr.deref() };
if copying && self.now_epoch() == epoch {
// The new chunk pointer is non-null while the epoch is odd and unchanged.
let new_chunk = unsafe { new_chunk_ptr.deref() };
let (old_parsed_val, old_index) = self.get_from_chunk(chunk, hash, key, fkey);
let old_fval = old_parsed_val.raw;
if old_fval != SENTINEL_VALUE
&& old_fval != EMPTY_VALUE
&& old_fval != TOMBSTONE_VALUE
{
if let Some(new_val) = func(old_fval) {
let val = chunk.attachment.get(old_index).1;
match self.modify_entry(
new_chunk,
hash,
key,
fkey,
ModOp::AttemptInsert(new_val, &val),
guard,
) {
ModResult::Done(_, _, new_index)
| ModResult::Replaced(_, _, new_index) => {
let old_addr = chunk.base + old_index * ENTRY_SIZE;
if self.cas_sentinel(old_addr, old_fval) {
return SwapResult::Succeed(
old_fval & VAL_BIT_MASK,
new_index,
new_chunk_ptr,
);
} else {
let new_addr = new_chunk.base + new_index * ENTRY_SIZE;
self.cas_tombstone(new_addr, new_val);
continue;
}
}
_ => {}
}
}
}
}
let modify_chunk_ptr = if copying { new_chunk_ptr } else { chunk_ptr };
let modify_chunk = unsafe { modify_chunk_ptr.deref() };
trace!("Swaping for key {}, copying {}", fkey, copying);
let mod_res = self.modify_entry(
modify_chunk,
hash,
key,
fkey,
ModOp::SwapFastVal(Box::new(func)),
guard,
);
if copying && self.now_epoch() == epoch {
assert_ne!(chunk_ptr, new_chunk_ptr);
assert_ne!(new_chunk_ptr, Shared::null());
self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, &guard);
}
return match mod_res {
ModResult::Replaced(v, _, idx) => {
SwapResult::Succeed(v & VAL_BIT_MASK, idx, modify_chunk_ptr)
}
ModResult::Aborted => SwapResult::Aborted,
ModResult::Fail => SwapResult::Failed,
ModResult::NotFound => SwapResult::NotFound,
ModResult::Sentinel => {
backoff.spin();
continue;
}
ModResult::Existed(_, _) => unreachable!("Swap should not return Existed"),
ModResult::Done(_, _, _) => unreachable!("Swap Done"),
ModResult::TableFull => unreachable!("Swap table full"),
};
}
}
#[inline(always)]
fn now_epoch(&self) -> usize {
self.epoch.load(Acquire)
}
pub fn remove(&self, key: &K, fkey: usize) -> Option<(usize, V)> {
let guard = crossbeam_epoch::pin();
let backoff = crossbeam_utils::Backoff::new();
let hash = hash::<H>(fkey);
loop {
let epoch = self.now_epoch();
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
let old_chunk_ptr = self.chunk.load(Acquire, &guard);
let copying = Self::is_copying(epoch);
if copying && (new_chunk_ptr.is_null() || new_chunk_ptr == old_chunk_ptr) {
continue;
}
let old_chunk = unsafe { old_chunk_ptr.deref() };
let mut retr = None;
if copying {
trace!("Put sentinel in old chunk for removal");
assert_ne!(new_chunk_ptr, Shared::null());
let remove_from_old =
self.modify_entry(&*old_chunk, hash, key, fkey, ModOp::Sentinel, &guard);
match remove_from_old {
ModResult::Done(fvalue, Some(value), _)
| ModResult::Replaced(fvalue, value, _) => {
trace!("Sentinal placed");
retr = Some((fvalue, value));
}
ModResult::Done(_, None, _) => {}
_ => {
trace!("Sentinal not placed");
}
}
}
let modify_chunk = if copying { &new_chunk } else { &old_chunk };
let res = self.modify_entry(&*modify_chunk, hash, key, fkey, ModOp::Tombstone, &guard);
match res {
ModResult::Replaced(fvalue, value, _) => {
retr = Some((fvalue, value));
self.count.fetch_sub(1, AcqRel);
}
ModResult::Done(_, _, _) => unreachable!("Remove should not return Done"),
ModResult::NotFound => {}
ModResult::Sentinel => {
backoff.spin();
continue;
}
ModResult::TableFull => unreachable!("TableFull on remove is not possible"),
_ => {}
};
let epoch_changed = self.epoch_changed(epoch);
if epoch_changed {
if retr.is_none() {
return self.remove(key, fkey);
}
}
return retr;
}
}
pub fn len(&self) -> usize {
self.count.load(Acquire)
}
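// Linear probing from the hashed slot: stop at the first matching key, at an
// EMPTY_KEY slot, or after scanning `capacity` entries.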
fn get_from_chunk(
&self,
chunk: &Chunk<K, V, A, ALLOC>,
hash: usize,
key: &K,
fkey: usize,
) -> (Value, usize) {
assert_ne!(chunk as *const Chunk<K, V, A, ALLOC> as usize, 0);
let mut idx = hash;
let cap = chunk.capacity;
let base = chunk.base;
let cap_mask = chunk.cap_mask();
let mut counter = 0;
while counter < cap {
idx &= cap_mask;
let addr = base + idx * ENTRY_SIZE;
let k = self.get_fast_key(addr);
if k == fkey && chunk.attachment.probe(idx, key) {
let val_res = self.get_fast_value(addr);
match val_res.parsed {
ParsedValue::Empty => {}
_ => return (val_res, idx),
}
}
if k == EMPTY_KEY {
return (Value::new::<K, V, A, ALLOC, H>(0), 0);
}
idx += 1;
counter += 1;
}
return (Value::new::<K, V, A, ALLOC, H>(0), 0);
}
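// Single probe loop shared by all mutating operations. Hitting a sentinel
// (for any op other than placing a sentinel) returns ModResult::Sentinel so
// the caller retries against the new chunk.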
#[inline(always)]
fn modify_entry<'a>(
&self,
chunk: &'a Chunk<K, V, A, ALLOC>,
hash: usize,
key: &K,
fkey: usize,
op: ModOp<V>,
_guard: &'a Guard,
) -> ModResult<V> {
let cap = chunk.capacity;
let base = chunk.base;
let mut idx = hash;
let mut count = 0;
let cap_mask = chunk.cap_mask();
let backoff = crossbeam_utils::Backoff::new();
while count <= cap {
idx &= cap_mask;
let addr = base + idx * ENTRY_SIZE;
let k = self.get_fast_key(addr);
let v = self.get_fast_value(addr);
if let ParsedValue::Sentinel = v.parsed {
match &op {
// Placing a sentinel over an existing sentinel is fine; fall through.
&ModOp::Sentinel => {}
_ => return ModResult::Sentinel,
}
}
if k == fkey && chunk.attachment.probe(idx, &key) {
let val = v;
match &val.parsed {
ParsedValue::Val(v) => {
match &op {
&ModOp::Sentinel => {
if self.cas_sentinel(addr, val.raw) {
let (_, value) = chunk.attachment.get(idx);
chunk.attachment.erase(idx);
if *v == 0 {
return ModResult::Done(addr, None, idx);
} else {
return ModResult::Done(*v, Some(value), idx);
}
} else {
return ModResult::Fail;
}
}
&ModOp::Tombstone => {
if *v == 0 {
return ModResult::NotFound;
}
if !self.cas_tombstone(addr, val.raw).1 {
return ModResult::Fail;
} else {
let (_, value) = chunk.attachment.get(idx);
chunk.attachment.erase(idx);
chunk.empty_entries.fetch_add(1, Relaxed);
return ModResult::Replaced(*v, value, idx);
}
}
&ModOp::UpsertFastVal(ref fv) => {
if self.cas_value(addr, val.raw, *fv).1 {
let (_, value) = chunk.attachment.get(idx);
if *v == 0 {
return ModResult::Done(addr, None, idx);
} else {
return ModResult::Replaced(*v, value, idx);
}
} else {
trace!("Cannot upsert fast value in place for {}", fkey);
return ModResult::Fail;
}
}
&ModOp::AttemptInsert(fval, oval) => {
if *v == 0 {
let primed_fval = fval | INV_VAL_BIT_MASK;
let (act_val, replaced) =
self.cas_value(addr, val.raw, primed_fval);
if replaced {
let (_, prev_val) = chunk.attachment.get(idx);
chunk.attachment.set(idx, key.clone(), (*oval).clone());
let stripped_prime =
self.cas_value(addr, primed_fval, fval).1;
debug_assert!(stripped_prime);
return ModResult::Replaced(val.raw, prev_val, idx);
} else {
let (_, value) = chunk.attachment.get(idx);
return ModResult::Existed(act_val, value);
}
} else {
trace!(
"Attempting insert existed entry {}, {}, have key {:?}, skip",
k,
fval,
v
);
let (_, value) = chunk.attachment.get(idx);
return ModResult::Existed(*v, value);
}
}
&ModOp::SwapFastVal(ref swap) => {
trace!(
"Swaping found key {} have original value {:#064b}",
fkey,
val.raw
);
match &val.parsed {
ParsedValue::Val(pval) => {
let pval = *pval;
if pval == 0 {
return ModResult::NotFound;
}
let aval = chunk.attachment.get(idx).1;
if let Some(v) = swap(pval) {
if self.cas_value(addr, pval, v).1 {
return ModResult::Replaced(val.raw, aval, idx);
} else {
return ModResult::Fail;
}
} else {
return ModResult::Aborted;
}
}
_ => {
return ModResult::Fail;
}
}
}
&ModOp::Insert(fval, ref v) => {
debug!("Inserting in place for {}", fkey);
let primed_fval = fval | INV_VAL_BIT_MASK;
if self.cas_value(addr, val.raw, primed_fval).1 {
let (_, prev_val) = chunk.attachment.get(idx);
chunk.attachment.set(idx, key.clone(), (*v).clone());
let stripped_prime = self.cas_value(addr, primed_fval, fval).1;
debug_assert!(stripped_prime);
return ModResult::Replaced(val.raw, prev_val, idx);
} else {
trace!("Cannot insert in place for {}", fkey);
return ModResult::Fail;
}
}
}
}
ParsedValue::Empty => {
}
ParsedValue::Sentinel => return ModResult::Sentinel,
ParsedValue::Prime(v) => {
trace!(
"Discovered prime for key {} with value {:#064b}, retry",
fkey,
v
);
backoff.spin();
continue;
}
}
} else if k == EMPTY_KEY {
match op {
ModOp::Insert(fval, val) | ModOp::AttemptInsert(fval, val) => {
trace!(
"Inserting entry key: {}, value: {}, raw: {:b}, addr: {}",
fkey,
fval & VAL_BIT_MASK,
fval,
addr
);
if self.cas_value(addr, EMPTY_VALUE, fval).1 {
chunk.attachment.set(idx, key.clone(), (*val).clone());
unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
return ModResult::Done(addr, None, idx);
} else {
backoff.spin();
continue;
}
}
ModOp::UpsertFastVal(fval) => {
trace!(
"Upserting entry key: {}, value: {}, raw: {:b}, addr: {}",
fkey,
fval & VAL_BIT_MASK,
fval,
addr
);
if self.cas_value(addr, EMPTY_VALUE, fval).1 {
unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
return ModResult::Done(addr, None, idx);
} else {
backoff.spin();
continue;
}
}
ModOp::Sentinel => {
if self.cas_sentinel(addr, 0) {
unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
return ModResult::Done(addr, None, idx);
} else {
backoff.spin();
continue;
}
}
ModOp::Tombstone => return ModResult::Fail,
ModOp::SwapFastVal(_) => return ModResult::NotFound,
};
}
idx += 1;
count += 1;
}
match op {
ModOp::Insert(_fv, _v) | ModOp::AttemptInsert(_fv, _v) => ModResult::TableFull,
ModOp::UpsertFastVal(_fv) => ModResult::TableFull,
_ => ModResult::NotFound,
}
}
fn all_from_chunk(&self, chunk: &Chunk<K, V, A, ALLOC>) -> Vec<(usize, usize, K, V)> {
let mut idx = 0;
let cap = chunk.capacity;
let base = chunk.base;
let mut counter = 0;
let mut res = Vec::with_capacity(chunk.occupation.load(Relaxed));
let cap_mask = chunk.cap_mask();
while counter < cap {
idx &= cap_mask;
let addr = base + idx * ENTRY_SIZE;
let k = self.get_fast_key(addr);
if k != EMPTY_KEY {
let val_res = self.get_fast_value(addr);
match val_res.parsed {
ParsedValue::Val(0) => {}
ParsedValue::Val(v) | ParsedValue::Prime(v) => {
let (key, value) = chunk.attachment.get(idx);
res.push((k, v, key, value))
}
_ => {}
}
}
idx += 1;
counter += 1;
}
return res;
}
fn entries(&self) -> Vec<(usize, usize, K, V)> {
let guard = crossbeam_epoch::pin();
let old_chunk_ref = self.chunk.load(Acquire, &guard);
let new_chunk_ref = self.new_chunk.load(Acquire, &guard);
let old_chunk = unsafe { old_chunk_ref.deref() };
let mut res = self.all_from_chunk(&*old_chunk);
if !new_chunk_ref.is_null() && old_chunk_ref != new_chunk_ref {
// Safe to dereference: checked non-null above.
let new_chunk = unsafe { new_chunk_ref.deref() };
res.append(&mut self.all_from_chunk(&*new_chunk));
}
return res;
}
#[inline(always)]
fn get_fast_key(&self, entry_addr: usize) -> usize {
debug_assert!(entry_addr > 0);
unsafe { intrinsics::atomic_load_acq(entry_addr as *mut usize) }
}
#[inline(always)]
fn get_fast_value(&self, entry_addr: usize) -> Value {
debug_assert!(entry_addr > 0);
let addr = entry_addr + mem::size_of::<usize>();
let val = unsafe { intrinsics::atomic_load_acq(addr as *mut usize) };
Value::new::<K, V, A, ALLOC, H>(val)
}
#[inline(always)]
fn cas_tombstone(&self, entry_addr: usize, original: usize) -> (usize, bool) {
debug_assert!(entry_addr > 0);
self.cas_value(entry_addr, original, TOMBSTONE_VALUE)
}
#[inline(always)]
fn cas_value(&self, entry_addr: usize, original: usize, value: usize) -> (usize, bool) {
debug_assert!(entry_addr > 0);
debug_assert_ne!(value & VAL_BIT_MASK, SENTINEL_VALUE);
let addr = entry_addr + mem::size_of::<usize>();
unsafe { intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, value) }
}
#[inline(always)]
fn cas_sentinel(&self, entry_addr: usize, original: usize) -> bool {
assert!(entry_addr > 0);
if cfg!(debug_assertions) {
let guard = crossbeam_epoch::pin();
assert!(Self::is_copying(self.epoch.load(Acquire)));
assert!(!self.new_chunk.load(Acquire, &guard).is_null());
let chunk = self.chunk.load(Acquire, &guard);
let chunk_ref = unsafe { chunk.deref() };
assert!(entry_addr >= chunk_ref.base);
assert!(entry_addr < chunk_ref.base + chunk_ref.total_size);
}
let addr = entry_addr + mem::size_of::<usize>();
let (val, done) = unsafe {
intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, SENTINEL_VALUE)
};
done || val == SENTINEL_VALUE
}
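// A resize is triggered once occupation exceeds occu_limit, which
// occupation_limit() sets to roughly 60% of the chunk capacity.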
fn check_migration<'a>(
&self,
old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
guard: &crossbeam_epoch::Guard,
) -> ResizeResult {
let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
let occupation = old_chunk_ins.occupation.load(Relaxed);
let occu_limit = old_chunk_ins.occu_limit;
if occupation <= occu_limit {
return ResizeResult::NoNeed;
}
self.do_migration(old_chunk_ptr, guard)
}
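// Migration protocol: (1) claim `new_chunk` by CASing it from null to the old
// chunk pointer, which doubles as a resize lock; (2) bail out if `chunk`
// changed under us; (3) allocate and publish the new chunk; (4) bump the
// epoch to an odd value, copy entries while sealing old slots with sentinels,
// then bump the epoch back to even; (5) swap `chunk`, defer-destroy the old
// one, and clear `new_chunk`.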
fn do_migration<'a>(
&self,
old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
guard: &crossbeam_epoch::Guard,
) -> ResizeResult {
let epoch = self.now_epoch();
let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
let empty_entries = old_chunk_ins.empty_entries.load(Relaxed);
let old_cap = old_chunk_ins.capacity;
let new_cap = if empty_entries > (old_cap >> 1) {
old_cap
} else {
let mut cap = old_cap << 1;
if cap < 2048 {
cap <<= 1;
}
if epoch < 5 {
cap <<= 1;
}
if timestamp() - self.timestamp.load(Acquire) < 1000 {
cap <<= 1;
}
cap
};
debug!(
"New size for {:?} is {}, was {}",
old_chunk_ptr, new_cap, old_cap
);
if let Err(_) = self
.new_chunk
.compare_and_set(Shared::null(), old_chunk_ptr, AcqRel, guard)
{
trace!("Cannot obtain lock for resize, will retry");
return ResizeResult::SwapFailed;
}
dfence();
if self.chunk.load(Acquire, guard) != old_chunk_ptr {
warn!(
"Give up on resize due to old chunk changed after lock obtained, epoch {} to {}",
epoch,
self.now_epoch()
);
self.new_chunk.store(Shared::null(), Release);
dfence();
debug_assert_eq!(self.now_epoch() % 2, 0);
return ResizeResult::ChunkChanged;
}
debug!("Resizing {:?}", old_chunk_ptr);
let new_chunk_ptr =
Owned::new(ChunkPtr::new(Chunk::alloc_chunk(new_cap))).into_shared(guard);
let new_chunk_ins = unsafe { new_chunk_ptr.deref() };
assert_ne!(new_chunk_ptr, old_chunk_ptr);
self.new_chunk.store(new_chunk_ptr, Release);
dfence();
let prev_epoch = self.epoch.fetch_add(1, AcqRel);
debug_assert_eq!(prev_epoch % 2, 0);
dfence();
self.migrate_entries(old_chunk_ins, new_chunk_ins, guard);
debug_assert_ne!(old_chunk_ins.ptr as usize, new_chunk_ins.base);
debug_assert_ne!(old_chunk_ins.ptr, unsafe { new_chunk_ptr.deref().ptr });
debug_assert!(!new_chunk_ptr.is_null());
dfence();
let prev_epoch = self.epoch.fetch_add(1, AcqRel);
debug_assert_eq!(prev_epoch % 2, 1);
dfence();
self.chunk.store(new_chunk_ptr, Release);
self.timestamp.store(timestamp(), Release);
dfence();
unsafe {
guard.defer_destroy(old_chunk_ptr);
}
self.new_chunk.store(Shared::null(), Release);
debug!(
"Migration for {:?} completed, new chunk is {:?}, size from {} to {}",
old_chunk_ptr, new_chunk_ptr, old_cap, new_cap
);
ResizeResult::Done
}
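// Walk the old chunk slot by slot. Empty and tombstone slots are sealed with
// a sentinel so late writers fail over to the new chunk; live entries are
// copied into the new chunk in primed form, the old slot is sealed, and the
// prime bit is then stripped from the copied value.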
fn migrate_entries(
&self,
old_chunk_ins: &Chunk<K, V, A, ALLOC>,
new_chunk_ins: &Chunk<K, V, A, ALLOC>,
_guard: &crossbeam_epoch::Guard,
) -> usize {
let mut old_address = old_chunk_ins.base as usize;
let boundary = old_address + chunk_size_of(old_chunk_ins.capacity);
let mut effective_copy = 0;
let mut idx = 0;
let backoff = crossbeam_utils::Backoff::new();
while old_address < boundary {
let fvalue = self.get_fast_value(old_address);
let fkey = self.get_fast_key(old_address);
match &fvalue.parsed {
ParsedValue::Empty | ParsedValue::Val(0) => {
if !self.cas_sentinel(old_address, fvalue.raw) {
warn!("Filling empty with sentinel for old table should succeed but not, retry");
backoff.spin();
continue;
}
}
ParsedValue::Val(v) => {
if fkey == EMPTY_KEY {
backoff.spin();
continue;
}
trace!("Moving key: {}, value: {}", fkey, v);
assert_ne!(fvalue.raw & VAL_BIT_MASK, SENTINEL_VALUE);
let primed_fval = fvalue.raw | INV_VAL_BIT_MASK;
let (key, value) = old_chunk_ins.attachment.get(idx);
let inserted_addr = {
let cap = new_chunk_ins.capacity;
let base = new_chunk_ins.base;
let mut idx = hash::<H>(fkey);
let cap_mask = new_chunk_ins.cap_mask();
let mut count = 0;
let mut res = None;
while count < cap {
idx &= cap_mask;
let addr = base + idx * ENTRY_SIZE;
let k = self.get_fast_key(addr);
if k == fkey && new_chunk_ins.attachment.probe(idx, &key) {
break;
} else if k == EMPTY_KEY {
// Store the primed value first; the prime bit is stripped only after the old slot is sealed with a sentinel.
let (val, done) = self.cas_value(addr, EMPTY_VALUE, primed_fval);
debug_assert_ne!(val & VAL_BIT_MASK, SENTINEL_VALUE);
if done {
new_chunk_ins.attachment.set(idx, key, value);
unsafe {
intrinsics::atomic_store_rel(addr as *mut usize, fkey)
}
res = Some(addr);
break;
}
}
idx += 1;
count += 1;
}
res
};
dfence();
trace!("Copied key {} to new chunk", fkey);
if self.cas_sentinel(old_address, fvalue.raw) {
dfence();
if let Some(new_entry_addr) = inserted_addr {
let stripped = primed_fval & VAL_BIT_MASK;
debug_assert_ne!(stripped, SENTINEL_VALUE);
if self.cas_value(new_entry_addr, primed_fval, stripped).1 {
trace!(
"Effective copy key: {}, value {}, addr: {}",
fkey,
stripped,
new_entry_addr
);
effective_copy += 1;
} else {
trace!("Value changed before strip prime for key {}", fkey);
}
old_chunk_ins.attachment.erase(idx);
}
} else {
panic!("Sentinel CAS should always succeed but failed {}", fkey);
}
}
ParsedValue::Prime(_) => {
unreachable!("Shall not have prime in old table");
}
ParsedValue::Sentinel => {
trace!("Skip copy sentinel");
}
}
old_address += ENTRY_SIZE;
idx += 1;
dfence();
}
debug!("Migrated {} entries to new chunk", effective_copy);
new_chunk_ins.occupation.fetch_add(effective_copy, Relaxed);
return effective_copy;
}
pub fn map_is_copying(&self) -> bool {
Self::is_copying(self.now_epoch())
}
}
impl Value {
pub fn new<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default>(
val: usize,
) -> Self {
let res = {
if val == EMPTY_VALUE {
ParsedValue::Empty
} else if val == TOMBSTONE_VALUE {
ParsedValue::Val(0)
} else {
let actual_val = val & VAL_BIT_MASK;
let flag = val & INV_VAL_BIT_MASK;
if flag != 0 {
ParsedValue::Prime(actual_val)
} else if actual_val == SENTINEL_VALUE {
ParsedValue::Sentinel
} else if actual_val == TOMBSTONE_VALUE {
unreachable!("");
} else {
ParsedValue::Val(actual_val)
}
}
};
Value {
raw: val,
parsed: res,
}
}
}
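// A chunk is a single allocation laid out as: the Chunk header (padded to a
// 64-byte boundary), then `capacity * ENTRY_SIZE` bytes of entries, then the
// attachment heap.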
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Chunk<K, V, A, ALLOC> {
fn alloc_chunk(capacity: usize) -> *mut Self {
let self_size = mem::size_of::<Self>();
let self_align = align_padding(self_size, 64);
let self_size_aligned = self_size + self_align;
let chunk_size = chunk_size_of(capacity);
let attachment_heap = A::heap_size_of(capacity);
let total_size = self_size_aligned + chunk_size + attachment_heap;
let ptr = alloc_mem::<ALLOC>(total_size) as *mut Self;
let addr = ptr as usize;
let data_base = addr + self_size_aligned;
let attachment_base = data_base + chunk_size;
unsafe {
ptr::write(
ptr,
Self {
base: data_base,
capacity,
occupation: AtomicUsize::new(0),
empty_entries: AtomicUsize::new(0),
occu_limit: occupation_limit(capacity),
total_size,
attachment: A::new(capacity, attachment_base, attachment_heap),
shadow: PhantomData,
},
)
};
ptr
}
unsafe fn gc(ptr: *mut Chunk<K, V, A, ALLOC>) {
debug_assert_ne!(ptr as usize, 0);
let chunk = &*ptr;
chunk.attachment.dealloc();
dealloc_mem::<ALLOC>(ptr as usize, chunk.total_size);
}
#[inline]
fn cap_mask(&self) -> usize {
self.capacity - 1
}
}
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Clone
for Table<K, V, A, ALLOC, H>
{
fn clone(&self) -> Self {
let new_table = Table {
chunk: Default::default(),
new_chunk: Default::default(),
count: AtomicUsize::new(0),
epoch: AtomicUsize::new(0),
timestamp: AtomicU64::new(timestamp()),
mark: PhantomData,
};
let guard = crossbeam_epoch::pin();
let old_chunk_ptr = self.chunk.load(Acquire, &guard);
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
unsafe {
let old_chunk = old_chunk_ptr.deref();
let old_total_size = old_chunk.total_size;
let cloned_old_ptr = alloc_mem::<ALLOC>(old_total_size) as *mut Chunk<K, V, A, ALLOC>;
debug_assert_ne!(cloned_old_ptr as usize, 0);
debug_assert_ne!(old_chunk.ptr as usize, 0);
libc::memcpy(
cloned_old_ptr as *mut c_void,
old_chunk.ptr as *const c_void,
old_total_size,
);
let cloned_old_ref = Owned::new(ChunkPtr::new(cloned_old_ptr));
new_table.chunk.store(cloned_old_ref, Release);
if new_chunk_ptr != Shared::null() {
let new_chunk = new_chunk_ptr.deref();
let new_total_size = new_chunk.total_size;
let cloned_new_ptr =
alloc_mem::<ALLOC>(new_total_size) as *mut Chunk<K, V, A, ALLOC>;
libc::memcpy(
cloned_new_ptr as *mut c_void,
new_chunk.ptr as *const c_void,
new_total_size,
);
let cloned_new_ref = Owned::new(ChunkPtr::new(cloned_new_ptr));
new_table.new_chunk.store(cloned_new_ref, Release);
} else {
new_table.new_chunk.store(Shared::null(), Release);
}
}
new_table.count.store(self.count.load(Acquire), Release);
new_table
}
}
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
for Table<K, V, A, ALLOC, H>
{
fn drop(&mut self) {
let guard = crossbeam_epoch::pin();
unsafe {
guard.defer_destroy(self.chunk.load(Acquire, &guard));
let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
if new_chunk_ptr != Shared::null() {
guard.defer_destroy(new_chunk_ptr);
}
}
}
}
unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Send
for ChunkPtr<K, V, A, ALLOC>
{
}
unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Sync
for ChunkPtr<K, V, A, ALLOC>
{
}
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Drop for ChunkPtr<K, V, A, ALLOC> {
fn drop(&mut self) {
debug_assert_ne!(self.ptr as usize, 0);
unsafe {
Chunk::gc(self.ptr);
}
}
}
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Deref for ChunkPtr<K, V, A, ALLOC> {
type Target = Chunk<K, V, A, ALLOC>;
fn deref(&self) -> &Self::Target {
debug_assert_ne!(self.ptr as usize, 0);
unsafe { &*self.ptr }
}
}
impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> ChunkPtr<K, V, A, ALLOC> {
fn new(ptr: *mut Chunk<K, V, A, ALLOC>) -> Self {
debug_assert_ne!(ptr as usize, 0);
Self { ptr }
}
}
#[inline(always)]
fn is_power_of_2(x: usize) -> bool {
(x != 0) && ((x & (x - 1)) == 0)
}
#[inline(always)]
fn occupation_limit(cap: usize) -> usize {
(cap as f64 * 0.60f64) as usize
}
#[inline(always)]
fn chunk_size_of(cap: usize) -> usize {
cap * ENTRY_SIZE
}
#[inline(always)]
pub fn hash<H: Hasher + Default>(num: usize) -> usize {
let mut hasher = H::default();
hasher.write_usize(num);
hasher.finish() as usize
}
#[inline(always)]
pub fn hash_key<K: Hash, H: Hasher + Default>(key: &K) -> usize {
let mut hasher = H::default();
key.hash(&mut hasher);
hasher.finish() as usize
}
#[inline(always)]
fn dfence() {
compiler_fence(SeqCst);
fence(SeqCst);
}
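// Side storage for the full key/value objects of a table, addressed by entry
// slot index. `WordAttachment` below is a no-op used when both key and value
// fit in the fast words.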
pub trait Attachment<K, V> {
fn heap_size_of(cap: usize) -> usize;
fn new(cap: usize, heap_ptr: usize, heap_size: usize) -> Self;
fn get(&self, index: usize) -> (K, V);
fn set(&self, index: usize, key: K, value: V);
fn erase(&self, index: usize);
fn dealloc(&self);
fn probe(&self, index: usize, probe_key: &K) -> bool;
}
pub struct WordAttachment;
impl Attachment<(), ()> for WordAttachment {
fn heap_size_of(_cap: usize) -> usize {
0
}
fn new(_cap: usize, _heap_ptr: usize, _heap_size: usize) -> Self {
Self
}
#[inline(always)]
fn get(&self, _index: usize) -> ((), ()) {
((), ())
}
#[inline(always)]
fn set(&self, _index: usize, _key: (), _value: ()) {}
#[inline(always)]
fn erase(&self, _index: usize) {}
#[inline(always)]
fn dealloc(&self) {}
#[inline(always)]
fn probe(&self, _index: usize, _value: &()) -> bool {
true
}
}
pub type WordTable<ALLOC, H> = Table<(), (), WordAttachment, ALLOC, H>;
pub struct WordObjectAttachment<T, A: GlobalAlloc + Default> {
obj_chunk: usize,
obj_size: usize,
shadow: PhantomData<(T, A)>,
}
impl<T: Clone, A: GlobalAlloc + Default> Attachment<(), T> for WordObjectAttachment<T, A> {
fn heap_size_of(cap: usize) -> usize {
let obj_size = mem::size_of::<T>();
cap * obj_size
}
fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
Self {
obj_chunk: heap_ptr,
obj_size: mem::size_of::<T>(),
shadow: PhantomData,
}
}
#[inline(always)]
fn get(&self, index: usize) -> ((), T) {
let addr = self.addr_by_index(index);
let v = unsafe { (*(addr as *mut T)).clone() };
((), v)
}
#[inline(always)]
fn set(&self, index: usize, _key: (), val: T) {
let addr = self.addr_by_index(index);
unsafe { ptr::write(addr as *mut T, val) }
}
#[inline(always)]
fn erase(&self, index: usize) {
drop(self.addr_by_index(index) as *mut T)
}
#[inline(always)]
fn dealloc(&self) {}
fn probe(&self, _index: usize, _value: &()) -> bool {
true
}
}
pub type HashTable<K, V, ALLOC> =
Table<K, V, HashKVAttachment<K, V, ALLOC>, ALLOC, PassthroughHasher>;
pub struct HashKVAttachment<K, V, A: GlobalAlloc + Default> {
obj_chunk: usize,
obj_size: usize,
shadow: PhantomData<(K, V, A)>,
}
impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> Attachment<K, V>
for HashKVAttachment<K, V, A>
{
fn heap_size_of(cap: usize) -> usize {
let obj_size = mem::size_of::<(K, V)>();
cap * obj_size
}
fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
Self {
obj_chunk: heap_ptr,
obj_size: mem::size_of::<(K, V)>(),
shadow: PhantomData,
}
}
#[inline(always)]
fn get(&self, index: usize) -> (K, V) {
let addr = self.addr_by_index(index);
unsafe { (*(addr as *mut (K, V))).clone() }
}
#[inline(always)]
fn set(&self, index: usize, key: K, val: V) {
let addr = self.addr_by_index(index);
unsafe { ptr::write(addr as *mut (K, V), (key, val)) }
}
#[inline(always)]
fn erase(&self, index: usize) {
drop(self.addr_by_index(index) as *mut (K, V))
}
#[inline(always)]
fn dealloc(&self) {}
fn probe(&self, index: usize, key: &K) -> bool {
let addr = self.addr_by_index(index);
let pos_key = unsafe { &*(addr as *mut K) };
pos_key == key
}
}
pub trait Map<K, V: Clone> {
fn with_capacity(cap: usize) -> Self;
fn get(&self, key: &K) -> Option<V>;
fn insert(&self, key: &K, value: V) -> Option<V>;
fn try_insert(&self, key: &K, value: V) -> Option<V>;
fn remove(&self, key: &K) -> Option<V>;
fn entries(&self) -> Vec<(K, V)>;
fn contains_key(&self, key: &K) -> bool;
fn len(&self) -> usize;
fn get_or_insert<F: Fn() -> V>(&self, key: &K, func: F) -> V {
loop {
if self.contains_key(key) {
if let Some(value) = self.get(key) {
return value;
}
} else {
let value = func();
if let Some(value) = self.try_insert(key, value.clone()) {
return value;
}
return value;
}
}
}
}
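// Fast keys and values supplied by the word and object maps are shifted up by
// NUM_FIX so they never collide with the reserved EMPTY/SENTINEL/TOMBSTONE
// encodings; PLACEHOLDER_VAL is the fast value stored when the real value
// lives in the attachment.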
const NUM_FIX: usize = 5;
const PLACEHOLDER_VAL: usize = NUM_FIX + 1;
impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> HashKVAttachment<K, V, A> {
fn addr_by_index(&self, index: usize) -> usize {
self.obj_chunk + index * self.obj_size
}
}
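// A minimal usage sketch (not a doctest; assumes the `Map` trait is in scope):
//
//     let map: HashMap<String, u32> = HashMap::with_capacity(16);
//     map.insert(&"answer".to_string(), 42);
//     assert_eq!(map.get(&"answer".to_string()), Some(42));
//     assert_eq!(map.remove(&"answer".to_string()), Some(42));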
pub struct HashMap<
K: Clone + Hash + Eq,
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: HashTable<K, V, ALLOC>,
shadow: PhantomData<H>,
}
impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
HashMap<K, V, ALLOC, H>
{
pub fn insert_with_op(&self, op: InsertOp, key: &K, value: V) -> Option<V> {
let hash = hash_key::<K, H>(&key);
self.table
.insert(op, key, Some(value), hash, PLACEHOLDER_VAL)
.map(|(_, v)| v)
}
pub fn write(&self, key: &K) -> Option<HashMapWriteGuard<K, V, ALLOC, H>> {
HashMapWriteGuard::new(&self.table, key)
}
pub fn read(&self, key: &K) -> Option<HashMapReadGuard<K, V, ALLOC, H>> {
HashMapReadGuard::new(&self.table, key)
}
}
impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<K, V>
for HashMap<K, V, ALLOC, H>
{
fn with_capacity(cap: usize) -> Self {
Self {
table: Table::with_capacity(cap),
shadow: PhantomData,
}
}
#[inline(always)]
fn get(&self, key: &K) -> Option<V> {
let hash = hash_key::<K, H>(key);
self.table.get(key, hash, true).map(|v| v.1.unwrap())
}
#[inline(always)]
fn insert(&self, key: &K, value: V) -> Option<V> {
self.insert_with_op(InsertOp::Insert, key, value)
}
#[inline(always)]
fn try_insert(&self, key: &K, value: V) -> Option<V> {
self.insert_with_op(InsertOp::TryInsert, key, value)
}
#[inline(always)]
fn remove(&self, key: &K) -> Option<V> {
let hash = hash_key::<K, H>(&key);
self.table.remove(key, hash).map(|(_, v)| v)
}
#[inline(always)]
fn entries(&self) -> Vec<(K, V)> {
self.table
.entries()
.into_iter()
.map(|(_, _, k, v)| (k, v))
.collect()
}
#[inline(always)]
fn contains_key(&self, key: &K) -> bool {
let hash = hash_key::<K, H>(&key);
self.table.get(key, hash, false).is_some()
}
#[inline(always)]
fn len(&self) -> usize {
self.table.len()
}
}
impl<T, A: GlobalAlloc + Default> WordObjectAttachment<T, A> {
fn addr_by_index(&self, index: usize) -> usize {
self.obj_chunk + index * self.obj_size
}
}
type ObjectTable<V, ALLOC, H> = Table<(), V, WordObjectAttachment<V, ALLOC>, ALLOC, H>;
#[derive(Clone)]
pub struct ObjectMap<
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: ObjectTable<V, ALLOC, H>,
}
impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> ObjectMap<V, ALLOC, H> {
fn insert_with_op(&self, op: InsertOp, key: &usize, value: V) -> Option<V> {
self.table
.insert(op, &(), Some(value), key + NUM_FIX, PLACEHOLDER_VAL)
.map(|(_, v)| v)
}
pub fn read(&self, key: usize) -> Option<ObjectMapReadGuard<V, ALLOC, H>> {
ObjectMapReadGuard::new(&self.table, key)
}
pub fn write(&self, key: usize) -> Option<ObjectMapWriteGuard<V, ALLOC, H>> {
ObjectMapWriteGuard::new(&self.table, key)
}
}
impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, V>
for ObjectMap<V, ALLOC, H>
{
fn with_capacity(cap: usize) -> Self {
Self {
table: Table::with_capacity(cap),
}
}
#[inline(always)]
fn get(&self, key: &usize) -> Option<V> {
self.table
.get(&(), key + NUM_FIX, true)
.map(|v| v.1.unwrap())
}
#[inline(always)]
fn insert(&self, key: &usize, value: V) -> Option<V> {
self.insert_with_op(InsertOp::Insert, key, value)
}
#[inline(always)]
fn try_insert(&self, key: &usize, value: V) -> Option<V> {
self.insert_with_op(InsertOp::TryInsert, key, value)
}
#[inline(always)]
fn remove(&self, key: &usize) -> Option<V> {
self.table.remove(&(), key + NUM_FIX).map(|(_, v)| v)
}
#[inline(always)]
fn entries(&self) -> Vec<(usize, V)> {
self.table
.entries()
.into_iter()
.map(|(k, _, _, v)| (k - NUM_FIX, v))
.collect()
}
#[inline(always)]
fn contains_key(&self, key: &usize) -> bool {
self.table.get(&(), key + NUM_FIX, false).is_some()
}
#[inline(always)]
fn len(&self) -> usize {
self.table.len()
}
}
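// A minimal usage sketch for the word-only map (not a doctest; assumes the
// `Map` trait is in scope):
//
//     let map = WordMap::<std::alloc::System>::with_capacity(64);
//     map.insert(&42, 7);
//     assert_eq!(map.get(&42), Some(7));
//     assert_eq!(map.remove(&42), Some(7));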
#[derive(Clone)]
pub struct WordMap<ALLOC: GlobalAlloc + Default = System, H: Hasher + Default = DefaultHasher> {
table: WordTable<ALLOC, H>,
}
impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
fn insert_with_op(&self, op: InsertOp, key: &usize, value: usize) -> Option<usize> {
self.table
.insert(op, &(), None, key + NUM_FIX, value + NUM_FIX)
.map(|(v, _)| v)
}
pub fn get_from_mutex(&self, key: &usize) -> Option<usize> {
self.get(key).map(|v| v & WORD_MUTEX_DATA_BIT_MASK)
}
}
impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, usize> for WordMap<ALLOC, H> {
fn with_capacity(cap: usize) -> Self {
Self {
table: Table::with_capacity(cap),
}
}
#[inline(always)]
fn get(&self, key: &usize) -> Option<usize> {
self.table
.get(&(), key + NUM_FIX, false)
.map(|v| v.0 - NUM_FIX)
}
#[inline(always)]
fn insert(&self, key: &usize, value: usize) -> Option<usize> {
self.insert_with_op(InsertOp::UpsertFast, key, value)
}
#[inline(always)]
fn try_insert(&self, key: &usize, value: usize) -> Option<usize> {
self.insert_with_op(InsertOp::TryInsert, key, value)
}
#[inline(always)]
fn remove(&self, key: &usize) -> Option<usize> {
self.table
.remove(&(), key + NUM_FIX)
.map(|(v, _)| v - NUM_FIX)
}
fn entries(&self) -> Vec<(usize, usize)> {
self.table
.entries()
.into_iter()
.map(|(k, v, _, _)| (k - NUM_FIX, v - NUM_FIX))
.collect()
}
#[inline(always)]
fn contains_key(&self, key: &usize) -> bool {
self.get(key).is_some()
}
#[inline(always)]
fn len(&self) -> usize {
self.table.len()
}
}
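// The top two bits of a stored word value are reserved: the highest is the
// prime flag (INV_VAL_BIT_MASK) and the next is the mutex bit
// (MUTEX_BIT_MASK), so lockable user data is confined to
// WORD_MUTEX_DATA_BIT_MASK.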
const WORD_MUTEX_DATA_BIT_MASK: usize = !0 << 2 >> 2;
pub struct WordMutexGuard<
'a,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: &'a WordTable<ALLOC, H>,
key: usize,
value: usize,
}
impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMutexGuard<'a, ALLOC, H> {
fn create(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
let key = key + NUM_FIX;
let value = 0;
match table.insert(
InsertOp::TryInsert,
&(),
Some(()),
key,
value | MUTEX_BIT_MASK,
) {
None | Some((TOMBSTONE_VALUE, ())) | Some((EMPTY_VALUE, ())) => {
trace!("Created locked key {}", key);
Some(Self { table, key, value })
}
_ => {
trace!("Cannot create locked key {} ", key);
None
}
}
}
fn new(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
let key = key + NUM_FIX;
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let value;
loop {
let swap_res = table.swap(
key,
&(),
move |fast_value| {
trace!("The key {} have value {}", key, fast_value);
let locked_val = fast_value | MUTEX_BIT_MASK;
if fast_value == locked_val {
trace!("The key {} have locked, unchanged and try again", key);
None
} else {
trace!(
"The key {} have obtained, with value {}",
key,
fast_value & WORD_MUTEX_DATA_BIT_MASK
);
Some(locked_val)
}
},
&guard,
);
match swap_res {
SwapResult::Succeed(val, _idx, _chunk) => {
trace!("Lock on key {} succeed with value {}", key, val);
value = val & WORD_MUTEX_DATA_BIT_MASK;
break;
}
SwapResult::Failed | SwapResult::Aborted => {
trace!("Lock on key {} failed, retry", key);
backoff.spin();
continue;
}
SwapResult::NotFound => {
trace!("Cannot found key {} to lock", key);
return None;
}
}
}
debug_assert_ne!(value, 0);
let value = value - NUM_FIX;
Some(Self { table, key, value })
}
pub fn remove(self) -> usize {
trace!("Removing {}", self.key);
let res = self.table.remove(&(), self.key).unwrap().0;
mem::forget(self);
res | MUTEX_BIT_MASK
}
}
impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref for WordMutexGuard<'a, ALLOC, H> {
type Target = usize;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
for WordMutexGuard<'a, ALLOC, H>
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop for WordMutexGuard<'a, ALLOC, H> {
fn drop(&mut self) {
self.value += NUM_FIX;
trace!(
"Release lock for key {} with value {}",
self.key,
self.value
);
self.table.insert(
InsertOp::UpsertFast,
&(),
None,
self.key,
self.value & WORD_MUTEX_DATA_BIT_MASK,
);
}
}
impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
pub fn lock(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
WordMutexGuard::new(&self.table, key)
}
pub fn try_insert_locked(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
WordMutexGuard::create(&self.table, key)
}
}
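// The read/write guards below implement a small reader-writer lock on top of
// the fast value slot: PLACEHOLDER_VAL means unlocked, PLACEHOLDER_VAL - 1
// means write locked, and values above PLACEHOLDER_VAL count active readers.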
pub struct HashMapReadGuard<
'a,
K: Clone + Eq + Hash,
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: &'a HashTable<K, V, ALLOC>,
hash: usize,
key: K,
value: V,
_mark: PhantomData<H>,
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
HashMapReadGuard<'a, K, V, ALLOC, H>
{
fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let hash = hash_key::<K, H>(&key);
let value: V;
loop {
let swap_res = table.swap(
hash,
key,
move |fast_value| {
if fast_value != PLACEHOLDER_VAL - 1 {
trace!("Key hash {} is not write locked, will read lock", hash);
Some(fast_value + 1)
} else {
trace!("Key hash {} is write locked, unchanged", hash);
None
}
},
&guard,
);
match swap_res {
SwapResult::Succeed(_, idx, chunk) => {
let chunk_ref = unsafe { chunk.deref() };
let (_, v) = chunk_ref.attachment.get(idx);
value = v;
break;
}
SwapResult::Failed | SwapResult::Aborted => {
trace!("Lock on key hash {} failed, retry", hash);
backoff.spin();
continue;
}
SwapResult::NotFound => {
debug!("Cannot found hash key {} to lock", hash);
return None;
}
}
}
Some(Self {
table,
key: key.clone(),
value,
hash,
_mark: Default::default(),
})
}
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
for HashMapReadGuard<'a, K, V, ALLOC, H>
{
type Target = V;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
for HashMapReadGuard<'a, K, V, ALLOC, H>
{
fn drop(&mut self) {
trace!("Release read lock for hash key {}", self.hash);
let guard = crossbeam_epoch::pin();
self.table.swap(
self.hash,
&self.key,
|fast_value| {
debug_assert!(fast_value > PLACEHOLDER_VAL);
Some(fast_value - 1)
},
&guard,
);
}
}
pub struct HashMapWriteGuard<
'a,
K: Clone + Eq + Hash,
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: &'a HashTable<K, V, ALLOC>,
hash: usize,
key: K,
value: V,
_mark: PhantomData<H>,
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
HashMapWriteGuard<'a, K, V, ALLOC, H>
{
fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let hash = hash_key::<K, H>(&key);
let value: V;
loop {
let swap_res = table.swap(
hash,
key,
move |fast_value| {
if fast_value == PLACEHOLDER_VAL {
trace!("Key hash {} is write lockable, will write lock", hash);
Some(fast_value - 1)
} else {
trace!("Key hash {} is write locked, unchanged", hash);
None
}
},
&guard,
);
match swap_res {
SwapResult::Succeed(_, idx, chunk) => {
let chunk_ref = unsafe { chunk.deref() };
let (_, v) = chunk_ref.attachment.get(idx);
value = v;
break;
}
SwapResult::Failed | SwapResult::Aborted => {
trace!("Lock on key hash {} failed, retry", hash);
backoff.spin();
continue;
}
SwapResult::NotFound => {
debug!("Cannot found hash key {} to lock", hash);
return None;
}
}
}
Some(Self {
table,
key: key.clone(),
value,
hash,
_mark: Default::default(),
})
}
pub fn remove(self) -> V {
let res = self.table.remove(&self.key, self.hash).unwrap().1;
mem::forget(self);
res
}
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
type Target = V;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
fn drop(&mut self) {
trace!("Release read lock for hash key {}", self.hash);
let hash = hash_key::<K, H>(&self.key);
self.table.insert(
InsertOp::Insert,
&self.key,
Some(self.value.clone()),
hash,
PLACEHOLDER_VAL,
);
}
}
pub struct ObjectMapReadGuard<
'a,
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: &'a ObjectTable<V, ALLOC, H>,
key: usize,
value: V,
_mark: PhantomData<H>,
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
ObjectMapReadGuard<'a, V, ALLOC, H>
{
fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let hash = hash_key::<usize, H>(&key);
let value: V;
loop {
let swap_res = table.swap(
key,
&(),
move |fast_value| {
if fast_value != PLACEHOLDER_VAL - 1 {
trace!("Key {} is not write locked, will read lock", hash);
Some(fast_value + 1)
} else {
trace!("Key {} is write locked, unchanged", hash);
None
}
},
&guard,
);
match swap_res {
SwapResult::Succeed(_, idx, chunk) => {
let chunk_ref = unsafe { chunk.deref() };
let (_, v) = chunk_ref.attachment.get(idx);
value = v;
break;
}
SwapResult::Failed | SwapResult::Aborted => {
trace!("Lock on key {} failed, retry", hash);
backoff.spin();
continue;
}
SwapResult::NotFound => {
debug!("Cannot found hash key {} to lock", hash);
return None;
}
}
}
Some(Self {
table,
key,
value,
_mark: Default::default(),
})
}
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
for ObjectMapReadGuard<'a, V, ALLOC, H>
{
type Target = V;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
for ObjectMapReadGuard<'a, V, ALLOC, H>
{
fn drop(&mut self) {
trace!("Release read lock for hash key {}", self.key);
let guard = crossbeam_epoch::pin();
self.table.swap(
self.key,
&(),
|fast_value| {
debug_assert!(fast_value > PLACEHOLDER_VAL);
Some(fast_value - 1)
},
&guard,
);
}
}
pub struct ObjectMapWriteGuard<
'a,
V: Clone,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: &'a ObjectTable<V, ALLOC, H>,
key: usize,
value: V,
_mark: PhantomData<H>,
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
ObjectMapWriteGuard<'a, V, ALLOC, H>
{
fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
let backoff = crossbeam_utils::Backoff::new();
let guard = crossbeam_epoch::pin();
let value: V;
let key = key + NUM_FIX;
loop {
let swap_res = table.swap(
key,
&(),
move |fast_value| {
if fast_value == PLACEHOLDER_VAL {
trace!("Key {} is write lockable, will write lock", key);
Some(fast_value - 1)
} else {
trace!("Key {} is write locked, unchanged", key);
None
}
},
&guard,
);
match swap_res {
SwapResult::Succeed(_, idx, chunk) => {
let chunk_ref = unsafe { chunk.deref() };
let (_, v) = chunk_ref.attachment.get(idx);
value = v;
break;
}
SwapResult::Failed | SwapResult::Aborted => {
trace!("Lock on key {} failed, retry", key);
backoff.spin();
continue;
}
SwapResult::NotFound => {
debug!("Cannot found key {} to lock", key);
return None;
}
}
}
Some(Self {
table,
key,
value,
_mark: Default::default(),
})
}
pub fn remove(self) -> V {
let res = self.table.remove(&(), self.key).unwrap().1;
mem::forget(self);
res
}
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
type Target = V;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
fn drop(&mut self) {
trace!("Release read lock for key {}", self.key);
self.table.insert(
InsertOp::Insert,
&(),
Some(self.value.clone()),
self.key,
PLACEHOLDER_VAL,
);
}
}
pub struct HashSet<
T: Clone + Hash + Eq,
ALLOC: GlobalAlloc + Default = System,
H: Hasher + Default = DefaultHasher,
> {
table: HashTable<T, (), ALLOC>,
shadow: PhantomData<H>,
}
impl<T: Clone + Hash + Eq, ALLOC: GlobalAlloc + Default, H: Hasher + Default> HashSet<T, ALLOC, H> {
pub fn with_capacity(cap: usize) -> Self {
Self {
table: Table::with_capacity(cap),
shadow: PhantomData,
}
}
pub fn contains(&self, item: &T) -> bool {
let hash = hash_key::<T, H>(item);
self.table.get(item, hash, false).is_some()
}
pub fn insert(&self, item: &T) -> bool {
let hash = hash_key::<T, H>(item);
self.table
.insert(InsertOp::TryInsert, item, Some(()), hash, !0)
.is_none()
}
pub fn remove(&self, item: &T) -> bool {
let hash = hash_key::<T, H>(item);
self.table.remove(item, hash).is_some()
}
pub fn items(&self) -> std::collections::HashSet<T> {
self.table
.entries()
.into_iter()
.map(|(_, _, item, _)| item)
.collect()
}
#[inline(always)]
pub fn len(&self) -> usize {
self.table.len()
}
}
#[inline(always)]
fn alloc_mem<A: GlobalAlloc + Default>(size: usize) -> usize {
let align = 64;
let layout = Layout::from_size_align(size, align).unwrap();
let alloc = A::default();
unsafe {
let addr = alloc.alloc(layout) as usize;
ptr::write_bytes(addr as *mut u8, 0, size);
debug_assert_eq!(addr % 64, 0);
addr
}
}
#[inline(always)]
fn dealloc_mem<A: GlobalAlloc + Default>(ptr: usize, size: usize) {
let align = 64;
let layout = Layout::from_size_align(size, align).unwrap();
let alloc = A::default();
unsafe { alloc.dealloc(ptr as *mut u8, layout) }
}
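// PassthroughHasher returns the written usize unchanged, so HashTable (which
// hashes keys up front with the user's hasher) does not hash them a second
// time inside Table.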
pub struct PassthroughHasher {
num: u64,
}
impl Hasher for PassthroughHasher {
fn finish(&self) -> u64 {
self.num
}
fn write(&mut self, _bytes: &[u8]) {
unimplemented!()
}
fn write_usize(&mut self, i: usize) {
self.num = i as u64
}
}
impl Default for PassthroughHasher {
fn default() -> Self {
Self { num: 0 }
}
}
fn timestamp() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
since_the_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
use crate::map::*;
use alloc::sync::Arc;
use rayon::prelude::*;
use std::alloc::System;
use std::collections::HashMap;
use std::thread;
use test::Bencher;
#[test]
fn will_not_overflow() {
let _ = env_logger::try_init();
let table = WordMap::<System>::with_capacity(16);
for i in 50..60 {
assert_eq!(table.insert(&i, i), None);
}
for i in 50..60 {
assert_eq!(table.get(&i), Some(i));
}
for i in 50..60 {
assert_eq!(table.remove(&i), Some(i));
}
}
#[test]
fn resize() {
let _ = env_logger::try_init();
let map = WordMap::<System>::with_capacity(16);
for i in 5..2048 {
map.insert(&i, i * 2);
}
for i in 5..2048 {
match map.get(&i) {
Some(r) => assert_eq!(r, i * 2),
None => panic!("{}", i),
}
}
}
#[test]
fn parallel_no_resize() {
let _ = env_logger::try_init();
let map = Arc::new(WordMap::<System>::with_capacity(65536));
let mut threads = vec![];
for i in 5..99 {
map.insert(&i, i * 10);
}
for i in 100..900 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..60 {
map.insert(&(i * 100 + j), i * j);
}
}));
}
for i in 5..9 {
for j in 1..10 {
map.remove(&(i * j));
}
}
for thread in threads {
let _ = thread.join();
}
for i in 100..900 {
for j in 5..60 {
assert_eq!(map.get(&(i * 100 + j)), Some(i * j))
}
}
for i in 5..9 {
for j in 1..10 {
assert!(map.get(&(i * j)).is_none())
}
}
}
#[test]
fn parallel_with_resize() {
let _ = env_logger::try_init();
let num_threads = num_cpus::get();
let test_load = 4096;
let repeat_load = 16;
let map = Arc::new(WordMap::<System>::with_capacity(32));
let mut threads = vec![];
for i in 0..num_threads {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..test_load {
let key = i * 10000000 + j;
let value_prefix = i * j * 100;
for k in 1..repeat_load {
let value = value_prefix + k;
if k != 1 {
assert_eq!(map.get(&key), Some(value - 1));
}
let pre_insert_epoch = map.table.now_epoch();
map.insert(&key, value);
let post_insert_epoch = map.table.now_epoch();
for l in 1..128 {
let pre_fail_get_epoch = map.table.now_epoch();
let left = map.get(&key);
let post_fail_get_epoch = map.table.now_epoch();
let right = Some(value);
if left != right {
for m in 1..256 {
let left = map.get(&key);
let right = Some(value);
if left == right {
panic!(
"Recovered at turn {} for {}, copying {}, epoch {} to {}, now {}, PIE: {} to {}. Migration problem!!!",
m,
key,
map.table.map_is_copying(),
pre_fail_get_epoch,
post_fail_get_epoch,
map.table.now_epoch(),
pre_insert_epoch,
post_insert_epoch
);
}
}
panic!("Unable to recover for {}, round {}, copying {}", key, l , map.table.map_is_copying());
}
}
if j % 7 == 0 {
assert_eq!(
map.remove(&key),
Some(value),
"Remove result, get {:?}, copying {}, round {}",
map.get(&key),
map.table.map_is_copying(),
k
);
assert_eq!(map.get(&key), None, "Remove recursion");
assert!(map.lock(key).is_none(), "Remove recursion with lock");
map.insert(&key, value);
}
if j % 3 == 0 {
let new_value = value + 7;
let pre_insert_epoch = map.table.now_epoch();
map.insert(&key, new_value);
let post_insert_epoch = map.table.now_epoch();
assert_eq!(
map.get(&key),
Some(new_value),
"Checking immediate update, key {}, epoch {} to {}",
key, pre_insert_epoch, post_insert_epoch
);
map.insert(&key, value);
}
}
}
}));
}
info!("Waiting for intensive insertion to finish");
for thread in threads {
let _ = thread.join();
}
info!("Checking final value");
(0..num_threads)
.collect::<Vec<_>>()
.par_iter()
.for_each(|i| {
for j in 5..test_load {
let k = i * 10000000 + j;
let value = i * j * 100 + repeat_load - 1;
let get_res = map.get(&k);
assert_eq!(
get_res,
Some(value),
"New k {}, i {}, j {}, epoch {}",
k,
i,
j,
map.table.now_epoch()
);
}
});
}
#[test]
fn parallel_hybrid() {
let _ = env_logger::try_init();
let map = Arc::new(WordMap::<System>::with_capacity(4));
for i in 5..128 {
map.insert(&i, i * 10);
}
let mut threads = vec![];
for i in 256..265 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..60 {
map.insert(&(i * 10 + j), 10);
}
}));
}
for i in 5..8 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..8 {
map.remove(&(i * j));
}
}));
}
for thread in threads {
let _ = thread.join();
}
for i in 256..265 {
for j in 5..60 {
assert_eq!(map.get(&(i * 10 + j)), Some(10))
}
}
}
#[test]
fn parallel_word_map_mutex() {
let _ = env_logger::try_init();
let map = Arc::new(WordMap::<System>::with_capacity(4));
map.insert(&1, 0);
let mut threads = vec![];
let num_threads = 256;
for _ in 0..num_threads {
let map = map.clone();
threads.push(thread::spawn(move || {
let mut guard = map.lock(1).unwrap();
*guard += 1;
}));
}
for thread in threads {
let _ = thread.join();
}
assert_eq!(map.get(&1).unwrap(), num_threads);
}
#[test]
fn parallel_word_map_multi_mutex() {
let _ = env_logger::try_init();
let map = Arc::new(WordMap::<System>::with_capacity(4));
let mut threads = vec![];
let num_threads = num_cpus::get();
let test_load = 4096;
let update_load = 128;
for thread_id in 0..num_threads {
let map = map.clone();
threads.push(thread::spawn(move || {
let target = thread_id;
for i in 0..test_load {
let key = target * 1000000 + i;
{
let mut mutex = map.try_insert_locked(key).unwrap();
*mutex = 1;
}
for j in 1..update_load {
assert!(
map.get(&key).is_some(),
"Pre getting value for mutex, key {}, epoch {}",
key,
map.table.now_epoch()
);
let val = {
let mut mutex = map.lock(key).expect(&format!(
"Locking key {}, copying {}",
key,
map.table.now_epoch()
));
assert_eq!(*mutex, j);
*mutex += 1;
*mutex
};
assert!(
map.get(&key).is_some(),
"Post getting value for mutex, key {}, epoch {}",
key,
map.table.now_epoch()
);
if j % 7 == 0 {
{
let mutex = map.lock(key).expect(&format!(
"Remove locking key {}, copying {}",
key,
map.table.now_epoch()
));
mutex.remove();
}
assert!(map.lock(key).is_none());
*map.try_insert_locked(key).unwrap() = val;
}
}
assert_eq!(*map.lock(key).unwrap(), update_load);
}
}));
}
for thread in threads {
let _ = thread.join();
}
}
#[test]
fn parallel_obj_map_rwlock() {
let _ = env_logger::try_init();
let map_cont = ObjectMap::<Obj, System, DefaultHasher>::with_capacity(4);
let map = Arc::new(map_cont);
map.insert(&1, Obj::new(0));
let mut threads = vec![];
let num_threads = 256;
for i in 0..num_threads {
let map = map.clone();
threads.push(thread::spawn(move || {
let mut guard = map.write(1).unwrap();
let val = guard.get();
guard.set(val + 1);
trace!("Dealt with {}", i);
}));
}
for thread in threads {
let _ = thread.join();
}
map.get(&1).unwrap().validate(num_threads);
}
#[test]
fn parallel_hash_map_rwlock() {
let _ = env_logger::try_init();
let map_cont = super::HashMap::<u32, Obj, System, DefaultHasher>::with_capacity(4);
let map = Arc::new(map_cont);
map.insert(&1, Obj::new(0));
let mut threads = vec![];
let num_threads = 256;
for i in 0..num_threads {
let map = map.clone();
threads.push(thread::spawn(move || {
let mut guard = map.write(&1u32).unwrap();
let val = guard.get();
guard.set(val + 1);
trace!("Dealt with {}", i);
}));
}
for thread in threads {
let _ = thread.join();
}
map.get(&1).unwrap().validate(num_threads);
}
#[derive(Copy, Clone)]
struct Obj {
a: usize,
b: usize,
c: usize,
d: usize,
}
impl Obj {
fn new(num: usize) -> Self {
Obj {
a: num,
b: num + 1,
c: num + 2,
d: num + 3,
}
}
fn validate(&self, num: usize) {
assert_eq!(self.a, num);
assert_eq!(self.b, num + 1);
assert_eq!(self.c, num + 2);
assert_eq!(self.d, num + 3);
}
fn get(&self) -> usize {
self.a
}
fn set(&mut self, num: usize) {
*self = Self::new(num)
}
}
#[test]
fn obj_map() {
let _ = env_logger::try_init();
let map = ObjectMap::<Obj>::with_capacity(16);
for i in 5..2048 {
map.insert(&i, Obj::new(i));
}
for i in 5..2048 {
match map.get(&i) {
Some(r) => r.validate(i),
None => panic!("{}", i),
}
}
}
#[test]
fn parallel_obj_hybrid() {
let _ = env_logger::try_init();
let map = Arc::new(ObjectMap::<Obj>::with_capacity(4));
for i in 5..128 {
map.insert(&i, Obj::new(i * 10));
}
let mut threads = vec![];
for i in 256..265 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..60 {
map.insert(&(i * 10 + j), Obj::new(10));
}
}));
}
for i in 5..8 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..8 {
map.remove(&(i * j));
}
}));
}
for thread in threads {
let _ = thread.join();
}
for i in 256..265 {
for j in 5..60 {
match map.get(&(i * 10 + j)) {
Some(r) => r.validate(10),
None => panic!("{}", i),
}
}
}
}
#[test]
fn parallel_hashmap_hybrid() {
let _ = env_logger::try_init();
let map = Arc::new(super::HashMap::<u32, Obj>::with_capacity(4));
for i in 5..128u32 {
map.insert(&i, Obj::new((i * 10) as usize));
}
let mut threads = vec![];
for i in 256..265u32 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..60u32 {
map.insert(&(i * 10 + j), Obj::new(10usize));
}
}));
}
for i in 5..8 {
let map = map.clone();
threads.push(thread::spawn(move || {
for j in 5..8 {
map.remove(&(i * j));
}
}));
}
for thread in threads {
let _ = thread.join();
}
for i in 256..265 {
for j in 5..60 {
match map.get(&(i * 10 + j)) {
Some(r) => r.validate(10),
None => panic!("{}", i),
}
}
}
}
use std::thread::JoinHandle;
#[test]
fn atomic_ordering() {
let test_load = 102400;
let epoch = Arc::new(AtomicUsize::new(0));
let old_ptr = Arc::new(AtomicUsize::new(0));
let new_ptr = Arc::new(AtomicUsize::new(0));
let write = || -> JoinHandle<()> {
let epoch = epoch.clone();
let old_ptr = old_ptr.clone();
let new_ptr = new_ptr.clone();
thread::spawn(move || {
for _ in 0..test_load {
let old = old_ptr.load(Acquire);
if new_ptr.compare_and_swap(0, old, AcqRel) != 0 {
return;
}
dfence();
if old_ptr.load(Acquire) != old {
new_ptr.store(0, Release);
dfence();
return;
}
let new = old + 1;
new_ptr.store(new, Release);
dfence();
assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 0);
for _ in 0..1000 {
std::sync::atomic::spin_loop_hint();
}
old_ptr.store(new, Release);
dfence();
assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 1);
dfence();
new_ptr.store(0, Release);
}
})
};
let read = || -> JoinHandle<()> {
let epoch = epoch.clone();
let old_ptr = old_ptr.clone();
let new_ptr = new_ptr.clone();
thread::spawn(move || {
for _ in 0..test_load {
let epoch_val = epoch.load(Acquire);
let old = old_ptr.load(Acquire);
let new = new_ptr.load(Acquire);
let changing = epoch_val % 2 == 1;
for _ in 0..500 {
std::sync::atomic::spin_loop_hint();
}
if changing && epoch.load(Acquire) == epoch_val {
assert_ne!(old, new);
assert_ne!(new, 0);
}
}
})
};
let num_writers = 5;
let mut writers = Vec::with_capacity(num_writers);
for _ in 0..num_writers {
writers.push(write());
}
let num_readers = num_cpus::get();
let mut readers = Vec::with_capacity(num_readers);
for _ in 0..num_readers {
readers.push(read());
}
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
#[bench]
fn lfmap(b: &mut Bencher) {
let _ = env_logger::try_init();
let map = WordMap::<System>::with_capacity(1024);
let mut i = 5;
b.iter(|| {
map.insert(&i, i);
i += 1;
});
}
#[bench]
fn hashmap(b: &mut Bencher) {
let _ = env_logger::try_init();
let mut map = HashMap::new();
let mut i = 5;
b.iter(|| {
map.insert(i, i);
i += 1;
});
}
#[bench]
fn default_hasher(b: &mut Bencher) {
let _ = env_logger::try_init();
b.iter(|| {
let mut hasher = DefaultHasher::default();
hasher.write_u64(123);
hasher.finish();
});
}
}