use rayon::iter::{ParallelIterator, plumbing::{UnindexedConsumer, Consumer, Folder, Producer, ProducerCallback}, IndexedParallelIterator, IntoParallelIterator};
use std::sync::{Arc,atomic::{AtomicUsize, Ordering}};
pub struct ProgressAdaptor<I> {
inner: I,
items_processed: Arc<AtomicUsize>,
}
#[derive(Clone)]
pub struct ItemsProcessed(Arc<AtomicUsize>);
struct ProgressConsumer<C> {
inner: C,
items_processed: Arc<AtomicUsize>,
}
struct ProgressFolder<F> {
inner: F,
items_processed: Arc<AtomicUsize>,
}
struct ProgressProducer<P> {
inner: P,
items_processed: Arc<AtomicUsize>,
}
struct ProgressIterator<I> {
inner: I,
items_processed: Arc<AtomicUsize>,
}
impl ProgressAdaptor<()> {
pub fn new<T>(iter: T) -> ProgressAdaptor<T::Iter> where T: IntoParallelIterator {
ProgressAdaptor {
inner: iter.into_par_iter(),
items_processed: Arc::new(AtomicUsize::new(0)),
}
}
}
impl<T> ProgressAdaptor<T> {
pub fn items_processed(&self) -> ItemsProcessed {
ItemsProcessed(self.items_processed.clone())
}
}
impl ItemsProcessed {
pub fn get(&self) -> usize {
self.0.load(Ordering::Relaxed)
}
}
impl<I> ParallelIterator for ProgressAdaptor<I> where I: ParallelIterator {
type Item=I::Item;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item> {
self.inner.drive_unindexed(ProgressConsumer {inner: consumer, items_processed: self.items_processed})
}
}
impl<I> IndexedParallelIterator for ProgressAdaptor<I> where I: IndexedParallelIterator {
fn len(&self) -> usize {
self.inner.len()
}
fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
self.inner.drive(ProgressConsumer {inner: consumer, items_processed: self.items_processed})
}
fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
struct ProgressCB<CB> {
inner: CB,
items_processed: Arc<AtomicUsize>,
}
impl<CB,T> ProducerCallback<T> for ProgressCB<CB> where CB: ProducerCallback<T> {
type Output=CB::Output;
fn callback<P>(self, producer: P) -> Self::Output where P: Producer<Item = T> {
self.inner.callback(ProgressProducer{inner: producer, items_processed: self.items_processed})
}
}
self.inner.with_producer(ProgressCB{inner: callback, items_processed: self.items_processed})
}
}
impl<C,I> UnindexedConsumer<I> for ProgressConsumer<C> where C: UnindexedConsumer<I> {
fn split_off_left(&self) -> Self {
Self {inner: self.inner.split_off_left(), items_processed: self.items_processed.clone()}
}
fn to_reducer(&self) -> Self::Reducer {
self.inner.to_reducer()
}
}
impl<C,I> Consumer<I> for ProgressConsumer<C> where C: Consumer<I> {
type Folder = ProgressFolder<C::Folder>;
type Reducer = C::Reducer;
type Result = C::Result;
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let ProgressConsumer {inner, items_processed: entries_processed} = self;
let (left, right, reducer) = inner.split_at(index);
(ProgressConsumer{inner:left, items_processed: entries_processed.clone()},
ProgressConsumer{inner:right, items_processed: entries_processed},
reducer)
}
fn into_folder(self) -> Self::Folder {
ProgressFolder {inner: self.inner.into_folder(), items_processed: self.items_processed}
}
fn full(&self) -> bool {
self.inner.full()
}
}
impl<F,I> Folder<I> for ProgressFolder<F> where F: Folder<I> {
type Result=F::Result;
fn consume(self, item: I) -> Self {
let Self{inner, items_processed} = self;
let inner = inner.consume(item);
items_processed.fetch_add(1, Ordering::Relaxed);
Self {inner, items_processed}
}
fn complete(self) -> Self::Result {
self.inner.complete()
}
fn full(&self) -> bool {
self.inner.full()
}
}
impl<P> Producer for ProgressProducer<P> where P: Producer {
type Item=P::Item;
type IntoIter=ProgressIterator<P::IntoIter>;
fn into_iter(self) -> Self::IntoIter {
ProgressIterator{inner: self.inner.into_iter(), items_processed: self.items_processed}
}
fn split_at(self, index: usize) -> (Self, Self) {
let Self{inner, items_processed}=self;
let (left,right) = inner.split_at(index);
(Self{inner: left, items_processed: items_processed.clone()}, Self{inner: right, items_processed})
}
fn min_len(&self) -> usize {
self.inner.min_len()
}
fn max_len(&self) -> usize {
self.inner.max_len()
}
fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
self.inner.fold_with(ProgressFolder{inner: folder, items_processed: self.items_processed}).inner
}
}
impl<I> Iterator for ProgressIterator<I> where I: Iterator {
type Item=I::Item;
fn next(&mut self) -> Option<Self::Item> {
let res = self.inner.next();
if res.is_some() {
self.items_processed.fetch_add(1, Ordering::Relaxed);
}
res
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<I> DoubleEndedIterator for ProgressIterator<I> where I: DoubleEndedIterator {
fn next_back(&mut self) -> Option<Self::Item> {
let res = self.inner.next_back();
if res.is_some() {
self.items_processed.fetch_add(1, Ordering::Relaxed);
}
res
}
}
impl<I> ExactSizeIterator for ProgressIterator<I> where I: ExactSizeIterator {
fn len(&self) -> usize {
self.inner.len()
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;
use super::*;
#[test]
fn test() {
let flag = Arc::new(AtomicBool::new(false));
let flag2= flag.clone();
let iter = ProgressAdaptor::new(0..1000);
let items_processed = iter.items_processed();
rayon::spawn(move || {
while items_processed.get() < 500 {
std::hint::spin_loop();
}
flag2.store(true, Ordering::Release);
});
let sum: u64 = iter.map(|x| {
if x >= 500 {
while !flag.load(Ordering::Acquire) {
std::hint::spin_loop();
}
}
x
}).sum();
assert_eq!(sum, 499500);
}
}