orx_concurrent_iter/implementations/vec/
con_iter.rs

1use super::{chunk_puller::ChunkPullerVec, vec_into_seq_iter::VecIntoSeqIter};
2use crate::{
3    concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter,
4    implementations::ptr_utils::take,
5};
6use alloc::vec::Vec;
7use core::{
8    mem::ManuallyDrop,
9    sync::atomic::{AtomicUsize, Ordering},
10};
11
12/// Concurrent iterator of a [`Vec`].
13///
14/// It can be created by calling [`into_con_iter`] on a vector.
15///
16/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
17///
18/// # Examples
19///
20/// ```
21/// use orx_concurrent_iter::*;
22///
23/// let vec = vec![1, 2];
24/// let con_iter = vec.into_con_iter();
25/// assert_eq!(con_iter.next(), Some(1));
26/// assert_eq!(con_iter.next(), Some(2));
27/// assert_eq!(con_iter.next(), None);
28/// ```
29pub struct ConIterVec<T>
30where
31    T: Send + Sync,
32{
33    ptr: *const T,
34    vec_len: usize,
35    vec_cap: usize,
36    counter: AtomicUsize,
37}
38
39unsafe impl<T: Send + Sync> Sync for ConIterVec<T> {}
40
41unsafe impl<T: Send + Sync> Send for ConIterVec<T> {}
42
43impl<T> Default for ConIterVec<T>
44where
45    T: Send + Sync,
46{
47    fn default() -> Self {
48        Self::new(Vec::new())
49    }
50}
51
52impl<T> Drop for ConIterVec<T>
53where
54    T: Send + Sync,
55{
56    fn drop(&mut self) {
57        let _iter = self.remaining_into_seq_iter();
58    }
59}
60
61impl<T> ConIterVec<T>
62where
63    T: Send + Sync,
64{
65    pub(super) fn new(vec: Vec<T>) -> Self {
66        let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
67        let _ = ManuallyDrop::new(vec);
68        Self {
69            ptr,
70            vec_len,
71            vec_cap,
72            counter: 0.into(),
73        }
74    }
75
76    pub(super) fn initial_len(&self) -> usize {
77        self.vec_len
78    }
79
80    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
81        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
82        match begin_idx < self.vec_len {
83            true => Some(begin_idx),
84            _ => None,
85        }
86    }
87
88    pub(super) fn progress_and_get_chunk_pointers(
89        &self,
90        chunk_size: usize,
91    ) -> Option<(usize, *const T, *const T)> {
92        self.progress_and_get_begin_idx(chunk_size)
93            .map(|begin_idx| {
94                let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
95                let first = unsafe { self.ptr.add(begin_idx) }; // ptr + begin_idx is in bounds
96                let last = unsafe { self.ptr.add(end_idx - 1) }; // ptr + end_idx - 1 is in bounds
97                (begin_idx, first, last)
98            })
99    }
100
101    fn remaining_into_seq_iter(&mut self) -> VecIntoSeqIter<T> {
102        // # SAFETY
103        // null ptr indicates that the data is already taken out of this iterator
104        // by a consuming method such as `into_seq_iter`
105        match self.ptr.is_null() {
106            true => Default::default(),
107            false => {
108                let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
109                let iter = self.slice_into_seq_iter(num_taken, true);
110                self.ptr = core::ptr::null();
111                iter
112            }
113        }
114    }
115
116    fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> VecIntoSeqIter<T> {
117        let p = self.ptr;
118        let completed = num_taken == self.vec_len;
119
120        let (first, last, current) = match completed {
121            true => (p, p, p),
122            false => {
123                let first = p;
124                let last = unsafe { first.add(self.vec_len - 1) }; // self.vec_len is positive here
125                let current = unsafe { first.add(num_taken) }; // first + num_taken is in bounds
126                (first, last, current)
127            }
128        };
129
130        let drop_vec_capacity = drop_vec.then_some(self.vec_cap);
131        VecIntoSeqIter::new(completed, first, last, current, drop_vec_capacity)
132    }
133}
134
135impl<T> ConcurrentIter for ConIterVec<T>
136where
137    T: Send + Sync,
138{
139    type Item = T;
140
141    type SequentialIter = VecIntoSeqIter<T>;
142
143    type ChunkPuller<'i>
144        = ChunkPullerVec<'i, T>
145    where
146        Self: 'i;
147
148    fn into_seq_iter(mut self) -> Self::SequentialIter {
149        self.remaining_into_seq_iter()
150    }
151
152    fn skip_to_end(&self) {
153        let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
154        let num_taken_before = current.min(self.vec_len);
155        let _iter = self.slice_into_seq_iter(num_taken_before, false);
156    }
157
158    fn next(&self) -> Option<Self::Item> {
159        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
160            .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
161    }
162
163    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
164        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
165            .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
166    }
167
168    fn size_hint(&self) -> (usize, Option<usize>) {
169        let num_taken = self.counter.load(Ordering::Acquire);
170        let remaining = self.vec_len.saturating_sub(num_taken);
171        (remaining, Some(remaining))
172    }
173
174    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
175        Self::ChunkPuller::new(self, chunk_size)
176    }
177}
178
179impl<T> ExactSizeConcurrentIter for ConIterVec<T>
180where
181    T: Send + Sync,
182{
183    fn len(&self) -> usize {
184        let num_taken = self.counter.load(Ordering::Acquire);
185        self.vec_len.saturating_sub(num_taken)
186    }
187}