mvcc_cell 0.1.2

Software-transactional memory for Rust
Documentation
use parking_lot::Mutex;
use crate::history::History;
use crate::{Mvcc,Transaction,TxnId};
use guest_cell::{GuestCell,Guest,id::{Id,UniqueId}};
use std::sync::Arc;
use std::hash::Hash;
use std::borrow::Borrow;
use std::ops::{Index,IndexMut};
use std::marker::PhantomData;

/// A transactional memory location.
///
/// Note that cloning an `MvccCell` will alias the cell instead of 
/// creating a new transactional slot.
pub struct MvccCell<T:?Sized>(pub(crate) Arc<MvccBox<T>>);

impl<T:?Sized> Clone for MvccCell<T> {
    /// Creates an alias that shares internal state with `self`
    fn clone(&self)->Self { MvccCell(Arc::clone(&self.0)) }
}

impl<T:?Sized> MvccCell<T> {
    /// Initialize a new transactional memory location
    pub fn new(mvcc: &Mvcc, init: Box<T>)->Self {
        MvccCell(MvccBox::new(mvcc, init))
    }

    pub(crate) fn erase<'a>(&self)->ErasedCell<'a> where T:'a {
        ErasedCell(Arc::clone(&self.0) as Arc<dyn 'a + Erased>)
    }
}

pub(crate) struct MvccBox<T:?Sized> {
    pub(crate) mvcc_id: Id,
    pub(crate) history: Mutex<History<Box<T>>>,
    pub(crate) pending: GuestCell<T>,

    // We use unsafe to break &Ts out of the Mutex,
    // and this is required to fixup the automatic
    // Send + Sync bounds
    phantom: PhantomData<std::sync::RwLock<T>>,
}


impl<T:?Sized> MvccBox<T> {
    pub fn new(mvcc: &Mvcc, init:Box<T>)->Arc<Self> {
        Arc::new(MvccBox {
            mvcc_id: mvcc.id(),
            history: Mutex::new(History::new(init)),
            pending: GuestCell::default(),
            phantom: PhantomData,
        })
    }

    fn pending_access(self: &Arc<Self>)->PendingAccessor<T> {
        PendingAccessor(Arc::clone(self))
    }
}

impl<'a, T:'a + ?Sized> Index<&'_ MvccCell<T>> for Transaction<'a> {
    type Output = T;
    fn index(&self, cell: &MvccCell<T>)->&T {
        let txn = self;
        txn.visible.lock().insert(cell.erase());
        txn.pending
            .get(cell.0.pending_access())
            .unwrap_or_else(|| txn.get_committed_raw(cell))
    }
}

impl<'a, T:'a + ?Sized> IndexMut<&'_ MvccCell<T>>
    for Transaction<'a>
    where Box<T>: Clone
{
    fn index_mut(&mut self, cell:&MvccCell<T>)->&mut T {
        let txn = self;
        txn.visible.get_mut().insert(cell.erase());
        txn.pending.get_or_init_mut(
            cell.0.pending_access(),
            || cell.0.history.lock().lookup(*txn.seq).clone()
        )
    }
}

impl<'a> Transaction<'a> {
    /// Store a new value inside the cell.
    ///
    /// Returns the old value iff it was previously written by this transaction
    pub fn replace<T:'a+?Sized>(&mut self, cell:&MvccCell<T>, val: Box<T>)->Option<Box<T>> {
        self.visible.get_mut().insert(cell.erase());
        self.pending.set(cell.0.pending_access(), val)
    }

    /// Reverts this cell to its value at the beginning of the transaction.
    ///
    /// Returns the pending value, if any.
    pub fn revert<T:'a+?Sized>(&mut self, cell:&MvccCell<T>)->Option<Box<T>> {
        self.visible.get_mut().insert(cell.erase());
        self.pending.take(cell.0.pending_access())
    }

    /// Get the value of this cell as of the start of the transaction
    pub fn get_committed<T:'a+?Sized>(&self, cell: &MvccCell<T>)->&T {
        self.visible.lock().insert(cell.erase());
        self.get_committed_raw(cell)
    }
        
    fn get_committed_raw<T:'a+?Sized>(&self, cell: &MvccCell<T>)->&T {
        let guard = cell.0.history.lock();
        let result: *const T = &***guard.lookup(*self.seq) as *const T;

        // Safety: Value is boxed and read-only; will remain valid
        //         until cleaned up by vacuum, which has its own
        //         aliasing protections
        unsafe { &* result }
    }
}

pub(crate) trait Erased {
    fn last_changed(&self)->TxnId;
    fn commit<'a>(self: Arc<Self>, guard: &mut UniqueId, seq:TxnId, txn: &mut Guest<'a>) where Self:'a;
}

impl<T:?Sized> Erased for MvccBox<T> {
    fn last_changed(&self)->TxnId {
        self.history.lock().current()
    }

    fn commit<'a>(self: Arc<Self>, guard: &mut UniqueId, seq: TxnId, txn: &mut Guest<'a>) where Self:'a {
        assert_eq!(**guard, self.mvcc_id);
        let Some(val) = txn.take(self.pending_access()) else { return };
        self.history.lock().push(seq, val);
    }
}

struct PendingAccessor<T:?Sized>(Arc<MvccBox<T>>);
impl<T:?Sized> Borrow<GuestCell<T>> for PendingAccessor<T> {
    fn borrow(&self)->&GuestCell<T> {
        &self.0.pending
    }
}

unsafe impl<T:?Sized> guest_cell::IndirectGuestCell for PendingAccessor<T> {
    type Inner = T;
}

pub(crate) struct ErasedCell<'a>(pub Arc<dyn 'a + Erased>);

impl Eq for ErasedCell<'_> {}
impl PartialEq for ErasedCell<'_> {
    fn eq(&self, rhs:&Self)->bool {
        Arc::as_ptr(&self.0) == Arc::as_ptr(&rhs.0)
    }
}

impl Hash for ErasedCell<'_> {
    fn hash<H: std::hash::Hasher>(&self, hasher:&mut H) {
        Arc::as_ptr(&self.0).hash(hasher)
    }
}