use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Mutex,
};
use crate::{
current_num_threads, current_thread_index,
iter::{
plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer},
ParallelIterator,
},
};
/// Conversion of a value into an [`IterBridge`].
///
/// A blanket implementation in this file covers every `Iterator + Send`
/// whose items are `Send`, and `IterBridge` implements `ParallelIterator`
/// for such iterators, so calling `par_bridge()` on one of them yields a
/// parallel iterator over the same items.
pub trait ParallelBridge: Sized {
/// Consumes `self` and returns it wrapped in an [`IterBridge`].
fn par_bridge(self) -> IterBridge<Self>;
}
// Blanket implementation: any `Send` iterator that yields `Send` items can be
// bridged.
impl<T> ParallelBridge for T
where
    T: Iterator + Send,
    T::Item: Send,
{
    /// Moves the iterator into an [`IterBridge`]; no items are pulled here.
    fn par_bridge(self) -> IterBridge<Self> {
        let iter = self;
        IterBridge { iter }
    }
}
/// Wrapper that holds a sequential iterator so it can be driven as a
/// `ParallelIterator`.
///
/// Values of this type are produced by [`ParallelBridge::par_bridge`].
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
/// The wrapped sequential iterator; it is fused and placed behind a mutex
/// when the bridge is driven.
iter: Iter,
}
impl<Iter> ParallelIterator for IterBridge<Iter>
where
    Iter: Iterator + Send,
    Iter::Item: Send,
{
    type Item = Iter::Item;

    /// Drives `consumer` with the items of the wrapped iterator.
    ///
    /// The iterator is fused and shared behind a mutex through an
    /// `IterParallelProducer`. The producer's split budget and the number of
    /// per-thread "started" flags are both taken from `current_num_threads()`.
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let pool_size = current_num_threads();

        // One flag per thread index, all initially unset.
        let started_flags: Vec<AtomicBool> = std::iter::repeat_with(|| AtomicBool::new(false))
            .take(pool_size)
            .collect();

        let producer = IterParallelProducer {
            split_count: AtomicUsize::new(pool_size),
            iter: Mutex::new(self.iter.fuse()),
            threads_started: &started_flags,
        };

        // The producer is shared by reference: `UnindexedProducer` is
        // implemented for `&IterParallelProducer`.
        bridge_unindexed(&producer, consumer)
    }
}
/// Shared state behind the `UnindexedProducer` implementation for
/// `&IterParallelProducer`; every split hands out another reference to the
/// same value.
struct IterParallelProducer<'a, Iter> {
/// Number of further splits still allowed. It starts at the thread count
/// and `split` decrements it, refusing to split once it would go below zero.
split_count: AtomicUsize,
/// The fused source iterator; `fold_with` locks it to pull each item.
iter: Mutex<std::iter::Fuse<Iter>>,
/// One flag per thread index, set the first time that thread enters
/// `fold_with`; a thread that finds its flag already set returns at once.
threads_started: &'a [AtomicBool],
}
impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
    type Item = Iter::Item;

    /// Offers a second handle to the same shared producer while the split
    /// budget lasts.
    ///
    /// Each successful split decrements `split_count`; once it is zero the
    /// second element of the returned pair is `None`.
    fn split(self) -> (Self, Option<Self>) {
        let budget_left = self
            .split_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |remaining| {
                remaining.checked_sub(1)
            })
            .is_ok();

        let sibling = if budget_left { Some(self) } else { None };
        (self, sibling)
    }

    /// Feeds items from the shared iterator into `folder` until the iterator
    /// is exhausted, the folder reports it is full, or the mutex is poisoned.
    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        // A thread that has already entered this function for this producer
        // returns the folder untouched.
        // NOTE(review): presumably this keeps a re-entrant call on the same
        // thread from blocking on the mutex below — confirm.
        if let Some(index) = current_thread_index() {
            let slot = index % self.threads_started.len();
            let already_entered = self.threads_started[slot].swap(true, Ordering::Relaxed);
            if already_entered {
                return folder;
            }
        }

        loop {
            // The guard lives only for this `match` arm, so the lock is held
            // while calling `next()` and released before `consume` runs.
            let pulled = match self.iter.lock() {
                Ok(mut source) => source.next(),
                // Poisoned lock: stop folding and hand back what we have.
                Err(_) => return folder,
            };

            match pulled {
                Some(item) => {
                    folder = folder.consume(item);
                    if folder.full() {
                        return folder;
                    }
                }
                None => return folder,
            }
        }
    }
}