egglog_concurrency/
lib.rs1pub(crate) mod bitset;
4pub(crate) mod concurrent_vec;
5pub(crate) mod notification;
6pub mod parallel_writer;
7pub(crate) mod resettable_oncelock;
8use arc_swap::{ArcSwap, Guard};
9
10pub use bitset::BitSet;
11pub use concurrent_vec::ConcurrentVec;
12pub use notification::Notification;
13pub use parallel_writer::ParallelVecWriter;
14pub use resettable_oncelock::ResettableOnceLock;
15
16#[cfg(test)]
17mod tests;
18
19use std::{
20    cell::UnsafeCell,
21    mem,
22    ops::{Deref, DerefMut},
23    sync::{
24        Arc,
25        atomic::{Ordering, fence},
26    },
27};
28
29pub struct ReadOptimizedLock<T> {
42    token: ArcSwap<ReadToken>,
43    data: UnsafeCell<T>,
44}
45
46pub struct MutexReader<'lock, T> {
48    data: &'lock T,
49    _guard: Guard<Arc<ReadToken>>,
50}
51
52impl<T> Deref for MutexReader<'_, T> {
53    type Target = T;
54
55    fn deref(&self) -> &T {
56        self.data
57    }
58}
59
60pub struct MutexWriter<'lock, T> {
62    lock: &'lock ReadOptimizedLock<T>,
63    unblock: Arc<Notification>,
64}
65
66impl<T> Deref for MutexWriter<'_, T> {
67    type Target = T;
68
69    fn deref(&self) -> &T {
70        unsafe { &*self.lock.data.get() }
74    }
75}
76
77impl<T> DerefMut for MutexWriter<'_, T> {
78    fn deref_mut(&mut self) -> &mut T {
79        unsafe { &mut *self.lock.data.get() }
83    }
84}
85
86impl<T> Drop for MutexWriter<'_, T> {
87    fn drop(&mut self) {
88        self.lock
89            .token
90            .store(Arc::new(ReadToken::ReadOk(TriggerWhenDone::default())));
91        self.unblock.notify();
92    }
93}
94
95impl<T> ReadOptimizedLock<T> {
96    pub fn new(data: T) -> Self {
97        Self {
98            token: ArcSwap::from_pointee(ReadToken::ReadOk(TriggerWhenDone::default())),
99            data: UnsafeCell::new(data),
100        }
101    }
102
103    pub fn into_inner(self) -> T {
105        self.data.into_inner()
106    }
107
108    pub fn as_mut_ref(&mut self) -> &mut T {
111        self.data.get_mut()
112    }
113
114    pub fn read(&self) -> MutexReader<'_, T> {
118        loop {
119            let guard = self.token.load();
120            match guard.as_ref() {
121                ReadToken::ReadOk(..) => {
122                    fence(Ordering::Acquire);
125                    return MutexReader {
126                        data: unsafe { &*self.data.get() },
129                        _guard: guard,
130                    };
131                }
132                ReadToken::WriteOngoing(n) => {
133                    let n = n.clone();
134                    mem::drop(guard);
135                    n.wait();
136                    continue;
137                }
138            }
139        }
140    }
141
142    pub fn lock(&self) -> MutexWriter<'_, T> {
143        loop {
144            let guard = self.token.load();
145            match guard.as_ref() {
146                ReadToken::ReadOk(n) => {
147                    let unblock_waiters = Arc::new(Notification::default());
148                    let write_token = ReadToken::WriteOngoing(unblock_waiters.clone());
149                    let readers_done = n.0.clone();
150                    let prev = self.token.compare_and_swap(&guard, Arc::new(write_token));
151                    if !std::ptr::eq(prev.as_ref(), guard.as_ref()) {
152                        continue;
154                    }
155                    mem::drop((guard, prev));
156                    self.token.rcu(|x| x.clone());
158                    readers_done.wait();
161                    return MutexWriter {
162                        lock: self,
163                        unblock: unblock_waiters,
164                    };
165                }
166                ReadToken::WriteOngoing(n) => {
167                    let n = n.clone();
168                    mem::drop(guard);
169                    n.wait();
170                }
171            }
172        }
173    }
174}
175
176unsafe impl<T: Send> Send for ReadOptimizedLock<T> {}
177unsafe impl<T: Send> Sync for ReadOptimizedLock<T> {}
178
179enum ReadToken {
180    ReadOk(TriggerWhenDone),
181    WriteOngoing(Arc<Notification>),
182}
183
184#[derive(Default)]
185struct TriggerWhenDone(Arc<Notification>);
186
187impl Drop for TriggerWhenDone {
188    fn drop(&mut self) {
189        self.0.notify();
190    }
191}