use rayon::prelude::*;

mod bitvector;
pub mod hashmap;
mod par_iter;
use bitvector::BitVector;

use log::error;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::Hash;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

#[cfg(feature = "serde")]
use serde::{self, Deserialize, Serialize};

#[inline]
fn fold(v: u64) -> u32 {
    ((v & 0xFFFFFFFF) as u32) ^ ((v >> 32) as u32)
}

#[inline]
fn hash_with_seed<T: Hash + ?Sized>(iter: u64, v: &T) -> u64 {
    let mut state = wyhash::WyHash::with_seed(1 << (iter + iter));
    v.hash(&mut state);
    state.finish()
}

#[inline]
fn hash_with_seed32<T: Hash + ?Sized>(iter: u64, v: &T) -> u32 {
    fold(hash_with_seed(iter, v))
}

// Map a 32-bit hash onto 0..n without a modulo, using the multiply-shift
// range reduction (Lemire's "fast alternative to the modulo reduction").
#[inline]
fn fastmod(hash: u32, n: u32) -> u64 {
    ((hash as u64) * (n as u64)) >> 32
}

#[inline]
fn hashmod<T: Hash + ?Sized>(iter: u64, v: &T, n: usize) -> u64 {
    // The fast reduction only works for n < 2^32; fall back to a plain modulo
    // for larger tables.
    if n < (1 << 32) {
        let h = hash_with_seed32(iter, v);
        fastmod(h, n as u32) as u64
    } else {
        let h = hash_with_seed(iter, v);
        h % (n as u64)
    }
}

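/// A minimal perfect hash function (MPHF): it maps every key of the fixed
/// construction set to a distinct value in `0..n`, with no gaps. The function
/// is stored as a cascade of bitvector levels plus a rank directory, in the
/// style of the BBHash construction.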
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Mphf<T> {
    bitvecs: Box<[BitVector]>,
    ranks: Box<[Box<[u64]>]>,
    phantom: PhantomData<T>,
}

impl<T> dyn_size_of::GetSize for Mphf<T> {
    fn size_bytes_dyn(&self) -> usize {
        self.bitvecs.size_bytes_dyn() + self.ranks.size_bytes_dyn()
    }
    const USES_DYN_MEM: bool = true;
}

const MAX_ITERS: u64 = 100;

impl<'a, T: 'a + Hash + Debug> Mphf<T> {
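    /// Build the MPHF from an iterator over chunks of keys, streaming the keys
    /// in several passes instead of holding them all in memory. `n` must equal
    /// the total number of keys yielded across all chunks; `gamma` is the load
    /// factor of each level (larger builds faster, smaller is more compact).
    ///
    /// A minimal usage sketch, assuming the keys are pre-split into
    /// `Vec<Vec<u64>>` chunks:
    ///
    /// ```ignore
    /// let chunks: Vec<Vec<u64>> = vec![(0..500).collect(), (500..1000).collect()];
    /// let phf = Mphf::from_chunked_iterator(1.7, &chunks, 1000);
    /// ```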
    pub fn from_chunked_iterator<I, N>(gamma: f64, objects: &'a I, n: usize) -> Mphf<T>
    where
        &'a I: IntoIterator<Item = N>,
        N: IntoIterator<Item = T> + Send,
        <N as IntoIterator>::IntoIter: ExactSizeIterator,
        <&'a I as IntoIterator>::IntoIter: Send,
        I: Sync,
    {
        let mut iter = 0;
        let mut bitvecs = Vec::new();
        let done_keys = BitVector::new(std::cmp::max(255, n));

        loop {
            if iter > MAX_ITERS {
                error!("ran out of key space. items: {:?}", done_keys.len());
                panic!("couldn't find unique hashes");
            }

            let keys_remaining = if iter == 0 { n } else { n - done_keys.len() };

            let size = std::cmp::max(255, (gamma * keys_remaining as f64) as u64);

            let mut a = BitVector::new(size as usize);
            let mut collide = BitVector::new(size as usize);

            let seed = iter;
            let mut offset = 0;

            // First pass over the chunks: try to claim a slot for every pending
            // key, and mark slots claimed by more than one key as collisions.
            for object in objects {
                let mut object_iter = object.into_iter();

                let mut object_pos = 0;
                let len = object_iter.len();

                for object_index in 0..len {
                    let index = offset + object_index;

                    if !done_keys.contains(index) {
                        let key = match object_iter.nth(object_index - object_pos) {
                            None => panic!("ERROR: max number of items overflowed"),
                            Some(key) => key,
                        };

                        object_pos = object_index + 1;

                        let idx = hashmod(seed, &key, size as usize);

                        if collide.contains(idx as usize) {
                            continue;
                        }
                        let a_was_set = !a.insert_sync(idx as usize);
                        if a_was_set {
                            collide.insert_sync(idx as usize);
                        }
                    }
                }

                offset += len;
            }

            // Second pass: clear colliding slots and mark keys that kept a slot
            // to themselves as done for this level.
            let mut offset = 0;
            for object in objects {
                let mut object_iter = object.into_iter();

                let mut object_pos = 0;
                let len = object_iter.len();

                for object_index in 0..len {
                    let index = offset + object_index;

                    if !done_keys.contains(index) {
                        let key = match object_iter.nth(object_index - object_pos) {
                            None => panic!("ERROR: max number of items overflowed"),
                            Some(key) => key,
                        };

                        object_pos = object_index + 1;

                        let idx = hashmod(seed, &key, size as usize);

                        if collide.contains(idx as usize) {
                            a.remove(idx as usize);
                        } else {
                            done_keys.insert(index as usize);
                        }
                    }
                }

                offset += len;
            }

            bitvecs.push(a);
            if done_keys.len() == n {
                break;
            }
            iter += 1;
        }

        let ranks = Self::compute_ranks(&bitvecs);
        Mphf {
            bitvecs: bitvecs.into_boxed_slice(),
            ranks,
            phantom: PhantomData,
        }
    }
}

impl<T: Hash + Debug> Mphf<T> {
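    /// Construct an `Mphf` from a slice of keys held in memory. `gamma`
    /// controls the space/construction-time trade-off; the tests in this crate
    /// use 1.7.
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// let keys: Vec<u64> = (0..1000).collect();
    /// let phf = Mphf::new(1.7, &keys);
    /// // every key is assigned a distinct slot in 0..keys.len()
    /// assert!(keys.iter().all(|k| phf.hash(k) < keys.len() as u64));
    /// ```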
    pub fn new(gamma: f64, objects: &[T]) -> Mphf<T> {
        let mut bitvecs = Vec::new();
        let mut iter = 0;

        let mut cx = Context::new(
            std::cmp::max(255, (gamma * objects.len() as f64) as usize),
            iter,
        );

        objects.iter().for_each(|v| cx.find_collisions_sync(v));
        let mut redo_keys = objects
            .iter()
            .filter_map(|v| cx.filter(v))
            .collect::<Vec<_>>();

        bitvecs.push(cx.a);
        iter += 1;

        while !redo_keys.is_empty() {
            let mut cx = Context::new(
                std::cmp::max(255, (gamma * redo_keys.len() as f64) as usize),
                iter,
            );

            (&redo_keys)
                .iter()
                .for_each(|&v| cx.find_collisions_sync(v));
            redo_keys = (&redo_keys).iter().filter_map(|&v| cx.filter(v)).collect();

            bitvecs.push(cx.a);
            iter += 1;
            if iter > MAX_ITERS {
                error!("ran out of key space. items: {:?}", redo_keys);
                panic!("couldn't find unique hashes");
            }
        }

        Mphf {
            ranks: Self::compute_ranks(&bitvecs),
            bitvecs: bitvecs.into_boxed_slice(),
            phantom: PhantomData,
        }
    }

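    /// Build the rank directory: for each level, record the cumulative popcount
    /// (carried across levels) at the start of every 8-word (512-bit) block, so
    /// `get_rank` only scans at most 7 whole words plus one partial word.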
    fn compute_ranks(bvs: &[BitVector]) -> Box<[Box<[u64]>]> {
        let mut ranks = Vec::new();
        let mut pop = 0_u64;

        for bv in bvs {
            let mut rank: Vec<u64> = Vec::new();
            for i in 0..bv.num_words() {
                let v = bv.get_word(i);

                if i % 8 == 0 {
                    rank.push(pop)
                }

                pop += v.count_ones() as u64;
            }

            ranks.push(rank.into_boxed_slice())
        }

        ranks.into_boxed_slice()
    }

    #[inline]
    fn get_rank(&self, hash: u64, i: usize) -> u64 {
        let idx = hash as usize;
        let bv = self.bitvecs.get(i).expect("that level doesn't exist");
        let ranks = self.ranks.get(i).expect("that level doesn't exist");

        // Start from the precomputed rank of the enclosing 512-bit block ...
        let mut rank = ranks[idx / 512];

        // ... add the popcount of the full words between the block start and idx ...
        for j in (idx / 64) & !7..idx / 64 {
            rank += bv.get_word(j).count_ones() as u64;
        }

        // ... and finally the set bits below idx within its own word.
        let final_word = bv.get_word(idx / 64);
        if idx % 64 > 0 {
            rank += (final_word << (64 - (idx % 64))).count_ones() as u64;
        }
        rank
    }

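    /// Compute the hash value of `item`. `item` must be one of the keys the
    /// `Mphf` was built from; for any other key the result is arbitrary and the
    /// call may panic. Use `try_hash` when membership is not guaranteed.
    ///
    /// Illustrative sketch:
    ///
    /// ```ignore
    /// let phf = Mphf::new(1.7, &vec!["a", "b", "c"]);
    /// let slot = phf.hash(&"b");
    /// assert!(slot < 3); // some distinct value in 0..3
    /// ```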
    pub fn hash(&self, item: &T) -> u64 {
        for i in 0..self.bitvecs.len() {
            let bv = &self.bitvecs[i];
            let hash = hashmod(i as u64, item, bv.capacity());

            if bv.contains(hash as usize) {
                return self.get_rank(hash, i);
            }
        }

        unreachable!("must find a hash value");
    }

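    /// Compute the hash value of `item`, accepting any borrowed form of the key
    /// type. Returns `None` if `item` misses every level; a key that was not in
    /// the construction set may still collide with one that was and yield an
    /// arbitrary `Some`, so this is not a membership test.
    ///
    /// Illustrative sketch of the `Borrow` flexibility:
    ///
    /// ```ignore
    /// let phf = Mphf::new(1.7, &vec![String::from("a")]);
    /// assert_eq!(phf.try_hash("a"), Some(0)); // look up a String key by &str
    /// ```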
    pub fn try_hash<Q>(&self, item: &Q) -> Option<u64>
    where
        T: Borrow<Q>,
        Q: ?Sized + Hash,
    {
        for i in 0..self.bitvecs.len() {
            let bv = &(self.bitvecs)[i];
            let hash = hashmod(i as u64, item, bv.capacity());

            if bv.contains(hash as usize) {
                return Some(self.get_rank(hash, i));
            }
        }

        None
    }

    pub fn try_hash_bench<Q>(&self, item: &Q, level: &mut u64) -> Option<u64>
    where
        T: Borrow<Q>,
        Q: ?Sized + Hash,
    {
        for i in 0..self.bitvecs.len() {
            let bv = &(self.bitvecs)[i];
            let hash = hashmod(i as u64, item, bv.capacity());

            if bv.contains(hash as usize) {
                *level += i as u64 + 1;
                return Some(self.get_rank(hash, i));
            }
        }

        None
    }
}

impl<T: Hash + Debug + Sync + Send> Mphf<T> {
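    /// Same construction as `new`, but hashes the keys in parallel with rayon.
    /// `starting_seed` offsets the per-level hash seeds; the chunked parallel
    /// builder passes `Some(iter)` so the fallback levels it appends stay
    /// consistent with the levels already built. Pass `None` for a standalone
    /// build.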
    pub fn new_parallel(gamma: f64, objects: &[T], starting_seed: Option<u64>) -> Mphf<T> {
        let mut bitvecs = Vec::new();
        let mut iter = 0;

        let cx = Context::new(
            std::cmp::max(255, (gamma * objects.len() as f64) as usize),
            starting_seed.unwrap_or(0) + iter,
        );

        objects.into_par_iter().for_each(|v| cx.find_collisions(v));
        let mut redo_keys = objects
            .into_par_iter()
            .filter_map(|v| cx.filter(v))
            .collect::<Vec<_>>();

        bitvecs.push(cx.a);
        iter += 1;

        while !redo_keys.is_empty() {
            let cx = Context::new(
                std::cmp::max(255, (gamma * redo_keys.len() as f64) as usize),
                starting_seed.unwrap_or(0) + iter,
            );

            (&redo_keys)
                .into_par_iter()
                .for_each(|&v| cx.find_collisions(v));
            redo_keys = (&redo_keys)
                .into_par_iter()
                .filter_map(|&v| cx.filter(v))
                .collect();

            bitvecs.push(cx.a);
            iter += 1;
            if iter > MAX_ITERS {
                error!("ran out of key space. items: {:?}", redo_keys);
                panic!("couldn't find unique hashes");
            }
        }

        Mphf {
            ranks: Self::compute_ranks(&bitvecs),
            bitvecs: bitvecs.into_boxed_slice(),
            phantom: PhantomData,
        }
    }
}

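/// Per-level construction state for the in-memory builders: `a` holds the
/// slots currently claimed by exactly one key, and `collide` marks slots that
/// were claimed by more than one key at this level's seed.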
struct Context {
    size: usize,
    seed: u64,
    a: BitVector,
    collide: BitVector,
}

impl Context {
    fn new(size: usize, seed: u64) -> Self {
        Self {
            size,
            seed,
            a: BitVector::new(size),
            collide: BitVector::new(size),
        }
    }

    fn find_collisions<T: Hash>(&self, v: &T) {
        let idx = hashmod(self.seed, v, self.size) as usize;
        if !self.collide.contains(idx) && !self.a.insert(idx) {
            self.collide.insert(idx);
        }
    }

    fn find_collisions_sync<T: Hash>(&mut self, v: &T) {
        let idx = hashmod(self.seed, v, self.size) as usize;
        if !self.collide.contains(idx) && !self.a.insert_sync(idx) {
            self.collide.insert_sync(idx);
        }
    }

    fn filter<'t, T: Hash>(&self, v: &'t T) -> Option<&'t T> {
        let idx = hashmod(self.seed, v, self.size) as usize;
        if self.collide.contains(idx) {
            self.a.remove(idx);
            Some(v)
        } else {
            None
        }
    }
}

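/// Work queue over the chunked key iterator used by the parallel chunked
/// builder. Every level walks the chunks twice (job 0 marks collisions, job 1
/// settles or buffers keys), so the queue restarts the underlying iterator
/// exactly once and tags each chunk it hands out with the current `job_id`.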
struct Queue<'a, I: 'a, T>
where
    &'a I: IntoIterator,
    <&'a I as IntoIterator>::Item: IntoIterator<Item = T>,
{
    keys_object: &'a I,
    queue: <&'a I as IntoIterator>::IntoIter,

    num_keys: usize,
    last_key_index: usize,

    job_id: u8,

    phantom_t: PhantomData<T>,
}

impl<'a, I: 'a, N1, N2, T> Queue<'a, I, T>
where
    &'a I: IntoIterator<Item = N1>,
    N2: Iterator<Item = T> + ExactSizeIterator,
    N1: IntoIterator<Item = T, IntoIter = N2> + Clone,
{
    fn new(keys_object: &'a I, num_keys: usize) -> Queue<'a, I, T> {
        Queue {
            keys_object,
            queue: keys_object.into_iter(),

            num_keys,
            last_key_index: 0,

            job_id: 0,

            phantom_t: PhantomData,
        }
    }

    fn next(&mut self, done_keys_count: &AtomicUsize) -> Option<(N2, u8, usize, usize)> {
        // Once every key has been handed out, spin until the workers report all
        // of them processed, then restart the chunk iterator for the next job.
        if self.last_key_index == self.num_keys {
            loop {
                let done_count = done_keys_count.load(Ordering::SeqCst);

                if self.num_keys == done_count {
                    self.queue = self.keys_object.into_iter();
                    done_keys_count.store(0, Ordering::SeqCst);
                    self.last_key_index = 0;
                    self.job_id += 1;

                    break;
                }
            }
        }

        if self.job_id > 1 {
            return None;
        }

        let node = self.queue.next().unwrap();
        let node_keys_start = self.last_key_index;

        let num_keys = node.clone().into_iter().len();

        self.last_key_index += num_keys;

        Some((node.into_iter(), self.job_id, node_keys_start, num_keys))
    }
}

impl<'a, T: 'a + Hash + Debug + Send + Sync> Mphf<T> {
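    /// Multi-threaded variant of `from_chunked_iterator`: `num_threads` workers
    /// pull chunks from a shared queue and make two passes per level. Once the
    /// remaining keys drop below roughly 1% of `n` (and below an internal buffer
    /// limit), the leftovers are buffered in memory and finished with
    /// `new_parallel`. `max_iters` bounds the number of levels before giving up;
    /// `None` means no limit.
    ///
    /// A minimal usage sketch with the same input shape as `from_chunked_iterator`:
    ///
    /// ```ignore
    /// let chunks: Vec<Vec<u64>> = vec![(0..500).collect(), (500..1000).collect()];
    /// let phf = Mphf::from_chunked_iterator_parallel(1.7, &chunks, None, 1000, 4);
    /// ```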
    pub fn from_chunked_iterator_parallel<I, N>(
        gamma: f64,
        objects: &'a I,
        max_iters: Option<u64>,
        n: usize,
        num_threads: usize,
    ) -> Mphf<T>
    where
        &'a I: IntoIterator<Item = N>,
        N: IntoIterator<Item = T> + Send + Clone,
        <N as IntoIterator>::IntoIter: ExactSizeIterator,
        <&'a I as IntoIterator>::IntoIter: Send,
        I: Sync,
    {
        // Once few enough keys remain, they are buffered in memory and finished
        // with an in-memory parallel build instead of more streaming passes.
        const MAX_BUFFER_SIZE: usize = 50_000_000;
        const ONE_PERCENT_KEYS: f32 = 0.01;
        let min_buffer_keys_threshold: usize = (ONE_PERCENT_KEYS * n as f32) as usize;

        let mut iter: u64 = 0;
        let mut bitvecs = Vec::<BitVector>::new();

        let global = Arc::new(GlobalContext {
            done_keys: BitVector::new(std::cmp::max(255, n)),
            buffered_keys: Mutex::new(Vec::new()),
            buffer_keys: AtomicBool::new(false),
        });

        loop {
            if max_iters.is_some() && iter > max_iters.unwrap() {
                error!("ran out of key space. items: {:?}", global.done_keys.len());
                panic!("couldn't find unique hashes");
            }

            let keys_remaining = if iter == 0 {
                n
            } else {
                n - global.done_keys.len()
            };
            if keys_remaining == 0 {
                break;
            }
            if keys_remaining < MAX_BUFFER_SIZE && keys_remaining < min_buffer_keys_threshold {
                global.buffer_keys.store(true, Ordering::SeqCst);
            }

            let size = std::cmp::max(255, (gamma * keys_remaining as f64) as u64);
            let cx = Arc::new(IterContext {
                done_keys_count: AtomicUsize::new(0),
                work_queue: Mutex::new(Queue::new(objects, n)),
                collide: BitVector::new(size as usize),
                a: BitVector::new(size as usize),
            });

            crossbeam_utils::thread::scope(|scope| {
                for _ in 0..num_threads {
                    let global = global.clone();
                    let cx = cx.clone();

                    scope.spawn(move |_| {
                        loop {
                            let (mut node, job_id, offset, num_keys) =
                                match cx.work_queue.lock().unwrap().next(&cx.done_keys_count) {
                                    None => break,
                                    Some(val) => val,
                                };

                            let mut node_pos = 0;
                            for index in 0..num_keys {
                                let key_index = offset + index;
                                if global.done_keys.contains(key_index) {
                                    continue;
                                }

                                let key = node.nth(index - node_pos).unwrap();
                                node_pos = index + 1;

                                let idx = hashmod(iter, &key, size as usize) as usize;
                                let collision = cx.collide.contains(idx);
                                if job_id == 0 {
                                    // Pass 0: claim slots and record collisions.
                                    if !collision && !cx.a.insert(idx) {
                                        cx.collide.insert(idx);
                                    }
                                } else if collision {
                                    // Pass 1: evict colliding keys, buffering them
                                    // if this is the final streamed level.
                                    cx.a.remove(idx);
                                    if global.buffer_keys.load(Ordering::SeqCst) {
                                        global.buffered_keys.lock().unwrap().push(key);
                                    }
                                } else {
                                    global.done_keys.insert(key_index);
                                }
                            }

                            cx.done_keys_count.fetch_add(num_keys, Ordering::SeqCst);
                        }
                    });
                }
            })
            .unwrap();

            match Arc::try_unwrap(cx) {
                Ok(cx) => bitvecs.push(cx.a),
                Err(_) => unreachable!(),
            }

            iter += 1;
            if global.buffer_keys.load(Ordering::SeqCst) {
                break;
            }
        }

        let buffered_keys_vec = global.buffered_keys.lock().unwrap();
        if buffered_keys_vec.len() > 1 {
            let mut buffered_mphf = Mphf::new_parallel(1.7, &buffered_keys_vec, Some(iter));

            for i in 0..buffered_mphf.bitvecs.len() {
                let buff_vec = std::mem::replace(&mut buffered_mphf.bitvecs[i], BitVector::new(0));
                bitvecs.push(buff_vec);
            }
        }

        let ranks = Self::compute_ranks(&bitvecs);
        Mphf {
            bitvecs: bitvecs.into_boxed_slice(),
            ranks,
            phantom: PhantomData,
        }
    }
}

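/// Shared state for one level of the parallel chunked build: the work queue,
/// this level's `a`/`collide` bitvectors, and a counter of processed keys so
/// the queue can tell when a pass over all chunks is complete.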
struct IterContext<'a, I: 'a, T>
where
    &'a I: IntoIterator,
    <&'a I as IntoIterator>::Item: IntoIterator<Item = T>,
{
    done_keys_count: AtomicUsize,
    work_queue: Mutex<Queue<'a, I, T>>,
    collide: BitVector,
    a: BitVector,
}

/// State shared across all levels of the parallel chunked build: which keys
/// are already placed, plus the buffer used for the final in-memory fallback.
struct GlobalContext<T> {
    done_keys: BitVector,
    buffered_keys: Mutex<Vec<T>>,
    buffer_keys: AtomicBool,
}

#[cfg(test)]
#[macro_use]
extern crate quickcheck;

#[cfg(test)]
mod tests {

    use super::*;
    use std::collections::HashSet;
    use std::iter::FromIterator;

    /// Check that a minimal perfect hash function can be built for the set `xs`
    /// with both the serial and the parallel constructors.
    fn check_mphf<T>(xs: HashSet<T>) -> bool
    where
        T: Sync + Hash + PartialEq + Eq + Debug + Send,
    {
        let xsv: Vec<T> = xs.into_iter().collect();

        check_mphf_serial(&xsv) && check_mphf_parallel(&xsv)
    }

    /// Check that the hash values of the keys form exactly 0..n (serial build).
    fn check_mphf_serial<T>(xsv: &[T]) -> bool
    where
        T: Hash + PartialEq + Eq + Debug,
    {
        let phf = Mphf::new(1.7, xsv);

        let mut hashes: Vec<u64> = xsv.iter().map(|v| phf.hash(v)).collect();

        hashes.sort_unstable();

        let gt: Vec<u64> = (0..xsv.len() as u64).collect();
        hashes == gt
    }

    /// Check that the hash values of the keys form exactly 0..n (parallel build).
    fn check_mphf_parallel<T>(xsv: &[T]) -> bool
    where
        T: Sync + Hash + PartialEq + Eq + Debug + Send,
    {
        let phf = Mphf::new_parallel(1.7, xsv, None);

        let mut hashes: Vec<u64> = xsv.iter().map(|v| phf.hash(v)).collect();

        hashes.sort_unstable();

        let gt: Vec<u64> = (0..xsv.len() as u64).collect();
        hashes == gt
    }

    fn check_chunked_mphf<T>(values: Vec<Vec<T>>, total: usize) -> bool
    where
        T: Sync + Hash + PartialEq + Eq + Debug + Send,
    {
        let phf = Mphf::from_chunked_iterator(1.7, &values, total);

        let mut hashes: Vec<u64> = values
            .iter()
            .flat_map(|x| x.iter().map(|v| phf.hash(&v)))
            .collect();

        hashes.sort_unstable();

        let gt: Vec<u64> = (0..total as u64).collect();
        hashes == gt
    }

    fn check_chunked_mphf_parallel<T>(values: Vec<Vec<T>>, total: usize) -> bool
    where
        T: Sync + Hash + PartialEq + Eq + Debug + Send,
    {
        let phf = Mphf::from_chunked_iterator_parallel(1.7, &values, None, total, 2);

        let mut hashes: Vec<u64> = values
            .iter()
            .flat_map(|x| x.iter().map(|v| phf.hash(&v)))
            .collect();

        hashes.sort_unstable();

        let gt: Vec<u64> = (0..total as u64).collect();
        hashes == gt
    }

    quickcheck! {
        fn check_int_slices(v: HashSet<u64>, lens: Vec<usize>) -> bool {
            let mut lens = lens;

            let items: Vec<u64> = v.iter().cloned().collect();
            if lens.is_empty() || lens.iter().all(|x| *x == 0) {
                lens.clear();
                lens.push(items.len())
            }

            let mut slices: Vec<Vec<u64>> = Vec::new();

            let mut total = 0_usize;
            for slc_len in lens {
                let end = std::cmp::min(items.len(), total.saturating_add(slc_len));
                let slc = Vec::from(&items[total..end]);
                slices.push(slc);
                total = end;

                if total == items.len() {
                    break;
                }
            }

            check_chunked_mphf(slices.clone(), total) && check_chunked_mphf_parallel(slices, total)
        }
    }

    quickcheck! {
        fn check_string(v: HashSet<Vec<String>>) -> bool {
            check_mphf(v)
        }
    }

    quickcheck! {
        fn check_u32(v: HashSet<u32>) -> bool {
            check_mphf(v)
        }
    }

    quickcheck! {
        fn check_isize(v: HashSet<isize>) -> bool {
            check_mphf(v)
        }
    }

    quickcheck! {
        fn check_u64(v: HashSet<u64>) -> bool {
            check_mphf(v)
        }
    }

    quickcheck! {
        fn check_vec_u8(v: HashSet<Vec<u8>>) -> bool {
            check_mphf(v)
        }
    }

    #[test]
    fn from_ints_serial() {
        let items = (0..1_000_000).map(|x| x * 2);
        assert!(check_mphf(HashSet::from_iter(items)));
    }
}