use std::mem;
use crate::{auxtx::*, StmControl};
use crate::{
transaction::{with_tx, Transaction, TX},
Stm, StmError, StmResult,
};
/// Signals that the current transaction cannot make progress right now.
///
/// Always returns `Err(StmControl::Retry)`. When that value reaches the
/// `atomically*` driver, the attempt is rolled back and the driver awaits
/// `Transaction::wait` before running the block again.
pub fn retry<T>() -> Stm<T> {
    let signal = StmControl::Retry;
    Err(signal)
}
/// Lets the transaction continue only when `cond` holds.
///
/// Returns `Ok(())` if `cond` is `true`; otherwise returns the result of
/// [`retry`], i.e. the retry control signal.
pub fn guard(cond: bool) -> Stm<()> {
    if !cond {
        return retry();
    }
    Ok(())
}
/// Aborts the transaction with an error.
///
/// `e` is converted into the transaction's error type `E2` and wrapped in
/// `StmError::Abort`. The `atomically_or_err*` drivers stop re-running the
/// block and hand the converted error back to their caller as `Err`.
pub fn abort<T, E1, E2>(e: E1) -> StmResult<T, E2>
where
    E1: Into<E2>,
{
    let reason: E2 = e.into();
    Err(StmError::Abort(reason))
}
/// Runs `f`, falling back to `g` if `f` asks for a retry.
///
/// * If `f` returns anything other than `Err(StmControl::Retry)`, that result
///   is returned unchanged and `g` is never called.
/// * If `f` retries, the transaction is put back into the state it had before
///   `f` ran, and `g` is executed in its place.
/// * If `g` retries as well, the log entries recorded by `f` are merged into
///   the current transaction before the retry is returned. Entries the current
///   log already holds with its `read` flag set are kept as they are.
pub fn or<F, G, T>(f: F, g: G) -> Stm<T>
where
    F: FnOnce() -> Stm<T>,
    G: FnOnce() -> Stm<T>,
{
    // Copy of the transaction as it looked before `f` touched it.
    let mut snapshot = with_tx(|tx| tx.clone());

    let first = f();
    match first {
        Err(StmControl::Retry) => {}
        // Success and any other control signal are passed straight through.
        _ => return first,
    }

    // Swap: the live transaction is reset to the pre-`f` state, while
    // `snapshot` now holds whatever `f` recorded.
    with_tx(|tx| mem::swap(tx, &mut snapshot));

    let second = g();
    if let Err(StmControl::Retry) = second {
        // Both alternatives want to retry: fold `f`'s log into the live one.
        with_tx(|tx| {
            for (id, from_f) in snapshot.log {
                let already_read = match tx.log.get(&id) {
                    Some(existing) => existing.read,
                    None => false,
                };
                if !already_read {
                    tx.log.insert(id, from_f);
                }
            }
        });
    }
    second
}
/// Runs the transactional block `f` until it commits and returns its value.
///
/// Delegates to [`atomically_aux`] with `NoAux` as the auxiliary transaction,
/// which `f` never sees.
pub async fn atomically<T, F>(f: F) -> T
where
    F: Fn() -> Stm<T>,
{
    let run = atomically_aux(|| NoAux, |_| f());
    run.await
}
/// Runs the transactional block `f` together with an auxiliary transaction.
///
/// `aux` builds the auxiliary transaction value that is handed to `f` as
/// `&mut X`. Control signals returned by `f` are wrapped in
/// `StmError::Control` and the work is delegated to
/// [`atomically_or_err_aux`] with `()` as the abort error type.
///
/// # Panics
///
/// Panics if the delegated call returns `Err`, i.e. reports an abort.
pub async fn atomically_aux<T, F, A, X>(aux: A, f: F) -> T
where
    X: Aux,
    A: Fn() -> X,
    F: Fn(&mut X) -> Stm<T>,
{
    let outcome =
        atomically_or_err_aux::<_, (), _, _, _>(aux, |atx| f(atx).map_err(StmError::Control)).await;
    outcome.expect("Didn't expect `abort`. Use `atomically_or_err` instead.")
}
/// Runs the transactional block `f`, which is allowed to abort with an error.
///
/// Returns `Ok` with the block's value once it commits, or `Err` with the
/// error carried by `StmError::Abort`. Delegates to
/// [`atomically_or_err_aux`] with `NoAux` as the auxiliary transaction.
pub async fn atomically_or_err<T, E, F>(f: F) -> Result<T, E>
where
    F: Fn() -> StmResult<T, E>,
{
    let run = atomically_or_err_aux(|| NoAux, |_| f());
    run.await
}
/// Drives the transactional block `f` with an auxiliary transaction until it
/// either commits or aborts.
///
/// Each loop iteration performs one attempt via `exec_once`:
///
/// * `Committed(value)` ends the loop with `Ok(value)`.
/// * `Abort(e)` ends the loop with `Err(e)`.
/// * `Restart` starts the next attempt immediately.
/// * `Wait(tx)` awaits `tx.wait()` and then starts the next attempt.
pub async fn atomically_or_err_aux<T, E, F, A, X>(aux: A, f: F) -> Result<T, E>
where
    X: Aux,
    A: Fn() -> X,
    F: Fn(&mut X) -> StmResult<T, E>,
{
    loop {
        let attempt = exec_once(&aux, &f);
        match attempt {
            ExecResult::Wait(tx) => tx.wait().await,
            ExecResult::Restart => continue,
            ExecResult::Committed(value) => break Ok(value),
            ExecResult::Abort(e) => break Err(e),
        }
    }
}
/// Outcome of a single attempt made by `exec_once`.
enum ExecResult<T, E> {
/// The block returned `Ok` and the commit succeeded; carries the block's value.
Committed(T),
/// The block returned `StmError::Abort`; carries the abort error.
Abort(E),
/// The attempt must be repeated right away: either the commit did not go
/// through or the block returned `StmControl::Failure`.
Restart,
/// The block returned `StmControl::Retry`; the caller awaits `wait()` on the
/// carried transaction before the next attempt.
Wait(Transaction),
}
fn exec_once<T, E, F, A, X>(aux: &A, f: &F) -> ExecResult<T, E>
where
X: Aux,
A: Fn() -> X,
F: Fn(&mut X) -> StmResult<T, E>,
{
TX.with(|tref| {
let mut t = tref.borrow_mut();
if t.is_some() {
panic!("Already running in an atomic transaction!")
}
*t = Some(Transaction::new());
});
let mut atx = aux();
let result = f(&mut atx);
let tx = TX.with(|tref| tref.borrow_mut().take().unwrap());
match result {
Ok(value) => {
if let Some(version) = tx.commit(atx) {
tx.notify(version);
ExecResult::Committed(value)
} else {
ExecResult::Restart
}
}
Err(err) => {
atx.rollback();
match err {
StmError::Control(StmControl::Failure) => {
ExecResult::Restart
}
StmError::Control(StmControl::Retry) => {
ExecResult::Wait(tx)
}
StmError::Abort(e) => {
ExecResult::Abort(e)
}
}
}
}
}