use num_cpus;
use std::slice::{Chunks, ChunksMut};
use num::traits::Float;
use crossbeam;
use std::ops::Range;
use std::sync::{Mutex, Arc};
use std::mem;
use super::RealNumber;
use std::iter::Iterator;
/// Coarse estimate of how expensive an operation is per array element.
///
/// `Chunk::determine_number_of_chunks` reads this value to decide from which
/// array length on it pays off to split the work across several threads.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Complexity {
    /// Split into at most two chunks, and only for arrays of 500000 elements or more.
    Small,
    /// Split in two from 10000 elements on and across all allowed cores from 50000 on.
    Medium,
    /// Always use every allowed core, regardless of the array length.
    Large,
}
/// Settings that bound how much parallelism the chunking helpers in this file may use.
///
/// The layout is `repr(C)`, so the field order is part of the type's contract.
/// `Clone` is implemented by hand further down in this file.
#[derive(Debug, Copy)]
#[repr(C)]
pub struct MultiCoreSettings {
/// Upper bound on the number of cores; `Chunk::determine_number_of_chunks`
/// never reports more chunks than this.
pub core_limit: usize,
/// NOTE(review): not read anywhere in this file; presumably tells callers to
/// allocate temporary buffers up front - confirm against the users of this flag.
pub early_temp_allocation: bool
}
impl MultiCoreSettings {
pub fn default() -> MultiCoreSettings {
Self::new(num_cpus::get() / 2, false)
}
pub fn new(core_limit: usize, early_temp_allocation: bool) -> MultiCoreSettings {
MultiCoreSettings {
core_limit: if core_limit >= 1 { core_limit } else { 1 },
early_temp_allocation: early_temp_allocation
}
}
}
impl Clone for MultiCoreSettings {
fn clone(&self) -> Self {
MultiCoreSettings {
core_limit: self.core_limit,
early_temp_allocation: self.early_temp_allocation
}
}
fn clone_from(&mut self, source: &Self) {
self.core_limit = source.core_limit;
}
}
/// Field-less type that groups the chunking helpers of this file; all of its
/// functions are associated functions called as `Chunk::name(...)`.
pub struct Chunk;
impl Chunk
{
/// Decides into how many chunks an array of `array_length` elements is split.
///
/// The number of usable cores is `num_cpus::get()` capped by `settings.core_limit`.
/// With exactly one usable core the answer is always one. Otherwise:
/// - `Large`: all usable cores.
/// - `Small`: one chunk below 500000 elements, two chunks from there on.
/// - `Medium`: one chunk below 10000 elements, two below 50000, all usable cores above.
#[inline]
fn determine_number_of_chunks(array_length: usize, complexity: Complexity, settings: &MultiCoreSettings) -> usize
{
    let detected = num_cpus::get();
    let cores = if detected > settings.core_limit { settings.core_limit } else { detected };
    if cores == 1 {
        return 1;
    }

    // `cores` can only be zero here if `core_limit` was set to zero directly;
    // the "two chunks" tiers then fall back to a single chunk.
    let two_if_possible = if cores >= 2 { 2 } else { 1 };
    match complexity {
        Complexity::Large => cores,
        Complexity::Small => {
            if array_length < 500000 { 1 } else { two_if_possible }
        }
        Complexity::Medium => {
            if array_length < 10000 {
                1
            } else if array_length < 50000 {
                two_if_possible
            } else {
                cores
            }
        }
    }
}
/// Splits the first `array_length` elements of `array` into read-only chunks.
///
/// The chunk length comes from `Chunk::calc_chunk_size`, so the iterator may
/// yield fewer than `number_of_chunks` chunks and the last chunk may be shorter.
/// Panics if `array_length` exceeds `array.len()`.
#[inline]
fn partition<T>(array: &[T], array_length: usize, step_size: usize, number_of_chunks: usize) -> Chunks<T>
    where T : Float + Copy + Clone + Send
{
    let elements_per_chunk = Chunk::calc_chunk_size(array_length, step_size, number_of_chunks);
    let used_part = &array[..array_length];
    used_part.chunks(elements_per_chunk)
}
/// Splits the first `array_length` elements of `array` into mutable chunks.
///
/// The chunk length comes from `Chunk::calc_chunk_size`, so the iterator may
/// yield fewer than `number_of_chunks` chunks and the last chunk may be shorter.
/// Panics if `array_length` exceeds `array.len()`.
#[inline]
fn partition_mut<T>(array: &mut [T], array_length: usize, step_size: usize, number_of_chunks: usize) -> ChunksMut<T>
    where T : Copy + Clone + Send
{
    let elements_per_chunk = Chunk::calc_chunk_size(array_length, step_size, number_of_chunks);
    let used_part = &mut array[..array_length];
    used_part.chunks_mut(elements_per_chunk)
}
/// Computes the chunk length used to split `array_length` elements into
/// `number_of_chunks` pieces.
///
/// The result is `array_length / number_of_chunks` rounded up, then rounded up
/// again to the next multiple of `step_size` so that no chunk boundary cuts
/// through a group of `step_size` elements.
///
/// The result is never zero, because `slice::chunks` and `slice::chunks_mut`
/// panic on a zero chunk size. For an empty array the smallest valid chunk
/// length (one step) is returned. A `step_size` or `number_of_chunks` of zero
/// is treated as one instead of dividing by zero.
#[inline]
fn calc_chunk_size(array_length: usize, step_size: usize, number_of_chunks: usize) -> usize
{
    let step = if step_size == 0 { 1 } else { step_size };
    let pieces = if number_of_chunks == 0 { 1 } else { number_of_chunks };

    // Integer ceil-division; exact for every `usize`, unlike a round trip through `f64`.
    let mut chunk_size = array_length / pieces;
    if array_length % pieces != 0 {
        chunk_size += 1;
    }

    let remainder = chunk_size % step;
    if remainder > 0 {
        chunk_size += step - remainder;
    }

    if chunk_size == 0 { step } else { chunk_size }
}
/// Returns the index ranges that correspond to splitting `array_length`
/// elements into `number_of_chunks` chunks of the length reported by
/// `Chunk::calc_chunk_size`.
///
/// The ranges are contiguous, start at zero, and the last one always ends at
/// `array_length`. No range reaches past `array_length`: if the step-aligned
/// chunk length already covers the array in fewer chunks than requested, the
/// surplus ranges are empty. Zero requested chunks give an empty vector.
#[inline]
fn partition_in_ranges(array_length: usize, step_size: usize, number_of_chunks: usize) -> Vec<Range<usize>>
{
    if number_of_chunks == 0 {
        return Vec::new();
    }

    let chunk_size = Chunk::calc_chunk_size(array_length, step_size, number_of_chunks);
    let last = number_of_chunks - 1;
    let mut ranges = Vec::with_capacity(number_of_chunks);
    let mut start = 0;
    for i in 0..number_of_chunks {
        // `start <= array_length` always holds, so the subtraction cannot underflow.
        let end = if i < last && array_length - start > chunk_size {
            start + chunk_size
        } else {
            array_length
        };
        ranges.push(Range { start: start, end: end });
        start = end;
    }
    ranges
}
/// Applies `function` to the first `array_length` elements of `array`.
///
/// `Chunk::determine_number_of_chunks` decides whether the work is split. If it
/// is, every chunk produced by `Chunk::partition_mut` is handed, together with a
/// copy of `arguments`, to its own thread spawned inside a `crossbeam::scope`.
/// Otherwise `function` is called once on the calling thread.
#[inline]
pub fn execute_partial<T,S,F>(
    complexity: Complexity,
    settings: &MultiCoreSettings,
    array: &mut [T], array_length: usize, step_size: usize,
    arguments:S, ref function: F)
    where F: Fn(&mut [T], S) + 'static + Sync,
          T: RealNumber,
          S: Sync + Copy + Send
{
    let worker_count = Chunk::determine_number_of_chunks(array_length, complexity, settings);
    if worker_count <= 1 {
        // Not worth splitting: process the whole used part in place.
        function(&mut array[0..array_length], arguments);
        return;
    }

    let pieces = Chunk::partition_mut(array, array_length, step_size, worker_count);
    crossbeam::scope(|scope| {
        for piece in pieces {
            scope.spawn(move || {
                function(piece, arguments);
            });
        }
    });
}
/// Applies `function` to the first `array_length` elements of `array` and tells
/// each call which index range of the array it is working on.
///
/// If `Chunk::determine_number_of_chunks` asks for more than one chunk, each
/// chunk from `Chunk::partition_mut` is paired with the matching range from
/// `Chunk::partition_in_ranges` and processed on its own thread inside a
/// `crossbeam::scope`. Otherwise `function` is called once with the range
/// `0..array_length`.
#[inline]
pub fn execute_with_range<T,S,F>(
    complexity: Complexity,
    settings: &MultiCoreSettings,
    array: &mut [T], array_length: usize, step_size: usize,
    arguments: S, ref function: F)
    where F: Fn(&mut [T], Range<usize>, S) + 'static + Sync,
          T : Copy + Clone + Send + Sync,
          S: Sync + Copy + Send
{
    let worker_count = Chunk::determine_number_of_chunks(array_length, complexity, settings);
    if worker_count <= 1 {
        function(&mut array[0..array_length], Range { start: 0, end: array_length }, arguments);
        return;
    }

    let pieces = Chunk::partition_mut(array, array_length, step_size, worker_count);
    // Ask for as many ranges as there really are chunks; that can be fewer than requested.
    let piece_ranges = Chunk::partition_in_ranges(array_length, step_size, pieces.len());
    crossbeam::scope(|scope| {
        for (piece, range) in pieces.zip(piece_ranges) {
            scope.spawn(move || {
                function(piece, range, arguments);
            });
        }
    });
}
/// Calls `function` on pairs of chunks taken from opposite ends of the first
/// `array_length` elements of `array`.
///
/// Twice the chunk count reported by `Chunk::determine_number_of_chunks` is
/// requested. The chunks and their index ranges are split into a leading and a
/// trailing group, the trailing group is reversed, and the k-th leading chunk is
/// paired with the k-th chunk counted from the end. Each pair, with both ranges
/// and a copy of `arguments`, runs on its own thread inside a `crossbeam::scope`.
///
/// When no more than two chunks are requested, `function` is called once on the
/// calling thread with the first two chunks of the array.
///
/// # Panics
/// The single-threaded branch unwraps two chunks and therefore panics if
/// `Chunk::partition_mut` yields fewer than two.
///
/// NOTE(review): the groups are split at `number_of_chunks / 2`, i.e. by the
/// requested count, while `partition_mut` can yield fewer chunks than requested
/// (the chunk size is rounded up to a multiple of `step_size`). In that case
/// `zip` stops at the shorter group and the remaining chunks are never passed
/// to `function` - confirm that callers only use lengths where this cannot happen.
#[inline]
pub fn execute_sym_pairs_with_range<T,S,F>(
complexity: Complexity,
settings: &MultiCoreSettings,
array: &mut [T], array_length: usize, step_size: usize,
arguments: S, ref function: F)
where F: Fn(&mut &mut [T], &Range<usize>, &mut &mut [T], &Range<usize>, S) + 'static + Sync,
T: Copy + Clone + Send + Sync,
S: Sync + Copy + Send
{
// Every worker gets two chunks, hence twice the usual chunk count.
let number_of_chunks = 2 * Chunk::determine_number_of_chunks(array_length, complexity, settings);
if number_of_chunks > 2
{
let chunks = Chunk::partition_mut(array, array_length, step_size, number_of_chunks);
let ranges = Chunk::partition_in_ranges(array_length, step_size, chunks.len());
// `i` counts the visited items, so the first `number_of_chunks / 2` items
// land in the first vector and all later ones in the second.
let mut i = 0;
let (mut chunks1, mut chunks2): (Vec<_>, Vec<_>) =
chunks.partition(|_c| { i += 1; i <= number_of_chunks / 2 });
i = 0;
let (ranges1, ranges2): (Vec<_>, Vec<_>) =
ranges.iter().partition(|_r| { i += 1; i <= number_of_chunks / 2 });
// Walk the trailing group backwards so the outermost chunks are paired first.
let chunks2 = chunks2.iter_mut().rev();
let ranges2 = ranges2.iter().rev();
let zipped1 = chunks1.iter_mut().zip(ranges1);
let zipped2 = chunks2.zip(ranges2);
crossbeam::scope(|scope| {
for chunk in zipped1.zip(zipped2) {
scope.spawn(move|| {
let (pair1, pair2) = chunk;
function(pair1.0, pair1.1, pair2.0, pair2.1, arguments);
});
}
});
}
else
{
let mut chunks = Chunk::partition_mut(array, array_length, step_size, number_of_chunks);
let mut chunks1 = chunks.next().unwrap();
let len1 = chunks1.len();
let mut chunks2 = chunks.next().unwrap();
// The first range covers the first chunk, the second one everything after it.
function(
&mut chunks1,
&Range { start: 0, end: len1 },
&mut chunks2,
&Range { start: len1, end: array_length },
arguments);
}
}
/// Evaluates `function` over `a` once per chunk of `b` and collects the results.
///
/// `Chunk::determine_number_of_chunks` (fed with `a_len`) decides whether the
/// work is split. If it is, the first `b_len` elements of `b` are partitioned
/// into chunks and `a_len` into as many index ranges. Every `(range, chunk)`
/// pair is evaluated as `function(a, range, chunk)` on its own thread inside a
/// `crossbeam::scope`. Otherwise `function` is called once with the range
/// `0..a_len` and the first `b_len` elements of `b`.
///
/// The returned vector holds one result per chunk, in chunk order. Threads
/// finish in arbitrary order, so each result is tagged with its chunk index
/// and sorted before returning; without that the order would change from run
/// to run.
#[inline]
pub fn get_a_fold_b<F, T, R>(
    complexity: Complexity,
    settings: &MultiCoreSettings,
    a: &[T], a_len: usize, a_step: usize,
    b: &[T], b_len: usize, b_step: usize,
    ref function: F) -> Vec<R>
    where F: Fn(&[T], Range<usize>, &[T]) -> R + 'static + Sync,
          T: Float + Copy + Clone + Send + Sync,
          R: Send
{
    let number_of_chunks = Chunk::determine_number_of_chunks(a_len, complexity, settings);
    if number_of_chunks > 1
    {
        let chunks = Chunk::partition(b, b_len, b_step, number_of_chunks);
        let ranges = Chunk::partition_in_ranges(a_len, a_step, chunks.len());
        let result = Vec::with_capacity(chunks.len());
        let stack_array = Arc::new(Mutex::new(result));
        crossbeam::scope(|scope| {
            for (index, (chunk, range)) in chunks.zip(ranges).enumerate() {
                let stack_array = stack_array.clone();
                scope.spawn(move || {
                    let r = function(a, range, chunk);
                    // Remember which chunk this result belongs to.
                    stack_array.lock().unwrap().push((index, r));
                });
            }
        });
        let mut guard = stack_array.lock().unwrap();
        let mut indexed = mem::replace(&mut *guard, Vec::new());
        indexed.sort_by_key(|entry| entry.0);
        indexed.into_iter().map(|entry| entry.1).collect()
    }
    else
    {
        let result = function(a, Range { start: 0, end: a_len }, &b[0..b_len]);
        vec![result]
    }
}
/// Evaluates `function` on every chunk of the first `a_len` elements of `a` and
/// collects the results.
///
/// If `Chunk::determine_number_of_chunks` asks for more than one chunk, each
/// chunk from `Chunk::partition` is paired with the matching index range from
/// `Chunk::partition_in_ranges` and evaluated, with a copy of `arguments`, on
/// its own thread inside a `crossbeam::scope`. Otherwise `function` is called
/// once with the range `0..a_len`.
///
/// The returned vector holds one result per chunk, in chunk order. Threads
/// finish in arbitrary order, so each result is tagged with its chunk index
/// and sorted before returning; without that the order would change from run
/// to run.
#[inline]
pub fn get_chunked_results<F, S, T, R>(
    complexity: Complexity,
    settings: &MultiCoreSettings,
    a: &[T], a_len: usize, a_step: usize,
    arguments:S, ref function: F) -> Vec<R>
    where F: Fn(&[T], Range<usize>, S) -> R + 'static + Sync,
          T: Float + Copy + Clone + Send + Sync,
          R: Send,
          S: Sync + Copy + Send
{
    let number_of_chunks = Chunk::determine_number_of_chunks(a_len, complexity, settings);
    if number_of_chunks > 1
    {
        let chunks = Chunk::partition(a, a_len, a_step, number_of_chunks);
        let ranges = Chunk::partition_in_ranges(a_len, a_step, chunks.len());
        let result = Vec::with_capacity(chunks.len());
        let stack_array = Arc::new(Mutex::new(result));
        crossbeam::scope(|scope| {
            for (index, (chunk, range)) in chunks.zip(ranges).enumerate() {
                let stack_array = stack_array.clone();
                scope.spawn(move || {
                    let r = function(chunk, range, arguments);
                    // Remember which chunk this result belongs to.
                    stack_array.lock().unwrap().push((index, r));
                });
            }
        });
        let mut guard = stack_array.lock().unwrap();
        let mut indexed = mem::replace(&mut *guard, Vec::new());
        indexed.sort_by_key(|entry| entry.0);
        indexed.into_iter().map(|entry| entry.1).collect()
    }
    else
    {
        let result = function(&a[0..a_len], Range { start: 0, end: a_len }, arguments);
        vec![result]
    }
}
/// Runs `function` with read access to `original` and write access to `target`.
///
/// If `Chunk::determine_number_of_chunks` (fed with `original_length`) asks for
/// more than one chunk, the first `target_length` elements of `target` are
/// partitioned into mutable chunks and `original_length` into as many index
/// ranges. Every `(range, chunk)` pair is then processed as
/// `function(original, range, chunk, arguments)` on its own thread inside a
/// `crossbeam::scope`. Otherwise `function` is called once with the range
/// `0..original_length` and the first `target_length` elements of `target`.
#[inline]
pub fn from_src_to_dest<T,S,F>(
    complexity: Complexity,
    settings: &MultiCoreSettings,
    original: &[T], original_length: usize, original_step: usize,
    target: &mut [T], target_length: usize, target_step: usize,
    arguments: S, ref function: F)
    where F: Fn(&[T], Range<usize>, &mut [T], S) + 'static + Sync,
          T: Float + Copy + Clone + Send + Sync,
          S: Sync + Copy + Send
{
    let worker_count = Chunk::determine_number_of_chunks(original_length, complexity, settings);
    if worker_count <= 1 {
        function(original, Range { start: 0, end: original_length }, &mut target[0..target_length], arguments);
        return;
    }

    let target_pieces = Chunk::partition_mut(target, target_length, target_step, worker_count);
    // One source range per target chunk that really exists.
    let source_ranges = Chunk::partition_in_ranges(original_length, original_step, target_pieces.len());
    crossbeam::scope(|scope| {
        for (piece, range) in target_pieces.zip(source_ranges) {
            scope.spawn(move || {
                function(original, range, piece, arguments);
            });
        }
    });
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ops::Range;

    /// 256 elements in two chunks with a step of 4 give two chunks of 128.
    #[test]
    fn partition_array()
    {
        let mut data = [0.0; 256];
        let chunks = Chunk::partition_mut(&mut data, 256, 4, 2);
        assert_eq!(chunks.len(), 2);
        let lengths: Vec<usize> = chunks.map(|chunk| chunk.len()).collect();
        assert_eq!(lengths, vec![128, 128]);
    }

    /// 1023 elements in eight chunks: seven full chunks of 128 and a shorter last one.
    #[test]
    fn partition_array_8_cores()
    {
        let mut data = [0.0; 1023];
        let chunks = Chunk::partition_mut(&mut data, 1023, 4, 8);
        assert_eq!(chunks.len(), 8);
        for (index, chunk) in chunks.enumerate() {
            let expected = if index >= 7 { 127 } else { 128 };
            assert_eq!(chunk.len(), expected);
        }
    }

    /// The first range is aligned to the step size, the last one ends at the array length.
    #[test]
    fn partitionin_ranges()
    {
        let ranges = Chunk::partition_in_ranges(1023, 4, 2);
        let expected = vec![
            Range { start: 0, end: 512 },
            Range { start: 512, end: 1023 },
        ];
        assert_eq!(ranges, expected);
    }
}