append_only_vec/
lib.rs

1//! AppendOnlyVec
2//!
3//! This is a pretty simple type, which is a vector that you can push into, but
4//! cannot modify the elements of.  The data structure never moves an element
5//! once allocated, so you can push to the vec even while holding references to
6//! elements that have already been pushed.
7//!
8//! ### Scaling
9//!
10//! 1. Accessing an element is O(1), but slightly more expensive than for a
11//!    standard `Vec`.
12//!
13//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
14//!    new chunk.
15//!
16//! ### Example
17//!
18//! ```
19//! use append_only_vec::AppendOnlyVec;
20//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
21//! let mut threads = Vec::new();
22//! for thread_num in 0..10 {
23//!     threads.push(std::thread::spawn(move || {
24//!          for n in 0..100 {
25//!               let s = format!("thread {} says {}", thread_num, n);
26//!               let which = V.push(s.clone());
27//!               assert_eq!(&V[which], &s);
28//!          }
29//!     }));
30//! }
31//! for t in threads {
32//!    t.join();
33//! }
34//! assert_eq!(V.len(), 1000);
35//! ```
36
37use std::cell::UnsafeCell;
38use std::ops::{Index, IndexMut};
39use std::sync::atomic::AtomicUsize;
40use std::sync::atomic::Ordering;
41pub struct AppendOnlyVec<T> {
42    count: AtomicUsize,
43    reserved: AtomicUsize,
44    data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
45}
46
47unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
48unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {}
49
50const BITS: usize = std::mem::size_of::<usize>() * 8;
51
52#[cfg(target_arch = "x86_64")]
53const BITS_USED: usize = 48;
54#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
55const BITS_USED: usize = 64;
56#[cfg(target_pointer_width = "32")]
57const BITS_USED: usize = 32;
58
59// This takes an index into a vec, and determines which data array will hold it
60// (the first return value), and what the index will be into that data array
61// (second return value)
62//
63// The ith data array holds 1<<i values.
64const fn indices(i: usize) -> (u32, usize) {
65    let i = i + 8;
66    let bin = BITS as u32 - 1 - i.leading_zeros();
67    let bin = bin - 3;
68    let offset = i - bin_size(bin);
69    (bin, offset)
70}
71const fn bin_size(array: u32) -> usize {
72    (1 << 3) << array
73}
74
75fn spin_wait(failures: &mut usize) {
76    *failures += 1;
77    if *failures <= 3 {
78        // If there haven't been many failures yet, then we optimistically
79        // spinloop.
80        for _ in 0..(1 << *failures) {
81            std::hint::spin_loop();
82        }
83    } else {
84        // If there have been many failures, then continuing to spinloop will
85        // probably just waste CPU, and whoever we are waiting for has been
86        // preempted then spinning could actively delay completion of the task.
87        // So instead, we cooperatively yield to the OS scheduler.
88        std::thread::yield_now();
89    }
90}
91
92#[test]
93fn test_indices() {
94    for i in 0..32 {
95        println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
96    }
97    let mut array = 0;
98    let mut offset = 0;
99    let mut index = 0;
100    while index < 1000 {
101        index += 1;
102        offset += 1;
103        if offset >= bin_size(array) {
104            offset = 0;
105            array += 1;
106        }
107        assert_eq!(indices(index), (array, offset));
108    }
109}
110
111impl<T> Default for AppendOnlyVec<T> {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117impl<T> AppendOnlyVec<T> {
118    /// Return an `Iterator` over the elements of the vec.
119    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &T> + ExactSizeIterator {
120        // FIXME this could be written to be a little more efficient probably,
121        // if we made it read each pointer only once.  On the other hand, that
122        // could make a reversed iterator less efficient?
123        (0..self.len()).map(|i| unsafe { self.get_unchecked(i) })
124    }
125    /// Find the length of the array.
126    #[inline]
127    pub fn len(&self) -> usize {
128        self.count.load(Ordering::Acquire)
129    }
130
131    fn layout(&self, array: u32) -> std::alloc::Layout {
132        std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
133    }
134    /// Internal-only function requests a slot and puts data into it.
135    ///
136    /// However this does not update the size of the vec, which *must* be done
137    /// in order for either the value to be readable *or* for future pushes to
138    /// actually terminate.
139    fn pre_push(&self, val: T) -> usize {
140        let idx = self.reserved.fetch_add(1, Ordering::Relaxed);
141        let (array, offset) = indices(idx);
142        let ptr = if self.len() < 1 + idx - offset {
143            // We are working on a new array, which may not have been allocated...
144            if offset == 0 {
145                // It is our job to allocate the array!  The size of the array
146                // is determined in the self.layout method, which needs to be
147                // consistent with the indices function.
148                let layout = self.layout(array);
149                let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
150                unsafe {
151                    *self.data[array as usize].get() = ptr;
152                }
153                ptr
154            } else {
155                // We need to wait for the array to be allocated.
156                let mut failures = 0;
157                while self.len() < 1 + idx - offset {
158                    spin_wait(&mut failures);
159                }
160                // The Ordering::Acquire semantics of self.len() ensures that
161                // this pointer read will get the non-null pointer allocated
162                // above.
163                unsafe { *self.data[array as usize].get() }
164            }
165        } else {
166            // The Ordering::Acquire semantics of self.len() ensures that
167            // this pointer read will get the non-null pointer allocated
168            // above.
169            unsafe { *self.data[array as usize].get() }
170        };
171
172        // The contents of this offset are guaranteed to be unused (so far)
173        // because we got the idx from our fetch_add above, and ptr is
174        // guaranteed to be valid because of the loop we used above, which used
175        // self.len() which has Ordering::Acquire semantics.
176        unsafe { (ptr.add(offset)).write(val) };
177        idx
178    }
179    /// Append an element to the array
180    ///
181    /// This is notable in that it doesn't require a `&mut self`, because it
182    /// does appropriate atomic synchronization.
183    ///
184    /// The return value is the index tha was pushed to.
185    pub fn push(&self, val: T) -> usize {
186        let idx = self.pre_push(val);
187
188        // Now we need to increase the size of the vec, so it can get read.  We
189        // use Release upon success, to ensure that the value which we wrote is
190        // visible to any thread that has confirmed that the count is big enough
191        // to read that element.  In case of failure, we can be relaxed, since
192        // we don't do anything with the result other than try again.
193        let mut failures = 0;
194        while self
195            .count
196            .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
197            .is_err()
198        {
199            // This means that someone else *started* pushing before we started,
200            // but hasn't yet finished.  We have to wait for them to finish
201            // pushing before we can update the count.
202            spin_wait(&mut failures);
203        }
204        idx
205    }
206    /// Extend the vec with the contents of an iterator.
207    ///
208    /// Note: this is currently no more efficient than calling `push` for each
209    /// element of the iterator.
210    pub fn extend(&self, iter: impl IntoIterator<Item = T>) {
211        for val in iter {
212            self.push(val);
213        }
214    }
215    /// Append an element to the array with exclusive access
216    ///
217    /// This is slightly more efficient than [`AppendOnlyVec::push`] since it
218    /// doesn't need to worry about concurrent access.
219    ///
220    /// The return value is the new size of the array.
221    pub fn push_mut(&mut self, val: T) -> usize {
222        let idx = self.pre_push(val);
223        // We do not need synchronization here because no one else has access to
224        // this data, and if it is passed to another thread that will involve
225        // the appropriate memory barrier.
226        self.count.store(idx + 1, Ordering::Relaxed);
227        idx
228    }
229    const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(std::ptr::null_mut());
230    /// Allocate a new empty array
231    pub const fn new() -> Self {
232        AppendOnlyVec {
233            count: AtomicUsize::new(0),
234            reserved: AtomicUsize::new(0),
235            data: [Self::EMPTY; BITS_USED - 1 - 3],
236        }
237    }
238
239    /// Index the vec without checking the bounds.
240    ///
241    /// To use this correctly, you *must* first ensure that the `idx <
242    /// self.len()`.  This not only prevents overwriting the bounds, but also
243    /// creates the memory barriers to ensure that the data is visible to the
244    /// current thread.  In single-threaded code, however, it is not needed to
245    /// call `self.len()` explicitly (if e.g. you have counted the number of
246    /// elements pushed).
247    unsafe fn get_unchecked(&self, idx: usize) -> &T {
248        let (array, offset) = indices(idx);
249        // We use a Relaxed load of the pointer, because the length check (which
250        // was supposed to be performed) should ensure that the data we want is
251        // already visible, since self.len() used Ordering::Acquire on
252        // `self.count` which synchronizes with the Ordering::Release write in
253        // `self.push`.
254        let ptr = *self.data[array as usize].get();
255        &*ptr.add(offset)
256    }
257
258    /// Convert into a standard `Vec`
259    pub fn into_vec(self) -> Vec<T> {
260        let mut vec = Vec::with_capacity(self.len());
261
262        for idx in 0..self.len() {
263            let (array, offset) = indices(idx);
264            // We use a Relaxed load of the pointer, because the loop above (which
265            // ends before `self.len()`) should ensure that the data we want is
266            // already visible, since it Acquired `self.count` which synchronizes
267            // with the write in `self.push`.
268            let ptr = unsafe { *self.data[array as usize].get() };
269
270            // Copy the element value. The copy remaining in the array must not
271            // be used again (i.e. make sure we do not drop it)
272            let value = unsafe { ptr.add(offset).read() };
273
274            vec.push(value);
275        }
276
277        // Prevent dropping the copied-out values by marking the count as 0 before
278        // our own drop is run
279        self.count.store(0, Ordering::Relaxed);
280
281        vec
282    }
283}
284impl<T> std::fmt::Debug for AppendOnlyVec<T>
285where
286    T: std::fmt::Debug,
287{
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        f.debug_list().entries(self.iter()).finish()
290    }
291}
292
293impl<T> Index<usize> for AppendOnlyVec<T> {
294    type Output = T;
295
296    fn index(&self, idx: usize) -> &Self::Output {
297        assert!(idx < self.len()); // this includes the required ordering memory barrier
298        let (array, offset) = indices(idx);
299        // The ptr value below is safe, because the length check above will
300        // ensure that the data we want is already visible, since it used
301        // Ordering::Acquire on `self.count` which synchronizes with the
302        // Ordering::Release write in `self.push`.
303        let ptr = unsafe { *self.data[array as usize].get() };
304        unsafe { &*ptr.add(offset) }
305    }
306}
307
308impl<T> IndexMut<usize> for AppendOnlyVec<T> {
309    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
310        assert!(idx < self.len()); // this includes the required ordering memory barrier
311        let (array, offset) = indices(idx);
312        // The ptr value below is safe, because the length check above will
313        // ensure that the data we want is already visible, since it used
314        // Ordering::Acquire on `self.count` which synchronizes with the
315        // Ordering::Release write in `self.push`.
316        let ptr = unsafe { *self.data[array as usize].get() };
317
318        // `&mut` is safe because there can be no access to data owned by
319        // `self` except via `self`, and we have `&mut` on `self`
320        unsafe { &mut *ptr.add(offset) }
321    }
322}
323
324impl<T> Drop for AppendOnlyVec<T> {
325    fn drop(&mut self) {
326        // First we'll drop all the `T` in a slightly sloppy way.  FIXME this
327        // could be optimized to avoid reloading the `ptr`.
328        for idx in 0..self.len() {
329            let (array, offset) = indices(idx);
330            // We use a Relaxed load of the pointer, because the loop above (which
331            // ends before `self.len()`) should ensure that the data we want is
332            // already visible, since it Acquired `self.count` which synchronizes
333            // with the write in `self.push`.
334            let ptr = unsafe { *self.data[array as usize].get() };
335            unsafe {
336                std::ptr::drop_in_place(ptr.add(offset));
337            }
338        }
339        // Now we will free all the arrays.
340        for array in 0..self.data.len() as u32 {
341            // This load is relaxed because no other thread can have a reference
342            // to Self because we have a &mut self.
343            let ptr = unsafe { *self.data[array as usize].get() };
344            if !ptr.is_null() {
345                let layout = self.layout(array);
346                unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
347            } else {
348                break;
349            }
350        }
351    }
352}
353
354impl<T> Clone for AppendOnlyVec<T>
355where
356    T: Clone,
357{
358    fn clone(&self) -> Self {
359        // FIXME this could be optimized to avoid reloading pointers.
360        self.iter().cloned().collect()
361    }
362}
363
364/// An `Iterator` for the values contained in the `AppendOnlyVec`
365#[derive(Debug)]
366pub struct IntoIter<T>(std::vec::IntoIter<T>);
367
368impl<T> Iterator for IntoIter<T> {
369    type Item = T;
370
371    fn next(&mut self) -> Option<Self::Item> {
372        self.0.next()
373    }
374
375    fn size_hint(&self) -> (usize, Option<usize>) {
376        self.0.size_hint()
377    }
378}
379
380impl<T> DoubleEndedIterator for IntoIter<T> {
381    fn next_back(&mut self) -> Option<Self::Item> {
382        self.0.next_back()
383    }
384}
385
386impl<T> ExactSizeIterator for IntoIter<T> {
387    fn len(&self) -> usize {
388        self.0.len()
389    }
390}
391
392impl<T> IntoIterator for AppendOnlyVec<T> {
393    type Item = T;
394
395    type IntoIter = IntoIter<T>;
396
397    fn into_iter(self) -> Self::IntoIter {
398        IntoIter(self.into_vec().into_iter())
399    }
400}
401
402impl<T> FromIterator<T> for AppendOnlyVec<T> {
403    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
404        let out = Self::new();
405        for x in iter {
406            let idx = out.pre_push(x);
407            // We can be relaxed here because no one else has access to
408            // this data, and if it is passed to another thread that will involve
409            // the appropriate memory barrier.
410            out.count.store(idx + 1, Ordering::Relaxed);
411        }
412        out
413    }
414}
415
416impl<T> From<Vec<T>> for AppendOnlyVec<T> {
417    fn from(value: Vec<T>) -> Self {
418        value.into_iter().collect()
419    }
420}
421
422#[test]
423fn test_pushing_and_indexing() {
424    let v = AppendOnlyVec::<usize>::new();
425
426    for n in 0..50 {
427        v.push(n);
428        assert_eq!(v.len(), n + 1);
429        for i in 0..(n + 1) {
430            assert_eq!(v[i], i);
431        }
432    }
433
434    let vec: Vec<usize> = v.iter().copied().collect();
435    let ve2: Vec<usize> = (0..50).collect();
436    assert_eq!(vec, ve2);
437}
438
439#[test]
440fn test_parallel_pushing() {
441    use std::sync::Arc;
442    let v = Arc::new(AppendOnlyVec::<u64>::new());
443    let mut threads = Vec::new();
444    const N: u64 = 100;
445    for thread_num in 0..N {
446        let v = v.clone();
447        threads.push(std::thread::spawn(move || {
448            let which1 = v.push(thread_num);
449            let which2 = v.push(thread_num);
450            assert_eq!(v[which1 as usize], thread_num);
451            assert_eq!(v[which2 as usize], thread_num);
452        }));
453    }
454    for t in threads {
455        t.join().ok();
456    }
457    for thread_num in 0..N {
458        assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
459    }
460}
461
462#[test]
463fn test_into_vec() {
464    struct SafeToDrop(bool);
465
466    impl Drop for SafeToDrop {
467        fn drop(&mut self) {
468            assert!(self.0);
469        }
470    }
471
472    let v = AppendOnlyVec::new();
473
474    for _ in 0..50 {
475        v.push(SafeToDrop(false));
476    }
477
478    let mut v = v.into_vec();
479
480    for i in v.iter_mut() {
481        i.0 = true;
482    }
483}
484
485#[test]
486fn test_push_then_index_mut() {
487    let mut v = AppendOnlyVec::<usize>::new();
488    for i in 0..1024 {
489        v.push(i);
490    }
491    for i in 0..1024 {
492        v[i] += i;
493    }
494    for i in 0..1024 {
495        assert_eq!(v[i], 2 * i);
496    }
497}
498
499#[test]
500fn test_from_vec() {
501    for v in [vec![5_i32, 4, 3, 2, 1], Vec::new(), vec![1]] {
502        let aov: AppendOnlyVec<i32> = v.clone().into();
503        assert_eq!(v, aov.into_vec());
504    }
505}
506
507#[test]
508fn test_clone() {
509    let v = AppendOnlyVec::<String>::new();
510    for i in 0..1024 {
511        v.push(format!("{}", i));
512    }
513    let v2 = v.clone();
514
515    assert_eq!(v.len(), v2.len());
516    for i in 0..1024 {
517        assert_eq!(v[i], v2[i]);
518    }
519}
520
521#[test]
522fn test_push_mut() {
523    let mut v = AppendOnlyVec::new();
524    for i in 0..1024 {
525        v.push_mut(format!("{}", i));
526    }
527    assert_eq!(v.len(), 1024);
528}