use crate::{
Database, Mode, SyncMode,
error::{MdbxError, MdbxResult, ReadResult, mdbx_result},
flags::EnvironmentFlags,
sys::txn_manager::{LifecycleHandle, RwSyncLifecycle},
tx::aliases::{RoTxSync, RoTxUnsync, RwTxSync, RwTxUnsync},
};
use byteorder::{ByteOrder, NativeEndian};
use mem::size_of;
use std::{
ffi::CString,
fmt::{self, Debug},
mem,
ops::{Bound, RangeBounds},
path::Path,
ptr,
sync::Arc,
time::Duration,
};
/// A handle to an open libmdbx environment.
///
/// Cheap to clone: all clones share the same [`EnvironmentInner`] behind an
/// [`Arc`], and the underlying `MDBX_env` is closed when the last clone is
/// dropped (see [`EnvironmentInner`]'s `Drop` impl).
#[derive(Clone)]
pub struct Environment {
    inner: Arc<EnvironmentInner>,
}
impl Environment {
    /// Returns a new builder used to configure and open an [`Environment`].
    ///
    /// All options start unset (`None`), deferring to the libmdbx defaults.
    pub fn builder() -> EnvironmentBuilder {
        EnvironmentBuilder {
            flags: EnvironmentFlags::default(),
            max_readers: None,
            max_dbs: None,
            sync_bytes: None,
            sync_period: None,
            rp_augment_limit: None,
            loose_limit: None,
            dp_reserve_limit: None,
            txn_dp_limit: None,
            spill_max_denominator: None,
            spill_min_denominator: None,
            geometry: None,
            log_level: None,
            kind: Default::default(),
            handle_slow_readers: None,
        }
    }

    /// Returns `true` if the environment was opened as [`EnvironmentKind::WriteMap`].
    #[inline]
    pub fn is_write_map(&self) -> bool {
        self.inner.env_kind.is_write_map()
    }

    /// Returns the [`EnvironmentKind`] this environment was opened with.
    #[inline]
    pub fn env_kind(&self) -> EnvironmentKind {
        self.inner.env_kind
    }

    /// Returns `true` if the environment is opened read-write.
    ///
    /// Queries the live environment via [`Environment::info`], hence the `Result`.
    #[inline]
    pub fn is_read_write(&self) -> MdbxResult<bool> {
        Ok(!self.is_read_only()?)
    }

    /// Returns `true` if the environment is opened read-only.
    #[inline]
    pub fn is_read_only(&self) -> MdbxResult<bool> {
        Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
    }

    /// Handle to the transaction-lifecycle manager spawned when this
    /// environment was opened (see `EnvironmentBuilder::open_with_permissions`).
    #[inline]
    pub(crate) fn txn_manager(&self) -> &LifecycleHandle {
        &self.inner.txn_manager
    }

    /// Begins a read-only transaction of the `Sync` flavor.
    #[inline]
    pub fn begin_ro_sync(&self) -> MdbxResult<RoTxSync> {
        RoTxSync::begin(self.clone())
    }

    /// Begins a read-write transaction of the `Sync` flavor.
    pub fn begin_rw_sync(&self) -> MdbxResult<RwTxSync> {
        RwTxSync::begin(self.clone())
    }

    /// Begins a read-only transaction of the non-`Sync` flavor.
    pub fn begin_ro_unsync(&self) -> MdbxResult<RoTxUnsync> {
        RoTxUnsync::begin(self.clone())
    }

    /// Begins a read-write transaction of the non-`Sync` flavor.
    pub fn begin_rw_unsync(&self) -> MdbxResult<RwTxUnsync> {
        RwTxUnsync::begin(self.clone())
    }

    /// Raw pointer to the underlying `MDBX_env`; valid for the lifetime of `self`.
    #[inline]
    pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
        self.inner.env
    }

    /// Runs `f` with the raw `MDBX_env` pointer.
    ///
    /// Hidden escape hatch for callers that need direct FFI access; the
    /// pointer must not be retained past the call.
    #[inline]
    #[doc(hidden)]
    pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
    where
        F: FnOnce(*mut ffi::MDBX_env) -> T,
    {
        f(self.env_ptr())
    }

    /// Flushes buffered data to disk via `mdbx_env_sync_ex`.
    ///
    /// `force` requests an unconditional sync; the third FFI argument
    /// (`nonblock`) is always `false` here. The returned boolean is the FFI
    /// status mapped by [`mdbx_result`] — presumably whether a sync was
    /// actually performed (`MDBX_RESULT_TRUE`); confirm against the libmdbx docs.
    pub fn sync(&self, force: bool) -> MdbxResult<bool> {
        mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
    }

    /// Retrieves environment statistics (`mdbx_env_stat_ex` with a `NULL` txn).
    pub fn stat(&self) -> MdbxResult<Stat> {
        unsafe {
            // `Stat::new()` zero-initializes the C struct; the size argument
            // lets libmdbx validate the caller's view of the struct layout.
            let mut stat = Stat::new();
            mdbx_result(ffi::mdbx_env_stat_ex(
                self.env_ptr(),
                ptr::null(),
                stat.mdb_stat(),
                size_of::<Stat>(),
            ))?;
            Ok(stat)
        }
    }

    /// Retrieves environment information (`mdbx_env_info_ex` with a `NULL` txn).
    pub fn info(&self) -> MdbxResult<Info> {
        unsafe {
            // SAFETY: `MDBX_envinfo` is a plain C struct for which all-zero
            // bytes are a valid value; libmdbx fills it in before it is read.
            let mut info = Info(mem::zeroed());
            mdbx_result(ffi::mdbx_env_info_ex(
                self.env_ptr(),
                ptr::null(),
                &mut info.0,
                size_of::<Info>(),
            ))?;
            Ok(info)
        }
    }

    /// Returns the total number of pages recorded in the freelist database.
    ///
    /// Iterates every entry of the freelist DB inside a fresh read-only
    /// transaction and sums the leading native-endian `u32` of each value —
    /// presumably the page count of that GC record; confirm against the
    /// libmdbx GC value layout.
    ///
    /// Returns [`MdbxError::Corrupted`] if any value is shorter than 4 bytes.
    pub fn freelist(&self) -> ReadResult<usize> {
        let mut freelist: usize = 0;
        let txn = self.begin_ro_unsync()?;
        let db = Database::freelist_db();
        let mut cursor = txn.cursor(db)?;
        let mut iter = cursor.iter_slices();
        while let Some((_key, value)) = iter.borrow_next()? {
            if value.len() < size_of::<u32>() {
                return Err(MdbxError::Corrupted.into());
            }
            let s = &value[..size_of::<u32>()];
            freelist += NativeEndian::read_u32(s) as usize;
        }
        Ok(freelist)
    }
}
/// Shared state behind an [`Environment`] handle.
struct EnvironmentInner {
    // Raw libmdbx environment handle; owned here and closed in `Drop`.
    env: *mut ffi::MDBX_env,
    // The kind (default vs. write-map) the environment was opened with.
    env_kind: EnvironmentKind,
    // Handle to the transaction lifecycle manager spawned on open.
    txn_manager: LifecycleHandle,
}
impl Drop for EnvironmentInner {
    fn drop(&mut self) {
        // SAFETY: `env` was created by `mdbx_env_create` in the builder and is
        // closed exactly once, here, when the last `Environment` clone drops.
        // NOTE(review): the `false` second argument presumably means "do not
        // discard unsynced data" — confirm against `mdbx_env_close_ex` docs.
        unsafe {
            ffi::mdbx_env_close_ex(self.env, false);
        }
    }
}
// SAFETY(review): sending/sharing `EnvironmentInner` across threads assumes the
// `MDBX_env` handle itself is safe to use from multiple threads, as libmdbx
// environments generally are — confirm against the libmdbx threading rules for
// the build options in use.
unsafe impl Send for EnvironmentInner {}
unsafe impl Sync for EnvironmentInner {}
/// How the environment maps its data file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnvironmentKind {
    /// No extra flags; maps to `MDBX_ENV_DEFAULTS` (see `extra_flags`).
    #[default]
    Default,
    /// Writable memory map; maps to `MDBX_WRITEMAP` (see `extra_flags`).
    WriteMap,
}
impl EnvironmentKind {
    /// Returns `true` for the [`EnvironmentKind::WriteMap`] variant.
    #[inline]
    pub const fn is_write_map(&self) -> bool {
        match self {
            Self::WriteMap => true,
            Self::Default => false,
        }
    }

    /// Additional environment flags implied by this kind, OR-ed into the open
    /// flags by the builder.
    pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
        if self.is_write_map() {
            ffi::MDBX_WRITEMAP
        } else {
            ffi::MDBX_ENV_DEFAULTS
        }
    }
}
/// Thin newtype making the raw environment pointer transferable across threads
/// (e.g. into the lifecycle manager spawned on open).
#[derive(Copy, Clone, Debug)]
pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
// SAFETY(review): same assumption as for `EnvironmentInner` — the `MDBX_env`
// handle is treated as usable from any thread; confirm against libmdbx docs.
unsafe impl Send for EnvPtr {}
unsafe impl Sync for EnvPtr {}
/// Environment statistics; transparent wrapper over `ffi::MDBX_stat` so it can
/// be passed directly to FFI calls.
#[derive(Debug, Clone, Copy)]
#[repr(transparent)]
pub struct Stat(ffi::MDBX_stat);
impl Stat {
    /// Creates a zero-initialized `Stat` for libmdbx to fill in.
    pub(crate) const fn new() -> Self {
        // SAFETY: `MDBX_stat` is a plain C struct of integer fields, for which
        // all-zero bytes are a valid (if meaningless) value.
        unsafe { Self(mem::zeroed()) }
    }

    /// Mutable pointer to the inner C struct, for passing to FFI calls.
    pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
        &mut self.0
    }
}
impl Stat {
    /// Size of a database page, in bytes.
    #[inline]
    pub const fn page_size(&self) -> u32 {
        self.0.ms_psize
    }

    /// Depth (height) of the B-tree.
    #[inline]
    pub const fn depth(&self) -> u32 {
        self.0.ms_depth
    }

    /// Number of internal (non-leaf) pages.
    #[inline]
    pub const fn branch_pages(&self) -> usize {
        self.0.ms_branch_pages as usize
    }

    /// Number of leaf pages.
    #[inline]
    pub const fn leaf_pages(&self) -> usize {
        self.0.ms_leaf_pages as usize
    }

    /// Number of overflow pages.
    #[inline]
    pub const fn overflow_pages(&self) -> usize {
        self.0.ms_overflow_pages as usize
    }

    /// Number of data entries.
    #[inline]
    pub const fn entries(&self) -> usize {
        self.0.ms_entries as usize
    }
}
/// Geometry settings reported by the environment; transparent wrapper over the
/// `mi_geo` field of `ffi::MDBX_envinfo`.
#[derive(Debug, Clone, Copy)]
#[repr(transparent)]
pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);

impl GeometryInfo {
    /// Lower bound of the datafile size (the `lower` geometry field).
    pub const fn min(&self) -> u64 {
        self.0.lower
    }
}
/// Environment information; transparent wrapper over `ffi::MDBX_envinfo` so it
/// can be passed directly to FFI calls.
#[derive(Debug)]
#[repr(transparent)]
pub struct Info(ffi::MDBX_envinfo);
impl Info {
    /// Returns the database geometry settings reported by libmdbx.
    pub const fn geometry(&self) -> GeometryInfo {
        GeometryInfo(self.0.mi_geo)
    }

    /// Size of the memory map, in bytes.
    #[inline]
    pub const fn map_size(&self) -> usize {
        self.0.mi_mapsize as usize
    }

    /// Number of the last used page.
    #[inline]
    pub const fn last_pgno(&self) -> usize {
        self.0.mi_last_pgno as usize
    }

    /// Identifier of the most recent committed transaction.
    #[inline]
    pub const fn last_txnid(&self) -> usize {
        self.0.mi_recent_txnid as usize
    }

    /// Maximum number of reader slots in the environment.
    #[inline]
    pub const fn max_readers(&self) -> usize {
        self.0.mi_maxreaders as usize
    }

    /// Number of reader slots currently in use.
    #[inline]
    pub const fn num_readers(&self) -> usize {
        self.0.mi_numreaders as usize
    }

    /// Snapshot of libmdbx's page-operation counters (`mi_pgop_stat`).
    #[inline]
    pub const fn page_ops(&self) -> PageOps {
        PageOps {
            newly: self.0.mi_pgop_stat.newly,
            cow: self.0.mi_pgop_stat.cow,
            clone: self.0.mi_pgop_stat.clone,
            split: self.0.mi_pgop_stat.split,
            merge: self.0.mi_pgop_stat.merge,
            spill: self.0.mi_pgop_stat.spill,
            unspill: self.0.mi_pgop_stat.unspill,
            wops: self.0.mi_pgop_stat.wops,
            prefault: self.0.mi_pgop_stat.prefault,
            mincore: self.0.mi_pgop_stat.mincore,
            msync: self.0.mi_pgop_stat.msync,
            fsync: self.0.mi_pgop_stat.fsync,
        }
    }

    /// Decodes the environment's open [`Mode`] from the `mi_mode` flag bits.
    ///
    /// Bug fix: `MDBX_UTTERLY_NOSYNC` is the *combination* of the
    /// `MDBX_SAFE_NOSYNC` and `MDBX_NOMETASYNC` bits, so it must be compared
    /// for full equality after masking. The previous `!= 0` test reported
    /// `UtterlyNoSync` whenever *either* constituent bit was set, which
    /// misclassified `NoMetaSync`-only and `SafeNoSync`-only environments.
    #[inline]
    pub const fn mode(&self) -> Mode {
        let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
        if (mode & ffi::MDBX_RDONLY) != 0 {
            Mode::ReadOnly
        } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) == ffi::MDBX_UTTERLY_NOSYNC {
            Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
        } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
            Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
        } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
            Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
        } else {
            Mode::ReadWrite { sync_mode: SyncMode::Durable }
        }
    }
}
impl fmt::Debug for Environment {
    // Only the environment kind is printed; the raw pointer and txn manager
    // are intentionally omitted (hence `finish_non_exhaustive`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Environment");
        builder.field("kind", &self.inner.env_kind);
        builder.finish_non_exhaustive()
    }
}
/// Database page size requested when opening an environment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageSize {
    /// Let libmdbx pick its minimal acceptable page size (passed as `0` to
    /// `mdbx_env_set_geometry`).
    MinimalAcceptable,
    /// Explicit page size, in bytes.
    Set(usize),
}
/// Snapshot of libmdbx's page-operation counters (`mi_pgop_stat`).
///
/// Field meanings follow the libmdbx `MDBX_envinfo` documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageOps {
    /// Quantity of newly allocated pages.
    pub newly: u64,
    /// Quantity of pages copied for update (copy-on-write).
    pub cow: u64,
    /// Quantity of parent pages shadowed (cloned).
    pub clone: u64,
    /// Page splits.
    pub split: u64,
    /// Page merges.
    pub merge: u64,
    /// Quantity of spilled dirty pages.
    pub spill: u64,
    /// Quantity of unspilled/reloaded pages.
    pub unspill: u64,
    /// Number of explicit write operations to disk.
    pub wops: u64,
    /// Number of explicit msync/flush operations.
    pub msync: u64,
    /// Number of explicit fsync operations.
    pub fsync: u64,
    /// Number of prefault write operations.
    pub prefault: u64,
    /// Number of `mincore()` calls.
    pub mincore: u64,
}
/// Requested environment geometry; every `None` field falls back to the
/// libmdbx default (`-1` at the FFI boundary).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Geometry<R> {
    /// Bounds for the datafile size (typically a range of bytes).
    pub size: Option<R>,
    /// Growth step, in bytes.
    pub growth_step: Option<isize>,
    /// Shrink threshold, in bytes.
    pub shrink_threshold: Option<isize>,
    /// Database page size.
    pub page_size: Option<PageSize>,
}
impl<R> Default for Geometry<R> {
fn default() -> Self {
Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
}
}
/// Signature of the "handle slow readers" callback registered via
/// [`EnvironmentBuilder::set_handle_slow_readers`].
///
/// Must stay ABI-compatible with `ffi::MDBX_hsr_func`, since `convert_hsr_fn`
/// transmutes between the two.
pub type HandleSlowReadersCallback = extern "C" fn(
    env: *const ffi::MDBX_env,
    txn: *const ffi::MDBX_txn,
    pid: ffi::mdbx_pid_t,
    tid: ffi::mdbx_tid_t,
    laggard: u64,
    gap: std::ffi::c_uint,
    space: usize,
    retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode;
/// Return codes for a [`HandleSlowReadersCallback`].
///
/// `#[repr(i32)]` so the discriminants match the integer values libmdbx
/// expects from an `MDBX_hsr_func` — confirm exact semantics of each value
/// against the libmdbx callback documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
    /// An error occurred while handling the slow reader.
    Error = -2,
    /// Do nothing; proceed without killing the reader.
    ProceedWithoutKillingReader = -1,
    /// The slow reader was dealt with successfully.
    Success = 0,
    /// The reader's slot should be cleared.
    ClearReaderSlot = 1,
    /// The reader process was terminated.
    ReaderProcessTerminated = 2,
}
/// Builder for configuring and opening an [`Environment`].
///
/// Created via [`Environment::builder`]; every `None` field leaves the
/// corresponding libmdbx setting at its default.
#[derive(Debug, Clone)]
pub struct EnvironmentBuilder {
    flags: EnvironmentFlags,
    max_readers: Option<u64>,
    max_dbs: Option<u64>,
    sync_bytes: Option<u64>,
    // Stored in libmdbx units of 1/65536 of a second (see `set_sync_period`).
    sync_period: Option<u64>,
    rp_augment_limit: Option<u64>,
    loose_limit: Option<u64>,
    dp_reserve_limit: Option<u64>,
    txn_dp_limit: Option<u64>,
    spill_max_denominator: Option<u64>,
    spill_min_denominator: Option<u64>,
    // Size bounds normalized to (lower, upper) by `set_geometry`.
    geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
    log_level: Option<ffi::MDBX_log_level_t>,
    kind: EnvironmentKind,
    handle_slow_readers: Option<HandleSlowReadersCallback>,
}
impl EnvironmentBuilder {
    /// Opens the environment at `path` with default file permissions (`0o644`).
    pub fn open(&self, path: &Path) -> MdbxResult<Environment> {
        self.open_with_permissions(path, 0o644)
    }

    /// Opens the environment at `path`, creating database files with `mode`
    /// permission bits.
    ///
    /// The order of FFI calls matters: geometry and most options are set
    /// between `mdbx_env_create` and `mdbx_env_open`, while the sync options
    /// are applied after the environment is open. On any setup error the
    /// half-initialized environment is closed before the error propagates, so
    /// the raw handle is never leaked.
    pub fn open_with_permissions(
        &self,
        path: &Path,
        mode: ffi::mdbx_mode_t,
    ) -> MdbxResult<Environment> {
        let mut env: *mut ffi::MDBX_env = ptr::null_mut();
        unsafe {
            if let Some(log_level) = self.log_level {
                // Adjust only the log level; debug flags are left untouched.
                ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
            }
            mdbx_result(ffi::mdbx_env_create(&mut env))?;
            // The closure gathers every fallible setup step so a single error
            // path below can close `env` before returning the error.
            if let Err(e) = (|| {
                if let Some(geometry) = &self.geometry {
                    // -1 means "keep the libmdbx default" for geometry fields.
                    let mut min_size = -1;
                    let mut max_size = -1;
                    if let Some(size) = geometry.size {
                        if let Some(size) = size.0 {
                            min_size = size as isize;
                        }
                        if let Some(size) = size.1 {
                            max_size = size as isize;
                        }
                    }
                    mdbx_result(ffi::mdbx_env_set_geometry(
                        env,
                        min_size,
                        -1,
                        max_size,
                        geometry.growth_step.unwrap_or(-1),
                        geometry.shrink_threshold.unwrap_or(-1),
                        // 0 asks libmdbx for its minimal acceptable page size.
                        match geometry.page_size {
                            None => -1,
                            Some(PageSize::MinimalAcceptable) => 0,
                            Some(PageSize::Set(size)) => size as isize,
                        },
                    ))?;
                }
                // Options that must be set before `mdbx_env_open`.
                for (opt, v) in [
                    (ffi::MDBX_opt_max_db, self.max_dbs),
                    (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
                    (ffi::MDBX_opt_loose_limit, self.loose_limit),
                    (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
                    (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
                    (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
                    (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
                ] {
                    if let Some(v) = v {
                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
                    }
                }
                if let Some(max_readers) = self.max_readers {
                    mdbx_result(ffi::mdbx_env_set_option(
                        env,
                        ffi::MDBX_opt_max_readers,
                        max_readers,
                    ))?;
                }
                if let Some(handle_slow_readers) = self.handle_slow_readers {
                    mdbx_result(ffi::mdbx_env_set_hsr(
                        env,
                        convert_hsr_fn(Some(handle_slow_readers)),
                    ))?;
                }
                // Convert the path to NUL-terminated bytes for the C API: raw
                // OS bytes on Unix, a lossy UTF-8 conversion on Windows.
                #[cfg(unix)]
                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
                    use std::os::unix::ffi::OsStrExt;
                    path.as_ref().as_os_str().as_bytes().to_vec()
                }
                #[cfg(windows)]
                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
                    path.as_ref().to_string_lossy().to_string().into_bytes()
                }
                let path = match CString::new(path_to_bytes(path)) {
                    Ok(path) => path,
                    // An interior NUL byte cannot be represented in a CString.
                    Err(_) => return Err(MdbxError::Invalid),
                };
                mdbx_result(ffi::mdbx_env_open(
                    env,
                    path.as_ptr(),
                    self.flags.make_flags() | self.kind.extra_flags(),
                    mode,
                ))?;
                // Sync options are applied only after the environment is open.
                for (opt, v) in [
                    (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
                    (ffi::MDBX_opt_sync_period, self.sync_period),
                ] {
                    if let Some(v) = v {
                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
                    }
                }
                Ok(())
            })() {
                // Setup failed: close the handle so it is not leaked.
                ffi::mdbx_env_close_ex(env, false);
                return Err(e);
            }
        }
        let env_ptr = EnvPtr(env);
        // Spawn the transaction-lifecycle manager tied to this environment.
        let txn_manager = RwSyncLifecycle::spawn(env_ptr);
        let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
        Ok(Environment { inner: Arc::new(env) })
    }

    /// Sets the [`EnvironmentKind`] to open the environment with.
    pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
        self.kind = kind;
        self
    }

    /// Shorthand for `set_kind(EnvironmentKind::WriteMap)`.
    pub const fn write_map(&mut self) -> &mut Self {
        self.set_kind(EnvironmentKind::WriteMap)
    }

    /// Sets the environment flags used when opening.
    pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
        self.flags = flags;
        self
    }

    /// Sets the maximum number of reader slots (`MDBX_opt_max_readers`).
    pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
        self.max_readers = Some(max_readers);
        self
    }

    /// Sets the maximum number of named databases (`MDBX_opt_max_db`).
    pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
        self.max_dbs = Some(v as u64);
        self
    }

    /// Sets the threshold of unsynced bytes that triggers a flush
    /// (`MDBX_opt_sync_bytes`).
    pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
        self.sync_bytes = Some(v as u64);
        self
    }

    /// Sets the periodic-flush interval (`MDBX_opt_sync_period`).
    ///
    /// libmdbx expects the period in units of 1/65536 of a second, hence the
    /// multiplication below.
    pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
        let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
        self.sync_period = Some(as_mdbx_units);
        self
    }

    /// Sets `MDBX_opt_rp_augment_limit`.
    pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
        self.rp_augment_limit = Some(v);
        self
    }

    /// Sets `MDBX_opt_loose_limit`.
    pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
        self.loose_limit = Some(v);
        self
    }

    /// Sets `MDBX_opt_dp_reserve_limit`.
    pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
        self.dp_reserve_limit = Some(v);
        self
    }

    /// Sets `MDBX_opt_txn_dp_limit`.
    pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
        self.txn_dp_limit = Some(v);
        self
    }

    /// Sets `MDBX_opt_spill_max_denominator`.
    pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
        self.spill_max_denominator = Some(v.into());
        self
    }

    /// Sets `MDBX_opt_spill_min_denominator`.
    pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
        self.spill_min_denominator = Some(v.into());
        self
    }

    /// Sets the requested geometry, normalizing the size range into
    /// `(lower, upper)` bounds.
    ///
    /// NOTE(review): excluded bounds are treated the same as included ones
    /// (no ±1 adjustment) — confirm this is intentional.
    pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
        let convert_bound = |bound: Bound<&usize>| match bound {
            Bound::Included(v) | Bound::Excluded(v) => Some(*v),
            _ => None,
        };
        self.geometry = Some(Geometry {
            size: geometry.size.map(|range| {
                (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
            }),
            growth_step: geometry.growth_step,
            shrink_threshold: geometry.shrink_threshold,
            page_size: geometry.page_size,
        });
        self
    }

    /// Sets the libmdbx log level applied via `mdbx_setup_debug` on open.
    pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
        self.log_level = Some(log_level);
        self
    }

    /// Registers a "handle slow readers" callback (`mdbx_env_set_hsr`).
    pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
        self.handle_slow_readers = Some(hsr);
        self
    }
}
/// Converts the typed callback into the raw `ffi::MDBX_hsr_func` pointer type.
fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
    // SAFETY(review): relies on `HandleSlowReadersCallback` having the exact
    // same ABI as `ffi::MDBX_hsr_func` — `extern "C"` with identical argument
    // layout and a `#[repr(i32)]` return enum matching the C return type.
    unsafe { std::mem::transmute(callback) }
}
#[cfg(test)]
mod tests {
    use crate::{
        Environment, Geometry, MdbxError, WriteFlags,
        sys::{HandleSlowReadersReturnCode, PageSize},
    };
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Verifies the "handle slow readers" callback fires when a long-lived
    /// read transaction holds back page reclamation and the (tiny) map fills.
    #[test]
    fn test_handle_slow_readers_callback() {
        // Flag flipped by the callback so the test can observe the invocation.
        static CALLED: AtomicBool = AtomicBool::new(false);
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            CALLED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        // Tiny geometry (<= 1 MiB, minimal page size) so the map fills quickly.
        let tempdir = tempfile::tempdir().unwrap();
        let env = Environment::builder()
            .set_geometry(Geometry::<RangeInclusive<usize>> {
                size: Some(0..=1024 * 1024),
                page_size: Some(PageSize::MinimalAcceptable),
                ..Default::default()
            })
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Seed some data.
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 0usize..1_000 {
                tx.put(db, i.to_le_bytes(), b"0", WriteFlags::empty()).unwrap()
            }
            tx.commit().unwrap();
        }

        // Hold a read transaction open so old pages cannot be reclaimed.
        let _tx_ro = env.begin_ro_sync().unwrap();

        // Overwrite the seeded data, creating stale pages the reader pins.
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 0usize..1_000 {
                tx.put(db, i.to_le_bytes(), b"1", WriteFlags::empty()).unwrap();
            }
            tx.commit().unwrap();
        }

        // Write until the map fills; MapFull is expected and terminates the
        // loop — by then the HSR callback should have fired.
        {
            let tx = env.begin_rw_sync().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 1_000usize..1_000_000 {
                match tx.put(db, i.to_le_bytes(), b"0", WriteFlags::empty()) {
                    Ok(_) => {}
                    Err(MdbxError::MapFull) => break,
                    result @ Err(_) => result.unwrap(),
                }
            }
            // Commit may itself fail once the map is full; ignore the result.
            let _ = tx.commit();
        }

        assert!(CALLED.load(Ordering::Relaxed));
    }
}