use crate::{
key_val_store::{error::KeyValStoreError, key_val_store::IterationResult},
lmdb_store::error::LMDBError,
};
use lmdb_zero::{
db,
error::{self, LmdbResultExt},
open,
put,
traits::AsLmdbBytes,
ConstAccessor,
Cursor,
CursorIter,
Database,
DatabaseOptions,
EnvBuilder,
Environment,
Ignore,
MaybeOwned,
ReadTransaction,
Stat,
WriteAccessor,
WriteTransaction,
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::{
cmp::max,
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
/// Log target passed to every `log` macro invocation in this file.
const LOG_TARGET: &str = "lmdb";
/// Reference-counted handle to an `lmdb_zero::Database` with a `'static` lifetime parameter, so the handle can be
/// cloned cheaply and shared between `LMDBDatabase` values.
type DatabaseRef = Arc<Database<'static>>;
/// Collects the settings needed to open an LMDB environment and its named databases; consumed by `build`.
///
/// NOTE(review): the derived `Default` yields an empty path, a 0 MB environment and 0 max databases, which is not
/// what `LMDBBuilder::new()` sets (`./store/`, 64 MB, 8 databases) — confirm which defaults are intended.
#[derive(Default)]
pub struct LMDBBuilder {
/// Directory in which the environment is opened; `build` rejects a path that does not exist.
path: PathBuf,
/// Requested environment (memory map) size in megabytes; `build` multiplies it by 1024 * 1024.
db_size_mb: usize,
/// Lower bound on the number of named databases configured on the environment.
max_dbs: usize,
/// Names of the databases to open in `build`, each with the flags to open it with.
db_names: HashMap<String, db::Flags>,
}
impl LMDBBuilder {
    /// Creates a builder with the default settings: path `./store/`, a 64 MB environment, room for 8 named
    /// databases and no databases registered yet.
    pub fn new() -> LMDBBuilder {
        LMDBBuilder {
            path: "./store/".into(),
            db_size_mb: 64,
            db_names: HashMap::new(),
            max_dbs: 8,
        }
    }

    /// Sets the directory in which the LMDB environment is opened. `build` fails with
    /// `LMDBError::InvalidPath` if the directory does not exist or is not valid UTF-8.
    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> LMDBBuilder {
        self.path = path.as_ref().to_owned();
        self
    }

    /// Sets the requested size of the environment in megabytes.
    pub fn set_environment_size(mut self, size: usize) -> LMDBBuilder {
        self.db_size_mb = size;
        self
    }

    /// Sets the maximum number of named databases. `build` raises this to the number of registered databases if
    /// more databases than `size` were added.
    pub fn set_max_number_of_databases(mut self, size: usize) -> LMDBBuilder {
        self.max_dbs = size;
        self
    }

    /// Registers a named database to be opened with `flags` when `build` is called. Registering the same name
    /// twice keeps only the most recent flags.
    pub fn add_database(mut self, name: &str, flags: db::Flags) -> LMDBBuilder {
        let _ = self.db_names.insert(name.into(), flags);
        self
    }

    /// Opens the LMDB environment and every registered database, returning the resulting `LMDBStore`.
    ///
    /// If no database was registered, a single database called `default` is created. When at most half of the
    /// requested size is still free in an existing environment, the map size is grown by the requested size.
    ///
    /// # Errors
    /// Returns `LMDBError::InvalidPath` if the configured path does not exist or cannot be represented as UTF-8,
    /// and propagates any error reported by LMDB while opening the environment or its databases.
    pub fn build(mut self) -> Result<LMDBStore, LMDBError> {
        if !self.path.exists() {
            return Err(LMDBError::InvalidPath);
        }
        let path = self.path.to_str().map(String::from).ok_or(LMDBError::InvalidPath)?;

        // Register the fallback database *before* sizing `max_dbs`, otherwise a builder configured with zero
        // max databases and no explicit database would be unable to open `default`.
        if self.db_names.is_empty() {
            self.db_names.insert("default".into(), db::CREATE);
        }
        let max_dbs = max(self.db_names.len(), self.max_dbs) as u32;
        let requested_bytes = self.db_size_mb * 1024 * 1024;

        // SAFETY: NOTE(review): `EnvBuilder::open` is an unsafe fn in lmdb_zero; its contract (e.g. not opening the
        // same environment more than once in a process) is not enforced here and must be upheld by the caller.
        let env = unsafe {
            let mut builder = EnvBuilder::new()?;
            builder.set_mapsize(requested_bytes)?;
            builder.set_maxdbs(max_dbs)?;
            // NOTLS (0x200000) is the only flag the environment is opened with.
            builder.open(&path, open::NOTLS, 0o600)?
        };
        let env = Arc::new(env);

        let mut env_info = env.info()?;
        let env_stat = env.stat()?;
        let size_used = env_stat.psize as usize * env_info.last_pgno;
        let mut space_remaining = env_info.mapsize - size_used;
        let usage = (size_used as f64 / env_info.mapsize as f64) * 100.0;
        if space_remaining <= requested_bytes / 2 {
            // SAFETY: the environment was created just above and this function has not started any transaction
            // on it yet. NOTE(review): confirm no other handle to the same environment is active.
            unsafe {
                env.set_mapsize(size_used + requested_bytes)?;
            }
            env_info = env.info()?;
            space_remaining = env_info.mapsize - size_used;
            debug!(
                target: LOG_TARGET,
                "({}) LMDB environment usage factor {:.*} %., size used {:?} MB, increased by {:?} MB.",
                path,
                2,
                usage,
                size_used / (1024 * 1024),
                self.db_size_mb
            );
        }
        info!(
            target: LOG_TARGET,
            "({}) LMDB environment created with a capacity of {} MB, {} MB remaining.",
            path,
            env_info.mapsize / (1024 * 1024),
            space_remaining / (1024 * 1024)
        );

        let mut databases: HashMap<String, LMDBDatabase> = HashMap::with_capacity(self.db_names.len());
        for (name, flags) in &self.db_names {
            let db = Database::open(env.clone(), Some(name), &DatabaseOptions::new(*flags))?;
            let handle = LMDBDatabase {
                name: name.clone(),
                env: env.clone(),
                db: Arc::new(db),
            };
            databases.insert(name.clone(), handle);
            trace!(target: LOG_TARGET, "({}) LMDB database '{}' is ready", path, name);
        }
        Ok(LMDBStore { path, env, databases })
    }
}
/// An opened LMDB environment together with the handles of its named databases, as produced by
/// `LMDBBuilder::build`.
pub struct LMDBStore {
/// Path the environment was opened at; only used in log messages here.
path: String,
/// Shared handle to the underlying LMDB environment.
pub(crate) env: Arc<Environment>,
/// Database handles keyed by database name.
pub(crate) databases: HashMap<String, LMDBDatabase>,
}
impl LMDBStore {
    /// Forces the environment's buffers to be flushed to disk.
    ///
    /// # Errors
    /// Propagates the error returned by `Environment::sync`.
    pub fn flush(&self) -> Result<(), lmdb_zero::error::Error> {
        trace!(target: LOG_TARGET, "Forcing flush of buffers to disk");
        self.env.sync(true)?;
        debug!(target: LOG_TARGET, "LMDB Buffers have been flushed");
        Ok(())
    }

    /// Writes the environment's information and statistics to the log at debug level. If either cannot be
    /// retrieved, a warning is logged instead; this function never fails.
    pub fn log_info(&self) {
        match self.env.info() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB information for {}. {}", self.path, e
            ),
            Ok(info) => {
                let size_mb = info.mapsize / 1024 / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment information ({}). Map Size={} MB. Last page no={}. Last tx id={}",
                    self.path,
                    size_mb,
                    info.last_pgno,
                    info.last_txnid
                )
            },
        }
        match self.env.stat() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB statistics for {}. {}", self.path, e
            ),
            Ok(stats) => {
                let page_size = stats.psize / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
                     Overflow pages={}, Entries={}",
                    self.path,
                    page_size,
                    stats.depth,
                    stats.branch_pages,
                    stats.leaf_pages,
                    stats.overflow_pages,
                    stats.entries
                );
            },
        }
    }

    /// Returns a clone of the handle for the database called `db_name`, or `None` if no such database was
    /// opened by the builder.
    pub fn get_handle(&self, db_name: &str) -> Option<LMDBDatabase> {
        self.databases.get(db_name).cloned()
    }

    /// Returns a new reference to the shared LMDB environment.
    pub fn env(&self) -> Arc<Environment> {
        self.env.clone()
    }
}
/// Handle to a single named database inside an LMDB environment. Cloning clones the name and bumps the reference
/// counts of the shared environment and database.
#[derive(Clone)]
pub struct LMDBDatabase {
/// Name the database was opened with; used in log messages.
name: String,
/// Shared handle to the environment the database lives in.
env: Arc<Environment>,
/// Shared handle to the underlying `lmdb_zero` database.
db: DatabaseRef,
}
impl LMDBDatabase {
    /// Serializes `value` with bincode and stores it under `key` in its own write transaction, which is
    /// committed before returning.
    ///
    /// # Errors
    /// Returns an error if the transaction cannot be created or committed, the value cannot be serialized, or
    /// LMDB rejects the write.
    pub fn insert<K, V>(&self, key: &K, value: &V) -> Result<(), LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        V: Serialize,
    {
        let env = &(*self.db.env());
        let tx = WriteTransaction::new(env)?;
        {
            // The accessor borrows the transaction, so it must go out of scope before `commit` consumes it.
            let mut accessor = tx.access();
            let buf = LMDBWriteTransaction::convert_value(value)?;
            accessor.put(&*self.db, key, &buf, put::Flags::empty())?;
        }
        tx.commit().map_err(LMDBError::from)
    }

    /// Looks up `key` in a fresh read transaction and deserializes the stored bytes with bincode.
    ///
    /// Returns `Ok(None)` when the key is not present.
    ///
    /// # Errors
    /// Returns an error if the transaction cannot be created, the lookup fails, or the stored bytes cannot be
    /// deserialized into `V`.
    pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        for<'t> V: DeserializeOwned,
    {
        let env = &(*self.db.env());
        let txn = ReadTransaction::new(env)?;
        let accessor = txn.access();
        let val = accessor.get(&self.db, key).to_opt();
        LMDBReadTransaction::convert_value(val)
    }

    /// Returns the LMDB statistics of this database, read inside a fresh read transaction.
    pub fn get_stats(&self) -> Result<Stat, LMDBError> {
        let env = &(*self.db.env());
        Ok(ReadTransaction::new(env).and_then(|txn| txn.db_stat(&self.db))?)
    }

    /// Writes this database's statistics to the log at debug level, or a warning if they cannot be retrieved.
    pub fn log_info(&self) {
        match self.get_stats() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB statistics for {}. {}", self.name, e
            ),
            Ok(stats) => {
                let page_size = stats.psize / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Database statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
                     Overflow pages={}, Entries={}",
                    self.name,
                    page_size,
                    stats.depth,
                    stats.branch_pages,
                    stats.leaf_pages,
                    stats.overflow_pages,
                    stats.entries
                );
            },
        }
    }

    /// Returns `true` if the database holds no entries.
    pub fn is_empty(&self) -> Result<bool, LMDBError> {
        // An empty database is one with zero entries; the previous `entries > 0` check was inverted.
        self.get_stats().map(|s| s.entries == 0)
    }

    /// Returns the number of entries stored in the database.
    pub fn len(&self) -> Result<usize, LMDBError> {
        self.get_stats().map(|s| s.entries)
    }

    /// Walks every key/value pair using a cursor inside one read transaction, deserializing each pair with
    /// bincode and passing the outcome to `f`.
    ///
    /// Pairs that fail to be read or deserialized are passed to `f` as
    /// `Err(KeyValStoreError::DatabaseError(..))`. Iteration stops early when `f` returns
    /// `IterationResult::Break`.
    ///
    /// # Errors
    /// Returns an error if the transaction, cursor or iterator cannot be created.
    pub fn for_each<K, V, F>(&self, mut f: F) -> Result<(), LMDBError>
    where
        K: DeserializeOwned,
        V: DeserializeOwned,
        F: FnMut(Result<(K, V), KeyValStoreError>) -> IterationResult,
    {
        let env = self.env.clone();
        let db = self.db.clone();
        let txn = ReadTransaction::new(env)?;
        let access = txn.access();
        let cursor = txn.cursor(db)?;
        // Positions the cursor on the first entry and produces the first item of the iteration.
        let head = |c: &mut Cursor, a: &ConstAccessor| {
            let (key_bytes, val_bytes) = c.first(a)?;
            ReadOnlyIterator::deserialize::<K, V>(key_bytes, val_bytes)
        };
        let cursor = MaybeOwned::Owned(cursor);
        let iter = CursorIter::new(cursor, &access, head, ReadOnlyIterator::next)?;
        for p in iter {
            match f(p.map_err(|e| KeyValStoreError::DatabaseError(e.to_string()))) {
                IterationResult::Break => break,
                IterationResult::Continue => {},
            }
        }
        Ok(())
    }

    /// Returns `true` if `key` is present, without deserializing the stored value.
    pub fn contains_key<K>(&self, key: &K) -> Result<bool, LMDBError>
    where K: AsLmdbBytes + ?Sized {
        let txn = ReadTransaction::new(&(*self.db.env()))?;
        let accessor = txn.access();
        // `Ignore` tells lmdb_zero to skip interpreting the value bytes.
        let res: error::Result<&Ignore> = accessor.get(&self.db, key);
        let res = res.to_opt()?.is_some();
        Ok(res)
    }

    /// Deletes `key` in its own write transaction, which is committed before returning.
    ///
    /// # Errors
    /// Returns an error if the transaction cannot be created or committed, or if `del_key` fails.
    pub fn remove<K>(&self, key: &K) -> Result<(), LMDBError>
    where K: AsLmdbBytes + ?Sized {
        let tx = WriteTransaction::new(&(*self.db.env()))?;
        {
            let mut accessor = tx.access();
            accessor.del_key(&self.db, key)?;
        }
        tx.commit().map_err(Into::into)
    }

    /// Creates a read transaction, hands a `LMDBReadTransaction` wrapper for this database to `f` and returns
    /// whatever `f` returns.
    pub fn with_read_transaction<F, V>(&self, f: F) -> Result<Option<Vec<V>>, LMDBError>
    where
        V: serde::de::DeserializeOwned,
        F: FnOnce(LMDBReadTransaction) -> Result<Option<Vec<V>>, LMDBError>,
    {
        let txn = ReadTransaction::new(self.env.clone())?;
        let access = txn.access();
        let wrapper = LMDBReadTransaction { db: &self.db, access };
        f(wrapper)
    }

    /// Creates a write transaction, hands a `LMDBWriteTransaction` wrapper for this database to `f` and commits
    /// the transaction if `f` succeeds.
    ///
    /// If `f` returns an error it is propagated and the transaction is dropped without being committed.
    ///
    /// # Errors
    /// A failing commit is reported as `LMDBError::CommitError`.
    pub fn with_write_transaction<F>(&self, f: F) -> Result<(), LMDBError>
    where F: FnOnce(LMDBWriteTransaction) -> Result<(), LMDBError> {
        let txn = WriteTransaction::new(self.env.clone())?;
        let access = txn.access();
        let wrapper = LMDBWriteTransaction { db: &self.db, access };
        f(wrapper)?;
        txn.commit().map_err(|e| LMDBError::CommitError(e.to_string()))
    }

    /// Returns a new reference to the underlying `lmdb_zero` database handle.
    pub fn db(&self) -> DatabaseRef {
        self.db.clone()
    }
}
/// Stateless namespace for the cursor helper functions used when iterating over a database with a cursor.
struct ReadOnlyIterator {}
impl ReadOnlyIterator {
fn deserialize<K, V>(key_bytes: &[u8], val_bytes: &[u8]) -> Result<(K, V), error::Error>
where
for<'t> K: serde::de::DeserializeOwned,
for<'t> V: serde::de::DeserializeOwned,
{
let key = bincode::deserialize(key_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
let val = bincode::deserialize(val_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
Ok((key, val))
}
fn next<'r, K, V>(c: &mut Cursor, access: &'r ConstAccessor) -> Result<(K, V), error::Error>
where
K: serde::de::DeserializeOwned,
V: serde::de::DeserializeOwned,
{
let (key_bytes, val_bytes) = c.next(access)?;
ReadOnlyIterator::deserialize(key_bytes, val_bytes)
}
}
/// Read-only view of one database within a transaction, handed to the closure given to
/// `LMDBDatabase::with_read_transaction`.
pub struct LMDBReadTransaction<'txn, 'db: 'txn> {
/// Database the lookups are performed against.
db: &'db Database<'db>,
/// Accessor obtained from the transaction; used for every lookup.
access: ConstAccessor<'txn>,
}
impl<'txn, 'db: 'txn> LMDBReadTransaction<'txn, 'db> {
pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
where
K: AsLmdbBytes + ?Sized,
for<'t> V: serde::de::DeserializeOwned, {
let val = self.access.get(&self.db, key).to_opt();
LMDBReadTransaction::convert_value(val)
}
pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
where K: AsLmdbBytes + ?Sized {
let res: error::Result<&Ignore> = self.access.get(&self.db, key);
let res = res.to_opt()?.is_some();
Ok(res)
}
fn convert_value<V>(val: Result<Option<&[u8]>, error::Error>) -> Result<Option<V>, LMDBError>
where for<'t> V: serde::de::DeserializeOwned
{
match val {
Ok(None) => Ok(None),
Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {}", e.to_string()))),
Ok(Some(v)) => match bincode::deserialize(v) {
Ok(val) => Ok(Some(val)),
Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {}", e))),
},
}
}
}
/// Read-write view of one database within a transaction, handed to the closure given to
/// `LMDBDatabase::with_write_transaction`.
pub struct LMDBWriteTransaction<'txn, 'db: 'txn> {
/// Database the operations are performed against.
db: &'db Database<'db>,
/// Accessor obtained from the write transaction; used for every read and write.
access: WriteAccessor<'txn>,
}
impl<'txn, 'db: 'txn> LMDBWriteTransaction<'txn, 'db> {
pub fn insert<K, V>(&mut self, key: &K, value: &V) -> Result<(), LMDBError>
where
K: AsLmdbBytes + ?Sized,
V: serde::Serialize,
{
let buf = Self::convert_value(value)?;
self.access.put(&self.db, key, &buf, put::Flags::empty())?;
Ok(())
}
pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
where K: AsLmdbBytes + ?Sized {
let res: error::Result<&Ignore> = self.access.get(&self.db, key);
let res = res.to_opt()?.is_some();
Ok(res)
}
pub fn delete<K>(&mut self, key: &K) -> Result<(), LMDBError>
where K: AsLmdbBytes + ?Sized {
Ok(self.access.del_key(&self.db, key)?)
}
fn convert_value<V>(value: &V) -> Result<Vec<u8>, LMDBError>
where V: serde::Serialize {
let size = bincode::serialized_size(value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
let mut buf = Vec::with_capacity(size as usize);
bincode::serialize_into(&mut buf, value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
Ok(buf)
}
}
#[cfg(test)]
mod test {
    use crate::lmdb_store::LMDBBuilder;
    use lmdb_zero::db;
    use std::env;

    /// Builds a store with two named databases in the system temp directory and checks that both handles are
    /// registered.
    #[test]
    fn test_lmdb_builder() {
        let store = LMDBBuilder::new()
            .set_path(env::temp_dir())
            .set_environment_size(500)
            .set_max_number_of_databases(10)
            .add_database("db1", db::CREATE)
            .add_database("db2", db::CREATE)
            .build()
            .unwrap();
        // `assert_eq!` prints both values on failure, unlike `assert!(a == b)`.
        assert_eq!(store.databases.len(), 2);
    }
}