use crate::{
ExactSize, Size, UnknownSize, chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue,
};
use core::{marker::PhantomData, sync::atomic::Ordering};
use orx_concurrent_iter::{ConcurrentIter, ExactSizeConcurrentIter};
use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
use orx_split_vec::SplitVec;
/// A [`ConcurrentRecursiveIterCore`] whose size marker is [`UnknownSize`]; that is,
/// a recursive concurrent iterator created without a declared total length.
///
/// Generic parameters:
/// * `T`: element type yielded by the iterator.
/// * `E`: the `extend` function, `Fn(&T) -> I`, called on each yielded element.
/// * `I`: the collection returned by `extend`; its items are added to the queue.
/// * `P`: the concurrent pinned vector backing the queue; defaults to `DefaultConPinnedVec<T>`.
pub type ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>> =
ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>;
/// A [`ConcurrentRecursiveIterCore`] whose size marker is [`ExactSize`]; that is,
/// a recursive concurrent iterator created with a declared total length.
///
/// Generic parameters:
/// * `T`: element type yielded by the iterator.
/// * `E`: the `extend` function, `Fn(&T) -> I`, called on each yielded element.
/// * `I`: the collection returned by `extend`; its items are added to the queue.
/// * `P`: the concurrent pinned vector backing the queue; defaults to `DefaultConPinnedVec<T>`.
pub type ConcurrentRecursiveIterExact<T, E, I, P = DefaultConPinnedVec<T>> =
ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>;
/// Core type behind the recursive concurrent iterators.
///
/// It owns a concurrent queue of pending elements and an `extend` function.
/// Each time an element is taken from the queue, `extend` is called with a
/// reference to it and the items it returns are appended to the queue, so
/// the iteration grows dynamically while it is being consumed.
///
/// The size marker `S` selects whether a total length was declared up front
/// (see the `ConcurrentRecursiveIter` and `ConcurrentRecursiveIterExact` aliases).
pub struct ConcurrentRecursiveIterCore<S, T, E, I, P = DefaultConPinnedVec<T>>
where
S: Size,
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
P: ConcurrentPinnedVec<T>,
<P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
// Pending elements; popped by consumers and extended with the output of `extend`.
queue: ConcurrentQueue<T, P>,
// Function producing the follow-up elements of a yielded element.
extend: E,
// Size marker value; `exact_len()` on it yields the declared total length, if any.
exact_len: S,
// Marker for the size type `S`.
p: PhantomData<S>,
}
/// Conversion from an `(extend, queue)` pair into a recursive iterator of unknown size.
impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)>
    for ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>
where
    T: Send,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
    /// Wraps the given queue of initial elements together with the `extend`
    /// function; no total length is recorded.
    fn from(parts: (E, ConcurrentQueue<T, P>)) -> Self {
        let (extend, queue) = parts;
        Self {
            queue,
            extend,
            exact_len: UnknownSize,
            p: PhantomData,
        }
    }
}
impl<T, E, I> ConcurrentRecursiveIterCore<UnknownSize, T, E, I, DefaultConPinnedVec<T>>
where
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
{
pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
vec.extend(initial_elements);
let queue = vec.into();
(extend, queue).into()
}
}
/// Conversion from an `(extend, queue, exact_len)` triple into a recursive
/// iterator with a declared total length.
impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>, usize)>
    for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
where
    T: Send,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
    /// Wraps the given queue of initial elements together with the `extend`
    /// function and records `exact_len` as the declared length.
    fn from(parts: (E, ConcurrentQueue<T, P>, usize)) -> Self {
        let (extend, queue, exact_len) = parts;
        Self {
            queue,
            extend,
            exact_len: ExactSize(exact_len),
            p: PhantomData,
        }
    }
}
impl<T, E, I> ConcurrentRecursiveIterCore<ExactSize, T, E, I, DefaultConPinnedVec<T>>
where
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
{
pub fn new_exact(
extend: E,
initial_elements: impl IntoIterator<Item = T>,
exact_len: usize,
) -> Self {
let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
vec.extend(initial_elements);
let queue = vec.into();
(extend, queue, exact_len).into()
}
}
/// [`ConcurrentIter`] implementation: elements are popped from the internal
/// queue, and every popped element's children (as produced by `extend`) are
/// pushed back to the queue before the element is returned.
impl<S, T, E, I, P> ConcurrentIter for ConcurrentRecursiveIterCore<S, T, E, I, P>
where
    S: Size,
    T: Send,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
    type Item = T;

    type SequentialIter = DynSeqQueue<T, P, E, I>;

    type ChunkPuller<'i>
        = DynChunkPuller<'i, T, E, I, P>
    where
        Self: 'i;

    /// Consumes the iterator and converts it into its sequential counterpart,
    /// handing over the queue's storage, its written/popped counters and the
    /// `extend` function.
    fn into_seq_iter(self) -> Self::SequentialIter {
        // SAFETY (assumed; confirm against the contract of `ConcurrentQueue::destruct`):
        // `self` is taken by value, so no other reference to the queue can be alive,
        // and the returned parts are moved directly into the sequential iterator.
        let (vec, written, popped) = unsafe { self.queue.destruct() };
        DynSeqQueue::new(vec, written, popped, self.extend)
    }

    /// Skips the pending elements: reads the number of write-reserved
    /// positions and pulls that many elements from the queue, dropping
    /// whatever is pulled. `extend` is not called on the skipped elements.
    fn skip_to_end(&self) {
        let len = self.queue.num_write_reserved(Ordering::Acquire);
        let _remaining_to_drop = self.queue.pull(len);
    }

    /// Pops the next element, appends its children (`extend(&element)`) to the
    /// queue and returns the element; returns `None` when the pop yields nothing.
    fn next(&self) -> Option<Self::Item> {
        let n = self.queue.pop()?;
        // Children are enqueued before returning so that other consumers can pick them up.
        let children = (self.extend)(&n);
        self.queue.extend(children);
        Some(n)
    }

    /// Same as [`next`](Self::next), additionally returning the index reported
    /// by the queue for the popped element.
    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
        let (idx, n) = self.queue.pop_with_idx()?;
        let children = (self.extend)(&n);
        self.queue.extend(children);
        Some((idx, n))
    }

    /// Returns bounds on the number of remaining elements.
    ///
    /// * With a declared exact length, both bounds equal the declared length
    ///   minus the number of popped elements, saturating at zero.
    /// * Otherwise the lower bound is the current queue length; the upper bound
    ///   is `Some(0)` when the queue is empty and `None` (unknown) when it is not.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self.exact_len.exact_len() {
            Some(exact_len) => {
                let popped = self.queue.num_popped(Ordering::Relaxed);
                // Saturate rather than underflow: the declared length is supplied by the
                // caller and is not validated, so the popped counter may exceed it. A plain
                // subtraction would panic in debug builds and wrap around in release builds.
                let remaining = exact_len.saturating_sub(popped);
                (remaining, Some(remaining))
            }
            None => {
                let min = self.queue.len();
                match min {
                    0 => (0, Some(0)),
                    n => (n, None),
                }
            }
        }
    }

    /// Reports whether the iterator is to be considered completed once `None`
    /// has been observed: true when the popped counter has reached (or passed)
    /// the write-reserved counter.
    fn is_completed_when_none_returned(&self) -> bool {
        let popped = self.queue.num_popped(Ordering::Relaxed);
        let write_reserved = self.queue.num_write_reserved(Ordering::Relaxed);
        popped >= write_reserved
    }

    /// Creates a chunk puller over this iterator's queue and `extend` function
    /// with the given `chunk_size`.
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
        DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
    }
}
/// [`ExactSizeConcurrentIter`] implementation, available only when a total
/// length was declared at construction (size marker [`ExactSize`]).
impl<T, E, I, P> ExactSizeConcurrentIter for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
where
    T: Send,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
    /// Returns the number of remaining elements: the declared exact length
    /// minus the number of elements popped so far, saturating at zero.
    fn len(&self) -> usize {
        // Saturate rather than underflow: the declared length is caller-supplied and is not
        // validated, so the popped counter may exceed it. A plain subtraction would panic
        // in debug builds and wrap around to a huge value in release builds.
        self.exact_len
            .0
            .saturating_sub(self.queue.num_popped(Ordering::Relaxed))
    }
}