#[cfg(feature = "asynch")]
pub mod asynch;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
#[derive(Debug, Default)]
pub struct CowCell<T> {
write: Mutex<()>,
active: Mutex<Arc<T>>,
}
pub struct CowCellWriteTxn<'a, T> {
work: Option<T>,
read: Arc<T>,
caller: &'a CowCell<T>,
_guard: MutexGuard<'a, ()>,
}
#[derive(Debug)]
pub struct CowCellReadTxn<T>(Arc<T>);
impl<T> Clone for CowCellReadTxn<T> {
fn clone(&self) -> Self {
CowCellReadTxn(self.0.clone())
}
}
impl<T> CowCell<T>
where
T: Clone,
{
pub fn new(data: T) -> Self {
CowCell {
write: Mutex::new(()),
active: Mutex::new(Arc::new(data)),
}
}
pub fn read(&self) -> CowCellReadTxn<T> {
let rwguard = self.active.lock().unwrap();
CowCellReadTxn(rwguard.clone())
}
pub fn write(&self) -> CowCellWriteTxn<'_, T> {
let mguard = self.write.lock().unwrap();
let read = {
let rwguard = self.active.lock().unwrap();
rwguard.clone()
};
CowCellWriteTxn {
work: None,
read,
caller: self,
_guard: mguard,
}
}
pub fn try_write(&self) -> Option<CowCellWriteTxn<'_, T>> {
self.write.try_lock().ok().map(|mguard| {
let read = {
let rwguard = self.active.lock().unwrap();
rwguard.clone()
};
CowCellWriteTxn {
work: None,
read,
caller: self,
_guard: mguard,
}
})
}
fn commit(&self, newdata: Option<T>) {
if let Some(new_data) = newdata {
let mut rwguard = self.active.lock().unwrap();
let new_inner = Arc::new(new_data);
*rwguard = new_inner;
}
}
}
impl<T> Deref for CowCellReadTxn<T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
&self.0
}
}
impl<T> CowCellWriteTxn<'_, T>
where
T: Clone,
{
pub fn get_mut(&mut self) -> &mut T {
if self.work.is_none() {
let mut data: Option<T> = Some((*self.read).clone());
std::mem::swap(&mut data, &mut self.work);
debug_assert!(data.is_none())
}
self.work.as_mut().expect("can not fail")
}
pub fn replace(&mut self, value: T) {
self.work = Some(value);
}
pub fn commit(self) {
self.caller.commit(self.work);
}
}
impl<T> Deref for CowCellWriteTxn<'_, T>
where
T: Clone,
{
type Target = T;
#[inline(always)]
fn deref(&self) -> &T {
match &self.work {
Some(v) => v,
None => &self.read,
}
}
}
impl<T> DerefMut for CowCellWriteTxn<'_, T>
where
T: Clone,
{
#[inline(always)]
fn deref_mut(&mut self) -> &mut T {
self.get_mut()
}
}
#[cfg(test)]
mod tests {
use super::CowCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use std::thread::scope;
#[test]
fn test_deref_mut() {
let data: i64 = 0;
let cc = CowCell::new(data);
{
let mut cc_wrtxn = cc.write();
*cc_wrtxn = 1;
cc_wrtxn.commit();
}
let cc_rotxn = cc.read();
assert_eq!(*cc_rotxn, 1);
}
#[test]
fn test_try_write() {
let data: i64 = 0;
let cc = CowCell::new(data);
let cc_wrtxn_a = cc.try_write();
assert!(cc_wrtxn_a.is_some());
let cc_wrtxn_a = cc.try_write();
assert!(cc_wrtxn_a.is_none());
}
#[test]
fn test_simple_create() {
let data: i64 = 0;
let cc = CowCell::new(data);
let cc_rotxn_a = cc.read();
assert_eq!(*cc_rotxn_a, 0);
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
assert_eq!(*mut_ptr, 0);
*mut_ptr = 1;
assert_eq!(*mut_ptr, 1);
}
assert_eq!(*cc_rotxn_a, 0);
let cc_rotxn_b = cc.read();
assert_eq!(*cc_rotxn_b, 0);
cc_wrtxn.commit();
}
let cc_rotxn_c = cc.read();
assert_eq!(*cc_rotxn_c, 1);
assert_eq!(*cc_rotxn_a, 0);
}
const MAX_TARGET: i64 = 2000;
#[test]
#[cfg_attr(miri, ignore)]
fn test_multithread_create() {
let start = Instant::now();
let data: i64 = 0;
let cc = CowCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
let readers: Vec<_> = (0..7)
.map(|_| {
scope.spawn(move || {
let mut last_value: i64 = 0;
while last_value < MAX_TARGET {
let cc_rotxn = cc_ref.read();
{
assert!(*cc_rotxn >= last_value);
last_value = *cc_rotxn;
}
}
})
})
.collect();
let writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move || {
let mut last_value: i64 = 0;
while last_value < MAX_TARGET {
let mut cc_wrtxn = cc_ref.write();
{
let mut_ptr = cc_wrtxn.get_mut();
assert!(*mut_ptr >= last_value);
last_value = *mut_ptr;
*mut_ptr += 1;
}
cc_wrtxn.commit();
}
})
})
.collect();
for h in readers.into_iter() {
h.join().unwrap();
}
for h in writers.into_iter() {
h.join().unwrap();
}
true
}));
let end = Instant::now();
print!("Arc MT create :{:?} ", end - start);
}
static GC_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, Clone)]
struct TestGcWrapper<T> {
data: T,
}
impl<T> Drop for TestGcWrapper<T> {
fn drop(&mut self) {
GC_COUNT.fetch_add(1, Ordering::Release);
}
}
fn test_gc_operation_thread(cc: &CowCell<TestGcWrapper<i64>>) {
while GC_COUNT.load(Ordering::Acquire) < 50 {
{
let mut cc_wrtxn = cc.write();
{
let mut_ptr = cc_wrtxn.get_mut();
mut_ptr.data += 1;
}
cc_wrtxn.commit();
}
}
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_operation() {
GC_COUNT.store(0, Ordering::Release);
let data = TestGcWrapper { data: 0 };
let cc = CowCell::new(data);
assert!(scope(|scope| {
let cc_ref = &cc;
let writers: Vec<_> = (0..3)
.map(|_| {
scope.spawn(move || {
test_gc_operation_thread(cc_ref);
})
})
.collect();
for h in writers.into_iter() {
h.join().unwrap();
}
true
}));
assert!(GC_COUNT.load(Ordering::Acquire) >= 50);
}
#[test]
fn test_default() {
CowCell::<()>::default();
}
}