use crate::{
Environment,
sys::txn_manager::{Abort, RawTxPtr},
};
use core::fmt;
use parking_lot::{Mutex, MutexGuard};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
};
use tracing::debug_span;
mod sealed {
#[allow(unreachable_pub)]
pub trait Sealed {}
impl Sealed for super::PtrUnsync {}
impl Sealed for super::PtrSync {}
impl<T> Sealed for &T where T: super::TxPtrAccess {}
impl<T> Sealed for &mut T where T: super::TxPtrAccess {}
impl<T> Sealed for std::sync::Arc<T> where T: super::TxPtrAccess {}
impl<T> Sealed for Box<T> where T: super::TxPtrAccess {}
}
#[allow(unreachable_pub)]
pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
where
Self: Sized;
fn with_txn_ptr<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut ffi::MDBX_txn) -> R;
fn mark_committed(&self);
fn tx_id(&self) -> Option<usize> {
let mut id = 0;
self.with_txn_ptr(|ptr| {
id = unsafe { ffi::mdbx_txn_id(ptr) as usize };
});
(id != 0).then_some(id)
}
}
impl<T> TxPtrAccess for Arc<T>
where
T: TxPtrAccess,
{
fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
where
Self: Sized,
{
T::from_ptr_and_env(ptr, env, is_read_only).into()
}
fn with_txn_ptr<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
self.as_ref().with_txn_ptr(f)
}
fn mark_committed(&self) {
self.as_ref().mark_committed();
}
}
pub struct PtrUnsync {
committed: AtomicBool,
ptr: *mut ffi::MDBX_txn,
}
impl fmt::Debug for PtrUnsync {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PtrUnsync").field("committed", &self.committed).finish()
}
}
impl TxPtrAccess for PtrUnsync {
fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, _env: Environment, _is_read_only: bool) -> Self
where
Self: Sized,
{
Self { committed: AtomicBool::new(false), ptr }
}
fn with_txn_ptr<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
f(self.ptr)
}
fn mark_committed(&self) {
unsafe { *self.committed.as_ptr() = true };
}
}
impl Drop for PtrUnsync {
fn drop(&mut self) {
unsafe {
if !*self.committed.as_ptr() {
ffi::mdbx_txn_abort(self.ptr);
}
}
}
}
#[derive(Debug)]
pub struct PtrSync {
txn: *mut ffi::MDBX_txn,
committed: AtomicBool,
lock: Mutex<()>,
env: Environment,
is_read_only: bool,
}
unsafe impl Send for PtrSync {}
unsafe impl Sync for PtrSync {}
impl PtrSync {
pub(crate) fn lock(&self) -> MutexGuard<'_, ()> {
if let Some(lock) = self.lock.try_lock() {
lock
} else {
tracing::trace!(
target: "libmdbx",
txn = %self.txn as usize,
backtrace = %std::backtrace::Backtrace::capture(),
"Transaction lock is already acquired, blocking...
To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
);
self.lock.lock()
}
}
}
impl TxPtrAccess for PtrSync {
fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
where
Self: Sized,
{
Self {
committed: AtomicBool::new(false),
lock: Mutex::new(()),
txn: ptr,
env,
is_read_only,
}
}
fn with_txn_ptr<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
let _lock = self.lock();
f(self.txn)
}
fn mark_committed(&self) {
self.committed.store(true, Ordering::SeqCst);
}
}
impl Drop for PtrSync {
fn drop(&mut self) {
if self.committed.load(Ordering::SeqCst) {
return;
}
if self.is_read_only {
unsafe { ffi::mdbx_txn_abort(self.txn) };
} else {
let (sender, rx) = sync_channel(0);
self.env.txn_manager().send(Abort {
tx: RawTxPtr(self.txn),
sender,
span: debug_span!("txn_manager_abort"),
});
rx.recv().unwrap().unwrap();
}
tracing::debug!(target: "libmdbx", "aborted");
}
}