orx_concurrent_vec/
vec.rs

1use crate::ConcurrentSlice;
2use crate::elem::ConcurrentElement;
3use crate::{helpers::DefaultPinVec, state::ConcurrentVecState};
4use core::ops::RangeBounds;
5use core::sync::atomic::Ordering;
6use orx_pinned_concurrent_col::PinnedConcurrentCol;
7use orx_pinned_vec::IntoConcurrentPinnedVec;
8
/// A thread-safe, efficient and lock-free vector allowing concurrent grow, read and update operations.
///
/// `ConcurrentVec` provides a safe api for three sets of concurrent operations: grow, read and update.
///
/// # Type parameters
///
/// * `T`: type of the values; they are stored wrapped as [`ConcurrentElement<T>`](ConcurrentElement).
/// * `P`: the pinned vector type used as storage, which must implement
///   `IntoConcurrentPinnedVec<ConcurrentElement<T>>`; defaults to `DefaultPinVec<T>`.
///
/// # Examples
///
/// ```rust
/// use orx_concurrent_vec::*;
/// use std::time::Duration;
///
/// #[derive(Debug, Default)]
/// struct Metric {
///     sum: i32,
///     count: i32,
/// }
/// impl Metric {
///     fn aggregate(self, value: &i32) -> Self {
///         Self {
///             sum: self.sum + value,
///             count: self.count + 1,
///         }
///     }
/// }
///
/// // measurements are recorded at varying intervals (0 to 4 ms apart)
/// let measurements = ConcurrentVec::new();
///
/// // one metric is computed from the measurements every 100 milliseconds
/// let metrics = ConcurrentVec::new();
///
/// std::thread::scope(|s| {
///     // thread to store measurements as they arrive
///     s.spawn(|| {
///         for i in 0..100 {
///             std::thread::sleep(Duration::from_millis(i % 5));
///
///             // collect measurements and push to measurements vec
///             measurements.push(i as i32);
///         }
///     });
///
///     // thread to collect metrics every 100 milliseconds
///     s.spawn(|| {
///         for _ in 0..10 {
///             // safely read from measurements vec to compute the metric at that instant
///             let metric =
///                 measurements.fold(Metric::default(), |x, value| x.aggregate(value));
///
///             // push result to metrics
///             metrics.push(metric);
///
///             std::thread::sleep(Duration::from_millis(100));
///         }
///     });
/// });
///
/// let measurements: Vec<_> = measurements.to_vec();
/// let averages: Vec<_> = metrics.to_vec();
///
/// assert_eq!(measurements.len(), 100);
/// assert_eq!(averages.len(), 10);
/// ```
pub struct ConcurrentVec<T, P = DefaultPinVec<T>>
where
    P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
{
    // The only field: the methods in this file reach the storage, the capacity
    // and the length state through this concurrent collection.
    pub(crate) core: PinnedConcurrentCol<ConcurrentElement<T>, P::ConPinnedVec, ConcurrentVecState>,
}
77
78impl<T, P> ConcurrentVec<T, P>
79where
80    P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
81{
82    /// Consumes the concurrent vec and returns the underlying pinned vector.
83    ///
84    /// Any `PinnedVec` implementation can be converted to a `ConcurrentVec` using the `From` trait.
85    /// Similarly, underlying pinned vector can be obtained by calling the consuming `into_inner` method.
86    pub fn into_inner(self) -> P {
87        let len = self.core.state().len();
88        // # SAFETY: ConcurrentBag only allows to push to the end of the bag, keeping track of the length.
89        // Therefore, the underlying pinned vector is in a valid condition at any given time.
90        unsafe { self.core.into_inner(len) }
91    }
92
93    /// Returns the number of elements which are pushed to the vec,
94    /// including the elements which received their reserved locations and are currently being pushed.
95    #[inline(always)]
96    pub(crate) fn reserved_len(&self) -> usize {
97        let len = self.core.state().len();
98        let cap = self.core.capacity();
99        match len <= cap {
100            true => len,
101            false => cap,
102        }
103    }
104
105    /// Returns the number of elements which are pushed to the vec,
106    /// excluding the elements which received their reserved locations and are currently being pushed.
107    ///
108    /// # Examples
109    ///
110    /// ```rust
111    /// use orx_concurrent_vec::ConcurrentVec;
112    ///
113    /// let vec = ConcurrentVec::new();
114    /// vec.push('a');
115    /// vec.push('b');
116    ///
117    /// assert_eq!(2, vec.len());
118    /// ```
119    pub fn len(&self) -> usize {
120        let len = self.len_written().load(Ordering::Acquire);
121        let cap = self.capacity();
122        let len_reserved = self.len_reserved().load(Ordering::Relaxed);
123        let until = match len_reserved <= cap {
124            true => len_reserved,
125            false => cap,
126        };
127
128        let iter = unsafe { self.core.iter_over_range(len..until) };
129        let mut num_pushed = 0;
130        for x in iter {
131            match x.0.is_some_with_order(Ordering::Relaxed) {
132                true => num_pushed += 1,
133                false => break,
134            }
135        }
136        let new_len = len + num_pushed;
137
138        self.len_written().fetch_max(new_len, Ordering::Release);
139
140        new_len
141    }
142
    /// Returns whether or not the vec is empty; i.e., whether [`ConcurrentVec::len`] is zero.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use orx_concurrent_vec::ConcurrentVec;
    ///
    /// let mut vec = ConcurrentVec::new();
    ///
    /// assert!(vec.is_empty());
    ///
    /// vec.push('a');
    /// vec.push('b');
    ///
    /// assert!(!vec.is_empty());
    ///
    /// vec.clear();
    /// assert!(vec.is_empty());
    /// ```
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
166
    /// Returns the current allocated capacity of the collection.
    ///
    /// The value is reported by the underlying concurrent collection.
    pub fn capacity(&self) -> usize {
        self.core.capacity()
    }
171
    /// Returns the maximum possible capacity that the collection can reach without calling
    /// [`ConcurrentVec::reserve_maximum_capacity`].
    ///
    /// Importantly, note that the maximum capacity does not correspond to the allocated memory;
    /// see [`ConcurrentVec::capacity`] for the currently allocated capacity.
    pub fn maximum_capacity(&self) -> usize {
        self.core.maximum_capacity()
    }
178
179    /// Creates and returns a slice of a `ConcurrentVec` or another `ConcurrentSlice`.
180    ///
181    /// Concurrent counterpart of a slice for a standard vec or an array.
182    ///
183    /// A `ConcurrentSlice` provides a focused / restricted view on a slice of the vector.
184    /// It provides all methods of the concurrent vector except for the ones which
185    /// grow the size of the vector.
186    ///
187    /// # Examples
188    ///
189    /// ```rust
190    /// use orx_concurrent_vec::*;
191    ///
192    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3, 4]);
193    ///
194    /// let slice = vec.slice(1..);
195    /// assert_eq!(&slice, &[1, 2, 3, 4]);
196    ///
197    /// let slice = vec.slice(1..4);
198    /// assert_eq!(&slice, &[1, 2, 3]);
199    ///
200    /// let slice = vec.slice(..3);
201    /// assert_eq!(&slice, &[0, 1, 2]);
202    ///
203    /// let slice = vec.slice(3..10);
204    /// assert_eq!(&slice, &[3, 4]);
205    ///
206    /// let slice = vec.slice(7..9);
207    /// assert_eq!(&slice, &[]);
208    ///
209    /// // slices can also be sliced
210    ///
211    /// let slice = vec.slice(1..=4);
212    /// assert_eq!(&slice, &[1, 2, 3, 4]);
213    ///
214    /// let sub_slice = slice.slice(1..3);
215    /// assert_eq!(&sub_slice, &[2, 3]);
216    /// ```
217    pub fn slice<R: RangeBounds<usize>>(&self, range: R) -> ConcurrentSlice<T, P> {
218        let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, Some(self.len()));
219        let len = b - a;
220        ConcurrentSlice::new(self, a, len)
221    }
222
223    /// Creates and returns a slice of all elements of the vec.
224    ///
225    /// Note that `vec.as_slice()` is equivalent to `vec.slice(..)`.
226    ///
227    /// A `ConcurrentSlice` provides a focused / restricted view on a slice of the vector.
228    /// It provides all methods of the concurrent vector except for the ones which
229    /// grow the size of the vector.
230    ///
231    /// # Examples
232    ///
233    /// ```rust
234    /// use orx_concurrent_vec::*;
235    ///
236    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3, 4]);
237    ///
238    /// let slice = vec.as_slice();
239    /// assert_eq!(&slice, &[0, 1, 2, 3, 4]);
240    /// ```
241    pub fn as_slice(&self) -> ConcurrentSlice<T, P> {
242        self.slice(0..self.len())
243    }
244
245    /// Returns the element at the `i`-th position;
246    /// returns None if the index is out of bounds.
247    ///
248    /// The safe api of the `ConcurrentVec` never gives out `&T` or `&mut T` references.
249    /// Instead, returns a [`ConcurrentElement`] which provides thread safe concurrent read and write
250    /// methods on the element.
251    ///
252    /// # Examples
253    ///
254    /// ```rust
255    /// use orx_concurrent_vec::*;
256    ///
257    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
258    ///
259    /// assert!(vec.get(4).is_none());
260    ///
261    /// let cloned = vec.get(2).map(|elem| elem.cloned());
262    /// assert_eq!(cloned, Some(2));
263    ///
264    /// let double = vec.get(2).map(|elem| elem.map(|x| x * 2));
265    /// assert_eq!(double, Some(4));
266    ///
267    /// let elem = vec.get(2).unwrap();
268    /// assert_eq!(elem, &2);
269    ///
270    /// elem.set(42);
271    /// assert_eq!(elem, &42);
272    ///
273    /// elem.update(|x| *x = *x / 2);
274    /// assert_eq!(elem, &21);
275    ///
276    /// let old = elem.replace(7);
277    /// assert_eq!(old, 21);
278    /// assert_eq!(elem, &7);
279    ///
280    /// assert_eq!(&vec, &[0, 1, 7, 3]);
281    /// ```
282    #[inline(always)]
283    pub fn get(&self, i: usize) -> Option<&ConcurrentElement<T>> {
284        match i < self.len() {
285            true => unsafe { self.core.get(i) },
286            false => None,
287        }
288    }
289
290    /// Returns the cloned value of element at the `i`-th position;
291    /// returns None if the index is out of bounds.
292    ///
293    /// Note that `vec.get_cloned(i)` is short-hand for `vec.get(i).map(|elem| elem.cloned())`.
294    ///
295    /// # Examples
296    ///
297    /// ```rust
298    /// use orx_concurrent_vec::*;
299    ///
300    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
301    ///
302    /// assert_eq!(vec.get_cloned(2), Some(2));
303    /// assert_eq!(vec.get_cloned(4), None);
304    /// ```
305    #[inline(always)]
306    pub fn get_cloned(&self, i: usize) -> Option<T>
307    where
308        T: Clone,
309    {
310        match i < self.reserved_len() {
311            true => unsafe { self.core.get(i) }.and_then(|elem| elem.0.clone_into_option()),
312            false => None,
313        }
314    }
315
    /// Returns the copied value of element at the `i`-th position;
    /// returns None if the index is out of bounds.
    ///
    /// Note that `vec.get_copied(i)` is short-hand for `vec.get(i).map(|elem| elem.copied())`.
    ///
    /// Implemented by delegating to [`ConcurrentVec::get_cloned`]; this is always possible
    /// since every `Copy` type is also `Clone`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use orx_concurrent_vec::*;
    ///
    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
    ///
    /// assert_eq!(vec.get_copied(2), Some(2));
    /// assert_eq!(vec.get_copied(4), None);
    /// ```
    #[inline(always)]
    pub fn get_copied(&self, i: usize) -> Option<T>
    where
        T: Copy,
    {
        self.get_cloned(i)
    }
338
339    /// Returns an iterator to the elements of the vec.
340    ///
341    /// The safe api of the `ConcurrentVec` never gives out `&T` or `&mut T` references.
342    /// Instead, the iterator yields [`ConcurrentElement`] which provides thread safe concurrent read and write
343    /// methods on the element.
344    ///
345    /// # Examples
346    ///
347    /// ```
348    /// use orx_concurrent_vec::*;
349    ///
350    /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
351    ///
352    /// // read - map
353    ///
354    /// let doubles: Vec<_> = vec.iter().map(|elem| elem.map(|x| x * 2)).collect();
355    /// assert_eq!(doubles, [0, 2, 4, 6]);
356    ///
357    /// // read - reduce
358    ///
359    /// let sum: i32 = vec.iter().map(|elem| elem.cloned()).sum();
360    /// assert_eq!(sum, 6);
361    ///
362    /// // mutate
363    ///
364    /// for (i, elem) in vec.iter().enumerate() {
365    ///     match i {
366    ///         2 => elem.set(42),
367    ///         _ => elem.update(|x| *x *= 2),
368    ///     }
369    /// }
370    /// assert_eq!(&vec, &[0, 2, 42, 6]);
371    ///
372    /// let old_vals: Vec<_> = vec.iter().map(|elem| elem.replace(7)).collect();
373    /// assert_eq!(&old_vals, &[0, 2, 42, 6]);
374    /// assert_eq!(&vec, &[7, 7, 7, 7]);
375    /// ```
376    pub fn iter(&self) -> impl Iterator<Item = &ConcurrentElement<T>> {
377        unsafe { self.core.iter(self.len()) }
378    }
379
380    /// Returns an iterator to cloned values of the elements of the vec.
381    ///
382    /// Note that `vec.iter_cloned()` is short-hand for `vec.iter().map(|elem| elem.cloned())`.
383    ///
384    /// # Examples
385    ///
386    /// ```
387    /// use orx_concurrent_vec::*;
388    ///
389    /// let vec = ConcurrentVec::new();
390    /// vec.extend([42, 7]);
391    ///
392    /// let mut iter = vec.iter_cloned();
393    ///
394    /// assert_eq!(iter.next(), Some(42));
395    /// assert_eq!(iter.next(), Some(7));
396    /// assert_eq!(iter.next(), None);
397    ///
398    /// let sum: i32 = vec.iter_cloned().sum();
399    /// assert_eq!(sum, 49);
400    /// ```
401    pub fn iter_cloned(&self) -> impl Iterator<Item = T> + '_
402    where
403        T: Clone,
404    {
405        unsafe { self.core.iter(self.len()) }.map(|elem| elem.cloned())
406    }
407}
408
409// HELPERS
410
411impl<T, P> ConcurrentVec<T, P>
412where
413    P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
414{
415    pub(crate) fn new_from_pinned(pinned_vec: P) -> Self {
416        Self {
417            core: PinnedConcurrentCol::new_from_pinned(pinned_vec),
418        }
419    }
420}
421
422unsafe impl<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentElement<T>>> Sync
423    for ConcurrentVec<T, P>
424{
425}
426
427unsafe impl<T: Send, P: IntoConcurrentPinnedVec<ConcurrentElement<T>>> Send
428    for ConcurrentVec<T, P>
429{
430}