1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
//! EbrCell - A concurrently readable cell with Ebr
//!
//! An `EbrCell` can be used in place of a `RwLock`. Readers are guaranteed that
//! the data will not change during the lifetime of the read. Readers do
//! not block writers, and writers do not block readers. Writers are serialised
//! same as the write in a `RwLock`.
//!
//! This is the Ebr collected implementation.
//! Ebr is the crossbeam-epoch based reclaim system for async memory
//! garbage collection. Ebr is faster than `Arc`,
//! but long transactions can cause the memory usage to grow very quickly
//! before a garbage reclaim. This is a space time trade, where you gain
//! performance at the expense of delaying garbage collection. Holding Ebr
//! reads for too long may impact garbage collection of other epoch structures
//! or crossbeam library components.
//! If you need accurate memory reclaim, use the Arc (`CowCell`) implementation.

use crossbeam_epoch as epoch;
use crossbeam_epoch::{Atomic, Guard, Owned};
use std::sync::atomic::Ordering::{Acquire, Release};

use parking_lot::{Mutex, MutexGuard};
use std::mem;
use std::ops::{Deref, DerefMut};

/// An `EbrCell` Write Transaction handle.
///
/// This allows mutation of the content of the `EbrCell` without blocking or
/// affecting current readers.
///
/// Changes are only stored in the structure until you call commit: to
/// abort a change, don't call commit and allow the write transaction to
/// go out of scope. This causes the `EbrCell` to unlock allowing other
/// writes to proceed.
pub struct EbrCellWriteTxn<'a, T: 'a> {
    // The cloned copy being mutated. Always `Some` until `commit` consumes it.
    data: Option<T>,
    // This way we know who to contact for updating our data ....
    caller: &'a EbrCell<T>,
    // Holding this mutex guard serialises writers for the lifetime of the txn.
    _guard: MutexGuard<'a, ()>,
}

impl<'a, T> EbrCellWriteTxn<'a, T>
where
    T: Clone + Send + 'static,
{
    /// Access a mutable pointer of the data in the `EbrCell`. This data is only
    /// visible to this write transaction object in this thread until you call
    /// 'commit'.
    pub fn get_mut(&mut self) -> &mut T {
        self.data.as_mut().unwrap()
    }

    /// Commit the changes in this write transaction to the `EbrCell`. This will
    /// consume the transaction so that further changes can not be made to it
    /// after this function is called.
    pub fn commit(mut self) {
        /* Write our data back to the EbrCell */
        // Now make a new dummy element, and swap it into the mutex
        // This fixes up ownership of some values for lifetimes.
        let mut element: Option<T> = None;
        mem::swap(&mut element, &mut self.data);
        self.caller.commit(element);
    }
}

impl<'a, T> Deref for EbrCellWriteTxn<'a, T> {
    type Target = T;

    /// Read-only access to the (uncommitted) data in this write transaction.
    #[inline]
    fn deref(&self) -> &T {
        // `data` is Some for the whole life of the txn: it is only taken by
        // `commit`, and `commit` consumes self.
        self.data.as_ref().unwrap()
    }
}

impl<'a, T> DerefMut for EbrCellWriteTxn<'a, T> {
    /// Mutable access to the (uncommitted) data; equivalent to `get_mut`.
    fn deref_mut(&mut self) -> &mut T {
        // `data` is Some until `commit` consumes the transaction.
        self.data.as_mut().unwrap()
    }
}

/// A concurrently readable cell.
///
/// This structure behaves in a similar manner to a `RwLock<Box<T>>`. However
/// unlike a read-write lock, writes and parallel reads can be performed
/// simultaneously. This means writes do not block reads, and reads do not
/// block writes.
///
/// To achieve this a form of "copy-on-write" (or for Rust, clone on write) is
/// used. As a write transaction begins, we clone the existing data to a new
/// location that is capable of being mutated.
///
/// Readers are guaranteed that the content of the `EbrCell` will live as long
/// as the read transaction is open, and will be consistent for the duration
/// of the transaction. There can be an "unlimited" number of readers in parallel
/// accessing different generations of data of the `EbrCell`.
///
/// Data that is copied is garbage collected using the crossbeam-epoch library.
///
/// Writers are serialised and are guaranteed they have exclusive write access
/// to the structure.
///
/// # Examples
/// ```
/// use concread::ebrcell::EbrCell;
///
/// let data: i64 = 0;
/// let ebrcell = EbrCell::new(data);
///
/// // Begin a read transaction
/// let read_txn = ebrcell.read();
/// assert_eq!(*read_txn, 0);
/// {
///     // Now create a write, and commit it.
///     let mut write_txn = ebrcell.write();
///     *write_txn = 1;
///     // Commit the change
///     write_txn.commit();
/// }
/// // Show the previous generation still reads '0'
/// assert_eq!(*read_txn, 0);
/// let new_read_txn = ebrcell.read();
/// // And a new read transaction has '1'
/// assert_eq!(*new_read_txn, 1);
/// ```
#[derive(Debug)]
pub struct EbrCell<T> {
    // Serialises writers; the `()` payload is a pure lock token.
    write: Mutex<()>,
    // Epoch-managed pointer to the current generation. Never null after `new`.
    active: Atomic<T>,
}

impl<T> EbrCell<T>
where
    T: Clone + Send + 'static,
{
    /// Create a new `EbrCell` storing type `T`. `T` must implement `Clone`.
    pub fn new(data: T) -> Self {
        EbrCell {
            write: Mutex::new(()),
            active: Atomic::new(data),
        }
    }

    /// Begin a write transaction, returning a write guard.
    pub fn write(&self) -> EbrCellWriteTxn<T> {
        /* Take the exclusive write lock first */
        let mguard = self.write.lock();
        /* Do an atomic load of the current value */
        let guard = epoch::pin();
        let cur_shared = self.active.load(Acquire, &guard);
        /* Now build the write struct, we'll discard the pin shortly! */
        EbrCellWriteTxn {
            /* This is the 'copy' of the copy on write! */
            // SAFETY: `active` is initialised in `new` and every commit swaps
            // in a valid pointer, so it is never null while `self` is alive.
            data: Some(unsafe { cur_shared.deref().clone() }),
            caller: self,
            _guard: mguard,
        }
    }

    /// Attempt to begin a write transaction. If it's already held,
    /// `None` is returned.
    pub fn try_write(&self) -> Option<EbrCellWriteTxn<T>> {
        self.write.try_lock().map(|mguard| {
            let guard = epoch::pin();
            let cur_shared = self.active.load(Acquire, &guard);
            /* Now build the write struct, we'll discard the pin shortly! */
            EbrCellWriteTxn {
                /* This is the 'copy' of the copy on write! */
                // SAFETY: as in `write` — `active` is never null.
                data: Some(unsafe { cur_shared.deref().clone() }),
                caller: self,
                _guard: mguard,
            }
        })
    }

    /// This is an internal component of the commit cycle. It takes ownership
    /// of the value stored in the writetxn, and commits it to the main EbrCell
    /// safely.
    ///
    /// In theory you could use this as a "lock free" version, but you don't
    /// know if you are trampling a previous change, so it's private and we
    /// let the writetxn struct serialise and protect this interface.
    fn commit(&self, element: Option<T>) {
        // Yield a read txn?
        let guard = epoch::pin();

        // Load the previous data ready for unlinking
        let prev_data = self.active.load(Acquire, &guard);
        // Make the data Owned, and set it in the active.
        let owned_data: Owned<T> = Owned::new(element.unwrap());
        // NOTE(review): the CAS result is deliberately ignored. This relies on
        // the caller holding the write mutex (commit is private and only
        // reached from a write txn), so no concurrent writer can change
        // `active` between the load above and this swap — confirm before ever
        // exposing this path without the lock.
        let _shared_data = self
            .active
            .compare_and_set(prev_data, owned_data, Release, &guard);
        // Finally, set our previous data for cleanup.
        // SAFETY: `prev_data` has just been unlinked from `active`, and the
        // deferred closure only runs after all currently-pinned epochs are
        // released, so no reader can still be using it when it is dropped.
        unsafe {
            guard.defer(move || {
                drop(prev_data.into_owned());
            });
        }
        // Then return the current data with a readtxn. Do we need a new guard scope?
    }

    /// Begin a read transaction. The returned [`EbrCellReadTxn`] guarantees
    /// the data lives long enough via crossbeam's Epoch type. When this is
    /// dropped the data *may* be freed at some point in the future.
    pub fn read(&self) -> EbrCellReadTxn<T> {
        let guard = epoch::pin();

        // This option returns None on null pointer, but we can never be null
        // as we have to init with data, and all replacement ALWAYS gives us
        // a ptr, so unwrap?
        let cur = {
            let c = self.active.load(Acquire, &guard);
            c.as_raw()
        };

        // The guard travels with the raw pointer; keeping it pinned is what
        // prevents the pointee from being reclaimed while the txn is alive.
        EbrCellReadTxn {
            _guard: guard,
            data: cur,
        }
    }
}

impl<T> Drop for EbrCell<T> {
    fn drop(&mut self) {
        // Right, we are dropping! Everything is okay here *except*
        // that we need to tell our active data to be unlinked, else it may
        // be dropped "unsafely".
        let guard = epoch::pin();

        let prev_data = self.active.load(Acquire, &guard);
        // SAFETY: we hold `&mut self`, so no new readers can be created from
        // this cell; the deferred drop of the final generation runs only once
        // all readers pinned before this point have unpinned.
        unsafe {
            guard.defer(move || {
                drop(prev_data.into_owned());
            });
        }
    }
}

/// A read transaction. This stores a reference to the data from the main
/// `EbrCell`, and guarantees it is alive for the duration of the read.
// #[derive(Debug)]
pub struct EbrCellReadTxn<T> {
    // Pinned epoch guard: while held, the allocation behind `data` cannot
    // be reclaimed by the collector.
    _guard: Guard,
    // Raw pointer into the epoch-managed generation read at `read()` time.
    // The raw pointer makes this type !Send/!Sync by default.
    data: *const T,
}

impl<T> Deref for EbrCellReadTxn<T> {
    type Target = T;

    /// Dereference and access the value within the read transaction.
    fn deref(&self) -> &T {
        // SAFETY: `data` was loaded from `active` while `_guard` was pinned,
        // and `_guard` is still held, so the pointee has not been reclaimed.
        unsafe { &(*self.data) }
    }
}

#[cfg(test)]
mod tests {
    extern crate crossbeam_utils;
    extern crate time;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::EbrCell;
    use crossbeam_utils::thread::scope;

    #[test]
    fn test_deref_mut() {
        let data: i64 = 0;
        let cc = EbrCell::new(data);
        {
            /* Take a write txn */
            let mut cc_wrtxn = cc.write();
            *cc_wrtxn = 1;
            cc_wrtxn.commit();
        }
        let cc_rotxn = cc.read();
        assert_eq!(*cc_rotxn, 1);
    }

    #[test]
    fn test_try_write() {
        let data: i64 = 0;
        let cc = EbrCell::new(data);
        /* Take a write txn */
        let cc_wrtxn_a = cc.try_write();
        assert!(cc_wrtxn_a.is_some());
        /* Because we already hold the write lock, the second is guaranteed to fail */
        let cc_wrtxn_a = cc.try_write();
        assert!(cc_wrtxn_a.is_none());
    }

    #[test]
    fn test_simple_create() {
        let data: i64 = 0;
        let cc = EbrCell::new(data);

        let cc_rotxn_a = cc.read();
        assert_eq!(*cc_rotxn_a, 0);

        {
            /* Take a write txn */
            let mut cc_wrtxn = cc.write();
            /* Get the data ... */
            {
                let mut_ptr = cc_wrtxn.get_mut();
                /* Assert it's 0 */
                assert_eq!(*mut_ptr, 0);
                *mut_ptr = 1;
                assert_eq!(*mut_ptr, 1);
            }
            assert_eq!(*cc_rotxn_a, 0);

            let cc_rotxn_b = cc.read();
            assert_eq!(*cc_rotxn_b, 0);
            /* Commit consumes the write txn, releasing its lock here */
            cc_wrtxn.commit();
        }

        /* Start a new txn and see it's still good */
        let cc_rotxn_c = cc.read();
        assert_eq!(*cc_rotxn_c, 1);
        assert_eq!(*cc_rotxn_a, 0);
    }

    const MAX_TARGET: i64 = 2000;

    #[test]
    fn test_multithread_create() {
        let start = time::now();
        // Create the new ebrcell.
        let data: i64 = 0;
        let cc = EbrCell::new(data);

        scope(|scope| {
            let cc_ref = &cc;

            // Readers spin until the writers reach MAX_TARGET, asserting the
            // observed value only ever moves forward (monotonic reads).
            let _readers: Vec<_> = (0..7)
                .map(|_| {
                    scope.spawn(move || {
                        let mut last_value: i64 = 0;
                        while last_value < MAX_TARGET {
                            let cc_rotxn = cc_ref.read();
                            {
                                assert!(*cc_rotxn >= last_value);
                                last_value = *cc_rotxn;
                            }
                        }
                    })
                }).collect();

            // Writers increment the value under the serialised write lock.
            let _writers: Vec<_> = (0..3)
                .map(|_| {
                    scope.spawn(move || {
                        let mut last_value: i64 = 0;
                        while last_value < MAX_TARGET {
                            let mut cc_wrtxn = cc_ref.write();
                            {
                                let mut_ptr = cc_wrtxn.get_mut();
                                assert!(*mut_ptr >= last_value);
                                last_value = *mut_ptr;
                                *mut_ptr = *mut_ptr + 1;
                            }
                            cc_wrtxn.commit();
                        }
                    })
                }).collect();
        });

        let end = time::now();
        print!("Ebr MT create :{} ", end - start);
    }

    static GC_COUNT: AtomicUsize = AtomicUsize::new(0);

    // Wrapper whose Drop bumps GC_COUNT, so tests can observe epoch reclaim.
    #[derive(Debug, Clone)]
    struct TestGcWrapper<T> {
        data: T,
    }

    impl<T> Drop for TestGcWrapper<T> {
        fn drop(&mut self) {
            // Add to the atomic counter ...
            GC_COUNT.fetch_add(1, Ordering::Release);
        }
    }

    fn test_gc_operation_thread(cc: &EbrCell<TestGcWrapper<i64>>) {
        // Keep committing (each commit retires one old generation) until at
        // least 50 drops have been observed across all threads.
        while GC_COUNT.load(Ordering::Acquire) < 50 {
            // thread::sleep(std::time::Duration::from_millis(200));
            {
                let mut cc_wrtxn = cc.write();
                {
                    let mut_ptr = cc_wrtxn.get_mut();
                    mut_ptr.data = mut_ptr.data + 1;
                }
                cc_wrtxn.commit();
            }
        }
    }

    #[test]
    fn test_gc_operation() {
        GC_COUNT.store(0, Ordering::Release);
        let data = TestGcWrapper { data: 0 };
        let cc = EbrCell::new(data);

        scope(|scope| {
            let cc_ref = &cc;
            let _writers: Vec<_> = (0..3)
                .map(|_| {
                    scope.spawn(move || {
                        test_gc_operation_thread(cc_ref);
                    })
                }).collect();
        });

        assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
    }
}

#[cfg(test)]
mod tests_linear {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::EbrCell;

    static GC_COUNT: AtomicUsize = AtomicUsize::new(0);

    // Wrapper whose Drop bumps GC_COUNT, so tests can observe epoch reclaim.
    #[derive(Debug, Clone)]
    struct TestGcWrapper<T> {
        data: T,
    }

    impl<T> Drop for TestGcWrapper<T> {
        fn drop(&mut self) {
            // Add to the atomic counter ...
            GC_COUNT.fetch_add(1, Ordering::Release);
        }
    }

    #[test]
    fn test_gc_operation_linear() {
        /*
         * Test if epoch drops in order (or ordered enough).
         * A property required for b+tree with cow is that txn's
         * are dropped in order so that tree states are not invalidated.
         *
         * A -> B -> C
         *
         * If B is dropped, it invalidates nodes copied from A
         * causing the tree to corrupt txn A (and maybe C).
         *
         * EBR due to its design, while it won't drop in order,
         * drops generationally, in blocks. This is probably
         * good enough. This means that:
         *
         * A -> B -> C .. -> X -> Y
         *
         * EBR will drop in blocks such as:
         *
         * |  g1   |  g2   |  live |
         * A -> B -> C .. -> X -> Y
         *
         * This test is "small" but asserts a basic sanity of drop
         * ordering, but it's not conclusive for b+tree. More testing
         * (likely a multi-thread stress test) is needed, or analysis from
         * other EBR developers.
         */
        GC_COUNT.store(0, Ordering::Release);
        let data = TestGcWrapper { data: 0 };
        let cc = EbrCell::new(data);

        // Open a read A.
        let cc_rotxn_a = cc.read();
        // open a write, change and commit
        {
            let mut cc_wrtxn = cc.write();
            {
                let mut_ptr = cc_wrtxn.get_mut();
                mut_ptr.data = mut_ptr.data + 1;
            }
            cc_wrtxn.commit();
        }
        // open a read B.
        let cc_rotxn_b = cc.read();
        // open a write, change and commit
        {
            let mut cc_wrtxn = cc.write();
            {
                let mut_ptr = cc_wrtxn.get_mut();
                mut_ptr.data = mut_ptr.data + 1;
            }
            cc_wrtxn.commit();
        }
        // open a read C
        let cc_rotxn_c = cc.read();

        assert!(GC_COUNT.load(Ordering::Acquire) == 0);

        // drop B — A is still pinned at an earlier epoch, so nothing reclaims.
        drop(cc_rotxn_b);

        // gc count should be 0.
        assert!(GC_COUNT.load(Ordering::Acquire) == 0);

        // drop C
        drop(cc_rotxn_c);

        // gc count should be 0
        assert!(GC_COUNT.load(Ordering::Acquire) == 0);

        // drop A
        drop(cc_rotxn_a);

        // gc count should be at most 2 (A's and B's generations are
        // reclaimable; C's generation is still live in the cell)
        assert!(GC_COUNT.load(Ordering::Acquire) <= 2);
    }

}