orx_concurrent_iter/implementations/jagged_arrays/owned/
con_iter.rs

1use super::{
2    chunk_puller::ChunkPullerJaggedOwned, into_iter::RawJaggedIterOwned, raw_jagged::RawJagged,
3    slice_iter::RawJaggedSliceIterOwned,
4};
5use crate::{
6    ConcurrentIter, ExactSizeConcurrentIter, implementations::jagged_arrays::indexer::JaggedIndexer,
7};
8use core::sync::atomic::{AtomicUsize, Ordering};
9
10/// Flattened concurrent iterator of a raw jagged array yielding owned elements.
11///
12/// Ensures that all elements are dropped regardless of whether they are iterated over or skipped.
13/// Further, cleans up the allocations of the jagged array.
14pub struct ConIterJaggedOwned<T, X>
15where
16    T: Send + Sync,
17    X: JaggedIndexer + Send + Sync,
18{
19    jagged: RawJagged<T, X>,
20    counter: AtomicUsize,
21}
22
23unsafe impl<T, X> Sync for ConIterJaggedOwned<T, X>
24where
25    T: Send + Sync,
26    X: JaggedIndexer + Send + Sync,
27{
28}
29
30unsafe impl<T, X> Send for ConIterJaggedOwned<T, X>
31where
32    T: Send + Sync,
33    X: JaggedIndexer + Send + Sync,
34{
35}
36
37impl<T, X> ConIterJaggedOwned<T, X>
38where
39    T: Send + Sync,
40    X: JaggedIndexer + Send + Sync,
41{
42    pub(crate) fn new(jagged: RawJagged<T, X>, begin: usize) -> Self {
43        Self {
44            jagged,
45            counter: begin.into(),
46        }
47    }
48
49    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
50        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
51        match begin_idx < self.jagged.len() {
52            true => Some(begin_idx),
53            false => None,
54        }
55    }
56
57    pub(super) fn progress_and_get_iter(
58        &self,
59        chunk_size: usize,
60    ) -> Option<(usize, RawJaggedSliceIterOwned<'_, T>)> {
61        self.progress_and_get_begin_idx(chunk_size)
62            .map(|begin_idx| {
63                let end_idx = (begin_idx + chunk_size)
64                    .min(self.jagged.len())
65                    .max(begin_idx);
66                let slice = self.jagged.slice(begin_idx, end_idx);
67                let iter = RawJaggedSliceIterOwned::new(slice);
68                (begin_idx, iter)
69            })
70    }
71}
72
73impl<T, X> ConcurrentIter for ConIterJaggedOwned<T, X>
74where
75    T: Send + Sync,
76    X: JaggedIndexer + Send + Sync,
77{
78    type Item = T;
79
80    type SequentialIter = RawJaggedIterOwned<T, X>;
81
82    type ChunkPuller<'i>
83        = ChunkPullerJaggedOwned<'i, T, X>
84    where
85        Self: 'i;
86
87    fn into_seq_iter(mut self) -> Self::SequentialIter {
88        let num_taken = self.counter.load(Ordering::Acquire).min(self.jagged.len());
89
90        let jagged_to_drop = self.jagged.move_into_new(num_taken);
91
92        RawJaggedIterOwned::new(jagged_to_drop)
93    }
94
95    fn skip_to_end(&self) {
96        let current = self.counter.fetch_max(self.jagged.len(), Ordering::Acquire);
97        let num_taken_before = current.min(self.jagged.len());
98        let slice = self.jagged.slice_from(num_taken_before);
99        let _iter = RawJaggedSliceIterOwned::new(slice);
100    }
101
102    fn next(&self) -> Option<Self::Item> {
103        self.progress_and_get_begin_idx(1).and_then(|idx| {
104            // SAFETY: `counter` ensures that elements from each position is taken only once
105            unsafe { self.jagged.take(idx) }
106        })
107    }
108
109    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
110        self.progress_and_get_begin_idx(1).and_then(|idx| {
111            // SAFETY: `counter` ensures that elements from each position is taken only once
112            unsafe { self.jagged.take(idx).map(|value| (idx, value)) }
113        })
114    }
115
116    fn size_hint(&self) -> (usize, Option<usize>) {
117        let num_taken = self.counter.load(Ordering::Acquire);
118        let remaining = self.jagged.len().saturating_sub(num_taken);
119        (remaining, Some(remaining))
120    }
121
122    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
123        Self::ChunkPuller::new(self, chunk_size)
124    }
125}
126
127impl<T, X> ExactSizeConcurrentIter for ConIterJaggedOwned<T, X>
128where
129    T: Send + Sync,
130    X: JaggedIndexer + Send + Sync,
131{
132    fn len(&self) -> usize {
133        let num_taken = self.counter.load(Ordering::Acquire);
134        self.jagged.len().saturating_sub(num_taken)
135    }
136}
137
138impl<T, X> Drop for ConIterJaggedOwned<T, X>
139where
140    T: Send + Sync,
141    X: JaggedIndexer + Send + Sync,
142{
143    fn drop(&mut self) {
144        if self.jagged.num_taken().is_some() {
145            let num_taken = self.counter.load(Ordering::Acquire);
146            // SAFETY: `num_taken` elements are already taken out by the concurrent iterator.
147            // Before dropping this iterator, this is set as the `num_taken` of the raw
148            // jagged array which is responsible from dropping the elements and allocations.
149            // This will ensure that these `num_taken` elements will not be attempted to be
150            // dropped the second time.
151            unsafe { self.jagged.set_num_taken(Some(num_taken)) };
152        }
153    }
154}