libmdbx_remote/
environment.rs

1use crate::{
2    database::Database,
3    error::{mdbx_result, Error, Result},
4    flags::EnvironmentFlags,
5    transaction::{RO, RW},
6    txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
7    Mode, SyncMode, Transaction, TransactionKind,
8};
9use byteorder::{ByteOrder, NativeEndian};
10use mem::size_of;
11use serde::{Deserialize, Serialize};
12use std::{
13    ffi::CString,
14    fmt::{self, Debug},
15    mem,
16    ops::{Bound, RangeBounds},
17    path::Path,
18    ptr,
19    sync::{mpsc::sync_channel, Arc},
20    thread::sleep,
21    time::Duration,
22};
23use tracing::warn;
24
/// The default maximum duration of a read transaction (five minutes).
///
/// Used as the fallback when the `read-tx-timeouts` feature is enabled and the builder was not
/// given an explicit maximum read transaction duration.
#[cfg(feature = "read-tx-timeouts")]
const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(5 * 60);
28
/// An environment supports multiple databases, all residing in the same shared-memory map.
///
/// Accessing the environment is thread-safe.
/// The environment will be closed when the last instance of this type is dropped.
///
/// Cloning only bumps the reference count of the shared internals; all clones refer to the same
/// underlying MDBX environment.
#[derive(Clone)]
pub struct Environment {
    /// Reference-counted internals: raw MDBX handle, environment kind and transaction manager.
    inner: Arc<EnvironmentInner>,
}
37
38impl Environment {
39    /// Creates a new builder for specifying options for opening an MDBX environment.
40    pub fn builder() -> EnvironmentBuilder {
41        EnvironmentBuilder {
42            flags: EnvironmentFlags::default(),
43            max_readers: None,
44            max_dbs: None,
45            sync_bytes: None,
46            sync_period: None,
47            rp_augment_limit: None,
48            loose_limit: None,
49            dp_reserve_limit: None,
50            txn_dp_limit: None,
51            spill_max_denominator: None,
52            spill_min_denominator: None,
53            geometry: None,
54            log_level: None,
55            kind: Default::default(),
56            handle_slow_readers: None,
57            #[cfg(feature = "read-tx-timeouts")]
58            max_read_transaction_duration: None,
59        }
60    }
61
62    /// Returns true if the environment was opened as WRITEMAP.
63    #[inline]
64    pub fn is_write_map(&self) -> bool {
65        self.inner.env_kind.is_write_map()
66    }
67
68    /// Returns the kind of the environment.
69    #[inline]
70    pub fn env_kind(&self) -> EnvironmentKind {
71        self.inner.env_kind
72    }
73
74    /// Returns true if the environment was opened in [`crate::Mode::ReadWrite`] mode.
75    #[inline]
76    pub fn is_read_write(&self) -> Result<bool> {
77        Ok(!self.is_read_only()?)
78    }
79
80    /// Returns true if the environment was opened in [`crate::Mode::ReadOnly`] mode.
81    #[inline]
82    pub fn is_read_only(&self) -> Result<bool> {
83        Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
84    }
85
86    /// Returns the transaction manager.
87    #[inline]
88    pub(crate) fn txn_manager(&self) -> &TxnManager {
89        &self.inner.txn_manager
90    }
91
92    /// Returns the number of timed out transactions that were not aborted by the user yet.
93    #[cfg(feature = "read-tx-timeouts")]
94    pub fn timed_out_not_aborted_transactions(&self) -> usize {
95        self.inner
96            .txn_manager
97            .timed_out_not_aborted_read_transactions()
98            .unwrap_or(0)
99    }
100
101    /// Create a read-only transaction for use with the environment.
102    #[inline]
103    pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
104        Transaction::new(self.clone())
105    }
106
107    /// Create a read-write transaction for use with the environment. This method will block while
108    /// there are any other read-write transactions open on the environment.
109    pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
110        let mut warned = false;
111        let txn = loop {
112            let (tx, rx) = sync_channel(0);
113            self.txn_manager().send_message(TxnManagerMessage::Begin {
114                parent: TxnPtr(ptr::null_mut()),
115                flags: RW::OPEN_FLAGS,
116                sender: tx,
117            });
118            let res = rx.recv().unwrap();
119            if matches!(&res, Err(Error::Busy)) {
120                if !warned {
121                    warned = true;
122                    warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
123                }
124                sleep(Duration::from_millis(250));
125                continue;
126            }
127
128            break res;
129        }?;
130        Ok(Transaction::new_from_ptr(self.clone(), txn.0))
131    }
132
133    /// Returns a raw pointer to the underlying MDBX environment.
134    ///
135    /// The caller **must** ensure that the pointer is never dereferenced after the environment has
136    /// been dropped.
137    #[inline]
138    pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
139        self.inner.env
140    }
141
142    /// Executes the given closure once
143    ///
144    /// This is only intended to be used when accessing mdbx ffi functions directly is required.
145    ///
146    /// The caller **must** ensure that the pointer is only used within the closure.
147    #[inline]
148    #[doc(hidden)]
149    pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
150    where
151        F: FnOnce(*mut ffi::MDBX_env) -> T,
152    {
153        f(self.env_ptr())
154    }
155
156    /// Flush the environment data buffers to disk.
157    pub fn sync(&self, force: bool) -> Result<bool> {
158        mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
159    }
160
161    /// Retrieves statistics about this environment.
162    pub fn stat(&self) -> Result<Stat> {
163        unsafe {
164            let mut stat = Stat::new();
165            mdbx_result(ffi::mdbx_env_stat_ex(
166                self.env_ptr(),
167                ptr::null(),
168                stat.mdb_stat(),
169                size_of::<Stat>(),
170            ))?;
171            Ok(stat)
172        }
173    }
174
175    /// Retrieves info about this environment.
176    pub fn info(&self) -> Result<Info> {
177        unsafe {
178            let mut info = Info(mem::zeroed());
179            mdbx_result(ffi::mdbx_env_info_ex(
180                self.env_ptr(),
181                ptr::null(),
182                &mut info.0,
183                size_of::<Info>(),
184            ))?;
185            Ok(info)
186        }
187    }
188
189    /// Retrieves the total number of pages on the freelist.
190    ///
191    /// Along with [`Environment::info()`], this can be used to calculate the exact number
192    /// of used pages as well as free pages in this environment.
193    ///
194    /// ```
195    /// # use libmdbx_remote::Environment;
196    /// let dir = tempfile::tempdir().unwrap();
197    /// let env = Environment::builder().open(dir.path()).unwrap();
198    /// let info = env.info().unwrap();
199    /// let stat = env.stat().unwrap();
200    /// let freelist = env.freelist().unwrap();
201    /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
202    /// let total_pgs = info.map_size() / stat.page_size() as usize;
203    /// let pgs_in_use = last_pgno - freelist;
204    /// let pgs_free = total_pgs - pgs_in_use;
205    /// ```
206    ///
207    /// Note:
208    ///
209    /// * MDBX stores all the freelists in the designated database 0 in each environment, and the
210    ///   freelist count is stored at the beginning of the value as `uint32_t` in the native byte
211    ///   order.
212    ///
213    /// * It will create a read transaction to traverse the freelist database.
214    pub fn freelist(&self) -> Result<usize> {
215        let mut freelist: usize = 0;
216        let txn = self.begin_ro_txn()?;
217        let db = Database::freelist_db();
218        let cursor = txn.cursor(&db)?;
219
220        for result in cursor.iter_slices() {
221            let (_key, value) = result?;
222            if value.len() < size_of::<usize>() {
223                return Err(Error::Corrupted);
224            }
225
226            let s = &value[..size_of::<usize>()];
227            freelist += NativeEndian::read_u32(s) as usize;
228        }
229
230        Ok(freelist)
231    }
232}
233
/// Container type for Environment internals.
///
/// This holds the raw pointer to the MDBX environment and the transaction manager.
/// The env is opened via [`mdbx_env_create`](ffi::mdbx_env_create) and closed when this type drops.
struct EnvironmentInner {
    /// The raw pointer to the MDBX environment.
    ///
    /// Accessing the environment is thread-safe as long as this type exists.
    env: *mut ffi::MDBX_env,
    /// The kind the environment was opened with, i.e. whether WRITEMAP is in use.
    env_kind: EnvironmentKind,
    /// Transaction manager through which read-write transactions are requested.
    txn_manager: TxnManager,
}
248
249impl Drop for EnvironmentInner {
250    fn drop(&mut self) {
251        // Close open mdbx environment on drop
252        unsafe {
253            ffi::mdbx_env_close_ex(self.env, false);
254        }
255    }
256}
257
// SAFETY: internal type, only used inside [Environment]. Accessing the environment pointer is
// thread-safe
unsafe impl Send for EnvironmentInner {}
// SAFETY: same reasoning as for the `Send` impl directly above.
unsafe impl Sync for EnvironmentInner {}
262
/// Determines how data is mapped into memory
///
/// It only takes effect when the environment is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum EnvironmentKind {
    /// Open the environment in default mode, without WRITEMAP.
    #[default]
    Default,
    /// Open the environment as mdbx-WRITEMAP.
    /// Use a writeable memory map unless the environment is opened as `MDBX_RDONLY`
    /// ([`crate::Mode::ReadOnly`]).
    ///
    /// All data will be mapped into memory in the read-write mode [`crate::Mode::ReadWrite`]. This
    /// offers a significant performance benefit, since the data will be modified directly in
    /// mapped memory and then flushed to disk by a single system call, without any memory
    /// management or copying.
    ///
    /// This mode is incompatible with nested transactions.
    WriteMap,
}
283
284impl EnvironmentKind {
285    /// Returns true if the environment was opened as WRITEMAP.
286    #[inline]
287    pub const fn is_write_map(&self) -> bool {
288        matches!(self, Self::WriteMap)
289    }
290
291    /// Additional flags required when opening the environment.
292    pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
293        match self {
294            Self::Default => ffi::MDBX_ENV_DEFAULTS,
295            Self::WriteMap => ffi::MDBX_WRITEMAP,
296        }
297    }
298}
299
/// Copyable wrapper around the raw MDBX environment pointer.
///
/// The wrapper exists so that the pointer can be marked `Send` and `Sync` (see the impls below).
#[derive(Copy, Clone, Debug)]
pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
// NOTE(review): no safety argument is recorded for these impls; presumably they rely on MDBX
// environment handles being usable from any thread — confirm.
unsafe impl Send for EnvPtr {}
unsafe impl Sync for EnvPtr {}
304
/// Environment statistics.
///
/// Contains information about the size and layout of an MDBX environment or database.
///
/// This is a transparent wrapper around the raw `ffi::MDBX_stat` structure.
#[derive(Debug, Serialize, Deserialize)]
#[repr(transparent)]
pub struct Stat(ffi::MDBX_stat);
311
312impl Stat {
313    /// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
314    pub(crate) const fn new() -> Self {
315        unsafe { Self(mem::zeroed()) }
316    }
317
318    /// Returns a mut pointer to `ffi::MDB_stat`.
319    pub(crate) fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
320        &mut self.0
321    }
322}
323
324impl Stat {
325    /// Size of a database page. This is the same for all databases in the environment.
326    #[inline]
327    pub const fn page_size(&self) -> u32 {
328        self.0.ms_psize
329    }
330
331    /// Depth (height) of the B-tree.
332    #[inline]
333    pub const fn depth(&self) -> u32 {
334        self.0.ms_depth
335    }
336
337    /// Number of internal (non-leaf) pages.
338    #[inline]
339    pub const fn branch_pages(&self) -> usize {
340        self.0.ms_branch_pages as usize
341    }
342
343    /// Number of leaf pages.
344    #[inline]
345    pub const fn leaf_pages(&self) -> usize {
346        self.0.ms_leaf_pages as usize
347    }
348
349    /// Number of overflow pages.
350    #[inline]
351    pub const fn overflow_pages(&self) -> usize {
352        self.0.ms_overflow_pages as usize
353    }
354
355    /// Number of data items.
356    #[inline]
357    pub const fn entries(&self) -> usize {
358        self.0.ms_entries as usize
359    }
360}
361
/// Geometry information of an environment.
///
/// Transparent wrapper around the geometry member (`mi_geo`) of `ffi::MDBX_envinfo`.
#[derive(Debug)]
#[repr(transparent)]
pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
365
366impl GeometryInfo {
367    pub const fn min(&self) -> u64 {
368        self.0.lower
369    }
370}
371
/// Environment information.
///
/// Contains environment information about the map size, readers, last txn id etc.
///
/// This is a transparent wrapper around the raw `ffi::MDBX_envinfo` structure.
#[derive(Debug, Serialize, Deserialize)]
#[repr(transparent)]
pub struct Info(ffi::MDBX_envinfo);
378
379impl Info {
380    pub const fn geometry(&self) -> GeometryInfo {
381        GeometryInfo(self.0.mi_geo)
382    }
383
384    pub const fn mode(&self) -> Mode {
385        let mode = self.0.mi_mode;
386        if (mode & ffi::MDBX_RDONLY) != 0 {
387            Mode::ReadOnly
388        } else {
389            if (mode & ffi::MDBX_SYNC_DURABLE) != 0 {
390                Mode::ReadWrite {
391                    sync_mode: SyncMode::Durable,
392                }
393            } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
394                Mode::ReadWrite {
395                    sync_mode: SyncMode::UtterlyNoSync,
396                }
397            } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
398                Mode::ReadWrite {
399                    sync_mode: SyncMode::NoMetaSync,
400                }
401            } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
402                Mode::ReadWrite {
403                    sync_mode: SyncMode::SafeNoSync,
404                }
405            } else {
406                Mode::ReadWrite {
407                    sync_mode: SyncMode::Durable,
408                }
409            }
410        }
411    }
412
413    /// Size of memory map.
414    #[inline]
415    pub const fn map_size(&self) -> usize {
416        self.0.mi_mapsize as usize
417    }
418
419    /// Last used page number
420    #[inline]
421    pub const fn last_pgno(&self) -> usize {
422        self.0.mi_last_pgno as usize
423    }
424
425    /// Last transaction ID
426    #[inline]
427    pub const fn last_txnid(&self) -> usize {
428        self.0.mi_recent_txnid as usize
429    }
430
431    /// Max reader slots in the environment
432    #[inline]
433    pub const fn max_readers(&self) -> usize {
434        self.0.mi_maxreaders as usize
435    }
436
437    /// Max reader slots used in the environment
438    #[inline]
439    pub const fn num_readers(&self) -> usize {
440        self.0.mi_numreaders as usize
441    }
442
443    /// Return the internal page ops metrics
444    #[inline]
445    pub const fn page_ops(&self) -> PageOps {
446        PageOps {
447            newly: self.0.mi_pgop_stat.newly,
448            cow: self.0.mi_pgop_stat.cow,
449            clone: self.0.mi_pgop_stat.clone,
450            split: self.0.mi_pgop_stat.split,
451            merge: self.0.mi_pgop_stat.merge,
452            spill: self.0.mi_pgop_stat.spill,
453            unspill: self.0.mi_pgop_stat.unspill,
454            wops: self.0.mi_pgop_stat.wops,
455            prefault: self.0.mi_pgop_stat.prefault,
456            mincore: self.0.mi_pgop_stat.mincore,
457            msync: self.0.mi_pgop_stat.msync,
458            fsync: self.0.mi_pgop_stat.fsync,
459        }
460    }
461}
462
463impl fmt::Debug for Environment {
464    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465        f.debug_struct("Environment")
466            .field("kind", &self.inner.env_kind)
467            .finish_non_exhaustive()
468    }
469}
470
471///////////////////////////////////////////////////////////////////////////////////////////////////
472// Environment Builder
473///////////////////////////////////////////////////////////////////////////////////////////////////
474
/// Page size selection used as part of the environment [`Geometry`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PageSize {
    /// Passed to `mdbx_env_set_geometry` as a page size of `0`.
    MinimalAcceptable,
    /// An explicit page size, passed to `mdbx_env_set_geometry` as is.
    Set(usize),
}
480
/// Statistics of page operations across all (running, completed and aborted) transactions
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageOps {
    /// Quantity of new pages added
    pub newly: u64,
    /// Quantity of pages copied for update
    pub cow: u64,
    /// Quantity of parent's dirty pages cloned for nested transactions
    pub clone: u64,
    /// Page splits
    pub split: u64,
    /// Page merges
    pub merge: u64,
    /// Quantity of spilled dirty pages
    pub spill: u64,
    /// Quantity of unspilled/reloaded pages
    pub unspill: u64,
    /// Number of explicit write operations (not pages) to disk
    pub wops: u64,
    /// Number of explicit msync/flush-to-disk operations
    pub msync: u64,
    /// Number of explicit fsync/flush-to-disk operations
    pub fsync: u64,
    /// Number of prefault write operations
    pub prefault: u64,
    /// Number of `mincore()` calls
    pub mincore: u64,
}
509
/// Geometry settings of an environment, forwarded to `mdbx_env_set_geometry` on open.
///
/// Every field left as `None` is passed to MDBX as `-1`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Geometry<R> {
    /// Size bounds. The builder instantiates `R` as `(Option<usize>, Option<usize>)`, which it
    /// interprets as the `(minimum, maximum)` size.
    pub size: Option<R>,
    /// Growth step, passed through unchanged.
    pub growth_step: Option<isize>,
    /// Shrink threshold, passed through unchanged.
    pub shrink_threshold: Option<isize>,
    /// Page size selection, see [`PageSize`].
    pub page_size: Option<PageSize>,
}
517
518impl<R> Default for Geometry<R> {
519    fn default() -> Self {
520        Self {
521            size: None,
522            growth_step: None,
523            shrink_threshold: None,
524            page_size: None,
525        }
526    }
527}
528
/// Handle-Slow-Readers callback function to resolve database full/overflow issue due to a reader(s)
/// which prevents the old data from being recycled.
///
/// Read transactions prevent reuse of pages freed by newer write transactions, thus the database
/// can grow quickly. This callback will be called when there is not enough space in the database
/// (i.e. before increasing the database size or before `MDBX_MAP_FULL` error) and thus can be
/// used to resolve issues with "long-lived" read transactions.
///
/// Depending on the arguments and needs, your implementation may wait,
/// terminate a process or thread that is performing a long read, or perform
/// some other action. In doing so it is important that the returned code always
/// corresponds to the performed action.
///
/// # Arguments
///
/// * `env` – Raw pointer to the MDBX environment (presumably the one the callback was registered
///   on – confirm against the libmdbx documentation).
/// * `txn` – Raw pointer to an MDBX transaction (presumably the one that ran out of space –
///   confirm against the libmdbx documentation).
/// * `pid` – A process id of the reader process.
/// * `tid` – A thread id of the reader thread.
/// * `laggard` – An oldest read transaction number on which stalled.
/// * `gap` – A lag from the last committed txn.
/// * `space` – A space that actually become available for reuse after this reader finished. The
///   callback function can take this value into account to evaluate the impact that a long-running
///   transaction has.
/// * `retry` – A retry number starting from 0. If callback has returned 0 at least once, then at
///   end of current handling loop the callback function will be called additionally with negative
///   `retry` value to notify about the end of loop. The callback function can use this fact to
///   implement timeout reset logic while waiting for readers.
///
/// # Returns
///
/// A return code that determines the further actions for MDBX and must match the action which
/// was executed by the callback (see [`HandleSlowReadersReturnCode`]):
/// * `-2` or less – An error condition and the reader was not killed.
/// * `-1` – The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error; MDBX
///   should increase the database size or return `MDBX_MAP_FULL` error.
/// * `0` – The callback solved the problem or just waited for a while, libmdbx should rescan the
///   reader lock table and retry. This also includes a situation when corresponding transaction
///   terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
///   I.e. reader slot isn't needed to be cleaned from transaction.
/// * `1` – Transaction aborted asynchronous and reader slot should be cleared immediately, i.e.
///   read transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be
///   called later.
/// * `2` or greater – The reader process was terminated or killed, and MDBX should entirely reset
///   reader registration.
pub type HandleSlowReadersCallback = extern "C" fn(
    env: *const ffi::MDBX_env,
    txn: *const ffi::MDBX_txn,
    pid: ffi::mdbx_pid_t,
    tid: ffi::mdbx_tid_t,
    laggard: u64,
    gap: std::ffi::c_uint,
    space: usize,
    retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode;
582
/// Return code of a [`HandleSlowReadersCallback`], telling MDBX which action the callback took.
///
/// The enum is `#[repr(i32)]` and each variant carries the integer value documented below.
#[derive(Debug)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
    /// An error condition and the reader was not killed.
    Error = -2,
    /// The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error;
    /// MDBX should increase the database size or return `MDBX_MAP_FULL` error.
    ProceedWithoutKillingReader = -1,
    /// The callback solved the problem or just waited for a while, libmdbx should rescan the
    /// reader lock table and retry. This also includes a situation when corresponding transaction
    /// terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
    /// I.e. reader slot isn't needed to be cleaned from transaction.
    Success = 0,
    /// Transaction aborted asynchronous and reader slot should be cleared immediately, i.e. read
    /// transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called
    /// later.
    ClearReaderSlot = 1,
    /// The reader process was terminated or killed, and MDBX should entirely reset reader
    /// registration.
    ReaderProcessTerminated = 2,
}
604
/// Options for opening or creating a _remote_ environment.
///
/// Serializable counterpart of [`EnvironmentBuilder`]: it carries the same settings except for
/// the slow-readers callback, which has no field here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteEnvironmentConfig {
    pub(crate) flags: EnvironmentFlags,
    pub(crate) max_readers: Option<u64>,
    pub(crate) max_dbs: Option<u64>,
    pub(crate) sync_bytes: Option<u64>,
    pub(crate) sync_period: Option<u64>,
    pub(crate) rp_augment_limit: Option<u64>,
    pub(crate) loose_limit: Option<u64>,
    pub(crate) dp_reserve_limit: Option<u64>,
    pub(crate) txn_dp_limit: Option<u64>,
    pub(crate) spill_max_denominator: Option<u64>,
    pub(crate) spill_min_denominator: Option<u64>,
    pub(crate) geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
    pub(crate) log_level: Option<ffi::MDBX_log_level_t>,
    pub(crate) kind: EnvironmentKind,
    // TODO: the slow-readers callback is not carried by this config yet:
    // handle_slow_readers: Option<HandleSlowReadersCallback>,
    //
    // Unlike in `EnvironmentBuilder`, the field below is deliberately not feature-gated:
    // #[cfg(feature = "read-tx-timeouts")]
    /// The maximum duration of a read transaction. If [None], but the `read-tx-timeouts` feature
    /// is enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
    pub(crate) max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
}
628
629impl From<EnvironmentBuilder> for RemoteEnvironmentConfig {
630    fn from(value: EnvironmentBuilder) -> Self {
631        Self {
632            flags: value.flags,
633            max_readers: value.max_readers,
634            max_dbs: value.max_dbs,
635            sync_bytes: value.sync_bytes,
636            sync_period: value.sync_period,
637            rp_augment_limit: value.rp_augment_limit,
638            loose_limit: value.loose_limit,
639            dp_reserve_limit: value.dp_reserve_limit,
640            txn_dp_limit: value.txn_dp_limit,
641            spill_max_denominator: value.spill_max_denominator,
642            spill_min_denominator: value.spill_min_denominator,
643            geometry: value.geometry,
644            log_level: value.log_level,
645            kind: value.kind,
646            #[cfg(feature = "read-tx-timeouts")]
647            max_read_transaction_duration: value.max_read_transaction_duration,
648            #[cfg(not(feature = "read-tx-timeouts"))]
649            max_read_transaction_duration: None,
650        }
651    }
652}
653
654impl RemoteEnvironmentConfig {
655    pub(crate) fn env_kind(&self) -> EnvironmentKind {
656        self.kind
657    }
658}
659
660impl From<RemoteEnvironmentConfig> for EnvironmentBuilder {
661    fn from(value: RemoteEnvironmentConfig) -> Self {
662        Self {
663            flags: value.flags,
664            max_readers: value.max_readers,
665            max_dbs: value.max_dbs,
666            sync_bytes: value.sync_bytes,
667            sync_period: value.sync_period,
668            rp_augment_limit: value.rp_augment_limit,
669            loose_limit: value.loose_limit,
670            dp_reserve_limit: value.dp_reserve_limit,
671            txn_dp_limit: value.txn_dp_limit,
672            spill_max_denominator: value.spill_max_denominator,
673            spill_min_denominator: value.spill_min_denominator,
674            geometry: value.geometry,
675            log_level: value.log_level,
676            kind: value.kind,
677            handle_slow_readers: None,
678            #[cfg(feature = "read-tx-timeouts")]
679            max_read_transaction_duration: value.max_read_transaction_duration,
680        }
681    }
682}
683
/// Options for opening or creating an environment.
///
/// Every `Option` field left as `None` is simply not applied when the environment is opened.
#[derive(Debug, Clone)]
pub struct EnvironmentBuilder {
    /// Flags the environment is opened with (combined with the flags implied by `kind`).
    pub(crate) flags: EnvironmentFlags,
    /// Value for `MDBX_opt_max_readers`.
    pub(crate) max_readers: Option<u64>,
    /// Value for `MDBX_opt_max_db`.
    pub(crate) max_dbs: Option<u64>,
    /// Value for `MDBX_opt_sync_bytes`; applied after the environment has been opened.
    pub(crate) sync_bytes: Option<u64>,
    /// Value for `MDBX_opt_sync_period`; applied after the environment has been opened.
    pub(crate) sync_period: Option<u64>,
    /// Value for `MDBX_opt_rp_augment_limit`.
    pub(crate) rp_augment_limit: Option<u64>,
    /// Value for `MDBX_opt_loose_limit`.
    pub(crate) loose_limit: Option<u64>,
    /// Value for `MDBX_opt_dp_reserve_limit`.
    pub(crate) dp_reserve_limit: Option<u64>,
    /// Value for `MDBX_opt_txn_dp_limit`.
    pub(crate) txn_dp_limit: Option<u64>,
    /// Value for `MDBX_opt_spill_max_denominator`.
    pub(crate) spill_max_denominator: Option<u64>,
    /// Value for `MDBX_opt_spill_min_denominator`.
    pub(crate) spill_min_denominator: Option<u64>,
    /// Geometry settings; the size tuple is `(minimum, maximum)`.
    pub(crate) geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
    /// Log level handed to `mdbx_setup_debug` before the environment is created.
    pub(crate) log_level: Option<ffi::MDBX_log_level_t>,
    /// Whether the environment is opened with WRITEMAP.
    pub(crate) kind: EnvironmentKind,
    /// Optional Handle-Slow-Readers callback registered on the environment.
    pub(crate) handle_slow_readers: Option<HandleSlowReadersCallback>,
    #[cfg(feature = "read-tx-timeouts")]
    /// The maximum duration of a read transaction. If [None], but the `read-tx-timeouts` feature
    /// is enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
    pub(crate) max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
}
707
708impl EnvironmentBuilder {
709    /// Open an environment.
710    ///
711    /// Database files will be opened with 644 permissions.
712    pub fn open(&self, path: &Path) -> Result<Environment> {
713        self.open_with_permissions(path, 0o644)
714    }
715
716    /// Open an environment with the provided UNIX permissions.
717    ///
718    /// The path may not contain the null character.
719    pub fn open_with_permissions(
720        &self,
721        path: &Path,
722        mode: ffi::mdbx_mode_t,
723    ) -> Result<Environment> {
724        let mut env: *mut ffi::MDBX_env = ptr::null_mut();
725        unsafe {
726            if let Some(log_level) = self.log_level {
727                // Returns the previously debug_flags in the 0-15 bits and log_level in the
728                // 16-31 bits, no need to use `mdbx_result`.
729                ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
730            }
731
732            mdbx_result(ffi::mdbx_env_create(&mut env))?;
733
734            if let Err(e) = (|| {
735                if let Some(geometry) = &self.geometry {
736                    let mut min_size = -1;
737                    let mut max_size = -1;
738
739                    if let Some(size) = geometry.size {
740                        if let Some(size) = size.0 {
741                            min_size = size as isize;
742                        }
743
744                        if let Some(size) = size.1 {
745                            max_size = size as isize;
746                        }
747                    }
748
749                    mdbx_result(ffi::mdbx_env_set_geometry(
750                        env,
751                        min_size,
752                        -1,
753                        max_size,
754                        geometry.growth_step.unwrap_or(-1),
755                        geometry.shrink_threshold.unwrap_or(-1),
756                        match geometry.page_size {
757                            None => -1,
758                            Some(PageSize::MinimalAcceptable) => 0,
759                            Some(PageSize::Set(size)) => size as isize,
760                        },
761                    ))?;
762                }
763                for (opt, v) in [
764                    (ffi::MDBX_opt_max_db, self.max_dbs),
765                    (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
766                    (ffi::MDBX_opt_loose_limit, self.loose_limit),
767                    (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
768                    (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
769                    (
770                        ffi::MDBX_opt_spill_max_denominator,
771                        self.spill_max_denominator,
772                    ),
773                    (
774                        ffi::MDBX_opt_spill_min_denominator,
775                        self.spill_min_denominator,
776                    ),
777                ] {
778                    if let Some(v) = v {
779                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
780                    }
781                }
782
783                // set max readers if specified
784                if let Some(max_readers) = self.max_readers {
785                    mdbx_result(ffi::mdbx_env_set_option(
786                        env,
787                        ffi::MDBX_opt_max_readers,
788                        max_readers,
789                    ))?;
790                }
791
792                if let Some(handle_slow_readers) = self.handle_slow_readers {
793                    mdbx_result(ffi::mdbx_env_set_hsr(
794                        env,
795                        convert_hsr_fn(Some(handle_slow_readers)),
796                    ))?;
797                }
798
799                #[cfg(unix)]
800                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
801                    use std::os::unix::ffi::OsStrExt;
802                    path.as_ref().as_os_str().as_bytes().to_vec()
803                }
804
805                #[cfg(windows)]
806                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
807                    // On Windows, could use std::os::windows::ffi::OsStrExt to encode_wide(),
808                    // but we end up with a Vec<u16> instead of a Vec<u8>, so that doesn't
809                    // really help.
810                    path.as_ref().to_string_lossy().to_string().into_bytes()
811                }
812
813                let path = match CString::new(path_to_bytes(path)) {
814                    Ok(path) => path,
815                    Err(_) => return Err(Error::Invalid),
816                };
817                mdbx_result(ffi::mdbx_env_open(
818                    env,
819                    path.as_ptr(),
820                    self.flags.make_flags() | self.kind.extra_flags(),
821                    mode,
822                ))?;
823
824                for (opt, v) in [
825                    (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
826                    (ffi::MDBX_opt_sync_period, self.sync_period),
827                ] {
828                    if let Some(v) = v {
829                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
830                    }
831                }
832
833                Ok(())
834            })() {
835                ffi::mdbx_env_close_ex(env, false);
836
837                return Err(e);
838            }
839        }
840
841        let env_ptr = EnvPtr(env);
842
843        #[cfg(not(feature = "read-tx-timeouts"))]
844        let txn_manager = TxnManager::new(env_ptr);
845
846        #[cfg(feature = "read-tx-timeouts")]
847        let txn_manager = {
848            if let crate::MaxReadTransactionDuration::Set(duration) = self
849                .max_read_transaction_duration
850                .unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
851                    DEFAULT_MAX_READ_TRANSACTION_DURATION,
852                ))
853            {
854                TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
855            } else {
856                TxnManager::new(env_ptr)
857            }
858        };
859
860        let env = EnvironmentInner {
861            env,
862            txn_manager,
863            env_kind: self.kind,
864        };
865
866        Ok(Environment {
867            inner: Arc::new(env),
868        })
869    }
870
871    /// Configures how this environment will be opened.
872    pub fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
873        self.kind = kind;
874        self
875    }
876
877    /// Opens the environment with mdbx WRITEMAP
878    ///
879    /// See also [`EnvironmentKind`]
880    pub fn write_map(&mut self) -> &mut Self {
881        self.set_kind(EnvironmentKind::WriteMap)
882    }
883
884    /// Sets the provided options in the environment.
885    pub fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
886        self.flags = flags;
887        self
888    }
889
890    /// Sets the maximum number of threads or reader slots for the environment.
891    ///
892    /// This defines the number of slots in the lock table that is used to track readers in the
893    /// the environment. The default is 126. Starting a read-only transaction normally ties a lock
894    /// table slot to the [Transaction] object until it or the [Environment] object is destroyed.
895    pub fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
896        self.max_readers = Some(max_readers);
897        self
898    }
899
900    /// Sets the maximum number of named databases for the environment.
901    ///
902    /// This function is only needed if multiple databases will be used in the
903    /// environment. Simpler applications that use the environment as a single
904    /// unnamed database can ignore this option.
905    ///
906    /// Currently a moderate number of slots are cheap but a huge number gets
907    /// expensive: 7-120 words per transaction, and every [`Transaction::open_db()`]
908    /// does a linear search of the opened slots.
909    pub fn set_max_dbs(&mut self, v: usize) -> &mut Self {
910        self.max_dbs = Some(v as u64);
911        self
912    }
913
914    /// Sets the interprocess/shared threshold to force flush the data buffers to disk, if
915    /// [`SyncMode::SafeNoSync`](crate::flags::SyncMode::SafeNoSync) is used.
916    pub fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
917        self.sync_bytes = Some(v as u64);
918        self
919    }
920
921    /// Sets the interprocess/shared relative period since the last unsteady commit to force flush
922    /// the data buffers to disk, if [`SyncMode::SafeNoSync`](crate::flags::SyncMode::SafeNoSync) is
923    /// used.
924    pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
925        // For this option, mdbx uses units of 1/65536 of a second.
926        let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
927        self.sync_period = Some(as_mdbx_units);
928        self
929    }
930
931    pub fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
932        self.rp_augment_limit = Some(v);
933        self
934    }
935
936    pub fn set_loose_limit(&mut self, v: u64) -> &mut Self {
937        self.loose_limit = Some(v);
938        self
939    }
940
941    pub fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
942        self.dp_reserve_limit = Some(v);
943        self
944    }
945
946    pub fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
947        self.txn_dp_limit = Some(v);
948        self
949    }
950
951    pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
952        self.spill_max_denominator = Some(v.into());
953        self
954    }
955
956    pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
957        self.spill_min_denominator = Some(v.into());
958        self
959    }
960
961    /// Set all size-related parameters of environment, including page size and the min/max size of
962    /// the memory map.
963    pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
964        let convert_bound = |bound: Bound<&usize>| match bound {
965            Bound::Included(v) | Bound::Excluded(v) => Some(*v),
966            _ => None,
967        };
968        self.geometry = Some(Geometry {
969            size: geometry.size.map(|range| {
970                (
971                    convert_bound(range.start_bound()),
972                    convert_bound(range.end_bound()),
973                )
974            }),
975            growth_step: geometry.growth_step,
976            shrink_threshold: geometry.shrink_threshold,
977            page_size: geometry.page_size,
978        });
979        self
980    }
981
982    pub fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
983        self.log_level = Some(log_level);
984        self
985    }
986
987    /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
988    /// information.
989    pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
990        self.handle_slow_readers = Some(hsr);
991        self
992    }
993}
994
995pub(crate) mod read_transactions {
996    #[cfg(feature = "read-tx-timeouts")]
997    use crate::EnvironmentBuilder;
998    use serde::{Deserialize, Serialize};
999    use std::time::Duration;
1000
1001    /// The maximum duration of a read transaction.
1002    #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
1003    pub enum MaxReadTransactionDuration {
1004        /// The maximum duration of a read transaction is unbounded.
1005        Unbounded,
1006        /// The maximum duration of a read transaction is set to the given duration.
1007        Set(Duration),
1008    }
1009
1010    #[cfg(feature = "read-tx-timeouts")]
1011    impl MaxReadTransactionDuration {
1012        pub const fn as_duration(&self) -> Option<Duration> {
1013            match self {
1014                Self::Unbounded => None,
1015                Self::Set(duration) => Some(*duration),
1016            }
1017        }
1018    }
1019
1020    #[cfg(feature = "read-tx-timeouts")]
1021    impl EnvironmentBuilder {
1022        /// Set the maximum time a read-only transaction can be open.
1023        pub fn set_max_read_transaction_duration(
1024            &mut self,
1025            max_read_transaction_duration: MaxReadTransactionDuration,
1026        ) -> &mut Self {
1027            self.max_read_transaction_duration = Some(max_read_transaction_duration);
1028            self
1029        }
1030    }
1031}
1032
1033/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
1034#[allow(clippy::missing_transmute_annotations)]
1035fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
1036    unsafe { std::mem::transmute(callback) }
1037}
1038
#[cfg(test)]
mod tests {
    use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Verifies that a registered Handle-Slow-Readers callback is invoked once a writer runs
    /// out of map space while an older read transaction is still open.
    #[test]
    fn test_handle_slow_readers_callback() {
        // Raised by the callback below; a static because an `extern "C" fn` cannot capture.
        static CALLED: AtomicBool = AtomicBool::new(false);

        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            CALLED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        let tempdir = tempfile::tempdir().unwrap();
        // At most 1MB so the limit can be hit, with the minimal page size to create as many
        // pages as possible.
        let geometry = Geometry::<RangeInclusive<usize>> {
            size: Some(0..=1024 * 1024),
            page_size: Some(PageSize::MinimalAcceptable),
            ..Default::default()
        };
        let env = Environment::builder()
            .set_geometry(geometry)
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Writes `value` under each of the keys 0..1_000 in one committed write transaction.
        let write_first_thousand = |value: &[u8; 1]| {
            let txn = env.begin_rw_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            for key in (0usize..1_000).map(usize::to_le_bytes) {
                txn.put(db.dbi(), key, value, WriteFlags::empty()).unwrap();
            }
            txn.commit().unwrap();
        };

        // Seed the database so the read transaction has a snapshot to lock on.
        write_first_thousand(b"0");

        // Keep a read transaction open for the rest of the test.
        let _tx_ro = env.begin_ro_txn().unwrap();

        // Overwrite the seeded data so the reader stays pinned to the previous snapshot.
        write_first_thousand(b"1");

        // Keep inserting until the DB size limit is hit, at which point MDBX tries to kick
        // long-lived readers and delete their snapshots.
        {
            let txn = env.begin_rw_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            for i in 1_000usize..1_000_000 {
                let outcome = txn.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty());
                if matches!(outcome, Err(Error::MapFull)) {
                    break;
                }
                // Any other error is unexpected and fails the test.
                outcome.unwrap();
            }
            txn.commit().unwrap();
        }

        // The HSR must have been called by now.
        assert!(CALLED.load(Ordering::Relaxed));
    }
}