use std::hash::Hash;
use binout::{AsIs, Serializer, VByte};
use bitm::{BitAccess, BitArrayWithRank, ceiling_div};
use crate::utils::ArrayWithRank;
use crate::{BuildDefaultSeededHasher, BuildSeededHasher, stats, utils};
use std::io;
use std::sync::atomic::{AtomicU64};
use std::sync::atomic::Ordering::Relaxed;
use rayon::prelude::*;
use dyn_size_of::GetSize;
use crate::fmph::keyset::{KeySet, SliceMutSource, SliceSourceWithRefs};
/// Build configuration for [`Function`].
#[derive(Clone)]
pub struct BuildConf<S = BuildDefaultSeededHasher> {
    /// Family of seeded hash functions used both to build and to evaluate the function.
    pub hash_builder: S,
    /// Input sizes strictly below this threshold use the build path that caches
    /// one bit index per key (see the build loop); defaults to `DEFAULT_CACHE_THRESHOLD`.
    pub cache_threshold: usize,
    /// Size of each level as a percentage of the number of keys that reach it
    /// (level bits = keys * relative_level_size / 100, rounded up to 64); defaults to 100.
    pub relative_level_size: u16,
    /// Whether construction may use multiple (rayon) threads, when the key set
    /// supports parallel processing; defaults to `true`.
    pub use_multiple_threads: bool
}
impl Default for BuildConf {
    /// Returns the default configuration: default hasher, relative level size of 100%,
    /// the default cache threshold, and multi-threading enabled.
    fn default() -> Self {
        Self {
            hash_builder: Default::default(),
            relative_level_size: 100,
            cache_threshold: Self::DEFAULT_CACHE_THRESHOLD,
            use_multiple_threads: true,
        }
    }
}
impl BuildConf {
    /// Returns the default configuration with multi-threading set to `use_multiple_threads`.
    pub fn mt(use_multiple_threads: bool) -> Self {
        Self { use_multiple_threads, ..Self::default() }
    }

    /// Returns the default configuration with the given cache threshold
    /// and multi-threading flag.
    pub fn ct_mt(cache_threshold: usize, use_multiple_threads: bool) -> Self {
        Self { cache_threshold, use_multiple_threads, ..Self::default() }
    }

    /// Returns the default configuration with the given relative level size
    /// (in percent of the number of keys).
    pub fn lsize(relative_level_size: u16) -> Self {
        Self { relative_level_size, ..Self::default() }
    }

    /// Returns the default configuration with the given relative level size
    /// and multi-threading flag.
    pub fn lsize_mt(relative_level_size: u16, use_multiple_threads: bool) -> Self {
        Self { relative_level_size, use_multiple_threads, ..Self::default() }
    }
}
impl<S> BuildConf<S> {
    /// Default value of `cache_threshold`: 1024*1024*128 cached bit indices.
    pub const DEFAULT_CACHE_THRESHOLD: usize = 1024*1024*128;

    /// Returns the configuration with a custom `hash_builder` and all remaining
    /// fields at their default values.
    pub fn hash(hash_builder: S) -> Self {
        Self::hash_lsize_ct_mt(hash_builder, 100, Self::DEFAULT_CACHE_THRESHOLD, true)
    }

    /// Returns the configuration with a custom `hash_builder` and `relative_level_size`.
    pub fn hash_lsize(hash_builder: S, relative_level_size: u16) -> Self {
        Self::hash_lsize_ct_mt(hash_builder, relative_level_size, Self::DEFAULT_CACHE_THRESHOLD, true)
    }

    /// Returns the configuration with a custom `hash_builder`, `relative_level_size`
    /// and multi-threading flag.
    pub fn hash_lsize_mt(hash_builder: S, relative_level_size: u16, use_multiple_threads: bool) -> Self {
        Self::hash_lsize_ct_mt(hash_builder, relative_level_size, Self::DEFAULT_CACHE_THRESHOLD, use_multiple_threads)
    }

    /// Returns the configuration with every field given explicitly.
    pub fn hash_lsize_ct_mt(hash_builder: S, relative_level_size: u16, cache_threshold: usize, use_multiple_threads: bool) -> Self {
        Self { hash_builder, relative_level_size, cache_threshold, use_multiple_threads }
    }
}
/// Sets bit `bit_index` in `result`; if it was already set, additionally
/// records the collision in `collision`. Branchless version.
#[cfg(not(feature = "no_branchless_bb"))]
#[inline]
pub(crate) fn fphash_add_bit(result: &mut [u64], collision: &mut [u64], bit_index: usize) {
    let segment = bit_index / 64;
    let bit = 1u64 << (bit_index % 64) as u64;
    // A bit already present in `result` is a collision; OR-ing the intersection
    // into `collision` records it without a branch.
    collision[segment] |= result[segment] & bit;
    result[segment] |= bit;
}
/// Sets bit `bit_index` in `result`; if it was already set, additionally
/// records the collision in `collision`. Branching version, compiled only
/// with the `no_branchless_bb` feature.
///
/// Takes `&mut [u64]` (instead of `&mut Box<[u64]>`) and is marked `#[inline]`
/// for consistency with the branchless variant; call sites coerce unchanged.
#[cfg(feature = "no_branchless_bb")]
#[inline]
pub(crate) fn fphash_add_bit(result: &mut [u64], collision: &mut [u64], bit_index: usize) {
    let index = bit_index / 64;
    let mask = 1u64 << (bit_index % 64) as u64;
    // Bits already marked as collided are left untouched.
    if collision[index] & mask == 0 {
        if result[index] & mask == 0 {
            result[index] |= mask;          // first arrival: claim the bit
        } else {
            collision[index] |= mask;       // second arrival: record the collision
        }
    }
}
/// Thread-safe variant of `fphash_add_bit`: sets bit `bit_index` in `result`
/// and, if it was already set, records the collision in `collision`.
/// Uses relaxed atomic operations only.
#[inline]
pub(crate) fn fphash_sync_add_bit(result: &[AtomicU64], collision: &[AtomicU64], bit_index: usize) {
    let segment = bit_index / 64;
    let bit = 1u64 << (bit_index % 64) as u64;
    // With the `no_branchless_bb` feature, skip work for bits already known to collide.
    #[cfg(feature = "no_branchless_bb")]
    if collision[segment].load(Relaxed) & bit != 0 { return; }
    let previous = result[segment].fetch_or(bit, Relaxed);
    if previous & bit != 0 {
        // Someone set this bit before us: it is a collision.
        collision[segment].fetch_or(bit, Relaxed);
    }
}
/// Clears in `result` every bit that is set in `collision`,
/// i.e. removes from the level all keys whose bit collided.
///
/// Takes `&mut [u64]` instead of the original `&mut Box<[u64]>` (the
/// `&mut Box<_>` parameter is an anti-pattern; every call site passing
/// `&mut some_box` coerces unchanged via auto-deref).
#[inline]
pub(crate) fn fphash_remove_collided(result: &mut [u64], collision: &[u64]) {
    for (r, c) in result.iter_mut().zip(collision.iter()) {
        *r &= !c;
    }
}
/// Reinterprets a mutable `u64` slice as a slice of `AtomicU64`.
#[inline]
pub(crate) fn from_mut_slice(v: &mut [u64]) -> &mut [AtomicU64] {
    use core::mem::align_of;
    // Compile-time guard: the expression underflows (const-eval error) if
    // `AtomicU64` needed weaker alignment than `u64`, and the empty pattern
    // fails to match unless the two alignments are equal.
    let [] = [(); align_of::<AtomicU64>() - align_of::<u64>()];
    // SAFETY: `AtomicU64` has the same size as `u64` and (checked above)
    // the same alignment; we hold the only, exclusive reference to the data.
    unsafe { &mut *(v as *mut [u64] as *mut [AtomicU64]) }
}

/// Reinterprets a mutable `AtomicU64` slice back as a plain `u64` slice.
#[inline]
pub(crate) fn get_mut_slice(v: &mut [AtomicU64]) -> &mut [u64] {
    // SAFETY: same layout as above; exclusive access guarantees no concurrent
    // atomic operations can observe the non-atomic view.
    unsafe { &mut *(v as *mut [AtomicU64] as *mut [u64]) }
}

/// Returns the bit index assigned to `key` by `hash` seeded with `seed`,
/// mapped into the range `0..level_size`.
#[inline(always)]
fn index(key: &impl Hash, hash: &impl BuildSeededHasher, seed: u32, level_size: usize) -> usize {
    utils::map64_to_64(hash.hash_one(key, seed), level_size as u64) as usize
}
/// Internal state of the level-by-level build process.
struct Builder<S> {
    /// Bit arrays of the consecutive levels built so far; the array index is the hash seed.
    arrays: Vec::<Box<[u64]>>,
    /// Number of input keys not yet assigned a value by the levels in `arrays`.
    input_size: usize,
    /// Whether construction may use multiple rayon threads (derived from the
    /// configuration, the key set's capabilities, and the thread-pool size).
    use_multiple_threads: bool,
    /// Build configuration.
    conf: BuildConf<S>
}
impl<S: BuildSeededHasher + Sync> Builder<S> {
    /// Constructs a builder for the given configuration and key set,
    /// with no levels built yet.
    pub fn new<K>(conf: BuildConf<S>, keys: &impl KeySet<K>) -> Self {
        Self {
            arrays: Vec::<Box<[u64]>>::new(),
            input_size: keys.keys_len(),
            // Multiple threads pay off only when the key set supports some form of
            // parallel processing and rayon actually has more than one thread.
            use_multiple_threads: conf.use_multiple_threads && (keys.has_par_for_each_key() || keys.has_par_retain_keys()) && rayon::current_num_threads() > 1,
            conf
        }
    }

    /// Returns `true` if `key` is not assigned a value by any level built so far
    /// (i.e. the key must be carried into the next level).
    pub fn retained<K>(&self, key: &K) -> bool where K: Hash {
        self.arrays.iter().enumerate()
            // `a.len() << 6` converts the array length in segments to its size in bits.
            .all(|(seed, a)| !a.get_bit(index(key, &self.conf.hash_builder, seed as u32, a.len() << 6)))
    }

    /// Builds a level array of `level_size_segments` segments from the
    /// pre-computed `bit_indices` of the retained keys, single-threaded.
    fn build_array_for_indices_st(&self, bit_indices: &[usize], level_size_segments: usize) -> Box<[u64]>
    {
        let mut result = vec![0u64; level_size_segments].into_boxed_slice();
        let mut collision = vec![0u64; level_size_segments].into_boxed_slice();
        for bit_index in bit_indices {
            fphash_add_bit(&mut result, &mut collision, *bit_index);
        }
        fphash_remove_collided(&mut result, &collision);
        result
    }

    /// Like `build_array_for_indices_st`, but uses multiple threads
    /// when the builder is configured to do so.
    fn build_array_for_indices(&self, bit_indices: &[usize], level_size_segments: usize) -> Box<[u64]>
    {
        if !self.use_multiple_threads {
            return self.build_array_for_indices_st(bit_indices, level_size_segments)
        }
        let mut result = vec![0u64; level_size_segments].into_boxed_slice();
        let result_atom = from_mut_slice(&mut result);
        let mut collision: Box<[AtomicU64]> = (0..level_size_segments).map(|_| AtomicU64::default()).collect();
        bit_indices.par_iter().for_each(
            |bit_index| fphash_sync_add_bit(&result_atom, &collision, *bit_index)
        );
        fphash_remove_collided(&mut result, get_mut_slice(&mut collision));
        result
    }

    /// Builds a level of `level_size_segments * 64` bits for the retained keys,
    /// single-threaded; `seed` is the hash seed (= level number).
    fn build_level_st<K>(&self, keys: &impl KeySet<K>, level_size_segments: usize, seed: u32) -> Box<[u64]>
        where K: Hash
    {
        let mut result = vec![0u64; level_size_segments].into_boxed_slice();
        let mut collision = vec![0u64; level_size_segments].into_boxed_slice();
        let level_size = level_size_segments * 64;
        keys.for_each_key(
            |key| fphash_add_bit(&mut result, &mut collision, index(key, &self.conf.hash_builder, seed, level_size)),
            |key| self.retained(key)
        );
        fphash_remove_collided(&mut result, &collision);
        result
    }

    /// Multi-threaded variant of `build_level_st`; falls back to the
    /// single-threaded version when the key set does not support parallel iteration.
    fn build_level_mt<K>(&self, keys: &impl KeySet<K>, level_size_segments: usize, seed: u32) -> Box<[u64]>
        where K: Hash + Sync
    {
        if !keys.has_par_for_each_key() {
            return self.build_level_st(keys, level_size_segments, seed);
        }
        let mut result = vec![0u64; level_size_segments].into_boxed_slice();
        let result_atom = from_mut_slice(&mut result);
        let mut collision: Box<[AtomicU64]> = (0..level_size_segments).map(|_| AtomicU64::default()).collect();
        let level_size = level_size_segments * 64;
        keys.par_for_each_key(
            // was `&&result_atom`: needless extra borrow, now consistent with
            // build_array_for_indices above
            |key| fphash_sync_add_bit(&result_atom, &collision, index(key, &self.conf.hash_builder, seed, level_size)),
            |key| self.retained(key)
        );
        fphash_remove_collided(&mut result, get_mut_slice(&mut collision));
        result
    }

    /// Returns the number of the level currently being built
    /// (= the number of levels built so far), used as the hash seed.
    #[inline(always)] fn level_nr(&self) -> u32 { self.arrays.len() as u32 }

    /// Builds levels until every key is assigned a value, removing assigned
    /// keys from `keys` after each level and reporting progress to `stats`.
    fn build_levels<K, BS>(&mut self, keys: &mut impl KeySet<K>, stats: &mut BS)
        where K: Hash + Sync, BS: stats::BuildStatsCollector
    {
        while self.input_size != 0 {
            // Level size (in 64-bit segments) proportional to the keys left:
            // input_size * relative_level_size / 100 bits, rounded up to a segment.
            let level_size_segments = ceiling_div(self.input_size * self.conf.relative_level_size as usize, 64*100);
            let level_size = level_size_segments * 64;
            stats.level(self.input_size, level_size);
            let seed = self.level_nr();
            let array = if self.input_size < self.conf.cache_threshold {
                // Small input: cache one bit index per key so each key is hashed
                // only once per level.
                let bit_indices = keys.maybe_par_map_each_key(
                    |key| index(key, &self.conf.hash_builder, seed, level_size),
                    |key| self.retained(key),
                    self.use_multiple_threads
                );
                let array = self.build_array_for_indices(&bit_indices, level_size_segments);
                keys.maybe_par_retain_keys_with_indices(
                    |i| !array.get_bit(bit_indices[i]),
                    |key| !array.get_bit(index(key, &self.conf.hash_builder, seed, level_size)),
                    |key| self.retained(key),
                    || array.count_bit_ones(),
                    self.use_multiple_threads
                );
                array
            } else {
                // Large input: hash keys on the fly, without caching the indices.
                if self.use_multiple_threads {
                    let current_array = self.build_level_mt(keys, level_size_segments, seed);
                    keys.par_retain_keys(
                        |key| !current_array.get_bit(index(key, &self.conf.hash_builder, seed, level_size)),
                        |key| self.retained(key),
                        || current_array.count_bit_ones()
                    );
                    current_array
                } else {
                    let current_array = self.build_level_st(keys, level_size_segments, seed);
                    keys.retain_keys(
                        |key| !current_array.get_bit(index(key, &self.conf.hash_builder, seed, level_size)),
                        |key| self.retained(key),
                        || current_array.count_bit_ones()
                    );
                    current_array
                }
            };
            self.arrays.push(array);
            self.input_size = keys.keys_len();
        }
    }
}
/// Fingerprint-based minimal perfect hash function: maps each key of the build
/// input to a distinct value in `0..number_of_keys` (see `tests::test_mphf`).
#[derive(Clone)]
pub struct Function<S = BuildDefaultSeededHasher> {
    /// Concatenated bit arrays of all levels, with rank support.
    array: ArrayWithRank,
    /// Sizes (in 64-bit segments) of the consecutive levels.
    level_sizes: Box<[u64]>,
    /// Family of seeded hash functions; must match the one used during construction.
    hash_builder: S,
}
impl<S: BuildSeededHasher> GetSize for Function<S> {
    /// Returns the number of heap-allocated bytes used by the bit array and the level sizes.
    fn size_bytes_dyn(&self) -> usize { self.array.size_bytes_dyn() + self.level_sizes.size_bytes_dyn() }
    /// Like `size_bytes_dyn`, but counts content bytes only (see `dyn_size_of` crate docs).
    fn size_bytes_content_dyn(&self) -> usize { self.array.size_bytes_content_dyn() + self.level_sizes.size_bytes_content_dyn() }
    // `Function` always owns heap-allocated data.
    const USES_DYN_MEM: bool = true;
}
impl<S: BuildSeededHasher> Function<S> {
    /// Returns the in-level bit index of `k` at the level numbered `level_nr`
    /// (used as the hash seed), for a level of `size` bits.
    #[inline(always)] fn index<K: Hash>(&self, k: &K, level_nr: u32, size: usize) -> usize {
        utils::map64_to_64(self.hash_builder.hash_one(k, level_nr), size as u64) as usize
    }

    /// Returns the value assigned to `key`, collecting access statistics in `access_stats`.
    /// Returns `None` when all levels are exhausted without a hit, which can
    /// happen only for keys not included in the build input.
    pub fn get_stats<K: Hash, A: stats::AccessStatsCollector>(&self, key: &K, access_stats: &mut A) -> Option<u64> {
        let mut array_begin_index = 0usize;
        let mut level_nr = 0u32;
        loop {
            // `level_sizes` stores sizes in 64-bit segments; `<< 6` converts to bits.
            // `?` exits with `None` once the levels run out.
            let level_size = (*self.level_sizes.get(level_nr as usize)? as usize) << 6;
            let i = array_begin_index + self.index(key, level_nr, level_size);
            if self.array.content.get_bit(i) {
                access_stats.found_on_level(level_nr);
                // The rank of the hit position in the concatenated array is the value.
                return Some(self.array.rank(i) as u64);
            }
            // Not on this level: advance past it and try the next one.
            array_begin_index += level_size;
            level_nr += 1;
        }
    }

    /// Returns the value assigned to `key`, or `None` if `key` was not in the build input.
    #[inline] pub fn get<K: Hash>(&self, key: &K) -> Option<u64> {
        self.get_stats(key, &mut ())
    }

    /// Returns the number of bytes that `write` will emit.
    pub fn write_bytes(&self) -> usize {
        VByte::array_size(&self.level_sizes) + AsIs::array_content_size(&self.array.content)
    }

    /// Writes `self` to `output`: the VByte-encoded level sizes followed by
    /// the raw content of the bit array.
    pub fn write(&self, output: &mut dyn io::Write) -> io::Result<()>
    {
        VByte::write_array(output, &self.level_sizes)?;
        AsIs::write_all(output, self.array.content.iter())
    }

    /// Reads a function previously written by `write`, rebuilding the rank
    /// structure; `hasher` must match the one used to build the function.
    pub fn read_with_hasher(input: &mut dyn io::Read, hasher: S) -> io::Result<Self>
    {
        let level_sizes = VByte::read_array(input)?;
        // Total content length (in segments) is the sum of all level sizes.
        let array_content_len = level_sizes.iter().map(|v|*v as usize).sum::<usize>();
        let array_content = AsIs::read_n(input, array_content_len)?;
        let (array_with_rank, _) = ArrayWithRank::build(array_content);
        Ok(Self { array: array_with_rank, level_sizes, hash_builder: hasher })
    }

    /// Returns the sizes (in 64-bit segments) of the consecutive levels.
    pub fn level_sizes(&self) -> &[u64] {
        &self.level_sizes
    }
}
impl<S: BuildSeededHasher + Sync> Function<S> {
    /// Builds the function for the given `keys` using configuration `conf`,
    /// reporting build progress to `stats`. Consumes `keys`.
    pub fn with_conf_stats<K, BS>(mut keys: impl KeySet<K>, conf: BuildConf<S>, stats: &mut BS) -> Self
        where K: Hash + Sync,
              BS: stats::BuildStatsCollector
    {
        let mut builder = Builder::new(conf, &keys);
        builder.build_levels(&mut keys, stats);
        // The key set is no longer needed; free it before building the rank structure.
        drop(keys);
        stats.end();
        let level_sizes = builder.arrays.iter().map(|l| l.len() as u64).collect();
        // Concatenate the level arrays into one bit vector with rank support.
        let (array, _) = ArrayWithRank::build(builder.arrays.concat().into_boxed_slice());
        Self {
            array,
            level_sizes,
            hash_builder: builder.conf.hash_builder
        }
    }

    /// Builds the function for the given `keys` using configuration `conf`.
    #[inline] pub fn with_conf<K>(keys: impl KeySet<K>, conf: BuildConf<S>) -> Self
        where K: Hash + Sync
    {
        Self::with_conf_stats(keys, conf, &mut ())
    }

    /// Builds the function for the keys in the slice, collecting statistics in `stats`.
    #[inline] pub fn from_slice_with_conf_stats<K, BS>(keys: &[K], conf: BuildConf<S>, stats: &mut BS) -> Self
        where K: Hash + Sync, BS: stats::BuildStatsCollector
    {
        Self::with_conf_stats(SliceSourceWithRefs::<_, u8>::new(keys), conf, stats)
    }

    /// Builds the function for the keys in the slice.
    #[inline] pub fn from_slice_with_conf<K>(keys: &[K], conf: BuildConf<S>) -> Self
        where K: Hash + Sync
    {
        Self::with_conf_stats(SliceSourceWithRefs::<_, u8>::new(keys), conf, &mut ())
    }

    /// Builds the function for the keys in the mutable slice (which may be
    /// reordered by the build), collecting statistics in `stats`.
    #[inline] pub fn from_slice_mut_with_conf_stats<K, BS>(keys: &mut [K], conf: BuildConf<S>, stats: &mut BS) -> Self
        where K: Hash + Sync, BS: stats::BuildStatsCollector
    {
        Self::with_conf_stats(SliceMutSource::new(keys), conf, stats)
    }

    /// Builds the function for the keys in the mutable slice (which may be
    /// reordered by the build).
    #[inline] pub fn from_slice_mut_with_conf<K>(keys: &mut [K], conf: BuildConf<S>) -> Self
        where K: Hash + Sync
    {
        Self::with_conf_stats(SliceMutSource::new(keys), conf, &mut ())
    }
}
impl Function {
pub fn read(input: &mut dyn io::Read) -> io::Result<Self> {
Self::read_with_hasher(input, Default::default())
}
pub fn with_stats<K, BS>(keys: impl KeySet<K>, stats: &mut BS) -> Self
where K: Hash + Sync, BS: stats::BuildStatsCollector
{
Self::with_conf_stats(keys, Default::default(), stats)
}
pub fn new<K: Hash + Sync>(keys: impl KeySet<K>) -> Self {
Self::with_conf_stats(keys, Default::default(), &mut ())
}
}
impl<K: Hash + Clone + Sync> From<&[K]> for Function {
fn from(keys: &[K]) -> Self {
Self::new(SliceSourceWithRefs::<_, u8>::new(keys))
}
}
impl<K: Hash + Sync + Send> From<Vec<K>> for Function {
fn from(keys: Vec<K>) -> Self {
Self::new(keys)
}
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::fmt::Display;

    /// Asserts that `mphf` is a minimal perfect hash function over `mphf_keys`:
    /// it assigns each key a distinct value in `0..mphf_keys.len()`.
    pub fn test_mphf<K: Display, G: Fn(&K) -> Option<usize>>(mphf_keys: &[K], mphf: G) {
        use bitm::BitVec;
        // One bit per possible value, to detect duplicate assignments.
        let mut seen = Box::<[u64]>::with_zeroed_bits(mphf_keys.len());
        for key in mphf_keys {
            let index = mphf(key);
            assert!(index.is_some(), "MPHF does not assign the value for the key {} which is in the input", key);
            let index = index.unwrap();
            assert!(index < mphf_keys.len(), "MPHF assigns too large value for the key {}: {}>={}.", key, index, mphf_keys.len());
            assert!(!seen.get_bit(index), "MPHF assigns the same value to two keys of input, including {}.", key);
            seen.set_bit(index);
        }
    }

    /// Round-trips `h` through `write`/`read` and checks the reported size
    /// and the deserialized content.
    fn test_read_write(h: &Function) {
        let mut buff = Vec::new();
        h.write(&mut buff).unwrap();
        assert_eq!(buff.len(), h.write_bytes());
        let read = Function::read(&mut &buff[..]).unwrap();
        assert_eq!(h.level_sizes.len(), read.level_sizes.len());
        assert_eq!(h.array.content, read.array.content);
    }

    /// Builds a single-threaded function over `to_hash` and verifies both
    /// the MPHF property and serialization.
    fn test_with_input<K: Hash + Clone + Display + Sync>(to_hash: &[K]) {
        let h = Function::from_slice_with_conf(to_hash, BuildConf::mt(false));
        test_mphf(to_hash, |key| h.get(key).map(|i| i as usize));
        test_read_write(&h);
    }

    #[test]
    fn test_small() {
        test_with_input(&[1, 2, 5]);
        test_with_input(&(-50..150).collect::<Vec<_>>());
        test_with_input(&['a', 'b', 'c', 'd']);
    }
}