egglog_concurrency/
concurrent_vec.rs1use std::{
4 cell::UnsafeCell,
5 mem::{self, MaybeUninit},
6 ops::Deref,
7 sync::{
8 Mutex,
9 atomic::{AtomicUsize, Ordering},
10 },
11};
12
13use crate::{MutexReader, ReadOptimizedLock};
14
15pub struct ConcurrentVec<T> {
21 data: ReadOptimizedLock<Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
22 head: AtomicUsize,
26 write_lock: Mutex<()>,
28}
29
30impl<T> Default for ConcurrentVec<T> {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl<T> Drop for ConcurrentVec<T> {
37 fn drop(&mut self) {
38 let mut writer = self.data.lock();
39 let len = self.head.load(Ordering::Acquire);
40 if mem::needs_drop::<T>() {
41 for i in 0..len {
42 unsafe { writer[i].as_mut_ptr().drop_in_place() };
47 }
48 }
49 }
50}
51
52impl<T> ConcurrentVec<T> {
53 pub fn new() -> Self {
55 Self::with_capacity(128)
56 }
57
58 pub fn with_capacity(capacity: usize) -> Self {
60 let capacity = capacity.next_power_of_two();
61 Self {
62 data: ReadOptimizedLock::new(Vec::with_capacity(capacity)),
63 head: AtomicUsize::new(0),
64 write_lock: Mutex::new(()),
65 }
66 }
67 pub fn push(&self, item: T) -> usize {
70 let _guard = self.write_lock.lock().unwrap();
71 let index = self.head.load(Ordering::Acquire);
72 self.push_at(item, index);
73 self.head.store(index + 1, Ordering::Release);
74 index
75 }
76
77 fn push_at(&self, item: T, index: usize) {
78 let handle = self.data.read();
79 if let Some(slot) = handle.get(index) {
80 unsafe { ((*slot.as_ptr()).0.get()).write(item) };
82 return;
83 }
84 mem::drop(handle);
86 let mut writer = self.data.lock();
87 if index >= writer.len() {
88 writer.resize_with((index + 1).next_power_of_two(), MaybeUninit::uninit);
89 }
90 mem::drop(writer);
91 self.push_at(item, index);
92 }
93
94 pub fn read(&self) -> impl Deref<Target = [T]> + '_ {
95 let valid_prefix = self.head.load(Ordering::Acquire);
96 let reader = self.data.read();
97 ReadHandle {
98 valid_prefix,
99 reader,
100 }
101 }
102}
103
104struct ReadHandle<'a, T> {
105 valid_prefix: usize,
106 reader: MutexReader<'a, Vec<MaybeUninit<SyncUnsafeCell<T>>>>,
107}
108
109impl<T> Deref for ReadHandle<'_, T> {
110 type Target = [T];
111
112 fn deref(&self) -> &[T] {
113 unsafe {
119 mem::transmute::<&[MaybeUninit<SyncUnsafeCell<T>>], &[T]>(
120 &self.reader[0..self.valid_prefix],
121 )
122 }
123 }
124}
125
/// An `UnsafeCell` that forwards `Send` and `Sync` from its contents.
///
/// `#[repr(transparent)]` guarantees the wrapper has exactly the layout of
/// `UnsafeCell<T>` (and therefore of `T`). `ReadHandle::deref` reinterprets a
/// slice of these cells as a slice of `T`, which depends on that guarantee.
#[repr(transparent)]
struct SyncUnsafeCell<T>(UnsafeCell<T>);

// SAFETY: the wrapper owns a single `T` and nothing else, so moving it to
// another thread is exactly as safe as moving the `T`.
unsafe impl<T: Send> Send for SyncUnsafeCell<T> {}
// SAFETY: the type itself offers no synchronization; code that touches the
// cell through a shared reference is responsible for avoiding data races.
// NOTE(review): `ConcurrentVec::push` moves a `T` in through `&self`, so a
// `T: Send` bound may also be needed here. Confirm against the `Sync` bound
// of `ReadOptimizedLock`.
unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}