Skip to main content

signet_libmdbx/sys/
environment.rs

1use crate::{
2    Database, Mode, SyncMode,
3    error::{MdbxError, MdbxResult, ReadResult, mdbx_result},
4    flags::EnvironmentFlags,
5    sys::txn_manager::{LifecycleHandle, RwSyncLifecycle},
6    tx::aliases::{RoTxSync, RoTxUnsync, RwTxSync, RwTxUnsync},
7};
8use byteorder::{ByteOrder, NativeEndian};
9use mem::size_of;
10use std::{
11    ffi::CString,
12    fmt::{self, Debug},
13    mem,
14    ops::{Bound, RangeBounds},
15    path::Path,
16    ptr,
17    sync::Arc,
18    time::Duration,
19};
20
21/// An environment supports multiple databases, all residing in the same shared-memory map.
22///
23/// Accessing the environment is thread-safe.
24/// The environment will be closed when the last instance of this type is dropped.
25#[derive(Clone)]
26pub struct Environment {
27    inner: Arc<EnvironmentInner>,
28}
29
30impl Environment {
31    /// Creates a new builder for specifying options for opening an MDBX environment.
32    pub fn builder() -> EnvironmentBuilder {
33        EnvironmentBuilder {
34            flags: EnvironmentFlags::default(),
35            max_readers: None,
36            max_dbs: None,
37            sync_bytes: None,
38            sync_period: None,
39            rp_augment_limit: None,
40            loose_limit: None,
41            dp_reserve_limit: None,
42            txn_dp_limit: None,
43            spill_max_denominator: None,
44            spill_min_denominator: None,
45            geometry: None,
46            log_level: None,
47            kind: Default::default(),
48            handle_slow_readers: None,
49        }
50    }
51
52    /// Returns true if the environment was opened as WRITEMAP.
53    #[inline]
54    pub fn is_write_map(&self) -> bool {
55        self.inner.env_kind.is_write_map()
56    }
57
58    /// Returns the kind of the environment.
59    #[inline]
60    pub fn env_kind(&self) -> EnvironmentKind {
61        self.inner.env_kind
62    }
63
64    /// Returns true if the environment was opened in [`crate::Mode::ReadWrite`] mode.
65    #[inline]
66    pub fn is_read_write(&self) -> MdbxResult<bool> {
67        Ok(!self.is_read_only()?)
68    }
69
70    /// Returns true if the environment was opened in [`crate::Mode::ReadOnly`] mode.
71    #[inline]
72    pub fn is_read_only(&self) -> MdbxResult<bool> {
73        Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
74    }
75
76    /// Returns the transaction manager.
77    #[inline]
78    pub(crate) fn txn_manager(&self) -> &LifecycleHandle {
79        &self.inner.txn_manager
80    }
81
82    /// Create a read-only transaction for use with the environment.
83    #[inline]
84    pub fn begin_ro_sync(&self) -> MdbxResult<RoTxSync> {
85        RoTxSync::begin(self.clone())
86    }
87
88    /// Create a read-write transaction for use with the environment. This
89    /// method will block while there are any other read-write transactions
90    /// open on the environment.
91    pub fn begin_rw_sync(&self) -> MdbxResult<RwTxSync> {
92        RwTxSync::begin(self.clone())
93    }
94
95    /// Create a single-threaded read-only transaction for use with the
96    /// environment.
97    ///
98    /// The returned Tx is `!Sync`. As a result, it saves about 30% overhead on
99    /// transaction operations compared to the multi-threaded version, but
100    /// cannot be sent or shared between threads.
101    ///
102    /// If the `read-tx-timeouts` feature is enabled, the transaction will
103    /// have a default timeout applied.
104    pub fn begin_ro_unsync(&self) -> MdbxResult<RoTxUnsync> {
105        RoTxUnsync::begin(self.clone())
106    }
107
108    /// Create a single-threaded read-write transaction for use with the
109    /// environment. This method will block while there are any other read-write
110    /// transactions open on the environment.
111    ///
112    /// The returned Tx is `!Send` and `!Sync`. As a result, it saves about 30%
113    /// overhead on transaction operations compared to the multi-threaded
114    /// version, but cannot be sent or shared between threads.
115    pub fn begin_rw_unsync(&self) -> MdbxResult<RwTxUnsync> {
116        RwTxUnsync::begin(self.clone())
117    }
118
119    /// Returns a raw pointer to the underlying MDBX environment.
120    ///
121    /// The caller **must** ensure that the pointer is never dereferenced after the environment has
122    /// been dropped.
123    #[inline]
124    pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
125        self.inner.env
126    }
127
128    /// Executes the given closure once
129    ///
130    /// This is only intended to be used when accessing mdbx ffi functions directly is required.
131    ///
132    /// The caller **must** ensure that the pointer is only used within the closure.
133    #[inline]
134    #[doc(hidden)]
135    pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
136    where
137        F: FnOnce(*mut ffi::MDBX_env) -> T,
138    {
139        f(self.env_ptr())
140    }
141
142    /// Flush the environment data buffers to disk.
143    pub fn sync(&self, force: bool) -> MdbxResult<bool> {
144        mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
145    }
146
147    /// Retrieves statistics about this environment.
148    pub fn stat(&self) -> MdbxResult<Stat> {
149        unsafe {
150            let mut stat = Stat::new();
151            mdbx_result(ffi::mdbx_env_stat_ex(
152                self.env_ptr(),
153                ptr::null(),
154                stat.mdb_stat(),
155                size_of::<Stat>(),
156            ))?;
157            Ok(stat)
158        }
159    }
160
161    /// Retrieves info about this environment.
162    pub fn info(&self) -> MdbxResult<Info> {
163        unsafe {
164            let mut info = Info(mem::zeroed());
165            mdbx_result(ffi::mdbx_env_info_ex(
166                self.env_ptr(),
167                ptr::null(),
168                &mut info.0,
169                size_of::<Info>(),
170            ))?;
171            Ok(info)
172        }
173    }
174
175    /// Retrieves the total number of pages on the freelist.
176    ///
177    /// Along with [`Environment::info()`], this can be used to calculate the exact number
178    /// of used pages as well as free pages in this environment.
179    ///
180    /// ```
181    /// # use signet_libmdbx::Environment;
182    /// let dir = tempfile::tempdir().unwrap();
183    /// let env = Environment::builder().open(dir.path()).unwrap();
184    /// let info = env.info().unwrap();
185    /// let stat = env.stat().unwrap();
186    /// let freelist = env.freelist().unwrap();
187    /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
188    /// let total_pgs = info.map_size() / stat.page_size() as usize;
189    /// let pgs_in_use = last_pgno - freelist;
190    /// let pgs_free = total_pgs - pgs_in_use;
191    /// ```
192    ///
193    /// Note:
194    ///
195    /// * MDBX stores all the freelists in the designated database 0 in each
196    ///   environment, and the freelist count is stored at the beginning of the
197    ///   value as `uint32_t` in the native byte order.
198    ///
199    /// * It will create a read transaction to traverse the freelist database.
200    pub fn freelist(&self) -> ReadResult<usize> {
201        let mut freelist: usize = 0;
202        let txn = self.begin_ro_unsync()?;
203        let db = Database::freelist_db();
204        let mut cursor = txn.cursor(db)?;
205        let mut iter = cursor.iter_slices();
206
207        while let Some((_key, value)) = iter.borrow_next()? {
208            if value.len() < size_of::<u32>() {
209                return Err(MdbxError::Corrupted.into());
210            }
211            let s = &value[..size_of::<u32>()];
212            freelist += NativeEndian::read_u32(s) as usize;
213        }
214
215        Ok(freelist)
216    }
217}
218
219/// Container type for Environment internals.
220///
221/// This holds the raw pointer to the MDBX environment and the transaction
222/// manager.
223///
224/// The env is opened via [`mdbx_env_create`](ffi::mdbx_env_create) and closed
225/// when this type drops.
226struct EnvironmentInner {
227    /// The raw pointer to the MDBX environment.
228    ///
229    /// Accessing the environment is thread-safe as long as long as this type exists.
230    env: *mut ffi::MDBX_env,
231    /// Whether the environment was opened as WRITEMAP.
232    env_kind: EnvironmentKind,
233    /// Transaction manager
234    txn_manager: LifecycleHandle,
235}
236
237impl Drop for EnvironmentInner {
238    fn drop(&mut self) {
239        // Close open mdbx environment on drop
240        unsafe {
241            ffi::mdbx_env_close_ex(self.env, false);
242        }
243    }
244}
245
246// SAFETY: internal type, only used inside [Environment]. Accessing the
247// environment pointer is thread-safe
248unsafe impl Send for EnvironmentInner {}
249unsafe impl Sync for EnvironmentInner {}
250
/// Selects how the environment's data is mapped into memory.
///
/// The choice only has an effect at the moment the environment is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnvironmentKind {
    /// Open the environment without `MDBX_WRITEMAP`. This is the default.
    #[default]
    Default,
    /// Open the environment with `MDBX_WRITEMAP`.
    ///
    /// A writeable memory map is used, unless the environment is opened as
    /// `MDBX_RDONLY` ([`crate::Mode::ReadOnly`]).
    ///
    /// In [`crate::Mode::ReadWrite`] mode, all data is mapped into memory
    /// read-write. Modifications then happen directly in the mapped memory and
    /// are flushed to disk with a single system call, without extra memory
    /// management or copying, which is a significant performance benefit.
    ///
    /// This mode cannot be combined with nested transactions.
    WriteMap,
}
273
274impl EnvironmentKind {
275    /// Returns true if the environment was opened as WRITEMAP.
276    #[inline]
277    pub const fn is_write_map(&self) -> bool {
278        matches!(self, Self::WriteMap)
279    }
280
281    /// Additional flags required when opening the environment.
282    pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
283        match self {
284            Self::Default => ffi::MDBX_ENV_DEFAULTS,
285            Self::WriteMap => ffi::MDBX_WRITEMAP,
286        }
287    }
288}
289
290#[derive(Copy, Clone, Debug)]
291pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
292unsafe impl Send for EnvPtr {}
293unsafe impl Sync for EnvPtr {}
294
295/// Environment statistics.
296///
297/// Contains information about the size and layout of an MDBX environment or
298/// database.
299#[derive(Debug, Clone, Copy)]
300#[repr(transparent)]
301pub struct Stat(ffi::MDBX_stat);
302
303impl Stat {
304    /// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
305    pub(crate) const fn new() -> Self {
306        unsafe { Self(mem::zeroed()) }
307    }
308
309    /// Returns a mut pointer to `ffi::MDB_stat`.
310    pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
311        &mut self.0
312    }
313}
314
315impl Stat {
316    /// Size of a database page. This is the same for all databases in the
317    /// environment.
318    #[inline]
319    pub const fn page_size(&self) -> u32 {
320        self.0.ms_psize
321    }
322
323    /// Depth (height) of the B-tree.
324    #[inline]
325    pub const fn depth(&self) -> u32 {
326        self.0.ms_depth
327    }
328
329    /// Number of internal (non-leaf) pages.
330    #[inline]
331    pub const fn branch_pages(&self) -> usize {
332        self.0.ms_branch_pages as usize
333    }
334
335    /// Number of leaf pages.
336    #[inline]
337    pub const fn leaf_pages(&self) -> usize {
338        self.0.ms_leaf_pages as usize
339    }
340
341    /// Number of overflow pages.
342    #[inline]
343    pub const fn overflow_pages(&self) -> usize {
344        self.0.ms_overflow_pages as usize
345    }
346
347    /// Number of data items.
348    #[inline]
349    pub const fn entries(&self) -> usize {
350        self.0.ms_entries as usize
351    }
352}
353
354#[derive(Debug, Clone, Copy)]
355#[repr(transparent)]
356pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
357
358impl GeometryInfo {
359    /// Minimum geometry setting of the environment.
360    pub const fn min(&self) -> u64 {
361        self.0.lower
362    }
363}
364
365/// Environment information.
366///
367/// Contains environment information about the map size, readers, last txn id
368/// etc.
369#[derive(Debug)]
370#[repr(transparent)]
371pub struct Info(ffi::MDBX_envinfo);
372
373impl Info {
374    /// Geometry settings of the environment.
375    pub const fn geometry(&self) -> GeometryInfo {
376        GeometryInfo(self.0.mi_geo)
377    }
378
379    /// Size of memory map.
380    #[inline]
381    pub const fn map_size(&self) -> usize {
382        self.0.mi_mapsize as usize
383    }
384
385    /// Last used page number
386    #[inline]
387    pub const fn last_pgno(&self) -> usize {
388        self.0.mi_last_pgno as usize
389    }
390
391    /// Last transaction ID
392    #[inline]
393    pub const fn last_txnid(&self) -> usize {
394        self.0.mi_recent_txnid as usize
395    }
396
397    /// Max reader slots in the environment
398    #[inline]
399    pub const fn max_readers(&self) -> usize {
400        self.0.mi_maxreaders as usize
401    }
402
403    /// Max reader slots used in the environment
404    #[inline]
405    pub const fn num_readers(&self) -> usize {
406        self.0.mi_numreaders as usize
407    }
408
409    /// Return the internal page ops metrics
410    #[inline]
411    pub const fn page_ops(&self) -> PageOps {
412        PageOps {
413            newly: self.0.mi_pgop_stat.newly,
414            cow: self.0.mi_pgop_stat.cow,
415            clone: self.0.mi_pgop_stat.clone,
416            split: self.0.mi_pgop_stat.split,
417            merge: self.0.mi_pgop_stat.merge,
418            spill: self.0.mi_pgop_stat.spill,
419            unspill: self.0.mi_pgop_stat.unspill,
420            wops: self.0.mi_pgop_stat.wops,
421            prefault: self.0.mi_pgop_stat.prefault,
422            mincore: self.0.mi_pgop_stat.mincore,
423            msync: self.0.mi_pgop_stat.msync,
424            fsync: self.0.mi_pgop_stat.fsync,
425        }
426    }
427
428    /// Return the mode of the database
429    #[inline]
430    pub const fn mode(&self) -> Mode {
431        let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
432        if (mode & ffi::MDBX_RDONLY) != 0 {
433            Mode::ReadOnly
434        } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
435            Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
436        } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
437            Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
438        } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
439            Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
440        } else {
441            Mode::ReadWrite { sync_mode: SyncMode::Durable }
442        }
443    }
444}
445
446impl fmt::Debug for Environment {
447    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448        f.debug_struct("Environment").field("kind", &self.inner.env_kind).finish_non_exhaustive()
449    }
450}
451
452///////////////////////////////////////////////////////////////////////////////////////////////////
453// Environment Builder
454///////////////////////////////////////////////////////////////////////////////////////////////////
455
/// How the page size of the database environment is chosen.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageSize {
    /// Let the page size be determined automatically from the underlying
    /// system.
    MinimalAcceptable,
    /// Use exactly this page size, in bytes.
    Set(usize),
}
464
/// Page-operation counters accumulated over all transactions: running,
/// committed and aborted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageOps {
    /// Number of newly allocated pages.
    pub newly: u64,
    /// Number of pages copied in order to be updated (copy-on-write).
    pub cow: u64,
    /// Number of parent dirty pages cloned for nested transactions.
    pub clone: u64,
    /// Number of page splits.
    pub split: u64,
    /// Number of page merges.
    pub merge: u64,
    /// Number of dirty pages spilled.
    pub spill: u64,
    /// Number of pages unspilled / reloaded.
    pub unspill: u64,
    /// Number of explicit write operations to disk (operations, not pages).
    pub wops: u64,
    /// Number of explicit msync / flush-to-disk operations.
    pub msync: u64,
    /// Number of explicit fsync / flush-to-disk operations.
    pub fsync: u64,
    /// Number of prefault write operations.
    pub prefault: u64,
    /// Number of `mincore()` calls.
    pub mincore: u64,
}
494
495/// Represents the geometry settings for the database environment
496#[derive(Clone, Debug, PartialEq, Eq)]
497pub struct Geometry<R> {
498    /// The size range in bytes.
499    pub size: Option<R>,
500    /// The growth step in bytes.
501    pub growth_step: Option<isize>,
502    /// The shrink threshold in bytes.
503    pub shrink_threshold: Option<isize>,
504    /// The page size.
505    pub page_size: Option<PageSize>,
506}
507
508impl<R> Default for Geometry<R> {
509    fn default() -> Self {
510        Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
511    }
512}
513
514/// Handle-Slow-Readers callback function to resolve database full/overflow
515/// issue due to a reader(s) which prevents the old data from being recycled.
516///
517/// Read transactions prevent reuse of pages freed by newer write transactions,
518/// thus the database can grow quickly. This callback will be called when there
519/// is not enough  space in the database (i.e. before increasing the database
520/// size or before `MDBX_MAP_FULL` error) and thus can be used to resolve
521/// issues with a "long-lived" read transactions.
522///
523/// Depending on the arguments and needs, your implementation may wait,
524/// terminate a process or thread that is performing a long read, or perform
525/// some other action. In doing so it is important that the returned code always
526/// corresponds to the performed action.
527///
528/// # Arguments
529///
530/// * `process_id` – A process id of the reader process.
531/// * `thread_id` – A thread id of the reader thread.
532/// * `read_txn_id` – An oldest read transaction number on which stalled.
533/// * `gap` – A lag from the last committed txn.
534/// * `space` – A space that actually become available for reuse after this
535///   reader finished. The callback function can take this value into account
536///   to evaluate the impact that a long-running
537///   transaction has.
538/// * `retry` – A retry number starting from 0. If callback has returned 0 at
539///   least once, then at end of current handling loop the callback function
540///   will be called additionally with negative `retry` value to notify about
541///   the end of loop. The callback function can use this fact to implement
542///   timeout reset logic while waiting for a readers.
543///
544/// # Returns
545///
546/// A return code that determines the further actions for MDBX and must match
547/// the action which was executed by the callback:
548///
549/// * `-2` or less – An error condition and the reader was not killed.
550/// * `-1` – The callback was unable to solve the problem and agreed on
551///   `MDBX_MAP_FULL` error; MDBX should increase the database size or return
552///   `MDBX_MAP_FULL` error.
553/// * `0` – The callback solved the problem or just waited for a while, libmdbx
554///   should rescan the reader lock table and retry. This also includes a
555///   situation when corresponding transaction terminated in normal way by
556///   `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted. I.e.
557///   reader slot isn't needed to be cleaned from transaction.
558/// * `1` – Transaction aborted asynchronous and reader slot should be cleared
559///   immediately, i.e. read transaction will not continue but
560///   `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called later.
561/// * `>=2` – The reader process was terminated or killed, and MDBX should
562///   entirely reset reader registration.
563pub type HandleSlowReadersCallback = extern "C" fn(
564    env: *const ffi::MDBX_env,
565    txn: *const ffi::MDBX_txn,
566    pid: ffi::mdbx_pid_t,
567    tid: ffi::mdbx_tid_t,
568    laggard: u64,
569    gap: std::ffi::c_uint,
570    space: usize,
571    retry: std::ffi::c_int,
572) -> HandleSlowReadersReturnCode;
573
/// The value a [`HandleSlowReadersCallback`] hands back to MDBX.
///
/// The discriminants are the raw integers MDBX expects.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleSlowReadersReturnCode {
    /// Something went wrong, and the reader was not killed.
    Error = -2,
    /// The problem could not be solved and `MDBX_MAP_FULL` is accepted: MDBX
    /// should grow the database or fail with `MDBX_MAP_FULL`.
    ProceedWithoutKillingReader = -1,
    /// The problem was solved (or the callback simply waited), so libmdbx
    /// should rescan the reader lock table and retry. This includes a
    /// transaction that ended normally via `mdbx_txn_abort()` or
    /// `mdbx_txn_reset()` and may be restarted, i.e. its reader slot does not
    /// need to be cleared.
    Success = 0,
    /// The transaction was aborted asynchronously and its reader slot must be
    /// cleared right away. The read transaction will not continue, but neither
    /// `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called later.
    ClearReaderSlot = 1,
    /// The reader process was terminated or killed, so MDBX should drop the
    /// reader registration entirely.
    ReaderProcessTerminated = 2,
}
598
599/// Options for opening or creating an environment.
600#[derive(Debug, Clone)]
601pub struct EnvironmentBuilder {
602    flags: EnvironmentFlags,
603    max_readers: Option<u64>,
604    max_dbs: Option<u64>,
605    sync_bytes: Option<u64>,
606    sync_period: Option<u64>,
607    rp_augment_limit: Option<u64>,
608    loose_limit: Option<u64>,
609    dp_reserve_limit: Option<u64>,
610    txn_dp_limit: Option<u64>,
611    spill_max_denominator: Option<u64>,
612    spill_min_denominator: Option<u64>,
613    geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
614    log_level: Option<ffi::MDBX_log_level_t>,
615    kind: EnvironmentKind,
616    handle_slow_readers: Option<HandleSlowReadersCallback>,
617}
618
619impl EnvironmentBuilder {
620    /// Open an environment.
621    ///
622    /// Database files will be opened with 644 permissions.
623    pub fn open(&self, path: &Path) -> MdbxResult<Environment> {
624        self.open_with_permissions(path, 0o644)
625    }
626
627    /// Open an environment with the provided UNIX permissions.
628    ///
629    /// The path may not contain the null character.
630    pub fn open_with_permissions(
631        &self,
632        path: &Path,
633        mode: ffi::mdbx_mode_t,
634    ) -> MdbxResult<Environment> {
635        let mut env: *mut ffi::MDBX_env = ptr::null_mut();
636        unsafe {
637            if let Some(log_level) = self.log_level {
638                // Returns the previously debug_flags in the 0-15 bits and log_level in the
639                // 16-31 bits, no need to use `mdbx_result`.
640                ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
641            }
642
643            mdbx_result(ffi::mdbx_env_create(&mut env))?;
644
645            if let Err(e) = (|| {
646                if let Some(geometry) = &self.geometry {
647                    let mut min_size = -1;
648                    let mut max_size = -1;
649
650                    if let Some(size) = geometry.size {
651                        if let Some(size) = size.0 {
652                            min_size = size as isize;
653                        }
654
655                        if let Some(size) = size.1 {
656                            max_size = size as isize;
657                        }
658                    }
659
660                    mdbx_result(ffi::mdbx_env_set_geometry(
661                        env,
662                        min_size,
663                        -1,
664                        max_size,
665                        geometry.growth_step.unwrap_or(-1),
666                        geometry.shrink_threshold.unwrap_or(-1),
667                        match geometry.page_size {
668                            None => -1,
669                            Some(PageSize::MinimalAcceptable) => 0,
670                            Some(PageSize::Set(size)) => size as isize,
671                        },
672                    ))?;
673                }
674                for (opt, v) in [
675                    (ffi::MDBX_opt_max_db, self.max_dbs),
676                    (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
677                    (ffi::MDBX_opt_loose_limit, self.loose_limit),
678                    (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
679                    (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
680                    (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
681                    (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
682                ] {
683                    if let Some(v) = v {
684                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
685                    }
686                }
687
688                // set max readers if specified
689                if let Some(max_readers) = self.max_readers {
690                    mdbx_result(ffi::mdbx_env_set_option(
691                        env,
692                        ffi::MDBX_opt_max_readers,
693                        max_readers,
694                    ))?;
695                }
696
697                if let Some(handle_slow_readers) = self.handle_slow_readers {
698                    mdbx_result(ffi::mdbx_env_set_hsr(
699                        env,
700                        convert_hsr_fn(Some(handle_slow_readers)),
701                    ))?;
702                }
703
704                #[cfg(unix)]
705                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
706                    use std::os::unix::ffi::OsStrExt;
707                    path.as_ref().as_os_str().as_bytes().to_vec()
708                }
709
710                #[cfg(windows)]
711                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
712                    // On Windows, could use std::os::windows::ffi::OsStrExt to encode_wide(),
713                    // but we end up with a Vec<u16> instead of a Vec<u8>, so that doesn't
714                    // really help.
715                    path.as_ref().to_string_lossy().to_string().into_bytes()
716                }
717
718                let path = match CString::new(path_to_bytes(path)) {
719                    Ok(path) => path,
720                    Err(_) => return Err(MdbxError::Invalid),
721                };
722                mdbx_result(ffi::mdbx_env_open(
723                    env,
724                    path.as_ptr(),
725                    self.flags.make_flags() | self.kind.extra_flags(),
726                    mode,
727                ))?;
728
729                for (opt, v) in [
730                    (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
731                    (ffi::MDBX_opt_sync_period, self.sync_period),
732                ] {
733                    if let Some(v) = v {
734                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
735                    }
736                }
737
738                Ok(())
739            })() {
740                ffi::mdbx_env_close_ex(env, false);
741
742                return Err(e);
743            }
744        }
745
746        let env_ptr = EnvPtr(env);
747
748        let txn_manager = RwSyncLifecycle::spawn(env_ptr);
749
750        let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
751
752        Ok(Environment { inner: Arc::new(env) })
753    }
754
755    /// Configures how this environment will be opened.
756    pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
757        self.kind = kind;
758        self
759    }
760
761    /// Opens the environment with mdbx WRITEMAP
762    ///
763    /// See also [`EnvironmentKind`]
764    pub const fn write_map(&mut self) -> &mut Self {
765        self.set_kind(EnvironmentKind::WriteMap)
766    }
767
768    /// Sets the provided options in the environment.
769    pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
770        self.flags = flags;
771        self
772    }
773
774    /// Sets the maximum number of threads or reader slots for the environment.
775    ///
776    /// This defines the number of slots in the lock table that is used to
777    /// track readers in the environment. The default is 126. Starting a
778    /// read-only transaction normally ties a lock table slot to [`Tx`] object
779    /// until it or the [`Environment`] object is destroyed.
780    ///
781    /// [`Tx`]: crate::tx::Tx
782    pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
783        self.max_readers = Some(max_readers);
784        self
785    }
786
787    /// Sets the maximum number of named databases for the environment.
788    ///
789    /// This function is only needed if multiple databases will be used in the
790    /// environment. Simpler applications that use the environment as a single
791    /// unnamed database can ignore this option.
792    ///
793    /// Currently a moderate number of slots are cheap but a huge number gets
794    /// expensive: 7-120 words per transaction, and every call to
795    /// [`Tx::open_db()`] or [`Tx::open_db_no_cache()`] does a linear
796    ///
797    /// [`Tx::open_db()`]: crate::tx::Tx::open_db
798    /// [`Tx::open_db_no_cache()`]: crate::tx::Tx::open_db_no_cache
799    /// search of the opened slots.
800    pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
801        self.max_dbs = Some(v as u64);
802        self
803    }
804
805    /// Sets the interprocess/shared threshold to force flush the data buffers
806    /// to disk, if [`SyncMode::SafeNoSync`] is used.
807    pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
808        self.sync_bytes = Some(v as u64);
809        self
810    }
811
812    /// Sets the interprocess/shared relative period since the last unsteady commit to force flush
813    /// the data buffers to disk, if [`SyncMode::SafeNoSync`] is used.
814    pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
815        // For this option, mdbx uses units of 1/65536 of a second.
816        let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
817        self.sync_period = Some(as_mdbx_units);
818        self
819    }
820
821    /// Sets the relative limit of the reader's page-table augmentation.
822    pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
823        self.rp_augment_limit = Some(v);
824        self
825    }
826
827    /// Sets the relative loose limit of the environment.
828    pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
829        self.loose_limit = Some(v);
830        self
831    }
832
833    /// Sets the relative dirty-page reserve limit of the environment.
834    pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
835        self.dp_reserve_limit = Some(v);
836        self
837    }
838
839    /// Sets the relative dirty-page limit per transaction.
840    pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
841        self.txn_dp_limit = Some(v);
842        self
843    }
844
845    /// Sets the spill maximum denominator.
846    pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
847        self.spill_max_denominator = Some(v.into());
848        self
849    }
850
851    /// Sets the spill minimum denominator.
852    pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
853        self.spill_min_denominator = Some(v.into());
854        self
855    }
856
857    /// Set all size-related parameters of environment, including page size and the min/max size of
858    /// the memory map.
859    pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
860        let convert_bound = |bound: Bound<&usize>| match bound {
861            Bound::Included(v) | Bound::Excluded(v) => Some(*v),
862            _ => None,
863        };
864        self.geometry = Some(Geometry {
865            size: geometry.size.map(|range| {
866                (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
867            }),
868            growth_step: geometry.growth_step,
869            shrink_threshold: geometry.shrink_threshold,
870            page_size: geometry.page_size,
871        });
872        self
873    }
874
875    /// Set the logging level for the environment.
876    pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
877        self.log_level = Some(log_level);
878        self
879    }
880
881    /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
882    /// information.
883    pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
884        self.handle_slow_readers = Some(hsr);
885        self
886    }
887}
888
889/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
890fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
891    unsafe { std::mem::transmute(callback) }
892}
893
// Tests for environment configuration. The single test below checks that the
// Handle-Slow-Readers (HSR) callback registered via `set_handle_slow_readers` is actually
// invoked by MDBX.
#[cfg(test)]
mod tests {
    use crate::{
        Environment, Geometry, MdbxError, WriteFlags,
        sys::{HandleSlowReadersReturnCode, PageSize},
    };
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Verifies that MDBX calls the HSR callback when a write transaction runs out of map space
    /// while an older read transaction still holds a previous snapshot.
    ///
    /// Steps:
    /// 1. Open a small environment (max map size 1024 * 1024) with the callback installed.
    /// 2. Write 1,000 keys.
    /// 3. Open a read transaction on that snapshot.
    /// 4. Overwrite the same keys, so the reader keeps the old pages pinned.
    /// 5. Keep inserting until `MdbxError::MapFull` (or 1,000,000 keys).
    /// 6. Assert that the callback's flag was set.
    #[test]
    fn test_handle_slow_readers_callback() {
        // Written by the callback. A `static` is used because an `extern "C" fn` cannot
        // capture local state.
        static CALLED: AtomicBool = AtomicBool::new(false);

        // HSR callback with the C signature passed to `set_handle_slow_readers`. It ignores
        // all of its arguments, records that it ran, and returns `ProceedWithoutKillingReader`.
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            CALLED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        // `tempdir` is declared before `env`, so it is dropped after it (locals drop in reverse
        // declaration order) and the directory outlives the environment.
        let tempdir = tempfile::tempdir().unwrap();
        let env = Environment::builder()
            .set_geometry(Geometry::<RangeInclusive<usize>> {
                size: Some(0..=1024 * 1024), // Max 1MB, so we can hit the limit
                page_size: Some(PageSize::MinimalAcceptable), // To create as many pages as possible
                ..Default::default()
            })
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Insert some data in the database, so the read transaction can lock on the snapshot of it
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 0usize..1_000 {
                tx.put(db, i.to_le_bytes(), b"0", WriteFlags::empty()).unwrap()
            }
            tx.commit().unwrap();
        }

        // Create a read transaction. It is bound to the named `_tx_ro` (not `_`, which would
        // drop it immediately), so it stays open until the end of the test and keeps this
        // snapshot pinned.
        let _tx_ro = env.begin_ro_sync().unwrap();
        // Change previously inserted data, so the read transaction would use the previous snapshot
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 0usize..1_000 {
                tx.put(db, i.to_le_bytes(), b"1", WriteFlags::empty()).unwrap();
            }
            tx.commit().unwrap();
        }

        // Insert more data in the database, so we hit the DB size limit error, and MDBX tries to
        // kick long-lived readers and delete their snapshots
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            // The upper bound only keeps the loop finite; the expected exit is the `MapFull`
            // break. Any other error fails the test via `unwrap`.
            for i in 1_000usize..1_000_000 {
                match tx.put(db, i.to_le_bytes(), b"0", WriteFlags::empty()) {
                    Ok(_) => {}
                    Err(MdbxError::MapFull) => break,
                    result @ Err(_) => result.unwrap(),
                }
            }
            // The transaction may be in an error state after hitting MapFull,
            // so commit could fail. We don't care about the result here since
            // the purpose of this test is to verify the HSR callback was called.
            let _ = tx.commit();
        }

        // Expect the HSR to be called
        assert!(CALLED.load(Ordering::Relaxed));
    }
}