use std::{cmp::Reverse, collections::BinaryHeap, ops::Range};
use bitm::{BitAccess, BitVec};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use crate::seeds::SeedSize;
use super::{conf::Conf, cyclic::CyclicSet, cyclic::GenericUsedValue, evaluator::BucketToActivateEvaluator, seed_chooser::{SeedChooser, SeedOnlyNoBump}, MAX_WINDOW_SIZE, WINDOW_SIZE};
use rayon::prelude::*;
/// Single-threaded histogram: number of keys falling into each bucket.
/// The returned slice has one extra trailing slot (left at 0) so that a later
/// prefix sum can turn it into a begin-index table.
#[inline]
fn bucket_sizes_st(keys: &[u64], conf: &Conf) -> Box<[usize]> {
    let mut sizes = vec![0; conf.buckets_num + 1].into_boxed_slice();
    for key in keys {
        // SAFETY: relies on conf.bucket_for returning an index within
        // 0..=conf.buckets_num (presumed invariant of Conf, as in the
        // original unchecked access).
        unsafe { *sizes.get_unchecked_mut(conf.bucket_for(*key)) += 1 }
    }
    sizes
}
/// Multi-threaded histogram of keys per bucket.
/// Partitions `keys` into contiguous chunks with disjoint bucket ranges, so each
/// rayon task counts into its own disjoint sub-slice of the shared `buckets`
/// array without synchronization.
/// NOTE(review): the chunk-boundary logic (extend a chunk until the bucket index
/// changes, then split the counter slice there) appears to assume `keys` are
/// ordered by `conf.bucket_for` — confirm with callers.
fn bucket_sizes_mt(keys: &[u64], conf: &Conf, mut threads_num: usize) -> Box<[usize]> {
// Parallelism only pays off for large inputs; otherwise count single-threaded.
threads_num = threads_num.min(keys.len() / (8*4096));
if threads_num <= 1 { return bucket_sizes_st(keys, conf); }
let mut buckets = vec![0; conf.buckets_num + 1].into_boxed_slice();
let mut threads = Vec::with_capacity(threads_num);
// Counter slots not yet handed out to a task, and the bucket index of its first slot.
let mut remaining_buckets = &mut buckets[..];
let mut remaining_first = 0;
let mut key_idx = 0;
while key_idx < keys.len() {
// Tentative chunk end: an even share of the keys still to distribute.
let mut new_key_idx = key_idx + (keys.len() - key_idx) / threads_num;
if let Some(mut last_bucket_for_thread) = keys.get(new_key_idx).map(|key| conf.bucket_for(*key)) {
let keys_for_thread = &keys[key_idx..new_key_idx];
// Count the boundary keys here and now (they are excluded from keys_for_thread),
// extending the chunk until the bucket index changes so the split point falls
// on a bucket boundary.
// SAFETY: bucket indices are presumed >= remaining_first and within the
// remaining slice (keys grouped by bucket).
unsafe{ *remaining_buckets.get_unchecked_mut(last_bucket_for_thread-remaining_first) += 1 };
new_key_idx += 1;
while new_key_idx < keys.len() {
let b = conf.bucket_for(keys[new_key_idx]);
unsafe{ *remaining_buckets.get_unchecked_mut(b-remaining_first) += 1 };
new_key_idx += 1;
if b != last_bucket_for_thread { break; }
}
last_bucket_for_thread += 1;
// Give this task exclusive ownership of the counters for its bucket range.
let thread_buckets;
(thread_buckets, remaining_buckets) = remaining_buckets.split_at_mut(last_bucket_for_thread-remaining_first);
threads.push((keys_for_thread, thread_buckets, remaining_first));
remaining_first = last_bucket_for_thread;
} else {
// Tentative end is past the last key: the final task below takes the rest.
break;
}
key_idx = new_key_idx;
threads_num -= 1;
}
// Last task: all remaining keys and all remaining counter slots.
threads.push((&keys[key_idx..], remaining_buckets, remaining_first));
threads.into_par_iter().for_each(|(keys, buckets, first)| {
for key in keys.iter() {
// SAFETY: each task's keys map only into its own bucket sub-slice
// (presumed; depends on the grouping invariant noted above).
unsafe{ *buckets.get_unchecked_mut(conf.bucket_for(*key)-first) += 1 };
}
});
buckets
}
/// In-place exclusive prefix sum: each element is replaced by the sum of all
/// elements strictly before it. Returns the total of the original values.
#[inline(always)]
fn accumulative_sum(buckets: std::slice::IterMut<'_, usize>) -> usize {
    let mut total = 0;
    for slot in buckets {
        // replace() stores the running total and hands back the original value.
        total += std::mem::replace(slot, total);
    }
    total
}
/// Single-threaded construction of the bucket begin-index table:
/// entry i is the index of bucket i's first key; the extra trailing entry
/// equals `keys.len()`.
pub fn bucket_begin_st(keys: &[u64], conf: &Conf) -> Box<[usize]> {
    let mut begins = bucket_sizes_st(keys, conf);
    let total = accumulative_sum(begins.iter_mut());
    debug_assert_eq!(total, keys.len());
    begins
}
/// Multi-threaded variant of [`bucket_begin_st`]: counts bucket sizes with up
/// to `threads_num` rayon tasks, then prefix-sums them into begin indices.
pub fn bucket_begin_mt(keys: &[u64], conf: &Conf, threads_num: usize) -> Box<[usize]> {
    let mut begins = bucket_sizes_mt(keys, conf, threads_num);
    let total = accumulative_sum(begins.iter_mut());
    debug_assert_eq!(total, keys.len());
    begins
}
/// Shared, read-only state for building one level: configuration, the key
/// hashes (assumed grouped by bucket — relied on throughout), and the
/// per-bucket key ranges.
pub(crate) struct BuildConf<'k, BE: BucketToActivateEvaluator, SS: SeedSize, SC: SeedChooser> {
conf: Conf,
// Window length limit used by ThreadBuilder::span_end.
span_limit: u16,
// Rates buckets to decide which active bucket gets a seed next.
evaluator: BE,
// Key hashes; bucket i occupies keys[bucket_begin[i]..bucket_begin[i+1]].
keys: &'k [u64],
// bucket_begin[i] = index of bucket i's first key; last entry = keys.len().
bucket_begin: Box<[usize]>,
// Maps (key, seed) to an output value and searches for per-bucket seeds.
seed_chooser: SC,
// Bit-width descriptor of the stored seeds.
seed_size: SS
}
/// Builds a bitmap with the first `unassigned_len` bits set — every output
/// value starts out unassigned — clearing the surplus high bits of the final
/// 64-bit word.
fn construct_unassigned(unassigned_len: usize) -> Box<[u64]> {
    let mut bitmap: Box<[u64]> = Box::with_filled_bits(unassigned_len);
    let tail_bits = unassigned_len % 64;
    if tail_bits != 0 {
        // Shift out the bits past unassigned_len in the last word.
        *bitmap.last_mut().unwrap() >>= 64 - tail_bits;
    }
    bitmap
}
impl<'k, BE: BucketToActivateEvaluator, SS: SeedSize, SC: SeedChooser> BuildConf<'k, BE, SS, SC> {
/// Packs the build parameters and allocates the seed vector that the build
/// will fill (presumably zero-initialized — seed 0 marks bumped buckets
/// whenever `SC::BUMPING`; TODO confirm in `Conf::new_seeds_vec`).
#[inline]
pub fn new(keys: &'k [u64], conf: Conf, seed_size: SS, span_limit: u16, evaluator: BE, bucket_begin: Box<[usize]>, seed_chooser: SC) -> (Self, Box<[SS::VecElement]>) {
let seeds = conf.new_seeds_vec(seed_size);
(Self {
conf,
span_limit,
keys,
bucket_begin,
evaluator,
seed_chooser,
seed_size
}, seeds)
}
/// Clears from the `unassigned_values` bitmap every output value produced by
/// `bucket`'s keys under its chosen seed, and decreases `unassigned_len` by
/// the bucket size. Bumped buckets (seed 0 with a bumping chooser) are skipped.
#[inline]
pub fn clear_assigned_from_bucket(&self, bucket: usize, seeds: &[SS::VecElement], unassigned_values: &mut [u64], unassigned_len: &mut usize) {
// SAFETY: bucket is presumed < number of buckets encoded in seeds.
let seed = unsafe{ self.seed_size.get_seed(&seeds, bucket) };
if SC::BUMPING && seed == 0 { return; }
let keys = &self.keys[self.bucket_begin[bucket]..self.bucket_begin[bucket+1]];
for key_hash in keys {
unassigned_values.clear_bit(self.seed_chooser.f(*key_hash, seed, &self.conf));
}
// Counting by bucket size relies on assigned keys mapping to distinct values.
*unassigned_len -= keys.len();
}
/// Returns a bitmap of output values not hit by any assigned key, together
/// with the number of such values.
pub fn unassigned_values(&self, seeds: &[SS::VecElement]) -> (Box<[u64]>, usize) {
let mut unassigned_len = self.conf.output_range(self.seed_chooser, self.seed_size.into());
let mut unassigned_values = construct_unassigned(unassigned_len);
for bucket in 0..self.bucket_begin.len()-1 {
self.clear_assigned_from_bucket(bucket, seeds, &mut unassigned_values, &mut unassigned_len);
}
(unassigned_values, unassigned_len)
}
/// Total number of keys in bumped buckets (seed 0); always 0 when the
/// chooser does not support bumping.
pub fn unassigned_len(&self, seeds: &[SS::VecElement]) -> usize {
if !SC::BUMPING { return 0; }
(0..self.bucket_begin.len()-1)
.filter(|bucket| unsafe{ self.seed_size.get_seed(&seeds, *bucket) } == 0)
.map(|bucket| self.bucket_begin[bucket+1] - self.bucket_begin[bucket])
.sum()
}
}
/// Size of the bucket "gap" at each thread boundary in the multi-threaded
/// build: buckets whose value slices may overlap the previous thread's region
/// and therefore must be finished by the previous thread (see `build_mt`).
#[inline]
fn gap_for(effective_slice_len: u16, bucket_num: usize, output_range: usize) -> usize {
    let slice_len = usize::from(effective_slice_len);
    1 + slice_len * bucket_num / (output_range + 1 - slice_len)
}
/// Builds the final level with the bump-free seed chooser, single-threaded.
/// Returns the seed vector plus the bitmap and count of unassigned output
/// values, or `None` when some bucket could not be given a working seed.
#[inline(always)]
pub(crate) fn build_last_level<'k, BE, SS: SeedSize>(keys: &'k [u64], conf: Conf, seed_size: SS, evaluator: BE)
 -> Option<(Box<[SS::VecElement]>, Box<[u64]>, usize)>
 where BE: BucketToActivateEvaluator + Send + Sync, BE::Value: Send
{
    let (builder, mut seeds) = BuildConf::new(keys, conf, seed_size, WINDOW_SIZE, evaluator, bucket_begin_st(keys, &conf), SeedOnlyNoBump);
    {
        // Scope the ThreadBuilder so its mutable borrow of `seeds` ends
        // before we read the seeds back.
        let mut tb = ThreadBuilder::<SeedOnlyNoBump, _, _>::new(&builder, 0..conf.buckets_num, 0, &mut seeds);
        tb.build();
        // Without bumping, an unfinished build means failure.
        if !tb.finished() { return None; }
    }
    let (unassigned_values, unassigned_len) = builder.unassigned_values(&seeds);
    Some((seeds, unassigned_values, unassigned_len))
}
/// Single-threaded build over all buckets with an arbitrary seed chooser.
/// Returns the filled seed vector together with the builder state.
#[inline(always)]
pub(crate) fn build_st<'k, SC, BE, SS: SeedSize>(keys: &'k [u64], conf: Conf, seed_size: SS, evaluator: BE, seed_chooser: SC)
 -> (Box<[SS::VecElement]>, BuildConf<'k, BE, SS, SC>)
 where SC: SeedChooser, BE: BucketToActivateEvaluator
{
    let bucket_begin = bucket_begin_st(keys, &conf);
    let (builder, mut seeds) = BuildConf::new(keys, conf, seed_size, WINDOW_SIZE, evaluator, bucket_begin, seed_chooser);
    let mut tb = ThreadBuilder::<SC, _, _>::new(&builder, 0..conf.buckets_num, 0, &mut seeds);
    tb.build();
    drop(tb); // release the mutable borrow of `seeds`
    (seeds, builder)
}
/// Multi-threaded build: splits the buckets (and the seed vector) into
/// per-thread regions, builds them in parallel, then stitches the region
/// boundaries together sequentially using a `gap` of buckets that the previous
/// thread re-finishes once it knows which values its neighbor consumed.
pub(crate) fn build_mt<'k, SC, BE, SS: SeedSize>(keys: &'k [u64], conf: Conf, seed_size: SS, span_limit: u16, evaluator: BE, seed_chooser: SC, threads_num: usize)
-> (Box<[SS::VecElement]>, BuildConf<'k, BE, SS, SC>)
where SC: SeedChooser + Sync, BE: BucketToActivateEvaluator + Sync, BE::Value: Send
{
// Cap parallelism by the pool size and by a minimum of 4096 buckets per thread.
let threads_num = threads_num.min(rayon::current_num_threads()).min(conf.buckets_num / 4096).max(1);
if threads_num == 1 {
let (builder, mut seeds) = BuildConf::new(keys, conf, seed_size, span_limit, evaluator, bucket_begin_st(keys, &conf), seed_chooser);
ThreadBuilder::<SC, _, _>::new(&builder, 0..conf.buckets_num, 0, &mut seeds).build();
return (seeds, builder);
}
// chunk_size = seeds per vector element (assumes the seed bit size is a power of two),
// so each thread's bucket range maps to whole elements of the seed vector.
let bucket_begin = bucket_begin_mt(keys, &conf, threads_num); let chunk_size = SS::VEC_ELEMENT_BIT_SIZE >> seed_size.into().trailing_zeros();
// Round the per-thread bucket count to a multiple of chunk_size (nearest).
let buckets_per_thread = ((conf.buckets_num + (threads_num*chunk_size)/2) / (threads_num*chunk_size)) * chunk_size;
let seed_words_per_thread = buckets_per_thread * seed_size.into() as usize / SS::VEC_ELEMENT_BIT_SIZE;
let (builder, mut seeds) = BuildConf::new(keys, conf, seed_size, span_limit, evaluator, bucket_begin, seed_chooser);
let mut thread_builders = Vec::with_capacity(threads_num);
let mut bucket_begin = 0;
let mut remaining_seeds = &mut seeds[..];
// Boundary gap: buckets whose value slices may reach into the previous region.
let gap = gap_for(conf.slice_len() + seed_chooser.extra_shift(seed_size.into()),
conf.buckets_num, conf.output_range(seed_chooser, seed_size.into()));
for _ in 0..threads_num-1 {
// Hand each builder a disjoint sub-slice of the seed vector.
let seeds;
(seeds, remaining_seeds) = remaining_seeds.split_at_mut(seed_words_per_thread);
thread_builders.push(ThreadBuilder::new(&builder, bucket_begin..bucket_begin+buckets_per_thread, gap, seeds));
bucket_begin += buckets_per_thread;
}
// Last builder takes all remaining buckets; gap 0 — there is no right neighbor.
thread_builders.push(ThreadBuilder::<SC, _, _>::new(&builder, bucket_begin..conf.buckets_num, 0, remaining_seeds));
// Parallel phase, then sequential boundary stitching left to right:
thread_builders.par_iter_mut().for_each(ThreadBuilder::<SC, _, _>::build); for next in 1..thread_builders.len() {
let prev = next-1;
// Mark in `prev` the output values consumed by the first `gap` buckets
// that `next` already assigned, so `prev` avoids them.
for bucket in 0..gap {
let seed = unsafe{ seed_size.get_seed(&thread_builders[next].seeds, bucket) } as u16;
if SC::BUMPING && seed == 0 { continue; }
for key in &builder.keys[thread_builders[next].bucket_begin[bucket]..thread_builders[next].bucket_begin[bucket+1]] {
thread_builders[prev].used_values.add(builder.seed_chooser.f(*key, seed, &builder.conf));
}
}
// Now let `prev` finish its deferred gap buckets.
thread_builders[prev].buckets_num += gap;
thread_builders[prev].build();
}
// End all mutable borrows of `seeds` before moving it out.
drop(thread_builders);
(seeds, builder)
}
/// Per-thread state for assigning seeds to a contiguous range of buckets.
/// The per-architecture `repr(align(...))` below pads each instance to a
/// presumed cache-line size so adjacent ThreadBuilders in `build_mt`'s vector
/// do not share a cache line during the parallel build (avoids false sharing).
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128))
)]
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips32r6",
target_arch = "mips64",
target_arch = "mips64r6",
target_arch = "sparc",
target_arch = "hexagon",
),
repr(align(32))
)]
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips32r6",
target_arch = "mips64",
target_arch = "mips64r6",
target_arch = "sparc",
target_arch = "hexagon",
target_arch = "m68k",
target_arch = "s390x",
)),
repr(align(64))
)]
struct ThreadBuilder<'k, SC: SeedChooser, BE: BucketToActivateEvaluator, SS: SeedSize> {
// Shared, read-only build state.
conf: &'k BuildConf<'k, BE, SS, SC>,
// Key-range table for this builder's bucket range; bucket i's keys are
// conf.keys[bucket_begin[i]..bucket_begin[i+1]].
bucket_begin: &'k [usize],
// First bucket of the current sliding window (index within this range).
span_begin: usize,
// Buckets this builder processes; initially the range length minus the
// boundary gap (the gap is added back in build_mt after stitching).
buckets_num: usize,
// Output values already taken by buckets assigned near the current window.
used_values: SC::UsedValues,
// First value not yet removed from used_values; clear_used advances it.
value_to_clear: usize,
// Max-heap of activatable buckets rated by the evaluator (ties broken toward
// the lowest bucket via Reverse), plus a window-sized membership set for
// cheap containment tests.
candidates_to_active: BinaryHeap<(BE::Value, Reverse<usize>)>, in_candidates_to_active: CyclicSet<{MAX_WINDOW_SIZE/64}>,
// Destination slice receiving the chosen seed of each bucket in the range.
seeds: &'k mut [SS::VecElement],
}
impl<'k, SC: SeedChooser, BE: BucketToActivateEvaluator, SS: SeedSize> ThreadBuilder<'k, SC, BE, SS> {
/// Creates a builder for `buckets`, deferring the last `gap` buckets of the
/// range (they are re-enabled in build_mt once the right neighbor finished).
pub(crate) fn new(conf: &'k BuildConf<'k, BE, SS, SC>, buckets: Range<usize>, gap: usize, seeds: &'k mut [SS::VecElement]) -> Self {
Self {
used_values: SC::UsedValues::default(),
conf,
span_begin: 0,
buckets_num: buckets.len()-gap,
value_to_clear: 0,
candidates_to_active: BinaryHeap::with_capacity(conf.span_limit as usize),
in_candidates_to_active: CyclicSet::default(),
// Slice includes one extra entry so bucket sizes remain computable.
bucket_begin: &conf.bucket_begin[buckets.start..buckets.end+1],
seeds,
}
}
/// Assigns seeds to all (non-deferred) buckets of this builder's range.
/// Maintains a sliding window [span_begin, span_end()) of candidate buckets:
/// repeatedly pops the evaluator's best candidate, picks its seed, and when
/// the window's first bucket is done, slides the window forward, purging
/// used-value state that fell behind and activating newly covered buckets.
pub(crate) fn build(&mut self) {
if !self.find_nonempty() { return; }
self.value_to_clear = self.slice_begin(self.span_begin); self.add_candidates_from(self.span_begin);
while let Some((_, Reverse(best_bucket))) = self.candidates_to_active.pop() {
self.in_candidates_to_active.remove(best_bucket);
let best_seed = self.best_seed(best_bucket);
// Without bumping, a u16::MAX result signals no seed works: give up
// (finished() will report false).
if !SC::BUMPING && best_seed == u16::MAX { return; }
// SAFETY: best_bucket lies within this builder's seeds slice (presumed).
unsafe{ self.conf.seed_size.set_seed(&mut self.seeds, best_bucket, best_seed) };
if best_bucket == self.span_begin {
// The window's first bucket is done: advance past every bucket that is
// no longer a pending candidate (already assigned or empty).
let old_span_end = self.span_end();
self.span_begin += 1;
while self.span_begin < old_span_end && !self.in_candidates_to_active.contain(self.span_begin) {
self.span_begin += 1;
}
if self.span_begin == old_span_end {
// Whole window drained: jump to the next non-empty bucket, if any.
if !self.find_nonempty() { return; }
}
self.clear_used();
self.add_candidates_from(old_span_end);
}
}
}
/// Advances span_begin to the next non-empty bucket; false when exhausted.
#[inline]
fn find_nonempty(&mut self) -> bool {
loop {
if self.span_begin == self.buckets_num { return false; }
if !self.bucket_is_empty(self.span_begin) { return true; }
self.span_begin += 1;
}
}
/// True when every bucket in the range got a seed (the window ran off the end).
#[inline]
fn finished(&self) -> bool {
self.span_begin == self.buckets_num
}
/// Pushes every non-empty bucket in [first_to_add, span_end()) onto the
/// candidate heap, rated by the evaluator, and records window membership.
#[inline]
fn add_candidates_from(&mut self, first_to_add: usize) {
for bucket in first_to_add..self.span_end() {
let bucket_size = self.bucket_size(bucket);
if bucket_size != 0 {
self.candidates_to_active.push((self.conf.evaluator.eval(bucket, bucket_size), Reverse(bucket)));
self.in_candidates_to_active.add(bucket);
}
}
}
/// Drops from used_values every value below the current window's slice start;
/// those values can no longer conflict with future buckets.
#[inline]
pub fn clear_used(&mut self) {
let end = self.slice_begin(self.span_begin); while self.value_to_clear != end { self.used_values.remove(self.value_to_clear);
self.value_to_clear += 1;
}
}
/// First output value of the slice addressed by the given bucket's first key.
/// Must only be called for non-empty buckets (indexes the first key).
#[inline]
fn slice_begin(&self, non_empty_bucket: usize) -> usize {
self.conf.conf.slice_begin(self.conf.keys[self.bucket_begin[non_empty_bucket]])
}
/// Delegates to the seed chooser to find a collision-free seed for the
/// bucket's keys given the currently used values.
#[inline(always)]
fn best_seed(&mut self, bucket_nr: usize) -> u16 {
let keys = &self.conf.keys[self.bucket_begin[bucket_nr]..self.bucket_begin[bucket_nr+1]];
self.conf.seed_chooser.best_seed(&mut self.used_values, keys, &self.conf.conf, self.conf.seed_size.into())
}
/// Exclusive end of the sliding window, capped at the bucket count.
#[inline]
pub(crate) fn span_end(&self) -> usize {
(self.span_begin + self.conf.span_limit as usize).min(self.buckets_num)
}
/// Number of keys in the given bucket.
#[inline]
pub(crate) fn bucket_size(&self, bucket_nr: usize) -> usize {
self.bucket_begin[bucket_nr+1] - self.bucket_begin[bucket_nr]
}
/// True when the given bucket holds no keys.
#[inline]
pub(crate) fn bucket_is_empty(&self, bucket_nr: usize) -> bool {
self.bucket_begin[bucket_nr+1] == self.bucket_begin[bucket_nr]
}
}