use std::{
borrow::Borrow,
cell::UnsafeCell,
marker::PhantomData as marker,
ops::{Deref, DerefMut},
sync::{
atomic::{self, Ordering},
Arc,
},
time::Duration,
};
use super::{
readset::ReadSet,
transact::{TransactionState, Txn, TxnManager},
};
use super::version::*;
use log::*;
use parking_lot::*;
use super::utils;
use crate::txn::transact::TransactionConcurrency;
use crate::txn::writeset::WriteSet;
use std::alloc::{dealloc, Layout};
use std::any::Any;
#[derive(Clone)]
pub struct TVar<T>
where
T: Clone + Any + Send + Sync,
{
pub(crate) data: Var,
pub(crate) lock: Arc<ReentrantMutex<bool>>,
pub(crate) id: u64,
pub(crate) stamp: u64,
pub(crate) modrev: u64,
timeout: usize,
marker: marker<T>,
}
impl<T> TVar<T>
where
T: Clone + Any + Send + Sync,
{
pub fn new(data: T) -> Self {
TVar {
data: Arc::new(data),
lock: Arc::new(ReentrantMutex::new(true)),
id: TxnManager::dispense_tvar_id(),
stamp: TxnManager::rts(),
modrev: TxnManager::rts(),
timeout: super::constants::DEFAULT_TX_TIMEOUT,
marker,
}
}
pub fn new_with_timeout(data: T, timeout: usize) -> Self {
TVar {
data: Arc::new(data),
lock: Arc::new(ReentrantMutex::new(true)),
id: TxnManager::dispense_tvar_id(),
stamp: TxnManager::rts(),
modrev: TxnManager::rts(),
timeout,
marker,
}
}
pub(crate) fn set_stamp(&mut self, stamp: u64) {
self.stamp = stamp;
}
pub(crate) fn set_mod_rev(&mut self, modrev: u64) {
self.modrev = modrev;
}
pub fn get_data(&self) -> T {
let val = self.data.clone();
(&*val as &dyn Any)
.downcast_ref::<T>()
.expect("Only tx vars are allowed for values.")
.clone()
}
pub(crate) fn open_read(&self) -> T {
let rs = ReadSet::local();
let txn = Txn::get_local();
let state: &TransactionState = &*txn.state.get();
match state {
TransactionState::Committed | TransactionState::Unknown => self.get_data(),
TransactionState::Active => {
let ws = WriteSet::local();
let scratch = ws.get_by_stamp::<T>(self.stamp);
if scratch.is_none() {
if self.is_locked() {
txn.rollback();
}
let tvar = self.clone();
let arctvar = Arc::new(tvar);
rs.add(arctvar);
self.get_data()
} else {
let written = scratch.unwrap();
let v: T = utils::version_to_dest(written);
v
}
}
TransactionState::MarkedRollback => {
debug!("Starting rolling back: {}", TxnManager::rts());
txn.rolling_back();
txn.on_abort::<T>();
self.get_data()
}
TransactionState::RollingBack => {
self.get_data()
}
TransactionState::Suspended => {
std::thread::sleep(Duration::from_millis(100));
self.get_data()
}
TransactionState::RolledBack => {
txn.rolled_back();
panic!("Transaction rollback finalized.");
}
s => {
panic!("Unexpected transaction state: {:?}", s);
}
}
}
pub(crate) fn open_write_deref_mut(&mut self) -> T {
self.open_write(self.get_data())
}
pub(crate) fn open_write(&mut self, data: T) -> T {
let txn = Txn::get_local();
let state: &TransactionState = &*txn.state.get();
match state {
TransactionState::Committed | TransactionState::Unknown => self.get_data(),
TransactionState::Active => {
let mut ws = WriteSet::local();
let this = Arc::new(self.clone());
if ws.get_by_stamp::<T>(this.stamp).is_none() {
if self.is_locked() {
txn.rollback();
}
self.modrev = self.modrev.saturating_add(1);
let this = Arc::new(self.clone());
ws.put::<T>(this, Arc::new(data.clone()));
self.data = Arc::new(data.clone());
}
self.data = Arc::new(data);
self.get_data()
}
TransactionState::MarkedRollback
| TransactionState::RollingBack
| TransactionState::RolledBack => {
txn.state.replace_with(|_| TransactionState::Unknown);
self.get_data()
}
TransactionState::Suspended => {
std::thread::sleep(Duration::from_millis(100));
self.get_data()
}
s => {
panic!("Unexpected transaction state: {:?}", s);
}
}
}
pub(crate) fn validate(&self) -> bool {
let txn = Txn::get_local();
let state: &TransactionState = &*txn.state.get();
match state {
TransactionState::Committed | TransactionState::Unknown => true,
TransactionState::Active => {
let free = self.is_not_locked_and_current();
let pure = self.stamp <= TxnManager::rts();
free & pure
}
TransactionState::MarkedRollback
| TransactionState::RollingBack
| TransactionState::RolledBack => false,
s => {
panic!("Unexpected transaction state: {:?}", s);
}
}
}
pub(crate) fn is_locked(&self) -> bool {
self.lock.try_lock().is_none()
}
pub(crate) fn is_not_locked_and_current(&self) -> bool {
!self.is_locked()
}
pub(crate) fn is_writer_held_by_current_thread(&self) -> bool {
self.is_locked()
}
}
impl<T: Any + Clone + Send + Sync> Deref for TVar<T> {
type Target = T;
fn deref(&self) -> &T {
let x: *mut T = BoxMemory.allocate(self.open_read());
unsafe { &*(x) }
}
}
impl<T: 'static + Any + Clone + Send + Sync> DerefMut for TVar<T> {
fn deref_mut(&mut self) -> &mut T {
let x: *mut T = BoxMemory.allocate(self.open_write_deref_mut());
unsafe { &mut *(x) }
}
}
pub(crate) trait Memory {
fn allocate<T>(&self, value: T) -> *mut T;
unsafe fn deallocate<T>(&self, pointer: *mut T);
}
#[derive(Copy, Clone, Debug)]
pub(crate) struct BoxMemory;
impl BoxMemory {
pub(crate) fn reclaim<T>(&self, pointer: *const T) -> T {
assert!(!pointer.is_null());
unsafe { std::ptr::read_volatile::<T>(pointer as *mut T) }
}
pub(crate) fn reclaim_mut<T>(&self, pointer: *mut T) -> T {
assert!(!pointer.is_null());
unsafe { std::ptr::read_volatile::<T>(pointer) }
}
pub(crate) fn volatile_read<T: Clone>(&self, pointer: *mut T) -> T {
assert!(!pointer.is_null());
unsafe { std::ptr::read_volatile::<T>(pointer) }
}
pub(crate) fn deallocate_raw<T>(&self, p: *mut T) {
unsafe {
std::ptr::drop_in_place(p);
dealloc(p as *mut u8, Layout::new::<T>());
}
}
pub(crate) fn replace_with<T: Clone, X>(&self, ptr: *mut T, mut thunk: X)
where
X: FnMut(T) -> T,
{
let read = unsafe { std::ptr::read_volatile::<T>(ptr as *const T) };
let res = thunk(read);
unsafe { std::ptr::write_volatile::<T>(ptr, res) };
}
}
impl Memory for BoxMemory {
fn allocate<T>(&self, value: T) -> *mut T {
Box::into_raw(Box::new(value))
}
unsafe fn deallocate<T>(&self, pointer: *mut T) {
assert!(!pointer.is_null());
drop(Box::from_raw(pointer));
}
}