egglog_concurrency/
lib.rs1pub(crate) mod bitset;
4pub(crate) mod concurrent_vec;
5pub(crate) mod notification;
6pub mod parallel_writer;
7pub(crate) mod resettable_oncelock;
8use arc_swap::{ArcSwap, Guard};
9
10pub use bitset::BitSet;
11pub use concurrent_vec::ConcurrentVec;
12pub use notification::Notification;
13pub use parallel_writer::ParallelVecWriter;
14pub use resettable_oncelock::ResettableOnceLock;
15
16#[cfg(test)]
17mod tests;
18
19use std::{
20 cell::UnsafeCell,
21 mem,
22 ops::{Deref, DerefMut},
23 sync::{
24 Arc,
25 atomic::{Ordering, fence},
26 },
27};
28
29pub struct ReadOptimizedLock<T> {
42 token: ArcSwap<ReadToken>,
43 data: UnsafeCell<T>,
44}
45
46pub struct MutexReader<'lock, T> {
48 data: &'lock T,
49 _guard: Guard<Arc<ReadToken>>,
50}
51
52impl<T> Deref for MutexReader<'_, T> {
53 type Target = T;
54
55 fn deref(&self) -> &T {
56 self.data
57 }
58}
59
60pub struct MutexWriter<'lock, T> {
62 lock: &'lock ReadOptimizedLock<T>,
63 unblock: Arc<Notification>,
64}
65
66impl<T> Deref for MutexWriter<'_, T> {
67 type Target = T;
68
69 fn deref(&self) -> &T {
70 unsafe { &*self.lock.data.get() }
74 }
75}
76
77impl<T> DerefMut for MutexWriter<'_, T> {
78 fn deref_mut(&mut self) -> &mut T {
79 unsafe { &mut *self.lock.data.get() }
83 }
84}
85
86impl<T> Drop for MutexWriter<'_, T> {
87 fn drop(&mut self) {
88 self.lock
89 .token
90 .store(Arc::new(ReadToken::ReadOk(TriggerWhenDone::default())));
91 self.unblock.notify();
92 }
93}
94
95impl<T> ReadOptimizedLock<T> {
96 pub fn new(data: T) -> Self {
97 Self {
98 token: ArcSwap::from_pointee(ReadToken::ReadOk(TriggerWhenDone::default())),
99 data: UnsafeCell::new(data),
100 }
101 }
102
103 pub fn into_inner(self) -> T {
105 self.data.into_inner()
106 }
107
108 pub fn as_mut_ref(&mut self) -> &mut T {
111 self.data.get_mut()
112 }
113
114 pub fn read(&self) -> MutexReader<'_, T> {
118 loop {
119 let guard = self.token.load();
120 match guard.as_ref() {
121 ReadToken::ReadOk(..) => {
122 fence(Ordering::Acquire);
125 return MutexReader {
126 data: unsafe { &*self.data.get() },
129 _guard: guard,
130 };
131 }
132 ReadToken::WriteOngoing(n) => {
133 let n = n.clone();
134 mem::drop(guard);
135 n.wait();
136 continue;
137 }
138 }
139 }
140 }
141
142 pub fn lock(&self) -> MutexWriter<'_, T> {
143 loop {
144 let guard = self.token.load();
145 match guard.as_ref() {
146 ReadToken::ReadOk(n) => {
147 let unblock_waiters = Arc::new(Notification::default());
148 let write_token = ReadToken::WriteOngoing(unblock_waiters.clone());
149 let readers_done = n.0.clone();
150 let prev = self.token.compare_and_swap(&guard, Arc::new(write_token));
151 if !std::ptr::eq(prev.as_ref(), guard.as_ref()) {
152 continue;
154 }
155 mem::drop((guard, prev));
156 self.token.rcu(|x| x.clone());
158 readers_done.wait();
161 return MutexWriter {
162 lock: self,
163 unblock: unblock_waiters,
164 };
165 }
166 ReadToken::WriteOngoing(n) => {
167 let n = n.clone();
168 mem::drop(guard);
169 n.wait();
170 }
171 }
172 }
173 }
174}
175
176unsafe impl<T: Send> Send for ReadOptimizedLock<T> {}
177unsafe impl<T: Send> Sync for ReadOptimizedLock<T> {}
178
179enum ReadToken {
180 ReadOk(TriggerWhenDone),
181 WriteOngoing(Arc<Notification>),
182}
183
184#[derive(Default)]
185struct TriggerWhenDone(Arc<Notification>);
186
187impl Drop for TriggerWhenDone {
188 fn drop(&mut self) {
189 self.0.notify();
190 }
191}