egglog_concurrency/
concurrent_vec.rs

1//! A variant of a vector supporting pushes that do not block reads.
2
3use std::{
4    cell::UnsafeCell,
5    mem::{self, MaybeUninit},
6    ops::Deref,
7    sync::{
8        Mutex,
9        atomic::{AtomicUsize, Ordering},
10    },
11};
12
13use crate::{MutexReader, ReadOptimizedLock};
14
15// NB: probably don't need to do SyncUnsafeCell here. Can probably just do MaybeUninit?
16
/// A simple concurrent vector type supporting push operations that do not block
/// reads. Concurrent pushes are serialized, but reads need not wait for writes
/// to complete, except when the vector needs to be resized.
pub struct ConcurrentVec<T> {
    /// Backing storage. Slots below `head` were initialized by a completed
    /// `push`; slots at or beyond `head` may still be uninitialized, which is
    /// why they are wrapped in `MaybeUninit`.
    data: ReadOptimizedLock<Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
    /// The index of the next element to be pushed. This is used to determine
    /// how many cells have been written to successfully without grabbing any
    /// exclusive locks. It is only advanced after the slot has been written.
    head: AtomicUsize,
    /// Used to synchronize writes: `push` holds this for its whole duration,
    /// so at most one writer touches `head` and the slots at a time.
    write_lock: Mutex<()>,
}
29
30impl<T> Default for ConcurrentVec<T> {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl<T> Drop for ConcurrentVec<T> {
37    fn drop(&mut self) {
38        let mut writer = self.data.lock();
39        let len = self.head.load(Ordering::Acquire);
40        if mem::needs_drop::<T>() {
41            for i in 0..len {
42                // SAFETY: we own the data, have exclusive access, and know that the
43                // data is valid (you need a valid reference to the vector to
44                // increment `head`, any such call to `head` must have exited by
45                // now).
46                unsafe { writer[i].as_mut_ptr().drop_in_place() };
47            }
48        }
49    }
50}
51
52impl<T> ConcurrentVec<T> {
53    /// Create a new `AsyncVec` with the default capacity (128).
54    pub fn new() -> Self {
55        Self::with_capacity(128)
56    }
57
58    /// Create a new `AsyncVec` with the given starting capacity.
59    pub fn with_capacity(capacity: usize) -> Self {
60        let capacity = capacity.next_power_of_two();
61        Self {
62            data: ReadOptimizedLock::new(Vec::with_capacity(capacity)),
63            head: AtomicUsize::new(0),
64            write_lock: Mutex::new(()),
65        }
66    }
67    /// Push `item` onto the vector. Other calls to `push` may have to complete
68    /// in order for this item to be visible.
69    pub fn push(&self, item: T) -> usize {
70        let _guard = self.write_lock.lock().unwrap();
71        let index = self.head.load(Ordering::Acquire);
72        self.push_at(item, index);
73        self.head.store(index + 1, Ordering::Release);
74        index
75    }
76
77    fn push_at(&self, item: T, index: usize) {
78        let handle = self.data.read();
79        if let Some(slot) = handle.get(index) {
80            // SAFETY: we are tansferring ownership of `item` to the slot.
81            unsafe { ((*slot.as_ptr()).0.get()).write(item) };
82            return;
83        }
84        // `index` is out of bounds. Need to resize.
85        mem::drop(handle);
86        let mut writer = self.data.lock();
87        if index >= writer.len() {
88            writer.resize_with((index + 1).next_power_of_two(), MaybeUninit::uninit);
89        }
90        mem::drop(writer);
91        self.push_at(item, index);
92    }
93
94    pub fn read(&self) -> impl Deref<Target = [T]> + '_ {
95        let valid_prefix = self.head.load(Ordering::Acquire);
96        let reader = self.data.read();
97        ReadHandle {
98            valid_prefix,
99            reader,
100        }
101    }
102}
103
/// Handle returned by `ConcurrentVec::read`. It dereferences to the first
/// `valid_prefix` elements of the backing storage.
struct ReadHandle<'a, T> {
    /// Value of `head` observed when the handle was created; every slot below
    /// this index has been initialized.
    valid_prefix: usize,
    /// Read guard on the backing storage, held for as long as the handle lives.
    reader: MutexReader<'a, Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
}
108
109impl<T> Deref for ReadHandle<'_, T> {
110    type Target = [T];
111
112    fn deref(&self) -> &[T] {
113        // SAFETY: all elements up to `prefix` are valid, and MaybeUninit<T> has
114        // a compatible layout with T so long as T is properly initialized.
115        //
116        // NB: transmuting an UnsafeCell<T> to <T> may not be safe long-term,
117        // even though this code passes miri.
118        unsafe {
119            mem::transmute::<&[MaybeUninit<SyncUnsafeCell<T>>], &[T]>(
120                &self.reader[0..self.valid_prefix],
121            )
122        }
123    }
124}
125
/// An `UnsafeCell` wrapper that can be shared across threads. All
/// synchronization of accesses to the contents is the user's responsibility.
///
/// `#[repr(transparent)]` guarantees the wrapper has exactly the layout of
/// `UnsafeCell<T>` (and therefore of `T`), which code that reinterprets
/// initialized cells as `T` depends on.
#[repr(transparent)]
struct SyncUnsafeCell<T>(UnsafeCell<T>);
127
// SAFETY: the wrapper holds nothing but a `T`, so it can be sent to another
// thread whenever `T` can.
unsafe impl<T: Send> Send for SyncUnsafeCell<T> {}
// SAFETY: `UnsafeCell<T>` is never `Sync` on its own; this impl opts in, so
// every access through the cell must be synchronized by the code using it. In
// this file, writes happen under `write_lock` and only to a slot that has not
// yet been published through `head`.
unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}