1use crate::{
2 database::Database,
3 error::{mdbx_result, Error, Result},
4 flags::EnvironmentFlags,
5 transaction::{RO, RW},
6 txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
7 Mode, SyncMode, Transaction, TransactionKind,
8};
9use byteorder::{ByteOrder, NativeEndian};
10use mem::size_of;
11use serde::{Deserialize, Serialize};
12use std::{
13 ffi::CString,
14 fmt::{self, Debug},
15 mem,
16 ops::{Bound, RangeBounds},
17 path::Path,
18 ptr,
19 sync::{mpsc::sync_channel, Arc},
20 thread::sleep,
21 time::Duration,
22};
23use tracing::warn;
24
25#[cfg(feature = "read-tx-timeouts")]
27const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(5 * 60);
28
29#[derive(Clone)]
34pub struct Environment {
35 inner: Arc<EnvironmentInner>,
36}
37
38impl Environment {
39 pub fn builder() -> EnvironmentBuilder {
41 EnvironmentBuilder {
42 flags: EnvironmentFlags::default(),
43 max_readers: None,
44 max_dbs: None,
45 sync_bytes: None,
46 sync_period: None,
47 rp_augment_limit: None,
48 loose_limit: None,
49 dp_reserve_limit: None,
50 txn_dp_limit: None,
51 spill_max_denominator: None,
52 spill_min_denominator: None,
53 geometry: None,
54 log_level: None,
55 kind: Default::default(),
56 handle_slow_readers: None,
57 #[cfg(feature = "read-tx-timeouts")]
58 max_read_transaction_duration: None,
59 }
60 }
61
62 #[inline]
64 pub fn is_write_map(&self) -> bool {
65 self.inner.env_kind.is_write_map()
66 }
67
68 #[inline]
70 pub fn env_kind(&self) -> EnvironmentKind {
71 self.inner.env_kind
72 }
73
74 #[inline]
76 pub fn is_read_write(&self) -> Result<bool> {
77 Ok(!self.is_read_only()?)
78 }
79
80 #[inline]
82 pub fn is_read_only(&self) -> Result<bool> {
83 Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
84 }
85
86 #[inline]
88 pub(crate) fn txn_manager(&self) -> &TxnManager {
89 &self.inner.txn_manager
90 }
91
92 #[cfg(feature = "read-tx-timeouts")]
94 pub fn timed_out_not_aborted_transactions(&self) -> usize {
95 self.inner
96 .txn_manager
97 .timed_out_not_aborted_read_transactions()
98 .unwrap_or(0)
99 }
100
101 #[inline]
103 pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
104 Transaction::new(self.clone())
105 }
106
107 pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
110 let mut warned = false;
111 let txn = loop {
112 let (tx, rx) = sync_channel(0);
113 self.txn_manager().send_message(TxnManagerMessage::Begin {
114 parent: TxnPtr(ptr::null_mut()),
115 flags: RW::OPEN_FLAGS,
116 sender: tx,
117 });
118 let res = rx.recv().unwrap();
119 if matches!(&res, Err(Error::Busy)) {
120 if !warned {
121 warned = true;
122 warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
123 }
124 sleep(Duration::from_millis(250));
125 continue;
126 }
127
128 break res;
129 }?;
130 Ok(Transaction::new_from_ptr(self.clone(), txn.0))
131 }
132
133 #[inline]
138 pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
139 self.inner.env
140 }
141
142 #[inline]
148 #[doc(hidden)]
149 pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
150 where
151 F: FnOnce(*mut ffi::MDBX_env) -> T,
152 {
153 f(self.env_ptr())
154 }
155
156 pub fn sync(&self, force: bool) -> Result<bool> {
158 mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
159 }
160
161 pub fn stat(&self) -> Result<Stat> {
163 unsafe {
164 let mut stat = Stat::new();
165 mdbx_result(ffi::mdbx_env_stat_ex(
166 self.env_ptr(),
167 ptr::null(),
168 stat.mdb_stat(),
169 size_of::<Stat>(),
170 ))?;
171 Ok(stat)
172 }
173 }
174
175 pub fn info(&self) -> Result<Info> {
177 unsafe {
178 let mut info = Info(mem::zeroed());
179 mdbx_result(ffi::mdbx_env_info_ex(
180 self.env_ptr(),
181 ptr::null(),
182 &mut info.0,
183 size_of::<Info>(),
184 ))?;
185 Ok(info)
186 }
187 }
188
189 pub fn freelist(&self) -> Result<usize> {
215 let mut freelist: usize = 0;
216 let txn = self.begin_ro_txn()?;
217 let db = Database::freelist_db();
218 let cursor = txn.cursor(&db)?;
219
220 for result in cursor.iter_slices() {
221 let (_key, value) = result?;
222 if value.len() < size_of::<usize>() {
223 return Err(Error::Corrupted);
224 }
225
226 let s = &value[..size_of::<usize>()];
227 freelist += NativeEndian::read_u32(s) as usize;
228 }
229
230 Ok(freelist)
231 }
232}
233
234struct EnvironmentInner {
239 env: *mut ffi::MDBX_env,
243 env_kind: EnvironmentKind,
245 txn_manager: TxnManager,
247}
248
249impl Drop for EnvironmentInner {
250 fn drop(&mut self) {
251 unsafe {
253 ffi::mdbx_env_close_ex(self.env, false);
254 }
255 }
256}
257
258unsafe impl Send for EnvironmentInner {}
261unsafe impl Sync for EnvironmentInner {}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
267pub enum EnvironmentKind {
268 #[default]
270 Default,
271 WriteMap,
282}
283
284impl EnvironmentKind {
285 #[inline]
287 pub const fn is_write_map(&self) -> bool {
288 matches!(self, Self::WriteMap)
289 }
290
291 pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
293 match self {
294 Self::Default => ffi::MDBX_ENV_DEFAULTS,
295 Self::WriteMap => ffi::MDBX_WRITEMAP,
296 }
297 }
298}
299
300#[derive(Copy, Clone, Debug)]
301pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
302unsafe impl Send for EnvPtr {}
303unsafe impl Sync for EnvPtr {}
304
305#[derive(Debug, Serialize, Deserialize)]
309#[repr(transparent)]
310pub struct Stat(ffi::MDBX_stat);
311
312impl Stat {
313 pub(crate) const fn new() -> Self {
315 unsafe { Self(mem::zeroed()) }
316 }
317
318 pub(crate) fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
320 &mut self.0
321 }
322}
323
324impl Stat {
325 #[inline]
327 pub const fn page_size(&self) -> u32 {
328 self.0.ms_psize
329 }
330
331 #[inline]
333 pub const fn depth(&self) -> u32 {
334 self.0.ms_depth
335 }
336
337 #[inline]
339 pub const fn branch_pages(&self) -> usize {
340 self.0.ms_branch_pages as usize
341 }
342
343 #[inline]
345 pub const fn leaf_pages(&self) -> usize {
346 self.0.ms_leaf_pages as usize
347 }
348
349 #[inline]
351 pub const fn overflow_pages(&self) -> usize {
352 self.0.ms_overflow_pages as usize
353 }
354
355 #[inline]
357 pub const fn entries(&self) -> usize {
358 self.0.ms_entries as usize
359 }
360}
361
362#[derive(Debug)]
363#[repr(transparent)]
364pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
365
366impl GeometryInfo {
367 pub const fn min(&self) -> u64 {
368 self.0.lower
369 }
370}
371
372#[derive(Debug, Serialize, Deserialize)]
376#[repr(transparent)]
377pub struct Info(ffi::MDBX_envinfo);
378
379impl Info {
380 pub const fn geometry(&self) -> GeometryInfo {
381 GeometryInfo(self.0.mi_geo)
382 }
383
384 pub const fn mode(&self) -> Mode {
385 let mode = self.0.mi_mode;
386 if (mode & ffi::MDBX_RDONLY) != 0 {
387 Mode::ReadOnly
388 } else {
389 if (mode & ffi::MDBX_SYNC_DURABLE) != 0 {
390 Mode::ReadWrite {
391 sync_mode: SyncMode::Durable,
392 }
393 } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
394 Mode::ReadWrite {
395 sync_mode: SyncMode::UtterlyNoSync,
396 }
397 } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
398 Mode::ReadWrite {
399 sync_mode: SyncMode::NoMetaSync,
400 }
401 } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
402 Mode::ReadWrite {
403 sync_mode: SyncMode::SafeNoSync,
404 }
405 } else {
406 Mode::ReadWrite {
407 sync_mode: SyncMode::Durable,
408 }
409 }
410 }
411 }
412
413 #[inline]
415 pub const fn map_size(&self) -> usize {
416 self.0.mi_mapsize as usize
417 }
418
419 #[inline]
421 pub const fn last_pgno(&self) -> usize {
422 self.0.mi_last_pgno as usize
423 }
424
425 #[inline]
427 pub const fn last_txnid(&self) -> usize {
428 self.0.mi_recent_txnid as usize
429 }
430
431 #[inline]
433 pub const fn max_readers(&self) -> usize {
434 self.0.mi_maxreaders as usize
435 }
436
437 #[inline]
439 pub const fn num_readers(&self) -> usize {
440 self.0.mi_numreaders as usize
441 }
442
443 #[inline]
445 pub const fn page_ops(&self) -> PageOps {
446 PageOps {
447 newly: self.0.mi_pgop_stat.newly,
448 cow: self.0.mi_pgop_stat.cow,
449 clone: self.0.mi_pgop_stat.clone,
450 split: self.0.mi_pgop_stat.split,
451 merge: self.0.mi_pgop_stat.merge,
452 spill: self.0.mi_pgop_stat.spill,
453 unspill: self.0.mi_pgop_stat.unspill,
454 wops: self.0.mi_pgop_stat.wops,
455 prefault: self.0.mi_pgop_stat.prefault,
456 mincore: self.0.mi_pgop_stat.mincore,
457 msync: self.0.mi_pgop_stat.msync,
458 fsync: self.0.mi_pgop_stat.fsync,
459 }
460 }
461}
462
463impl fmt::Debug for Environment {
464 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465 f.debug_struct("Environment")
466 .field("kind", &self.inner.env_kind)
467 .finish_non_exhaustive()
468 }
469}
470
471#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
476pub enum PageSize {
477 MinimalAcceptable,
478 Set(usize),
479}
480
481#[derive(Clone, Debug, PartialEq, Eq)]
483pub struct PageOps {
484 pub newly: u64,
486 pub cow: u64,
488 pub clone: u64,
490 pub split: u64,
492 pub merge: u64,
494 pub spill: u64,
496 pub unspill: u64,
498 pub wops: u64,
500 pub msync: u64,
502 pub fsync: u64,
504 pub prefault: u64,
506 pub mincore: u64,
508}
509
510#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
511pub struct Geometry<R> {
512 pub size: Option<R>,
513 pub growth_step: Option<isize>,
514 pub shrink_threshold: Option<isize>,
515 pub page_size: Option<PageSize>,
516}
517
518impl<R> Default for Geometry<R> {
519 fn default() -> Self {
520 Self {
521 size: None,
522 growth_step: None,
523 shrink_threshold: None,
524 page_size: None,
525 }
526 }
527}
528
529pub type HandleSlowReadersCallback = extern "C" fn(
573 env: *const ffi::MDBX_env,
574 txn: *const ffi::MDBX_txn,
575 pid: ffi::mdbx_pid_t,
576 tid: ffi::mdbx_tid_t,
577 laggard: u64,
578 gap: std::ffi::c_uint,
579 space: usize,
580 retry: std::ffi::c_int,
581) -> HandleSlowReadersReturnCode;
582
583#[derive(Debug)]
584#[repr(i32)]
585pub enum HandleSlowReadersReturnCode {
586 Error = -2,
588 ProceedWithoutKillingReader = -1,
591 Success = 0,
596 ClearReaderSlot = 1,
600 ReaderProcessTerminated = 2,
603}
604
605#[derive(Debug, Clone, Serialize, Deserialize)]
607pub struct RemoteEnvironmentConfig {
608 pub(crate) flags: EnvironmentFlags,
609 pub(crate) max_readers: Option<u64>,
610 pub(crate) max_dbs: Option<u64>,
611 pub(crate) sync_bytes: Option<u64>,
612 pub(crate) sync_period: Option<u64>,
613 pub(crate) rp_augment_limit: Option<u64>,
614 pub(crate) loose_limit: Option<u64>,
615 pub(crate) dp_reserve_limit: Option<u64>,
616 pub(crate) txn_dp_limit: Option<u64>,
617 pub(crate) spill_max_denominator: Option<u64>,
618 pub(crate) spill_min_denominator: Option<u64>,
619 pub(crate) geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
620 pub(crate) log_level: Option<ffi::MDBX_log_level_t>,
621 pub(crate) kind: EnvironmentKind,
622 pub(crate) max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
627}
628
629impl From<EnvironmentBuilder> for RemoteEnvironmentConfig {
630 fn from(value: EnvironmentBuilder) -> Self {
631 Self {
632 flags: value.flags,
633 max_readers: value.max_readers,
634 max_dbs: value.max_dbs,
635 sync_bytes: value.sync_bytes,
636 sync_period: value.sync_period,
637 rp_augment_limit: value.rp_augment_limit,
638 loose_limit: value.loose_limit,
639 dp_reserve_limit: value.dp_reserve_limit,
640 txn_dp_limit: value.txn_dp_limit,
641 spill_max_denominator: value.spill_max_denominator,
642 spill_min_denominator: value.spill_min_denominator,
643 geometry: value.geometry,
644 log_level: value.log_level,
645 kind: value.kind,
646 #[cfg(feature = "read-tx-timeouts")]
647 max_read_transaction_duration: value.max_read_transaction_duration,
648 #[cfg(not(feature = "read-tx-timeouts"))]
649 max_read_transaction_duration: None,
650 }
651 }
652}
653
654impl RemoteEnvironmentConfig {
655 pub(crate) fn env_kind(&self) -> EnvironmentKind {
656 self.kind
657 }
658}
659
660impl From<RemoteEnvironmentConfig> for EnvironmentBuilder {
661 fn from(value: RemoteEnvironmentConfig) -> Self {
662 Self {
663 flags: value.flags,
664 max_readers: value.max_readers,
665 max_dbs: value.max_dbs,
666 sync_bytes: value.sync_bytes,
667 sync_period: value.sync_period,
668 rp_augment_limit: value.rp_augment_limit,
669 loose_limit: value.loose_limit,
670 dp_reserve_limit: value.dp_reserve_limit,
671 txn_dp_limit: value.txn_dp_limit,
672 spill_max_denominator: value.spill_max_denominator,
673 spill_min_denominator: value.spill_min_denominator,
674 geometry: value.geometry,
675 log_level: value.log_level,
676 kind: value.kind,
677 handle_slow_readers: None,
678 #[cfg(feature = "read-tx-timeouts")]
679 max_read_transaction_duration: value.max_read_transaction_duration,
680 }
681 }
682}
683
684#[derive(Debug, Clone)]
686pub struct EnvironmentBuilder {
687 pub(crate) flags: EnvironmentFlags,
688 pub(crate) max_readers: Option<u64>,
689 pub(crate) max_dbs: Option<u64>,
690 pub(crate) sync_bytes: Option<u64>,
691 pub(crate) sync_period: Option<u64>,
692 pub(crate) rp_augment_limit: Option<u64>,
693 pub(crate) loose_limit: Option<u64>,
694 pub(crate) dp_reserve_limit: Option<u64>,
695 pub(crate) txn_dp_limit: Option<u64>,
696 pub(crate) spill_max_denominator: Option<u64>,
697 pub(crate) spill_min_denominator: Option<u64>,
698 pub(crate) geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
699 pub(crate) log_level: Option<ffi::MDBX_log_level_t>,
700 pub(crate) kind: EnvironmentKind,
701 pub(crate) handle_slow_readers: Option<HandleSlowReadersCallback>,
702 #[cfg(feature = "read-tx-timeouts")]
703 pub(crate) max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
706}
707
708impl EnvironmentBuilder {
709 pub fn open(&self, path: &Path) -> Result<Environment> {
713 self.open_with_permissions(path, 0o644)
714 }
715
716 pub fn open_with_permissions(
720 &self,
721 path: &Path,
722 mode: ffi::mdbx_mode_t,
723 ) -> Result<Environment> {
724 let mut env: *mut ffi::MDBX_env = ptr::null_mut();
725 unsafe {
726 if let Some(log_level) = self.log_level {
727 ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
730 }
731
732 mdbx_result(ffi::mdbx_env_create(&mut env))?;
733
734 if let Err(e) = (|| {
735 if let Some(geometry) = &self.geometry {
736 let mut min_size = -1;
737 let mut max_size = -1;
738
739 if let Some(size) = geometry.size {
740 if let Some(size) = size.0 {
741 min_size = size as isize;
742 }
743
744 if let Some(size) = size.1 {
745 max_size = size as isize;
746 }
747 }
748
749 mdbx_result(ffi::mdbx_env_set_geometry(
750 env,
751 min_size,
752 -1,
753 max_size,
754 geometry.growth_step.unwrap_or(-1),
755 geometry.shrink_threshold.unwrap_or(-1),
756 match geometry.page_size {
757 None => -1,
758 Some(PageSize::MinimalAcceptable) => 0,
759 Some(PageSize::Set(size)) => size as isize,
760 },
761 ))?;
762 }
763 for (opt, v) in [
764 (ffi::MDBX_opt_max_db, self.max_dbs),
765 (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
766 (ffi::MDBX_opt_loose_limit, self.loose_limit),
767 (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
768 (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
769 (
770 ffi::MDBX_opt_spill_max_denominator,
771 self.spill_max_denominator,
772 ),
773 (
774 ffi::MDBX_opt_spill_min_denominator,
775 self.spill_min_denominator,
776 ),
777 ] {
778 if let Some(v) = v {
779 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
780 }
781 }
782
783 if let Some(max_readers) = self.max_readers {
785 mdbx_result(ffi::mdbx_env_set_option(
786 env,
787 ffi::MDBX_opt_max_readers,
788 max_readers,
789 ))?;
790 }
791
792 if let Some(handle_slow_readers) = self.handle_slow_readers {
793 mdbx_result(ffi::mdbx_env_set_hsr(
794 env,
795 convert_hsr_fn(Some(handle_slow_readers)),
796 ))?;
797 }
798
799 #[cfg(unix)]
800 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
801 use std::os::unix::ffi::OsStrExt;
802 path.as_ref().as_os_str().as_bytes().to_vec()
803 }
804
805 #[cfg(windows)]
806 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
807 path.as_ref().to_string_lossy().to_string().into_bytes()
811 }
812
813 let path = match CString::new(path_to_bytes(path)) {
814 Ok(path) => path,
815 Err(_) => return Err(Error::Invalid),
816 };
817 mdbx_result(ffi::mdbx_env_open(
818 env,
819 path.as_ptr(),
820 self.flags.make_flags() | self.kind.extra_flags(),
821 mode,
822 ))?;
823
824 for (opt, v) in [
825 (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
826 (ffi::MDBX_opt_sync_period, self.sync_period),
827 ] {
828 if let Some(v) = v {
829 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
830 }
831 }
832
833 Ok(())
834 })() {
835 ffi::mdbx_env_close_ex(env, false);
836
837 return Err(e);
838 }
839 }
840
841 let env_ptr = EnvPtr(env);
842
843 #[cfg(not(feature = "read-tx-timeouts"))]
844 let txn_manager = TxnManager::new(env_ptr);
845
846 #[cfg(feature = "read-tx-timeouts")]
847 let txn_manager = {
848 if let crate::MaxReadTransactionDuration::Set(duration) = self
849 .max_read_transaction_duration
850 .unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
851 DEFAULT_MAX_READ_TRANSACTION_DURATION,
852 ))
853 {
854 TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
855 } else {
856 TxnManager::new(env_ptr)
857 }
858 };
859
860 let env = EnvironmentInner {
861 env,
862 txn_manager,
863 env_kind: self.kind,
864 };
865
866 Ok(Environment {
867 inner: Arc::new(env),
868 })
869 }
870
871 pub fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
873 self.kind = kind;
874 self
875 }
876
877 pub fn write_map(&mut self) -> &mut Self {
881 self.set_kind(EnvironmentKind::WriteMap)
882 }
883
884 pub fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
886 self.flags = flags;
887 self
888 }
889
890 pub fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
896 self.max_readers = Some(max_readers);
897 self
898 }
899
900 pub fn set_max_dbs(&mut self, v: usize) -> &mut Self {
910 self.max_dbs = Some(v as u64);
911 self
912 }
913
914 pub fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
917 self.sync_bytes = Some(v as u64);
918 self
919 }
920
921 pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
925 let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
927 self.sync_period = Some(as_mdbx_units);
928 self
929 }
930
931 pub fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
932 self.rp_augment_limit = Some(v);
933 self
934 }
935
936 pub fn set_loose_limit(&mut self, v: u64) -> &mut Self {
937 self.loose_limit = Some(v);
938 self
939 }
940
941 pub fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
942 self.dp_reserve_limit = Some(v);
943 self
944 }
945
946 pub fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
947 self.txn_dp_limit = Some(v);
948 self
949 }
950
951 pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
952 self.spill_max_denominator = Some(v.into());
953 self
954 }
955
956 pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
957 self.spill_min_denominator = Some(v.into());
958 self
959 }
960
961 pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
964 let convert_bound = |bound: Bound<&usize>| match bound {
965 Bound::Included(v) | Bound::Excluded(v) => Some(*v),
966 _ => None,
967 };
968 self.geometry = Some(Geometry {
969 size: geometry.size.map(|range| {
970 (
971 convert_bound(range.start_bound()),
972 convert_bound(range.end_bound()),
973 )
974 }),
975 growth_step: geometry.growth_step,
976 shrink_threshold: geometry.shrink_threshold,
977 page_size: geometry.page_size,
978 });
979 self
980 }
981
982 pub fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
983 self.log_level = Some(log_level);
984 self
985 }
986
987 pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
990 self.handle_slow_readers = Some(hsr);
991 self
992 }
993}
994
995pub(crate) mod read_transactions {
996 #[cfg(feature = "read-tx-timeouts")]
997 use crate::EnvironmentBuilder;
998 use serde::{Deserialize, Serialize};
999 use std::time::Duration;
1000
1001 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1003 pub enum MaxReadTransactionDuration {
1004 Unbounded,
1006 Set(Duration),
1008 }
1009
1010 #[cfg(feature = "read-tx-timeouts")]
1011 impl MaxReadTransactionDuration {
1012 pub const fn as_duration(&self) -> Option<Duration> {
1013 match self {
1014 Self::Unbounded => None,
1015 Self::Set(duration) => Some(*duration),
1016 }
1017 }
1018 }
1019
1020 #[cfg(feature = "read-tx-timeouts")]
1021 impl EnvironmentBuilder {
1022 pub fn set_max_read_transaction_duration(
1024 &mut self,
1025 max_read_transaction_duration: MaxReadTransactionDuration,
1026 ) -> &mut Self {
1027 self.max_read_transaction_duration = Some(max_read_transaction_duration);
1028 self
1029 }
1030 }
1031}
1032
1033#[allow(clippy::missing_transmute_annotations)]
1035fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
1036 unsafe { std::mem::transmute(callback) }
1037}
1038
1039#[cfg(test)]
1040mod tests {
1041 use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
1042 use std::{
1043 ops::RangeInclusive,
1044 sync::atomic::{AtomicBool, Ordering},
1045 };
1046
1047 #[test]
1048 fn test_handle_slow_readers_callback() {
1049 static CALLED: AtomicBool = AtomicBool::new(false);
1050
1051 extern "C" fn handle_slow_readers(
1052 _env: *const ffi::MDBX_env,
1053 _txn: *const ffi::MDBX_txn,
1054 _pid: ffi::mdbx_pid_t,
1055 _tid: ffi::mdbx_tid_t,
1056 _laggard: u64,
1057 _gap: std::ffi::c_uint,
1058 _space: usize,
1059 _retry: std::ffi::c_int,
1060 ) -> HandleSlowReadersReturnCode {
1061 CALLED.store(true, Ordering::Relaxed);
1062 HandleSlowReadersReturnCode::ProceedWithoutKillingReader
1063 }
1064
1065 let tempdir = tempfile::tempdir().unwrap();
1066 let env = Environment::builder()
1067 .set_geometry(Geometry::<RangeInclusive<usize>> {
1068 size: Some(0..=1024 * 1024), page_size: Some(PageSize::MinimalAcceptable), ..Default::default()
1071 })
1072 .set_handle_slow_readers(handle_slow_readers)
1073 .open(tempdir.path())
1074 .unwrap();
1075
1076 {
1078 let tx = env.begin_rw_txn().unwrap();
1079 let db = tx.open_db(None).unwrap();
1080 for i in 0usize..1_000 {
1081 tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty())
1082 .unwrap()
1083 }
1084 tx.commit().unwrap();
1085 }
1086
1087 let _tx_ro = env.begin_ro_txn().unwrap();
1089
1090 {
1092 let tx = env.begin_rw_txn().unwrap();
1093 let db = tx.open_db(None).unwrap();
1094 for i in 0usize..1_000 {
1095 tx.put(db.dbi(), i.to_le_bytes(), b"1", WriteFlags::empty())
1096 .unwrap();
1097 }
1098 tx.commit().unwrap();
1099 }
1100
1101 {
1104 let tx = env.begin_rw_txn().unwrap();
1105 let db = tx.open_db(None).unwrap();
1106 for i in 1_000usize..1_000_000 {
1107 match tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty()) {
1108 Ok(_) => continue,
1109 Err(Error::MapFull) => break,
1110 result @ Err(_) => result.unwrap(),
1111 }
1112 }
1113 tx.commit().unwrap();
1114 }
1115
1116 assert!(CALLED.load(Ordering::Relaxed));
1118 }
1119}