use crate::{
concurrent_iter::ConcurrentIter,
exact_size_concurrent_iter::ExactSizeConcurrentIter,
implementations::{
array_utils::{ArrayChunkPuller, ArrayConIter, ArrayIntoSeqIter, ChunkPointers},
ptr_utils::take,
},
};
use alloc::vec::Vec;
use core::{
mem::ManuallyDrop,
sync::atomic::{AtomicUsize, Ordering},
};
/// Concurrent iterator that takes over the buffer of a [`Vec`] and hands out its
/// elements by value.
///
/// The vector is decomposed into its raw parts on construction; positions are claimed
/// by advancing an atomic counter, so the iterator can be pulled through a shared
/// reference.
pub struct ConIterVec<T> {
/// Pointer to the first element of the vector's buffer.
/// Set to null once the remaining elements have been moved into a sequential iterator.
ptr: *const T,
/// Length of the vector at construction time.
vec_len: usize,
/// Capacity of the vector at construction time; passed along with `ptr` when the
/// allocation is handed over to be released.
vec_cap: usize,
/// Number of positions claimed so far. It may exceed `vec_len`, which is why readers
/// clamp it with `min` / `saturating_sub`.
counter: AtomicUsize,
}
// SAFETY: the raw pointer field suppresses the auto impl. Through `&self` an element is
// only read after its index has been claimed from the atomic `counter`, and it is then
// moved out to the calling thread, hence the `T: Send` requirement.
unsafe impl<T: Send> Sync for ConIterVec<T> {}
impl<T> Default for ConIterVec<T> {
    /// Creates a concurrent iterator over an empty vector.
    fn default() -> Self {
        let empty: Vec<T> = Vec::new();
        Self::new(empty)
    }
}
impl<T> Drop for ConIterVec<T> {
    /// Converts whatever is still held into a sequential iterator and drops it.
    ///
    /// That iterator is created with the allocation pointer and capacity, so it is
    /// presumably the one releasing the not-yet-yielded elements and the buffer
    /// (see `ArrayIntoSeqIter`). If the data was already moved out, `ptr` is null and
    /// an empty default iterator is dropped instead.
    fn drop(&mut self) {
        drop(self.remaining_into_seq_iter());
    }
}
impl<T> ConIterVec<T> {
pub(super) fn new(vec: Vec<T>) -> Self {
let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
let _ = ManuallyDrop::new(vec);
Self {
ptr,
vec_len,
vec_cap,
counter: 0.into(),
}
}
pub(super) fn initial_len(&self) -> usize {
self.vec_len
}
fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
match begin_idx < self.vec_len {
true => Some(begin_idx),
_ => None,
}
}
fn remaining_into_seq_iter(&mut self) -> ArrayIntoSeqIter<T, ()> {
match self.ptr.is_null() {
true => Default::default(),
false => {
let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
let iter = self.slice_into_seq_iter(num_taken, true);
self.ptr = core::ptr::null();
iter
}
}
}
fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> ArrayIntoSeqIter<T, ()> {
let completed = num_taken == self.vec_len;
let (last, current) = match completed {
true => (core::ptr::null(), core::ptr::null()),
false => {
let last = unsafe { self.ptr.add(self.vec_len - 1) };
let current = unsafe { self.ptr.add(num_taken) };
(last, current)
}
};
let allocation_to_drop = drop_vec.then_some((self.ptr, self.vec_cap));
ArrayIntoSeqIter::new(current, last, allocation_to_drop, ())
}
}
impl<T> ArrayConIter for ConIterVec<T> {
    type Item = T;

    /// Claims up to `chunk_size` consecutive positions and returns the begin index with
    /// pointers to the first and last claimed elements, or `None` once the claimed
    /// begin index is past the end of the vector.
    fn progress_and_get_chunk_pointers(
        &self,
        chunk_size: usize,
    ) -> Option<ChunkPointers<Self::Item>> {
        let begin_idx = self.progress_and_get_begin_idx(chunk_size)?;

        // The final chunk may be shorter than requested; clamp its end to the length.
        let unclamped_end = begin_idx + chunk_size;
        let end_idx = unclamped_end.min(self.vec_len).max(begin_idx);

        // SAFETY: `begin_idx < vec_len` is guaranteed by the `Some` above, and
        // `end_idx <= vec_len`, so both offsets are within the buffer.
        // NOTE(review): `end_idx - 1` assumes `chunk_size >= 1` and that the addition
        // above does not overflow — confirm against the chunk puller.
        let first = unsafe { self.ptr.add(begin_idx) };
        let last = unsafe { self.ptr.add(end_idx - 1) };

        Some(ChunkPointers {
            begin_idx,
            first,
            last,
        })
    }
}
impl<T> ConcurrentIter for ConIterVec<T>
where
    T: Send,
{
    type Item = T;

    type SequentialIter = ArrayIntoSeqIter<T, ()>;

    type ChunkPuller<'i>
        = ArrayChunkPuller<'i, Self>
    where
        Self: 'i;

    /// Consumes the concurrent iterator and returns a sequential iterator over the
    /// elements that have not been claimed yet.
    fn into_seq_iter(mut self) -> Self::SequentialIter {
        self.remaining_into_seq_iter()
    }

    /// Marks every position as claimed so that later pulls return `None`.
    ///
    /// The elements that had not been claimed before this call are wrapped in a
    /// sequential iterator that does not own the allocation, and that iterator is
    /// dropped right away (presumably dropping those elements — see `ArrayIntoSeqIter`).
    fn skip_to_end(&self) {
        let previous = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
        let taken_before = previous.min(self.vec_len);
        drop(self.slice_into_seq_iter(taken_before, false));
    }

    /// Claims the next position and returns its element, or `None` when exhausted.
    fn next(&self) -> Option<Self::Item> {
        let idx = self.progress_and_get_begin_idx(1)?;
        // SAFETY: `idx < vec_len`, so the offset pointer is within the buffer.
        Some(unsafe { take(self.ptr.add(idx) as *mut T) })
    }

    /// Same as `next`, additionally returning the index of the element.
    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
        let idx = self.progress_and_get_begin_idx(1)?;
        // SAFETY: `idx < vec_len`, so the offset pointer is within the buffer.
        let value = unsafe { take(self.ptr.add(idx) as *mut T) };
        Some((idx, value))
    }

    /// Returns the number of unclaimed positions as both lower and upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // The counter can overshoot the length, hence the saturating subtraction.
        let claimed = self.counter.load(Ordering::Acquire);
        let left = self.vec_len.saturating_sub(claimed);
        (left, Some(left))
    }

    /// Always `true` for this iterator.
    fn is_completed_when_none_returned(&self) -> bool {
        true
    }

    /// Creates a chunk puller over this iterator with the given `chunk_size`.
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
        Self::ChunkPuller::new(self, chunk_size)
    }
}
impl<T> ExactSizeConcurrentIter for ConIterVec<T>
where
    T: Send,
{
    /// Returns the number of positions that have not been claimed yet.
    ///
    /// The counter can overshoot the length, so the subtraction saturates at zero.
    fn len(&self) -> usize {
        let claimed = self.counter.load(Ordering::Acquire);
        self.vec_len.saturating_sub(claimed)
    }
}