concread/cowcell/
asynch.rs

1//! Async CowCell - A concurrently readable cell with Arc
2//!
3//! See `CowCell` for more details.
4
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7use tokio::sync::{Mutex, MutexGuard};
8
9/// A conncurrently readable async cell.
10///
11/// This structure behaves in a similar manner to a `RwLock<T>`. However unlike
12/// a `RwLock`, writes and parallel reads can be performed at the same time. This
13/// means readers and writers do no block either other. Writers are serialised.
14///
15/// To achieve this a form of "copy-on-write" (or for Rust, clone on write) is
16/// used. As a write transaction begins, we clone the existing data to a new
17/// location that is capable of being mutated.
18///
19/// Readers are guaranteed that the content of the `CowCell` will live as long
20/// as the read transaction is open, and will be consistent for the duration
21/// of the transaction. There can be an "unlimited" number of readers in parallel
22/// accessing different generations of data of the `CowCell`.
23///
24/// Writers are serialised and are guaranteed they have exclusive write access
25/// to the structure.
26#[derive(Debug, Default)]
27pub struct CowCell<T> {
28    write: Mutex<()>,
29    active: Mutex<Arc<T>>,
30}
31
32/// A `CowCell` Write Transaction handle.
33///
34/// This allows mutation of the content of the `CowCell` without blocking or
35/// affecting current readers.
36///
37/// Changes are only stored in this structure until you call commit. To abort/
38/// rollback a change, don't call commit and allow the write transaction to
39/// be dropped. This causes the `CowCell` to unlock allowing the next writer
40/// to proceed.
41pub struct CowCellWriteTxn<'a, T> {
42    // Hold open the guard, and initiate the copy to here.
43    work: Option<T>,
44    read: Arc<T>,
45    // This way we know who to contact for updating our data ....
46    caller: &'a CowCell<T>,
47    _guard: MutexGuard<'a, ()>,
48}
49
/// A read transaction over a `CowCell`.
///
/// Gives shared, read-only access to the value captured when the transaction
/// was opened. The handle owns an `Arc` to that value, so holding it does not
/// keep any lock and does not block writers.
#[derive(Debug)]
pub struct CowCellReadTxn<T>(Arc<T>);
56
57impl<T> Clone for CowCellReadTxn<T> {
58    fn clone(&self) -> Self {
59        CowCellReadTxn(self.0.clone())
60    }
61}
62
63impl<T> CowCell<T>
64where
65    T: Clone,
66{
67    /// Create a new `CowCell` for storing type `T`. `T` must implement `Clone`
68    /// to enable clone-on-write.
69    pub fn new(data: T) -> Self {
70        CowCell {
71            write: Mutex::new(()),
72            active: Mutex::new(Arc::new(data)),
73        }
74    }
75
76    /// Begin a read transaction, returning a read guard. The content of
77    /// the read guard is guaranteed to be consistent for the life time of the
78    /// read - even if writers commit during.
79    pub async fn read<'x>(&'x self) -> CowCellReadTxn<T> {
80        let rwguard = self.active.lock().await;
81        CowCellReadTxn(rwguard.clone())
82        // rwguard ends here
83    }
84
85    /// Begin a write transaction, returning a write guard. The content of the
86    /// write is only visible to this thread, and is not visible to any reader
87    /// until `commit()` is called.
88    pub async fn write<'x>(&'x self) -> CowCellWriteTxn<'x, T> {
89        /* Take the exclusive write lock first */
90        let mguard = self.write.lock().await;
91        // We delay copying until the first get_mut.
92        let read = {
93            let rwguard = self.active.lock().await;
94            rwguard.clone()
95        };
96        /* Now build the write struct */
97        CowCellWriteTxn {
98            work: None,
99            read,
100            caller: self,
101            _guard: mguard,
102        }
103    }
104
105    /// Attempt to create a write transaction. If it fails, and err
106    /// is returned. On success the `Ok(guard)` is returned. See also
107    /// `write(&self)`
108    pub async fn try_write<'x>(&'x self) -> Option<CowCellWriteTxn<'x, T>> {
109        /* Take the exclusive write lock first */
110        if let Ok(mguard) = self.write.try_lock() {
111            // We delay copying until the first get_mut.
112            let read: Arc<_> = {
113                let rwguard = self.active.lock().await;
114                rwguard.clone()
115            };
116            /* Now build the write struct */
117            Some(CowCellWriteTxn {
118                work: None,
119                read,
120                caller: self,
121                _guard: mguard,
122            })
123        } else {
124            None
125        }
126    }
127
128    async fn commit(&self, newdata: Option<T>) {
129        if let Some(new_data) = newdata {
130            let mut rwguard = self.active.lock().await;
131            let new_inner = Arc::new(new_data);
132            // now over-write the last value in the mutex.
133            *rwguard = new_inner;
134        }
135        // If not some, we do nothing.
136        // Done
137    }
138}
139
140impl<T> Deref for CowCellReadTxn<T> {
141    type Target = T;
142
143    #[inline]
144    fn deref(&self) -> &T {
145        &self.0
146    }
147}
148
149impl<T> CowCellWriteTxn<'_, T>
150where
151    T: Clone,
152{
153    /// Access a mutable pointer of the data in the `CowCell`. This data is only
154    /// visible to the write transaction object in this thread, until you call
155    /// `commit()`.
156    #[inline(always)]
157    pub fn get_mut(&mut self) -> &mut T {
158        if self.work.is_none() {
159            let mut data: Option<T> = Some((*self.read).clone());
160            std::mem::swap(&mut data, &mut self.work);
161            // Should be the none we previously had.
162            debug_assert!(data.is_none())
163        }
164        self.work.as_mut().expect("can not fail")
165    }
166
167    /// Update the inner value with a new one. This function exists to prevent a clone
168    /// in the case where you take a read transaction and would otherwise use `mem::swap`
169    pub fn replace(&mut self, value: T) {
170        self.work = Some(value);
171    }
172
173    /// Commit the changes made in this write transactions to the `CowCell`.
174    /// This will consume the transaction so no further changes can be made
175    /// after this is called. Not calling this in a block, is equivalent to
176    /// an abort/rollback of the transaction.
177    pub async fn commit(self) {
178        /* Write our data back to the CowCell */
179        self.caller.commit(self.work).await;
180    }
181}
182
183impl<T> Deref for CowCellWriteTxn<'_, T>
184where
185    T: Clone,
186{
187    type Target = T;
188
189    #[inline(always)]
190    fn deref(&self) -> &T {
191        match &self.work {
192            Some(v) => v,
193            None => &self.read,
194        }
195    }
196}
197
198impl<T> DerefMut for CowCellWriteTxn<'_, T>
199where
200    T: Clone,
201{
202    #[inline(always)]
203    fn deref_mut(&mut self) -> &mut T {
204        self.get_mut()
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::CowCell;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Writing through `DerefMut` and committing makes the new value visible
    /// to readers opened afterwards.
    #[tokio::test]
    async fn test_deref_mut() {
        let cell = CowCell::new(0_i64);
        {
            let mut writer = cell.write().await;
            *writer = 1;
            writer.commit().await;
        }
        let reader = cell.read().await;
        assert_eq!(*reader, 1);
    }

    /// A second `try_write` must fail while the first transaction is open.
    #[tokio::test]
    async fn test_try_write() {
        let cell = CowCell::new(0_i64);
        let first = cell.try_write().await;
        assert!(first.is_some());
        // `first` is still alive here and holds the write lock, so a second
        // attempt is guaranteed to fail.
        let second = cell.try_write().await;
        assert!(second.is_none());
    }

    /// Readers keep seeing the generation they opened, before and after a
    /// writer commits; new readers see the committed value.
    #[tokio::test]
    async fn test_simple_create() {
        let cell = CowCell::new(0_i64);

        let reader_before = cell.read().await;
        assert_eq!(*reader_before, 0);

        {
            let mut writer = cell.write().await;
            {
                // The working copy starts out equal to the snapshot.
                let value = writer.get_mut();
                assert_eq!(*value, 0);
                *value = 1;
                assert_eq!(*value, 1);
            }
            // Uncommitted changes are invisible to old and new readers.
            assert_eq!(*reader_before, 0);

            let reader_during = cell.read().await;
            assert_eq!(*reader_during, 0);
            // Committing consumes the transaction and releases the lock.
            writer.commit().await;
        }

        // A fresh reader sees the commit; the old reader is unaffected.
        let reader_after = cell.read().await;
        assert_eq!(*reader_after, 1);
        assert_eq!(*reader_before, 0);
    }

    // Number of `TestGcWrapper` values dropped so far.
    static GC_COUNT: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, Clone)]
    struct TestGcWrapper<T> {
        data: T,
    }

    impl<T> Drop for TestGcWrapper<T> {
        fn drop(&mut self) {
            // Record every drop so the test can observe old generations
            // being freed.
            GC_COUNT.fetch_add(1, Ordering::Release);
        }
    }

    /// Keep committing increments until at least 50 wrappers were dropped.
    async fn test_gc_operation_thread(cc: Arc<CowCell<TestGcWrapper<i64>>>) {
        loop {
            if GC_COUNT.load(Ordering::Acquire) >= 50 {
                break;
            }
            let mut writer = cc.write().await;
            writer.get_mut().data += 1;
            writer.commit().await;
        }
    }

    /// Several tasks committing concurrently cause superseded generations to
    /// be dropped.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_gc_operation() {
        GC_COUNT.store(0, Ordering::Release);
        let cell = Arc::new(CowCell::new(TestGcWrapper { data: 0 }));

        let _ = tokio::join!(
            tokio::task::spawn(test_gc_operation_thread(Arc::clone(&cell))),
            tokio::task::spawn(test_gc_operation_thread(Arc::clone(&cell))),
            tokio::task::spawn(test_gc_operation_thread(Arc::clone(&cell))),
            tokio::task::spawn(test_gc_operation_thread(Arc::clone(&cell))),
        );

        assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
    }

    /// `CowCell` can be built through `Default`.
    #[test]
    fn test_default() {
        let _: CowCell<()> = Default::default();
    }
}