#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
#![warn(clippy::missing_docs_in_private_items)]
use std::ops::ControlFlow;
pub use paralight::iter;
use paralight::iter::{Accumulator, ExactSizeAccumulator, GenericThreadPool, SourceCleanup};
pub use self::task::*;
pub use self::thread_pool::*;
mod task;
mod thread_pool;
mod util;
/// Returns a [`GenericThreadPool`] that runs paralight pipelines on the pool
/// yielded by `ThreadPool::with_current`, using that pool's
/// `split_per_item()` strategy.
///
/// Every pipeline method of the returned value calls `ThreadPool::with_current`
/// and forwards all of its arguments unchanged to
/// `ThreadPool::split_per_item()`. The pool is therefore looked up each time a
/// pipeline runs, on the thread that runs it, not when this function is called.
///
/// NOTE(review): the exact splitting semantics (presumably one unit of work per
/// input item) are defined by `ThreadPool::split_per_item`. Confirm them there.
#[must_use]
pub fn split_per_item() -> impl GenericThreadPool {
    /// Zero-sized handle that resolves the current pool on every pipeline call.
    struct SplitPerItem;

    // SAFETY: both methods forward every argument, unmodified, to the adapter
    // returned by `ThreadPool::split_per_item` on the current pool and return
    // its result directly. This impl adds no logic of its own, so it upholds
    // the `GenericThreadPool` contract exactly as far as that adapter does.
    // NOTE(review): relies on that adapter being a sound `GenericThreadPool`.
    unsafe impl GenericThreadPool for SplitPerItem {
        fn upper_bounded_pipeline<Output: Send, Accum>(
            self,
            input_len: usize,
            init: impl Fn() -> Accum + Sync,
            process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
            finalize: impl Fn(Accum) -> Output + Sync,
            reduce: impl Fn(Output, Output) -> Output,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_per_item().upper_bounded_pipeline(
                    input_len,
                    init,
                    process_item,
                    finalize,
                    reduce,
                    cleanup,
                )
            })
        }

        fn iter_pipeline<Output, Accum: Send>(
            self,
            input_len: usize,
            accum: impl Accumulator<usize, Accum> + Sync,
            reduce: impl ExactSizeAccumulator<Accum, Output>,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_per_item()
                    .iter_pipeline(input_len, accum, reduce, cleanup)
            })
        }
    }

    SplitPerItem
}
/// Returns a [`GenericThreadPool`] that runs paralight pipelines on the pool
/// yielded by `ThreadPool::with_current`, using that pool's
/// `split_per(chunk_size)` strategy.
///
/// `chunk_size` is captured here and forwarded unchanged to
/// `ThreadPool::split_per` every time a pipeline runs. The pool itself is looked
/// up at pipeline time, on the thread that runs the pipeline, not when this
/// function is called.
///
/// NOTE(review): `chunk_size` is not validated here. Behavior for
/// `chunk_size == 0` and the exact chunking semantics are determined by
/// `ThreadPool::split_per`. Confirm them there.
#[must_use]
pub fn split_per(chunk_size: usize) -> impl GenericThreadPool {
    /// Handle that carries the chunk size and resolves the current pool on
    /// every pipeline call.
    struct SplitPer(
        /// Chunk size forwarded unchanged to `ThreadPool::split_per`.
        usize,
    );

    // SAFETY: both methods forward every argument, unmodified, to the adapter
    // returned by `ThreadPool::split_per` on the current pool and return its
    // result directly. This impl adds no logic of its own, so it upholds the
    // `GenericThreadPool` contract exactly as far as that adapter does.
    // NOTE(review): relies on that adapter being a sound `GenericThreadPool`.
    unsafe impl GenericThreadPool for SplitPer {
        fn upper_bounded_pipeline<Output: Send, Accum>(
            self,
            input_len: usize,
            init: impl Fn() -> Accum + Sync,
            process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
            finalize: impl Fn(Accum) -> Output + Sync,
            reduce: impl Fn(Output, Output) -> Output,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_per(self.0).upper_bounded_pipeline(
                    input_len,
                    init,
                    process_item,
                    finalize,
                    reduce,
                    cleanup,
                )
            })
        }

        fn iter_pipeline<Output, Accum: Send>(
            self,
            input_len: usize,
            accum: impl Accumulator<usize, Accum> + Sync,
            reduce: impl ExactSizeAccumulator<Accum, Output>,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_per(self.0)
                    .iter_pipeline(input_len, accum, reduce, cleanup)
            })
        }
    }

    SplitPer(chunk_size)
}
/// Returns a [`GenericThreadPool`] that runs paralight pipelines on the pool
/// yielded by `ThreadPool::with_current`, using that pool's
/// `split_by(chunks)` strategy.
///
/// `chunks` is captured here and forwarded unchanged to `ThreadPool::split_by`
/// every time a pipeline runs. The pool itself is looked up at pipeline time,
/// on the thread that runs the pipeline, not when this function is called.
///
/// NOTE(review): `chunks` is not validated here. Presumably it is the number of
/// pieces the input is divided into, and `chunks == 0` is handled by
/// `ThreadPool::split_by`. Confirm both there.
#[must_use]
pub fn split_by(chunks: usize) -> impl GenericThreadPool {
    /// Handle that carries the chunk count and resolves the current pool on
    /// every pipeline call.
    struct SplitBy(
        /// Value forwarded unchanged to `ThreadPool::split_by`.
        usize,
    );

    // SAFETY: both methods forward every argument, unmodified, to the adapter
    // returned by `ThreadPool::split_by` on the current pool and return its
    // result directly. This impl adds no logic of its own, so it upholds the
    // `GenericThreadPool` contract exactly as far as that adapter does.
    // NOTE(review): relies on that adapter being a sound `GenericThreadPool`.
    unsafe impl GenericThreadPool for SplitBy {
        fn upper_bounded_pipeline<Output: Send, Accum>(
            self,
            input_len: usize,
            init: impl Fn() -> Accum + Sync,
            process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
            finalize: impl Fn(Accum) -> Output + Sync,
            reduce: impl Fn(Output, Output) -> Output,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_by(self.0).upper_bounded_pipeline(
                    input_len,
                    init,
                    process_item,
                    finalize,
                    reduce,
                    cleanup,
                )
            })
        }

        fn iter_pipeline<Output, Accum: Send>(
            self,
            input_len: usize,
            accum: impl Accumulator<usize, Accum> + Sync,
            reduce: impl ExactSizeAccumulator<Accum, Output>,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_by(self.0)
                    .iter_pipeline(input_len, accum, reduce, cleanup)
            })
        }
    }

    SplitBy(chunks)
}
/// Returns a [`GenericThreadPool`] that runs paralight pipelines on the pool
/// yielded by `ThreadPool::with_current`, using that pool's
/// `split_by_threads()` strategy.
///
/// Every pipeline method of the returned value calls `ThreadPool::with_current`
/// and forwards all of its arguments unchanged to
/// `ThreadPool::split_by_threads()`. The pool is therefore looked up each time
/// a pipeline runs, on the thread that runs it, not when this function is
/// called.
///
/// NOTE(review): presumably this divides the input into one piece per pool
/// thread. Confirm in `ThreadPool::split_by_threads`.
#[must_use]
pub fn split_by_threads() -> impl GenericThreadPool {
    /// Zero-sized handle that resolves the current pool on every pipeline call.
    struct SplitByThreads;

    // SAFETY: both methods forward every argument, unmodified, to the adapter
    // returned by `ThreadPool::split_by_threads` on the current pool and return
    // its result directly. This impl adds no logic of its own, so it upholds
    // the `GenericThreadPool` contract exactly as far as that adapter does.
    // NOTE(review): relies on that adapter being a sound `GenericThreadPool`.
    unsafe impl GenericThreadPool for SplitByThreads {
        fn upper_bounded_pipeline<Output: Send, Accum>(
            self,
            input_len: usize,
            init: impl Fn() -> Accum + Sync,
            process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
            finalize: impl Fn(Accum) -> Output + Sync,
            reduce: impl Fn(Output, Output) -> Output,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_by_threads().upper_bounded_pipeline(
                    input_len,
                    init,
                    process_item,
                    finalize,
                    reduce,
                    cleanup,
                )
            })
        }

        fn iter_pipeline<Output, Accum: Send>(
            self,
            input_len: usize,
            accum: impl Accumulator<usize, Accum> + Sync,
            reduce: impl ExactSizeAccumulator<Accum, Output>,
            cleanup: &(impl SourceCleanup + Sync),
        ) -> Output {
            ThreadPool::with_current(|pool| {
                pool.split_by_threads()
                    .iter_pipeline(input_len, accum, reduce, cleanup)
            })
        }
    }

    SplitByThreads
}
/// Runs `oper_a` and `oper_b` through `ThreadPool::join` on the pool yielded
/// by `ThreadPool::with_current`, and returns their results as `(RA, RB)`.
///
/// The closures and their results must be [`Send`], presumably so the pool can
/// execute either closure on another of its threads.
///
/// NOTE(review): ordering, parallelism and panic behavior are defined by
/// `ThreadPool::join`. Confirm them there.
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    ThreadPool::with_current(|pool| pool.join(oper_a, oper_b))
}
/// Returns `ThreadPool::num_threads` for the pool yielded by
/// `ThreadPool::with_current`.
#[must_use]
pub fn num_threads() -> usize {
    ThreadPool::with_current(|pool| pool.num_threads())
}
/// Spawns `f` on the pool yielded by `ThreadPool::with_current` via
/// `ThreadPool::spawn_owned`, and returns an [`OwnedTask`] handle for its
/// result.
///
/// The closure and its result must be `'static + Send` because the task is not
/// tied to the caller's stack frame.
pub fn spawn_owned<T: 'static + Send>(f: impl 'static + Send + FnOnce() -> T) -> OwnedTask<T> {
    ThreadPool::with_current(|pool| pool.spawn_owned(f))
}
/// Spawns `f` on the pool yielded by `ThreadPool::with_current` via
/// `ThreadPool::spawn_shared`, and returns a [`SharedTask`] handle for its
/// result.
///
/// Unlike [`spawn_owned`], the result type must also be [`Sync`]. Presumably
/// this lets the result be accessed by reference from several threads. Confirm
/// against `SharedTask`.
pub fn spawn_shared<T: 'static + Send + Sync>(
    f: impl 'static + Send + FnOnce() -> T,
) -> SharedTask<T> {
    ThreadPool::with_current(|pool| pool.spawn_shared(f))
}
/// Tests for the free-function wrappers that operate on the current pool.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::iter::*;

    /// Joins a spawned OS thread and re-raises its panic, if any, on the test
    /// thread. A failure inside the worker then fails the test instead of
    /// being silently discarded.
    fn join_or_propagate(handle: std::thread::JoinHandle<()>) {
        if let Err(payload) = handle.join() {
            std::panic::resume_unwind(payload);
        }
    }

    /// Element-wise addition of three zipped slices with `split_by_threads`.
    #[test]
    fn test_add() {
        let len = 10_000;
        let mut output = vec![0; len];
        let left = (0..len as u64).collect::<Vec<u64>>();
        let right = (0..len as u64).collect::<Vec<u64>>();
        let expected_output = (0..len as u64).map(|x| 2 * x).collect::<Vec<u64>>();
        let output_slice = output.as_mut_slice();
        let left_slice = left.as_slice();
        let right_slice = right.as_slice();
        (
            std::hint::black_box(output_slice.par_iter_mut()),
            std::hint::black_box(left_slice).par_iter(),
            std::hint::black_box(right_slice).par_iter(),
        )
            .zip_eq()
            .with_thread_pool(crate::split_by_threads())
            .for_each(|(out, &a, &b)| *out = a + b);
        assert_eq!(output, expected_output);
    }

    /// Parallel sum of `0..10_000` with `split_by_threads`.
    #[test]
    fn test_sum() {
        let len = 10_000;
        let input = (0..len as u64).collect::<Vec<u64>>();
        let input_slice = input.as_slice();
        let result = input_slice
            .par_iter()
            .with_thread_pool(crate::split_by_threads())
            .sum::<u64>();
        // 0 + 1 + ... + 9_999 = 10_000 * 9_999 / 2.
        assert_eq!(result, 49_995_000);
    }

    /// Zipping a mutable slice with a range under `split_per_item`.
    #[test]
    fn test_for_each() {
        let mut result = [0; 5];
        (result.par_iter_mut(), (1..=5).into_par_iter())
            .zip_eq()
            .with_thread_pool(crate::split_per_item())
            .for_each(|(out, x)| *out = x * x - 1);
        assert_eq!([0, 3, 8, 15, 24], result);
    }

    /// Spawns and joins many owned tasks in a row.
    #[test]
    fn test_execute_many() {
        let first_task = crate::spawn_owned(|| 2);
        let second_task = crate::spawn_owned(|| 2);
        assert_eq!(first_task.join(), 2);
        assert_eq!(second_task.join(), 2);
        for _ in 0..1000 {
            let third_task = crate::spawn_owned(|| std::thread::sleep(Duration::from_nanos(10)));
            let fourth_task = crate::spawn_owned(|| std::thread::sleep(Duration::from_nanos(200)));
            // Both tasks yield `()`; joining only checks that they complete.
            third_task.join();
            fourth_task.join();
        }
    }

    /// Two independent OS threads each run parallel pipelines concurrently.
    #[test]
    fn test_two_roots() {
        let for_each = || {
            let expected = (0..500).map(|x| x * x + 1).collect::<Vec<usize>>();
            for _ in 0..10 {
                let mut result = [0; 500];
                result
                    .par_iter_mut()
                    .enumerate()
                    .with_thread_pool(crate::split_per_item())
                    .for_each(|(x, out)| *out = x * x + 1);
                assert_eq!(result.as_slice(), expected.as_slice());
            }
        };
        let a = std::thread::spawn(for_each);
        let b = std::thread::spawn(for_each);
        join_or_propagate(a);
        join_or_propagate(b);
    }

    /// Many OS threads run small pipelines concurrently. One item per pipeline
    /// sleeps, so the pipelines overlap in time.
    #[test]
    fn test_many_roots() {
        let for_each = || {
            let expected = (0..5).map(|x| x * x + 1).collect::<Vec<usize>>();
            for _ in 0..10 {
                let mut result = [0; 5];
                result
                    .par_iter_mut()
                    .enumerate()
                    .with_thread_pool(crate::split_per_item())
                    .for_each(|(x, out)| {
                        if x == 0 {
                            std::thread::sleep(Duration::from_millis(20));
                        }
                        *out = x * x + 1;
                    });
                assert_eq!(result.as_slice(), expected.as_slice());
            }
        };
        let handles = (0..16)
            .map(|_| std::thread::spawn(for_each))
            .collect::<Vec<_>>();
        for handle in handles {
            join_or_propagate(handle);
        }
    }
}