orx_concurrent_iter/implementations/jagged_arrays/reference/
con_iter.rs

1use super::{
2    chunk_puller::ChunkPullerJaggedRef, raw_jagged_ref::RawJaggedRef,
3    slice_iter::RawJaggedSliceIterRef,
4};
5use crate::{
6    ConcurrentIter, ExactSizeConcurrentIter,
7    implementations::jagged_arrays::{JaggedIndexer, Slices},
8};
9use core::sync::atomic::{AtomicUsize, Ordering};
10
11/// Flattened concurrent iterator of a raw jagged array yielding references to elements.
12pub struct ConIterJaggedRef<'a, T, S, X>
13where
14    T: Sync,
15    X: JaggedIndexer,
16    S: Slices<'a, T>,
17{
18    jagged: RawJaggedRef<'a, T, S, X>,
19    counter: AtomicUsize,
20}
21
22unsafe impl<'a, T, S, X> Sync for ConIterJaggedRef<'a, T, S, X>
23where
24    T: Sync,
25    X: JaggedIndexer,
26    S: Slices<'a, T>,
27{
28}
29
30impl<'a, T, S, X> ConIterJaggedRef<'a, T, S, X>
31where
32    T: Sync,
33    X: JaggedIndexer,
34    S: Slices<'a, T>,
35{
36    pub(crate) fn new(jagged: RawJaggedRef<'a, T, S, X>, begin: usize) -> Self {
37        Self {
38            jagged,
39            counter: begin.into(),
40        }
41    }
42
43    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
44        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
45        match begin_idx < self.jagged.len() {
46            true => Some(begin_idx),
47            false => None,
48        }
49    }
50
51    pub(super) fn progress_and_get_iter(
52        &self,
53        chunk_size: usize,
54    ) -> Option<(usize, RawJaggedSliceIterRef<'a, T, S, X>)> {
55        self.progress_and_get_begin_idx(chunk_size)
56            .map(|begin_idx| {
57                let end_idx = (begin_idx + chunk_size)
58                    .min(self.jagged.len())
59                    .max(begin_idx);
60                let slice = self.jagged.jagged_slice(begin_idx, end_idx);
61                let iter = RawJaggedSliceIterRef::new(slice);
62                (begin_idx, iter)
63            })
64    }
65}
66
67impl<'a, T, S, X> ConcurrentIter for ConIterJaggedRef<'a, T, S, X>
68where
69    T: Sync,
70    X: JaggedIndexer,
71    S: Slices<'a, T>,
72{
73    type Item = &'a T;
74
75    type SequentialIter = RawJaggedSliceIterRef<'a, T, S, X>;
76
77    type ChunkPuller<'i>
78        = ChunkPullerJaggedRef<'i, 'a, T, S, X>
79    where
80        Self: 'i;
81
82    fn into_seq_iter(self) -> Self::SequentialIter {
83        let num_taken = self.counter.load(Ordering::Acquire).min(self.jagged.len());
84        let flat_end = self.jagged.len();
85        let slice = self.jagged.jagged_slice(num_taken, flat_end);
86        RawJaggedSliceIterRef::new(slice)
87    }
88
89    fn skip_to_end(&self) {
90        let _ = self.counter.fetch_max(self.jagged.len(), Ordering::Acquire);
91    }
92
93    fn next(&self) -> Option<Self::Item> {
94        self.progress_and_get_begin_idx(1)
95            .and_then(|idx| self.jagged.get(idx))
96    }
97
98    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
99        self.progress_and_get_begin_idx(1)
100            .and_then(|idx| self.jagged.get(idx).map(|value| (idx, value)))
101    }
102
103    fn size_hint(&self) -> (usize, Option<usize>) {
104        let num_taken = self.counter.load(Ordering::Acquire);
105        let remaining = self.jagged.len().saturating_sub(num_taken);
106        (remaining, Some(remaining))
107    }
108
109    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
110        Self::ChunkPuller::new(self, chunk_size)
111    }
112}
113
114impl<'a, T, S, X> ExactSizeConcurrentIter for ConIterJaggedRef<'a, T, S, X>
115where
116    T: Sync,
117    X: JaggedIndexer,
118    S: Slices<'a, T>,
119{
120    fn len(&self) -> usize {
121        let num_taken = self.counter.load(Ordering::Acquire);
122        self.jagged.len().saturating_sub(num_taken)
123    }
124}