1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
//! CowCell - A concurrently readable cell with Arc
//!
//! A CowCell can be used in place of a `RwLock`. Readers are guaranteed that
//! the data will not change during the lifetime of the read. Readers do
//! not block writers, and writers do not block readers. Writers are serialised,
//! the same as writers in a `RwLock`.
//!
//! This is the `Arc` collected implementation. `Arc` is slightly slower than `EbrCell`
//! but has better behaviour with very long running read operations, and more
//! accurate memory reclaim behaviour.

#[cfg(feature = "asynch")]
pub mod asynch;

use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};

/// A concurrently readable cell.
///
/// This structure behaves in a similar manner to a `RwLock<T>`. However unlike
/// a `RwLock`, writes and parallel reads can be performed at the same time. This
/// means readers and writers do not block each other. Writers are serialised.
///
/// To achieve this a form of "copy-on-write" (or for Rust, clone on write) is
/// used. The first time a write transaction needs mutable access, the existing
/// data is cloned to a new location that is private to that transaction and
/// can be mutated freely.
///
/// Readers are guaranteed that the content of the `CowCell` will live as long
/// as the read transaction is open, and will be consistent for the duration
/// of the transaction. There can be an "unlimited" number of readers in parallel
/// accessing different generations of data of the `CowCell`.
///
/// Writers are serialised and are guaranteed they have exclusive write access
/// to the structure.
///
/// # Examples
/// ```
/// use concread::cowcell::CowCell;
///
/// let data: i64 = 0;
/// let cowcell = CowCell::new(data);
///
/// // Begin a read transaction
/// let read_txn = cowcell.read();
/// assert_eq!(*read_txn, 0);
/// {
///     // Now create a write, and commit it.
///     let mut write_txn = cowcell.write();
///     *write_txn = 1;
///     // Commit the change
///     write_txn.commit();
/// }
/// // Show the previous generation still reads '0'
/// assert_eq!(*read_txn, 0);
/// let new_read_txn = cowcell.read();
/// // And a new read transaction has '1'
/// assert_eq!(*new_read_txn, 1);
/// ```
#[derive(Debug)]
pub struct CowCell<T> {
    // Writer serialisation lock. It protects no data of its own; its guard is
    // stored inside a `CowCellWriteTxn` for as long as that transaction lives.
    write: Mutex<()>,
    // The currently published generation. Locked only long enough to clone
    // the `Arc` (read / start of write) or to replace it (commit).
    active: Mutex<Arc<T>>,
}

/// A `CowCell` Write Transaction handle.
///
/// This allows mutation of the content of the `CowCell` without blocking or
/// affecting current readers.
///
/// Changes are only stored in this structure until you call commit. To abort
/// or roll back a change, don't call commit and allow the write transaction to
/// be dropped. This causes the `CowCell` to unlock, allowing the next writer
/// to proceed.
pub struct CowCellWriteTxn<'a, T: 'a> {
    // Private working copy of the data. Stays `None` until the first mutable
    // access clones `read` into it; this is what gets published on commit.
    work: Option<T>,
    // The generation that was active when this transaction began. Shared
    // reads go through this until `work` has been populated.
    read: Arc<T>,
    // The cell this transaction belongs to, so commit can publish into it.
    caller: &'a CowCell<T>,
    // Guard for the cell's writer lock; held for the life of the transaction
    // and released when this struct is dropped.
    _guard: MutexGuard<'a, ()>,
}

/// A `CowCell` Read Transaction handle.
///
/// This gives shared, read-only access to one generation of the value held
/// by the `CowCell`. The handle owns an `Arc` to that generation, so holding
/// it does not block writers and later commits do not change what it sees.
#[derive(Debug)]
pub struct CowCellReadTxn<T>(Arc<T>);

impl<T> Clone for CowCellReadTxn<T> {
    fn clone(&self) -> Self {
        CowCellReadTxn(self.0.clone())
    }
}

impl<T> CowCell<T>
where
    T: Clone,
{
    /// Create a new `CowCell` for storing type `T`. `T` must implement `Clone`
    /// to enable clone-on-write.
    pub fn new(data: T) -> Self {
        CowCell {
            write: Mutex::new(()),
            active: Mutex::new(Arc::new(data)),
        }
    }

    /// Begin a read transaction, returning a read guard. The content of
    /// the read guard is guaranteed to be consistent for the life time of the
    /// read - even if writers commit during.
    pub fn read(&self) -> CowCellReadTxn<T> {
        let rwguard = self.active.lock().unwrap();
        CowCellReadTxn(rwguard.clone())
        // rwguard ends here
    }

    /// Begin a write transaction, returning a write guard. The content of the
    /// write is only visible to this thread, and is not visible to any reader
    /// until `commit()` is called.
    pub fn write(&self) -> CowCellWriteTxn<T> {
        /* Take the exclusive write lock first */
        let mguard = self.write.lock().unwrap();
        // We delay copying until the first get_mut.
        let read = {
            let rwguard = self.active.lock().unwrap();
            rwguard.clone()
        };
        /* Now build the write struct */
        CowCellWriteTxn {
            work: None,
            read,
            caller: self,
            _guard: mguard,
        }
    }

    /// Attempt to create a write transaction. If it fails, and err
    /// is returned. On success the `Ok(guard)` is returned. See also
    /// `write(&self)`
    pub fn try_write(&self) -> Option<CowCellWriteTxn<T>> {
        /* Take the exclusive write lock first */
        self.write.try_lock().ok().map(|mguard| {
            // We delay copying until the first get_mut.
            let read = {
                let rwguard = self.active.lock().unwrap();
                rwguard.clone()
            };
            /* Now build the write struct */
            CowCellWriteTxn {
                work: None,
                read,
                caller: self,
                _guard: mguard,
            }
        })
    }

    fn commit(&self, newdata: Option<T>) {
        if let Some(nd) = newdata {
            let mut rwguard = self.active.lock().unwrap();
            let new_inner = Arc::new(nd);
            // now over-write the last value in the mutex.
            *rwguard = new_inner;
        }
        // If not some, we do nothing.
        // Done
    }
}

impl<T> Deref for CowCellReadTxn<T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<'a, T> CowCellWriteTxn<'a, T>
where
    T: Clone,
{
    /// Access a mutable reference to the data in the `CowCell`. This data is
    /// only visible to the write transaction object in this thread, until you
    /// call `commit()`.
    ///
    /// The first call clones the generation captured when the transaction
    /// began into a private working copy; later calls return that same copy.
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        // Borrow the snapshot separately so the closure does not need `self`
        // while `self.work` is mutably borrowed.
        let snapshot = &self.read;
        self.work.get_or_insert_with(|| T::clone(snapshot))
    }

    /// Commit the changes made in this write transaction to the `CowCell`.
    /// This will consume the transaction so no further changes can be made
    /// after this is called. Not calling this before the transaction is
    /// dropped is equivalent to an abort/rollback of the transaction.
    ///
    /// If the transaction was never accessed mutably there is no working copy
    /// and the cell is left unchanged.
    pub fn commit(self) {
        // Write our data back to the CowCell. The remaining fields, including
        // the writer lock guard, are dropped when this function returns.
        self.caller.commit(self.work);
    }
}

impl<'a, T> Deref for CowCellWriteTxn<'a, T>
where
    T: Clone,
{
    type Target = T;

    /// Borrow the transaction's current view of the data: the private working
    /// copy if one has been created, otherwise the generation captured when
    /// the transaction began. Reading never triggers a clone.
    #[inline(always)]
    fn deref(&self) -> &T {
        self.work.as_ref().unwrap_or(&*self.read)
    }
}

impl<'a, T> DerefMut for CowCellWriteTxn<'a, T>
where
    T: Clone,
{
    /// Mutable access through `*txn`; simply forwards to `get_mut`.
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut T {
        Self::get_mut(self)
    }
}

#[cfg(test)]
mod tests {
    use super::CowCell;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crossbeam_utils::thread::scope;

    /// Assigning through `DerefMut` and committing publishes the new value.
    #[test]
    fn test_deref_mut() {
        let cell = CowCell::new(0_i64);
        {
            let mut writer = cell.write();
            *writer = 1;
            writer.commit();
        }
        let reader = cell.read();
        assert_eq!(*reader, 1);
    }

    /// While one write transaction is alive, `try_write` must return `None`.
    #[test]
    fn test_try_write() {
        let cell = CowCell::new(0_i64);
        // The first attempt succeeds and stays alive until the end of the test.
        let first = cell.try_write();
        assert!(first.is_some());
        // Because the first transaction still holds the write lock, the
        // second attempt is guaranteed to fail.
        let second = cell.try_write();
        assert!(second.is_none());
    }

    /// Readers keep seeing their own generation before and after a commit.
    #[test]
    fn test_simple_create() {
        let cell = CowCell::new(0_i64);

        let reader_a = cell.read();
        assert_eq!(*reader_a, 0);

        {
            let mut writer = cell.write();
            {
                // Mutate the private working copy only.
                let pending = writer.get_mut();
                assert_eq!(*pending, 0);
                *pending = 1;
                assert_eq!(*pending, 1);
            }
            // Neither an existing nor a fresh reader sees the pending change.
            assert_eq!(*reader_a, 0);

            let reader_b = cell.read();
            assert_eq!(*reader_b, 0);
            // Committing consumes the transaction, releasing the write lock.
            writer.commit();
        }

        // A reader opened after the commit sees the new value, while the
        // original reader still sees its own generation.
        let reader_c = cell.read();
        assert_eq!(*reader_c, 1);
        assert_eq!(*reader_a, 0);
    }

    const MAX_TARGET: i64 = 2000;

    /// Reader thread body: keep opening read transactions until the observed
    /// value reaches `MAX_TARGET`, checking it never goes backwards.
    fn observe_until_target(cell: &CowCell<i64>) {
        let mut last_value: i64 = 0;
        while last_value < MAX_TARGET {
            let snapshot = cell.read();
            assert!(*snapshot >= last_value);
            last_value = *snapshot;
        }
    }

    /// Writer thread body: increment the value in committed write
    /// transactions until the value seen at the start of a transaction
    /// reaches `MAX_TARGET`.
    fn increment_until_target(cell: &CowCell<i64>) {
        let mut last_value: i64 = 0;
        while last_value < MAX_TARGET {
            let mut writer = cell.write();
            {
                let pending = writer.get_mut();
                assert!(*pending >= last_value);
                last_value = *pending;
                *pending += 1;
            }
            writer.commit();
        }
    }

    /// Seven readers and three writers share one cell concurrently.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_multithread_create() {
        let start = time::Instant::now();
        let cell = CowCell::new(0_i64);

        let outcome = scope(|s| {
            let cell_ref = &cell;

            let _readers: Vec<_> = (0..7)
                .map(|_| s.spawn(move |_| observe_until_target(cell_ref)))
                .collect();

            let _writers: Vec<_> = (0..3)
                .map(|_| s.spawn(move |_| increment_until_target(cell_ref)))
                .collect();
        });
        assert!(outcome.is_ok());

        let end = time::Instant::now();
        print!("Arc MT create :{:?} ", end - start);
    }

    // Number of `TestGcWrapper` values dropped so far.
    static GC_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Payload whose `Drop` bumps `GC_COUNT`, making reclamation observable.
    #[derive(Debug, Clone)]
    struct TestGcWrapper<T> {
        data: T,
    }

    impl<T> Drop for TestGcWrapper<T> {
        fn drop(&mut self) {
            // Record that one more wrapper has been reclaimed.
            GC_COUNT.fetch_add(1, Ordering::Release);
        }
    }

    /// Commit increments until at least 50 wrappers have been dropped.
    fn test_gc_operation_thread(cc: &CowCell<TestGcWrapper<i64>>) {
        while GC_COUNT.load(Ordering::Acquire) < 50 {
            let mut writer = cc.write();
            writer.get_mut().data += 1;
            writer.commit();
        }
    }

    /// Superseded generations are actually dropped while writers keep committing.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_gc_operation() {
        GC_COUNT.store(0, Ordering::Release);
        let cell = CowCell::new(TestGcWrapper { data: 0 });

        let outcome = scope(|s| {
            let cell_ref = &cell;
            let _writers: Vec<_> = (0..3)
                .map(|_| s.spawn(move |_| test_gc_operation_thread(cell_ref)))
                .collect();
        });
        assert!(outcome.is_ok());

        assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
    }
}