orx-concurrent-recursive-iter 2.0.0

A concurrent iterator that can be extended recursively by each of its items.
Documentation
use crate::{chunk::DynChunk, queue::Queue};
use orx_concurrent_iter::ChunkPuller;
use orx_concurrent_queue::ConcurrentQueue;
use orx_pinned_vec::ConcurrentPinnedVec;

pub struct DynChunkPuller<'a, T, E, P>
where
    T: Send,
    E: Fn(&T, &Queue<T, P>) + Sync,
    P: ConcurrentPinnedVec<T>,
{
    extend: &'a E,
    queue: &'a ConcurrentQueue<T, P>,
    chunk_size: usize,
}

impl<'a, T, E, P> DynChunkPuller<'a, T, E, P>
where
    T: Send,
    E: Fn(&T, &Queue<T, P>) + Sync,
    P: ConcurrentPinnedVec<T>,
{
    pub(super) fn new(extend: &'a E, queue: &'a ConcurrentQueue<T, P>, chunk_size: usize) -> Self {
        Self {
            extend,
            queue,
            chunk_size,
        }
    }
}

impl<'a, T, E, P> ChunkPuller for DynChunkPuller<'a, T, E, P>
where
    T: Send,
    E: Fn(&T, &Queue<T, P>) + Sync,
    P: ConcurrentPinnedVec<T>,
{
    type ChunkItem = T;

    type Chunk<'c>
        = DynChunk<'c, T, E, P>
    where
        Self: 'c;

    #[inline(always)]
    fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    fn pull(&mut self) -> Option<Self::Chunk<'_>> {
        let chunk = self.queue.pull(self.chunk_size)?;
        Some(DynChunk::new(chunk, self.extend, self.queue.into()))
    }

    fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)> {
        let (begin_idx, chunk) = self.queue.pull_with_idx(self.chunk_size)?;
        Some((
            begin_idx,
            DynChunk::new(chunk, self.extend, self.queue.into()),
        ))
    }
}