use super::{
atomic_counter::AtomicCounter,
con_iter::{ConcurrentIter, ExactSizeConcurrentIter},
};
use crate::next::{Next, NextMany};
use std::cmp::Ordering;
/// An iterator whose position is tracked by a shared [`AtomicCounter`].
///
/// Implementors provide the counter and an indexed [`get`](AtomicIter::get); the provided
/// methods reserve indices on the counter and then resolve them through `get`. Every method
/// takes `&self`.
pub trait AtomicIter: Send + Sync {
    /// Type of the elements produced by the iterator.
    type Item: Send + Sync;

    /// Returns the counter from which `fetch_one` and `fetch_n` reserve element indices.
    fn counter(&self) -> &AtomicCounter;

    /// Returns the element at position `item_idx`, or `None` if there is no element at that
    /// position.
    fn get(&self, item_idx: usize) -> Option<Self::Item>;

    /// Reserves a single index on the counter and returns it together with its element.
    ///
    /// Returns `None` when `get` has no element for the reserved index. The index is reserved
    /// on the counter before the lookup, so the counter is advanced in that case as well.
    #[inline(always)]
    fn fetch_one(&self) -> Option<Next<Self::Item>> {
        let idx = self.counter().fetch_and_increment();
        self.get(idx).map(|value| Next { idx, value })
    }

    /// Reserves `n` consecutive indices on the counter and returns the first reserved index
    /// together with an iterator over the corresponding elements.
    ///
    /// The elements are looked up lazily, one `get` call per element pulled from
    /// `NextMany::values`. The iterator ends at the first index for which `get` returns
    /// `None` and yields nothing afterwards, so it produces fewer than `n` elements (possibly
    /// none) when the reserved range reaches past the available elements. The counter is
    /// advanced by `n` regardless of how many elements are actually produced.
    fn fetch_n(&self, n: usize) -> NextMany<Self::Item, impl Iterator<Item = Self::Item>> {
        let begin_idx = self.counter().fetch_and_add(n);

        // Saturate instead of `begin_idx + n`: a large `n` requested after the counter has
        // already advanced would otherwise overflow (a panic in builds with overflow checks).
        let end_idx = begin_idx.saturating_add(n);

        // `map_while` stops at the first missing element; `fuse` keeps the iterator exhausted
        // from then on, so `get` is never called again once it has returned `None`.
        let values = (begin_idx..end_idx).map_while(|i| self.get(i)).fuse();

        NextMany { begin_idx, values }
    }
}
/// An [`AtomicIter`] that can additionally report a length.
///
/// In this file the reported value is used by the blanket `ExactSizeConcurrentIter`
/// implementation, which subtracts the counter's current value from it to obtain the
/// remaining length.
pub trait AtomicIterWithInitialLen: AtomicIter {
/// Returns the length that the remaining-length computation starts from.
///
/// NOTE(review): presumably the number of elements available before anything was fetched,
/// and expected to stay constant for the lifetime of the iterator; confirm against implementors.
fn initial_len(&self) -> usize;
}
/// Blanket implementation that makes every [`AtomicIter`] usable as a [`ConcurrentIter`].
///
/// Both methods forward directly to the corresponding `AtomicIter` fetch method.
impl<A: AtomicIter> ConcurrentIter for A {
    type Item = A::Item;

    /// Forwards to [`AtomicIter::fetch_one`].
    #[inline(always)]
    fn next_id_and_value(&self) -> Option<Next<<Self as AtomicIter>::Item>> {
        <A as AtomicIter>::fetch_one(self)
    }

    /// Forwards to [`AtomicIter::fetch_n`] with the same chunk size `n`.
    #[inline(always)]
    fn next_id_and_chunk(
        &self,
        n: usize,
    ) -> NextMany<<Self as AtomicIter>::Item, impl Iterator<Item = <Self as AtomicIter>::Item>>
    {
        <A as AtomicIter>::fetch_n(self, n)
    }
}
/// Blanket implementation that gives every [`AtomicIterWithInitialLen`] an exact remaining
/// length.
impl<A: AtomicIterWithInitialLen> ExactSizeConcurrentIter for A {
    /// Returns `initial_len()` minus the counter's current value, floored at zero.
    ///
    /// The floor matters because the counter may move past `initial_len()`; the result is
    /// then `0` rather than an underflow.
    fn len(&self) -> usize {
        let consumed = self.counter().current();
        let initial = self.initial_len();

        match initial.cmp(&consumed) {
            Ordering::Greater => initial - consumed,
            Ordering::Equal | Ordering::Less => 0,
        }
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::ops::Add;

    /// Number of elements the `atomic_exact_*` helpers require the iterator under test to
    /// report before anything is fetched.
    pub(crate) const ATOMIC_TEST_LEN: usize = 512;

    /// Chunk sizes ranging from a single element to twice `ATOMIC_TEST_LEN`.
    ///
    /// NOTE(review): presumably the values callers pass as `n` to the `*_fetch_n` helpers;
    /// the constant is not used inside this module.
    pub(crate) const ATOMIC_FETCH_N: [usize; 8] = [
        1,
        2,
        4,
        8,
        ATOMIC_TEST_LEN / 2,
        ATOMIC_TEST_LEN,
        ATOMIC_TEST_LEN + 1,
        ATOMIC_TEST_LEN * 2,
    ];

    /// Drains `iter` one element at a time and checks that the k-th fetched value equals `k`
    /// and that the counter equals the number of elements fetched so far.
    ///
    /// `iter` must be fresh: its counter is asserted to be `0` on entry.
    pub(crate) fn atomic_fetch_one<A>(iter: A)
    where
        A: AtomicIter,
        A::Item: Add<usize, Output = usize>,
    {
        assert_eq!(0, iter.counter().current());

        let mut expected = 0;
        // Adding `0usize` converts the item into a plain `usize` for comparison.
        while let Some(value) = iter.fetch_one().map(|next| next.value + 0usize) {
            assert_eq!(value, expected);
            expected += 1;
            assert_eq!(expected, iter.counter().current());
        }
    }

    /// Drains `iter` in chunks of `n` and checks that every value equals both its position
    /// within the whole sequence and `begin_idx` plus its offset within the chunk.
    ///
    /// Stops after the first chunk that yields nothing. `iter` must be fresh: its counter is
    /// asserted to be `0` on entry.
    pub(crate) fn atomic_fetch_n<A>(iter: A, n: usize)
    where
        A: AtomicIter,
        A::Item: Add<usize, Output = usize>,
    {
        assert_eq!(0, iter.counter().current());

        let mut expected = 0;
        loop {
            let chunk = iter.fetch_n(n);
            let begin_idx = chunk.begin_idx;
            let seen_before_chunk = expected;

            for (offset, item) in chunk.values.enumerate() {
                let value = item + 0usize;
                assert_eq!(value, begin_idx + offset);
                assert_eq!(value, expected);
                expected += 1;
            }

            // An empty chunk means the iterator is exhausted.
            if expected == seen_before_chunk {
                break;
            }
        }
    }

    /// Drains `iter` one element at a time and checks that `len()` drops by one per fetched
    /// element, starting at `ATOMIC_TEST_LEN` and ending at `0` with `is_empty()` true.
    pub(crate) fn atomic_exact_fetch_one<A>(iter: A)
    where
        A: AtomicIterWithInitialLen,
        A::Item: Add<usize, Output = usize>,
    {
        assert!(!iter.is_empty());
        assert_eq!(iter.len(), ATOMIC_TEST_LEN);

        let mut fetched = 0;
        while iter.fetch_one().is_some() {
            fetched += 1;
            assert_eq!(iter.len(), ATOMIC_TEST_LEN - fetched);
        }

        assert_eq!(iter.len(), 0);
        assert!(iter.is_empty());
    }

    /// Requests chunks of `n` from `iter` and checks that `len()` drops by `n` per request
    /// (floored at zero), starting at `ATOMIC_TEST_LEN` and ending at `0` with `is_empty()`
    /// true.
    ///
    /// Only the first element of each chunk is pulled; the length is expected to shrink by
    /// the full chunk size anyway. Stops after the first chunk that yields nothing.
    pub(crate) fn atomic_exact_fetch_n<A>(iter: A, n: usize)
    where
        A: AtomicIterWithInitialLen,
        A::Item: Add<usize, Output = usize>,
    {
        let mut remaining = ATOMIC_TEST_LEN;
        assert!(!iter.is_empty());
        assert_eq!(iter.len(), remaining);

        loop {
            let mut chunk = iter.fetch_n(n);
            let chunk_has_items = chunk.values.next().is_some();

            remaining = remaining.saturating_sub(n);
            assert_eq!(iter.len(), remaining);

            if !chunk_has_items {
                break;
            }
        }

        assert_eq!(iter.len(), 0);
        assert!(iter.is_empty());
    }
}