#![allow(dead_code, unused_variables)]
use log::*;
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
time::Duration,
};
use crate::{var::InnerVarCopy, BusyBreaker, InnerVar, RLULog, RLUVar, Read, ReadGuard, Write, WriteGuard};
/// Module-wide result alias whose error type is fixed to [`TransactionError`].
pub type Result<T> = core::result::Result<T, TransactionError>;
#[derive(Debug, thiserror::Error)]
pub enum TransactionError {
#[error("Transaction failed")]
Failed,
#[error("Transaction already running")]
InProgress,
#[error("Inner error occured ({0})")]
Inner(String),
#[error("Operation aborted")]
Abort,
#[error("No copy present")]
NoCopyPresent,
#[error("Would block")]
Blocking,
}
/// Pairs a shared [`RLU`] controller with one shared [`RLUVar`] created by it.
pub struct RLUObject<T>
where
T: Clone,
{
/// Controller the variable was created with.
rlu: Arc<RLU<T>>,
/// The variable handed out by `var()`.
var: Arc<RLUVar<T>>,
}
impl<T> From<T> for RLUObject<T>
where
T: Clone,
{
fn from(data: T) -> Self {
let rlu = Arc::new(RLU::default());
let var = Arc::new(rlu.create(data));
Self { rlu, var }
}
}
impl<T> RLUObject<T>
where
    T: Clone,
{
    /// Returns another handle to the controller shared by this object.
    pub fn ctrl(&self) -> Arc<RLU<T>> {
        Arc::clone(&self.rlu)
    }

    /// Borrows the shared variable managed by this object.
    pub fn var(&self) -> &Arc<RLUVar<T>> {
        let shared = &self.var;
        shared
    }
}
impl<T> Clone for RLUObject<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
rlu: self.rlu.clone(),
var: self.var.clone(),
}
}
}
/// Decides what `RLU::execute` does when the transaction closure returns an error.
#[derive(Clone)]
pub enum RLUStrategy {
/// Return the error to the caller.
Abort,
/// Sleep for 10 ms and run the closure again.
Retry,
/// Call `spin()` on the carried breaker and run the closure again; a failing
/// `spin()` ends the retries with `TransactionError::Inner`.
RetryWithBreaker(BusyBreaker),
}
/// RLU controller. All fields except `strategy` are `Arc`s, so clones of a controller
/// share the same counters and context registry.
pub struct RLU<T>
where
T: Clone,
{
/// Global clock: copied into a context's `local_clock` by `read_lock` and
/// incremented by `commit_log`.
global_count: Arc<AtomicUsize>,
/// Source of context ids; `context()` takes the current value and increments it.
next_thread_id: Arc<AtomicUsize>,
/// Registry of every context created through `context()`, keyed by context id.
contexts: Arc<Mutex<HashMap<usize, Arc<RluContext<T>>>>>,
/// Error handling policy applied by `execute`.
strategy: RLUStrategy,
}
impl<T> Default for RLU<T>
where
T: Clone,
{
fn default() -> Self {
Self::new()
}
}
impl<T> RLU<T>
where
T: Clone,
{
/// Creates a controller that uses the [`RLUStrategy::Retry`] strategy.
pub fn new() -> Self {
Self::with_strategy(RLUStrategy::Retry)
}
/// Creates a controller with zeroed counters, an empty context registry and the
/// given error handling `strategy`.
pub fn with_strategy(strategy: RLUStrategy) -> Self {
    let zeroed_counter = || Arc::new(AtomicUsize::new(0));
    let empty_registry = Arc::new(Mutex::new(HashMap::new()));

    Self {
        global_count: zeroed_counter(),
        next_thread_id: zeroed_counter(),
        contexts: empty_registry,
        strategy,
    }
}
/// Wraps `data` in a new [`RLUVar`] that carries a clone of this controller and
/// starts without a copy and without a locking thread id.
pub fn create(&self, data: T) -> RLUVar<T> {
    let inner = InnerVar {
        data: Arc::new(Mutex::new(data)),
        ctrl: Some(self.clone()),
        locked_thread_id: None,
        copy: Arc::new(Mutex::new(None)),
    };

    RLUVar { inner: Arc::new(inner) }
}
pub fn execute<F>(&self, func: F) -> Result<()>
where
F: Fn(Arc<RluContext<T>>) -> Result<()>,
{
let breaker = BusyBreaker::default();
loop {
match func(self.context()?) {
Err(err) => {
match &self.strategy {
RLUStrategy::Retry => {
info!("retry -> error: {}", err);
std::thread::sleep(Duration::from_millis(10));
}
RLUStrategy::RetryWithBreaker(breaker) => {
breaker.spin().map_err(|e| TransactionError::Inner(e.to_string()))?;
}
_ => {
info!("other -> error: {}", err);
return Err(err);
}
}
}
Ok(_) => return Ok(()),
}
}
}
/// Creates a new context with the next free id and registers it with the controller.
///
/// # Errors
///
/// Returns [`TransactionError::Blocking`] when the context registry cannot be locked
/// without waiting. The id taken for the context is not handed out again in that case.
fn context(&self) -> Result<Arc<RluContext<T>>> {
    let id = self.next_thread_id.fetch_add(1, Ordering::SeqCst);

    let context = Arc::new(RluContext {
        id: AtomicUsize::new(id),
        log: Arc::new(Mutex::new(RLULog::default())),
        local_clock: AtomicUsize::new(0),
        // Start at the same "not committing" value that `reset_write_clock` restores
        // after a commit. Starting at 0 would make the `write_clock <= local_clock`
        // test in `dereference` succeed for every reader, handing out the copy of a
        // writer that has not committed yet.
        write_clock: AtomicUsize::new(usize::MAX),
        run_count: AtomicUsize::new(0),
        sync_count: Arc::new(Mutex::new(HashMap::default())),
        is_writer: AtomicBool::new(false),
        ctrl: Arc::new(self.clone()),
    });
    info!("({}) create context id {}", id, id);

    info!("({}) Locking controller context", id);
    let mut registry = self.contexts.try_lock().map_err(|_| TransactionError::Blocking)?;
    registry.insert(id, Arc::clone(&context));
    drop(registry);
    info!("({}) Unlocking controller context", id);

    Ok(context)
}
}
impl<T> Clone for RLU<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
global_count: self.global_count.clone(),
next_thread_id: self.next_thread_id.clone(),
contexts: self.contexts.clone(),
strategy: self.strategy.clone(),
}
}
}
/// Per-transaction state handed to the closure passed to `RLU::execute`.
pub struct RluContext<T>
where
T: Clone,
{
/// Id assigned by the controller when the context was created.
pub(crate) id: AtomicUsize,
/// Log of variable copies; drained and written back by `write_back_log`.
pub(crate) log: Arc<Mutex<RLULog<Arc<InnerVarCopy<T>>>>>,
/// Value of the controller's global counter as seen by the last `read_lock`.
local_clock: AtomicUsize,
/// Set by `commit_log` to the global counter plus one and put back to
/// `usize::MAX` by `reset_write_clock`.
write_clock: AtomicUsize,
/// `true` once `set_writer` was called; makes `read_unlock` commit the log.
is_writer: AtomicBool,
/// Incremented by `read_lock`, `read_unlock` and `abort`.
run_count: AtomicUsize,
/// Scratch map used by `synchronize`: run counts of the other contexts, keyed by id.
sync_count: Arc<Mutex<HashMap<usize, usize>>>,
/// Clone of the controller this context belongs to.
ctrl: Arc<RLU<T>>,
}
/// Read access to variables through a context.
impl<T> Read<T> for RluContext<T>
where
T: Clone,
{
/// Calls `read_lock` and then resolves `var` with `dereference`.
///
/// Errors are those of `dereference`.
fn get<'a>(&'a self, var: &'a RLUVar<T>) -> Result<ReadGuard<'a, T>> {
self.read_lock();
self.dereference(var)
}
}
/// Write access to variables through a context.
impl<T> Write<T> for RluContext<T>
where
T: Clone,
{
/// Calls `read_lock` and then tries to obtain a write guard for `var` with `try_lock`.
///
/// Errors are those of `try_lock`.
fn get_mut<'a>(&'a self, var: &'a RLUVar<T>) -> Result<WriteGuard<'a, T>> {
self.read_lock();
self.try_lock(var)
}
}
impl<T> RluContext<T>
where
T: Clone,
{
/// Starts a read section: clears the writer flag, bumps `run_count` and records the
/// controller's current global counter as this context's `local_clock`.
pub fn read_lock(&self) {
    self.is_writer.store(false, Ordering::SeqCst);
    self.run_count.fetch_add(1, Ordering::SeqCst);

    let global_now = self.ctrl.global_count.load(Ordering::SeqCst);
    self.local_clock.store(global_now, Ordering::SeqCst);

    info!("({}) READ LOCK", self.id.load(Ordering::SeqCst));
}
/// Ends the section opened by `read_lock`: bumps `run_count` and, if the context was
/// flagged as writer, commits its log.
///
/// # Errors
///
/// Passes on any error from `commit_log`.
pub fn read_unlock(&self) -> Result<()> {
    self.run_count.fetch_add(1, Ordering::SeqCst);

    let wrote = self.is_writer.load(Ordering::SeqCst);
    if wrote {
        self.commit_log()?;
    }

    info!("({}) READ UNLOCK", self.id.load(Ordering::SeqCst));
    Ok(())
}
/// Flags this context as a writer, which makes `read_unlock` commit the log.
pub(crate) fn set_writer(&self) {
self.is_writer.store(true, Ordering::SeqCst);
}
/// Resolves `var` for reading from this context.
///
/// * If the variable is unlocked, the guard refers to the variable's own data.
/// * If the copy is owned by this context, a clone of the copy's data is returned.
/// * If the copy is owned by another context whose `write_clock` is not greater than
///   this context's `local_clock`, a clone of that copy's data is returned.
/// * Otherwise the guard refers to the variable's own data.
///
/// # Errors
///
/// * [`TransactionError::Blocking`] if one of the non-blocking lock attempts fails.
/// * [`TransactionError::NoCopyPresent`] if the variable is locked but holds no copy.
/// * [`TransactionError::Abort`] if the context's own copy disappeared between the
///   two inspections of the copy slot.
/// * [`TransactionError::Inner`] for a poisoned data lock, a missing controller or an
///   owner id that is not registered with the controller.
/// * Any error returned by `var.try_inner()`.
#[inline]
pub fn dereference<'a>(&'a self, var: &'a RLUVar<T>) -> Result<ReadGuard<'a, T>> {
    let inner_data = var.try_inner()?;
    let self_id = self.id.load(Ordering::SeqCst);

    if var.is_unlocked() {
        info!("({}) return unlocked var", self_id);
        return Ok(ReadGuard::from_baseguard(inner_data, self));
    }

    // Find out which context owns the copy.
    // NOTE(review): a copy without a recorded owner is treated as owned by id 0, which
    // is also the first id the controller hands out — confirm this is intended.
    info!("({}) Locking var inner copy", self_id);
    let copy_slot = var.inner.copy.try_lock().map_err(|_| TransactionError::Blocking)?;
    let copy_lock_id = match &*copy_slot {
        Some(copy) => match &copy.locked_thread_id {
            Some(id) => id.load(Ordering::SeqCst),
            None => 0,
        },
        None => return Err(TransactionError::NoCopyPresent),
    };
    drop(copy_slot);
    info!("({}) Unlocking var inner copy", self_id);

    // The copy is our own: read what we have written so far.
    if self_id == copy_lock_id {
        info!("({}) Locking var inner copy", self_id);
        let copy_slot = var.inner.copy.try_lock().map_err(|_| TransactionError::Blocking)?;
        return match &*copy_slot {
            Some(copy) => {
                let data_guard = copy.data.read().map_err(|e| TransactionError::Inner(e.to_string()))?;
                let copied = data_guard.clone();
                drop(data_guard);
                drop(copy_slot);
                info!("({}) Unlocking var inner copy", self_id);
                Ok(ReadGuard::from_copied(copied, self))
            }
            None => Err(TransactionError::Abort),
        };
    }

    // The copy belongs to another context: compare its write clock with our local clock.
    let control = match &var.inner.ctrl {
        Some(control) => control,
        None => return Err(TransactionError::Inner("No inner controller present".to_string())),
    };
    {
        // The registry stays locked for the whole block, i.e. also while the copy is read.
        info!("({}) Locking control contexts", self_id);
        let all_contexts = control.contexts.try_lock().map_err(|_| TransactionError::Blocking)?;
        let locking_context = match all_contexts.get(&copy_lock_id) {
            Some(ctx) => ctx,
            None => return Err(TransactionError::Inner("No context for locked copy found".to_string())),
        };

        let write_clock = locking_context.write_clock.load(Ordering::SeqCst);
        let local_clock = self.local_clock.load(Ordering::SeqCst);
        if write_clock <= local_clock {
            info!("({}) Locking inner copy contexts", self_id);
            let copy_slot = var.inner.copy.try_lock().map_err(|_| TransactionError::Blocking)?;
            match &*copy_slot {
                Some(copy) => {
                    info!("({}) Locking inner copy's data", self_id);
                    let data_guard = copy.data.read().map_err(|e| TransactionError::Inner(e.to_string()))?;
                    let copied = data_guard.clone();
                    info!("({}) unlocking inner copy's data", self_id);
                    drop(data_guard);
                    info!("({}) Unlocking inner copy contexts", self_id);
                    drop(copy_slot);
                    return Ok(ReadGuard::from_copied(copied, self));
                }
                None => return Err(TransactionError::NoCopyPresent),
            };
        }
    }

    Ok(ReadGuard::from_baseguard(inner_data, self))
}
/// Tries to obtain a write guard for `var` and flags this context as writer.
///
/// If the variable is already locked by this context, the existing copy is reused and
/// the guard starts from a clone of the copy's data. Otherwise a new copy is created
/// from the variable's data, tagged with this context's id and stored in the
/// variable's copy slot.
///
/// # Errors
///
/// * [`TransactionError::Blocking`] if one of the non-blocking lock attempts fails.
/// * [`TransactionError::Failed`] if the variable reports being locked but has no copy
///   or the copy has no owner id.
/// * [`TransactionError::Inner`] if the lock around the copy's data is poisoned.
pub fn try_lock<'a>(&'a self, var: &'a RLUVar<T>) -> Result<WriteGuard<'a, T>> {
    self.set_writer();
    let inner = &var.inner;
    let self_id = self.id.load(Ordering::SeqCst);

    if var.is_locked() {
        info!("({}) Locking inner copy data", self_id);
        let copy_guard = inner.copy.try_lock().map_err(|_| TransactionError::Blocking)?;
        let copy = match &*copy_guard {
            Some(copy) => copy,
            None => return Err(TransactionError::Failed),
        };
        let copy_thread_id = match &copy.locked_thread_id {
            Some(thread_id) => thread_id.load(Ordering::SeqCst),
            None => return Err(TransactionError::Failed),
        };

        if copy_thread_id == self_id {
            // A poisoned lock is reported as an error, as everywhere else in this
            // module, instead of panicking.
            let data_guard = copy
                .data
                .write()
                .map_err(|e| TransactionError::Inner(e.to_string()))?;
            let copied = data_guard.clone();
            drop(data_guard);
            return Ok(WriteGuard::from_guard_copy(
                copy_guard,
                copied,
                self,
                Some(var.inner.clone()),
            ));
        }
        // NOTE(review): when the copy belongs to another context, execution continues
        // below and replaces that copy — confirm this is intended rather than
        // returning an error.
    }

    info!("({}) Locking inner data", self_id);
    let data = inner
        .data
        .try_lock()
        .map_err(|_| TransactionError::Blocking)?
        .clone();
    info!("({}) Unlocking inner data", self_id);

    let copy = InnerVarCopy {
        data: Arc::new(RwLock::new(data.clone())),
        locked_thread_id: Some(AtomicUsize::new(self_id)),
        original: var.inner.clone(),
    };

    info!("({}) Locking inner copy data", self_id);
    let mut copy_guard = inner.copy.try_lock().map_err(|_| TransactionError::Blocking)?;
    copy_guard.replace(copy);

    Ok(WriteGuard::from_guard_copy(
        copy_guard,
        data,
        self,
        Some(var.inner.clone()),
    ))
}
fn synchronize(&self) -> Result<()> {
let self_id = self.id.load(Ordering::SeqCst);
info!("({}) Locking controller contexts data", self_id);
let contexts = match self.ctrl.contexts.lock().map_err(|e| TransactionError::Blocking) {
Ok(guard) => guard,
Err(err) => {
panic!("({}) Unlocking context failed", self_id);
}
};
info!("({}) Locking sync counts contexts data", self.id.load(Ordering::SeqCst));
let mut sync_count = match self.sync_count.lock().map_err(|e| TransactionError::Blocking) {
Ok(guard) => guard,
Err(err) => {
panic!("({}) Synchronize. Unlocking sync count failed", self_id);
}
};
for (id, ctx) in contexts.deref() {
let id = ctx.id.load(Ordering::SeqCst);
if id == self_id {
info!(
"({}) skip own context, but run_count would be ({})",
self_id,
ctx.run_count.load(Ordering::SeqCst)
);
continue;
}
let run_count = ctx.run_count.load(Ordering::SeqCst);
sync_count.deref_mut().insert(id, run_count);
}
info!("({}) synchronize begin: Unlocking controller contexts data", self_id);
drop(contexts);
info!("({}) Locking controller contexts data", self_id);
let contexts = match self.ctrl.contexts.lock().map_err(|e| TransactionError::Blocking) {
Ok(guard) => guard,
Err(err) => {
panic!("({}) Synchronize. Unlocking context failed", self_id);
}
};
let mut num_rounds = 0;
let max_rounds = 20;
for (id, ctx) in contexts.deref() {
loop {
if sync_count.get(id).is_none() {
break;
}
let ctx_run_count = ctx.run_count.load(Ordering::SeqCst);
let write_clock = self.write_clock.load(Ordering::SeqCst);
let local_clock = ctx.local_clock.load(Ordering::SeqCst);
info!(
"({}) SYNC: sync count id {}: value {}, write_clock {}, local_clock {}, context run count : {}",
self_id, id, sync_count[id], write_clock, local_clock, ctx_run_count
);
if (sync_count[id] & 0x1) == 0 {
break;
}
if sync_count[id] != ctx_run_count {
break;
}
if write_clock <= local_clock {
break;
}
num_rounds += 1;
if num_rounds > max_rounds {
return Ok(());
}
core::hint::spin_loop();
}
}
info!("({}) synchronize end: Unlocking controller contexts data", self_id);
sync_count.clear();
drop(sync_count);
drop(contexts);
Ok(())
}
/// Commits the pending writes of this context.
///
/// Sets `write_clock` to the global counter plus one, increments the global counter,
/// synchronizes with the other contexts, writes the log back, resets `write_clock`
/// and finally swaps the logs.
///
/// # Errors
///
/// Passes on errors from `synchronize`, `write_back_log` and `swap_logs`.
fn commit_log(&self) -> Result<()> {
    // The write clock is published before the global counter is advanced.
    let next_clock = self.ctrl.global_count.load(Ordering::SeqCst) + 1;
    self.write_clock.store(next_clock, Ordering::SeqCst);
    self.ctrl.global_count.fetch_add(1, Ordering::SeqCst);

    self.synchronize()?;
    self.write_back_log()?;
    self.reset_write_clock();
    self.swap_logs()
}
fn write_back_log(&self) -> Result<()> {
info!("({}) Locking context log", self.id.load(Ordering::SeqCst));
let mut guard = match self.log.lock().map_err(|e| TransactionError::Blocking) {
Ok(guard) => guard,
Err(err) => {
panic!(
"({}) Write Back Log: Unlocking log failed",
self.id.load(Ordering::SeqCst)
);
}
};
for item in guard.deref_mut().drain().flatten() {
item.write_back()?;
}
info!("({}) Unlocking context log", self.id.load(Ordering::SeqCst));
drop(guard);
Ok(())
}
/// Abandons the current section: bumps `run_count` and clears the writer flag, so a
/// later `read_unlock` does not commit the log.
fn abort(&self) {
self.run_count.fetch_add(1, Ordering::SeqCst);
self.is_writer.store(false, Ordering::SeqCst)
}
/// Sets `write_clock` to `usize::MAX`; called by `commit_log` after the write-back.
fn reset_write_clock(&self) {
self.write_clock.store(usize::MAX, Ordering::SeqCst);
}
fn swap_logs(&self) -> Result<()> {
info!("({}) Locking context log", self.id.load(Ordering::SeqCst));
let guard = match self.log.lock().map_err(|e| TransactionError::Blocking) {
Ok(guard) => guard,
Err(err) => {
panic!("({}) Swap Logs: Unlocking log failed", self.id.load(Ordering::SeqCst));
}
};
guard.deref().next();
info!("({}) Unlocking context log", self.id.load(Ordering::SeqCst));
drop(guard);
Ok(())
}
}