/// Number of `u64` slots in one hash bucket. In a primary bucket slot 0 is read
/// as the bucket info word, and in a bucket that is followed by another bucket
/// in its chain the last slot holds the index of that next bucket.
const N_BUCKET_SLOT: usize = 8;
/// Upper bound on the number of overflow buckets that `insert` will link onto a
/// primary bucket; also used as the largest overflow factor `HashTable::new`
/// accepts.
const MAX_CHAIN_LEN: u64 = 16;
use crate::*;
use ::rand::RngExt;
use ahash::RandomState;
use core::marker::PhantomData;
use core::num::NonZeroU32;
mod hash_bucket;
pub(crate) use hash_bucket::*;
/// Cursor describing where an iteration over one bucket chain currently is.
#[derive(Debug)]
struct IterState {
// Index (into the table's bucket slice) of the bucket being visited.
bucket_id: usize,
// Length of the table's bucket slice when the iteration started; used by the
// iterator to bounds-check `bucket_id` before dereferencing raw pointers.
buckets_len: usize,
// Index of the next slot to yield inside the current bucket.
item_slot: usize,
// Chain length read from the primary bucket's info word at construction.
chain_len: usize,
// Number of chain links followed so far (0 while in the primary bucket).
chain_idx: usize,
// Set once the last slot of the last bucket in the chain has been yielded.
finished: bool,
}
impl IterState {
    /// Builds the starting cursor for the bucket chain that `hash` maps to.
    ///
    /// The primary bucket index is `hash & mask`. Iteration begins at slot 1
    /// because slot 0 of the primary bucket is the bucket info word, from
    /// which the chain length is decoded here.
    ///
    /// Panics if the computed bucket index is outside the bucket slice.
    fn new(hashtable: &HashTable, hash: u64) -> Self {
        let primary = (hash & hashtable.mask) as usize;
        let info_word = hashtable.data[primary].data[0];
        Self {
            bucket_id: primary,
            buckets_len: hashtable.data.len(),
            item_slot: 1,
            chain_len: chain_len(info_word) as usize,
            chain_idx: 0,
            finished: false,
        }
    }

    /// Number of slots in the current bucket that may hold item entries.
    ///
    /// Only the last bucket of a chain uses its final slot for an item; in
    /// every earlier bucket the final slot stores the next bucket's index and
    /// must be skipped.
    fn n_item_slot(&self) -> usize {
        let on_last_bucket = self.chain_idx == self.chain_len;
        match on_last_bucket {
            true => N_BUCKET_SLOT,
            false => N_BUCKET_SLOT - 1,
        }
    }
}
/// Mutable iterator over the item slots of a single bucket chain.
///
/// Yields `&'a mut u64` references produced from a raw pointer into the
/// table's bucket slice.
struct IterMut<'a> {
// Pointer to the first bucket of the table's bucket slice.
ptr: *mut HashBucket,
// Current position within the chain.
state: IterState,
// Ties the yielded references to the lifetime of the exclusive borrow of the
// table taken in `IterMut::new`.
_marker: PhantomData<&'a mut u64>,
}
impl<'a> IterMut<'a> {
    /// Creates an iterator over the bucket chain selected by `hash`.
    ///
    /// The table is borrowed exclusively for `'a`; the iterator keeps only a
    /// raw pointer to the start of the bucket slice plus its cursor state.
    fn new(hashtable: &'a mut HashTable, hash: u64) -> Self {
        // The cursor is computed first, while only shared access is needed.
        let cursor = IterState::new(hashtable, hash);
        let first_bucket = hashtable.data.as_mut_ptr();
        Self {
            ptr: first_bucket,
            state: cursor,
            _marker: PhantomData,
        }
    }
}
impl<'a> Iterator for IterMut<'a> {
    type Item = &'a mut u64;

    /// Yields the next item slot of the chain, then advances the cursor.
    ///
    /// Slots of the primary bucket are visited from index 1, slots of chained
    /// buckets from index 0. When the item slots of a bucket are exhausted the
    /// cursor either follows the link stored in the bucket's last slot or, in
    /// the last bucket of the chain, marks the iteration as finished.
    ///
    /// Panics if the current bucket index is outside the bucket slice.
    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
        if self.state.finished {
            return None;
        }

        let usable_slots = self.state.n_item_slot();

        // This check is what makes the raw-pointer accesses below in-bounds.
        assert!(
            self.state.bucket_id < self.state.buckets_len,
            "bucket id not in range"
        );

        // SAFETY: `ptr` is the start of the table's bucket slice, which is
        // exclusively borrowed for `'a`, and `bucket_id` was just checked to be
        // below the slice length captured at construction.
        let bucket = unsafe { self.ptr.add(self.state.bucket_id) };
        // SAFETY: `bucket` points at a live bucket (see above); the slot index
        // itself is bounds-checked by the array indexing.
        let slot = unsafe { &mut (*bucket).data[self.state.item_slot] };

        if self.state.item_slot < usable_slots - 1 {
            // More item slots remain in this bucket.
            self.state.item_slot += 1;
        } else if self.state.chain_idx < self.state.chain_len {
            // Bucket exhausted but the chain continues: its last slot holds the
            // index of the next bucket, whose item slots start at 0.
            // SAFETY: same in-bounds bucket pointer as above.
            let following = unsafe { (*bucket).data[N_BUCKET_SLOT - 1] } as usize;
            self.state.chain_idx += 1;
            self.state.item_slot = 0;
            self.state.bucket_id = following;
        } else {
            // Last slot of the last bucket has been handed out.
            self.state.finished = true;
        }

        Some(slot)
    }
}
/// Hash table mapping keys to packed `u64` item-info entries stored in
/// fixed-size buckets, with overflow buckets chained onto primary buckets.
// NOTE: `#[repr(C)]` fixes the field order and layout; do not reorder fields.
#[repr(C)]
pub(crate) struct HashTable {
// Hasher factory used by `hash()`; constructed with fixed seeds in `new`.
hash_builder: Box<RandomState>,
// The `power` argument passed to `new` (log2 of the primary slot count).
power: u64,
// `primary_buckets - 1`; ANDed with a hash to select the primary bucket.
mask: u64,
// Primary buckets followed by the overflow buckets available for chaining.
data: Box<[HashBucket]>,
// Creation time; `get` measures bucket timestamps relative to it.
started: Instant,
// Index of the next overflow bucket that `insert` will link into a chain.
next_to_chain: u64,
// Explicit padding bytes, always zero-initialised in `new`.
_pad: [u8; 8],
}
impl HashTable {
pub fn new(power: u8, overflow_factor: f64) -> HashTable {
if overflow_factor < 0.0 {
panic!("hashtable overflow factor must be >= 0.0");
}
if overflow_factor > MAX_CHAIN_LEN as f64 {
panic!("hashtable overflow factor must be <= {MAX_CHAIN_LEN}");
}
let slots = 1_u64 << power;
let buckets = slots / 8;
let mask = buckets - 1;
let total_buckets = (buckets as f64 * (1.0 + overflow_factor)).ceil() as usize;
let mut data = Vec::with_capacity(0);
data.reserve_exact(total_buckets);
data.resize(total_buckets, HashBucket::new());
debug!(
"hashtable has: {slots} primary slots across {buckets} primary buckets and {total_buckets} total buckets",
);
let hash_builder = RandomState::with_seeds(
0xbb8c484891ec6c86,
0x0522a25ae9c769f9,
0xeed2797b9571bc75,
0x4feb29c1fbbd59d0,
);
Self {
hash_builder: Box::new(hash_builder),
power: power.into(),
mask,
data: data.into_boxed_slice(),
started: Instant::now(),
next_to_chain: buckets,
_pad: [0; 8],
}
}
/// Looks up `key` and returns the matching item, updating its frequency.
///
/// If the masked number of seconds elapsed since the table was created
/// differs from the timestamp stored in the bucket info word, the timestamp is
/// rewritten and every entry of the chain is ANDed with
/// `CLEAR_FREQ_SMOOTH_MASK` first. On a hit, a frequency below 127 is always
/// incremented while it is at most 16, and with probability `1 / freq` above
/// that; the `0x80` bit is set on the stored value either way. The returned
/// item carries the bucket's current CAS value.
///
/// Panics if an entry with a matching tag cannot be resolved by `segments`.
pub fn get(&mut self, key: &[u8], time: Instant, segments: &mut Segments) -> Option<Item> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);
    let primary = (hash & self.mask) as usize;

    let info_word = self.data[primary].data[0];
    let now = (time - self.started).as_secs() & PROC_TS_MASK;

    if now != get_ts(info_word) as u32 {
        self.data[primary].data[0] = (info_word & !TS_MASK) | (now as u64);
        IterMut::new(self, hash).for_each(|entry| *entry &= CLEAR_FREQ_SMOOTH_MASK);
    }

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        let candidate = segments.get_item(*entry).unwrap();
        if candidate.key() != key {
            // Same tag, different key.
            #[cfg(feature = "metrics")]
            HASH_TAG_COLLISION.increment();
            continue;
        }

        let seen = get_freq(*entry);
        if seen < 127 {
            // The random draw happens on every update below 127, even when
            // the increment is unconditional.
            let rand = thread_rng().random::<u64>();
            let counted = if seen <= 16 || rand % seen == 0 {
                seen + 1
            } else {
                seen
            };
            *entry = (*entry & !FREQ_MASK) | ((counted | 0x80) << FREQ_BIT_SHIFT);
        }

        let item = Item::new(candidate, get_cas(self.data[primary].data[0]));
        item.check_magic();
        return Some(item);
    }

    None
}
/// Looks up `key` and returns the matching item without touching the entry's
/// frequency bits or the bucket timestamp.
///
/// The returned item carries the bucket's current CAS value.
///
/// Panics if an entry with a matching tag cannot be resolved by `segments`.
pub fn get_no_freq_incr(&mut self, key: &[u8], segments: &mut Segments) -> Option<Item> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        let candidate = segments.get_item(*entry).unwrap();
        if candidate.key() != key {
            // Same tag, different key.
            #[cfg(feature = "metrics")]
            HASH_TAG_COLLISION.increment();
            continue;
        }

        let bucket_info = self.data[(hash & self.mask) as usize].data[0];
        let item = Item::new(candidate, get_cas(bucket_info));
        item.check_magic();
        return Some(item);
    }

    None
}
/// Returns the frequency stored for the entry of `key` that lives in
/// `segment` at `offset`, with the `0x80` bit masked off, or `None` when no
/// entry in the chain matches tag, segment id and offset.
pub fn get_freq(&mut self, key: &[u8], segment: &mut Segment, offset: u64) -> Option<u64> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);

    IterMut::new(self, hash)
        .find(|entry| {
            get_tag(**entry) == tag
                && get_seg_id(**entry) == Some(segment.id())
                && get_offset(**entry) == offset
        })
        .map(|entry| get_freq(*entry) & 0x7F)
}
/// Repoints the entry for `key` from (`old_seg`, `old_offset`) to
/// (`new_seg`, `new_offset`).
///
/// The entry is rebuilt from the tag and the new location. Returns `Err(())`
/// when no entry in the chain matches the tag and the old location.
#[allow(clippy::result_unit_err)]
pub fn relink_item(
    &mut self,
    key: &[u8],
    old_seg: NonZeroU32,
    new_seg: NonZeroU32,
    old_offset: u64,
    new_offset: u64,
) -> Result<(), ()> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        let at_old_location =
            get_seg_id(*entry) == Some(old_seg) && get_offset(*entry) == old_offset;
        if at_old_location {
            *entry = build_item_info(tag, new_seg, new_offset);
            #[cfg(feature = "metrics")]
            ITEM_RELINK.increment();
            return Ok(());
        }

        // Same tag but a different location.
        #[cfg(feature = "metrics")]
        HASH_TAG_COLLISION.increment();
    }

    Err(())
}
/// Reports whether the chain for `key` contains an entry with the key's tag
/// that points at segment `seg` and `offset`.
pub(crate) fn is_item_at(&mut self, key: &[u8], seg: NonZeroU32, offset: u64) -> bool {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        let at_location = get_seg_id(*entry) == Some(seg) && get_offset(*entry) == offset;
        if at_location {
            return true;
        }

        // Same tag but a different location.
        #[cfg(feature = "metrics")]
        HASH_TAG_COLLISION.increment();
    }

    false
}
/// Inserts an entry for `item`, located in segment `seg` at `offset`.
///
/// The chain is scanned once: the new entry is written into the first empty
/// slot encountered, and if an existing entry for the same key is found it is
/// overwritten (or cleared, when the new entry was already placed) and the
/// old item is removed from `segments`. If no slot was available, a new
/// overflow bucket is linked onto the chain, provided the chain is shorter
/// than `MAX_CHAIN_LEN` and unused overflow buckets remain.
///
/// On success the bucket's CAS value is incremented. Returns `Err(())` when
/// the entry could not be placed.
///
/// Panics if an entry with a matching tag cannot be resolved by `segments`.
#[allow(clippy::result_unit_err)]
pub fn insert(
    &mut self,
    item: RawItem,
    seg: NonZeroU32,
    offset: u64,
    ttl_buckets: &mut TtlBuckets,
    segments: &mut Segments,
) -> Result<(), ()> {
    #[cfg(feature = "metrics")]
    HASH_INSERT.increment();

    let hash = self.hash(item.key());
    let tag = tag_from_hash(hash);
    item.check_magic();

    // Entry still waiting for a slot; zero means it has been placed.
    let mut pending = build_item_info(tag, seg, offset);
    let mut replaced: Option<u64> = None;

    for slot in IterMut::new(self, hash) {
        if get_tag(*slot) != tag {
            // Unrelated or empty slot: claim the first empty one.
            if pending != 0 && *slot == 0 {
                *slot = pending;
                pending = 0;
            }
            continue;
        }

        if segments.get_item(*slot).unwrap().key() == item.key() {
            // Existing entry for this key: overwrite it. If the new entry
            // already went into an earlier empty slot, this clears the old one.
            replaced = Some(*slot);
            *slot = pending;
            pending = 0;
            break;
        }

        // Same tag, different key.
        #[cfg(feature = "metrics")]
        HASH_TAG_COLLISION.increment();
    }

    if let Some(old_info) = replaced {
        #[cfg(feature = "metrics")]
        ITEM_REPLACE.increment();
        let _ = segments.remove_item(old_info, ttl_buckets, self);
    }

    let primary = (hash & self.mask) as usize;

    if pending != 0 {
        let links = chain_len(self.data[primary].data[0]);
        if links < MAX_CHAIN_LEN && (self.next_to_chain as usize) < self.data.len() {
            // Walk the links to reach the last bucket of the chain.
            let mut tail = primary;
            for _ in 0..links {
                tail = self.data[tail].data[N_BUCKET_SLOT - 1] as usize;
            }

            let fresh = self.next_to_chain as usize;
            self.next_to_chain += 1;

            // The tail's last slot becomes the link, so the entry it held moves
            // to slot 0 of the new bucket and the new entry goes into slot 1.
            self.data[fresh].data[0] = self.data[tail].data[N_BUCKET_SLOT - 1];
            self.data[fresh].data[1] = pending;
            pending = 0;
            self.data[tail].data[N_BUCKET_SLOT - 1] = fresh as u64;

            // Bump the chain length stored in the primary bucket's info word.
            // NOTE(review): this literal must equal one unit of the chain-length
            // field as decoded by `chain_len()`; confirm against the bit layout
            // constants in `hash_bucket`.
            self.data[primary].data[0] += 0x0000_0000_0001_0000;
        }
    }

    if pending != 0 {
        #[cfg(feature = "metrics")]
        HASH_INSERT_EX.increment();
        return Err(());
    }

    self.data[primary].data[0] += 1 << CAS_BIT_SHIFT;
    Ok(())
}
/// Compare-and-swap check for `key`.
///
/// When the key is present its frequency is updated exactly as in a regular
/// read. Then, if `cas` equals the bucket's current CAS value, the CAS value
/// is incremented and `Ok(())` is returned; otherwise
/// `Err(SegcacheError::Exists)`. Returns `Err(SegcacheError::NotFound)` when
/// the key is not in the table.
///
/// Panics if an entry with a matching tag cannot be resolved by `segments`.
pub fn try_update_cas(
    &mut self,
    key: &[u8],
    cas: u32,
    segments: &mut Segments,
) -> Result<(), SegcacheError> {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);
    let primary = (hash & self.mask) as usize;

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        let candidate = segments.get_item(*entry).unwrap();
        if candidate.key() != key {
            // Same tag, different key.
            #[cfg(feature = "metrics")]
            HASH_TAG_COLLISION.increment();
            continue;
        }

        let seen = get_freq(*entry);
        if seen < 127 {
            // The random draw happens on every update below 127, even when
            // the increment is unconditional.
            let rand = thread_rng().random::<u64>();
            let counted = if seen <= 16 || rand % seen == 0 {
                seen + 1
            } else {
                seen
            };
            *entry = (*entry & !FREQ_MASK) | ((counted | 0x80) << FREQ_BIT_SHIFT);
        }

        return if cas == get_cas(self.data[primary].data[0]) {
            self.data[primary].data[0] += 1 << CAS_BIT_SHIFT;
            Ok(())
        } else {
            Err(SegcacheError::Exists)
        };
    }

    Err(SegcacheError::NotFound)
}
/// Removes the entry for `key` from the table and the item it points to from
/// `segments`. Returns `true` if an entry was found and removed.
///
/// Panics if an entry with a matching tag cannot be resolved by `segments`.
pub fn delete(
    &mut self,
    key: &[u8],
    ttl_buckets: &mut TtlBuckets,
    segments: &mut Segments,
) -> bool {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);
    let mut removed: Option<u64> = None;

    for entry in IterMut::new(self, hash) {
        if get_tag(*entry) != tag {
            continue;
        }

        if segments.get_item(*entry).unwrap().key() == key {
            #[cfg(feature = "metrics")]
            HASH_REMOVE.increment();
            // Remember the entry for segment cleanup, then clear the slot.
            removed = Some(*entry);
            *entry = 0;
            break;
        }

        // Same tag, different key.
        #[cfg(feature = "metrics")]
        HASH_TAG_COLLISION.increment();
    }

    match removed {
        Some(old_info) => {
            #[cfg(feature = "metrics")]
            ITEM_DELETE.increment();
            let _ = segments.remove_item(old_info, ttl_buckets, self);
            true
        }
        None => false,
    }
}
/// Removes the entry for `key` located at `offset` in `segment` as part of an
/// eviction. Returns `true` if the entry was found and removed.
pub fn evict(&mut self, key: &[u8], offset: i32, segment: &mut Segment) -> bool {
    let removed = self.remove_from(key, offset, segment);
    #[cfg(feature = "metrics")]
    {
        if removed {
            ITEM_EVICT.increment();
        }
    }
    removed
}
/// Removes the entry for `key` located at `offset` in `segment` as part of an
/// expiration. Returns `true` if the entry was found and removed.
pub fn expire(&mut self, key: &[u8], offset: i32, segment: &mut Segment) -> bool {
    let removed = self.remove_from(key, offset, segment);
    #[cfg(feature = "metrics")]
    {
        if removed {
            ITEM_EXPIRE.increment();
        }
    }
    removed
}
/// Shared implementation of `evict` and `expire`.
///
/// Searches the chain for the entry that, with its frequency bits cleared,
/// equals the entry built from the key's tag, `segment`'s id and `offset`.
/// On a match the item is removed from `segment`, the slot is zeroed and
/// `true` is returned.
fn remove_from(&mut self, key: &[u8], offset: i32, segment: &mut Segment) -> bool {
    let hash = self.hash(key);
    let tag = tag_from_hash(hash);
    let wanted = build_item_info(tag, segment.id(), offset as u64);

    for entry in IterMut::new(self, hash) {
        // Compare with the frequency bits stripped.
        let stripped = clear_freq(*entry);
        if get_tag(stripped) != tag {
            continue;
        }

        let same_location = get_seg_id(stripped) == Some(segment.id())
            && get_offset(stripped) == offset as u64;
        if !same_location {
            // Same tag but a different location.
            #[cfg(feature = "metrics")]
            HASH_TAG_COLLISION.increment();
            continue;
        }

        if wanted == stripped {
            segment.remove_item(stripped);
            *entry = 0;
            return true;
        }
    }

    false
}
/// Returns the hasher factory this table uses to hash keys.
pub fn hash_builder(&self) -> &RandomState {
    &*self.hash_builder
}
/// Hashes `key` with the table's hasher by feeding it the raw key bytes.
fn hash(&self, key: &[u8]) -> u64 {
    #[cfg(feature = "metrics")]
    HASH_LOOKUP.increment();
    let mut state = self.hash_builder.build_hasher();
    state.write(key);
    state.finish()
}
}