use ::std::mem::replace;
use ::{Unsigned, RadixSorter, RadixSorterBase};
use stash::Stash;
use batched_vec::{BatchedVec, BatchedVecX256};
/// Number of values of type `$t` that fit in 64 bytes (the assumed cache line size), but never
/// fewer than 4.
///
/// Small types report their true per-line count (e.g. 64 for `u8`); types larger than 16 bytes
/// are rounded up to the floor of 4 rather than collapsing towards zero.
///
/// Note: the expression divides by `size_of::<$t>()`, so it must not be used with zero-sized types.
macro_rules! per_cache_line {
    ($t:ty) => {{
        // `max`, not `min`: the 4 is a lower bound on the count, not a cap.
        ::std::cmp::max(64 / ::std::mem::size_of::<$t>(), 4)
    }}
}
/// Expands to the constant `1024`.
///
/// NOTE(review): judging by the name this is the number of cache lines per buffer "page";
/// nothing in the visible code invokes it, so confirm the intended use.
macro_rules! lines_per_page {
    () => {
        { 1024 }
    };
}
/// Buffers for radix sorting elements of type `T`, starting from the most significant key byte.
pub struct Sorter<T> {
    /// Stack of pending work items: a depth paired with the batches still to be processed at
    /// that depth.
    work: Vec<(usize, Vec<Vec<T>>)>,
    /// Accumulates finished elements until they are moved out to the caller's vector.
    done: BatchedVec<T>,
    /// Buckets addressed by a single key byte.
    buckets: BatchedVecX256<T>,
    /// Scratch vector in which a multi-batch group is gathered once no key bytes remain, so the
    /// final action can be applied to it as a whole.
    stage: Vec<T>,
    /// Receives emptied batch vectors and is passed to every bucket/output push.
    /// NOTE(review): presumably a pool of reusable buffers — confirm against `Stash`.
    stash: Stash<T>,
}
impl<T, U: Unsigned> RadixSorter<T, U> for Sorter<T> {
    /// Places `element` in the bucket selected by the most significant byte of its key.
    ///
    /// `bytes` extracts the key, which is widened with `as_u64`; the byte at index
    /// `U::bytes() - 1` (counting from the least significant byte) picks the bucket.
    #[inline(always)]
    fn push<F: Fn(&T)->U>(&mut self, element: T, bytes: &F) {
        // This depth must agree with `finish_into_and`, which hands the buckets on as already
        // split at depth `U::bytes() - 1`. Shifting by `8 * U::bytes()` would move past every
        // key byte, so the top byte would never take part in the bucketing.
        let depth = U::bytes() - 1;
        let byte = (bytes(&element).as_u64() >> (8 * depth)) as u8;
        self.buckets.get_mut(byte).push(element, &mut self.stash);
    }

    /// Pushes every element of `batch` individually, then gives the emptied vector to the stash.
    #[inline]
    fn push_batch<F: Fn(&T)->U>(&mut self, mut batch: Vec<T>, bytes: &F) {
        for element in batch.drain(..) {
            self.push(element, &|x| bytes(x));
        }
        self.stash.give(batch);
    }

    /// Finishes sorting everything pushed so far and writes the result into `target`.
    ///
    /// Delegates to `finish_into_and`, using a stable comparison sort on the full key as the
    /// final action.
    fn finish_into<F: Fn(&T)->U>(&mut self, target: &mut Vec<Vec<T>>, bytes: &F) {
        self.finish_into_and(target, bytes, |slice| slice.sort_by(|x,y| bytes(x).cmp(&bytes(y))));
    }

    /// Sorts the contents of `batches` by key, leaving the result in `batches`.
    ///
    /// Delegates to `sort_and`, using a stable comparison sort on the full key as the final
    /// action.
    fn sort<F: Fn(&T)->U>(&mut self, batches: &mut Vec<Vec<T>>, bytes: &F) {
        self.sort_and(batches, bytes, |slice| slice.sort_by(|x,y| bytes(x).cmp(&bytes(y))));
    }
}
impl<T> RadixSorterBase<T> for Sorter<T> {
    /// Builds a sorter whose work stack, output, buckets and staging vector start out empty,
    /// with a stash created by `Stash::new(1 << 10)`.
    fn new() -> Self {
        Self {
            work: Vec::new(),
            done: BatchedVec::new(),
            buckets: BatchedVecX256::new(),
            stage: Vec::new(),
            stash: Stash::new(1 << 10),
        }
    }

    /// Forwards to the stash's `rebalance` with the same `buffers` and `intended` arguments.
    fn rebalance(&mut self, buffers: &mut Vec<Vec<T>>, intended: usize) {
        self.stash.rebalance(buffers, intended);
    }
}
impl<T> Sorter<T> {
    /// Completes the sort of all previously pushed elements and moves the output into `target`.
    ///
    /// `bytes` extracts an element's key. `action` is invoked on every group of elements that is
    /// not split any further (see `ingest`) just before that group is appended to the output.
    pub fn finish_into_and<U: Unsigned, F: Fn(&T)->U, L: Fn(&mut Vec<T>)>(&mut self, target: &mut Vec<Vec<T>>, bytes: F, action: L) {
        // The buckets are treated as already partitioned on byte `U::bytes() - 1`.
        let top = U::bytes() - 1;
        self.schedule_buckets(top);
        self.run_pending(&bytes, &action);
        self.done.ref_mut().finish_into(target)
    }

    /// Sorts the batches in `source`, leaving the result in `source`.
    ///
    /// An empty `source` is left untouched; a single batch only has `action` applied to it.
    /// Anything larger goes through the work stack, starting at depth `U::bytes()`.
    pub fn sort_and<U: Unsigned, F: Fn(&T)->U, L: Fn(&mut Vec<T>)>(&mut self, source: &mut Vec<Vec<T>>, bytes: F, action: L) {
        match source.len() {
            0 => {}
            1 => action(&mut source[0]),
            _ => {
                let batches = replace(source, Vec::new());
                self.work.push((U::bytes(), batches));
                self.run_pending(&bytes, &action);
                self.done.ref_mut().finish_into(source);
            }
        }
    }

    /// Processes one work item: `source` holds batches that still have `depth` key bytes left
    /// to distinguish them.
    ///
    /// * Zero or one batch: the batch (if any) gets `action` applied and goes to the output.
    /// * Several batches, no bytes left: everything is gathered in `stage`, `action` is applied
    ///   once, and the elements are pushed to the output.
    /// * Several batches, bytes left: elements are bucketed on byte `depth - 1` and each
    ///   non-empty bucket becomes a new work item.
    fn ingest<U: Unsigned, F: Fn(&T)->U, L: Fn(&mut Vec<T>)>(&mut self, source: &mut Vec<Vec<T>>, bytes: &F, depth: usize, action: &L) {
        if source.len() <= 1 {
            if let Some(mut only) = source.pop() {
                action(&mut only);
                self.done.ref_mut().push_vec(only, &mut self.stash);
            }
            return;
        }

        if depth == 0 {
            // Out of key bytes: collect the whole group and let `action` finish it.
            for mut batch in source.drain(..) {
                self.stage.extend(batch.drain(..));
                self.stash.give(batch);
            }
            action(&mut self.stage);
            for element in self.stage.drain(..) {
                self.done.ref_mut().push(element, &mut self.stash);
            }
            return;
        }

        let next = depth - 1;
        for mut batch in source.drain(..) {
            for element in batch.drain(..) {
                let key = bytes(&element).as_u64();
                let byte = (key >> (8 * next)) as u8;
                self.buckets.get_mut(byte).push(element, &mut self.stash);
            }
            // The emptied vector is handed to the stash rather than dropped.
            self.stash.give(batch);
        }
        self.schedule_buckets(next);
    }

    /// Pushes the `finish()` result of every non-empty bucket onto the work stack, tagged with
    /// `depth`.
    ///
    /// Buckets are visited from byte 255 down to 0, so popping the stack afterwards yields them
    /// in increasing byte order.
    fn schedule_buckets(&mut self, depth: usize) {
        for byte in (0 .. 256).rev() {
            let mut bucket = self.buckets.get_mut(byte as u8);
            if !bucket.is_empty() {
                self.work.push((depth, bucket.finish()));
            }
        }
    }

    /// Pops and ingests work items until the work stack is empty.
    fn run_pending<U: Unsigned, F: Fn(&T)->U, L: Fn(&mut Vec<T>)>(&mut self, bytes: &F, action: &L) {
        while let Some((depth, mut list)) = self.work.pop() {
            self.ingest(&mut list, bytes, depth, action);
        }
    }
}
mod test {
    /// Sorts the integers `1 ..= 2^20`, supplied in batches of capacity 1024, and checks that
    /// the output is strictly increasing and ends at the largest value.
    #[test]
    fn test_msb() {
        use ::std::mem::replace;
        use {RadixSorter, RadixSorterBase};

        let total = (1 << 20) as usize;

        // Fill batches to capacity, starting a fresh one whenever the current one is full.
        let mut batches = Vec::new();
        let mut current = Vec::with_capacity(1 << 10);
        for value in 1..(total + 1) {
            if current.len() == current.capacity() {
                batches.push(replace(&mut current, Vec::with_capacity(1 << 10)));
            }
            current.push(value);
        }
        batches.push(current);

        let mut sorter = super::Sorter::new();
        sorter.sort(&mut batches, &|&x| x);

        let mut previous = 0;
        for sorted in batches.drain(..) {
            for item in sorted {
                assert!(previous < item);
                previous = item;
            }
        }
        assert!(previous == total);
    }
}