pub trait ParMapFold: Iterator
where
Self::Item: Send,
{
#[inline(always)]
fn par_map_fold<
R: Send + Default,
M: Fn(Self::Item) -> R + Send + Sync,
F: Fn(R, R) -> R + Sync,
>(
&mut self,
map: M,
fold: F,
) -> R {
self.par_map_fold_with((), |_, i| map(i), fold)
}
#[inline(always)]
fn par_map_fold_with<
T: Clone + Send,
R: Send + Default,
M: Fn(&mut T, Self::Item) -> R + Send + Sync,
F: Fn(R, R) -> R + Sync,
>(
&mut self,
init: T,
map: M,
fold: F,
) -> R {
self.par_map_fold2_with(init, map, &fold, &fold)
}
#[inline(always)]
fn par_map_fold2<
R,
M: Fn(Self::Item) -> R + Send + Sync,
A: Send + Default,
IF: Fn(A, R) -> A + Sync,
OF: Fn(A, A) -> A,
>(
&mut self,
map: M,
inner_fold: IF,
outer_fold: OF,
) -> A {
self.par_map_fold2_with((), |_, i| map(i), inner_fold, outer_fold)
}
fn par_map_fold2_with<
T: Clone + Send,
R,
M: Fn(&mut T, Self::Item) -> R + Send + Sync,
A: Send + Default,
IF: Fn(A, R) -> A + Sync,
OF: Fn(A, A) -> A,
>(
&mut self,
init: T,
map: M,
inner_fold: IF,
outer_fold: OF,
) -> A {
let (_min_len, max_len) = self.size_hint();
let mut num_scoped_threads = rayon::current_num_threads();
if let Some(max_len) = max_len {
num_scoped_threads = num_scoped_threads.min(max_len);
}
num_scoped_threads = num_scoped_threads.max(1);
let (out_tx, out_rx) = crossbeam_channel::bounded(num_scoped_threads);
let (in_tx, in_rx) = crossbeam_channel::bounded(2 * num_scoped_threads);
rayon::in_place_scope(|scope| {
for _thread_id in 0..num_scoped_threads {
let mut init = init.clone();
let map = ↦
let inner_fold = &inner_fold;
let out_tx = out_tx.clone();
let in_rx = in_rx.clone();
scope.spawn(move |_| {
let mut res = A::default();
loop {
match in_rx.recv() {
Ok(val) => {
res = inner_fold(res, map(&mut init, val));
}
Err(_e) => {
out_tx.send(res).unwrap();
break;
}
}
}
});
}
drop(out_tx);
drop(in_rx);
for val in self {
in_tx.send(val).unwrap();
}
drop(in_tx); out_rx.into_rayon_iter().fold(A::default(), outer_fold)
})
}
}
impl<I: Iterator> ParMapFold for I where I::Item: Send {}
#[doc(hidden)]
#[derive(Debug)]
pub struct RayonChannelIter<T> {
channel: crossbeam_channel::Receiver<T>,
}
impl<T> Iterator for RayonChannelIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.channel.try_recv() {
Ok(item) => return Some(item),
Err(crossbeam_channel::TryRecvError::Disconnected) => return None,
Err(crossbeam_channel::TryRecvError::Empty) => {
rayon::yield_now();
}
}
}
}
}
pub trait RayonChannelIterExt<T>: Sized {
fn into_rayon_iter(self) -> RayonChannelIter<T>;
}
impl<T> RayonChannelIterExt<T> for crossbeam_channel::Receiver<T> {
fn into_rayon_iter(self) -> RayonChannelIter<T> {
RayonChannelIter { channel: self }
}
}