use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
pub trait LinCowCellCapable<R, U> {
fn create_reader(&self) -> R;
fn create_writer(&self) -> U;
fn pre_commit(&mut self, new: U, prev: &R) -> R;
}
#[derive(Debug)]
pub struct LinCowCell<T, R, U> {
updater: PhantomData<U>,
write: Mutex<T>,
active: Mutex<Arc<LinCowCellInner<R>>>,
}
#[derive(Debug)]
pub struct LinCowCellWriteTxn<'a, T: 'a, R, U> {
caller: &'a LinCowCell<T, R, U>,
guard: MutexGuard<'a, T>,
work: U,
}
#[derive(Debug)]
struct LinCowCellInner<R> {
pin: Mutex<Option<Arc<LinCowCellInner<R>>>>,
data: R,
}
#[derive(Debug)]
pub struct LinCowCellReadTxn<'a, T: 'a, R, U> {
_caller: &'a LinCowCell<T, R, U>,
work: Arc<LinCowCellInner<R>>,
}
impl<R> LinCowCellInner<R> {
pub fn new(data: R) -> Self {
LinCowCellInner {
pin: Mutex::new(None),
data,
}
}
}
impl<T, R, U> LinCowCell<T, R, U>
where
T: LinCowCellCapable<R, U>,
{
pub fn new(data: T) -> Self {
let r = data.create_reader();
LinCowCell {
updater: PhantomData,
write: Mutex::new(data),
active: Mutex::new(Arc::new(LinCowCellInner::new(r))),
}
}
pub fn read(&self) -> LinCowCellReadTxn<T, R, U> {
let rwguard = self.active.lock();
LinCowCellReadTxn {
_caller: self,
work: rwguard.clone(),
}
}
pub fn write(&self) -> LinCowCellWriteTxn<T, R, U> {
let write_guard = self.write.lock();
let work: U = (*write_guard).create_writer();
LinCowCellWriteTxn {
caller: self,
guard: write_guard,
work,
}
}
pub fn try_write(&self) -> Option<LinCowCellWriteTxn<T, R, U>> {
self.write.try_lock().map(|write_guard| {
let work: U = (*write_guard).create_writer();
LinCowCellWriteTxn {
caller: self,
guard: write_guard,
work,
}
})
}
fn commit(&self, write: LinCowCellWriteTxn<T, R, U>) {
let LinCowCellWriteTxn {
caller: _caller,
mut guard,
work,
} = write;
let mut rwguard = self.active.lock();
let newdata = guard.pre_commit(work, &rwguard.data);
let new_inner = Arc::new(LinCowCellInner::new(newdata));
{
let mut rwguard_inner = rwguard.pin.lock();
*rwguard_inner = Some(new_inner.clone());
}
*rwguard = new_inner;
}
}
impl<'a, T, R, U> Deref for LinCowCellReadTxn<'a, T, R, U> {
type Target = R;
#[inline]
fn deref(&self) -> &R {
&self.work.data
}
}
impl<'a, T, R, U> AsRef<R> for LinCowCellReadTxn<'a, T, R, U> {
#[inline]
fn as_ref(&self) -> &R {
&self.work.data
}
}
impl<'a, T, R, U> LinCowCellWriteTxn<'a, T, R, U>
where
T: LinCowCellCapable<R, U>,
{
#[inline]
pub fn get_mut(&mut self) -> &mut U {
&mut self.work
}
pub fn commit(self) {
self.caller.commit(self);
}
}
impl<'a, T, R, U> Deref for LinCowCellWriteTxn<'a, T, R, U> {
type Target = U;
#[inline]
fn deref(&self) -> &U {
&self.work
}
}
impl<'a, T, R, U> DerefMut for LinCowCellWriteTxn<'a, T, R, U> {
#[inline]
fn deref_mut(&mut self) -> &mut U {
&mut self.work
}
}
impl<'a, T, R, U> AsRef<U> for LinCowCellWriteTxn<'a, T, R, U> {
#[inline]
fn as_ref(&self) -> &U {
&self.work
}
}
impl<'a, T, R, U> AsMut<U> for LinCowCellWriteTxn<'a, T, R, U> {
#[inline]
fn as_mut(&mut self) -> &mut U {
&mut self.work
}
}
#[cfg(test)]
mod tests {
use super::LinCowCell;
use super::LinCowCellCapable;
use crossbeam_utils::thread::scope;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct TestData {
x: i64,
}
#[derive(Debug)]
struct TestDataReadTxn {
x: i64,
}
#[derive(Debug)]
struct TestDataWriteTxn {
x: i64,
}
impl LinCowCellCapable<TestDataReadTxn, TestDataWriteTxn> for TestData {
fn create_reader(&self) -> TestDataReadTxn {
TestDataReadTxn { x: self.x }
}
fn create_writer(&self) -> TestDataWriteTxn {
TestDataWriteTxn { x: self.x }
}
fn pre_commit(
&mut self,
new: TestDataWriteTxn,
_prev: &TestDataReadTxn,
) -> TestDataReadTxn {
self.x = new.x;
TestDataReadTxn { x: new.x }
}
}
#[test]
fn test_simple_create() {
let data = TestData { x: 0 };
let cc = LinCowCell::new(data);
let cc_rotxn_a = cc.read();
println!("cc_rotxn_a -> {:?}", cc_rotxn_a);
assert_eq!(cc_rotxn_a.work.data.x, 0);
{
let mut cc_wrtxn = cc.write();
println!("cc_wrtxn -> {:?}", cc_wrtxn);
assert_eq!(cc_wrtxn.work.x, 0);
assert_eq!(cc_wrtxn.as_ref().x, 0);
{
let mut_ptr = cc_wrtxn.get_mut();
assert_eq!(mut_ptr.x, 0);
mut_ptr.x = 1;
assert_eq!(mut_ptr.x, 1);
}
assert_eq!(cc_rotxn_a.work.data.x, 0);
}
assert_eq!(cc_rotxn_a.work.data.x, 0);
{
let mut cc_wrtxn = cc.write();
println!("cc_wrtxn -> {:?}", cc_wrtxn);
assert_eq!(cc_wrtxn.work.x, 0);
assert_eq!(cc_wrtxn.as_ref().x, 0);
{
let mut_ptr = cc_wrtxn.get_mut();
assert_eq!(mut_ptr.x, 0);
mut_ptr.x = 2;
assert_eq!(mut_ptr.x, 2);
}
assert_eq!(cc_rotxn_a.work.data.x, 0);
cc_wrtxn.commit();
}
assert_eq!(cc_rotxn_a.work.data.x, 0);
let cc_rotxn_c = cc.read();
assert_eq!(cc_rotxn_c.work.data.x, 2);
}
fn mt_writer(cc: &LinCowCell<TestData, TestDataReadTxn, TestDataWriteTxn>) {
let mut last_value: i64 = 0;
while last_value < 500 {
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
assert!(mut_ptr.x >= last_value);
last_value = mut_ptr.x;
mut_ptr.x = mut_ptr.x + 1;
}
cc_wrtxn.commit();
}
}
fn rt_writer(cc: &LinCowCell<TestData, TestDataReadTxn, TestDataWriteTxn>) {
let mut last_value: i64 = 0;
while last_value < 500 {
let cc_rotxn = cc.read();
{
assert!(cc_rotxn.work.data.x >= last_value);
last_value = cc_rotxn.work.data.x;
}
}
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_multithread_create() {
let start = time::Instant::now();
let data = TestData { x: 0 };
let cc = LinCowCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
let _readers: Vec<_> = (0..7)
.map(|_| {
scope.spawn(move |_| {
rt_writer(cc_ref);
})
})
.collect();
let _writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move |_| {
mt_writer(cc_ref);
})
})
.collect();
})
.is_ok());
let end = time::Instant::now();
print!("Arc MT create :{:?} ", end - start);
}
static GC_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, Clone)]
struct TestGcWrapper<T> {
data: T,
}
#[derive(Debug)]
struct TestGcWrapperReadTxn<T> {
data: T,
}
#[derive(Debug)]
struct TestGcWrapperWriteTxn<T> {
data: T,
}
impl<T: Clone> LinCowCellCapable<TestGcWrapperReadTxn<T>, TestGcWrapperWriteTxn<T>>
for TestGcWrapper<T>
{
fn create_reader(&self) -> TestGcWrapperReadTxn<T> {
TestGcWrapperReadTxn {
data: self.data.clone(),
}
}
fn create_writer(&self) -> TestGcWrapperWriteTxn<T> {
TestGcWrapperWriteTxn {
data: self.data.clone(),
}
}
fn pre_commit(
&mut self,
new: TestGcWrapperWriteTxn<T>,
_prev: &TestGcWrapperReadTxn<T>,
) -> TestGcWrapperReadTxn<T> {
self.data = new.data.clone();
TestGcWrapperReadTxn {
data: self.data.clone(),
}
}
}
impl<T> Drop for TestGcWrapperReadTxn<T> {
fn drop(&mut self) {
GC_COUNT.fetch_add(1, Ordering::Release);
}
}
fn test_gc_operation_thread(
cc: &LinCowCell<TestGcWrapper<i64>, TestGcWrapperReadTxn<i64>, TestGcWrapperWriteTxn<i64>>,
) {
while GC_COUNT.load(Ordering::Acquire) < 50 {
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data = mut_ptr.data + 1;
}
cc_wrtxn.commit();
}
}
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_operation() {
GC_COUNT.store(0, Ordering::Release);
let data = TestGcWrapper { data: 0 };
let cc = LinCowCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
let _writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move |_| {
test_gc_operation_thread(cc_ref);
})
})
.collect();
})
.is_ok());
assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
}
}
#[cfg(test)]
mod tests_linear {
use super::LinCowCell;
use super::LinCowCellCapable;
use std::sync::atomic::{AtomicUsize, Ordering};
static GC_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, Clone)]
struct TestGcWrapper<T> {
data: T,
}
#[derive(Debug)]
struct TestGcWrapperReadTxn<T> {
data: T,
}
#[derive(Debug)]
struct TestGcWrapperWriteTxn<T> {
data: T,
}
impl<T: Clone> LinCowCellCapable<TestGcWrapperReadTxn<T>, TestGcWrapperWriteTxn<T>>
for TestGcWrapper<T>
{
fn create_reader(&self) -> TestGcWrapperReadTxn<T> {
TestGcWrapperReadTxn {
data: self.data.clone(),
}
}
fn create_writer(&self) -> TestGcWrapperWriteTxn<T> {
TestGcWrapperWriteTxn {
data: self.data.clone(),
}
}
fn pre_commit(
&mut self,
new: TestGcWrapperWriteTxn<T>,
_prev: &TestGcWrapperReadTxn<T>,
) -> TestGcWrapperReadTxn<T> {
self.data = new.data.clone();
TestGcWrapperReadTxn {
data: self.data.clone(),
}
}
}
impl<T> Drop for TestGcWrapperReadTxn<T> {
fn drop(&mut self) {
GC_COUNT.fetch_add(1, Ordering::Release);
}
}
#[test]
fn test_gc_operation_linear() {
GC_COUNT.store(0, Ordering::Release);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
let data = TestGcWrapper { data: 0 };
let cc = LinCowCell::new(data);
let cc_rotxn_a = cc.read();
let cc_rotxn_a_2 = cc.read();
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data = mut_ptr.data + 1;
}
cc_wrtxn.commit();
}
let cc_rotxn_b = cc.read();
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data = mut_ptr.data + 1;
}
cc_wrtxn.commit();
}
let cc_rotxn_c = cc.read();
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_b);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_c);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_a_2);
assert!(GC_COUNT.load(Ordering::Acquire) == 0);
drop(cc_rotxn_a);
assert!(GC_COUNT.load(Ordering::Acquire) == 2);
}
}