use crate::database::{
Database, DatabaseCursor, DatabaseReader, DatabaseReaderCursor, DatabaseWriter,
};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use lmdb_zero as lmdb;
use crate::database::error::DatabaseError;
const DEFAULT_SIZE: usize = 1 << 40;
#[derive(Clone)]
pub struct LmdbContext {
pub env: Arc<lmdb::Environment>,
}
impl LmdbContext {
pub fn new(
filepath: &Path,
indexes: usize,
size: Option<usize>,
) -> Result<Self, DatabaseError> {
let flags = lmdb::open::MAPASYNC
| lmdb::open::WRITEMAP
| lmdb::open::NORDAHEAD
| lmdb::open::NOSUBDIR;
let filepath_str = filepath
.to_str()
.ok_or_else(|| DatabaseError::InitError(format!("Invalid filepath: {:?}", filepath)))?;
let mut builder = lmdb::EnvBuilder::new().map_err(|err| {
DatabaseError::InitError(format!("Failed to initialize environment: {}", err))
})?;
builder
.set_maxdbs((indexes + 1) as u32)
.map_err(|err| DatabaseError::InitError(format!("Failed to set MAX_DBS: {}", err)))?;
builder
.set_mapsize(size.unwrap_or(DEFAULT_SIZE))
.map_err(|err| DatabaseError::InitError(format!("Failed to set MAP_SIZE: {}", err)))?;
let env = unsafe {
builder
.open(filepath_str, flags, 0o600)
.map_err(|err| DatabaseError::InitError(format!("Database not found: {}", err)))
}?;
Ok(LmdbContext { env: Arc::new(env) })
}
}
#[derive(Clone)]
pub struct LmdbDatabase {
ctx: LmdbContext,
main: Arc<lmdb::Database<'static>>,
indexes: Arc<HashMap<String, lmdb::Database<'static>>>,
}
impl LmdbDatabase {
pub fn new<S: AsRef<str>>(ctx: LmdbContext, indexes: &[S]) -> Result<Self, DatabaseError> {
let main = lmdb::Database::open(
ctx.env.clone(),
Some("main"),
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
)
.map_err(|err| DatabaseError::InitError(format!("Failed to open database: {:?}", err)))?;
let mut index_dbs = HashMap::with_capacity(indexes.len());
for name in indexes {
let db = lmdb::Database::open(
ctx.env.clone(),
Some(name.as_ref()),
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
)
.map_err(|err| {
DatabaseError::InitError(format!("Failed to open database: {:?}", err))
})?;
index_dbs.insert(String::from(name.as_ref()), db);
}
Ok(LmdbDatabase {
ctx,
main: Arc::new(main),
indexes: Arc::new(index_dbs),
})
}
pub fn reader(&self) -> Result<LmdbDatabaseReader, DatabaseError> {
let txn = lmdb::ReadTransaction::new(self.ctx.env.clone()).map_err(|err| {
DatabaseError::ReaderError(format!("Failed to create reader: {}", err))
})?;
Ok(LmdbDatabaseReader { db: &self, txn })
}
pub fn writer(&self) -> Result<LmdbDatabaseWriter, DatabaseError> {
let txn = lmdb::WriteTransaction::new(self.ctx.env.clone()).map_err(|err| {
DatabaseError::WriterError(format!("Failed to create writer: {}", err))
})?;
Ok(LmdbDatabaseWriter { db: &self, txn })
}
}
impl Database for LmdbDatabase {
fn get_reader<'a>(&'a self) -> Result<Box<dyn DatabaseReader + 'a>, DatabaseError> {
let txn = lmdb::ReadTransaction::new(self.ctx.env.clone()).map_err(|err| {
DatabaseError::ReaderError(format!("Failed to create reader: {}", err))
})?;
Ok(Box::new(LmdbDatabaseReader { db: &self, txn }))
}
fn get_writer<'a>(&'a self) -> Result<Box<dyn DatabaseWriter + 'a>, DatabaseError> {
let txn = lmdb::WriteTransaction::new(self.ctx.env.clone()).map_err(|err| {
DatabaseError::WriterError(format!("Failed to create writer: {}", err))
})?;
Ok(Box::new(LmdbDatabaseWriter { db: &self, txn }))
}
fn clone_box(&self) -> Box<Database> {
Box::new(Clone::clone(self))
}
}
pub struct LmdbDatabaseReader<'a> {
db: &'a LmdbDatabase,
txn: lmdb::ReadTransaction<'a>,
}
impl<'a> DatabaseReader for LmdbDatabaseReader<'a> {
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
let access = self.txn.access();
let val: Result<&[u8], _> = access.get(&self.db.main, key);
val.ok().map(Vec::from)
}
fn index_get(&self, index: &str, key: &[u8]) -> Result<Option<Vec<u8>>, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
let access = self.txn.access();
let val: Result<&[u8], _> = access.get(index, key);
Ok(val.ok().map(Vec::from))
}
fn cursor(&self) -> Result<DatabaseCursor, DatabaseError> {
let cursor = self
.txn
.cursor(self.db.main.clone())
.map_err(|err| DatabaseError::ReaderError(format!("{}", err)))?;
let access = self.txn.access();
Ok(Box::new(LmdbDatabaseReaderCursor { access, cursor }))
}
fn index_cursor(&self, index: &str) -> Result<DatabaseCursor, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
let cursor = self
.txn
.cursor(index)
.map_err(|err| DatabaseError::ReaderError(format!("{}", err)))?;
let access = self.txn.access();
Ok(Box::new(LmdbDatabaseReaderCursor { access, cursor }))
}
fn count(&self) -> Result<usize, DatabaseError> {
self.txn
.db_stat(&self.db.main)
.map_err(|err| {
DatabaseError::CorruptionError(format!("Failed to get database stats: {}", err))
})
.map(|stat| stat.entries)
}
fn index_count(&self, index: &str) -> Result<usize, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
self.txn
.db_stat(index)
.map_err(|err| {
DatabaseError::CorruptionError(format!("Failed to get database stats: {}", err))
})
.map(|stat| stat.entries)
}
}
pub struct LmdbDatabaseReaderCursor<'a> {
access: lmdb::ConstAccessor<'a>,
cursor: lmdb::Cursor<'a, 'a>,
}
impl<'a> DatabaseReaderCursor for LmdbDatabaseReaderCursor<'a> {
fn first(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
self.cursor
.first(&self.access)
.ok()
.map(|(key, value): (&[u8], &[u8])| (Vec::from(key), Vec::from(value)))
}
fn last(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
self.cursor
.last(&self.access)
.ok()
.map(|(key, value): (&[u8], &[u8])| (Vec::from(key), Vec::from(value)))
}
}
impl<'a> Iterator for LmdbDatabaseReaderCursor<'a> {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
self.cursor
.next(&self.access)
.ok()
.map(|(key, value): (&[u8], &[u8])| (Vec::from(key), Vec::from(value)))
}
}
pub struct LmdbDatabaseWriter<'a> {
db: &'a LmdbDatabase,
txn: lmdb::WriteTransaction<'a>,
}
impl<'a> DatabaseWriter for LmdbDatabaseWriter<'a> {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), DatabaseError> {
self.txn
.access()
.put(&self.db.main, key, value, lmdb::put::NOOVERWRITE)
.map_err(|err| match err {
lmdb::error::Error::Code(lmdb::error::KEYEXIST) => DatabaseError::DuplicateEntry,
_ => DatabaseError::WriterError(format!("{}", err)),
})
}
fn overwrite(&mut self, key: &[u8], value: &[u8]) -> Result<(), DatabaseError> {
self.txn
.access()
.put(&self.db.main, key, value, lmdb::put::Flags::empty())
.map_err(|err| DatabaseError::WriterError(format!("{}", err)))
}
fn delete(&mut self, key: &[u8]) -> Result<(), DatabaseError> {
self.txn
.access()
.del_key(&self.db.main, key)
.map_err(|err| DatabaseError::WriterError(format!("{}", err)))
}
fn index_put(&mut self, index: &str, key: &[u8], value: &[u8]) -> Result<(), DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::WriterError(format!("Not an index: {}", index)))?;
self.txn
.access()
.put(index, key, value, lmdb::put::Flags::empty())
.map_err(|err| DatabaseError::WriterError(format!("{}", err)))
}
fn index_delete(&mut self, index: &str, key: &[u8]) -> Result<(), DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::WriterError(format!("Not an index: {}", index)))?;
self.txn
.access()
.del_key(index, key)
.map_err(|err| DatabaseError::WriterError(format!("{}", err)))
}
fn commit(self: Box<Self>) -> Result<(), DatabaseError> {
self.txn
.commit()
.map_err(|err| DatabaseError::WriterError(format!("{}", err)))
}
fn as_reader(&self) -> &dyn DatabaseReader {
self
}
}
impl<'a> DatabaseReader for LmdbDatabaseWriter<'a> {
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
let access = self.txn.access();
let val: Result<&[u8], _> = access.get(&self.db.main, key);
val.ok().map(Vec::from)
}
fn index_get(&self, index: &str, key: &[u8]) -> Result<Option<Vec<u8>>, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
let access = self.txn.access();
let val: Result<&[u8], _> = access.get(index, key);
Ok(val.ok().map(Vec::from))
}
fn cursor(&self) -> Result<DatabaseCursor, DatabaseError> {
let cursor = self
.txn
.cursor(self.db.main.clone())
.map_err(|err| DatabaseError::ReaderError(format!("{}", err)))?;
let access = (*self.txn).access();
Ok(Box::new(LmdbDatabaseReaderCursor { access, cursor }))
}
fn index_cursor(&self, index: &str) -> Result<DatabaseCursor, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
let cursor = self
.txn
.cursor(index)
.map_err(|err| DatabaseError::ReaderError(format!("{}", err)))?;
let access = (*self.txn).access();
Ok(Box::new(LmdbDatabaseReaderCursor { access, cursor }))
}
fn count(&self) -> Result<usize, DatabaseError> {
self.txn
.db_stat(&self.db.main)
.map_err(|err| {
DatabaseError::CorruptionError(format!("Failed to get database stats: {}", err))
})
.map(|stat| stat.entries)
}
fn index_count(&self, index: &str) -> Result<usize, DatabaseError> {
let index = self
.db
.indexes
.get(index)
.ok_or_else(|| DatabaseError::ReaderError(format!("Not an index: {}", index)))?;
self.txn
.db_stat(index)
.map_err(|err| {
DatabaseError::CorruptionError(format!("Failed to get database stats: {}", err))
})
.map(|stat| stat.entries)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::fs::remove_file;
use std::panic;
use std::path::Path;
use std::thread;
fn assert_database_count(count: usize, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert_eq!(reader.count().unwrap(), count,);
}
fn assert_index_count(index: &str, count: usize, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert_eq!(reader.index_count(index).unwrap(), count,);
}
fn assert_key_value(key: u8, val: u8, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert_eq!(reader.get(&[key]).unwrap(), [val],);
}
fn assert_index_key_value(index: &str, key: u8, val: u8, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert_eq!(reader.index_get(index, &[key]).unwrap().unwrap(), [val],);
}
fn assert_not_in_database(key: u8, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert!(reader.get(&[key]).is_none());
}
fn assert_not_in_index(index: &str, key: u8, db: &LmdbDatabase) {
let reader = db.reader().unwrap();
assert!(reader.index_get(index, &[key]).unwrap().is_none());
}
#[test]
fn test_lmdb() {
run_test(|blockstore_path| {
let ctx = LmdbContext::new(Path::new(blockstore_path), 3, Some(1024 * 1024))
.map_err(|err| DatabaseError::InitError(format!("{}", err)))
.unwrap();
let database = LmdbDatabase::new(ctx, &["a", "b"])
.map_err(|err| DatabaseError::InitError(format!("{}", err)))
.unwrap();
assert_database_count(0, &database);
assert_not_in_database(3, &database);
assert_not_in_database(5, &database);
let mut writer = database.get_writer().unwrap();
writer.put(&[3], &[4]).unwrap();
assert_database_count(0, &database);
assert_not_in_database(3, &database);
writer.commit().unwrap();
assert_database_count(1, &database);
assert_key_value(3, 4, &database);
let mut writer = database.get_writer().unwrap();
writer.put(&[5], &[6]).unwrap();
writer.commit().unwrap();
assert_database_count(2, &database);
assert_key_value(5, 6, &database);
assert_key_value(3, 4, &database);
let mut writer = database.get_writer().unwrap();
writer.delete(&[3]).unwrap();
assert_database_count(2, &database);
writer.commit().unwrap();
assert_database_count(1, &database);
assert_key_value(5, 6, &database);
assert_not_in_database(3, &database);
assert_index_count("a", 0, &database);
assert_index_count("b", 0, &database);
assert_not_in_index("a", 5, &database);
assert_not_in_index("b", 5, &database);
let mut writer = database.get_writer().unwrap();
writer.index_put("a", &[55], &[5]).unwrap();
assert_index_count("a", 0, &database);
assert_index_count("b", 0, &database);
assert_not_in_index("a", 5, &database);
assert_not_in_index("b", 5, &database);
writer.commit().unwrap();
assert_index_count("a", 1, &database);
assert_index_count("b", 0, &database);
assert_index_key_value("a", 55, 5, &database);
assert_not_in_index("b", 5, &database);
assert_database_count(1, &database);
assert_key_value(5, 6, &database);
assert_not_in_database(3, &database);
let mut writer = database.get_writer().unwrap();
writer.index_delete("a", &[55]).unwrap();
assert_index_count("a", 1, &database);
assert_index_count("b", 0, &database);
assert_index_key_value("a", 55, 5, &database);
assert_not_in_index("b", 5, &database);
writer.commit().unwrap();
assert_index_count("a", 0, &database);
assert_index_count("b", 0, &database);
assert_not_in_index("a", 5, &database);
assert_not_in_index("b", 5, &database);
assert_database_count(1, &database);
assert_key_value(5, 6, &database);
assert_not_in_database(3, &database);
})
}
fn run_test<T>(test: T) -> ()
where
T: FnOnce(&str) -> () + panic::UnwindSafe,
{
let dbpath = temp_db_path();
let testpath = dbpath.clone();
let result = panic::catch_unwind(move || test(&testpath));
remove_file(dbpath).unwrap();
assert!(result.is_ok())
}
fn temp_db_path() -> String {
let mut temp_dir = env::temp_dir();
let thread_id = thread::current().id();
temp_dir.push(format!("merkle-{:?}.lmdb", thread_id));
temp_dir.to_str().unwrap().to_string()
}
}