use crate::error::{StoreError, StoreResult};
use crate::tx::cf_optimistic_tx_store::RocksDbCFOptimisticTxnStore;
use crate::tx::policies::FixedRetry;
use rocksdb::{OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, WriteOptions};
/// Strategy consulted after a failed transaction attempt to decide whether
/// another attempt should be made.
pub trait RetryPolicy {
/// Decides whether the caller may try again after `error`.
///
/// The transaction builder in this file calls this after a failed commit,
/// passing the commit error and `attempt`, the 1-based number of the attempt
/// that just failed. Returning `Ok(())` lets the builder start another
/// attempt; returning `Err(_)` ends its retry loop and that error is handed
/// back to the builder's caller.
fn should_retry(&self, error: &StoreError, attempt: usize) -> StoreResult<()>;
}
/// Builder that runs closures inside optimistic transactions on a
/// [`RocksDbCFOptimisticTxnStore`], consulting a retry policy of type `RTP`
/// whenever a commit fails.
pub struct OptimisticTransactionBuilder<'store, RTP> {
// Store whose underlying database the transactions are opened on.
store: &'store RocksDbCFOptimisticTxnStore,
// Policy asked, after each failed commit, whether to try again.
retry_policy: RTP,
}
impl<'store> OptimisticTransactionBuilder<'store, FixedRetry> {
    /// Creates a builder bound to `store` that starts out with the default
    /// [`FixedRetry`] policy.
    pub(crate) fn new(store: &'store RocksDbCFOptimisticTxnStore) -> Self {
        let retry_policy = FixedRetry::default();
        Self {
            store,
            retry_policy,
        }
    }
}
impl<'store, RTP: RetryPolicy + Clone> OptimisticTransactionBuilder<'store, RTP> {
    /// Returns a builder for the same store that uses `policy` instead of the
    /// current retry policy.
    ///
    /// The builder is consumed; its previous policy is dropped.
    pub fn with_retry_policy<NRTP: RetryPolicy + Clone>(
        self,
        policy: NRTP,
    ) -> OptimisticTransactionBuilder<'store, NRTP> {
        OptimisticTransactionBuilder {
            store: self.store,
            retry_policy: policy,
        }
    }

    /// Runs `operation` inside an optimistic transaction that is started with
    /// `set_snapshot(true)`, then commits it.
    ///
    /// If the commit fails, the retry policy is consulted; when it allows
    /// another attempt, a fresh transaction is started and `operation` is
    /// invoked again. The closure can therefore run more than once and must be
    /// safe to re-execute.
    ///
    /// # Errors
    ///
    /// * Any error returned by `operation` is returned immediately, without
    ///   consulting the retry policy.
    /// * If a commit fails and the retry policy returns an error, that error
    ///   is returned.
    pub fn execute_with_snapshot<F, R>(&self, operation: F) -> StoreResult<R>
    where
        F: FnMut(&Transaction<'_, OptimisticTransactionDB>) -> StoreResult<R>,
    {
        self.run_with_retry(true, operation)
    }

    /// Runs `operation` inside an optimistic transaction that is started with
    /// `set_snapshot(false)`, then commits it.
    ///
    /// Retry behaviour is the same as for
    /// [`execute_with_snapshot`](Self::execute_with_snapshot): the closure may
    /// be invoked more than once if a commit fails and the policy allows a
    /// retry.
    ///
    /// # Errors
    ///
    /// * Any error returned by `operation` is returned immediately, without
    ///   consulting the retry policy.
    /// * If a commit fails and the retry policy returns an error, that error
    ///   is returned.
    pub fn execute_unisolated<F, R>(&self, operation: F) -> StoreResult<R>
    where
        F: FnMut(&Transaction<'_, OptimisticTransactionDB>) -> StoreResult<R>,
    {
        self.run_with_retry(false, operation)
    }

    /// Shared begin / run / commit / retry loop behind the public `execute_*`
    /// methods; `take_snapshot` is forwarded to
    /// `OptimisticTransactionOptions::set_snapshot`.
    fn run_with_retry<F, R>(&self, take_snapshot: bool, mut operation: F) -> StoreResult<R>
    where
        F: FnMut(&Transaction<'_, OptimisticTransactionDB>) -> StoreResult<R>,
    {
        // `transaction_opt` only borrows the options, so one set is reused for
        // every attempt instead of being rebuilt per iteration.
        let write_opts = WriteOptions::new();
        let mut txn_opts = OptimisticTransactionOptions::new();
        txn_opts.set_snapshot(take_snapshot);

        // 1-based count of attempts started so far; handed to the retry policy.
        let mut attempt: usize = 0;
        loop {
            attempt += 1;

            let db = self.store.db_raw();
            let txn = db.transaction_opt(&write_opts, &txn_opts);

            // A failure inside the closure is final: the transaction is
            // dropped without committing and the error goes to the caller.
            let result = operation(&txn)?;

            match txn.commit() {
                Ok(()) => return Ok(result),
                // The policy either returns Ok (loop again with a fresh
                // transaction) or an error that ends the retry loop.
                Err(e) => self
                    .retry_policy
                    .should_retry(&StoreError::RocksDb(e), attempt)?,
            }
        }
    }
}