use crate::tuner::{Algorithm, Tuner, TuningParams};
use crate::utils::*;
use crate::RadixKey;
use arbitrary_chunks::ArbitraryChunks;
#[cfg(feature = "multi-threaded")]
use rayon::current_num_threads;
#[cfg(feature = "multi-threaded")]
use rayon::prelude::*;
use std::cmp::max;
/// Drives the radix sort: for each chunk it gathers byte counts for the current
/// level, asks a [`Tuner`] which algorithm to use, and recurses one level down
/// (`level - 1`) when a level can be skipped.
pub struct Sorter<'a> {
    /// Whether tiled counting and the parallel director may be used. Only takes
    /// effect when the `multi-threaded` feature is compiled in.
    multi_threaded: bool,
    /// Strategy consulted for every non-trivial chunk to pick the sorting algorithm.
    pub(crate) tuner: &'a (dyn Tuner + Send + Sync),
}

impl<'a> Sorter<'a> {
    /// Chunks of at most this many elements go straight to the comparative sort.
    const COMPARATIVE_SORT_MAX_LEN: usize = 128;
    /// Minimum chunk length before counts are gathered per tile (multi-threaded mode only).
    const TILING_MIN_LEN: usize = 260_000;
    /// Lower bound on the length of each tile when tiling is used.
    const MIN_TILE_SIZE: usize = 30_000;
    /// Minimum chunk length for which a homogenous level is skipped rather than sorted.
    const HOMOGENOUS_SKIP_MIN_LEN: usize = 30_000;

    /// Creates a sorter.
    ///
    /// `multi_threaded` allows tiled counting and parallel recursion (only when the
    /// `multi-threaded` feature is enabled); `tuner` chooses the algorithm per chunk.
    pub fn new(multi_threaded: bool, tuner: &'a (dyn Tuner + Send + Sync)) -> Self {
        Self {
            multi_threaded,
            tuner,
        }
    }

    /// Dispatches `bucket` to the adapter implementing `algorithm` at `level`.
    ///
    /// `tile_counts`/`tile_size` are forwarded to the tile-based multi-threaded
    /// algorithms (`Recombinating`, `Regions`, `MtOop`, `MtLsb`).
    ///
    /// # Panics
    ///
    /// With the `multi-threaded` feature, panics if one of those tile-based
    /// algorithms is requested while `tile_counts` is `None`.
    #[inline]
    fn run_sort<T>(
        &self,
        level: usize,
        bucket: &mut [T],
        counts: &[usize; 256],
        tile_counts: Option<Vec<[usize; 256]>>,
        // Only read by the multi-threaded adapters.
        #[allow(unused)] tile_size: usize,
        algorithm: Algorithm,
    ) where
        T: RadixKey + Copy + Sized + Send + Sync,
    {
        // Without the `multi-threaded` feature no arm below reads `tile_counts`,
        // so the binding would otherwise raise an unused-variable warning.
        #[allow(unused)]
        if let Some(tile_counts) = tile_counts {
            match algorithm {
                #[cfg(feature = "multi-threaded")]
                Algorithm::Scanning => self.scanning_sort_adapter(bucket, counts, level),
                #[cfg(feature = "multi-threaded")]
                Algorithm::Recombinating => {
                    self.recombinating_sort_adapter(bucket, counts, &tile_counts, tile_size, level)
                }
                Algorithm::LrLsb => self.lsb_sort_adapter(true, bucket, counts, 0, level),
                Algorithm::Lsb => self.lsb_sort_adapter(false, bucket, counts, 0, level),
                Algorithm::Ska => self.ska_sort_adapter(bucket, counts, level),
                Algorithm::Comparative => self.comparative_sort(bucket, level),
                #[cfg(feature = "multi-threaded")]
                Algorithm::Regions => {
                    self.regions_sort_adapter(bucket, counts, &tile_counts, tile_size, level)
                }
                #[cfg(feature = "multi-threaded")]
                Algorithm::MtOop => {
                    self.mt_oop_sort_adapter(bucket, level, counts, &tile_counts, tile_size)
                }
                #[cfg(feature = "multi-threaded")]
                Algorithm::MtLsb => self.mt_lsb_sort_adapter(bucket, 0, level, tile_size),
            }
        } else {
            match algorithm {
                #[cfg(feature = "multi-threaded")]
                Algorithm::Scanning => self.scanning_sort_adapter(bucket, counts, level),
                Algorithm::LrLsb => self.lsb_sort_adapter(true, bucket, counts, 0, level),
                Algorithm::Lsb => self.lsb_sort_adapter(false, bucket, counts, 0, level),
                Algorithm::Ska => self.ska_sort_adapter(bucket, counts, level),
                Algorithm::Comparative => self.comparative_sort(bucket, level),
                #[cfg(feature = "multi-threaded")]
                e => panic!("Bad algorithm: {:?} for len: {}", e, bucket.len()),
            }
        }
    }

    /// Sorts one chunk at `level`.
    ///
    /// The steps are:
    /// - Chunks of length <= 1 are left alone.
    /// - Small chunks go to the comparative sort.
    /// - Otherwise counts for this level are gathered. This is done per tile for
    ///   large chunks in multi-threaded mode.
    /// - If the counting helper reports the chunk as already sorted, or a large
    ///   chunk's counts are homogenous, this level is skipped. The chunk is then
    ///   re-split by `counts` and handled at `level - 1` (nothing more at level 0).
    /// - Otherwise the tuner picks an algorithm, which is run via `run_sort`.
    ///
    /// `parent_len` and `threads` are only forwarded to the tuner (and, for
    /// `threads`, used to size tiles).
    fn handle_chunk<T>(
        &self,
        chunk: &mut [T],
        level: usize,
        parent_len: Option<usize>,
        threads: usize,
    ) where
        T: RadixKey + Sized + Send + Copy + Sync,
    {
        if chunk.len() <= 1 {
            return;
        }
        if chunk.len() <= Self::COMPARATIVE_SORT_MAX_LEN {
            self.comparative_sort(chunk, level);
            return;
        }

        let use_tiles = cfg!(feature = "multi-threaded")
            && self.multi_threaded
            && chunk.len() >= Self::TILING_MIN_LEN;
        let tile_size = if use_tiles {
            max(Self::MIN_TILE_SIZE, cdiv(chunk.len(), threads))
        } else {
            chunk.len()
        };

        // Gather this level's counts (per tile when tiling). The "already sorted"
        // flag comes from whichever counting helper ran.
        let (tile_counts, counts, already_sorted) = if use_tiles {
            let (per_tile, sorted) = get_tile_counts(chunk, tile_size, level);
            let totals = aggregate_tile_counts(&per_tile);
            (Some(per_tile), totals, sorted)
        } else {
            let (totals, sorted) = get_counts(chunk, level);
            (None, totals, sorted)
        };

        if already_sorted
            || (chunk.len() >= Self::HOMOGENOUS_SKIP_MIN_LEN && is_homogenous_bucket(&counts))
        {
            // Nothing to do at this level; continue with the next one down.
            if level != 0 {
                self.director(chunk, &counts, level - 1);
            }
            return;
        }

        let tp = TuningParams {
            threads,
            level,
            total_levels: T::LEVELS,
            input_len: chunk.len(),
            parent_len,
        };
        let algorithm = self.tuner.pick_algorithm(&tp, &counts);

        // Tile-based algorithms need per-tile counts. When tiling was skipped,
        // present the whole chunk as a single tile (tile_size == chunk.len() then).
        let tile_counts = tile_counts.or_else(|| match algorithm {
            #[cfg(feature = "multi-threaded")]
            Algorithm::MtOop
            | Algorithm::MtLsb
            | Algorithm::Recombinating
            | Algorithm::Regions => Some(vec![counts]),
            _ => None,
        });

        #[cfg(feature = "work_profiles")]
        println!("({}) PAR: {:?}", level, algorithm);

        self.run_sort(level, chunk, &counts, tile_counts, tile_size, algorithm);
    }

    /// Entry point: sorts all of `bucket`, starting at level `T::LEVELS - 1`.
    ///
    /// The tuner is told how many threads it may use. This is the rayon pool size
    /// only when this sorter is multi-threaded; otherwise it is 1, matching what
    /// `single_threaded_director` reports for the deeper levels.
    #[inline]
    pub fn top_level_director<T>(&self, bucket: &mut [T])
    where
        T: RadixKey + Sized + Send + Copy + Sync,
    {
        #[cfg(feature = "multi-threaded")]
        let threads = if self.multi_threaded {
            current_num_threads()
        } else {
            1
        };
        #[cfg(not(feature = "multi-threaded"))]
        let threads = 1;

        let level = T::LEVELS - 1;
        self.handle_chunk(bucket, level, None, threads);
    }

    /// Splits `bucket` into consecutive sub-slices whose lengths are `counts` and
    /// handles each one at `level` in parallel on the current rayon pool.
    #[inline]
    #[cfg(feature = "multi-threaded")]
    pub fn multi_threaded_director<T>(&self, bucket: &mut [T], counts: &[usize; 256], level: usize)
    where
        T: RadixKey + Send + Copy + Sync,
    {
        let parent_len = Some(bucket.len());
        let threads = current_num_threads();
        bucket
            .arbitrary_chunks_mut(counts)
            .par_bridge()
            .for_each(|chunk| self.handle_chunk(chunk, level, parent_len, threads));
    }

    /// Splits `bucket` into consecutive sub-slices whose lengths are `counts` and
    /// handles each one at `level` sequentially, reporting a single thread to the tuner.
    #[inline]
    pub fn single_threaded_director<T>(&self, bucket: &mut [T], counts: &[usize; 256], level: usize)
    where
        T: RadixKey + Send + Sync + Copy,
    {
        let parent_len = Some(bucket.len());
        let threads = 1;
        bucket
            .arbitrary_chunks_mut(counts)
            .for_each(|chunk| self.handle_chunk(chunk, level, parent_len, threads));
    }

    /// Recurses into the sub-slices of `bucket` described by `counts` at `level`.
    ///
    /// Uses the parallel director when the `multi-threaded` feature is enabled and
    /// this sorter is multi-threaded, and the sequential one otherwise.
    #[inline]
    pub fn director<T>(&self, bucket: &mut [T], counts: &[usize; 256], level: usize)
    where
        T: RadixKey + Send + Sync + Copy,
    {
        if cfg!(feature = "multi-threaded") && self.multi_threaded {
            #[cfg(feature = "multi-threaded")]
            self.multi_threaded_director(bucket, counts, level);
        } else {
            self.single_threaded_director(bucket, counts, level);
        }
    }
}