use std::path::Path;
use casper_types::bytesrepr::Bytes;
use lmdb::{
self, Database, Environment, EnvironmentFlags, RoTransaction, RwTransaction, WriteFlags,
};
use crate::storage::{
error,
transaction_source::{Readable, Transaction, TransactionSource, Writable},
trie_store::lmdb::ScratchTrieStore,
MAX_DBS,
};
const EE_DB_FILENAME: &str = "data.lmdb";
impl Transaction for ScratchTrieStore {
type Error = error::Error;
type Handle = ScratchTrieStore;
fn commit(self) -> Result<(), Self::Error> {
Ok(())
}
}
impl Readable for ScratchTrieStore {
fn read(&self, handle: Self::Handle, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
let txn = self.env.create_read_txn()?;
match lmdb::Transaction::get(&txn, handle.store.get_db(), &key) {
Ok(bytes) => Ok(Some(Bytes::from(bytes))),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(error::Error::Lmdb(e)),
}
}
}
impl Writable for ScratchTrieStore {
fn write(&mut self, handle: Self::Handle, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
let mut txn = self.env.create_read_write_txn()?;
txn.put(handle.store.get_db(), &key, &value, WriteFlags::empty())
.map_err(error::Error::Lmdb)?;
Ok(())
}
}
impl<'a> TransactionSource<'a> for ScratchTrieStore {
type Error = error::Error;
type Handle = ScratchTrieStore;
type ReadTransaction = ScratchTrieStore;
type ReadWriteTransaction = ScratchTrieStore;
fn create_read_txn(&'a self) -> Result<Self::ReadTransaction, Self::Error> {
Ok(self.clone())
}
fn create_read_write_txn(&'a self) -> Result<Self::ReadWriteTransaction, Self::Error> {
Ok(self.clone())
}
}
impl<'a> Transaction for RoTransaction<'a> {
type Error = lmdb::Error;
type Handle = Database;
fn commit(self) -> Result<(), Self::Error> {
lmdb::Transaction::commit(self)
}
}
impl<'a> Readable for RoTransaction<'a> {
fn read(&self, handle: Self::Handle, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
match lmdb::Transaction::get(self, handle, &key) {
Ok(bytes) => Ok(Some(Bytes::from(bytes))),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
}
impl<'a> Transaction for RwTransaction<'a> {
type Error = lmdb::Error;
type Handle = Database;
fn commit(self) -> Result<(), Self::Error> {
<RwTransaction<'a> as lmdb::Transaction>::commit(self)
}
}
impl<'a> Readable for RwTransaction<'a> {
fn read(&self, handle: Self::Handle, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
match lmdb::Transaction::get(self, handle, &key) {
Ok(bytes) => Ok(Some(Bytes::from(bytes))),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e),
}
}
}
impl<'a> Writable for RwTransaction<'a> {
fn write(&mut self, handle: Self::Handle, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
self.put(handle, &key, &value, WriteFlags::empty())
.map_err(Into::into)
}
}
#[derive(Debug)]
pub struct LmdbEnvironment {
env: Environment,
manual_sync_enabled: bool,
}
impl LmdbEnvironment {
pub fn new<P: AsRef<Path>>(
path: P,
map_size: usize,
max_readers: u32,
manual_sync_enabled: bool,
) -> Result<Self, error::Error> {
let lmdb_flags = if manual_sync_enabled {
EnvironmentFlags::NO_SUB_DIR
| EnvironmentFlags::NO_READAHEAD
| EnvironmentFlags::MAP_ASYNC
| EnvironmentFlags::WRITE_MAP
| EnvironmentFlags::NO_META_SYNC
} else {
EnvironmentFlags::NO_SUB_DIR | EnvironmentFlags::NO_READAHEAD
};
let env = Environment::new()
.set_flags(lmdb_flags)
.set_max_dbs(MAX_DBS)
.set_map_size(map_size)
.set_max_readers(max_readers)
.open(&path.as_ref().join(EE_DB_FILENAME))?;
Ok(LmdbEnvironment {
env,
manual_sync_enabled,
})
}
pub fn env(&self) -> &Environment {
&self.env
}
pub fn is_manual_sync_enabled(&self) -> bool {
self.manual_sync_enabled
}
pub fn sync(&self) -> Result<(), lmdb::Error> {
self.env.sync(true)
}
}
impl<'a> TransactionSource<'a> for LmdbEnvironment {
type Error = lmdb::Error;
type Handle = Database;
type ReadTransaction = RoTransaction<'a>;
type ReadWriteTransaction = RwTransaction<'a>;
fn create_read_txn(&'a self) -> Result<RoTransaction<'a>, Self::Error> {
self.env.begin_ro_txn()
}
fn create_read_write_txn(&'a self) -> Result<RwTransaction<'a>, Self::Error> {
self.env.begin_rw_txn()
}
}