use std::{
cmp::max,
collections::HashMap,
convert::TryInto,
path::{Path, PathBuf},
sync::Arc,
time::Instant,
};
use lmdb_zero::{
ConstAccessor,
Cursor,
CursorIter,
Database,
DatabaseOptions,
EnvBuilder,
Environment,
Ignore,
MaybeOwned,
ReadTransaction,
Stat,
WriteAccessor,
WriteTransaction,
db,
error,
error::LmdbResultExt,
open,
put,
traits::AsLmdbBytes,
};
use log::*;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::{
key_val_store::{error::KeyValStoreError, key_val_store::IterationResult},
lmdb_store::error::LMDBError,
};
/// Log target used for all messages emitted by this module.
const LOG_TARGET: &str = "lmdb";
/// Number of bytes in a megabyte (MiB); used for size conversion and logging.
pub const BYTES_PER_MB: usize = 1024 * 1024;
/// Shared, cheaply clonable handle to an LMDB database bound to the
/// `'static` environment lifetime.
pub type DatabaseRef = Arc<Database<'static>>;
/// Size configuration for an LMDB environment: the initial memory-map size,
/// how much to grow the map by on each resize, and the free-space threshold
/// at which a resize is triggered.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct LMDBConfig {
    // Initial size of the memory map, in bytes.
    init_size_bytes: usize,
    // Amount (bytes) by which the map is grown on each resize.
    grow_size_bytes: usize,
    // When free space drops to or below this many bytes, a resize is required.
    resize_threshold_bytes: usize,
    // NOTE(review): only exposed via the getter in this file; presumably the
    // caller uses it to disable OS read-ahead when opening the env — confirm.
    no_read_ahead: bool,
}
impl LMDBConfig {
    /// Create a new configuration with all sizes given in bytes.
    pub fn new(
        init_size_bytes: usize,
        grow_size_bytes: usize,
        resize_threshold_bytes: usize,
        no_read_ahead: bool,
    ) -> Self {
        Self {
            init_size_bytes,
            grow_size_bytes,
            resize_threshold_bytes,
            no_read_ahead,
        }
    }

    /// Convenience constructor taking all sizes in megabytes.
    pub fn new_from_mb(
        init_size_mb: usize,
        grow_size_mb: usize,
        resize_threshold_mb: usize,
        no_read_ahead: bool,
    ) -> Self {
        Self::new(
            init_size_mb * BYTES_PER_MB,
            grow_size_mb * BYTES_PER_MB,
            resize_threshold_mb * BYTES_PER_MB,
            no_read_ahead,
        )
    }

    /// Initial size of the environment memory map, in bytes.
    pub fn init_size_bytes(&self) -> usize {
        self.init_size_bytes
    }

    /// Growth increment applied on each map resize, in bytes.
    pub fn grow_size_bytes(&self) -> usize {
        self.grow_size_bytes
    }

    /// Free-space threshold that triggers a resize, in bytes.
    pub fn resize_threshold_bytes(&self) -> usize {
        self.resize_threshold_bytes
    }

    /// Whether OS read-ahead should be disabled for this environment.
    pub fn no_read_ahead(&self) -> bool {
        self.no_read_ahead
    }
}
impl Default for LMDBConfig {
fn default() -> Self {
Self::new_from_mb(128, 128, 64, false)
}
}
/// Builder for an [`LMDBStore`]: collects the path, environment flags,
/// database names/flags and size configuration before opening the environment.
pub struct LMDBBuilder {
    // Directory in which the environment files live; must exist at build time.
    path: PathBuf,
    // Extra environment open flags; `build` always adds `open::NOTLS`.
    env_flags: open::Flags,
    // Upper bound on named databases (build uses max(this, db_names.len())).
    max_dbs: usize,
    // Databases to open at build time, keyed by name, with their open flags.
    db_names: HashMap<String, db::Flags>,
    // Map sizing configuration for the environment.
    env_config: LMDBConfig,
}
impl LMDBBuilder {
    /// Create a builder populated with defaults (see the `Default` impl).
    pub fn new() -> LMDBBuilder {
        Default::default()
    }

    /// Set the directory for the environment files. The directory must already
    /// exist when `build` is called, otherwise `LMDBError::InvalidPath` is
    /// returned.
    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> LMDBBuilder {
        self.path = path.as_ref().to_owned();
        self
    }

    /// Set extra environment open flags; `build` combines these with `NOTLS`.
    pub fn set_env_flags(mut self, flags: open::Flags) -> LMDBBuilder {
        self.env_flags = flags;
        self
    }

    /// Set the map sizing configuration for the environment.
    pub fn set_env_config(mut self, config: LMDBConfig) -> LMDBBuilder {
        self.env_config = config;
        self
    }

    /// Set the maximum number of named databases. `build` uses the larger of
    /// this value and the number of databases actually registered.
    pub fn set_max_number_of_databases(mut self, size: usize) -> LMDBBuilder {
        self.max_dbs = size;
        self
    }

    /// Register a named database to open at build time. Re-adding an existing
    /// name replaces its flags.
    pub fn add_database(mut self, name: &str, flags: db::Flags) -> LMDBBuilder {
        let _ = self.db_names.insert(name.into(), flags);
        self
    }

    /// Open the environment and every registered database, returning the store.
    /// If no databases were registered, a single "default" database is created.
    ///
    /// # Errors
    /// `LMDBError::InvalidPath` if the path does not exist or is not valid
    /// UTF-8; any LMDB error raised while opening is propagated.
    pub fn build(mut self) -> Result<LMDBStore, LMDBError> {
        // NOTE(review): converts to the integer type expected by
        // `set_maxdbs`; the unwrap assumes the count fits — confirm for
        // absurdly large `max_dbs` values.
        let max_dbs = max(self.db_names.len(), self.max_dbs).try_into().unwrap();
        if !self.path.exists() {
            return Err(LMDBError::InvalidPath);
        }
        let path = self.path.to_str().map(String::from).ok_or(LMDBError::InvalidPath)?;
        // SAFETY-relevant block: lmdb_zero marks environment opening unsafe
        // because certain flag combinations can break its soundness
        // guarantees. Only caller-supplied flags plus NOTLS (transactions not
        // tied to a single OS thread) are used here.
        let env = unsafe {
            let mut builder = EnvBuilder::new()?;
            builder.set_mapsize(self.env_config.init_size_bytes)?;
            builder.set_maxdbs(max_dbs)?;
            let flags = self.env_flags | open::NOTLS;
            let env = builder.open(&path, flags, 0o600)?;
            // Grow immediately if a pre-existing database is near capacity.
            LMDBStore::resize_if_required(&env, &self.env_config, None)?;
            Arc::new(env)
        };
        debug!(
            target: LOG_TARGET,
            "({}) LMDB environment created with a capacity of {} MB, {} MB remaining.",
            path,
            env.info()?.mapsize / BYTES_PER_MB,
            // NOTE(review): this subtraction could underflow if used pages ever
            // exceeded the map size; appears safe right after the resize above,
            // but confirm.
            (env.info()?.mapsize - env.stat()?.psize as usize * env.info()?.last_pgno) / BYTES_PER_MB,
        );
        let mut databases: HashMap<String, LMDBDatabase> = HashMap::new();
        // Guarantee at least one database so callers always get a handle.
        if self.db_names.is_empty() {
            self = self.add_database("default", db::CREATE);
        }
        for (name, flags) in &self.db_names {
            let db = Database::open(env.clone(), Some(name), &DatabaseOptions::new(*flags))?;
            let db = LMDBDatabase {
                name: name.to_string(),
                env_config: self.env_config.clone(),
                env: env.clone(),
                db: Arc::new(db),
            };
            databases.insert(name.to_string(), db);
            trace!(target: LOG_TARGET, "({path}) LMDB database '{name}' is ready");
        }
        Ok(LMDBStore {
            path,
            env_config: self.env_config,
            env,
            databases,
        })
    }
}
impl Default for LMDBBuilder {
fn default() -> Self {
Self {
path: "./store/".into(),
env_flags: open::Flags::empty(),
db_names: HashMap::new(),
max_dbs: 8,
env_config: LMDBConfig::default(),
}
}
}
/// An opened LMDB environment together with handles to all databases that
/// were registered with the builder.
pub struct LMDBStore {
    // UTF-8 path the environment was opened at (used for log messages).
    path: String,
    // Sizing configuration the environment was built with.
    env_config: LMDBConfig,
    // Shared handle to the underlying LMDB environment.
    env: Arc<Environment>,
    // Database handles keyed by name.
    databases: HashMap<String, LMDBDatabase>,
}
impl LMDBStore {
    /// Force a synchronous flush of all LMDB buffers to disk.
    ///
    /// # Errors
    /// Propagates any error returned by the environment sync.
    pub fn flush(&self) -> Result<(), lmdb_zero::error::Error> {
        let start = Instant::now();
        self.env.sync(true)?;
        trace!(target: LOG_TARGET, "LMDB buffers flushed in {:.2?}", start.elapsed());
        Ok(())
    }

    /// Log environment information and statistics at debug level. Failures to
    /// query LMDB are logged as warnings rather than returned.
    pub fn log_info(&self) {
        match self.env.info() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB information for {}. {}",
                self.path,
                e
            ),
            Ok(info) => {
                let size_mb = info.mapsize / BYTES_PER_MB;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment information ({}). Map Size={} MB. Last page no={}. Last tx id={}",
                    self.path,
                    size_mb,
                    info.last_pgno,
                    info.last_txnid
                )
            },
        }
        match self.env.stat() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB statistics for {}. {}",
                self.path,
                e
            ),
            Ok(stats) => {
                let page_size = stats.psize / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
                     Overflow pages={}, Entries={}",
                    self.path,
                    page_size,
                    stats.depth,
                    stats.branch_pages,
                    stats.leaf_pages,
                    stats.overflow_pages,
                    stats.entries
                );
            },
        }
    }

    /// Fetch a cloned handle to a named database, if it was registered at
    /// build time.
    pub fn get_handle(&self, db_name: &str) -> Option<LMDBDatabase> {
        self.databases.get(db_name).cloned()
    }

    /// The sizing configuration this store was built with.
    pub fn env_config(&self) -> LMDBConfig {
        self.env_config.clone()
    }

    /// A shared handle to the underlying LMDB environment.
    pub fn env(&self) -> Arc<Environment> {
        self.env.clone()
    }

    /// Grow the memory map if the remaining free space is at or below the
    /// configured threshold (optionally raised by `increase_threshold_by`,
    /// e.g. the size of a value about to be written).
    ///
    /// # Safety
    /// Per the LMDB contract for `mdb_env_set_mapsize`, no transactions may be
    /// active in this process while the map is resized.
    pub unsafe fn resize_if_required(
        env: &Environment,
        config: &LMDBConfig,
        increase_threshold_by: Option<usize>,
    ) -> Result<(), LMDBError> {
        let (mapsize, size_used_bytes, size_left_bytes) = LMDBStore::get_stats(env)?;
        if size_left_bytes <= config.resize_threshold_bytes + increase_threshold_by.unwrap_or_default() {
            debug!(
                target: LOG_TARGET,
                "Resize required: mapsize: {} MB, used: {} MB, remaining: {} MB",
                mapsize / BYTES_PER_MB,
                size_used_bytes / BYTES_PER_MB,
                size_left_bytes / BYTES_PER_MB
            );
            unsafe {
                Self::resize(env, config, increase_threshold_by)?;
            }
        }
        Ok(())
    }

    /// Return `(map size, bytes used, bytes remaining)` for the environment.
    ///
    /// # Errors
    /// Propagates any error from querying the environment info or stats.
    pub fn get_stats(env: &Environment) -> Result<(usize, usize, usize), LMDBError> {
        let env_info = env.info()?;
        let stat = env.stat()?;
        let size_used_bytes = stat.psize as usize * env_info.last_pgno;
        // Fix: use saturating_sub so a transient inconsistency between info()
        // and stat() (e.g. another process growing the map between the two
        // calls) cannot underflow and panic in debug builds; report zero bytes
        // remaining instead, which simply triggers a resize.
        let size_left_bytes = env_info.mapsize.saturating_sub(size_used_bytes);
        Ok((env_info.mapsize, size_used_bytes, size_left_bytes))
    }

    /// Unconditionally grow the map by `grow_size_bytes` plus the optional
    /// extra amount, and log the before/after sizes.
    ///
    /// # Safety
    /// Same contract as [`Self::resize_if_required`]: no transactions may be
    /// active on `env` while the map size changes.
    pub unsafe fn resize(
        env: &Environment,
        config: &LMDBConfig,
        increase_threshold_by: Option<usize>,
    ) -> Result<(), LMDBError> {
        let start = Instant::now();
        let env_info = env.info()?;
        let current_mapsize = env_info.mapsize;
        unsafe {
            env.set_mapsize(current_mapsize + config.grow_size_bytes + increase_threshold_by.unwrap_or_default())?;
        }
        // Re-query so the log reflects the size LMDB actually applied.
        let env_info = env.info()?;
        let new_mapsize = env_info.mapsize;
        debug!(
            target: LOG_TARGET,
            "({}) LMDB MB, mapsize was grown from {} MB to {} MB, increased by {} MB, in {:.2?}",
            env.path()?.to_str()?,
            current_mapsize / BYTES_PER_MB,
            new_mapsize / BYTES_PER_MB,
            (config.grow_size_bytes + increase_threshold_by.unwrap_or_default()) / BYTES_PER_MB,
            start.elapsed()
        );
        Ok(())
    }
}
/// A handle to one named database inside an LMDB environment. Cloning is
/// cheap: the environment and database are shared via `Arc`.
#[derive(Clone)]
pub struct LMDBDatabase {
    // Database name (used in log messages).
    name: String,
    // Sizing configuration, used when the map must be grown during inserts.
    env_config: LMDBConfig,
    // Shared handle to the owning environment.
    env: Arc<Environment>,
    // Shared handle to the open LMDB database.
    db: DatabaseRef,
}
impl LMDBDatabase {
pub fn insert<K, V>(&self, key: &K, value: &V) -> Result<(), LMDBError>
where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
let max_resizes = 1024 * BYTES_PER_MB / self.env_config.grow_size_bytes();
let value = LMDBWriteTransaction::convert_value(value)?;
for i in 0..max_resizes {
match self.write(key, &value) {
Ok(txn) => return Ok(txn),
Err(error::Error::Code(error::MAP_FULL)) => {
info!(
target: LOG_TARGET,
"Database resize required (resized {} time(s) in this transaction)",
i + 1
);
unsafe {
LMDBStore::resize(&self.env, &self.env_config, Some(value.len()))?;
}
},
Err(e) => return Err(e.into()),
}
}
Err(error::Error::Code(error::MAP_FULL).into())
}
#[allow(clippy::ptr_arg)]
fn write<K>(&self, key: &K, value: &Vec<u8>) -> Result<(), lmdb_zero::Error>
where K: AsLmdbBytes + ?Sized {
let env = self.db.env();
let tx = WriteTransaction::new(env)?;
{
let mut accessor = tx.access();
accessor.put(&self.db, key, value, put::Flags::empty())?;
}
tx.commit()?;
Ok(())
}
pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
where
K: AsLmdbBytes + ?Sized,
for<'t> V: DeserializeOwned, {
let env = self.db.env();
let txn = ReadTransaction::new(env)?;
let accessor = txn.access();
let val = accessor.get(&self.db, key).to_opt();
LMDBReadTransaction::convert_value(val)
}
pub fn get_stats(&self) -> Result<Stat, LMDBError> {
let env = self.db.env();
Ok(ReadTransaction::new(env).and_then(|txn| txn.db_stat(&self.db))?)
}
pub fn log_info(&self) {
match self.get_stats() {
Err(e) => warn!(
target: LOG_TARGET,
"Could not retrieve LMDB statistics for {}. {}",
self.name,
e
),
Ok(stats) => {
let page_size = stats.psize / 1024;
debug!(
target: LOG_TARGET,
"LMDB Database statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
Overflow pages={}, Entries={}",
self.name,
page_size,
stats.depth,
stats.branch_pages,
stats.leaf_pages,
stats.overflow_pages,
stats.entries
);
},
}
}
pub fn is_empty(&self) -> Result<bool, LMDBError> {
self.get_stats().map(|s| s.entries > 0)
}
pub fn len(&self) -> Result<usize, LMDBError> {
self.get_stats().map(|s| s.entries)
}
pub fn for_each<K, V, F>(&self, mut f: F) -> Result<(), LMDBError>
where
K: DeserializeOwned,
V: DeserializeOwned,
F: FnMut(Result<(K, V), KeyValStoreError>) -> IterationResult,
{
let env = self.env.clone();
let db = self.db.clone();
let txn = ReadTransaction::new(env)?;
let access = txn.access();
let cursor = txn.cursor(db)?;
let head = |c: &mut Cursor, a: &ConstAccessor| {
let (key_bytes, val_bytes) = c.first(a)?;
ReadOnlyIterator::deserialize::<K, V>(key_bytes, val_bytes)
};
let cursor = MaybeOwned::Owned(cursor);
let iter = CursorIter::new(cursor, &access, head, ReadOnlyIterator::next)?;
for p in iter {
match f(p.map_err(|e| KeyValStoreError::DatabaseError(e.to_string()))) {
IterationResult::Break => break,
IterationResult::Continue => {},
}
}
Ok(())
}
pub fn contains_key<K>(&self, key: &K) -> Result<bool, LMDBError>
where K: AsLmdbBytes + ?Sized {
let txn = ReadTransaction::new(self.db.env())?;
let accessor = txn.access();
let res: error::Result<&Ignore> = accessor.get(&self.db, key);
let res = res.to_opt()?.is_some();
Ok(res)
}
pub fn remove<K>(&self, key: &K) -> Result<(), LMDBError>
where K: AsLmdbBytes + ?Sized {
let tx = WriteTransaction::new(self.db.env())?;
{
let mut accessor = tx.access();
accessor.del_key(&self.db, key)?;
}
tx.commit().map_err(Into::into)
}
pub fn with_read_transaction<F, R>(&self, f: F) -> Result<R, LMDBError>
where F: FnOnce(LMDBReadTransaction) -> R {
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let wrapper = LMDBReadTransaction { db: &self.db, access };
Ok(f(wrapper))
}
pub fn with_write_transaction<F>(&self, f: F) -> Result<(), LMDBError>
where F: FnOnce(LMDBWriteTransaction) -> Result<(), LMDBError> {
let txn = WriteTransaction::new(self.env.clone())?;
let access = txn.access();
let wrapper = LMDBWriteTransaction { db: &self.db, access };
f(wrapper)?;
txn.commit().map_err(|e| LMDBError::CommitError(e.to_string()))
}
pub fn db(&self) -> DatabaseRef {
self.db.clone()
}
}
struct ReadOnlyIterator {}
impl ReadOnlyIterator {
fn deserialize<K, V>(key_bytes: &[u8], val_bytes: &[u8]) -> Result<(K, V), error::Error>
where
for<'t> K: serde::de::DeserializeOwned,
for<'t> V: serde::de::DeserializeOwned,
{
let key = bincode::deserialize(key_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
let val = bincode::deserialize(val_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
Ok((key, val))
}
fn next<K, V>(c: &mut Cursor, access: &ConstAccessor) -> Result<(K, V), error::Error>
where
K: serde::de::DeserializeOwned,
V: serde::de::DeserializeOwned,
{
let (key_bytes, val_bytes) = c.next(access)?;
ReadOnlyIterator::deserialize(key_bytes, val_bytes)
}
}
pub struct LMDBReadTransaction<'txn, 'db: 'txn> {
db: &'db Database<'db>,
access: ConstAccessor<'txn>,
}
impl<'txn, 'db: 'txn> LMDBReadTransaction<'txn, 'db> {
pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
where
K: AsLmdbBytes + ?Sized,
for<'t> V: serde::de::DeserializeOwned, {
let val = self.access.get(self.db, key).to_opt();
LMDBReadTransaction::convert_value(val)
}
pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
where K: AsLmdbBytes + ?Sized {
let res: error::Result<&Ignore> = self.access.get(self.db, key);
let res = res.to_opt()?.is_some();
Ok(res)
}
fn convert_value<V>(val: Result<Option<&[u8]>, error::Error>) -> Result<Option<V>, LMDBError>
where for<'t> V: serde::de::DeserializeOwned
{
match val {
Ok(None) => Ok(None),
Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {e}"))),
Ok(Some(v)) => match bincode::deserialize(v) {
Ok(val) => Ok(Some(val)),
Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {e}"))),
},
}
}
}
pub struct LMDBWriteTransaction<'txn, 'db: 'txn> {
db: &'db Database<'db>,
access: WriteAccessor<'txn>,
}
impl<'txn, 'db: 'txn> LMDBWriteTransaction<'txn, 'db> {
pub fn insert<K, V>(&mut self, key: &K, value: &V) -> Result<(), LMDBError>
where
K: AsLmdbBytes + ?Sized,
V: serde::Serialize,
{
let buf = Self::convert_value(value)?;
self.access.put(self.db, key, &buf, put::Flags::empty())?;
Ok(())
}
pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
where K: AsLmdbBytes + ?Sized {
let res: error::Result<&Ignore> = self.access.get(self.db, key);
let res = res.to_opt()?.is_some();
Ok(res)
}
pub fn delete<K>(&mut self, key: &K) -> Result<(), LMDBError>
where K: AsLmdbBytes + ?Sized {
Ok(self.access.del_key(self.db, key)?)
}
fn convert_value<V>(value: &V) -> Result<Vec<u8>, LMDBError>
where V: serde::Serialize {
let size = bincode::serialized_size(value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
let mut buf = Vec::with_capacity(size.try_into().unwrap());
bincode::serialize_into(&mut buf, value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
Ok(buf)
}
}
#[cfg(test)]
mod test {
    use std::env;
    use lmdb_zero::db;
    use crate::lmdb_store::{LMDBBuilder, LMDBConfig};

    /// Smoke test: building a store with two named databases in the OS temp
    /// directory succeeds and registers exactly those two databases.
    #[test]
    fn test_lmdb_builder() {
        let store = LMDBBuilder::new()
            .set_path(env::temp_dir())
            .set_env_config(LMDBConfig::default())
            .set_max_number_of_databases(10)
            .add_database("db1", db::CREATE)
            .add_database("db2", db::CREATE)
            .build()
            .unwrap();
        assert_eq!(store.databases.len(), 2);
    }
}