orx_concurrent_iter/implementations/jagged_arrays/reference/
con_iter.rs1use super::{
2 chunk_puller::ChunkPullerJaggedRef, raw_jagged_ref::RawJaggedRef,
3 slice_iter::RawJaggedSliceIterRef,
4};
5use crate::{
6 ConcurrentIter, ExactSizeConcurrentIter,
7 implementations::jagged_arrays::{JaggedIndexer, Slices},
8};
9use core::sync::atomic::{AtomicUsize, Ordering};
10
11pub struct ConIterJaggedRef<'a, T, S, X>
13where
14 T: Sync + 'a,
15 X: JaggedIndexer,
16 S: Slices<'a, T> + Send + Sync,
17{
18 jagged: RawJaggedRef<'a, T, S, X>,
19 counter: AtomicUsize,
20}
21
22impl<'a, T, S, X> ConIterJaggedRef<'a, T, S, X>
23where
24 T: Sync + 'a,
25 X: JaggedIndexer + Send + Sync,
26 S: Slices<'a, T> + Send + Sync,
27{
28 pub(crate) fn new(jagged: RawJaggedRef<'a, T, S, X>, begin: usize) -> Self {
29 Self {
30 jagged,
31 counter: begin.into(),
32 }
33 }
34
35 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
36 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
37 match begin_idx < self.jagged.len() {
38 true => Some(begin_idx),
39 false => None,
40 }
41 }
42
43 pub(super) fn progress_and_get_iter(
44 &self,
45 chunk_size: usize,
46 ) -> Option<(usize, RawJaggedSliceIterRef<'a, T, S, X>)> {
47 self.progress_and_get_begin_idx(chunk_size)
48 .map(|begin_idx| {
49 let end_idx = (begin_idx + chunk_size)
50 .min(self.jagged.len())
51 .max(begin_idx);
52 let slice = self.jagged.jagged_slice(begin_idx, end_idx);
53 let iter = RawJaggedSliceIterRef::new(slice);
54 (begin_idx, iter)
55 })
56 }
57}
58
59impl<'a, T, S, X> ConcurrentIter for ConIterJaggedRef<'a, T, S, X>
60where
61 T: Sync + 'a,
62 X: JaggedIndexer,
63 S: Slices<'a, T> + Send + Sync,
64{
65 type Item = &'a T;
66
67 type SequentialIter = RawJaggedSliceIterRef<'a, T, S, X>;
68
69 type ChunkPuller<'i>
70 = ChunkPullerJaggedRef<'i, 'a, T, S, X>
71 where
72 Self: 'i;
73
74 fn into_seq_iter(self) -> Self::SequentialIter {
75 let num_taken = self.counter.load(Ordering::Acquire).min(self.jagged.len());
76 let flat_end = self.jagged.len();
77 let slice = self.jagged.jagged_slice(num_taken, flat_end);
78 RawJaggedSliceIterRef::new(slice)
79 }
80
81 fn skip_to_end(&self) {
82 let _ = self.counter.fetch_max(self.jagged.len(), Ordering::Acquire);
83 }
84
85 fn next(&self) -> Option<Self::Item> {
86 self.progress_and_get_begin_idx(1)
87 .and_then(|idx| self.jagged.get(idx))
88 }
89
90 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
91 self.progress_and_get_begin_idx(1)
92 .and_then(|idx| self.jagged.get(idx).map(|value| (idx, value)))
93 }
94
95 fn size_hint(&self) -> (usize, Option<usize>) {
96 let num_taken = self.counter.load(Ordering::Acquire);
97 let remaining = self.jagged.len().saturating_sub(num_taken);
98 (remaining, Some(remaining))
99 }
100
101 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
102 Self::ChunkPuller::new(self, chunk_size)
103 }
104}
105
106impl<'a, T, S, X> ExactSizeConcurrentIter for ConIterJaggedRef<'a, T, S, X>
107where
108 T: Sync + 'a,
109 X: JaggedIndexer,
110 S: Slices<'a, T> + Send + Sync,
111{
112 fn len(&self) -> usize {
113 let num_taken = self.counter.load(Ordering::Acquire);
114 self.jagged.len().saturating_sub(num_taken)
115 }
116}