1use crate::{
2 database::Database,
3 environment::Environment,
4 error::{mdbx_result, Result},
5 flags::{DatabaseFlags, WriteFlags},
6 txn_manager::{TxnManagerMessage, TxnPtr},
7 Cursor, Error, Stat, TableObject,
8};
9use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
10use indexmap::IndexSet;
11use parking_lot::{Mutex, MutexGuard};
12use serde::{Deserialize, Serialize};
13use std::{
14 ffi::{c_uint, c_void},
15 fmt::{self, Debug},
16 mem::size_of,
17 ptr, slice,
18 sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
19 time::Duration,
20};
21
22#[cfg(feature = "read-tx-timeouts")]
23use crate::ffi::mdbx_txn_renew;
24
25mod private {
26 use super::*;
27
28 pub trait Sealed {}
29
30 impl Sealed for RO {}
31 impl Sealed for RW {}
32}
33
34pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
35 #[doc(hidden)]
36 const OPEN_FLAGS: MDBX_txn_flags_t;
37
38 #[doc(hidden)]
40 const IS_READ_ONLY: bool;
41}
42
43#[derive(Debug)]
44#[non_exhaustive]
45pub struct RO;
46
47#[derive(Debug)]
48#[non_exhaustive]
49pub struct RW;
50
51impl TransactionKind for RO {
52 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
53 const IS_READ_ONLY: bool = true;
54}
55impl TransactionKind for RW {
56 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
57 const IS_READ_ONLY: bool = false;
58}
59
60pub struct Transaction<K>
64where
65 K: TransactionKind,
66{
67 inner: Arc<TransactionInner<K>>,
68}
69
70impl<K> Transaction<K>
71where
72 K: TransactionKind,
73{
74 pub(crate) fn new(env: Environment) -> Result<Self> {
75 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
76 unsafe {
77 mdbx_result(ffi::mdbx_txn_begin_ex(
78 env.env_ptr(),
79 ptr::null_mut(),
80 K::OPEN_FLAGS,
81 &mut txn,
82 ptr::null_mut(),
83 ))?;
84 Ok(Self::new_from_ptr(env, txn))
85 }
86 }
87
88 pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
89 let txn = TransactionPtr::new(txn_ptr);
90
91 #[cfg(feature = "read-tx-timeouts")]
92 if K::IS_READ_ONLY {
93 env.txn_manager()
94 .add_active_read_transaction(txn_ptr, txn.clone())
95 }
96
97 let inner = TransactionInner {
98 txn,
99 primed_dbis: Mutex::new(IndexSet::new()),
100 committed: AtomicBool::new(false),
101 env,
102 _marker: Default::default(),
103 };
104
105 Self {
106 inner: Arc::new(inner),
107 }
108 }
109
110 #[inline]
115 pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
116 where
117 F: FnOnce(*mut ffi::MDBX_txn) -> T,
118 {
119 self.inner.txn_execute(f)
120 }
121
122 #[inline]
127 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
128 where
129 F: FnOnce(*mut ffi::MDBX_txn) -> T,
130 {
131 self.inner.txn_execute_renew_on_timeout(f)
132 }
133
134 #[doc(hidden)]
136 #[cfg(test)]
137 pub fn txn(&self) -> *mut ffi::MDBX_txn {
138 self.inner.txn.txn
139 }
140
141 pub fn env(&self) -> &Environment {
143 &self.inner.env
144 }
145
146 pub fn id(&self) -> Result<u64> {
148 self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
149 }
150
151 pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
160 where
161 Key: TableObject,
162 {
163 let key_val: ffi::MDBX_val = ffi::MDBX_val {
164 iov_len: key.len(),
165 iov_base: key.as_ptr() as *mut c_void,
166 };
167 let mut data_val: ffi::MDBX_val = ffi::MDBX_val {
168 iov_len: 0,
169 iov_base: ptr::null_mut(),
170 };
171
172 self.txn_execute(|txn| unsafe {
173 match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
174 ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, data_val).map(Some),
175 ffi::MDBX_NOTFOUND => Ok(None),
176 err_code => Err(Error::from_err_code(err_code)),
177 }
178 })?
179 }
180
181 pub fn commit(self) -> Result<(bool, CommitLatency)> {
185 self.commit_and_rebind_open_dbs().map(|v| (v.0, v.1))
186 }
187
188 pub fn prime_for_permaopen(&self, db: Database) {
189 self.inner.primed_dbis.lock().insert(db.dbi());
190 }
191
192 pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, CommitLatency, Vec<Database>)> {
194 let result = {
195 let result = self.txn_execute(|txn| {
196 if K::IS_READ_ONLY {
197 #[cfg(feature = "read-tx-timeouts")]
198 self.env().txn_manager().remove_active_read_transaction(txn);
199
200 let mut latency = CommitLatency::new();
201 mdbx_result(unsafe {
202 ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
203 })
204 .map(|v| (v, latency))
205 } else {
206 let (sender, rx) = sync_channel(0);
207 self.env()
208 .txn_manager()
209 .send_message(TxnManagerMessage::Commit {
210 tx: TxnPtr(txn),
211 sender,
212 });
213 rx.recv().unwrap()
214 }
215 })?;
216
217 self.inner.set_committed();
218 result
219 };
220 result.map(|(v, latency)| {
221 (
222 v,
223 latency,
224 self.inner
225 .primed_dbis
226 .lock()
227 .iter()
228 .map(|&dbi| Database::new_from_ptr(dbi, self.env().clone()))
229 .collect(),
230 )
231 })
232 }
233
234 pub fn open_db(&self, name: Option<&str>) -> Result<Database> {
246 Database::new(self, name, 0)
247 }
248
249 pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
251 let mut flags: c_uint = 0;
252 unsafe {
253 self.txn_execute(|txn| {
254 mdbx_result(ffi::mdbx_dbi_flags_ex(
255 txn,
256 db.dbi(),
257 &mut flags,
258 ptr::null_mut(),
259 ))
260 })??;
261 }
262
263 #[cfg_attr(not(windows), allow(clippy::useless_conversion))]
265 Ok(DatabaseFlags::from_bits_truncate(flags.try_into().unwrap()))
266 }
267
268 pub fn db_stat(&self, db: &Database) -> Result<Stat> {
270 self.db_stat_with_dbi(db.dbi())
271 }
272
273 pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
275 unsafe {
276 let mut stat = Stat::new();
277 self.txn_execute(|txn| {
278 mdbx_result(ffi::mdbx_dbi_stat(
279 txn,
280 dbi,
281 stat.mdb_stat(),
282 size_of::<Stat>(),
283 ))
284 })??;
285 Ok(stat)
286 }
287 }
288
289 pub fn cursor(&self, db: &Database) -> Result<Cursor<K>> {
291 Cursor::new(self.clone(), db.dbi())
292 }
293
294 pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
296 Cursor::new(self.clone(), dbi)
297 }
298
299 #[cfg(feature = "read-tx-timeouts")]
301 pub fn disable_timeout(&self) {
302 if K::IS_READ_ONLY {
303 self.env()
304 .txn_manager()
305 .remove_active_read_transaction(self.inner.txn.txn);
306 }
307 }
308}
309
310impl<K> Clone for Transaction<K>
311where
312 K: TransactionKind,
313{
314 fn clone(&self) -> Self {
315 Self {
316 inner: Arc::clone(&self.inner),
317 }
318 }
319}
320
321impl<K> fmt::Debug for Transaction<K>
322where
323 K: TransactionKind,
324{
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 f.debug_struct("RoTransaction").finish_non_exhaustive()
327 }
328}
329
330struct TransactionInner<K>
332where
333 K: TransactionKind,
334{
335 txn: TransactionPtr,
337 primed_dbis: Mutex<IndexSet<ffi::MDBX_dbi>>,
339 committed: AtomicBool,
341 env: Environment,
342 _marker: std::marker::PhantomData<fn(K)>,
343}
344
345impl<K> TransactionInner<K>
346where
347 K: TransactionKind,
348{
349 fn set_committed(&self) {
351 self.committed
352 .store(true, std::sync::atomic::Ordering::SeqCst);
353 }
354
355 fn has_committed(&self) -> bool {
356 self.committed.load(std::sync::atomic::Ordering::SeqCst)
357 }
358
359 #[inline]
360 fn txn_execute<F, T>(&self, f: F) -> Result<T>
361 where
362 F: FnOnce(*mut ffi::MDBX_txn) -> T,
363 {
364 self.txn.txn_execute_fail_on_timeout(f)
365 }
366
367 #[inline]
368 fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
369 where
370 F: FnOnce(*mut ffi::MDBX_txn) -> T,
371 {
372 self.txn.txn_execute_renew_on_timeout(f)
373 }
374}
375
376impl<K> Drop for TransactionInner<K>
377where
378 K: TransactionKind,
379{
380 fn drop(&mut self) {
381 self.txn
384 .txn_execute_renew_on_timeout(|txn| {
385 if !self.has_committed() {
386 if K::IS_READ_ONLY {
387 #[cfg(feature = "read-tx-timeouts")]
388 self.env.txn_manager().remove_active_read_transaction(txn);
389
390 unsafe {
391 ffi::mdbx_txn_abort(txn);
392 }
393 } else {
394 let (sender, rx) = sync_channel(0);
395 self.env
396 .txn_manager()
397 .send_message(TxnManagerMessage::Abort {
398 tx: TxnPtr(txn),
399 sender,
400 });
401 rx.recv().unwrap().unwrap();
402 }
403 }
404 })
405 .unwrap();
406 }
407}
408
409impl Transaction<RW> {
410 pub(crate) fn open_db_with_flags(
411 &self,
412 name: Option<&str>,
413 flags: DatabaseFlags,
414 ) -> Result<Database> {
415 Database::new(self, name, flags.bits())
416 }
417
418 pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
431 self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
432 }
433
434 pub fn put(
441 &self,
442 dbi: ffi::MDBX_dbi,
443 key: impl AsRef<[u8]>,
444 data: impl AsRef<[u8]>,
445 flags: WriteFlags,
446 ) -> Result<()> {
447 let key = key.as_ref();
448 let data = data.as_ref();
449 let key_val: ffi::MDBX_val = ffi::MDBX_val {
450 iov_len: key.len(),
451 iov_base: key.as_ptr() as *mut c_void,
452 };
453 let mut data_val: ffi::MDBX_val = ffi::MDBX_val {
454 iov_len: data.len(),
455 iov_base: data.as_ptr() as *mut c_void,
456 };
457 mdbx_result(self.txn_execute(|txn| unsafe {
458 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
459 })?)?;
460
461 Ok(())
462 }
463
464 pub fn reserve(
468 &self,
469 db: &Database,
470 key: impl AsRef<[u8]>,
471 len: usize,
472 flags: WriteFlags,
473 ) -> Result<&mut [u8]> {
474 let key = key.as_ref();
475 let key_val: ffi::MDBX_val = ffi::MDBX_val {
476 iov_len: key.len(),
477 iov_base: key.as_ptr() as *mut c_void,
478 };
479 let mut data_val: ffi::MDBX_val = ffi::MDBX_val {
480 iov_len: len,
481 iov_base: ptr::null_mut::<c_void>(),
482 };
483 unsafe {
484 mdbx_result(self.txn_execute(|txn| {
485 ffi::mdbx_put(
486 txn,
487 db.dbi(),
488 &key_val,
489 &mut data_val,
490 flags.bits() | ffi::MDBX_RESERVE,
491 )
492 })?)?;
493 Ok(slice::from_raw_parts_mut(
494 data_val.iov_base as *mut u8,
495 data_val.iov_len,
496 ))
497 }
498 }
499
500 pub fn del(
510 &self,
511 dbi: ffi::MDBX_dbi,
512 key: impl AsRef<[u8]>,
513 data: Option<&[u8]>,
514 ) -> Result<bool> {
515 let key = key.as_ref();
516 let key_val: ffi::MDBX_val = ffi::MDBX_val {
517 iov_len: key.len(),
518 iov_base: key.as_ptr() as *mut c_void,
519 };
520 let data_val: Option<ffi::MDBX_val> = data.map(|data| ffi::MDBX_val {
521 iov_len: data.len(),
522 iov_base: data.as_ptr() as *mut c_void,
523 });
524
525 mdbx_result({
526 self.txn_execute(|txn| {
527 if let Some(d) = data_val {
528 unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
529 } else {
530 unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
531 }
532 })?
533 })
534 .map(|_| true)
535 .or_else(|e| match e {
536 Error::NotFound => Ok(false),
537 other => Err(other),
538 })
539 }
540
541 pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
543 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
544
545 Ok(())
546 }
547
548 pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
554 mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
555
556 Ok(())
557 }
558}
559
560impl Transaction<RO> {
561 pub unsafe fn close_db(&self, db: Database) -> Result<()> {
567 mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()))?;
568
569 Ok(())
570 }
571}
572
573impl Transaction<RW> {
574 pub fn begin_nested_txn(&mut self) -> Result<Self> {
576 if self.inner.env.is_write_map() {
577 return Err(Error::NestedTransactionsUnsupportedWithWriteMap);
578 }
579 self.txn_execute(|txn| {
580 let (tx, rx) = sync_channel(0);
581 self.env()
582 .txn_manager()
583 .send_message(TxnManagerMessage::Begin {
584 parent: TxnPtr(txn),
585 flags: RW::OPEN_FLAGS,
586 sender: tx,
587 });
588
589 rx.recv()
590 .unwrap()
591 .map(|ptr| Self::new_from_ptr(self.env().clone(), ptr.0))
592 })?
593 }
594}
595
596#[derive(Debug, Clone)]
598pub(crate) struct TransactionPtr {
599 txn: *mut ffi::MDBX_txn,
600 #[cfg(feature = "read-tx-timeouts")]
601 timed_out: Arc<AtomicBool>,
602 lock: Arc<Mutex<()>>,
603}
604
605impl TransactionPtr {
606 fn new(txn: *mut ffi::MDBX_txn) -> Self {
607 Self {
608 txn,
609 #[cfg(feature = "read-tx-timeouts")]
610 timed_out: Arc::new(AtomicBool::new(false)),
611 lock: Arc::new(Mutex::new(())),
612 }
613 }
614
615 #[cfg(feature = "read-tx-timeouts")]
624 fn is_timed_out(&self) -> bool {
625 self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
626 }
627
628 #[cfg(feature = "read-tx-timeouts")]
629 pub(crate) fn set_timed_out(&self) {
630 self.timed_out
631 .store(true, std::sync::atomic::Ordering::SeqCst);
632 }
633
634 fn lock(&self) -> MutexGuard<'_, ()> {
635 if let Some(lock) = self.lock.try_lock() {
636 lock
637 } else {
638 tracing::debug!(
639 target: "libmdbx",
640 txn = %self.txn as usize,
641 backtrace = %std::backtrace::Backtrace::force_capture(),
642 "Transaction lock is already acquired, blocking..."
643 );
644 self.lock.lock()
645 }
646 }
647
648 #[inline]
652 pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
653 where
654 F: FnOnce(*mut ffi::MDBX_txn) -> T,
655 {
656 let _lck = self.lock();
657
658 #[cfg(feature = "read-tx-timeouts")]
662 if self.is_timed_out() {
663 return Err(Error::ReadTransactionTimeout);
664 }
665
666 Ok((f)(self.txn))
667 }
668
669 #[inline]
674 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
675 where
676 F: FnOnce(*mut ffi::MDBX_txn) -> T,
677 {
678 let _lck = self.lock();
679
680 #[cfg(feature = "read-tx-timeouts")]
682 if self.is_timed_out() {
683 mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
684 }
685
686 Ok((f)(self.txn))
687 }
688}
689
690#[derive(Debug, Serialize, Deserialize)]
695#[repr(transparent)]
696pub struct CommitLatency(ffi::MDBX_commit_latency);
697
698impl CommitLatency {
699 pub(crate) const fn new() -> Self {
701 unsafe { Self(std::mem::zeroed()) }
702 }
703
704 pub(crate) fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
706 &mut self.0
707 }
708}
709
710impl CommitLatency {
711 #[inline]
714 pub const fn preparation(&self) -> Duration {
715 Self::time_to_duration(self.0.preparation)
716 }
717
718 #[inline]
720 pub const fn gc_wallclock(&self) -> Duration {
721 Self::time_to_duration(self.0.gc_wallclock)
722 }
723
724 #[inline]
726 pub const fn audit(&self) -> Duration {
727 Self::time_to_duration(self.0.audit)
728 }
729
730 #[inline]
733 pub const fn write(&self) -> Duration {
734 Self::time_to_duration(self.0.write)
735 }
736
737 #[inline]
740 pub const fn sync(&self) -> Duration {
741 Self::time_to_duration(self.0.sync)
742 }
743
744 #[inline]
746 pub const fn ending(&self) -> Duration {
747 Self::time_to_duration(self.0.ending)
748 }
749
750 #[inline]
752 pub const fn whole(&self) -> Duration {
753 Self::time_to_duration(self.0.whole)
754 }
755
756 #[inline]
758 pub const fn gc_cputime(&self) -> Duration {
759 Self::time_to_duration(self.0.gc_cputime)
760 }
761
762 #[inline]
763 const fn time_to_duration(time: u32) -> Duration {
764 Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
765 }
766}
767
768unsafe impl Send for TransactionPtr {}
770
771unsafe impl Sync for TransactionPtr {}
773
774#[cfg(test)]
775mod tests {
776 use super::*;
777
778 const fn assert_send_sync<T: Send + Sync>() {}
779
780 #[allow(dead_code)]
781 const fn test_txn_send_sync() {
782 assert_send_sync::<Transaction<RO>>();
783 assert_send_sync::<Transaction<RW>>();
784 }
785}