use crate::ThreadExecutor;
use crate::generic_values::Values;
use crate::generic_values::runner_results::{Stop, ThreadCollectArbitrary};
use orx_concurrent_bag::ConcurrentBag;
use orx_concurrent_iter::{ChunkPuller, ConcurrentIter};
use orx_fixed_vec::IntoConcurrentPinnedVec;
#[cfg(test)]
pub fn m<C, I, O, M1, P>(
mut runner: C,
iter: &I,
shared_state: &C::SharedState,
map1: &M1,
bag: &ConcurrentBag<O, P>,
) where
C: ThreadExecutor,
I: ConcurrentIter,
M1: Fn(I::Item) -> O,
P: IntoConcurrentPinnedVec<O>,
O: Send,
{
let mut chunk_puller = iter.chunk_puller(0);
let mut item_puller = iter.item_puller();
loop {
let chunk_size = runner.next_chunk_size(shared_state, iter);
runner.begin_chunk(chunk_size);
match chunk_size {
0 | 1 => match item_puller.next() {
Some(value) => _ = bag.push(map1(value)),
None => {
if iter.is_completed_when_none_returned() {
break;
}
}
},
c => {
if c > chunk_puller.chunk_size() {
chunk_puller = iter.chunk_puller(c);
}
match chunk_puller.pull() {
Some(chunk) => _ = bag.extend(chunk.map(&map1)),
None => {
if iter.is_completed_when_none_returned() {
break;
}
}
}
}
}
runner.complete_chunk(shared_state, chunk_size);
}
runner.complete_task(shared_state);
}
pub fn x<C, I, Vo, X1, P>(
mut runner: C,
iter: &I,
shared_state: &C::SharedState,
xap1: &X1,
bag: &ConcurrentBag<Vo::Item, P>,
) -> ThreadCollectArbitrary<Vo::Fallibility>
where
C: ThreadExecutor,
I: ConcurrentIter,
Vo: Values,
X1: Fn(I::Item) -> Vo,
P: IntoConcurrentPinnedVec<Vo::Item>,
Vo::Item: Send,
{
let mut chunk_puller = iter.chunk_puller(0);
let mut item_puller = iter.item_puller();
loop {
let chunk_size = runner.next_chunk_size(shared_state, iter);
runner.begin_chunk(chunk_size);
match chunk_size {
0 | 1 => match item_puller.next() {
Some(value) => {
let vo = xap1(value);
let done = vo.push_to_bag(bag);
if let Some(stop) = Vo::arbitrary_push_to_stop(done) {
iter.skip_to_end();
runner.complete_chunk(shared_state, chunk_size);
runner.complete_task(shared_state);
match stop {
Stop::DueToWhile => {
return ThreadCollectArbitrary::StoppedByWhileCondition;
}
Stop::DueToError { error } => {
return ThreadCollectArbitrary::StoppedByError { error };
}
}
}
}
None => {
if iter.is_completed_when_none_returned() {
break;
}
}
},
c => {
if c > chunk_puller.chunk_size() {
chunk_puller = iter.chunk_puller(c);
}
match chunk_puller.pull() {
Some(chunk) => {
for value in chunk {
let vo = xap1(value);
let done = vo.push_to_bag(bag);
if let Some(stop) = Vo::arbitrary_push_to_stop(done) {
iter.skip_to_end();
runner.complete_chunk(shared_state, chunk_size);
runner.complete_task(shared_state);
match stop {
Stop::DueToWhile => {
return ThreadCollectArbitrary::StoppedByWhileCondition;
}
Stop::DueToError { error } => {
return ThreadCollectArbitrary::StoppedByError { error };
}
}
}
}
}
None => {
if iter.is_completed_when_none_returned() {
break;
}
}
}
}
}
runner.complete_chunk(shared_state, chunk_size);
}
runner.complete_task(shared_state);
ThreadCollectArbitrary::AllCollected
}