signet_libmdbx/sys/environment.rs
1use crate::{
2 Database, Mode, SyncMode,
3 error::{MdbxError, MdbxResult, ReadResult, mdbx_result},
4 flags::EnvironmentFlags,
5 sys::txn_manager::{LifecycleHandle, RwSyncLifecycle},
6 tx::aliases::{RoTxSync, RoTxUnsync, RwTxSync, RwTxUnsync},
7};
8use byteorder::{ByteOrder, NativeEndian};
9use mem::size_of;
10use std::{
11 ffi::CString,
12 fmt::{self, Debug},
13 mem,
14 ops::{Bound, RangeBounds},
15 path::Path,
16 ptr,
17 sync::Arc,
18 time::Duration,
19};
20
21/// An environment supports multiple databases, all residing in the same shared-memory map.
22///
23/// Accessing the environment is thread-safe.
24/// The environment will be closed when the last instance of this type is dropped.
25#[derive(Clone)]
26pub struct Environment {
27 inner: Arc<EnvironmentInner>,
28}
29
30impl Environment {
31 /// Creates a new builder for specifying options for opening an MDBX environment.
32 pub fn builder() -> EnvironmentBuilder {
33 EnvironmentBuilder {
34 flags: EnvironmentFlags::default(),
35 max_readers: None,
36 max_dbs: None,
37 sync_bytes: None,
38 sync_period: None,
39 rp_augment_limit: None,
40 loose_limit: None,
41 dp_reserve_limit: None,
42 txn_dp_limit: None,
43 spill_max_denominator: None,
44 spill_min_denominator: None,
45 geometry: None,
46 log_level: None,
47 kind: Default::default(),
48 handle_slow_readers: None,
49 }
50 }
51
52 /// Returns true if the environment was opened as WRITEMAP.
53 #[inline]
54 pub fn is_write_map(&self) -> bool {
55 self.inner.env_kind.is_write_map()
56 }
57
58 /// Returns the kind of the environment.
59 #[inline]
60 pub fn env_kind(&self) -> EnvironmentKind {
61 self.inner.env_kind
62 }
63
64 /// Returns true if the environment was opened in [`crate::Mode::ReadWrite`] mode.
65 #[inline]
66 pub fn is_read_write(&self) -> MdbxResult<bool> {
67 Ok(!self.is_read_only()?)
68 }
69
70 /// Returns true if the environment was opened in [`crate::Mode::ReadOnly`] mode.
71 #[inline]
72 pub fn is_read_only(&self) -> MdbxResult<bool> {
73 Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
74 }
75
76 /// Returns the transaction manager.
77 #[inline]
78 pub(crate) fn txn_manager(&self) -> &LifecycleHandle {
79 &self.inner.txn_manager
80 }
81
82 /// Create a read-only transaction for use with the environment.
83 #[inline]
84 pub fn begin_ro_sync(&self) -> MdbxResult<RoTxSync> {
85 RoTxSync::begin(self.clone())
86 }
87
88 /// Create a read-write transaction for use with the environment. This
89 /// method will block while there are any other read-write transactions
90 /// open on the environment.
91 pub fn begin_rw_sync(&self) -> MdbxResult<RwTxSync> {
92 RwTxSync::begin(self.clone())
93 }
94
95 /// Create a single-threaded read-only transaction for use with the
96 /// environment.
97 ///
98 /// The returned Tx is `!Sync`. As a result, it saves about 30% overhead on
99 /// transaction operations compared to the multi-threaded version, but
100 /// cannot be sent or shared between threads.
101 ///
102 /// If the `read-tx-timeouts` feature is enabled, the transaction will
103 /// have a default timeout applied.
104 pub fn begin_ro_unsync(&self) -> MdbxResult<RoTxUnsync> {
105 RoTxUnsync::begin(self.clone())
106 }
107
108 /// Create a single-threaded read-write transaction for use with the
109 /// environment. This method will block while there are any other read-write
110 /// transactions open on the environment.
111 ///
112 /// The returned Tx is `!Send` and `!Sync`. As a result, it saves about 30%
113 /// overhead on transaction operations compared to the multi-threaded
114 /// version, but cannot be sent or shared between threads.
115 pub fn begin_rw_unsync(&self) -> MdbxResult<RwTxUnsync> {
116 RwTxUnsync::begin(self.clone())
117 }
118
119 /// Returns a raw pointer to the underlying MDBX environment.
120 ///
121 /// The caller **must** ensure that the pointer is never dereferenced after the environment has
122 /// been dropped.
123 #[inline]
124 pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
125 self.inner.env
126 }
127
128 /// Executes the given closure once
129 ///
130 /// This is only intended to be used when accessing mdbx ffi functions directly is required.
131 ///
132 /// The caller **must** ensure that the pointer is only used within the closure.
133 #[inline]
134 #[doc(hidden)]
135 pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
136 where
137 F: FnOnce(*mut ffi::MDBX_env) -> T,
138 {
139 f(self.env_ptr())
140 }
141
142 /// Flush the environment data buffers to disk.
143 pub fn sync(&self, force: bool) -> MdbxResult<bool> {
144 mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
145 }
146
147 /// Retrieves statistics about this environment.
148 pub fn stat(&self) -> MdbxResult<Stat> {
149 unsafe {
150 let mut stat = Stat::new();
151 mdbx_result(ffi::mdbx_env_stat_ex(
152 self.env_ptr(),
153 ptr::null(),
154 stat.mdb_stat(),
155 size_of::<Stat>(),
156 ))?;
157 Ok(stat)
158 }
159 }
160
161 /// Retrieves info about this environment.
162 pub fn info(&self) -> MdbxResult<Info> {
163 unsafe {
164 let mut info = Info(mem::zeroed());
165 mdbx_result(ffi::mdbx_env_info_ex(
166 self.env_ptr(),
167 ptr::null(),
168 &mut info.0,
169 size_of::<Info>(),
170 ))?;
171 Ok(info)
172 }
173 }
174
175 /// Retrieves the total number of pages on the freelist.
176 ///
177 /// Along with [`Environment::info()`], this can be used to calculate the exact number
178 /// of used pages as well as free pages in this environment.
179 ///
180 /// ```
181 /// # use signet_libmdbx::Environment;
182 /// let dir = tempfile::tempdir().unwrap();
183 /// let env = Environment::builder().open(dir.path()).unwrap();
184 /// let info = env.info().unwrap();
185 /// let stat = env.stat().unwrap();
186 /// let freelist = env.freelist().unwrap();
187 /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
188 /// let total_pgs = info.map_size() / stat.page_size() as usize;
189 /// let pgs_in_use = last_pgno - freelist;
190 /// let pgs_free = total_pgs - pgs_in_use;
191 /// ```
192 ///
193 /// Note:
194 ///
195 /// * MDBX stores all the freelists in the designated database 0 in each
196 /// environment, and the freelist count is stored at the beginning of the
197 /// value as `uint32_t` in the native byte order.
198 ///
199 /// * It will create a read transaction to traverse the freelist database.
200 pub fn freelist(&self) -> ReadResult<usize> {
201 let mut freelist: usize = 0;
202 let txn = self.begin_ro_unsync()?;
203 let db = Database::freelist_db();
204 let mut cursor = txn.cursor(db)?;
205 let mut iter = cursor.iter_slices();
206
207 while let Some((_key, value)) = iter.borrow_next()? {
208 if value.len() < size_of::<u32>() {
209 return Err(MdbxError::Corrupted.into());
210 }
211 let s = &value[..size_of::<u32>()];
212 freelist += NativeEndian::read_u32(s) as usize;
213 }
214
215 Ok(freelist)
216 }
217}
218
219/// Container type for Environment internals.
220///
221/// This holds the raw pointer to the MDBX environment and the transaction
222/// manager.
223///
224/// The env is opened via [`mdbx_env_create`](ffi::mdbx_env_create) and closed
225/// when this type drops.
226struct EnvironmentInner {
227 /// The raw pointer to the MDBX environment.
228 ///
229 /// Accessing the environment is thread-safe as long as long as this type exists.
230 env: *mut ffi::MDBX_env,
231 /// Whether the environment was opened as WRITEMAP.
232 env_kind: EnvironmentKind,
233 /// Transaction manager
234 txn_manager: LifecycleHandle,
235}
236
237impl Drop for EnvironmentInner {
238 fn drop(&mut self) {
239 // Close open mdbx environment on drop
240 unsafe {
241 ffi::mdbx_env_close_ex(self.env, false);
242 }
243 }
244}
245
246// SAFETY: internal type, only used inside [Environment]. Accessing the
247// environment pointer is thread-safe
248unsafe impl Send for EnvironmentInner {}
249unsafe impl Sync for EnvironmentInner {}
250
/// Selects how the data of an environment is mapped into memory.
///
/// The choice only takes effect when the environment is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnvironmentKind {
    /// Default mode: the environment is opened without WRITEMAP.
    #[default]
    Default,
    /// The environment is opened as mdbx-WRITEMAP.
    ///
    /// A writeable memory map is used unless the environment is opened as
    /// `MDBX_RDONLY` ([`crate::Mode::ReadOnly`]).
    ///
    /// In the read-write mode ([`crate::Mode::ReadWrite`]) all data is mapped
    /// into memory. This offers a significant performance benefit, since the
    /// data will be modified directly in mapped memory and then flushed to
    /// disk by single system call, without any memory management nor copying.
    ///
    /// This mode is incompatible with nested transactions.
    WriteMap,
}
273
274impl EnvironmentKind {
275 /// Returns true if the environment was opened as WRITEMAP.
276 #[inline]
277 pub const fn is_write_map(&self) -> bool {
278 matches!(self, Self::WriteMap)
279 }
280
281 /// Additional flags required when opening the environment.
282 pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
283 match self {
284 Self::Default => ffi::MDBX_ENV_DEFAULTS,
285 Self::WriteMap => ffi::MDBX_WRITEMAP,
286 }
287 }
288}
289
290#[derive(Copy, Clone, Debug)]
291pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
292unsafe impl Send for EnvPtr {}
293unsafe impl Sync for EnvPtr {}
294
295/// Environment statistics.
296///
297/// Contains information about the size and layout of an MDBX environment or
298/// database.
299#[derive(Debug, Clone, Copy)]
300#[repr(transparent)]
301pub struct Stat(ffi::MDBX_stat);
302
303impl Stat {
304 /// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
305 pub(crate) const fn new() -> Self {
306 unsafe { Self(mem::zeroed()) }
307 }
308
309 /// Returns a mut pointer to `ffi::MDB_stat`.
310 pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
311 &mut self.0
312 }
313}
314
315impl Stat {
316 /// Size of a database page. This is the same for all databases in the
317 /// environment.
318 #[inline]
319 pub const fn page_size(&self) -> u32 {
320 self.0.ms_psize
321 }
322
323 /// Depth (height) of the B-tree.
324 #[inline]
325 pub const fn depth(&self) -> u32 {
326 self.0.ms_depth
327 }
328
329 /// Number of internal (non-leaf) pages.
330 #[inline]
331 pub const fn branch_pages(&self) -> usize {
332 self.0.ms_branch_pages as usize
333 }
334
335 /// Number of leaf pages.
336 #[inline]
337 pub const fn leaf_pages(&self) -> usize {
338 self.0.ms_leaf_pages as usize
339 }
340
341 /// Number of overflow pages.
342 #[inline]
343 pub const fn overflow_pages(&self) -> usize {
344 self.0.ms_overflow_pages as usize
345 }
346
347 /// Number of data items.
348 #[inline]
349 pub const fn entries(&self) -> usize {
350 self.0.ms_entries as usize
351 }
352}
353
354#[derive(Debug, Clone, Copy)]
355#[repr(transparent)]
356pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
357
358impl GeometryInfo {
359 /// Minimum geometry setting of the environment.
360 pub const fn min(&self) -> u64 {
361 self.0.lower
362 }
363}
364
365/// Environment information.
366///
367/// Contains environment information about the map size, readers, last txn id
368/// etc.
369#[derive(Debug)]
370#[repr(transparent)]
371pub struct Info(ffi::MDBX_envinfo);
372
373impl Info {
374 /// Geometry settings of the environment.
375 pub const fn geometry(&self) -> GeometryInfo {
376 GeometryInfo(self.0.mi_geo)
377 }
378
379 /// Size of memory map.
380 #[inline]
381 pub const fn map_size(&self) -> usize {
382 self.0.mi_mapsize as usize
383 }
384
385 /// Last used page number
386 #[inline]
387 pub const fn last_pgno(&self) -> usize {
388 self.0.mi_last_pgno as usize
389 }
390
391 /// Last transaction ID
392 #[inline]
393 pub const fn last_txnid(&self) -> usize {
394 self.0.mi_recent_txnid as usize
395 }
396
397 /// Max reader slots in the environment
398 #[inline]
399 pub const fn max_readers(&self) -> usize {
400 self.0.mi_maxreaders as usize
401 }
402
403 /// Max reader slots used in the environment
404 #[inline]
405 pub const fn num_readers(&self) -> usize {
406 self.0.mi_numreaders as usize
407 }
408
409 /// Return the internal page ops metrics
410 #[inline]
411 pub const fn page_ops(&self) -> PageOps {
412 PageOps {
413 newly: self.0.mi_pgop_stat.newly,
414 cow: self.0.mi_pgop_stat.cow,
415 clone: self.0.mi_pgop_stat.clone,
416 split: self.0.mi_pgop_stat.split,
417 merge: self.0.mi_pgop_stat.merge,
418 spill: self.0.mi_pgop_stat.spill,
419 unspill: self.0.mi_pgop_stat.unspill,
420 wops: self.0.mi_pgop_stat.wops,
421 prefault: self.0.mi_pgop_stat.prefault,
422 mincore: self.0.mi_pgop_stat.mincore,
423 msync: self.0.mi_pgop_stat.msync,
424 fsync: self.0.mi_pgop_stat.fsync,
425 }
426 }
427
428 /// Return the mode of the database
429 #[inline]
430 pub const fn mode(&self) -> Mode {
431 let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
432 if (mode & ffi::MDBX_RDONLY) != 0 {
433 Mode::ReadOnly
434 } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
435 Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
436 } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
437 Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
438 } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
439 Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
440 } else {
441 Mode::ReadWrite { sync_mode: SyncMode::Durable }
442 }
443 }
444}
445
446impl fmt::Debug for Environment {
447 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448 f.debug_struct("Environment").field("kind", &self.inner.env_kind).finish_non_exhaustive()
449 }
450}
451
452///////////////////////////////////////////////////////////////////////////////////////////////////
453// Environment Builder
454///////////////////////////////////////////////////////////////////////////////////////////////////
455
/// Page size setting for the database environment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageSize {
    /// Use the minimal acceptable page size.
    ///
    /// When the environment is opened this is passed to
    /// `mdbx_env_set_geometry` as a page size of `0`.
    MinimalAcceptable,
    /// Use the specified page size in bytes.
    Set(usize),
}
464
/// Page-operation statistics, accumulated over all transactions (running,
/// completed and aborted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageOps {
    /// Number of new pages added.
    pub newly: u64,
    /// Number of pages copied for update.
    pub cow: u64,
    /// Number of clones of a parent's dirty pages made for nested
    /// transactions.
    pub clone: u64,
    /// Number of page splits.
    pub split: u64,
    /// Number of page merges.
    pub merge: u64,
    /// Number of spilled dirty pages.
    pub spill: u64,
    /// Number of unspilled/reloaded pages.
    pub unspill: u64,
    /// Number of explicit write operations (not pages) to disk.
    pub wops: u64,
    /// Number of explicit msync/flush-to-disk operations.
    pub msync: u64,
    /// Number of explicit fsync/flush-to-disk operations.
    pub fsync: u64,
    /// Number of prefault write operations.
    pub prefault: u64,
    /// Number of `mincore()` calls.
    pub mincore: u64,
}
494
495/// Represents the geometry settings for the database environment
496#[derive(Clone, Debug, PartialEq, Eq)]
497pub struct Geometry<R> {
498 /// The size range in bytes.
499 pub size: Option<R>,
500 /// The growth step in bytes.
501 pub growth_step: Option<isize>,
502 /// The shrink threshold in bytes.
503 pub shrink_threshold: Option<isize>,
504 /// The page size.
505 pub page_size: Option<PageSize>,
506}
507
508impl<R> Default for Geometry<R> {
509 fn default() -> Self {
510 Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
511 }
512}
513
514/// Handle-Slow-Readers callback function to resolve database full/overflow
515/// issue due to a reader(s) which prevents the old data from being recycled.
516///
517/// Read transactions prevent reuse of pages freed by newer write transactions,
518/// thus the database can grow quickly. This callback will be called when there
519/// is not enough space in the database (i.e. before increasing the database
520/// size or before `MDBX_MAP_FULL` error) and thus can be used to resolve
521/// issues with a "long-lived" read transactions.
522///
523/// Depending on the arguments and needs, your implementation may wait,
524/// terminate a process or thread that is performing a long read, or perform
525/// some other action. In doing so it is important that the returned code always
526/// corresponds to the performed action.
527///
528/// # Arguments
529///
530/// * `process_id` – A process id of the reader process.
531/// * `thread_id` – A thread id of the reader thread.
532/// * `read_txn_id` – An oldest read transaction number on which stalled.
533/// * `gap` – A lag from the last committed txn.
534/// * `space` – A space that actually become available for reuse after this
535/// reader finished. The callback function can take this value into account
536/// to evaluate the impact that a long-running
537/// transaction has.
538/// * `retry` – A retry number starting from 0. If callback has returned 0 at
539/// least once, then at end of current handling loop the callback function
540/// will be called additionally with negative `retry` value to notify about
541/// the end of loop. The callback function can use this fact to implement
542/// timeout reset logic while waiting for a readers.
543///
544/// # Returns
545///
546/// A return code that determines the further actions for MDBX and must match
547/// the action which was executed by the callback:
548///
549/// * `-2` or less – An error condition and the reader was not killed.
550/// * `-1` – The callback was unable to solve the problem and agreed on
551/// `MDBX_MAP_FULL` error; MDBX should increase the database size or return
552/// `MDBX_MAP_FULL` error.
553/// * `0` – The callback solved the problem or just waited for a while, libmdbx
554/// should rescan the reader lock table and retry. This also includes a
555/// situation when corresponding transaction terminated in normal way by
556/// `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted. I.e.
557/// reader slot isn't needed to be cleaned from transaction.
558/// * `1` – Transaction aborted asynchronous and reader slot should be cleared
559/// immediately, i.e. read transaction will not continue but
560/// `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called later.
561/// * `>=2` – The reader process was terminated or killed, and MDBX should
562/// entirely reset reader registration.
563pub type HandleSlowReadersCallback = extern "C" fn(
564 env: *const ffi::MDBX_env,
565 txn: *const ffi::MDBX_txn,
566 pid: ffi::mdbx_pid_t,
567 tid: ffi::mdbx_tid_t,
568 laggard: u64,
569 gap: std::ffi::c_uint,
570 space: usize,
571 retry: std::ffi::c_int,
572) -> HandleSlowReadersReturnCode;
573
/// Return codes of the Handle-Slow-Readers callback function.
///
/// The enum is `#[repr(i32)]`, and each variant's discriminant is the integer
/// code handed back to MDBX.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
    /// An error condition and the reader was not killed.
    Error = -2,
    /// The callback was unable to solve the problem and agreed on
    /// `MDBX_MAP_FULL` error; MDBX should increase the database size or
    /// return `MDBX_MAP_FULL` error.
    ProceedWithoutKillingReader = -1,
    /// The callback solved the problem or just waited for a while, libmdbx
    /// should rescan the reader lock table and retry. This also includes a
    /// situation when the corresponding transaction terminated in the normal
    /// way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
    /// I.e. the reader slot does not need to be cleaned from the transaction.
    Success = 0,
    /// The transaction was aborted asynchronously and the reader slot should
    /// be cleared immediately, i.e. the read transaction will not continue
    /// but neither `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called
    /// later.
    ClearReaderSlot = 1,
    /// The reader process was terminated or killed, and MDBX should entirely
    /// reset reader registration.
    ReaderProcessTerminated = 2,
}
598
599/// Options for opening or creating an environment.
600#[derive(Debug, Clone)]
601pub struct EnvironmentBuilder {
602 flags: EnvironmentFlags,
603 max_readers: Option<u64>,
604 max_dbs: Option<u64>,
605 sync_bytes: Option<u64>,
606 sync_period: Option<u64>,
607 rp_augment_limit: Option<u64>,
608 loose_limit: Option<u64>,
609 dp_reserve_limit: Option<u64>,
610 txn_dp_limit: Option<u64>,
611 spill_max_denominator: Option<u64>,
612 spill_min_denominator: Option<u64>,
613 geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
614 log_level: Option<ffi::MDBX_log_level_t>,
615 kind: EnvironmentKind,
616 handle_slow_readers: Option<HandleSlowReadersCallback>,
617}
618
619impl EnvironmentBuilder {
620 /// Open an environment.
621 ///
622 /// Database files will be opened with 644 permissions.
623 pub fn open(&self, path: &Path) -> MdbxResult<Environment> {
624 self.open_with_permissions(path, 0o644)
625 }
626
627 /// Open an environment with the provided UNIX permissions.
628 ///
629 /// The path may not contain the null character.
630 pub fn open_with_permissions(
631 &self,
632 path: &Path,
633 mode: ffi::mdbx_mode_t,
634 ) -> MdbxResult<Environment> {
635 let mut env: *mut ffi::MDBX_env = ptr::null_mut();
636 unsafe {
637 if let Some(log_level) = self.log_level {
638 // Returns the previously debug_flags in the 0-15 bits and log_level in the
639 // 16-31 bits, no need to use `mdbx_result`.
640 ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
641 }
642
643 mdbx_result(ffi::mdbx_env_create(&mut env))?;
644
645 if let Err(e) = (|| {
646 if let Some(geometry) = &self.geometry {
647 let mut min_size = -1;
648 let mut max_size = -1;
649
650 if let Some(size) = geometry.size {
651 if let Some(size) = size.0 {
652 min_size = size as isize;
653 }
654
655 if let Some(size) = size.1 {
656 max_size = size as isize;
657 }
658 }
659
660 mdbx_result(ffi::mdbx_env_set_geometry(
661 env,
662 min_size,
663 -1,
664 max_size,
665 geometry.growth_step.unwrap_or(-1),
666 geometry.shrink_threshold.unwrap_or(-1),
667 match geometry.page_size {
668 None => -1,
669 Some(PageSize::MinimalAcceptable) => 0,
670 Some(PageSize::Set(size)) => size as isize,
671 },
672 ))?;
673 }
674 for (opt, v) in [
675 (ffi::MDBX_opt_max_db, self.max_dbs),
676 (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
677 (ffi::MDBX_opt_loose_limit, self.loose_limit),
678 (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
679 (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
680 (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
681 (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
682 ] {
683 if let Some(v) = v {
684 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
685 }
686 }
687
688 // set max readers if specified
689 if let Some(max_readers) = self.max_readers {
690 mdbx_result(ffi::mdbx_env_set_option(
691 env,
692 ffi::MDBX_opt_max_readers,
693 max_readers,
694 ))?;
695 }
696
697 if let Some(handle_slow_readers) = self.handle_slow_readers {
698 mdbx_result(ffi::mdbx_env_set_hsr(
699 env,
700 convert_hsr_fn(Some(handle_slow_readers)),
701 ))?;
702 }
703
704 #[cfg(unix)]
705 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
706 use std::os::unix::ffi::OsStrExt;
707 path.as_ref().as_os_str().as_bytes().to_vec()
708 }
709
710 #[cfg(windows)]
711 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
712 // On Windows, could use std::os::windows::ffi::OsStrExt to encode_wide(),
713 // but we end up with a Vec<u16> instead of a Vec<u8>, so that doesn't
714 // really help.
715 path.as_ref().to_string_lossy().to_string().into_bytes()
716 }
717
718 let path = match CString::new(path_to_bytes(path)) {
719 Ok(path) => path,
720 Err(_) => return Err(MdbxError::Invalid),
721 };
722 mdbx_result(ffi::mdbx_env_open(
723 env,
724 path.as_ptr(),
725 self.flags.make_flags() | self.kind.extra_flags(),
726 mode,
727 ))?;
728
729 for (opt, v) in [
730 (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
731 (ffi::MDBX_opt_sync_period, self.sync_period),
732 ] {
733 if let Some(v) = v {
734 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
735 }
736 }
737
738 Ok(())
739 })() {
740 ffi::mdbx_env_close_ex(env, false);
741
742 return Err(e);
743 }
744 }
745
746 let env_ptr = EnvPtr(env);
747
748 let txn_manager = RwSyncLifecycle::spawn(env_ptr);
749
750 let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
751
752 Ok(Environment { inner: Arc::new(env) })
753 }
754
755 /// Configures how this environment will be opened.
756 pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
757 self.kind = kind;
758 self
759 }
760
761 /// Opens the environment with mdbx WRITEMAP
762 ///
763 /// See also [`EnvironmentKind`]
764 pub const fn write_map(&mut self) -> &mut Self {
765 self.set_kind(EnvironmentKind::WriteMap)
766 }
767
768 /// Sets the provided options in the environment.
769 pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
770 self.flags = flags;
771 self
772 }
773
774 /// Sets the maximum number of threads or reader slots for the environment.
775 ///
776 /// This defines the number of slots in the lock table that is used to
777 /// track readers in the environment. The default is 126. Starting a
778 /// read-only transaction normally ties a lock table slot to [`Tx`] object
779 /// until it or the [`Environment`] object is destroyed.
780 ///
781 /// [`Tx`]: crate::tx::Tx
782 pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
783 self.max_readers = Some(max_readers);
784 self
785 }
786
787 /// Sets the maximum number of named databases for the environment.
788 ///
789 /// This function is only needed if multiple databases will be used in the
790 /// environment. Simpler applications that use the environment as a single
791 /// unnamed database can ignore this option.
792 ///
793 /// Currently a moderate number of slots are cheap but a huge number gets
794 /// expensive: 7-120 words per transaction, and every call to
795 /// [`Tx::open_db()`] or [`Tx::open_db_no_cache()`] does a linear
796 ///
797 /// [`Tx::open_db()`]: crate::tx::Tx::open_db
798 /// [`Tx::open_db_no_cache()`]: crate::tx::Tx::open_db_no_cache
799 /// search of the opened slots.
800 pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
801 self.max_dbs = Some(v as u64);
802 self
803 }
804
805 /// Sets the interprocess/shared threshold to force flush the data buffers
806 /// to disk, if [`SyncMode::SafeNoSync`] is used.
807 pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
808 self.sync_bytes = Some(v as u64);
809 self
810 }
811
812 /// Sets the interprocess/shared relative period since the last unsteady commit to force flush
813 /// the data buffers to disk, if [`SyncMode::SafeNoSync`] is used.
814 pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
815 // For this option, mdbx uses units of 1/65536 of a second.
816 let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
817 self.sync_period = Some(as_mdbx_units);
818 self
819 }
820
821 /// Sets the relative limit of the reader's page-table augmentation.
822 pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
823 self.rp_augment_limit = Some(v);
824 self
825 }
826
827 /// Sets the relative loose limit of the environment.
828 pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
829 self.loose_limit = Some(v);
830 self
831 }
832
833 /// Sets the relative dirty-page reserve limit of the environment.
834 pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
835 self.dp_reserve_limit = Some(v);
836 self
837 }
838
839 /// Sets the relative dirty-page limit per transaction.
840 pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
841 self.txn_dp_limit = Some(v);
842 self
843 }
844
845 /// Sets the spill maximum denominator.
846 pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
847 self.spill_max_denominator = Some(v.into());
848 self
849 }
850
851 /// Sets the spill minimum denominator.
852 pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
853 self.spill_min_denominator = Some(v.into());
854 self
855 }
856
857 /// Set all size-related parameters of environment, including page size and the min/max size of
858 /// the memory map.
859 pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
860 let convert_bound = |bound: Bound<&usize>| match bound {
861 Bound::Included(v) | Bound::Excluded(v) => Some(*v),
862 _ => None,
863 };
864 self.geometry = Some(Geometry {
865 size: geometry.size.map(|range| {
866 (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
867 }),
868 growth_step: geometry.growth_step,
869 shrink_threshold: geometry.shrink_threshold,
870 page_size: geometry.page_size,
871 });
872 self
873 }
874
875 /// Set the logging level for the environment.
876 pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
877 self.log_level = Some(log_level);
878 self
879 }
880
881 /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
882 /// information.
883 pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
884 self.handle_slow_readers = Some(hsr);
885 self
886 }
887}
888
889/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
890fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
891 unsafe { std::mem::transmute(callback) }
892}
893
#[cfg(test)]
mod tests {
    use crate::{
        Environment, Geometry, MdbxError, WriteFlags,
        sys::{HandleSlowReadersReturnCode, PageSize},
    };
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Checks that the Handle-Slow-Readers callback registered through the
    /// builder is invoked when a write transaction runs out of space while a
    /// read transaction is still open.
    #[test]
    fn test_handle_slow_readers_callback() {
        /// Number of rows written before the read transaction is opened.
        const SEED_ROWS: usize = 1_000;

        /// Set by the callback so the test can observe that it ran.
        static HSR_CALLED: AtomicBool = AtomicBool::new(false);

        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            HSR_CALLED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        let tempdir = tempfile::tempdir().unwrap();
        let geometry = Geometry::<RangeInclusive<usize>> {
            size: Some(0..=1024 * 1024), // Max 1MB, so we can hit the limit
            page_size: Some(PageSize::MinimalAcceptable), // To create as many pages as possible
            ..Default::default()
        };
        let env = Environment::builder()
            .set_geometry(geometry)
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Insert some data in the database, so the read transaction can lock on the snapshot of it
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for key in 0..SEED_ROWS {
                tx.put(db, key.to_le_bytes(), b"0", WriteFlags::empty()).unwrap();
            }
            tx.commit().unwrap();
        }

        // Create a read transaction
        let _tx_ro = env.begin_ro_sync().unwrap();
        // Change previously inserted data, so the read transaction would use the previous snapshot
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for key in 0..SEED_ROWS {
                tx.put(db, key.to_le_bytes(), b"1", WriteFlags::empty()).unwrap();
            }
            tx.commit().unwrap();
        }

        // Insert more data in the database, so we hit the DB size limit error, and MDBX tries to
        // kick long-lived readers and delete their snapshots
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for key in SEED_ROWS..1_000_000 {
                let outcome = tx.put(db, key.to_le_bytes(), b"0", WriteFlags::empty());
                // Hitting the size limit is the expected way out of the loop;
                // any other error fails the test.
                if matches!(outcome, Err(MdbxError::MapFull)) {
                    break;
                }
                outcome.unwrap();
            }
            // The transaction may be in an error state after hitting MapFull,
            // so commit could fail. We don't care about the result here since
            // the purpose of this test is to verify the HSR callback was called.
            let _ = tx.commit();
        }

        // Expect the HSR to be called
        assert!(HSR_CALLED.load(Ordering::Relaxed));
    }
}