use super::range::{Range, SkipIterator};
use crate::iter::{Accumulator, SourceCleanup};
use crossbeam_utils::CachePadded;
use std::ops::ControlFlow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
pub trait Pipeline<R: Range> {
fn run(&self, worker_id: usize, range: &R);
}
pub struct UpperBoundedPipelineImpl<
'a,
Output,
Accum,
Init: Fn() -> Accum,
ProcessItem: Fn(Accum, usize) -> ControlFlow<Accum, Accum>,
Finalize: Fn(Accum) -> Output,
Cleanup: SourceCleanup,
> {
pub bound: CachePadded<AtomicUsize>,
pub outputs: Arc<[Mutex<Option<Output>>]>,
pub init: Init,
pub process_item: ProcessItem,
pub finalize: Finalize,
pub cleanup: &'a Cleanup,
}
impl<R, Output, Accum, Init, ProcessItem, Finalize, Cleanup> Pipeline<R>
for UpperBoundedPipelineImpl<'_, Output, Accum, Init, ProcessItem, Finalize, Cleanup>
where
R: Range,
Init: Fn() -> Accum,
ProcessItem: Fn(Accum, usize) -> ControlFlow<Accum, Accum>,
Finalize: Fn(Accum) -> Output,
Cleanup: SourceCleanup,
{
fn run(&self, worker_id: usize, range: &R) {
let mut accumulator = (self.init)();
let iter = SkipIteratorWrapper {
iter: range.upper_bounded_iter(&self.bound),
cleanup: self.cleanup,
};
for i in iter {
let acc = (self.process_item)(accumulator, i);
accumulator = match acc {
ControlFlow::Continue(acc) => acc,
ControlFlow::Break(acc) => {
self.bound.fetch_min(i, Ordering::Relaxed);
acc
}
};
}
let output = (self.finalize)(accumulator);
*self.outputs[worker_id].lock().unwrap() = Some(output);
}
}
pub struct IterPipelineImpl<'a, Output, Accum: Accumulator<usize, Output>, Cleanup: SourceCleanup> {
pub outputs: Arc<[Mutex<Option<Output>>]>,
pub accum: Accum,
pub cleanup: &'a Cleanup,
}
impl<R, Output, Accum, Cleanup> Pipeline<R> for IterPipelineImpl<'_, Output, Accum, Cleanup>
where
R: Range,
Accum: Accumulator<usize, Output>,
Cleanup: SourceCleanup,
{
fn run(&self, worker_id: usize, range: &R) {
let iter = SkipIteratorWrapper {
iter: range.iter(),
cleanup: self.cleanup,
};
let output = self.accum.accumulate(iter);
*self.outputs[worker_id].lock().unwrap() = Some(output);
}
}
struct SkipIteratorWrapper<'a, I: SkipIterator, Cleanup: SourceCleanup> {
iter: I,
cleanup: &'a Cleanup,
}
impl<I: SkipIterator, Cleanup: SourceCleanup> Iterator for SkipIteratorWrapper<'_, I, Cleanup> {
type Item = usize;
#[inline(always)]
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.iter.next() {
(index, None) => return index,
(index, Some(skipped_range)) => {
unsafe {
self.cleanup.cleanup_item_range(skipped_range);
}
if index.is_some() {
return index;
}
}
}
}
}
}
impl<I: SkipIterator, Cleanup: SourceCleanup> Drop for SkipIteratorWrapper<'_, I, Cleanup> {
fn drop(&mut self) {
if let Some(range) = self.iter.remaining_range() {
unsafe {
self.cleanup.cleanup_item_range(range);
}
}
}
}