append_only_vec/lib.rs
//! AppendOnlyVec
//!
//! This is a pretty simple type, which is a vector that you can push into, but
//! cannot modify the elements of. The data structure never moves an element
//! once allocated, so you can push to the vec even while holding references to
//! elements that have already been pushed.
//!
//! ### Scaling
//!
//! 1. Accessing an element is O(1), but slightly more expensive than for a
//!    standard `Vec`.
//!
//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
//!    new chunk.
//!
//! ### Example
//!
//! ```
//! use append_only_vec::AppendOnlyVec;
//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
//! let mut threads = Vec::new();
//! for thread_num in 0..10 {
//!     threads.push(std::thread::spawn(move || {
//!         for n in 0..100 {
//!             let s = format!("thread {} says {}", thread_num, n);
//!             let which = V.push(s.clone());
//!             assert_eq!(&V[which], &s);
//!         }
//!     }));
//! }
//! for t in threads {
//!     t.join();
//! }
//! assert_eq!(V.len(), 1000);
//! ```

use std::cell::UnsafeCell;
use std::ops::{Index, IndexMut};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
pub struct AppendOnlyVec<T> {
    count: AtomicUsize,
    reserved: AtomicUsize,
    data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
}

unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {}

const BITS: usize = std::mem::size_of::<usize>() * 8;

#[cfg(target_arch = "x86_64")]
const BITS_USED: usize = 48;
#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
const BITS_USED: usize = 64;
#[cfg(target_pointer_width = "32")]
const BITS_USED: usize = 32;

// This takes an index into a vec, and determines which data array will hold it
// (the first return value), and what the index will be into that data array
// (second return value)
//
// The ith data array holds 8 << i values.
const fn indices(i: usize) -> (u32, usize) {
    let i = i + 8;
    let bin = BITS as u32 - 1 - i.leading_zeros();
    let bin = bin - 3;
    let offset = i - bin_size(bin);
    (bin, offset)
}
const fn bin_size(array: u32) -> usize {
    (1 << 3) << array
}

fn spin_wait(failures: &mut usize) {
    *failures += 1;
    if *failures <= 3 {
        // If there haven't been many failures yet, then we optimistically
        // spinloop.
        for _ in 0..(1 << *failures) {
            std::hint::spin_loop();
        }
    } else {
        // If there have been many failures, then continuing to spinloop will
        // probably just waste CPU, and if whoever we are waiting for has been
        // preempted then spinning could actively delay completion of the task.
        // So instead, we cooperatively yield to the OS scheduler.
        std::thread::yield_now();
    }
}

#[test]
fn test_indices() {
    for i in 0..32 {
        println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
    }
    let mut array = 0;
    let mut offset = 0;
    let mut index = 0;
    while index < 1000 {
        index += 1;
        offset += 1;
        if offset >= bin_size(array) {
            offset = 0;
            array += 1;
        }
        assert_eq!(indices(index), (array, offset));
    }
}
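
// A few concrete spot checks of the layout above: bin 0 holds indices 0..8,
// bin 1 holds indices 8..24, bin 2 holds indices 24..56, and in general the
// ith bin holds 8 << i elements.
#[test]
fn test_indices_examples() {
    assert_eq!(indices(0), (0, 0));
    assert_eq!(indices(7), (0, 7));
    assert_eq!(indices(8), (1, 0));
    assert_eq!(indices(23), (1, 15));
    assert_eq!(indices(24), (2, 0));
    assert_eq!(bin_size(0), 8);
    assert_eq!(bin_size(1), 16);
    assert_eq!(bin_size(2), 32);
}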

impl<T> Default for AppendOnlyVec<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AppendOnlyVec<T> {
    /// Return an `Iterator` over the elements of the vec.
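    ///
    /// For example, summing the current contents:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.push(1);
    /// v.push(2);
    /// let total: i32 = v.iter().sum();
    /// assert_eq!(total, 3);
    /// ```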
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &T> + ExactSizeIterator {
        // FIXME this could be written to be a little more efficient probably,
        // if we made it read each pointer only once. On the other hand, that
        // could make a reversed iterator less efficient?
        (0..self.len()).map(|i| unsafe { self.get_unchecked(i) })
    }
    /// Find the length of the array.
    #[inline]
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn layout(&self, array: u32) -> std::alloc::Layout {
        std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
    }
    /// Internal-only function that requests a slot and puts data into it.
    ///
    /// However, this does not update the size of the vec, which *must* be done
    /// in order for either the value to be readable *or* for future pushes to
    /// actually terminate.
    fn pre_push(&self, val: T) -> usize {
        let idx = self.reserved.fetch_add(1, Ordering::Relaxed);
        let (array, offset) = indices(idx);
        let ptr = if self.len() < 1 + idx - offset {
            // We are working on a new array, which may not have been allocated...
            if offset == 0 {
                // It is our job to allocate the array! The size of the array
                // is determined in the self.layout method, which needs to be
                // consistent with the indices function.
                let layout = self.layout(array);
                let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
                unsafe {
                    *self.data[array as usize].get() = ptr;
                }
                ptr
            } else {
                // We need to wait for the array to be allocated.
                let mut failures = 0;
                while self.len() < 1 + idx - offset {
                    spin_wait(&mut failures);
                }
                // The Ordering::Acquire semantics of self.len() ensures that
                // this pointer read will get the non-null pointer allocated
                // above.
                unsafe { *self.data[array as usize].get() }
            }
        } else {
            // The Ordering::Acquire semantics of self.len() ensures that
            // this pointer read will get the non-null pointer allocated
            // above.
            unsafe { *self.data[array as usize].get() }
        };

        // The contents of this offset are guaranteed to be unused (so far)
        // because we got the idx from our fetch_add above, and ptr is
        // guaranteed to be valid because of the loop we used above, which used
        // self.len() which has Ordering::Acquire semantics.
        unsafe { (ptr.add(offset)).write(val) };
        idx
    }
    /// Append an element to the array
    ///
    /// This is notable in that it doesn't require a `&mut self`, because it
    /// does appropriate atomic synchronization.
    ///
    /// The return value is the index that was pushed to.
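    ///
    /// For example:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// let idx = v.push("hello");
    /// assert_eq!(v[idx], "hello");
    /// assert_eq!(v.len(), 1);
    /// ```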
    pub fn push(&self, val: T) -> usize {
        let idx = self.pre_push(val);

        // Now we need to increase the size of the vec, so it can get read. We
        // use Release upon success, to ensure that the value which we wrote is
        // visible to any thread that has confirmed that the count is big enough
        // to read that element. In case of failure, we can be relaxed, since
        // we don't do anything with the result other than try again.
        let mut failures = 0;
        while self
            .count
            .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            // This means that someone else *started* pushing before we started,
            // but hasn't yet finished. We have to wait for them to finish
            // pushing before we can update the count.
            spin_wait(&mut failures);
        }
        idx
    }
    /// Extend the vec with the contents of an iterator.
    ///
    /// Note: this is currently no more efficient than calling `push` for each
    /// element of the iterator.
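    ///
    /// For example, filling from a range:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.extend(0..5);
    /// assert_eq!(v.len(), 5);
    /// assert_eq!(v[4], 4);
    /// ```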
    pub fn extend(&self, iter: impl IntoIterator<Item = T>) {
        for val in iter {
            self.push(val);
        }
    }
    /// Append an element to the array with exclusive access
    ///
    /// This is slightly more efficient than [`AppendOnlyVec::push`] since it
    /// doesn't need to worry about concurrent access.
    ///
    /// The return value is the index that was pushed to.
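    ///
    /// A minimal example, using exclusive access:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let mut v = AppendOnlyVec::new();
    /// let idx = v.push_mut(7);
    /// assert_eq!(v[idx], 7);
    /// ```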
    pub fn push_mut(&mut self, val: T) -> usize {
        let idx = self.pre_push(val);
        // We do not need synchronization here because no one else has access to
        // this data, and if it is passed to another thread that will involve
        // the appropriate memory barrier.
        self.count.store(idx + 1, Ordering::Relaxed);
        idx
    }
    const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(std::ptr::null_mut());
    /// Allocate a new empty array
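    ///
    /// Since this is a `const fn`, it can be used to initialize a `static`:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// static V: AppendOnlyVec<i32> = AppendOnlyVec::<i32>::new();
    /// assert_eq!(V.len(), 0);
    /// ```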
    pub const fn new() -> Self {
        AppendOnlyVec {
            count: AtomicUsize::new(0),
            reserved: AtomicUsize::new(0),
            data: [Self::EMPTY; BITS_USED - 1 - 3],
        }
    }

    /// Index the vec without checking the bounds.
    ///
    /// To use this correctly, you *must* first ensure that `idx <
    /// self.len()`. This not only prevents reading out of bounds, but also
    /// creates the memory barriers to ensure that the data is visible to the
    /// current thread. In single-threaded code, however, it is not needed to
    /// call `self.len()` explicitly (if e.g. you have counted the number of
    /// elements pushed).
    unsafe fn get_unchecked(&self, idx: usize) -> &T {
        let (array, offset) = indices(idx);
        // We use a Relaxed load of the pointer, because the length check (which
        // was supposed to be performed) should ensure that the data we want is
        // already visible, since self.len() used Ordering::Acquire on
        // `self.count` which synchronizes with the Ordering::Release write in
        // `self.push`.
        let ptr = *self.data[array as usize].get();
        &*ptr.add(offset)
    }

    /// Convert into a standard `Vec`
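    ///
    /// For example:
    ///
    /// ```
    /// use append_only_vec::AppendOnlyVec;
    /// let v = AppendOnlyVec::new();
    /// v.push(1);
    /// v.push(2);
    /// assert_eq!(v.into_vec(), vec![1, 2]);
    /// ```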
    pub fn into_vec(self) -> Vec<T> {
        let mut vec = Vec::with_capacity(self.len());

        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // We use a Relaxed load of the pointer, because the loop above (which
            // ends before `self.len()`) should ensure that the data we want is
            // already visible, since it Acquired `self.count` which synchronizes
            // with the write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };

            // Copy the element value. The copy remaining in the array must not
            // be used again (i.e. make sure we do not drop it)
            let value = unsafe { ptr.add(offset).read() };

            vec.push(value);
        }

        // Prevent dropping the copied-out values by marking the count as 0 before
        // our own drop is run
        self.count.store(0, Ordering::Relaxed);

        vec
    }
}
impl<T> std::fmt::Debug for AppendOnlyVec<T>
where
    T: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T> Index<usize> for AppendOnlyVec<T> {
    type Output = T;

    fn index(&self, idx: usize) -> &Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count` which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };
        unsafe { &*ptr.add(offset) }
    }
}

impl<T> IndexMut<usize> for AppendOnlyVec<T> {
    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
        assert!(idx < self.len()); // this includes the required ordering memory barrier
        let (array, offset) = indices(idx);
        // The ptr value below is safe, because the length check above will
        // ensure that the data we want is already visible, since it used
        // Ordering::Acquire on `self.count` which synchronizes with the
        // Ordering::Release write in `self.push`.
        let ptr = unsafe { *self.data[array as usize].get() };

        // `&mut` is safe because there can be no access to data owned by
        // `self` except via `self`, and we have `&mut` on `self`
        unsafe { &mut *ptr.add(offset) }
    }
}

impl<T> Drop for AppendOnlyVec<T> {
    fn drop(&mut self) {
        // First we'll drop all the `T` in a slightly sloppy way. FIXME this
        // could be optimized to avoid reloading the `ptr`.
        for idx in 0..self.len() {
            let (array, offset) = indices(idx);
            // We use a Relaxed load of the pointer, because the loop above (which
            // ends before `self.len()`) should ensure that the data we want is
            // already visible, since it Acquired `self.count` which synchronizes
            // with the write in `self.push`.
            let ptr = unsafe { *self.data[array as usize].get() };
            unsafe {
                std::ptr::drop_in_place(ptr.add(offset));
            }
        }
        // Now we will free all the arrays.
        for array in 0..self.data.len() as u32 {
            // This load is relaxed because no other thread can have a reference
            // to Self because we have a &mut self.
            let ptr = unsafe { *self.data[array as usize].get() };
            if !ptr.is_null() {
                let layout = self.layout(array);
                unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
            } else {
                break;
            }
        }
    }
}

impl<T> Clone for AppendOnlyVec<T>
where
    T: Clone,
{
    fn clone(&self) -> Self {
        // FIXME this could be optimized to avoid reloading pointers.
        self.iter().cloned().collect()
    }
}

/// An `Iterator` for the values contained in the `AppendOnlyVec`
#[derive(Debug)]
pub struct IntoIter<T>(std::vec::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<T> DoubleEndedIterator for IntoIter<T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.0.next_back()
    }
}

impl<T> ExactSizeIterator for IntoIter<T> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl<T> IntoIterator for AppendOnlyVec<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter(self.into_vec().into_iter())
    }
}

impl<T> FromIterator<T> for AppendOnlyVec<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let out = Self::new();
        for x in iter {
            let idx = out.pre_push(x);
            // We can be relaxed here because no one else has access to
            // this data, and if it is passed to another thread that will involve
            // the appropriate memory barrier.
            out.count.store(idx + 1, Ordering::Relaxed);
        }
        out
    }
}

impl<T> From<Vec<T>> for AppendOnlyVec<T> {
    fn from(value: Vec<T>) -> Self {
        value.into_iter().collect()
    }
}

#[test]
fn test_pushing_and_indexing() {
    let v = AppendOnlyVec::<usize>::new();

    for n in 0..50 {
        v.push(n);
        assert_eq!(v.len(), n + 1);
        for i in 0..(n + 1) {
            assert_eq!(v[i], i);
        }
    }

    let vec: Vec<usize> = v.iter().copied().collect();
    let ve2: Vec<usize> = (0..50).collect();
    assert_eq!(vec, ve2);
}

#[test]
fn test_parallel_pushing() {
    use std::sync::Arc;
    let v = Arc::new(AppendOnlyVec::<u64>::new());
    let mut threads = Vec::new();
    const N: u64 = 100;
    for thread_num in 0..N {
        let v = v.clone();
        threads.push(std::thread::spawn(move || {
            let which1 = v.push(thread_num);
            let which2 = v.push(thread_num);
            assert_eq!(v[which1 as usize], thread_num);
            assert_eq!(v[which2 as usize], thread_num);
        }));
    }
    for t in threads {
        t.join().ok();
    }
    for thread_num in 0..N {
        assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
    }
}

#[test]
fn test_into_vec() {
    struct SafeToDrop(bool);

    impl Drop for SafeToDrop {
        fn drop(&mut self) {
            assert!(self.0);
        }
    }

    let v = AppendOnlyVec::new();

    for _ in 0..50 {
        v.push(SafeToDrop(false));
    }

    let mut v = v.into_vec();

    for i in v.iter_mut() {
        i.0 = true;
    }
}

#[test]
fn test_push_then_index_mut() {
    let mut v = AppendOnlyVec::<usize>::new();
    for i in 0..1024 {
        v.push(i);
    }
    for i in 0..1024 {
        v[i] += i;
    }
    for i in 0..1024 {
        assert_eq!(v[i], 2 * i);
    }
}

#[test]
fn test_from_vec() {
    for v in [vec![5_i32, 4, 3, 2, 1], Vec::new(), vec![1]] {
        let aov: AppendOnlyVec<i32> = v.clone().into();
        assert_eq!(v, aov.into_vec());
    }
}

#[test]
fn test_clone() {
    let v = AppendOnlyVec::<String>::new();
    for i in 0..1024 {
        v.push(format!("{}", i));
    }
    let v2 = v.clone();

    assert_eq!(v.len(), v2.len());
    for i in 0..1024 {
        assert_eq!(v[i], v2[i]);
    }
}

#[test]
fn test_push_mut() {
    let mut v = AppendOnlyVec::new();
    for i in 0..1024 {
        v.push_mut(format!("{}", i));
    }
    assert_eq!(v.len(), 1024);
}