orx_concurrent_iter/implementations/vec/
con_iter.rs

1use crate::{
2    concurrent_iter::ConcurrentIter,
3    exact_size_concurrent_iter::ExactSizeConcurrentIter,
4    implementations::{
5        array_utils::{ArrayChunkPuller, ArrayConIter, ArrayIntoSeqIter, ChunkPointers},
6        ptr_utils::take,
7    },
8};
9use alloc::vec::Vec;
10use core::{
11    mem::ManuallyDrop,
12    sync::atomic::{AtomicUsize, Ordering},
13};
14
15/// Concurrent iterator of a [`Vec`].
16///
17/// It can be created by calling [`into_con_iter`] on a vector.
18///
19/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
20///
21/// # Examples
22///
23/// ```
24/// use orx_concurrent_iter::*;
25///
26/// let vec = vec![1, 2];
27/// let con_iter = vec.into_con_iter();
28/// assert_eq!(con_iter.next(), Some(1));
29/// assert_eq!(con_iter.next(), Some(2));
30/// assert_eq!(con_iter.next(), None);
31/// ```
32pub struct ConIterVec<T>
33where
34    T: Send + Sync,
35{
36    ptr: *const T,
37    vec_len: usize,
38    vec_cap: usize,
39    counter: AtomicUsize,
40}
41
42unsafe impl<T: Send + Sync> Sync for ConIterVec<T> {}
43
44unsafe impl<T: Send + Sync> Send for ConIterVec<T> {}
45
46impl<T> Default for ConIterVec<T>
47where
48    T: Send + Sync,
49{
50    fn default() -> Self {
51        Self::new(Vec::new())
52    }
53}
54
55impl<T> Drop for ConIterVec<T>
56where
57    T: Send + Sync,
58{
59    fn drop(&mut self) {
60        let _iter = self.remaining_into_seq_iter();
61    }
62}
63
64impl<T> ConIterVec<T>
65where
66    T: Send + Sync,
67{
68    pub(super) fn new(vec: Vec<T>) -> Self {
69        let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
70        let _ = ManuallyDrop::new(vec);
71        Self {
72            ptr,
73            vec_len,
74            vec_cap,
75            counter: 0.into(),
76        }
77    }
78
79    pub(super) fn initial_len(&self) -> usize {
80        self.vec_len
81    }
82
83    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
84        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
85        match begin_idx < self.vec_len {
86            true => Some(begin_idx),
87            _ => None,
88        }
89    }
90
91    fn remaining_into_seq_iter(&mut self) -> ArrayIntoSeqIter<T, ()> {
92        // # SAFETY
93        // null ptr indicates that the data is already taken out of this iterator
94        // by a consuming method such as `into_seq_iter`
95        match self.ptr.is_null() {
96            true => Default::default(),
97            false => {
98                let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
99                let iter = self.slice_into_seq_iter(num_taken, true);
100                self.ptr = core::ptr::null();
101                iter
102            }
103        }
104    }
105
106    fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> ArrayIntoSeqIter<T, ()> {
107        let completed = num_taken == self.vec_len;
108        let (last, current) = match completed {
109            true => (core::ptr::null(), core::ptr::null()),
110            false => {
111                // SAFETY: self.vec_len is positive here, would be completed o/w
112                let last = unsafe { self.ptr.add(self.vec_len - 1) };
113                // SAFETY: first + num_taken is in bounds
114                let current = unsafe { self.ptr.add(num_taken) };
115                (last, current)
116            }
117        };
118
119        let allocation_to_drop = drop_vec.then_some((self.ptr, self.vec_cap));
120
121        ArrayIntoSeqIter::new(current, last, allocation_to_drop, ())
122    }
123}
124
125impl<T> ArrayConIter for ConIterVec<T>
126where
127    T: Send + Sync,
128{
129    type Item = T;
130
131    fn progress_and_get_chunk_pointers(
132        &self,
133        chunk_size: usize,
134    ) -> Option<ChunkPointers<Self::Item>> {
135        self.progress_and_get_begin_idx(chunk_size)
136            .map(|begin_idx| {
137                let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
138                let first = unsafe { self.ptr.add(begin_idx) }; // ptr + begin_idx is in bounds
139                let last = unsafe { self.ptr.add(end_idx - 1) }; // ptr + end_idx - 1 is in bounds
140                ChunkPointers {
141                    begin_idx,
142                    first,
143                    last,
144                }
145            })
146    }
147}
148
149impl<T> ConcurrentIter for ConIterVec<T>
150where
151    T: Send + Sync,
152{
153    type Item = T;
154
155    type SequentialIter = ArrayIntoSeqIter<T, ()>;
156
157    type ChunkPuller<'i>
158        = ArrayChunkPuller<'i, Self>
159    where
160        Self: 'i;
161
162    fn into_seq_iter(mut self) -> Self::SequentialIter {
163        self.remaining_into_seq_iter()
164    }
165
166    fn skip_to_end(&self) {
167        let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
168        let num_taken_before = current.min(self.vec_len);
169        let _iter = self.slice_into_seq_iter(num_taken_before, false);
170    }
171
172    fn next(&self) -> Option<Self::Item> {
173        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
174            .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
175    }
176
177    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
178        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
179            .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
180    }
181
182    fn size_hint(&self) -> (usize, Option<usize>) {
183        let num_taken = self.counter.load(Ordering::Acquire);
184        let remaining = self.vec_len.saturating_sub(num_taken);
185        (remaining, Some(remaining))
186    }
187
188    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
189        Self::ChunkPuller::new(self, chunk_size)
190    }
191}
192
193impl<T> ExactSizeConcurrentIter for ConIterVec<T>
194where
195    T: Send + Sync,
196{
197    fn len(&self) -> usize {
198        let num_taken = self.counter.load(Ordering::Acquire);
199        self.vec_len.saturating_sub(num_taken)
200    }
201}