orx_concurrent_iter/implementations/jagged_arrays/owned/
con_iter.rs1use super::{
2 chunk_puller::ChunkPullerJaggedOwned, into_iter::RawJaggedIterOwned, raw_jagged::RawJagged,
3 slice_iter::RawJaggedSliceIterOwned,
4};
5use crate::{
6 ConcurrentIter, ExactSizeConcurrentIter, implementations::jagged_arrays::indexer::JaggedIndexer,
7};
8use core::sync::atomic::{AtomicUsize, Ordering};
9
10pub struct ConIterJaggedOwned<T, X>
15where
16 T: Send + Sync,
17 X: JaggedIndexer + Send + Sync,
18{
19 jagged: RawJagged<T, X>,
20 counter: AtomicUsize,
21}
22
23unsafe impl<T, X> Sync for ConIterJaggedOwned<T, X>
24where
25 T: Send + Sync,
26 X: JaggedIndexer + Send + Sync,
27{
28}
29
30unsafe impl<T, X> Send for ConIterJaggedOwned<T, X>
31where
32 T: Send + Sync,
33 X: JaggedIndexer + Send + Sync,
34{
35}
36
37impl<T, X> ConIterJaggedOwned<T, X>
38where
39 T: Send + Sync,
40 X: JaggedIndexer + Send + Sync,
41{
42 pub(crate) fn new(jagged: RawJagged<T, X>, begin: usize) -> Self {
43 Self {
44 jagged,
45 counter: begin.into(),
46 }
47 }
48
49 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
50 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
51 match begin_idx < self.jagged.len() {
52 true => Some(begin_idx),
53 false => None,
54 }
55 }
56
57 pub(super) fn progress_and_get_iter(
58 &self,
59 chunk_size: usize,
60 ) -> Option<(usize, RawJaggedSliceIterOwned<'_, T>)> {
61 self.progress_and_get_begin_idx(chunk_size)
62 .map(|begin_idx| {
63 let end_idx = (begin_idx + chunk_size)
64 .min(self.jagged.len())
65 .max(begin_idx);
66 let slice = self.jagged.slice(begin_idx, end_idx);
67 let iter = RawJaggedSliceIterOwned::new(slice);
68 (begin_idx, iter)
69 })
70 }
71}
72
73impl<T, X> ConcurrentIter for ConIterJaggedOwned<T, X>
74where
75 T: Send + Sync,
76 X: JaggedIndexer + Send + Sync,
77{
78 type Item = T;
79
80 type SequentialIter = RawJaggedIterOwned<T, X>;
81
82 type ChunkPuller<'i>
83 = ChunkPullerJaggedOwned<'i, T, X>
84 where
85 Self: 'i;
86
87 fn into_seq_iter(mut self) -> Self::SequentialIter {
88 let num_taken = self.counter.load(Ordering::Acquire).min(self.jagged.len());
89
90 let jagged_to_drop = self.jagged.move_into_new(num_taken);
91
92 RawJaggedIterOwned::new(jagged_to_drop)
93 }
94
95 fn skip_to_end(&self) {
96 let current = self.counter.fetch_max(self.jagged.len(), Ordering::Acquire);
97 let num_taken_before = current.min(self.jagged.len());
98 let slice = self.jagged.slice_from(num_taken_before);
99 let _iter = RawJaggedSliceIterOwned::new(slice);
100 }
101
102 fn next(&self) -> Option<Self::Item> {
103 self.progress_and_get_begin_idx(1).and_then(|idx| {
104 unsafe { self.jagged.take(idx) }
106 })
107 }
108
109 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
110 self.progress_and_get_begin_idx(1).and_then(|idx| {
111 unsafe { self.jagged.take(idx).map(|value| (idx, value)) }
113 })
114 }
115
116 fn size_hint(&self) -> (usize, Option<usize>) {
117 let num_taken = self.counter.load(Ordering::Acquire);
118 let remaining = self.jagged.len().saturating_sub(num_taken);
119 (remaining, Some(remaining))
120 }
121
122 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
123 Self::ChunkPuller::new(self, chunk_size)
124 }
125}
126
127impl<T, X> ExactSizeConcurrentIter for ConIterJaggedOwned<T, X>
128where
129 T: Send + Sync,
130 X: JaggedIndexer + Send + Sync,
131{
132 fn len(&self) -> usize {
133 let num_taken = self.counter.load(Ordering::Acquire);
134 self.jagged.len().saturating_sub(num_taken)
135 }
136}
137
138impl<T, X> Drop for ConIterJaggedOwned<T, X>
139where
140 T: Send + Sync,
141 X: JaggedIndexer + Send + Sync,
142{
143 fn drop(&mut self) {
144 if self.jagged.num_taken().is_some() {
145 let num_taken = self.counter.load(Ordering::Acquire);
146 unsafe { self.jagged.set_num_taken(Some(num_taken)) };
152 }
153 }
154}