egglog_concurrency/
concurrent_vec.rs1use std::{
4    cell::UnsafeCell,
5    mem::{self, MaybeUninit},
6    ops::Deref,
7    sync::{
8        Mutex,
9        atomic::{AtomicUsize, Ordering},
10    },
11};
12
13use crate::{MutexReader, ReadOptimizedLock};
14
15pub struct ConcurrentVec<T> {
21    data: ReadOptimizedLock<Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
22    head: AtomicUsize,
26    write_lock: Mutex<()>,
28}
29
30impl<T> Default for ConcurrentVec<T> {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl<T> Drop for ConcurrentVec<T> {
37    fn drop(&mut self) {
38        let mut writer = self.data.lock();
39        let len = self.head.load(Ordering::Acquire);
40        if mem::needs_drop::<T>() {
41            for i in 0..len {
42                unsafe { writer[i].as_mut_ptr().drop_in_place() };
47            }
48        }
49    }
50}
51
52impl<T> ConcurrentVec<T> {
53    pub fn new() -> Self {
55        Self::with_capacity(128)
56    }
57
58    pub fn with_capacity(capacity: usize) -> Self {
60        let capacity = capacity.next_power_of_two();
61        Self {
62            data: ReadOptimizedLock::new(Vec::with_capacity(capacity)),
63            head: AtomicUsize::new(0),
64            write_lock: Mutex::new(()),
65        }
66    }
67    pub fn push(&self, item: T) -> usize {
70        let _guard = self.write_lock.lock().unwrap();
71        let index = self.head.load(Ordering::Acquire);
72        self.push_at(item, index);
73        self.head.store(index + 1, Ordering::Release);
74        index
75    }
76
77    fn push_at(&self, item: T, index: usize) {
78        let handle = self.data.read();
79        if let Some(slot) = handle.get(index) {
80            unsafe { ((*slot.as_ptr()).0.get()).write(item) };
82            return;
83        }
84        mem::drop(handle);
86        let mut writer = self.data.lock();
87        if index >= writer.len() {
88            writer.resize_with((index + 1).next_power_of_two(), MaybeUninit::uninit);
89        }
90        mem::drop(writer);
91        self.push_at(item, index);
92    }
93
94    pub fn read(&self) -> impl Deref<Target = [T]> + '_ {
95        let valid_prefix = self.head.load(Ordering::Acquire);
96        let reader = self.data.read();
97        ReadHandle {
98            valid_prefix,
99            reader,
100        }
101    }
102}
103
104struct ReadHandle<'a, T> {
105    valid_prefix: usize,
106    reader: MutexReader<'a, Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
107}
108
109impl<T> Deref for ReadHandle<'_, T> {
110    type Target = [T];
111
112    fn deref(&self) -> &[T] {
113        unsafe {
119            mem::transmute::<&[MaybeUninit<SyncUnsafeCell<T>>], &[T]>(
120                &self.reader[0..self.valid_prefix],
121            )
122        }
123    }
124}
125
/// Thin wrapper around [`UnsafeCell`] that can be shared across threads when
/// `T` can.
///
/// `#[repr(transparent)]` guarantees the wrapper has exactly the layout of
/// `UnsafeCell<T>` (and therefore of `T`), which code reinterpreting slots as
/// `T` depends on. The wrapper performs no synchronization of its own.
#[repr(transparent)]
struct SyncUnsafeCell<T>(UnsafeCell<T>);

// SAFETY: the cell owns its `T`, so moving the cell to another thread is
// exactly moving a `T`.
unsafe impl<T: Send> Send for SyncUnsafeCell<T> {}
// SAFETY: the wrapper adds no synchronization; code that mutates through the
// inner pointer must itself ensure a slot is never written while it can be
// read elsewhere.
unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}