append_only_vec/
lib.rs

//! AppendOnlyVec
//!
//! This is a pretty simple type: a vector that you can push into, but whose
//! existing elements you cannot modify.  The data structure never moves an element
//! once allocated, so you can push to the vec even while holding references to
//! elements that have already been pushed.
//!
//! ### Scaling
//!
//! 1. Accessing an element is O(1), but slightly more expensive than for a
//!    standard `Vec`.
//!
//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
//!    new chunk.
//!
//! ### Example
//!
//! ```
//! use append_only_vec::AppendOnlyVec;
//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
//! let mut threads = Vec::new();
//! for thread_num in 0..10 {
//!     threads.push(std::thread::spawn(move || {
//!         for n in 0..100 {
//!             let s = format!("thread {} says {}", thread_num, n);
//!             let which = V.push(s.clone());
//!             assert_eq!(&V[which], &s);
//!         }
//!     }));
//! }
//! for t in threads {
//!     t.join().unwrap();
//! }
//! assert_eq!(V.len(), 1000);
//! ```

use std::cell::UnsafeCell;
use std::ops::{Index, IndexMut};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
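/// A vector to which elements may only be appended.
///
/// Elements are never moved once they have been pushed, so shared references
/// to existing elements remain valid while new elements are pushed, even from
/// other threads.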
pub struct AppendOnlyVec<T> {
    count: AtomicUsize,
    reserved: AtomicUsize,
    data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
}

unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {}

const BITS: usize = std::mem::size_of::<usize>() * 8;

#[cfg(target_arch = "x86_64")]
const BITS_USED: usize = 48;
#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
const BITS_USED: usize = 64;
#[cfg(target_pointer_width = "32")]
const BITS_USED: usize = 32;

// This takes an index into a vec, and determines which data array will hold it
// (the first return value), and what the index will be into that data array
// (second return value)
//
// The ith data array holds 8 << i values.
const fn indices(i: usize) -> (u32, usize) {
    let i = i + 8;
    let bin = BITS as u32 - 1 - i.leading_zeros();
    let bin = bin - 3;
    let offset = i - bin_size(bin);
    (bin, offset)
}
const fn bin_size(array: u32) -> usize {
    (1 << 3) << array
}
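// For example, bin 0 holds indices 0..8, bin 1 holds indices 8..24, and bin 2
// holds indices 24..56:
//
//   indices(0)  == (0, 0)
//   indices(7)  == (0, 7)
//   indices(8)  == (1, 0)
//   indices(23) == (1, 15)
//   indices(24) == (2, 0)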

#[test]
fn test_indices() {
    for i in 0..32 {
        println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
    }
    let mut array = 0;
    let mut offset = 0;
    let mut index = 0;
    while index < 1000 {
        index += 1;
        offset += 1;
        if offset >= bin_size(array) {
            offset = 0;
            array += 1;
        }
        assert_eq!(indices(index), (array, offset));
    }
}

impl<T> Default for AppendOnlyVec<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AppendOnlyVec<T> {
    /// Return an `Iterator` over the elements of the vec.
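    ///
    /// ### Example
    ///
    /// A minimal sketch of iterating over previously pushed elements:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.push(10);
    /// v.push(20);
    /// v.push(30);
    /// let doubled: Vec<i32> = v.iter().map(|&x| x * 2).collect();
    /// assert_eq!(doubled, vec![20, 40, 60]);
    /// ```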
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &T> + ExactSizeIterator {
        // FIXME this could be written to be a little more efficient probably,
        // if we made it read each pointer only once.  On the other hand, that
        // could make a reversed iterator less efficient?
        (0..self.len()).map(|i| unsafe { self.get_unchecked(i) })
    }
    /// Find the length of the array.
    #[inline]
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn layout(&self, array: u32) -> std::alloc::Layout {
        std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
    }
    /// Internal-only function that requests a slot and puts data into it.
    ///
    /// However, this does not update the size of the vec, which *must* be done
    /// in order for either the value to be readable *or* for future pushes to
    /// actually terminate.
    fn pre_push(&self, val: T) -> usize {
        let idx = self.reserved.fetch_add(1, Ordering::Relaxed);
        let (array, offset) = indices(idx);
        let ptr = if self.len() < 1 + idx - offset {
            // We are working on a new array, which may not have been allocated...
            if offset == 0 {
                // It is our job to allocate the array!  The size of the array
                // is determined in the self.layout method, which needs to be
                // consistent with the indices function.
                let layout = self.layout(array);
                let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
                unsafe {
                    *self.data[array as usize].get() = ptr;
                }
                ptr
            } else {
                // We need to wait for the array to be allocated.
                while self.len() < 1 + idx - offset {
                    std::hint::spin_loop();
                }
                // The Ordering::Acquire semantics of self.len() ensures that
                // this pointer read will get the non-null pointer allocated
                // above.
                unsafe { *self.data[array as usize].get() }
            }
        } else {
            // The Ordering::Acquire semantics of self.len() ensures that
            // this pointer read will get the non-null pointer allocated
            // above.
            unsafe { *self.data[array as usize].get() }
        };

        // The contents of this offset are guaranteed to be unused (so far)
        // because we got the idx from our fetch_add above, and ptr is
        // guaranteed to be valid because of the loop we used above, which used
        // self.len() which has Ordering::Acquire semantics.
        unsafe { (ptr.add(offset)).write(val) };
        idx
    }
    /// Append an element to the array
    ///
    /// This is notable in that it doesn't require a `&mut self`, because it
    /// does appropriate atomic synchronization.
    ///
    /// The return value is the index that was pushed to.
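    ///
    /// ### Example
    ///
    /// A minimal sketch of pushing through a shared reference and reading the
    /// value back via the returned index:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// let idx = v.push("hello");
    /// assert_eq!(v[idx], "hello");
    /// assert_eq!(v.len(), 1);
    /// ```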
    pub fn push(&self, val: T) -> usize {
        let idx = self.pre_push(val);

        // Now we need to increase the size of the vec, so it can get read.  We
        // use Release upon success, to ensure that the value which we wrote is
        // visible to any thread that has confirmed that the count is big enough
        // to read that element.  In case of failure, we can be relaxed, since
        // we don't do anything with the result other than try again.
        while self
            .count
            .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            // This means that someone else *started* pushing before we started,
            // but hasn't yet finished.  We have to wait for them to finish
            // pushing before we can update the count.  Note that using a
            // spinloop here isn't really ideal, but except when allocating a
            // new array, the window between reserving space and using it is
            // pretty small, so contention will hopefully be rare, and having a
            // context switch during that interval will hopefully be vanishingly
            // unlikely.
            std::hint::spin_loop();
        }
        idx
    }
    /// Extend the vec with the contents of an iterator.
    ///
    /// Note: this is currently no more efficient than calling `push` for each
    /// element of the iterator.
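    ///
    /// ### Example
    ///
    /// A small sketch of filling the vec from a range:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.extend(0..5);
    /// assert_eq!(v.into_vec(), vec![0, 1, 2, 3, 4]);
    /// ```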
    pub fn extend(&self, iter: impl IntoIterator<Item = T>) {
        for val in iter {
            self.push(val);
        }
    }
    /// Append an element to the array with exclusive access
    ///
    /// This is slightly more efficient than [`AppendOnlyVec::push`] since it
    /// doesn't need to worry about concurrent access.
    ///
    /// The return value is the index that was pushed to.
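    ///
    /// ### Example
    ///
    /// A minimal sketch using exclusive access:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let mut v = AppendOnlyVec::new();
    /// let idx = v.push_mut("hello");
    /// assert_eq!(v[idx], "hello");
    /// assert_eq!(v.len(), 1);
    /// ```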
    pub fn push_mut(&mut self, val: T) -> usize {
        let idx = self.pre_push(val);
        // We do not need synchronization here because no one else has access to
        // this data, and if it is passed to another thread that will involve
        // the appropriate memory barrier.
        self.count.store(idx + 1, Ordering::Relaxed);
        idx
    }
    const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(std::ptr::null_mut());
    /// Create a new, empty `AppendOnlyVec`
    pub const fn new() -> Self {
        AppendOnlyVec {
            count: AtomicUsize::new(0),
            reserved: AtomicUsize::new(0),
            data: [Self::EMPTY; BITS_USED - 1 - 3],
        }
    }

    /// Index the vec without checking the bounds.
    ///
    /// To use this correctly, you *must* first ensure that `idx <
    /// self.len()`.  This not only prevents reading past the bounds, but also
    /// creates the memory barriers to ensure that the data is visible to the
    /// current thread.  In single-threaded code, however, you need not call
    /// `self.len()` explicitly (if e.g. you have counted the number of
    /// elements pushed).
    unsafe fn get_unchecked(&self, idx: usize) -> &T {
        let (array, offset) = indices(idx);
        // We use a Relaxed load of the pointer, because the length check (which
        // was supposed to be performed) should ensure that the data we want is
        // already visible, since self.len() used Ordering::Acquire on
        // `self.count` which synchronizes with the Ordering::Release write in
        // `self.push`.
        let ptr = *self.data[array as usize].get();
        &*ptr.add(offset)
    }

    /// Convert into a standard `Vec`
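    ///
    /// ### Example
    ///
    /// A small sketch of draining the vec into a `Vec`:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.push(1);
    /// v.push(2);
    /// assert_eq!(v.into_vec(), vec![1, 2]);
    /// ```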
    pub fn into_vec(self) -> Vec<T> {
        let mut vec = Vec::with_capacity(self.len());

        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // We use a Relaxed load of the pointer, because the loop above (which
            // ends before `self.len()`) should ensure that the data we want is
            // already visible, since it Acquired `self.count` which synchronizes
            // with the write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };

            // Copy the element value. The copy remaining in the array must not
            // be used again (i.e. make sure we do not drop it)
            let value = unsafe { ptr.add(offset).read() };

            vec.push(value);
        }

        // Prevent dropping the copied-out values by marking the count as 0 before
        // our own drop is run
        self.count.store(0, Ordering::Relaxed);

        vec
    }
}
impl<T> std::fmt::Debug for AppendOnlyVec<T>
where
    T: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T> Index<usize> for AppendOnlyVec<T> {
    type Output = T;

    fn index(&self, idx: usize) -> &Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count` which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };
        unsafe { &*ptr.add(offset) }
    }
}

impl<T> IndexMut<usize> for AppendOnlyVec<T> {
    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count` which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };

        // `&mut` is safe because there can be no access to data owned by
        // `self` except via `self`, and we have `&mut` on `self`
        unsafe { &mut *ptr.add(offset) }
    }
}

impl<T> Drop for AppendOnlyVec<T> {
    fn drop(&mut self) {
        // First we'll drop all the `T` in a slightly sloppy way.  FIXME this
        // could be optimized to avoid reloading the `ptr`.
        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // We use a Relaxed load of the pointer, because the loop above (which
            // ends before `self.len()`) should ensure that the data we want is
            // already visible, since it Acquired `self.count` which synchronizes
            // with the write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };
            unsafe {
                std::ptr::drop_in_place(ptr.add(offset));
            }
        }
        // Now we will free all the arrays.
        for array in 0..self.data.len() as u32 {
            // This load is relaxed because no other thread can have a reference
            // to Self because we have a &mut self.
            let ptr = unsafe { *self.data[array as usize].get() };
            if !ptr.is_null() {
                let layout = self.layout(array);
                unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
            } else {
                break;
            }
        }
    }
}

/// An `Iterator` for the values contained in the `AppendOnlyVec`
#[derive(Debug)]
pub struct IntoIter<T>(std::vec::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<T> DoubleEndedIterator for IntoIter<T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.0.next_back()
    }
}

impl<T> ExactSizeIterator for IntoIter<T> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl<T> IntoIterator for AppendOnlyVec<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter(self.into_vec().into_iter())
    }
}

impl<T> FromIterator<T> for AppendOnlyVec<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let out = Self::new();
        for x in iter {
            let idx = out.pre_push(x);
            // We can be relaxed here because no one else has access to
            // this data, and if it is passed to another thread that will involve
            // the appropriate memory barrier.
            out.count.store(idx + 1, Ordering::Relaxed);
        }
        out
    }
}

impl<T> From<Vec<T>> for AppendOnlyVec<T> {
    fn from(value: Vec<T>) -> Self {
        value.into_iter().collect()
    }
}

#[test]
fn test_pushing_and_indexing() {
    let v = AppendOnlyVec::<usize>::new();

    for n in 0..50 {
        v.push(n);
        assert_eq!(v.len(), n + 1);
        for i in 0..(n + 1) {
            assert_eq!(v[i], i);
        }
    }

    let vec: Vec<usize> = v.iter().copied().collect();
    let ve2: Vec<usize> = (0..50).collect();
    assert_eq!(vec, ve2);
}

#[test]
fn test_parallel_pushing() {
    use std::sync::Arc;
    let v = Arc::new(AppendOnlyVec::<u64>::new());
    let mut threads = Vec::new();
    const N: u64 = 100;
    for thread_num in 0..N {
        let v = v.clone();
        threads.push(std::thread::spawn(move || {
            let which1 = v.push(thread_num);
            let which2 = v.push(thread_num);
            assert_eq!(v[which1 as usize], thread_num);
            assert_eq!(v[which2 as usize], thread_num);
        }));
    }
    for t in threads {
        t.join().ok();
    }
    for thread_num in 0..N {
        assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
    }
}

#[test]
fn test_into_vec() {
    struct SafeToDrop(bool);

    impl Drop for SafeToDrop {
        fn drop(&mut self) {
            assert!(self.0);
        }
    }

    let v = AppendOnlyVec::new();

    for _ in 0..50 {
        v.push(SafeToDrop(false));
    }

    let mut v = v.into_vec();

    for i in v.iter_mut() {
        i.0 = true;
    }
}

#[test]
fn test_push_then_index_mut() {
    let mut v = AppendOnlyVec::<usize>::new();
    for i in 0..1024 {
        v.push(i);
    }
    for i in 0..1024 {
        v[i] += i;
    }
    for i in 0..1024 {
        assert_eq!(v[i], 2 * i);
    }
}

#[test]
fn test_from_vec() {
    for v in [vec![5_i32, 4, 3, 2, 1], Vec::new(), vec![1]] {
        let aov: AppendOnlyVec<i32> = v.clone().into();
        assert_eq!(v, aov.into_vec());
    }
}