use std::{
borrow::Borrow,
hash::{BuildHasher, Hash, Hasher},
};
use crate::{error::ArconResult, index::IndexOps, table::ImmutableTable};
use arcon_state::{
backend::{
handles::{ActiveHandle, Handle},
MapState,
},
data::{Key, Value},
error::*,
};
use std::sync::Arc;
// Selects the platform-specific `imp` module at compile time. Only the SSE2
// implementation is supported for now; every other target (and miri) is
// rejected with a compile-time error.
cfg_if::cfg_if! {
    if #[cfg(all(
        target_feature = "sse2",
        any(target_arch = "x86", target_arch = "x86_64"),
        not(miri)
    ))] {
        mod sse2;
        use sse2 as imp;
    } else {
        // `panic!` expands to an expression and is not valid in item position;
        // `compile_error!` is the construct that reports an unsupported target
        // cleanly at build time.
        compile_error!("sse2 needed for now");
        #[path = "generic.rs"]
        mod generic;
        use generic as imp;
    }
}
mod bitmask;
pub mod eager;
mod table;
#[cfg(test)]
use self::table::TableModIterator;
use self::table::{ProbeModIterator, RawTable};
use crate::backend::Backend;
use std::cell::UnsafeCell;
/// Default slot count of the read lane (filled by `HashTable::get` on backend hits).
/// Kept a power of two, matching the requirement `HashTable::with_capacity` asserts.
const DEFAULT_READ_LANE_SIZE: usize = 1 << 13;
/// Default slot count of the modified lane (written by `put` / `rmw`).
/// Kept a power of two, matching the requirement `HashTable::with_capacity` asserts.
const DEFAULT_MOD_LANE_SIZE: usize = 1 << 10;
/// Hash builder used by `HashTable` to hash keys before probing its raw table
/// (FxHash, from the `fxhash` crate).
pub type DefaultHashBuilder = fxhash::FxBuildHasher;
pub struct HashTable<K, V, B>
where
K: Key + Hash,
V: Value,
B: Backend,
{
hash_builder: fxhash::FxBuildHasher,
raw_table: UnsafeCell<RawTable<K, V>>,
handle: ActiveHandle<B, MapState<K, V>>,
}
/// Hashes `val` with a fresh hasher produced by `hash_builder`.
///
/// The result is used to probe the raw table, so every operation on a given
/// table must go through the same builder.
#[inline]
pub(crate) fn make_hash<K: Hash + ?Sized>(hash_builder: &impl BuildHasher, val: &K) -> u64 {
    let mut hasher = hash_builder.build_hasher();
    Hash::hash(val, &mut hasher);
    hasher.finish()
}
impl<K, V, B> HashTable<K, V, B>
where
    K: Key + Eq + Hash,
    V: Value,
    B: Backend,
{
    /// Creates a table registered under `id` in `backend`, using the default
    /// lane sizes (`DEFAULT_MOD_LANE_SIZE`, `DEFAULT_READ_LANE_SIZE`).
    pub fn new(id: impl Into<String>, backend: Arc<B>) -> Self {
        // Delegate so handle registration lives in one place; both defaults
        // are powers of two, so the assertions in `with_capacity` always hold.
        Self::with_capacity(id, backend, DEFAULT_MOD_LANE_SIZE, DEFAULT_READ_LANE_SIZE)
    }

    /// Creates a table registered under `id` in `backend` with explicit lane
    /// capacities.
    ///
    /// # Panics
    ///
    /// Panics if `mod_capacity` or `read_capacity` is not a power of two.
    pub fn with_capacity(
        id: impl Into<String>,
        backend: Arc<B>,
        mod_capacity: usize,
        read_capacity: usize,
    ) -> Self {
        assert!(
            mod_capacity.is_power_of_two(),
            "mod_capacity must be a power of two"
        );
        assert!(
            read_capacity.is_power_of_two(),
            "read_capacity must be a power of two"
        );
        let mut handle = Handle::map(id.into());
        backend.register_map_handle(&mut handle);
        let handle = handle.activate(backend);
        HashTable {
            hash_builder: DefaultHashBuilder::default(),
            raw_table: UnsafeCell::new(RawTable::with_capacity(mod_capacity, read_capacity)),
            handle,
        }
    }

    /// Shared view of the in-memory table.
    #[inline(always)]
    fn raw_table(&self) -> &RawTable<K, V> {
        // SAFETY: the `UnsafeCell` makes `HashTable` `!Sync`, so only this thread
        // can reach the cell. NOTE(review): callers must not keep this reference
        // alive across a `raw_table_mut` call that rewrites the same slots.
        unsafe { &*self.raw_table.get() }
    }

    /// Mutable view of the in-memory table obtained through `&self`.
    #[inline(always)]
    #[allow(clippy::mut_from_ref)] // interior mutability via `UnsafeCell`, see SAFETY below
    fn raw_table_mut(&self) -> &mut RawTable<K, V> {
        // SAFETY: single-threaded access (`!Sync`). NOTE(review): soundness also
        // requires that no other reference into the table is alive while this
        // one is used; `get` can violate that (see its doc comment).
        unsafe { &mut *self.raw_table.get() }
    }

    /// Writes `(k, v)` into the modified lane, overwriting an existing
    /// modified entry for `k`.
    ///
    /// If the lane rejects the pair, the modified entries on its probe
    /// sequence are drained to the backend and the insert is retried once.
    #[inline(always)]
    fn insert(&self, k: K, v: V, hash: u64) -> Result<()> {
        let table = self.raw_table_mut();
        if let Some(item) = table.find_mod_lane_mut(hash, |x| k.eq(x.0)) {
            // Already modified: overwrite in place.
            *item = v;
        } else if let Some((mod_iter, (k, v))) = table.insert_mod_lane(hash, (k, v)) {
            // Probe sequence full: flush it, then retry with the pair handed back.
            self.drain_modified(mod_iter)?;
            let rejected = table.insert_mod_lane(hash, (k, v));
            // A second rejection would silently drop this write; surface it in
            // debug builds rather than losing data unnoticed.
            debug_assert!(
                rejected.is_none(),
                "mod lane still full after draining its probe sequence"
            );
        }
        Ok(())
    }

    /// Caches a backend value in the read lane.
    #[inline(always)]
    fn insert_read_lane(&self, k: K, v: V, hash: u64) {
        let table = self.raw_table_mut();
        table.insert_read_lane(hash, (k, v));
    }

    /// Reads `k` straight from the backend.
    #[inline]
    fn backend_get(&self, k: &K) -> Result<Option<V>> {
        self.handle.get(k)
    }

    /// Removes `k` from the backend, returning the value it held.
    #[inline]
    fn backend_remove(&self, k: &K) -> Result<Option<V>> {
        self.handle.remove(k)
    }

    /// Removes `k` from the backend without reading the old value back.
    #[inline]
    fn backend_remove_fast(&self, k: &K) -> Result<()> {
        self.handle.fast_remove(k)
    }

    /// Looks up `k` in the in-memory table only.
    #[inline]
    fn table_get<Q: ?Sized>(&self, k: &Q, hash: u64) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let table = self.raw_table();
        table.find(hash, |x| k.eq(x.0.borrow())).map(|(_, v)| v)
    }

    /// Mutable lookup of `k` restricted to the modified lane.
    #[inline(always)]
    fn table_find_mod_lane<Q: ?Sized>(&mut self, k: &Q, hash: u64) -> Option<&mut V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let table = self.raw_table_mut();
        table.find_mod_lane_mut(hash, |x| k.eq(x.0.borrow()))
    }

    /// Moves the entry for `k` out of the read lane, if present.
    #[inline(always)]
    fn table_take_read_lane<Q: ?Sized>(&mut self, k: &Q, hash: u64) -> Option<(K, V)>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let table = self.raw_table_mut();
        table.take_read_lane(hash, |x| k.eq(x.0.borrow()))
    }

    /// Number of entries currently held in memory (as reported by the raw table).
    #[inline]
    pub fn len(&self) -> usize {
        self.raw_table().len()
    }

    /// `true` when no entries are held in memory.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes `k` from both the in-memory table and the backend.
    ///
    /// Returns the in-memory value when `k` was held in memory, otherwise
    /// whatever the backend held for it.
    ///
    /// # Errors
    ///
    /// Propagates backend errors.
    #[inline(always)]
    pub fn remove(&mut self, k: &K) -> Result<Option<V>> {
        // `k` is already a reference; hashing it directly gives the same value
        // as hashing `&k`, since `Hash for &T` delegates to `T`.
        let hash = make_hash(&self.hash_builder, k);
        let table = self.raw_table_mut();
        match table.remove(hash, |x| k.eq(x.0.borrow())) {
            // Found in memory: drop the backend copy without reading it back.
            Some(item) => {
                self.backend_remove_fast(k)?;
                Ok(Some(item.1))
            }
            None => self.backend_remove(k),
        }
    }

    /// Looks up `key`, first in memory and then in the backend; a backend hit
    /// is cached in the read lane before being returned.
    ///
    /// NOTE(review): the returned reference points into the in-memory table,
    /// yet this method takes `&self` and writes to that table through the
    /// `UnsafeCell` on a backend hit. Holding the result of one `get` across a
    /// second `get` that caches a new entry may therefore alias a slot being
    /// rewritten. Taking `&mut self` would let the borrow checker rule this
    /// out, at the cost of changing the public signature.
    ///
    /// # Errors
    ///
    /// Propagates backend errors.
    #[inline(always)]
    pub fn get(&self, key: &K) -> Result<Option<&V>> {
        let hash = make_hash(&self.hash_builder, key);
        if let Some(value) = self.table_get(key, hash) {
            return Ok(Some(value));
        }
        match self.backend_get(key)? {
            Some(value) => {
                self.insert_read_lane(key.clone(), value, hash);
                Ok(self.table_get(key, hash))
            }
            None => Ok(None),
        }
    }

    /// Writes `value` for `key` into the modified lane.
    ///
    /// # Errors
    ///
    /// Propagates backend errors raised while draining a full probe sequence.
    #[inline(always)]
    pub fn put(&mut self, key: K, value: V) -> Result<()> {
        let hash = make_hash(&self.hash_builder, &key);
        // Evict any read-lane copy cached by `get`: otherwise the key would live
        // in both lanes, with the read lane holding a stale value. `rmw` keeps
        // the same invariant by moving the entry out of the read lane.
        let _ = self.table_take_read_lane(&key, hash);
        self.insert(key, value, hash)
    }

    /// Read-modify-write: applies `f` to the current value of `key`, using
    /// `p()` as the starting value when the key exists neither in memory nor in
    /// the backend. The result ends up in the modified lane.
    ///
    /// # Errors
    ///
    /// Propagates backend errors.
    #[inline(always)]
    pub fn rmw<F, P>(&mut self, key: &K, p: P, f: F) -> Result<()>
    where
        F: FnOnce(&mut V),
        P: FnOnce() -> V,
    {
        let hash = make_hash(&self.hash_builder, key);

        // 1. Already modified in memory: update in place.
        if let Some(entry) = self.table_find_mod_lane(key, hash) {
            f(entry);
            return Ok(());
        }

        // 2. Cached in the read lane: move it over to the modified lane.
        if let Some((key, mut value)) = self.table_take_read_lane(key, hash) {
            f(&mut value);
            return self.insert(key, value, hash);
        }

        // 3. Not in memory: start from the backend value, or `p()` if absent.
        let mut value = self.backend_get(key)?.unwrap_or_else(p);
        f(&mut value);
        self.insert(key.clone(), value, hash)
    }

    /// Writes every entry yielded by `iter` to the backend.
    #[inline(always)]
    pub fn drain_modified(&self, iter: ProbeModIterator<K, V>) -> Result<()> {
        self.handle.insert_all_by_ref(iter)
    }

    /// Persists modified entries, then returns the backend's entry count
    /// together with an iterator over all of its values.
    #[allow(clippy::type_complexity)] // boxed trait-object iterator in a tuple
    pub fn full_iter(&mut self) -> ArconResult<(usize, Box<dyn Iterator<Item = Result<V>> + '_>)> {
        self.persist()?;
        let len = self.handle.len()?;
        let values = self.handle.values()?;
        Ok((len, values))
    }

    /// Test-only iterator over the modified lane.
    #[cfg(test)]
    pub(crate) fn modified_iterator(&mut self) -> TableModIterator<K, V> {
        let table = self.raw_table_mut();
        // SAFETY: NOTE(review) — `RawTable::iter_modified`'s contract is not
        // visible here; `&mut self` at least rules out concurrent table access.
        unsafe { table.iter_modified() }
    }
}
impl<K, V, B> IndexOps for HashTable<K, V, B>
where
    K: Key + Eq + Hash,
    V: Value,
    B: Backend,
{
    /// Writes every entry in the modified lane to the backend.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend while inserting.
    fn persist(&mut self) -> ArconResult<()> {
        let table = self.raw_table_mut();
        // SAFETY: NOTE(review) — the contract of `RawTable::iter_modified` is not
        // visible here; presumably the iterator must not overlap other mutable
        // access to the table. `&mut self` rules out any such access for the
        // duration of this call. Only the call itself is inside `unsafe`.
        let modified = unsafe { table.iter_modified() };
        self.handle.insert_all_by_ref(modified)?;
        Ok(())
    }

    /// No-op: this index does not track a current key.
    fn set_key(&mut self, _: u64) {}

    /// This index has no immutable-table representation, so it always returns `None`.
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::temp_backend;
    use arcon_state::backend::sled::Sled;
    use std::sync::Arc;

    /// `rmw` on fresh keys seeds them from the provided default; `get` then
    /// observes the modified values, and `persist` succeeds.
    #[test]
    fn basic_test() {
        let backend = Arc::new(temp_backend());
        let (mod_capacity, read_capacity) = (1024, 1024);
        let mut hash_index: HashTable<u64, u64, Sled> =
            HashTable::with_capacity("table", backend, mod_capacity, read_capacity);

        for key in 0..1024u64 {
            hash_index.rmw(&key, || key, |v| *v += 1).expect("failure");
        }

        for key in 0..1024u64 {
            let expected = key + 1;
            assert_eq!(hash_index.get(&key).unwrap(), Some(&expected));
        }

        assert!(hash_index.persist().is_ok());
    }

    /// Iterating the modified lane clears it (the second count is zero), and
    /// only keys touched afterwards by `rmw` show up again.
    #[test]
    fn modified_test() {
        let backend = Arc::new(temp_backend());
        let capacity = 64;
        let mut hash_index: HashTable<u64, u64, Sled> =
            HashTable::with_capacity("table", backend, capacity, capacity);

        (0..10u64).for_each(|i| hash_index.put(i, i).unwrap());

        assert_eq!(hash_index.modified_iterator().count(), 10);
        assert_eq!(hash_index.modified_iterator().count(), 0);

        let rmw_keys: [u64; 3] = [0, 1, 2];
        for key in rmw_keys.iter() {
            assert!(hash_index.rmw(key, || 0, |v| *v += 1).is_ok());
        }

        for (key, value) in hash_index.modified_iterator() {
            assert!(rmw_keys.contains(key));
            assert_eq!(*value, *key + 1);
        }
    }
}