1use std::hash::Hash;
2use binout::{AsIs, Serializer, VByte};
3use bitm::{BitAccess, Rank, ceiling_div};
4
5use crate::utils::ArrayWithRank;
6use crate::{BuildDefaultSeededHasher, BuildSeededHasher, stats, utils};
7
8use std::io;
9use std::sync::atomic::AtomicU64;
10use std::sync::atomic::Ordering::Relaxed;
11use rayon::prelude::*;
12use dyn_size_of::GetSize;
13
14use crate::fmph::keyset::{KeySet, SliceMutSource, SliceSourceWithRefs};
15
/// Build configuration for [`Function`].
#[derive(Clone)]
pub struct BuildConf<S = BuildDefaultSeededHasher> {
    /// Family of seeded hash functions used to map keys to level bits.
    pub hash_builder: S,

    /// When fewer keys than this remain, each key's bit index is computed once
    /// and cached for the level, instead of re-hashing the key during both
    /// level construction and key retention.
    pub cache_threshold: usize,

    /// Level size as a percentage of the number of keys entering the level
    /// (100 = one bit per remaining key, rounded up to whole 64-bit segments).
    pub relative_level_size: u16,

    /// Whether construction may use multiple (rayon) threads.
    pub use_multiple_threads: bool
}
46
47impl Default for BuildConf {
48 fn default() -> Self {
49 Self {
50 hash_builder: Default::default(),
51 cache_threshold: Self::DEFAULT_CACHE_THRESHOLD,
52 relative_level_size: 100,
53 use_multiple_threads: true
54 }
55 }
56}
57
58impl BuildConf {
59 pub fn mt(use_multiple_threads: bool) -> Self {
61 Self { use_multiple_threads, ..Default::default() }
62 }
63
64 pub fn ct(cache_threshold: usize) -> Self {
66 Self { cache_threshold, ..Default::default() }
67 }
68
69 pub fn ct_mt(cache_threshold: usize, use_multiple_threads: bool) -> Self {
72 Self { use_multiple_threads, cache_threshold, ..Default::default() }
73 }
74
75 pub fn lsize(relative_level_size: u16) -> Self {
79 Self { relative_level_size, ..Default::default() }
80 }
81
82 pub fn lsize_ct(relative_level_size: u16, cache_threshold: usize) -> Self {
85 Self { relative_level_size, cache_threshold, ..Default::default() }
86 }
87
88 pub fn lsize_mt(relative_level_size: u16, use_multiple_threads: bool) -> Self {
92 Self { relative_level_size, use_multiple_threads, ..Default::default() }
93 }
94}
95
96impl<S> BuildConf<S> {
97 pub const DEFAULT_CACHE_THRESHOLD: usize = 1024*1024*128; pub fn hash(hash_builder: S) -> Self {
103 Self { hash_builder, cache_threshold: Self::DEFAULT_CACHE_THRESHOLD, relative_level_size: 100, use_multiple_threads: true }
104 }
105
106 pub fn hash_lsize(hash_builder: S, relative_level_size: u16) -> Self {
108 Self { relative_level_size, ..Self::hash(hash_builder) }
109 }
110
111 pub fn hash_lsize_mt(hash_builder: S, relative_level_size: u16, use_multiple_threads: bool) -> Self {
114 Self { relative_level_size, hash_builder, use_multiple_threads, cache_threshold: Self::DEFAULT_CACHE_THRESHOLD }
115 }
116
117 pub fn hash_lsize_ct(hash_builder: S, relative_level_size: u16, cache_threshold: usize) -> Self {
121 Self { relative_level_size, hash_builder, use_multiple_threads: true, cache_threshold }
122 }
123
124 pub fn hash_lsize_ct_mt(hash_builder: S, relative_level_size: u16, cache_threshold: usize, use_multiple_threads: bool) -> Self {
128 Self { relative_level_size, hash_builder, use_multiple_threads, cache_threshold }
129 }
130}
131
/// Sets bit `bit_index` in `result`; if the bit was already set, records a
/// collision by setting the same bit in `collision`.
/// Both slices must cover at least `bit_index + 1` bits.
#[inline]
pub(crate) fn fphash_add_bit(result: &mut [u64], collision: &mut [u64], bit_index: usize) {
    let segment = bit_index >> 6;
    let bit = 1u64 << (bit_index & 63);
    if result[segment] & bit != 0 {
        collision[segment] |= bit;
    }
    result[segment] |= bit;
}
141
/// Thread-safe variant of `fphash_add_bit`: atomically sets bit `bit_index`
/// in `result` and, if it was already set, sets the same bit in `collision`.
/// All atomic accesses use relaxed ordering.
#[inline]
pub(crate) fn fphash_sync_add_bit(result: &[AtomicU64], collision: &[AtomicU64], bit_index: usize) {
    let segment = bit_index >> 6;
    let bit = 1u64 << (bit_index & 63);
    let previous = result[segment].fetch_or(bit, Relaxed);
    if previous & bit != 0 {
        collision[segment].fetch_or(bit, Relaxed);
    }
}
152
153pub(crate) fn fphash_remove_collided(result: &mut LevelArray, collision: &[u64]) {
155 for (r, c) in result.iter_mut().zip(collision.iter()) {
156 *r &= !c;
157 }
158}
159
/// Bit array storing a single level of the hash function.
/// With the `aligned-vec` feature on 32-bit targets, the storage is forced to
/// `AtomicU64` alignment so it can be reinterpreted as `[AtomicU64]` by
/// `from_mut_slice`; otherwise a plain boxed slice suffices.
#[cfg(not(all(feature = "aligned-vec", target_pointer_width = "32")))] pub (crate) type LevelArray = Box<[u64]>;
#[cfg(all(feature = "aligned-vec", target_pointer_width = "32"))] pub (crate) type LevelArray = aligned_vec::ABox<[u64], aligned_vec::ConstAlign<{std::mem::align_of::<AtomicU64>()}>>;
/// Returns a zero-initialized [`LevelArray`] of `size_segments` 64-bit segments.
pub(crate) fn level_array_for(size_segments: usize) -> LevelArray {
    #[cfg(not(all(feature = "aligned-vec", target_pointer_width = "32")))] { vec![0u64; size_segments].into_boxed_slice() }
    #[cfg(all(feature = "aligned-vec", target_pointer_width = "32"))] { aligned_vec::avec![[{std::mem::align_of::<AtomicU64>()}] | 0u64; size_segments].into_boxed_slice() }
}
166
167pub(crate) fn concat_level_arrays(levels: Vec<LevelArray>) -> Box<[u64]> {
168 let result_len = levels.iter().map(|l| l.len()).sum();
169 let mut result = Vec::with_capacity(result_len);
170 for a in levels { result.extend_from_slice(&a) }
171 result.into_boxed_slice()
172}
173
#[inline]
/// Reinterprets a mutable `LevelArray` as a slice of `AtomicU64`, enabling
/// lock-free concurrent bit setting on the same storage.
pub(crate) fn from_mut_slice(v: &mut LevelArray) -> &mut [AtomicU64] {
    // Compile-time check (empty-array pattern only matches a zero-length array)
    // that AtomicU64 does not require stricter alignment than u64 on this
    // target; on 32-bit targets with the `aligned-vec` feature, the LevelArray
    // type itself guarantees AtomicU64 alignment instead.
    #[cfg(not(all(feature = "aligned-vec", target_pointer_width = "32")))]
    let [] = [(); core::mem::align_of::<AtomicU64>() - core::mem::align_of::<u64>()];
    // SAFETY: AtomicU64 has the same size and memory layout as u64, and the
    // required alignment is ensured above (or by the LevelArray type);
    // the &mut borrow guarantees exclusive access.
    unsafe { &mut *(v.as_mut() as *mut [u64] as *mut [AtomicU64]) }
} #[inline]
/// Reinterprets a slice of `AtomicU64` back as plain `u64`s.
pub(crate) fn get_mut_slice(v: &mut [AtomicU64]) -> &mut [u64] {
    // SAFETY: AtomicU64 and u64 have identical size and layout; the &mut
    // borrow rules out any concurrent access to the atomics.
    unsafe { &mut *(v as *mut [AtomicU64] as *mut [u64]) }
} #[inline(always)] fn index(key: &impl Hash, hash: &impl BuildSeededHasher, seed: u64, level_size: usize) -> usize {
    // Bit index of `key` within a level of `level_size` bits, obtained by
    // mapping its seeded 64-bit hash through utils::map64_to_64.
    utils::map64_to_64(hash.hash_one(key, seed), level_size as u64) as usize
}
200
/// Helper that accumulates per-level bit arrays while building a [`Function`].
struct Builder<S> {
    // Bit array of each level built so far; the vector index is the level
    // number, which is also used as the hash seed for that level.
    arrays: Vec::<LevelArray>,
    // Number of keys not yet assigned to any of the built levels.
    input_size: usize,
    // Build configuration (hasher, cache threshold, level size, threading).
    conf: BuildConf<S>
}
207
208impl<S: BuildSeededHasher + Sync> Builder<S> {
209 pub fn new<K>(mut conf: BuildConf<S>, keys: &impl KeySet<K>) -> Self {
210 if conf.use_multiple_threads { conf.use_multiple_threads = rayon::current_num_threads() > 1; }
211 Self {
212 arrays: Vec::<LevelArray>::new(),
213 input_size: keys.keys_len(),
214 conf
215 }
216 }
217
218 pub fn retained<K>(&self, key: &K) -> bool where K: Hash {
220 self.arrays.iter().enumerate()
221 .all(|(seed, a)| !a.get_bit(index(key, &self.conf.hash_builder, seed as u64, a.len() << 6)))
222 }
223
224 fn build_array_for_indices_st(&self, bit_indices: &[usize], level_size_segments: usize) -> LevelArray
226 {
227 let mut result = level_array_for(level_size_segments);
228 let mut collision = vec![0u64; level_size_segments].into_boxed_slice();
229 for bit_index in bit_indices {
230 fphash_add_bit(&mut result, &mut collision, *bit_index);
231 };
232 fphash_remove_collided(&mut result, &collision);
233 result
234 }
235
236 fn build_array_for_indices(&self, bit_indices: &[usize], level_size_segments: usize) -> LevelArray
238 {
239 if !self.conf.use_multiple_threads {
240 return self.build_array_for_indices_st(bit_indices, level_size_segments)
241 }
242 let mut result = level_array_for(level_size_segments);
243 let result_atom = from_mut_slice(&mut result);
244 let mut collision: Box<[AtomicU64]> = (0..level_size_segments).map(|_| AtomicU64::default()).collect();
245 bit_indices.par_iter().for_each(
246 |bit_index| fphash_sync_add_bit(&result_atom, &collision, *bit_index)
247 );
248 fphash_remove_collided(&mut result, get_mut_slice(&mut collision));
249 result
250 }
251
252 fn build_level_st<K>(&self, keys: &impl KeySet<K>, level_size_segments: usize, seed: u64) -> LevelArray
254 where K: Hash
255 {
256 let mut result = level_array_for(level_size_segments);
257 let mut collision = vec![0u64; level_size_segments].into_boxed_slice();
258 let level_size = level_size_segments * 64;
259 keys.for_each_key(
260 |key| fphash_add_bit(&mut result, &mut collision, index(key, &self.conf.hash_builder, seed, level_size)),
261 |key| self.retained(key)
262 );
263 fphash_remove_collided(&mut result, &collision);
264 result
265 }
266
267 fn build_level<K>(&self, keys: &impl KeySet<K>, level_size_segments: usize, seed: u64) -> LevelArray
269 where K: Hash + Sync
270 {
271 if !(self.conf.use_multiple_threads && keys.has_par_for_each_key()) {
272 return self.build_level_st(keys, level_size_segments, seed);
273 }
274 let mut result = level_array_for(level_size_segments);
275 let result_atom = from_mut_slice(&mut result);
276 let mut collision: Box<[AtomicU64]> = (0..level_size_segments).map(|_| AtomicU64::default()).collect();
277 let level_size = level_size_segments * 64;
278 keys.par_for_each_key(
279 |key| fphash_sync_add_bit(&&result_atom, &collision, index(key, &self.conf.hash_builder, seed, level_size)),
280 |key| self.retained(key)
281 );
282 fphash_remove_collided(&mut result, get_mut_slice(&mut collision));
283 result
284 }
285
286 #[inline(always)] fn level_nr(&self) -> u64 { self.arrays.len() as u64 }
288
289 fn build_levels<K, BS>(&mut self, keys: &mut impl KeySet<K>, stats: &mut BS)
290 where K: Hash + Sync, BS: stats::BuildStatsCollector
291 {
292 let mut levels_without_reduction = 0; while self.input_size != 0 {
294 let level_size_segments = ceiling_div(self.input_size * self.conf.relative_level_size as usize, 64*100);
295 let level_size = level_size_segments * 64;
296 stats.level(self.input_size, level_size);
297 let seed = self.level_nr();
298 let array = if self.input_size < self.conf.cache_threshold {
299 let bit_indices = keys.maybe_par_map_each_key(
301 |key| index(key, &self.conf.hash_builder, seed, level_size),
302 |key| self.retained(key),
303 self.conf.use_multiple_threads
304 );
305 let array = self.build_array_for_indices(&bit_indices, level_size_segments);
306 keys.maybe_par_retain_keys_with_indices(
307 |i| !array.get_bit(bit_indices[i]),
308 |key| !array.get_bit(index(key, &self.conf.hash_builder, seed, level_size)),
309 |key| self.retained(key),
310 || array.count_bit_ones(),
311 self.conf.use_multiple_threads
312 );
313 array
314 } else {
315 let current_array = self.build_level(keys, level_size_segments, seed);
317 keys.maybe_par_retain_keys(
318 |key| !current_array.get_bit(index(key, &self.conf.hash_builder, seed, level_size)),
319 |key| self.retained(key),
320 || current_array.count_bit_ones(),
321 self.conf.use_multiple_threads
322 );
323 current_array
324 };
325 self.arrays.push(array);
326 let prev_input_size = self.input_size;
327 self.input_size = keys.keys_len();
328 if self.input_size == prev_input_size {
329 levels_without_reduction += 1;
330 if levels_without_reduction == 10 {
331 self.arrays.truncate(self.arrays.len()-levels_without_reduction);
332 stats.end(self.input_size);
333 break;
334 }
335 } else {
336 levels_without_reduction = 0;
337 }
338 }
339 stats.end(0)
340 }
341
342 pub fn finish(self) -> Function<S> {
343 let level_sizes = self.arrays.iter().map(|l| l.len()).collect();
344 let (array, _) = ArrayWithRank::build(concat_level_arrays(self.arrays));
346 Function::<S> {
347 array,
348 level_sizes,
349 hash_builder: self.conf.hash_builder
350 }
351 }
352
353 }
386
387
/// Fingerprint-based minimal perfect hash function (already built).
#[derive(Clone)]
pub struct Function<S = BuildDefaultSeededHasher> {
    // Concatenated bit arrays of all levels, with rank support used to turn a
    // bit position into the final hash value.
    array: ArrayWithRank,
    // Size (in 64-bit segments) of each level's fragment of `array`.
    level_sizes: Box<[usize]>,
    // Family of seeded hash functions; the level number is used as the seed.
    hash_builder: S
}
399
400impl<S: BuildSeededHasher> GetSize for Function<S> {
401 fn size_bytes_dyn(&self) -> usize { self.array.size_bytes_dyn() + self.level_sizes.size_bytes_dyn() }
402 fn size_bytes_content_dyn(&self) -> usize { self.array.size_bytes_content_dyn() + self.level_sizes.size_bytes_content_dyn() }
403 const USES_DYN_MEM: bool = true;
404}
405
impl<S: BuildSeededHasher> Function<S> {

    /// Returns the bit index of `k` within a level of `size` bits, using
    /// `level_nr` as the hash seed.
    #[inline(always)] fn index<K: Hash + ?Sized>(&self, k: &K, level_nr: u64, size: usize) -> usize {
        utils::map64_to_64(self.hash_builder.hash_one(k, level_nr), size as u64) as usize
    }

    /// Returns the value of `key`, collecting per-level access statistics into
    /// `access_stats`; returns `None` only when no level contains the key's bit.
    /// For a key outside the set used during construction, a meaningless
    /// `Some` value may also be returned.
    #[inline(always)]
    pub fn get_stats<K: Hash + ?Sized, A: stats::AccessStatsCollector>(&self, key: &K, access_stats: &mut A) -> Option<u64> {
        let mut array_begin_index = 0usize;
        let mut level_nr = 0usize;
        loop {
            // level_sizes stores 64-bit segment counts; <<6 converts to bits.
            let level_size = (*self.level_sizes.get(level_nr)?) << 6;
            let i = array_begin_index + self.index(key, level_nr as u64, level_size);
            if self.array.content.get_bit(i) {
                access_stats.found_on_level(level_nr);
                // SAFETY: i lies inside the fragment of an existing level and
                // is therefore within the bounds of the bit array.
                return Some(unsafe{self.array.rank_unchecked(i)} as u64);
            }
            array_begin_index += level_size;
            level_nr += 1;
        }
    }

    /// Returns the value of `key`, or `None` when its bit is set on no level.
    #[inline(always)] pub fn get<K: Hash + ?Sized>(&self, key: &K) -> Option<u64> {
        self.get_stats(key, &mut ())
    }

    /// Like [`Self::get_stats`], but panics when `key` cannot be resolved.
    #[inline(always)] pub fn get_stats_or_panic<K: Hash + ?Sized, A: stats::AccessStatsCollector>(&self, key: &K, access_stats: &mut A) -> u64 {
        self.get_stats(key, access_stats).expect("Invalid access to an item outside the set given during construction.")
    }

    /// Like [`Self::get`], but panics when `key` cannot be resolved.
    #[inline(always)] pub fn get_or_panic<K: Hash + ?Sized>(&self, key: &K) -> u64 {
        self.get_stats_or_panic(key, &mut ())
    }

    /// Returns the number of bytes that [`Self::write`] will emit.
    pub fn write_bytes(&self) -> usize {
        VByte::array_size(&self.level_sizes) + AsIs::array_content_size(&self.array.content)
    }

    /// Serializes `self` (level sizes, then the raw bit array) to `output`.
    /// The hasher is not stored; see [`Self::read_with_hasher`].
    pub fn write(&self, output: &mut dyn io::Write) -> io::Result<()>
    {
        VByte::write_array(output, &self.level_sizes)?;
        AsIs::write_all(output, self.array.content.iter())
    }

    /// Deserializes a function previously stored by [`Self::write`], using the
    /// given `hasher` (which should match the one used during construction so
    /// that lookups reproduce the original values).
    pub fn read_with_hasher(input: &mut dyn io::Read, hasher: S) -> io::Result<Self>
    {
        let level_sizes = VByte::read_array(input)?;
        // Total content length in segments is the sum of the level sizes.
        let array_content_len = level_sizes.iter().map(|v|*v as usize).sum::<usize>();
        let array_content = AsIs::read_n(input, array_content_len)?;
        let (array_with_rank, _) = ArrayWithRank::build(array_content);
        Ok(Self { array: array_with_rank, level_sizes, hash_builder: hasher })
    }

    /// Returns the sizes (in 64-bit segments) of successive levels.
    pub fn level_sizes(&self) -> &[usize] {
        &self.level_sizes
    }
}
489
490impl<S: BuildSeededHasher + Sync> Function<S> {
491 pub fn try_with_conf_stats_or_partial<K, BS, KS>(mut keys: KS, conf: BuildConf<S>, stats: &mut BS) -> Result<Self, (Self, KS, usize)>
505 where K: Hash + Sync, KS: KeySet<K>, BS: stats::BuildStatsCollector
506 {
507 let mut builder = Builder::new(conf, &keys);
508 let initial_size = builder.input_size;
509 builder.build_levels(&mut keys, stats);
510 if builder.input_size == 0 {
511 drop(keys);
512 Ok(builder.finish())
513 } else {
514 Err((builder.finish(), keys, initial_size))
515 }
516 }
517
518 pub fn try_with_conf_stats<K, BS, KS>(mut keys: KS, conf: BuildConf<S>, stats: &mut BS) -> Option<Self>
524 where K: Hash + Sync, KS: KeySet<K>, BS: stats::BuildStatsCollector
525 {
526 let mut builder = Builder::new(conf, &keys);
527 builder.build_levels(&mut keys, stats);
528 drop(keys);
529 (builder.input_size == 0).then(|| builder.finish())
530 }
531
532 pub fn with_conf_stats<K, BS>(keys: impl KeySet<K>, conf: BuildConf<S>, stats: &mut BS) -> Self
538 where K: Hash + Sync, BS: stats::BuildStatsCollector
539 {
540 Self::try_with_conf_stats(keys, conf, stats).expect("Constructing fmph::Function failed. Probably the input contains duplicate keys.")
541 }
542
543 #[inline] pub fn with_conf<K>(keys: impl KeySet<K>, conf: BuildConf<S>) -> Self
549 where K: Hash + Sync
550 {
551 Self::with_conf_stats(keys, conf, &mut ())
552 }
553
554 #[inline] pub fn from_slice_with_conf_stats<K, BS>(keys: &[K], conf: BuildConf<S>, stats: &mut BS) -> Self
560 where K: Hash + Sync, BS: stats::BuildStatsCollector
561 {
562 Self::with_conf_stats(SliceSourceWithRefs::<_, u8>::new(keys), conf, stats)
563 }
564
565 #[inline] pub fn from_slice_with_conf<K>(keys: &[K], conf: BuildConf<S>) -> Self
571 where K: Hash + Sync
572 {
573 Self::with_conf_stats(SliceSourceWithRefs::<_, u8>::new(keys), conf, &mut ())
574 }
575
576 #[inline] pub fn from_slice_mut_with_conf_stats<K, BS>(keys: &mut [K], conf: BuildConf<S>, stats: &mut BS) -> Self
584 where K: Hash + Sync, BS: stats::BuildStatsCollector
585 {
586 Self::with_conf_stats(SliceMutSource::new(keys), conf, stats)
587 }
588
589 #[inline] pub fn from_slice_mut_with_conf<K>(keys: &mut [K], conf: BuildConf<S>) -> Self
596 where K: Hash + Sync
597 {
598 Self::with_conf_stats(SliceMutSource::new(keys), conf, &mut ())
599 }
600}
601
602impl Function {
603 pub fn read(input: &mut dyn io::Read) -> io::Result<Self> {
606 Self::read_with_hasher(input, Default::default())
607 }
608
609 pub fn with_stats<K, BS>(keys: impl KeySet<K>, stats: &mut BS) -> Self
615 where K: Hash + Sync, BS: stats::BuildStatsCollector
616 {
617 Self::with_conf_stats(keys, Default::default(), stats)
618 }
619
620 pub fn new<K: Hash + Sync>(keys: impl KeySet<K>) -> Self {
626 Self::with_conf_stats(keys, Default::default(), &mut ())
627 }
628}
629
630impl<S> Function<S> {
631 pub fn len(&self) -> usize {
635 self.array.content.count_bit_ones()
636 }
637}
638
/// Builds the function for the keys in the given slice, using the default configuration.
impl<K: Hash + Sync> From<&[K]> for Function {
    fn from(keys: &[K]) -> Self {
        Self::new(SliceSourceWithRefs::<_, u8>::new(keys))
    }
}
644
/// Builds the function for the keys in the given vector (consumed), using the
/// default configuration.
impl<K: Hash + Sync + Send> From<Vec<K>> for Function {
    fn from(keys: Vec<K>) -> Self {
        Self::new(keys)
    }
}
650
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::fmt::Display;
    use crate::utils::{tests::test_mphf_u64, verify_phf};

    /// Serializes `h`, checks the declared size, reads it back and compares.
    fn test_read_write(h: &Function) {
        let mut buff = Vec::new();
        h.write(&mut buff).unwrap();
        assert_eq!(buff.len(), h.write_bytes());
        let read = Function::read(&mut &buff[..]).unwrap();
        assert_eq!(h.level_sizes.len(), read.level_sizes.len());
        assert_eq!(h.array.content, read.array.content);
    }

    /// Builds a single-threaded function for `to_hash` and verifies it is a
    /// minimal perfect hash that survives a write/read round-trip.
    fn test_with_input<K: Hash + Clone + Display + Sync>(to_hash: &[K]) {
        let h = Function::from_slice_with_conf(to_hash, BuildConf::mt(false));
        test_mphf_u64(to_hash, |key| h.get(key));
        test_read_write(&h);
        assert_eq!(h.len(), to_hash.len());
    }

    #[test]
    fn test_small() {
        test_with_input(&[1, 2, 5]);
        test_with_input(&(-50..150).collect::<Vec<_>>());
        test_with_input(&['a', 'b', 'c', 'd']);
    }

    #[test]
    fn test_large_size() {
        let keys = (-20000..20000).collect::<Vec<_>>();
        // Bits per key should stay below the expected bounds.
        assert!(Function::from(&keys[..]).size_bytes() as f64 * (8.0/40000.0) < 2.9);
        assert!(Function::from_slice_with_conf(&keys[..], BuildConf::lsize(200)).size_bytes() as f64 * (8.0/40000.0) < 3.5);
    }

    #[test]
    fn test_dynamic() {
        const LEN: u64 = 50_000;
        let f = Function::new(
            crate::fmph::keyset::CachedKeySet::dynamic(|| 0..LEN, 10_000));
        verify_phf(LEN as usize, 0..LEN, |key| f.get(key).map(|v| v as usize));
        assert!(f.size_bytes() as f64 * (8.0/LEN as f64) < 2.9);
    }

    #[test]
    fn test_dynamic_par() {
        const LEN: u64 = 50_000;
        let f = Function::new(
            crate::fmph::keyset::CachedKeySet::dynamic((|| 0..LEN, || (0..LEN).into_par_iter()), 10_000));
        verify_phf(LEN as usize, 0..LEN, |key| f.get(key).map(|v| v as usize));
        assert!(f.size_bytes() as f64 * (8.0/LEN as f64) < 2.9);
    }

    #[test]
    #[ignore = "uses much memory and time"]
    fn test_fmph_for_over_2to32_keys() {
        const LEN: u64 = 5_000_000_000;
        let f = Function::with_stats(
            crate::fmph::keyset::CachedKeySet::dynamic(|| 0..LEN, usize::MAX),
            &mut crate::stats::BuildStatsPrinter::stdout());
        verify_phf(LEN as usize, 0..LEN, |key| f.get(key).map(|v| v as usize));
        assert!(f.size_bytes() as f64 * (8.0/LEN as f64) < 2.9);
    }

    #[test]
    fn test_duplicates() {
        // Duplicate keys make construction impossible.
        assert!(Function::try_with_conf_stats(vec![1, 1], Default::default(), &mut ()).is_none());
        assert!(Function::try_with_conf_stats(vec![1, 2, 3, 1, 4], Default::default(), &mut ()).is_none());
    }

    #[test]
    fn test_duplicates_partial() {
        let keys = vec![1, 2, 3, 1, 4];
        let expected_initial_len = keys.len();
        let r = Function::try_with_conf_stats_or_partial(keys, Default::default(), &mut ());
        if let Err((mphf, mut remaining, initial_len)) = r {
            assert_eq!(initial_len, expected_initial_len);
            remaining.sort();
            assert_eq!(remaining, vec![1, 1]);
            // The partial function must still be perfect for the assigned keys.
            test_mphf_u64(&[2, 3, 4], |key| mphf.get(key));
        } else {
            // `panic!` (not `assert!(false)`) is the idiomatic unconditional failure.
            panic!("try_with_conf_stats_or_partial unexpectedly succeeded for input with duplicates")
        }
    }
}