orx_concurrent_iter/implementations/vec/
con_iter.rs

1use crate::{
2    concurrent_iter::ConcurrentIter,
3    exact_size_concurrent_iter::ExactSizeConcurrentIter,
4    implementations::{
5        array_utils::{ArrayChunkPuller, ArrayConIter, ArrayIntoSeqIter, ChunkPointers},
6        ptr_utils::take,
7    },
8};
9use alloc::vec::Vec;
10use core::{
11    mem::ManuallyDrop,
12    sync::atomic::{AtomicUsize, Ordering},
13};
14
/// Concurrent iterator of a [`Vec`], yielding its elements by value.
///
/// An instance is obtained by calling [`into_con_iter`] on a vector.
///
/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let vec = vec![1, 2];
/// let con_iter = vec.into_con_iter();
/// assert_eq!(con_iter.next(), Some(1));
/// assert_eq!(con_iter.next(), Some(2));
/// assert_eq!(con_iter.next(), None);
/// ```
pub struct ConIterVec<T> {
    /// Start of the buffer of the vector this iterator was created from.
    ///
    /// Set to null once the not-yet-pulled part has been handed over to a
    /// sequential iterator; a null pointer therefore means "nothing left here".
    ptr: *const T,
    /// Length of the vector at the time the iterator was created.
    vec_len: usize,
    /// Capacity of the vector at the time the iterator was created.
    vec_cap: usize,
    /// Number of positions reserved by pulls so far.
    ///
    /// It may grow beyond `vec_len`; readers clamp it before using it.
    counter: AtomicUsize,
}
38
39unsafe impl<T: Send> Sync for ConIterVec<T> {}
40
41impl<T> Default for ConIterVec<T> {
42    fn default() -> Self {
43        Self::new(Vec::new())
44    }
45}
46
47impl<T> Drop for ConIterVec<T> {
48    fn drop(&mut self) {
49        let _iter = self.remaining_into_seq_iter();
50    }
51}
52
53impl<T> ConIterVec<T> {
54    pub(super) fn new(vec: Vec<T>) -> Self {
55        let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
56        let _ = ManuallyDrop::new(vec);
57        Self {
58            ptr,
59            vec_len,
60            vec_cap,
61            counter: 0.into(),
62        }
63    }
64
65    pub(super) fn initial_len(&self) -> usize {
66        self.vec_len
67    }
68
69    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
70        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
71        match begin_idx < self.vec_len {
72            true => Some(begin_idx),
73            _ => None,
74        }
75    }
76
77    fn remaining_into_seq_iter(&mut self) -> ArrayIntoSeqIter<T, ()> {
78        // # SAFETY
79        // null ptr indicates that the data is already taken out of this iterator
80        // by a consuming method such as `into_seq_iter`
81        match self.ptr.is_null() {
82            true => Default::default(),
83            false => {
84                let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
85                let iter = self.slice_into_seq_iter(num_taken, true);
86                self.ptr = core::ptr::null();
87                iter
88            }
89        }
90    }
91
92    fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> ArrayIntoSeqIter<T, ()> {
93        let completed = num_taken == self.vec_len;
94        let (last, current) = match completed {
95            true => (core::ptr::null(), core::ptr::null()),
96            false => {
97                // SAFETY: self.vec_len is positive here, would be completed o/w
98                let last = unsafe { self.ptr.add(self.vec_len - 1) };
99                // SAFETY: first + num_taken is in bounds
100                let current = unsafe { self.ptr.add(num_taken) };
101                (last, current)
102            }
103        };
104
105        let allocation_to_drop = drop_vec.then_some((self.ptr, self.vec_cap));
106
107        ArrayIntoSeqIter::new(current, last, allocation_to_drop, ())
108    }
109}
110
111impl<T> ArrayConIter for ConIterVec<T> {
112    type Item = T;
113
114    fn progress_and_get_chunk_pointers(
115        &self,
116        chunk_size: usize,
117    ) -> Option<ChunkPointers<Self::Item>> {
118        self.progress_and_get_begin_idx(chunk_size)
119            .map(|begin_idx| {
120                let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
121                let first = unsafe { self.ptr.add(begin_idx) }; // ptr + begin_idx is in bounds
122                let last = unsafe { self.ptr.add(end_idx - 1) }; // ptr + end_idx - 1 is in bounds
123                ChunkPointers {
124                    begin_idx,
125                    first,
126                    last,
127                }
128            })
129    }
130}
131
132impl<T> ConcurrentIter for ConIterVec<T>
133where
134    T: Send,
135{
136    type Item = T;
137
138    type SequentialIter = ArrayIntoSeqIter<T, ()>;
139
140    type ChunkPuller<'i>
141        = ArrayChunkPuller<'i, Self>
142    where
143        Self: 'i;
144
145    fn into_seq_iter(mut self) -> Self::SequentialIter {
146        self.remaining_into_seq_iter()
147    }
148
149    fn skip_to_end(&self) {
150        let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
151        let num_taken_before = current.min(self.vec_len);
152        let _iter = self.slice_into_seq_iter(num_taken_before, false);
153    }
154
155    fn next(&self) -> Option<Self::Item> {
156        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
157            .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
158    }
159
160    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
161        self.progress_and_get_begin_idx(1) // ptr + idx is in-bounds
162            .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
163    }
164
165    fn size_hint(&self) -> (usize, Option<usize>) {
166        let num_taken = self.counter.load(Ordering::Acquire);
167        let remaining = self.vec_len.saturating_sub(num_taken);
168        (remaining, Some(remaining))
169    }
170
171    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
172        Self::ChunkPuller::new(self, chunk_size)
173    }
174}
175
176impl<T> ExactSizeConcurrentIter for ConIterVec<T>
177where
178    T: Send,
179{
180    fn len(&self) -> usize {
181        let num_taken = self.counter.load(Ordering::Acquire);
182        self.vec_len.saturating_sub(num_taken)
183    }
184}