use crate::{
Environment, MdbxResult, TransactionKind,
sys::txn_manager::{RawTxPtr, TxnManagerMessage},
};
use core::{fmt, marker::PhantomData};
use parking_lot::{Mutex, MutexGuard};
use std::{
ops,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
},
};
mod sealed {
use super::*;
#[allow(unreachable_pub)]
pub trait Sealed {}
impl Sealed for super::RoGuard {}
impl Sealed for super::RwUnsync {}
impl<K: TransactionKind> Sealed for super::PtrSyncInner<K> {}
impl<K: TransactionKind> Sealed for super::PtrSync<K> {}
}
/// Sealed abstraction over the ways a transaction handle exposes its raw
/// `MDBX_txn` pointer.
///
/// Implementations decide whether access is currently possible and, if so,
/// hand the pointer to a caller-supplied closure.
#[allow(unreachable_pub)]
pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
/// Runs `f` with the raw transaction pointer and returns its result wrapped
/// in `Ok`, or an error if the implementation refuses access.
fn with_txn_ptr<F, R>(&self, f: F) -> MdbxResult<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R;
/// Variant of [`Self::with_txn_ptr`] for cleanup paths.
///
/// The default implementation simply forwards to [`Self::with_txn_ptr`];
/// implementations may override it with more lenient behavior.
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> MdbxResult<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
self.with_txn_ptr(f)
}
/// Records that the transaction has been committed.
fn mark_committed(&self);
}
/// Raw transaction pointer paired with a `committed` flag.
///
/// Dropping the value aborts the transaction unless `mark_committed` was
/// called first (see the `Drop` impl).
pub struct RwUnsync {
/// Set by `mark_committed`; checked on drop to skip the abort.
committed: AtomicBool,
/// Raw MDBX transaction handle handed to `with_txn_ptr` closures.
ptr: *mut ffi::MDBX_txn,
}
impl fmt::Debug for RwUnsync {
    /// Prints only the `committed` flag; the raw pointer is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RwUnsync");
        builder.field("committed", &self.committed);
        builder.finish()
    }
}
impl RwUnsync {
    /// Wraps `ptr` with the `committed` flag initially cleared.
    pub(crate) const fn new(ptr: *mut ffi::MDBX_txn) -> Self {
        let committed = AtomicBool::new(false);
        Self { committed, ptr }
    }
}
impl TxPtrAccess for RwUnsync {
    /// Calls `f` with the raw transaction pointer. Never fails.
    fn with_txn_ptr<F, R>(&self, f: F) -> MdbxResult<R>
    where
        F: FnOnce(*mut ffi::MDBX_txn) -> R,
    {
        Ok(f(self.ptr))
    }

    /// Sets the `committed` flag so that dropping this value does not abort
    /// the transaction.
    fn mark_committed(&self) {
        // A relaxed atomic store replaces the previous raw-pointer write: it
        // needs no `unsafe` and stays sound even if the value were ever
        // shared, while the flag carries no ordering obligations of its own.
        self.committed.store(true, Ordering::Relaxed);
    }
}
impl Drop for RwUnsync {
    /// Aborts the transaction unless it was marked as committed.
    fn drop(&mut self) {
        // `&mut self` proves exclusive access, so the flag can be read safely
        // without going through a raw pointer.
        if *self.committed.get_mut() {
            return;
        }
        // SAFETY: assumes `ptr` is still a live transaction handle that has
        // not been aborted or committed elsewhere — TODO confirm with the
        // code that constructs `RwUnsync`.
        unsafe {
            ffi::mdbx_txn_abort(self.ptr);
        }
    }
}
/// Thin wrapper around a raw `MDBX_txn` pointer, built via `From`.
pub(crate) struct RoTxPtr {
/// Raw MDBX transaction handle.
ptr: *mut ffi::MDBX_txn,
}
impl fmt::Debug for RoTxPtr {
    /// Prints just the type name; the pointer value is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less `debug_struct(..).finish()` emits exactly the name.
        f.write_str("RoTxPtr")
    }
}
// Aborts the wrapped transaction when the wrapper is destroyed.
//
// NOTE(review): this impl only exists with the `read-tx-timeouts` feature.
// Without the feature nothing in this file aborts the pointer held by
// `RoTxPtr` — presumably the owning transaction type does so; confirm.
#[cfg(feature = "read-tx-timeouts")]
impl Drop for RoTxPtr {
    fn drop(&mut self) {
        let raw = self.ptr;
        // SAFETY: assumes `raw` is a live transaction handle that nobody else
        // has aborted or committed — TODO confirm against the callers.
        unsafe {
            ffi::mdbx_txn_abort(raw);
        }
    }
}
impl From<*mut ffi::MDBX_txn> for RoTxPtr {
fn from(txn: *mut ffi::MDBX_txn) -> Self {
Self { ptr: txn }
}
}
// `RoGuard::new_with_timeout` moves an `Arc<RoTxPtr>` into a spawned thread,
// which requires `RoTxPtr` to be both `Send` and `Sync`.
//
// SAFETY: NOTE(review): nothing in this file proves that the raw pointer may
// be used from several threads; presumably this relies on MDBX's threading
// rules for read-only transactions — confirm.
unsafe impl Send for RoTxPtr {}
unsafe impl Sync for RoTxPtr {}
/// Weak handle to a shared [`RoTxPtr`]; only defined with `read-tx-timeouts`.
#[cfg(feature = "read-tx-timeouts")]
type WeakRoTxPtr = std::sync::Weak<RoTxPtr>;
/// Zero-sized marker stored in [`RoGuard`].
///
/// NOTE(review): the name suggests this should make the containing type
/// `!Sync`, but function-pointer types are `Send + Sync` for every signature,
/// so `PhantomData<fn() -> Cell<()>>` is itself `Sync` and opts out of
/// nothing. `PhantomData<std::cell::Cell<()>>` would be `Send + !Sync`.
/// Confirm the intent before changing it, because that would alter the auto
/// traits of `RoGuard`.
type PhantomUnsync = PhantomData<fn() -> std::cell::Cell<()>>;
/// Guard giving access to a shared [`RoTxPtr`].
///
/// It either holds a strong reference itself (`strong`) or, with the
/// `read-tx-timeouts` feature, only a weak one; in the latter case access
/// fails once every strong reference has been dropped.
pub struct RoGuard {
/// `Some` when the guard itself keeps the pointer alive.
strong: Option<std::sync::Arc<RoTxPtr>>,
/// Weak handle upgraded on access when no strong reference is held.
#[cfg(feature = "read-tx-timeouts")]
weak: WeakRoTxPtr,
/// Set by `mark_committed`; once set, `try_ref` returns `None`.
committed: AtomicBool,
/// Zero-sized marker; see `PhantomUnsync`.
_unsync: PhantomUnsync,
}
impl fmt::Debug for RoGuard {
    /// Prints only the `committed` flag; the pointer handles are left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RoGuard");
        builder.field("committed", &self.committed);
        builder.finish()
    }
}
impl RoGuard {
#[cfg_attr(feature = "read-tx-timeouts", allow(dead_code))]
pub(crate) fn new_no_timeout(ptr: RoTxPtr) -> Self {
let arc = std::sync::Arc::new(ptr);
#[cfg(feature = "read-tx-timeouts")]
let weak = std::sync::Arc::downgrade(&arc);
Self {
strong: Some(arc),
#[cfg(feature = "read-tx-timeouts")]
weak,
committed: AtomicBool::new(false),
_unsync: PhantomData,
}
}
#[cfg(feature = "read-tx-timeouts")]
pub(crate) fn new_with_timeout(ptr: RoTxPtr, duration: std::time::Duration) -> Self {
let arc = std::sync::Arc::new(ptr);
let weak = std::sync::Arc::downgrade(&arc);
std::thread::spawn(move || {
std::thread::sleep(duration);
drop(arc);
});
Self { strong: None, weak, committed: AtomicBool::new(false), _unsync: PhantomData }
}
pub(crate) fn try_ref(&self) -> Option<std::sync::Arc<RoTxPtr>> {
if unsafe { *self.committed.as_ptr() } {
return None;
}
if let Some(strong) = &self.strong {
return Some(strong.clone());
}
#[cfg(feature = "read-tx-timeouts")]
{
self.weak.upgrade()
}
#[cfg(not(feature = "read-tx-timeouts"))]
{
None
}
}
#[cfg(feature = "read-tx-timeouts")]
pub(crate) fn try_disable_timer(&mut self) -> MdbxResult<()> {
if self.strong.is_some() {
return Ok(());
}
if let Some(arc) = self.weak.upgrade() {
self.strong = Some(arc);
return Ok(());
}
Err(crate::MdbxError::ReadTransactionTimeout)
}
}
impl TxPtrAccess for RoGuard {
fn with_txn_ptr<F, R>(&self, f: F) -> MdbxResult<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
#[cfg(feature = "read-tx-timeouts")]
{
if let Some(strong) = self.try_ref() {
return Ok(f(strong.ptr));
}
Err(crate::MdbxError::ReadTransactionTimeout)
}
#[cfg(not(feature = "read-tx-timeouts"))]
{
let Some(arc) = self.try_ref() else { unreachable!() };
Ok(f(arc.ptr))
}
}
fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> MdbxResult<R>
where
F: FnOnce(*mut ffi::MDBX_txn) -> R,
{
#[cfg(feature = "read-tx-timeouts")]
{
if let Some(strong) = self.try_ref() {
return Ok(f(strong.ptr));
}
Ok(f(std::ptr::null_mut()))
}
#[cfg(not(feature = "read-tx-timeouts"))]
{
let Some(arc) = self.try_ref() else { unreachable!() };
Ok(f(arc.ptr))
}
}
fn mark_committed(&self) {
unsafe { *self.committed.as_ptr() = true };
}
}
/// Cheaply clonable handle to a shared [`PtrSyncInner`]; dereferences to it.
pub(crate) struct PtrSync<K: TransactionKind> {
/// Shared, reference-counted transaction state.
inner: Arc<PtrSyncInner<K>>,
}
impl<K: TransactionKind> fmt::Debug for PtrSync<K> {
    /// Prints the transaction pointer as an integer address together with the
    /// `committed` flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shared = &*self.inner;
        let address = shared.txn as usize;
        let mut builder = f.debug_struct("PtrSync");
        builder.field("txn", &address);
        builder.field("committed", &shared.committed);
        builder.finish()
    }
}
impl<K: TransactionKind> ops::Deref for PtrSync<K> {
    type Target = PtrSyncInner<K>;

    /// Borrows the shared inner state.
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
impl<K: TransactionKind> Clone for PtrSync<K> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
impl<K: TransactionKind> PtrSync<K> {
    /// Builds the inner state for `txn` in `env` and wraps it in an `Arc`.
    pub(crate) fn new(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
        let state = PtrSyncInner::<K>::new(env, txn);
        Self { inner: Arc::new(state) }
    }
}
/// Shared state behind [`PtrSync`]: a raw transaction pointer whose use is
/// serialized by a mutex.
#[derive(Debug)]
pub struct PtrSyncInner<K: TransactionKind> {
/// Raw MDBX transaction handle.
txn: *mut ffi::MDBX_txn,
/// Set by `mark_committed`; when set, `Drop` does nothing.
committed: AtomicBool,
/// Serializes access to `txn`. The guarded flag is treated as "timed out":
/// while it is `true`, `with_txn_ptr` fails with `ReadTransactionTimeout`.
/// Nothing in this file sets it to `true` — presumably the transaction
/// manager does; confirm.
lock: Mutex<bool>,
/// Environment the transaction belongs to; used on drop to reach the
/// transaction manager.
env: Environment,
/// Tracing span created in `new` and entered while dropping.
span: tracing::Span,
/// Ties the type to its transaction kind without storing a `K`.
_marker: PhantomData<fn() -> K>,
}
impl<K: TransactionKind> PtrSyncInner<K> {
pub(crate) fn new(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
let txn_id = unsafe { ffi::mdbx_txn_id(txn) };
let span = tracing::debug_span!(
target: "libmdbx",
"mdbx_txn",
kind = %if K::IS_READ_ONLY { "ro" } else { "rw" },
txn_id = tracing::field::Empty,
);
span.record("txn_id", txn_id);
Self {
txn,
committed: AtomicBool::new(false),
lock: Mutex::new(false),
env,
_marker: PhantomData,
span,
}
}
#[cfg(feature = "read-tx-timeouts")]
pub(crate) const unsafe fn txn_ptr(&self) -> *mut ffi::MDBX_txn {
self.txn
}
pub(crate) fn lock(&self) -> MutexGuard<'_, bool> {
if let Some(lock) = self.lock.try_lock() {
lock
} else {
tracing::trace!(
target: "libmdbx",
txn = %self.txn as usize,
backtrace = %std::backtrace::Backtrace::capture(),
"Transaction lock is already acquired, blocking...
To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
);
self.lock.lock()
}
}
#[inline]
pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> MdbxResult<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
self.with_txn_ptr(f)
}
#[inline]
pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> MdbxResult<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lck = self.lock();
#[cfg(feature = "read-tx-timeouts")]
if *_lck {
use crate::error::mdbx_result;
mdbx_result(unsafe { ffi::mdbx_txn_renew(self.txn) })?;
}
Ok((f)(self.txn))
}
pub(crate) const fn env(&self) -> &Environment {
&self.env
}
pub(crate) const fn span(&self) -> &tracing::Span {
&self.span
}
}
impl<K: TransactionKind> TxPtrAccess for PtrSyncInner<K> {
    /// Runs `f` with the transaction pointer while holding the lock.
    ///
    /// # Errors
    ///
    /// Returns `MdbxError::ReadTransactionTimeout` when the lock's flag is
    /// set; `f` is not called in that case.
    fn with_txn_ptr<F, R>(&self, f: F) -> MdbxResult<R>
    where
        F: FnOnce(*mut ffi::MDBX_txn) -> R,
    {
        // The guard lives until the end of the function, i.e. across `f`.
        let timed_out = self.lock();
        if *timed_out {
            Err(crate::MdbxError::ReadTransactionTimeout)
        } else {
            Ok(f(self.txn))
        }
    }

    /// Cleanup access: delegates to `txn_execute_renew_on_timeout`.
    fn with_txn_ptr_for_cleanup<F, R>(&self, f: F) -> MdbxResult<R>
    where
        F: FnOnce(*mut ffi::MDBX_txn) -> R,
    {
        Self::txn_execute_renew_on_timeout(self, f)
    }

    /// Sets the `committed` flag so that `Drop` skips the abort.
    fn mark_committed(&self) {
        let flag = &self.committed;
        flag.store(true, Ordering::SeqCst);
    }
}
impl<K: TransactionKind> Drop for PtrSyncInner<K> {
    /// Aborts the transaction unless it was marked committed.
    ///
    /// Read-only transactions are aborted directly (after being removed from
    /// the manager's active set when `read-tx-timeouts` is on). Other
    /// transactions are aborted by sending an `Abort` message to the
    /// transaction manager and waiting for the reply.
    ///
    /// # Panics
    ///
    /// Panics if the reply cannot be received or reports an error.
    fn drop(&mut self) {
        let already_committed = self.committed.load(Ordering::SeqCst);
        if !already_committed {
            let _entered = self.span().enter();
            tracing::debug!(target: "libmdbx", "aborted");

            if !K::IS_READ_ONLY {
                // Zero-capacity channel: the send and receive rendezvous.
                let (sender, rx) = sync_channel(0);
                let message = TxnManagerMessage::Abort { tx: RawTxPtr(self.txn), sender };
                self.env.txn_manager().send_message(message);
                rx.recv().unwrap().unwrap();
                return;
            }

            #[cfg(feature = "read-tx-timeouts")]
            self.env.txn_manager().remove_active_read_transaction(self.txn);
            // SAFETY: assumes `self.txn` is still a live handle that has not
            // been aborted elsewhere — TODO confirm.
            unsafe {
                ffi::mdbx_txn_abort(self.txn);
            }
        }
    }
}
#[cfg(test)]
mod test {
    use crate::tx::RoGuard;

    /// Compile-time check that `RoGuard` is `Send`; never called at runtime.
    const fn _assert_ro_send() {
        const fn _requires_send<T>()
        where
            T: Send,
        {
        }
        _requires_send::<RoGuard>();
    }
}