concread/cowcell/asynch.rs
1//! Async CowCell - A concurrently readable cell with Arc
2//!
3//! See `CowCell` for more details.
4
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7use tokio::sync::{Mutex, MutexGuard};
8
/// A concurrently readable async cell.
///
/// This structure behaves in a similar manner to a `RwLock<T>`. However unlike
/// a `RwLock`, writes and parallel reads can be performed at the same time. This
/// means readers and writers do not block each other. Writers are serialised.
///
/// To achieve this a form of "copy-on-write" (or for Rust, clone on write) is
/// used. The first time a write transaction needs mutable access, the existing
/// data is cloned to a new location that is capable of being mutated.
///
/// Readers are guaranteed that the content of the `CowCell` will live as long
/// as the read transaction is open, and will be consistent for the duration
/// of the transaction. There can be an "unlimited" number of readers in parallel
/// accessing different generations of data of the `CowCell`.
///
/// Writers are serialised and are guaranteed they have exclusive write access
/// to the structure.
#[derive(Debug, Default)]
pub struct CowCell<T> {
    // Serialises writers: the guard of this lock is stored inside a
    // `CowCellWriteTxn` for as long as that transaction exists.
    write: Mutex<()>,
    // The currently committed generation. It is locked only to clone the
    // `Arc` (readers, new writers) or to swap in a new one (commit).
    active: Mutex<Arc<T>>,
}
31
/// A `CowCell` Write Transaction handle.
///
/// This allows mutation of the content of the `CowCell` without blocking or
/// affecting current readers.
///
/// Changes are only stored in this structure until you call commit. To abort or
/// roll back a change, don't call commit and allow the write transaction to
/// be dropped. This causes the `CowCell` to unlock, allowing the next writer
/// to proceed.
pub struct CowCellWriteTxn<'a, T> {
    // The private working copy. It stays `None` until the data is first
    // mutated or replaced; the clone is made lazily into here.
    work: Option<T>,
    // The generation that was active when this transaction began.
    read: Arc<T>,
    // This way we know who to contact for updating our data ....
    caller: &'a CowCell<T>,
    // Holds the writer lock open for as long as the transaction lives.
    _guard: MutexGuard<'a, ()>,
}
49
/// A `CowCell` Read Transaction handle.
///
/// Gives shared, read-only access to one generation of the value stored in a
/// `CowCell`. The value cannot be mutated through this handle, and holding it
/// does not block writers.
#[derive(Debug)]
pub struct CowCellReadTxn<T>(Arc<T>);

// Written by hand rather than derived so that cloning a handle does not
// require `T: Clone`: only the reference count of the shared `Arc` is bumped.
impl<T> Clone for CowCellReadTxn<T> {
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.0);
        Self(shared)
    }
}
62
63impl<T> CowCell<T>
64where
65 T: Clone,
66{
67 /// Create a new `CowCell` for storing type `T`. `T` must implement `Clone`
68 /// to enable clone-on-write.
69 pub fn new(data: T) -> Self {
70 CowCell {
71 write: Mutex::new(()),
72 active: Mutex::new(Arc::new(data)),
73 }
74 }
75
76 /// Begin a read transaction, returning a read guard. The content of
77 /// the read guard is guaranteed to be consistent for the life time of the
78 /// read - even if writers commit during.
79 pub async fn read<'x>(&'x self) -> CowCellReadTxn<T> {
80 let rwguard = self.active.lock().await;
81 CowCellReadTxn(rwguard.clone())
82 // rwguard ends here
83 }
84
85 /// Begin a write transaction, returning a write guard. The content of the
86 /// write is only visible to this thread, and is not visible to any reader
87 /// until `commit()` is called.
88 pub async fn write<'x>(&'x self) -> CowCellWriteTxn<'x, T> {
89 /* Take the exclusive write lock first */
90 let mguard = self.write.lock().await;
91 // We delay copying until the first get_mut.
92 let read = {
93 let rwguard = self.active.lock().await;
94 rwguard.clone()
95 };
96 /* Now build the write struct */
97 CowCellWriteTxn {
98 work: None,
99 read,
100 caller: self,
101 _guard: mguard,
102 }
103 }
104
105 /// Attempt to create a write transaction. If it fails, and err
106 /// is returned. On success the `Ok(guard)` is returned. See also
107 /// `write(&self)`
108 pub async fn try_write<'x>(&'x self) -> Option<CowCellWriteTxn<'x, T>> {
109 /* Take the exclusive write lock first */
110 if let Ok(mguard) = self.write.try_lock() {
111 // We delay copying until the first get_mut.
112 let read: Arc<_> = {
113 let rwguard = self.active.lock().await;
114 rwguard.clone()
115 };
116 /* Now build the write struct */
117 Some(CowCellWriteTxn {
118 work: None,
119 read,
120 caller: self,
121 _guard: mguard,
122 })
123 } else {
124 None
125 }
126 }
127
128 async fn commit(&self, newdata: Option<T>) {
129 if let Some(new_data) = newdata {
130 let mut rwguard = self.active.lock().await;
131 let new_inner = Arc::new(new_data);
132 // now over-write the last value in the mutex.
133 *rwguard = new_inner;
134 }
135 // If not some, we do nothing.
136 // Done
137 }
138}
139
140impl<T> Deref for CowCellReadTxn<T> {
141 type Target = T;
142
143 #[inline]
144 fn deref(&self) -> &T {
145 &self.0
146 }
147}
148
149impl<T> CowCellWriteTxn<'_, T>
150where
151 T: Clone,
152{
153 /// Access a mutable pointer of the data in the `CowCell`. This data is only
154 /// visible to the write transaction object in this thread, until you call
155 /// `commit()`.
156 #[inline(always)]
157 pub fn get_mut(&mut self) -> &mut T {
158 if self.work.is_none() {
159 let mut data: Option<T> = Some((*self.read).clone());
160 std::mem::swap(&mut data, &mut self.work);
161 // Should be the none we previously had.
162 debug_assert!(data.is_none())
163 }
164 self.work.as_mut().expect("can not fail")
165 }
166
167 /// Update the inner value with a new one. This function exists to prevent a clone
168 /// in the case where you take a read transaction and would otherwise use `mem::swap`
169 pub fn replace(&mut self, value: T) {
170 self.work = Some(value);
171 }
172
173 /// Commit the changes made in this write transactions to the `CowCell`.
174 /// This will consume the transaction so no further changes can be made
175 /// after this is called. Not calling this in a block, is equivalent to
176 /// an abort/rollback of the transaction.
177 pub async fn commit(self) {
178 /* Write our data back to the CowCell */
179 self.caller.commit(self.work).await;
180 }
181}
182
183impl<T> Deref for CowCellWriteTxn<'_, T>
184where
185 T: Clone,
186{
187 type Target = T;
188
189 #[inline(always)]
190 fn deref(&self) -> &T {
191 match &self.work {
192 Some(v) => v,
193 None => &self.read,
194 }
195 }
196}
197
198impl<T> DerefMut for CowCellWriteTxn<'_, T>
199where
200 T: Clone,
201{
202 #[inline(always)]
203 fn deref_mut(&mut self) -> &mut T {
204 self.get_mut()
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::CowCell;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Writing through `DerefMut` and committing makes the value visible to
    /// a later reader.
    #[tokio::test]
    async fn test_deref_mut() {
        let data: i64 = 0;
        let cc = CowCell::new(data);
        {
            /* Take a write txn */
            let mut cc_wrtxn = cc.write().await;
            *cc_wrtxn = 1;
            cc_wrtxn.commit().await;
        }
        let cc_rotxn = cc.read().await;
        assert_eq!(*cc_rotxn, 1);
    }

    /// A second `try_write` fails while the first write transaction is open.
    #[tokio::test]
    async fn test_try_write() {
        let data: i64 = 0;
        let cc = CowCell::new(data);
        /* Take a write txn */
        let cc_wrtxn_a = cc.try_write().await;
        assert!(cc_wrtxn_a.is_some());
        /* Because we already hold the write, the second is guaranteed to fail */
        let cc_wrtxn_b = cc.try_write().await;
        assert!(cc_wrtxn_b.is_none());
    }

    /// Readers keep seeing the generation they started with; uncommitted
    /// changes are invisible and committed ones appear only to new readers.
    #[tokio::test]
    async fn test_simple_create() {
        let data: i64 = 0;
        let cc = CowCell::new(data);

        let cc_rotxn_a = cc.read().await;
        assert_eq!(*cc_rotxn_a, 0);

        {
            /* Take a write txn */
            let mut cc_wrtxn = cc.write().await;
            /* Get the data ... */
            {
                let mut_ptr = cc_wrtxn.get_mut();
                /* Assert it's 0 */
                assert_eq!(*mut_ptr, 0);
                *mut_ptr = 1;
                assert_eq!(*mut_ptr, 1);
            }
            assert_eq!(*cc_rotxn_a, 0);

            let cc_rotxn_b = cc.read().await;
            assert_eq!(*cc_rotxn_b, 0);
            /* The write txn and its lock are released by the commit here */
            cc_wrtxn.commit().await;
        }

        /* Start a new txn and see it's still good */
        let cc_rotxn_c = cc.read().await;
        assert_eq!(*cc_rotxn_c, 1);
        assert_eq!(*cc_rotxn_a, 0);
    }

    // Counts how many `TestGcWrapper` values have been dropped.
    static GC_COUNT: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, Clone)]
    struct TestGcWrapper<T> {
        data: T,
    }

    impl<T> Drop for TestGcWrapper<T> {
        fn drop(&mut self) {
            // Add to the atomic counter ...
            GC_COUNT.fetch_add(1, Ordering::Release);
        }
    }

    /// Worker: keep committing new generations until enough old ones have
    /// been dropped.
    async fn test_gc_operation_thread(cc: Arc<CowCell<TestGcWrapper<i64>>>) {
        while GC_COUNT.load(Ordering::Acquire) < 50 {
            let mut cc_wrtxn = cc.write().await;
            {
                let mut_ptr = cc_wrtxn.get_mut();
                mut_ptr.data += 1;
            }
            cc_wrtxn.commit().await;
        }
    }

    /// Superseded generations are actually dropped while several writers
    /// commit concurrently.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_gc_operation() {
        GC_COUNT.store(0, Ordering::Release);
        let data = TestGcWrapper { data: 0 };
        let cc = Arc::new(CowCell::new(data));

        let (first, second, third, fourth) = tokio::join!(
            tokio::task::spawn(test_gc_operation_thread(cc.clone())),
            tokio::task::spawn(test_gc_operation_thread(cc.clone())),
            tokio::task::spawn(test_gc_operation_thread(cc.clone())),
            tokio::task::spawn(test_gc_operation_thread(cc.clone())),
        );
        // Surface a panicked or cancelled worker instead of discarding it;
        // otherwise the remaining workers could still satisfy the final assert.
        for outcome in [first, second, third, fourth] {
            outcome.expect("gc worker task did not complete cleanly");
        }

        assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
    }

    /// `CowCell` can be built through `Default`.
    #[test]
    fn test_default() {
        CowCell::<()>::default();
    }
}