append_only_vec/lib.rs
//! AppendOnlyVec
//!
//! This is a pretty simple type, which is a vector that you can push into, but
//! cannot modify the elements of. The data structure never moves an element
//! once allocated, so you can push to the vec even while holding references to
//! elements that have already been pushed.
//!
//! ### Scaling
//!
//! 1. Accessing an element is O(1), but slightly more expensive than for a
//!    standard `Vec`.
//!
//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
//!    new chunk.
//!
//! ### Example
//!
//! ```
//! use append_only_vec::AppendOnlyVec;
//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
//! let mut threads = Vec::new();
//! for thread_num in 0..10 {
//!     threads.push(std::thread::spawn(move || {
//!         for n in 0..100 {
//!             let s = format!("thread {} says {}", thread_num, n);
//!             let which = V.push(s.clone());
//!             assert_eq!(&V[which], &s);
//!         }
//!     }));
//! }
//! for t in threads {
//!     t.join().unwrap();
//! }
//! assert_eq!(V.len(), 1000);
//! ```

use std::cell::UnsafeCell;
use std::ops::{Index, IndexMut};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

pub struct AppendOnlyVec<T> {
    count: AtomicUsize,
    reserved: AtomicUsize,
    data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
}

unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {}

const BITS: usize = std::mem::size_of::<usize>() * 8;

#[cfg(target_arch = "x86_64")]
const BITS_USED: usize = 48;
#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
const BITS_USED: usize = 64;
#[cfg(target_pointer_width = "32")]
const BITS_USED: usize = 32;
// This takes an index into a vec, and determines which data array will hold it
// (the first return value), and what the index will be into that data array
// (second return value)
//
// The ith data array holds 8 << i values, so the first array holds the first
// 8 elements, the next array the following 16, and so on.
const fn indices(i: usize) -> (u32, usize) {
    let i = i + 8;
    let bin = BITS as u32 - 1 - i.leading_zeros();
    let bin = bin - 3;
    let offset = i - bin_size(bin);
    (bin, offset)
}
const fn bin_size(array: u32) -> usize {
    (1 << 3) << array
}
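
// A few compile-time spot checks of the mapping above. These values follow
// directly from the definitions of `indices` and `bin_size`; they assume a
// toolchain that supports `assert!` in const contexts (Rust 1.57+).
const _: () = {
    assert!(indices(0).0 == 0 && indices(0).1 == 0);
    assert!(indices(7).0 == 0 && indices(7).1 == 7); // last slot of bin 0
    assert!(indices(8).0 == 1 && indices(8).1 == 0); // first slot of bin 1
    assert!(indices(23).0 == 1 && indices(23).1 == 15); // last slot of bin 1
    assert!(indices(24).0 == 2 && indices(24).1 == 0); // first slot of bin 2
};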

#[test]
fn test_indices() {
    for i in 0..32 {
        println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
    }
    let mut array = 0;
    let mut offset = 0;
    let mut index = 0;
    while index < 1000 {
        index += 1;
        offset += 1;
        if offset >= bin_size(array) {
            offset = 0;
            array += 1;
        }
        assert_eq!(indices(index), (array, offset));
    }
}

impl<T> Default for AppendOnlyVec<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AppendOnlyVec<T> {
    /// Return an `Iterator` over the elements of the vec.
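    ///
    /// A minimal usage sketch:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.extend(0..4);
    /// assert_eq!(v.iter().sum::<i32>(), 6);
    /// assert_eq!(v.iter().len(), 4);
    /// ```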
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &T> + ExactSizeIterator {
        // FIXME this could be written to be a little more efficient probably,
        // if we made it read each pointer only once. On the other hand, that
        // could make a reversed iterator less efficient?
        (0..self.len()).map(|i| unsafe { self.get_unchecked(i) })
    }
    /// Find the length of the array.
    #[inline]
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn layout(&self, array: u32) -> std::alloc::Layout {
        std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
    }
    /// Internal-only function that requests a slot and puts data into it.
    ///
    /// However, this does not update the size of the vec, which *must* be done
    /// in order for either the value to be readable *or* for future pushes to
    /// actually terminate.
    fn pre_push(&self, val: T) -> usize {
        let idx = self.reserved.fetch_add(1, Ordering::Relaxed);
        let (array, offset) = indices(idx);
        let ptr = if self.len() < 1 + idx - offset {
            // We are working on a new array, which may not have been allocated...
            if offset == 0 {
                // It is our job to allocate the array! The size of the array
                // is determined in the self.layout method, which needs to be
                // consistent with the indices function.
                let layout = self.layout(array);
                let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
                unsafe {
                    *self.data[array as usize].get() = ptr;
                }
                ptr
            } else {
                // We need to wait for the array to be allocated.
                while self.len() < 1 + idx - offset {
                    std::hint::spin_loop();
                }
                // The Ordering::Acquire semantics of self.len() ensures that
                // this pointer read will get the non-null pointer allocated
                // above.
                unsafe { *self.data[array as usize].get() }
            }
        } else {
            // The Ordering::Acquire semantics of self.len() ensures that
            // this pointer read will get the non-null pointer allocated
            // above.
            unsafe { *self.data[array as usize].get() }
        };

        // The contents of this offset are guaranteed to be unused (so far)
        // because we got the idx from our fetch_add above, and ptr is
        // guaranteed to be valid because of the loop we used above, which used
        // self.len() which has Ordering::Acquire semantics.
        unsafe { (ptr.add(offset)).write(val) };
        idx
    }
    /// Append an element to the array.
    ///
    /// This is notable in that it doesn't require a `&mut self`, because it
    /// does appropriate atomic synchronization.
    ///
    /// The return value is the index that was pushed to.
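    ///
    /// A minimal single-threaded example (the module docs show concurrent
    /// use):
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new(); // note: no `mut` is needed to push
    /// assert_eq!(v.push("hello"), 0);
    /// assert_eq!(v.push("world"), 1);
    /// assert_eq!(v[0], "hello");
    /// ```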
    pub fn push(&self, val: T) -> usize {
        let idx = self.pre_push(val);

        // Now we need to increase the size of the vec, so it can get read. We
        // use Release upon success, to ensure that the value which we wrote is
        // visible to any thread that has confirmed that the count is big
        // enough to read that element. In case of failure, we can be relaxed,
        // since we don't do anything with the result other than try again.
        while self
            .count
            .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            // This means that someone else *started* pushing before we
            // started, but hasn't yet finished. We have to wait for them to
            // finish pushing before we can update the count. Note that using a
            // spinloop here isn't really ideal, but except when allocating a
            // new array, the window between reserving space and using it is
            // pretty small, so contention will hopefully be rare, and having a
            // context switch during that interval will hopefully be
            // vanishingly unlikely.
            std::hint::spin_loop();
        }
        idx
    }
    /// Extend the vec with the contents of an iterator.
    ///
    /// Note: this is currently no more efficient than calling `push` for each
    /// element of the iterator.
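    ///
    /// For example:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.extend(vec![1, 2, 3]);
    /// assert_eq!(v.len(), 3);
    /// ```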
    pub fn extend(&self, iter: impl IntoIterator<Item = T>) {
        for val in iter {
            self.push(val);
        }
    }
    /// Append an element to the array with exclusive access.
    ///
    /// This is slightly more efficient than [`AppendOnlyVec::push`] since it
    /// doesn't need to worry about concurrent access.
    ///
    /// The return value is the index that was pushed to.
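    ///
    /// For example:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let mut v = AppendOnlyVec::new();
    /// assert_eq!(v.push_mut(7), 0);
    /// assert_eq!(v.push_mut(8), 1);
    /// assert_eq!(v.len(), 2);
    /// ```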
    pub fn push_mut(&mut self, val: T) -> usize {
        let idx = self.pre_push(val);
        // We do not need synchronization here because no one else has access
        // to this data, and if it is passed to another thread that will
        // involve the appropriate memory barrier. Note that the new count is
        // `idx + 1`, since `idx` is the index of the element just written.
        self.count.store(idx + 1, Ordering::Relaxed);
        idx
    }
    const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(std::ptr::null_mut());
    /// Allocate a new empty array.
    pub const fn new() -> Self {
        AppendOnlyVec {
            count: AtomicUsize::new(0),
            reserved: AtomicUsize::new(0),
            data: [Self::EMPTY; BITS_USED - 1 - 3],
        }
    }

    /// Index the vec without checking the bounds.
    ///
    /// To use this correctly, you *must* first ensure that `idx <
    /// self.len()`. This not only prevents reading out of bounds, but also
    /// creates the memory barriers needed to ensure that the data is visible
    /// to the current thread. In single-threaded code, however, you do not
    /// need to call `self.len()` explicitly (if e.g. you have counted the
    /// number of elements pushed).
    unsafe fn get_unchecked(&self, idx: usize) -> &T {
        let (array, offset) = indices(idx);
        // This pointer read needs no extra synchronization, because the length
        // check (which was supposed to be performed) ensures that the data we
        // want is already visible: self.len() used Ordering::Acquire on
        // `self.count`, which synchronizes with the Ordering::Release write in
        // `self.push`.
        let ptr = *self.data[array as usize].get();
        &*ptr.add(offset)
    }

    /// Convert into a standard `Vec`.
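    ///
    /// For example:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.extend(1..4);
    /// assert_eq!(v.into_vec(), vec![1, 2, 3]);
    /// ```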
    pub fn into_vec(self) -> Vec<T> {
        let mut vec = Vec::with_capacity(self.len());

        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // This pointer read needs no extra synchronization, because the
            // loop bound (`self.len()`) ensures that the data we want is
            // already visible: it Acquired `self.count`, which synchronizes
            // with the Release write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };

            // Copy the element value. The copy remaining in the array must not
            // be used again (i.e. make sure we do not drop it).
            let value = unsafe { ptr.add(offset).read() };

            vec.push(value);
        }

        // Prevent dropping the copied-out values by zeroing the count before
        // our own drop is run.
        self.count.store(0, Ordering::Relaxed);

        vec
    }
}
impl<T> std::fmt::Debug for AppendOnlyVec<T>
where
    T: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T> Index<usize> for AppendOnlyVec<T> {
    type Output = T;

    fn index(&self, idx: usize) -> &Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count`, which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };
        unsafe { &*ptr.add(offset) }
    }
}

impl<T> IndexMut<usize> for AppendOnlyVec<T> {
    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count`, which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };

        // `&mut` is safe because there can be no access to data owned by
        // `self` except via `self`, and we have `&mut` on `self`.
        unsafe { &mut *ptr.add(offset) }
    }
}

impl<T> Drop for AppendOnlyVec<T> {
    fn drop(&mut self) {
        // First we'll drop all the `T` in a slightly sloppy way. FIXME this
        // could be optimized to avoid reloading the `ptr`.
        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // This pointer read needs no extra synchronization, because the
            // loop bound (`self.len()`) ensures that the data we want is
            // already visible: it Acquired `self.count`, which synchronizes
            // with the Release write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };
            unsafe {
                std::ptr::drop_in_place(ptr.add(offset));
            }
        }
        // Now we will free all the arrays.
        for array in 0..self.data.len() as u32 {
            // This read needs no synchronization, because no other thread can
            // hold a reference to `self`: we have `&mut self`.
            let ptr = unsafe { *self.data[array as usize].get() };
            if !ptr.is_null() {
                let layout = self.layout(array);
                unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
            } else {
                break;
            }
        }
    }
}

/// An `Iterator` for the values contained in the `AppendOnlyVec`.
#[derive(Debug)]
pub struct IntoIter<T>(std::vec::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<T> DoubleEndedIterator for IntoIter<T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.0.next_back()
    }
}

impl<T> ExactSizeIterator for IntoIter<T> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl<T> IntoIterator for AppendOnlyVec<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter(self.into_vec().into_iter())
    }
}

impl<T> FromIterator<T> for AppendOnlyVec<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let out = Self::new();
        for x in iter {
            let idx = out.pre_push(x);
            // We can be relaxed here because no one else has access to
            // this data, and if it is passed to another thread that will
            // involve the appropriate memory barrier.
            out.count.store(idx + 1, Ordering::Relaxed);
        }
        out
    }
}

impl<T> From<Vec<T>> for AppendOnlyVec<T> {
    fn from(value: Vec<T>) -> Self {
        value.into_iter().collect()
    }
}

#[test]
fn test_pushing_and_indexing() {
    let v = AppendOnlyVec::<usize>::new();

    for n in 0..50 {
        v.push(n);
        assert_eq!(v.len(), n + 1);
        for i in 0..(n + 1) {
            assert_eq!(v[i], i);
        }
    }

    let vec: Vec<usize> = v.iter().copied().collect();
    let ve2: Vec<usize> = (0..50).collect();
    assert_eq!(vec, ve2);
}

#[test]
fn test_parallel_pushing() {
    use std::sync::Arc;
    let v = Arc::new(AppendOnlyVec::<u64>::new());
    let mut threads = Vec::new();
    const N: u64 = 100;
    for thread_num in 0..N {
        let v = v.clone();
        threads.push(std::thread::spawn(move || {
            let which1 = v.push(thread_num);
            let which2 = v.push(thread_num);
            assert_eq!(v[which1], thread_num);
            assert_eq!(v[which2], thread_num);
        }));
    }
    for t in threads {
        t.join().unwrap();
    }
    for thread_num in 0..N {
        assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
    }
}

#[test]
fn test_into_vec() {
    struct SafeToDrop(bool);

    impl Drop for SafeToDrop {
        fn drop(&mut self) {
            assert!(self.0);
        }
    }

    let v = AppendOnlyVec::new();

    for _ in 0..50 {
        v.push(SafeToDrop(false));
    }

    let mut v = v.into_vec();

    for i in v.iter_mut() {
        i.0 = true;
    }
}

#[test]
fn test_push_then_index_mut() {
    let mut v = AppendOnlyVec::<usize>::new();
    for i in 0..1024 {
        v.push(i);
    }
    for i in 0..1024 {
        v[i] += i;
    }
    for i in 0..1024 {
        assert_eq!(v[i], 2 * i);
    }
}

#[test]
fn test_from_vec() {
    for v in [vec![5_i32, 4, 3, 2, 1], Vec::new(), vec![1]] {
        let aov: AppendOnlyVec<i32> = v.clone().into();
        assert_eq!(v, aov.into_vec());
    }
}