orx_concurrent_vec/vec.rs
1use crate::ConcurrentSlice;
2use crate::elem::ConcurrentElement;
3use crate::{helpers::DefaultPinVec, state::ConcurrentVecState};
4use core::ops::RangeBounds;
5use core::sync::atomic::Ordering;
6use orx_pinned_concurrent_col::PinnedConcurrentCol;
7use orx_pinned_vec::IntoConcurrentPinnedVec;
8
9/// A thread-safe, efficient and lock-free vector allowing concurrent grow, read and update operations.
10///
11/// ConcurrentVec provides safe api for the following three sets of concurrent operations, grow & read & update.
12///
13/// # Examples
14///
15/// ```rust
16/// use orx_concurrent_vec::*;
17/// use std::time::Duration;
18///
19/// #[derive(Debug, Default)]
20/// struct Metric {
21/// sum: i32,
22/// count: i32,
23/// }
24/// impl Metric {
25/// fn aggregate(self, value: &i32) -> Self {
26/// Self {
27/// sum: self.sum + value,
28/// count: self.count + 1,
29/// }
30/// }
31/// }
32///
33/// // record measurements in random intervals, roughly every 2ms
34/// let measurements = ConcurrentVec::new();
35///
36/// // collect metrics every 100 milliseconds
37/// let metrics = ConcurrentVec::new();
38///
39/// std::thread::scope(|s| {
40/// // thread to store measurements as they arrive
41/// s.spawn(|| {
42/// for i in 0..100 {
43/// std::thread::sleep(Duration::from_millis(i % 5));
44///
45/// // collect measurements and push to measurements vec
46/// measurements.push(i as i32);
47/// }
48/// });
49///
50/// // thread to collect metrics every 100 milliseconds
51/// s.spawn(|| {
52/// for _ in 0..10 {
53/// // safely read from measurements vec to compute the metric at that instant
54/// let metric =
55/// measurements.fold(Metric::default(), |x, value| x.aggregate(value));
56///
57/// // push result to metrics
58/// metrics.push(metric);
59///
60/// std::thread::sleep(Duration::from_millis(100));
61/// }
62/// });
63/// });
64///
65/// let measurements: Vec<_> = measurements.to_vec();
66/// let averages: Vec<_> = metrics.to_vec();
67///
68/// assert_eq!(measurements.len(), 100);
69/// assert_eq!(averages.len(), 10);
70/// ```
71pub struct ConcurrentVec<T, P = DefaultPinVec<T>>
72where
73 P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
74{
75 pub(crate) core: PinnedConcurrentCol<ConcurrentElement<T>, P::ConPinnedVec, ConcurrentVecState>,
76}
77
78impl<T, P> ConcurrentVec<T, P>
79where
80 P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
81{
82 /// Consumes the concurrent vec and returns the underlying pinned vector.
83 ///
84 /// Any `PinnedVec` implementation can be converted to a `ConcurrentVec` using the `From` trait.
85 /// Similarly, underlying pinned vector can be obtained by calling the consuming `into_inner` method.
86 pub fn into_inner(self) -> P {
87 let len = self.core.state().len();
88 // # SAFETY: ConcurrentBag only allows to push to the end of the bag, keeping track of the length.
89 // Therefore, the underlying pinned vector is in a valid condition at any given time.
90 unsafe { self.core.into_inner(len) }
91 }
92
93 /// Returns the number of elements which are pushed to the vec,
94 /// including the elements which received their reserved locations and are currently being pushed.
95 #[inline(always)]
96 pub(crate) fn reserved_len(&self) -> usize {
97 let len = self.core.state().len();
98 let cap = self.core.capacity();
99 match len <= cap {
100 true => len,
101 false => cap,
102 }
103 }
104
105 /// Returns the number of elements which are pushed to the vec,
106 /// excluding the elements which received their reserved locations and are currently being pushed.
107 ///
108 /// # Examples
109 ///
110 /// ```rust
111 /// use orx_concurrent_vec::ConcurrentVec;
112 ///
113 /// let vec = ConcurrentVec::new();
114 /// vec.push('a');
115 /// vec.push('b');
116 ///
117 /// assert_eq!(2, vec.len());
118 /// ```
119 pub fn len(&self) -> usize {
120 let len = self.len_written().load(Ordering::Acquire);
121 let cap = self.capacity();
122 let len_reserved = self.len_reserved().load(Ordering::Relaxed);
123 let until = match len_reserved <= cap {
124 true => len_reserved,
125 false => cap,
126 };
127
128 let iter = unsafe { self.core.iter_over_range(len..until) };
129 let mut num_pushed = 0;
130 for x in iter {
131 match x.0.is_some_with_order(Ordering::Relaxed) {
132 true => num_pushed += 1,
133 false => break,
134 }
135 }
136 let new_len = len + num_pushed;
137
138 self.len_written().fetch_max(new_len, Ordering::Release);
139
140 new_len
141 }
142
143 /// Returns whether or not the bag is empty.
144 ///
145 /// # Examples
146 ///
147 /// ```rust
148 /// use orx_concurrent_vec::ConcurrentVec;
149 ///
150 /// let mut vec = ConcurrentVec::new();
151 ///
152 /// assert!(vec.is_empty());
153 ///
154 /// vec.push('a');
155 /// vec.push('b');
156 ///
157 /// assert!(!vec.is_empty());
158 ///
159 /// vec.clear();
160 /// assert!(vec.is_empty());
161 /// ```
162 #[inline(always)]
163 pub fn is_empty(&self) -> bool {
164 self.len() == 0
165 }
166
167 /// Returns the current allocated capacity of the collection.
168 pub fn capacity(&self) -> usize {
169 self.core.capacity()
170 }
171
172 /// Returns maximum possible capacity that the collection can reach without calling [`ConcurrentVec::reserve_maximum_capacity`].
173 ///
174 /// Importantly note that maximum capacity does not correspond to the allocated memory.
175 pub fn maximum_capacity(&self) -> usize {
176 self.core.maximum_capacity()
177 }
178
179 /// Creates and returns a slice of a `ConcurrentVec` or another `ConcurrentSlice`.
180 ///
181 /// Concurrent counterpart of a slice for a standard vec or an array.
182 ///
183 /// A `ConcurrentSlice` provides a focused / restricted view on a slice of the vector.
184 /// It provides all methods of the concurrent vector except for the ones which
185 /// grow the size of the vector.
186 ///
187 /// # Examples
188 ///
189 /// ```rust
190 /// use orx_concurrent_vec::*;
191 ///
192 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3, 4]);
193 ///
194 /// let slice = vec.slice(1..);
195 /// assert_eq!(&slice, &[1, 2, 3, 4]);
196 ///
197 /// let slice = vec.slice(1..4);
198 /// assert_eq!(&slice, &[1, 2, 3]);
199 ///
200 /// let slice = vec.slice(..3);
201 /// assert_eq!(&slice, &[0, 1, 2]);
202 ///
203 /// let slice = vec.slice(3..10);
204 /// assert_eq!(&slice, &[3, 4]);
205 ///
206 /// let slice = vec.slice(7..9);
207 /// assert_eq!(&slice, &[]);
208 ///
209 /// // slices can also be sliced
210 ///
211 /// let slice = vec.slice(1..=4);
212 /// assert_eq!(&slice, &[1, 2, 3, 4]);
213 ///
214 /// let sub_slice = slice.slice(1..3);
215 /// assert_eq!(&sub_slice, &[2, 3]);
216 /// ```
217 pub fn slice<R: RangeBounds<usize>>(&self, range: R) -> ConcurrentSlice<T, P> {
218 let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, Some(self.len()));
219 let len = b - a;
220 ConcurrentSlice::new(self, a, len)
221 }
222
223 /// Creates and returns a slice of all elements of the vec.
224 ///
225 /// Note that `vec.as_slice()` is equivalent to `vec.slice(..)`.
226 ///
227 /// A `ConcurrentSlice` provides a focused / restricted view on a slice of the vector.
228 /// It provides all methods of the concurrent vector except for the ones which
229 /// grow the size of the vector.
230 ///
231 /// # Examples
232 ///
233 /// ```rust
234 /// use orx_concurrent_vec::*;
235 ///
236 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3, 4]);
237 ///
238 /// let slice = vec.as_slice();
239 /// assert_eq!(&slice, &[0, 1, 2, 3, 4]);
240 /// ```
241 pub fn as_slice(&self) -> ConcurrentSlice<T, P> {
242 self.slice(0..self.len())
243 }
244
245 /// Returns the element at the `i`-th position;
246 /// returns None if the index is out of bounds.
247 ///
248 /// The safe api of the `ConcurrentVec` never gives out `&T` or `&mut T` references.
249 /// Instead, returns a [`ConcurrentElement`] which provides thread safe concurrent read and write
250 /// methods on the element.
251 ///
252 /// # Examples
253 ///
254 /// ```rust
255 /// use orx_concurrent_vec::*;
256 ///
257 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
258 ///
259 /// assert!(vec.get(4).is_none());
260 ///
261 /// let cloned = vec.get(2).map(|elem| elem.cloned());
262 /// assert_eq!(cloned, Some(2));
263 ///
264 /// let double = vec.get(2).map(|elem| elem.map(|x| x * 2));
265 /// assert_eq!(double, Some(4));
266 ///
267 /// let elem = vec.get(2).unwrap();
268 /// assert_eq!(elem, &2);
269 ///
270 /// elem.set(42);
271 /// assert_eq!(elem, &42);
272 ///
273 /// elem.update(|x| *x = *x / 2);
274 /// assert_eq!(elem, &21);
275 ///
276 /// let old = elem.replace(7);
277 /// assert_eq!(old, 21);
278 /// assert_eq!(elem, &7);
279 ///
280 /// assert_eq!(&vec, &[0, 1, 7, 3]);
281 /// ```
282 #[inline(always)]
283 pub fn get(&self, i: usize) -> Option<&ConcurrentElement<T>> {
284 match i < self.len() {
285 true => unsafe { self.core.get(i) },
286 false => None,
287 }
288 }
289
290 /// Returns the cloned value of element at the `i`-th position;
291 /// returns None if the index is out of bounds.
292 ///
293 /// Note that `vec.get_cloned(i)` is short-hand for `vec.get(i).map(|elem| elem.cloned())`.
294 ///
295 /// # Examples
296 ///
297 /// ```rust
298 /// use orx_concurrent_vec::*;
299 ///
300 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
301 ///
302 /// assert_eq!(vec.get_cloned(2), Some(2));
303 /// assert_eq!(vec.get_cloned(4), None);
304 /// ```
305 #[inline(always)]
306 pub fn get_cloned(&self, i: usize) -> Option<T>
307 where
308 T: Clone,
309 {
310 match i < self.reserved_len() {
311 true => unsafe { self.core.get(i) }.and_then(|elem| elem.0.clone_into_option()),
312 false => None,
313 }
314 }
315
316 /// Returns the copied value of element at the `i`-th position;
317 /// returns None if the index is out of bounds.
318 ///
319 /// Note that `vec.get_copied(i)` is short-hand for `vec.get(i).map(|elem| elem.copied())`.
320 ///
321 /// # Examples
322 ///
323 /// ```rust
324 /// use orx_concurrent_vec::*;
325 ///
326 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
327 ///
328 /// assert_eq!(vec.get_copied(2), Some(2));
329 /// assert_eq!(vec.get_copied(4), None);
330 /// ```
331 #[inline(always)]
332 pub fn get_copied(&self, i: usize) -> Option<T>
333 where
334 T: Copy,
335 {
336 self.get_cloned(i)
337 }
338
339 /// Returns an iterator to the elements of the vec.
340 ///
341 /// The safe api of the `ConcurrentVec` never gives out `&T` or `&mut T` references.
342 /// Instead, the iterator yields [`ConcurrentElement`] which provides thread safe concurrent read and write
343 /// methods on the element.
344 ///
345 /// # Examples
346 ///
347 /// ```
348 /// use orx_concurrent_vec::*;
349 ///
350 /// let vec = ConcurrentVec::from_iter([0, 1, 2, 3]);
351 ///
352 /// // read - map
353 ///
354 /// let doubles: Vec<_> = vec.iter().map(|elem| elem.map(|x| x * 2)).collect();
355 /// assert_eq!(doubles, [0, 2, 4, 6]);
356 ///
357 /// // read - reduce
358 ///
359 /// let sum: i32 = vec.iter().map(|elem| elem.cloned()).sum();
360 /// assert_eq!(sum, 6);
361 ///
362 /// // mutate
363 ///
364 /// for (i, elem) in vec.iter().enumerate() {
365 /// match i {
366 /// 2 => elem.set(42),
367 /// _ => elem.update(|x| *x *= 2),
368 /// }
369 /// }
370 /// assert_eq!(&vec, &[0, 2, 42, 6]);
371 ///
372 /// let old_vals: Vec<_> = vec.iter().map(|elem| elem.replace(7)).collect();
373 /// assert_eq!(&old_vals, &[0, 2, 42, 6]);
374 /// assert_eq!(&vec, &[7, 7, 7, 7]);
375 /// ```
376 pub fn iter(&self) -> impl Iterator<Item = &ConcurrentElement<T>> {
377 unsafe { self.core.iter(self.len()) }
378 }
379
380 /// Returns an iterator to cloned values of the elements of the vec.
381 ///
382 /// Note that `vec.iter_cloned()` is short-hand for `vec.iter().map(|elem| elem.cloned())`.
383 ///
384 /// # Examples
385 ///
386 /// ```
387 /// use orx_concurrent_vec::*;
388 ///
389 /// let vec = ConcurrentVec::new();
390 /// vec.extend([42, 7]);
391 ///
392 /// let mut iter = vec.iter_cloned();
393 ///
394 /// assert_eq!(iter.next(), Some(42));
395 /// assert_eq!(iter.next(), Some(7));
396 /// assert_eq!(iter.next(), None);
397 ///
398 /// let sum: i32 = vec.iter_cloned().sum();
399 /// assert_eq!(sum, 49);
400 /// ```
401 pub fn iter_cloned(&self) -> impl Iterator<Item = T> + '_
402 where
403 T: Clone,
404 {
405 unsafe { self.core.iter(self.len()) }.map(|elem| elem.cloned())
406 }
407}
408
409// HELPERS
410
411impl<T, P> ConcurrentVec<T, P>
412where
413 P: IntoConcurrentPinnedVec<ConcurrentElement<T>>,
414{
415 pub(crate) fn new_from_pinned(pinned_vec: P) -> Self {
416 Self {
417 core: PinnedConcurrentCol::new_from_pinned(pinned_vec),
418 }
419 }
420}
421
422unsafe impl<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentElement<T>>> Sync
423 for ConcurrentVec<T, P>
424{
425}
426
427unsafe impl<T: Send, P: IntoConcurrentPinnedVec<ConcurrentElement<T>>> Send
428 for ConcurrentVec<T, P>
429{
430}