// orx-concurrent-recursive-iter 2.0.0
//
// A concurrent iterator that can be extended recursively by each of its items.
// Documentation
use core::{iter::FusedIterator, marker::PhantomData};
use orx_pinned_vec::ConcurrentPinnedVec;

/// A sequential (single-owner, `&mut self`) queue stored in a pinned vector whose
/// contents grow while it is being iterated.
///
/// Each time an item is taken from the queue, the `extend` closure is called with a
/// reference to that item and the items it returns are appended to the back of the
/// queue, so they are yielded later by the same iterator.
///
/// Type parameters:
/// * `T`: type of the items in the queue.
/// * `P`: pinned vector used as the backing storage.
/// * `E`: closure mapping a reference to an item to the collection of its children.
/// * `I`: collection returned by `E`; its iterator must report its length up front.
pub struct DynSeqQueue<T, P, E, I>
where
    P: ConcurrentPinnedVec<T>,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
    // Backing storage; slots are accessed through raw pointers by index.
    vec: P,
    // Index at which the next appended item is stored; new children are written
    // starting from this position.
    written: usize,
    // Read cursor: index of the slot that the next call to `next` inspects.
    popped: usize,
    // Closure producing the children of an item that has just been taken out.
    extend: E,
    // Marker tying the otherwise unused item type `T` to the struct.
    phantom: PhantomData<T>,
}

impl<T, P, E, I> DynSeqQueue<T, P, E, I>
where
    P: ConcurrentPinnedVec<T>,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
    /// Creates a queue over the given storage.
    ///
    /// * `vec`: pinned vector holding the items.
    /// * `written`: index at which the next appended item will be stored
    ///   (presumably the slots below it already hold items; the caller is
    ///   responsible for that).
    /// * `popped`: index of the first slot that `next` will read.
    /// * `extend`: closure returning the children of an item.
    pub(super) fn new(vec: P, written: usize, popped: usize, extend: E) -> Self {
        Self {
            vec,
            written,
            popped,
            extend,
            phantom: PhantomData,
        }
    }

    /// Returns a raw pointer to the slot at position `idx` of the underlying vector.
    ///
    /// # Safety
    ///
    /// This simply forwards to `get_ptr_mut` of the pinned vector, so the caller must
    /// uphold that method's contract (presumably `idx` must be within the current
    /// capacity; confirm against the `ConcurrentPinnedVec` documentation).
    #[inline(always)]
    unsafe fn ptr(&self, idx: usize) -> *mut T {
        unsafe { self.vec.get_ptr_mut(idx) }
    }

    /// Asserts that position `idx` is below the maximum capacity of the underlying vector.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is not smaller than `max_capacity()` of the vector.
    #[inline(always)]
    fn assert_has_capacity_for(&self, idx: usize) {
        assert!(
            idx < self.vec.max_capacity(),
            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
        );
    }

    /// Asks the underlying vector to grow to `new_capacity`.
    ///
    /// # Panics
    ///
    /// Panics if the vector reports that it failed to grow.
    fn grow_to(&self, new_capacity: usize) {
        _ = self
            .vec
            .grow_to(new_capacity)
            .expect("The underlying pinned vector reached its capacity and failed to grow");
    }

    /// Appends `values` to the back of the queue.
    ///
    /// Room is reserved for as many items as the iterator reports through `len()`,
    /// but `written` is advanced one slot at a time, only after the slot has actually
    /// been initialized. Therefore, `written` never covers an uninitialized slot, even
    /// if the capacity check or the growth panics, the iterator panics midway, or the
    /// iterator yields fewer items than it reported.
    ///
    /// # Panics
    ///
    /// Panics if the new length overflows `usize`, exceeds the maximum capacity of the
    /// underlying vector, or if the vector fails to grow.
    fn extend(&mut self, values: I) {
        let values = values.into_iter();
        let num_items = values.len();

        if num_items == 0 {
            return;
        }

        let begin_idx = self.written;
        let end_idx = begin_idx
            .checked_add(num_items)
            .expect("Number of items written to the queue overflows usize");

        // `num_items > 0`, hence `end_idx >= 1` and the subtraction cannot underflow.
        self.assert_has_capacity_for(end_idx - 1);

        if end_idx > self.vec.capacity() {
            self.grow_to(end_idx);
        }

        // SAFETY: `begin_idx..end_idx` lies within the capacity ensured just above
        // (assuming this is what `ptr_iter_unchecked` requires) and these slots are
        // at or beyond `written`, so no live value is overwritten.
        let slots = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
        for (slot, value) in slots.zip(values) {
            // SAFETY: `slot` points to a reserved, currently unoccupied position.
            unsafe { slot.write(value) };
            // Count the slot only once it holds a value.
            self.written += 1;
        }
    }
}

impl<T, P, E, I> Iterator for DynSeqQueue<T, P, E, I>
where
    P: ConcurrentPinnedVec<T>,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
    type Item = T;

    /// Takes the item at the front of the queue, appends its children (as computed by
    /// the `extend` closure) to the back of the queue and returns the item.
    ///
    /// Returns `None` when every written item has already been taken. In that case the
    /// read cursor is left untouched, so it never runs past the number of written items.
    fn next(&mut self) -> Option<Self::Item> {
        let idx = self.popped;
        if idx >= self.written {
            return None;
        }

        // Advance the cursor before running any user code: if the `extend` closure or
        // its iterator panics, the slot at `idx` is already marked as taken and will
        // not be read a second time.
        self.popped = idx + 1;

        // SAFETY: `idx < written`, so the slot has been written and, since the cursor
        // only moves forward, its value has not been moved out before.
        let n = unsafe { self.ptr(idx).read() };
        let children = (self.extend)(&n);
        self.extend(children);
        Some(n)
    }

    /// Returns the number of items currently waiting in the queue as the lower bound.
    ///
    /// There is no upper bound since each yielded item may append further items.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Saturating: never underflows even if the cursor is at or past `written`.
        let min = self.written.saturating_sub(self.popped);
        (min, None)
    }
}

// New items are appended to the queue only from within `next`, and only while it is
// yielding an item. Once `next` has returned `None`, nothing can add items anymore,
// so every subsequent call keeps returning `None`, which is the `FusedIterator` guarantee.
impl<T, P, E, I> FusedIterator for DynSeqQueue<T, P, E, I>
where
    P: ConcurrentPinnedVec<T>,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
}