orx_concurrent_iter/implementations/jagged_arrays/owned/
con_iter.rs

1use super::{
2    chunk_puller::ChunkPullerJaggedOwned, into_iter::RawJaggedIterOwned, raw_jagged::RawJagged,
3    slice_iter::RawJaggedSliceIterOwned,
4};
5use crate::{
6    ConcurrentIter, ExactSizeConcurrentIter, implementations::jagged_arrays::indexer::JaggedIndexer,
7};
8use core::sync::atomic::{AtomicUsize, Ordering};
9
10/// Flattened concurrent iterator of a raw jagged array yielding owned elements.
11///
12/// Ensures that all elements are dropped regardless of whether they are iterated over or skipped.
13/// Further, cleans up the allocations of the jagged array.
14pub struct ConIterJaggedOwned<T, X>
15where
16    T: Send,
17    X: JaggedIndexer,
18{
19    jagged: RawJagged<T, X>,
20    counter: AtomicUsize,
21}
22
23unsafe impl<T: Send, X: JaggedIndexer> Sync for ConIterJaggedOwned<T, X> {}
24
25impl<T, X> ConIterJaggedOwned<T, X>
26where
27    T: Send,
28    X: JaggedIndexer,
29{
30    pub(crate) fn new(jagged: RawJagged<T, X>, begin: usize) -> Self {
31        Self {
32            jagged,
33            counter: begin.into(),
34        }
35    }
36
37    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
38        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
39        match begin_idx < self.jagged.len() {
40            true => Some(begin_idx),
41            false => None,
42        }
43    }
44
45    pub(super) fn progress_and_get_iter(
46        &self,
47        chunk_size: usize,
48    ) -> Option<(usize, RawJaggedSliceIterOwned<'_, T>)> {
49        self.progress_and_get_begin_idx(chunk_size)
50            .map(|begin_idx| {
51                let end_idx = (begin_idx + chunk_size)
52                    .min(self.jagged.len())
53                    .max(begin_idx);
54                let slice = self.jagged.slice(begin_idx, end_idx);
55                let iter = RawJaggedSliceIterOwned::new(slice);
56                (begin_idx, iter)
57            })
58    }
59}
60
61impl<T, X> ConcurrentIter for ConIterJaggedOwned<T, X>
62where
63    T: Send,
64    X: JaggedIndexer,
65{
66    type Item = T;
67
68    type SequentialIter = RawJaggedIterOwned<T, X>;
69
70    type ChunkPuller<'i>
71        = ChunkPullerJaggedOwned<'i, T, X>
72    where
73        Self: 'i;
74
75    fn into_seq_iter(mut self) -> Self::SequentialIter {
76        let num_taken = self.counter.load(Ordering::Acquire).min(self.jagged.len());
77
78        let jagged_to_drop = self.jagged.move_into_new(num_taken);
79
80        RawJaggedIterOwned::new(jagged_to_drop)
81    }
82
83    fn skip_to_end(&self) {
84        let current = self.counter.fetch_max(self.jagged.len(), Ordering::Acquire);
85        let num_taken_before = current.min(self.jagged.len());
86        let slice = self.jagged.slice_from(num_taken_before);
87        let _iter = RawJaggedSliceIterOwned::new(slice);
88    }
89
90    fn next(&self) -> Option<Self::Item> {
91        self.progress_and_get_begin_idx(1).and_then(|idx| {
92            // SAFETY: `counter` ensures that elements from each position is taken only once
93            unsafe { self.jagged.take(idx) }
94        })
95    }
96
97    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
98        self.progress_and_get_begin_idx(1).and_then(|idx| {
99            // SAFETY: `counter` ensures that elements from each position is taken only once
100            unsafe { self.jagged.take(idx).map(|value| (idx, value)) }
101        })
102    }
103
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        let num_taken = self.counter.load(Ordering::Acquire);
106        let remaining = self.jagged.len().saturating_sub(num_taken);
107        (remaining, Some(remaining))
108    }
109
110    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
111        Self::ChunkPuller::new(self, chunk_size)
112    }
113}
114
115impl<T, X> ExactSizeConcurrentIter for ConIterJaggedOwned<T, X>
116where
117    T: Send,
118    T: Send + Sync,
119    X: JaggedIndexer + Send + Sync,
120{
121    fn len(&self) -> usize {
122        let num_taken = self.counter.load(Ordering::Acquire);
123        self.jagged.len().saturating_sub(num_taken)
124    }
125}
126
127impl<T, X> Drop for ConIterJaggedOwned<T, X>
128where
129    T: Send,
130    X: JaggedIndexer,
131{
132    fn drop(&mut self) {
133        if self.jagged.num_taken().is_some() {
134            let num_taken = self.counter.load(Ordering::Acquire);
135            // SAFETY: `num_taken` elements are already taken out by the concurrent iterator.
136            // Before dropping this iterator, this is set as the `num_taken` of the raw
137            // jagged array which is responsible from dropping the elements and allocations.
138            // This will ensure that these `num_taken` elements will not be attempted to be
139            // dropped the second time.
140            unsafe { self.jagged.set_num_taken(Some(num_taken)) };
141        }
142    }
143}