use crossbeam_epoch as epoch;
use crossbeam_epoch::{Atomic, Guard, Owned};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::{Mutex, MutexGuard};
/// A write transaction over an [`EbrCell`].
///
/// The transaction owns a private working copy of the cell's value, a
/// reference back to the cell it was opened on, and the cell's write-lock
/// guard. The working copy is only handed to the cell when `commit` is
/// called.
pub struct EbrCellWriteTxn<'a, T>
where
    T: 'static + Clone + Send + Sync,
{
    // Private working copy of the value; `commit` moves it out, leaving `None`.
    data: Option<T>,
    // The cell this transaction was opened on and will publish into.
    caller: &'a EbrCell<T>,
    // Guard for the cell's write mutex, held for as long as the transaction
    // lives so that no second write transaction can be opened meanwhile.
    _guard: MutexGuard<'a, ()>,
}
impl<T: Clone + Sync + Send + 'static> EbrCellWriteTxn<'_, T> {
    /// Returns a mutable reference to this transaction's private working copy.
    ///
    /// Changes made through the reference stay local to the transaction
    /// until `commit` is called.
    ///
    /// # Panics
    ///
    /// Panics if the working copy has already been moved out.
    pub fn get_mut(&mut self) -> &mut T {
        let working = self.data.as_mut();
        working.unwrap()
    }

    /// Consumes the transaction and hands its working copy to the owning
    /// cell's `commit`.
    ///
    /// The write-lock guard is a field of `self`, so it is released only
    /// when `self` is dropped at the end of this function, i.e. after the
    /// cell has processed the commit.
    pub fn commit(mut self) {
        // Move the working copy out, leaving `None` behind.
        let pending = mem::take(&mut self.data);
        self.caller.commit(pending);
    }
}
impl<T: Clone + Sync + Send> Deref for EbrCellWriteTxn<'_, T> {
    type Target = T;

    /// Gives shared access to the transaction's private working copy.
    ///
    /// # Panics
    ///
    /// Panics if the working copy has already been moved out.
    #[inline]
    fn deref(&self) -> &T {
        let working = self.data.as_ref();
        working.unwrap()
    }
}
impl<T: Clone + Sync + Send> DerefMut for EbrCellWriteTxn<'_, T> {
    /// Gives mutable access to the transaction's private working copy, so
    /// that `*txn = value` edits the copy rather than the published value.
    ///
    /// # Panics
    ///
    /// Panics if the working copy has already been moved out.
    fn deref_mut(&mut self) -> &mut T {
        let working = self.data.as_mut();
        working.unwrap()
    }
}
/// A cell whose value is read through epoch-pinned snapshots and replaced
/// wholesale by a single writer at a time.
///
/// Writers clone the current value, modify the clone, and publish it with an
/// atomic pointer exchange; replaced values are handed to `crossbeam_epoch`
/// for deferred destruction.
#[derive(Debug)]
pub struct EbrCell<T>
where
    T: Clone + Sync + Send + 'static,
{
    // Serialises write transactions; it carries no data of its own.
    write: Mutex<()>,
    // Pointer to the currently published value.
    active: Atomic<T>,
}
impl<T> Default for EbrCell<T>
where
T: Default + Clone + Sync + Send + 'static,
{
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T> EbrCell<T>
where
T: Clone + Sync + Send + 'static,
{
pub fn new(data: T) -> Self {
EbrCell {
write: Mutex::new(()),
active: Atomic::new(data),
}
}
pub fn write(&self) -> EbrCellWriteTxn<'_, T> {
let mguard = self.write.lock().unwrap();
let guard = epoch::pin();
let cur_shared = self.active.load(Acquire, &guard);
EbrCellWriteTxn {
data: Some(unsafe { cur_shared.deref().clone() }),
caller: self,
_guard: mguard,
}
}
pub fn try_write(&self) -> Option<EbrCellWriteTxn<'_, T>> {
self.write.try_lock().ok().map(|mguard| {
let guard = epoch::pin();
let cur_shared = self.active.load(Acquire, &guard);
EbrCellWriteTxn {
data: Some(unsafe { cur_shared.deref().clone() }),
caller: self,
_guard: mguard,
}
})
}
fn commit(&self, element: Option<T>) {
let guard = epoch::pin();
let prev_data = self.active.load(Acquire, &guard);
let owned_data: Owned<T> = Owned::new(element.unwrap());
let _shared_data = self
.active
.compare_exchange(prev_data, owned_data, Release, Relaxed, &guard);
unsafe { guard.defer_destroy(prev_data) };
}
pub fn read(&self) -> EbrCellReadTxn<T> {
let guard = epoch::pin();
let cur = {
let c = self.active.load(Acquire, &guard);
c.as_raw()
};
EbrCellReadTxn {
_guard: guard,
data: cur,
}
}
}
impl<T: Clone + Sync + Send + 'static> Drop for EbrCell<T> {
    /// Retires the last published value when the cell itself is dropped.
    ///
    /// The value is handed to the epoch collector rather than freed on the
    /// spot; read transactions are not lifetime-bound to the cell and may
    /// still point at it.
    fn drop(&mut self) {
        let pin = epoch::pin();
        let last = self.active.load(Acquire, &pin);
        // SAFETY: the cell is being destroyed, so nothing can load `active`
        // afterwards; destruction of the value is deferred by the collector.
        unsafe { pin.defer_destroy(last) };
    }
}
/// A read transaction over an `EbrCell`.
///
/// Pairs an epoch guard with a raw pointer to the value that was published
/// when the transaction was opened. The type carries no lifetime tying it to
/// the cell it came from.
pub struct EbrCellReadTxn<T> {
// Epoch guard taken when the transaction was opened; kept only so it stays
// alive for as long as `data` may be dereferenced.
_guard: Guard,
// Raw pointer to the value loaded from the cell under `_guard`.
data: *const T,
}
impl<T> Deref for EbrCellReadTxn<T> {
    type Target = T;

    /// Gives shared access to the value captured when the transaction was
    /// opened.
    fn deref(&self) -> &T {
        // SAFETY: `data` was loaded from the cell under `_guard`, which this
        // transaction still owns. Replaced values are only freed through
        // epoch-deferred destruction, which is expected not to run while
        // that guard remains pinned.
        unsafe { &*self.data }
    }
}
// Functional and multi-threaded tests for `EbrCell`.
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use super::EbrCell;
use std::thread::scope;
// Writing through `DerefMut` on a write transaction and committing makes the
// new value visible to a read transaction opened afterwards.
#[test]
fn test_deref_mut() {
let data: i64 = 0;
let cc = EbrCell::new(data);
{
let mut cc_wrtxn = cc.write();
*cc_wrtxn = 1;
cc_wrtxn.commit();
}
let cc_rotxn = cc.read();
assert_eq!(*cc_rotxn, 1);
}
// `try_write` succeeds when no writer is active and returns `None` while
// another write transaction is still alive.
#[test]
fn test_try_write() {
let data: i64 = 0;
let cc = EbrCell::new(data);
let cc_wrtxn_a = cc.try_write();
assert!(cc_wrtxn_a.is_some());
// Shadowing the binding does not drop the first transaction, so its write
// lock is still held when the second attempt is made.
let cc_wrtxn_a = cc.try_write();
assert!(cc_wrtxn_a.is_none());
}
// Readers opened before a commit keep seeing the old value; readers opened
// after the commit see the new one.
#[test]
fn test_simple_create() {
let data: i64 = 0;
let cc = EbrCell::new(data);
let cc_rotxn_a = cc.read();
assert_eq!(*cc_rotxn_a, 0);
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
assert_eq!(*mut_ptr, 0);
*mut_ptr = 1;
assert_eq!(*mut_ptr, 1);
}
// The uncommitted change is not visible to an existing or a new reader.
assert_eq!(*cc_rotxn_a, 0);
let cc_rotxn_b = cc.read();
assert_eq!(*cc_rotxn_b, 0);
cc_wrtxn.commit();
}
let cc_rotxn_c = cc.read();
assert_eq!(*cc_rotxn_c, 1);
// The reader opened before the commit still sees its original snapshot.
assert_eq!(*cc_rotxn_a, 0);
}
// Value that the readers and writers in `test_multithread_create` run up to.
const MAX_TARGET: i64 = 2000;
// Runs 7 reader threads and 3 writer threads against one cell. Every thread
// asserts that the values it observes never decrease. Skipped under Miri.
#[test]
#[cfg_attr(miri, ignore)]
fn test_multithread_create() {
use std::time::Instant;
let start = Instant::now();
let data: i64 = 0;
let cc = EbrCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
// Readers: keep opening read transactions until the observed value
// reaches MAX_TARGET.
let readers: Vec<_> = (0..7)
.map(|_| {
scope.spawn(move || {
let mut last_value: i64 = 0;
while last_value < MAX_TARGET {
let cc_rotxn = cc_ref.read();
{
assert!(*cc_rotxn >= last_value);
last_value = *cc_rotxn;
}
}
})
})
.collect();
// Writers: each transaction increments the value by one; a writer stops
// once the value it saw before incrementing has reached MAX_TARGET.
let writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move || {
let mut last_value: i64 = 0;
while last_value < MAX_TARGET {
let mut cc_wrtxn = cc_ref.write();
{
let mut_ptr = cc_wrtxn.get_mut();
assert!(*mut_ptr >= last_value);
last_value = *mut_ptr;
*mut_ptr += 1;
}
cc_wrtxn.commit();
}
})
})
.collect();
// Joining propagates any assertion failure raised inside a thread.
for h in readers.into_iter() {
h.join().unwrap();
}
for h in writers.into_iter() {
h.join().unwrap();
}
true
}));
let end = Instant::now();
print!("Ebr MT create :{:?} ", end - start);
}
// Counts how many `TestGcWrapper` values have been dropped.
static GC_COUNT: AtomicUsize = AtomicUsize::new(0);
// Wrapper whose `Drop` bumps `GC_COUNT`, making reclamation observable.
#[derive(Debug, Clone)]
struct TestGcWrapper<T> {
data: T,
}
impl<T> Drop for TestGcWrapper<T> {
fn drop(&mut self) {
GC_COUNT.fetch_add(1, Ordering::Release);
}
}
// Commits increments in a loop until at least 50 wrapper values have been
// dropped.
fn test_gc_operation_thread(cc: &EbrCell<TestGcWrapper<i64>>) {
while GC_COUNT.load(Ordering::Acquire) < 50 {
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data += 1;
}
cc_wrtxn.commit();
}
}
}
// Runs 3 writer threads and checks that replaced values actually get dropped
// (at least 50 drops are observed). Skipped under Miri.
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_operation() {
GC_COUNT.store(0, Ordering::Release);
let data = TestGcWrapper { data: 0 };
let cc = EbrCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
let writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move || {
test_gc_operation_thread(cc_ref);
})
})
.collect();
for h in writers.into_iter() {
h.join().unwrap();
}
true
}));
assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
}
}
// Single-threaded reclamation tests. This module declares its own
// `GC_COUNT` static, separate from the one in `tests`.
// NOTE(review): presumably kept in a separate module so the counter is not
// disturbed by the multi-threaded GC test — confirm.
#[cfg(test)]
mod tests_linear {
use std::sync::atomic::{AtomicUsize, Ordering};
use super::EbrCell;
// Counts how many `TestGcWrapper` values have been dropped.
static GC_COUNT: AtomicUsize = AtomicUsize::new(0);
// Wrapper whose `Drop` bumps `GC_COUNT`, making reclamation observable.
#[derive(Debug, Clone)]
struct TestGcWrapper<T> {
data: T,
}
impl<T> Drop for TestGcWrapper<T> {
fn drop(&mut self) {
GC_COUNT.fetch_add(1, Ordering::Release);
}
}
// Opens three read transactions interleaved with two commits, then drops the
// readers newest-first. Asserts that no wrapper is dropped while the oldest
// reader (`cc_rotxn_a`) is still alive.
#[test]
fn test_gc_operation_linear() {
GC_COUNT.store(0, Ordering::Release);
let data = TestGcWrapper { data: 0 };
let cc = EbrCell::new(data);
// Oldest reader: opened before any commit.
let cc_rotxn_a = cc.read();
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data += 1;
}
cc_wrtxn.commit();
}
// Opened after the first commit.
let cc_rotxn_b = cc.read();
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data += 1;
}
cc_wrtxn.commit();
}
// Opened after the second commit.
let cc_rotxn_c = cc.read();
// Two values have been replaced by now, but none may have been dropped yet.
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_b);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_c);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_a);
// Once the last reader is gone, up to the two replaced values may have been
// dropped; the test does not require that they already were.
assert!(GC_COUNT.load(Ordering::Acquire) <= 2);
}
// `EbrCell` can be built through `Default` (here with the unit type).
#[test]
fn test_default() {
EbrCell::<()>::default();
}
}