use crate::iter::{buffered::iter_x::BufferIterX, con_iter_x::ConcurrentIterX};
use std::{
cell::UnsafeCell,
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
};
pub struct ConIterOfIterX<T: Send + Sync, Iter>
where
Iter: Iterator<Item = T>,
{
pub(crate) iter: UnsafeCell<Iter>,
initial_len: Option<usize>,
counter: AtomicUsize,
is_mutating: AtomicU8,
}
type State = u8;
const AVAILABLE: State = 0;
const IS_MUTATING: State = 1;
const COMPLETED: State = 2;
impl<T: Send + Sync, Iter> ConIterOfIterX<T, Iter>
where
Iter: Iterator<Item = T>,
{
pub fn new(iter: Iter) -> Self {
let initial_len = match iter.size_hint() {
(_, None) => None,
(lower, Some(upper)) if lower == upper => Some(lower),
_ => None,
};
Self {
iter: iter.into(),
initial_len,
counter: 0.into(),
is_mutating: AVAILABLE.into(),
}
}
pub(crate) fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
match number_to_fetch {
0 => None,
_ => {
let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
loop {
match self.try_get_handle() {
Ok(()) => return Some(begin_idx),
Err(COMPLETED) => return None,
_ => {}
}
}
}
}
}
fn get(&self, _item_idx: usize) -> Option<T> {
loop {
match self.try_get_handle() {
Ok(()) => {
let next = unsafe { &mut *self.iter.get() }.next();
match next.is_some() {
true => self.release_handle(),
false => self.release_handle_complete(),
}
return next;
}
Err(COMPLETED) => return None,
_ => {}
}
}
}
fn try_get_handle(&self) -> Result<(), State> {
self.is_mutating
.compare_exchange(AVAILABLE, IS_MUTATING, Ordering::Acquire, Ordering::Relaxed)
.map(|_| ())
}
pub(crate) fn release_handle(&self) {
match self.is_mutating.compare_exchange(
IS_MUTATING,
AVAILABLE,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {}
Err(e) => assert_eq!(e, COMPLETED, "Failed to update ConIterOfIter state"),
}
}
pub(crate) fn release_handle_complete(&self) {
match self.is_mutating.compare_exchange(
IS_MUTATING,
COMPLETED,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {}
Err(e) => assert_eq!(e, COMPLETED, "Failed to update ConIterOfIter state"),
}
}
}
impl<T: Send + Sync, Iter> std::fmt::Debug for ConIterOfIterX<T, Iter>
where
Iter: Iterator<Item = T>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
super::helpers::fmt_iter(f, "ConIterOfIterX", self.initial_len, &self.counter)
}
}
impl<T: Send + Sync, Iter> From<Iter> for ConIterOfIterX<T, Iter>
where
Iter: Iterator<Item = T>,
{
fn from(iter: Iter) -> Self {
Self::new(iter)
}
}
unsafe impl<T: Send + Sync, Iter> Sync for ConIterOfIterX<T, Iter> where Iter: Iterator<Item = T> {}
unsafe impl<T: Send + Sync, Iter> Send for ConIterOfIterX<T, Iter> where Iter: Iterator<Item = T> {}
impl<T: Send + Sync, Iter> ConcurrentIterX for ConIterOfIterX<T, Iter>
where
Iter: Iterator<Item = T>,
{
type Item = T;
type SeqIter = Iter;
type BufferedIterX = BufferIterX<T, Iter>;
fn into_seq_iter(self) -> Self::SeqIter {
self.iter.into_inner()
}
fn next(&self) -> Option<Self::Item> {
let idx = self.counter.fetch_add(1, Ordering::Acquire);
self.get(idx)
}
fn next_chunk_x(&self, chunk_size: usize) -> Option<impl ExactSizeIterator<Item = Self::Item>> {
match chunk_size {
0 => None,
_ => match self.progress_and_get_begin_idx(chunk_size) {
None => None,
Some(begin_idx) => {
let iter = unsafe { &mut *self.iter.get() };
let end_idx = begin_idx + chunk_size;
let buffer: Vec<_> = (begin_idx..end_idx)
.map(|_| iter.next())
.take_while(|x| x.is_some())
.map(|x| x.expect("must be some"))
.collect();
match buffer.len() == chunk_size {
true => self.release_handle(),
false => self.release_handle_complete(),
}
let values = buffer.into_iter();
Some(values)
}
},
}
}
fn skip_to_end(&self) {
self.is_mutating.store(COMPLETED, Ordering::SeqCst);
}
fn try_get_len(&self) -> Option<usize> {
match self.is_mutating.load(Ordering::SeqCst) == COMPLETED {
true => Some(0),
false => self.initial_len.map(|initial_len| {
let current = self.counter.load(Ordering::Acquire);
initial_len.saturating_sub(current)
}),
}
}
#[inline(always)]
fn try_get_initial_len(&self) -> Option<usize> {
self.initial_len
}
}